diff options
Diffstat (limited to 'bitbake/lib/hashserv/__init__.py')
-rw-r--r-- | bitbake/lib/hashserv/__init__.py | 177 |
1 files changed, 75 insertions, 102 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 5f2e101e52..74367eb6b4 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py @@ -5,129 +5,102 @@ import asyncio from contextlib import closing -import re -import sqlite3 import itertools import json +from collections import namedtuple +from urllib.parse import urlparse +from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS + +User = namedtuple("User", ("username", "permissions")) + +def create_server( + addr, + dbname, + *, + sync=True, + upstream=None, + read_only=False, + db_username=None, + db_password=None, + anon_perms=None, + admin_username=None, + admin_password=None, +): + def sqlite_engine(): + from .sqlite import DatabaseEngine + + return DatabaseEngine(dbname, sync) + + def sqlalchemy_engine(): + from .sqlalchemy import DatabaseEngine + + return DatabaseEngine(dbname, db_username, db_password) -UNIX_PREFIX = "unix://" - -ADDR_TYPE_UNIX = 0 -ADDR_TYPE_TCP = 1 - -# The Python async server defaults to a 64K receive buffer, so we hardcode our -# maximum chunk size. It would be better if the client and server reported to -# each other what the maximum chunk sizes were, but that will slow down the -# connection setup with a round trip delay so I'd rather not do that unless it -# is necessary -DEFAULT_MAX_CHUNK = 32 * 1024 - -TABLE_DEFINITION = ( - ("method", "TEXT NOT NULL"), - ("outhash", "TEXT NOT NULL"), - ("taskhash", "TEXT NOT NULL"), - ("unihash", "TEXT NOT NULL"), - ("created", "DATETIME"), - - # Optional fields - ("owner", "TEXT"), - ("PN", "TEXT"), - ("PV", "TEXT"), - ("PR", "TEXT"), - ("task", "TEXT"), - ("outhash_siginfo", "TEXT"), -) - -TABLE_COLUMNS = tuple(name for name, _ in TABLE_DEFINITION) - -def setup_database(database, sync=True): - db = sqlite3.connect(database) - db.row_factory = sqlite3.Row - - with closing(db.cursor()) as cursor: - cursor.execute(''' - CREATE TABLE IF NOT EXISTS tasks_v2 ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - %s - UNIQUE(method, outhash, taskhash) - ) - ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION)) - cursor.execute('PRAGMA journal_mode = WAL') - cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF')) - - # Drop old indexes - cursor.execute('DROP INDEX IF EXISTS taskhash_lookup') - cursor.execute('DROP INDEX IF EXISTS outhash_lookup') - - # Create new indexes - cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)') - cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)') - - return db - - -def parse_address(addr): - if addr.startswith(UNIX_PREFIX): - return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) - else: - m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) - if m is not None: - host = m.group('host') - port = m.group('port') - else: - host, port = addr.split(':') - - return (ADDR_TYPE_TCP, (host, int(port))) - + from . import server -def chunkify(msg, max_chunk): - if len(msg) < max_chunk - 1: - yield ''.join((msg, "\n")) + if "://" in dbname: + db_engine = sqlalchemy_engine() else: - yield ''.join((json.dumps({ - 'chunk-stream': None - }), "\n")) + db_engine = sqlite_engine() - args = [iter(msg)] * (max_chunk - 1) - for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): - yield ''.join(itertools.chain(m, "\n")) - yield "\n" + if anon_perms is None: + anon_perms = server.DEFAULT_ANON_PERMS - -def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): - from . import server - db = setup_database(dbname, sync=sync) - s = server.Server(db, upstream=upstream, read_only=read_only) + s = server.Server( + db_engine, + upstream=upstream, + read_only=read_only, + anon_perms=anon_perms, + admin_username=admin_username, + admin_password=admin_password, + ) (typ, a) = parse_address(addr) if typ == ADDR_TYPE_UNIX: s.start_unix_server(*a) + elif typ == ADDR_TYPE_WS: + url = urlparse(a[0]) + s.start_websocket_server(url.hostname, url.port) else: s.start_tcp_server(*a) return s -def create_client(addr): +def create_client(addr, username=None, password=None): from . import client - c = client.Client() - (typ, a) = parse_address(addr) - if typ == ADDR_TYPE_UNIX: - c.connect_unix(*a) - else: - c.connect_tcp(*a) + c = client.Client(username, password) + + try: + (typ, a) = parse_address(addr) + if typ == ADDR_TYPE_UNIX: + c.connect_unix(*a) + elif typ == ADDR_TYPE_WS: + c.connect_websocket(*a) + else: + c.connect_tcp(*a) + return c + except Exception as e: + c.close() + raise e - return c -async def create_async_client(addr): +async def create_async_client(addr, username=None, password=None): from . import client - c = client.AsyncClient() - (typ, a) = parse_address(addr) - if typ == ADDR_TYPE_UNIX: - await c.connect_unix(*a) - else: - await c.connect_tcp(*a) + c = client.AsyncClient(username, password) + + try: + (typ, a) = parse_address(addr) + if typ == ADDR_TYPE_UNIX: + await c.connect_unix(*a) + elif typ == ADDR_TYPE_WS: + await c.connect_websocket(*a) + else: + await c.connect_tcp(*a) - return c + return c + except Exception as e: + await c.close() + raise e |