summary refs log tree commit diff stats
path: root/bitbake/lib/bb/server/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r-- bitbake/lib/bb/server/process.py 375
1 files changed, 261 insertions, 114 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py
index 8fdcc66dc7..76b189291d 100644
--- a/bitbake/lib/bb/server/process.py
+++ b/bitbake/lib/bb/server/process.py
@@ -27,6 +27,8 @@ import re
import datetime
import pickle
import traceback
+import gc
+import stat
import bb.server.xmlrpcserver
from bb import daemonize
from multiprocessing import queues
@@ -36,9 +38,46 @@ logger = logging.getLogger('BitBake')
class ProcessTimeout(SystemExit):
pass
+def currenttime():
+ return datetime.datetime.now().strftime('%H:%M:%S.%f')
+
def serverlog(msg):
- print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg)
- sys.stdout.flush()
+ print(str(os.getpid()) + " " + currenttime() + " " + msg)
+ #Seems a flush here triggers filesystem sync-like behaviour and long hangs in the server
+ #sys.stdout.flush()
+
+#
+# When we have lockfile issues, try and find information about which process is
+# using the lockfile
+#
+def get_lockfile_process_msg(lockfile):
+ # Some systems may not have lsof available
+ procs = None
+ try:
+ procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError:
+ # File was deleted?
+ pass
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ if procs is None:
+ # Fall back to fuser if lsof is unavailable
+ try:
+ procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError:
+ # File was deleted?
+ pass
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ if procs:
+ return procs.decode("utf-8")
+ return None
+
+class idleFinish():
+ def __init__(self, msg):
+ self.msg = msg
class ProcessServer():
profile_filename = "profile.log"
@@ -57,12 +96,19 @@ class ProcessServer():
self.maxuiwait = 30
self.xmlrpc = False
+ self.idle = None
+ # Need a lock for _idlefuns changes
self._idlefuns = {}
+ self._idlefuncsLock = threading.Lock()
+ self.idle_cond = threading.Condition(self._idlefuncsLock)
self.bitbake_lock = lock
self.bitbake_lock_name = lockname
self.sock = sock
self.sockname = sockname
+ # It is possible the directory may be renamed. Cache the inode of the socket file
+ # so we can tell if things changed.
+ self.sockinode = os.stat(self.sockname)[stat.ST_INO]
self.server_timeout = server_timeout
self.timeout = self.server_timeout
@@ -71,7 +117,9 @@ class ProcessServer():
def register_idle_function(self, function, data):
"""Register a function to be called while the server is idle"""
assert hasattr(function, '__call__')
- self._idlefuns[function] = data
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ self._idlefuns[function] = data
+ serverlog("Registering idle function %s" % str(function))
def run(self):
@@ -110,6 +158,31 @@ class ProcessServer():
return ret
+ def _idle_check(self):
+ return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None
+
+ def wait_for_idle(self, timeout=30):
+ # Wait for the idle loop to have cleared
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ return self.idle_cond.wait_for(self._idle_check, timeout) is not False
+
+ def set_async_cmd(self, cmd):
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ ret = self.idle_cond.wait_for(self._idle_check, 30)
+ if ret is False:
+ return False
+ self.cooker.command.currentAsyncCommand = cmd
+ return True
+
+ def clear_async_cmd(self):
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ self.cooker.command.currentAsyncCommand = None
+ self.idle_cond.notify_all()
+
+ def get_async_cmd(self):
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ return self.cooker.command.currentAsyncCommand
+
def main(self):
self.cooker.pre_serve()
@@ -124,14 +197,19 @@ class ProcessServer():
fds.append(self.xmlrpc)
seendata = False
serverlog("Entering server connection loop")
+ serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))
def disconnect_client(self, fds):
- serverlog("Disconnecting Client")
+ serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
if self.controllersock:
fds.remove(self.controllersock)
self.controllersock.close()
self.controllersock = False
if self.haveui:
+ # Wait for the idle loop to have cleared (30s max)
+ if not self.wait_for_idle(30):
+ serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
+ self.quit = True
fds.remove(self.command_channel)
bb.event.unregister_UIHhandler(self.event_handle, True)
self.command_channel_reply.writer.close()
@@ -143,7 +221,7 @@ class ProcessServer():
self.cooker.clientComplete()
self.haveui = False
ready = select.select(fds,[],[],0)[0]
- if newconnections:
+ if newconnections and not self.quit:
serverlog("Starting new client")
conn = newconnections.pop(-1)
fds.append(conn)
@@ -215,8 +293,10 @@ class ProcessServer():
continue
try:
serverlog("Running command %s" % command)
- self.command_channel_reply.send(self.cooker.command.runCommand(command))
- serverlog("Command Completed")
+ reply = self.cooker.command.runCommand(command, self)
+ serverlog("Sending reply %s" % repr(reply))
+ self.command_channel_reply.send(reply)
+ serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
except Exception as e:
stack = traceback.format_exc()
serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
@@ -243,19 +323,25 @@ class ProcessServer():
ready = self.idle_commands(.1, fds)
- if len(threading.enumerate()) != 1:
- serverlog("More than one thread left?: " + str(threading.enumerate()))
+ if self.idle:
+ self.idle.join()
- serverlog("Exiting")
+ serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
# Remove the socket file so we don't get any more connections to avoid races
+ # The build directory could have been renamed so if the file isn't the one we created
+ # we shouldn't delete it.
try:
- os.unlink(self.sockname)
- except:
- pass
+ sockinode = os.stat(self.sockname)[stat.ST_INO]
+ if sockinode == self.sockinode:
+ os.unlink(self.sockname)
+ else:
+ serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
+ except Exception as err:
+ serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
self.sock.close()
try:
- self.cooker.shutdown(True)
+ self.cooker.shutdown(True, idle=False)
self.cooker.notifier.stop()
self.cooker.confignotifier.stop()
except:
@@ -263,6 +349,9 @@ class ProcessServer():
self.cooker.post_serve()
+ if len(threading.enumerate()) != 1:
+ serverlog("More than one thread left?: " + str(threading.enumerate()))
+
# Flush logs before we release the lock
sys.stdout.flush()
sys.stderr.flush()
@@ -278,20 +367,21 @@ class ProcessServer():
except FileNotFoundError:
return None
- lockcontents = get_lock_contents(lockfile)
- serverlog("Original lockfile contents: " + str(lockcontents))
-
lock.close()
lock = None
while not lock:
i = 0
lock = None
+ if not os.path.exists(os.path.basename(lockfile)):
+ serverlog("Lockfile directory gone, exiting.")
+ return
+
while not lock and i < 30:
lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
if not lock:
newlockcontents = get_lock_contents(lockfile)
- if newlockcontents != lockcontents:
+ if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]):
# A new server was started, the lockfile contents changed, we can exit
serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
return
@@ -305,80 +395,108 @@ class ProcessServer():
return
if not lock:
- # Some systems may not have lsof available
- procs = None
- try:
- procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
- except subprocess.CalledProcessError:
- # File was deleted?
- continue
- except OSError as e:
- if e.errno != errno.ENOENT:
- raise
- if procs is None:
- # Fall back to fuser if lsof is unavailable
- try:
- procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
- except subprocess.CalledProcessError:
- # File was deleted?
- continue
- except OSError as e:
- if e.errno != errno.ENOENT:
- raise
-
- msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
+ procs = get_lockfile_process_msg(lockfile)
+ msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
if procs:
- msg += ":\n%s" % str(procs.decode("utf-8"))
- serverlog(msg)
+ msg.append(":\n%s" % procs)
+ serverlog("".join(msg))
- def idle_commands(self, delay, fds=None):
- nextsleep = delay
- if not fds:
- fds = []
-
- for function, data in list(self._idlefuns.items()):
+ def idle_thread(self):
+ if self.cooker.configuration.profile:
try:
- retval = function(self, data, False)
- if retval is False:
- del self._idlefuns[function]
- nextsleep = None
- elif retval is True:
- nextsleep = None
- elif isinstance(retval, float) and nextsleep:
- if (retval < nextsleep):
- nextsleep = retval
- elif nextsleep is None:
- continue
- else:
- fds = fds + retval
- except SystemExit:
- raise
- except Exception as exc:
- if not isinstance(exc, bb.BBHandledException):
- logger.exception('Running idle function')
+ import cProfile as profile
+ except:
+ import profile
+ prof = profile.Profile()
+
+ ret = profile.Profile.runcall(prof, self.idle_thread_internal)
+
+ prof.dump_stats("profile-mainloop.log")
+ bb.utils.process_profilelog("profile-mainloop.log")
+ serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed")
+ else:
+ self.idle_thread_internal()
+
+ def idle_thread_internal(self):
+ def remove_idle_func(function):
+ with bb.utils.lock_timeout(self._idlefuncsLock):
del self._idlefuns[function]
- self.quit = True
+ self.idle_cond.notify_all()
- # Create new heartbeat event?
- now = time.time()
- if now >= self.next_heartbeat:
- # We might have missed heartbeats. Just trigger once in
- # that case and continue after the usual delay.
- self.next_heartbeat += self.heartbeat_seconds
- if self.next_heartbeat <= now:
- self.next_heartbeat = now + self.heartbeat_seconds
- if hasattr(self.cooker, "data"):
- heartbeat = bb.event.HeartbeatEvent(now)
+ while not self.quit:
+ nextsleep = 0.1
+ fds = []
+
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ items = list(self._idlefuns.items())
+
+ for function, data in items:
try:
- bb.event.fire(heartbeat, self.cooker.data)
+ retval = function(self, data, False)
+ if isinstance(retval, idleFinish):
+ serverlog("Removing idle function %s at idleFinish" % str(function))
+ remove_idle_func(function)
+ self.cooker.command.finishAsyncCommand(retval.msg)
+ nextsleep = None
+ elif retval is False:
+ serverlog("Removing idle function %s" % str(function))
+ remove_idle_func(function)
+ nextsleep = None
+ elif retval is True:
+ nextsleep = None
+ elif isinstance(retval, float) and nextsleep:
+ if (retval < nextsleep):
+ nextsleep = retval
+ elif nextsleep is None:
+ continue
+ else:
+ fds = fds + retval
+ except SystemExit:
+ raise
except Exception as exc:
if not isinstance(exc, bb.BBHandledException):
- logger.exception('Running heartbeat function')
+ logger.exception('Running idle function')
+ remove_idle_func(function)
+ serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
self.quit = True
- if nextsleep and now + nextsleep > self.next_heartbeat:
- # Shorten timeout so that we we wake up in time for
- # the heartbeat.
- nextsleep = self.next_heartbeat - now
+
+ # Create new heartbeat event?
+ now = time.time()
+ if bb.event._heartbeat_enabled and now >= self.next_heartbeat:
+ # We might have missed heartbeats. Just trigger once in
+ # that case and continue after the usual delay.
+ self.next_heartbeat += self.heartbeat_seconds
+ if self.next_heartbeat <= now:
+ self.next_heartbeat = now + self.heartbeat_seconds
+ if hasattr(self.cooker, "data"):
+ heartbeat = bb.event.HeartbeatEvent(now)
+ try:
+ bb.event.fire(heartbeat, self.cooker.data)
+ except Exception as exc:
+ if not isinstance(exc, bb.BBHandledException):
+ logger.exception('Running heartbeat function')
+ serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
+ self.quit = True
+ if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
+ # Shorten timeout so that we wake up in time for
+ # the heartbeat.
+ nextsleep = self.next_heartbeat - now
+
+ if nextsleep is not None:
+ select.select(fds,[],[],nextsleep)[0]
+
+ def idle_commands(self, delay, fds=None):
+ nextsleep = delay
+ if not fds:
+ fds = []
+
+ if not self.idle:
+ self.idle = threading.Thread(target=self.idle_thread)
+ self.idle.start()
+ elif self.idle and not self.idle.is_alive():
+ serverlog("Idle thread terminated, main thread exiting too")
+ bb.error("Idle thread terminated, main thread exiting too")
+ self.quit = True
if nextsleep is not None:
if self.xmlrpc:
@@ -398,12 +516,18 @@ class ServerCommunicator():
self.recv = recv
def runCommand(self, command):
- self.connection.send(command)
+ try:
+ self.connection.send(command)
+ except BrokenPipeError as e:
+ raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
if not self.recv.poll(30):
- logger.info("No reply from server in 30s")
+ logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
if not self.recv.poll(30):
- raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)")
- ret, exc = self.recv.get()
+ raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())
+ try:
+ ret, exc = self.recv.get()
+ except EOFError as e:
+ raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
# Should probably turn all exceptions in exc back into exceptions?
# For now, at least handle BBHandledException
if exc and ("BBHandledException" in exc or "SystemExit" in exc):
@@ -436,6 +560,7 @@ class BitBakeProcessServerConnection(object):
self.socket_connection = sock
def terminate(self):
+ self.events.close()
self.socket_connection.close()
self.connection.connection.close()
self.connection.recv.close()
@@ -446,13 +571,14 @@ start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
class BitBakeServer(object):
- def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface):
+ def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
self.server_timeout = server_timeout
self.xmlrpcinterface = xmlrpcinterface
self.featureset = featureset
self.sockname = sockname
self.bitbake_lock = lock
+ self.profile = profile
self.readypipe, self.readypipein = os.pipe()
# Place the log in the builddirectory alongside the lock file
@@ -516,9 +642,9 @@ class BitBakeServer(object):
os.set_inheritable(self.bitbake_lock.fileno(), True)
os.set_inheritable(self.readypipein, True)
serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
- os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
+ os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
-def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface):
+def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
import bb.cookerdata
import bb.cooker
@@ -530,6 +656,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
# Create server control socket
if os.path.exists(sockname):
+ serverlog("WARNING: removing existing socket file '%s'" % sockname)
os.unlink(sockname)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -546,7 +673,8 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
writer = ConnectionWriter(readypipeinfd)
try:
featureset = []
- cooker = bb.cooker.BBCooker(featureset, server.register_idle_function)
+ cooker = bb.cooker.BBCooker(featureset, server)
+ cooker.configuration.profile = profile
except bb.BBHandledException:
return None
writer.send("r")
@@ -556,7 +684,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
server.run()
finally:
- # Flush any ,essages/errors to the logfile before exit
+ # Flush any messages/errors to the logfile before exit
sys.stdout.flush()
sys.stderr.flush()
@@ -661,23 +789,18 @@ class BBUIEventQueue:
self.reader = ConnectionReader(readfd)
self.t = threading.Thread()
- self.t.daemon = True
self.t.run = self.startCallbackHandler
self.t.start()
def getEvent(self):
- self.eventQueueLock.acquire()
-
- if len(self.eventQueue) == 0:
- self.eventQueueLock.release()
- return None
-
- item = self.eventQueue.pop(0)
+ with bb.utils.lock_timeout(self.eventQueueLock):
+ if len(self.eventQueue) == 0:
+ return None
- if len(self.eventQueue) == 0:
- self.eventQueueNotify.clear()
+ item = self.eventQueue.pop(0)
+ if len(self.eventQueue) == 0:
+ self.eventQueueNotify.clear()
- self.eventQueueLock.release()
return item
def waitEvent(self, delay):
@@ -685,10 +808,9 @@ class BBUIEventQueue:
return self.getEvent()
def queue_event(self, event):
- self.eventQueueLock.acquire()
- self.eventQueue.append(event)
- self.eventQueueNotify.set()
- self.eventQueueLock.release()
+ with bb.utils.lock_timeout(self.eventQueueLock):
+ self.eventQueue.append(event)
+ self.eventQueueNotify.set()
def send_event(self, event):
self.queue_event(pickle.loads(event))
@@ -697,13 +819,17 @@ class BBUIEventQueue:
bb.utils.set_process_name("UIEventQueue")
while True:
try:
- self.reader.wait()
- event = self.reader.get()
- self.queue_event(event)
- except EOFError:
+ ready = self.reader.wait(0.25)
+ if ready:
+ event = self.reader.get()
+ self.queue_event(event)
+ except (EOFError, OSError, TypeError):
# Easiest way to exit is to close the file descriptor to cause an exit
break
+
+ def close(self):
self.reader.close()
+ self.t.join()
class ConnectionReader(object):
@@ -718,7 +844,7 @@ class ConnectionReader(object):
return self.reader.poll(timeout)
def get(self):
- with self.rlock:
+ with bb.utils.lock_timeout(self.rlock):
res = self.reader.recv_bytes()
return multiprocessing.reduction.ForkingPickler.loads(res)
@@ -737,10 +863,31 @@ class ConnectionWriter(object):
# Why bb.event needs this I have no idea
self.event = self
+ def _send(self, obj):
+ gc.disable()
+ with bb.utils.lock_timeout(self.wlock):
+ self.writer.send_bytes(obj)
+ gc.enable()
+
def send(self, obj):
obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
- with self.wlock:
- self.writer.send_bytes(obj)
+ # See notes/code in CookerParser
+ # We must not terminate holding this lock else processes will hang.
+ # For SIGTERM, raising afterwards avoids this.
+ # For SIGINT, we don't want to have written partial data to the pipe.
+ # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
+ process = multiprocessing.current_process()
+ if process and hasattr(process, "queue_signals"):
+ with bb.utils.lock_timeout(process.signal_threadlock):
+ process.queue_signals = True
+ self._send(obj)
+ process.queue_signals = False
+
+ while len(process.signal_received) > 0:
+ sig = process.signal_received.pop()
+ process.handle_sig(sig, None)
+ else:
+ self._send(obj)
def fileno(self):
return self.writer.fileno()