Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions conan/api/model/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,15 @@ def items(self) -> Iterable[Tuple[RecipeReference, Dict[PkgReference, Dict]]]:
packages[pref] = prev_info
yield recipe, packages

def has_items(self) -> bool:
"""Whether the package list contains any element with revision that can be iterated."""
it = self.items()
try:
next(it)
return True
except StopIteration:
return False

def recipe_dict(self, ref: RecipeReference):
""" Gives read/write access to the dictionary containing a specific RecipeReference
information.
Expand Down
71 changes: 71 additions & 0 deletions conan/api/subapi/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from conan.errors import ConanException
from conan.api.model import PkgReference
from conan.api.model import RecipeReference
from conan.internal.api.uploader import PackagePreparator
from conan.internal.conan_app import ConanApp
from conan.internal.rest.pkg_sign import PkgSignaturesPlugin
from conan.internal.util.dates import revision_timestamp_now
from conan.internal.util.files import rmdir, mkdir, remove, save

Expand Down Expand Up @@ -77,6 +80,74 @@ def check_integrity(self, package_list):
checker = IntegrityChecker(cache)
checker.check(package_list)

def sign(self, package_list, only_packages=False):
"""Sign packages with the package signing plugin"""
cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf)
pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder)
if not pkg_signer.is_sign_configured:
raise ConanException(
"The sign() function in the package sign plugin is not defined. For more "
"information on how to configure the plugin, please read the documentation at "
"https://docs.conan.io/2/reference/extensions/package_signing.html.")
if not package_list.has_items():
raise ConanException("No packages to sign in the pkglist provided.")

app = ConanApp(self._conan_api)
preparator = PackagePreparator(app, self._api_helpers.global_conf)
preparator.prepare(package_list, [], force=True)

for rref, packages in package_list.items():
recipe_bundle = package_list.recipe_dict(rref)
if recipe_bundle:
rref_folder = cache.recipe_layout(rref).download_export()
try:
pkg_signer.sign_pkg(rref, recipe_bundle.get("files", {}), rref_folder)
except Exception as e:
recipe_bundle["pkgsign_error"] = str(e)
for pref in packages:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have packages without recipe_bundle?
So it would become possible to sign the packages without signing the recipe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both should be signed, as packages cannot be uploaded without their recipe, and the recipe is the proxy to packages, so it makes sense to have both signed

pkg_bundle = package_list.package_dict(pref)
if pkg_bundle:
pref_folder = cache.pkg_layout(pref).download_package()
try:
pkg_signer.sign_pkg(pref, pkg_bundle.get("files", {}), pref_folder)
except Exception as e:
pkg_bundle["pkgsign_error"] = str(e)
return package_list

def verify(self, package_list):
"""Verify packages with the package signing plugin"""
cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf)
pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder)
if not pkg_signer.is_verify_configured:
raise ConanException(
"The verify() function in the package sign plugin is not defined. For more "
"information on how to configure the plugin, please read the documentation at "
"https://docs.conan.io/2/reference/extensions/package_signing.html.")
if not package_list.has_items():
raise ConanException("No packages to verify in the pkglist provided.")

for rref, packages in package_list.items():
recipe_bundle = package_list.recipe_dict(rref)
if recipe_bundle:
rref_folder = cache.recipe_layout(rref).download_export()
files = {file: os.path.join(rref_folder, file) for file in
os.listdir(rref_folder) if not file.startswith(METADATA)}
try:
pkg_signer.verify(rref, rref_folder, files)
except Exception as e:
recipe_bundle["pkgsign_error"] = str(e)
for pref in packages:
pkg_bundle = package_list.package_dict(pref)
if pkg_bundle:
pref_folder = cache.pkg_layout(pref).download_package()
files = {file: os.path.join(pref_folder, file) for file in
os.listdir(pref_folder) if not file.startswith(METADATA)}
try:
pkg_signer.verify(pref, pref_folder, files)
except Exception as e:
pkg_bundle["pkgsign_error"] = str(e)
return package_list

def clean(self, package_list, source=True, build=True, download=True, temp=True,
backup_sources=False):
"""
Expand Down
127 changes: 126 additions & 1 deletion conan/cli/commands/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,68 @@
from conan.api.output import cli_out_write, ConanOutput
from conan.cli import make_abs_path
from conan.cli.command import conan_command, conan_subcommand, OnceArgument
from conan.cli.commands.list import print_list_text, print_list_json
from conan.cli.commands.list import print_list_text, print_list_json, print_serial
from conan.errors import ConanException
from conan.api.model import PkgReference
from conan.api.model import RecipeReference


def _get_package_sign_error(pkg_list):
signs = []
for rref, packages in pkg_list.items():
recipe_bundle = pkg_list.recipe_dict(rref)
if recipe_bundle:
signs.append(recipe_bundle.get("pkgsign_error"))
for pref in packages:
pkg_bundle = pkg_list.package_dict(pref)
if pkg_bundle:
signs.append(pkg_bundle.get("pkgsign_error"))

if any((sign is not None) for sign in signs):
return ConanException("There were some errors in the package signing process. "
"Please check the output.")
return None


def print_package_sign_text(data):
results_dict = data.get("results", {})

signs = []
for ref_data in results_dict.values():
for revision_data in ref_data.get("revisions", {}).values():
sign = revision_data.get("pkgsign_error")
signs.append(sign)

for pkg in revision_data.get("packages", {}).values():
for prev in pkg.get("revisions", {}).values():
sign = prev.get("pkgsign_error")
signs.append(sign)

remove_keys = {"info", "timestamp", "files"}

def clean_inline(obj):
if not isinstance(obj, dict):
return obj

cleaned = {k: clean_inline(v) for k, v in obj.items() if k not in remove_keys}

if "packages" in cleaned and not cleaned["packages"]:
del cleaned["packages"]

return cleaned

items = {ref: clean_inline(item) for ref, item in results_dict.items()}

# Output
cli_out_write(f"[Package sign] Results:\n")
print_serial(items)

# Summary
fail = sum((s is not None) for s in signs)
ok = len(signs) - fail
cli_out_write(f"\n[Package sign] Summary: OK={ok}, FAILED={fail}")


def json_export(data):
cli_out_write(json.dumps({"cache_path": data}))

Expand Down Expand Up @@ -149,6 +205,75 @@ def cache_check_integrity(conan_api: ConanAPI, parser, subparser, *args):
conan_api.cache.check_integrity(package_list)
ConanOutput().success("Integrity check: ok")

@conan_subcommand(formatters={"text": print_package_sign_text,
"json": print_list_json})
def cache_sign(conan_api: ConanAPI, parser, subparser, *args):
"""
Sign packages with the Package Signing Plugin
"""
subparser.add_argument("pattern", nargs="?",
help="Selection pattern for references to be signed")
subparser.add_argument("-l", "--list", action=OnceArgument,
help="Package list of packages to be signed")
subparser.add_argument('-p', '--package-query', action=OnceArgument,
help="Only the packages matching a specific query, e.g., "
"os=Windows AND (arch=x86 OR compiler=gcc)")
args = parser.parse_args(*args)

if args.pattern is None and args.list is None:
raise ConanException("Missing pattern or package list file")
if args.pattern and args.list:
raise ConanException("Cannot specify both pattern and list")

if args.list:
listfile = make_abs_path(args.list)
multi_package_list = MultiPackagesList.load(listfile)
package_list = multi_package_list["Local Cache"]
else:
ref_pattern = ListPattern(args.pattern, package_id="*")
package_list = conan_api.list.select(ref_pattern, package_query=args.package_query)

conan_api.cache.sign(package_list)
return {
"conan_error": _get_package_sign_error(package_list),
"results": package_list.serialize()
}


@conan_subcommand(formatters={"text": print_package_sign_text,
"json": print_list_json})
def cache_verify(conan_api: ConanAPI, parser, subparser, *args):
"""
Check the signature of packages with the Package Signing Plugin
"""
subparser.add_argument("pattern", nargs="?",
help="Selection pattern for references to verify their signature")
subparser.add_argument("-l", "--list", action=OnceArgument,
help="Package list of packages to verify their signature")
subparser.add_argument('-p', '--package-query', action=OnceArgument,
help="Only the packages matching a specific query, e.g., "
"os=Windows AND (arch=x86 OR compiler=gcc)")
args = parser.parse_args(*args)

if args.pattern is None and args.list is None:
raise ConanException("Missing pattern or package list file")
if args.pattern and args.list:
raise ConanException("Cannot specify both pattern and list")

if args.list:
listfile = make_abs_path(args.list)
multi_package_list = MultiPackagesList.load(listfile)
package_list = multi_package_list["Local Cache"]
else:
ref_pattern = ListPattern(args.pattern, package_id="*")
package_list = conan_api.list.select(ref_pattern, package_query=args.package_query)

conan_api.cache.verify(package_list)
return {
"conan_error": _get_package_sign_error(package_list),
"results": package_list.serialize()
}


@conan_subcommand(formatters={"text": print_list_text,
"json": print_list_json})
Expand Down
2 changes: 1 addition & 1 deletion conan/cli/commands/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def print_serial(item, indent=None, color_index=None):
if isinstance(item, dict):
for k, v in item.items():
if isinstance(v, (str, int)):
if k.lower() == "error":
if "error" in k.lower():
color = Color.BRIGHT_RED
k = "ERROR"
elif k.lower() == "warning":
Expand Down
6 changes: 3 additions & 3 deletions conan/internal/api/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __init__(self, app: ConanApp, global_conf):
self._app = app
self._global_conf = global_conf

def prepare(self, pkg_list, enabled_remotes):
def prepare(self, pkg_list, enabled_remotes, force=False):
local_url = self._global_conf.get("core.scm:local_url", choices=["allow", "block"])
for ref, packages in pkg_list.items():
layout = self._app.cache.recipe_layout(ref)
Expand All @@ -102,13 +102,13 @@ def prepare(self, pkg_list, enabled_remotes):
bundle = pkg_list.recipe_dict(ref)
bundle.pop("files", None)
bundle.pop("upload-urls", None)
if bundle.get("upload"):
if bundle.get("upload") or force:
self._prepare_recipe(ref, bundle, conanfile, enabled_remotes)
for pref in packages:
prev_bundle = pkg_list.package_dict(pref)
prev_bundle.pop("files", None) # If defined from a previous upload
prev_bundle.pop("upload-urls", None)
if prev_bundle.get("upload"):
if prev_bundle.get("upload") or force:
self._prepare_package(pref, prev_bundle)

def _prepare_recipe(self, ref, ref_bundle, conanfile, remotes):
Expand Down
40 changes: 26 additions & 14 deletions conan/internal/rest/pkg_sign.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os

from conan.errors import ConanException
from conan.internal.cache.conan_reference_layout import METADATA
from conan.internal.cache.home_paths import HomePaths
from conan.internal.loader import load_python_file
Expand All @@ -12,34 +13,45 @@ def __init__(self, cache, home_folder):
signer = HomePaths(home_folder).sign_plugin_path
if os.path.isfile(signer):
mod, _ = load_python_file(signer)
# TODO: At the moment it requires both methods sign and verify, but that might be relaxed
self._plugin_sign_function = mod.sign
self._plugin_verify_function = mod.verify
self._plugin_sign_function = getattr(mod, "sign", None)
self._plugin_verify_function = getattr(mod, "verify", None)
else:
self._plugin_sign_function = self._plugin_verify_function = None

@property
def is_sign_configured(self):
return self._plugin_sign_function is not None

@property
def is_verify_configured(self):
return self._plugin_verify_function is not None

def sign_pkg(self, ref, files, folder):
metadata_sign = os.path.join(folder, METADATA, "sign")
mkdir(metadata_sign)
# TODO: Consider creating the package sign summary file by default and check after
# calling the plugins' sign function that provider and method fields are filled.
self._plugin_sign_function(ref, artifacts_folder=folder, signature_folder=metadata_sign)
for f in os.listdir(metadata_sign):
files[f"{METADATA}/sign/{f}"] = os.path.join(metadata_sign, f)

def sign(self, upload_data):
if self._plugin_sign_function is None:
if not self.is_sign_configured:
return

def _sign(ref, files, folder):
metadata_sign = os.path.join(folder, METADATA, "sign")
mkdir(metadata_sign)
self._plugin_sign_function(ref, artifacts_folder=folder, signature_folder=metadata_sign)
for f in os.listdir(metadata_sign):
files[f"{METADATA}/sign/{f}"] = os.path.join(metadata_sign, f)

for rref, packages in upload_data.items():
recipe_bundle = upload_data.recipe_dict(rref)
if recipe_bundle["upload"]:
_sign(rref, recipe_bundle["files"], self._cache.recipe_layout(rref).download_export())
self.sign_pkg(rref, recipe_bundle["files"],
self._cache.recipe_layout(rref).download_export())
for pref in packages:
pkg_bundle = upload_data.package_dict(pref)
if pkg_bundle["upload"]:
_sign(pref, pkg_bundle["files"], self._cache.pkg_layout(pref).download_package())
self.sign_pkg(pref, pkg_bundle["files"],
self._cache.pkg_layout(pref).download_package())

def verify(self, ref, folder, files):
if self._plugin_verify_function is None:
if not self.is_verify_configured:
return
metadata_sign = os.path.join(folder, METADATA, "sign")
self._plugin_verify_function(ref, artifacts_folder=folder, signature_folder=metadata_sign,
Expand Down
Empty file.
Loading
Loading