Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
 bitbake/lib/bb/runqueue.py | 837
1 file changed, 527 insertions(+), 310 deletions(-)
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 10511a09dc..bc7e18175d 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -24,6 +24,7 @@ import pickle
from multiprocessing import Process
import shlex
import pprint
+import time
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
@@ -154,11 +155,82 @@ class RunQueueScheduler(object):
self.stamps = {}
for tid in self.rqdata.runtaskentries:
(mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
- self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
+ self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
if tid in self.rq.runq_buildable:
- self.buildable.append(tid)
+ self.buildable.add(tid)
self.rev_prio_map = None
+ self.is_pressure_usable()
+
+ def is_pressure_usable(self):
+ """
+ If monitoring pressure, return True if pressure files can be open and read. For example
+ openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported)
+ is returned.
+ """
+ if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure:
+ try:
+ with open("/proc/pressure/cpu") as cpu_pressure_fds, \
+ open("/proc/pressure/io") as io_pressure_fds, \
+ open("/proc/pressure/memory") as memory_pressure_fds:
+
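+ # Field 4 of the first ("some") line is "total=<microseconds>"; keep the value after "=" as the baseline sample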
+ self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
+ self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
+ self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
+ self.prev_pressure_time = time.time()
+ self.check_pressure = True
+ except:
+ bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure")
+ self.check_pressure = False
+ else:
+ self.check_pressure = False
+
+ def exceeds_max_pressure(self):
+ """
+ Monitor the difference in total pressure at least once per second. If
+ BB_PRESSURE_MAX_{CPU|IO|MEMORY} is set, return True when the corresponding threshold is exceeded.
+ """
+ if self.check_pressure:
+ with open("/proc/pressure/cpu") as cpu_pressure_fds, \
+ open("/proc/pressure/io") as io_pressure_fds, \
+ open("/proc/pressure/memory") as memory_pressure_fds:
+ # extract "total" from /proc/pressure/{cpu|io|memory}
+ curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
+ curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
+ curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
+ now = time.time()
+ tdiff = now - self.prev_pressure_time
+ psi_accumulation_interval = 1.0
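+ # Rates are computed against the last stored sample; the sample is only advanced
+ # below once more than psi_accumulation_interval has elapsed, avoiding noisy
+ # readings over very short intervals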
+ cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff
+ io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff
+ memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff
+ exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure
+ exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure
+ exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure
+
+ if tdiff > psi_accumulation_interval:
+ self.prev_cpu_pressure = curr_cpu_pressure
+ self.prev_io_pressure = curr_io_pressure
+ self.prev_memory_pressure = curr_memory_pressure
+ self.prev_pressure_time = now
+
+ pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure)
+ pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure)
+ if hasattr(self, "pressure_state") and pressure_state != self.pressure_state:
+ bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)))
+ self.pressure_state = pressure_state
+ return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure)
+ elif self.rq.max_loadfactor:
+ limit = False
+ loadfactor = float(os.getloadavg()[0]) / os.cpu_count()
+ # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor))
+ if loadfactor > self.rq.max_loadfactor:
+ limit = True
+ if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit:
+ bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))
+ self.loadfactor_limit = limit
+ return limit
+ return False
def next_buildable_task(self):
"""
@@ -172,6 +244,12 @@ class RunQueueScheduler(object):
if not buildable:
return None
+ # Bitbake requires that at least one task be active. Only check for pressure if
+ # this is the case, otherwise the pressure limitation could result in no tasks
+ # being active and no new tasks being started, at times breaking the scheduler.
+ if self.rq.stats.active and self.exceeds_max_pressure():
+ return None
+
# Filter out tasks that have a max number of threads that have been exceeded
skip_buildable = {}
for running in self.rq.runq_running.difference(self.rq.runq_complete):
@@ -202,11 +280,11 @@ class RunQueueScheduler(object):
best = None
bestprio = None
for tid in buildable:
- taskname = taskname_from_tid(tid)
- if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
- continue
prio = self.rev_prio_map[tid]
if bestprio is None or bestprio > prio:
+ taskname = taskname_from_tid(tid)
+ if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
+ continue
stamp = self.stamps[tid]
if stamp in self.rq.build_stamps.values():
continue
@@ -385,10 +463,9 @@ class RunQueueData:
self.rq = rq
self.warn_multi_bb = False
- self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST") or ""
- self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST") or "").split()
- self.setscenewhitelist = get_setscene_enforce_whitelist(cfgData, targets)
- self.setscenewhitelist_checked = False
+ self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split()
+ self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets)
+ self.setscene_ignore_tasks_checked = False
self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1")
self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter()
@@ -486,7 +563,7 @@ class RunQueueData:
msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends)))
msgs.append("\n")
if len(valid_chains) > 10:
- msgs.append("Aborted dependency loops search after 10 matches.\n")
+ msgs.append("Halted dependency loops search after 10 matches.\n")
raise TooManyLoops
continue
scan = False
@@ -547,7 +624,7 @@ class RunQueueData:
next_points.append(revdep)
task_done[revdep] = True
endpoints = next_points
- if len(next_points) == 0:
+ if not next_points:
break
# Circular dependency sanity check
@@ -589,15 +666,18 @@ class RunQueueData:
found = False
for mc in self.taskData:
- if len(taskData[mc].taskentries) > 0:
+ if taskData[mc].taskentries:
found = True
break
if not found:
# Nothing to do
return 0
+ bb.parse.siggen.setup_datacache(self.dataCaches)
+
self.init_progress_reporter.start()
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Step A - Work out a list of tasks to run
#
@@ -643,6 +723,8 @@ class RunQueueData:
frommc = mcdependency[1]
mcdep = mcdependency[2]
deptask = mcdependency[4]
+ if mcdep not in taskData:
+ bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep))
if mc == frommc:
fn = taskData[mcdep].build_targets[pn][0]
newdep = '%s:%s' % (fn,deptask)
@@ -744,6 +826,7 @@ class RunQueueData:
#self.dump_data()
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Resolve recursive 'recrdeptask' dependencies (Part B)
#
@@ -773,7 +856,7 @@ class RunQueueData:
# Find the dependency chain endpoints
endpoints = set()
for tid in self.runtaskentries:
- if len(deps[tid]) == 0:
+ if not deps[tid]:
endpoints.add(tid)
# Iterate the chains collating dependencies
while endpoints:
@@ -784,11 +867,11 @@ class RunQueueData:
cumulativedeps[dep].update(cumulativedeps[tid])
if tid in deps[dep]:
deps[dep].remove(tid)
- if len(deps[dep]) == 0:
+ if not deps[dep]:
next.add(dep)
endpoints = next
#for tid in deps:
- # if len(deps[tid]) != 0:
+ # if deps[tid]:
# bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid]))
# Loop here since recrdeptasks can depend upon other recrdeptasks and we have to
@@ -840,6 +923,7 @@ class RunQueueData:
self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
#self.dump_data()
@@ -878,7 +962,7 @@ class RunQueueData:
bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
else:
logger.verbose("Invalidate task %s, %s", taskname, fn)
- bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn)
+ bb.parse.siggen.invalidate_task(taskname, taskfn)
self.target_tids = []
for (mc, target, task, fn) in self.targets:
@@ -921,47 +1005,54 @@ class RunQueueData:
mark_active(tid, 1)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Step C - Prune all inactive tasks
#
# Once all active tasks are marked, prune the ones we don't need.
- delcount = {}
- for tid in list(self.runtaskentries.keys()):
- if tid not in runq_build:
- delcount[tid] = self.runtaskentries[tid]
- del self.runtaskentries[tid]
-
# Handle --runall
if self.cooker.configuration.runall:
# re-run the mark_active and then drop unused tasks from new list
- runq_build = {}
- for task in self.cooker.configuration.runall:
- if not task.startswith("do_"):
- task = "do_{0}".format(task)
- runall_tids = set()
- for tid in list(self.runtaskentries):
- wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
- if wanttid in delcount:
- self.runtaskentries[wanttid] = delcount[wanttid]
- if wanttid in self.runtaskentries:
- runall_tids.add(wanttid)
-
- for tid in list(runall_tids):
- mark_active(tid,1)
- if self.cooker.configuration.force:
- invalidate_task(tid, False)
+ runall_tids = set()
+ added = True
+ while added:
+ reduced_tasklist = set(self.runtaskentries.keys())
+ for tid in list(self.runtaskentries.keys()):
+ if tid not in runq_build:
+ reduced_tasklist.remove(tid)
+ runq_build = {}
- for tid in list(self.runtaskentries.keys()):
- if tid not in runq_build:
- delcount[tid] = self.runtaskentries[tid]
- del self.runtaskentries[tid]
+ orig = runall_tids
+ runall_tids = set()
+ for task in self.cooker.configuration.runall:
+ if not task.startswith("do_"):
+ task = "do_{0}".format(task)
+ for tid in reduced_tasklist:
+ wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
+ if wanttid in self.runtaskentries:
+ runall_tids.add(wanttid)
+
+ for tid in list(runall_tids):
+ mark_active(tid, 1)
+ self.target_tids.append(tid)
+ if self.cooker.configuration.force:
+ invalidate_task(tid, False)
+ added = runall_tids - orig
+
+ delcount = set()
+ for tid in list(self.runtaskentries.keys()):
+ if tid not in runq_build:
+ delcount.add(tid)
+ del self.runtaskentries[tid]
- if len(self.runtaskentries) == 0:
+ if self.cooker.configuration.runall:
+ if not self.runtaskentries:
bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Handle runonly
if self.cooker.configuration.runonly:
@@ -971,19 +1062,19 @@ class RunQueueData:
for task in self.cooker.configuration.runonly:
if not task.startswith("do_"):
task = "do_{0}".format(task)
- runonly_tids = { k: v for k, v in self.runtaskentries.items() if taskname_from_tid(k) == task }
+ runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task]
- for tid in list(runonly_tids):
- mark_active(tid,1)
+ for tid in runonly_tids:
+ mark_active(tid, 1)
if self.cooker.configuration.force:
invalidate_task(tid, False)
for tid in list(self.runtaskentries.keys()):
if tid not in runq_build:
- delcount[tid] = self.runtaskentries[tid]
+ delcount.add(tid)
del self.runtaskentries[tid]
- if len(self.runtaskentries) == 0:
+ if not self.runtaskentries:
bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets)))
#
@@ -991,8 +1082,8 @@ class RunQueueData:
#
# Check to make sure we still have tasks to run
- if len(self.runtaskentries) == 0:
- if not taskData[''].abort:
+ if not self.runtaskentries:
+ if not taskData[''].halt:
bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
else:
bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
@@ -1002,6 +1093,7 @@ class RunQueueData:
logger.verbose("Assign Weightings")
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Generate a list of reverse dependencies to ease future calculations
for tid in self.runtaskentries:
@@ -1009,13 +1101,14 @@ class RunQueueData:
self.runtaskentries[dep].revdeps.add(tid)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Identify tasks at the end of dependency chains
# Error on circular dependency loops (length two)
endpoints = []
for tid in self.runtaskentries:
revdeps = self.runtaskentries[tid].revdeps
- if len(revdeps) == 0:
+ if not revdeps:
endpoints.append(tid)
for dep in revdeps:
if dep in self.runtaskentries[tid].depends:
@@ -1025,12 +1118,14 @@ class RunQueueData:
logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Calculate task weights
# Check of higher length circular dependencies
self.runq_weight = self.calculate_task_weights(endpoints)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Sanity Check - Check for multiple tasks building the same provider
for mc in self.dataCaches:
@@ -1051,7 +1146,7 @@ class RunQueueData:
for prov in prov_list:
if len(prov_list[prov]) < 2:
continue
- if prov in self.multi_provider_whitelist:
+ if prov in self.multi_provider_allowed:
continue
seen_pn = []
# If two versions of the same PN are being built its fatal, we don't support it.
@@ -1061,12 +1156,12 @@ class RunQueueData:
seen_pn.append(pn)
else:
bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
- msg = "Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))
+ msgs = ["Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))]
#
# Construct a list of things which uniquely depend on each provider
# since this may help the user figure out which dependency is triggering this warning
#
- msg += "\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from."
+ msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.")
deplist = {}
commondeps = None
for provfn in prov_list[prov]:
@@ -1086,12 +1181,12 @@ class RunQueueData:
commondeps &= deps
deplist[provfn] = deps
for provfn in deplist:
- msg += "\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps))
+ msgs.append("\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps)))
#
# Construct a list of provides and runtime providers for each recipe
# (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
#
- msg += "\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful."
+ msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.")
provide_results = {}
rprovide_results = {}
commonprovs = None
@@ -1118,30 +1213,20 @@ class RunQueueData:
else:
commonrprovs &= rprovides
rprovide_results[provfn] = rprovides
- #msg += "\nCommon provides:\n %s" % ("\n ".join(commonprovs))
- #msg += "\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs))
+ #msgs.append("\nCommon provides:\n %s" % ("\n ".join(commonprovs)))
+ #msgs.append("\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs)))
for provfn in prov_list[prov]:
- msg += "\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs))
- msg += "\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs))
+ msgs.append("\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs)))
+ msgs.append("\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs)))
if self.warn_multi_bb:
- logger.verbnote(msg)
+ logger.verbnote("".join(msgs))
else:
- logger.error(msg)
+ logger.error("".join(msgs))
self.init_progress_reporter.next_stage()
-
- # Create a whitelist usable by the stamp checks
- self.stampfnwhitelist = {}
- for mc in self.taskData:
- self.stampfnwhitelist[mc] = []
- for entry in self.stampwhitelist.split():
- if entry not in self.taskData[mc].build_targets:
- continue
- fn = self.taskData.build_targets[entry][0]
- self.stampfnwhitelist[mc].append(fn)
-
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Iterate over the task list looking for tasks with a 'setscene' function
self.runq_setscene_tids = set()
@@ -1154,6 +1239,7 @@ class RunQueueData:
self.runq_setscene_tids.add(tid)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Invalidate task if force mode active
if self.cooker.configuration.force:
@@ -1170,6 +1256,7 @@ class RunQueueData:
invalidate_task(fn + ":" + st, True)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Create and print to the logs a virtual/xxxx -> PN (fn) table
for mc in taskData:
@@ -1182,18 +1269,20 @@ class RunQueueData:
bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
# Iterate over the task list and call into the siggen code
dealtwith = set()
todeal = set(self.runtaskentries)
- while len(todeal) > 0:
+ while todeal:
for tid in todeal.copy():
- if len(self.runtaskentries[tid].depends - dealtwith) == 0:
+ if not (self.runtaskentries[tid].depends - dealtwith):
dealtwith.add(tid)
todeal.remove(tid)
self.prepare_task_hash(tid)
+ bb.event.check_for_interrupts(self.cooker.data)
bb.parse.siggen.writeout_file_checksum_cache()
@@ -1201,9 +1290,8 @@ class RunQueueData:
return len(self.runtaskentries)
def prepare_task_hash(self, tid):
- dc = bb.parse.siggen.get_data_caches(self.dataCaches, mc_from_tid(tid))
- bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, dc)
- self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, dc)
+ bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
+ self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
def dump_data(self):
@@ -1229,7 +1317,6 @@ class RunQueue:
self.cfgData = cfgData
self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets)
- self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY") or "perfile"
self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None
self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None
@@ -1248,32 +1335,40 @@ class RunQueue:
self.worker = {}
self.fakeworker = {}
+ @staticmethod
+ def send_pickled_data(worker, data, name):
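+ """Frame and send data to a worker: a <name> header tag, a 4-byte big-endian payload length, the pickled payload, then a closing </name> tag."""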
+ msg = bytearray()
+ msg.extend(b"<" + name.encode() + b">")
+ pickled_data = pickle.dumps(data)
+ msg.extend(len(pickled_data).to_bytes(4, 'big'))
+ msg.extend(pickled_data)
+ msg.extend(b"</" + name.encode() + b">")
+ worker.stdin.write(msg)
+
def _start_worker(self, mc, fakeroot = False, rqexec = None):
logger.debug("Starting bitbake-worker")
magic = "decafbad"
if self.cooker.configuration.profile:
magic = "decafbadbad"
fakerootlogs = None
+
+ workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker")
if fakeroot:
magic = magic + "beef"
mcdata = self.cooker.databuilder.mcdata[mc]
fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
env = os.environ.copy()
- for key, value in (var.split('=') for var in fakerootenv):
+ for key, value in (var.split('=',1) for var in fakerootenv):
env[key] = value
- worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
+ worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
else:
- worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+ worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
bb.utils.nonblockingfd(worker.stdout)
workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)
workerdata = {
- "taskdeps" : self.rqdata.dataCaches[mc].task_deps,
- "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv,
- "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs,
- "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv,
"sigdata" : bb.parse.siggen.get_taskdata(),
"logdefaultlevel" : bb.msg.loggerDefaultLogLevel,
"build_verbose_shell" : self.cooker.configuration.build_verbose_shell,
@@ -1287,9 +1382,9 @@ class RunQueue:
"umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
}
- worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
- worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
- worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
+ RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
+ RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
+ RunQueue.send_pickled_data(worker, workerdata, "workerdata")
worker.stdin.flush()
return RunQueueWorker(worker, workerpipe)
@@ -1299,7 +1394,7 @@ class RunQueue:
return
logger.debug("Teardown for bitbake-worker")
try:
- worker.process.stdin.write(b"<quit></quit>")
+ RunQueue.send_pickled_data(worker.process, b"", "quit")
worker.process.stdin.flush()
worker.process.stdin.close()
except IOError:
@@ -1311,12 +1406,12 @@ class RunQueue:
continue
worker.pipe.close()
- def start_worker(self):
+ def start_worker(self, rqexec):
if self.worker:
self.teardown_workers()
self.teardown = False
for mc in self.rqdata.dataCaches:
- self.worker[mc] = self._start_worker(mc)
+ self.worker[mc] = self._start_worker(mc, False, rqexec)
def start_fakeworker(self, rqexec, mc):
if not mc in self.fakeworker:
@@ -1358,15 +1453,7 @@ class RunQueue:
if taskname is None:
taskname = tn
- if self.stamppolicy == "perfile":
- fulldeptree = False
- else:
- fulldeptree = True
- stampwhitelist = []
- if self.stamppolicy == "whitelist":
- stampwhitelist = self.rqdata.stampfnwhitelist[mc]
-
- stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn)
+ stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn)
# If the stamp is missing, it's not current
if not os.access(stampfile, os.F_OK):
@@ -1378,7 +1465,7 @@ class RunQueue:
logger.debug2("%s.%s is nostamp\n", fn, taskname)
return False
- if taskname != "do_setscene" and taskname.endswith("_setscene"):
+ if taskname.endswith("_setscene"):
return True
if cache is None:
@@ -1389,15 +1476,15 @@ class RunQueue:
for dep in self.rqdata.runtaskentries[tid].depends:
if iscurrent:
(mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep)
- stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2)
- stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2)
+ stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2)
+ stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2)
t2 = get_timestamp(stampfile2)
t3 = get_timestamp(stampfile3)
if t3 and not t2:
continue
if t3 and t3 > t2:
continue
- if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
+ if fn == fn2:
if not t2:
logger.debug2('Stampfile %s does not exist', stampfile2)
iscurrent = False
@@ -1447,10 +1534,11 @@ class RunQueue:
"""
Run the tasks in a queue prepared by rqdata.prepare()
Upon failure, optionally try to recover the build using any alternate providers
- (if the abort on failure configuration option isn't set)
+ (if the halt on failure configuration option isn't set)
"""
retval = True
+ bb.event.check_for_interrupts(self.cooker.data)
if self.state is runQueuePrepare:
# NOTE: if you add, remove or significantly refactor the stages of this
@@ -1479,10 +1567,13 @@ class RunQueue:
if not self.dm_event_handler_registered:
res = bb.event.register(self.dm_event_handler_name,
- lambda x: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False,
+ lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False,
('bb.event.HeartbeatEvent',), data=self.cfgData)
self.dm_event_handler_registered = True
+ self.rqdata.init_progress_reporter.next_stage()
+ self.rqexe = RunQueueExecute(self)
+
dump = self.cooker.configuration.dump_signatures
if dump:
self.rqdata.init_progress_reporter.finish()
@@ -1494,16 +1585,14 @@ class RunQueue:
self.state = runQueueComplete
if self.state is runQueueSceneInit:
- self.rqdata.init_progress_reporter.next_stage()
- self.start_worker()
- self.rqdata.init_progress_reporter.next_stage()
- self.rqexe = RunQueueExecute(self)
+ self.start_worker(self.rqexe)
+ self.rqdata.init_progress_reporter.finish()
# If we don't have any setscene functions, skip execution
- if len(self.rqdata.runq_setscene_tids) == 0:
+ if not self.rqdata.runq_setscene_tids:
logger.info('No setscene tasks')
for tid in self.rqdata.runtaskentries:
- if len(self.rqdata.runtaskentries[tid].depends) == 0:
+ if not self.rqdata.runtaskentries[tid].depends:
self.rqexe.setbuildable(tid)
self.rqexe.tasks_notcovered.add(tid)
self.rqexe.sqdone = True
@@ -1576,29 +1665,28 @@ class RunQueue:
else:
self.rqexe.finish()
- def rq_dump_sigfn(self, fn, options):
- bb_cache = bb.cache.NoCache(self.cooker.databuilder)
- mc = bb.runqueue.mc_from_tid(fn)
- the_data = bb_cache.loadDataFull(fn, self.cooker.collections[mc].get_file_appends(fn))
- siggen = bb.parse.siggen
- dataCaches = self.rqdata.dataCaches
- siggen.dump_sigfn(fn, dataCaches, options)
+ def _rq_dump_sigtid(self, tids):
+ for tid in tids:
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ dataCaches = self.rqdata.dataCaches
+ bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True)
def dump_signatures(self, options):
- fns = set()
- bb.note("Reparsing files to collect dependency data")
+ if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset:
+ bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled")
- for tid in self.rqdata.runtaskentries:
- fn = fn_from_tid(tid)
- fns.add(fn)
+ bb.note("Writing task signature files")
max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1)
+ def chunkify(l, n):
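+ # l[i::n] takes every n-th element starting at offset i, splitting l into n interleaved chunks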
+ return [l[i::n] for i in range(n)]
+ tids = chunkify(list(self.rqdata.runtaskentries), max_process)
# We cannot use the real multiprocessing.Pool easily due to some local data
# that can't be pickled. This is a cheap multi-process solution.
launched = []
- while fns:
+ while tids:
if len(launched) < max_process:
- p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options))
+ p = Process(target=self._rq_dump_sigtid, args=(tids.pop(), ))
p.start()
launched.append(p)
for q in launched:
@@ -1613,6 +1701,17 @@ class RunQueue:
return
def print_diffscenetasks(self):
+ def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid):
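+ # Recurse down the dependency tree and return only the "root" invalid tasks,
+ # i.e. invalid tasks whose own dependencies are all valid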
+ invalidtasks = []
+ for t in taskdepends[task].depends:
+ if t not in valid and t not in visited_invalid:
+ invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid))
+ visited_invalid.add(t)
+
+ direct_invalid = [t for t in taskdepends[task].depends if t not in valid]
+ if not direct_invalid and task not in noexec:
+ invalidtasks = [task]
+ return invalidtasks
noexec = []
tocheck = set()
@@ -1646,46 +1745,49 @@ class RunQueue:
valid_new.add(dep)
invalidtasks = set()
- for tid in self.rqdata.runtaskentries:
- if tid not in valid_new and tid not in noexec:
- invalidtasks.add(tid)
- found = set()
- processed = set()
- for tid in invalidtasks:
+ toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets])
+ for tid in toptasks:
toprocess = set([tid])
while toprocess:
next = set()
+ visited_invalid = set()
for t in toprocess:
- for dep in self.rqdata.runtaskentries[t].depends:
- if dep in invalidtasks:
- found.add(tid)
- if dep not in processed:
- processed.add(dep)
+ if t not in valid_new and t not in noexec:
+ invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid))
+ continue
+ if t in self.rqdata.runq_setscene_tids:
+ for dep in self.rqexe.sqdata.sq_deps[t]:
next.add(dep)
+ continue
+
+ for dep in self.rqdata.runtaskentries[t].depends:
+ next.add(dep)
+
toprocess = next
- if tid in found:
- toprocess = set()
tasklist = []
- for tid in invalidtasks.difference(found):
+ for tid in invalidtasks:
tasklist.append(tid)
if tasklist:
bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))
- return invalidtasks.difference(found)
+ return invalidtasks
def write_diffscenetasks(self, invalidtasks):
+ bb.siggen.check_siggen_version(bb.siggen)
# Define recursion callback
def recursecb(key, hash1, hash2):
hashes = [hash1, hash2]
+ bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes))
hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
+ bb.debug(1, "Found hashfiles:\n{}".format(hashfiles))
recout = []
if len(hashfiles) == 2:
- out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
+ out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb)
recout.extend(list(' ' + l for l in out2))
else:
recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))
@@ -1696,20 +1798,25 @@ class RunQueue:
for tid in invalidtasks:
(mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
- h = self.rqdata.runtaskentries[tid].hash
- matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData)
+ h = self.rqdata.runtaskentries[tid].unihash
+ bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname))
+ matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc])
+ bb.debug(1, "Found hashfiles:\n{}".format(matches))
match = None
- for m in matches:
- if h in m:
- match = m
+ for m in matches.values():
+ if h in m['path']:
+ match = m['path']
if match is None:
- bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
+ bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid))
matches = {k : v for k, v in iter(matches.items()) if h not in k}
+ matches_local = {k : v for k, v in iter(matches.items()) if h not in k and not v['sstate']}
+ if matches_local:
+ matches = matches_local
if matches:
- latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
+ latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path']
prevh = __find_sha256__.search(latestmatch).group(0)
output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
- bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output))
+ bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, most recent matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output))
class RunQueueExecute:
@@ -1722,6 +1829,10 @@ class RunQueueExecute:
self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1)
self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed"
+ self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU")
+ self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO")
+ self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY")
+ self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX")
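+ # Optional scheduler throttles: the PSI thresholds are compared against the per-second
+ # change in the /proc/pressure "total" counters, the load factor against loadavg/cpu_count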
self.sq_buildable = set()
self.sq_running = set()
@@ -1739,6 +1850,8 @@ class RunQueueExecute:
self.build_stamps2 = []
self.failed_tids = []
self.sq_deferred = {}
+ self.sq_needed_harddeps = set()
+ self.sq_harddep_deferred = set()
self.stampcache = {}
@@ -1748,14 +1861,37 @@ class RunQueueExecute:
self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids))
- for mc in rq.worker:
- rq.worker[mc].pipe.setrunqueueexec(self)
- for mc in rq.fakeworker:
- rq.fakeworker[mc].pipe.setrunqueueexec(self)
-
if self.number_tasks <= 0:
bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
+ lower_limit = 1.0
+ upper_limit = 1000000.0
+ if self.max_cpu_pressure:
+ self.max_cpu_pressure = float(self.max_cpu_pressure)
+ if self.max_cpu_pressure < lower_limit:
+ bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit))
+ if self.max_cpu_pressure > upper_limit:
+ bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure))
+
+ if self.max_io_pressure:
+ self.max_io_pressure = float(self.max_io_pressure)
+ if self.max_io_pressure < lower_limit:
+ bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit))
+ if self.max_io_pressure > upper_limit:
+ bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
+
+ if self.max_memory_pressure:
+ self.max_memory_pressure = float(self.max_memory_pressure)
+ if self.max_memory_pressure < lower_limit:
+ bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit))
+ if self.max_memory_pressure > upper_limit:
+ bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
+
+ if self.max_loadfactor:
+ self.max_loadfactor = float(self.max_loadfactor)
+ if self.max_loadfactor <= 0:
+ bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor))
+
# List of setscene tasks which we've covered
self.scenequeue_covered = set()
# List of tasks which are covered (including setscene ones)
@@ -1765,11 +1901,6 @@ class RunQueueExecute:
self.tasks_notcovered = set()
self.scenequeue_notneeded = set()
- # We can't skip specified target tasks which aren't setscene tasks
- self.cantskip = set(self.rqdata.target_tids)
- self.cantskip.difference_update(self.rqdata.runq_setscene_tids)
- self.cantskip.intersection_update(self.rqdata.runtaskentries)
-
schedulers = self.get_schedulers()
for scheduler in schedulers:
if self.scheduler == scheduler.name:
@@ -1780,9 +1911,27 @@ class RunQueueExecute:
bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
(self.scheduler, ", ".join(obj.name for obj in schedulers)))
- #if len(self.rqdata.runq_setscene_tids) > 0:
+ #if self.rqdata.runq_setscene_tids:
self.sqdata = SQData()
- build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
+ build_scenequeue_data(self.sqdata, self.rqdata, self)
+
+ update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True)
+
+ # Compute a list of 'stale' sstate tasks where the current hash does not match the one
+ # in any stamp files. Pass the list out to metadata as an event.
+ found = {}
+ for tid in self.rqdata.runq_setscene_tids:
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ stamps = bb.build.find_stale_stamps(taskname, taskfn)
+ if stamps:
+ if mc not in found:
+ found[mc] = {}
+ found[mc][tid] = stamps
+ for mc in found:
+ event = bb.event.StaleSetSceneTasks(found[mc])
+ bb.event.fire(event, self.cooker.databuilder.mcdata[mc])
+
+ self.build_taskdepdata_cache()
def runqueue_process_waitpid(self, task, status, fakerootlog=None):
@@ -1808,20 +1957,20 @@ class RunQueueExecute:
def finish_now(self):
for mc in self.rq.worker:
try:
- self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
self.rq.worker[mc].process.stdin.flush()
except IOError:
# worker must have died?
pass
for mc in self.rq.fakeworker:
try:
- self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
self.rq.fakeworker[mc].process.stdin.flush()
except IOError:
# worker must have died?
pass
- if len(self.failed_tids) != 0:
+ if self.failed_tids:
self.rq.state = runQueueFailed
return
@@ -1837,7 +1986,7 @@ class RunQueueExecute:
self.rq.read_workers()
return self.rq.active_fds()
- if len(self.failed_tids) != 0:
+ if self.failed_tids:
self.rq.state = runQueueFailed
return True
@@ -1884,8 +2033,7 @@ class RunQueueExecute:
try:
module = __import__(modname, fromlist=(name,))
except ImportError as exc:
- logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
- raise SystemExit(1)
+ bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
else:
schedulers.add(getattr(module, name))
return schedulers
@@ -1915,11 +2063,19 @@ class RunQueueExecute:
self.setbuildable(revdep)
logger.debug("Marking task %s as buildable", revdep)
- for t in self.sq_deferred.copy():
+ found = None
+ for t in sorted(self.sq_deferred.copy()):
if self.sq_deferred[t] == task:
- logger.debug2("Deferred task %s now buildable" % t)
- del self.sq_deferred[t]
- update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
+ # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task.
+ # We shouldn't allow all to run at once as it is prone to races.
+ if not found:
+ bb.debug(1, "Deferred task %s now buildable" % t)
+ del self.sq_deferred[t]
+ update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
+ found = t
+ else:
+ bb.debug(1, "Deferring %s after %s" % (t, found))
+ self.sq_deferred[t] = found
def task_complete(self, task):
self.stats.taskCompleted()
@@ -1935,7 +2091,7 @@ class RunQueueExecute:
self.stats.taskFailed()
self.failed_tids.append(task)
- fakeroot_log = ""
+ fakeroot_log = []
if fakerootlog and os.path.exists(fakerootlog):
with open(fakerootlog) as fakeroot_log_file:
fakeroot_failed = False
@@ -1945,14 +2101,14 @@ class RunQueueExecute:
fakeroot_failed = True
if 'doing new pid setup and server start' in line:
break
- fakeroot_log = line + fakeroot_log
+ fakeroot_log.append(line)
if not fakeroot_failed:
- fakeroot_log = None
+ fakeroot_log = []
- bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=fakeroot_log), self.cfgData)
+ bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData)
- if self.rqdata.taskData[''].abort:
+ if self.rqdata.taskData[''].halt:
self.rq.state = runQueueCleanUp
def task_skip(self, task, reason):
@@ -2001,7 +2157,7 @@ class RunQueueExecute:
if x not in self.tasks_scenequeue_done:
logger.error("Task %s was never processed by the setscene code" % x)
err = True
- if len(self.rqdata.runtaskentries[x].depends) == 0 and x not in self.runq_buildable:
+ if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable:
logger.error("Task %s was never marked as buildable by the setscene code" % x)
err = True
return err
@@ -2024,8 +2180,11 @@ class RunQueueExecute:
if not self.sqdone and self.can_start_task():
# Find the next setscene to run
for nexttask in self.sorted_setscene_tids:
- if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values():
- if nexttask not in self.sqdata.unskippable and len(self.sqdata.sq_revdeps[nexttask]) > 0 and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
+ if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred:
+ if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \
+ nexttask not in self.sq_needed_harddeps and \
+ self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \
+ self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
if nexttask not in self.rqdata.target_tids:
logger.debug2("Skipping setscene for task %s" % nexttask)
self.sq_task_skip(nexttask)
@@ -2033,6 +2192,19 @@ class RunQueueExecute:
if nexttask in self.sq_deferred:
del self.sq_deferred[nexttask]
return True
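+ # If this task has hard dependencies that are not yet covered, enable those
+ # dependencies and defer this task until they have completed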
+ if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+ logger.debug2("Deferring %s due to hard dependencies" % nexttask)
+ updated = False
+ for dep in self.sqdata.sq_harddeps_rev[nexttask]:
+ if dep not in self.sq_needed_harddeps:
+ logger.debug2("Enabling task %s as it is a hard dependency" % dep)
+ self.sq_buildable.add(dep)
+ self.sq_needed_harddeps.add(dep)
+ updated = True
+ self.sq_harddep_deferred.add(nexttask)
+ if updated:
+ return True
+ continue
# If covered tasks are running, need to wait for them to complete
for t in self.sqdata.sq_covered_tasks[nexttask]:
if t in self.runq_running and t not in self.runq_complete:
@@ -2081,21 +2253,35 @@ class RunQueueExecute:
startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
bb.event.fire(startevent, self.cfgData)
- taskdepdata = self.sq_build_taskdepdata(task)
-
taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
- taskhash = self.rqdata.get_task_hash(task)
- unihash = self.rqdata.get_task_unihash(task)
+ realfn = bb.cache.virtualfn2realfn(taskfn)[0]
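+ # Gather everything the worker needs into a single dict rather than the old positional tuple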
+ runtask = {
+ 'fn' : taskfn,
+ 'task' : task,
+ 'taskname' : taskname,
+ 'taskhash' : self.rqdata.get_task_hash(task),
+ 'unihash' : self.rqdata.get_task_unihash(task),
+ 'quieterrors' : True,
+ 'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
+ 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
+ 'taskdepdata' : self.sq_build_taskdepdata(task),
+ 'dry_run' : False,
+ 'taskdep': taskdep,
+ 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
+ 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
+ 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
+ }
+
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
if not mc in self.rq.fakeworker:
self.rq.start_fakeworker(self, mc)
- self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
self.rq.fakeworker[mc].process.stdin.flush()
else:
- self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
self.rq.worker[mc].process.stdin.flush()
- self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
+ self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
self.build_stamps2.append(self.build_stamps[task])
self.sq_running.add(task)
self.sq_live.add(task)
@@ -2130,9 +2316,9 @@ class RunQueueExecute:
if task is not None:
(mc, fn, taskname, taskfn) = split_tid_mcfn(task)
- if self.rqdata.setscenewhitelist is not None:
- if self.check_setscenewhitelist(task):
- self.task_fail(task, "setscene whitelist")
+ if self.rqdata.setscene_ignore_tasks is not None:
+ if self.check_setscene_ignore_tasks(task):
+ self.task_fail(task, "setscene ignore_tasks")
return True
if task in self.tasks_covered:
@@ -2155,18 +2341,32 @@ class RunQueueExecute:
self.runq_running.add(task)
self.stats.taskActive()
if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
- bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn)
+ bb.build.make_stamp_mcfn(taskname, taskfn)
self.task_complete(task)
return True
else:
startevent = runQueueTaskStarted(task, self.stats, self.rq)
bb.event.fire(startevent, self.cfgData)
- taskdepdata = self.build_taskdepdata(task)
-
taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
- taskhash = self.rqdata.get_task_hash(task)
- unihash = self.rqdata.get_task_unihash(task)
+ realfn = bb.cache.virtualfn2realfn(taskfn)[0]
+ runtask = {
+ 'fn' : taskfn,
+ 'task' : task,
+ 'taskname' : taskname,
+ 'taskhash' : self.rqdata.get_task_hash(task),
+ 'unihash' : self.rqdata.get_task_unihash(task),
+ 'quieterrors' : False,
+ 'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
+ 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
+ 'taskdepdata' : self.build_taskdepdata(task),
+ 'dry_run' : self.rqdata.setscene_enforce,
+ 'taskdep': taskdep,
+ 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
+ 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
+ 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
+ }
+
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
if not mc in self.rq.fakeworker:
try:
@@ -2176,32 +2376,31 @@ class RunQueueExecute:
self.rq.state = runQueueFailed
self.stats.taskFailed()
return True
- self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
self.rq.fakeworker[mc].process.stdin.flush()
else:
- self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
self.rq.worker[mc].process.stdin.flush()
- self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
+ self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
self.build_stamps2.append(self.build_stamps[task])
self.runq_running.add(task)
self.stats.taskActive()
if self.can_start_task():
return True
- if self.stats.active > 0 or len(self.sq_live) > 0:
+ if self.stats.active > 0 or self.sq_live:
self.rq.read_workers()
return self.rq.active_fds()
# No more tasks can be run. If we have deferred setscene tasks we should run them.
if self.sq_deferred:
- tid = self.sq_deferred.pop(list(self.sq_deferred.keys())[0])
- logger.warning("Runqeueue deadlocked on deferred tasks, forcing task %s" % tid)
- if tid not in self.runq_complete:
- self.sq_task_failoutright(tid)
+ deferred_tid = list(self.sq_deferred.keys())[0]
+ blocking_tid = self.sq_deferred.pop(deferred_tid)
+ logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid))
return True
- if len(self.failed_tids) != 0:
+ if self.failed_tids:
self.rq.state = runQueueFailed
return True
@@ -2234,6 +2433,22 @@ class RunQueueExecute:
ret.add(dep)
return ret
+ # Build the individual cache entries in advance once to save time
+ def build_taskdepdata_cache(self):
+ taskdepdata_cache = {}
+ for task in self.rqdata.runtaskentries:
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
+ pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
+ deps = self.rqdata.runtaskentries[task].depends
+ provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
+ taskhash = self.rqdata.runtaskentries[task].hash
+ unihash = self.rqdata.runtaskentries[task].unihash
+ deps = self.filtermcdeps(task, mc, deps)
+ hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
+ taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
+
+ self.taskdepdata_cache = taskdepdata_cache
+
# We filter out multiconfig dependencies from taskdepdata we pass to the tasks
# as most code can't handle them
def build_taskdepdata(self, task):
@@ -2245,15 +2460,9 @@ class RunQueueExecute:
while next:
additional = []
for revdep in next:
- (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
- pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
- deps = self.rqdata.runtaskentries[revdep].depends
- provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
- taskhash = self.rqdata.runtaskentries[revdep].hash
- unihash = self.rqdata.runtaskentries[revdep].unihash
- deps = self.filtermcdeps(task, mc, deps)
- taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
- for revdep2 in deps:
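+ # Index 6 of the cached entry is the unihash; refresh it as it may have changed since the cache was built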
+ self.taskdepdata_cache[revdep][6] = self.rqdata.runtaskentries[revdep].unihash
+ taskdepdata[revdep] = self.taskdepdata_cache[revdep]
+ for revdep2 in self.taskdepdata_cache[revdep][3]:
if revdep2 not in taskdepdata:
additional.append(revdep2)
next = additional
@@ -2267,7 +2476,7 @@ class RunQueueExecute:
return
notcovered = set(self.scenequeue_notcovered)
- notcovered |= self.cantskip
+ notcovered |= self.sqdata.cantskip
for tid in self.scenequeue_notcovered:
notcovered |= self.sqdata.sq_covered_tasks[tid]
notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids)
@@ -2280,7 +2489,7 @@ class RunQueueExecute:
covered.intersection_update(self.tasks_scenequeue_done)
for tid in notcovered | covered:
- if len(self.rqdata.runtaskentries[tid].depends) == 0:
+ if not self.rqdata.runtaskentries[tid].depends:
self.setbuildable(tid)
elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
self.setbuildable(tid)
@@ -2322,6 +2531,9 @@ class RunQueueExecute:
self.rqdata.runtaskentries[hashtid].unihash = unihash
bb.parse.siggen.set_unihash(hashtid, unihash)
toprocess.add(hashtid)
+ if torehash:
+ # Need to save after set_unihash above
+ bb.parse.siggen.save_unitaskhashes()
# Work out all tasks which depend upon these
total = set()
@@ -2339,7 +2551,7 @@ class RunQueueExecute:
# Now iterate those tasks in dependency order to regenerate their taskhash/unihash
next = set()
for p in total:
- if len(self.rqdata.runtaskentries[p].depends) == 0:
+ if not self.rqdata.runtaskentries[p].depends:
next.add(p)
elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
next.add(p)
@@ -2349,11 +2561,10 @@ class RunQueueExecute:
current = next.copy()
next = set()
for tid in current:
- if len(self.rqdata.runtaskentries[p].depends) and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
+ if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
continue
orighash = self.rqdata.runtaskentries[tid].hash
- dc = bb.parse.siggen.get_data_caches(self.rqdata.dataCaches, mc_from_tid(tid))
- newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, dc)
+ newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches)
origuni = self.rqdata.runtaskentries[tid].unihash
newuni = bb.parse.siggen.get_unihash(tid)
# FIXME, need to check it can come from sstate at all for determinism?
@@ -2379,9 +2590,9 @@ class RunQueueExecute:
if changed:
for mc in self.rq.worker:
- self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
for mc in self.rq.fakeworker:
- self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
@@ -2415,7 +2626,7 @@ class RunQueueExecute:
self.tasks_scenequeue_done.remove(tid)
for dep in self.sqdata.sq_covered_tasks[tid]:
if dep in self.runq_complete and dep not in self.runq_tasksrun:
- bb.error("Task %s marked as completed but now needing to rerun? Aborting build." % dep)
+ bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep)
self.failed_tids.append(tid)
self.rq.state = runQueueCleanUp
return
@@ -2428,17 +2639,6 @@ class RunQueueExecute:
self.sq_buildable.remove(tid)
if tid in self.sq_running:
self.sq_running.remove(tid)
- harddepfail = False
- for t in self.sqdata.sq_harddeps:
- if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered:
- harddepfail = True
- break
- if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
- if tid not in self.sq_buildable:
- self.sq_buildable.add(tid)
- if len(self.sqdata.sq_revdeps[tid]) == 0:
- self.sq_buildable.add(tid)
-
if tid in self.sqdata.outrightfail:
self.sqdata.outrightfail.remove(tid)
if tid in self.scenequeue_notcovered:
@@ -2449,7 +2649,7 @@ class RunQueueExecute:
self.scenequeue_notneeded.remove(tid)
(mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
- self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)
+ self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
if tid in self.stampcache:
del self.stampcache[tid]
@@ -2457,31 +2657,52 @@ class RunQueueExecute:
if tid in self.build_stamps:
del self.build_stamps[tid]
- update_tasks.append((tid, harddepfail, tid in self.sqdata.valid))
+ update_tasks.append(tid)
- if update_tasks:
+ update_tasks2 = []
+ for tid in update_tasks:
+ harddepfail = False
+ for t in self.sqdata.sq_harddeps_rev[tid]:
+ if t in self.scenequeue_notcovered:
+ harddepfail = True
+ break
+ if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+ if tid not in self.sq_buildable:
+ self.sq_buildable.add(tid)
+ if not self.sqdata.sq_revdeps[tid]:
+ self.sq_buildable.add(tid)
+
+ update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid))
+
+ if update_tasks2:
self.sqdone = False
- for tid in [t[0] for t in update_tasks]:
- h = pending_hash_index(tid, self.rqdata)
- if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]:
- self.sq_deferred[tid] = self.sqdata.hashes[h]
- bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h]))
- update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
-
- for (tid, harddepfail, origvalid) in update_tasks:
+ for mc in sorted(self.sqdata.multiconfigs):
+ for tid in sorted([t[0] for t in update_tasks2]):
+ if mc_from_tid(tid) != mc:
+ continue
+ h = pending_hash_index(tid, self.rqdata)
+ if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]:
+ self.sq_deferred[tid] = self.sqdata.hashes[h]
+ bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h]))
+ update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
+
+ for (tid, harddepfail, origvalid) in update_tasks2:
if tid in self.sqdata.valid and not origvalid:
hashequiv_logger.verbose("Setscene task %s became valid" % tid)
if harddepfail:
+ logger.debug2("%s has an unavailable hard dependency so skipping" % (tid))
self.sq_task_failoutright(tid)
if changed:
self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
+ self.sq_needed_harddeps = set()
+ self.sq_harddep_deferred = set()
self.holdoff_need_update = True
def scenequeue_updatecounters(self, task, fail=False):
- for dep in sorted(self.sqdata.sq_deps[task]):
- if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
+ if fail and task in self.sqdata.sq_harddeps:
+ for dep in sorted(self.sqdata.sq_harddeps[task]):
if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
# dependency could be already processed, e.g. noexec setscene task
continue
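Deferral in the update_tasks2 loop above keys on pending_hash_index(): among setscene tasks that share a pending hash, only the first one seen runs and the rest wait behind it so the result can be reused. A toy illustration of the mechanics, with hypothetical tids and hashes:

hashes = {}       # pending hash -> first tid seen with that hash
sq_deferred = {}  # tid -> the tid it is deferred behind
for tid, h in [("mc1:a.bb:do_populate_sysroot", "hash1"),
               ("mc2:a.bb:do_populate_sysroot", "hash1"),
               ("mc1:b.bb:do_package", "hash2")]:
    if h in hashes and tid != hashes[h]:
        sq_deferred[tid] = hashes[h]  # same pending hash already claimed
    else:
        hashes[h] = tid
print(sq_deferred)  # {'mc2:a.bb:do_populate_sysroot': 'mc1:a.bb:do_populate_sysroot'}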
@@ -2491,6 +2712,7 @@ class RunQueueExecute:
logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
self.sq_task_failoutright(dep)
continue
+ for dep in sorted(self.sqdata.sq_deps[task]):
if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
if dep not in self.sq_buildable:
self.sq_buildable.add(dep)
@@ -2509,6 +2731,13 @@ class RunQueueExecute:
new.add(dep)
next = new
+ # If other setscene tasks have a hard dependency on this task, walk through them
+ # and release any whose hard dependencies have now all been processed.
+ if task in self.sqdata.sq_harddeps:
+ for dep in self.sq_harddep_deferred.copy():
+ if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+ self.sq_harddep_deferred.remove(dep)
+
self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
self.holdoff_need_update = True
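The release test above is a plain subset check. A hypothetical mini-example: a deferred setscene task becomes eligible once every hard dependency it waits on has been resolved one way or the other (covered or notcovered).

sq_harddeps_rev = {"x.bb:do_image_setscene": {"y.bb:do_deploy_setscene",
                                              "z.bb:do_deploy_setscene"}}
scenequeue_covered = {"y.bb:do_deploy_setscene"}
scenequeue_notcovered = {"z.bb:do_deploy_setscene"}
sq_harddep_deferred = {"x.bb:do_image_setscene"}

for dep in sq_harddep_deferred.copy():
    if sq_harddeps_rev[dep].issubset(scenequeue_covered | scenequeue_notcovered):
        sq_harddep_deferred.remove(dep)
print(sq_harddep_deferred)  # set() - the image task is released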
@@ -2524,11 +2753,11 @@ class RunQueueExecute:
self.scenequeue_updatecounters(task)
def sq_check_taskfail(self, task):
- if self.rqdata.setscenewhitelist is not None:
+ if self.rqdata.setscene_ignore_tasks is not None:
realtask = task.split('_setscene')[0]
(mc, fn, taskname, taskfn) = split_tid_mcfn(realtask)
pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
- if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist):
+ if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
logger.error('Task %s.%s failed' % (pn, taskname + "_setscene"))
self.rq.state = runQueueCleanUp
@@ -2582,7 +2811,8 @@ class RunQueueExecute:
provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
taskhash = self.rqdata.runtaskentries[revdep].hash
unihash = self.rqdata.runtaskentries[revdep].unihash
- taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
+ hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
+ taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
for revdep2 in deps:
if revdep2 not in taskdepdata:
additional.append(revdep2)
@@ -2591,8 +2821,8 @@ class RunQueueExecute:
#bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
return taskdepdata
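With hashfn appended, each taskdepdata record now carries eight fields. A purely illustrative entry showing the layout (all paths and values here are invented):

taskdepdata = {
    "/recipes/quilt-native.bb:do_fetch": [
        "quilt-native",               # pn
        "do_fetch",                   # taskname
        "/recipes/quilt-native.bb",   # fn
        set(),                        # deps
        ["quilt-native"],             # provides
        "1234abcd",                   # taskhash
        "1234abcd",                   # unihash
        "example-hashfn",             # hashfn (the newly added field)
    ],
}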
- def check_setscenewhitelist(self, tid):
- # Check task that is going to run against the whitelist
+ def check_setscene_ignore_tasks(self, tid):
+ # Check a task that is about to run against the setscene ignore-tasks list
(mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
# Ignore covered tasks
if tid in self.tasks_covered:
@@ -2606,14 +2836,15 @@ class RunQueueExecute:
return False
pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
- if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist):
+ if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
if tid in self.rqdata.runq_setscene_tids:
- msg = 'Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)
+ msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)]
else:
- msg = 'Task %s.%s attempted to execute unexpectedly' % (pn, taskname)
+ msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)]
for t in self.scenequeue_notcovered:
- msg = msg + "\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash)
- logger.error(msg + '\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered))
+ msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash))
+ msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered))
+ logger.error("".join(msg))
return True
return False
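Building msg as a list and joining once replaces repeated string concatenation, which copies the whole accumulated string on every iteration and degrades quadratically over many notcovered tasks. The pattern in miniature, with hypothetical values:

parts = ["Task a.do_compile attempted to execute unexpectedly"]
for t in ("tid1", "tid2"):  # stand-ins for scenequeue_notcovered entries
    parts.append("\nTask %s, unihash u, taskhash h" % t)
print("".join(parts))       # one linear-time join at the end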
@@ -2625,6 +2856,7 @@ class SQData(object):
self.sq_revdeps = {}
# Injected inter-setscene task dependencies
self.sq_harddeps = {}
+ self.sq_harddeps_rev = {}
# Cache of stamp files so duplicates can't run in parallel
self.stamps = {}
# Setscene tasks directly depended upon by the build
@@ -2634,12 +2866,17 @@ class SQData(object):
# A list of normal tasks a setscene task covers
self.sq_covered_tasks = {}
-def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
+def build_scenequeue_data(sqdata, rqdata, sqrq):
sq_revdeps = {}
sq_revdeps_squash = {}
sq_collated_deps = {}
+ # We can't skip specified target tasks which aren't setscene tasks
+ sqdata.cantskip = set(rqdata.target_tids)
+ sqdata.cantskip.difference_update(rqdata.runq_setscene_tids)
+ sqdata.cantskip.intersection_update(rqdata.runtaskentries)
+
# We need to construct a dependency graph for the setscene functions. Intermediate
# dependencies between the setscene tasks only complicate the code. This code
# therefore aims to collapse the huge runqueue dependency tree into a smaller one
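The new cantskip computation is three set operations; in miniature, with hypothetical tids:

target_tids = {"a.bb:do_build", "b.bb:do_populate_sysroot", "gone.bb:do_build"}
runq_setscene_tids = {"b.bb:do_populate_sysroot"}
runtaskentries = {"a.bb:do_build": None}          # the live runqueue entries

cantskip = set(target_tids)
cantskip.difference_update(runq_setscene_tids)    # setscene targets can be skipped
cantskip.intersection_update(runtaskentries)      # drop targets not in the runqueue
print(cantskip)  # {'a.bb:do_build'}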
@@ -2652,7 +2889,7 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
for tid in rqdata.runtaskentries:
sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps)
sq_revdeps_squash[tid] = set()
- if (len(sq_revdeps[tid]) == 0) and tid not in rqdata.runq_setscene_tids:
+ if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids:
#bb.warn("Added endpoint %s" % (tid))
endpoints[tid] = set()
@@ -2686,16 +2923,15 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
sq_revdeps_squash[point] = set()
if point in rqdata.runq_setscene_tids:
sq_revdeps_squash[point] = tasks
- tasks = set()
continue
for dep in rqdata.runtaskentries[point].depends:
if point in sq_revdeps[dep]:
sq_revdeps[dep].remove(point)
if tasks:
sq_revdeps_squash[dep] |= tasks
- if len(sq_revdeps[dep]) == 0 and dep not in rqdata.runq_setscene_tids:
+ if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids:
newendpoints[dep] = task
- if len(newendpoints) != 0:
+ if newendpoints:
process_endpoints(newendpoints)
process_endpoints(endpoints)
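A drastically simplified picture of what process_endpoints() achieves (the real code batches endpoints, prunes sq_revdeps as it walks, and handles shared subtrees): starting from tasks nothing depends on, the set of tasks covered so far is pushed down through depends until a setscene task absorbs it into sq_revdeps_squash. All graph contents here are hypothetical:

depends = {"img.bb:do_image": {"pkg.bb:do_package_setscene"},
           "pkg.bb:do_package_setscene": set()}
setscene_tids = {"pkg.bb:do_package_setscene"}
covers = {t: set() for t in depends}

stack = ["img.bb:do_image"]          # an endpoint: no reverse dependencies
while stack:
    point = stack.pop()
    for dep in depends[point]:
        covers[dep] |= covers[point] | {point}
        if dep in setscene_tids:
            continue                  # the setscene task absorbs the set
        stack.append(dep)
print(covers["pkg.bb:do_package_setscene"])  # {'img.bb:do_image'}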
@@ -2707,16 +2943,16 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
# Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
new = True
for tid in rqdata.runtaskentries:
- if len(rqdata.runtaskentries[tid].revdeps) == 0:
+ if not rqdata.runtaskentries[tid].revdeps:
sqdata.unskippable.add(tid)
- sqdata.unskippable |= sqrq.cantskip
+ sqdata.unskippable |= sqdata.cantskip
while new:
new = False
orig = sqdata.unskippable.copy()
for tid in sorted(orig, reverse=True):
if tid in rqdata.runq_setscene_tids:
continue
- if len(rqdata.runtaskentries[tid].depends) == 0:
+ if not rqdata.runtaskentries[tid].depends:
# These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
sqrq.setbuildable(tid)
sqdata.unskippable |= rqdata.runtaskentries[tid].depends
@@ -2731,8 +2967,8 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
for taskcounter, tid in enumerate(rqdata.runtaskentries):
if tid in rqdata.runq_setscene_tids:
pass
- elif len(sq_revdeps_squash[tid]) != 0:
- bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
+ elif sq_revdeps_squash[tid]:
+ bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.")
else:
del sq_revdeps_squash[tid]
rqdata.init_progress_reporter.update(taskcounter)
@@ -2746,7 +2982,9 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
(mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
realtid = tid + "_setscene"
idepends = rqdata.taskData[mc].taskentries[realtid].idepends
- sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", rqdata.dataCaches[mc], taskfn, noextra=True)
+ sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
+
+ sqdata.sq_harddeps_rev[tid] = set()
for (depname, idependtask) in idepends:
if depname not in rqdata.taskData[mc].build_targets:
@@ -2759,20 +2997,15 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
if deptid not in rqdata.runtaskentries:
bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))
+ logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid))
+
if not deptid in sqdata.sq_harddeps:
sqdata.sq_harddeps[deptid] = set()
sqdata.sq_harddeps[deptid].add(tid)
-
- sq_revdeps_squash[tid].add(deptid)
- # Have to zero this to avoid circular dependencies
- sq_revdeps_squash[deptid] = set()
+ sqdata.sq_harddeps_rev[tid].add(deptid)
rqdata.init_progress_reporter.next_stage()
- for task in sqdata.sq_harddeps:
- for dep in sqdata.sq_harddeps[task]:
- sq_revdeps_squash[dep].add(task)
-
rqdata.init_progress_reporter.next_stage()
#for tid in sq_revdeps_squash:
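The hunk above maintains a forward and a reverse map in tandem: sq_harddeps answers "which setscene tasks wait on me?", while the new sq_harddeps_rev answers "which setscene tasks do I wait on?". The bookkeeping in miniature, with hypothetical tids:

sq_harddeps = {}
sq_harddeps_rev = {"img.bb:do_image": set()}

deptid, tid = "pkg.bb:do_package_write_rpm", "img.bb:do_image"
if deptid not in sq_harddeps:
    sq_harddeps[deptid] = set()
sq_harddeps[deptid].add(tid)      # deptid gates tid
sq_harddeps_rev[tid].add(deptid)  # tid waits on deptid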
@@ -2796,10 +3029,10 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
sqdata.multiconfigs = set()
for tid in sqdata.sq_revdeps:
sqdata.multiconfigs.add(mc_from_tid(tid))
- if len(sqdata.sq_revdeps[tid]) == 0:
+ if not sqdata.sq_revdeps[tid]:
sqrq.sq_buildable.add(tid)
- rqdata.init_progress_reporter.finish()
+ rqdata.init_progress_reporter.next_stage()
sqdata.noexec = set()
sqdata.stamppresent = set()
@@ -2816,23 +3049,7 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
sqdata.hashes[h] = tid
else:
sqrq.sq_deferred[tid] = sqdata.hashes[h]
- bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h]))
-
- update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True)
-
- # Compute a list of 'stale' sstate tasks where the current hash does not match the one
- # in any stamp files. Pass the list out to metadata as an event.
- found = {}
- for tid in rqdata.runq_setscene_tids:
- (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
- stamps = bb.build.find_stale_stamps(taskname, rqdata.dataCaches[mc], taskfn)
- if stamps:
- if mc not in found:
- found[mc] = {}
- found[mc][tid] = stamps
- for mc in found:
- event = bb.event.StaleSetSceneTasks(found[mc])
- bb.event.fire(event, cooker.databuilder.mcdata[mc])
+ bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h]))
def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
@@ -2841,7 +3058,7 @@ def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
taskdep = rqdata.dataCaches[mc].task_deps[taskfn]
if 'noexec' in taskdep and taskname in taskdep['noexec']:
- bb.build.make_stamp(taskname + "_setscene", rqdata.dataCaches[mc], taskfn)
+ bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn)
return True, False
if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache):
@@ -2871,11 +3088,13 @@ def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, s
if noexec:
sqdata.noexec.add(tid)
sqrq.sq_task_skip(tid)
+ logger.debug2("%s is noexec so skipping setscene" % (tid))
continue
if stamppresent:
sqdata.stamppresent.add(tid)
sqrq.sq_task_skip(tid)
+ logger.debug2("%s has a valid stamp, skipping" % (tid))
continue
tocheck.add(tid)
@@ -2896,6 +3115,7 @@ def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, s
if tid in sqrq.sq_deferred:
continue
sqdata.outrightfail.add(tid)
+ logger.debug2("%s already handled (fallthrough), skipping" % (tid))
class TaskFailure(Exception):
"""
@@ -3025,15 +3245,12 @@ class runQueuePipe():
if pipeout:
pipeout.close()
bb.utils.nonblockingfd(self.input)
- self.queue = b""
+ self.queue = bytearray()
self.d = d
self.rq = rq
self.rqexec = rqexec
self.fakerootlogs = fakerootlogs
- def setrunqueueexec(self, rqexec):
- self.rqexec = rqexec
-
def read(self):
for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
for worker in workers.values():
@@ -3044,13 +3261,13 @@ class runQueuePipe():
start = len(self.queue)
try:
- self.queue = self.queue + (self.input.read(102400) or b"")
+ self.queue.extend(self.input.read(102400) or b"")
except (OSError, IOError) as e:
if e.errno != errno.EAGAIN:
raise
end = len(self.queue)
found = True
- while found and len(self.queue):
+ while found and self.queue:
found = False
index = self.queue.find(b"</event>")
while index != -1 and self.queue.startswith(b"<event>"):
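Switching self.queue from bytes to bytearray matters because bytes concatenation allocates a fresh object on every 100 KB read, whereas bytearray.extend() grows the buffer in place. A standalone sketch of the same tag-framed scanning over a pre-filled buffer (the payload is hypothetical):

import pickle

queue = bytearray()
queue.extend(b"<event>" + pickle.dumps({"msg": "hello"}) + b"</event>")

index = queue.find(b"</event>")
while index != -1 and queue.startswith(b"<event>"):
    event = pickle.loads(queue[7:index])       # payload between the tags
    queue = queue[index + len(b"</event>"):]   # consume the framed message
    index = queue.find(b"</event>")
print(event)  # {'msg': 'hello'}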
@@ -3088,16 +3305,16 @@ class runQueuePipe():
def close(self):
while self.read():
continue
- if len(self.queue) > 0:
+ if self.queue:
print("Warning, worker left partial message: %s" % self.queue)
self.input.close()
-def get_setscene_enforce_whitelist(d, targets):
+def get_setscene_enforce_ignore_tasks(d, targets):
if d.getVar('BB_SETSCENE_ENFORCE') != '1':
return None
- whitelist = (d.getVar("BB_SETSCENE_ENFORCE_WHITELIST") or "").split()
+ ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split()
outlist = []
- for item in whitelist[:]:
+ for item in ignore_tasks[:]:
if item.startswith('%:'):
for (mc, target, task, fn) in targets:
outlist.append(target + ':' + item.split(':')[1])
@@ -3105,12 +3322,12 @@ def get_setscene_enforce_whitelist(d, targets):
outlist.append(item)
return outlist
-def check_setscene_enforce_whitelist(pn, taskname, whitelist):
+def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks):
import fnmatch
- if whitelist is not None:
+ if ignore_tasks is not None:
item = '%s:%s' % (pn, taskname)
- for whitelist_item in whitelist:
- if fnmatch.fnmatch(item, whitelist_item):
+ for ignore_task in ignore_tasks:
+ if fnmatch.fnmatch(item, ignore_task):
return True
return False
return True
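A usage-style sketch of the renamed check (the wrapper and all values are hypothetical): entries in BB_SETSCENE_ENFORCE_IGNORE_TASKS are fnmatch patterns matched against "pn:taskname".

import fnmatch

def matches_ignore_tasks(pn, taskname, ignore_tasks):
    item = '%s:%s' % (pn, taskname)
    return any(fnmatch.fnmatch(item, pattern) for pattern in ignore_tasks)

print(matches_ignore_tasks("quilt-native", "do_fetch", ["*-native:*"]))  # True
print(matches_ignore_tasks("quilt", "do_compile", ["*-native:*"]))       # False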