-rw-r--r--   bitbake/lib/hashserv/__init__.py    66
-rw-r--r--   bitbake/lib/hashserv/client.py        1
-rw-r--r--   bitbake/lib/hashserv/server.py      340
-rw-r--r--   bitbake/lib/hashserv/tests.py        60
4 files changed, 314 insertions, 153 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 5f2e101e525..9cb3fd57a51 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -22,46 +22,68 @@ ADDR_TYPE_TCP = 1
 # is necessary
 DEFAULT_MAX_CHUNK = 32 * 1024
 
-TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL"),
-    ("outhash", "TEXT NOT NULL"),
-    ("taskhash", "TEXT NOT NULL"),
-    ("unihash", "TEXT NOT NULL"),
-    ("created", "DATETIME"),
+UNIHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("unihash", "TEXT NOT NULL", ""),
+)
+
+UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
+
+OUTHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("outhash", "TEXT NOT NULL", "UNIQUE"),
+    ("created", "DATETIME", ""),
 
     # Optional fields
-    ("owner", "TEXT"),
-    ("PN", "TEXT"),
-    ("PV", "TEXT"),
-    ("PR", "TEXT"),
-    ("task", "TEXT"),
-    ("outhash_siginfo", "TEXT"),
+    ("owner", "TEXT", ""),
+    ("PN", "TEXT", ""),
+    ("PV", "TEXT", ""),
+    ("PR", "TEXT", ""),
+    ("task", "TEXT", ""),
+    ("outhash_siginfo", "TEXT", ""),
 )
 
-TABLE_COLUMNS = tuple(name for name, _ in TABLE_DEFINITION)
+OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)
+
+def _make_table(cursor, name, definition):
+    cursor.execute('''
+        CREATE TABLE IF NOT EXISTS {name} (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            {fields}
+            UNIQUE({unique})
+        )
+        '''.format(
+            name=name,
+            fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
+            unique=", ".join(name for name, _, flags in definition if "UNIQUE" in flags)
+        ))
+
 
 def setup_database(database, sync=True):
     db = sqlite3.connect(database)
     db.row_factory = sqlite3.Row
 
     with closing(db.cursor()) as cursor:
-        cursor.execute('''
-            CREATE TABLE IF NOT EXISTS tasks_v2 (
-                id INTEGER PRIMARY KEY AUTOINCREMENT,
-                %s
-                UNIQUE(method, outhash, taskhash)
-                )
-            ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION))
+        _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION)
+        _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
+
         cursor.execute('PRAGMA journal_mode = WAL')
         cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
 
         # Drop old indexes
         cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
         cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
+        cursor.execute('DROP INDEX IF EXISTS taskhash_lookup_v2')
+        cursor.execute('DROP INDEX IF EXISTS outhash_lookup_v2')
+
+        # TODO: Upgrade from tasks_v2?
+        cursor.execute('DROP TABLE IF EXISTS tasks_v2')
 
         # Create new indexes
-        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
-        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
+        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)')
+        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)')
 
     return db
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 8cfd90d6a83..b2aa1026ac9 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -111,6 +111,7 @@ class Client(bb.asyncrpc.Client):
             "report_unihash",
             "report_unihash_equiv",
             "get_taskhash",
+            "get_outhash",
             "get_stats",
             "reset_stats",
             "backfill_wait",
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index a059e521152..ef8227d430e 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -5,11 +5,12 @@
 
 from contextlib import closing, contextmanager
 from datetime import datetime
+import enum
 import asyncio
 import logging
 import math
 import time
-from . import create_async_client, TABLE_COLUMNS
+from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
 import bb.asyncrpc
 
 
@@ -106,56 +107,64 @@ class Stats(object):
         return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
 
 
-def insert_task(cursor, data, ignore=False):
+@enum.unique
+class Resolve(enum.Enum):
+    FAIL = enum.auto()
+    IGNORE = enum.auto()
+    REPLACE = enum.auto()
+
+
+def insert_table(cursor, table, data, on_conflict):
+    resolve = {
+        Resolve.FAIL: "",
+        Resolve.IGNORE: " OR IGNORE",
+        Resolve.REPLACE: " OR REPLACE",
+    }[on_conflict]
+
     keys = sorted(data.keys())
-    query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
-        " OR IGNORE" if ignore else "",
-        ', '.join(keys),
-        ', '.join(':' + k for k in keys))
+    query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format(
+        resolve=resolve,
+        table=table,
+        fields=", ".join(keys),
+        values=", ".join(":" + k for k in keys),
+    )
+    prevrowid = cursor.lastrowid
     cursor.execute(query, data)
-
-async def copy_from_upstream(client, db, method, taskhash):
-    d = await client.get_taskhash(method, taskhash, True)
+    logging.debug(
+        "Inserting %r into %s, %s",
+        data,
+        table,
+        on_conflict
+    )
+    return (cursor.lastrowid, cursor.lastrowid != prevrowid)
+
+def insert_unihash(cursor, data, on_conflict):
+    return insert_table(cursor, "unihashes_v2", data, on_conflict)
+
+def insert_outhash(cursor, data, on_conflict):
+    return insert_table(cursor, "outhashes_v2", data, on_conflict)
+
+async def copy_unihash_from_upstream(client, db, method, taskhash):
+    d = await client.get_taskhash(method, taskhash)
     if d is not None:
-        # Filter out unknown columns
-        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
-
         with closing(db.cursor()) as cursor:
-            insert_task(cursor, d)
+            insert_unihash(
+                cursor,
+                {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
+                Resolve.IGNORE,
+            )
             db.commit()
-
     return d
 
-async def copy_outhash_from_upstream(client, db, method, outhash, taskhash):
-    d = await client.get_outhash(method, outhash, taskhash)
-    if d is not None:
-        # Filter out unknown columns
-        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
-
-        with closing(db.cursor()) as cursor:
-            insert_task(cursor, d)
-            db.commit()
+
+class ServerCursor(object):
+    def __init__(self, db, cursor, upstream):
+        self.db = db
+        self.cursor = cursor
+        self.upstream = upstream
 
-    return d
 
 class ServerClient(bb.asyncrpc.AsyncServerConnection):
-    FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
-    ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
-    OUTHASH_QUERY = '''
-        -- Find tasks with a matching outhash (that is, tasks that
-        -- are equivalent)
-        SELECT * FROM tasks_v2 WHERE method=:method AND outhash=:outhash
-
-        -- If there is an exact match on the taskhash, return it.
-        -- Otherwise return the oldest matching outhash of any
-        -- taskhash
-        ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
-            created ASC
-
-        -- Only return one row
-        LIMIT 1
-        '''
-
     def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
         super().__init__(reader, writer, 'OEHASHEQUIV', logger)
         self.db = db
@@ -210,36 +219,102 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
     async def handle_get(self, request):
         method = request['method']
         taskhash = request['taskhash']
+        fetch_all = request.get('all', False)
 
-        if request.get('all', False):
-            row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
-        else:
-            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
+        with closing(self.db.cursor()) as cursor:
+            d = await self.get_unihash(cursor, method, taskhash, fetch_all)
 
-        if row is not None:
-            logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
-            d = {k: row[k] for k in row.keys()}
-        elif self.upstream_client is not None:
-            d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash)
+        self.write_message(d)
+
+    async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
+        d = None
+
+        if fetch_all:
+            cursor.execute(
+                '''
+                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                ''',
+                {
+                    'method': method,
+                    'taskhash': taskhash,
+                }
+
+            )
+            row = cursor.fetchone()
+
+            if row is not None:
+                d = {k: row[k] for k in row.keys()}
+            elif self.upstream_client is not None:
+                d = await self.upstream_client.get_taskhash(method, taskhash, True)
+                self.update_unified(cursor, d)
+                self.db.commit()
         else:
-            d = None
+            row = self.query_equivalent(cursor, method, taskhash)
+
+            if row is not None:
+                d = {k: row[k] for k in row.keys()}
+            elif self.upstream_client is not None:
+                d = await self.upstream_client.get_taskhash(method, taskhash)
+                d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
+                insert_unihash(cursor, d, Resolve.IGNORE)
+                self.db.commit()
 
-        self.write_message(d)
+        return d
 
     async def handle_get_outhash(self, request):
+        method = request['method']
+        outhash = request['outhash']
+        taskhash = request['taskhash']
+
         with closing(self.db.cursor()) as cursor:
-            cursor.execute(self.OUTHASH_QUERY,
-                           {k: request[k] for k in ('method', 'outhash', 'taskhash')})
+            d = await self.get_outhash(cursor, method, outhash, taskhash)
 
-            row = cursor.fetchone()
+        self.write_message(d)
+
+    async def get_outhash(self, cursor, method, outhash, taskhash):
+        d = None
+        cursor.execute(
+            '''
+            SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+            INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+            WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
+            ORDER BY outhashes_v2.created ASC
+            LIMIT 1
+            ''',
+            {
+                'method': method,
+                'outhash': outhash,
+            }
+        )
+        row = cursor.fetchone()
 
         if row is not None:
-            logger.debug('Found equivalent outhash %s -> %s', (row['outhash'], row['unihash']))
             d = {k: row[k] for k in row.keys()}
-        else:
-            d = None
+        elif self.upstream_client is not None:
+            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
+            self.update_unified(cursor, d)
+            self.db.commit()
 
-        self.write_message(d)
+        return d
+
+    def update_unified(self, cursor, data):
+        if data is None:
+            return
+
+        insert_unihash(
+            cursor,
+            {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
+            Resolve.IGNORE
+        )
+        insert_outhash(
+            cursor,
+            {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
+            Resolve.IGNORE
+        )
 
     async def handle_get_stream(self, request):
         self.write_message('ok')
@@ -267,7 +342,12 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                 (method, taskhash) = l.split()
                 #logger.debug('Looking up %s %s' % (method, taskhash))
-                row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
+                cursor = self.db.cursor()
+                try:
+                    row = self.query_equivalent(cursor, method, taskhash)
+                finally:
+                    cursor.close()
+
                 if row is not None:
                     msg = ('%s\n' % row['unihash']).encode('utf-8')
                     #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
@@ -294,55 +374,82 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
     async def handle_report(self, data):
         with closing(self.db.cursor()) as cursor:
-            cursor.execute(self.OUTHASH_QUERY,
-                           {k: data[k] for k in ('method', 'outhash', 'taskhash')})
+            outhash_data = {
+                'method': data['method'],
+                'outhash': data['outhash'],
+                'taskhash': data['taskhash'],
+                'created': datetime.now()
+            }
 
-            row = cursor.fetchone()
+            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
+                if k in data:
+                    outhash_data[k] = data[k]
+
+            # Insert the new entry, unless it already exists
+            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)
+
+            if inserted:
+                # If this row is new, check if it is equivalent to another
+                # output hash
+                cursor.execute(
+                    '''
+                    SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                    INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                    -- Select any matching output hash except the one we just inserted
+                    WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
+                    -- Pick the oldest hash
+                    ORDER BY outhashes_v2.created ASC
+                    LIMIT 1
+                    ''',
+                    {
+                        'method': data['method'],
+                        'outhash': data['outhash'],
+                        'taskhash': data['taskhash'],
+                    }
+                )
+                row = cursor.fetchone()
 
-            if row is None and self.upstream_client:
-                # Try upstream
-                row = await copy_outhash_from_upstream(self.upstream_client,
-                                                       self.db,
-                                                       data['method'],
-                                                       data['outhash'],
-                                                       data['taskhash'])
-
-            # If no matching outhash was found, or one *was* found but it
-            # wasn't an exact match on the taskhash, a new entry for this
-            # taskhash should be added
-            if row is None or row['taskhash'] != data['taskhash']:
-                # If a row matching the outhash was found, the unihash for
-                # the new taskhash should be the same as that one.
-                # Otherwise the caller provided unihash is used.
-                unihash = data['unihash']
                 if row is not None:
+                    # A matching output hash was found. Set our taskhash to the
+                    # same unihash since they are equivalent
                     unihash = row['unihash']
+                    resolve = Resolve.REPLACE
+                else:
+                    # No matching output hash was found. This is probably the
+                    # first outhash to be added.
+                    unihash = data['unihash']
+                    resolve = Resolve.IGNORE
+
+                    # Query upstream to see if it has a unihash we can use
+                    if self.upstream_client is not None:
+                        upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash'])
+                        if upstream_data is not None:
+                            unihash = upstream_data['unihash']
+
+
+                insert_unihash(
+                    cursor,
+                    {
+                        'method': data['method'],
+                        'taskhash': data['taskhash'],
+                        'unihash': unihash,
+                    },
+                    resolve
+                )
+
+            unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash'])
+            if unihash_data is not None:
+                unihash = unihash_data['unihash']
+            else:
+                unihash = data['unihash']
 
-                insert_data = {
-                    'method': data['method'],
-                    'outhash': data['outhash'],
-                    'taskhash': data['taskhash'],
-                    'unihash': unihash,
-                    'created': datetime.now()
-                }
-
-                for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
-                    if k in data:
-                        insert_data[k] = data[k]
-
-                insert_task(cursor, insert_data)
-                self.db.commit()
-
-                logger.info('Adding taskhash %s with unihash %s',
-                            data['taskhash'], unihash)
+            self.db.commit()
 
-                d = {
-                    'taskhash': data['taskhash'],
-                    'method': data['method'],
-                    'unihash': unihash
-                }
-            else:
-                d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
+        d = {
+            'taskhash': data['taskhash'],
+            'method': data['method'],
+            'unihash': unihash,
+        }
 
         self.write_message(d)
 
@@ -350,23 +457,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         with closing(self.db.cursor()) as cursor:
             insert_data = {
                 'method': data['method'],
-                'outhash': "",
                 'taskhash': data['taskhash'],
                 'unihash': data['unihash'],
-                'created': datetime.now()
             }
-
-            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
-                if k in data:
-                    insert_data[k] = data[k]
-
-            insert_task(cursor, insert_data, ignore=True)
+            insert_unihash(cursor, insert_data, Resolve.IGNORE)
             self.db.commit()
 
             # Fetch the unihash that will be reported for the taskhash. If the
             # unihash matches, it means this row was inserted (or the mapping
             # was already valid)
-            row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)
+            row = self.query_equivalent(cursor, data['method'], data['taskhash'])
 
             if row['unihash'] == data['unihash']:
                 logger.info('Adding taskhash equivalence for %s with unihash %s',
@@ -399,14 +499,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
             await self.backfill_queue.join()
         self.write_message(d)
 
-    def query_equivalent(self, method, taskhash, query):
+    def query_equivalent(self, cursor, method, taskhash):
         # This is part of the inner loop and must be as fast as possible
-        try:
-            cursor = self.db.cursor()
-            cursor.execute(query, {'method': method, 'taskhash': taskhash})
-            return cursor.fetchone()
-        except:
-            cursor.close()
+        cursor.execute(
+            'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash',
+            {
+                'method': method,
+                'taskhash': taskhash,
+            }
+        )
+        return cursor.fetchone()
 
 
 class Server(bb.asyncrpc.AsyncServer):
@@ -435,7 +537,7 @@ class Server(bb.asyncrpc.AsyncServer):
                     self.backfill_queue.task_done()
                     break
                 method, taskhash = item
-                await copy_from_upstream(client, self.db, method, taskhash)
+                await copy_unihash_from_upstream(client, self.db, method, taskhash)
                 self.backfill_queue.task_done()
         finally:
             await client.close()
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 1fcfb6b929a..efaf3bdf424 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -19,10 +19,10 @@ import time
 import signal
 
 def server_prefunc(server, idx):
-    logging.basicConfig(level=logging.DEBUG, filename='bbhashserv.log', filemode='w',
+    logging.basicConfig(level=logging.DEBUG, filename='bbhashserv-%d.log' % idx, filemode='w',
                         format='%(levelname)s %(filename)s:%(lineno)d %(message)s')
     server.logger.debug("Running server %d" % idx)
-    sys.stdout = open('bbhashserv-%d.log' % idx, 'w')
+    sys.stdout = open('bbhashserv-stdout-%d.log' % idx, 'w')
     sys.stderr = sys.stdout
 
 class HashEquivalenceTestSetup(object):
@@ -140,12 +140,17 @@ class HashEquivalenceCommonTests(object):
         })
         self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
 
-        result = self.client.get_taskhash(self.METHOD, taskhash, True)
-        self.assertEqual(result['taskhash'], taskhash)
-        self.assertEqual(result['unihash'], unihash)
-        self.assertEqual(result['method'], self.METHOD)
-        self.assertEqual(result['outhash'], outhash)
-        self.assertEqual(result['outhash_siginfo'], siginfo)
+        result_unihash = self.client.get_taskhash(self.METHOD, taskhash, True)
+        self.assertEqual(result_unihash['taskhash'], taskhash)
+        self.assertEqual(result_unihash['unihash'], unihash)
+        self.assertEqual(result_unihash['method'], self.METHOD)
+
+        result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash)
+        self.assertEqual(result_outhash['taskhash'], taskhash)
+        self.assertEqual(result_outhash['method'], self.METHOD)
+        self.assertEqual(result_outhash['unihash'], unihash)
+        self.assertEqual(result_outhash['outhash'], outhash)
+        self.assertEqual(result_outhash['outhash_siginfo'], siginfo)
 
     def test_stress(self):
         def query_server(failures):
@@ -260,6 +265,39 @@ class HashEquivalenceCommonTests(object):
         result = down_client.report_unihash(taskhash6, self.METHOD, outhash5, unihash6)
         self.assertEqual(result['unihash'], unihash5, 'Server failed to copy unihash from upstream')
 
+        # Tests read through from server with
+        taskhash7 = '9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74'
+        outhash7 = '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69'
+        unihash7 = '05d2a63c81e32f0a36542ca677e8ad852365c538'
+        self.client.report_unihash(taskhash7, self.METHOD, outhash7, unihash7)
+
+        result = down_client.get_taskhash(self.METHOD, taskhash7, True)
+        self.assertEqual(result['unihash'], unihash7, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['outhash'], outhash7, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['taskhash'], taskhash7, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['method'], self.METHOD)
+
+        taskhash8 = '86978a4c8c71b9b487330b0152aade10c1ee58aa'
+        outhash8 = 'ca8c128e9d9e4a28ef24d0508aa20b5cf880604eacd8f65c0e366f7e0cc5fbcf'
+        unihash8 = 'd8bcf25369d40590ad7d08c84d538982f2023e01'
+        self.client.report_unihash(taskhash8, self.METHOD, outhash8, unihash8)
+
+        result = down_client.get_outhash(self.METHOD, outhash8, taskhash8)
+        self.assertEqual(result['unihash'], unihash8, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['outhash'], outhash8, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['taskhash'], taskhash8, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['method'], self.METHOD)
+
+        taskhash9 = 'ae6339531895ddf5b67e663e6a374ad8ec71d81c'
+        outhash9 = 'afc78172c81880ae10a1fec994b5b4ee33d196a001a1b66212a15ebe573e00b5'
+        unihash9 = '6662e699d6e3d894b24408ff9a4031ef9b038ee8'
+        self.client.report_unihash(taskhash9, self.METHOD, outhash9, unihash9)
+
+        result = down_client.get_taskhash(self.METHOD, taskhash9, False)
+        self.assertEqual(result['unihash'], unihash9, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['method'], self.METHOD)
+
     def test_ro_server(self):
         (ro_client, ro_server) = self.start_server(dbpath=self.server.dbpath, read_only=True)
 
@@ -287,10 +325,8 @@ class HashEquivalenceCommonTests(object):
 
 
     def test_slow_server_start(self):
-        """
-        Ensures that the server will exit correctly even if it gets a SIGTERM
-        before entering the main loop
-        """
+        # Ensures that the server will exit correctly even if it gets a SIGTERM
+        # before entering the main loop
 
         event = multiprocessing.Event()
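
The patch splits the old tasks_v2 table into unihashes_v2 (keyed on method and taskhash) and outhashes_v2 (keyed on method, taskhash and outhash). The following is a minimal sketch, not part of the patch, showing one way to inspect the schema that the new setup_database() produces; it assumes bitbake/lib is on sys.path and uses a throwaway in-memory SQLite database purely for illustration.

    import sqlite3
    from hashserv import setup_database

    # Build the new schema in an in-memory database
    db = setup_database(":memory:")

    # Print the CREATE statements that _make_table() generated
    for row in db.execute("SELECT name, sql FROM sqlite_master WHERE type='table'"):
        print(row["name"])
        print(row["sql"])

    # Per the definitions in this patch, the user tables are unihashes_v2
    # with UNIQUE(method, taskhash) and outhashes_v2 with
    # UNIQUE(method, taskhash, outhash); the old tasks_v2 table is dropped.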
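
The client change is small but it is what the updated tests exercise: the synchronous client now proxies get_outhash alongside get_taskhash, so callers can fetch the full output-hash record (the outhashes_v2 row joined with its unihash) directly. A hedged usage sketch follows; the socket address and hash values are placeholders, not values from this commit, and it assumes a hash equivalence server is already listening at that address.

    import hashserv

    # Placeholder address; any "unix://..." path or "host:port" accepted by
    # hashserv.create_client() would work here
    client = hashserv.create_client("unix://./hashserve.sock")

    METHOD = "TestMethod"
    taskhash = "35788efcb8dfb0a02659d81cf2bfd695fb30faf9"                        # placeholder
    outhash = "2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f"  # placeholder

    # Lookup against unihashes_v2 only: the taskhash -> unihash mapping
    unihash_row = client.get_taskhash(METHOD, taskhash)

    # Full record: outhashes_v2 joined with unihashes_v2, as asserted by the
    # new result_outhash checks in the tests above
    outhash_row = client.get_outhash(METHOD, outhash, taskhash)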