diff options
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r-- | bitbake/lib/bb/server/process.py | 588 |
1 files changed, 409 insertions, 179 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py index 80a7875ad9..76b189291d 100644 --- a/bitbake/lib/bb/server/process.py +++ b/bitbake/lib/bb/server/process.py @@ -3,18 +3,8 @@ # # Copyright (C) 2010 Bob Foerster <robert@erafx.com> # -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. +# SPDX-License-Identifier: GPL-2.0-only # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ This module implements a multiprocessing.Process based server for bitbake. @@ -35,6 +25,10 @@ import subprocess import errno import re import datetime +import pickle +import traceback +import gc +import stat import bb.server.xmlrpcserver from bb import daemonize from multiprocessing import queues @@ -44,12 +38,52 @@ logger = logging.getLogger('BitBake') class ProcessTimeout(SystemExit): pass -class ProcessServer(multiprocessing.Process): +def currenttime(): + return datetime.datetime.now().strftime('%H:%M:%S.%f') + +def serverlog(msg): + print(str(os.getpid()) + " " + currenttime() + " " + msg) + #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server + #sys.stdout.flush() + +# +# When we have lockfile issues, try and find infomation about which process is +# using the lockfile +# +def get_lockfile_process_msg(lockfile): + # Some systems may not have lsof available + procs = None + try: + procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) + except subprocess.CalledProcessError: + # File was deleted? + pass + except OSError as e: + if e.errno != errno.ENOENT: + raise + if procs is None: + # Fall back to fuser if lsof is unavailable + try: + procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) + except subprocess.CalledProcessError: + # File was deleted? + pass + except OSError as e: + if e.errno != errno.ENOENT: + raise + if procs: + return procs.decode("utf-8") + return None + +class idleFinish(): + def __init__(self, msg): + self.msg = msg + +class ProcessServer(): profile_filename = "profile.log" profile_processed_filename = "profile.log.processed" - def __init__(self, lock, sock, sockname): - multiprocessing.Process.__init__(self) + def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface): self.command_channel = False self.command_channel_reply = False self.quit = False @@ -57,42 +91,42 @@ class ProcessServer(multiprocessing.Process): self.next_heartbeat = time.time() self.event_handle = None + self.hadanyui = False self.haveui = False - self.lastui = False + self.maxuiwait = 30 self.xmlrpc = False + self.idle = None + # Need a lock for _idlefuns changes self._idlefuns = {} + self._idlefuncsLock = threading.Lock() + self.idle_cond = threading.Condition(self._idlefuncsLock) self.bitbake_lock = lock + self.bitbake_lock_name = lockname self.sock = sock self.sockname = sockname + # It is possible the directory may be renamed. Cache the inode of the socket file + # so we can tell if things changed. + self.sockinode = os.stat(self.sockname)[stat.ST_INO] + + self.server_timeout = server_timeout + self.timeout = self.server_timeout + self.xmlrpcinterface = xmlrpcinterface def register_idle_function(self, function, data): """Register a function to be called while the server is idle""" assert hasattr(function, '__call__') - self._idlefuns[function] = data + with bb.utils.lock_timeout(self._idlefuncsLock): + self._idlefuns[function] = data + serverlog("Registering idle function %s" % str(function)) def run(self): if self.xmlrpcinterface[0]: self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self) - print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) - - heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') - if heartbeat_event: - try: - self.heartbeat_seconds = float(heartbeat_event) - except: - bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) - - self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') - try: - if self.timeout: - self.timeout = float(self.timeout) - except: - bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) - + serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) try: self.bitbake_lock.seek(0) @@ -103,7 +137,7 @@ class ProcessServer(multiprocessing.Process): self.bitbake_lock.write("%s\n" % (os.getpid())) self.bitbake_lock.flush() except Exception as e: - print("Error writing to lock file: %s" % str(e)) + serverlog("Error writing to lock file: %s" % str(e)) pass if self.cooker.configuration.profile: @@ -117,13 +151,38 @@ class ProcessServer(multiprocessing.Process): prof.dump_stats("profile.log") bb.utils.process_profilelog("profile.log") - print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed") + serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed") else: ret = self.main() return ret + def _idle_check(self): + return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None + + def wait_for_idle(self, timeout=30): + # Wait for the idle loop to have cleared + with bb.utils.lock_timeout(self._idlefuncsLock): + return self.idle_cond.wait_for(self._idle_check, timeout) is not False + + def set_async_cmd(self, cmd): + with bb.utils.lock_timeout(self._idlefuncsLock): + ret = self.idle_cond.wait_for(self._idle_check, 30) + if ret is False: + return False + self.cooker.command.currentAsyncCommand = cmd + return True + + def clear_async_cmd(self): + with bb.utils.lock_timeout(self._idlefuncsLock): + self.cooker.command.currentAsyncCommand = None + self.idle_cond.notify_all() + + def get_async_cmd(self): + with bb.utils.lock_timeout(self._idlefuncsLock): + return self.cooker.command.currentAsyncCommand + def main(self): self.cooker.pre_serve() @@ -136,15 +195,21 @@ class ProcessServer(multiprocessing.Process): fds = [self.sock] if self.xmlrpc: fds.append(self.xmlrpc) - print("Entering server connection loop") + seendata = False + serverlog("Entering server connection loop") + serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname))) def disconnect_client(self, fds): - print("Disconnecting Client") + serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname)) if self.controllersock: fds.remove(self.controllersock) self.controllersock.close() self.controllersock = False if self.haveui: + # Wait for the idle loop to have cleared (30s max) + if not self.wait_for_idle(30): + serverlog("Idle loop didn't finish queued commands after 30s, exiting.") + self.quit = True fds.remove(self.command_channel) bb.event.unregister_UIHhandler(self.event_handle, True) self.command_channel_reply.writer.close() @@ -156,31 +221,32 @@ class ProcessServer(multiprocessing.Process): self.cooker.clientComplete() self.haveui = False ready = select.select(fds,[],[],0)[0] - if newconnections: - print("Starting new client") + if newconnections and not self.quit: + serverlog("Starting new client") conn = newconnections.pop(-1) fds.append(conn) self.controllersock = conn - elif self.timeout is None and not ready: - print("No timeout, exiting.") + elif not self.timeout and not ready: + serverlog("No timeout, exiting.") self.quit = True + self.lastui = time.time() while not self.quit: if self.sock in ready: while select.select([self.sock],[],[],0)[0]: controllersock, address = self.sock.accept() if self.controllersock: - print("Queuing %s (%s)" % (str(ready), str(newconnections))) + serverlog("Queuing %s (%s)" % (str(ready), str(newconnections))) newconnections.append(controllersock) else: - print("Accepting %s (%s)" % (str(ready), str(newconnections))) + serverlog("Accepting %s (%s)" % (str(ready), str(newconnections))) self.controllersock = controllersock fds.append(controllersock) if self.controllersock in ready: try: - print("Processing Client") + serverlog("Processing Client") ui_fds = recvfds(self.controllersock, 3) - print("Connecting Client") + serverlog("Connecting Client") # Where to write events to writer = ConnectionWriter(ui_fds[0]) @@ -197,13 +263,21 @@ class ProcessServer(multiprocessing.Process): self.command_channel_reply = writer self.haveui = True + self.hadanyui = True except (EOFError, OSError): disconnect_client(self, fds) - if not self.timeout == -1.0 and not self.haveui and self.lastui and self.timeout and \ + if not self.timeout == -1.0 and not self.haveui and self.timeout and \ (self.lastui + self.timeout) < time.time(): - print("Server timeout, exiting.") + serverlog("Server timeout, exiting.") + self.quit = True + + # If we don't see a UI connection within maxuiwait, its unlikely we're going to see + # one. We have had issue with processes hanging indefinitely so timing out UI-less + # servers is useful. + if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time(): + serverlog("No UI connection within max timeout, exiting to avoid infinite loop.") self.quit = True if self.command_channel in ready: @@ -218,23 +292,56 @@ class ProcessServer(multiprocessing.Process): self.quit = True continue try: - print("Running command %s" % command) - self.command_channel_reply.send(self.cooker.command.runCommand(command)) + serverlog("Running command %s" % command) + reply = self.cooker.command.runCommand(command, self) + serverlog("Sending reply %s" % repr(reply)) + self.command_channel_reply.send(reply) + serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) except Exception as e: - logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) + stack = traceback.format_exc() + serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) + logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack)) if self.xmlrpc in ready: self.xmlrpc.handle_requests() + if not seendata and hasattr(self.cooker, "data"): + heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') + if heartbeat_event: + try: + self.heartbeat_seconds = float(heartbeat_event) + except: + bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) + + self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') + try: + if self.timeout: + self.timeout = float(self.timeout) + except: + bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) + seendata = True + ready = self.idle_commands(.1, fds) - print("Exiting") + if self.idle: + self.idle.join() + + serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) # Remove the socket file so we don't get any more connections to avoid races - os.unlink(self.sockname) + # The build directory could have been renamed so if the file isn't the one we created + # we shouldn't delete it. + try: + sockinode = os.stat(self.sockname)[stat.ST_INO] + if sockinode == self.sockinode: + os.unlink(self.sockname) + else: + serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode)) + except Exception as err: + serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err)) self.sock.close() - try: - self.cooker.shutdown(True) + try: + self.cooker.shutdown(True, idle=False) self.cooker.notifier.stop() self.cooker.confignotifier.stop() except: @@ -242,84 +349,154 @@ class ProcessServer(multiprocessing.Process): self.cooker.post_serve() + if len(threading.enumerate()) != 1: + serverlog("More than one thread left?: " + str(threading.enumerate())) + + # Flush logs before we release the lock + sys.stdout.flush() + sys.stderr.flush() + # Finally release the lockfile but warn about other processes holding it open lock = self.bitbake_lock - lockfile = lock.name + lockfile = self.bitbake_lock_name + + def get_lock_contents(lockfile): + try: + with open(lockfile, "r") as f: + return f.readlines() + except FileNotFoundError: + return None + lock.close() lock = None while not lock: - with bb.utils.timeout(3): - lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True) - if lock: - # We hold the lock so we can remove the file (hide stale pid data) - bb.utils.remove(lockfile) - bb.utils.unlockfile(lock) - return - + i = 0 + lock = None + if not os.path.exists(os.path.basename(lockfile)): + serverlog("Lockfile directory gone, exiting.") + return + + while not lock and i < 30: + lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) if not lock: - # Some systems may not have lsof available - procs = None + newlockcontents = get_lock_contents(lockfile) + if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]): + # A new server was started, the lockfile contents changed, we can exit + serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) + return + time.sleep(0.1) + i += 1 + if lock: + # We hold the lock so we can remove the file (hide stale pid data) + # via unlockfile. + bb.utils.unlockfile(lock) + serverlog("Exiting as we could obtain the lock") + return + + if not lock: + procs = get_lockfile_process_msg(lockfile) + msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] + if procs: + msg.append(":\n%s" % procs) + serverlog("".join(msg)) + + def idle_thread(self): + if self.cooker.configuration.profile: + try: + import cProfile as profile + except: + import profile + prof = profile.Profile() + + ret = profile.Profile.runcall(prof, self.idle_thread_internal) + + prof.dump_stats("profile-mainloop.log") + bb.utils.process_profilelog("profile-mainloop.log") + serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed") + else: + self.idle_thread_internal() + + def idle_thread_internal(self): + def remove_idle_func(function): + with bb.utils.lock_timeout(self._idlefuncsLock): + del self._idlefuns[function] + self.idle_cond.notify_all() + + while not self.quit: + nextsleep = 0.1 + fds = [] + + with bb.utils.lock_timeout(self._idlefuncsLock): + items = list(self._idlefuns.items()) + + for function, data in items: + try: + retval = function(self, data, False) + if isinstance(retval, idleFinish): + serverlog("Removing idle function %s at idleFinish" % str(function)) + remove_idle_func(function) + self.cooker.command.finishAsyncCommand(retval.msg) + nextsleep = None + elif retval is False: + serverlog("Removing idle function %s" % str(function)) + remove_idle_func(function) + nextsleep = None + elif retval is True: + nextsleep = None + elif isinstance(retval, float) and nextsleep: + if (retval < nextsleep): + nextsleep = retval + elif nextsleep is None: + continue + else: + fds = fds + retval + except SystemExit: + raise + except Exception as exc: + if not isinstance(exc, bb.BBHandledException): + logger.exception('Running idle function') + remove_idle_func(function) + serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) + self.quit = True + + # Create new heartbeat event? + now = time.time() + if bb.event._heartbeat_enabled and now >= self.next_heartbeat: + # We might have missed heartbeats. Just trigger once in + # that case and continue after the usual delay. + self.next_heartbeat += self.heartbeat_seconds + if self.next_heartbeat <= now: + self.next_heartbeat = now + self.heartbeat_seconds + if hasattr(self.cooker, "data"): + heartbeat = bb.event.HeartbeatEvent(now) try: - procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) - except OSError as e: - if e.errno != errno.ENOENT: - raise - if procs is None: - # Fall back to fuser if lsof is unavailable - try: - procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) - except OSError as e: - if e.errno != errno.ENOENT: - raise - - msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock" - if procs: - msg += ":\n%s" % str(procs) - print(msg) + bb.event.fire(heartbeat, self.cooker.data) + except Exception as exc: + if not isinstance(exc, bb.BBHandledException): + logger.exception('Running heartbeat function') + serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc()) + self.quit = True + if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat: + # Shorten timeout so that we we wake up in time for + # the heartbeat. + nextsleep = self.next_heartbeat - now + + if nextsleep is not None: + select.select(fds,[],[],nextsleep)[0] def idle_commands(self, delay, fds=None): nextsleep = delay if not fds: fds = [] - for function, data in list(self._idlefuns.items()): - try: - retval = function(self, data, False) - if retval is False: - del self._idlefuns[function] - nextsleep = None - elif retval is True: - nextsleep = None - elif isinstance(retval, float) and nextsleep: - if (retval < nextsleep): - nextsleep = retval - elif nextsleep is None: - continue - else: - fds = fds + retval - except SystemExit: - raise - except Exception as exc: - if not isinstance(exc, bb.BBHandledException): - logger.exception('Running idle function') - del self._idlefuns[function] - self.quit = True - - # Create new heartbeat event? - now = time.time() - if now >= self.next_heartbeat: - # We might have missed heartbeats. Just trigger once in - # that case and continue after the usual delay. - self.next_heartbeat += self.heartbeat_seconds - if self.next_heartbeat <= now: - self.next_heartbeat = now + self.heartbeat_seconds - heartbeat = bb.event.HeartbeatEvent(now) - bb.event.fire(heartbeat, self.cooker.data) - if nextsleep and now + nextsleep > self.next_heartbeat: - # Shorten timeout so that we we wake up in time for - # the heartbeat. - nextsleep = self.next_heartbeat - now + if not self.idle: + self.idle = threading.Thread(target=self.idle_thread) + self.idle.start() + elif self.idle and not self.idle.is_alive(): + serverlog("Idle thread terminated, main thread exiting too") + bb.error("Idle thread terminated, main thread exiting too") + self.quit = True if nextsleep is not None: if self.xmlrpc: @@ -339,10 +516,23 @@ class ServerCommunicator(): self.recv = recv def runCommand(self, command): - self.connection.send(command) + try: + self.connection.send(command) + except BrokenPipeError as e: + raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e if not self.recv.poll(30): - raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server") - return self.recv.get() + logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime())) + if not self.recv.poll(30): + raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime()) + try: + ret, exc = self.recv.get() + except EOFError as e: + raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e + # Should probably turn all exceptions in exc back into exceptions? + # For now, at least handle BBHandledException + if exc and ("BBHandledException" in exc or "SystemExit" in exc): + raise bb.BBHandledException() + return ret, exc def updateFeatureSet(self, featureset): _, error = self.runCommand(["setFeatures", featureset]) @@ -370,44 +560,33 @@ class BitBakeProcessServerConnection(object): self.socket_connection = sock def terminate(self): + self.events.close() self.socket_connection.close() self.connection.connection.close() self.connection.recv.close() return +start_log_format = '--- Starting bitbake server pid %s at %s ---' +start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' + class BitBakeServer(object): - start_log_format = '--- Starting bitbake server pid %s at %s ---' - start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' - def __init__(self, lock, sockname, configuration, featureset): + def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile): - self.configuration = configuration + self.server_timeout = server_timeout + self.xmlrpcinterface = xmlrpcinterface self.featureset = featureset self.sockname = sockname self.bitbake_lock = lock + self.profile = profile self.readypipe, self.readypipein = os.pipe() - # Create server control socket - if os.path.exists(sockname): - os.unlink(sockname) - # Place the log in the builddirectory alongside the lock file logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log") + self.logfile = logfile - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - # AF_UNIX has path length issues so chdir here to workaround - cwd = os.getcwd() - try: - os.chdir(os.path.dirname(sockname)) - self.sock.bind(os.path.basename(sockname)) - finally: - os.chdir(cwd) - self.sock.listen(1) - - os.set_inheritable(self.sock.fileno(), True) startdatetime = datetime.datetime.now() bb.daemonize.createDaemon(self._startServer, logfile) - self.sock.close() self.bitbake_lock.close() os.close(self.readypipein) @@ -420,13 +599,13 @@ class BitBakeServer(object): try: r = ready.get() except EOFError: - # Trap the child exitting/closing the pipe and error out + # Trap the child exiting/closing the pipe and error out r = None if not r or r[0] != "r": ready.close() bb.error("Unable to start bitbake server (%s)" % str(r)) if os.path.exists(logfile): - logstart_re = re.compile(self.start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)')) + logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)')) started = False lines = [] lastlines = [] @@ -436,9 +615,9 @@ class BitBakeServer(object): lines.append(line) else: lastlines.append(line) - res = logstart_re.match(line.rstrip()) + res = logstart_re.search(line.rstrip()) if res: - ldatetime = datetime.datetime.strptime(res.group(2), self.start_log_datetime_format) + ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format) if ldatetime >= startdatetime: started = True lines.append(line) @@ -459,23 +638,55 @@ class BitBakeServer(object): ready.close() def _startServer(self): - print(self.start_log_format % (os.getpid(), datetime.datetime.now().strftime(self.start_log_datetime_format))) - sys.stdout.flush() - - server = ProcessServer(self.bitbake_lock, self.sock, self.sockname) - self.configuration.setServerRegIdleCallback(server.register_idle_function) os.close(self.readypipe) - writer = ConnectionWriter(self.readypipein) - self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset) + os.set_inheritable(self.bitbake_lock.fileno(), True) + os.set_inheritable(self.readypipein, True) + serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") + os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) + +def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile): + + import bb.cookerdata + import bb.cooker + + serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format))) + + try: + bitbake_lock = os.fdopen(lockfd, "w") + + # Create server control socket + if os.path.exists(sockname): + serverlog("WARNING: removing existing socket file '%s'" % sockname) + os.unlink(sockname) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # AF_UNIX has path length issues so chdir here to workaround + cwd = os.getcwd() + try: + os.chdir(os.path.dirname(sockname)) + sock.bind(os.path.basename(sockname)) + finally: + os.chdir(cwd) + sock.listen(1) + + server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface) + writer = ConnectionWriter(readypipeinfd) + try: + featureset = [] + cooker = bb.cooker.BBCooker(featureset, server) + cooker.configuration.profile = profile + except bb.BBHandledException: + return None writer.send("r") writer.close() - server.cooker = self.cooker - server.server_timeout = self.configuration.server_timeout - server.xmlrpcinterface = self.configuration.xmlrpcinterface - print("Started bitbake server pid %d" % os.getpid()) - sys.stdout.flush() + server.cooker = cooker + serverlog("Started bitbake server pid %d" % os.getpid()) - server.start() + server.run() + finally: + # Flush any messages/errors to the logfile before exit + sys.stdout.flush() + sys.stderr.flush() def connectProcessServer(sockname, featureset): # Connect to socket @@ -578,23 +789,18 @@ class BBUIEventQueue: self.reader = ConnectionReader(readfd) self.t = threading.Thread() - self.t.setDaemon(True) self.t.run = self.startCallbackHandler self.t.start() def getEvent(self): - self.eventQueueLock.acquire() - - if len(self.eventQueue) == 0: - self.eventQueueLock.release() - return None - - item = self.eventQueue.pop(0) + with bb.utils.lock_timeout(self.eventQueueLock): + if len(self.eventQueue) == 0: + return None - if len(self.eventQueue) == 0: - self.eventQueueNotify.clear() + item = self.eventQueue.pop(0) + if len(self.eventQueue) == 0: + self.eventQueueNotify.clear() - self.eventQueueLock.release() return item def waitEvent(self, delay): @@ -602,10 +808,9 @@ class BBUIEventQueue: return self.getEvent() def queue_event(self, event): - self.eventQueueLock.acquire() - self.eventQueue.append(event) - self.eventQueueNotify.set() - self.eventQueueLock.release() + with bb.utils.lock_timeout(self.eventQueueLock): + self.eventQueue.append(event) + self.eventQueueNotify.set() def send_event(self, event): self.queue_event(pickle.loads(event)) @@ -614,13 +819,17 @@ class BBUIEventQueue: bb.utils.set_process_name("UIEventQueue") while True: try: - self.reader.wait() - event = self.reader.get() - self.queue_event(event) - except EOFError: + ready = self.reader.wait(0.25) + if ready: + event = self.reader.get() + self.queue_event(event) + except (EOFError, OSError, TypeError): # Easiest way to exit is to close the file descriptor to cause an exit break + + def close(self): self.reader.close() + self.t.join() class ConnectionReader(object): @@ -635,7 +844,7 @@ class ConnectionReader(object): return self.reader.poll(timeout) def get(self): - with self.rlock: + with bb.utils.lock_timeout(self.rlock): res = self.reader.recv_bytes() return multiprocessing.reduction.ForkingPickler.loads(res) @@ -654,10 +863,31 @@ class ConnectionWriter(object): # Why bb.event needs this I have no idea self.event = self + def _send(self, obj): + gc.disable() + with bb.utils.lock_timeout(self.wlock): + self.writer.send_bytes(obj) + gc.enable() + def send(self, obj): obj = multiprocessing.reduction.ForkingPickler.dumps(obj) - with self.wlock: - self.writer.send_bytes(obj) + # See notes/code in CookerParser + # We must not terminate holding this lock else processes will hang. + # For SIGTERM, raising afterwards avoids this. + # For SIGINT, we don't want to have written partial data to the pipe. + # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 + process = multiprocessing.current_process() + if process and hasattr(process, "queue_signals"): + with bb.utils.lock_timeout(process.signal_threadlock): + process.queue_signals = True + self._send(obj) + process.queue_signals = False + + while len(process.signal_received) > 0: + sig = process.signal_received.pop() + process.handle_sig(sig, None) + else: + self._send(obj) def fileno(self): return self.writer.fileno() |