summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv/__init__.py')
-rw-r--r--bitbake/lib/hashserv/__init__.py177
1 files changed, 75 insertions, 102 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 5f2e101e52..74367eb6b4 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -5,129 +5,102 @@
import asyncio
from contextlib import closing
-import re
-import sqlite3
import itertools
import json
+from collections import namedtuple
+from urllib.parse import urlparse
+from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS
+
+User = namedtuple("User", ("username", "permissions"))
+
+def create_server(
+ addr,
+ dbname,
+ *,
+ sync=True,
+ upstream=None,
+ read_only=False,
+ db_username=None,
+ db_password=None,
+ anon_perms=None,
+ admin_username=None,
+ admin_password=None,
+):
+ def sqlite_engine():
+ from .sqlite import DatabaseEngine
+
+ return DatabaseEngine(dbname, sync)
+
+ def sqlalchemy_engine():
+ from .sqlalchemy import DatabaseEngine
+
+ return DatabaseEngine(dbname, db_username, db_password)
-UNIX_PREFIX = "unix://"
-
-ADDR_TYPE_UNIX = 0
-ADDR_TYPE_TCP = 1
-
-# The Python async server defaults to a 64K receive buffer, so we hardcode our
-# maximum chunk size. It would be better if the client and server reported to
-# each other what the maximum chunk sizes were, but that will slow down the
-# connection setup with a round trip delay so I'd rather not do that unless it
-# is necessary
-DEFAULT_MAX_CHUNK = 32 * 1024
-
-TABLE_DEFINITION = (
- ("method", "TEXT NOT NULL"),
- ("outhash", "TEXT NOT NULL"),
- ("taskhash", "TEXT NOT NULL"),
- ("unihash", "TEXT NOT NULL"),
- ("created", "DATETIME"),
-
- # Optional fields
- ("owner", "TEXT"),
- ("PN", "TEXT"),
- ("PV", "TEXT"),
- ("PR", "TEXT"),
- ("task", "TEXT"),
- ("outhash_siginfo", "TEXT"),
-)
-
-TABLE_COLUMNS = tuple(name for name, _ in TABLE_DEFINITION)
-
-def setup_database(database, sync=True):
- db = sqlite3.connect(database)
- db.row_factory = sqlite3.Row
-
- with closing(db.cursor()) as cursor:
- cursor.execute('''
- CREATE TABLE IF NOT EXISTS tasks_v2 (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- %s
- UNIQUE(method, outhash, taskhash)
- )
- ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION))
- cursor.execute('PRAGMA journal_mode = WAL')
- cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
-
- # Drop old indexes
- cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
- cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
-
- # Create new indexes
- cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
- cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
-
- return db
-
-
-def parse_address(addr):
- if addr.startswith(UNIX_PREFIX):
- return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
- else:
- m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
- if m is not None:
- host = m.group('host')
- port = m.group('port')
- else:
- host, port = addr.split(':')
-
- return (ADDR_TYPE_TCP, (host, int(port)))
-
+ from . import server
-def chunkify(msg, max_chunk):
- if len(msg) < max_chunk - 1:
- yield ''.join((msg, "\n"))
+ if "://" in dbname:
+ db_engine = sqlalchemy_engine()
else:
- yield ''.join((json.dumps({
- 'chunk-stream': None
- }), "\n"))
+ db_engine = sqlite_engine()
- args = [iter(msg)] * (max_chunk - 1)
- for m in map(''.join, itertools.zip_longest(*args, fillvalue='')):
- yield ''.join(itertools.chain(m, "\n"))
- yield "\n"
+ if anon_perms is None:
+ anon_perms = server.DEFAULT_ANON_PERMS
-
-def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
- from . import server
- db = setup_database(dbname, sync=sync)
- s = server.Server(db, upstream=upstream, read_only=read_only)
+ s = server.Server(
+ db_engine,
+ upstream=upstream,
+ read_only=read_only,
+ anon_perms=anon_perms,
+ admin_username=admin_username,
+ admin_password=admin_password,
+ )
(typ, a) = parse_address(addr)
if typ == ADDR_TYPE_UNIX:
s.start_unix_server(*a)
+ elif typ == ADDR_TYPE_WS:
+ url = urlparse(a[0])
+ s.start_websocket_server(url.hostname, url.port)
else:
s.start_tcp_server(*a)
return s
-def create_client(addr):
+def create_client(addr, username=None, password=None):
from . import client
- c = client.Client()
- (typ, a) = parse_address(addr)
- if typ == ADDR_TYPE_UNIX:
- c.connect_unix(*a)
- else:
- c.connect_tcp(*a)
+ c = client.Client(username, password)
+
+ try:
+ (typ, a) = parse_address(addr)
+ if typ == ADDR_TYPE_UNIX:
+ c.connect_unix(*a)
+ elif typ == ADDR_TYPE_WS:
+ c.connect_websocket(*a)
+ else:
+ c.connect_tcp(*a)
+ return c
+ except Exception as e:
+ c.close()
+ raise e
- return c
-async def create_async_client(addr):
+async def create_async_client(addr, username=None, password=None):
from . import client
- c = client.AsyncClient()
- (typ, a) = parse_address(addr)
- if typ == ADDR_TYPE_UNIX:
- await c.connect_unix(*a)
- else:
- await c.connect_tcp(*a)
+ c = client.AsyncClient(username, password)
+
+ try:
+ (typ, a) = parse_address(addr)
+ if typ == ADDR_TYPE_UNIX:
+ await c.connect_unix(*a)
+ elif typ == ADDR_TYPE_WS:
+ await c.connect_websocket(*a)
+ else:
+ await c.connect_tcp(*a)
- return c
+ return c
+ except Exception as e:
+ await c.close()
+ raise e