diff --git a/conan/api/model/list.py b/conan/api/model/list.py index de5e4725232..4f52cbc86ab 100644 --- a/conan/api/model/list.py +++ b/conan/api/model/list.py @@ -328,6 +328,15 @@ def items(self) -> Iterable[Tuple[RecipeReference, Dict[PkgReference, Dict]]]: packages[pref] = prev_info yield recipe, packages + def has_items(self) -> bool: + """Whether the package list contains any element with revision that can be iterated.""" + it = self.items() + try: + next(it) + return True + except StopIteration: + return False + def recipe_dict(self, ref: RecipeReference): """ Gives read/write access to the dictionary containing a specific RecipeReference information. diff --git a/conan/api/subapi/cache.py b/conan/api/subapi/cache.py index 5ffb83016d5..d0d6d6fdbbf 100644 --- a/conan/api/subapi/cache.py +++ b/conan/api/subapi/cache.py @@ -17,6 +17,9 @@ from conan.errors import ConanException from conan.api.model import PkgReference from conan.api.model import RecipeReference +from conan.internal.api.uploader import PackagePreparator +from conan.internal.conan_app import ConanApp +from conan.internal.rest.pkg_sign import PkgSignaturesPlugin from conan.internal.util.dates import revision_timestamp_now from conan.internal.util.files import rmdir, mkdir, remove, save @@ -77,6 +80,76 @@ def check_integrity(self, package_list): checker = IntegrityChecker(cache) checker.check(package_list) + def sign(self, package_list): + """Sign packages with the package signing plugin""" + cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf) + pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder) + if not pkg_signer.is_sign_configured: + raise ConanException( + "The sign() function in the package sign plugin is not defined. For more " + "information on how to configure the plugin, please read the documentation at " + "https://docs.conan.io/2/reference/extensions/package_signing.html.") + if not package_list.has_items(): + raise ConanException("No packages to sign in the pkglist provided.") + + app = ConanApp(self._conan_api) + preparator = PackagePreparator(app, self._api_helpers.global_conf) + # Some packages can have missing sources/exports_sources + enabled_remotes = self._conan_api.remotes.list() + preparator.prepare(package_list, enabled_remotes, force=True) + + for rref, packages in package_list.items(): + recipe_bundle = package_list.recipe_dict(rref) + if recipe_bundle: + rref_folder = cache.recipe_layout(rref).download_export() + try: + pkg_signer.sign_pkg(rref, recipe_bundle.get("files", {}), rref_folder) + except Exception as e: + recipe_bundle["pkgsign_error"] = str(e) + for pref in packages: + pkg_bundle = package_list.package_dict(pref) + if pkg_bundle: + pref_folder = cache.pkg_layout(pref).download_package() + try: + pkg_signer.sign_pkg(pref, pkg_bundle.get("files", {}), pref_folder) + except Exception as e: + pkg_bundle["pkgsign_error"] = str(e) + return package_list + + def verify(self, package_list): + """Verify packages with the package signing plugin""" + cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf) + pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder) + if not pkg_signer.is_verify_configured: + raise ConanException( + "The verify() function in the package sign plugin is not defined. For more " + "information on how to configure the plugin, please read the documentation at " + "https://docs.conan.io/2/reference/extensions/package_signing.html.") + if not package_list.has_items(): + raise ConanException("No packages to verify in the pkglist provided.") + + for rref, packages in package_list.items(): + recipe_bundle = package_list.recipe_dict(rref) + if recipe_bundle: + rref_folder = cache.recipe_layout(rref).download_export() + files = {file: os.path.join(rref_folder, file) for file in + os.listdir(rref_folder) if not file.startswith(METADATA)} + try: + pkg_signer.verify(rref, rref_folder, files) + except Exception as e: + recipe_bundle["pkgsign_error"] = str(e) + for pref in packages: + pkg_bundle = package_list.package_dict(pref) + if pkg_bundle: + pref_folder = cache.pkg_layout(pref).download_package() + files = {file: os.path.join(pref_folder, file) for file in + os.listdir(pref_folder) if not file.startswith(METADATA)} + try: + pkg_signer.verify(pref, pref_folder, files) + except Exception as e: + pkg_bundle["pkgsign_error"] = str(e) + return package_list + def clean(self, package_list, source=True, build=True, download=True, temp=True, backup_sources=False): """ diff --git a/conan/cli/commands/cache.py b/conan/cli/commands/cache.py index 975ff02b3c3..a069631b3b6 100644 --- a/conan/cli/commands/cache.py +++ b/conan/cli/commands/cache.py @@ -5,12 +5,67 @@ from conan.api.output import cli_out_write, ConanOutput from conan.cli import make_abs_path from conan.cli.command import conan_command, conan_subcommand, OnceArgument -from conan.cli.commands.list import print_list_text, print_list_json +from conan.cli.commands.list import print_list_text, print_list_json, print_serial from conan.errors import ConanException from conan.api.model import PkgReference from conan.api.model import RecipeReference +def _get_package_sign_error(pkg_list): + conan_exception = ConanException("There were some errors in the package signing process. " + "Please check the output.") + for rref, packages in pkg_list.items(): + recipe_bundle = pkg_list.recipe_dict(rref) + if recipe_bundle: + if recipe_bundle.get("pkgsign_error"): + return conan_exception + for pref in packages: + pkg_bundle = pkg_list.package_dict(pref) + if pkg_bundle: + if pkg_bundle.get("pkgsign_error"): + return conan_exception + return None + + +def print_package_sign_text(data): + results_dict = data.get("results", {}) + + signs = [] + for ref_data in results_dict.values(): + for revision_data in ref_data.get("revisions", {}).values(): + sign = revision_data.get("pkgsign_error") + signs.append(sign) + + for pkg in revision_data.get("packages", {}).values(): + for prev in pkg.get("revisions", {}).values(): + sign = prev.get("pkgsign_error") + signs.append(sign) + + remove_keys = {"info", "timestamp", "files"} + + def clean_inline(obj): + if not isinstance(obj, dict): + return obj + + cleaned = {k: clean_inline(v) for k, v in obj.items() if k not in remove_keys} + + if "packages" in cleaned and not cleaned["packages"]: + del cleaned["packages"] + + return cleaned + + items = {ref: clean_inline(item) for ref, item in results_dict.items()} + + # Output + cli_out_write(f"[Package sign] Results:\n") + print_serial(items) + + # Summary + fail = sum((s is not None) for s in signs) + ok = len(signs) - fail + cli_out_write(f"\n[Package sign] Summary: OK={ok}, FAILED={fail}") + + def json_export(data): cli_out_write(json.dumps({"cache_path": data})) @@ -149,6 +204,75 @@ def cache_check_integrity(conan_api: ConanAPI, parser, subparser, *args): conan_api.cache.check_integrity(package_list) ConanOutput().success("Integrity check: ok") +@conan_subcommand(formatters={"text": print_package_sign_text, + "json": print_list_json}) +def cache_sign(conan_api: ConanAPI, parser, subparser, *args): + """ + Sign packages with the Package Signing Plugin + """ + subparser.add_argument("pattern", nargs="?", + help="Selection pattern for references to be signed") + subparser.add_argument("-l", "--list", action=OnceArgument, + help="Package list of packages to be signed") + subparser.add_argument('-p', '--package-query', action=OnceArgument, + help="Only the packages matching a specific query, e.g., " + "os=Windows AND (arch=x86 OR compiler=gcc)") + args = parser.parse_args(*args) + + if args.pattern is None and args.list is None: + raise ConanException("Missing pattern or package list file") + if args.pattern and args.list: + raise ConanException("Cannot specify both pattern and list") + + if args.list: + listfile = make_abs_path(args.list) + multi_package_list = MultiPackagesList.load(listfile) + package_list = multi_package_list["Local Cache"] + else: + ref_pattern = ListPattern(args.pattern, package_id="*") + package_list = conan_api.list.select(ref_pattern, package_query=args.package_query) + + conan_api.cache.sign(package_list) + return { + "conan_error": _get_package_sign_error(package_list), + "results": package_list.serialize() + } + + +@conan_subcommand(formatters={"text": print_package_sign_text, + "json": print_list_json}) +def cache_verify(conan_api: ConanAPI, parser, subparser, *args): + """ + Check the signature of packages with the Package Signing Plugin + """ + subparser.add_argument("pattern", nargs="?", + help="Selection pattern for references to verify their signature") + subparser.add_argument("-l", "--list", action=OnceArgument, + help="Package list of packages to verify their signature") + subparser.add_argument('-p', '--package-query', action=OnceArgument, + help="Only the packages matching a specific query, e.g., " + "os=Windows AND (arch=x86 OR compiler=gcc)") + args = parser.parse_args(*args) + + if args.pattern is None and args.list is None: + raise ConanException("Missing pattern or package list file") + if args.pattern and args.list: + raise ConanException("Cannot specify both pattern and list") + + if args.list: + listfile = make_abs_path(args.list) + multi_package_list = MultiPackagesList.load(listfile) + package_list = multi_package_list["Local Cache"] + else: + ref_pattern = ListPattern(args.pattern, package_id="*") + package_list = conan_api.list.select(ref_pattern, package_query=args.package_query) + + conan_api.cache.verify(package_list) + return { + "conan_error": _get_package_sign_error(package_list), + "results": package_list.serialize() + } + @conan_subcommand(formatters={"text": print_list_text, "json": print_list_json}) diff --git a/conan/cli/commands/list.py b/conan/cli/commands/list.py index 3f8b6f7da70..37c642e8380 100644 --- a/conan/cli/commands/list.py +++ b/conan/cli/commands/list.py @@ -34,7 +34,7 @@ def print_serial(item, indent=None, color_index=None): if isinstance(item, dict): for k, v in item.items(): if isinstance(v, (str, int)): - if k.lower() == "error": + if "error" in k.lower(): color = Color.BRIGHT_RED k = "ERROR" elif k.lower() == "warning": diff --git a/conan/internal/api/uploader.py b/conan/internal/api/uploader.py index 4a8f90aca91..d62419c3b07 100644 --- a/conan/internal/api/uploader.py +++ b/conan/internal/api/uploader.py @@ -85,7 +85,7 @@ def __init__(self, app: ConanApp, global_conf): self._app = app self._global_conf = global_conf - def prepare(self, pkg_list, enabled_remotes): + def prepare(self, pkg_list, enabled_remotes, force=False): local_url = self._global_conf.get("core.scm:local_url", choices=["allow", "block"]) for ref, packages in pkg_list.items(): layout = self._app.cache.recipe_layout(ref) @@ -102,13 +102,13 @@ def prepare(self, pkg_list, enabled_remotes): bundle = pkg_list.recipe_dict(ref) bundle.pop("files", None) bundle.pop("upload-urls", None) - if bundle.get("upload"): + if bundle.get("upload") or force: self._prepare_recipe(ref, bundle, conanfile, enabled_remotes) for pref in packages: prev_bundle = pkg_list.package_dict(pref) prev_bundle.pop("files", None) # If defined from a previous upload prev_bundle.pop("upload-urls", None) - if prev_bundle.get("upload"): + if prev_bundle.get("upload") or force: self._prepare_package(pref, prev_bundle) def _prepare_recipe(self, ref, ref_bundle, conanfile, remotes): diff --git a/conan/internal/rest/pkg_sign.py b/conan/internal/rest/pkg_sign.py index 96d1eb1072a..43d0fe19981 100644 --- a/conan/internal/rest/pkg_sign.py +++ b/conan/internal/rest/pkg_sign.py @@ -4,6 +4,9 @@ from conan.internal.cache.home_paths import HomePaths from conan.internal.loader import load_python_file from conan.internal.util.files import mkdir +from conan.tools.pkg_signing.plugin import (get_manifest_filepath, get_signatures_filepath, + _save_manifest, _save_signatures, PKGSIGN_MANIFEST, + PKGSIGN_SIGNATURES) class PkgSignaturesPlugin: @@ -12,34 +15,53 @@ def __init__(self, cache, home_folder): signer = HomePaths(home_folder).sign_plugin_path if os.path.isfile(signer): mod, _ = load_python_file(signer) - # TODO: At the moment it requires both methods sign and verify, but that might be relaxed - self._plugin_sign_function = mod.sign - self._plugin_verify_function = mod.verify + self._plugin_sign_function = getattr(mod, "sign", None) + self._plugin_verify_function = getattr(mod, "verify", None) else: self._plugin_sign_function = self._plugin_verify_function = None + @property + def is_sign_configured(self): + return self._plugin_sign_function is not None + + @property + def is_verify_configured(self): + return self._plugin_verify_function is not None + + def sign_pkg(self, ref, files, folder): + metadata_sign = os.path.join(folder, METADATA, "sign") + mkdir(metadata_sign) + # Generate the package sign manifest before calling the plugin + _save_manifest(folder, metadata_sign) + signatures = self._plugin_sign_function(ref, artifacts_folder=folder, + signature_folder=metadata_sign) + # Save signatures file with the plugin's returned signatures data + _save_signatures(metadata_sign, signatures) + # Add files to package bundle so they get uploaded + files[f"{METADATA}/sign/{PKGSIGN_MANIFEST}"] = get_manifest_filepath(metadata_sign) + files[f"{METADATA}/sign/{PKGSIGN_SIGNATURES}"] = get_signatures_filepath(metadata_sign) + for sig in signatures: + for name, file in sig.get("sign_artifacts", {}).items(): + #TODO: print output? + files[f"{METADATA}/sign/{file}"] = os.path.join(metadata_sign, file) + def sign(self, upload_data): - if self._plugin_sign_function is None: + if not self.is_sign_configured: return - def _sign(ref, files, folder): - metadata_sign = os.path.join(folder, METADATA, "sign") - mkdir(metadata_sign) - self._plugin_sign_function(ref, artifacts_folder=folder, signature_folder=metadata_sign) - for f in os.listdir(metadata_sign): - files[f"{METADATA}/sign/{f}"] = os.path.join(metadata_sign, f) - for rref, packages in upload_data.items(): recipe_bundle = upload_data.recipe_dict(rref) if recipe_bundle["upload"]: - _sign(rref, recipe_bundle["files"], self._cache.recipe_layout(rref).download_export()) + self.sign_pkg(rref, recipe_bundle["files"], + self._cache.recipe_layout(rref).download_export()) for pref in packages: pkg_bundle = upload_data.package_dict(pref) if pkg_bundle["upload"]: - _sign(pref, pkg_bundle["files"], self._cache.pkg_layout(pref).download_package()) + self.sign_pkg(pref, pkg_bundle["files"], + self._cache.pkg_layout(pref).download_package()) def verify(self, ref, folder, files): - if self._plugin_verify_function is None: + if not self.is_verify_configured: return metadata_sign = os.path.join(folder, METADATA, "sign") self._plugin_verify_function(ref, artifacts_folder=folder, signature_folder=metadata_sign, diff --git a/conan/tools/pkg_signing/__init__.py b/conan/tools/pkg_signing/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/conan/tools/pkg_signing/plugin.py b/conan/tools/pkg_signing/plugin.py new file mode 100644 index 00000000000..a90744d8d0c --- /dev/null +++ b/conan/tools/pkg_signing/plugin.py @@ -0,0 +1,133 @@ +import copy +import os +import json + +from conan.api.output import ConanOutput +from conan.errors import ConanException +from conan.internal.util.files import load, sha256sum, save + +# FIXME: Maybe this tools should be placed at conan.api.subapi.cache as they are not recipe tools? + +PKGSIGN_MANIFEST = "pkgsign-manifest.json" +PKGSIGN_SIGNATURES = "pkgsign-signatures.json" + + +def get_manifest_filepath(signature_folder): + """ + Gets the path of the summary file path + @param signature_folder: Signature folder path + @return: Path of the summary file + """ + return os.path.join(signature_folder, PKGSIGN_MANIFEST) + + +def load_manifest(signature_folder): + """ + Loads the summary file from the signature folder + @param signature_folder: Signature folder path + @return: Dictionary object with the content of the summary + """ + return json.loads(load(get_manifest_filepath(signature_folder))) + + +def _create_manifest_content(artifacts_folder): + """ + Creates the summary content as a dictionary for manipulation. + + Returns a structure like: + { + "files": [ + {"file": "conan_package.tgz", "sha256": "abc123"}, + {"file": "other_file.bin", "sha256": "fff999"}, + ... + ] + } + """ + files_list = [] + + for fname in os.listdir(artifacts_folder): + file_path = os.path.join(artifacts_folder, fname) + + if os.path.isfile(file_path): + entry = { + "file": fname, + "sha256": sha256sum(file_path) + } + files_list.append(entry) + + # Sort files by filename to ensure consistent order + files_list.sort(key=lambda x: x["file"]) + + return {"files": files_list} + + +def _save_manifest(artifacts_folder, signature_folder): + """ + Saves the content of manifest file in the signature folder + @param signature_folder: Signature folder path + """ + content = _create_manifest_content(artifacts_folder) + save(get_manifest_filepath(signature_folder), json.dumps(content, indent=2)) + + +def get_signatures_filepath(signature_folder): + """ + Gets the path of the signatures file path + @param signature_folder: Signature folder path + @return: Path of the signatures file + """ + return os.path.join(signature_folder, PKGSIGN_SIGNATURES) + + +def load_signatures(signature_folder): + """ + Loads the signatures file pkgsign-signatures.json from the signature folder + @param signature_folder: Signature folder path + @return: dict of {filename: signature_value} + """ + return json.loads(load(get_signatures_filepath(signature_folder))) + + +def _save_signatures(signature_folder, signatures): + """ + Saves the content of signatures file in the signature folder + @param signature_folder: Signature folder path + @param signatures: dict of {filename: signature_value} + """ + assert isinstance(signatures, list),\ + "The signature plugin function must return a list of signatures values" + for signature in signatures: + assert signature.get("method"), "Signature 'method' must be set" + assert signature.get("provider"), "Signature 'provider' must be set" + assert signature.get("sign_artifacts"), "Signature 'sign_artifacts' must be set" + assert isinstance(signature.get("sign_artifacts"), dict), \ + "'sign_artifacts' must be a dict of {name: signature_filename}" + assert os.path.isfile(get_manifest_filepath(signature_folder)),\ + "Manifest file must exist before saving signatures" + content = { + "manifest": PKGSIGN_MANIFEST, + "signatures": signatures + } + save(get_signatures_filepath(signature_folder), json.dumps(content, indent=2)) + + +def verify_files_checksums(signature_folder, files): + """ + Verifies that the files' checksums match those stored in the summary. + + @param files: dict of {filename: filepath} of files in artifact folder to verify + """ + expected_list = load_manifest(signature_folder).get("files", []) + expected_files = {item["file"]: item["sha256"] for item in expected_list} + + for filename, file_path in files.items(): + expected_checksum = expected_files.get(filename) + actual_checksum = sha256sum(file_path) + + if actual_checksum != expected_checksum: + raise ConanException( + f"Checksum mismatch for file {filename}: " + f"expected {expected_checksum}, got {actual_checksum}." + ) + else: + ConanOutput().info(f"Checksum verified for file {filename} ({actual_checksum}).") diff --git a/test/integration/command/cache/test_cache_sign.py b/test/integration/command/cache/test_cache_sign.py new file mode 100644 index 00000000000..53f513d9a69 --- /dev/null +++ b/test/integration/command/cache/test_cache_sign.py @@ -0,0 +1,247 @@ +import json +import textwrap + +from conan.test.assets.genconanfile import GenConanfile +from conan.test.utils.tools import TestClient + +PLUGIN_CONTENT = textwrap.dedent(""" + import os + from conan.tools.files import save + + def sign(ref, artifacts_folder, signature_folder, **kwargs): + save(None, os.path.join(signature_folder, "signature.sig"), "signed-content") + return [{ + "method": "dummy-method", + "provider": "dummy-provider", + "sign_artifacts": {"signature": "signature.sig"} + }] + + def verify(ref, artifacts_folder, signature_folder, files, **kwargs): + pass +""") + +def test_pkg_sign_no_plugin(): + c = TestClient() + c.save({"conanfile.py": GenConanfile("pkg", "0.1")}) + c.run("create .") + c.run("cache sign *", assert_error=True) + assert "ERROR: The sign() function in the package sign plugin is not defined." in c.out + c.run("cache verify *", assert_error=True) + assert "ERROR: The verify() function in the package sign plugin is not defined." in c.out + + +def test_pkg_sign_no_plugin_functions(): + c = TestClient() + c.save_home({"extensions/plugins/sign/sign.py": ""}) + c.save({"conanfile.py": GenConanfile("pkg", "0.1")}) + c.run("create .") + c.run("cache sign *", assert_error=True) + assert "ERROR: The sign() function in the package sign plugin is not defined." in c.out + c.run("cache verify *", assert_error=True) + assert "ERROR: The verify() function in the package sign plugin is not defined." in c.out + + +def test_pkg_sign_basic(): + c = TestClient() + c.save({"conanfile.py": GenConanfile("pkg", "0.1")}) + c.save_home({"extensions/plugins/sign/sign.py": PLUGIN_CONTENT}) + c.run("create .") + c.run("cache sign *") + assert textwrap.dedent("""\ + [Package sign] Results: + + pkg/0.1 + revisions + 485dad6cb11e2fa99d9afbe44a57a164 + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + revisions + 0ba8627bd47edc3a501e8f0eb9a79e5e + + [Package sign] Summary: OK=2, FAILED=0""") in c.out + + +def test_pkg_verify_basic(): + c = TestClient() + c.save({"conanfile.py": GenConanfile("pkg", "0.1")}) + c.save_home({"extensions/plugins/sign/sign.py": PLUGIN_CONTENT}) + c.run("create .") + c.run("cache verify *") + assert textwrap.dedent(""" + [Package sign] Results: + + pkg/0.1 + revisions + 485dad6cb11e2fa99d9afbe44a57a164 + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + revisions + 0ba8627bd47edc3a501e8f0eb9a79e5e + + [Package sign] Summary: OK=2, FAILED=0""") in c.out + + +def test_pkg_sign_no_packages(): + c = TestClient() + c.save({"conanfile.py": GenConanfile("pkg", "0.1")}) + c.save_home({"extensions/plugins/sign/sign.py": PLUGIN_CONTENT}) + c.run("create .") + c.run("cache sign other-pkg/*", assert_error=True) + assert "ERROR: No packages to sign in the pkglist provided" in c.out + c.run("cache verify other-pkg/*", assert_error=True) + assert "ERROR: No packages to verify in the pkglist provided" in c.out + + +def test_pkg_sign_exception(): + c = TestClient() + signer = textwrap.dedent(r""" + import os + from conan.errors import ConanException + from conan.tools.files import save + + def sign(ref, artifacts_folder, signature_folder, **kwargs): + if "lib" in ref.repr_notime(): + raise ConanException("Error signing package") + save(None, os.path.join(signature_folder, "signature.sig"), "signed-content") + return [{ + "method": "dummy-method", + "provider": "dummy-provider", + "sign_artifacts": {"signature": "signature.sig"} + }] + """) + c.save_home({"extensions/plugins/sign/sign.py": signer}) + c.save({"conanfile.py": GenConanfile("pkg", "0.1")}) + c.run("export .") + c.save({"conanfile.py": GenConanfile("lib", "0.1")}) + c.run("export .") + c.save({"conanfile.py": GenConanfile("package", "0.1")}) + c.run("export .") + c.run("cache sign *", assert_error=True) + assert textwrap.dedent("""\ + [Package sign] Results: + + lib/0.1 + revisions + dbe307e08b1a344fef76f60c85c0c4e8 + ERROR: Error signing package + package/0.1 + revisions + 1fd0e5bcc411dcd3ff5b16024e2d7c04 + pkg/0.1 + revisions + 485dad6cb11e2fa99d9afbe44a57a164 + + [Package sign] Summary: OK=2, FAILED=1""") in c.out + # test json output + c.run("cache sign * -f json", assert_error=True) + assert "ERROR: There were some errors in the package signing process. " \ + "Please check the output." in c.out + results = json.loads(c.stdout) + assert results["lib/0.1"]["revisions"]["dbe307e08b1a344fef76f60c85c0c4e8"]["pkgsign_error"] == \ + "Error signing package" + + +def test_pkg_verify_exception(): + c = TestClient() + signer = textwrap.dedent(r""" + from conan.errors import ConanException + + def verify(ref, artifacts_folder, signature_folder, files, **kwargs): + if "lib" in ref.repr_notime(): + raise ConanException("Wrong signature") + """) + c.save_home({"extensions/plugins/sign/sign.py": signer}) + c.save({"conanfile.py": GenConanfile("pkg", "0.1")}) + c.run("export .") + c.save({"conanfile.py": GenConanfile("lib", "0.1")}) + c.run("export .") + c.save({"conanfile.py": GenConanfile("package", "0.1")}) + c.run("export .") + c.run("cache verify *", assert_error=True) + assert textwrap.dedent("""\ + [Package sign] Results: + + lib/0.1 + revisions + dbe307e08b1a344fef76f60c85c0c4e8 + ERROR: Wrong signature + package/0.1 + revisions + 1fd0e5bcc411dcd3ff5b16024e2d7c04 + pkg/0.1 + revisions + 485dad6cb11e2fa99d9afbe44a57a164 + + [Package sign] Summary: OK=2, FAILED=1""") in c.out + # test json output + c.run("cache verify * -f json", assert_error=True) + assert "ERROR: There were some errors in the package signing process. " \ + "Please check the output." in c.out + results = json.loads(c.stdout) + assert results["lib/0.1"]["revisions"]["dbe307e08b1a344fef76f60c85c0c4e8"]["pkgsign_error"] == \ + "Wrong signature" + + +def test_pkg_sign_verify_pkglist(): + c = TestClient() + c.save({"conanfile.py": GenConanfile("pkg", "0.1")}) + c.save_home({"extensions/plugins/sign/sign.py": PLUGIN_CONTENT}) + c.run("create .") + # test incomplete package list + c.run("list */* -f json", redirect_stdout="pkglist.json") + c.run("cache sign -l pkglist.json", assert_error=True) + assert "ERROR: No packages to sign in the pkglist provided" in c.out + c.run("cache verify -l pkglist.json", assert_error=True) + assert "ERROR: No packages to verify in the pkglist provided" in c.out + + # test recipe latest package list + c.run("list */*#latest -f json", redirect_stdout="pkglist.json") + c.run("cache sign -l pkglist.json") + expected = textwrap.dedent("""\ + [Package sign] Results: + + pkg/0.1 + revisions + 485dad6cb11e2fa99d9afbe44a57a164 + + [Package sign] Summary: OK=1, FAILED=0""") + assert expected in c.out + c.run("cache verify -l pkglist.json") + assert expected in c.out + + # test packages without prev package list + c.run("list */*:* -f json", redirect_stdout="pkglist.json") + # FIXME: list command is renturning packages without package revision, so packages are not signed + c.run("cache sign -l pkglist.json") + expected = textwrap.dedent("""\ + [Package sign] Results: + + pkg/0.1 + revisions + 485dad6cb11e2fa99d9afbe44a57a164 + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + + [Package sign] Summary: OK=1, FAILED=0""") + assert expected in c.out + c.run("cache verify -l pkglist.json") + assert expected in c.out + + # test packages with prev package list + c.run("list */*:*#latest -f json", redirect_stdout="pkglist.json") + c.run("cache sign -l pkglist.json") + expected = textwrap.dedent("""\ + [Package sign] Results: + + pkg/0.1 + revisions + 485dad6cb11e2fa99d9afbe44a57a164 + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + revisions + 0ba8627bd47edc3a501e8f0eb9a79e5e + + [Package sign] Summary: OK=2, FAILED=0""") + assert expected in c.out + c.run("cache verify -l pkglist.json") + assert expected in c.out diff --git a/test/integration/test_pkg_signing.py b/test/integration/test_pkg_signing.py index d29d21ef80d..a5c992b38da 100644 --- a/test/integration/test_pkg_signing.py +++ b/test/integration/test_pkg_signing.py @@ -13,7 +13,7 @@ def test_pkg_sign(): signer = textwrap.dedent(r""" import os - def sign(ref, artifacts_folder, signature_folder): + def sign(ref, artifacts_folder, signature_folder, **kwargs): print("Signing ref: ", ref) print("Signing folder: ", artifacts_folder) files = [] @@ -23,8 +23,13 @@ def sign(ref, artifacts_folder, signature_folder): print("Signing files: ", sorted(files)) signature = os.path.join(signature_folder, "signature.asc") open(signature, "w").write("\n".join(files)) + return [{ + "method": "dummy-method", + "provider": "dummy-provider", + "sign_artifacts": {"signature": "signature.asc"} + }] - def verify(ref, artifacts_folder, signature_folder, files): + def verify(ref, artifacts_folder, signature_folder, files, **kwargs): print("Verifying ref: ", ref) print("Verifying folder: ", artifacts_folder) signature = os.path.join(signature_folder, "signature.asc") @@ -54,3 +59,206 @@ def verify(ref, artifacts_folder, signature_folder, files): assert "Verifying ref: pkg/0.1" in c.out assert "VERIFYING conanfile.py" not in c.out # It doesn't re-verify previous contents assert "VERIFYING conan_sources.tgz" in c.out + + +def test_pkg_sign_manifest_signatures(): + """Test that the sign function generates the manifest and signatures files + and the verify function can access them""" + c = TestClient(default_server_user=True) + c.save({"conanfile.py": GenConanfile("pkg", "0.1").with_exports("export/*") + .with_exports_sources("export_sources/*").with_package_file("myfile", "mycontents!"), + "export/file1.txt": "file1!", + "export_sources/file2.txt": "file2!"}) + signer = textwrap.dedent(r""" + import os + from conan.tools.files import load, save + from conan.tools.pkg_signing.plugin import (load_manifest, load_signatures, + verify_files_checksums) + + def sign(ref, artifacts_folder, signature_folder, **kwargs): + save(None, os.path.join(signature_folder, "pkgsign-manifest.json.sig"), "") + print(f"Creating signature pkgsign-manifest.json.sig for {ref}") + # Return the pkgsign-signatures.json's content + return [{"method": "openssl-dgst", + "provider": "conan-client", + "sign_artifacts": {"signature": "pkgsign-manifest.json.sig"}}] + + def verify(ref, artifacts_folder, signature_folder, files, **kwargs): + print(f"Manifest content:\n {load_manifest(signature_folder)}") + verify_files_checksums(signature_folder, files) + signatures_content = load_signatures(signature_folder) + signatures = signatures_content["signatures"] + for signature in signatures_content["signatures"]: + provider = signature.get("provider") + method = signature.get("method") + signature = signature.get("sign_artifacts", {}).get("signature") + print(f"Provider: {provider}, Method: {method}, Signature: {signature}") + # Verify signature here + """) + c.save_home({"extensions/plugins/sign/sign.py": signer}) + c.run("create .") + c.run("cache sign *") + assert "Creating signature pkgsign-manifest.json.sig for pkg/0.1" in c.out + c.run("cache verify *") + assert "Manifest content:\n {'files': [{'file': 'conan_export.tgz'" in c.out + assert "Checksum verified for file conanfile.py" in c.out + assert "Provider: conan-client, Method: openssl-dgst, Signature: pkgsign-manifest.json.sig" + assert "Manifest content:\n {'files': [{'file': 'conan_package.tgz'" in c.out + assert "Checksum verified for file conan_package.tgz" in c.out + + +def test_pkg_sign_canonical(): + c = TestClient(default_server_user=True) + c.save({"conanfile1.py": GenConanfile("lib1ok", "0.1") + .with_exports_sources("*.txt").with_package_file("package.txt", "kk"), + "conanfile2.py": GenConanfile("lib2fail", "0.1"), # will fail when installed + "conanfile3.py": GenConanfile("lib3fail", "0.1"), # should always fail + "sources.txt": "kk"}) + c.run("create conanfile1.py") + c.run("create conanfile2.py") + c.run("create conanfile3.py") + signer = textwrap.dedent(r""" + import os + from conan.errors import ConanException + from conan.api.output import ConanOutput + from conan.tools.files import save + from conan.tools.pkg_signing.plugin import (get_signatures_filepath, load_signatures, + verify_files_checksums) + + def sign(ref, artifacts_folder, signature_folder, **kwargs): + ConanOutput().info(f"Signing reference {ref}") + ConanOutput().info(f"Signing folder: {artifacts_folder}") + + if "lib3fail" in str(ref): + raise ConanException("sign failed") + elif "lib2fail" in str(ref): + provider = "this will fail to verify" + else: + provider = "conan-client" + # Simulate signing the package + save(None, os.path.join(signature_folder, "pkgsign-manifest.json.sig"), "") + ConanOutput().info(f"Signature ok for {ref}") + return [{"method": "dummy-method", + "provider": provider, + "sign_artifacts": {"signature": "pkgsign-manifest.json.sig"} + }] + + def verify(ref, artifacts_folder, signature_folder, files, **kwargs): + ConanOutput().info(f"Verifying reference {ref}") + signatures_file_path = get_signatures_filepath(signature_folder) + if not os.path.isfile(signatures_file_path): + raise ConanException("Package is not signed") + + if "lib3fail" in str(ref): + raise ConanException(f"verify failed for {ref}") + # Simulate verification + signatures = load_signatures(signature_folder) + provider = signatures["signatures"][0]["provider"] + if provider != "conan-client": + raise ConanException(f"Failed to verify the package {ref}") + verify_files_checksums(signature_folder, files) + signature = signatures["signatures"][0]["sign_artifacts"]["signature"] + ConanOutput().info(f"Verification ok for {ref} with signature {signature}") + """) + c.save_home({"extensions/plugins/sign/sign.py": signer}) + + # Cache verify command fails and reports if package is not signed + c.run("cache verify *", assert_error=True) + assert textwrap.dedent(""" + [Package sign] Results: + + lib1ok/0.1 + revisions + a6a4e799bb673d6e5ca4f904118d672e + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + revisions + 76285bcb59a81071122cba04b2269b52 + ERROR: Package is not signed + ERROR: Package is not signed + lib2fail/0.1 + revisions + 70a185be5a95af3dde25b74ae800b2f2 + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + revisions + 0ba8627bd47edc3a501e8f0eb9a79e5e + ERROR: Package is not signed + ERROR: Package is not signed + lib3fail/0.1 + revisions + 09ccc766ddd11c96aa78307b3f166fd6 + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + revisions + 0ba8627bd47edc3a501e8f0eb9a79e5e + ERROR: Package is not signed + ERROR: Package is not signed + + [Package sign] Summary: OK=0, FAILED=6""") in c.out + + # Cache sign command fails if a package fails to sign and reports it + c.run("cache sign *", assert_error=True) + assert textwrap.dedent(""" + [Package sign] Results: + + lib1ok/0.1 + revisions + a6a4e799bb673d6e5ca4f904118d672e + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + revisions + 76285bcb59a81071122cba04b2269b52 + lib2fail/0.1 + revisions + 70a185be5a95af3dde25b74ae800b2f2 + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + revisions + 0ba8627bd47edc3a501e8f0eb9a79e5e + lib3fail/0.1 + revisions + 09ccc766ddd11c96aa78307b3f166fd6 + packages + da39a3ee5e6b4b0d3255bfef95601890afd80709 + revisions + 0ba8627bd47edc3a501e8f0eb9a79e5e + ERROR: sign failed + ERROR: sign failed + + [Package sign] Summary: OK=4, FAILED=2 + """) in c.out + + # Upload sign fails if package signing fails + c.run("upload * -c -r default", assert_error=True) + assert "ERROR: sign failed" in c.out + + # If upload sign failed, no packages should be uploaded + c.run("list * -r default") + assert "WARN: There are no matching recipe references" in c.out + + # Upload packages individually to avoid previous failure + c.run("upload lib1ok* -c -r default") + c.run("upload lib2fail* -c -r default") + c.run("remove * -c") + + # Install verify command should fail if package sign verification fails + c.run("install --requires lib1ok/0.1 --requires lib2fail/0.1 -r default", + assert_error=True) + assert "ERROR: Package 'lib2fail/0.1' not resolved: Failed to verify " \ + "the package lib2fail/0.1" in c.out + + # If packages fail to verify signature, they should not be installed + c.run("list *") + assert "lib1ok" in c.out + assert "lib2fail" not in c.out + c.run("cache verify *") + assert textwrap.dedent("""\ + [Package sign] Results: + + lib1ok/0.1 + revisions + a6a4e799bb673d6e5ca4f904118d672e + + [Package sign] Summary: OK=1, FAILED=0 + """) in c.out diff --git a/test/unittests/tools/pkg_signing/__init__.py b/test/unittests/tools/pkg_signing/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/unittests/tools/pkg_signing/test_sign_tools.py b/test/unittests/tools/pkg_signing/test_sign_tools.py new file mode 100644 index 00000000000..c1558a70ef0 --- /dev/null +++ b/test/unittests/tools/pkg_signing/test_sign_tools.py @@ -0,0 +1,266 @@ +import os + +import pytest + +from conan.test.utils.tools import temp_folder, save_files + +from conan.tools.pkg_signing.plugin import (_create_manifest_content, get_manifest_filepath, + load_manifest, _save_manifest, get_signatures_filepath, + load_signatures, _save_signatures, verify_files_checksums) +from conan.errors import ConanException + + +@pytest.fixture +def pkg_sign_tools(): + main_folder = temp_folder() + artifacts_folder = os.path.join(main_folder, "af") + os.mkdir(artifacts_folder) + signature_folder = os.path.join(main_folder, "sf") + os.mkdir(signature_folder) + save_files(artifacts_folder, {"conan_package.tgz": "", "conanmanifest.txt": ""}) + return artifacts_folder, signature_folder + + +def test_get_manifest_filepath(pkg_sign_tools): + """Test that get_manifest_filepath returns the correct path for the manifest file.""" + _, signature_folder = pkg_sign_tools + manifest_path = get_manifest_filepath(signature_folder) + assert manifest_path == os.path.join(signature_folder, "pkgsign-manifest.json") + + +def test_create_manifest_content_with_empty_files(pkg_sign_tools): + """Test that _create_manifest_content correctly creates manifest for empty files.""" + artifacts_folder, _ = pkg_sign_tools + content = _create_manifest_content(artifacts_folder) + + # Verify structure + assert "files" in content + assert isinstance(content["files"], list) + assert len(content["files"]) == 2 + + # Files should be sorted alphabetically + files = content["files"] + assert files[0]["file"] == "conan_package.tgz" + assert files[1]["file"] == "conanmanifest.txt" + + # Empty file SHA256 + empty_file_sha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + assert files[0]["sha256"] == empty_file_sha256 + assert files[1]["sha256"] == empty_file_sha256 + + +def test_create_manifest_content_ignores_directories(pkg_sign_tools): + """Test that _create_manifest_content only includes files, not directories.""" + artifacts_folder, _ = pkg_sign_tools + # Create a subdirectory + subdir = os.path.join(artifacts_folder, "subdir") + os.mkdir(subdir) + + content = _create_manifest_content(artifacts_folder) + filenames = [f["file"] for f in content["files"]] + + # Should not include the directory + assert "subdir" not in filenames + assert len(content["files"]) == 2 # Only the two original files + + +def test_save_load_manifest(pkg_sign_tools): + """Test that saving and loading manifest preserves all data correctly.""" + artifacts_folder, signature_folder = pkg_sign_tools + _save_manifest(artifacts_folder, signature_folder) + + # Verify file exists + manifest_path = get_manifest_filepath(signature_folder) + assert os.path.exists(manifest_path) + + # Load and verify content + manifest = load_manifest(signature_folder) + assert "files" in manifest + assert isinstance(manifest["files"], list) + assert len(manifest["files"]) == 2 + + # Verify files are sorted and contain expected data + filenames = [f["file"] for f in manifest["files"]] + assert filenames == ["conan_package.tgz", "conanmanifest.txt"] + + # Verify each file entry has required fields + for file_entry in manifest["files"]: + assert "file" in file_entry + assert "sha256" in file_entry + assert len(file_entry["sha256"]) == 64 + + +def test_get_signatures_filepath(pkg_sign_tools): + """Test that get_signatures_filepath returns the correct path for the signatures file.""" + _, signature_folder = pkg_sign_tools + signatures_path = get_signatures_filepath(signature_folder) + assert signatures_path == os.path.join(signature_folder, "pkgsign-signatures.json") + + +def test_save_load_signatures(pkg_sign_tools): + """Test that saving and loading signatures preserves all data correctly.""" + artifacts_folder, signature_folder = pkg_sign_tools + # Manifest must exist before saving signatures + _save_manifest(artifacts_folder, signature_folder) + + signatures = [{ + "method": "openssl-dgst", + "provider": "my-organization", + "sign_artifacts": { + "conan_package signature": "conan_package.tgz.sig", + "conanmanifest signature": "conanmanifest.txt.sig" + } + }] + _save_signatures(signature_folder, signatures) + + # Verify file exists + assert os.path.exists(get_signatures_filepath(signature_folder)) + + # Load and verify content + loaded = load_signatures(signature_folder) + assert loaded["manifest"] == "pkgsign-manifest.json" + assert len(loaded["signatures"]) == 1 + + signature = loaded["signatures"][0] + assert signature["method"] == "openssl-dgst" + assert signature["provider"] == "my-organization" + assert signature["sign_artifacts"]["conan_package signature"] == "conan_package.tgz.sig" + assert signature["sign_artifacts"]["conanmanifest signature"] == "conanmanifest.txt.sig" + + +def test_save_signatures_with_multiple_signatures(pkg_sign_tools): + """Test that _save_signatures can handle multiple signature entries.""" + artifacts_folder, signature_folder = pkg_sign_tools + _save_manifest(artifacts_folder, signature_folder) + + signatures = [ + { + "method": "gpg", + "provider": "my-organization", + "sign_artifacts": {"signature": "pkgsign-manifest.json.gpg"} + }, + { + "method": "cosign", + "provider": "my-organization", + "sign_artifacts": {"signature": "pkgsign-manifest.json.sig"} + } + ] + _save_signatures(signature_folder, signatures) + + loaded = load_signatures(signature_folder) + assert len(loaded["signatures"]) == 2 + assert loaded["signatures"][0]["method"] == "gpg" + assert loaded["signatures"][0]["sign_artifacts"]["signature"] == "pkgsign-manifest.json.gpg" + assert loaded["signatures"][1]["method"] == "cosign" + assert loaded["signatures"][1]["sign_artifacts"]["signature"] == "pkgsign-manifest.json.sig" + + +def test_save_signatures_requires_manifest(pkg_sign_tools): + """Test that _save_signatures raises an error if manifest doesn't exist.""" + _, signature_folder = pkg_sign_tools + + signatures = [{ + "method": "gpg", + "provider": "my-organization", + "sign_artifacts": {"file signature": "file.txt.gpg"} + }] + + with pytest.raises(AssertionError, match="Manifest file must exist"): + _save_signatures(signature_folder, signatures) + + +def test_save_signatures_validates_list_type(pkg_sign_tools): + """Test that _save_signatures validates signatures is a list.""" + artifacts_folder, signature_folder = pkg_sign_tools + _save_manifest(artifacts_folder, signature_folder) + + with pytest.raises(AssertionError, match="must return a list"): + _save_signatures(signature_folder, {"not": "a list"}) + + +def test_save_signatures_validates_required_fields(pkg_sign_tools): + """Test that _save_signatures validates required signature fields.""" + artifacts_folder, signature_folder = pkg_sign_tools + _save_manifest(artifacts_folder, signature_folder) + + # Missing method + with pytest.raises(AssertionError, match="'method' must be set"): + _save_signatures(signature_folder, + [{"provider": "my-organization","sign_artifacts": {}}]) + + # Missing provider + with pytest.raises(AssertionError, match="'provider' must be set"): + _save_signatures(signature_folder, [{"method": "gpg", "sign_artifacts": {}}]) + + # Missing sign_artifacts + with pytest.raises(AssertionError, match="'sign_artifacts' must be set"): + _save_signatures(signature_folder, + [{"method": "gpg", "provider": "my-organization"}]) + + # sign_artifacts not a dict + with pytest.raises(AssertionError, match="must be a dict"): + _save_signatures(signature_folder, + [{"method": "gpg", "provider": "my-organization", + "sign_artifacts": "not a dict"}]) + + +def test_verify_files_checksums_success(pkg_sign_tools): + """Test that verify_files_checksums succeeds when all checksums match.""" + artifacts_folder, signature_folder = pkg_sign_tools + _save_manifest(artifacts_folder, signature_folder) + + files = { + "conan_package.tgz": os.path.join(artifacts_folder, "conan_package.tgz"), + "conanmanifest.txt": os.path.join(artifacts_folder, "conanmanifest.txt") + } + # Should not raise an exception + verify_files_checksums(signature_folder, files) + + +def test_verify_files_checksums_partial_files(pkg_sign_tools): + """Test that verify_files_checksums works with a subset of files. This is to test in case that conan_sources.tgz is not present.""" + artifacts_folder, signature_folder = pkg_sign_tools + _save_manifest(artifacts_folder, signature_folder) + + # Verify only one file + files = { + "conanmanifest.txt": os.path.join(artifacts_folder, "conanmanifest.txt") + } + # Should not raise an exception + verify_files_checksums(signature_folder, files) + + +def test_verify_files_checksums_mismatch(pkg_sign_tools): + """Test that verify_files_checksums raises exception when checksums don't match.""" + artifacts_folder, signature_folder = pkg_sign_tools + _save_manifest(artifacts_folder, signature_folder) + + # Modify file content to cause checksum mismatch + modified_file = os.path.join(artifacts_folder, "conan_package.tgz") + with open(modified_file, "w") as f: + f.write("modified content") + + files = { + "conan_package.tgz": modified_file, + "conanmanifest.txt": os.path.join(artifacts_folder, "conanmanifest.txt") + } + + with pytest.raises(ConanException, match="Checksum mismatch for file conan_package.tgz"): + verify_files_checksums(signature_folder, files) + + +def test_verify_files_checksums_missing_file_in_manifest(pkg_sign_tools): + """Test that verify_files_checksums handles files not in manifest.""" + artifacts_folder, signature_folder = pkg_sign_tools + _save_manifest(artifacts_folder, signature_folder) + + # Try to verify a file that doesn't exist in manifest + new_file = os.path.join(artifacts_folder, "new_file.txt") + with open(new_file, "w") as f: + f.write("content") + + files = {"new_file.txt": new_file} + + # Should raise exception because file is not in manifest (expected_checksum is None) + with pytest.raises(ConanException, match="Checksum mismatch"): + verify_files_checksums(signature_folder, files)