summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbitbake/bin/bitbake-worker6
-rw-r--r--bitbake/lib/bb/runqueue.py133
-rw-r--r--bitbake/lib/bb/siggen.py6
-rw-r--r--meta/lib/oe/sstatesig.py4
4 files changed, 139 insertions, 10 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
index f63f060c577..3e502d5ca9f 100755
--- a/bitbake/bin/bitbake-worker
+++ b/bitbake/bin/bitbake-worker
@@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
the_data.setVar(varname, value)
bb.parse.siggen.set_taskdata(workerdata["sigdata"])
+ if "newhashes" in workerdata:
+ bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
ret = 0
the_data = bb_cache.loadDataFull(fn, appends)
@@ -377,6 +379,7 @@ class BitbakeWorker(object):
self.handle_item(b"cookerconfig", self.handle_cookercfg)
self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
self.handle_item(b"workerdata", self.handle_workerdata)
+ self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
self.handle_item(b"runtask", self.handle_runtask)
self.handle_item(b"finishnow", self.handle_finishnow)
self.handle_item(b"ping", self.handle_ping)
@@ -416,6 +419,9 @@ class BitbakeWorker(object):
for mc in self.databuilder.mcdata:
self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
+ def handle_newtaskhashes(self, data):
+ self.workerdata["newhashes"] = pickle.loads(data)
+
def handle_ping(self, _):
workerlog_write("Handling ping\n")
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 6a2de240cc8..b281289ebe7 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -149,7 +149,7 @@ class RunQueueScheduler(object):
Return the id of the first task we find that is buildable
"""
self.buildable = [x for x in self.buildable if x not in self.rq.runq_running]
- buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)]
+ buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks]
if not buildable:
return None
@@ -206,6 +206,9 @@ class RunQueueScheduler(object):
def newbuildable(self, task):
self.buildable.append(task)
+ def removebuildable(self, task):
+ self.buildable.remove(task)
+
def describe_task(self, taskid):
result = 'ID %s' % taskid
if self.rev_prio_map:
@@ -1718,6 +1721,8 @@ class RunQueueExecute:
self.sq_running = set()
self.sq_live = set()
+ self.changed_setscene = set()
+
self.runq_buildable = set()
self.runq_running = set()
self.runq_complete = set()
@@ -1729,6 +1734,7 @@ class RunQueueExecute:
self.stampcache = {}
+ self.holdoff_tasks = set()
self.sqdone = False
self.stats = RunQueueStats(len(self.rqdata.runtaskentries))
@@ -1924,6 +1930,7 @@ class RunQueueExecute:
"""
self.rq.read_workers()
+ self.process_possible_migrations()
task = None
if not self.sqdone and self.can_start_task():
@@ -2006,7 +2013,7 @@ class RunQueueExecute:
if self.can_start_task():
return True
- if not self.sq_live and not self.sqdone and not self.sq_deferred:
+ if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene:
logger.info("Setscene tasks completed")
logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
@@ -2166,6 +2173,104 @@ class RunQueueExecute:
#bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
return taskdepdata
+ def updated_taskhash(self, tid, unihash):
+ changed = set()
+ if unihash != self.rqdata.runtaskentries[tid].unihash:
+ logger.info("Task %s unihash changed to %s" % (tid, unihash))
+ self.rqdata.runtaskentries[tid].unihash = unihash
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash)
+
+ next = set(self.rqdata.runtaskentries[tid].revdeps)
+ while next:
+ current = next
+ next = set()
+ for tid in current:
+ next |= self.rqdata.runtaskentries[tid].revdeps
+ procdep = []
+ for dep in self.rqdata.runtaskentries[tid].depends:
+ procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep))
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ orighash = self.rqdata.runtaskentries[tid].hash
+ self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc])
+ origuni = self.rqdata.runtaskentries[tid].unihash
+ self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname)
+ logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
+ if origuni != self.rqdata.runtaskentries[tid].unihash:
+ changed.add(tid)
+ if changed:
+ for mc in self.rq.worker:
+ self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ for mc in self.rq.fakeworker:
+ self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+
+ logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))
+
+ for tid in changed:
+ if tid not in self.rqdata.runq_setscene_tids:
+ continue
+ valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False)
+ if not valid:
+ continue
+ self.changed_setscene.add(tid)
+
+ if changed:
+ self.update_holdofftasks()
+
+ def update_holdofftasks(self):
+ self.holdoff_tasks = set()
+ for tid in self.changed_setscene.copy():
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep not in self.runq_complete:
+ self.holdoff_tasks.add(dep)
+ logger.debug(2, "Holding off tasks %s" % str(self.holdoff_tasks))
+
+ def process_possible_migrations(self):
+ changes = False
+ for tid in self.changed_setscene.copy():
+ valid = True
+ # Check no tasks this covers are running
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep in self.runq_running and dep not in self.runq_complete:
+ logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid))
+ valid = False
+ break
+ if not valid:
+ continue
+
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep in self.runq_buildable and dep not in self.runq_complete:
+ self.runq_buildable.remove(dep)
+ self.sched.removebuildable(dep)
+ if dep not in self.runq_complete:
+ self.tasks_scenequeue_done.remove(dep)
+ self.tasks_notcovered.remove(dep)
+
+ if tid in self.sq_buildable:
+ self.sq_buildable.remove(tid)
+ if tid in self.sq_running:
+ self.sq_running.remove(tid)
+ if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+ if dep not in self.sq_buildable:
+ self.sq_buildable.add(tid)
+
+ self.sqdata.outrightfail.remove(tid)
+ self.scenequeue_notcovered.remove(tid)
+
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)
+
+ if tid in self.build_stamps:
+ del self.build_stamps[tid]
+
+ logger.info("Setscene task %s now valid and being rerun" % tid)
+ self.sqdone = False
+ self.changed_setscene.remove(tid)
+ changes = True
+
+ if changes:
+ self.update_holdofftasks()
+
def scenequeue_process_notcovered(self, task):
if len(self.rqdata.runtaskentries[task].depends) == 0:
self.setbuildable(task)
@@ -2206,12 +2311,9 @@ class RunQueueExecute:
logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
self.sq_task_failoutright(dep)
continue
- if task not in self.sqdata.sq_revdeps2[dep]:
- # May already have been removed by the fail case above
- continue
- self.sqdata.sq_revdeps2[dep].remove(task)
- if len(self.sqdata.sq_revdeps2[dep]) == 0:
- self.sq_buildable.add(dep)
+ if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+ if dep not in self.sq_buildable:
+ self.sq_buildable.add(dep)
next = set([task])
while next:
@@ -2258,6 +2360,8 @@ class RunQueueExecute:
for tid in covered:
if len(self.rqdata.runtaskentries[tid].depends) == 0:
self.setbuildable(tid)
+ if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
+ self.setbuildable(tid)
def sq_task_completeoutright(self, task):
"""
@@ -2377,8 +2481,6 @@ class SQData(object):
self.sq_deps = {}
# SceneQueue reverse dependencies
self.sq_revdeps = {}
- # Copy of reverse dependencies used by sq processing code
- self.sq_revdeps2 = {}
# Injected inter-setscene task dependencies
self.sq_harddeps = {}
# Cache of stamp files so duplicates can't run in parallel
@@ -2715,6 +2817,15 @@ class runQueueTaskSkipped(runQueueEvent):
runQueueEvent.__init__(self, task, stats, rq)
self.reason = reason
+class taskUniHashUpdate(bb.event.Event):
+ """
+ Base runQueue event class
+ """
+ def __init__(self, task, unihash):
+ self.taskid = task
+ self.unihash = unihash
+ bb.event.Event.__init__(self)
+
class runQueuePipe():
"""
Abstraction for a pipe between a worker thread and the server
@@ -2757,6 +2868,8 @@ class runQueuePipe():
except ValueError as e:
bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
bb.event.fire_from_worker(event, self.d)
+ if isinstance(event, taskUniHashUpdate):
+ self.rqexec.updated_taskhash(event.taskid, event.unihash)
found = True
self.queue = self.queue[index+8:]
index = self.queue.find(b"</event>")
diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py
index 6a729f3b1ec..02bac57e73d 100644
--- a/bitbake/lib/bb/siggen.py
+++ b/bitbake/lib/bb/siggen.py
@@ -81,6 +81,12 @@ class SignatureGenerator(object):
def reset(self, data):
self.__init__(data)
+ def get_taskhashes(self):
+ return self.taskhash
+
+ def set_taskhashes(self, hashes):
+ self.taskhash = hashes
+
class SignatureGeneratorBasic(SignatureGenerator):
"""
diff --git a/meta/lib/oe/sstatesig.py b/meta/lib/oe/sstatesig.py
index 13af16e473c..a6af9a366f1 100644
--- a/meta/lib/oe/sstatesig.py
+++ b/meta/lib/oe/sstatesig.py
@@ -298,6 +298,9 @@ class SignatureGeneratorOEEquivHash(SignatureGeneratorOEBasicHash):
return super().get_stampfile_hash(task)
+ def set_unihash(self, task, unihash):
+ self.unihashes[self.__get_task_unihash_key(task)] = unihash
+
def get_unihash(self, task):
import urllib
import json
@@ -419,6 +422,7 @@ class SignatureGeneratorOEEquivHash(SignatureGeneratorOEBasicHash):
if new_unihash != unihash:
bb.debug(1, 'Task %s unihash changed %s -> %s by server %s' % (taskhash, unihash, new_unihash, self.server))
+ bb.event.fire(bb.runqueue.taskUniHashUpdate(fn + ':do_' + task, new_unihash), d)
else:
bb.debug(1, 'Reported task %s as unihash %s to %s' % (taskhash, unihash, self.server))
except urllib.error.URLError as e: