diff options
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r-- | bitbake/lib/bb/server/process.py | 375 |
1 files changed, 261 insertions, 114 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py index 8fdcc66dc7..76b189291d 100644 --- a/bitbake/lib/bb/server/process.py +++ b/bitbake/lib/bb/server/process.py @@ -27,6 +27,8 @@ import re import datetime import pickle import traceback +import gc +import stat import bb.server.xmlrpcserver from bb import daemonize from multiprocessing import queues @@ -36,9 +38,46 @@ logger = logging.getLogger('BitBake') class ProcessTimeout(SystemExit): pass +def currenttime(): + return datetime.datetime.now().strftime('%H:%M:%S.%f') + def serverlog(msg): - print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) - sys.stdout.flush() + print(str(os.getpid()) + " " + currenttime() + " " + msg) + #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server + #sys.stdout.flush() + +# +# When we have lockfile issues, try and find infomation about which process is +# using the lockfile +# +def get_lockfile_process_msg(lockfile): + # Some systems may not have lsof available + procs = None + try: + procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) + except subprocess.CalledProcessError: + # File was deleted? + pass + except OSError as e: + if e.errno != errno.ENOENT: + raise + if procs is None: + # Fall back to fuser if lsof is unavailable + try: + procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) + except subprocess.CalledProcessError: + # File was deleted? + pass + except OSError as e: + if e.errno != errno.ENOENT: + raise + if procs: + return procs.decode("utf-8") + return None + +class idleFinish(): + def __init__(self, msg): + self.msg = msg class ProcessServer(): profile_filename = "profile.log" @@ -57,12 +96,19 @@ class ProcessServer(): self.maxuiwait = 30 self.xmlrpc = False + self.idle = None + # Need a lock for _idlefuns changes self._idlefuns = {} + self._idlefuncsLock = threading.Lock() + self.idle_cond = threading.Condition(self._idlefuncsLock) self.bitbake_lock = lock self.bitbake_lock_name = lockname self.sock = sock self.sockname = sockname + # It is possible the directory may be renamed. Cache the inode of the socket file + # so we can tell if things changed. + self.sockinode = os.stat(self.sockname)[stat.ST_INO] self.server_timeout = server_timeout self.timeout = self.server_timeout @@ -71,7 +117,9 @@ class ProcessServer(): def register_idle_function(self, function, data): """Register a function to be called while the server is idle""" assert hasattr(function, '__call__') - self._idlefuns[function] = data + with bb.utils.lock_timeout(self._idlefuncsLock): + self._idlefuns[function] = data + serverlog("Registering idle function %s" % str(function)) def run(self): @@ -110,6 +158,31 @@ class ProcessServer(): return ret + def _idle_check(self): + return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None + + def wait_for_idle(self, timeout=30): + # Wait for the idle loop to have cleared + with bb.utils.lock_timeout(self._idlefuncsLock): + return self.idle_cond.wait_for(self._idle_check, timeout) is not False + + def set_async_cmd(self, cmd): + with bb.utils.lock_timeout(self._idlefuncsLock): + ret = self.idle_cond.wait_for(self._idle_check, 30) + if ret is False: + return False + self.cooker.command.currentAsyncCommand = cmd + return True + + def clear_async_cmd(self): + with bb.utils.lock_timeout(self._idlefuncsLock): + self.cooker.command.currentAsyncCommand = None + self.idle_cond.notify_all() + + def get_async_cmd(self): + with bb.utils.lock_timeout(self._idlefuncsLock): + return self.cooker.command.currentAsyncCommand + def main(self): self.cooker.pre_serve() @@ -124,14 +197,19 @@ class ProcessServer(): fds.append(self.xmlrpc) seendata = False serverlog("Entering server connection loop") + serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname))) def disconnect_client(self, fds): - serverlog("Disconnecting Client") + serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname)) if self.controllersock: fds.remove(self.controllersock) self.controllersock.close() self.controllersock = False if self.haveui: + # Wait for the idle loop to have cleared (30s max) + if not self.wait_for_idle(30): + serverlog("Idle loop didn't finish queued commands after 30s, exiting.") + self.quit = True fds.remove(self.command_channel) bb.event.unregister_UIHhandler(self.event_handle, True) self.command_channel_reply.writer.close() @@ -143,7 +221,7 @@ class ProcessServer(): self.cooker.clientComplete() self.haveui = False ready = select.select(fds,[],[],0)[0] - if newconnections: + if newconnections and not self.quit: serverlog("Starting new client") conn = newconnections.pop(-1) fds.append(conn) @@ -215,8 +293,10 @@ class ProcessServer(): continue try: serverlog("Running command %s" % command) - self.command_channel_reply.send(self.cooker.command.runCommand(command)) - serverlog("Command Completed") + reply = self.cooker.command.runCommand(command, self) + serverlog("Sending reply %s" % repr(reply)) + self.command_channel_reply.send(reply) + serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) except Exception as e: stack = traceback.format_exc() serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) @@ -243,19 +323,25 @@ class ProcessServer(): ready = self.idle_commands(.1, fds) - if len(threading.enumerate()) != 1: - serverlog("More than one thread left?: " + str(threading.enumerate())) + if self.idle: + self.idle.join() - serverlog("Exiting") + serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) # Remove the socket file so we don't get any more connections to avoid races + # The build directory could have been renamed so if the file isn't the one we created + # we shouldn't delete it. try: - os.unlink(self.sockname) - except: - pass + sockinode = os.stat(self.sockname)[stat.ST_INO] + if sockinode == self.sockinode: + os.unlink(self.sockname) + else: + serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode)) + except Exception as err: + serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err)) self.sock.close() try: - self.cooker.shutdown(True) + self.cooker.shutdown(True, idle=False) self.cooker.notifier.stop() self.cooker.confignotifier.stop() except: @@ -263,6 +349,9 @@ class ProcessServer(): self.cooker.post_serve() + if len(threading.enumerate()) != 1: + serverlog("More than one thread left?: " + str(threading.enumerate())) + # Flush logs before we release the lock sys.stdout.flush() sys.stderr.flush() @@ -278,20 +367,21 @@ class ProcessServer(): except FileNotFoundError: return None - lockcontents = get_lock_contents(lockfile) - serverlog("Original lockfile contents: " + str(lockcontents)) - lock.close() lock = None while not lock: i = 0 lock = None + if not os.path.exists(os.path.basename(lockfile)): + serverlog("Lockfile directory gone, exiting.") + return + while not lock and i < 30: lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) if not lock: newlockcontents = get_lock_contents(lockfile) - if newlockcontents != lockcontents: + if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]): # A new server was started, the lockfile contents changed, we can exit serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) return @@ -305,80 +395,108 @@ class ProcessServer(): return if not lock: - # Some systems may not have lsof available - procs = None - try: - procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) - except subprocess.CalledProcessError: - # File was deleted? - continue - except OSError as e: - if e.errno != errno.ENOENT: - raise - if procs is None: - # Fall back to fuser if lsof is unavailable - try: - procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) - except subprocess.CalledProcessError: - # File was deleted? - continue - except OSError as e: - if e.errno != errno.ENOENT: - raise - - msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock" + procs = get_lockfile_process_msg(lockfile) + msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] if procs: - msg += ":\n%s" % str(procs.decode("utf-8")) - serverlog(msg) + msg.append(":\n%s" % procs) + serverlog("".join(msg)) - def idle_commands(self, delay, fds=None): - nextsleep = delay - if not fds: - fds = [] - - for function, data in list(self._idlefuns.items()): + def idle_thread(self): + if self.cooker.configuration.profile: try: - retval = function(self, data, False) - if retval is False: - del self._idlefuns[function] - nextsleep = None - elif retval is True: - nextsleep = None - elif isinstance(retval, float) and nextsleep: - if (retval < nextsleep): - nextsleep = retval - elif nextsleep is None: - continue - else: - fds = fds + retval - except SystemExit: - raise - except Exception as exc: - if not isinstance(exc, bb.BBHandledException): - logger.exception('Running idle function') + import cProfile as profile + except: + import profile + prof = profile.Profile() + + ret = profile.Profile.runcall(prof, self.idle_thread_internal) + + prof.dump_stats("profile-mainloop.log") + bb.utils.process_profilelog("profile-mainloop.log") + serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed") + else: + self.idle_thread_internal() + + def idle_thread_internal(self): + def remove_idle_func(function): + with bb.utils.lock_timeout(self._idlefuncsLock): del self._idlefuns[function] - self.quit = True + self.idle_cond.notify_all() - # Create new heartbeat event? - now = time.time() - if now >= self.next_heartbeat: - # We might have missed heartbeats. Just trigger once in - # that case and continue after the usual delay. - self.next_heartbeat += self.heartbeat_seconds - if self.next_heartbeat <= now: - self.next_heartbeat = now + self.heartbeat_seconds - if hasattr(self.cooker, "data"): - heartbeat = bb.event.HeartbeatEvent(now) + while not self.quit: + nextsleep = 0.1 + fds = [] + + with bb.utils.lock_timeout(self._idlefuncsLock): + items = list(self._idlefuns.items()) + + for function, data in items: try: - bb.event.fire(heartbeat, self.cooker.data) + retval = function(self, data, False) + if isinstance(retval, idleFinish): + serverlog("Removing idle function %s at idleFinish" % str(function)) + remove_idle_func(function) + self.cooker.command.finishAsyncCommand(retval.msg) + nextsleep = None + elif retval is False: + serverlog("Removing idle function %s" % str(function)) + remove_idle_func(function) + nextsleep = None + elif retval is True: + nextsleep = None + elif isinstance(retval, float) and nextsleep: + if (retval < nextsleep): + nextsleep = retval + elif nextsleep is None: + continue + else: + fds = fds + retval + except SystemExit: + raise except Exception as exc: if not isinstance(exc, bb.BBHandledException): - logger.exception('Running heartbeat function') + logger.exception('Running idle function') + remove_idle_func(function) + serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) self.quit = True - if nextsleep and now + nextsleep > self.next_heartbeat: - # Shorten timeout so that we we wake up in time for - # the heartbeat. - nextsleep = self.next_heartbeat - now + + # Create new heartbeat event? + now = time.time() + if bb.event._heartbeat_enabled and now >= self.next_heartbeat: + # We might have missed heartbeats. Just trigger once in + # that case and continue after the usual delay. + self.next_heartbeat += self.heartbeat_seconds + if self.next_heartbeat <= now: + self.next_heartbeat = now + self.heartbeat_seconds + if hasattr(self.cooker, "data"): + heartbeat = bb.event.HeartbeatEvent(now) + try: + bb.event.fire(heartbeat, self.cooker.data) + except Exception as exc: + if not isinstance(exc, bb.BBHandledException): + logger.exception('Running heartbeat function') + serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc()) + self.quit = True + if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat: + # Shorten timeout so that we we wake up in time for + # the heartbeat. + nextsleep = self.next_heartbeat - now + + if nextsleep is not None: + select.select(fds,[],[],nextsleep)[0] + + def idle_commands(self, delay, fds=None): + nextsleep = delay + if not fds: + fds = [] + + if not self.idle: + self.idle = threading.Thread(target=self.idle_thread) + self.idle.start() + elif self.idle and not self.idle.is_alive(): + serverlog("Idle thread terminated, main thread exiting too") + bb.error("Idle thread terminated, main thread exiting too") + self.quit = True if nextsleep is not None: if self.xmlrpc: @@ -398,12 +516,18 @@ class ServerCommunicator(): self.recv = recv def runCommand(self, command): - self.connection.send(command) + try: + self.connection.send(command) + except BrokenPipeError as e: + raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e if not self.recv.poll(30): - logger.info("No reply from server in 30s") + logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime())) if not self.recv.poll(30): - raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") - ret, exc = self.recv.get() + raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime()) + try: + ret, exc = self.recv.get() + except EOFError as e: + raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e # Should probably turn all exceptions in exc back into exceptions? # For now, at least handle BBHandledException if exc and ("BBHandledException" in exc or "SystemExit" in exc): @@ -436,6 +560,7 @@ class BitBakeProcessServerConnection(object): self.socket_connection = sock def terminate(self): + self.events.close() self.socket_connection.close() self.connection.connection.close() self.connection.recv.close() @@ -446,13 +571,14 @@ start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' class BitBakeServer(object): - def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface): + def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile): self.server_timeout = server_timeout self.xmlrpcinterface = xmlrpcinterface self.featureset = featureset self.sockname = sockname self.bitbake_lock = lock + self.profile = profile self.readypipe, self.readypipein = os.pipe() # Place the log in the builddirectory alongside the lock file @@ -516,9 +642,9 @@ class BitBakeServer(object): os.set_inheritable(self.bitbake_lock.fileno(), True) os.set_inheritable(self.readypipein, True) serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") - os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) + os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) -def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): +def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile): import bb.cookerdata import bb.cooker @@ -530,6 +656,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc # Create server control socket if os.path.exists(sockname): + serverlog("WARNING: removing existing socket file '%s'" % sockname) os.unlink(sockname) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) @@ -546,7 +673,8 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc writer = ConnectionWriter(readypipeinfd) try: featureset = [] - cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) + cooker = bb.cooker.BBCooker(featureset, server) + cooker.configuration.profile = profile except bb.BBHandledException: return None writer.send("r") @@ -556,7 +684,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc server.run() finally: - # Flush any ,essages/errors to the logfile before exit + # Flush any messages/errors to the logfile before exit sys.stdout.flush() sys.stderr.flush() @@ -661,23 +789,18 @@ class BBUIEventQueue: self.reader = ConnectionReader(readfd) self.t = threading.Thread() - self.t.daemon = True self.t.run = self.startCallbackHandler self.t.start() def getEvent(self): - self.eventQueueLock.acquire() - - if len(self.eventQueue) == 0: - self.eventQueueLock.release() - return None - - item = self.eventQueue.pop(0) + with bb.utils.lock_timeout(self.eventQueueLock): + if len(self.eventQueue) == 0: + return None - if len(self.eventQueue) == 0: - self.eventQueueNotify.clear() + item = self.eventQueue.pop(0) + if len(self.eventQueue) == 0: + self.eventQueueNotify.clear() - self.eventQueueLock.release() return item def waitEvent(self, delay): @@ -685,10 +808,9 @@ class BBUIEventQueue: return self.getEvent() def queue_event(self, event): - self.eventQueueLock.acquire() - self.eventQueue.append(event) - self.eventQueueNotify.set() - self.eventQueueLock.release() + with bb.utils.lock_timeout(self.eventQueueLock): + self.eventQueue.append(event) + self.eventQueueNotify.set() def send_event(self, event): self.queue_event(pickle.loads(event)) @@ -697,13 +819,17 @@ class BBUIEventQueue: bb.utils.set_process_name("UIEventQueue") while True: try: - self.reader.wait() - event = self.reader.get() - self.queue_event(event) - except EOFError: + ready = self.reader.wait(0.25) + if ready: + event = self.reader.get() + self.queue_event(event) + except (EOFError, OSError, TypeError): # Easiest way to exit is to close the file descriptor to cause an exit break + + def close(self): self.reader.close() + self.t.join() class ConnectionReader(object): @@ -718,7 +844,7 @@ class ConnectionReader(object): return self.reader.poll(timeout) def get(self): - with self.rlock: + with bb.utils.lock_timeout(self.rlock): res = self.reader.recv_bytes() return multiprocessing.reduction.ForkingPickler.loads(res) @@ -737,10 +863,31 @@ class ConnectionWriter(object): # Why bb.event needs this I have no idea self.event = self + def _send(self, obj): + gc.disable() + with bb.utils.lock_timeout(self.wlock): + self.writer.send_bytes(obj) + gc.enable() + def send(self, obj): obj = multiprocessing.reduction.ForkingPickler.dumps(obj) - with self.wlock: - self.writer.send_bytes(obj) + # See notes/code in CookerParser + # We must not terminate holding this lock else processes will hang. + # For SIGTERM, raising afterwards avoids this. + # For SIGINT, we don't want to have written partial data to the pipe. + # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 + process = multiprocessing.current_process() + if process and hasattr(process, "queue_signals"): + with bb.utils.lock_timeout(process.signal_threadlock): + process.queue_signals = True + self._send(obj) + process.queue_signals = False + + while len(process.signal_received) > 0: + sig = process.signal_received.pop() + process.handle_sig(sig, None) + else: + self._send(obj) def fileno(self): return self.writer.fileno() |