summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv/__init__.py')
-rw-r--r--bitbake/lib/hashserv/__init__.py169
1 files changed, 80 insertions, 89 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index f95e8f43f1..74367eb6b4 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -3,113 +3,104 @@
# SPDX-License-Identifier: GPL-2.0-only
#
+import asyncio
from contextlib import closing
-import re
-import sqlite3
import itertools
import json
+from collections import namedtuple
+from urllib.parse import urlparse
+from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS
+
+User = namedtuple("User", ("username", "permissions"))
+
+def create_server(
+ addr,
+ dbname,
+ *,
+ sync=True,
+ upstream=None,
+ read_only=False,
+ db_username=None,
+ db_password=None,
+ anon_perms=None,
+ admin_username=None,
+ admin_password=None,
+):
+ def sqlite_engine():
+ from .sqlite import DatabaseEngine
+
+ return DatabaseEngine(dbname, sync)
+
+ def sqlalchemy_engine():
+ from .sqlalchemy import DatabaseEngine
+
+ return DatabaseEngine(dbname, db_username, db_password)
-UNIX_PREFIX = "unix://"
-
-ADDR_TYPE_UNIX = 0
-ADDR_TYPE_TCP = 1
-
-# The Python async server defaults to a 64K receive buffer, so we hardcode our
-# maximum chunk size. It would be better if the client and server reported to
-# each other what the maximum chunk sizes were, but that will slow down the
-# connection setup with a round trip delay so I'd rather not do that unless it
-# is necessary
-DEFAULT_MAX_CHUNK = 32 * 1024
-
-def setup_database(database, sync=True):
- db = sqlite3.connect(database)
- db.row_factory = sqlite3.Row
-
- with closing(db.cursor()) as cursor:
- cursor.execute('''
- CREATE TABLE IF NOT EXISTS tasks_v2 (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- method TEXT NOT NULL,
- outhash TEXT NOT NULL,
- taskhash TEXT NOT NULL,
- unihash TEXT NOT NULL,
- created DATETIME,
-
- -- Optional fields
- owner TEXT,
- PN TEXT,
- PV TEXT,
- PR TEXT,
- task TEXT,
- outhash_siginfo TEXT,
-
- UNIQUE(method, outhash, taskhash)
- )
- ''')
- cursor.execute('PRAGMA journal_mode = WAL')
- cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
-
- # Drop old indexes
- cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
- cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
-
- # Create new indexes
- cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
- cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
-
- return db
-
-
-def parse_address(addr):
- if addr.startswith(UNIX_PREFIX):
- return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
- else:
- m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
- if m is not None:
- host = m.group('host')
- port = m.group('port')
- else:
- host, port = addr.split(':')
-
- return (ADDR_TYPE_TCP, (host, int(port)))
-
+ from . import server
-def chunkify(msg, max_chunk):
- if len(msg) < max_chunk - 1:
- yield ''.join((msg, "\n"))
+ if "://" in dbname:
+ db_engine = sqlalchemy_engine()
else:
- yield ''.join((json.dumps({
- 'chunk-stream': None
- }), "\n"))
+ db_engine = sqlite_engine()
- args = [iter(msg)] * (max_chunk - 1)
- for m in map(''.join, itertools.zip_longest(*args, fillvalue='')):
- yield ''.join(itertools.chain(m, "\n"))
- yield "\n"
+ if anon_perms is None:
+ anon_perms = server.DEFAULT_ANON_PERMS
-
-def create_server(addr, dbname, *, sync=True):
- from . import server
- db = setup_database(dbname, sync=sync)
- s = server.Server(db)
+ s = server.Server(
+ db_engine,
+ upstream=upstream,
+ read_only=read_only,
+ anon_perms=anon_perms,
+ admin_username=admin_username,
+ admin_password=admin_password,
+ )
(typ, a) = parse_address(addr)
if typ == ADDR_TYPE_UNIX:
s.start_unix_server(*a)
+ elif typ == ADDR_TYPE_WS:
+ url = urlparse(a[0])
+ s.start_websocket_server(url.hostname, url.port)
else:
s.start_tcp_server(*a)
return s
-def create_client(addr):
+def create_client(addr, username=None, password=None):
from . import client
- c = client.Client()
- (typ, a) = parse_address(addr)
- if typ == ADDR_TYPE_UNIX:
- c.connect_unix(*a)
- else:
- c.connect_tcp(*a)
+ c = client.Client(username, password)
+
+ try:
+ (typ, a) = parse_address(addr)
+ if typ == ADDR_TYPE_UNIX:
+ c.connect_unix(*a)
+ elif typ == ADDR_TYPE_WS:
+ c.connect_websocket(*a)
+ else:
+ c.connect_tcp(*a)
+ return c
+ except Exception as e:
+ c.close()
+ raise e
+
+
+async def create_async_client(addr, username=None, password=None):
+ from . import client
+
+ c = client.AsyncClient(username, password)
+
+ try:
+ (typ, a) = parse_address(addr)
+ if typ == ADDR_TYPE_UNIX:
+ await c.connect_unix(*a)
+ elif typ == ADDR_TYPE_WS:
+ await c.connect_websocket(*a)
+ else:
+ await c.connect_tcp(*a)
- return c
+ return c
+ except Exception as e:
+ await c.close()
+ raise e