summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbitbake/bin/bitbake-hashserv5
-rw-r--r--bitbake/lib/hashserv/__init__.py40
2 files changed, 31 insertions, 14 deletions
diff --git a/bitbake/bin/bitbake-hashserv b/bitbake/bin/bitbake-hashserv
index 6c911c098a7..222dff8376f 100755
--- a/bitbake/bin/bitbake-hashserv
+++ b/bitbake/bin/bitbake-hashserv
@@ -10,6 +10,7 @@ import sys
import logging
import argparse
import sqlite3
+import multiprocessing
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)),'lib'))
@@ -27,6 +28,8 @@ def main():
parser.add_argument('--prefix', default='', help='HTTP path prefix (default "%(default)s")')
parser.add_argument('--database', default='./hashserv.db', help='Database file (default "%(default)s")')
parser.add_argument('--log', default='WARNING', help='Set logging level')
+ parser.add_argument('--threads', '-j', type=int, default=multiprocessing.cpu_count(),
+ help='Number of server threads. Default is %(default)d')
args = parser.parse_args()
@@ -41,7 +44,7 @@ def main():
console.setLevel(level)
logger.addHandler(console)
- server = hashserv.create_server((args.address, args.port), args.database, args.prefix)
+ server = hashserv.create_server((args.address, args.port), args.database, args.prefix, num_threads=args.threads)
server.serve_forever()
return 0
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 6be01f5fa85..76f5060f6ff 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -328,14 +328,22 @@ class ClientSocket(socket.socket):
return s
class ThreadedHTTPServer(HTTPServer):
- quit = False
+ # The quit event needs to be a distinct object. Making a plain object() and
+ # checking it with "is" works well
+ quit = object()
+
+ def __init__(self, *args, num_threads=1, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.num_threads = num_threads
def serve_forever(self):
+ logger.debug('Using %d threads' % self.num_threads)
self.requestqueue = queue.Queue()
- self.handlerthread = threading.Thread(target=self.process_request_thread)
- self.handlerthread.daemon = False
+ self.handlerthreads = [threading.Thread(target=self.process_request_thread) for i in range(self.num_threads)]
- self.handlerthread.start()
+ for t in self.handlerthreads:
+ t.daemon = False
+ t.start()
signal.signal(signal.SIGTERM, self.sigterm_exception)
super().serve_forever()
@@ -346,13 +354,18 @@ class ThreadedHTTPServer(HTTPServer):
os._exit(0)
def process_request_thread(self):
- while not self.quit:
+ while True:
try:
- (request, client_address) = self.requestqueue.get(True)
+ d = self.requestqueue.get(True)
+ if d is self.quit:
+ # Put the item back in the queue for another thread to get
+ self.requestqueue.put(d)
+ break
+
+ (request, client_address) = d
except queue.Empty:
continue
- if request is None:
- continue
+
try:
self.finish_request(request, client_address)
except Exception:
@@ -366,15 +379,16 @@ class ThreadedHTTPServer(HTTPServer):
def server_close(self):
super().server_close()
- self.quit = True
- self.requestqueue.put((None, None))
- self.handlerthread.join()
+
+ self.requestqueue.put(self.quit)
+ for t in self.handlerthreads:
+ t.join()
def get_request(self):
sock, client_address = super().get_request()
return ClientSocket.create(sock), client_address
-def create_server(addr, dbname, prefix=''):
+def create_server(addr, dbname, prefix='', num_threads=1):
class Handler(HashEquivalenceServer):
pass
@@ -409,7 +423,7 @@ def create_server(addr, dbname, prefix=''):
cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)')
cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)')
- ret = ThreadedHTTPServer(addr, Handler)
+ ret = ThreadedHTTPServer(addr, Handler, num_threads=num_threads)
logger.info('Starting server on %s\n', ret.server_port)