Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions conan/api/subapi/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from conan.errors import ConanException
from conan.api.model import PkgReference
from conan.api.model import RecipeReference
from conan.internal.api.uploader import PackagePreparator
from conan.internal.conan_app import ConanApp
from conan.internal.rest.pkg_sign import PkgSignaturesPlugin
from conan.internal.util.dates import revision_timestamp_now
from conan.internal.util.files import rmdir, mkdir, remove, save

Expand Down Expand Up @@ -77,6 +80,69 @@ def check_integrity(self, package_list):
checker = IntegrityChecker(cache)
checker.check(package_list)

def sign(self, package_list):
"""Sign packages with the package signing plugin"""
cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf)
pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder)
if not pkg_signer.is_configured:
raise ConanException(
"The package sign plugin is not configured. For more information on how to "
"configure it, please read the documentation at "
"https://docs.conan.io/2/reference/extensions/package_signing.html.")
app = ConanApp(self._conan_api)
preparator = PackagePreparator(app, self._api_helpers.global_conf)
preparator.prepare(package_list, [], force=True)

for rref, packages in package_list.items():
recipe_bundle = package_list.recipe_dict(rref)
if recipe_bundle:
rref_folder = cache.recipe_layout(rref).download_export()
try:
pkg_signer.sign_pkg(rref, recipe_bundle.get("files", {}), rref_folder)
except Exception as e:
recipe_bundle["error"] = e
for pref in packages:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have packages without recipe_bundle?
So it would become possible to sign the packages without signing the recipe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both should be signed, as packages cannot be uploaded without their recipe, and the recipe is the proxy to packages, so it makes sense to have both signed

pkg_bundle = package_list.package_dict(pref)
if pkg_bundle:
pref_folder = cache.pkg_layout(pref).download_package()
try:
pkg_signer.sign_pkg(pref, pkg_bundle.get("files", {}), pref_folder)
except Exception as e:
pkg_bundle["error"] = e
return package_list

def verify(self, package_list):
"""Verify packages with the package signing plugin"""
cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf)
pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder)
if not pkg_signer.is_configured:
raise ConanException(
"The package sign plugin is not configured. For more information on how to"
"configure it, please read the documentation at "
"https://docs.conan.io/2/reference/extensions/package_signing.html.")

for rref, packages in package_list.items():
recipe_bundle = package_list.recipe_dict(rref)
if recipe_bundle:
rref_folder = cache.recipe_layout(rref).download_export()
files = {file: os.path.join(rref_folder, file) for file in
os.listdir(rref_folder) if not file.startswith(METADATA)}
try:
pkg_signer.verify(rref, rref_folder, files)
except Exception as e:
recipe_bundle["error"] = e
for pref in packages:
pkg_bundle = package_list.package_dict(pref)
if pkg_bundle:
pref_folder = cache.pkg_layout(pref).download_package()
files = {file: os.path.join(pref_folder, file) for file in
os.listdir(pref_folder) if not file.startswith(METADATA)}
try:
pkg_signer.verify(pref, pref_folder, files)
except Exception as e:
pkg_bundle["error"] = e
return package_list

def clean(self, package_list, source=True, build=True, download=True, temp=True,
backup_sources=False):
"""
Expand Down
143 changes: 142 additions & 1 deletion conan/cli/commands/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,84 @@
from conan.api.output import cli_out_write, ConanOutput
from conan.cli import make_abs_path
from conan.cli.command import conan_command, conan_subcommand, OnceArgument
from conan.cli.commands.list import print_list_text, print_list_json
from conan.cli.commands.list import print_list_text, print_list_json, print_serial
from conan.errors import ConanException
from conan.api.model import PkgReference
from conan.api.model import RecipeReference


def _get_package_sign_error(pkg_list):
signs = []
for rref, packages in pkg_list.items():
recipe_bundle = pkg_list.recipe_dict(rref)
if recipe_bundle:
signs.append(recipe_bundle.get("error"))
for pref in packages:
pkg_bundle = pkg_list.package_dict(pref)
if pkg_bundle:
signs.append(pkg_bundle.get("error"))

if any(isinstance(sign, Exception) for sign in signs):
return ConanException("There were some errors in the package signing process. "
"Please check the output.")
return None


def print_package_sign_json(data):
"""Converts the package sign Exception to string for JSON serialization and prints the result."""
results_dict = data.get("results", {})

def convert_exceptions(obj):
if isinstance(obj, dict):
return {k: convert_exceptions(v) for k, v in obj.items()}
elif isinstance(obj, Exception):
return str(obj)
else:
return obj

items = {ref: convert_exceptions(item) for ref, item in results_dict.items()}
print_list_json({"results": items})


def print_package_sign_text(data):
results_dict = data.get("results", {})

signs = []
for ref_data in results_dict.values():
for revision_data in ref_data.get("revisions", {}).values():
sign = revision_data.get("error")
signs.append(sign)

for pkg in revision_data.get("packages", {}).values():
for prev in pkg.get("revisions", {}).values():
sign = prev.get("error")
signs.append(sign)

remove_keys = {"info", "timestamp", "files"}

def clean_inline(obj):
if not isinstance(obj, dict):
return obj

cleaned = {k: clean_inline(v) for k, v in obj.items() if k not in remove_keys}

if "packages" in cleaned and not cleaned["packages"]:
del cleaned["packages"]

return cleaned

items = {ref: clean_inline(item) for ref, item in results_dict.items()}

# Output
cli_out_write(f"[Package sign] Results:\n")
print_serial(items)

# Summary
fail = sum(isinstance(s, Exception) for s in signs)
ok = len(signs) - fail
cli_out_write(f"\n[Package sign] Summary: OK={ok}, FAILED={fail}")


def json_export(data):
cli_out_write(json.dumps({"cache_path": data}))

Expand Down Expand Up @@ -149,6 +221,75 @@ def cache_check_integrity(conan_api: ConanAPI, parser, subparser, *args):
conan_api.cache.check_integrity(package_list)
ConanOutput().success("Integrity check: ok")

@conan_subcommand(formatters={"text": print_package_sign_text,
"json": print_package_sign_json})
def cache_sign(conan_api: ConanAPI, parser, subparser, *args):
"""
Sign packages with the Package Signing Plugin
"""
subparser.add_argument("pattern", nargs="?",
help="Selection pattern for references to be signed")
subparser.add_argument("-l", "--list", action=OnceArgument,
help="Package list of packages to be signed")
subparser.add_argument('-p', '--package-query', action=OnceArgument,
help="Only the packages matching a specific query, e.g., "
"os=Windows AND (arch=x86 OR compiler=gcc)")
args = parser.parse_args(*args)

if args.pattern is None and args.list is None:
raise ConanException("Missing pattern or package list file")
if args.pattern and args.list:
raise ConanException("Cannot specify both pattern and list")

if args.list:
listfile = make_abs_path(args.list)
multi_package_list = MultiPackagesList.load(listfile)
package_list = multi_package_list["Local Cache"]
else:
ref_pattern = ListPattern(args.pattern, package_id="*")
package_list = conan_api.list.select(ref_pattern, package_query=args.package_query)

conan_api.cache.sign(package_list)
return {
"conan_error": _get_package_sign_error(package_list),
"results": package_list.serialize()
}


@conan_subcommand(formatters={"text": print_package_sign_text,
"json": print_package_sign_json})
def cache_verify(conan_api: ConanAPI, parser, subparser, *args):
"""
Check the signature of packages with the Package Signing Plugin
"""
subparser.add_argument("pattern", nargs="?",
help="Selection pattern for references to verify their signature")
subparser.add_argument("-l", "--list", action=OnceArgument,
help="Package list of packages to verify their signature")
subparser.add_argument('-p', '--package-query', action=OnceArgument,
help="Only the packages matching a specific query, e.g., "
"os=Windows AND (arch=x86 OR compiler=gcc)")
args = parser.parse_args(*args)

if args.pattern is None and args.list is None:
raise ConanException("Missing pattern or package list file")
if args.pattern and args.list:
raise ConanException("Cannot specify both pattern and list")

if args.list:
listfile = make_abs_path(args.list)
multi_package_list = MultiPackagesList.load(listfile)
package_list = multi_package_list["Local Cache"]
else:
ref_pattern = ListPattern(args.pattern, package_id="*")
package_list = conan_api.list.select(ref_pattern, package_query=args.package_query)

conan_api.cache.verify(package_list)
return {
"conan_error": _get_package_sign_error(package_list),
"results": package_list.serialize()
}


@conan_subcommand(formatters={"text": print_list_text,
"json": print_list_json})
Expand Down
2 changes: 1 addition & 1 deletion conan/cli/commands/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def print_serial(item, indent=None, color_index=None):
color = color_array[color_index % len(color_array)]
if isinstance(item, dict):
for k, v in item.items():
if isinstance(v, (str, int)):
if isinstance(v, (str, int, Exception)):
if k.lower() == "error":
color = Color.BRIGHT_RED
k = "ERROR"
Expand Down
6 changes: 3 additions & 3 deletions conan/internal/api/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __init__(self, app: ConanApp, global_conf):
self._app = app
self._global_conf = global_conf

def prepare(self, pkg_list, enabled_remotes):
def prepare(self, pkg_list, enabled_remotes, force=False):
local_url = self._global_conf.get("core.scm:local_url", choices=["allow", "block"])
for ref, packages in pkg_list.items():
layout = self._app.cache.recipe_layout(ref)
Expand All @@ -102,13 +102,13 @@ def prepare(self, pkg_list, enabled_remotes):
bundle = pkg_list.recipe_dict(ref)
bundle.pop("files", None)
bundle.pop("upload-urls", None)
if bundle.get("upload"):
if bundle.get("upload") or force:
self._prepare_recipe(ref, bundle, conanfile, enabled_remotes)
for pref in packages:
prev_bundle = pkg_list.package_dict(pref)
prev_bundle.pop("files", None) # If defined from a previous upload
prev_bundle.pop("upload-urls", None)
if prev_bundle.get("upload"):
if prev_bundle.get("upload") or force:
self._prepare_package(pref, prev_bundle)

def _prepare_recipe(self, ref, ref_bundle, conanfile, remotes):
Expand Down
31 changes: 20 additions & 11 deletions conan/internal/rest/pkg_sign.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os

from conan.errors import ConanException
from conan.internal.cache.conan_reference_layout import METADATA
from conan.internal.cache.home_paths import HomePaths
from conan.internal.loader import load_python_file
Expand All @@ -13,30 +14,38 @@ def __init__(self, cache, home_folder):
if os.path.isfile(signer):
mod, _ = load_python_file(signer)
# TODO: At the moment it requires both methods sign and verify, but that might be relaxed
self._plugin_sign_function = mod.sign
self._plugin_verify_function = mod.verify
self._plugin_sign_function = getattr(mod, "sign", None)
self._plugin_verify_function = getattr(mod, "verify", None)
else:
self._plugin_sign_function = self._plugin_verify_function = None

@property
def is_configured(self):
return self._plugin_sign_function is not None and self._plugin_verify_function is not None

def sign_pkg(self, ref, files, folder):
metadata_sign = os.path.join(folder, METADATA, "sign")
mkdir(metadata_sign)
# TODO: Consider creating the package sign summary file by default and check after
# calling the plugins' sign function that provider and method fields are filled.
self._plugin_sign_function(ref, artifacts_folder=folder, signature_folder=metadata_sign)
for f in os.listdir(metadata_sign):
files[f"{METADATA}/sign/{f}"] = os.path.join(metadata_sign, f)

def sign(self, upload_data):
if self._plugin_sign_function is None:
return

def _sign(ref, files, folder):
metadata_sign = os.path.join(folder, METADATA, "sign")
mkdir(metadata_sign)
self._plugin_sign_function(ref, artifacts_folder=folder, signature_folder=metadata_sign)
for f in os.listdir(metadata_sign):
files[f"{METADATA}/sign/{f}"] = os.path.join(metadata_sign, f)

for rref, packages in upload_data.items():
recipe_bundle = upload_data.recipe_dict(rref)
if recipe_bundle["upload"]:
_sign(rref, recipe_bundle["files"], self._cache.recipe_layout(rref).download_export())
self.sign_pkg(rref, recipe_bundle["files"],
self._cache.recipe_layout(rref).download_export())
for pref in packages:
pkg_bundle = upload_data.package_dict(pref)
if pkg_bundle["upload"]:
_sign(pref, pkg_bundle["files"], self._cache.pkg_layout(pref).download_package())
self.sign_pkg(pref, pkg_bundle["files"],
self._cache.pkg_layout(pref).download_package())

def verify(self, ref, folder, files):
if self._plugin_verify_function is None:
Expand Down
Empty file.
64 changes: 64 additions & 0 deletions conan/tools/pkg_signing/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import copy
import os
import json

from conan.internal.util.files import load, sha256sum, save

# FIXME: Maybe this tools should be placed at conan.api.subapi.cache as they are not recipe tools?

SIGN_SUMMARY_CONTENT = {
"provider": None,
"method": None,
"files": {}
}

SIGN_SUMMARY_FILENAME = "sign-summary.json"


def get_summary_file_path(signature_folder):
"""
Gets the path of the summary file path
@param signature_folder: Signature folder path
@return: Path of the summary file
"""
return os.path.join(signature_folder, SIGN_SUMMARY_FILENAME)


def load_summary(signature_folder):
"""
Loads the summary file from the signature folder
@param signature_folder: Signature folder path
@return: Dictionary object with the content of the summary
"""
return json.loads(load(get_summary_file_path(signature_folder)))


def create_summary_content(artifacts_folder):
"""
Creates the summary content as a dictionary for manipulation
@param artifacts_folder: Artifacts folder path
@return: Dictionary with the summary content
"""
checksums = {}
for fname in os.listdir(artifacts_folder):
file_path = os.path.join(artifacts_folder, fname)
if os.path.isfile(file_path):
sha256 = sha256sum(file_path)
checksums[fname] = sha256
assert checksums, f"Summary file content cannot be created: No files found in {artifacts_folder}"
sorted_checksums = dict(sorted(checksums.items()))
content = copy.deepcopy(SIGN_SUMMARY_CONTENT)
content["files"] = sorted_checksums
return content


def save_summary(signature_folder, content):
"""
Saves the content of the summary to the signature folder using SIGN_SUMMARY_FILENAME as the
file name
@param signature_folder: Signature folder path
@param content: Content of the summary file
"""
assert content.get("provider")
assert content.get("method")
save(get_summary_file_path(signature_folder), json.dumps(content))
Loading
Loading