diff options
-rwxr-xr-x | bitbake/bin/bitbake-hashserv | 5 | ||||
-rw-r--r-- | bitbake/lib/hashserv/__init__.py | 40 |
2 files changed, 31 insertions, 14 deletions
diff --git a/bitbake/bin/bitbake-hashserv b/bitbake/bin/bitbake-hashserv index 6c911c098a7..222dff8376f 100755 --- a/bitbake/bin/bitbake-hashserv +++ b/bitbake/bin/bitbake-hashserv @@ -10,6 +10,7 @@ import sys import logging import argparse import sqlite3 +import multiprocessing sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)),'lib')) @@ -27,6 +28,8 @@ def main(): parser.add_argument('--prefix', default='', help='HTTP path prefix (default "%(default)s")') parser.add_argument('--database', default='./hashserv.db', help='Database file (default "%(default)s")') parser.add_argument('--log', default='WARNING', help='Set logging level') + parser.add_argument('--threads', '-j', type=int, default=multiprocessing.cpu_count(), + help='Number of server threads. Default is %(default)d') args = parser.parse_args() @@ -41,7 +44,7 @@ def main(): console.setLevel(level) logger.addHandler(console) - server = hashserv.create_server((args.address, args.port), args.database, args.prefix) + server = hashserv.create_server((args.address, args.port), args.database, args.prefix, num_threads=args.threads) server.serve_forever() return 0 diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 6be01f5fa85..76f5060f6ff 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py @@ -328,14 +328,22 @@ class ClientSocket(socket.socket): return s class ThreadedHTTPServer(HTTPServer): - quit = False + # The quit event needs to be a distinct object. Making a plain object() and + # checking it with "is" works well + quit = object() + + def __init__(self, *args, num_threads=1, **kwargs): + super().__init__(*args, **kwargs) + self.num_threads = num_threads def serve_forever(self): + logger.debug('Using %d threads' % self.num_threads) self.requestqueue = queue.Queue() - self.handlerthread = threading.Thread(target=self.process_request_thread) - self.handlerthread.daemon = False + self.handlerthreads = [threading.Thread(target=self.process_request_thread) for i in range(self.num_threads)] - self.handlerthread.start() + for t in self.handlerthreads: + t.daemon = False + t.start() signal.signal(signal.SIGTERM, self.sigterm_exception) super().serve_forever() @@ -346,13 +354,18 @@ class ThreadedHTTPServer(HTTPServer): os._exit(0) def process_request_thread(self): - while not self.quit: + while True: try: - (request, client_address) = self.requestqueue.get(True) + d = self.requestqueue.get(True) + if d is self.quit: + # Put the item back in the queue for another thread to get + self.requestqueue.put(d) + break + + (request, client_address) = d except queue.Empty: continue - if request is None: - continue + try: self.finish_request(request, client_address) except Exception: @@ -366,15 +379,16 @@ class ThreadedHTTPServer(HTTPServer): def server_close(self): super().server_close() - self.quit = True - self.requestqueue.put((None, None)) - self.handlerthread.join() + + self.requestqueue.put(self.quit) + for t in self.handlerthreads: + t.join() def get_request(self): sock, client_address = super().get_request() return ClientSocket.create(sock), client_address -def create_server(addr, dbname, prefix=''): +def create_server(addr, dbname, prefix='', num_threads=1): class Handler(HashEquivalenceServer): pass @@ -409,7 +423,7 @@ def create_server(addr, dbname, prefix=''): cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)') cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)') - ret = ThreadedHTTPServer(addr, Handler) + ret = ThreadedHTTPServer(addr, Handler, num_threads=num_threads) logger.info('Starting server on %s\n', ret.server_port) |