diff options
Diffstat (limited to 'bitbake/lib/hashserv/client.py')
-rw-r--r-- | bitbake/lib/hashserv/client.py | 489 |
1 files changed, 322 insertions, 167 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index a29af836d9..0b254beddd 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -3,189 +3,344 @@ # SPDX-License-Identifier: GPL-2.0-only # -import json import logging import socket -import os -from . import chunkify, DEFAULT_MAX_CHUNK - - -logger = logging.getLogger('hashserv.client') +import bb.asyncrpc +import json +from . import create_async_client -class HashConnectionError(Exception): - pass +logger = logging.getLogger("hashserv.client") -class Client(object): +class AsyncClient(bb.asyncrpc.AsyncClient): MODE_NORMAL = 0 MODE_GET_STREAM = 1 + MODE_EXIST_STREAM = 2 - def __init__(self): - self._socket = None - self.reader = None - self.writer = None + def __init__(self, username=None, password=None): + super().__init__("OEHASHEQUIV", "1.1", logger) self.mode = self.MODE_NORMAL - self.max_chunk = DEFAULT_MAX_CHUNK - - def connect_tcp(self, address, port): - def connect_sock(): - s = socket.create_connection((address, port)) - - s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) - s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - return s - - self._connect_sock = connect_sock - - def connect_unix(self, path): - def connect_sock(): - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - # AF_UNIX has path length issues so chdir here to workaround - cwd = os.getcwd() - try: - os.chdir(os.path.dirname(path)) - s.connect(os.path.basename(path)) - finally: - os.chdir(cwd) - return s - - self._connect_sock = connect_sock - - def connect(self): - if self._socket is None: - self._socket = self._connect_sock() - - self.reader = self._socket.makefile('r', encoding='utf-8') - self.writer = self._socket.makefile('w', encoding='utf-8') - - self.writer.write('OEHASHEQUIV 1.1\n\n') - self.writer.flush() - - # Restore mode if the socket is being re-created - cur_mode = self.mode - self.mode = self.MODE_NORMAL - self._set_mode(cur_mode) - - return self._socket - - def close(self): - if self._socket is not None: - self._socket.close() - self._socket = None - self.reader = None - self.writer = None - - def _send_wrapper(self, proc): - count = 0 - while True: - try: - self.connect() - return proc() - except (OSError, HashConnectionError, json.JSONDecodeError, UnicodeDecodeError) as e: - logger.warning('Error talking to server: %s' % e) - if count >= 3: - if not isinstance(e, HashConnectionError): - raise HashConnectionError(str(e)) - raise e - self.close() - count += 1 - - def send_message(self, msg): - def get_line(): - line = self.reader.readline() - if not line: - raise HashConnectionError('Connection closed') - - if not line.endswith('\n'): - raise HashConnectionError('Bad message %r' % message) - - return line - - def proc(): - for c in chunkify(json.dumps(msg), self.max_chunk): - self.writer.write(c) - self.writer.flush() - - l = get_line() - - m = json.loads(l) - if 'chunk-stream' in m: - lines = [] - while True: - l = get_line().rstrip('\n') - if not l: - break - lines.append(l) - - m = json.loads(''.join(lines)) - - return m - - return self._send_wrapper(proc) - - def send_stream(self, msg): - def proc(): - self.writer.write("%s\n" % msg) - self.writer.flush() - l = self.reader.readline() - if not l: - raise HashConnectionError('Connection closed') - return l.rstrip() - - return self._send_wrapper(proc) - - def _set_mode(self, new_mode): - if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: - r = self.send_stream('END') - if r != 'ok': - raise HashConnectionError('Bad response from server %r' % r) - elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: - r = self.send_message({'get-stream': None}) - if r != 'ok': - raise HashConnectionError('Bad response from server %r' % r) - elif new_mode != self.mode: - raise Exception('Undefined mode transition %r -> %r' % (self.mode, new_mode)) + self.username = username + self.password = password + self.saved_become_user = None + + async def setup_connection(self): + await super().setup_connection() + self.mode = self.MODE_NORMAL + if self.username: + # Save off become user temporarily because auth() resets it + become = self.saved_become_user + await self.auth(self.username, self.password) + + if become: + await self.become_user(become) + + async def send_stream(self, mode, msg): + async def proc(): + await self._set_mode(mode) + await self.socket.send(msg) + return await self.socket.recv() + + return await self._send_wrapper(proc) + + async def invoke(self, *args, **kwargs): + # It's OK if connection errors cause a failure here, because the mode + # is also reset to normal on a new connection + await self._set_mode(self.MODE_NORMAL) + return await super().invoke(*args, **kwargs) + + async def _set_mode(self, new_mode): + async def stream_to_normal(): + await self.socket.send("END") + return await self.socket.recv() + + async def normal_to_stream(command): + r = await self.invoke({command: None}) + if r != "ok": + raise ConnectionError( + f"Unable to transition to stream mode: Bad response from server {r!r}" + ) + + self.logger.debug("Mode is now %s", command) + + if new_mode == self.mode: + return + + self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode) + + # Always transition to normal mode before switching to any other mode + if self.mode != self.MODE_NORMAL: + r = await self._send_wrapper(stream_to_normal) + if r != "ok": + self.check_invoke_error(r) + raise ConnectionError( + f"Unable to transition to normal mode: Bad response from server {r!r}" + ) + self.logger.debug("Mode is now normal") + + if new_mode == self.MODE_GET_STREAM: + await normal_to_stream("get-stream") + elif new_mode == self.MODE_EXIST_STREAM: + await normal_to_stream("exists-stream") + elif new_mode != self.MODE_NORMAL: + raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") self.mode = new_mode - def get_unihash(self, method, taskhash): - self._set_mode(self.MODE_GET_STREAM) - r = self.send_stream('%s %s' % (method, taskhash)) + async def get_unihash(self, method, taskhash): + r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash)) if not r: return None return r - def report_unihash(self, taskhash, method, outhash, unihash, extra={}): - self._set_mode(self.MODE_NORMAL) + async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): m = extra.copy() - m['taskhash'] = taskhash - m['method'] = method - m['outhash'] = outhash - m['unihash'] = unihash - return self.send_message({'report': m}) - - def report_unihash_equiv(self, taskhash, method, unihash, extra={}): - self._set_mode(self.MODE_NORMAL) + m["taskhash"] = taskhash + m["method"] = method + m["outhash"] = outhash + m["unihash"] = unihash + return await self.invoke({"report": m}) + + async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): m = extra.copy() - m['taskhash'] = taskhash - m['method'] = method - m['unihash'] = unihash - return self.send_message({'report-equiv': m}) - - def get_taskhash(self, method, taskhash, all_properties=False): - self._set_mode(self.MODE_NORMAL) - return self.send_message({'get': { - 'taskhash': taskhash, - 'method': method, - 'all': all_properties - }}) - - def get_stats(self): - self._set_mode(self.MODE_NORMAL) - return self.send_message({'get-stats': None}) - - def reset_stats(self): - self._set_mode(self.MODE_NORMAL) - return self.send_message({'reset-stats': None}) + m["taskhash"] = taskhash + m["method"] = method + m["unihash"] = unihash + return await self.invoke({"report-equiv": m}) + + async def get_taskhash(self, method, taskhash, all_properties=False): + return await self.invoke( + {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} + ) + + async def unihash_exists(self, unihash): + r = await self.send_stream(self.MODE_EXIST_STREAM, unihash) + return r == "true" + + async def get_outhash(self, method, outhash, taskhash, with_unihash=True): + return await self.invoke( + { + "get-outhash": { + "outhash": outhash, + "taskhash": taskhash, + "method": method, + "with_unihash": with_unihash, + } + } + ) + + async def get_stats(self): + return await self.invoke({"get-stats": None}) + + async def reset_stats(self): + return await self.invoke({"reset-stats": None}) + + async def backfill_wait(self): + return (await self.invoke({"backfill-wait": None}))["tasks"] + + async def remove(self, where): + return await self.invoke({"remove": {"where": where}}) + + async def clean_unused(self, max_age): + return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) + + async def auth(self, username, token): + result = await self.invoke({"auth": {"username": username, "token": token}}) + self.username = username + self.password = token + self.saved_become_user = None + return result + + async def refresh_token(self, username=None): + m = {} + if username: + m["username"] = username + result = await self.invoke({"refresh-token": m}) + if ( + self.username + and not self.saved_become_user + and result["username"] == self.username + ): + self.password = result["token"] + return result + + async def set_user_perms(self, username, permissions): + return await self.invoke( + {"set-user-perms": {"username": username, "permissions": permissions}} + ) + + async def get_user(self, username=None): + m = {} + if username: + m["username"] = username + return await self.invoke({"get-user": m}) + + async def get_all_users(self): + return (await self.invoke({"get-all-users": {}}))["users"] + + async def new_user(self, username, permissions): + return await self.invoke( + {"new-user": {"username": username, "permissions": permissions}} + ) + + async def delete_user(self, username): + return await self.invoke({"delete-user": {"username": username}}) + + async def become_user(self, username): + result = await self.invoke({"become-user": {"username": username}}) + if username == self.username: + self.saved_become_user = None + else: + self.saved_become_user = username + return result + + async def get_db_usage(self): + return (await self.invoke({"get-db-usage": {}}))["usage"] + + async def get_db_query_columns(self): + return (await self.invoke({"get-db-query-columns": {}}))["columns"] + + async def gc_status(self): + return await self.invoke({"gc-status": {}}) + + async def gc_mark(self, mark, where): + """ + Starts a new garbage collection operation identified by "mark". If + garbage collection is already in progress with "mark", the collection + is continued. + + All unihash entries that match the "where" clause are marked to be + kept. In addition, any new entries added to the database after this + command will be automatically marked with "mark" + """ + return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) + + async def gc_sweep(self, mark): + """ + Finishes garbage collection for "mark". All unihash entries that have + not been marked will be deleted. + + It is recommended to clean unused outhash entries after running this to + cleanup any dangling outhashes + """ + return await self.invoke({"gc-sweep": {"mark": mark}}) + + +class Client(bb.asyncrpc.Client): + def __init__(self, username=None, password=None): + self.username = username + self.password = password + + super().__init__() + self._add_methods( + "connect_tcp", + "connect_websocket", + "get_unihash", + "report_unihash", + "report_unihash_equiv", + "get_taskhash", + "unihash_exists", + "get_outhash", + "get_stats", + "reset_stats", + "backfill_wait", + "remove", + "clean_unused", + "auth", + "refresh_token", + "set_user_perms", + "get_user", + "get_all_users", + "new_user", + "delete_user", + "become_user", + "get_db_usage", + "get_db_query_columns", + "gc_status", + "gc_mark", + "gc_sweep", + ) + + def _get_async_client(self): + return AsyncClient(self.username, self.password) + + +class ClientPool(bb.asyncrpc.ClientPool): + def __init__( + self, + address, + max_clients, + *, + username=None, + password=None, + become=None, + ): + super().__init__(max_clients) + self.address = address + self.username = username + self.password = password + self.become = become + + async def _new_client(self): + client = await create_async_client( + self.address, + username=self.username, + password=self.password, + ) + if self.become: + await client.become_user(self.become) + return client + + def _run_key_tasks(self, queries, call): + results = {key: None for key in queries.keys()} + + def make_task(key, args): + async def task(client): + nonlocal results + unihash = await call(client, args) + results[key] = unihash + + return task + + def gen_tasks(): + for key, args in queries.items(): + yield make_task(key, args) + + self.run_tasks(gen_tasks()) + return results + + def get_unihashes(self, queries): + """ + Query multiple unihashes in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a tuple of (method, taskhash). + + Returns a dictionary with a corresponding key for each input key, and + the value is the queried unihash (which might be none if the query + failed) + """ + + async def call(client, args): + method, taskhash = args + return await client.get_unihash(method, taskhash) + + return self._run_key_tasks(queries, call) + + def unihashes_exist(self, queries): + """ + Query multiple unihash existence checks in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a unihash. + + Returns a dictionary with a corresponding key for each input key, and + the value is True or False if the unihash is known by the server (or + None if there was a failure) + """ + + async def call(client, unihash): + return await client.unihash_exists(unihash) + + return self._run_key_tasks(queries, call) |