diff options
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 2281 |
1 files changed, 1415 insertions, 866 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 329cda33a4..bc7e18175d 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python -# ex:ts=4:sw=4:sts=4:et -# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- """ BitBake 'RunQueue' implementation @@ -9,38 +6,29 @@ Handles preparation and execution of a queue of tasks # Copyright (C) 2006-2007 Richard Purdie # -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. +# SPDX-License-Identifier: GPL-2.0-only # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import copy import os import sys -import signal import stat -import fcntl import errno import logging import re import bb -from bb import msg, data, event +from bb import msg, event from bb import monitordisk import subprocess import pickle from multiprocessing import Process import shlex +import pprint +import time bblogger = logging.getLogger("BitBake") logger = logging.getLogger("BitBake.RunQueue") +hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv") __find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' ) @@ -51,7 +39,7 @@ def taskname_from_tid(tid): return tid.rsplit(":", 1)[1] def mc_from_tid(tid): - if tid.startswith('multiconfig:'): + if tid.startswith('mc:') and tid.count(':') >= 2: return tid.split(':')[1] return "" @@ -59,13 +47,19 @@ def split_tid(tid): (mc, fn, taskname, _) = split_tid_mcfn(tid) return (mc, fn, taskname) +def split_mc(n): + if n.startswith("mc:") and n.count(':') >= 2: + _, mc, n = n.split(":", 2) + return (mc, n) + return ('', n) + def split_tid_mcfn(tid): - if tid.startswith('multiconfig:'): + if tid.startswith('mc:') and tid.count(':') >= 2: elems = tid.split(':') mc = elems[1] fn = ":".join(elems[2:-1]) taskname = elems[-1] - mcfn = "multiconfig:" + mc + ":" + fn + mcfn = "mc:" + mc + ":" + fn else: tid = tid.rsplit(":", 1) mc = "" @@ -77,22 +71,34 @@ def split_tid_mcfn(tid): def build_tid(mc, fn, taskname): if mc: - return "multiconfig:" + mc + ":" + fn + ":" + taskname + return "mc:" + mc + ":" + fn + ":" + taskname return fn + ":" + taskname +# Index used to pair up potentially matching multiconfig tasks +# We match on PN, taskname and hash being equal +def pending_hash_index(tid, rqdata): + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + pn = rqdata.dataCaches[mc].pkg_fn[taskfn] + h = rqdata.runtaskentries[tid].unihash + return pn + ":" + "taskname" + h + class RunQueueStats: """ Holds statistics on the tasks handled by the associated runQueue """ - def __init__(self, total): + def __init__(self, total, setscene_total): self.completed = 0 self.skipped = 0 self.failed = 0 self.active = 0 + self.setscene_active = 0 + self.setscene_covered = 0 + self.setscene_notcovered = 0 + self.setscene_total = setscene_total self.total = total def copy(self): - obj = self.__class__(self.total) + obj = self.__class__(self.total, self.setscene_total) obj.__dict__.update(self.__dict__) return obj @@ -111,12 +117,17 @@ class RunQueueStats: def taskActive(self): self.active = self.active + 1 + def updateCovered(self, covered, notcovered): + self.setscene_covered = covered + self.setscene_notcovered = notcovered + + def updateActiveSetscene(self, active): + self.setscene_active = active + # These values indicate the next step due to be run in the # runQueue state machine runQueuePrepare = 2 runQueueSceneInit = 3 -runQueueSceneRun = 4 -runQueueRunInit = 5 runQueueRunning = 6 runQueueFailed = 7 runQueueCleanUp = 8 @@ -139,23 +150,104 @@ class RunQueueScheduler(object): self.prio_map = [self.rqdata.runtaskentries.keys()] - self.buildable = [] + self.buildable = set() self.skip_maxthread = {} self.stamps = {} for tid in self.rqdata.runtaskentries: (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) - self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) + self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) if tid in self.rq.runq_buildable: - self.buildable.append(tid) + self.buildable.add(tid) self.rev_prio_map = None + self.is_pressure_usable() + + def is_pressure_usable(self): + """ + If monitoring pressure, return True if pressure files can be open and read. For example + openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported) + is returned. + """ + if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure: + try: + with open("/proc/pressure/cpu") as cpu_pressure_fds, \ + open("/proc/pressure/io") as io_pressure_fds, \ + open("/proc/pressure/memory") as memory_pressure_fds: + + self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] + self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] + self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] + self.prev_pressure_time = time.time() + self.check_pressure = True + except: + bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure") + self.check_pressure = False + else: + self.check_pressure = False + + def exceeds_max_pressure(self): + """ + Monitor the difference in total pressure at least once per second, if + BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold. + """ + if self.check_pressure: + with open("/proc/pressure/cpu") as cpu_pressure_fds, \ + open("/proc/pressure/io") as io_pressure_fds, \ + open("/proc/pressure/memory") as memory_pressure_fds: + # extract "total" from /proc/pressure/{cpu|io} + curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] + curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] + curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] + now = time.time() + tdiff = now - self.prev_pressure_time + psi_accumulation_interval = 1.0 + cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff + io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff + memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff + exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure + exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure + exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure + + if tdiff > psi_accumulation_interval: + self.prev_cpu_pressure = curr_cpu_pressure + self.prev_io_pressure = curr_io_pressure + self.prev_memory_pressure = curr_memory_pressure + self.prev_pressure_time = now + + pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure) + pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure) + if hasattr(self, "pressure_state") and pressure_state != self.pressure_state: + bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))) + self.pressure_state = pressure_state + return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure) + elif self.rq.max_loadfactor: + limit = False + loadfactor = float(os.getloadavg()[0]) / os.cpu_count() + # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor)) + if loadfactor > self.rq.max_loadfactor: + limit = True + if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit: + bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)) + self.loadfactor_limit = limit + return limit + return False def next_buildable_task(self): """ Return the id of the first task we find that is buildable """ - self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] - if not self.buildable: + # Once tasks are running we don't need to worry about them again + self.buildable.difference_update(self.rq.runq_running) + buildable = set(self.buildable) + buildable.difference_update(self.rq.holdoff_tasks) + buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered) + if not buildable: + return None + + # Bitbake requires that at least one task be active. Only check for pressure if + # this is the case, otherwise the pressure limitation could result in no tasks + # being active and no new tasks started thereby, at times, breaking the scheduler. + if self.rq.stats.active and self.exceeds_max_pressure(): return None # Filter out tasks that have a max number of threads that have been exceeded @@ -171,8 +263,8 @@ class RunQueueScheduler(object): else: skip_buildable[rtaskname] = 1 - if len(self.buildable) == 1: - tid = self.buildable[0] + if len(buildable) == 1: + tid = buildable.pop() taskname = taskname_from_tid(tid) if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): return None @@ -187,12 +279,12 @@ class RunQueueScheduler(object): best = None bestprio = None - for tid in self.buildable: - taskname = taskname_from_tid(tid) - if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): - continue + for tid in buildable: prio = self.rev_prio_map[tid] if bestprio is None or bestprio > prio: + taskname = taskname_from_tid(tid) + if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): + continue stamp = self.stamps[tid] if stamp in self.rq.build_stamps.values(): continue @@ -209,7 +301,10 @@ class RunQueueScheduler(object): return self.next_buildable_task() def newbuildable(self, task): - self.buildable.append(task) + self.buildable.add(task) + + def removebuildable(self, task): + self.buildable.remove(task) def describe_task(self, taskid): result = 'ID %s' % taskid @@ -368,10 +463,9 @@ class RunQueueData: self.rq = rq self.warn_multi_bb = False - self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST") or "" - self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST") or "").split() - self.setscenewhitelist = get_setscene_enforce_whitelist(cfgData) - self.setscenewhitelist_checked = False + self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split() + self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets) + self.setscene_ignore_tasks_checked = False self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1") self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter() @@ -469,7 +563,7 @@ class RunQueueData: msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends))) msgs.append("\n") if len(valid_chains) > 10: - msgs.append("Aborted dependency loops search after 10 matches.\n") + msgs.append("Halted dependency loops search after 10 matches.\n") raise TooManyLoops continue scan = False @@ -530,7 +624,7 @@ class RunQueueData: next_points.append(revdep) task_done[revdep] = True endpoints = next_points - if len(next_points) == 0: + if not next_points: break # Circular dependency sanity check @@ -538,8 +632,8 @@ class RunQueueData: for tid in self.runtaskentries: if task_done[tid] is False or deps_left[tid] != 0: problem_tasks.append(tid) - logger.debug(2, "Task %s is not buildable", tid) - logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) + logger.debug2("Task %s is not buildable", tid) + logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) self.runtaskentries[tid].weight = weight[tid] if problem_tasks: @@ -572,15 +666,18 @@ class RunQueueData: found = False for mc in self.taskData: - if len(taskData[mc].taskentries) > 0: + if taskData[mc].taskentries: found = True break if not found: # Nothing to do return 0 + bb.parse.siggen.setup_datacache(self.dataCaches) + self.init_progress_reporter.start() self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Step A - Work out a list of tasks to run # @@ -626,6 +723,8 @@ class RunQueueData: frommc = mcdependency[1] mcdep = mcdependency[2] deptask = mcdependency[4] + if mcdep not in taskData: + bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep)) if mc == frommc: fn = taskData[mcdep].build_targets[pn][0] newdep = '%s:%s' % (fn,deptask) @@ -637,7 +736,7 @@ class RunQueueData: (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) #runtid = build_tid(mc, fn, taskname) - #logger.debug(2, "Processing %s,%s:%s", mc, fn, taskname) + #logger.debug2("Processing %s,%s:%s", mc, fn, taskname) depends = set() task_deps = self.dataCaches[mc].task_deps[taskfn] @@ -727,6 +826,7 @@ class RunQueueData: #self.dump_data() self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Resolve recursive 'recrdeptask' dependencies (Part B) # @@ -756,7 +856,7 @@ class RunQueueData: # Find the dependency chain endpoints endpoints = set() for tid in self.runtaskentries: - if len(deps[tid]) == 0: + if not deps[tid]: endpoints.add(tid) # Iterate the chains collating dependencies while endpoints: @@ -767,11 +867,11 @@ class RunQueueData: cumulativedeps[dep].update(cumulativedeps[tid]) if tid in deps[dep]: deps[dep].remove(tid) - if len(deps[dep]) == 0: + if not deps[dep]: next.add(dep) endpoints = next #for tid in deps: - # if len(deps[tid]) != 0: + # if deps[tid]: # bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid])) # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to @@ -823,6 +923,7 @@ class RunQueueData: self.runtaskentries[tid].depends.difference_update(recursivetasksselfref) self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) #self.dump_data() @@ -849,6 +950,20 @@ class RunQueueData: for depend in depends: mark_active(depend, depth+1) + def invalidate_task(tid, error_nostamp): + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + taskdep = self.dataCaches[mc].task_deps[taskfn] + if fn + ":" + taskname not in taskData[mc].taskentries: + logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) + if 'nostamp' in taskdep and taskname in taskdep['nostamp']: + if error_nostamp: + bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) + else: + bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) + else: + logger.verbose("Invalidate task %s, %s", taskname, fn) + bb.parse.siggen.invalidate_task(taskname, taskfn) + self.target_tids = [] for (mc, target, task, fn) in self.targets: @@ -890,43 +1005,54 @@ class RunQueueData: mark_active(tid, 1) self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Step C - Prune all inactive tasks # # Once all active tasks are marked, prune the ones we don't need. - delcount = {} - for tid in list(self.runtaskentries.keys()): - if tid not in runq_build: - delcount[tid] = self.runtaskentries[tid] - del self.runtaskentries[tid] - # Handle --runall if self.cooker.configuration.runall: # re-run the mark_active and then drop unused tasks from new list - runq_build = {} - for task in self.cooker.configuration.runall: - runall_tids = set() - for tid in list(self.runtaskentries): - wanttid = fn_from_tid(tid) + ":do_%s" % task - if wanttid in delcount: - self.runtaskentries[wanttid] = delcount[wanttid] - if wanttid in self.runtaskentries: - runall_tids.add(wanttid) + runall_tids = set() + added = True + while added: + reduced_tasklist = set(self.runtaskentries.keys()) + for tid in list(self.runtaskentries.keys()): + if tid not in runq_build: + reduced_tasklist.remove(tid) + runq_build = {} - for tid in list(runall_tids): - mark_active(tid,1) - - for tid in list(self.runtaskentries.keys()): - if tid not in runq_build: - delcount[tid] = self.runtaskentries[tid] - del self.runtaskentries[tid] + orig = runall_tids + runall_tids = set() + for task in self.cooker.configuration.runall: + if not task.startswith("do_"): + task = "do_{0}".format(task) + for tid in reduced_tasklist: + wanttid = "{0}:{1}".format(fn_from_tid(tid), task) + if wanttid in self.runtaskentries: + runall_tids.add(wanttid) + + for tid in list(runall_tids): + mark_active(tid, 1) + self.target_tids.append(tid) + if self.cooker.configuration.force: + invalidate_task(tid, False) + added = runall_tids - orig + + delcount = set() + for tid in list(self.runtaskentries.keys()): + if tid not in runq_build: + delcount.add(tid) + del self.runtaskentries[tid] - if len(self.runtaskentries) == 0: + if self.cooker.configuration.runall: + if not self.runtaskentries: bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets))) self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Handle runonly if self.cooker.configuration.runonly: @@ -934,17 +1060,21 @@ class RunQueueData: runq_build = {} for task in self.cooker.configuration.runonly: - runonly_tids = { k: v for k, v in self.runtaskentries.items() if taskname_from_tid(k) == "do_%s" % task } + if not task.startswith("do_"): + task = "do_{0}".format(task) + runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task] - for tid in list(runonly_tids): - mark_active(tid,1) + for tid in runonly_tids: + mark_active(tid, 1) + if self.cooker.configuration.force: + invalidate_task(tid, False) for tid in list(self.runtaskentries.keys()): if tid not in runq_build: - delcount[tid] = self.runtaskentries[tid] + delcount.add(tid) del self.runtaskentries[tid] - if len(self.runtaskentries) == 0: + if not self.runtaskentries: bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets))) # @@ -952,8 +1082,8 @@ class RunQueueData: # # Check to make sure we still have tasks to run - if len(self.runtaskentries) == 0: - if not taskData[''].abort: + if not self.runtaskentries: + if not taskData[''].halt: bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") else: bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") @@ -963,6 +1093,7 @@ class RunQueueData: logger.verbose("Assign Weightings") self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Generate a list of reverse dependencies to ease future calculations for tid in self.runtaskentries: @@ -970,13 +1101,14 @@ class RunQueueData: self.runtaskentries[dep].revdeps.add(tid) self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Identify tasks at the end of dependency chains # Error on circular dependency loops (length two) endpoints = [] for tid in self.runtaskentries: revdeps = self.runtaskentries[tid].revdeps - if len(revdeps) == 0: + if not revdeps: endpoints.append(tid) for dep in revdeps: if dep in self.runtaskentries[tid].depends: @@ -986,12 +1118,14 @@ class RunQueueData: logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Calculate task weights # Check of higher length circular dependencies self.runq_weight = self.calculate_task_weights(endpoints) self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Sanity Check - Check for multiple tasks building the same provider for mc in self.dataCaches: @@ -1012,7 +1146,7 @@ class RunQueueData: for prov in prov_list: if len(prov_list[prov]) < 2: continue - if prov in self.multi_provider_whitelist: + if prov in self.multi_provider_allowed: continue seen_pn = [] # If two versions of the same PN are being built its fatal, we don't support it. @@ -1022,12 +1156,12 @@ class RunQueueData: seen_pn.append(pn) else: bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) - msg = "Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov])) + msgs = ["Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))] # # Construct a list of things which uniquely depend on each provider # since this may help the user figure out which dependency is triggering this warning # - msg += "\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from." + msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.") deplist = {} commondeps = None for provfn in prov_list[prov]: @@ -1047,12 +1181,12 @@ class RunQueueData: commondeps &= deps deplist[provfn] = deps for provfn in deplist: - msg += "\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps)) + msgs.append("\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps))) # # Construct a list of provides and runtime providers for each recipe # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC) # - msg += "\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful." + msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.") provide_results = {} rprovide_results = {} commonprovs = None @@ -1079,56 +1213,33 @@ class RunQueueData: else: commonrprovs &= rprovides rprovide_results[provfn] = rprovides - #msg += "\nCommon provides:\n %s" % ("\n ".join(commonprovs)) - #msg += "\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs)) + #msgs.append("\nCommon provides:\n %s" % ("\n ".join(commonprovs))) + #msgs.append("\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs))) for provfn in prov_list[prov]: - msg += "\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs)) - msg += "\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs)) + msgs.append("\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs))) + msgs.append("\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs))) if self.warn_multi_bb: - logger.verbnote(msg) + logger.verbnote("".join(msgs)) else: - logger.error(msg) + logger.error("".join(msgs)) self.init_progress_reporter.next_stage() - - # Create a whitelist usable by the stamp checks - self.stampfnwhitelist = {} - for mc in self.taskData: - self.stampfnwhitelist[mc] = [] - for entry in self.stampwhitelist.split(): - if entry not in self.taskData[mc].build_targets: - continue - fn = self.taskData.build_targets[entry][0] - self.stampfnwhitelist[mc].append(fn) - self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Iterate over the task list looking for tasks with a 'setscene' function - self.runq_setscene_tids = [] + self.runq_setscene_tids = set() if not self.cooker.configuration.nosetscene: for tid in self.runtaskentries: (mc, fn, taskname, _) = split_tid_mcfn(tid) setscenetid = tid + "_setscene" if setscenetid not in taskData[mc].taskentries: continue - self.runq_setscene_tids.append(tid) - - def invalidate_task(tid, error_nostamp): - (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) - taskdep = self.dataCaches[mc].task_deps[taskfn] - if fn + ":" + taskname not in taskData[mc].taskentries: - logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) - if 'nostamp' in taskdep and taskname in taskdep['nostamp']: - if error_nostamp: - bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) - else: - bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) - else: - logger.verbose("Invalidate task %s, %s", taskname, fn) - bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn) + self.runq_setscene_tids.add(tid) self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Invalidate task if force mode active if self.cooker.configuration.force: @@ -1145,6 +1256,7 @@ class RunQueueData: invalidate_task(fn + ":" + st, True) self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) # Create and print to the logs a virtual/xxxx -> PN (fn) table for mc in taskData: @@ -1157,16 +1269,20 @@ class RunQueueData: bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc]) self.init_progress_reporter.next_stage() + bb.event.check_for_interrupts(self.cooker.data) + + bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) # Iterate over the task list and call into the siggen code dealtwith = set() todeal = set(self.runtaskentries) - while len(todeal) > 0: + while todeal: for tid in todeal.copy(): - if len(self.runtaskentries[tid].depends - dealtwith) == 0: + if not (self.runtaskentries[tid].depends - dealtwith): dealtwith.add(tid) todeal.remove(tid) self.prepare_task_hash(tid) + bb.event.check_for_interrupts(self.cooker.data) bb.parse.siggen.writeout_file_checksum_cache() @@ -1174,20 +1290,17 @@ class RunQueueData: return len(self.runtaskentries) def prepare_task_hash(self, tid): - procdep = [] - for dep in self.runtaskentries[tid].depends: - procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep)) - (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) - self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.dataCaches[mc]) - self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname) + bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) + self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) + self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) def dump_data(self): """ Dump some debug information on the internal data structures """ - logger.debug(3, "run_tasks:") + logger.debug3("run_tasks:") for tid in self.runtaskentries: - logger.debug(3, " %s: %s Deps %s RevDeps %s", tid, + logger.debug3(" %s: %s Deps %s RevDeps %s", tid, self.runtaskentries[tid].weight, self.runtaskentries[tid].depends, self.runtaskentries[tid].revdeps) @@ -1204,9 +1317,7 @@ class RunQueue: self.cfgData = cfgData self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets) - self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY") or "perfile" self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None - self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION2") or None self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None self.state = runQueuePrepare @@ -1215,7 +1326,7 @@ class RunQueue: # Invoked at regular time intervals via the bitbake heartbeat event # while the build is running. We generate a unique name for the handler # here, just in case that there ever is more than one RunQueue instance, - # start the handler when reaching runQueueSceneRun, and stop it when + # start the handler when reaching runQueueSceneInit, and stop it when # done with the build. self.dm = monitordisk.diskMonitor(cfgData) self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self)) @@ -1224,44 +1335,56 @@ class RunQueue: self.worker = {} self.fakeworker = {} + @staticmethod + def send_pickled_data(worker, data, name): + msg = bytearray() + msg.extend(b"<" + name.encode() + b">") + pickled_data = pickle.dumps(data) + msg.extend(len(pickled_data).to_bytes(4, 'big')) + msg.extend(pickled_data) + msg.extend(b"</" + name.encode() + b">") + worker.stdin.write(msg) + def _start_worker(self, mc, fakeroot = False, rqexec = None): - logger.debug(1, "Starting bitbake-worker") + logger.debug("Starting bitbake-worker") magic = "decafbad" if self.cooker.configuration.profile: magic = "decafbadbad" + fakerootlogs = None + + workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker") if fakeroot: magic = magic + "beef" mcdata = self.cooker.databuilder.mcdata[mc] fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD")) fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split() env = os.environ.copy() - for key, value in (var.split('=') for var in fakerootenv): + for key, value in (var.split('=',1) for var in fakerootenv): env[key] = value - worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) + worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) + fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs else: - worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) + worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) bb.utils.nonblockingfd(worker.stdout) - workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec) + workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs) workerdata = { - "taskdeps" : self.rqdata.dataCaches[mc].task_deps, - "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv, - "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs, - "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv, "sigdata" : bb.parse.siggen.get_taskdata(), - "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel, - "logdefaultverbose" : bb.msg.loggerDefaultVerbose, - "logdefaultverboselogs" : bb.msg.loggerVerboseLogs, + "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, + "build_verbose_shell" : self.cooker.configuration.build_verbose_shell, + "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout, "logdefaultdomain" : bb.msg.loggerDefaultDomains, "prhost" : self.cooker.prhost, "buildname" : self.cfgData.getVar("BUILDNAME"), "date" : self.cfgData.getVar("DATE"), "time" : self.cfgData.getVar("TIME"), + "hashservaddr" : self.cooker.hashservaddr, + "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), } - worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") - worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") - worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") + RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig") + RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata") + RunQueue.send_pickled_data(worker, workerdata, "workerdata") worker.stdin.flush() return RunQueueWorker(worker, workerpipe) @@ -1269,9 +1392,9 @@ class RunQueue: def _teardown_worker(self, worker): if not worker: return - logger.debug(1, "Teardown for bitbake-worker") + logger.debug("Teardown for bitbake-worker") try: - worker.process.stdin.write(b"<quit></quit>") + RunQueue.send_pickled_data(worker.process, b"", "quit") worker.process.stdin.flush() worker.process.stdin.close() except IOError: @@ -1283,12 +1406,12 @@ class RunQueue: continue worker.pipe.close() - def start_worker(self): + def start_worker(self, rqexec): if self.worker: self.teardown_workers() self.teardown = False for mc in self.rqdata.dataCaches: - self.worker[mc] = self._start_worker(mc) + self.worker[mc] = self._start_worker(mc, False, rqexec) def start_fakeworker(self, rqexec, mc): if not mc in self.fakeworker: @@ -1330,27 +1453,19 @@ class RunQueue: if taskname is None: taskname = tn - if self.stamppolicy == "perfile": - fulldeptree = False - else: - fulldeptree = True - stampwhitelist = [] - if self.stamppolicy == "whitelist": - stampwhitelist = self.rqdata.stampfnwhitelist[mc] - - stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn) + stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn) # If the stamp is missing, it's not current if not os.access(stampfile, os.F_OK): - logger.debug(2, "Stampfile %s not available", stampfile) + logger.debug2("Stampfile %s not available", stampfile) return False # If it's a 'nostamp' task, it's not current taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] if 'nostamp' in taskdep and taskname in taskdep['nostamp']: - logger.debug(2, "%s.%s is nostamp\n", fn, taskname) + logger.debug2("%s.%s is nostamp\n", fn, taskname) return False - if taskname != "do_setscene" and taskname.endswith("_setscene"): + if taskname.endswith("_setscene"): return True if cache is None: @@ -1361,28 +1476,28 @@ class RunQueue: for dep in self.rqdata.runtaskentries[tid].depends: if iscurrent: (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep) - stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2) - stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2) + stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2) + stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2) t2 = get_timestamp(stampfile2) t3 = get_timestamp(stampfile3) if t3 and not t2: continue if t3 and t3 > t2: continue - if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): + if fn == fn2: if not t2: - logger.debug(2, 'Stampfile %s does not exist', stampfile2) + logger.debug2('Stampfile %s does not exist', stampfile2) iscurrent = False break if t1 < t2: - logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2) + logger.debug2('Stampfile %s < %s', stampfile, stampfile2) iscurrent = False break if recurse and iscurrent: if dep in cache: iscurrent = cache[dep] if not iscurrent: - logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) + logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) else: iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) cache[dep] = iscurrent @@ -1390,37 +1505,42 @@ class RunQueue: cache[tid] = iscurrent return iscurrent - def validate_hash(self, *, sq_fn, sq_task, sq_hash, sq_hashfn, siginfo, sq_unihash, d): - locs = {"sq_fn" : sq_fn, "sq_task" : sq_task, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, - "sq_unihash" : sq_unihash, "siginfo" : siginfo, "d" : d} + def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True): + valid = set() + if self.hashvalidate: + sq_data = {} + sq_data['hash'] = {} + sq_data['hashfn'] = {} + sq_data['unihash'] = {} + for tid in tocheck: + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash + sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn] + sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash + + valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary) - hashvalidate_args = ("(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=siginfo, sq_unihash=sq_unihash)", - "(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=siginfo)", - "(sq_fn, sq_task, sq_hash, sq_hashfn, d)") + return valid - for args in hashvalidate_args[:-1]: - try: - call = self.hashvalidate + args - return bb.utils.better_eval(call, locs) - except TypeError: - continue + def validate_hash(self, sq_data, d, siginfo, currentcount, summary): + locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary} + + # Metadata has **kwargs so args can be added, sq_data can also gain new fields + call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)" - # Call the last entry without a try...catch to propagate any thrown - # TypeError - call = self.hashvalidate + hashvalidate_args[-1] return bb.utils.better_eval(call, locs) def _execute_runqueue(self): """ Run the tasks in a queue prepared by rqdata.prepare() Upon failure, optionally try to recover the build using any alternate providers - (if the abort on failure configuration option isn't set) + (if the halt on failure configuration option isn't set) """ retval = True + bb.event.check_for_interrupts(self.cooker.data) if self.state is runQueuePrepare: - self.rqexe = RunQueueExecuteDummy(self) # NOTE: if you add, remove or significantly refactor the stages of this # process then you should recalculate the weightings here. This is quite # easy to do - just change the next line temporarily to pass debug=True as @@ -1434,21 +1554,26 @@ class RunQueue: self.state = runQueueComplete else: self.state = runQueueSceneInit - self.rqdata.init_progress_reporter.next_stage() - - # we are ready to run, emit dependency info to any UI or class which - # needs it - depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) - self.rqdata.init_progress_reporter.next_stage() - bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) + bb.parse.siggen.save_unitaskhashes() if self.state is runQueueSceneInit: + self.rqdata.init_progress_reporter.next_stage() + + # we are ready to run, emit dependency info to any UI or class which + # needs it + depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) + self.rqdata.init_progress_reporter.next_stage() + bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) + if not self.dm_event_handler_registered: res = bb.event.register(self.dm_event_handler_name, - lambda x: self.dm.check(self) if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp] else False, - ('bb.event.HeartbeatEvent',)) + lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False, + ('bb.event.HeartbeatEvent',), data=self.cfgData) self.dm_event_handler_registered = True + self.rqdata.init_progress_reporter.next_stage() + self.rqexe = RunQueueExecute(self) + dump = self.cooker.configuration.dump_signatures if dump: self.rqdata.init_progress_reporter.finish() @@ -1458,24 +1583,21 @@ class RunQueue: if 'printdiff' in dump: self.write_diffscenetasks(invalidtasks) self.state = runQueueComplete - else: - self.rqdata.init_progress_reporter.next_stage() - self.start_worker() - self.rqdata.init_progress_reporter.next_stage() - self.rqexe = RunQueueExecuteScenequeue(self) - - if self.state is runQueueSceneRun: - retval = self.rqexe.execute() - if self.state is runQueueRunInit: - if self.cooker.configuration.setsceneonly: - self.state = runQueueComplete - else: - # Just in case we didn't setscene - self.rqdata.init_progress_reporter.finish() - logger.info("Executing RunQueue Tasks") - self.rqexe = RunQueueExecuteTasks(self) - self.state = runQueueRunning + if self.state is runQueueSceneInit: + self.start_worker(self.rqexe) + self.rqdata.init_progress_reporter.finish() + + # If we don't have any setscene functions, skip execution + if not self.rqdata.runq_setscene_tids: + logger.info('No setscene tasks') + for tid in self.rqdata.runtaskentries: + if not self.rqdata.runtaskentries[tid].depends: + self.rqexe.setbuildable(tid) + self.rqexe.tasks_notcovered.add(tid) + self.rqexe.sqdone = True + logger.info('Executing Tasks') + self.state = runQueueRunning if self.state is runQueueRunning: retval = self.rqexe.execute() @@ -1486,16 +1608,18 @@ class RunQueue: build_done = self.state is runQueueComplete or self.state is runQueueFailed if build_done and self.dm_event_handler_registered: - bb.event.remove(self.dm_event_handler_name, None) + bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData) self.dm_event_handler_registered = False if build_done and self.rqexe: + bb.parse.siggen.save_unitaskhashes() self.teardown_workers() - if self.rqexe.stats.failed: - logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) - else: - # Let's avoid the word "failed" if nothing actually did - logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) + if self.rqexe: + if self.rqexe.stats.failed: + logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) + else: + # Let's avoid the word "failed" if nothing actually did + logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) if self.state is runQueueFailed: raise bb.runqueue.TaskFailure(self.rqexe.failed_tids) @@ -1541,28 +1665,28 @@ class RunQueue: else: self.rqexe.finish() - def rq_dump_sigfn(self, fn, options): - bb_cache = bb.cache.NoCache(self.cooker.databuilder) - the_data = bb_cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn)) - siggen = bb.parse.siggen - dataCaches = self.rqdata.dataCaches - siggen.dump_sigfn(fn, dataCaches, options) + def _rq_dump_sigtid(self, tids): + for tid in tids: + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + dataCaches = self.rqdata.dataCaches + bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True) def dump_signatures(self, options): - fns = set() - bb.note("Reparsing files to collect dependency data") + if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset: + bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled") - for tid in self.rqdata.runtaskentries: - fn = fn_from_tid(tid) - fns.add(fn) + bb.note("Writing task signature files") max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1) + def chunkify(l, n): + return [l[i::n] for i in range(n)] + tids = chunkify(list(self.rqdata.runtaskentries), max_process) # We cannot use the real multiprocessing.Pool easily due to some local data # that can't be pickled. This is a cheap multi-process solution. launched = [] - while fns: + while tids: if len(launched) < max_process: - p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options)) + p = Process(target=self._rq_dump_sigtid, args=(tids.pop(), )) p.start() launched.append(p) for q in launched: @@ -1577,17 +1701,20 @@ class RunQueue: return def print_diffscenetasks(self): + def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid): + invalidtasks = [] + for t in taskdepends[task].depends: + if t not in valid and t not in visited_invalid: + invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid)) + visited_invalid.add(t) + + direct_invalid = [t for t in taskdepends[task].depends if t not in valid] + if not direct_invalid and task not in noexec: + invalidtasks = [task] + return invalidtasks - valid = [] - sq_hash = [] - sq_hashfn = [] - sq_unihash = [] - sq_fn = [] - sq_taskname = [] - sq_task = [] noexec = [] - stamppresent = [] - valid_new = set() + tocheck = set() for tid in self.rqdata.runtaskentries: (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) @@ -1597,18 +1724,9 @@ class RunQueue: noexec.append(tid) continue - sq_fn.append(fn) - sq_hashfn.append(self.rqdata.dataCaches[mc].hashfn[taskfn]) - sq_hash.append(self.rqdata.runtaskentries[tid].hash) - sq_unihash.append(self.rqdata.runtaskentries[tid].unihash) - sq_taskname.append(taskname) - sq_task.append(tid) + tocheck.add(tid) - valid = self.validate_hash(sq_fn=sq_fn, sq_task=sq_taskname, sq_hash=sq_hash, sq_hashfn=sq_hashfn, - siginfo=True, sq_unihash=sq_unihash, d=self.cooker.data) - - for v in valid: - valid_new.add(sq_task[v]) + valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False) # Tasks which are both setscene and noexec never care about dependencies # We therefore find tasks which are setscene and noexec and mark their @@ -1627,47 +1745,50 @@ class RunQueue: valid_new.add(dep) invalidtasks = set() - for tid in self.rqdata.runtaskentries: - if tid not in valid_new and tid not in noexec: - invalidtasks.add(tid) - found = set() - processed = set() - for tid in invalidtasks: + toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets]) + for tid in toptasks: toprocess = set([tid]) while toprocess: next = set() + visited_invalid = set() for t in toprocess: - for dep in self.rqdata.runtaskentries[t].depends: - if dep in invalidtasks: - found.add(tid) - if dep not in processed: - processed.add(dep) + if t not in valid_new and t not in noexec: + invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid)) + continue + if t in self.rqdata.runq_setscene_tids: + for dep in self.rqexe.sqdata.sq_deps[t]: next.add(dep) + continue + + for dep in self.rqdata.runtaskentries[t].depends: + next.add(dep) + toprocess = next - if tid in found: - toprocess = set() tasklist = [] - for tid in invalidtasks.difference(found): + for tid in invalidtasks: tasklist.append(tid) if tasklist: bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) - return invalidtasks.difference(found) + return invalidtasks def write_diffscenetasks(self, invalidtasks): + bb.siggen.check_siggen_version(bb.siggen) # Define recursion callback def recursecb(key, hash1, hash2): hashes = [hash1, hash2] + bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes)) hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) + bb.debug(1, "Found hashfiles:\n{}".format(hashfiles)) recout = [] if len(hashfiles) == 2: - out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb) - recout.extend(list(' ' + l for l in out2)) + out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb) + recout.extend(list(' ' + l for l in out2)) else: recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) @@ -1677,20 +1798,26 @@ class RunQueue: for tid in invalidtasks: (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - h = self.rqdata.runtaskentries[tid].hash - matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData) + h = self.rqdata.runtaskentries[tid].unihash + bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname)) + matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc]) + bb.debug(1, "Found hashfiles:\n{}".format(matches)) match = None - for m in matches: - if h in m: - match = m + for m in matches.values(): + if h in m['path']: + match = m['path'] if match is None: - bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h) + bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid)) matches = {k : v for k, v in iter(matches.items()) if h not in k} + matches_local = {k : v for k, v in iter(matches.items()) if h not in k and not v['sstate']} + if matches_local: + matches = matches_local if matches: - latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1] + latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path'] prevh = __find_sha256__.search(latestmatch).group(0) output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) - bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) + bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, most recent matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) + class RunQueueExecute: @@ -1702,55 +1829,148 @@ class RunQueueExecute: self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" + self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU") + self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO") + self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY") + self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX") + + self.sq_buildable = set() + self.sq_running = set() + self.sq_live = set() + + self.updated_taskhash_queue = [] + self.pending_migrations = set() self.runq_buildable = set() self.runq_running = set() self.runq_complete = set() + self.runq_tasksrun = set() self.build_stamps = {} self.build_stamps2 = [] self.failed_tids = [] + self.sq_deferred = {} + self.sq_needed_harddeps = set() + self.sq_harddep_deferred = set() self.stampcache = {} - for mc in rq.worker: - rq.worker[mc].pipe.setrunqueueexec(self) - for mc in rq.fakeworker: - rq.fakeworker[mc].pipe.setrunqueueexec(self) + self.holdoff_tasks = set() + self.holdoff_need_update = True + self.sqdone = False + + self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids)) if self.number_tasks <= 0: bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) - def runqueue_process_waitpid(self, task, status): + lower_limit = 1.0 + upper_limit = 1000000.0 + if self.max_cpu_pressure: + self.max_cpu_pressure = float(self.max_cpu_pressure) + if self.max_cpu_pressure < lower_limit: + bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit)) + if self.max_cpu_pressure > upper_limit: + bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure)) + + if self.max_io_pressure: + self.max_io_pressure = float(self.max_io_pressure) + if self.max_io_pressure < lower_limit: + bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit)) + if self.max_io_pressure > upper_limit: + bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) + + if self.max_memory_pressure: + self.max_memory_pressure = float(self.max_memory_pressure) + if self.max_memory_pressure < lower_limit: + bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit)) + if self.max_memory_pressure > upper_limit: + bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) + + if self.max_loadfactor: + self.max_loadfactor = float(self.max_loadfactor) + if self.max_loadfactor <= 0: + bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor)) + + # List of setscene tasks which we've covered + self.scenequeue_covered = set() + # List of tasks which are covered (including setscene ones) + self.tasks_covered = set() + self.tasks_scenequeue_done = set() + self.scenequeue_notcovered = set() + self.tasks_notcovered = set() + self.scenequeue_notneeded = set() + + schedulers = self.get_schedulers() + for scheduler in schedulers: + if self.scheduler == scheduler.name: + self.sched = scheduler(self, self.rqdata) + logger.debug("Using runqueue scheduler '%s'", scheduler.name) + break + else: + bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % + (self.scheduler, ", ".join(obj.name for obj in schedulers))) + + #if self.rqdata.runq_setscene_tids: + self.sqdata = SQData() + build_scenequeue_data(self.sqdata, self.rqdata, self) + + update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True) + + # Compute a list of 'stale' sstate tasks where the current hash does not match the one + # in any stamp files. Pass the list out to metadata as an event. + found = {} + for tid in self.rqdata.runq_setscene_tids: + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + stamps = bb.build.find_stale_stamps(taskname, taskfn) + if stamps: + if mc not in found: + found[mc] = {} + found[mc][tid] = stamps + for mc in found: + event = bb.event.StaleSetSceneTasks(found[mc]) + bb.event.fire(event, self.cooker.databuilder.mcdata[mc]) + + self.build_taskdepdata_cache() + + def runqueue_process_waitpid(self, task, status, fakerootlog=None): # self.build_stamps[pid] may not exist when use shared work directory. if task in self.build_stamps: self.build_stamps2.remove(self.build_stamps[task]) del self.build_stamps[task] - if status != 0: - self.task_fail(task, status) + if task in self.sq_live: + if status != 0: + self.sq_task_fail(task, status) + else: + self.sq_task_complete(task) + self.sq_live.remove(task) + self.stats.updateActiveSetscene(len(self.sq_live)) else: - self.task_complete(task) + if status != 0: + self.task_fail(task, status, fakerootlog=fakerootlog) + else: + self.task_complete(task) return True def finish_now(self): for mc in self.rq.worker: try: - self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow") self.rq.worker[mc].process.stdin.flush() except IOError: # worker must have died? pass for mc in self.rq.fakeworker: try: - self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow") self.rq.fakeworker[mc].process.stdin.flush() except IOError: # worker must have died? pass - if len(self.failed_tids) != 0: + if self.failed_tids: self.rq.state = runQueueFailed return @@ -1760,22 +1980,27 @@ class RunQueueExecute: def finish(self): self.rq.state = runQueueCleanUp - if self.stats.active > 0: - bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) + active = self.stats.active + len(self.sq_live) + if active > 0: + bb.event.fire(runQueueExitWait(active), self.cfgData) self.rq.read_workers() return self.rq.active_fds() - if len(self.failed_tids) != 0: + if self.failed_tids: self.rq.state = runQueueFailed return True self.rq.state = runQueueComplete return True - def check_dependencies(self, task, taskdeps, setscene = False): + # Used by setscene only + def check_dependencies(self, task, taskdeps): if not self.rq.depvalidate: return False + # Must not edit parent data + taskdeps = set(taskdeps) + taskdata = {} taskdeps.add(task) for dep in taskdeps: @@ -1788,121 +2013,10 @@ class RunQueueExecute: return valid def can_start_task(self): - can_start = self.stats.active < self.number_tasks + active = self.stats.active + len(self.sq_live) + can_start = active < self.number_tasks return can_start -class RunQueueExecuteDummy(RunQueueExecute): - def __init__(self, rq): - self.rq = rq - self.stats = RunQueueStats(0) - - def finish(self): - self.rq.state = runQueueComplete - return - -class RunQueueExecuteTasks(RunQueueExecute): - def __init__(self, rq): - RunQueueExecute.__init__(self, rq) - - self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) - - self.stampcache = {} - - initial_covered = self.rq.scenequeue_covered.copy() - - # Mark initial buildable tasks - for tid in self.rqdata.runtaskentries: - if len(self.rqdata.runtaskentries[tid].depends) == 0: - self.runq_buildable.add(tid) - if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): - self.rq.scenequeue_covered.add(tid) - - found = True - while found: - found = False - for tid in self.rqdata.runtaskentries: - if tid in self.rq.scenequeue_covered: - continue - logger.debug(1, 'Considering %s: %s' % (tid, str(self.rqdata.runtaskentries[tid].revdeps))) - - if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): - if tid in self.rq.scenequeue_notcovered: - continue - found = True - self.rq.scenequeue_covered.add(tid) - - logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered)) - - # Allow the metadata to elect for setscene tasks to run anyway - covered_remove = set() - if self.rq.setsceneverify: - invalidtasks = [] - tasknames = {} - fns = {} - for tid in self.rqdata.runtaskentries: - (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) - taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] - fns[tid] = taskfn - tasknames[tid] = taskname - if 'noexec' in taskdep and taskname in taskdep['noexec']: - continue - if self.rq.check_stamp_task(tid, taskname + "_setscene", cache=self.stampcache): - logger.debug(2, 'Setscene stamp current for task %s', tid) - continue - if self.rq.check_stamp_task(tid, taskname, recurse = True, cache=self.stampcache): - logger.debug(2, 'Normal stamp current for task %s', tid) - continue - invalidtasks.append(tid) - - call = self.rq.setsceneverify + "(covered, tasknames, fns, d, invalidtasks=invalidtasks)" - locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : tasknames, "fns" : fns, "d" : self.cooker.data, "invalidtasks" : invalidtasks } - covered_remove = bb.utils.better_eval(call, locs) - - def removecoveredtask(tid): - (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) - taskname = taskname + '_setscene' - bb.build.del_stamp(taskname, self.rqdata.dataCaches[mc], taskfn) - self.rq.scenequeue_covered.remove(tid) - - toremove = covered_remove | self.rq.scenequeue_notcovered - for task in toremove: - logger.debug(1, 'Not skipping task %s due to setsceneverify', task) - while toremove: - covered_remove = [] - for task in toremove: - if task in self.rq.scenequeue_covered: - removecoveredtask(task) - for deptask in self.rqdata.runtaskentries[task].depends: - if deptask not in self.rq.scenequeue_covered: - continue - if deptask in toremove or deptask in covered_remove or deptask in initial_covered: - continue - logger.debug(1, 'Task %s depends on task %s so not skipping' % (task, deptask)) - covered_remove.append(deptask) - toremove = covered_remove - - logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered) - - - for mc in self.rqdata.dataCaches: - target_pairs = [] - for tid in self.rqdata.target_tids: - (tidmc, fn, taskname, _) = split_tid_mcfn(tid) - if tidmc == mc: - target_pairs.append((fn, taskname)) - - event.fire(bb.event.StampUpdate(target_pairs, self.rqdata.dataCaches[mc].stamp), self.cfgData) - - schedulers = self.get_schedulers() - for scheduler in schedulers: - if self.scheduler == scheduler.name: - self.sched = scheduler(self, self.rqdata) - logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) - break - else: - bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % - (self.scheduler, ", ".join(obj.name for obj in schedulers))) - def get_schedulers(self): schedulers = set(obj for obj in globals().values() if type(obj) is type and @@ -1919,8 +2033,7 @@ class RunQueueExecuteTasks(RunQueueExecute): try: module = __import__(modname, fromlist=(name,)) except ImportError as exc: - logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) - raise SystemExit(1) + bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) else: schedulers.add(getattr(module, name)) return schedulers @@ -1948,22 +2061,54 @@ class RunQueueExecuteTasks(RunQueueExecute): break if alldeps: self.setbuildable(revdep) - logger.debug(1, "Marking task %s as buildable", revdep) + logger.debug("Marking task %s as buildable", revdep) + + found = None + for t in sorted(self.sq_deferred.copy()): + if self.sq_deferred[t] == task: + # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task. + # We shouldn't allow all to run at once as it is prone to races. + if not found: + bb.debug(1, "Deferred task %s now buildable" % t) + del self.sq_deferred[t] + update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) + found = t + else: + bb.debug(1, "Deferring %s after %s" % (t, found)) + self.sq_deferred[t] = found def task_complete(self, task): self.stats.taskCompleted() bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) self.task_completeoutright(task) + self.runq_tasksrun.add(task) - def task_fail(self, task, exitcode): + def task_fail(self, task, exitcode, fakerootlog=None): """ Called when a task has failed Updates the state engine with the failure """ self.stats.taskFailed() self.failed_tids.append(task) - bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData) - if self.rqdata.taskData[''].abort: + + fakeroot_log = [] + if fakerootlog and os.path.exists(fakerootlog): + with open(fakerootlog) as fakeroot_log_file: + fakeroot_failed = False + for line in reversed(fakeroot_log_file.readlines()): + for fakeroot_error in ['mismatch', 'error', 'fatal']: + if fakeroot_error in line.lower(): + fakeroot_failed = True + if 'doing new pid setup and server start' in line: + break + fakeroot_log.append(line) + + if not fakeroot_failed: + fakeroot_log = [] + + bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData) + + if self.rqdata.taskData[''].halt: self.rq.state = runQueueCleanUp def task_skip(self, task, reason): @@ -1974,75 +2119,218 @@ class RunQueueExecuteTasks(RunQueueExecute): self.stats.taskSkipped() self.stats.taskCompleted() + def summarise_scenequeue_errors(self): + err = False + if not self.sqdone: + logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) + completeevent = sceneQueueComplete(self.stats, self.rq) + bb.event.fire(completeevent, self.cfgData) + if self.sq_deferred: + logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) + err = True + if self.updated_taskhash_queue: + logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) + err = True + if self.holdoff_tasks: + logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) + err = True + + for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered): + # No task should end up in both covered and uncovered, that is a bug. + logger.error("Setscene task %s in both covered and notcovered." % tid) + + for tid in self.rqdata.runq_setscene_tids: + if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: + err = True + logger.error("Setscene Task %s was never marked as covered or not covered" % tid) + if tid not in self.sq_buildable: + err = True + logger.error("Setscene Task %s was never marked as buildable" % tid) + if tid not in self.sq_running: + err = True + logger.error("Setscene Task %s was never marked as running" % tid) + + for x in self.rqdata.runtaskentries: + if x not in self.tasks_covered and x not in self.tasks_notcovered: + logger.error("Task %s was never moved from the setscene queue" % x) + err = True + if x not in self.tasks_scenequeue_done: + logger.error("Task %s was never processed by the setscene code" % x) + err = True + if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable: + logger.error("Task %s was never marked as buildable by the setscene code" % x) + err = True + return err + + def execute(self): """ - Run the tasks in a queue prepared by rqdata.prepare() + Run the tasks in a queue prepared by prepare_runqueue """ - if self.rqdata.setscenewhitelist is not None and not self.rqdata.setscenewhitelist_checked: - self.rqdata.setscenewhitelist_checked = True + self.rq.read_workers() + if self.updated_taskhash_queue or self.pending_migrations: + self.process_possible_migrations() - # Check tasks that are going to run against the whitelist - def check_norun_task(tid, showerror=False): - (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) - # Ignore covered tasks - if tid in self.rq.scenequeue_covered: - return False - # Ignore stamped tasks - if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): - return False - # Ignore noexec tasks - taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] - if 'noexec' in taskdep and taskname in taskdep['noexec']: - return False + if not hasattr(self, "sorted_setscene_tids"): + # Don't want to sort this set every execution + self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids) - pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): - if showerror: - if tid in self.rqdata.runq_setscene_tids: - logger.error('Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)) - else: - logger.error('Task %s.%s attempted to execute unexpectedly' % (pn, taskname)) - return True - return False - # Look to see if any tasks that we think shouldn't run are going to - unexpected = False - for tid in self.rqdata.runtaskentries: - if check_norun_task(tid): - unexpected = True + task = None + if not self.sqdone and self.can_start_task(): + # Find the next setscene to run + for nexttask in self.sorted_setscene_tids: + if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred: + if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \ + nexttask not in self.sq_needed_harddeps and \ + self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \ + self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): + if nexttask not in self.rqdata.target_tids: + logger.debug2("Skipping setscene for task %s" % nexttask) + self.sq_task_skip(nexttask) + self.scenequeue_notneeded.add(nexttask) + if nexttask in self.sq_deferred: + del self.sq_deferred[nexttask] + return True + if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered): + logger.debug2("Deferring %s due to hard dependencies" % nexttask) + updated = False + for dep in self.sqdata.sq_harddeps_rev[nexttask]: + if dep not in self.sq_needed_harddeps: + logger.debug2("Enabling task %s as it is a hard dependency" % dep) + self.sq_buildable.add(dep) + self.sq_needed_harddeps.add(dep) + updated = True + self.sq_harddep_deferred.add(nexttask) + if updated: + return True + continue + # If covered tasks are running, need to wait for them to complete + for t in self.sqdata.sq_covered_tasks[nexttask]: + if t in self.runq_running and t not in self.runq_complete: + continue + if nexttask in self.sq_deferred: + if self.sq_deferred[nexttask] not in self.runq_complete: + continue + logger.debug("Task %s no longer deferred" % nexttask) + del self.sq_deferred[nexttask] + valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False) + if not valid: + logger.debug("%s didn't become valid, skipping setscene" % nexttask) + self.sq_task_failoutright(nexttask) + return True + if nexttask in self.sqdata.outrightfail: + logger.debug2('No package found, so skipping setscene task %s', nexttask) + self.sq_task_failoutright(nexttask) + return True + if nexttask in self.sqdata.unskippable: + logger.debug2("Setscene task %s is unskippable" % nexttask) + task = nexttask break - if unexpected: - # Run through the tasks in the rough order they'd have executed and print errors - # (since the order can be useful - usually missing sstate for the last few tasks - # is the cause of the problem) - task = self.sched.next() - while task is not None: - check_norun_task(task, showerror=True) - self.task_skip(task, 'Setscene enforcement check') - task = self.sched.next() + if task is not None: + (mc, fn, taskname, taskfn) = split_tid_mcfn(task) + taskname = taskname + "_setscene" + if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): + logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task) + self.sq_task_failoutright(task) + return True - self.rq.state = runQueueCleanUp + if self.cooker.configuration.force: + if task in self.rqdata.target_tids: + self.sq_task_failoutright(task) + return True + + if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): + logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task) + self.sq_task_skip(task) return True - self.rq.read_workers() + if self.cooker.configuration.skipsetscene: + logger.debug2('No setscene tasks should be executed. Skipping %s', task) + self.sq_task_failoutright(task) + return True - if self.stats.total == 0: - # nothing to do - self.rq.state = runQueueCleanUp + startevent = sceneQueueTaskStarted(task, self.stats, self.rq) + bb.event.fire(startevent, self.cfgData) + + taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] + realfn = bb.cache.virtualfn2realfn(taskfn)[0] + runtask = { + 'fn' : taskfn, + 'task' : task, + 'taskname' : taskname, + 'taskhash' : self.rqdata.get_task_hash(task), + 'unihash' : self.rqdata.get_task_unihash(task), + 'quieterrors' : True, + 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), + 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], + 'taskdepdata' : self.sq_build_taskdepdata(task), + 'dry_run' : False, + 'taskdep': taskdep, + 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], + 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], + 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] + } + + if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: + if not mc in self.rq.fakeworker: + self.rq.start_fakeworker(self, mc) + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") + self.rq.fakeworker[mc].process.stdin.flush() + else: + RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") + self.rq.worker[mc].process.stdin.flush() + + self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) + self.build_stamps2.append(self.build_stamps[task]) + self.sq_running.add(task) + self.sq_live.add(task) + self.stats.updateActiveSetscene(len(self.sq_live)) + if self.can_start_task(): + return True + + self.update_holdofftasks() + + if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: + hashequiv_logger.verbose("Setscene tasks completed") + + err = self.summarise_scenequeue_errors() + if err: + self.rq.state = runQueueFailed + return True + + if self.cooker.configuration.setsceneonly: + self.rq.state = runQueueComplete + return True + self.sqdone = True + + if self.stats.total == 0: + # nothing to do + self.rq.state = runQueueComplete + return True - task = self.sched.next() + if self.cooker.configuration.setsceneonly: + task = None + else: + task = self.sched.next() if task is not None: (mc, fn, taskname, taskfn) = split_tid_mcfn(task) - if task in self.rq.scenequeue_covered: - logger.debug(2, "Setscene covered task %s", task) + if self.rqdata.setscene_ignore_tasks is not None: + if self.check_setscene_ignore_tasks(task): + self.task_fail(task, "setscene ignore_tasks") + return True + + if task in self.tasks_covered: + logger.debug2("Setscene covered task %s", task) self.task_skip(task, "covered") return True if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): - logger.debug(2, "Stamp current task %s", task) + logger.debug2("Stamp current task %s", task) self.task_skip(task, "existing") + self.runq_tasksrun.add(task) return True taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] @@ -2053,18 +2341,32 @@ class RunQueueExecuteTasks(RunQueueExecute): self.runq_running.add(task) self.stats.taskActive() if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): - bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn) + bb.build.make_stamp_mcfn(taskname, taskfn) self.task_complete(task) return True else: startevent = runQueueTaskStarted(task, self.stats, self.rq) bb.event.fire(startevent, self.cfgData) - taskdepdata = self.build_taskdepdata(task) - taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] - taskhash = self.rqdata.get_task_hash(task) - unihash = self.rqdata.get_task_unihash(task) + realfn = bb.cache.virtualfn2realfn(taskfn)[0] + runtask = { + 'fn' : taskfn, + 'task' : task, + 'taskname' : taskname, + 'taskhash' : self.rqdata.get_task_hash(task), + 'unihash' : self.rqdata.get_task_unihash(task), + 'quieterrors' : False, + 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), + 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], + 'taskdepdata' : self.build_taskdepdata(task), + 'dry_run' : self.rqdata.setscene_enforce, + 'taskdep': taskdep, + 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], + 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], + 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] + } + if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): if not mc in self.rq.fakeworker: try: @@ -2074,68 +2376,93 @@ class RunQueueExecuteTasks(RunQueueExecute): self.rq.state = runQueueFailed self.stats.taskFailed() return True - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collection.get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") self.rq.fakeworker[mc].process.stdin.flush() else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collection.get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") self.rq.worker[mc].process.stdin.flush() - self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) + self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) self.build_stamps2.append(self.build_stamps[task]) self.runq_running.add(task) self.stats.taskActive() if self.can_start_task(): return True - if self.stats.active > 0: + if self.stats.active > 0 or self.sq_live: self.rq.read_workers() return self.rq.active_fds() - if len(self.failed_tids) != 0: + # No more tasks can be run. If we have deferred setscene tasks we should run them. + if self.sq_deferred: + deferred_tid = list(self.sq_deferred.keys())[0] + blocking_tid = self.sq_deferred.pop(deferred_tid) + logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid)) + return True + + if self.failed_tids: self.rq.state = runQueueFailed return True # Sanity Checks + err = self.summarise_scenequeue_errors() for task in self.rqdata.runtaskentries: if task not in self.runq_buildable: logger.error("Task %s never buildable!", task) - if task not in self.runq_running: + err = True + elif task not in self.runq_running: logger.error("Task %s never ran!", task) - if task not in self.runq_complete: + err = True + elif task not in self.runq_complete: logger.error("Task %s never completed!", task) - self.rq.state = runQueueComplete + err = True + + if err: + self.rq.state = runQueueFailed + else: + self.rq.state = runQueueComplete return True - def filtermcdeps(self, task, deps): + def filtermcdeps(self, task, mc, deps): ret = set() - mainmc = mc_from_tid(task) for dep in deps: - mc = mc_from_tid(dep) - if mc != mainmc: + thismc = mc_from_tid(dep) + if thismc != mc: continue ret.add(dep) return ret - # We filter out multiconfig dependencies from taskdepdata we pass to the tasks + # Build the individual cache entries in advance once to save time + def build_taskdepdata_cache(self): + taskdepdata_cache = {} + for task in self.rqdata.runtaskentries: + (mc, fn, taskname, taskfn) = split_tid_mcfn(task) + pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] + deps = self.rqdata.runtaskentries[task].depends + provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] + taskhash = self.rqdata.runtaskentries[task].hash + unihash = self.rqdata.runtaskentries[task].unihash + deps = self.filtermcdeps(task, mc, deps) + hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn] + taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn] + + self.taskdepdata_cache = taskdepdata_cache + + # We filter out multiconfig dependencies from taskdepdata we pass to the tasks # as most code can't handle them def build_taskdepdata(self, task): taskdepdata = {} - next = self.rqdata.runtaskentries[task].depends + mc = mc_from_tid(task) + next = self.rqdata.runtaskentries[task].depends.copy() next.add(task) - next = self.filtermcdeps(task, next) + next = self.filtermcdeps(task, mc, next) while next: additional = [] for revdep in next: - (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) - pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - deps = self.rqdata.runtaskentries[revdep].depends - provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] - taskhash = self.rqdata.runtaskentries[revdep].hash - unihash = self.rqdata.runtaskentries[revdep].unihash - deps = self.filtermcdeps(task, deps) - taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] - for revdep2 in deps: + self.taskdepdata_cache[revdep][6] = self.rqdata.runtaskentries[revdep].unihash + taskdepdata[revdep] = self.taskdepdata_cache[revdep] + for revdep2 in self.taskdepdata_cache[revdep][3]: if revdep2 not in taskdepdata: additional.append(revdep2) next = additional @@ -2143,415 +2470,319 @@ class RunQueueExecuteTasks(RunQueueExecute): #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) return taskdepdata -class RunQueueExecuteScenequeue(RunQueueExecute): - def __init__(self, rq): - RunQueueExecute.__init__(self, rq) + def update_holdofftasks(self): - self.scenequeue_covered = set() - self.scenequeue_notcovered = set() - self.scenequeue_notneeded = set() - - # If we don't have any setscene functions, skip this step - if len(self.rqdata.runq_setscene_tids) == 0: - rq.scenequeue_covered = set() - rq.scenequeue_notcovered = set() - rq.state = runQueueRunInit + if not self.holdoff_need_update: return - self.stats = RunQueueStats(len(self.rqdata.runq_setscene_tids)) - - sq_revdeps = {} - sq_revdeps_new = {} - sq_revdeps_squash = {} - self.sq_harddeps = {} - self.stamps = {} + notcovered = set(self.scenequeue_notcovered) + notcovered |= self.sqdata.cantskip + for tid in self.scenequeue_notcovered: + notcovered |= self.sqdata.sq_covered_tasks[tid] + notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) + notcovered.intersection_update(self.tasks_scenequeue_done) - # We need to construct a dependency graph for the setscene functions. Intermediate - # dependencies between the setscene tasks only complicate the code. This code - # therefore aims to collapse the huge runqueue dependency tree into a smaller one - # only containing the setscene functions. + covered = set(self.scenequeue_covered) + for tid in self.scenequeue_covered: + covered |= self.sqdata.sq_covered_tasks[tid] + covered.difference_update(notcovered) + covered.intersection_update(self.tasks_scenequeue_done) - self.rqdata.init_progress_reporter.next_stage() + for tid in notcovered | covered: + if not self.rqdata.runtaskentries[tid].depends: + self.setbuildable(tid) + elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): + self.setbuildable(tid) - # First process the chains up to the first setscene task. - endpoints = {} - for tid in self.rqdata.runtaskentries: - sq_revdeps[tid] = copy.copy(self.rqdata.runtaskentries[tid].revdeps) - sq_revdeps_new[tid] = set() - if (len(sq_revdeps[tid]) == 0) and tid not in self.rqdata.runq_setscene_tids: - #bb.warn("Added endpoint %s" % (tid)) - endpoints[tid] = set() + self.tasks_covered = covered + self.tasks_notcovered = notcovered - self.rqdata.init_progress_reporter.next_stage() + self.holdoff_tasks = set() - # Secondly process the chains between setscene tasks. - for tid in self.rqdata.runq_setscene_tids: - #bb.warn("Added endpoint 2 %s" % (tid)) - for dep in self.rqdata.runtaskentries[tid].depends: - if tid in sq_revdeps[dep]: - sq_revdeps[dep].remove(tid) - if dep not in endpoints: - endpoints[dep] = set() - #bb.warn(" Added endpoint 3 %s" % (dep)) - endpoints[dep].add(tid) - - self.rqdata.init_progress_reporter.next_stage() - - def process_endpoints(endpoints): - newendpoints = {} - for point, task in endpoints.items(): - tasks = set() - if task: - tasks |= task - if sq_revdeps_new[point]: - tasks |= sq_revdeps_new[point] - sq_revdeps_new[point] = set() - if point in self.rqdata.runq_setscene_tids: - sq_revdeps_new[point] = tasks - tasks = set() - continue - for dep in self.rqdata.runtaskentries[point].depends: - if point in sq_revdeps[dep]: - sq_revdeps[dep].remove(point) - if tasks: - sq_revdeps_new[dep] |= tasks - if len(sq_revdeps[dep]) == 0 and dep not in self.rqdata.runq_setscene_tids: - newendpoints[dep] = task - if len(newendpoints) != 0: - process_endpoints(newendpoints) - - process_endpoints(endpoints) - - self.rqdata.init_progress_reporter.next_stage() - - # Build a list of setscene tasks which are "unskippable" - # These are direct endpoints referenced by the build - endpoints2 = {} - sq_revdeps2 = {} - sq_revdeps_new2 = {} - def process_endpoints2(endpoints): - newendpoints = {} - for point, task in endpoints.items(): - tasks = set([point]) - if task: - tasks |= task - if sq_revdeps_new2[point]: - tasks |= sq_revdeps_new2[point] - sq_revdeps_new2[point] = set() - if point in self.rqdata.runq_setscene_tids: - sq_revdeps_new2[point] = tasks - for dep in self.rqdata.runtaskentries[point].depends: - if point in sq_revdeps2[dep]: - sq_revdeps2[dep].remove(point) - if tasks: - sq_revdeps_new2[dep] |= tasks - if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene_tids: - newendpoints[dep] = tasks - if len(newendpoints) != 0: - process_endpoints2(newendpoints) - for tid in self.rqdata.runtaskentries: - sq_revdeps2[tid] = copy.copy(self.rqdata.runtaskentries[tid].revdeps) - sq_revdeps_new2[tid] = set() - if (len(sq_revdeps2[tid]) == 0) and tid not in self.rqdata.runq_setscene_tids: - endpoints2[tid] = set() - process_endpoints2(endpoints2) - self.unskippable = [] for tid in self.rqdata.runq_setscene_tids: - if sq_revdeps_new2[tid]: - self.unskippable.append(tid) + if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: + self.holdoff_tasks.add(tid) - self.rqdata.init_progress_reporter.next_stage(len(self.rqdata.runtaskentries)) + for tid in self.holdoff_tasks.copy(): + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep not in self.runq_complete: + self.holdoff_tasks.add(dep) - for taskcounter, tid in enumerate(self.rqdata.runtaskentries): - if tid in self.rqdata.runq_setscene_tids: - deps = set() - for dep in sq_revdeps_new[tid]: - deps.add(dep) - sq_revdeps_squash[tid] = deps - elif len(sq_revdeps_new[tid]) != 0: - bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.") - self.rqdata.init_progress_reporter.update(taskcounter) - - self.rqdata.init_progress_reporter.next_stage() - - # Resolve setscene inter-task dependencies - # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" - # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies - for tid in self.rqdata.runq_setscene_tids: - (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) - realtid = tid + "_setscene" - idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends - self.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) - for (depname, idependtask) in idepends: + self.holdoff_need_update = False - if depname not in self.rqdata.taskData[mc].build_targets: - continue + def process_possible_migrations(self): - depfn = self.rqdata.taskData[mc].build_targets[depname][0] - if depfn is None: - continue - deptid = depfn + ":" + idependtask.replace("_setscene", "") - if deptid not in self.rqdata.runtaskentries: - bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) + changed = set() + toprocess = set() + for tid, unihash in self.updated_taskhash_queue.copy(): + if tid in self.runq_running and tid not in self.runq_complete: + continue - if not deptid in self.sq_harddeps: - self.sq_harddeps[deptid] = set() - self.sq_harddeps[deptid].add(tid) + self.updated_taskhash_queue.remove((tid, unihash)) + + if unihash != self.rqdata.runtaskentries[tid].unihash: + # Make sure we rehash any other tasks with the same task hash that we're deferred against. + torehash = [tid] + for deftid in self.sq_deferred: + if self.sq_deferred[deftid] == tid: + torehash.append(deftid) + for hashtid in torehash: + hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) + self.rqdata.runtaskentries[hashtid].unihash = unihash + bb.parse.siggen.set_unihash(hashtid, unihash) + toprocess.add(hashtid) + if torehash: + # Need to save after set_unihash above + bb.parse.siggen.save_unitaskhashes() + + # Work out all tasks which depend upon these + total = set() + next = set() + for p in toprocess: + next |= self.rqdata.runtaskentries[p].revdeps + while next: + current = next.copy() + total = total | next + next = set() + for ntid in current: + next |= self.rqdata.runtaskentries[ntid].revdeps + next.difference_update(total) + + # Now iterate those tasks in dependency order to regenerate their taskhash/unihash + next = set() + for p in total: + if not self.rqdata.runtaskentries[p].depends: + next.add(p) + elif self.rqdata.runtaskentries[p].depends.isdisjoint(total): + next.add(p) + + # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled + while next: + current = next.copy() + next = set() + for tid in current: + if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): + continue + orighash = self.rqdata.runtaskentries[tid].hash + newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches) + origuni = self.rqdata.runtaskentries[tid].unihash + newuni = bb.parse.siggen.get_unihash(tid) + # FIXME, need to check it can come from sstate at all for determinism? + remapped = False + if newuni == origuni: + # Nothing to do, we match, skip code below + remapped = True + elif tid in self.scenequeue_covered or tid in self.sq_live: + # Already ran this setscene task or it running. Report the new taskhash + bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches) + hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid)) + remapped = True + + if not remapped: + #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni)) + self.rqdata.runtaskentries[tid].hash = newhash + self.rqdata.runtaskentries[tid].unihash = newuni + changed.add(tid) + + next |= self.rqdata.runtaskentries[tid].revdeps + total.remove(tid) + next.intersection_update(total) + + if changed: + for mc in self.rq.worker: + RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") + for mc in self.rq.fakeworker: + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") + + hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) + + for tid in changed: + if tid not in self.rqdata.runq_setscene_tids: + continue + if tid not in self.pending_migrations: + self.pending_migrations.add(tid) + + update_tasks = [] + for tid in self.pending_migrations.copy(): + if tid in self.runq_running or tid in self.sq_live: + # Too late, task already running, not much we can do now + self.pending_migrations.remove(tid) + continue - sq_revdeps_squash[tid].add(deptid) - # Have to zero this to avoid circular dependencies - sq_revdeps_squash[deptid] = set() + valid = True + # Check no tasks this covers are running + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep in self.runq_running and dep not in self.runq_complete: + hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid)) + valid = False + break + if not valid: + continue - self.rqdata.init_progress_reporter.next_stage() + self.pending_migrations.remove(tid) + changed = True - for task in self.sq_harddeps: - for dep in self.sq_harddeps[task]: - sq_revdeps_squash[dep].add(task) + if tid in self.tasks_scenequeue_done: + self.tasks_scenequeue_done.remove(tid) + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep in self.runq_complete and dep not in self.runq_tasksrun: + bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep) + self.failed_tids.append(tid) + self.rq.state = runQueueCleanUp + return - self.rqdata.init_progress_reporter.next_stage() + if dep not in self.runq_complete: + if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable: + self.tasks_scenequeue_done.remove(dep) + + if tid in self.sq_buildable: + self.sq_buildable.remove(tid) + if tid in self.sq_running: + self.sq_running.remove(tid) + if tid in self.sqdata.outrightfail: + self.sqdata.outrightfail.remove(tid) + if tid in self.scenequeue_notcovered: + self.scenequeue_notcovered.remove(tid) + if tid in self.scenequeue_covered: + self.scenequeue_covered.remove(tid) + if tid in self.scenequeue_notneeded: + self.scenequeue_notneeded.remove(tid) - #for tid in sq_revdeps_squash: - # for dep in sq_revdeps_squash[tid]: - # data = data + "\n %s" % dep - # bb.warn("Task %s_setscene: is %s " % (tid, data + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) - self.sq_deps = {} - self.sq_revdeps = sq_revdeps_squash - self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps) - - for tid in self.sq_revdeps: - self.sq_deps[tid] = set() - for tid in self.sq_revdeps: - for dep in self.sq_revdeps[tid]: - self.sq_deps[dep].add(tid) - - self.rqdata.init_progress_reporter.next_stage() - - for tid in self.sq_revdeps: - if len(self.sq_revdeps[tid]) == 0: - self.runq_buildable.add(tid) - - self.rqdata.init_progress_reporter.finish() - - self.outrightfail = [] - if self.rq.hashvalidate: - sq_hash = [] - sq_hashfn = [] - sq_unihash = [] - sq_fn = [] - sq_taskname = [] - sq_task = [] - noexec = [] - stamppresent = [] - for tid in self.sq_revdeps: - (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + if tid in self.stampcache: + del self.stampcache[tid] - taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] + if tid in self.build_stamps: + del self.build_stamps[tid] - if 'noexec' in taskdep and taskname in taskdep['noexec']: - noexec.append(tid) - self.task_skip(tid) - bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn) - continue + update_tasks.append(tid) - if self.rq.check_stamp_task(tid, taskname + "_setscene", cache=self.stampcache): - logger.debug(2, 'Setscene stamp current for task %s', tid) - stamppresent.append(tid) - self.task_skip(tid) + update_tasks2 = [] + for tid in update_tasks: + harddepfail = False + for t in self.sqdata.sq_harddeps_rev[tid]: + if t in self.scenequeue_notcovered: + harddepfail = True + break + if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): + if tid not in self.sq_buildable: + self.sq_buildable.add(tid) + if not self.sqdata.sq_revdeps[tid]: + self.sq_buildable.add(tid) + + update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid)) + + if update_tasks2: + self.sqdone = False + for mc in sorted(self.sqdata.multiconfigs): + for tid in sorted([t[0] for t in update_tasks2]): + if mc_from_tid(tid) != mc: + continue + h = pending_hash_index(tid, self.rqdata) + if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]: + self.sq_deferred[tid] = self.sqdata.hashes[h] + bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h])) + update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) + + for (tid, harddepfail, origvalid) in update_tasks2: + if tid in self.sqdata.valid and not origvalid: + hashequiv_logger.verbose("Setscene task %s became valid" % tid) + if harddepfail: + logger.debug2("%s has an unavailable hard dependency so skipping" % (tid)) + self.sq_task_failoutright(tid) + + if changed: + self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) + self.sq_needed_harddeps = set() + self.sq_harddep_deferred = set() + self.holdoff_need_update = True + + def scenequeue_updatecounters(self, task, fail=False): + + if fail and task in self.sqdata.sq_harddeps: + for dep in sorted(self.sqdata.sq_harddeps[task]): + if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered: + # dependency could be already processed, e.g. noexec setscene task continue - - if self.rq.check_stamp_task(tid, taskname, recurse = True, cache=self.stampcache): - logger.debug(2, 'Normal stamp current for task %s', tid) - stamppresent.append(tid) - self.task_skip(tid) + noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache) + if noexec or stamppresent: continue + logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) + self.sq_task_failoutright(dep) + continue + for dep in sorted(self.sqdata.sq_deps[task]): + if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): + if dep not in self.sq_buildable: + self.sq_buildable.add(dep) - sq_fn.append(fn) - sq_hashfn.append(self.rqdata.dataCaches[mc].hashfn[taskfn]) - sq_hash.append(self.rqdata.runtaskentries[tid].hash) - sq_unihash.append(self.rqdata.runtaskentries[tid].unihash) - sq_taskname.append(taskname) - sq_task.append(tid) - - self.cooker.data.setVar("BB_SETSCENE_STAMPCURRENT_COUNT", len(stamppresent)) - - valid = self.rq.validate_hash(sq_fn=sq_fn, sq_task=sq_taskname, sq_hash=sq_hash, sq_hashfn=sq_hashfn, - siginfo=False, sq_unihash=sq_unihash, d=self.cooker.data) - - self.cooker.data.delVar("BB_SETSCENE_STAMPCURRENT_COUNT") - - valid_new = stamppresent - for v in valid: - valid_new.append(sq_task[v]) - - for tid in self.sq_revdeps: - if tid not in valid_new and tid not in noexec: - logger.debug(2, 'No package found, so skipping setscene task %s', tid) - self.outrightfail.append(tid) - - logger.info('Executing SetScene Tasks') + next = set([task]) + while next: + new = set() + for t in sorted(next): + self.tasks_scenequeue_done.add(t) + # Look down the dependency chain for non-setscene things which this task depends on + # and mark as 'done' + for dep in self.rqdata.runtaskentries[t].depends: + if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: + continue + if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): + new.add(dep) + next = new - self.rq.state = runQueueSceneRun + # If this task was one which other setscene tasks have a hard dependency upon, we need + # to walk through the hard dependencies and allow execution of those which have completed dependencies. + if task in self.sqdata.sq_harddeps: + for dep in self.sq_harddep_deferred.copy(): + if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): + self.sq_harddep_deferred.remove(dep) - def scenequeue_updatecounters(self, task, fail = False): - for dep in self.sq_deps[task]: - if fail and task in self.sq_harddeps and dep in self.sq_harddeps[task]: - logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) - self.scenequeue_updatecounters(dep, fail) - continue - if task not in self.sq_revdeps2[dep]: - # May already have been removed by the fail case above - continue - self.sq_revdeps2[dep].remove(task) - if len(self.sq_revdeps2[dep]) == 0: - self.runq_buildable.add(dep) + self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) + self.holdoff_need_update = True - def task_completeoutright(self, task): + def sq_task_completeoutright(self, task): """ Mark a task as completed Look at the reverse dependencies and mark any task with completed dependencies as buildable """ - logger.debug(1, 'Found task %s which could be accelerated', task) + logger.debug('Found task %s which could be accelerated', task) self.scenequeue_covered.add(task) self.scenequeue_updatecounters(task) - def check_taskfail(self, task): - if self.rqdata.setscenewhitelist is not None: + def sq_check_taskfail(self, task): + if self.rqdata.setscene_ignore_tasks is not None: realtask = task.split('_setscene')[0] (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): + if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) self.rq.state = runQueueCleanUp - def task_complete(self, task): - self.stats.taskCompleted() + def sq_task_complete(self, task): bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) - self.task_completeoutright(task) + self.sq_task_completeoutright(task) - def task_fail(self, task, result): - self.stats.taskFailed() + def sq_task_fail(self, task, result): bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData) self.scenequeue_notcovered.add(task) self.scenequeue_updatecounters(task, True) - self.check_taskfail(task) + self.sq_check_taskfail(task) - def task_failoutright(self, task): - self.runq_running.add(task) - self.runq_buildable.add(task) - self.stats.taskSkipped() - self.stats.taskCompleted() + def sq_task_failoutright(self, task): + self.sq_running.add(task) + self.sq_buildable.add(task) self.scenequeue_notcovered.add(task) self.scenequeue_updatecounters(task, True) - def task_skip(self, task): - self.runq_running.add(task) - self.runq_buildable.add(task) - self.task_completeoutright(task) - self.stats.taskSkipped() - self.stats.taskCompleted() - - def execute(self): - """ - Run the tasks in a queue prepared by prepare_runqueue - """ - - self.rq.read_workers() - - task = None - if self.can_start_task(): - # Find the next setscene to run - for nexttask in self.rqdata.runq_setscene_tids: - if nexttask in self.runq_buildable and nexttask not in self.runq_running and self.stamps[nexttask] not in self.build_stamps.values(): - if nexttask in self.unskippable: - logger.debug(2, "Setscene task %s is unskippable" % nexttask) - if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True): - fn = fn_from_tid(nexttask) - foundtarget = False - - if nexttask in self.rqdata.target_tids: - foundtarget = True - if not foundtarget: - logger.debug(2, "Skipping setscene for task %s" % nexttask) - self.task_skip(nexttask) - self.scenequeue_notneeded.add(nexttask) - return True - if nexttask in self.outrightfail: - self.task_failoutright(nexttask) - return True - task = nexttask - break - if task is not None: - (mc, fn, taskname, taskfn) = split_tid_mcfn(task) - taskname = taskname + "_setscene" - if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): - logger.debug(2, 'Stamp for underlying task %s is current, so skipping setscene variant', task) - self.task_failoutright(task) - return True - - if self.cooker.configuration.force: - if task in self.rqdata.target_tids: - self.task_failoutright(task) - return True - - if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): - logger.debug(2, 'Setscene stamp current task %s, so skip it and its dependencies', task) - self.task_skip(task) - return True - - startevent = sceneQueueTaskStarted(task, self.stats, self.rq) - bb.event.fire(startevent, self.cfgData) - - taskdepdata = self.build_taskdepdata(task) - - taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] - taskhash = self.rqdata.get_task_hash(task) - unihash = self.rqdata.get_task_unihash(task) - if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: - if not mc in self.rq.fakeworker: - self.rq.start_fakeworker(self, mc) - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") - self.rq.fakeworker[mc].process.stdin.flush() - else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") - self.rq.worker[mc].process.stdin.flush() - - self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) - self.build_stamps2.append(self.build_stamps[task]) - self.runq_running.add(task) - self.stats.taskActive() - if self.can_start_task(): - return True - - if self.stats.active > 0: - self.rq.read_workers() - return self.rq.active_fds() - - #for tid in self.sq_revdeps: - # if tid not in self.runq_running: - # buildable = tid in self.runq_buildable - # revdeps = self.sq_revdeps[tid] - # bb.warn("Found we didn't run %s %s %s" % (tid, buildable, str(revdeps))) - - self.rq.scenequeue_covered = self.scenequeue_covered - self.rq.scenequeue_notcovered = self.scenequeue_notcovered - - logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.rq.scenequeue_covered))) - - self.rq.state = runQueueRunInit - - completeevent = sceneQueueComplete(self.stats, self.rq) - bb.event.fire(completeevent, self.cfgData) - - return True - - def runqueue_process_waitpid(self, task, status): - RunQueueExecute.runqueue_process_waitpid(self, task, status) + def sq_task_skip(self, task): + self.sq_running.add(task) + self.sq_buildable.add(task) + self.sq_task_completeoutright(task) - - def build_taskdepdata(self, task): + def sq_build_taskdepdata(self, task): def getsetscenedeps(tid): deps = set() (mc, fn, taskname, _) = split_tid_mcfn(tid) @@ -2580,7 +2811,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute): provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] taskhash = self.rqdata.runtaskentries[revdep].hash unihash = self.rqdata.runtaskentries[revdep].unihash - taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] + hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn] + taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn] for revdep2 in deps: if revdep2 not in taskdepdata: additional.append(revdep2) @@ -2589,6 +2821,302 @@ class RunQueueExecuteScenequeue(RunQueueExecute): #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) return taskdepdata + def check_setscene_ignore_tasks(self, tid): + # Check task that is going to run against the ignore tasks list + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + # Ignore covered tasks + if tid in self.tasks_covered: + return False + # Ignore stamped tasks + if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): + return False + # Ignore noexec tasks + taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] + if 'noexec' in taskdep and taskname in taskdep['noexec']: + return False + + pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] + if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): + if tid in self.rqdata.runq_setscene_tids: + msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)] + else: + msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)] + for t in self.scenequeue_notcovered: + msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash)) + msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered)) + logger.error("".join(msg)) + return True + return False + +class SQData(object): + def __init__(self): + # SceneQueue dependencies + self.sq_deps = {} + # SceneQueue reverse dependencies + self.sq_revdeps = {} + # Injected inter-setscene task dependencies + self.sq_harddeps = {} + self.sq_harddeps_rev = {} + # Cache of stamp files so duplicates can't run in parallel + self.stamps = {} + # Setscene tasks directly depended upon by the build + self.unskippable = set() + # List of setscene tasks which aren't present + self.outrightfail = set() + # A list of normal tasks a setscene task covers + self.sq_covered_tasks = {} + +def build_scenequeue_data(sqdata, rqdata, sqrq): + + sq_revdeps = {} + sq_revdeps_squash = {} + sq_collated_deps = {} + + # We can't skip specified target tasks which aren't setscene tasks + sqdata.cantskip = set(rqdata.target_tids) + sqdata.cantskip.difference_update(rqdata.runq_setscene_tids) + sqdata.cantskip.intersection_update(rqdata.runtaskentries) + + # We need to construct a dependency graph for the setscene functions. Intermediate + # dependencies between the setscene tasks only complicate the code. This code + # therefore aims to collapse the huge runqueue dependency tree into a smaller one + # only containing the setscene functions. + + rqdata.init_progress_reporter.next_stage() + + # First process the chains up to the first setscene task. + endpoints = {} + for tid in rqdata.runtaskentries: + sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) + sq_revdeps_squash[tid] = set() + if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids: + #bb.warn("Added endpoint %s" % (tid)) + endpoints[tid] = set() + + rqdata.init_progress_reporter.next_stage() + + # Secondly process the chains between setscene tasks. + for tid in rqdata.runq_setscene_tids: + sq_collated_deps[tid] = set() + #bb.warn("Added endpoint 2 %s" % (tid)) + for dep in rqdata.runtaskentries[tid].depends: + if tid in sq_revdeps[dep]: + sq_revdeps[dep].remove(tid) + if dep not in endpoints: + endpoints[dep] = set() + #bb.warn(" Added endpoint 3 %s" % (dep)) + endpoints[dep].add(tid) + + rqdata.init_progress_reporter.next_stage() + + def process_endpoints(endpoints): + newendpoints = {} + for point, task in endpoints.items(): + tasks = set() + if task: + tasks |= task + if sq_revdeps_squash[point]: + tasks |= sq_revdeps_squash[point] + if point not in rqdata.runq_setscene_tids: + for t in tasks: + sq_collated_deps[t].add(point) + sq_revdeps_squash[point] = set() + if point in rqdata.runq_setscene_tids: + sq_revdeps_squash[point] = tasks + continue + for dep in rqdata.runtaskentries[point].depends: + if point in sq_revdeps[dep]: + sq_revdeps[dep].remove(point) + if tasks: + sq_revdeps_squash[dep] |= tasks + if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids: + newendpoints[dep] = task + if newendpoints: + process_endpoints(newendpoints) + + process_endpoints(endpoints) + + rqdata.init_progress_reporter.next_stage() + + # Build a list of tasks which are "unskippable" + # These are direct endpoints referenced by the build upto and including setscene tasks + # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon + new = True + for tid in rqdata.runtaskentries: + if not rqdata.runtaskentries[tid].revdeps: + sqdata.unskippable.add(tid) + sqdata.unskippable |= sqdata.cantskip + while new: + new = False + orig = sqdata.unskippable.copy() + for tid in sorted(orig, reverse=True): + if tid in rqdata.runq_setscene_tids: + continue + if not rqdata.runtaskentries[tid].depends: + # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable + sqrq.setbuildable(tid) + sqdata.unskippable |= rqdata.runtaskentries[tid].depends + if sqdata.unskippable != orig: + new = True + + sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids) + + rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) + + # Sanity check all dependencies could be changed to setscene task references + for taskcounter, tid in enumerate(rqdata.runtaskentries): + if tid in rqdata.runq_setscene_tids: + pass + elif sq_revdeps_squash[tid]: + bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.") + else: + del sq_revdeps_squash[tid] + rqdata.init_progress_reporter.update(taskcounter) + + rqdata.init_progress_reporter.next_stage() + + # Resolve setscene inter-task dependencies + # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" + # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies + for tid in rqdata.runq_setscene_tids: + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + realtid = tid + "_setscene" + idepends = rqdata.taskData[mc].taskentries[realtid].idepends + sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) + + sqdata.sq_harddeps_rev[tid] = set() + for (depname, idependtask) in idepends: + + if depname not in rqdata.taskData[mc].build_targets: + continue + + depfn = rqdata.taskData[mc].build_targets[depname][0] + if depfn is None: + continue + deptid = depfn + ":" + idependtask.replace("_setscene", "") + if deptid not in rqdata.runtaskentries: + bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) + + logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid)) + + if not deptid in sqdata.sq_harddeps: + sqdata.sq_harddeps[deptid] = set() + sqdata.sq_harddeps[deptid].add(tid) + sqdata.sq_harddeps_rev[tid].add(deptid) + + rqdata.init_progress_reporter.next_stage() + + rqdata.init_progress_reporter.next_stage() + + #for tid in sq_revdeps_squash: + # data = "" + # for dep in sq_revdeps_squash[tid]: + # data = data + "\n %s" % dep + # bb.warn("Task %s_setscene: is %s " % (tid, data)) + + sqdata.sq_revdeps = sq_revdeps_squash + sqdata.sq_covered_tasks = sq_collated_deps + + # Build reverse version of revdeps to populate deps structure + for tid in sqdata.sq_revdeps: + sqdata.sq_deps[tid] = set() + for tid in sqdata.sq_revdeps: + for dep in sqdata.sq_revdeps[tid]: + sqdata.sq_deps[dep].add(tid) + + rqdata.init_progress_reporter.next_stage() + + sqdata.multiconfigs = set() + for tid in sqdata.sq_revdeps: + sqdata.multiconfigs.add(mc_from_tid(tid)) + if not sqdata.sq_revdeps[tid]: + sqrq.sq_buildable.add(tid) + + rqdata.init_progress_reporter.next_stage() + + sqdata.noexec = set() + sqdata.stamppresent = set() + sqdata.valid = set() + + sqdata.hashes = {} + sqrq.sq_deferred = {} + for mc in sorted(sqdata.multiconfigs): + for tid in sorted(sqdata.sq_revdeps): + if mc_from_tid(tid) != mc: + continue + h = pending_hash_index(tid, rqdata) + if h not in sqdata.hashes: + sqdata.hashes[h] = tid + else: + sqrq.sq_deferred[tid] = sqdata.hashes[h] + bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h])) + +def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False): + + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + + taskdep = rqdata.dataCaches[mc].task_deps[taskfn] + + if 'noexec' in taskdep and taskname in taskdep['noexec']: + bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn) + return True, False + + if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache): + logger.debug2('Setscene stamp current for task %s', tid) + return False, True + + if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache): + logger.debug2('Normal stamp current for task %s', tid) + return False, True + + return False, False + +def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): + + tocheck = set() + + for tid in sorted(tids): + if tid in sqdata.stamppresent: + sqdata.stamppresent.remove(tid) + if tid in sqdata.valid: + sqdata.valid.remove(tid) + if tid in sqdata.outrightfail: + sqdata.outrightfail.remove(tid) + + noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True) + + if noexec: + sqdata.noexec.add(tid) + sqrq.sq_task_skip(tid) + logger.debug2("%s is noexec so skipping setscene" % (tid)) + continue + + if stamppresent: + sqdata.stamppresent.add(tid) + sqrq.sq_task_skip(tid) + logger.debug2("%s has a valid stamp, skipping" % (tid)) + continue + + tocheck.add(tid) + + sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) + + for tid in tids: + if tid in sqdata.stamppresent: + continue + if tid in sqdata.valid: + continue + if tid in sqdata.noexec: + continue + if tid in sqrq.scenequeue_covered: + continue + if tid in sqrq.scenequeue_notcovered: + continue + if tid in sqrq.sq_deferred: + continue + sqdata.outrightfail.add(tid) + logger.debug2("%s already handled (fallthrough), skipping" % (tid)) + class TaskFailure(Exception): """ Exception raised when a task in a runqueue fails @@ -2651,12 +3179,16 @@ class runQueueTaskFailed(runQueueEvent): """ Event notifying a task failed """ - def __init__(self, task, stats, exitcode, rq): + def __init__(self, task, stats, exitcode, rq, fakeroot_log=None): runQueueEvent.__init__(self, task, stats, rq) self.exitcode = exitcode + self.fakeroot_log = fakeroot_log def __str__(self): - return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) + if self.fakeroot_log: + return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log) + else: + return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) class sceneQueueTaskFailed(sceneQueueEvent): """ @@ -2695,22 +3227,29 @@ class runQueueTaskSkipped(runQueueEvent): runQueueEvent.__init__(self, task, stats, rq) self.reason = reason +class taskUniHashUpdate(bb.event.Event): + """ + Base runQueue event class + """ + def __init__(self, task, unihash): + self.taskid = task + self.unihash = unihash + bb.event.Event.__init__(self) + class runQueuePipe(): """ Abstraction for a pipe between a worker thread and the server """ - def __init__(self, pipein, pipeout, d, rq, rqexec): + def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None): self.input = pipein if pipeout: pipeout.close() bb.utils.nonblockingfd(self.input) - self.queue = b"" + self.queue = bytearray() self.d = d self.rq = rq self.rqexec = rqexec - - def setrunqueueexec(self, rqexec): - self.rqexec = rqexec + self.fakerootlogs = fakerootlogs def read(self): for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]: @@ -2722,21 +3261,28 @@ class runQueuePipe(): start = len(self.queue) try: - self.queue = self.queue + (self.input.read(102400) or b"") + self.queue.extend(self.input.read(102400) or b"") except (OSError, IOError) as e: if e.errno != errno.EAGAIN: raise end = len(self.queue) found = True - while found and len(self.queue): + while found and self.queue: found = False index = self.queue.find(b"</event>") while index != -1 and self.queue.startswith(b"<event>"): try: event = pickle.loads(self.queue[7:index]) - except ValueError as e: + except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: + if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e): + # The pickled data could contain "</event>" so search for the next occurance + # unpickling again, this should be the only way an unpickle error could occur + index = self.queue.find(b"</event>", index + 1) + continue bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) bb.event.fire_from_worker(event, self.d) + if isinstance(event, taskUniHashUpdate): + self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) found = True self.queue = self.queue[index+8:] index = self.queue.find(b"</event>") @@ -2744,9 +3290,13 @@ class runQueuePipe(): while index != -1 and self.queue.startswith(b"<exitcode>"): try: task, status = pickle.loads(self.queue[10:index]) - except ValueError as e: + except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) - self.rqexec.runqueue_process_waitpid(task, status) + (_, _, _, taskfn) = split_tid_mcfn(task) + fakerootlog = None + if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs: + fakerootlog = self.fakerootlogs[taskfn] + self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog) found = True self.queue = self.queue[index+11:] index = self.queue.find(b"</exitcode>") @@ -2755,30 +3305,29 @@ class runQueuePipe(): def close(self): while self.read(): continue - if len(self.queue) > 0: + if self.queue: print("Warning, worker left partial message: %s" % self.queue) self.input.close() -def get_setscene_enforce_whitelist(d): +def get_setscene_enforce_ignore_tasks(d, targets): if d.getVar('BB_SETSCENE_ENFORCE') != '1': return None - whitelist = (d.getVar("BB_SETSCENE_ENFORCE_WHITELIST") or "").split() + ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split() outlist = [] - for item in whitelist[:]: + for item in ignore_tasks[:]: if item.startswith('%:'): - for target in sys.argv[1:]: - if not target.startswith('-'): - outlist.append(target.split(':')[0] + ':' + item.split(':')[1]) + for (mc, target, task, fn) in targets: + outlist.append(target + ':' + item.split(':')[1]) else: outlist.append(item) return outlist -def check_setscene_enforce_whitelist(pn, taskname, whitelist): +def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks): import fnmatch - if whitelist is not None: + if ignore_tasks is not None: item = '%s:%s' % (pn, taskname) - for whitelist_item in whitelist: - if fnmatch.fnmatch(item, whitelist_item): + for ignore_tasks in ignore_tasks: + if fnmatch.fnmatch(item, ignore_tasks): return True return False return True |