Skip to content

Commit 478674b

Browse files
committed
feat: compare SBOMs and show components only present in target SBOM
Signed-off-by: badrikesh prusty <[email protected]>
1 parent a6a38e7 commit 478674b

File tree

5 files changed

+372
-0
lines changed

5 files changed

+372
-0
lines changed

src/debsbom/cli.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from .commands.source_merge import SourceMergeCmd
1919
from .commands.repack import RepackCmd
2020
from .commands.export import ExportCmd
21+
from .commands.compare import CompareCmd
2122

2223
# Attempt to import optional download dependencies to check their availability.
2324
# The success or failure of these imports determines if download features are enabled.
@@ -64,6 +65,7 @@ def setup_parser():
6465
)
6566
RepackCmd.setup_parser(subparser.add_parser("repack", help="repack sources and sbom"))
6667
ExportCmd.setup_parser(subparser.add_parser("export", help="export SBOM as graph"))
68+
CompareCmd.setup_parser(subparser.add_parser("compare", help="compare SBOMs and list new components"))
6769

6870
return parser
6971

@@ -97,6 +99,8 @@ def main():
9799
ExportCmd.run(args)
98100
elif args.cmd == "merge":
99101
MergeCmd.run(args)
102+
elif args.cmd == "compare":
103+
CompareCmd.run(args)
100104
except DistroArchUnknownError as e:
101105
logger.error(f"debsbom: error: {e}. Set --distro-arch to dpkg architecture (e.g. amd64)")
102106
sys.exit(-2)

src/debsbom/commands/compare.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# Copyright (C) 2025 Siemens
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
import logging
6+
import json
7+
from pathlib import Path
8+
import sys
9+
10+
from ..bomreader.cdxbomreader import CdxBomReader
11+
from ..bomreader.spdxbomreader import SpdxBomReader
12+
from ..bomwriter import BomWriter
13+
from .input import GenerateInput, warn_if_tty
14+
from ..compare.spdx import SpdxSbomCompare
15+
from ..compare.cdx import CdxSbomCompare
16+
from ..sbom import SBOMType
17+
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
class CompareCmd(GenerateInput):
    """
    Compare two SBOMs and generate a new SBOM containing only the additional
    components found in the target.

    Both inputs must be files of the same SBOM format. The format is derived
    from the file name suffixes (".spdx" or ".cdx").
    """

    @staticmethod
    def _detect_sbom_format(path: Path) -> str | None:
        """Return "spdx" or "cdx" depending on the suffixes of path, else None."""
        suffixes = path.suffixes
        if ".spdx" in suffixes:
            return "spdx"
        if ".cdx" in suffixes:
            return "cdx"
        return None

    @classmethod
    def run(cls, args):
        """
        Execute the compare command.

        Reads the base and target SBOM named in ``args``, compares them and
        writes the resulting SBOM to ``args.out`` ("-" selects stdout).

        Raises:
            ValueError: if an SBOM is requested from stdin, if the format of
                one of the files can not be detected, or if the two files use
                different SBOM formats.
        """
        if args.base_sbom == "-" or args.target_sbom == "-":
            # The previous stdin branch never read any input and left the
            # SBOM format undefined, so it could not produce a result.
            # Fail with a clear message instead.
            raise ValueError("reading SBOMs from stdin is not supported by the compare command")

        # Detect each format from its own path. The base format used to be
        # derived partly from the target path, which hid mixed-format inputs.
        base_sbom_fmt = cls._detect_sbom_format(Path(args.base_sbom))
        target_sbom_fmt = cls._detect_sbom_format(Path(args.target_sbom))

        if not base_sbom_fmt or not target_sbom_fmt:
            raise ValueError("can not detect SBOM format for one or both files")

        if base_sbom_fmt != target_sbom_fmt:
            raise ValueError("can not compare mixed SPDX and CycloneDX documents")

        # Select the format specific reader, comparer, output type and suffix.
        if target_sbom_fmt == "spdx":
            reader = SpdxBomReader
            comparer = SpdxSbomCompare
            sbom_type = SBOMType.SPDX
            out_suffix = ".spdx.json"
        else:
            reader = CdxBomReader
            comparer = CdxSbomCompare
            sbom_type = SBOMType.CycloneDX
            out_suffix = ".cdx.json"

        base_sbom_obj = reader.read_file(args.base_sbom)
        target_sbom_obj = reader.read_file(args.target_sbom)
        sbom_compare = comparer(
            distro_name=args.distro_name,
            distro_supplier=args.distro_supplier,
            distro_version=args.distro_version,
            base_distro_vendor=args.base_distro_vendor,
            spdx_namespace=args.spdx_namespace,
            cdx_serialnumber=args.cdx_serialnumber,
            timestamp=args.timestamp,
        )
        bom = sbom_compare.compare(base_sbom_obj, target_sbom_obj)

        if args.out == "-":
            BomWriter.write_to_stream(bom, sbom_type, sys.stdout, args.validate)
            return

        # Make sure the output file carries the format specific suffix.
        out = args.out
        if not out.endswith(out_suffix):
            out += out_suffix
        BomWriter.write_to_file(bom, sbom_type, Path(out), args.validate)

    @classmethod
    def setup_parser(cls, parser):
        """Register the arguments of the compare command on ``parser``."""
        cls.parser_add_generate_input_args(parser, default_out="extras")
        parser.add_argument(
            "-b",
            "--base-sbom",
            required=True,
            help="Path to the base (reference) SBOM file",
        )

        parser.add_argument(
            "-n",
            "--target-sbom",
            required=True,
            help="Path to the target (new) SBOM file",
        )

src/debsbom/compare/cdx.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Copyright (C) 2025 Siemens
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
from collections.abc import Callable
6+
from cyclonedx.model import HashAlgorithm
7+
from cyclonedx.model.bom import Bom
8+
from cyclonedx.model.component import Component
9+
from cyclonedx.model.dependency import Dependency
10+
import itertools
11+
import logging
12+
from sortedcontainers import SortedSet
13+
from uuid import uuid4
14+
15+
from .compare import SbomCompare
16+
from ..generate.cdx import make_distro_component, make_metadata
17+
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
class CdxSbomCompare(SbomCompare):
    """Compare two CycloneDX BOMs and collect what is new or changed in the target."""

    def _load_cdx_sbom(self, sbom):
        """
        Index the components of ``sbom`` by their PURL.

        Components sharing the same PURL (including a missing one, i.e.
        ``None``) overwrite each other, so the last one seen is kept.
        """
        logger.info(f"Processing BOM '{sbom.metadata.component.name}'")
        return {component.purl: component for component in sbom.components}

    def _get_cdx_comp_sha256(self, component):
        """Return the content of the first SHA-256 hash of ``component``, or None."""
        return next(
            (
                entry.content
                for entry in component.hashes
                if entry.alg == HashAlgorithm.SHA_256
            ),
            None,
        )

    def compare(self, base_sbom, target_sbom) -> Bom:
        """
        Build a BOM from the target components that are missing in the base.

        A target component is selected when its PURL is unknown to the base
        BOM, or when both sides provide a SHA-256 hash and the hashes differ.
        Target components without a PURL are skipped with a warning.
        """
        base_by_purl = self._load_cdx_sbom(base_sbom)
        target_by_purl = self._load_cdx_sbom(target_sbom)

        extras = []
        for purl, component in target_by_purl.items():
            if purl is None:
                logger.warning(f"missing PURL for component '{component.name}'")
                continue

            counterpart = base_by_purl.get(purl)
            if counterpart is None:
                # not part of the base BOM at all
                extras.append(component)
                continue

            base_digest = self._get_cdx_comp_sha256(counterpart)
            target_digest = self._get_cdx_comp_sha256(component)
            # without a hash on both sides the content can not be compared
            if base_digest is None or target_digest is None:
                continue
            if base_digest != target_digest:
                extras.append(component)

        distro_component = make_distro_component(
            self.distro_name, self.distro_version, self.distro_supplier
        )
        bom_metadata = make_metadata(distro_component, self.timestamp)

        # NOTE: no dependency entries are created for the resulting BOM,
        # it only carries the metadata and the selected components.
        serial_number = uuid4() if self.cdx_serialnumber is None else self.cdx_serialnumber

        return Bom(
            serial_number=serial_number,
            metadata=bom_metadata,
            components=extras,
        )

src/debsbom/compare/compare.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Copyright (C) 2025 Siemens
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
from abc import abstractmethod
6+
from collections.abc import Callable
7+
from datetime import datetime
8+
from uuid import UUID
9+
10+
11+
class SbomCompare:
12+
"""Base class for comparing SBOMs."""
13+
14+
def __init__(
15+
self,
16+
distro_name: str,
17+
distro_supplier: str | None = None,
18+
distro_version: str | None = None,
19+
base_distro_vendor: str | None = "debian",
20+
spdx_namespace: tuple | None = None, # 6 item tuple representing an URL
21+
cdx_serialnumber: UUID | None = None,
22+
timestamp: datetime | None = None,
23+
):
24+
self.distro_name = distro_name
25+
self.distro_supplier = distro_supplier
26+
self.distro_version = distro_version
27+
self.base_distro_vendor = base_distro_vendor
28+
self.namespace = spdx_namespace
29+
self.cdx_serialnumber = cdx_serialnumber
30+
if timestamp is None:
31+
self.timestamp = datetime.now()
32+
else:
33+
self.timestamp = timestamp
34+
35+
@classmethod
36+
@abstractmethod
37+
#def merge(cls, sboms, progress_cb: Callable[[int, int, str], None] | None = None):
38+
def compare(cls, base_sbom, target_sbom):
39+
"""Compare the SBOMs."""
40+
raise NotImplementedError()

src/debsbom/compare/spdx.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# Copyright (C) 2025 Siemens
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
from collections.abc import Callable
6+
import itertools
7+
import logging
8+
from spdx_tools.spdx.model.checksum import Checksum, ChecksumAlgorithm
9+
from spdx_tools.spdx.model.document import Document
10+
from spdx_tools.spdx.model.spdx_no_assertion import SpdxNoAssertion
11+
from spdx_tools.spdx.model.package import Package
12+
from spdx_tools.spdx.model.relationship import Relationship, RelationshipType
13+
14+
from ..generate.spdx import make_creation_info, make_distro_package
15+
from .compare import SbomCompare
16+
from ..sbom import (
17+
SPDX_REF_DOCUMENT,
18+
SPDX_REFERENCE_TYPE_PURL,
19+
)
20+
21+
22+
logger = logging.getLogger(__name__)
23+
24+
25+
class SpdxSbomCompare(SbomCompare):
    """Compare two SPDX documents and collect what is new or changed in the target."""

    def _load_spdx_sbom(self, sbom):
        """
        Index the packages of ``sbom`` by the locator of their PURL references.

        Packages without a PURL external reference are not part of the
        result. If a locator occurs more than once, the last package wins.
        """
        logger.info(f"Processing BOM '{sbom.creation_info.name}'")

        by_purl = {}
        for package in sbom.packages:
            for reference in package.external_references:
                if reference.reference_type != SPDX_REFERENCE_TYPE_PURL:
                    continue
                by_purl[reference.locator] = package
        return by_purl

    def _get_spdx_pkg_sha256(self, package):
        """Return the value of the first SHA256 checksum of ``package``, or None."""
        return next(
            (
                entry.value
                for entry in package.checksums
                if entry.algorithm == ChecksumAlgorithm.SHA256
            ),
            None,
        )

    def compare(self, base_sbom, target_sbom) -> Document:
        """
        Build a document from the target packages that are missing in the base.

        A target package is selected when its PURL is unknown to the base
        document, or when both sides provide a SHA256 checksum and the
        checksums differ.
        """
        base_by_purl = self._load_spdx_sbom(base_sbom)
        target_by_purl = self._load_spdx_sbom(target_sbom)

        extras = []
        for purl, package in target_by_purl.items():
            counterpart = base_by_purl.get(purl)
            if counterpart is None:
                # not part of the base document at all
                extras.append(package)
                continue

            base_digest = self._get_spdx_pkg_sha256(counterpart)
            target_digest = self._get_spdx_pkg_sha256(package)
            # without a checksum on both sides the content can not be compared
            if base_digest is None or target_digest is None:
                continue
            if base_digest != target_digest:
                extras.append(package)

        # NOTE(review): neither a distro package nor any relationships are
        # added to the resulting document - confirm this is intended.
        return Document(
            creation_info=make_creation_info(self.distro_name, self.namespace, self.timestamp),
            packages=extras,
        )

0 commit comments

Comments
 (0)