#! /usr/bin/env python3 import asyncio import json import os import re import signal import sys import subprocess import pathlib import logging logger = logging.getLogger("RunFVP") # TODO: option to switch between telnet and netcat connect_command = "telnet localhost %port" terminal_map = { "tmux": f"tmux new-window -n \"%title\" \"{connect_command}\"", "xterm": f"xterm -title \"%title\" -e {connect_command}", "none": "" # TODO more terminals } def get_default_terminal(): import shutil import shlex for terminal in "xterm",: command = shlex.split(terminal_map[terminal])[0] if shutil.which(command): return terminal return "none" def get_image_directory(machine=None): """ Get the DEPLOY_DIR_IMAGE for the specified machine (or the configured machine if not set). """ try: import bb.tinfoil except ImportError: logger.error("Cannot connect to BitBake, did you oe-init-build-env?") sys.exit(1) if machine: os.environ["MACHINE"] = machine with bb.tinfoil.Tinfoil() as tinfoil: tinfoil.prepare(config_only=True) image_dir = tinfoil.config_data.getVar("DEPLOY_DIR_IMAGE") logger.debug(f"Got DEPLOY_DIR_IMAGE {image_dir}") return pathlib.Path(image_dir) def parse_args(arguments): import argparse parser = argparse.ArgumentParser(description="Run images in a FVP") parser.add_argument("config", nargs="?", help="Machine name or path to .fvpconf file") group = parser.add_mutually_exclusive_group() group.add_argument("-t", "--terminals", choices=terminal_map.keys(), default=get_default_terminal(), help="Automatically start terminals (default: %(default)s)") group.add_argument("-c", "--console", action="store_true", help="Attach the first uart to stdin/stdout") parser.add_argument("--verbose", action="store_true", help="Output verbose logging") parser.usage = f"{parser.format_usage().strip()} -- [ arguments passed to FVP ]" # TODO option for telnet vs netcat # If the arguments contains -- then everything after it should be passed to the FVP binary directly. if "--" in arguments: i = arguments.index("--") fvp_args = arguments[i+1:] arguments = arguments[:i] else: fvp_args = [] args = parser.parse_args(args=arguments) logging.basicConfig(level=args.verbose and logging.DEBUG or logging.WARNING) # If we're hooking up the console, don't start any terminals if args.console: args.terminals = "none" logger.debug(f"Parsed arguments: {vars(args)}") logger.debug(f"FVP arguments: {fvp_args}") return args, fvp_args def find_config(args): if args.config and os.path.exists(args.config): return args.config else: image_dir = get_image_directory(args.config) # All .fvpconf configuration files configs = image_dir.glob("*.fvpconf") # Just the files configs = [p for p in configs if p.is_file() and not p.is_symlink()] if not configs: print(f"Cannot find any .fvpconf in {image_dir}") sys.exit(1) # Sorted by modification time configs = sorted(configs, key=lambda p: p.stat().st_mtime) return configs[-1] def load_config(config_file): logger.debug(f"Loading {config_file}") with open(config_file) as f: config = json.load(f) # Ensure that all expected keys are present def sanitise(key, value): if key not in config or config[key] is None: config[key] = value sanitise("fvp-bindir", "") sanitise("exe", "") sanitise("parameters", {}) sanitise("data", {}) sanitise("applications", {}) sanitise("terminals", {}) sanitise("args", []) sanitise("console", "") if not config["exe"]: logger.error("Required value FVP_EXE not set in machine configuration") sys.exit(1) return config def parse_config(args, config): cli = [] if config["fvp-bindir"]: cli.append(os.path.join(config["fvp-bindir"], config["exe"])) else: cli.append(config["exe"]) for param, value in config["parameters"].items(): cli.extend(["--parameter", f"{param}={value}"]) for value in config["data"]: cli.extend(["--data", value]) for param, value in config["applications"].items(): cli.extend(["--application", f"{param}={value}"]) for terminal, name in config["terminals"].items(): # If terminals are enabled and this terminal has been named if args.terminals != "none" and name: # TODO if raw mode # cli.extend(["--parameter", f"{terminal}.mode=raw"]) # TODO put name into terminal title cli.extend(["--parameter", f"{terminal}.terminal_command={terminal_map[args.terminals]}"]) else: # Disable terminal cli.extend(["--parameter", f"{terminal}.start_telnet=0"]) cli.extend(config["args"]) return cli async def start_fvp(cli, console_cb): try: fvp_process = await asyncio.create_subprocess_exec(*cli, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) async for line in fvp_process.stdout: line = line.strip().decode("utf-8", errors="replace") if console_cb: logger.debug(f"FVP output: {line}") else: print(line) # Look for serial connections opening if console_cb: m = re.match(fr"^(\S+): Listening for serial connection on port (\d+)$", line) if m: terminal = m.group(1) port = int(m.group(2)) logger.debug(f"Console for {terminal} started on port {port}") # When we can assume Py3.7+, this can be create_task asyncio.ensure_future(console_cb(terminal, port)) finally: # If we get cancelled or throw an exception, kill the FVP logger.debug(f"Killing FVP PID {fvp_process.pid}") fvp_process.terminate() if await fvp_process.wait() != 0: logger.info(f"{cli[0]} quit with code {fvp_process.returncode}") def runfvp(cli_args): args, fvp_args = parse_args(cli_args) config_file = find_config(args) config = load_config(config_file) cli = parse_config(args, config) cli.extend(fvp_args) logger.debug(f"Constructed FVP call: {cli}") if args.console: expected_terminal = config["console"] if not expected_terminal: logger.error("--console used but FVP_CONSOLE not set in machine configuration") sys.exit(1) else: expected_terminal = None async def console_started(name, port): if name == expected_terminal: telnet = await asyncio.create_subprocess_exec("telnet", "localhost", str(port), stdin=sys.stdin, stdout=sys.stdout) await telnet.wait() logger.debug(f"Telnet quit, cancelling tasks") for t in asyncio.all_tasks(): logger.debug(f"Cancelling {t}") t.cancel() try: # When we can assume Py3.7+, this can simply be asyncio.run() loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(start_fvp(cli, console_cb=console_started))) except asyncio.CancelledError: pass if __name__ == "__main__": try: # Set the process group so that it's possible to kill runfvp and # everything it spawns easily. os.setpgid(0, 0) if sys.stdin.isatty(): signal.signal(signal.SIGTTOU, signal.SIG_IGN) os.tcsetpgrp(sys.stdin.fileno(), os.getpgrp()) runfvp(sys.argv[1:]) except KeyboardInterrupt: pass