#!/usr/bin/env python3 # bblock # lock/unlock task to latest signature # # Copyright (c) 2023 BayLibre, SAS # Author: Julien Stepahn # # SPDX-License-Identifier: GPL-2.0-only # import os import sys import logging scripts_path = os.path.dirname(os.path.realpath(__file__)) lib_path = scripts_path + "/lib" sys.path = sys.path + [lib_path] import scriptpath scriptpath.add_bitbake_lib_path() import bb.tinfoil import bb.msg import argparse_oe myname = os.path.basename(sys.argv[0]) logger = bb.msg.logger_create(myname) def getTaskSignatures(tinfoil, pn, tasks): tinfoil.set_event_mask( [ "bb.event.GetTaskSignatureResult", "logging.LogRecord", "bb.command.CommandCompleted", "bb.command.CommandFailed", ] ) ret = tinfoil.run_command("getTaskSignatures", pn, tasks) if ret: while True: event = tinfoil.wait_event(1) if event: if isinstance(event, bb.command.CommandCompleted): break elif isinstance(event, bb.command.CommandFailed): logger.error(str(event)) sys.exit(2) elif isinstance(event, bb.event.GetTaskSignatureResult): sig = event.sig elif isinstance(event, logging.LogRecord): logger.handle(event) else: logger.error("No result returned from getTaskSignatures command") sys.exit(2) return sig def parseRecipe(tinfoil, recipe): try: tinfoil.parse_recipes() d = tinfoil.parse_recipe(recipe) except Exception: logger.error("Failed to get recipe info for: %s" % recipe) sys.exit(1) return d def bblockDump(lockfile): try: with open(lockfile, "r") as lockfile: for line in lockfile: print(line.strip()) except IOError: return 1 return 0 def bblockReset(lockfile, pns, package_archs, tasks): if not pns: logger.info("Unlocking all recipes") try: os.remove(lockfile) except FileNotFoundError: pass else: logger.info("Unlocking {pns}".format(pns=pns)) tmp_lockfile = lockfile + ".tmp" with open(lockfile, "r") as infile, open(tmp_lockfile, "w") as outfile: for line in infile: if not ( any(element in line for element in pns) and any(element in line for element in package_archs.split()) ): outfile.write(line) else: if tasks and not any(element in line for element in tasks): outfile.write(line) os.remove(lockfile) os.rename(tmp_lockfile, lockfile) def main(): parser = argparse_oe.ArgumentParser(description="Lock and unlock a recipe") parser.add_argument("pn", nargs="*", help="Space separated list of recipe to lock") parser.add_argument( "-t", "--tasks", help="Comma separated list of tasks", type=lambda s: [ task if task.startswith("do_") else "do_" + task for task in s.split(",") ], ) parser.add_argument( "-r", "--reset", action="store_true", help="Unlock pn recipes, or all recipes if pn is empty", ) parser.add_argument( "-d", "--dump", action="store_true", help="Dump generated bblock.conf file", ) global_args, unparsed_args = parser.parse_known_args() with bb.tinfoil.Tinfoil() as tinfoil: tinfoil.prepare(config_only=True) package_archs = tinfoil.config_data.getVar("PACKAGE_ARCHS") builddir = tinfoil.config_data.getVar("TOPDIR") lockfile = "{builddir}/conf/bblock.conf".format(builddir=builddir) if global_args.dump: bblockDump(lockfile) return 0 if global_args.reset: bblockReset(lockfile, global_args.pn, package_archs, global_args.tasks) return 0 with open(lockfile, "a") as lockfile: s = "" if lockfile.tell() == 0: s = "# Generated by bblock\n" s += 'SIGGEN_LOCKEDSIGS_TASKSIG_CHECK = "info"\n' s += 'SIGGEN_LOCKEDSIGS_TYPES += "${PACKAGE_ARCHS}"\n' s += "\n" for pn in global_args.pn: d = parseRecipe(tinfoil, pn) package_arch = d.getVar("PACKAGE_ARCH") siggen_locked_sigs_package_arch = d.getVar( "SIGGEN_LOCKEDSIGS_{package_arch}".format(package_arch=package_arch) ) sigs = getTaskSignatures(tinfoil, [pn], global_args.tasks) for sig in sigs: new_entry = "{pn}:{taskname}:{sig}".format( pn=sig[0], taskname=sig[1], sig=sig[2] ) if ( siggen_locked_sigs_package_arch and not new_entry in siggen_locked_sigs_package_arch ) or not siggen_locked_sigs_package_arch: s += 'SIGGEN_LOCKEDSIGS_{package_arch} += "{new_entry}"\n'.format( package_arch=package_arch, new_entry=new_entry ) lockfile.write(s) return 0 if __name__ == "__main__": try: ret = main() except Exception: ret = 1 import traceback traceback.print_exc() sys.exit(ret)