summaryrefslogtreecommitdiffstats
path: root/bitbake/bin/bitbake-worker
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-xbitbake/bin/bitbake-worker165
1 files changed, 117 insertions, 48 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
index 97cc0fd60f..e8073f2ac3 100755
--- a/bitbake/bin/bitbake-worker
+++ b/bitbake/bin/bitbake-worker
@@ -1,11 +1,14 @@
#!/usr/bin/env python3
#
+# Copyright BitBake Contributors
+#
# SPDX-License-Identifier: GPL-2.0-only
#
import os
import sys
import warnings
+warnings.simplefilter("default")
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
@@ -16,11 +19,12 @@ import signal
import pickle
import traceback
import queue
+import shlex
+import subprocess
from multiprocessing import Lock
from threading import Thread
-if sys.getfilesystemencoding() != "utf-8":
- sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")
+bb.utils.check_system_locale()
# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
@@ -87,19 +91,19 @@ def worker_fire_prepickled(event):
worker_thread_exit = False
def worker_flush(worker_queue):
- worker_queue_int = b""
+ worker_queue_int = bytearray()
global worker_pipe, worker_thread_exit
while True:
try:
- worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
+ worker_queue_int.extend(worker_queue.get(True, 1))
except queue.Empty:
pass
while (worker_queue_int or not worker_queue.empty()):
try:
(_, ready, _) = select.select([], [worker_pipe], [], 1)
if not worker_queue.empty():
- worker_queue_int = worker_queue_int + worker_queue.get()
+ worker_queue_int.extend(worker_queue.get())
written = os.write(worker_pipe, worker_queue_int)
worker_queue_int = worker_queue_int[written:]
except (IOError, OSError) as e:
@@ -117,9 +121,10 @@ def worker_child_fire(event, d):
data = b"<event>" + pickle.dumps(event) + b"</event>"
try:
- worker_pipe_lock.acquire()
- worker_pipe.write(data)
- worker_pipe_lock.release()
+ with bb.utils.lock_timeout(worker_pipe_lock):
+ while(len(data)):
+ written = worker_pipe.write(data)
+ data = data[written:]
except IOError:
sigterm_handler(None, None)
raise
@@ -138,40 +143,59 @@ def sigterm_handler(signum, frame):
os.killpg(0, signal.SIGTERM)
sys.exit()
-def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
+def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
+
+ fn = runtask['fn']
+ task = runtask['task']
+ taskname = runtask['taskname']
+ taskhash = runtask['taskhash']
+ unihash = runtask['unihash']
+ appends = runtask['appends']
+ layername = runtask['layername']
+ taskdepdata = runtask['taskdepdata']
+ quieterrors = runtask['quieterrors']
# We need to setup the environment BEFORE the fork, since
# a fork() or exec*() activates PSEUDO...
envbackup = {}
+ fakeroot = False
fakeenv = {}
umask = None
- taskdep = workerdata["taskdeps"][fn]
+ uid = os.getuid()
+ gid = os.getgid()
+
+ taskdep = runtask['taskdep']
if 'umask' in taskdep and taskname in taskdep['umask']:
+ umask = taskdep['umask'][taskname]
+ elif workerdata["umask"]:
+ umask = workerdata["umask"]
+ if umask:
# umask might come in as a number or text string..
try:
- umask = int(taskdep['umask'][taskname],8)
+ umask = int(umask, 8)
except TypeError:
- umask = taskdep['umask'][taskname]
+ pass
- dry_run = cfg.dry_run or dry_run_exec
+ dry_run = cfg.dry_run or runtask['dry_run']
# We can't use the fakeroot environment in a dry run as it possibly hasn't been built
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
- envvars = (workerdata["fakerootenv"][fn] or "").split()
- for key, value in (var.split('=') for var in envvars):
+ fakeroot = True
+ envvars = (runtask['fakerootenv'] or "").split()
+ for key, value in (var.split('=',1) for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
- fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
+ fakedirs = (runtask['fakerootdirs'] or "").split()
for p in fakedirs:
bb.utils.mkdirhier(p)
- logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
+ logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
(fn, taskname, ', '.join(fakedirs)))
else:
- envvars = (workerdata["fakerootnoenv"][fn] or "").split()
- for key, value in (var.split('=') for var in envvars):
+ envvars = (runtask['fakerootnoenv'] or "").split()
+ for key, value in (var.split('=',1) for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
@@ -213,19 +237,21 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
# Let SIGHUP exit as SIGTERM
signal.signal(signal.SIGHUP, sigterm_handler)
- # No stdin
- newsi = os.open(os.devnull, os.O_RDWR)
- os.dup2(newsi, sys.stdin.fileno())
+ # No stdin & stdout
+ # stdout is used as a status report channel and must not be used by child processes.
+ dumbio = os.open(os.devnull, os.O_RDWR)
+ os.dup2(dumbio, sys.stdin.fileno())
+ os.dup2(dumbio, sys.stdout.fileno())
- if umask:
+ if umask is not None:
os.umask(umask)
try:
- bb_cache = bb.cache.NoCache(databuilder)
(realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
the_data = databuilder.mcdata[mc]
the_data.setVar("BB_WORKERCONTEXT", "1")
the_data.setVar("BB_TASKDEPDATA", taskdepdata)
+ the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
if cfg.limited_deps:
the_data.setVar("BB_LIMITEDDEPS", "1")
the_data.setVar("BUILDNAME", workerdata["buildname"])
@@ -239,12 +265,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
ret = 0
- the_data = bb_cache.loadDataFull(fn, appends)
+ the_data = databuilder.parseRecipe(fn, appends, layername)
the_data.setVar('BB_TASKHASH', taskhash)
the_data.setVar('BB_UNIHASH', unihash)
+ bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)
bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))
+ if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
+ if bb.utils.is_local_uid(uid):
+ logger.debug("Attempting to disable network for %s" % taskname)
+ bb.utils.disable_network(uid, gid)
+ else:
+ logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))
+
# exported_vars() returns a generator which *cannot* be passed to os.environ.update()
# successfully. We also need to unset anything from the environment which shouldn't be there
exports = bb.data.exported_vars(the_data)
@@ -273,10 +307,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
if not quieterrors:
logger.critical(traceback.format_exc())
os._exit(1)
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
try:
if dry_run:
return 0
- return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
+ try:
+ ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
+ finally:
+ if fakeroot:
+ fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
+ subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
+ return ret
except:
os._exit(1)
if not profiling:
@@ -308,12 +352,12 @@ class runQueueWorkerPipe():
if pipeout:
pipeout.close()
bb.utils.nonblockingfd(self.input)
- self.queue = b""
+ self.queue = bytearray()
def read(self):
start = len(self.queue)
try:
- self.queue = self.queue + (self.input.read(102400) or b"")
+ self.queue.extend(self.input.read(102400) or b"")
except (OSError, IOError) as e:
if e.errno != errno.EAGAIN:
raise
@@ -321,7 +365,9 @@ class runQueueWorkerPipe():
end = len(self.queue)
index = self.queue.find(b"</event>")
while index != -1:
- worker_fire_prepickled(self.queue[:index+8])
+ msg = self.queue[:index+8]
+ assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
+ worker_fire_prepickled(msg)
self.queue = self.queue[index+8:]
index = self.queue.find(b"</event>")
return (end > start)
@@ -339,7 +385,7 @@ class BitbakeWorker(object):
def __init__(self, din):
self.input = din
bb.utils.nonblockingfd(self.input)
- self.queue = b""
+ self.queue = bytearray()
self.cookercfg = None
self.databuilder = None
self.data = None
@@ -373,7 +419,7 @@ class BitbakeWorker(object):
if len(r) == 0:
# EOF on pipe, server must have terminated
self.sigterm_exception(signal.SIGTERM, None)
- self.queue = self.queue + r
+ self.queue.extend(r)
except (OSError, IOError):
pass
if len(self.queue):
@@ -393,19 +439,35 @@ class BitbakeWorker(object):
while self.process_waitpid():
continue
-
def handle_item(self, item, func):
- if self.queue.startswith(b"<" + item + b">"):
- index = self.queue.find(b"</" + item + b">")
- while index != -1:
- func(self.queue[(len(item) + 2):index])
- self.queue = self.queue[(index + len(item) + 3):]
- index = self.queue.find(b"</" + item + b">")
+ opening_tag = b"<" + item + b">"
+ if not self.queue.startswith(opening_tag):
+ return
+
+ tag_len = len(opening_tag)
+ if len(self.queue) < tag_len + 4:
+ # we need to receive more data
+ return
+ header = self.queue[tag_len:tag_len + 4]
+ payload_len = int.from_bytes(header, 'big')
+ # closing tag has length (tag_len + 1)
+ if len(self.queue) < tag_len * 2 + 1 + payload_len:
+ # we need to receive more data
+ return
+
+ index = self.queue.find(b"</" + item + b">")
+ if index != -1:
+ try:
+ func(self.queue[(tag_len + 4):index])
+ except pickle.UnpicklingError:
+ workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
+ raise
+ self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
def handle_cookercfg(self, data):
self.cookercfg = pickle.loads(data)
self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
- self.databuilder.parseBaseConfiguration()
+ self.databuilder.parseBaseConfiguration(worker=True)
self.data = self.databuilder.data
def handle_extraconfigdata(self, data):
@@ -413,13 +475,14 @@ class BitbakeWorker(object):
def handle_workerdata(self, data):
self.workerdata = pickle.loads(data)
+ bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
+ bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
- bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
- bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
for mc in self.databuilder.mcdata:
self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
+ self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")
def handle_newtaskhashes(self, data):
self.workerdata["newhashes"] = pickle.loads(data)
@@ -437,11 +500,15 @@ class BitbakeWorker(object):
sys.exit(0)
def handle_runtask(self, data):
- fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
- workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+ runtask = pickle.loads(data)
- pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)
+ fn = runtask['fn']
+ task = runtask['task']
+ taskname = runtask['taskname']
+ workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+
+ pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
self.build_pids[pid] = task
self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
@@ -505,9 +572,11 @@ except BaseException as e:
import traceback
sys.stderr.write(traceback.format_exc())
sys.stderr.write(str(e))
+finally:
+ worker_thread_exit = True
+ worker_thread.join()
-worker_thread_exit = True
-worker_thread.join()
-
-workerlog_write("exitting")
+workerlog_write("exiting")
+if not normalexit:
+ sys.exit(1)
sys.exit(0)