diff options
-rwxr-xr-x | bitbake/bin/bitbake-worker | 6 | ||||
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 133 | ||||
-rw-r--r-- | bitbake/lib/bb/siggen.py | 6 | ||||
-rw-r--r-- | meta/lib/oe/sstatesig.py | 4 |
4 files changed, 139 insertions, 10 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index f63f060c577..3e502d5ca9f 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker @@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha the_data.setVar(varname, value) bb.parse.siggen.set_taskdata(workerdata["sigdata"]) + if "newhashes" in workerdata: + bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) ret = 0 the_data = bb_cache.loadDataFull(fn, appends) @@ -377,6 +379,7 @@ class BitbakeWorker(object): self.handle_item(b"cookerconfig", self.handle_cookercfg) self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) self.handle_item(b"workerdata", self.handle_workerdata) + self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) self.handle_item(b"runtask", self.handle_runtask) self.handle_item(b"finishnow", self.handle_finishnow) self.handle_item(b"ping", self.handle_ping) @@ -416,6 +419,9 @@ class BitbakeWorker(object): for mc in self.databuilder.mcdata: self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) + def handle_newtaskhashes(self, data): + self.workerdata["newhashes"] = pickle.loads(data) + def handle_ping(self, _): workerlog_write("Handling ping\n") diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 6a2de240cc8..b281289ebe7 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -149,7 +149,7 @@ class RunQueueScheduler(object): Return the id of the first task we find that is buildable """ self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] - buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)] + buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks] if not buildable: return None @@ -206,6 +206,9 @@ class RunQueueScheduler(object): def newbuildable(self, task): self.buildable.append(task) + def removebuildable(self, task): + self.buildable.remove(task) + def describe_task(self, taskid): result = 'ID %s' % taskid if self.rev_prio_map: @@ -1718,6 +1721,8 @@ class RunQueueExecute: self.sq_running = set() self.sq_live = set() + self.changed_setscene = set() + self.runq_buildable = set() self.runq_running = set() self.runq_complete = set() @@ -1729,6 +1734,7 @@ class RunQueueExecute: self.stampcache = {} + self.holdoff_tasks = set() self.sqdone = False self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) @@ -1924,6 +1930,7 @@ class RunQueueExecute: """ self.rq.read_workers() + self.process_possible_migrations() task = None if not self.sqdone and self.can_start_task(): @@ -2006,7 +2013,7 @@ class RunQueueExecute: if self.can_start_task(): return True - if not self.sq_live and not self.sqdone and not self.sq_deferred: + if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene: logger.info("Setscene tasks completed") logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) @@ -2166,6 +2173,104 @@ class RunQueueExecute: #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) return taskdepdata + def updated_taskhash(self, tid, unihash): + changed = set() + if unihash != self.rqdata.runtaskentries[tid].unihash: + logger.info("Task %s unihash changed to %s" % (tid, unihash)) + self.rqdata.runtaskentries[tid].unihash = unihash + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash) + + next = set(self.rqdata.runtaskentries[tid].revdeps) + while next: + current = next + next = set() + for tid in current: + next |= self.rqdata.runtaskentries[tid].revdeps + procdep = [] + for dep in self.rqdata.runtaskentries[tid].depends: + procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep)) + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + orighash = self.rqdata.runtaskentries[tid].hash + self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc]) + origuni = self.rqdata.runtaskentries[tid].unihash + self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname) + logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) + if origuni != self.rqdata.runtaskentries[tid].unihash: + changed.add(tid) + if changed: + for mc in self.rq.worker: + self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + for mc in self.rq.fakeworker: + self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + + logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) + + for tid in changed: + if tid not in self.rqdata.runq_setscene_tids: + continue + valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False) + if not valid: + continue + self.changed_setscene.add(tid) + + if changed: + self.update_holdofftasks() + + def update_holdofftasks(self): + self.holdoff_tasks = set() + for tid in self.changed_setscene.copy(): + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep not in self.runq_complete: + self.holdoff_tasks.add(dep) + logger.debug(2, "Holding off tasks %s" % str(self.holdoff_tasks)) + + def process_possible_migrations(self): + changes = False + for tid in self.changed_setscene.copy(): + valid = True + # Check no tasks this covers are running + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep in self.runq_running and dep not in self.runq_complete: + logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid)) + valid = False + break + if not valid: + continue + + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep in self.runq_buildable and dep not in self.runq_complete: + self.runq_buildable.remove(dep) + self.sched.removebuildable(dep) + if dep not in self.runq_complete: + self.tasks_scenequeue_done.remove(dep) + self.tasks_notcovered.remove(dep) + + if tid in self.sq_buildable: + self.sq_buildable.remove(tid) + if tid in self.sq_running: + self.sq_running.remove(tid) + if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): + if dep not in self.sq_buildable: + self.sq_buildable.add(tid) + + self.sqdata.outrightfail.remove(tid) + self.scenequeue_notcovered.remove(tid) + + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) + + if tid in self.build_stamps: + del self.build_stamps[tid] + + logger.info("Setscene task %s now valid and being rerun" % tid) + self.sqdone = False + self.changed_setscene.remove(tid) + changes = True + + if changes: + self.update_holdofftasks() + def scenequeue_process_notcovered(self, task): if len(self.rqdata.runtaskentries[task].depends) == 0: self.setbuildable(task) @@ -2206,12 +2311,9 @@ class RunQueueExecute: logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) self.sq_task_failoutright(dep) continue - if task not in self.sqdata.sq_revdeps2[dep]: - # May already have been removed by the fail case above - continue - self.sqdata.sq_revdeps2[dep].remove(task) - if len(self.sqdata.sq_revdeps2[dep]) == 0: - self.sq_buildable.add(dep) + if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): + if dep not in self.sq_buildable: + self.sq_buildable.add(dep) next = set([task]) while next: @@ -2258,6 +2360,8 @@ class RunQueueExecute: for tid in covered: if len(self.rqdata.runtaskentries[tid].depends) == 0: self.setbuildable(tid) + if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): + self.setbuildable(tid) def sq_task_completeoutright(self, task): """ @@ -2377,8 +2481,6 @@ class SQData(object): self.sq_deps = {} # SceneQueue reverse dependencies self.sq_revdeps = {} - # Copy of reverse dependencies used by sq processing code - self.sq_revdeps2 = {} # Injected inter-setscene task dependencies self.sq_harddeps = {} # Cache of stamp files so duplicates can't run in parallel @@ -2715,6 +2817,15 @@ class runQueueTaskSkipped(runQueueEvent): runQueueEvent.__init__(self, task, stats, rq) self.reason = reason +class taskUniHashUpdate(bb.event.Event): + """ + Base runQueue event class + """ + def __init__(self, task, unihash): + self.taskid = task + self.unihash = unihash + bb.event.Event.__init__(self) + class runQueuePipe(): """ Abstraction for a pipe between a worker thread and the server @@ -2757,6 +2868,8 @@ class runQueuePipe(): except ValueError as e: bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) bb.event.fire_from_worker(event, self.d) + if isinstance(event, taskUniHashUpdate): + self.rqexec.updated_taskhash(event.taskid, event.unihash) found = True self.queue = self.queue[index+8:] index = self.queue.find(b"</event>") diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py index 6a729f3b1ec..02bac57e73d 100644 --- a/bitbake/lib/bb/siggen.py +++ b/bitbake/lib/bb/siggen.py @@ -81,6 +81,12 @@ class SignatureGenerator(object): def reset(self, data): self.__init__(data) + def get_taskhashes(self): + return self.taskhash + + def set_taskhashes(self, hashes): + self.taskhash = hashes + class SignatureGeneratorBasic(SignatureGenerator): """ diff --git a/meta/lib/oe/sstatesig.py b/meta/lib/oe/sstatesig.py index 13af16e473c..a6af9a366f1 100644 --- a/meta/lib/oe/sstatesig.py +++ b/meta/lib/oe/sstatesig.py @@ -298,6 +298,9 @@ class SignatureGeneratorOEEquivHash(SignatureGeneratorOEBasicHash): return super().get_stampfile_hash(task) + def set_unihash(self, task, unihash): + self.unihashes[self.__get_task_unihash_key(task)] = unihash + def get_unihash(self, task): import urllib import json @@ -419,6 +422,7 @@ class SignatureGeneratorOEEquivHash(SignatureGeneratorOEBasicHash): if new_unihash != unihash: bb.debug(1, 'Task %s unihash changed %s -> %s by server %s' % (taskhash, unihash, new_unihash, self.server)) + bb.event.fire(bb.runqueue.taskUniHashUpdate(fn + ':do_' + task, new_unihash), d) else: bb.debug(1, 'Reported task %s as unihash %s to %s' % (taskhash, unihash, self.server)) except urllib.error.URLError as e: |