summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/server/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r--bitbake/lib/bb/server/process.py588
1 files changed, 409 insertions, 179 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py
index 80a7875ad9..76b189291d 100644
--- a/bitbake/lib/bb/server/process.py
+++ b/bitbake/lib/bb/server/process.py
@@ -3,18 +3,8 @@
#
# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 as
-# published by the Free Software Foundation.
+# SPDX-License-Identifier: GPL-2.0-only
#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
This module implements a multiprocessing.Process based server for bitbake.
@@ -35,6 +25,10 @@ import subprocess
import errno
import re
import datetime
+import pickle
+import traceback
+import gc
+import stat
import bb.server.xmlrpcserver
from bb import daemonize
from multiprocessing import queues
@@ -44,12 +38,52 @@ logger = logging.getLogger('BitBake')
class ProcessTimeout(SystemExit):
pass
-class ProcessServer(multiprocessing.Process):
+def currenttime():
+ return datetime.datetime.now().strftime('%H:%M:%S.%f')
+
+def serverlog(msg):
+ print(str(os.getpid()) + " " + currenttime() + " " + msg)
+ #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server
+ #sys.stdout.flush()
+
+#
+# When we have lockfile issues, try and find infomation about which process is
+# using the lockfile
+#
+def get_lockfile_process_msg(lockfile):
+ # Some systems may not have lsof available
+ procs = None
+ try:
+ procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError:
+ # File was deleted?
+ pass
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ if procs is None:
+ # Fall back to fuser if lsof is unavailable
+ try:
+ procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError:
+ # File was deleted?
+ pass
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ if procs:
+ return procs.decode("utf-8")
+ return None
+
+class idleFinish():
+ def __init__(self, msg):
+ self.msg = msg
+
+class ProcessServer():
profile_filename = "profile.log"
profile_processed_filename = "profile.log.processed"
- def __init__(self, lock, sock, sockname):
- multiprocessing.Process.__init__(self)
+ def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
self.command_channel = False
self.command_channel_reply = False
self.quit = False
@@ -57,42 +91,42 @@ class ProcessServer(multiprocessing.Process):
self.next_heartbeat = time.time()
self.event_handle = None
+ self.hadanyui = False
self.haveui = False
- self.lastui = False
+ self.maxuiwait = 30
self.xmlrpc = False
+ self.idle = None
+ # Need a lock for _idlefuns changes
self._idlefuns = {}
+ self._idlefuncsLock = threading.Lock()
+ self.idle_cond = threading.Condition(self._idlefuncsLock)
self.bitbake_lock = lock
+ self.bitbake_lock_name = lockname
self.sock = sock
self.sockname = sockname
+ # It is possible the directory may be renamed. Cache the inode of the socket file
+ # so we can tell if things changed.
+ self.sockinode = os.stat(self.sockname)[stat.ST_INO]
+
+ self.server_timeout = server_timeout
+ self.timeout = self.server_timeout
+ self.xmlrpcinterface = xmlrpcinterface
def register_idle_function(self, function, data):
"""Register a function to be called while the server is idle"""
assert hasattr(function, '__call__')
- self._idlefuns[function] = data
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ self._idlefuns[function] = data
+ serverlog("Registering idle function %s" % str(function))
def run(self):
if self.xmlrpcinterface[0]:
self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
- print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
-
- heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
- if heartbeat_event:
- try:
- self.heartbeat_seconds = float(heartbeat_event)
- except:
- bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
-
- self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
- try:
- if self.timeout:
- self.timeout = float(self.timeout)
- except:
- bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
-
+ serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
try:
self.bitbake_lock.seek(0)
@@ -103,7 +137,7 @@ class ProcessServer(multiprocessing.Process):
self.bitbake_lock.write("%s\n" % (os.getpid()))
self.bitbake_lock.flush()
except Exception as e:
- print("Error writing to lock file: %s" % str(e))
+ serverlog("Error writing to lock file: %s" % str(e))
pass
if self.cooker.configuration.profile:
@@ -117,13 +151,38 @@ class ProcessServer(multiprocessing.Process):
prof.dump_stats("profile.log")
bb.utils.process_profilelog("profile.log")
- print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
+ serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
else:
ret = self.main()
return ret
+ def _idle_check(self):
+ return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None
+
+ def wait_for_idle(self, timeout=30):
+ # Wait for the idle loop to have cleared
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ return self.idle_cond.wait_for(self._idle_check, timeout) is not False
+
+ def set_async_cmd(self, cmd):
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ ret = self.idle_cond.wait_for(self._idle_check, 30)
+ if ret is False:
+ return False
+ self.cooker.command.currentAsyncCommand = cmd
+ return True
+
+ def clear_async_cmd(self):
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ self.cooker.command.currentAsyncCommand = None
+ self.idle_cond.notify_all()
+
+ def get_async_cmd(self):
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ return self.cooker.command.currentAsyncCommand
+
def main(self):
self.cooker.pre_serve()
@@ -136,15 +195,21 @@ class ProcessServer(multiprocessing.Process):
fds = [self.sock]
if self.xmlrpc:
fds.append(self.xmlrpc)
- print("Entering server connection loop")
+ seendata = False
+ serverlog("Entering server connection loop")
+ serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))
def disconnect_client(self, fds):
- print("Disconnecting Client")
+ serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
if self.controllersock:
fds.remove(self.controllersock)
self.controllersock.close()
self.controllersock = False
if self.haveui:
+ # Wait for the idle loop to have cleared (30s max)
+ if not self.wait_for_idle(30):
+ serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
+ self.quit = True
fds.remove(self.command_channel)
bb.event.unregister_UIHhandler(self.event_handle, True)
self.command_channel_reply.writer.close()
@@ -156,31 +221,32 @@ class ProcessServer(multiprocessing.Process):
self.cooker.clientComplete()
self.haveui = False
ready = select.select(fds,[],[],0)[0]
- if newconnections:
- print("Starting new client")
+ if newconnections and not self.quit:
+ serverlog("Starting new client")
conn = newconnections.pop(-1)
fds.append(conn)
self.controllersock = conn
- elif self.timeout is None and not ready:
- print("No timeout, exiting.")
+ elif not self.timeout and not ready:
+ serverlog("No timeout, exiting.")
self.quit = True
+ self.lastui = time.time()
while not self.quit:
if self.sock in ready:
while select.select([self.sock],[],[],0)[0]:
controllersock, address = self.sock.accept()
if self.controllersock:
- print("Queuing %s (%s)" % (str(ready), str(newconnections)))
+ serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
newconnections.append(controllersock)
else:
- print("Accepting %s (%s)" % (str(ready), str(newconnections)))
+ serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
self.controllersock = controllersock
fds.append(controllersock)
if self.controllersock in ready:
try:
- print("Processing Client")
+ serverlog("Processing Client")
ui_fds = recvfds(self.controllersock, 3)
- print("Connecting Client")
+ serverlog("Connecting Client")
# Where to write events to
writer = ConnectionWriter(ui_fds[0])
@@ -197,13 +263,21 @@ class ProcessServer(multiprocessing.Process):
self.command_channel_reply = writer
self.haveui = True
+ self.hadanyui = True
except (EOFError, OSError):
disconnect_client(self, fds)
- if not self.timeout == -1.0 and not self.haveui and self.lastui and self.timeout and \
+ if not self.timeout == -1.0 and not self.haveui and self.timeout and \
(self.lastui + self.timeout) < time.time():
- print("Server timeout, exiting.")
+ serverlog("Server timeout, exiting.")
+ self.quit = True
+
+ # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
+ # one. We have had issue with processes hanging indefinitely so timing out UI-less
+ # servers is useful.
+ if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
+ serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
self.quit = True
if self.command_channel in ready:
@@ -218,23 +292,56 @@ class ProcessServer(multiprocessing.Process):
self.quit = True
continue
try:
- print("Running command %s" % command)
- self.command_channel_reply.send(self.cooker.command.runCommand(command))
+ serverlog("Running command %s" % command)
+ reply = self.cooker.command.runCommand(command, self)
+ serverlog("Sending reply %s" % repr(reply))
+ self.command_channel_reply.send(reply)
+ serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
except Exception as e:
- logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e)))
+ stack = traceback.format_exc()
+ serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
+ logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
if self.xmlrpc in ready:
self.xmlrpc.handle_requests()
+ if not seendata and hasattr(self.cooker, "data"):
+ heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
+ if heartbeat_event:
+ try:
+ self.heartbeat_seconds = float(heartbeat_event)
+ except:
+ bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
+
+ self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
+ try:
+ if self.timeout:
+ self.timeout = float(self.timeout)
+ except:
+ bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
+ seendata = True
+
ready = self.idle_commands(.1, fds)
- print("Exiting")
+ if self.idle:
+ self.idle.join()
+
+ serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
# Remove the socket file so we don't get any more connections to avoid races
- os.unlink(self.sockname)
+ # The build directory could have been renamed so if the file isn't the one we created
+ # we shouldn't delete it.
+ try:
+ sockinode = os.stat(self.sockname)[stat.ST_INO]
+ if sockinode == self.sockinode:
+ os.unlink(self.sockname)
+ else:
+ serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
+ except Exception as err:
+ serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
self.sock.close()
- try:
- self.cooker.shutdown(True)
+ try:
+ self.cooker.shutdown(True, idle=False)
self.cooker.notifier.stop()
self.cooker.confignotifier.stop()
except:
@@ -242,84 +349,154 @@ class ProcessServer(multiprocessing.Process):
self.cooker.post_serve()
+ if len(threading.enumerate()) != 1:
+ serverlog("More than one thread left?: " + str(threading.enumerate()))
+
+ # Flush logs before we release the lock
+ sys.stdout.flush()
+ sys.stderr.flush()
+
# Finally release the lockfile but warn about other processes holding it open
lock = self.bitbake_lock
- lockfile = lock.name
+ lockfile = self.bitbake_lock_name
+
+ def get_lock_contents(lockfile):
+ try:
+ with open(lockfile, "r") as f:
+ return f.readlines()
+ except FileNotFoundError:
+ return None
+
lock.close()
lock = None
while not lock:
- with bb.utils.timeout(3):
- lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True)
- if lock:
- # We hold the lock so we can remove the file (hide stale pid data)
- bb.utils.remove(lockfile)
- bb.utils.unlockfile(lock)
- return
-
+ i = 0
+ lock = None
+ if not os.path.exists(os.path.basename(lockfile)):
+ serverlog("Lockfile directory gone, exiting.")
+ return
+
+ while not lock and i < 30:
+ lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
if not lock:
- # Some systems may not have lsof available
- procs = None
+ newlockcontents = get_lock_contents(lockfile)
+ if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]):
+ # A new server was started, the lockfile contents changed, we can exit
+ serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
+ return
+ time.sleep(0.1)
+ i += 1
+ if lock:
+ # We hold the lock so we can remove the file (hide stale pid data)
+ # via unlockfile.
+ bb.utils.unlockfile(lock)
+ serverlog("Exiting as we could obtain the lock")
+ return
+
+ if not lock:
+ procs = get_lockfile_process_msg(lockfile)
+ msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
+ if procs:
+ msg.append(":\n%s" % procs)
+ serverlog("".join(msg))
+
+ def idle_thread(self):
+ if self.cooker.configuration.profile:
+ try:
+ import cProfile as profile
+ except:
+ import profile
+ prof = profile.Profile()
+
+ ret = profile.Profile.runcall(prof, self.idle_thread_internal)
+
+ prof.dump_stats("profile-mainloop.log")
+ bb.utils.process_profilelog("profile-mainloop.log")
+ serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed")
+ else:
+ self.idle_thread_internal()
+
+ def idle_thread_internal(self):
+ def remove_idle_func(function):
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ del self._idlefuns[function]
+ self.idle_cond.notify_all()
+
+ while not self.quit:
+ nextsleep = 0.1
+ fds = []
+
+ with bb.utils.lock_timeout(self._idlefuncsLock):
+ items = list(self._idlefuns.items())
+
+ for function, data in items:
+ try:
+ retval = function(self, data, False)
+ if isinstance(retval, idleFinish):
+ serverlog("Removing idle function %s at idleFinish" % str(function))
+ remove_idle_func(function)
+ self.cooker.command.finishAsyncCommand(retval.msg)
+ nextsleep = None
+ elif retval is False:
+ serverlog("Removing idle function %s" % str(function))
+ remove_idle_func(function)
+ nextsleep = None
+ elif retval is True:
+ nextsleep = None
+ elif isinstance(retval, float) and nextsleep:
+ if (retval < nextsleep):
+ nextsleep = retval
+ elif nextsleep is None:
+ continue
+ else:
+ fds = fds + retval
+ except SystemExit:
+ raise
+ except Exception as exc:
+ if not isinstance(exc, bb.BBHandledException):
+ logger.exception('Running idle function')
+ remove_idle_func(function)
+ serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
+ self.quit = True
+
+ # Create new heartbeat event?
+ now = time.time()
+ if bb.event._heartbeat_enabled and now >= self.next_heartbeat:
+ # We might have missed heartbeats. Just trigger once in
+ # that case and continue after the usual delay.
+ self.next_heartbeat += self.heartbeat_seconds
+ if self.next_heartbeat <= now:
+ self.next_heartbeat = now + self.heartbeat_seconds
+ if hasattr(self.cooker, "data"):
+ heartbeat = bb.event.HeartbeatEvent(now)
try:
- procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
- except OSError as e:
- if e.errno != errno.ENOENT:
- raise
- if procs is None:
- # Fall back to fuser if lsof is unavailable
- try:
- procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
- except OSError as e:
- if e.errno != errno.ENOENT:
- raise
-
- msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
- if procs:
- msg += ":\n%s" % str(procs)
- print(msg)
+ bb.event.fire(heartbeat, self.cooker.data)
+ except Exception as exc:
+ if not isinstance(exc, bb.BBHandledException):
+ logger.exception('Running heartbeat function')
+ serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
+ self.quit = True
+ if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
+ # Shorten timeout so that we we wake up in time for
+ # the heartbeat.
+ nextsleep = self.next_heartbeat - now
+
+ if nextsleep is not None:
+ select.select(fds,[],[],nextsleep)[0]
def idle_commands(self, delay, fds=None):
nextsleep = delay
if not fds:
fds = []
- for function, data in list(self._idlefuns.items()):
- try:
- retval = function(self, data, False)
- if retval is False:
- del self._idlefuns[function]
- nextsleep = None
- elif retval is True:
- nextsleep = None
- elif isinstance(retval, float) and nextsleep:
- if (retval < nextsleep):
- nextsleep = retval
- elif nextsleep is None:
- continue
- else:
- fds = fds + retval
- except SystemExit:
- raise
- except Exception as exc:
- if not isinstance(exc, bb.BBHandledException):
- logger.exception('Running idle function')
- del self._idlefuns[function]
- self.quit = True
-
- # Create new heartbeat event?
- now = time.time()
- if now >= self.next_heartbeat:
- # We might have missed heartbeats. Just trigger once in
- # that case and continue after the usual delay.
- self.next_heartbeat += self.heartbeat_seconds
- if self.next_heartbeat <= now:
- self.next_heartbeat = now + self.heartbeat_seconds
- heartbeat = bb.event.HeartbeatEvent(now)
- bb.event.fire(heartbeat, self.cooker.data)
- if nextsleep and now + nextsleep > self.next_heartbeat:
- # Shorten timeout so that we we wake up in time for
- # the heartbeat.
- nextsleep = self.next_heartbeat - now
+ if not self.idle:
+ self.idle = threading.Thread(target=self.idle_thread)
+ self.idle.start()
+ elif self.idle and not self.idle.is_alive():
+ serverlog("Idle thread terminated, main thread exiting too")
+ bb.error("Idle thread terminated, main thread exiting too")
+ self.quit = True
if nextsleep is not None:
if self.xmlrpc:
@@ -339,10 +516,23 @@ class ServerCommunicator():
self.recv = recv
def runCommand(self, command):
- self.connection.send(command)
+ try:
+ self.connection.send(command)
+ except BrokenPipeError as e:
+ raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
if not self.recv.poll(30):
- raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server")
- return self.recv.get()
+ logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
+ if not self.recv.poll(30):
+ raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())
+ try:
+ ret, exc = self.recv.get()
+ except EOFError as e:
+ raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
+ # Should probably turn all exceptions in exc back into exceptions?
+ # For now, at least handle BBHandledException
+ if exc and ("BBHandledException" in exc or "SystemExit" in exc):
+ raise bb.BBHandledException()
+ return ret, exc
def updateFeatureSet(self, featureset):
_, error = self.runCommand(["setFeatures", featureset])
@@ -370,44 +560,33 @@ class BitBakeProcessServerConnection(object):
self.socket_connection = sock
def terminate(self):
+ self.events.close()
self.socket_connection.close()
self.connection.connection.close()
self.connection.recv.close()
return
+start_log_format = '--- Starting bitbake server pid %s at %s ---'
+start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
+
class BitBakeServer(object):
- start_log_format = '--- Starting bitbake server pid %s at %s ---'
- start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
- def __init__(self, lock, sockname, configuration, featureset):
+ def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
- self.configuration = configuration
+ self.server_timeout = server_timeout
+ self.xmlrpcinterface = xmlrpcinterface
self.featureset = featureset
self.sockname = sockname
self.bitbake_lock = lock
+ self.profile = profile
self.readypipe, self.readypipein = os.pipe()
- # Create server control socket
- if os.path.exists(sockname):
- os.unlink(sockname)
-
# Place the log in the builddirectory alongside the lock file
logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
+ self.logfile = logfile
- self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- # AF_UNIX has path length issues so chdir here to workaround
- cwd = os.getcwd()
- try:
- os.chdir(os.path.dirname(sockname))
- self.sock.bind(os.path.basename(sockname))
- finally:
- os.chdir(cwd)
- self.sock.listen(1)
-
- os.set_inheritable(self.sock.fileno(), True)
startdatetime = datetime.datetime.now()
bb.daemonize.createDaemon(self._startServer, logfile)
- self.sock.close()
self.bitbake_lock.close()
os.close(self.readypipein)
@@ -420,13 +599,13 @@ class BitBakeServer(object):
try:
r = ready.get()
except EOFError:
- # Trap the child exitting/closing the pipe and error out
+ # Trap the child exiting/closing the pipe and error out
r = None
if not r or r[0] != "r":
ready.close()
bb.error("Unable to start bitbake server (%s)" % str(r))
if os.path.exists(logfile):
- logstart_re = re.compile(self.start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
+ logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
started = False
lines = []
lastlines = []
@@ -436,9 +615,9 @@ class BitBakeServer(object):
lines.append(line)
else:
lastlines.append(line)
- res = logstart_re.match(line.rstrip())
+ res = logstart_re.search(line.rstrip())
if res:
- ldatetime = datetime.datetime.strptime(res.group(2), self.start_log_datetime_format)
+ ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
if ldatetime >= startdatetime:
started = True
lines.append(line)
@@ -459,23 +638,55 @@ class BitBakeServer(object):
ready.close()
def _startServer(self):
- print(self.start_log_format % (os.getpid(), datetime.datetime.now().strftime(self.start_log_datetime_format)))
- sys.stdout.flush()
-
- server = ProcessServer(self.bitbake_lock, self.sock, self.sockname)
- self.configuration.setServerRegIdleCallback(server.register_idle_function)
os.close(self.readypipe)
- writer = ConnectionWriter(self.readypipein)
- self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset)
+ os.set_inheritable(self.bitbake_lock.fileno(), True)
+ os.set_inheritable(self.readypipein, True)
+ serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
+ os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
+
+def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
+
+ import bb.cookerdata
+ import bb.cooker
+
+ serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))
+
+ try:
+ bitbake_lock = os.fdopen(lockfd, "w")
+
+ # Create server control socket
+ if os.path.exists(sockname):
+ serverlog("WARNING: removing existing socket file '%s'" % sockname)
+ os.unlink(sockname)
+
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ # AF_UNIX has path length issues so chdir here to workaround
+ cwd = os.getcwd()
+ try:
+ os.chdir(os.path.dirname(sockname))
+ sock.bind(os.path.basename(sockname))
+ finally:
+ os.chdir(cwd)
+ sock.listen(1)
+
+ server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
+ writer = ConnectionWriter(readypipeinfd)
+ try:
+ featureset = []
+ cooker = bb.cooker.BBCooker(featureset, server)
+ cooker.configuration.profile = profile
+ except bb.BBHandledException:
+ return None
writer.send("r")
writer.close()
- server.cooker = self.cooker
- server.server_timeout = self.configuration.server_timeout
- server.xmlrpcinterface = self.configuration.xmlrpcinterface
- print("Started bitbake server pid %d" % os.getpid())
- sys.stdout.flush()
+ server.cooker = cooker
+ serverlog("Started bitbake server pid %d" % os.getpid())
- server.start()
+ server.run()
+ finally:
+ # Flush any messages/errors to the logfile before exit
+ sys.stdout.flush()
+ sys.stderr.flush()
def connectProcessServer(sockname, featureset):
# Connect to socket
@@ -578,23 +789,18 @@ class BBUIEventQueue:
self.reader = ConnectionReader(readfd)
self.t = threading.Thread()
- self.t.setDaemon(True)
self.t.run = self.startCallbackHandler
self.t.start()
def getEvent(self):
- self.eventQueueLock.acquire()
-
- if len(self.eventQueue) == 0:
- self.eventQueueLock.release()
- return None
-
- item = self.eventQueue.pop(0)
+ with bb.utils.lock_timeout(self.eventQueueLock):
+ if len(self.eventQueue) == 0:
+ return None
- if len(self.eventQueue) == 0:
- self.eventQueueNotify.clear()
+ item = self.eventQueue.pop(0)
+ if len(self.eventQueue) == 0:
+ self.eventQueueNotify.clear()
- self.eventQueueLock.release()
return item
def waitEvent(self, delay):
@@ -602,10 +808,9 @@ class BBUIEventQueue:
return self.getEvent()
def queue_event(self, event):
- self.eventQueueLock.acquire()
- self.eventQueue.append(event)
- self.eventQueueNotify.set()
- self.eventQueueLock.release()
+ with bb.utils.lock_timeout(self.eventQueueLock):
+ self.eventQueue.append(event)
+ self.eventQueueNotify.set()
def send_event(self, event):
self.queue_event(pickle.loads(event))
@@ -614,13 +819,17 @@ class BBUIEventQueue:
bb.utils.set_process_name("UIEventQueue")
while True:
try:
- self.reader.wait()
- event = self.reader.get()
- self.queue_event(event)
- except EOFError:
+ ready = self.reader.wait(0.25)
+ if ready:
+ event = self.reader.get()
+ self.queue_event(event)
+ except (EOFError, OSError, TypeError):
# Easiest way to exit is to close the file descriptor to cause an exit
break
+
+ def close(self):
self.reader.close()
+ self.t.join()
class ConnectionReader(object):
@@ -635,7 +844,7 @@ class ConnectionReader(object):
return self.reader.poll(timeout)
def get(self):
- with self.rlock:
+ with bb.utils.lock_timeout(self.rlock):
res = self.reader.recv_bytes()
return multiprocessing.reduction.ForkingPickler.loads(res)
@@ -654,10 +863,31 @@ class ConnectionWriter(object):
# Why bb.event needs this I have no idea
self.event = self
+ def _send(self, obj):
+ gc.disable()
+ with bb.utils.lock_timeout(self.wlock):
+ self.writer.send_bytes(obj)
+ gc.enable()
+
def send(self, obj):
obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
- with self.wlock:
- self.writer.send_bytes(obj)
+ # See notes/code in CookerParser
+ # We must not terminate holding this lock else processes will hang.
+ # For SIGTERM, raising afterwards avoids this.
+ # For SIGINT, we don't want to have written partial data to the pipe.
+ # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
+ process = multiprocessing.current_process()
+ if process and hasattr(process, "queue_signals"):
+ with bb.utils.lock_timeout(process.signal_threadlock):
+ process.queue_signals = True
+ self._send(obj)
+ process.queue_signals = False
+
+ while len(process.signal_received) > 0:
+ sig = process.signal_received.pop()
+ process.handle_sig(sig, None)
+ else:
+ self._send(obj)
def fileno(self):
return self.writer.fileno()