#!/usr/bin/env python3
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
import traceback
import queue
from multiprocessing import Lock
from threading import Thread

if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except Exception:
    # Best effort only: carry on with default buffering if this fails.
    pass

logger = logging.getLogger("BitBake")

worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

worker_queue = queue.Queue()

def worker_fire(event, d):
    """Pickle an event, frame it as <event>...</event> and queue it for the writer thread."""
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    """Queue already-framed bytes for the writer thread to send on worker_pipe."""
    global worker_queue

    worker_queue.put(event)

#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
worker_thread_exit = False

def worker_flush(worker_queue):
    """
    Writer thread body: drain worker_queue and write the bytes to worker_pipe.

    Returns once worker_thread_exit is set and both the queue and the
    internal pending buffer are empty.
    """
    worker_queue_int = b""
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block for at most a second so the exit flag is polled regularly
            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (up to 1s) for the non-blocking pipe to become writable
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int = worker_queue_int + worker_queue.get()
                # Partial writes are possible; keep whatever was not written
                written = os.write(worker_pipe, worker_queue_int)
                worker_queue_int = worker_queue_int[written:]
            except (IOError, OSError) as e:
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()

def worker_child_fire(event, d):
    """
    Event hook used inside forked task processes: write the framed,
    pickled event directly to the pipe leading back to the worker.

    On IOError the process group is terminated via sigterm_handler().
    """
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        # 'with' guarantees the lock is released even if the write fails
        with worker_pipe_lock:
            worker_pipe.write(data)
    except IOError:
        sigterm_handler(None, None)
        raise

bb.event.worker_fire = worker_fire

lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append msg to the debug command log, if one has been opened in lf."""
    if lf:
        lf.write(msg)
        lf.flush()

def sigterm_handler(signum, frame):
    """Restore default SIGTERM handling, SIGTERM our whole process group and exit."""
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()

def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
    """
    Fork a child process which executes taskname from recipe file fn.

    The (fakeroot or non-fakeroot) environment variables listed in
    workerdata are exported before the fork and restored in the parent
    afterwards. The child never returns from this function: it always
    leaves through os._exit().

    Returns (pid, pipein, pipeout) in the parent, where pipein/pipeout are
    the two ends of the pipe the child uses to send events back.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    dry_run = cfg.dry_run or dry_run_exec

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        # Split on the first '=' only so values may themselves contain '='
        for key, value in (var.split('=', 1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        # Split on the first '=' only so values may themselves contain '='
        for key, value in (var.split('=', 1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None so an explicit umask of 0 is honoured
            if umask is not None:
                os.umask(umask)

            try:
                bb_cache = bb.cache.NoCache(databuilder)
                (_realfn, _virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except BaseException:
                # Deliberately catch everything (including SystemExit): the
                # forked child must never unwind back into the worker's code.
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Default to failure so the child still exits if runcall() raises
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: put back the environment we modified for the child
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        # The write end belongs to the child; drop the parent's copy
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = b""

    def read(self):
        """
        Read whatever is available from the child and forward each complete
        <event>...</event> message to the writer queue.

        Returns True if any new bytes were read.
        """
        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(b"</event>")
        while index != -1:
            # 8 == len(b"</event>"): forward the message including its terminator
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find(b"</event>")
        return (end > start)

    def close(self):
        """Drain any remaining data, warn about a trailing partial message and close the pipe."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

normalexit = False

class BitbakeWorker(object):
    """
    Command loop of the worker: reads <command>payload</command> messages
    from din, forks tasks on request and relays their events and exit codes.

    NOTE(review): payloads are unpickled as-is; this presumably relies on
    din only ever being fed by the trusted parent bitbake server.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Signal handler: terminate running tasks, then die from a default-handled SIGTERM."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: poll the command pipe and all task pipes, dispatch commands, reap children."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """
        While the queue starts with <item>, pass each complete payload
        (the bytes between <item> and </item>) to func and consume it.
        """
        opening = b"<" + item + b">"
        closing = b"</" + item + b">"
        if self.queue.startswith(opening):
            index = self.queue.find(closing)
            # Re-check the opening tag on every pass: once a message has been
            # consumed the queue may begin with a different command, and a
            # later closing tag of ours must not be matched across it.
            while index != -1 and self.queue.startswith(opening):
                func(self.queue[len(opening):index])
                self.queue = self.queue[(index + len(closing)):]
                index = self.queue.find(closing)

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration from it."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Store the unpickled extra configuration data for later tasks."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Unpickle the worker data and apply its logging and server settings."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])

    def handle_newtaskhashes(self, data):
        """Record updated task hashes in the worker data."""
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        """Answer a ping by logging a warning-level pong."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the exit as normal and leave the serve loop via SystemExit."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task description, fork it off and track its pid and event pipe."""
        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection, otherwise
        collect the process exit code, close the information pipe and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        # NOTE(review): marker text must match what the server side parses;
        # confirm "exitcode" against the reader of this pipe.
        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """SIGTERM every remaining task's process group, then drain the task pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    # Negative pid: signal the whole process group of the task
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the task may already be gone
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # serve() only ends through an exception (SystemExit on quit included)
    if not normalexit:
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

# Let the writer thread flush whatever is still queued, then stop it
worker_thread_exit = True
worker_thread.join()

workerlog_write("exitting")
sys.exit(0)