summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2019-08-15 12:54:30 -0500
committerJoshua Watt <JPEWhacker@gmail.com>2019-08-15 13:01:34 -0500
commit66c70708515cdeb903a93bd738b8f6a87cdd3926 (patch)
treedf2b3e54232f3bdf4b24b536212c1dfb58104aff
parent743a884ba4adc0ae98f359d15e7ce74a349fe89c (diff)
downloadpoky-contrib-66c70708515cdeb903a93bd738b8f6a87cdd3926.tar.gz
poky-contrib-66c70708515cdeb903a93bd738b8f6a87cdd3926.tar.bz2
poky-contrib-66c70708515cdeb903a93bd738b8f6a87cdd3926.zip
bitbake: hashserv: Add more threadsjpew/hashserve-stats
Adds support for running the hash server with more threads. If the server is invoked automatically by bitbake it will use a single thread because in the presence of a single client this is the fastest mechanism. If the server is started manually on the command line, it is assumed that multiple clients will be connecting, so it will use one thread per CPU core (although since the server might be I/O bound not CPU bound, this still may not be sufficient for optimal performance). Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
-rwxr-xr-xbitbake/bin/bitbake-hashserv5
-rw-r--r--bitbake/lib/hashserv/__init__.py40
2 files changed, 31 insertions, 14 deletions
diff --git a/bitbake/bin/bitbake-hashserv b/bitbake/bin/bitbake-hashserv
index 6c911c098a..222dff8376 100755
--- a/bitbake/bin/bitbake-hashserv
+++ b/bitbake/bin/bitbake-hashserv
@@ -10,6 +10,7 @@ import sys
import logging
import argparse
import sqlite3
+import multiprocessing
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)),'lib'))
@@ -27,6 +28,8 @@ def main():
parser.add_argument('--prefix', default='', help='HTTP path prefix (default "%(default)s")')
parser.add_argument('--database', default='./hashserv.db', help='Database file (default "%(default)s")')
parser.add_argument('--log', default='WARNING', help='Set logging level')
+ parser.add_argument('--threads', '-j', type=int, default=multiprocessing.cpu_count(),
+ help='Number of server threads. Default is %(default)d')
args = parser.parse_args()
@@ -41,7 +44,7 @@ def main():
console.setLevel(level)
logger.addHandler(console)
- server = hashserv.create_server((args.address, args.port), args.database, args.prefix)
+ server = hashserv.create_server((args.address, args.port), args.database, args.prefix, num_threads=args.threads)
server.serve_forever()
return 0
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 6be01f5fa8..76f5060f6f 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -328,14 +328,22 @@ class ClientSocket(socket.socket):
return s
class ThreadedHTTPServer(HTTPServer):
- quit = False
+ # The quit event needs to be a distinct object. Making a plain object() and
+ # checking it with "is" works well
+ quit = object()
+
+ def __init__(self, *args, num_threads=1, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.num_threads = num_threads
def serve_forever(self):
+ logger.debug('Using %d threads' % self.num_threads)
self.requestqueue = queue.Queue()
- self.handlerthread = threading.Thread(target=self.process_request_thread)
- self.handlerthread.daemon = False
+ self.handlerthreads = [threading.Thread(target=self.process_request_thread) for i in range(self.num_threads)]
- self.handlerthread.start()
+ for t in self.handlerthreads:
+ t.daemon = False
+ t.start()
signal.signal(signal.SIGTERM, self.sigterm_exception)
super().serve_forever()
@@ -346,13 +354,18 @@ class ThreadedHTTPServer(HTTPServer):
os._exit(0)
def process_request_thread(self):
- while not self.quit:
+ while True:
try:
- (request, client_address) = self.requestqueue.get(True)
+ d = self.requestqueue.get(True)
+ if d is self.quit:
+ # Put the item back in the queue for another thread to get
+ self.requestqueue.put(d)
+ break
+
+ (request, client_address) = d
except queue.Empty:
continue
- if request is None:
- continue
+
try:
self.finish_request(request, client_address)
except Exception:
@@ -366,15 +379,16 @@ class ThreadedHTTPServer(HTTPServer):
def server_close(self):
super().server_close()
- self.quit = True
- self.requestqueue.put((None, None))
- self.handlerthread.join()
+
+ self.requestqueue.put(self.quit)
+ for t in self.handlerthreads:
+ t.join()
def get_request(self):
sock, client_address = super().get_request()
return ClientSocket.create(sock), client_address
-def create_server(addr, dbname, prefix=''):
+def create_server(addr, dbname, prefix='', num_threads=1):
class Handler(HashEquivalenceServer):
pass
@@ -409,7 +423,7 @@ def create_server(addr, dbname, prefix=''):
cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)')
cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)')
- ret = ThreadedHTTPServer(addr, Handler)
+ ret = ThreadedHTTPServer(addr, Handler, num_threads=num_threads)
logger.info('Starting server on %s\n', ret.server_port)