From 3cf0b2d98cd2d8033a624c7a1846eb74023d6fdc Mon Sep 17 00:00:00 2001 From: Zen Date: Thu, 17 Oct 2024 13:33:08 -0500 Subject: [PATCH] black format, improve docstrings, improve return types Signed-off-by: Zen --- src/ugrd/base/banner.py | 8 +- src/ugrd/base/base.py | 26 ++-- src/ugrd/base/checks.py | 21 ++- src/ugrd/base/cmdline.py | 103 ++++++++------- src/ugrd/base/console.py | 34 ++--- src/ugrd/base/core.py | 264 +++++++++++++++++++------------------- src/ugrd/base/debug.py | 22 ++-- src/ugrd/base/keymap.py | 56 ++++---- src/ugrd/base/plymouth.py | 74 +++++------ src/ugrd/base/test.py | 95 +++++++------- 10 files changed, 356 insertions(+), 347 deletions(-) diff --git a/src/ugrd/base/banner.py b/src/ugrd/base/banner.py index 54ad9325..c1d8986c 100644 --- a/src/ugrd/base/banner.py +++ b/src/ugrd/base/banner.py @@ -1,10 +1,10 @@ -__author__ = 'desultory' -__version__ = '0.1.0' +__author__ = "desultory" +__version__ = "0.1.0" def print_banner(self) -> list[str]: - """ Prints the banner. Prints the kernel version if set """ + """Prints the banner. Prints the kernel version if set""" banner = [self.banner] - if kver := self.get('kernel_version'): + if kver := self.get("kernel_version"): banner.append(f"einfo 'Kernel version: {kver}'") return banner diff --git a/src/ugrd/base/base.py b/src/ugrd/base/base.py index 3d765cbb..f4cf2d09 100644 --- a/src/ugrd/base/base.py +++ b/src/ugrd/base/base.py @@ -153,17 +153,15 @@ def setvar(self) -> str: def readvar(self) -> str: - """ - Returns a bash function that reads a variable from /run/vars/{name}. + """Returns a bash function that reads a variable from /run/vars/{name}. The second arg can be a default value. If no default is supplied, and the variable is not found, it returns an empty string. """ return 'cat "/run/vars/${1}" 2>/dev/null || echo "${2}"' -def check_var(self) -> str: - """ - Returns a bash function that checks the value of a variable. 
+def check_var(self) -> list[str]: + """Returns a bash function that checks the value of a variable. if it's not set, tries to read the cmdline. """ return [ @@ -181,9 +179,8 @@ def check_var(self) -> str: ] -def prompt_user(self) -> str: - """ - Returns a bash function that pauses until the user presses enter. +def prompt_user(self) -> list[str]: + """Returns a bash function that pauses until the user presses enter. The first argument is the prompt message. The second argument is the timeout in seconds. @@ -210,9 +207,8 @@ def prompt_user(self) -> str: return output -def retry(self) -> str: - """ - Returns a bash function that retries a command some number of times. +def retry(self) -> list[str]: + """Returns a bash function that retries a command some number of times. The first argument is the number of retries. if 0, it retries 100 times. The second argument is the timeout in seconds. The remaining arguments represent the command to run. @@ -241,7 +237,7 @@ def retry(self) -> str: # To feel more at home -def edebug(self) -> str: +def edebug(self) -> list[str]: """Returns a bash function like edebug.""" return [ "if check_var quiet; then", @@ -254,12 +250,12 @@ def edebug(self) -> str: ] -def einfo(self) -> str: +def einfo(self) -> list[str]: """Returns a bash function like einfo.""" return ["if check_var quiet; then", " return", "fi", r'echo -e "\e[1;32m *\e[0m ${*}"'] -def ewarn(self) -> str: +def ewarn(self) -> list[str]: """Returns a bash function like ewarn. If plymouth is running, it displays a message instead of echoing. 
""" @@ -290,7 +286,7 @@ def eerror(self) -> str: ' plymouth display-message --text="Error: ${*}"', " return", "fi", - r'echo -e "\e[1;31m *\e[0m ${*}"' + r'echo -e "\e[1;31m *\e[0m ${*}"', ] else: return [r'echo -e "\e[1;31m *\e[0m ${*}"'] diff --git a/src/ugrd/base/checks.py b/src/ugrd/base/checks.py index 111ce456..cd4a60ad 100644 --- a/src/ugrd/base/checks.py +++ b/src/ugrd/base/checks.py @@ -1,34 +1,33 @@ -__version__ = '0.2.1' +__version__ = "0.2.1" from zenlib.util import contains -@contains('check_included_funcs', 'Skipping included funcs check', log_level=30) +@contains("check_included_funcs", "Skipping included funcs check", log_level=30) def check_included_funcs(self): - """ Ensures required functions are included in the build dir. """ - bash_func_names = [func + '() {\n' for func in self.included_functions] - _check_in_file(self, '/etc/profile', bash_func_names) + """Ensures required functions are included in the build dir.""" + bash_func_names = [func + "() {\n" for func in self.included_functions] + _check_in_file(self, "/etc/profile", bash_func_names) return "All functions found in the build dir." -@contains('check_in_file', 'Skipping in file check') +@contains("check_in_file", "Skipping in file check") def check_in_file(self): - """ Runs all 'check_in_file' checks. """ - for file, lines in self['check_in_file'].items(): + """Runs all 'check_in_file' checks.""" + for file, lines in self["check_in_file"].items(): _check_in_file(self, file, lines) return "All 'check_in_file' checks passed" def _check_in_file(self, file, lines): - """ Checks that all lines are in the file. 
""" + """Checks that all lines are in the file.""" file = self._get_build_path(file) if not file.exists(): raise ValueError("File '%s' does not exist" % file) - with open(file, 'r') as f: + with open(file, "r") as f: file_lines = f.readlines() for check_line in lines: if check_line not in file_lines: raise ValueError("Failed to find line '%s' in file '%s'" % (check_line, file)) - diff --git a/src/ugrd/base/cmdline.py b/src/ugrd/base/cmdline.py index 696056c6..80251c4b 100644 --- a/src/ugrd/base/cmdline.py +++ b/src/ugrd/base/cmdline.py @@ -1,64 +1,71 @@ -__author__ = 'desultory' -__version__ = '2.4.0' +__author__ = "desultory" +__version__ = "2.5.0" -def parse_cmdline_bool(self) -> str: - """ - Returns a bash script to parse a boolean value from /proc/cmdline +def parse_cmdline_bool(self) -> list[str]: + """Returns a bash script to parse a boolean value from /proc/cmdline The only argument is the name of the variable to be read/set """ - return ['edebug "Parsing cmdline bool: $1"', - r'setvar "$1" "$(grep -qE "(^|\s)$1(\s|$)" /proc/cmdline && echo 1 || echo 0)"'] + return [ + 'edebug "Parsing cmdline bool: $1"', + r'setvar "$1" "$(grep -qE "(^|\s)$1(\s|$)" /proc/cmdline && echo 1 || echo 0)"', + ] -def parse_cmdline_str(self) -> str: - """ - Returns a bash script to parse a string value from /proc/cmdline +def parse_cmdline_str(self) -> list[str]: + """Returns a bash script to parse a string value from /proc/cmdline The only argument is the name of the variable to be read/set """ - return ['edebug "Parsing cmdline string: $1"', - r'val=$(grep -oP "(?<=$1=)[^\s]+" /proc/cmdline)', - 'if [ -n "$val" ]; then', - ' edebug "Parsed $1: $val"', - ' setvar "$1" "$val"', - 'fi'] + return [ + 'edebug "Parsing cmdline string: $1"', + r'val=$(grep -oP "(?<=$1=)[^\s]+" /proc/cmdline)', + 'if [ -n "$val" ]; then', + ' edebug "Parsed $1: $val"', + ' setvar "$1" "$val"', + "fi", + ] + +def parse_cmdline(self) -> list[str]: + """Returns bash script to parse /proc/cmdline""" + 
return [ + r"""cmdline=$(awk -F '--' '{print $1}' /proc/cmdline)""", # Get everything before '--' + r'''setvar INIT_ARGS "$(awk -F '--' '{print $2}' /proc/cmdline)"''', # Get everything after '--' + f"""for bool in {" ".join([f'"{bool}"' for bool in self['cmdline_bools']])}; do""", + ' parse_cmdline_bool "$bool"', + "done", + f"""for string in {" ".join([f'"{string}"' for string in self['cmdline_strings']])}; do""", + ' parse_cmdline_str "$string"', + "done", + 'einfo "Parsed cmdline: $cmdline"', + ] -def parse_cmdline(self) -> str: - """ Returns bash script to parse /proc/cmdline """ - return [r'''cmdline=$(awk -F '--' '{print $1}' /proc/cmdline)''', # Get everything before '--' - r'''setvar INIT_ARGS "$(awk -F '--' '{print $2}' /proc/cmdline)"''', # Get everything after '--' - f'''for bool in {" ".join([f'"{bool}"' for bool in self['cmdline_bools']])}; do''', - ' parse_cmdline_bool "$bool"', - 'done', - f'''for string in {" ".join([f'"{string}"' for string in self['cmdline_strings']])}; do''', - ' parse_cmdline_str "$string"', - 'done', - 'einfo "Parsed cmdline: $cmdline"'] +def mount_cmdline_root(self) -> list[str]: + """Returns bash script to mount root partition based on /proc/cmdline""" + return [ + "root=$(readvar root)", + 'if [ -z "$root" ]; then', + ' edebug "No root partition specified in /proc/cmdline, falling back to mount_root"', + " mount_root", + " return", + "fi", + 'roottype="$(readvar roottype auto)"', + '''rootflags="$(readvar rootflags 'defaults,ro')"''', + 'einfo "Mounting root partition based on /proc/cmdline: $root -t $roottype -o $rootflags"', + 'if ! 
mount "$root" "$(readvar MOUNTS_ROOT_TARGET)" -t "$roottype" -o "$rootflags"; then', + ' eerror "Failed to mount the root partition using /proc/cmdline: $root -t $roottype -o $rootflags"', + " mount_root", + "fi", + ] -def mount_cmdline_root(self) -> str: - """ Returns bash script to mount root partition based on /proc/cmdline """ - return ['root=$(readvar root)', - 'if [ -z "$root" ]; then', - ' edebug "No root partition specified in /proc/cmdline, falling back to mount_root"', - ' mount_root', - ' return', - 'fi', - 'roottype="$(readvar roottype auto)"', - '''rootflags="$(readvar rootflags 'defaults,ro')"''', - 'einfo "Mounting root partition based on /proc/cmdline: $root -t $roottype -o $rootflags"', - 'if ! mount "$root" "$(readvar MOUNTS_ROOT_TARGET)" -t "$roottype" -o "$rootflags"; then', - ' eerror "Failed to mount the root partition using /proc/cmdline: $root -t $roottype -o $rootflags"', - ' mount_root', - 'fi'] +def export_exports(self) -> list[str]: + """Returns a bash script exporting all exports defined in the exports key.""" + from importlib.metadata import PackageNotFoundError, version -def export_exports(self) -> list: - """ Returns a bash script exporting all exports defined in the exports key. 
""" - from importlib.metadata import version, PackageNotFoundError try: - self['exports']['VERSION'] = version(__package__.split('.')[0]) + self["exports"]["VERSION"] = version(__package__.split(".")[0]) except PackageNotFoundError: - self['exports']['VERSION'] = 9999 - return [f'setvar {key} "{value}"' for key, value in self['exports'].items()] + self["exports"]["VERSION"] = 9999 + return [f'setvar {key} "{value}"' for key, value in self["exports"].items()] diff --git a/src/ugrd/base/console.py b/src/ugrd/base/console.py index 66bf47d8..a31da97f 100644 --- a/src/ugrd/base/console.py +++ b/src/ugrd/base/console.py @@ -1,38 +1,38 @@ __author__ = "desultory" -__version__ = "1.2.0" +__version__ = "1.3.0" -def custom_init(self) -> str: - """ - init override for the console module. +def custom_init(self) -> list[str]: + """ init override for the console module. + Adds the shebang to the top of the file, runs the banner, followed by + most of the main init runlevels Write the main init runlevels to self._custom_init_file. Returns the output of console_init which is the command to start agetty. """ - custom_init_contents = [self['shebang'], - f'einfo "Starting console module v{__version__}"', - 'print_banner', - *self.generate_init_main()] + custom_init_contents = [ + self["shebang"], + f'einfo "Starting console module v{__version__}"', + "print_banner", + *self.generate_init_main(), + ] return console_init(self), custom_init_contents -def console_init(self) -> str: - """ - Start agetty on the primary console. - Tell it to execute the _custom_init_file +def console_init(self) -> list[str]: + """ Returns the command to start agetty on the primary console. If the console is a serial port, set the baud rate. 
""" - name = self['primary_console'] - console = self['console'][name] + name = self["primary_console"] + console = self["console"][name] out_str = f"agetty --autologin root --login-program {self['_custom_init_file']}" - console_type = console.get('type', 'tty') - if console_type != 'tty': + console_type = console.get("type", "tty") + if console_type != "tty": # This differs from usage in the man page but seems to work? out_str += f" --local-line {console['baud']}" out_str += f" {name} {console_type} || rd_restart" return out_str - diff --git a/src/ugrd/base/core.py b/src/ugrd/base/core.py index eaf9ecc5..b20e6d57 100644 --- a/src/ugrd/base/core.py +++ b/src/ugrd/base/core.py @@ -1,26 +1,27 @@ -__author__ = 'desultory' -__version__ = '3.9.1' +__author__ = "desultory" +__version__ = "3.9.1" from pathlib import Path from typing import Union -from zenlib.util import contains, unset, NoDupFlatList +from zenlib.util import NoDupFlatList, contains, unset def detect_tmpdir(self) -> None: - """ Reads TMPDIR from the environment, sets it as the temporary directory. """ + """Reads TMPDIR from the environment, sets it as the temporary directory.""" from os import environ - if tmpdir := environ.get('TMPDIR'): + + if tmpdir := environ.get("TMPDIR"): self.logger.info("Detected TMPDIR: %s" % tmpdir) - self['tmpdir'] = Path(tmpdir) + self["tmpdir"] = Path(tmpdir) -@contains('clean', "Skipping cleaning build directory", log_level=30) +@contains("clean", "Skipping cleaning build directory", log_level=30) def clean_build_dir(self) -> None: - """ Cleans the build directory. """ + """Cleans the build directory.""" from shutil import rmtree - build_dir = self._get_build_path('/') + build_dir = self._get_build_path("/") if build_dir.is_dir(): self.logger.warning("Cleaning build directory: %s" % build_dir) @@ -30,15 +31,15 @@ def clean_build_dir(self) -> None: def generate_structure(self) -> None: - """ Generates the initramfs directory structure. 
""" - for subdir in set(self['paths']): + """Generates the initramfs directory structure.""" + for subdir in set(self["paths"]): self._mkdir(subdir) def calculate_dependencies(self, binary: str) -> list[Path]: - """ - Calculates the dependencies of a binary using lddtree + """ Calculates the dependencies of a binary using lddtree :param binary: The binary to calculate dependencies for + :return: A list of dependency paths """ from shutil import which from subprocess import run @@ -50,16 +51,16 @@ def calculate_dependencies(self, binary: str) -> list[Path]: binary_path = Path(binary_path) self.logger.debug("Calculating dependencies for: %s" % binary_path) - dependencies = run(['lddtree', '-l', str(binary_path)], capture_output=True) + dependencies = run(["lddtree", "-l", str(binary_path)], capture_output=True) if dependencies.returncode != 0: self.logger.warning("Unable to calculate dependencies for: %s" % binary) - raise RuntimeError("Unable to resolve dependencies, error: %s" % dependencies.stderr.decode('utf-8')) + raise RuntimeError("Unable to resolve dependencies, error: %s" % dependencies.stderr.decode("utf-8")) dependency_paths = [] - for dependency in dependencies.stdout.decode('utf-8').splitlines(): + for dependency in dependencies.stdout.decode("utf-8").splitlines(): # Remove extra slash at the start if it exists - if dependency.startswith('//'): + if dependency.startswith("//"): dependency = dependency[1:] dependency_paths.append(Path(dependency)) @@ -68,24 +69,24 @@ def calculate_dependencies(self, binary: str) -> list[Path]: def handle_usr_symlinks(self) -> None: - """ Adds symlinks for /usr/bin and /usr/sbin to /bin and /sbin. 
""" - build_dir = self._get_build_path('/') + """Adds symlinks for /usr/bin and /usr/sbin to /bin and /sbin.""" + build_dir = self._get_build_path("/") - if not (build_dir / 'bin').is_dir(): - if (build_dir / 'usr/bin').is_dir(): - self._symlink('/usr/bin', '/bin/') + if not (build_dir / "bin").is_dir(): + if (build_dir / "usr/bin").is_dir(): + self._symlink("/usr/bin", "/bin/") else: raise RuntimeError("Neither /bin nor /usr/bin exist in the build directory") - if not (build_dir / 'sbin').is_dir() and (build_dir / 'usr/sbin').is_dir(): - self._symlink('/usr/sbin', '/sbin/') + if not (build_dir / "sbin").is_dir() and (build_dir / "usr/sbin").is_dir(): + self._symlink("/usr/sbin", "/sbin/") def deploy_dependencies(self) -> None: - """ Copies all dependencies to the build directory. """ - for dependency in self['dependencies']: + """Copies all dependencies to the build directory.""" + for dependency in self["dependencies"]: if dependency.is_symlink(): - if self['symlinks'].get(f'_auto_{dependency.name}'): + if self["symlinks"].get(f"_auto_{dependency.name}"): self.logger.debug("Dependency is a symlink, skipping: %s" % dependency) continue else: @@ -95,113 +96,116 @@ def deploy_dependencies(self) -> None: def deploy_xz_dependencies(self) -> None: - """ Decompresses all xz dependencies into the build directory. 
""" + """Decompresses all xz dependencies into the build directory.""" from lzma import decompress - for xz_dependency in self['xz_dependencies']: + + for xz_dependency in self["xz_dependencies"]: self.logger.debug("[xz] Decompressing: %s" % xz_dependency) - out_path = self._get_build_path(str(xz_dependency).replace('.xz', '')) + out_path = self._get_build_path(str(xz_dependency).replace(".xz", "")) if not out_path.parent.is_dir(): self.logger.debug("Creating parent directory: %s" % out_path.parent) self._mkdir(out_path.parent, resolve_build=False) - with out_path.open('wb') as out_file: + with out_path.open("wb") as out_file: out_file.write(decompress(xz_dependency.read_bytes())) self.logger.info("[xz] Decompressed '%s' to: %s" % (xz_dependency, out_path)) def deploy_gz_dependencies(self) -> None: - """ Decompresses all gzip dependencies into the build directory. """ + """Decompresses all gzip dependencies into the build directory.""" from gzip import decompress - for gz_dependency in self['gz_dependencies']: + + for gz_dependency in self["gz_dependencies"]: self.logger.debug("[gz] Decompressing: %s" % gz_dependency) - out_path = self._get_build_path(str(gz_dependency).replace('.gz', '')) + out_path = self._get_build_path(str(gz_dependency).replace(".gz", "")) if not out_path.parent.is_dir(): self.logger.debug("Creating parent directory: %s" % out_path.parent) self._mkdir(out_path.parent, resolve_build=False) - with out_path.open('wb') as out_file: + with out_path.open("wb") as out_file: out_file.write(decompress(gz_dependency.read_bytes())) self.logger.info("[gz] Decompressed '%s' to: %s" % (gz_dependency, out_path)) def deploy_copies(self) -> None: - """ Copies everything from self['copies'] into the build directory. 
""" - for copy_name, copy_parameters in self['copies'].items(): + """Copies everything from self['copies'] into the build directory.""" + for copy_name, copy_parameters in self["copies"].items(): self.logger.debug("[%s] Copying: %s" % (copy_name, copy_parameters)) - self._copy(copy_parameters['source'], copy_parameters['destination']) + self._copy(copy_parameters["source"], copy_parameters["destination"]) def deploy_symlinks(self) -> None: - """ Creates symlinks for all symlinks in self['symlinks'].""" - for symlink_name, symlink_parameters in self['symlinks'].items(): + """Creates symlinks for all symlinks in self['symlinks'].""" + for symlink_name, symlink_parameters in self["symlinks"].items(): self.logger.debug("[%s] Creating symlink: %s" % (symlink_name, symlink_parameters)) - self._symlink(symlink_parameters['source'], symlink_parameters['target']) + self._symlink(symlink_parameters["source"], symlink_parameters["target"]) -@unset('mknod_cpio', "Skipping real device node creation with mknod, as mknod_cpio is not specified.", log_level=20) +@unset("mknod_cpio", "Skipping real device node creation with mknod, as mknod_cpio is not specified.", log_level=20) def deploy_nodes(self) -> None: - """ Generates specified device nodes. 
""" + """Generates specified device nodes.""" from os import makedev, mknod from stat import S_IFCHR - for node, config in self['nodes'].items(): - node_path_abs = Path(config['path']) + for node, config in self["nodes"].items(): + node_path_abs = Path(config["path"]) - node_path = self._get_build_path('/') / node_path_abs.relative_to(node_path_abs.anchor) - node_mode = S_IFCHR | config['mode'] + node_path = self._get_build_path("/") / node_path_abs.relative_to(node_path_abs.anchor) + node_mode = S_IFCHR | config["mode"] try: - mknod(node_path, mode=node_mode, device=makedev(config['major'], config['minor'])) + mknod(node_path, mode=node_mode, device=makedev(config["major"], config["minor"])) self.logger.info("Created device node '%s' at path: %s" % (node, node_path)) except PermissionError as e: self.logger.error("Unable to create device node %s at path: %s" % (node, node_path)) - self.logger.info("`mknod_cpio` in `ugrd.base` can be used to generate device nodes within the initramfs archive if they cannot be created on the host system.") + self.logger.info( + "`mknod_cpio` in `ugrd.base` can be used to generate device nodes within the initramfs archive if they cannot be created on the host system." + ) raise e -@contains('find_libgcc', "Skipping libgcc_s dependency resolution", log_level=20) +@contains("find_libgcc", "Skipping libgcc_s dependency resolution", log_level=20) def find_libgcc(self) -> None: - """ - Finds libgcc.so, adds a 'dependencies' item for it. + """Finds libgcc.so, adds a 'dependencies' item for it. 
Adds the parent directory to 'library_paths' """ from pathlib import Path try: - ldconfig = self._run(['ldconfig', '-p']).stdout.decode().split("\n") + ldconfig = self._run(["ldconfig", "-p"]).stdout.decode().split("\n") except RuntimeError: return self.logger.critical("Unable to run ldconfig -p, if GCC is being used, this is fatal!") - libgcc = [lib for lib in ldconfig if 'libgcc_s' in lib and '(libc6,' in lib][0] - source_path = Path(libgcc.partition('=> ')[-1]) + libgcc = [lib for lib in ldconfig if "libgcc_s" in lib and "(libc6," in lib][0] + source_path = Path(libgcc.partition("=> ")[-1]) self.logger.info("Source path for libgcc_s: %s" % source_path) - self['dependencies'] = source_path - self['library_paths'] = str(source_path.parent) + self["dependencies"] = source_path + self["library_paths"] = str(source_path.parent) def _process_out_file(self, out_file): - """ Processes the out_file configuration option. """ + """Processes the out_file configuration option.""" if Path(out_file).is_dir(): self.logger.info("Specified out_file is a directory, setting out_dir: %s" % out_file) - self['out_dir'] = out_file + self["out_dir"] = out_file return - if out_file.startswith('./'): + if out_file.startswith("./"): self.logger.debug("Relative out_file path detected: %s" % out_file) - self['out_dir'] = Path('.').resolve() - self.logger.info("Resolved out_dir to: %s" % self['out_dir']) + self["out_dir"] = Path(".").resolve() + self.logger.info("Resolved out_dir to: %s" % self["out_dir"]) out_file = Path(out_file[2:]) - elif Path(out_file).parent.is_dir() and str(Path(out_file).parent) != '.': - self['out_dir'] = Path(out_file).parent - self.logger.info("Resolved out_dir to: %s" % self['out_dir']) + elif Path(out_file).parent.is_dir() and str(Path(out_file).parent) != ".": + self["out_dir"] = Path(out_file).parent + self.logger.info("Resolved out_dir to: %s" % self["out_dir"]) out_file = Path(out_file).name else: out_file = Path(out_file) - self.data['out_file'] = out_file + 
self.data["out_file"] = out_file def _process_paths_multi(self, path: Union[Path, str]) -> None: - """ + """ Processes a path entry. Converts the input to a Path if it is not one. Checks if the path is absolute, and if so, converts it to a relative path. """ @@ -215,16 +219,16 @@ def _process_paths_multi(self, path: Union[Path, str]) -> None: self.logger.debug("Path was absolute, converted to relative: %s" % path) self.logger.debug("Adding path: %s" % path) - self['paths'].append(path) + self["paths"].append(path) def _process_binaries_multi(self, binary: str) -> None: - """ Processes binaries into the binaries list, adding dependencies along the way. """ - if binary in self['binaries']: + """Processes binaries into the binaries list, adding dependencies along the way.""" + if binary in self["binaries"]: return self.logger.debug("Binary already in binaries list, skipping: %s" % binary) # Check if there is an import function that collides with the name of the binary - if funcs := self['imports'].get('functions'): + if funcs := self["imports"].get("functions"): if binary in funcs: raise ValueError("Binary name collides with import function name: %s" % binary) @@ -232,21 +236,21 @@ def _process_binaries_multi(self, binary: str) -> None: dependencies = calculate_dependencies(self, binary) # The first dependency will be the path of the binary itself, don't add this to the library paths - self['dependencies'] = dependencies[0] + self["dependencies"] = dependencies[0] for dependency in dependencies[1:]: - self['dependencies'] = dependency - if str(dependency.parent) not in self['library_paths']: + self["dependencies"] = dependency + if str(dependency.parent) not in self["library_paths"]: self.logger.info("Adding library path: %s" % dependency.parent) # Make it a string so NoDupFlatList can handle it # It being derived from a path should ensure it's a proper path - self['library_paths'] = str(dependency.parent) + self["library_paths"] = str(dependency.parent) 
self.logger.debug("Adding binary: %s" % binary) - self['binaries'].append(binary) + self["binaries"].append(binary) def _validate_dependency(self, dependency: Union[Path, str]) -> None: - """ Performas basic validation and normalization for dependencies. """ + """Performas basic validation and normalization for dependencies.""" if not isinstance(dependency, Path): dependency = Path(dependency) @@ -257,7 +261,7 @@ def _validate_dependency(self, dependency: Union[Path, str]) -> None: def _process_dependencies_multi(self, dependency: Union[Path, str]) -> None: - """ + """ Processes dependencies. Converts the input to a Path if it is not one, checks if it exists. If the dependency is a symlink, resolve it and add it to the symlinks list. """ @@ -265,25 +269,27 @@ def _process_dependencies_multi(self, dependency: Union[Path, str]) -> None: if dependency.is_dir(): self.logger.debug("Dependency is a directory, adding to paths: %s" % dependency) - self['paths'] = dependency + self["paths"] = dependency return while dependency.is_symlink(): - if self['symlinks'].get(f'_auto_{dependency.name}'): - self.logger.log(5, "Dependency is a symlink which is already in the symlinks list, skipping: %s" % dependency) + if self["symlinks"].get(f"_auto_{dependency.name}"): + self.logger.log( + 5, "Dependency is a symlink which is already in the symlinks list, skipping: %s" % dependency + ) break else: resolved_path = dependency.resolve() self.logger.debug("Dependency is a symlink, adding to symlinks: %s -> %s" % (dependency, resolved_path)) - self['symlinks'][f'_auto_{dependency.name}'] = {'source': resolved_path, 'target': dependency} + self["symlinks"][f"_auto_{dependency.name}"] = {"source": resolved_path, "target": dependency} dependency = resolved_path self.logger.debug("Added dependency: %s" % dependency) - self['dependencies'].append(dependency) + self["dependencies"].append(dependency) def _process_opt_dependencies_multi(self, dependency: Union[Path, str]) -> None: - """ 
Processes optional dependencies. """ + """Processes optional dependencies.""" try: _process_dependencies_multi(self, dependency) except FileNotFoundError as e: @@ -292,119 +298,113 @@ def _process_opt_dependencies_multi(self, dependency: Union[Path, str]) -> None: def _process_xz_dependencies_multi(self, dependency: Union[Path, str]) -> None: - """ + """ Processes xz dependencies. Checks that the file is a xz file, and adds it to the xz dependencies list. !! Resolves symlinks implicitly !! """ dependency = _validate_dependency(self, dependency) - if dependency.suffix != '.xz': + if dependency.suffix != ".xz": self.logger.warning("XZ dependency missing xz extension: %s" % dependency) - self['xz_dependencies'].append(dependency) + self["xz_dependencies"].append(dependency) def _process_gz_dependencies_multi(self, dependency: Union[Path, str]) -> None: - """ + """ Processes gzip dependencies. Checks that the file is a gz file, and adds it to the gz dependencies list. !! Resolves symlinks implicitly !! """ dependency = _validate_dependency(self, dependency) - if dependency.suffix != '.gz': + if dependency.suffix != ".gz": self.logger.warning("GZIP dependency missing gz extension: %s" % dependency) - self['gz_dependencies'].append(dependency) + self["gz_dependencies"].append(dependency) def _process_build_logging(self, log_build: bool) -> None: - """ Sets the build log flag. 
""" - build_log_level = self.get('_build_log_level', 10) + """Sets the build log flag.""" + build_log_level = self.get("_build_log_level", 10) if log_build: - self['_build_log_level'] = max(build_log_level + 10, 20) + self["_build_log_level"] = max(build_log_level + 10, 20) else: - if self['_build_log_level'] > 10: + if self["_build_log_level"] > 10: self.logger.warning("Resetting _build_log_level to 10, as build logging is disabled.") - self['_build_log_level'] = 10 - self.data['build_logging'] = log_build + self["_build_log_level"] = 10 + self.data["build_logging"] = log_build def _process_copies_multi(self, name: str, parameters: dict) -> None: - """ - Processes a copy from the copies parameter + """Processes a copy from the copies parameter Ensures the source and target are defined in the parameters. """ self.logger.log(5, "[%s] Processing copies: %s" % (name, parameters)) - if 'source' not in parameters: + if "source" not in parameters: raise ValueError("[%s] No source specified" % name) - if 'destination' not in parameters: + if "destination" not in parameters: raise ValueError("[%s] No destination specified" % name) self.logger.debug("[%s] Adding copies: %s" % (name, parameters)) - self['copies'][name] = parameters + self["copies"][name] = parameters def _process_symlinks_multi(self, name: str, parameters: dict) -> None: - """ - Processes a symlink, + """Processes a symlink. Ensures the source and target are defined in the parameters. 
""" self.logger.log(5, "[%s] Processing symlink: %s" % (name, parameters)) - if 'source' not in parameters: + if "source" not in parameters: raise ValueError("[%s] No source specified" % name) - if 'target' not in parameters: + if "target" not in parameters: raise ValueError("[%s] No target specified" % name) - self.logger.debug("[%s] Adding symlink: %s -> %s" % (name, parameters['source'], parameters['target'])) - self['symlinks'][name] = parameters + self.logger.debug("[%s] Adding symlink: %s -> %s" % (name, parameters["source"], parameters["target"])) + self["symlinks"][name] = parameters def _process_nodes_multi(self, name: str, config: dict) -> None: - """ - Process a device node. + """Process a device node. Validates the major and minor are defined in the parameters. """ - if 'major' not in config: + if "major" not in config: raise ValueError("[%s] No major specified" % name) - if 'minor' not in config: + if "minor" not in config: raise ValueError("[%s] No minor specified" % name) - if 'path' not in config: - config['path'] = f"dev/{name}" - self.logger.debug("[%s] No path specified, assuming: %s" % (name, config['path'])) + if "path" not in config: + config["path"] = f"dev/{name}" + self.logger.debug("[%s] No path specified, assuming: %s" % (name, config["path"])) - if 'mode' not in config: - config['mode'] = 0o660 - self.logger.debug("[%s] No mode specified, assuming: %s" % (name, config['mode'])) + if "mode" not in config: + config["mode"] = 0o660 + self.logger.debug("[%s] No mode specified, assuming: %s" % (name, config["mode"])) self.logger.debug("[%s] Adding node: %s" % (name, config)) - self['nodes'][name] = config + self["nodes"][name] = config def _process_masks_multi(self, runlevel: str, function: str) -> None: - """ Processes a mask definition. 
""" - if runlevel not in self['masks']: + """Processes a mask definition.""" + if runlevel not in self["masks"]: self.logger.debug("Creating new mask: %s" % runlevel) - self['masks'][runlevel] = NoDupFlatList(logger=self.logger) + self["masks"][runlevel] = NoDupFlatList(logger=self.logger) self.logger.info("[%s] Adding mask: %s" % (runlevel, function)) - self['masks'][runlevel] = function + self["masks"][runlevel] = function def _process_hostonly(self, hostonly: bool) -> None: - """ - Processes the hostonly parameter. + """Processes the hostonly parameter. If validation is enabled, and hostonly mode is set to disabled, disable validation and warn. """ self.logger.debug("Processing hostonly: %s" % hostonly) - self.data['hostonly'] = hostonly - if not hostonly and self['validate']: + self.data["hostonly"] = hostonly + if not hostonly and self["validate"]: self.logger.warning("Hostonly is disabled, disabling validation") - self['validate'] = False + self["validate"] = False def _process_validate(self, validate: bool) -> None: - """ - Processes the validate parameter. + """Processes the validate parameter. It should only be allowed if hostonly mode is enabled. """ self.logger.debug("Processing validate: %s" % validate) - if not self['hostonly'] and validate: + if not self["hostonly"] and validate: raise ValueError("Cannot enable validation when hostonly mode is disabled") - self.data['validate'] = validate - + self.data["validate"] = validate diff --git a/src/ugrd/base/debug.py b/src/ugrd/base/debug.py index 2103e97d..744901c4 100644 --- a/src/ugrd/base/debug.py +++ b/src/ugrd/base/debug.py @@ -1,20 +1,22 @@ __author__ = "desultory" -__version__ = "1.3.0" +__version__ = "1.3.1" from zenlib.util import contains def start_shell(self) -> str: - """ Start a bash shell at the start of the initramfs. """ - return ['if ! 
check_var debug; then', - ' ewarn "The debug module is enabled, but debug is not set enabled"', - ' return', - 'fi', - 'einfo "Starting debug shell"', - 'bash -l'] + """Start a bash shell at the start of the initramfs.""" + return [ + "if ! check_var debug; then", + ' ewarn "The debug module is enabled, but debug is not set enabled"', + " return", + "fi", + 'einfo "Starting debug shell"', + "bash -l", + ] -@contains('start_shell', 'Not enabling the debug shell, as the start_shell option is not set.', log_level=30) +@contains("start_shell", "Not enabling the debug shell, as the start_shell option is not set.", log_level=30) def enable_debug(self) -> str: - """ Enable debug mode. """ + """Enable debug mode.""" return "setvar debug 1" diff --git a/src/ugrd/base/keymap.py b/src/ugrd/base/keymap.py index 0a6b22f0..6367d904 100644 --- a/src/ugrd/base/keymap.py +++ b/src/ugrd/base/keymap.py @@ -1,12 +1,13 @@ -__author__ = 'desultory' -__version__ = '0.3.1' +__author__ = "desultory" +__version__ = "0.3.2" from zenlib.util import contains -def _find_keymap_include(self, base_path, included_file, no_recurse=False): - """ Finds the included file in the keymap file. 
""" +def _find_keymap_include(self, base_path, included_file, no_recurse=False) -> str: + """Finds the included file in the keymap file.""" from pathlib import Path + if not isinstance(base_path, Path): base_path = Path(base_path) if not base_path.is_dir(): @@ -16,13 +17,13 @@ def _find_keymap_include(self, base_path, included_file, no_recurse=False): for file in base_path.iterdir(): if file.name == included_file: return str(file) - if file.name == included_file + '.gz': + if file.name == included_file + ".gz": return str(file) self.logger.debug("Could not find included file '%s' in dir: %s" % (included_file, base_path)) - if base_path.name != 'include': - include_dir = base_path / 'include' + if base_path.name != "include": + include_dir = base_path / "include" if include_dir.exists(): self.logger.debug("Searching include directory: %s" % include_dir) try: @@ -30,50 +31,49 @@ def _find_keymap_include(self, base_path, included_file, no_recurse=False): except FileNotFoundError: pass - if base_path.name != 'keymaps' and not no_recurse: + if base_path.name != "keymaps" and not no_recurse: self.logger.debug("Searching parent directory: %s" % base_path.parent) return _find_keymap_include(self, base_path.parent, included_file) - if not included_file.endswith('.inc'): + if not included_file.endswith(".inc"): try: - return _find_keymap_include(self, base_path, included_file + '.inc') + return _find_keymap_include(self, base_path, included_file + ".inc") except FileNotFoundError: pass raise FileNotFoundError(f"Could not find included file: {included_file}") -def _add_keymap_file(self, keymap_file: str) -> str: - """ Adds an individual keymap file, handling gzipped files. 
""" - if keymap_file.endswith('.gz'): - self['gz_dependencies'] = keymap_file +def _add_keymap_file(self, keymap_file: str) -> None: + """Adds an individual keymap file, handling gzipped files.""" + if keymap_file.endswith(".gz"): + self["gz_dependencies"] = keymap_file import gzip - with gzip.open(keymap_file, 'rb') as f: + + with gzip.open(keymap_file, "rb") as f: keymap_data = f.read() keymap_file = keymap_file[:-3] else: - self['dependencies'] = keymap_file - keymap_data = open(keymap_file, 'rb').read() + self["dependencies"] = keymap_file + keymap_data = open(keymap_file, "rb").read() keymap_data = keymap_data.decode() for line in keymap_data.splitlines(): - if line.startswith('include'): - include_name = line.split()[1].replace('"', '') + if line.startswith("include"): + include_name = line.split()[1].replace('"', "") include_file = _find_keymap_include(self, keymap_file, include_name) self.logger.info("Detected keymap include, adding file: %s" % include_file) _add_keymap_file(self, include_file) -def _process_keymap_file(self, keymap_file: str) -> str: - """ Sets the keymap file, adding it to the list of files to be copied to the new root. """ +def _process_keymap_file(self, keymap_file: str) -> None: + """Sets the keymap file, adding it to the list of files to be copied to the new root.""" _add_keymap_file(self, keymap_file) - self.data['keymap_file'] = keymap_file.replace('.gz', '') - + self.data["keymap_file"] = keymap_file.replace(".gz", "") -@contains('keymap_file', "keymap_file must be set to use the keymap module", raise_exception=True) -def set_keymap(self) -> str: - """ Sets the specified keymap. 
""" - return [f'einfo "Setting keymap: {self["keymap_file"]}"', - f'loadkeys {self["keymap_file"]}'] +@contains("keymap_file", "keymap_file must be set to use the keymap module", raise_exception=True) +def set_keymap(self) -> list[str]: + """Sets the specified keymap.""" + return [f'einfo "Setting keymap: {self["keymap_file"]}"', f'loadkeys {self["keymap_file"]}'] diff --git a/src/ugrd/base/plymouth.py b/src/ugrd/base/plymouth.py index 4f1c7415..9b170565 100644 --- a/src/ugrd/base/plymouth.py +++ b/src/ugrd/base/plymouth.py @@ -1,69 +1,69 @@ -__version__ = '0.1.1' +__version__ = "0.1.1" -from zenlib.util import unset from configparser import ConfigParser from pathlib import Path -PLYMOUTH_CONFIG_FILES = ['/etc/plymouth/plymouthd.conf', '/usr/share/plymouth/plymouthd.defaults'] +from zenlib.util import unset + +PLYMOUTH_CONFIG_FILES = ["/etc/plymouth/plymouthd.conf", "/usr/share/plymouth/plymouthd.defaults"] -@unset('plymouth_config') -def find_plymouth_config(self): - """ Adds the plymouth config files to the build directory """ +@unset("plymouth_config") +def find_plymouth_config(self) -> None: + """Adds the plymouth config files to the build directory""" self.logger.info("Finding plymouthd.conf") for file in PLYMOUTH_CONFIG_FILES: plymouth_config = ConfigParser() plymouth_config.read(file) - if plymouth_config.has_section('Daemon') and plymouth_config.has_option('Daemon', 'Theme'): - self['plymouth_config'] = file + if plymouth_config.has_section("Daemon") and plymouth_config.has_option("Daemon", "Theme"): + self["plymouth_config"] = file break self.logger.debug("Plymouth config file missing theme option: %s" % file) else: - raise FileNotFoundError('Failed to find plymouthd.conf') + raise FileNotFoundError("Failed to find plymouthd.conf") -def _process_plymouth_config(self, file): - """ Checks that the config file is valid """ +def _process_plymouth_config(self, file) -> None: + """Checks that the config file is valid""" self.logger.info("Processing 
plymouthd.conf: %s" % file) plymouth_config = ConfigParser() plymouth_config.read(file) - self['plymouth_theme'] = plymouth_config['Daemon']['Theme'] - self.data['plymouth_config'] = file - self['copies'] = {'plymouth_config_file': {'source': file, 'destination': '/etc/plymouth/plymouthd.conf'}} - if device_timeout := plymouth_config.get('Daemon', 'DeviceTimeout', fallback=None): + self["plymouth_theme"] = plymouth_config["Daemon"]["Theme"] + self.data["plymouth_config"] = file + self["copies"] = {"plymouth_config_file": {"source": file, "destination": "/etc/plymouth/plymouthd.conf"}} + if device_timeout := plymouth_config.get("Daemon", "DeviceTimeout", fallback=None): if float(device_timeout) > 1: self.logger.warning("[Plymouth] DeviceTimeout is set to %s, this may cause boot delays." % device_timeout) -def _process_plymouth_theme(self, theme): - """ Checks that the theme is valid """ - theme_dir = Path('/usr/share/plymouth/themes') / theme +def _process_plymouth_theme(self, theme) -> None: + """Checks that the theme is valid""" + theme_dir = Path("/usr/share/plymouth/themes") / theme if not theme_dir.exists(): - raise FileNotFoundError('Theme directory not found: %s' % theme_dir) + raise FileNotFoundError("Theme directory not found: %s" % theme_dir) - self.data['plymouth_theme'] = theme + self.data["plymouth_theme"] = theme -def pull_plymouth(self): - """ Adds plymouth files to dependencies """ - dir_list = [Path('/usr/lib64/plymouth'), Path('/usr/share/plymouth/themes/') / self["plymouth_theme"]] +def pull_plymouth(self) -> None: + """Adds plymouth files to dependencies""" + dir_list = [Path("/usr/lib64/plymouth"), Path("/usr/share/plymouth/themes/") / self["plymouth_theme"]] self.logger.info("[%s] Adding plymouth files to dependencies." 
% self["plymouth_theme"]) for directory in dir_list: - for file in directory.rglob('*'): - self['dependencies'] = file + for file in directory.rglob("*"): + self["dependencies"] = file -def make_devpts(self): - """ Creates /dev/pts and mounts the fstab entry """ - return ['mkdir -m755 -p /dev/pts', - 'mount /dev/pts'] +def make_devpts(self) -> list[str]: + """Creates /dev/pts and mounts the fstab entry""" + return ["mkdir -m755 -p /dev/pts", "mount /dev/pts"] -def start_plymouth(self): - """ - Runs plymouthd - """ - return ['mkdir -p /run/plymouth', - 'plymouthd --mode boot --pid-file /run/plymouth/plymouth.pid --attach-to-session', - 'setvar plymouth 1', - 'plymouth show-splash'] +def start_plymouth(self) -> list[str]: + """Returns bash lines to run plymouthd""" + return [ + "mkdir -p /run/plymouth", + "plymouthd --mode boot --pid-file /run/plymouth/plymouth.pid --attach-to-session", + "setvar plymouth 1", + "plymouth show-splash", + ] diff --git a/src/ugrd/base/test.py b/src/ugrd/base/test.py index 1a2b5faf..fd91f989 100644 --- a/src/ugrd/base/test.py +++ b/src/ugrd/base/test.py @@ -2,53 +2,55 @@ from zenlib.util import unset +COPY_CONFIG = ["mounts", "out_dir", "tmpdir", "clean", "test_image_size", "test_flag"] -COPY_CONFIG = [ - 'mounts', 'out_dir', 'tmpdir', 'clean', - 'test_image_size', 'test_flag' -] - -@unset('test_kernel') +@unset("test_kernel") def find_kernel_path(self): + """Finds the kernel path for the current system""" from pathlib import Path - self.logger.info("Trying to find the kernel path for: %s", self['kernel_version']) - kernel_path = Path(self['_kmod_dir']) / 'vmlinuz' # try this first - if not (self['_kmod_dir'] / 'vmlinuz').exists(): - for search_dir in ['/boot', '/efi']: - for prefix in ['vmlinuz', 'kernel', 'linux', 'bzImage']: + + self.logger.info("Trying to find the kernel path for: %s", self["kernel_version"]) + kernel_path = Path(self["_kmod_dir"]) / "vmlinuz" # try this first + if not (self["_kmod_dir"] / "vmlinuz").exists(): + for 
search_dir in ["/boot", "/efi"]:
+            for prefix in ["vmlinuz", "kernel", "linux", "bzImage"]:
                 kernel_path = Path(search_dir) / f'{prefix}-{self["kernel_version"]}'
                 if kernel_path.exists():
                     break
             if kernel_path.exists():
                 break
     else:
-        raise FileNotFoundError("Kernel not found: %s" % self['kernel_version'])
+        raise FileNotFoundError("Kernel not found: %s" % self["kernel_version"])
     self.logger.info("Found kernel at: %s", kernel_path)
-    self['test_kernel'] = kernel_path
+    self["test_kernel"] = kernel_path
 
 
 def init_test_vars(self):
+    """Initializes the test variables"""
     from uuid import uuid4
+
     find_kernel_path(self)
-    if not self['test_flag']:
-        self['test_flag'] = uuid4()
+    if not self["test_flag"]:
+        self["test_flag"] = uuid4()
 
 
 def _get_qemu_cmd_args(self, test_image):
-    """ Gets the qemu command from the configuration """
+    """Returns arguments to run QEMU for the current test configuration."""
     test_initrd = self._archive_out_path
-    test_rootfs = test_image['_archive_out_path']
-    qemu_args = {'-m': self['test_memory'],
-                 '-cpu': self['test_cpu'],
-                 '-kernel': self['test_kernel'],
-                 '-initrd': test_initrd,
-                 '-serial': 'mon:stdio',
-                 '-append': self['test_cmdline'],
-                 '-drive': 'file=%s,format=raw' % test_rootfs}
-
-    qemu_bools = [f'-{item}' for item in self['qemu_bool_args']]
+    test_rootfs = test_image["_archive_out_path"]
+    qemu_args = {
+        "-m": self["test_memory"],
+        "-cpu": self["test_cpu"],
+        "-kernel": self["test_kernel"],
+        "-initrd": test_initrd,
+        "-serial": "mon:stdio",
+        "-append": self["test_cmdline"],
+        "-drive": "file=%s,format=raw" % test_rootfs,
+    }
+
+    qemu_bools = [f"-{item}" for item in self["qemu_bool_args"]]
 
     arglist = [f"qemu-system-{self['test_arch']}"] + qemu_bools
     for key, value in qemu_args.items():
@@ -59,17 +61,19 @@ def _get_qemu_cmd_args(self, test_image):
 
 
 def make_test_image(self):
-    """ Creates a new initramfs generator to create the test image """
+    """Creates a new initramfs generator to create the test image"""
     from 
ugrd.initramfs_generator import InitramfsGenerator - kwargs = {'logger': self.logger, - 'validate': False, - 'NO_BASE': True, - 'config': None, - 'modules': 'ugrd.fs.test_image', - 'out_file': self['test_rootfs_name'], - 'build_dir': self['test_rootfs_build_dir'], - **{key: self[key] for key in COPY_CONFIG}} + kwargs = { + "logger": self.logger, + "validate": False, + "NO_BASE": True, + "config": None, + "modules": "ugrd.fs.test_image", + "out_file": self["test_rootfs_name"], + "build_dir": self["test_rootfs_build_dir"], + **{key: self[key] for key in COPY_CONFIG}, + } target_fs = InitramfsGenerator(**kwargs) try: @@ -82,34 +86,35 @@ def make_test_image(self): def test_image(self): + """Runs the test image in QEMU""" image = make_test_image(self) qemu_cmd = _get_qemu_cmd_args(self, image) - self.logger.info("Testing initramfs image: %s", self['_archive_out_path']) + self.logger.info("Testing initramfs image: %s", self["_archive_out_path"]) self.logger.debug("Test config:\n%s", image) - self.logger.info("Test kernel: %s", self['test_kernel']) - self.logger.info("Test rootfs: %s", image['_archive_out_path']) - self.logger.info("Test flag: %s", self['test_flag']) - self.logger.info("QEMU command: %s", ' '.join([str(arg) for arg in qemu_cmd])) + self.logger.info("Test kernel: %s", self["test_kernel"]) + self.logger.info("Test rootfs: %s", image["_archive_out_path"]) + self.logger.info("Test flag: %s", self["test_flag"]) + self.logger.info("QEMU command: %s", " ".join([str(arg) for arg in qemu_cmd])) try: - results = self._run(qemu_cmd, timeout=self['test_timeout']) + results = self._run(qemu_cmd, timeout=self["test_timeout"]) except RuntimeError as e: raise RuntimeError("QEMU test failed: %s" % e) - stdout = results.stdout.decode('utf-8').split('\r\n') + stdout = results.stdout.decode("utf-8").split("\r\n") self.logger.debug("QEMU output: %s", stdout) # Get the time of the kernel panic for line in stdout: - if line.endswith('exitcode=0x00000000'): - panic_time = 
line.split(']')[0][1:].strip() + if line.endswith("exitcode=0x00000000"): + panic_time = line.split("]")[0][1:].strip() self.logger.info("Test took: %s", panic_time) break else: self.logger.warning("Unable to determine test duration from panic message.") - if self['test_flag'] in stdout: + if self["test_flag"] in stdout: self.logger.info("Test passed") else: self.logger.error("Test failed")