summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv/__init__.py')
-rw-r--r--bitbake/lib/hashserv/__init__.py40
1 files changed, 27 insertions, 13 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 6be01f5fa85..76f5060f6ff 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -328,14 +328,22 @@ class ClientSocket(socket.socket):
return s
class ThreadedHTTPServer(HTTPServer):
- quit = False
+ # The quit event needs to be a distinct object. Making a plain object() and
+ # checking it with "is" works well
+ quit = object()
+
+ def __init__(self, *args, num_threads=1, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.num_threads = num_threads
def serve_forever(self):
+ logger.debug('Using %d threads' % self.num_threads)
self.requestqueue = queue.Queue()
- self.handlerthread = threading.Thread(target=self.process_request_thread)
- self.handlerthread.daemon = False
+ self.handlerthreads = [threading.Thread(target=self.process_request_thread) for i in range(self.num_threads)]
- self.handlerthread.start()
+ for t in self.handlerthreads:
+ t.daemon = False
+ t.start()
signal.signal(signal.SIGTERM, self.sigterm_exception)
super().serve_forever()
@@ -346,13 +354,18 @@ class ThreadedHTTPServer(HTTPServer):
os._exit(0)
def process_request_thread(self):
- while not self.quit:
+ while True:
try:
- (request, client_address) = self.requestqueue.get(True)
+ d = self.requestqueue.get(True)
+ if d is self.quit:
+ # Put the item back in the queue for another thread to get
+ self.requestqueue.put(d)
+ break
+
+ (request, client_address) = d
except queue.Empty:
continue
- if request is None:
- continue
+
try:
self.finish_request(request, client_address)
except Exception:
@@ -366,15 +379,16 @@ class ThreadedHTTPServer(HTTPServer):
def server_close(self):
super().server_close()
- self.quit = True
- self.requestqueue.put((None, None))
- self.handlerthread.join()
+
+ self.requestqueue.put(self.quit)
+ for t in self.handlerthreads:
+ t.join()
def get_request(self):
sock, client_address = super().get_request()
return ClientSocket.create(sock), client_address
-def create_server(addr, dbname, prefix=''):
+def create_server(addr, dbname, prefix='', num_threads=1):
class Handler(HashEquivalenceServer):
pass
@@ -409,7 +423,7 @@ def create_server(addr, dbname, prefix=''):
cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)')
cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)')
- ret = ThreadedHTTPServer(addr, Handler)
+ ret = ThreadedHTTPServer(addr, Handler, num_threads=num_threads)
logger.info('Starting server on %s\n', ret.server_port)