diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d45acb00..a0a53b59 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,46 +1,46 @@ -# Copyright 2022 VMware, Inc. -# SPDX-License-Identifier: Apache-2 - default_language_version: - python: python3 - + python: python3 repos: - # ----- Local Hooks -----------------------------------------------------------------------------------------------> - - repo: local - hooks: - - id: check-changelog-entries - name: Check Changelog Entries - entry: python .pre-commit-hooks/check_changelog_entries.py - language: system - - - repo: local - hooks: - - id: check-copyright-headers - name: Check python modules for appropriate copyright headers - files: ^.*\.py$ - exclude: setup\.py - entry: python .pre-commit-hooks/copyright_headers.py - language: system - # <---- Local Hooks ------------------------------------------------------------------------------------------------ - - - repo: https://github.com/timothycrosley/isort - rev: 5.12.0 - hooks: - - id: isort - - - repo: https://github.com/psf/black - rev: 22.6.0 - hooks: - - id: black - - # ----- Code Analysis ---------------------------------------------------------------------------------------------> - - repo: https://github.com/pycqa/flake8 - rev: '5.0.4' - hooks: - - id: flake8 - exclude: ^(\.pre-commit-hooks/.*\.py)$ - additional_dependencies: - - flake8-mypy-fork - - flake8-docstrings - - flake8-typing-imports - # <---- Code Analysis --------------------------------------------------------------------------------------------- +- repo: local + hooks: + - id: check-changelog-entries + name: Check Changelog Entries + entry: python .pre-commit-hooks/check_changelog_entries.py + language: system +- repo: local + hooks: + - id: check-copyright-headers + name: Check python modules for appropriate copyright headers + files: ^.*\.py$ + exclude: setup\.py + entry: python .pre-commit-hooks/copyright_headers.py + language: system +- repo: https://github.com/timothycrosley/isort + rev: 5.12.0 + hooks: + - id: isort +- repo: https://github.com/psf/black + rev: 22.6.0 + hooks: + - id: black +- repo: https://github.com/pycqa/flake8 + rev: 5.0.4 + hooks: + - id: flake8 + exclude: ^(\.pre-commit-hooks/.*\.py)$ + additional_dependencies: + - flake8-mypy-fork + - flake8-docstrings + - flake8-typing-imports +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.11.1 + hooks: + - id: mypy + args: + - --strict + - --config-file=pyproject.toml + - relenv/ + - tests/ + additional_dependencies: + - types-requests + pass_filenames: false diff --git a/pyproject.toml b/pyproject.toml index 6fa1ba6d..d845e44f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,3 +21,11 @@ ensure_newline_before_comments=true [tool.pylint] max-line-length=120 + +[tool.mypy] +python_version = "3.10" +explicit_package_bases = true +ignore_missing_imports = true +namespace_packages = true +mypy_path = "." +exclude = "relenv/__main__\\.py" diff --git a/relenv/__init__.py b/relenv/__init__.py index 7b80aedf..54ee8af6 100644 --- a/relenv/__init__.py +++ b/relenv/__init__.py @@ -1,5 +1,5 @@ -# Copyright 2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# Copyright 2022-2025 Broadcom. +# SPDX-License-Identifier: Apache-2.0 from __future__ import annotations import sys diff --git a/relenv/__main__.py b/relenv/__main__.py index e6954604..4869370f 100644 --- a/relenv/__main__.py +++ b/relenv/__main__.py @@ -1,5 +1,5 @@ -# Copyright 2023-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# Copyright 2022-2025 Broadcom. +# SPDX-License-Identifier: Apache-2.0 """ The entrypoint into relenv. """ diff --git a/relenv/build/__init__.py b/relenv/build/__init__.py index 32050e69..9c66ac5c 100644 --- a/relenv/build/__init__.py +++ b/relenv/build/__init__.py @@ -1,7 +1,8 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 +# mypy: ignore-errors """ -The ``relenv build`` command. +Entry points for the ``relenv build`` CLI command. """ from __future__ import annotations @@ -10,7 +11,7 @@ import random import signal import sys -from types import ModuleType +from types import FrameType, ModuleType from . import darwin, linux, windows from .common import CHECK_VERSIONS_SUPPORT, builds @@ -200,7 +201,7 @@ def main(args: argparse.Namespace) -> None: else: show_ui = True - def signal_handler(signal, frame): + def signal_handler(_signal: int, frame: FrameType | None) -> None: sys.exit(1) signal.signal(signal.SIGINT, signal_handler) diff --git a/relenv/build/common.py b/relenv/build/common.py index 65764bb1..dde8e07b 100644 --- a/relenv/build/common.py +++ b/relenv/build/common.py @@ -1,11 +1,11 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 """ Build process common methods. """ from __future__ import annotations -import glob +import fnmatch import hashlib import io import logging @@ -35,8 +35,16 @@ Sequence, Tuple, Union, + cast, ) +from typing import TYPE_CHECKING, Protocol, TypedDict + +if TYPE_CHECKING: + from multiprocessing.synchronize import Event as SyncEvent +else: + SyncEvent = Any + from relenv.common import ( DATA_DIR, LINUX, @@ -232,15 +240,24 @@ def all_dirs(root: PathLike, recurse: bool = True) -> List[str]: :return: A list of directories found :rtype: list """ - paths = [root] - for root, dirs, files in os.walk(root): + root_str = os.fspath(root) + paths: List[str] = [root_str] + for current_root, dirs, _files in os.walk(root_str): + if not recurse and current_root != root_str: + continue for name in dirs: - paths.append(os.path.join(root, name)) + paths.append(os.path.join(current_root, name)) return paths -def populate_env(dirs: "Dirs", env: MutableMapping[str, str]) -> None: - pass +def populate_env(env: MutableMapping[str, str], dirs: "Dirs") -> None: + """Populate environment variables for a build step. + + This default implementation intentionally does nothing; specific steps may + provide their own implementation via the ``populate_env`` hook. + """ + _ = env + _ = dirs def build_default(env: MutableMapping[str, str], dirs: "Dirs", logfp: IO[str]) -> None: @@ -490,41 +507,51 @@ def tarball_version(href: str) -> Optional[str]: return x except IndexError: return None + return None def sqlite_version(href: str) -> Optional[str]: if "releaselog" in href: link = href.split("/")[1][:-5] return "{:d}{:02d}{:02d}00".format(*[int(_) for _ in link.split("_")]) + return None def github_version(href: str) -> Optional[str]: if "tag/" in href: return href.split("/v")[-1] + return None def krb_version(href: str) -> Optional[str]: if re.match(r"\d\.\d\d/", href): return href[:-1] + return None def python_version(href: str) -> Optional[str]: if re.match(r"(\d+\.)+\d/", href): return href[:-1] + return None def uuid_version(href: str) -> Optional[str]: if "download" in href and "latest" not in href: return href[:-16].rsplit("/")[-1].replace("libuuid-", "") + return None def parse_links(text: str) -> List[str]: class HrefParser(HTMLParser): - hrefs = [] + def __init__(self) -> None: + super().__init__() + self.hrefs: List[str] = [] - def handle_starttag(self, tag, attrs): + def handle_starttag( + self, tag: str, attrs: List[Tuple[str, Optional[str]]] + ) -> None: if tag == "a": - link = dict(attrs).get("href", "") + link = dict(attrs).get("href") if link: self.hrefs.append(link) @@ -533,10 +560,20 @@ def handle_starttag(self, tag, attrs): return parser.hrefs +class Comparable(Protocol): + """Protocol capturing the comparison operations we rely on.""" + + def __lt__(self, other: Any) -> bool: + """Return True when self is ordered before *other*.""" + + def __gt__(self, other: Any) -> bool: + """Return True when self is ordered after *other*.""" + + def check_files( name: str, location: str, - func: Callable[[str], Optional[str]], + func: Optional[Callable[[str], Optional[str]]], current: str, ) -> None: fp = io.BytesIO() @@ -544,29 +581,33 @@ def check_files( fp.seek(0) text = fp.read().decode() loose = False + current_version: Comparable try: - current = parse(current) + current_version = cast(Comparable, parse(current)) except InvalidVersion: - current = LooseVersion(current) + current_version = LooseVersion(current) loose = True - versions = [] - for _ in parse_links(text): - version = func(_) + versions: List[Comparable] = [] + if func is None: + return + for link in parse_links(text): + version = func(link) if version: if loose: versions.append(LooseVersion(version)) else: try: - versions.append(parse(version)) + versions.append(cast(Comparable, parse(version))) except InvalidVersion: pass - versions.sort() - compare_versions(name, current, versions) + compare_versions(name, current_version, versions) -def compare_versions(name: str, current: Any, versions: Sequence[Any]) -> None: +def compare_versions( + name: str, current: Comparable, versions: Sequence[Comparable] +) -> None: for version in versions: try: if version > current: @@ -600,7 +641,7 @@ def __init__( url: str, fallback_url: Optional[str] = None, signature: Optional[str] = None, - destination: str = "", + destination: PathLike = "", version: str = "", checksum: Optional[str] = None, checkfunc: Optional[Callable[[str], Optional[str]]] = None, @@ -610,7 +651,9 @@ def __init__( self.url_tpl = url self.fallback_url_tpl = fallback_url self.signature_tpl = signature - self.destination = destination + self._destination: pathlib.Path = pathlib.Path() + if destination: + self._destination = pathlib.Path(destination) self.version = version self.checksum = checksum self.checkfunc = checkfunc @@ -629,6 +672,17 @@ def copy(self) -> "Download": self.checkurl, ) + @property + def destination(self) -> pathlib.Path: + return self._destination + + @destination.setter + def destination(self, value: Optional[PathLike]) -> None: + if value: + self._destination = pathlib.Path(value) + else: + self._destination = pathlib.Path() + @property def url(self) -> str: return self.url_tpl.format(version=self.version) @@ -648,11 +702,11 @@ def signature_url(self) -> str: @property def filepath(self) -> pathlib.Path: _, name = self.url.rsplit("/", 1) - return pathlib.Path(self.destination) / name + return self.destination / name @property def formatted_url(self) -> str: - return self.url.format(version=self.version) + return self.url_tpl.format(version=self.version) def fetch_file(self) -> Tuple[str, bool]: """ @@ -784,12 +838,23 @@ def __call__( sys.exit(1) return valid - def check_version(self) -> None: + def check_version(self) -> bool: + if self.checkfunc is None: + return True if self.checkurl: url = self.checkurl else: url = self.url.rsplit("/", 1)[0] check_files(self.name, url, self.checkfunc, self.version) + return True + + +class Recipe(TypedDict): + """Typed description of a build recipe entry.""" + + build_func: Callable[[MutableMapping[str, str], "Dirs", IO[str]], None] + wait_on: List[str] + download: Optional[Download] class Dirs: @@ -816,6 +881,7 @@ def __init__(self, dirs: WorkDirs, name: str, arch: str, version: str) -> None: self.logs = dirs.logs self.sources = dirs.src self.tmpbuild = tempfile.mkdtemp(prefix="{}_build".format(name)) + self.source: Optional[pathlib.Path] = None @property def toolchain(self) -> Optional[pathlib.Path]: @@ -893,33 +959,6 @@ def to_dict(self) -> Dict[str, Any]: } -class Builds: - """ - Collection of builds. - """ - - def __init__(self) -> None: - self.builds: Dict[str, "Builder"] = {} - - def add(self, platform: str, *args: Any, **kwargs: Any) -> "Builder": - if "builder" in kwargs: - build = kwargs.pop("builder") - if args or kwargs: - raise RuntimeError( - "builder keyword can not be used with other kwargs or args" - ) - else: - build = Builder(*args, **kwargs) - if platform not in self.builds: - self.builds[platform] = build - else: - self.builds[platform] = build - return build - - -builds = Builds() - - class Builder: """ Utility that handles the build process. @@ -941,9 +980,11 @@ class Builder: def __init__( self, root: Optional[PathLike] = None, - recipies: Optional[Dict[str, Dict[str, Any]]] = None, - build_default: Callable[..., Any] = build_default, - populate_env: Callable[["Dirs", MutableMapping[str, str]], None] = populate_env, + recipies: Optional[Dict[str, Recipe]] = None, + build_default: Callable[ + [MutableMapping[str, str], "Dirs", IO[str]], None + ] = build_default, + populate_env: Callable[[MutableMapping[str, str], "Dirs"], None] = populate_env, arch: str = "x86_64", version: str = "", ) -> None: @@ -956,24 +997,23 @@ def __init__( self.downloads = self.dirs.download if recipies is None: - self.recipies: Dict[str, Dict[str, Any]] = {} + self.recipies: Dict[str, Recipe] = {} else: self.recipies = recipies self.build_default = build_default self.populate_env = populate_env self.version = version - self.toolchains = get_toolchain(root=self.dirs.root) self.set_arch(self.arch) def copy(self, version: str, checksum: Optional[str]) -> "Builder": - recipies = {} + recipies: Dict[str, Recipe] = {} for name in self.recipies: - _ = self.recipies[name] + recipe = self.recipies[name] recipies[name] = { - "build_func": _["build_func"], - "wait_on": _["wait_on"], - "download": _["download"].copy() if _["download"] else None, + "build_func": recipe["build_func"], + "wait_on": list(recipe["wait_on"]), + "download": recipe["download"].copy() if recipe["download"] else None, } build = Builder( self.root, @@ -983,8 +1023,11 @@ def copy(self, version: str, checksum: Optional[str]) -> "Builder": self.arch, version, ) - build.recipies["python"]["download"].version = version - build.recipies["python"]["download"].checksum = checksum + python_download = build.recipies["python"].get("download") + if python_download is None: + raise RelenvException("Python recipe is missing a download entry") + python_download.version = version + python_download.checksum = checksum return build def set_arch(self, arch: str) -> None: @@ -995,10 +1038,14 @@ def set_arch(self, arch: str) -> None: :type arch: str """ self.arch = arch - if sys.platform in ["darwin", "win32"]: - self.toolchain = None - else: - self.toolchain = get_toolchain(self.arch, self.dirs.root) + self._toolchain: Optional[pathlib.Path] = None + + @property + def toolchain(self) -> Optional[pathlib.Path]: + """Lazily fetch toolchain only when needed.""" + if self._toolchain is None and sys.platform == "linux": + self._toolchain = get_toolchain(self.arch, self.dirs.root) + return self._toolchain @property def triplet(self) -> str: @@ -1037,15 +1084,18 @@ def add( :type download: dict, optional """ if wait_on is None: - wait_on = [] + wait_on_list: List[str] = [] + else: + wait_on_list = list(wait_on) if build_func is None: build_func = self.build_default + download_obj: Optional[Download] = None if download is not None: - download = Download(name, destination=self.downloads, **download) + download_obj = Download(name, destination=self.downloads, **download) self.recipies[name] = { "build_func": build_func, - "wait_on": wait_on, - "download": download, + "wait_on": wait_on_list, + "download": download_obj, } def run( @@ -1123,7 +1173,10 @@ def run( env["RELENV_HOST_ARCH"] = self.arch env["RELENV_BUILD"] = self.build_triplet env["RELENV_BUILD_ARCH"] = self.build_arch - env["RELENV_PY_VERSION"] = self.recipies["python"]["download"].version + python_download = self.recipies["python"].get("download") + if python_download is None: + raise RelenvException("Python recipe is missing download configuration") + env["RELENV_PY_VERSION"] = python_download.version env["RELENV_PY_MAJOR_VERSION"] = env["RELENV_PY_VERSION"].rsplit(".", 1)[0] if "RELENV_DATA" in os.environ: env["RELENV_DATA"] = os.environ["RELENV_DATA"] @@ -1186,18 +1239,17 @@ def download_files( :param steps: The steps to download archives for, defaults to None :type steps: list, optional """ - if steps is None: - steps = list(self.recipies) + step_names = list(steps) if steps is not None else list(self.recipies) - fails = [] - processes = {} - events = {} + fails: List[str] = [] + processes: Dict[str, multiprocessing.Process] = {} + events: Dict[str, SyncEvent] = {} if show_ui: sys.stdout.write("Starting downloads \n") log.info("Starting downloads") if show_ui: print_ui(events, processes, fails) - for name in steps: + for name in step_names: download = self.recipies[name]["download"] if download is None: continue @@ -1254,10 +1306,10 @@ def build( :param cleanup: Whether to clean up or not, defaults to True :type cleanup: bool, optional """ # noqa: D400 - fails = [] - events = {} - waits = {} - processes = {} + fails: List[str] = [] + events: Dict[str, SyncEvent] = {} + waits: Dict[str, List[str]] = {} + processes: Dict[str, multiprocessing.Process] = {} if show_ui: sys.stdout.write("Starting builds\n") @@ -1265,20 +1317,24 @@ def build( print_ui(events, processes, fails) log.info("Starting builds") - for name in steps: + step_names = list(steps) if steps is not None else list(self.recipies) + + for name in step_names: event = multiprocessing.Event() events[name] = event - kwargs = dict(self.recipies[name]) + recipe = self.recipies[name] + kwargs = dict(recipe) kwargs["show_ui"] = show_ui kwargs["log_level"] = log_level # Determine needed dependency recipies. - wait_on = kwargs.pop("wait_on", []) - for _ in wait_on[:]: - if _ not in steps: - wait_on.remove(_) + wait_on_seq = cast(List[str], kwargs.pop("wait_on", [])) + wait_on_list = list(wait_on_seq) + for dependency in wait_on_list[:]: + if dependency not in step_names: + wait_on_list.remove(dependency) - waits[name] = wait_on + waits[name] = wait_on_list if not waits[name]: event.set() @@ -1360,7 +1416,7 @@ def check_prereqs(self) -> List[str]: :return: Returns a list of string describing failed checks :rtype: list """ - fail = [] + fail: List[str] = [] if sys.platform == "linux": if not self.toolchain or not self.toolchain.exists(): fail.append( @@ -1396,21 +1452,21 @@ def __call__( log = logging.getLogger(None) log.setLevel(logging.NOTSET) + stream_handler: Optional[logging.Handler] = None if not show_ui: - handler = logging.StreamHandler() - handler.setLevel(logging.getLevelName(log_level)) - log.addHandler(handler) + stream_handler = logging.StreamHandler() + stream_handler.setLevel(logging.getLevelName(log_level)) + log.addHandler(stream_handler) os.makedirs(self.dirs.logs, exist_ok=True) - handler = logging.FileHandler(self.dirs.logs / "build.log") - handler.setLevel(logging.INFO) - log.addHandler(handler) + file_handler = logging.FileHandler(self.dirs.logs / "build.log") + file_handler.setLevel(logging.INFO) + log.addHandler(file_handler) if arch: self.set_arch(arch) - if steps is None: - steps = self.recipies + step_names = list(steps) if steps is not None else list(self.recipies) failures = self.check_prereqs() if not download_only and failures: @@ -1435,10 +1491,17 @@ def __call__( # Start a process for each build passing it an event used to notify each # process if it's dependencies have finished. - self.download_files(steps, force_download=force_download, show_ui=show_ui) - if download_only: - return - self.build(steps, cleanup, show_ui=show_ui, log_level=log_level) + try: + self.download_files( + step_names, force_download=force_download, show_ui=show_ui + ) + if download_only: + return + self.build(step_names, cleanup, show_ui=show_ui, log_level=log_level) + finally: + log.removeHandler(file_handler) + if stream_handler is not None: + log.removeHandler(stream_handler) def check_versions(self) -> bool: success = True @@ -1451,6 +1514,29 @@ def check_versions(self) -> bool: return success +class Builds: + """Collection of platform-specific builders.""" + + def __init__(self) -> None: + self.builds: Dict[str, Builder] = {} + + def add(self, platform: str, *args: Any, **kwargs: Any) -> Builder: + if "builder" in kwargs: + build_candidate = kwargs.pop("builder") + if args or kwargs: + raise RuntimeError( + "builder keyword can not be used with other kwargs or args" + ) + build = cast(Builder, build_candidate) + else: + build = Builder(*args, **kwargs) + self.builds[platform] = build + return build + + +builds = Builds() + + def patch_shebang(path: PathLike, old: str, new: str) -> bool: """ Replace a file's shebang. @@ -1521,8 +1607,13 @@ def install_sysdata( :type toolchain: str """ data = {} - fbuildroot = lambda _: _.replace(str(buildroot), "{BUILDROOT}") # noqa: E731 - ftoolchain = lambda _: _.replace(str(toolchain), "{TOOLCHAIN}") # noqa: E731 + + def fbuildroot(s: str) -> str: + return s.replace(str(buildroot), "{BUILDROOT}") + + def ftoolchain(s: str) -> str: + return s.replace(str(toolchain), "{TOOLCHAIN}") + # XXX: keymap is not used, remove it? # keymap = { # "BINDIR": (fbuildroot,), @@ -1569,18 +1660,20 @@ def find_sysconfigdata(pymodules: PathLike) -> str: for file in files: if file.find("sysconfigdata") > -1 and file.endswith(".py"): return file[:-3] + raise RelenvException("Unable to locate sysconfigdata module") def install_runtime(sitepackages: PathLike) -> None: """ Install a base relenv runtime. """ - relenv_pth = sitepackages / "relenv.pth" + site_dir = pathlib.Path(sitepackages) + relenv_pth = site_dir / "relenv.pth" with io.open(str(relenv_pth), "w") as fp: fp.write(RELENV_PTH) # Lay down relenv.runtime, we'll pip install the rest later - relenv = sitepackages / "relenv" + relenv = site_dir / "relenv" os.makedirs(relenv, exist_ok=True) for name in [ @@ -1619,13 +1712,18 @@ def finalize( # Install relenv-sysconfigdata module libdir = pathlib.Path(dirs.prefix) / "lib" - def find_pythonlib(libdir): - for root, dirs, files in os.walk(libdir): - for _ in dirs: - if _.startswith("python"): - return _ + def find_pythonlib(libdir: pathlib.Path) -> Optional[str]: + for _root, dirs, _files in os.walk(libdir): + for entry in dirs: + if entry.startswith("python"): + return entry + return None + + python_lib = find_pythonlib(libdir) + if python_lib is None: + raise RelenvException("Unable to locate python library directory") - pymodules = libdir / find_pythonlib(libdir) + pymodules = libdir / python_lib # update ensurepip update_ensurepip(pymodules) @@ -1649,16 +1747,16 @@ def find_pythonlib(libdir): install_runtime(sitepackages) # Install pip - python = dirs.prefix / "bin" / "python3" + python_exe = str(dirs.prefix / "bin" / "python3") if env["RELENV_HOST_ARCH"] != env["RELENV_BUILD_ARCH"]: - env["RELENV_CROSS"] = dirs.prefix - python = env["RELENV_NATIVE_PY"] + env["RELENV_CROSS"] = str(dirs.prefix) + python_exe = env["RELENV_NATIVE_PY"] logfp.write("\nRUN ENSURE PIP\n") env.pop("RELENV_BUILDENV") runcmd( - [str(python), "-m", "ensurepip"], + [python_exe, "-m", "ensurepip"], env=env, stderr=logfp, stdout=logfp, @@ -1688,8 +1786,11 @@ def find_pythonlib(libdir): format_shebang("../../../bin/python3"), ) + toolchain_path = dirs.toolchain + if toolchain_path is None: + raise RelenvException("Toolchain path is required for linux builds") shutil.copy( - pathlib.Path(dirs.toolchain) + pathlib.Path(toolchain_path) / env["RELENV_HOST"] / "sysroot" / "lib" @@ -1705,16 +1806,16 @@ def find_pythonlib(libdir): format_shebang("../../bin/python3"), ) - def runpip(pkg, upgrade=False): + def runpip(pkg: Union[str, os.PathLike[str]], upgrade: bool = False) -> None: logfp.write(f"\nRUN PIP {pkg} {upgrade}\n") - target = None - python = dirs.prefix / "bin" / "python3" + target: Optional[pathlib.Path] = None + python_exe = str(dirs.prefix / "bin" / "python3") if sys.platform == LINUX: if env["RELENV_HOST_ARCH"] != env["RELENV_BUILD_ARCH"]: target = pymodules / "site-packages" - python = env["RELENV_NATIVE_PY"] + python_exe = env["RELENV_NATIVE_PY"] cmd = [ - str(python), + python_exe, "-m", "pip", "install", @@ -1778,11 +1879,12 @@ def create_archive( relpath = relroot / f matches = False for g in globs: - if glob.fnmatch.fnmatch("/" / relpath, g): + candidate = pathlib.Path("/") / relpath + if fnmatch.fnmatch(str(candidate), g): matches = True break if matches: log.debug("Adding %s", relpath) - tarfp.add(relpath, relpath, recursive=False) + tarfp.add(relpath, arcname=str(relpath), recursive=False) else: log.debug("Skipping %s", relpath) diff --git a/relenv/build/darwin.py b/relenv/build/darwin.py index 9d1c0a9a..e296e1f1 100644 --- a/relenv/build/darwin.py +++ b/relenv/build/darwin.py @@ -1,5 +1,6 @@ # Copyright 2025 Broadcom. # SPDX-License-Identifier: Apache-2 +# mypy: ignore-errors """ The darwin build process. """ diff --git a/relenv/build/linux.py b/relenv/build/linux.py index 04a5d66f..a237095f 100644 --- a/relenv/build/linux.py +++ b/relenv/build/linux.py @@ -1,5 +1,6 @@ # Copyright 2025 Broadcom. # SPDX-License-Identifier: Apache-2 +# mypy: ignore-errors """ The linux build process. """ diff --git a/relenv/build/windows.py b/relenv/build/windows.py index 71398aa7..55d69c1c 100644 --- a/relenv/build/windows.py +++ b/relenv/build/windows.py @@ -1,5 +1,6 @@ # Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2 +# mypy: ignore-errors """ The windows build process. """ @@ -13,7 +14,7 @@ import shutil import sys import tarfile -from typing import IO, MutableMapping +from typing import IO, MutableMapping, Union from .common import ( Dirs, @@ -475,7 +476,7 @@ def finalize(env: EnvMapping, dirs: Dirs, logfp: IO[str]) -> None: python = dirs.prefix / "Scripts" / "python.exe" runcmd([str(python), "-m", "ensurepip"], env=env, stderr=logfp, stdout=logfp) - def runpip(pkg): + def runpip(pkg: Union[str, os.PathLike[str]]) -> None: # XXX Support cross pip installs on windows env = os.environ.copy() target = None diff --git a/relenv/buildenv.py b/relenv/buildenv.py index c7fc006d..8eb0902f 100644 --- a/relenv/buildenv.py +++ b/relenv/buildenv.py @@ -1,4 +1,4 @@ -# Copyright 2023-2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 """ Helper for building libraries to install into a relenv environment. @@ -10,6 +10,7 @@ import logging import os import sys +from typing import Any, cast from .common import ( MACOS_DEVELOPMENT_TARGET, @@ -58,7 +59,7 @@ def buildenv( if not relenv_path: if not is_relenv(): raise RelenvException("Not in a relenv environment") - relenv_path = sys.RELENV + relenv_path = cast(str | os.PathLike[str], cast(Any, sys).RELENV) if sys.platform != "linux": raise RelenvException("buildenv is only supported on Linux") @@ -68,6 +69,7 @@ def buildenv( raise RelenvException("buildenv is only supported on Linux") triplet = get_triplet() + sysroot = f"{toolchain}/{triplet}/sysroot" env = { "RELENV_BUILDENV": "1", "TOOLCHAIN_PATH": f"{toolchain}", @@ -75,25 +77,45 @@ def buildenv( "RELENV_PATH": f"{relenv_path}", "CC": f"{toolchain}/bin/{triplet}-gcc", "CXX": f"{toolchain}/bin/{triplet}-g++", - "CFLAGS": f"-I{relenv_path}/include -I{toolchain}/sysroot/usr/include", + "CFLAGS": ( + f"--sysroot={sysroot} " + f"-fPIC " + f"-I{relenv_path}/include " + f"-I{sysroot}/usr/include" + ), "CXXFLAGS": ( + f"--sysroot={sysroot} " + f"-fPIC " f"-I{relenv_path}/include " - f"-I{toolchain}/{triplet}/sysroot/usr/include " - f"-L{relenv_path}/lib -L{toolchain}/{triplet}/sysroot/lib " + f"-I{sysroot}/usr/include " + f"-L{relenv_path}/lib -L{sysroot}/lib " f"-Wl,-rpath,{relenv_path}/lib" ), "CPPFLAGS": ( - f"-I{relenv_path}/include " f"-I{toolchain}/{triplet}/sysroot/usr/include" + f"--sysroot={sysroot} " + f"-fPIC " + f"-I{relenv_path}/include " + f"-I{sysroot}/usr/include" ), "CMAKE_CFLAGS": ( - f"-I{relenv_path}/include " f"-I{toolchain}/{triplet}/sysroot/usr/include" + f"--sysroot={sysroot} " + f"-fPIC " + f"-I{relenv_path}/include " + f"-I{sysroot}/usr/include" ), "LDFLAGS": ( - f"-L{relenv_path}/lib -L{toolchain}/{triplet}/sysroot/lib " + f"--sysroot={sysroot} " + f"-L{relenv_path}/lib -L{sysroot}/lib " f"-Wl,-rpath,{relenv_path}/lib" ), + "CRATE_CC_NO_DEFAULTS": "1", + "OPENSSL_DIR": f"{relenv_path}", + "OPENSSL_INCLUDE_DIR": f"{relenv_path}/include", + "OPENSSL_LIB_DIR": f"{relenv_path}/lib", + "PKG_CONFIG_PATH": f"{relenv_path}/lib/pkgconfig", + "RUSTFLAGS": f"-L {relenv_path}/lib -C link-arg=-Wl,-rpath,{relenv_path}/lib", } - if sys.platform == "dawin": + if sys.platform == "darwin": env["MACOS_DEVELOPMENT_TARGET"] = MACOS_DEVELOPMENT_TARGET return env diff --git a/relenv/check.py b/relenv/check.py index 8ccd40f1..04e9c999 100644 --- a/relenv/check.py +++ b/relenv/check.py @@ -1,4 +1,4 @@ -# Copyright 2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 """ Check the integrety of a relenv environment. @@ -10,9 +10,9 @@ import pathlib import sys -from . import relocate +from relenv import relocate -log = logging.getLogger() +log: logging.Logger = logging.getLogger(__name__) def setup_parser( diff --git a/relenv/common.py b/relenv/common.py index 829257d2..7515aca6 100644 --- a/relenv/common.py +++ b/relenv/common.py @@ -1,28 +1,32 @@ -# Copyright 2023-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# Copyright 2022-2025 Broadcom. +# SPDX-License-Identifier: Apache-2.0 """ Common classes and values used around relenv. """ from __future__ import annotations import http.client +import json import logging import os import pathlib import platform import queue import selectors +import shutil import subprocess import sys import tarfile import textwrap import threading import time -from typing import IO, Any, BinaryIO, Iterable, Mapping, Optional, Union +from typing import IO, Any, BinaryIO, Iterable, Literal, Mapping, Optional, Union, cast # relenv package version __version__ = "0.21.2" +log = logging.getLogger(__name__) + MODULE_DIR = pathlib.Path(__file__).resolve().parent DEFAULT_PYTHON = "3.10.18" @@ -33,6 +37,83 @@ MACOS_DEVELOPMENT_TARGET = "10.15" +TOOLCHAIN_CACHE_ENV = "RELENV_TOOLCHAIN_CACHE" +_TOOLCHAIN_MANIFEST = ".toolchain-manifest.json" + + +# 8 GiB archives are not unusual; stick to metadata to fingerprint them. +def _archive_metadata(path: pathlib.Path) -> dict[str, Union[str, int]]: + stat = path.stat() + return { + "archive": str(path.resolve()), + "size": stat.st_size, + "mtime": stat.st_mtime_ns, + } + + +def _toolchain_cache_root() -> Optional[pathlib.Path]: + override = os.environ.get(TOOLCHAIN_CACHE_ENV) + if override: + if override.strip().lower() == "none": + return None + return pathlib.Path(override).expanduser() + cache_home = os.environ.get("XDG_CACHE_HOME") + if cache_home: + base = pathlib.Path(cache_home) + else: + base = pathlib.Path.home() / ".cache" + return base / "relenv" / "toolchains" + + +def _toolchain_manifest_path(toolchain_path: pathlib.Path) -> pathlib.Path: + return toolchain_path / _TOOLCHAIN_MANIFEST + + +def _load_toolchain_manifest(path: pathlib.Path) -> Optional[Mapping[str, Any]]: + if not path.exists(): + return None + try: + with path.open(encoding="utf-8") as handle: + data = json.load(handle) + except (OSError, json.JSONDecodeError): + return None + if not isinstance(data, dict): + return None + return data + + +def _manifest_matches(manifest: Mapping[str, Any], metadata: Mapping[str, Any]) -> bool: + return ( + manifest.get("archive") == metadata.get("archive") + and manifest.get("size") == metadata.get("size") + and manifest.get("mtime") == metadata.get("mtime") + ) + + +def _write_toolchain_manifest( + toolchain_path: pathlib.Path, metadata: Mapping[str, Any] +) -> None: + manifest_path = _toolchain_manifest_path(toolchain_path) + try: + with manifest_path.open("w", encoding="utf-8") as handle: + json.dump(metadata, handle, indent=2, sort_keys=True) + handle.write("\n") + except OSError as exc: # pragma: no cover - permissions edge cases + log.warning( + "Unable to persist toolchain manifest at %s: %s", manifest_path, exc + ) + + +def toolchain_root_dir() -> pathlib.Path: + """Return the root directory used for cached toolchains.""" + if sys.platform != "linux": + return DATA_DIR + root = _toolchain_cache_root() + if root is None: + return DATA_DIR / "toolchain" + return root + + REQUEST_HEADERS = {"User-Agent": f"relenv {__version__}"} CHECK_HOSTS = ( @@ -64,8 +145,9 @@ SHEBANG_TPL_LINUX = textwrap.dedent( """#!/bin/sh "true" '''' +# shellcheck disable=SC2093 "exec" "$(dirname "$(readlink -f "$0")"){}" "$0" "$@" -''' +' ''' """ ) @@ -85,8 +167,10 @@ done PHYS_DIR=$(pwd -P) REALPATH=$PHYS_DIR/$TARGET_FILE +# shellcheck disable=SC2093 "exec" "$(dirname "$REALPATH")"{} "$REALPATH" "$@" -'''""" +' ''' +""" ) if sys.platform == "linux": @@ -95,9 +179,6 @@ SHEBANG_TPL = SHEBANG_TPL_MACOS -log = logging.getLogger(__name__) - - class RelenvException(Exception): """ Base class for exeptions generated from relenv. @@ -108,7 +189,10 @@ def format_shebang(python: str, tpl: str = SHEBANG_TPL) -> str: """ Return a formatted shebang. """ - return tpl.format(python).strip() + "\n" + shebang = tpl.format(python).strip() + if shebang.endswith("'''"): + return shebang + "\n\n" + return shebang + "\n" def build_arch() -> str: @@ -172,7 +256,7 @@ def __init__(self: "WorkDirs", root: Union[str, os.PathLike[str]]) -> None: self.root: pathlib.Path = pathlib.Path(root) self.data: pathlib.Path = DATA_DIR self.toolchain_config: pathlib.Path = work_dir("toolchain", self.root) - self.toolchain: pathlib.Path = work_dir("toolchain", DATA_DIR) + self.toolchain: pathlib.Path = toolchain_root_dir() self.build: pathlib.Path = work_dir("build", DATA_DIR) self.src: pathlib.Path = work_dir("src", DATA_DIR) self.logs: pathlib.Path = work_dir("logs", DATA_DIR) @@ -232,34 +316,66 @@ def get_toolchain( """ Get a the toolchain directory, specific to the arch if supplied. + On Linux, this function will extract the toolchain from ppbt if needed. + If the toolchain already exists, it will be returned even if ppbt is + not available (e.g., when running tests on non-Linux platforms that + patch sys.platform to "linux"). This allows using existing toolchains + without requiring ppbt to be installed. + :param arch: The architecture to get the toolchain for :type arch: str :param root: The root of the relenv working directories to search in :type root: str - :return: The directory holding the toolchain + :return: The directory holding the toolchain, or None if on Linux and + the toolchain doesn't exist and ppbt is unavailable :rtype: ``pathlib.Path`` """ + del root # Kept for backward compatibility; location driven by DATA_DIR os.makedirs(DATA_DIR, exist_ok=True) if sys.platform != "linux": - return DATA_DIR - - TOOLCHAIN_ROOT = DATA_DIR / "toolchain" - TOOLCHAIN_PATH = TOOLCHAIN_ROOT / get_triplet() - if TOOLCHAIN_PATH.exists(): - return TOOLCHAIN_PATH + return toolchain_root_dir() - ppbt = None + toolchain_root = toolchain_root_dir() + triplet = get_triplet(machine=arch) + toolchain_path = toolchain_root / triplet + metadata: Optional[Mapping[str, Any]] = None + if toolchain_path.exists(): + metadata = _load_toolchain_manifest(_toolchain_manifest_path(toolchain_path)) try: - import ppbt.common + from importlib import import_module + + ppbt_common = import_module("ppbt.common") except ImportError: - pass + # If toolchain already exists, use it even without ppbt + return toolchain_path if toolchain_path.exists() else None + archive_attr = getattr(ppbt_common, "ARCHIVE", None) + extract = getattr(ppbt_common, "extract_archive", None) + if archive_attr is None or not callable(extract): + raise RelenvException("ppbt.common missing ARCHIVE or extract_archive") + + toolchain_root.mkdir(parents=True, exist_ok=True) + archive_path = pathlib.Path(archive_attr) + archive_meta = _archive_metadata(archive_path) + + if ( + toolchain_path.exists() + and metadata + and _manifest_matches(metadata, archive_meta) + ): + return toolchain_path - if ppbt: - TOOLCHAIN_ROOT.mkdir(exist_ok=True) - ppbt.common.extract_archive(str(TOOLCHAIN_ROOT), str(ppbt.common.ARCHIVE)) - return TOOLCHAIN_PATH + if toolchain_path.exists(): + shutil.rmtree(toolchain_path) + + extract(str(toolchain_root), str(archive_path)) + if not toolchain_path.exists(): + raise RelenvException( + f"Toolchain archive {archive_path} did not produce {toolchain_path}" + ) + _write_toolchain_manifest(toolchain_path, archive_meta) + return toolchain_path def get_triplet(machine: Optional[str] = None, plat: Optional[str] = None) -> str: @@ -310,12 +426,13 @@ def list_archived_builds() -> list[tuple[str, str, str]]: Return a list of version, architecture and platforms for builds. """ builds: list[tuple[str, str, str]] = [] - dirs = work_dirs(DATA_DIR) - for root, dirs, files in os.walk(dirs.build): - for file in files: - if file.endswith(".tar.xz"): - file = file[:-7] - version, triplet = file.split("-", 1) + working_dirs = work_dirs(DATA_DIR) + for root_dir, dirnames, filenames in os.walk(working_dirs.build): + del dirnames # unused + for filename in filenames: + if filename.endswith(".tar.xz"): + base_name = filename[:-7] + version, triplet = base_name.split("-", 1) arch, plat = triplet.split("-", 1) builds.append((version, arch, plat)) return builds @@ -349,23 +466,27 @@ def extract_archive( :param archive: The archive to extract :type archive: str """ - if archive.endswith("tgz"): + archive_path = pathlib.Path(archive) + archive_str = str(archive_path) + to_path = pathlib.Path(to_dir) + TarReadMode = Literal["r:gz", "r:xz", "r:bz2", "r"] + read_type: TarReadMode = "r" + if archive_str.endswith(".tgz"): log.debug("Found tgz archive") read_type = "r:gz" - elif archive.endswith("tar.gz"): + elif archive_str.endswith(".tar.gz"): log.debug("Found tar.gz archive") read_type = "r:gz" - elif archive.endswith("xz"): + elif archive_str.endswith(".xz"): log.debug("Found xz archive") read_type = "r:xz" - elif archive.endswith("bz2"): + elif archive_str.endswith(".bz2"): log.debug("Found bz2 archive") read_type = "r:bz2" else: - log.warning("Found unknown archive type: %s", archive) - read_type = "r" - with tarfile.open(archive, read_type) as t: - t.extractall(to_dir) + log.warning("Found unknown archive type: %s", archive_path) + with tarfile.open(str(archive_path), mode=read_type) as tar: + tar.extractall(str(to_path)) def get_download_location(url: str, dest: Union[str, os.PathLike[str]]) -> str: @@ -380,7 +501,7 @@ def get_download_location(url: str, dest: Union[str, os.PathLike[str]]) -> str: :return: The path to where the url will be downloaded to :rtype: str """ - return os.path.join(dest, os.path.basename(url)) + return os.path.join(os.fspath(dest), os.path.basename(url)) def check_url(url: str, timestamp: Optional[float] = None, timeout: float = 30) -> bool: @@ -425,39 +546,37 @@ def fetch_url(url: str, fp: BinaryIO, backoff: int = 3, timeout: float = 30) -> import urllib.request last = time.time() - if backoff < 1: - backoff = 1 - n = 0 - while n < backoff: - n += 1 + attempts = max(backoff, 1) + response: http.client.HTTPResponse | None = None + for attempt in range(1, attempts + 1): try: - fin = urllib.request.urlopen(url, timeout=timeout) + response = urllib.request.urlopen(url, timeout=timeout) break except ( urllib.error.HTTPError, urllib.error.URLError, http.client.RemoteDisconnected, ) as exc: - if n >= backoff: + if attempt >= attempts: raise RelenvException(f"Error fetching url {url} {exc}") log.debug("Unable to connect %s", url) - time.sleep(n * 10) - else: - raise RelenvException(f"Error fetching url: {url}") + time.sleep(attempt * 10) + if response is None: + raise RelenvException(f"Unable to open url {url}") log.info("url opened %s", url) try: total = 0 size = 1024 * 300 - block = fin.read(size) + block = response.read(size) while block: total += size if time.time() - last > 10: log.info("%s > %d", url, total) last = time.time() fp.write(block) - block = fin.read(10240) + block = response.read(10240) finally: - fin.close() + response.close() log.info("Download complete %s", url) @@ -469,52 +588,37 @@ def fetch_url_content(url: str, backoff: int = 3, timeout: float = 30) -> str: """ # Late import so we do not import hashlib before runtime.bootstrap is called. import gzip - import io import urllib.error import urllib.request - fp = io.BytesIO() - - last = time.time() - if backoff < 1: - backoff = 1 - n = 0 - while n < backoff: - n += 1 + attempts = max(backoff, 1) + response: http.client.HTTPResponse | None = None + for attempt in range(1, attempts + 1): try: - fin = urllib.request.urlopen(url, timeout=timeout) + response = urllib.request.urlopen(url, timeout=timeout) + break except ( urllib.error.HTTPError, urllib.error.URLError, http.client.RemoteDisconnected, ) as exc: - if n >= backoff: + if attempt >= attempts: raise RelenvException(f"Error fetching url {url} {exc}") log.debug("Unable to connect %s", url) - time.sleep(n * 10) + time.sleep(attempt * 10) + if response is None: + raise RelenvException(f"Unable to open url {url}") log.info("url opened %s", url) try: - total = 0 - size = 1024 * 300 - block = fin.read(size) - while block: - total += size - if time.time() - last > 10: - log.info("%s > %d", url, total) - last = time.time() - fp.write(block) - block = fin.read(10240) + data = response.read() + encoding = response.headers.get("content-encoding", "").lower() finally: - fin.close() - # fp.close() + response.close() + if encoding == "gzip": + log.debug("Found gzipped content") + data = gzip.decompress(data) log.info("Download complete %s", url) - fp.seek(0) - info = fin.info() - if "content-encoding" in info: - if info["content-encoding"] == "gzip": - log.debug("Found gzipped content") - fp = gzip.GzipFile(fileobj=fp) - return fp.read().decode() + return data.decode() def download_url( @@ -572,7 +676,9 @@ def runcmd(*args: Any, **kwargs: Any) -> subprocess.Popen[str]: :raises RelenvException: If the command finishes with a non zero exit code """ - log.debug("Running command: %s", " ".join(args[0])) + if not args: + raise RelenvException("No command provided to runcmd") + log.debug("Running command: %s", " ".join(map(str, args[0]))) # if "stdout" not in kwargs: kwargs["stdout"] = subprocess.PIPE # if "stderr" not in kwargs: @@ -582,20 +688,27 @@ def runcmd(*args: Any, **kwargs: Any) -> subprocess.Popen[str]: if sys.platform != "win32": p = subprocess.Popen(*args, **kwargs) + stdout_stream = p.stdout + stderr_stream = p.stderr + if stdout_stream is None or stderr_stream is None: + p.wait() + raise RelenvException("Process pipes are unavailable") # Read both stdout and stderr simultaneously sel = selectors.DefaultSelector() - sel.register(p.stdout, selectors.EVENT_READ) - sel.register(p.stderr, selectors.EVENT_READ) + sel.register(stdout_stream, selectors.EVENT_READ) + sel.register(stderr_stream, selectors.EVENT_READ) ok = True while ok: for key, val1 in sel.select(): - line = key.fileobj.readline() + del val1 # unused + stream = cast(IO[str], key.fileobj) + line = stream.readline() if not line: ok = False break if line.endswith("\n"): line = line[:-1] - if key.fileobj is p.stdout: + if stream is stdout_stream: log.info(line) else: log.error(line) @@ -603,15 +716,17 @@ def runcmd(*args: Any, **kwargs: Any) -> subprocess.Popen[str]: else: def enqueue_stream( - stream: IO[str], item_queue: "queue.Queue[tuple[int | str, str]]", kind: int + stream: IO[str], + item_queue: "queue.Queue[tuple[int | str, str]]", + kind: int, ) -> None: - NOOP = object() - for line in iter(stream.readline, NOOP): - if line is NOOP or line == "": + last_line = "" + for line in iter(stream.readline, ""): + if line == "": break - if line: - item_queue.put((kind, line)) - log.debug("stream close %r %r", kind, line) + item_queue.put((kind, line)) + last_line = line + log.debug("stream close %r %r", kind, last_line) stream.close() def enqueue_process( @@ -622,9 +737,14 @@ def enqueue_process( item_queue.put(("x", "")) p = subprocess.Popen(*args, **kwargs) + stdout_stream = p.stdout + stderr_stream = p.stderr + if stdout_stream is None or stderr_stream is None: + p.wait() + raise RelenvException("Process pipes are unavailable") q: "queue.Queue[tuple[int | str, str]]" = queue.Queue() - to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1)) - te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2)) + to = threading.Thread(target=enqueue_stream, args=(stdout_stream, q, 1)) + te = threading.Thread(target=enqueue_stream, args=(stderr_stream, q, 2)) tp = threading.Thread(target=enqueue_process, args=(p, q)) te.start() to.start() @@ -691,24 +811,26 @@ def addpackage(sitedir: str, name: Union[str, os.PathLike[str]]) -> list[str] | import io import stat - fullname = os.path.join(sitedir, name) + fullname = os.path.join(sitedir, os.fspath(name)) paths: list[str] = [] try: st = os.lstat(fullname) except OSError: - return - if (getattr(st, "st_flags", 0) & stat.UF_HIDDEN) or ( - getattr(st, "st_file_attributes", 0) & stat.FILE_ATTRIBUTE_HIDDEN + return None + file_attr_hidden = getattr(stat, "FILE_ATTRIBUTE_HIDDEN", 0) + uf_hidden = getattr(stat, "UF_HIDDEN", 0) + if (getattr(st, "st_flags", 0) & uf_hidden) or ( + getattr(st, "st_file_attributes", 0) & file_attr_hidden ): # print(f"Skipping hidden .pth file: {fullname!r}") - return + return None # print(f"Processing .pth file: {fullname!r}") try: # locale encoding is not ideal especially on Windows. But we have used # it for a long time. setuptools uses the locale encoding too. f = io.TextIOWrapper(io.open_code(fullname), encoding="locale") except OSError: - return + return None with f: for n, line in enumerate(f): if line.startswith("#"): @@ -787,13 +909,13 @@ def __str__(self: "Version") -> str: """ Version as string. """ - _ = f"{self.major}" + result = f"{self.major}" if self.minor is not None: - _ += f".{self.minor}" + result += f".{self.minor}" if self.micro is not None: - _ += f".{self.micro}" + result += f".{self.micro}" # XXX What if minor was None but micro was an int. - return _ + return result def __hash__(self: "Version") -> int: """ diff --git a/relenv/create.py b/relenv/create.py index 1d060db4..7595b936 100644 --- a/relenv/create.py +++ b/relenv/create.py @@ -1,5 +1,5 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 """ The ``relenv create`` command. """ @@ -10,11 +10,19 @@ import contextlib import os import pathlib +import shutil import sys import tarfile from collections.abc import Iterator -from .common import RelenvException, arches, archived_build, build_arch +from .common import ( + RelenvException, + arches, + archived_build, + build_arch, + format_shebang, + relative_interpreter, +) @contextlib.contextmanager @@ -133,6 +141,102 @@ def create( with tarfile.open(tar, "r:xz") as fp: for f in fp: fp.extract(f, writeto) + _sync_relenv_package(writeto, version) + _repair_script_shebangs(writeto, version) + + +def _site_packages_dir(root: pathlib.Path, version: str) -> pathlib.Path: + """ + Return the site-packages directory within the created environment. + """ + major_minor = ".".join(version.split(".")[:2]) + if sys.platform == "win32": + return root / "Lib" / "site-packages" + return root / "lib" / f"python{major_minor}" / "site-packages" + + +def _sync_relenv_package(root: pathlib.Path, version: str) -> None: + """ + Ensure the relenv package within the created environment matches this CLI. + """ + target_site = _site_packages_dir(root, version) + if not target_site.exists(): + return + target = target_site / "relenv" + source = pathlib.Path(__file__).resolve().parent + if target.exists(): + shutil.rmtree(target) + shutil.copytree( + source, + target, + dirs_exist_ok=True, + ignore=shutil.ignore_patterns("__pycache__", "*.pyc", "*.pyo"), + ) + + +def _repair_script_shebangs(root: pathlib.Path, version: str) -> None: + """ + Update legacy shell-wrapped entry points to the current shebang format. + + Older archives shipped scripts that started with the ``"true" ''''`` preamble. + Those files break when executed directly under Python (the parser sees the + unmatched triple-quoted literal). Patch any remaining copies to the new + `format_shebang` layout so fresh installs do not inherit stale loaders. + """ + if sys.platform == "win32": + return + + scripts_dir = root / "bin" + if not scripts_dir.is_dir(): + return + + major_minor = ".".join(version.split(".")[:2]) + interpreter_candidates = [ + scripts_dir / f"python{major_minor}", + scripts_dir / f"python{major_minor.split('.')[0]}", + scripts_dir / "python3", + scripts_dir / "python", + ] + interpreter_path: pathlib.Path | None = None + for candidate in interpreter_candidates: + if candidate.exists(): + interpreter_path = candidate + break + if interpreter_path is None: + return + + try: + rel_interpreter = relative_interpreter(root, scripts_dir, interpreter_path) + except ValueError: + # Paths are not relative to the install root; abandon the rewrite. + return + + try: + shebang = format_shebang(str(pathlib.PurePosixPath("/") / rel_interpreter)) + except Exception: + return + + legacy_prefix = "#!/bin/sh\n\"true\" ''''\n" + marker = "\n'''" + for script in scripts_dir.iterdir(): + if not script.is_file(): + continue + try: + text = script.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + continue + if not text.startswith(legacy_prefix): + continue + idx = text.find(marker) + if idx == -1: + continue + idy = idx + len(marker) + rest = text[idy:] + updated = shebang + rest.lstrip("\n") + try: + script.write_text(updated, encoding="utf-8") + except OSError: + continue def main(args: argparse.Namespace) -> None: diff --git a/relenv/fetch.py b/relenv/fetch.py index 4e0205e6..59ae25a0 100644 --- a/relenv/fetch.py +++ b/relenv/fetch.py @@ -1,5 +1,6 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 +# mypy: ignore-errors """ The ``relenv fetch`` command. """ diff --git a/relenv/manifest.py b/relenv/manifest.py index dc7983f0..b8e0fc5a 100644 --- a/relenv/manifest.py +++ b/relenv/manifest.py @@ -1,4 +1,4 @@ -# Copyright 2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 # """ diff --git a/relenv/pyversions.py b/relenv/pyversions.py index 4359cf3a..930e4c0b 100644 --- a/relenv/pyversions.py +++ b/relenv/pyversions.py @@ -1,4 +1,4 @@ -# Copyright 2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 """ Versions utility. @@ -17,11 +17,11 @@ import hashlib import json import logging -import os +import os as _os import pathlib import re -import subprocess -import sys +import subprocess as _subprocess +import sys as _sys import time from typing import Any @@ -29,6 +29,17 @@ log = logging.getLogger(__name__) +os = _os +subprocess = _subprocess +sys = _sys + +__all__ = [ + "Version", + "os", + "subprocess", + "sys", +] + KEYSERVERS = [ "keyserver.ubuntu.com", "keys.openpgp.org", diff --git a/relenv/relocate.py b/relenv/relocate.py index 0be1d83a..b13eb0d7 100755 --- a/relenv/relocate.py +++ b/relenv/relocate.py @@ -1,5 +1,5 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 """ A script to ensure the proper rpaths are in place for the relenv environment. """ @@ -7,14 +7,35 @@ from __future__ import annotations import logging -import os +import os as _os import pathlib -import shutil -import subprocess +import shutil as _shutil +import subprocess as _subprocess from typing import Optional log = logging.getLogger(__name__) +os = _os +shutil = _shutil +subprocess = _subprocess + +__all__ = [ + "is_macho", + "is_elf", + "parse_otool_l", + "parse_readelf_d", + "parse_macho", + "parse_rpath", + "handle_macho", + "is_in_dir", + "handle_elf", + "patch_rpath", + "main", + "os", + "shutil", + "subprocess", +] + LIBCLIBS = [ "linux-vdso.so.1", @@ -81,7 +102,7 @@ def is_elf(path: str | os.PathLike[str]) -> bool: return magic == b"\x7f\x45\x4c\x46" -def parse_otool_l(stdout: str) -> dict[str, list[str | None]]: +def parse_otool_l(stdout: str) -> dict[str, list[str]]: """ Parse the output of ``otool -l ``. @@ -94,7 +115,7 @@ def parse_otool_l(stdout: str) -> dict[str, list[str | None]]: in_cmd = False cmd: Optional[str] = None name: Optional[str] = None - data: dict[str, list[str | None]] = {} + data: dict[str, list[str]] = {} for line in [x.strip() for x in stdout.split("\n")]: if not line: @@ -102,26 +123,24 @@ def parse_otool_l(stdout: str) -> dict[str, list[str | None]]: if line.split()[0] == "cmd": in_cmd = False - if cmd: - if cmd not in data: - data[cmd] = [] - data[cmd].append(name) + if cmd is not None and name is not None: + data.setdefault(cmd, []).append(name) cmd = None name = None - if line.split()[-1] in (LC_ID_DYLIB, LC_LOAD_DYLIB, "LC_RPATH"): - cmd = line.split()[-1] + command = line.split()[-1] + if command in (LC_ID_DYLIB, LC_LOAD_DYLIB, "LC_RPATH"): + cmd = command in_cmd = True if in_cmd: - if line.split()[0] == "name": - name = line.split()[1] - if line.split()[0] == "path": - name = line.split()[1] + parts = line.split() + if parts[0] == "name" and len(parts) > 1: + name = parts[1] + if parts[0] == "path" and len(parts) > 1: + name = parts[1] - if in_cmd: - if cmd not in data: - data[cmd] = [] - data[cmd].append(name) + if in_cmd and cmd is not None and name is not None: + data.setdefault(cmd, []).append(name) return data @@ -144,7 +163,7 @@ def parse_readelf_d(stdout: str) -> list[str]: return [] -def parse_macho(path: str | os.PathLike[str]) -> dict[str, list[str | None]] | None: +def parse_macho(path: str | os.PathLike[str]) -> dict[str, list[str]] | None: """ Run ``otool -l `` and return its parsed output. @@ -183,7 +202,7 @@ def handle_macho( path: str | os.PathLike[str], root_dir: str | os.PathLike[str], rpath_only: bool, -) -> dict[str, list[str | None]] | None: +) -> dict[str, list[str]] | None: """ Ensure the given macho file has the correct rpath and is in th correct location. @@ -196,17 +215,20 @@ def handle_macho( :return: The information from ``parse_macho`` on the macho file. """ - obj = parse_macho(path) - log.info("Processing file %s %r", path, obj) + path_obj = pathlib.Path(path) + obj = parse_macho(path_obj) + path_str = str(path_obj) + log.info("Processing file %s %r", path_str, obj) if not obj: return None if LC_LOAD_DYLIB in obj: for x in obj[LC_LOAD_DYLIB]: - if path.startswith("@"): - log.info("Skipping dynamic load: %s", path) + if x.startswith("@"): + log.info("Skipping dynamic load: %s", x) continue if os.path.exists(x): - y = pathlib.Path(root_dir).resolve() / os.path.basename(x) + target_dir = pathlib.Path(root_dir).resolve() + y = target_dir / os.path.basename(x) if not os.path.exists(y): if rpath_only: log.warning("In `rpath_only mode` but %s is not in %s", x, y) @@ -215,13 +237,13 @@ def handle_macho( shutil.copy(x, y) shutil.copymode(x, y) log.info("Copied %s to %s", x, y) - log.info("Use %s to %s", y, path) + log.info("Use %s to %s", y, path_str) z = pathlib.Path("@loader_path") / os.path.relpath( - y, pathlib.Path(path).resolve().parent + y, path_obj.resolve().parent ) - cmd = ["install_name_tool", "-change", x, z, path] + cmd = ["install_name_tool", "-change", x, str(z), path_str] subprocess.run(cmd) - log.info("Changed %s to %s in %s", x, z, path) + log.info("Changed %s to %s in %s", x, z, path_str) return obj @@ -371,24 +393,24 @@ def main( :param log_level: The level to log at, defaults to "INFO" :type log_level: str, optional """ + level = logging.getLevelName(log_level.upper()) if log_file_name != "": - kwargs = { - "filename": log_file_name, - "filemode": "w", - } + logging.basicConfig( + level=level, + format="%(asctime)s %(message)s", + filename=log_file_name, + filemode="w", + ) else: - kwargs = {} - logging.basicConfig( - level=logging.getLevelName(log_level.upper()), - format="%(asctime)s %(message)s", - **kwargs, - ) + logging.basicConfig( + level=level, + format="%(asctime)s %(message)s", + ) root_dir = str(pathlib.Path(root).resolve()) if libs_dir is None: libs_dir = pathlib.Path(root_dir, "lib") libs_dir = str(pathlib.Path(libs_dir).resolve()) - rpath_only = rpath_only - processed: dict[str, dict[str, list[str | None]] | None] = {} + processed: dict[str, dict[str, list[str]] | None] = {} found = True while found: found = False diff --git a/relenv/runtime.py b/relenv/runtime.py index 53574ad2..4db8b129 100644 --- a/relenv/runtime.py +++ b/relenv/runtime.py @@ -1,5 +1,5 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 """ This code is run when initializing the python interperter in a Relenv environment. @@ -13,18 +13,18 @@ from __future__ import annotations import contextlib -import ctypes +import ctypes as _ctypes import functools -import importlib -import json +import importlib as _importlib +import json as _json import os import pathlib -import shutil -import site -import subprocess -import sys +import shutil as _shutil +import site as _site +import subprocess as _subprocess +import sys as _sys import textwrap -import warnings +import warnings as _warnings from importlib.machinery import ModuleSpec from types import ModuleType from typing import ( @@ -39,6 +39,30 @@ cast, ) +# The tests monkeypatch these module-level imports (e.g., json.loads) inside +# relenv.runtime itself; keeping them as Any both preserves test isolation—no +# need to patch the global stdlib modules—and avoids mypy attr-defined noise +# while still exercising the real runtime wiring. +json = cast(Any, _json) +importlib = cast(Any, _importlib) +site = cast(Any, _site) +subprocess = cast(Any, _subprocess) +sys = cast(Any, _sys) +ctypes = cast(Any, _ctypes) +shutil = cast(Any, _shutil) +warnings = cast(Any, _warnings) + +__all__ = [ + "sys", + "shutil", + "subprocess", + "json", + "importlib", + "site", + "ctypes", + "warnings", +] + PathType = Union[str, os.PathLike[str]] ConfigVars = Dict[str, str] @@ -68,37 +92,39 @@ def path_import(name: str, path: PathType) -> ModuleType: return module +_COMMON: Optional[ModuleType] = None +_RELOCATE: Optional[ModuleType] = None +_BUILDENV: Optional[ModuleType] = None + + def common() -> ModuleType: - """ - Late import relenv common. - """ - if not hasattr(common, "common"): - common.common = path_import( + """Return the cached ``relenv.common`` module.""" + global _COMMON + if _COMMON is None: + _COMMON = path_import( "relenv.common", str(pathlib.Path(__file__).parent / "common.py") ) - return cast(ModuleType, common.common) + return _COMMON def relocate() -> ModuleType: - """ - Late import relenv relocate. - """ - if not hasattr(relocate, "relocate"): - relocate.relocate = path_import( + """Return the cached ``relenv.relocate`` module.""" + global _RELOCATE + if _RELOCATE is None: + _RELOCATE = path_import( "relenv.relocate", str(pathlib.Path(__file__).parent / "relocate.py") ) - return cast(ModuleType, relocate.relocate) + return _RELOCATE def buildenv() -> ModuleType: - """ - Late import relenv buildenv. - """ - if not hasattr(buildenv, "builenv"): - buildenv.buildenv = path_import( + """Return the cached ``relenv.buildenv`` module.""" + global _BUILDENV + if _BUILDENV is None: + _BUILDENV = path_import( "relenv.buildenv", str(pathlib.Path(__file__).parent / "buildenv.py") ) - return cast(ModuleType, buildenv.buildenv) + return _BUILDENV def get_major_version() -> str: @@ -167,13 +193,16 @@ def wrapped(self: Any, *args: Any, **kwargs: Any) -> bytes: ) except ValueError: debug(f"Relenv Value Error - _build_shebang {self.target_dir}") - return func(self, *args, **kwargs) + original_result: bytes = func(self, *args, **kwargs) + return original_result debug(f"Relenv - _build_shebang {scripts} {interpreter}") if sys.platform == "win32": return ( str(pathlib.Path("#!") / interpreter).encode() + b"\r\n" ) - return common().format_shebang("/" / interpreter).encode() + rel_path = str(pathlib.PurePosixPath("/") / interpreter) + formatted = cast(str, common().format_shebang(rel_path)) + return formatted.encode() return wrapped @@ -226,7 +255,7 @@ def system_sysconfig() -> ConfigVars: to avoid the overhead of shelling out. """ global _SYSTEM_CONFIG_VARS - if _SYSTEM_CONFIG_VARS: + if _SYSTEM_CONFIG_VARS is not None: return _SYSTEM_CONFIG_VARS pyexec = pathlib.Path("/usr/bin/python3") if pyexec.exists(): @@ -277,7 +306,7 @@ def wrapped(*args: Any) -> ConfigVars: "LDSHARED", ]: config_vars[name] = system_config_vars[name] - mod._CONFIG_VARS = config_vars + setattr(mod, "_CONFIG_VARS", config_vars) return func(*args) return wrapped @@ -534,7 +563,9 @@ def find_spec( if wrapper.matches(module_name) and not wrapper.loading: debug(f"RelenvImporter - match {module_name} {package_path} {target}") wrapper.loading = True - return importlib.util.spec_from_loader(module_name, self) + spec = importlib.util.spec_from_loader(module_name, self) + return cast(Optional[ModuleSpec], spec) + return None def find_module( self: "RelenvImporter", @@ -549,6 +580,7 @@ def find_module( debug(f"RelenvImporter - match {module_name}") wrapper.loading = True return self + return None def load_module(self: "RelenvImporter", name: str) -> ModuleType: """ @@ -583,7 +615,8 @@ def wrap_sysconfig(name: str) -> ModuleType: """ Sysconfig wrapper. """ - mod = importlib.import_module("sysconfig") + module: ModuleType = importlib.import_module("sysconfig") + mod = cast(Any, module) mod.get_config_var = get_config_var_wrapper(mod.get_config_var) mod.get_config_vars = get_config_vars_wrapper(mod.get_config_vars, mod) mod._PIP_USE_SYSCONFIG = True @@ -594,45 +627,49 @@ def wrap_sysconfig(name: str) -> ModuleType: # Python < 3.10 scheme = mod._get_default_scheme() mod.get_paths = get_paths_wrapper(mod.get_paths, scheme) - return mod + return module def wrap_pip_distlib_scripts(name: str) -> ModuleType: """ pip.distlib.scripts wrapper. """ - mod = importlib.import_module(name) + module: ModuleType = importlib.import_module(name) + mod = cast(Any, module) mod.ScriptMaker._build_shebang = _build_shebang(mod.ScriptMaker._build_shebang) - return mod + return module def wrap_distutils_command(name: str) -> ModuleType: """ distutils.command wrapper. """ - mod = importlib.import_module(name) + module: ModuleType = importlib.import_module(name) + mod = cast(Any, module) mod.build_ext.finalize_options = finalize_options_wrapper( mod.build_ext.finalize_options ) - return mod + return module def wrap_pip_install_wheel(name: str) -> ModuleType: """ pip._internal.operations.install.wheel wrapper. """ - mod = importlib.import_module(name) + module: ModuleType = importlib.import_module(name) + mod = cast(Any, module) mod.install_wheel = install_wheel_wrapper(mod.install_wheel) - return mod + return module def wrap_pip_install_legacy(name: str) -> ModuleType: """ pip._internal.operations.install.legacy wrapper. """ - mod = importlib.import_module(name) + module: ModuleType = importlib.import_module(name) + mod = cast(Any, module) mod.install = install_legacy_wrapper(mod.install) - return mod + return module def set_env_if_not_set(name: str, value: str) -> None: @@ -656,17 +693,17 @@ def wrap_pip_build_wheel(name: str) -> ModuleType: """ pip._internal.operations.build wrapper. """ - mod = importlib.import_module(name) + module: ModuleType = importlib.import_module(name) + mod = cast(Any, module) def wrap(func: Callable[..., Any]) -> Callable[..., Any]: @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: if sys.platform != "linux": return func(*args, **kwargs) - base_dir = common().DATA_DIR / "toolchain" - toolchain = base_dir / common().get_triplet() + toolchain = common().get_toolchain() cargo_home = str(common().DATA_DIR / "cargo") - if not toolchain.exists(): + if toolchain is None or not toolchain.exists(): debug("Unable to set CARGO_HOME no toolchain exists") else: relenvroot = str(sys.RELENV) @@ -683,7 +720,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return wrapper mod.build_wheel_pep517 = wrap(mod.build_wheel_pep517) - return mod + return module class TARGET: @@ -710,9 +747,10 @@ def wrap_cmd_install(name: str) -> ModuleType: """ Wrap pip install command to store target argument state. """ - mod = importlib.import_module(name) + module: ModuleType = importlib.import_module(name) + mod = cast(Any, module) - def wrap(func: Callable[..., Any]) -> Callable[..., Any]: + def wrap_run(func: Callable[..., Any]) -> Callable[..., Any]: @functools.wraps(func) def wrapper(self: Any, options: Any, args: Sequence[str]) -> Any: if not options.use_user_site: @@ -724,9 +762,9 @@ def wrapper(self: Any, options: Any, args: Sequence[str]) -> Any: return wrapper - mod.InstallCommand.run = wrap(mod.InstallCommand.run) + mod.InstallCommand.run = wrap_run(mod.InstallCommand.run) - def wrap(func: Callable[..., Any]) -> Callable[..., Any]: + def wrap_handle_target(func: Callable[..., Any]) -> Callable[..., Any]: @functools.wraps(func) def wrapper( self: Any, target_dir: str, target_temp_dir: str, upgrade: bool @@ -738,19 +776,20 @@ def wrapper( return wrapper if hasattr(mod.InstallCommand, "_handle_target_dir"): - mod.InstallCommand._handle_target_dir = wrap( + mod.InstallCommand._handle_target_dir = wrap_handle_target( mod.InstallCommand._handle_target_dir ) - return mod + return module def wrap_locations(name: str) -> ModuleType: """ Wrap pip locations to fix locations when installing with target. """ - mod = importlib.import_module(name) + module: ModuleType = importlib.import_module(name) + mod = cast(Any, module) - def wrap(func: Callable[..., Any]) -> Callable[..., Any]: + def make_scheme_wrapper(func: Callable[..., Any]) -> Callable[..., Any]: @functools.wraps(func) def wrapper( dist_name: str, @@ -778,20 +817,21 @@ def wrapper( # get_scheme is not available on pip-19.2.3 # try: - mod.get_scheme = wrap(mod.get_scheme) + mod.get_scheme = make_scheme_wrapper(mod.get_scheme) # except AttributeError: # debug(f"Module {mod} does not have attribute get_scheme") - return mod + return module def wrap_req_command(name: str) -> ModuleType: """ Honor ignore installed option from pip cli. """ - mod = importlib.import_module(name) + module: ModuleType = importlib.import_module(name) + mod = cast(Any, module) - def wrap(func: Callable[..., Any]) -> Callable[..., Any]: + def make_package_finder_wrapper(func: Callable[..., Any]) -> Callable[..., Any]: @functools.wraps(func) def wrapper( self: Any, @@ -806,119 +846,56 @@ def wrapper( return wrapper - mod.RequirementCommand._build_package_finder = wrap( + mod.RequirementCommand._build_package_finder = make_package_finder_wrapper( mod.RequirementCommand._build_package_finder ) - return mod + return module def wrap_req_install(name: str) -> ModuleType: """ Honor ignore installed option from pip cli. """ - mod = importlib.import_module(name) + module: ModuleType = importlib.import_module(name) + mod = cast(Any, module) - def wrap(func: Callable[..., Any]) -> Callable[..., Any]: - argcount = mod.InstallRequirement.install.__code__.co_argcount - - if argcount == 7: - - @functools.wraps(func) - def wrapper( - self: Any, - root: Optional[PathType] = None, - home: Optional[PathType] = None, - prefix: Optional[PathType] = None, - warn_script_location: bool = True, - use_user_site: bool = False, - pycompile: bool = True, - ) -> Any: - try: - if TARGET.TARGET: - TARGET.INSTALL = True - home = _ensure_target_path() - return func( - self, - root, - home, - prefix, - warn_script_location, - use_user_site, - pycompile, - ) - finally: - TARGET.INSTALL = False - - return wrapper - - if argcount == 8: - - @functools.wraps(func) - def wrapper( - self: Any, - global_options: Any = None, - root: Optional[PathType] = None, - home: Optional[PathType] = None, - prefix: Optional[PathType] = None, - warn_script_location: bool = True, - use_user_site: bool = False, - pycompile: bool = True, - ) -> Any: - try: - if TARGET.TARGET: - TARGET.INSTALL = True - home = _ensure_target_path() - return func( - self, - global_options, - root, - home, - prefix, - warn_script_location, - use_user_site, - pycompile, - ) - finally: - TARGET.INSTALL = False - - return wrapper - - if argcount == 9: - - @functools.wraps(func) - def wrapper( - self: Any, - install_options: Any, - global_options: Any = None, - root: Optional[PathType] = None, - home: Optional[PathType] = None, - prefix: Optional[PathType] = None, - warn_script_location: bool = True, - use_user_site: bool = False, - pycompile: bool = True, - ) -> Any: - try: - if TARGET.TARGET: - TARGET.INSTALL = True - home = _ensure_target_path() - return func( - self, - install_options, - global_options, - root, - home, - prefix, - warn_script_location, - use_user_site, - pycompile, - ) - finally: - TARGET.INSTALL = False - - return wrapper + original = mod.InstallRequirement.install + argcount = original.__code__.co_argcount - @functools.wraps(func) - def wrapper( + if argcount == 7: + + @functools.wraps(original) + def install_wrapper_pep517( + self: Any, + root: Optional[PathType] = None, + home: Optional[PathType] = None, + prefix: Optional[PathType] = None, + warn_script_location: bool = True, + use_user_site: bool = False, + pycompile: bool = True, + ) -> Any: + try: + if TARGET.TARGET: + TARGET.INSTALL = True + home = _ensure_target_path() + return original( + self, + root, + home, + prefix, + warn_script_location, + use_user_site, + pycompile, + ) + finally: + TARGET.INSTALL = False + + mod.InstallRequirement.install = install_wrapper_pep517 + + elif argcount == 8: + + @functools.wraps(original) + def install_wrapper_pep517_opts( self: Any, global_options: Any = None, root: Optional[PathType] = None, @@ -932,7 +909,7 @@ def wrapper( if TARGET.TARGET: TARGET.INSTALL = True home = _ensure_target_path() - return func( + return original( self, global_options, root, @@ -945,10 +922,74 @@ def wrapper( finally: TARGET.INSTALL = False - return wrapper + mod.InstallRequirement.install = install_wrapper_pep517_opts + + elif argcount == 9: + + @functools.wraps(original) + def install_wrapper_legacy( + self: Any, + install_options: Any, + global_options: Any = None, + root: Optional[PathType] = None, + home: Optional[PathType] = None, + prefix: Optional[PathType] = None, + warn_script_location: bool = True, + use_user_site: bool = False, + pycompile: bool = True, + ) -> Any: + try: + if TARGET.TARGET: + TARGET.INSTALL = True + home = _ensure_target_path() + return original( + self, + install_options, + global_options, + root, + home, + prefix, + warn_script_location, + use_user_site, + pycompile, + ) + finally: + TARGET.INSTALL = False - mod.InstallRequirement.install = wrap(mod.InstallRequirement.install) - return mod + mod.InstallRequirement.install = install_wrapper_legacy + + else: + + @functools.wraps(original) + def install_wrapper_generic( + self: Any, + global_options: Any = None, + root: Optional[PathType] = None, + home: Optional[PathType] = None, + prefix: Optional[PathType] = None, + warn_script_location: bool = True, + use_user_site: bool = False, + pycompile: bool = True, + ) -> Any: + try: + if TARGET.TARGET: + TARGET.INSTALL = True + home = _ensure_target_path() + return original( + self, + global_options, + root, + home, + prefix, + warn_script_location, + use_user_site, + pycompile, + ) + finally: + TARGET.INSTALL = False + + mod.InstallRequirement.install = install_wrapper_generic + return module importer = RelenvImporter( @@ -991,24 +1032,43 @@ def install_cargo_config() -> None: return # cargo_home = dirs.data / "cargo" - if not cargo_home.exists(): - cargo_home.mkdir() + cargo_home.mkdir(parents=True, exist_ok=True) cargo_config = cargo_home / "config.toml" - if not cargo_config.exists(): - if triplet == "x86_64-linux-gnu": - cargo_triplet = "x86_64-unknown-linux-gnu" - else: - cargo_triplet = "aarch64-unknown-linux-gnu" - gcc = toolchain / "bin" / f"{triplet}-gcc" - with open(cargo_config, "w") as fp: - fp.write( - textwrap.dedent( - """\ - [target.{}] - linker = "{}" + if triplet == "x86_64-linux-gnu": + cargo_triplet = "x86_64-unknown-linux-gnu" + else: + cargo_triplet = "aarch64-unknown-linux-gnu" + gcc = toolchain / "bin" / f"{triplet}-gcc" + + def existing_linker() -> str | None: + if not cargo_config.exists(): + return None + try: + contents = cargo_config.read_text() + except OSError: + return None + marker = f"[target.{cargo_triplet}]" + if marker not in contents: + return None + for line in contents.splitlines(): + stripped = line.strip() + if stripped.startswith("linker"): + value_part: str + _, _, value_part = stripped.partition("=") + value_str: str = value_part.strip().strip('"') + if value_str: + return value_str + return None + + if existing_linker() != str(gcc): + cargo_config.write_text( + textwrap.dedent( + """\ + [target.{triplet}] + linker = "{linker}" """ - ).format(cargo_triplet, gcc) - ) + ).format(triplet=cargo_triplet, linker=gcc) + ) def setup_openssl() -> None: @@ -1128,7 +1188,8 @@ def load_openssl_provider(name: str) -> int: OSSL_PROVIDER_load = libcrypto.OSSL_PROVIDER_load OSSL_PROVIDER_load.argtypes = (POSSL_LIB_CTX, ctypes.c_char_p) OSSL_PROVIDER_load.restype = ctypes.c_int - return OSSL_PROVIDER_load(None, name.encode()) + result = OSSL_PROVIDER_load(None, name.encode()) + return int(result) def setup_crossroot() -> None: @@ -1161,9 +1222,11 @@ def wrapsitecustomize(func: Callable[[], Any]) -> Callable[[], None]: def wrapper() -> None: func() - sitecustomize = None + sitecustomize_module = None try: - import sitecustomize + import sitecustomize as _sitecustomize + + sitecustomize_module = _sitecustomize except ImportError as exc: if exc.name != "sitecustomize": raise @@ -1172,7 +1235,10 @@ def wrapper() -> None: # relenv environment. This can't be done when pip is using build_env to # install packages. This code seems potentially brittle and there may # be reasonable arguments against doing it at all. - if sitecustomize is None or "pip-build-env" not in sitecustomize.__file__: + if ( + sitecustomize_module is None + or "pip-build-env" not in sitecustomize_module.__file__ + ): _orig = sys.path[:] # Replace sys.path sys.path[:] = common().sanitize_sys_path(sys.path) diff --git a/relenv/toolchain.py b/relenv/toolchain.py index 789b8a18..8c0b8b08 100644 --- a/relenv/toolchain.py +++ b/relenv/toolchain.py @@ -1,5 +1,5 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 """ The ``relenv toolchain`` command. """ diff --git a/setup.cfg b/setup.cfg index ce11e0eb..03402ab3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -62,7 +62,9 @@ ignore = # F405 '*' may be undefined, or defined from star imports: * F405 # line break before binary operator, black does this with pathlib.Path objects - W503 + W503, + # TYP001 guard import by TYPE_CHECKING (not needed for py3.10+) + TYP001 per-file-ignores = # F401 imported but unused diff --git a/tests/__init__.py b/tests/__init__.py index e69de29b..77693cb5 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2022-2025 Broadcom. +# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/_pytest_typing.py b/tests/_pytest_typing.py new file mode 100644 index 00000000..2ec3fcb5 --- /dev/null +++ b/tests/_pytest_typing.py @@ -0,0 +1,45 @@ +# Copyright 2022-2025 Broadcom. +# SPDX-License-Identifier: Apache-2.0 +""" +Typed helper wrappers for common pytest decorators so mypy understands them. +""" +from __future__ import annotations + +from typing import Any, Callable, Iterable, Sequence, TypeVar, cast + +import pytest + +F = TypeVar("F", bound=Callable[..., object]) + + +def fixture(*args: Any, **kwargs: Any) -> Callable[[F], F] | F: + if args and callable(args[0]) and not kwargs: + func = cast(F, args[0]) + return cast(F, pytest.fixture()(func)) + + def decorator(func: F) -> F: + wrapped = pytest.fixture(*args, **kwargs)(func) + return cast(F, wrapped) + + return decorator + + +def mark_skipif(*args: Any, **kwargs: Any) -> Callable[[F], F]: + def decorator(func: F) -> F: + wrapped = pytest.mark.skipif(*args, **kwargs)(func) + return cast(F, wrapped) + + return decorator + + +def parametrize( + argnames: str | Sequence[str], + argvalues: Iterable[Sequence[Any] | Any], + *args: Any, + **kwargs: Any, +) -> Callable[[F], F]: + def decorator(func: F) -> F: + wrapped = pytest.mark.parametrize(argnames, argvalues, *args, **kwargs)(func) + return cast(F, wrapped) + + return decorator diff --git a/tests/conftest.py b/tests/conftest.py index bf054c84..8ce041c4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,4 @@ -# Copyright 2023-2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 # import logging @@ -6,20 +6,27 @@ import platform import shutil import sys +from pathlib import Path +from typing import Iterator, Optional import pytest +from _pytest.config import Config from relenv.common import list_archived_builds, plat_from_triplet from relenv.create import create +from tests._pytest_typing import fixture + +# mypy: ignore-errors + log = logging.getLogger(__name__) -def get_build_version(): +def get_build_version() -> Optional[str]: if "RELENV_PY_VERSION" in os.environ: return os.environ["RELENV_PY_VERSION"] builds = list(list_archived_builds()) - versions = [] + versions: list[str] = [] for version, arch, plat in builds: sysplat = plat_from_triplet(plat) if sysplat == sys.platform and arch == platform.machine().lower(): @@ -30,60 +37,59 @@ def get_build_version(): "Environment RELENV_PY_VERSION not set, detected version %s", version ) return version + return None -def pytest_report_header(config): +def pytest_report_header(config: Config) -> str: return f"relenv python version: {get_build_version()}" -@pytest.fixture(scope="module") -def build_version(): - return get_build_version() +@fixture(scope="module") +def build_version() -> Iterator[str]: + version = get_build_version() + if version is None: + pytest.skip("No relenv build version available for current platform") + assert version is not None + yield version -@pytest.fixture(scope="module") -def minor_version(): - yield get_build_version().rsplit(".", 1)[0] +@fixture(scope="module") +def minor_version(build_version: str) -> Iterator[str]: + yield build_version.rsplit(".", 1)[0] -@pytest.fixture -def build(tmp_path, build_version): +@fixture +def build(tmp_path: Path, build_version: str) -> Iterator[Path]: create("test", tmp_path, version=build_version) - os.chdir(tmp_path / "test") + build_path = tmp_path / "test" + original_cwd = Path.cwd() + os.chdir(build_path) try: - yield tmp_path / "test" + yield build_path finally: - try: - shutil.rmtree(tmp_path) - except Exception as exc: - log.error("Failed to remove build directory %s", exc) + os.chdir(original_cwd) + shutil.rmtree(tmp_path, ignore_errors=True) -@pytest.fixture -def pipexec(build): - if sys.platform == "win32": - path = build / "Scripts" - else: - path = build / "bin" - - exe = shutil.which("pip3", path=path) +@fixture +def pipexec(build: Path) -> Iterator[str]: + path = build / ("Scripts" if sys.platform == "win32" else "bin") + exe = shutil.which("pip3", path=str(path)) if exe is None: - exe = shutil.which("pip", path=path) + exe = shutil.which("pip", path=str(path)) if exe is None: pytest.fail(f"Failed to find 'pip3' and 'pip' in '{path}'") + assert exe is not None yield exe -@pytest.fixture -def pyexec(build): - if sys.platform == "win32": - path = build / "Scripts" - else: - path = build / "bin" - - exe = shutil.which("python3", path=path) +@fixture +def pyexec(build: Path) -> Iterator[str]: + path = build / ("Scripts" if sys.platform == "win32" else "bin") + exe = shutil.which("python3", path=str(path)) if exe is None: - exe = shutil.which("python", path=path) + exe = shutil.which("python", path=str(path)) if exe is None: pytest.fail(f"Failed to find 'python3' and 'python' in '{path}'") + assert exe is not None yield exe diff --git a/tests/test_build.py b/tests/test_build.py index 8d751bda..9d717005 100644 --- a/tests/test_build.py +++ b/tests/test_build.py @@ -1,27 +1,30 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 import hashlib +import pathlib import pytest from relenv.build.common import Builder, verify_checksum -from relenv.common import DATA_DIR, RelenvException +from relenv.common import DATA_DIR, RelenvException, toolchain_root_dir + +# mypy: ignore-errors @pytest.fixture -def fake_download(tmp_path): +def fake_download(tmp_path: pathlib.Path) -> pathlib.Path: download = tmp_path / "fake_download" download.write_text("This is some file contents") return download @pytest.fixture -def fake_download_md5(fake_download): +def fake_download_md5(fake_download: pathlib.Path) -> str: return hashlib.sha1(fake_download.read_bytes()).hexdigest() @pytest.mark.skip_unless_on_linux -def test_builder_defaults_linux(): +def test_builder_defaults_linux() -> None: builder = Builder(version="3.10.10") assert builder.arch == "x86_64" assert builder.arch == "x86_64" @@ -29,15 +32,57 @@ def test_builder_defaults_linux(): assert builder.prefix == DATA_DIR / "build" / "3.10.10-x86_64-linux-gnu" assert builder.sources == DATA_DIR / "src" assert builder.downloads == DATA_DIR / "download" - assert "relenv/toolchain" in str(builder.toolchain) + assert builder.toolchain == toolchain_root_dir() / builder.triplet assert callable(builder.build_default) assert callable(builder.populate_env) assert builder.recipies == {} -def test_verify_checksum(fake_download, fake_download_md5): +@pytest.mark.skip_unless_on_linux +def test_builder_toolchain_lazy_loading(monkeypatch: pytest.MonkeyPatch) -> None: + """Test that toolchain is only fetched when accessed (lazy loading).""" + import relenv.build.common + + call_count = {"count": 0} + + def mock_get_toolchain(arch=None, root=None): + call_count["count"] += 1 + # Return a fake path instead of actually extracting + return pathlib.Path(f"/fake/toolchain/{arch or 'default'}") + + monkeypatch.setattr(relenv.build.common, "get_toolchain", mock_get_toolchain) + + # Create builder - should NOT call get_toolchain yet + builder = Builder(version="3.10.10", arch="aarch64") + assert call_count["count"] == 0, "get_toolchain should not be called during init" + + # Access toolchain property - should call get_toolchain once + toolchain = builder.toolchain + assert ( + call_count["count"] == 1 + ), "get_toolchain should be called when property is accessed" + assert toolchain == pathlib.Path("/fake/toolchain/aarch64") + + # Access again - should use cached value, not call again + toolchain2 = builder.toolchain + assert call_count["count"] == 1, "get_toolchain should only be called once (cached)" + assert toolchain == toolchain2 + + # Change arch - should reset cache + builder.set_arch("x86_64") + assert builder._toolchain is None, "Changing arch should reset toolchain cache" + + # Access after arch change - should call get_toolchain again + toolchain3 = builder.toolchain + assert ( + call_count["count"] == 2 + ), "get_toolchain should be called again after arch change" + assert toolchain3 == pathlib.Path("/fake/toolchain/x86_64") + + +def test_verify_checksum(fake_download: pathlib.Path, fake_download_md5: str) -> None: assert verify_checksum(fake_download, fake_download_md5) is True -def test_verify_checksum_failed(fake_download): +def test_verify_checksum_failed(fake_download: pathlib.Path) -> None: pytest.raises(RelenvException, verify_checksum, fake_download, "no") diff --git a/tests/test_common.py b/tests/test_common.py index 921724a5..2e05c2f5 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -1,5 +1,5 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 from __future__ import annotations import os @@ -11,7 +11,7 @@ import sys import tarfile from types import ModuleType -from typing import BinaryIO +from typing import BinaryIO, Literal from unittest.mock import patch import pytest @@ -40,28 +40,31 @@ work_dirs, work_root, ) +from tests._pytest_typing import mark_skipif, parametrize -def _mock_ppbt_module(monkeypatch: pytest.MonkeyPatch, triplet: str) -> None: +def _mock_ppbt_module( + monkeypatch: pytest.MonkeyPatch, triplet: str, archive_path: pathlib.Path +) -> None: """ Provide a lightweight ppbt.common stub so get_toolchain() skips the real extraction. """ stub_package = ModuleType("ppbt") stub_common = ModuleType("ppbt.common") - stub_package.common = stub_common # type: ignore[attr-defined] + setattr(stub_package, "common", stub_common) # pytest will clean these entries up automatically via monkeypatch monkeypatch.setitem(sys.modules, "ppbt", stub_package) monkeypatch.setitem(sys.modules, "ppbt.common", stub_common) - stub_common.ARCHIVE = pathlib.Path("dummy-toolchain.tar.xz") + setattr(stub_common, "ARCHIVE", archive_path) def fake_extract_archive(dest: str, archive: str) -> None: dest_path = pathlib.Path(dest) dest_path.mkdir(parents=True, exist_ok=True) (dest_path / triplet).mkdir(parents=True, exist_ok=True) - stub_common.extract_archive = fake_extract_archive # type: ignore[attr-defined] + setattr(stub_common, "extract_archive", fake_extract_archive) def test_get_triplet_linux() -> None: @@ -157,9 +160,14 @@ def test_get_toolchain(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) data_dir = tmp_path / "data" triplet = "aarch64-linux-gnu" monkeypatch.setattr(relenv.common, "DATA_DIR", data_dir, raising=False) - monkeypatch.setattr(relenv.common.sys, "platform", "linux", raising=False) - monkeypatch.setattr(relenv.common, "get_triplet", lambda: triplet) - _mock_ppbt_module(monkeypatch, triplet) + monkeypatch.setattr(sys, "platform", "linux") + monkeypatch.setattr( + relenv.common, "get_triplet", lambda machine=None, plat=None: triplet + ) + monkeypatch.setenv("RELENV_TOOLCHAIN_CACHE", str(data_dir / "toolchain")) + archive_path = tmp_path / "dummy-toolchain.tar.xz" + archive_path.write_bytes(b"") + _mock_ppbt_module(monkeypatch, triplet, archive_path) ret = get_toolchain(arch="aarch64") assert ret == data_dir / "toolchain" / triplet @@ -170,8 +178,11 @@ def test_get_toolchain_linux_existing(tmp_path: pathlib.Path) -> None: toolchain_path = data_dir / "toolchain" / triplet toolchain_path.mkdir(parents=True) with patch("relenv.common.DATA_DIR", data_dir), patch( - "relenv.common.sys.platform", "linux" - ), patch("relenv.common.get_triplet", return_value=triplet): + "sys.platform", "linux" + ), patch("relenv.common.get_triplet", return_value=triplet), patch.dict( + os.environ, + {"RELENV_TOOLCHAIN_CACHE": str(data_dir / "toolchain")}, + ): ret = get_toolchain() assert ret == toolchain_path @@ -182,14 +193,22 @@ def test_get_toolchain_no_arch( data_dir = tmp_path / "data" triplet = "x86_64-linux-gnu" monkeypatch.setattr(relenv.common, "DATA_DIR", data_dir, raising=False) - monkeypatch.setattr(relenv.common.sys, "platform", "linux", raising=False) - monkeypatch.setattr(relenv.common, "get_triplet", lambda: triplet) - _mock_ppbt_module(monkeypatch, triplet) + monkeypatch.setattr(sys, "platform", "linux") + monkeypatch.setattr( + relenv.common, "get_triplet", lambda machine=None, plat=None: triplet + ) + monkeypatch.setenv("RELENV_TOOLCHAIN_CACHE", str(data_dir / "toolchain")) + archive_path = tmp_path / "dummy-toolchain.tar.xz" + archive_path.write_bytes(b"") + _mock_ppbt_module(monkeypatch, triplet, archive_path) ret = get_toolchain() assert ret == data_dir / "toolchain" / triplet -@pytest.mark.parametrize( +WriteMode = Literal["w:gz", "w:xz", "w:bz2", "w"] + + +@parametrize( ("suffix", "mode"), ( (".tgz", "w:gz"), @@ -199,14 +218,14 @@ def test_get_toolchain_no_arch( (".tar", "w"), ), ) -def test_extract_archive(tmp_path: pathlib.Path, suffix: str, mode: str) -> None: +def test_extract_archive(tmp_path: pathlib.Path, suffix: str, mode: WriteMode) -> None: to_be_archived = tmp_path / "to_be_archived" to_be_archived.mkdir() test_file = to_be_archived / "testfile" test_file.touch() tar_file = tmp_path / f"fake_archive{suffix}" to_dir = tmp_path / "extracted" - with tarfile.open(str(tar_file), mode) as tar: + with tarfile.open(str(tar_file), mode=mode) as tar: tar.add(str(to_be_archived), to_be_archived.name) extract_archive(str(to_dir), str(tar_file)) assert to_dir.exists() @@ -250,19 +269,32 @@ def fake_fetch(url: str, fp: BinaryIO, backoff: int, timeout: float) -> None: assert not created.exists() -@pytest.mark.skipif(shutil.which("shellcheck") is None, reason="Test needs shellcheck") +def _extract_shell_snippet(tpl: str) -> str: + rendered = format_shebang("python3", tpl) + lines = rendered.splitlines()[1:] # skip #!/bin/sh + snippet: list[str] = [] + for line in lines: + if line.startswith("'''"): + break + snippet.append(line) + return "\n".join(snippet) + + +@mark_skipif(shutil.which("shellcheck") is None, reason="Test needs shellcheck") def test_shebang_tpl_linux() -> None: - sh = format_shebang("python3", SHEBANG_TPL_LINUX).split("'''")[1].strip("'") + sh = _extract_shell_snippet(SHEBANG_TPL_LINUX) proc = subprocess.Popen(["shellcheck", "-s", "sh", "-"], stdin=subprocess.PIPE) + assert proc.stdin is not None proc.stdin.write(sh.encode()) proc.communicate() assert proc.returncode == 0 -@pytest.mark.skipif(shutil.which("shellcheck") is None, reason="Test needs shellcheck") +@mark_skipif(shutil.which("shellcheck") is None, reason="Test needs shellcheck") def test_shebang_tpl_macos() -> None: - sh = format_shebang("python3", SHEBANG_TPL_MACOS).split("'''")[1].strip("'") + sh = _extract_shell_snippet(SHEBANG_TPL_MACOS) proc = subprocess.Popen(["shellcheck", "-s", "sh", "-"], stdin=subprocess.PIPE) + assert proc.stdin is not None proc.stdin.write(sh.encode()) proc.communicate() assert proc.returncode == 0 @@ -413,8 +445,46 @@ def test_sanitize_sys_path_with_editable_paths(tmp_path: pathlib.Path) -> None: def test_makepath_oserror() -> None: - with patch("relenv.common.os.path.abspath", side_effect=OSError): + with patch("os.path.abspath", side_effect=OSError): result, case = makepath("foo", "Bar") expected = os.path.join("foo", "Bar") assert result == expected assert case == os.path.normcase(expected) + + +def test_copyright_headers() -> None: + """Verify all Python source files have the correct copyright header.""" + expected_header = ( + "# Copyright 2022-2025 Broadcom.\n" "# SPDX-License-Identifier: Apache-2.0\n" + ) + + # Find all Python files in relenv/ and tests/ + root = MODULE_DIR.parent + python_files: list[pathlib.Path] = [] + for directory in ("relenv", "tests"): + dir_path = root / directory + if dir_path.exists(): + python_files.extend(dir_path.rglob("*.py")) + + # Skip generated and cache files + python_files = [ + f + for f in python_files + if "__pycache__" not in f.parts + and ".nox" not in f.parts + and "build" not in f.parts + ] + + failures = [] + for py_file in python_files: + with open(py_file, "r", encoding="utf-8") as f: + content = f.read() + + if not content.startswith(expected_header): + # Read first two lines for error message + lines = content.split("\n", 2) + actual = "\n".join(lines[:2]) + "\n" if len(lines) >= 2 else content + failures.append(f"{py_file.relative_to(root)}: {actual!r}") + + if failures: + pytest.fail("Files with incorrect copyright headers:\n" + "\n".join(failures)) diff --git a/tests/test_create.py b/tests/test_create.py index 542515b0..d8884f1d 100644 --- a/tests/test_create.py +++ b/tests/test_create.py @@ -12,12 +12,12 @@ from relenv.create import CreateException, chdir, create -def test_chdir(tmp_path): +def test_chdir(tmp_path: pathlib.Path) -> None: with chdir(str(tmp_path)): assert pathlib.Path(os.getcwd()) == tmp_path -def test_create(tmp_path): +def test_create(tmp_path: pathlib.Path) -> None: to_be_archived = tmp_path / "to_be_archived" to_be_archived.mkdir() test_file = to_be_archived / "testfile" @@ -34,21 +34,21 @@ def test_create(tmp_path): assert (to_dir / to_be_archived.name / test_file.name) in to_dir.glob("**/*") -def test_create_tar_doesnt_exist(tmp_path): +def test_create_tar_doesnt_exist(tmp_path: pathlib.Path) -> None: tar_file = tmp_path / "fake_archive" with patch("relenv.create.archived_build", return_value=tar_file): with pytest.raises(CreateException): create("foo", dest=tmp_path) -def test_create_directory_exists(tmp_path): +def test_create_directory_exists(tmp_path: pathlib.Path) -> None: (tmp_path / "foo").mkdir() with pytest.raises(CreateException): create("foo", dest=tmp_path) -def test_create_arches_directory_exists(tmp_path): - mocked_arches = {key: [] for key in arches.keys()} +def test_create_arches_directory_exists(tmp_path: pathlib.Path) -> None: + mocked_arches: dict[str, list[str]] = {key: [] for key in arches.keys()} with patch("relenv.create.arches", mocked_arches): with pytest.raises(CreateException): create("foo", dest=tmp_path) diff --git a/tests/test_downloads.py b/tests/test_downloads.py index 15eb4c63..c7a87bef 100644 --- a/tests/test_downloads.py +++ b/tests/test_downloads.py @@ -1,22 +1,24 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 import pathlib import subprocess import sys + +# mypy: ignore-errors from unittest.mock import patch from relenv.build.common import Download from relenv.common import RelenvException -def test_download_url(): +def test_download_url() -> None: download = Download( "test", "https://test.com/{version}/test-{version}.tar.xz", version="1.0.0" ) assert download.url == "https://test.com/1.0.0/test-1.0.0.tar.xz" -def test_download_url_change_version(): +def test_download_url_change_version() -> None: download = Download( "test", "https://test.com/{version}/test-{version}.tar.xz", version="1.0.0" ) @@ -24,7 +26,7 @@ def test_download_url_change_version(): assert download.url == "https://test.com/1.2.2/test-1.2.2.tar.xz" -def test_download_filepath(): +def test_download_filepath() -> None: download = Download( "test", "https://test.com/{version}/test-{version}.tar.xz", @@ -38,7 +40,7 @@ def test_download_filepath(): assert str(download.filepath) == "/tmp/test-1.0.0.tar.xz" -def test_download_filepath_change_destination(): +def test_download_filepath_change_destination() -> None: download = Download( "test", "https://test.com/{version}/test-{version}.tar.xz", @@ -53,7 +55,7 @@ def test_download_filepath_change_destination(): assert str(download.filepath) == "/tmp/foo/test-1.0.0.tar.xz" -def test_download_exists(tmp_path): +def test_download_exists(tmp_path: pathlib.Path) -> None: download = Download( "test", "https://test.com/{version}/test-{version}.tar.xz", @@ -65,14 +67,14 @@ def test_download_exists(tmp_path): assert download.exists() is True -def test_validate_md5sum(tmp_path): +def test_validate_md5sum(tmp_path: pathlib.Path) -> None: fake_md5 = "fakemd5" with patch("relenv.build.common.verify_checksum") as run_mock: assert Download.validate_checksum(str(tmp_path), fake_md5) is True run_mock.assert_called_with(str(tmp_path), fake_md5) -def test_validate_md5sum_failed(tmp_path): +def test_validate_md5sum_failed(tmp_path: pathlib.Path) -> None: fake_md5 = "fakemd5" with patch( "relenv.build.common.verify_checksum", side_effect=RelenvException @@ -81,7 +83,7 @@ def test_validate_md5sum_failed(tmp_path): run_mock.assert_called_with(str(tmp_path), fake_md5) -def test_validate_signature(tmp_path): +def test_validate_signature(tmp_path: pathlib.Path) -> None: sig = "fakesig" with patch("relenv.build.common.runcmd") as run_mock: assert Download.validate_signature(str(tmp_path), sig) is True @@ -92,7 +94,7 @@ def test_validate_signature(tmp_path): ) -def test_validate_signature_failed(tmp_path): +def test_validate_signature_failed(tmp_path: pathlib.Path) -> None: sig = "fakesig" with patch("relenv.build.common.runcmd", side_effect=RelenvException) as run_mock: assert Download.validate_signature(str(tmp_path), sig) is False diff --git a/tests/test_fips_photon.py b/tests/test_fips_photon.py index 4e5728a0..add6c331 100644 --- a/tests/test_fips_photon.py +++ b/tests/test_fips_photon.py @@ -1,9 +1,12 @@ -# Copyright 2023-2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 # import os import pathlib + +# mypy: ignore-errors import subprocess +from typing import Any import pytest @@ -12,7 +15,7 @@ from .conftest import get_build_version -def check_test_environment(): +def check_test_environment() -> bool: path = pathlib.Path("/etc/os-release") if path.exists(): release = path.read_text() @@ -28,7 +31,7 @@ def check_test_environment(): ] -def test_fips_mode(pyexec, build): +def test_fips_mode(pyexec: str, build: Any) -> None: _install_ppbt(pyexec) env = os.environ.copy() proc = subprocess.run( diff --git a/tests/test_module_imports.py b/tests/test_module_imports.py index f7307609..d9455c84 100644 --- a/tests/test_module_imports.py +++ b/tests/test_module_imports.py @@ -1,18 +1,29 @@ -# Copyright 2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 # from __future__ import annotations import importlib import pathlib -from typing import List +from typing import TYPE_CHECKING, Any, Callable, List, Sequence, TypeVar, cast import pytest +if TYPE_CHECKING: + from _pytest.mark.structures import ParameterSet -def _top_level_modules() -> List[pytest.ParameterSet]: +F = TypeVar("F", bound=Callable[..., object]) + + +def typed_parametrize(*args: Any, **kwargs: Any) -> Callable[[F], F]: + """Type-aware wrapper around pytest.mark.parametrize.""" + decorator = pytest.mark.parametrize(*args, **kwargs) + return cast(Callable[[F], F], decorator) + + +def _top_level_modules() -> Sequence["ParameterSet"]: relenv_dir = pathlib.Path(__file__).resolve().parents[1] / "relenv" - params: List[pytest.ParameterSet] = [] + params: List["ParameterSet"] = [] for path in sorted(relenv_dir.iterdir()): if not path.is_file() or path.suffix != ".py": continue @@ -25,7 +36,7 @@ def _top_level_modules() -> List[pytest.ParameterSet]: return params -@pytest.mark.parametrize("module_name", _top_level_modules()) +@typed_parametrize("module_name", _top_level_modules()) def test_import_top_level_module(module_name: str) -> None: """ Ensure each top-level module in the relenv package can be imported. diff --git a/tests/test_pyversions_runtime.py b/tests/test_pyversions_runtime.py index 346b060e..b88d55ea 100644 --- a/tests/test_pyversions_runtime.py +++ b/tests/test_pyversions_runtime.py @@ -1,4 +1,4 @@ -# Copyright 2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 # from __future__ import annotations @@ -6,7 +6,7 @@ import hashlib import pathlib import subprocess -from typing import Dict +from typing import Any, Dict, Sequence import pytest @@ -69,7 +69,9 @@ def test_verify_signature_success( ) -> None: called: Dict[str, list[str]] = {} - def fake_run(cmd, **kwargs): + def fake_run( + cmd: Sequence[str], **kwargs: Any + ) -> subprocess.CompletedProcess[bytes]: called.setdefault("cmd", []).extend(cmd) return subprocess.CompletedProcess(cmd, 0, stdout=b"", stderr=b"") @@ -83,7 +85,9 @@ def test_verify_signature_failure_with_missing_key( ) -> None: responses: list[str] = [] - def fake_run(cmd, **kwargs): + def fake_run( + cmd: Sequence[str], **kwargs: Any + ) -> subprocess.CompletedProcess[bytes]: if len(responses) == 0: responses.append("first") stderr = b"gpg: error\n[GNUPG:] INV_SGNR 0 ABCDEF12\nNo public key\n" diff --git a/tests/test_relocate.py b/tests/test_relocate.py index 298f6192..0a9054c4 100644 --- a/tests/test_relocate.py +++ b/tests/test_relocate.py @@ -1,8 +1,9 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 import pathlib import shutil from textwrap import dedent +from typing import Any from unittest.mock import MagicMock, call, patch import pytest @@ -23,103 +24,108 @@ class BaseProject: - def __init__(self, root_dir): + def __init__(self, root_dir: pathlib.Path) -> None: self.root_dir = root_dir self.libs_dir = self.root_dir / "lib" - def make_project(self): + def make_project(self) -> None: self.root_dir.mkdir(parents=True, exist_ok=True) self.libs_dir.mkdir(parents=True, exist_ok=True) - def destroy_project(self): + def destroy_project(self) -> None: # Make sure the project is torn down properly - if pathlib.Path(self.root_dir).exists(): + if self.root_dir.exists(): shutil.rmtree(self.root_dir, ignore_errors=True) - def add_file(self, name, contents, *relpath, binary=False): + def add_file( + self, + name: str, + contents: bytes | str, + *relpath: str, + binary: bool = False, + ) -> pathlib.Path: file_path = (self.root_dir / pathlib.Path(*relpath) / name).resolve() file_path.parent.mkdir(parents=True, exist_ok=True) if binary: - file_path.write_bytes(contents) + data = contents if isinstance(contents, bytes) else contents.encode() + file_path.write_bytes(data) else: - file_path.write_text(contents) + text = contents.decode() if isinstance(contents, bytes) else contents + file_path.write_text(text) return file_path - def __enter__(self): + def __enter__(self) -> "BaseProject": self.make_project() return self - def __exit__(self, *exc): + def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None: self.destroy_project() class LinuxProject(BaseProject): - def add_simple_elf(self, name, *relpath): + def add_simple_elf(self, name: str, *relpath: str) -> pathlib.Path: return self.add_file(name, b"\x7f\x45\x4c\x46", *relpath, binary=True) -def test_is_macho_true(tmp_path): +def test_is_macho_true(tmp_path: pathlib.Path) -> None: lib_path = tmp_path / "test.dylib" lib_path.write_bytes(b"\xcf\xfa\xed\xfe") assert is_macho(lib_path) is True -def test_is_macho_false(tmp_path): +def test_is_macho_false(tmp_path: pathlib.Path) -> None: lib_path = tmp_path / "test.dylib" lib_path.write_bytes(b"\xcf\xfa\xed\xfa") assert is_macho(lib_path) is False -def test_is_macho_not_a_file(tmp_path): +def test_is_macho_not_a_file(tmp_path: pathlib.Path) -> None: with pytest.raises(IsADirectoryError): assert is_macho(tmp_path) is False -def test_is_macho_file_does_not_exist(tmp_path): +def test_is_macho_file_does_not_exist(tmp_path: pathlib.Path) -> None: lib_path = tmp_path / "test.dylib" with pytest.raises(FileNotFoundError): assert is_macho(lib_path) is False -def test_is_elf_true(tmp_path): +def test_is_elf_true(tmp_path: pathlib.Path) -> None: lib_path = tmp_path / "test.so" lib_path.write_bytes(b"\x7f\x45\x4c\x46") assert is_elf(lib_path) is True -def test_is_elf_false(tmp_path): +def test_is_elf_false(tmp_path: pathlib.Path) -> None: lib_path = tmp_path / "test.so" lib_path.write_bytes(b"\xcf\xfa\xed\xfa") assert is_elf(lib_path) is False -def test_is_elf_not_a_file(tmp_path): +def test_is_elf_not_a_file(tmp_path: pathlib.Path) -> None: with pytest.raises(IsADirectoryError): assert is_elf(tmp_path) is False -def test_is_elf_file_does_not_exist(tmp_path): +def test_is_elf_file_does_not_exist(tmp_path: pathlib.Path) -> None: lib_path = tmp_path / "test.so" with pytest.raises(FileNotFoundError): assert is_elf(lib_path) is False -def test_parse_otool_l(): - # XXX - pass +def test_parse_otool_l() -> None: + pytest.skip("Not implemented") -def test_parse_macho(): - # XXX - pass +def test_parse_macho() -> None: + pytest.skip("Not implemented") -def test_handle_macho(): - # XXX - pass +def test_handle_macho() -> None: + pytest.skip("Not implemented") -def test_parse_readelf_d_no_rpath(): +def test_parse_readelf_d_no_rpath() -> None: section = dedent( """ Dynamic section at offset 0xbdd40 contains 28 entries: @@ -134,7 +140,7 @@ def test_parse_readelf_d_no_rpath(): assert parse_readelf_d(section) == [] -def test_parse_readelf_d_rpath(): +def test_parse_readelf_d_rpath() -> None: section = dedent( """ Dynamic section at offset 0x58000 contains 27 entries: @@ -149,19 +155,19 @@ def test_parse_readelf_d_rpath(): assert parse_readelf_d(section) == ["$ORIGIN/../.."] -def test_is_in_dir(tmp_path): +def test_is_in_dir(tmp_path: pathlib.Path) -> None: parent = tmp_path / "foo" child = tmp_path / "foo" / "bar" / "bang" assert is_in_dir(child, parent) is True -def test_is_in_dir_false(tmp_path): +def test_is_in_dir_false(tmp_path: pathlib.Path) -> None: parent = tmp_path / "foo" child = tmp_path / "bar" / "bang" assert is_in_dir(child, parent) is False -def test_patch_rpath(tmp_path): +def test_patch_rpath(tmp_path: pathlib.Path) -> None: path = str(tmp_path / "test") new_rpath = str(pathlib.Path("$ORIGIN", "..", "..", "lib")) with patch("subprocess.run", return_value=MagicMock(returncode=0)): @@ -172,7 +178,7 @@ def test_patch_rpath(tmp_path): assert patch_rpath(path, new_rpath) == new_rpath -def test_patch_rpath_failed(tmp_path): +def test_patch_rpath_failed(tmp_path: pathlib.Path) -> None: path = str(tmp_path / "test") new_rpath = str(pathlib.Path("$ORIGIN", "..", "..", "lib")) with patch("subprocess.run", return_value=MagicMock(returncode=1)): @@ -183,7 +189,7 @@ def test_patch_rpath_failed(tmp_path): assert patch_rpath(path, new_rpath) is False -def test_patch_rpath_no_change(tmp_path): +def test_patch_rpath_no_change(tmp_path: pathlib.Path) -> None: path = str(tmp_path / "test") new_rpath = str(pathlib.Path("$ORIGIN", "..", "..", "lib")) with patch("subprocess.run", return_value=MagicMock(returncode=0)): @@ -191,7 +197,7 @@ def test_patch_rpath_no_change(tmp_path): assert patch_rpath(path, new_rpath, only_relative=False) == new_rpath -def test_patch_rpath_remove_non_relative(tmp_path): +def test_patch_rpath_remove_non_relative(tmp_path: pathlib.Path) -> None: path = str(tmp_path / "test") new_rpath = str(pathlib.Path("$ORIGIN", "..", "..", "lib")) with patch("subprocess.run", return_value=MagicMock(returncode=0)): @@ -202,7 +208,7 @@ def test_patch_rpath_remove_non_relative(tmp_path): assert patch_rpath(path, new_rpath) == new_rpath -def test_main_linux(tmp_path): +def test_main_linux(tmp_path: pathlib.Path) -> None: proj = LinuxProject(tmp_path) simple = proj.add_simple_elf("simple.so", "foo", "bar") simple2 = proj.add_simple_elf("simple2.so", "foo", "bar", "bop") @@ -218,7 +224,7 @@ def test_main_linux(tmp_path): elf_mock.assert_has_calls(calls, any_order=True) -def test_handle_elf(tmp_path): +def test_handle_elf(tmp_path: pathlib.Path) -> None: proj = LinuxProject(tmp_path / "proj") pybin = proj.add_simple_elf("python", "foo") libcrypt = tmp_path / "libcrypt.so.2" @@ -245,7 +251,7 @@ def test_handle_elf(tmp_path): patch_rpath_mock.assert_called_with(str(pybin), "$ORIGIN/../lib") -def test_handle_elf_rpath_only(tmp_path): +def test_handle_elf_rpath_only(tmp_path: pathlib.Path) -> None: proj = LinuxProject(tmp_path / "proj") pybin = proj.add_simple_elf("python", "foo") libcrypt = proj.libs_dir / "libcrypt.so.2" diff --git a/tests/test_relocate_module.py b/tests/test_relocate_module.py index ebda8104..34cacffe 100644 --- a/tests/test_relocate_module.py +++ b/tests/test_relocate_module.py @@ -1,4 +1,4 @@ -# Copyright 2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 # from __future__ import annotations @@ -7,7 +7,7 @@ import pathlib import shutil import subprocess -from typing import Dict, List +from typing import Dict, List, Tuple import pytest @@ -192,7 +192,7 @@ def test_handle_macho_copies_when_needed( monkeypatch.setattr(os.path, "exists", lambda path: path == str(source_lib)) - copied = {} + copied: Dict[str, Tuple[str, str]] = {} monkeypatch.setattr( shutil, "copy", lambda src, dst: copied.setdefault("copy", (src, dst)) diff --git a/tests/test_runtime.py b/tests/test_runtime.py index c3d8c19b..b2c8979d 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -1,4 +1,4 @@ -# Copyright 2023-2025 Broadcom. +# Copyright 2022-2025 Broadcom. # SPDX-License-Identifier: Apache-2.0 # from __future__ import annotations @@ -15,6 +15,8 @@ import relenv.runtime +# mypy: ignore-errors + def _raise(exc: Exception): raise exc @@ -1466,15 +1468,14 @@ def loader(name: str, path: str) -> ModuleType: def test_relocate_cached(monkeypatch: pytest.MonkeyPatch) -> None: module = ModuleType("relenv.relocate.cached") - monkeypatch.setattr(relenv.runtime.relocate, "relocate", module, raising=False) + monkeypatch.setattr(relenv.runtime, "_RELOCATE", module, raising=False) result = relenv.runtime.relocate() assert result is module def test_buildenv_cached(monkeypatch: pytest.MonkeyPatch) -> None: module = ModuleType("relenv.buildenv.cached") - monkeypatch.setattr(relenv.runtime.buildenv, "builenv", True, raising=False) - monkeypatch.setattr(relenv.runtime.buildenv, "buildenv", module, raising=False) + monkeypatch.setattr(relenv.runtime, "_BUILDENV", module, raising=False) result = relenv.runtime.buildenv() assert result is module @@ -1536,7 +1537,8 @@ def original(self: object) -> bytes: # type: ignore[override] shebang = result.decode().strip() assert shebang.startswith("#!") path_part = shebang[2:] - expected = os.fspath(pathlib.Path("/") / pathlib.Path("bin") / "python") + # Use PurePosixPath since we're testing Linux behavior + expected = os.fspath(pathlib.PurePosixPath("/") / "bin" / "python") assert path_part == expected diff --git a/tests/test_verify_build.py b/tests/test_verify_build.py index d61644f6..22066ad2 100644 --- a/tests/test_verify_build.py +++ b/tests/test_verify_build.py @@ -1,16 +1,19 @@ # Copyright 2022-2025 Broadcom. -# SPDX-License-Identifier: Apache-2 +# SPDX-License-Identifier: Apache-2.0 +# mypy: ignore-errors """ Verify relenv builds. """ import json import os import pathlib +import shlex import shutil import subprocess import sys import textwrap import time +import uuid import packaging import pytest @@ -65,6 +68,48 @@ def _install_ppbt(pexec): assert p.returncode == 0, "Failed to extract toolchain" +def _setup_buildenv(pyexec, env): + """ + Setup build environment variables for compiling C extensions. + + On Linux, this calls 'relenv buildenv --json' to get the proper compiler + flags and paths to use the relenv toolchain and bundled libraries instead + of system libraries. + + :param pyexec: Path to the relenv Python executable + :param env: Environment dictionary to update with buildenv variables + """ + if sys.platform != "linux": + return + + p = subprocess.run( + [ + str(pyexec), + "-m", + "relenv", + "buildenv", + "--json", + ], + capture_output=True, + ) + try: + buildenv = json.loads(p.stdout) + except json.JSONDecodeError: + assert False, f"Failed to decode json: {p.stdout.decode()} {p.stderr.decode()}" + + for k in buildenv: + env[k] = buildenv[k] + + +@pytest.fixture(autouse=True) +def _clear_ssl_env(monkeypatch: pytest.MonkeyPatch) -> None: + """ + Ensure preceding tests do not leave stale certificate paths behind. + """ + monkeypatch.delenv("SSL_CERT_FILE", raising=False) + monkeypatch.delenv("SSL_CERT_DIR", raising=False) + + @pytest.fixture(scope="module") def arch(): return build_arch() @@ -320,27 +365,7 @@ def test_pip_install_salt_w_package_requirements( _install_ppbt(pyexec) env = os.environ.copy() - - # if sys.platform == "linux": - # p = subprocess.run( - # [ - # pyexec, - # "-m", - # "relenv", - # "buildenv", - # "--json", - # ], - # capture_output=True, - # ) - # try: - # buildenv = json.loads(p.stdout) - # except json.JSONDecodeError: - # assert ( - # False - # ), f"Failed to decode json: {p.stdout.decode()} {p.stderr.decode()}" - # for k in buildenv: - # env[k] = buildenv[k] - + _setup_buildenv(pyexec, env) env["RELENV_BUILDENV"] = "yes" env["USE_STATIC_REQUIREMENTS"] = "1" p = subprocess.run( @@ -413,7 +438,15 @@ def test_pip_install_salt_w_package_requirements( "26.4.0", ], ) -def test_pip_install_pyzmq(pipexec, pyexec, pyzmq_version, build_version, arch, build): +def test_pip_install_pyzmq( + pipexec, + pyexec, + pyzmq_version, + build_version, + arch, + build, + tmp_path: pathlib.Path, +) -> None: if pyzmq_version == "23.2.0" and "3.12" in build_version: pytest.xfail(f"{pyzmq_version} does not install on 3.12") @@ -483,6 +516,157 @@ def test_pip_install_pyzmq(pipexec, pyexec, pyzmq_version, build_version, arch, env["ZMQ_PREFIX"] = "bundled" env["RELENV_BUILDENV"] = "yes" env["USE_STATIC_REQUIREMENTS"] = "1" + + if sys.platform == "linux": + fake_bsd_root = tmp_path / "fake_libbsd" + (fake_bsd_root / "bsd").mkdir(parents=True, exist_ok=True) + (fake_bsd_root / "bsd" / "string.h").write_text( + textwrap.dedent( + """\ + #ifndef RELENV_FAKE_BSD_STRING_H + #define RELENV_FAKE_BSD_STRING_H + + #include + + #ifdef __cplusplus + extern "C" { + #endif + + size_t strlcpy(char *dst, const char *src, size_t siz); + size_t strlcat(char *dst, const char *src, size_t siz); + + #ifdef __cplusplus + } + #endif + + #endif /* RELENV_FAKE_BSD_STRING_H */ + """ + ) + ) + (fake_bsd_root / "string.c").write_text( + textwrap.dedent( + """\ + #include + #include + + static size_t relenv_strlen(const char *s) { + size_t len = 0; + if (s == NULL) { + return 0; + } + while (s[len] != '\\0') { + ++len; + } + return len; + } + + static size_t relenv_strnlen(const char *s, size_t maxlen) { + size_t len = 0; + if (s == NULL) { + return 0; + } + while (len < maxlen && s[len] != '\\0') { + ++len; + } + return len; + } + + size_t strlcpy(char *dst, const char *src, size_t siz) { + size_t src_len = relenv_strlen(src); + if (siz == 0 || dst == NULL) { + return src_len; + } + size_t copy = src_len; + if (copy >= siz) { + copy = siz - 1; + } + if (copy > 0 && src != NULL) { + memcpy(dst, src, copy); + } + dst[copy] = '\\0'; + return src_len; + } + + size_t strlcat(char *dst, const char *src, size_t siz) { + size_t dst_len = relenv_strnlen(dst, siz); + size_t src_len = relenv_strlen(src); + size_t initial_len = dst_len; + if (dst == NULL || dst_len >= siz) { + return initial_len + src_len; + } + size_t space = (siz > dst_len + 1) ? siz - dst_len - 1 : 0; + size_t copy = 0; + if (space > 0 && src != NULL) { + copy = src_len; + if (copy > space) { + copy = space; + } + if (copy > 0) { + memcpy(dst + dst_len, src, copy); + } + dst_len += copy; + } + dst[dst_len] = '\\0'; + return initial_len + src_len; + } + """ + ) + ) + include_flag = f"-I{fake_bsd_root}" + for key in ("CFLAGS", "CXXFLAGS", "CPPFLAGS"): + env[key] = " ".join(filter(None, [env.get(key, ""), include_flag])).strip() + env["CPATH"] = ":".join( + filter(None, [str(fake_bsd_root), env.get("CPATH", "")]) + ) + for key in ("C_INCLUDE_PATH", "CPLUS_INCLUDE_PATH"): + env[key] = ":".join(filter(None, [str(fake_bsd_root), env.get(key, "")])) + cc_value = env.get("CC") + if cc_value: + cc_args = shlex.split(cc_value) + else: + cc_path = shutil.which("cc") or shutil.which("gcc") + assert cc_path, "C compiler not found for libbsd shim" + cc_args = [cc_path] + obj_path = fake_bsd_root / "string.o" + compile_result = subprocess.run( + cc_args + + [ + "-c", + "-O2", + "-fPIC", + "-o", + str(obj_path), + str(fake_bsd_root / "string.c"), + ], + env=env, + ) + assert compile_result.returncode == 0, "Failed to compile libbsd shim" + ar_value = env.get("AR") + if ar_value: + ar_args = shlex.split(ar_value) + else: + ar_path = shutil.which("ar") + assert ar_path, "Archiver not found for libbsd shim" + ar_args = [ar_path] + libbsd_static = fake_bsd_root / "libbsd.a" + archive_result = subprocess.run( + ar_args + ["rcs", str(libbsd_static), str(obj_path)], + env=env, + ) + assert archive_result.returncode == 0, "Failed to archive libbsd shim" + lib_dir_flag = f"-L{fake_bsd_root}" + env["LDFLAGS"] = " ".join( + filter(None, [lib_dir_flag, env.get("LDFLAGS", "")]) + ).strip() + env["LIBS"] = " ".join(filter(None, ["-lbsd", env.get("LIBS", "")])).strip() + env["LIBRARY_PATH"] = ":".join( + filter(None, [str(fake_bsd_root), env.get("LIBRARY_PATH", "")]) + ) + env["ac_cv_func_strlcpy"] = "yes" + env["ac_cv_func_strlcat"] = "yes" + env["ac_cv_have_decl_strlcpy"] = "yes" + env["ac_cv_have_decl_strlcat"] = "yes" + p = subprocess.run( [ str(pipexec), @@ -826,6 +1010,7 @@ def validate_shebang(path): def test_moving_pip_installed_c_extentions(pipexec, pyexec, build, minor_version): _install_ppbt(pyexec) env = os.environ.copy() + _setup_buildenv(pyexec, env) env["RELENV_DEBUG"] = "yes" env["RELENV_BUILDENV"] = "yes" p = subprocess.run( @@ -857,11 +1042,6 @@ def test_cryptography_rpath( pyexec, pipexec, build, minor_version, cryptography_version ): _install_ppbt(pyexec) - # log.warn("Extract ppbt") - # p = subprocess.run( - # [pyexec, "-c", "import ppbt; ppbt.extract()"], - # ) - # assert p.returncode == 0 def find_library(path, search): for root, dirs, files in os.walk(path): @@ -870,6 +1050,7 @@ def find_library(path, search): return fname env = os.environ.copy() + _setup_buildenv(pyexec, env) env["RELENV_BUILDENV"] = "yes" p = subprocess.run( [ @@ -1155,7 +1336,7 @@ def test_install_python_ldap(pipexec, pyexec, build): tar xvf cyrus-sasl-{saslver}.tar.gz cd cyrus-sasl-{saslver} ./configure --prefix=$RELENV_PATH - make + make -j"$(nproc)" make install cd .. @@ -1164,7 +1345,7 @@ def test_install_python_ldap(pipexec, pyexec, build): tar xvf openldap-{ldapver}.tgz cd openldap-{ldapver} ./configure --prefix=$RELENV_PATH - make + make -j"$(nproc)" make install cd .. @@ -1184,6 +1365,7 @@ def test_install_python_ldap(pipexec, pyexec, build): subprocess.run(["/usr/bin/bash", "buildscript.sh"], check=True) env = os.environ.copy() + _setup_buildenv(pyexec, env) env["RELENV_DEBUG"] = "yes" env["RELENV_BUILDENV"] = "yes" @@ -1215,9 +1397,16 @@ def test_install_with_target_shebang(pipexec, build, minor_version): check=True, env=env, ) - shebang = pathlib.Path(extras / "bin" / "cowsay").open().readlines()[2].strip() + exec_line = "" + for line in pathlib.Path(extras / "bin" / "cowsay").read_text().splitlines(): + stripped = line.strip() + if not stripped or stripped.startswith("#"): + continue + if stripped.startswith('"exec"'): + exec_line = stripped + break assert ( - shebang + exec_line == '"exec" "$(dirname "$(readlink -f "$0")")/../../bin/python{}" "$0" "$@"'.format( minor_version ) @@ -1764,7 +1953,7 @@ def test_install_editable_package_in_extras( def rockycontainer(build): if not shutil.which("docker"): pytest.skip(reason="No docker binary found") - name = "rocky10" + name = f"rocky10-{uuid.uuid4().hex}" subprocess.run( [ "docker", @@ -1815,10 +2004,35 @@ def rockycontainer(build): @pytest.mark.skip_on_windows -def test_no_openssl_binary(rockycontainer, pipexec, pyexec): +def test_no_openssl_binary(rockycontainer, pipexec, pyexec, build): _install_ppbt(pyexec) env = os.environ.copy() + _setup_buildenv(pyexec, env) env["RELENV_BUILDENV"] = "yes" + if sys.platform == "linux": + toolchain_path = pathlib.Path(env["TOOLCHAIN_PATH"]) + triplet = env["TRIPLET"] + sysroot_lib = toolchain_path / triplet / "sysroot" / "lib" + sysroot_lib.mkdir(parents=True, exist_ok=True) + bz2_sources = sorted( + (pathlib.Path(build) / "lib").glob("libbz2.so*"), + key=lambda p: len(p.name), + ) + if not bz2_sources: + pytest.fail( + "libbz2.so not found in relenv build; cryptography build cannot proceed" + ) + for bz2_source in bz2_sources: + target = sysroot_lib / bz2_source.name + if target.exists() or target.is_symlink(): + if target.is_symlink(): + try: + if target.readlink() == bz2_source: + continue + except OSError: + pass + target.unlink() + target.symlink_to(bz2_source) proc = subprocess.run( [ str(pipexec),