diff options
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r-- | bitbake/lib/bb/server/process.py | 596 |
1 files changed, 393 insertions, 203 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py index 48da7fe46c..6edb0213ad 100644 --- a/bitbake/lib/bb/server/process.py +++ b/bitbake/lib/bb/server/process.py @@ -22,128 +22,205 @@ import bb import bb.event -import itertools import logging import multiprocessing +import threading +import array import os -import signal import sys import time import select -from queue import Empty -from multiprocessing import Event, Process, util, Queue, Pipe, queues, Manager +import socket +import subprocess +import errno +import bb.server.xmlrpcserver +from bb import daemonize +from multiprocessing import queues logger = logging.getLogger('BitBake') -class ServerCommunicator(): - def __init__(self, connection, event_handle, server): - self.connection = connection - self.event_handle = event_handle - self.server = server - - def runCommand(self, command): - # @todo try/except - self.connection.send(command) - - if not self.server.is_alive(): - raise SystemExit - - while True: - # don't let the user ctrl-c while we're waiting for a response - try: - for idx in range(0,4): # 0, 1, 2, 3 - if self.connection.poll(5): - return self.connection.recv() - else: - bb.warn("Timeout while attempting to communicate with bitbake server") - bb.fatal("Gave up; Too many tries: timeout while attempting to communicate with bitbake server") - except KeyboardInterrupt: - pass - - def getEventHandle(self): - handle, error = self.runCommand(["getUIHandlerNum"]) - if error: - logger.error("Unable to get UI Handler Number: %s" % error) - raise BaseException(error) +class ProcessTimeout(SystemExit): + pass - return handle - -class EventAdapter(): - """ - Adapter to wrap our event queue since the caller (bb.event) expects to - call a send() method, but our actual queue only has put() - """ - def __init__(self, queue): - self.queue = queue - - def send(self, event): - try: - self.queue.put(event) - except Exception as err: - print("EventAdapter puked: %s" % str(err)) - - -class ProcessServer(Process): +class ProcessServer(multiprocessing.Process): profile_filename = "profile.log" profile_processed_filename = "profile.log.processed" - def __init__(self, command_channel, event_queue, featurelist): - self._idlefuns = {} - Process.__init__(self) - self.command_channel = command_channel - self.event_queue = event_queue - self.event = EventAdapter(event_queue) - self.featurelist = featurelist + def __init__(self, lock, sock, sockname): + multiprocessing.Process.__init__(self) + self.command_channel = False + self.command_channel_reply = False self.quit = False self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. self.next_heartbeat = time.time() - self.quitin, self.quitout = Pipe() - self.event_handle = multiprocessing.Value("i") + self.event_handle = None + self.haveui = False + self.lastui = False + self.xmlrpc = False + + self._idlefuns = {} + + self.bitbake_lock = lock + self.sock = sock + self.sockname = sockname + + def register_idle_function(self, function, data): + """Register a function to be called while the server is idle""" + assert hasattr(function, '__call__') + self._idlefuns[function] = data def run(self): - for event in bb.event.ui_queue: - self.event_queue.put(event) - self.event_handle.value = bb.event.register_UIHhandler(self, True) + + if self.xmlrpcinterface[0]: + self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self) + + print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') if heartbeat_event: try: self.heartbeat_seconds = float(heartbeat_event) except: - # Throwing an exception here causes bitbake to hang. - # Just warn about the invalid setting and continue bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) + + self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') + try: + if self.timeout: + self.timeout = float(self.timeout) + except: + bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) + + + try: + self.bitbake_lock.seek(0) + self.bitbake_lock.truncate() + if self.xmlrpcinterface[0]: + self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), configuration.interface[0], configuration.interface[1])) + else: + self.bitbake_lock.write("%s\n" % (os.getpid())) + self.bitbake_lock.flush() + except: + pass + bb.cooker.server_main(self.cooker, self.main) def main(self): - # Ignore SIGINT within the server, as all SIGINT handling is done by - # the UI and communicated to us - self.quitin.close() - signal.signal(signal.SIGINT, signal.SIG_IGN) bb.utils.set_process_name("Cooker") + + ready = [] + + self.controllersock = False + fds = [self.sock] + if self.xmlrpc: + fds.append(self.xmlrpc) while not self.quit: - try: - if self.command_channel.poll(): - command = self.command_channel.recv() - self.runCommand(command) - if self.quitout.poll(): - self.quitout.recv() + if self.command_channel in ready: + command = self.command_channel.get() + if command[0] == "terminateServer": self.quit = True - try: - self.runCommand(["stateForceShutdown"]) - except: - pass - - self.idle_commands(.1, [self.command_channel, self.quitout]) - except Exception: - logger.exception('Running command %s', command) + continue + try: + print("Running command %s" % command) + self.command_channel_reply.send(self.cooker.command.runCommand(command)) + except Exception as e: + logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) + + if self.xmlrpc in ready: + self.xmlrpc.handle_requests() + if self.sock in ready: + self.controllersock, address = self.sock.accept() + if self.haveui: + print("Dropping connection attempt as we have a UI %s" % (str(ready))) + self.controllersock.close() + else: + print("Accepting %s" % (str(ready))) + fds.append(self.controllersock) + if self.controllersock in ready: + try: + print("Connecting Client") + ui_fds = recvfds(self.controllersock, 3) + + # Where to write events to + writer = ConnectionWriter(ui_fds[0]) + self.event_handle = bb.event.register_UIHhandler(writer, True) + self.event_writer = writer + + # Where to read commands from + reader = ConnectionReader(ui_fds[1]) + fds.append(reader) + self.command_channel = reader + + # Where to send command return values to + writer = ConnectionWriter(ui_fds[2]) + self.command_channel_reply = writer + + self.haveui = True + + except EOFError: + print("Disconnecting Client") + fds.remove(self.controllersock) + fds.remove(self.command_channel) + bb.event.unregister_UIHhandler(self.event_handle, True) + self.command_channel_reply.writer.close() + self.event_writer.writer.close() + del self.event_writer + self.controllersock.close() + self.haveui = False + self.lastui = time.time() + self.cooker.clientComplete() + if self.timeout is None: + print("No timeout, exiting.") + self.quit = True + if not self.haveui and self.lastui and self.timeout and (self.lastui + self.timeout) < time.time(): + print("Server timeout, exiting.") + self.quit = True - self.event_queue.close() - bb.event.unregister_UIHhandler(self.event_handle.value, True) - self.command_channel.close() - self.cooker.shutdown(True) - self.quitout.close() + ready = self.idle_commands(.1, fds) + + print("Exiting") + try: + self.cooker.shutdown(True) + except: + pass + + # Remove the socket file so we don't get any more connections to avoid races + os.unlink(self.sockname) + self.sock.close() + + # Finally release the lockfile but warn about other processes holding it open + lock = self.bitbake_lock + lockfile = lock.name + lock.close() + lock = None + + while not lock: + with bb.utils.timeout(3): + lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True) + if not lock: + # Some systems may not have lsof available + procs = None + try: + procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) + except OSError as e: + if e.errno != errno.ENOENT: + raise + if procs is None: + # Fall back to fuser if lsof is unavailable + try: + procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) + except OSError as e: + if e.errno != errno.ENOENT: + raise + + msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock" + if procs: + msg += ":\n%s" % str(procs) + print(msg) + return + # We hold the lock so we can remove the file (hide stale pid data) + bb.utils.remove(lockfile) + bb.utils.unlockfile(lock) def idle_commands(self, delay, fds=None): nextsleep = delay @@ -189,140 +266,253 @@ class ProcessServer(Process): nextsleep = self.next_heartbeat - now if nextsleep is not None: + if self.xmlrpc: + nextsleep = self.xmlrpc.get_timeout(nextsleep) try: - select.select(fds,[],[],nextsleep) + return select.select(fds,[],[],nextsleep)[0] except InterruptedError: - # ignore EINTR error, nextsleep only used for wait - # certain time - pass + # Ignore EINTR + return [] + else: + return [] + + +class ServerCommunicator(): + def __init__(self, connection, recv): + self.connection = connection + self.recv = recv def runCommand(self, command): - """ - Run a cooker command on the server - """ - self.command_channel.send(self.cooker.command.runCommand(command)) - def stop(self): - self.quitin.send("quit") - self.quitin.close() + self.connection.send(command) + while True: + # don't let the user ctrl-c while we're waiting for a response + try: + for idx in range(0,4): # 0, 1, 2, 3 + if self.recv.poll(1): + return self.recv.get() + else: + bb.warn("Timeout while attempting to communicate with bitbake server") + raise ProcessTimeout("Gave up; Too many tries: timeout while attempting to communicate with bitbake server") + except KeyboardInterrupt: + pass + + def updateFeatureSet(self, featureset): + _, error = self.runCommand(["setFeatures", featureset]) + if error: + logger.error("Unable to set the cooker to the correct featureset: %s" % error) + raise BaseException(error) + + def getEventHandle(self): + handle, error = self.runCommand(["getUIHandlerNum"]) + if error: + logger.error("Unable to get UI Handler Number: %s" % error) + raise BaseException(error) - def addcooker(self, cooker): - self.cooker = cooker + return handle - def register_idle_function(self, function, data): - """Register a function to be called while the server is idle""" - assert hasattr(function, '__call__') - self._idlefuns[function] = data + def terminateServer(self): + self.connection.send(['terminateServer']) + return class BitBakeProcessServerConnection(object): - def __init__(self, serverImpl, ui_channel, event_queue): - self.procserver = serverImpl - self.ui_channel = ui_channel - self.event_queue = event_queue - self.connection = ServerCommunicator(self.ui_channel, self.procserver.event_handle, self.procserver) - self.events = self.event_queue - self.terminated = False - - def sigterm_terminate(self): - bb.error("UI received SIGTERM") - self.terminate() + def __init__(self, ui_channel, recv, eq): + self.connection = ServerCommunicator(ui_channel, recv) + self.events = eq def terminate(self): - if self.terminated: - return - self.terminated = True - def flushevents(): - while True: - try: - event = self.event_queue.get(block=False) - except (Empty, IOError): - break - if isinstance(event, logging.LogRecord): - logger.handle(event) - - self.procserver.stop() - - while self.procserver.is_alive(): - flushevents() - self.procserver.join(0.1) - - self.ui_channel.close() - self.event_queue.close() - self.event_queue.setexit() - # XXX: Call explicity close in _writer to avoid - # fd leakage because isn't called on Queue.close() - self.event_queue._writer.close() - - def setupEventQueue(self): - pass - -# Wrap Queue to provide API which isn't server implementation specific -class ProcessEventQueue(multiprocessing.queues.Queue): - def __init__(self, maxsize): - multiprocessing.queues.Queue.__init__(self, maxsize, ctx=multiprocessing.get_context()) - self.exit = False - bb.utils.set_process_name("ProcessEQueue") - - def setexit(self): - self.exit = True - - def waitEvent(self, timeout): - if self.exit: - return self.getEvent() + self.socket_connection.close() + return + +class BitBakeServer(object): + def __init__(self, lock, sockname, configuration, featureset): + + self.configuration = configuration + self.featureset = featureset + self.sockname = sockname + self.bitbake_lock = lock + + # Create server control socket + if os.path.exists(sockname): + os.unlink(sockname) + + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # AF_UNIX has path length issues so chdir here to workaround + cwd = os.getcwd() try: - if not self.server.is_alive(): - return self.getEvent() - if timeout == 0: - return self.get(False) - return self.get(True, timeout) - except Empty: - return None + os.chdir(os.path.dirname(sockname)) + self.sock.bind(os.path.basename(sockname)) + finally: + os.chdir(cwd) + self.sock.listen(1) + + os.set_inheritable(self.sock.fileno(), True) + bb.daemonize.createDaemon(self._startServer, "bitbake-cookerdaemon.log") + self.sock.close() + self.bitbake_lock.close() + + def _startServer(self): + server = ProcessServer(self.bitbake_lock, self.sock, self.sockname) + self.configuration.setServerRegIdleCallback(server.register_idle_function) + + # Copy prefile and postfile to _server variants + for param in ('prefile', 'postfile'): + value = getattr(self.configuration, param) + if value: + setattr(self.configuration, "%s_server" % param, value) + + self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset) + server.cooker = self.cooker + server.server_timeout = self.configuration.server_timeout + server.xmlrpcinterface = self.configuration.xmlrpcinterface + print("Started bitbake server pid %d" % os.getpid()) + server.start() + +def connectProcessServer(sockname, featureset): + # Connect to socket + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # AF_UNIX has path length issues so chdir here to workaround + cwd = os.getcwd() + + try: + os.chdir(os.path.dirname(sockname)) + sock.connect(os.path.basename(sockname)) + finally: + os.chdir(cwd) + + try: + # Send an fd for the remote to write events to + readfd, writefd = os.pipe() + eq = BBUIEventQueue(readfd) + # Send an fd for the remote to recieve commands from + readfd1, writefd1 = os.pipe() + command_chan = ConnectionWriter(writefd1) + # Send an fd for the remote to write commands results to + readfd2, writefd2 = os.pipe() + command_chan_recv = ConnectionReader(readfd2) + + sendfds(sock, [writefd, readfd1, writefd2]) + + server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq) + + server_connection.connection.updateFeatureSet(featureset) + + # Save sock so it doesn't get gc'd for the life of our connection + server_connection.socket_connection = sock + except: + sock.close() + raise + + return server_connection + +def sendfds(sock, fds): + '''Send an array of fds over an AF_UNIX socket.''' + fds = array.array('i', fds) + msg = bytes([len(fds) % 256]) + sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) + +def recvfds(sock, size): + '''Receive an array of fds over an AF_UNIX socket.''' + a = array.array('i') + bytes_size = a.itemsize * size + msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size)) + if not msg and not ancdata: + raise EOFError + try: + if len(ancdata) != 1: + raise RuntimeError('received %d items of ancdata' % + len(ancdata)) + cmsg_level, cmsg_type, cmsg_data = ancdata[0] + if (cmsg_level == socket.SOL_SOCKET and + cmsg_type == socket.SCM_RIGHTS): + if len(cmsg_data) % a.itemsize != 0: + raise ValueError + a.frombytes(cmsg_data) + assert len(a) % 256 == msg[0] + return list(a) + except (ValueError, IndexError): + pass + raise RuntimeError('Invalid data received') + +class BBUIEventQueue: + def __init__(self, readfd): + + self.eventQueue = [] + self.eventQueueLock = threading.Lock() + self.eventQueueNotify = threading.Event() + + self.reader = ConnectionReader(readfd) + + self.t = threading.Thread() + self.t.setDaemon(True) + self.t.run = self.startCallbackHandler + self.t.start() def getEvent(self): - try: - if not self.server.is_alive(): - self.setexit() - return self.get(False) - except Empty: - if self.exit: - sys.exit(1) + self.eventQueueLock.acquire() + + if len(self.eventQueue) == 0: + self.eventQueueLock.release() return None -class BitBakeServer(object): - def initServer(self, single_use=True): - # establish communication channels. We use bidirectional pipes for - # ui <--> server command/response pairs - # and a queue for server -> ui event notifications - # - self.ui_channel, self.server_channel = Pipe() - self.event_queue = ProcessEventQueue(0) - self.serverImpl = ProcessServer(self.server_channel, self.event_queue, None) - self.event_queue.server = self.serverImpl - - def detach(self): - self.serverImpl.start() - return + item = self.eventQueue.pop(0) - def establishConnection(self, featureset): + if len(self.eventQueue) == 0: + self.eventQueueNotify.clear() - self.connection = BitBakeProcessServerConnection(self.serverImpl, self.ui_channel, self.event_queue) + self.eventQueueLock.release() + return item - _, error = self.connection.connection.runCommand(["setFeatures", featureset]) - if error: - logger.error("Unable to set the cooker to the correct featureset: %s" % error) - raise BaseException(error) - signal.signal(signal.SIGTERM, lambda i, s: self.connection.sigterm_terminate()) - return self.connection + def waitEvent(self, delay): + self.eventQueueNotify.wait(delay) + return self.getEvent() - def addcooker(self, cooker): - self.cooker = cooker - self.serverImpl.addcooker(cooker) + def queue_event(self, event): + self.eventQueueLock.acquire() + self.eventQueue.append(event) + self.eventQueueNotify.set() + self.eventQueueLock.release() - def getServerIdleCB(self): - return self.serverImpl.register_idle_function + def send_event(self, event): + self.queue_event(pickle.loads(event)) - def saveConnectionDetails(self): - return + def startCallbackHandler(self): + bb.utils.set_process_name("UIEventQueue") + while True: + self.reader.wait() + event = self.reader.get() + self.queue_event(event) + +class ConnectionReader(object): + + def __init__(self, fd): + self.reader = multiprocessing.connection.Connection(fd, writable=False) + self.rlock = multiprocessing.Lock() + + def wait(self, timeout=None): + return multiprocessing.connection.wait([self.reader], timeout) + + def poll(self, timeout=None): + return self.reader.poll(timeout) + + def get(self): + with self.rlock: + res = self.reader.recv_bytes() + return multiprocessing.reduction.ForkingPickler.loads(res) + + def fileno(self): + return self.reader.fileno() + +class ConnectionWriter(object): + + def __init__(self, fd): + self.writer = multiprocessing.connection.Connection(fd, readable=False) + self.wlock = multiprocessing.Lock() + # Why bb.event needs this I have no idea + self.event = self + + def send(self, obj): + obj = multiprocessing.reduction.ForkingPickler.dumps(obj) + with self.wlock: + self.writer.send_bytes(obj) - def endSession(self): - self.connection.terminate() |