Misc updates #85

Merged · 8 commits · Aug 17, 2023
6 changes: 3 additions & 3 deletions flake.lock

Some generated files are not rendered by default.

51 changes: 24 additions & 27 deletions nixgraph/graph.py
@@ -19,7 +19,7 @@
 import graphviz as gv

 from sbomnix.utils import (
-    LOGGER_NAME,
+    LOG,
     LOG_SPAM,
     exec_cmd,
     df_to_csv_file,
@@ -29,9 +29,6 @@

 from sbomnix.nix import find_deriver

-###############################################################################
-
-_LOG = logging.getLogger(LOGGER_NAME)

 ###############################################################################

@@ -98,12 +95,12 @@ def draw(self, start_path, args):
             df = df_regex_filter(self.df, "src_path", self.inverse_regex)
             for row in df.itertuples():
                 inverse_path = row.src_path
-                _LOG.debug("Start path inverse: %s", inverse_path)
+                LOG.debug("Start path inverse: %s", inverse_path)
                 nixfilter = NixGraphFilter(src_path=inverse_path)
                 self._graph(nixfilter)
         else:
             # Otherwise, draw the graph starting from the given start_path
-            _LOG.debug("Start path: %s", start_path)
+            LOG.debug("Start path: %s", start_path)
             nixfilter = NixGraphFilter(target_path=start_path)
             self._graph(nixfilter)

@@ -112,12 +109,12 @@ def draw(self, start_path, args):
             self._render(args.out)
         elif self.df_out_csv is not None and not self.df_out_csv.empty:
             if hasattr(args, "return_df") and args.return_df:
-                _LOG.debug("Returning graph as dataframe")
+                LOG.debug("Returning graph as dataframe")
                 return self.df_out_csv
             # Output csv if csv format was specified
             df_to_csv_file(self.df_out_csv, args.out)
         else:
-            _LOG.warning("Nothing to draw")
+            LOG.warning("Nothing to draw")
         return None

     def _init_df_out(self, args):
@@ -137,21 +134,21 @@ def _render(self, filename):
         fname, extension = os.path.splitext(filename)
         gformat = extension[1:]
         self.digraph.render(filename=fname, format=gformat, cleanup=True)
-        _LOG.info("Wrote: %s", filename)
+        LOG.info("Wrote: %s", filename)

     def _graph(self, nixfilter, curr_depth=0):
         curr_depth += 1
         if curr_depth > self.maxdepth:
-            _LOG.log(LOG_SPAM, "Reached maxdepth: %s", self.maxdepth)
+            LOG.log(LOG_SPAM, "Reached maxdepth: %s", self.maxdepth)
             return
         df = self._query(nixfilter, curr_depth)
         if df.empty and curr_depth == 1:
             # First match failed: print debug message and stop
-            _LOG.debug("No matching packages found")
+            LOG.debug("No matching packages found")
             return
         if df.empty:
             # Reached leaf: no more matches
-            _LOG.debug("%sFound nothing", (DBG_INDENT * (curr_depth - 1)))
+            LOG.debug("%sFound nothing", (DBG_INDENT * (curr_depth - 1)))
             return
         if self.df_out_csv is not None:
             df.insert(0, "graph_depth", curr_depth)
@@ -160,10 +157,10 @@ def _graph(self, nixfilter, curr_depth=0):
             self._dbg_print_row(row, curr_depth)
             # Stop drawing if 'until_regex' matches
             if regex_match(self.until_regex, row.target_pname):
-                _LOG.debug("%sReached until_function", (DBG_INDENT * (curr_depth - 1)))
+                LOG.debug("%sReached until_function", (DBG_INDENT * (curr_depth - 1)))
                 continue
             if self._path_drawn(row):
-                _LOG.debug("%sSkipping duplicate path", (DBG_INDENT * (curr_depth - 1)))
+                LOG.debug("%sSkipping duplicate path", (DBG_INDENT * (curr_depth - 1)))
                 continue
             # Add source node
             self._add_node(row.src_path, row.src_pname)
@@ -192,7 +189,7 @@ def _path_drawn(self, row):

     def _query(self, nixfilter, depth):
         query_str = nixfilter.get_query_str()
-        _LOG.debug("%sFiltering by: %s", (DBG_INDENT * (depth - 1)), query_str)
+        LOG.debug("%sFiltering by: %s", (DBG_INDENT * (depth - 1)), query_str)
         if self.df.empty:
             return pd.DataFrame()
         return self.df.query(query_str)
@@ -220,7 +217,7 @@ def _add_node(self, path, pname):
         self.digraph.node(node_id, label, style="rounded,filled", fillcolor=fillcolor)

     def _dbg_print_row(self, row, depth):
-        _LOG.log(
+        LOG.log(
             LOG_SPAM,
             "%sFound: %s ==> %s",
             (DBG_INDENT * (depth - 1)),
@@ -253,10 +250,10 @@ class NixDependencies:
     """Parse nix package dependencies"""

     def __init__(self, nix_path, buildtime=False):
-        _LOG.debug("nix_path: %s", nix_path)
+        LOG.debug("nix_path: %s", nix_path)
         self.dependencies = set()
         self.dtype = "buildtime" if buildtime else "runtime"
-        _LOG.info("Loading %s dependencies referenced by '%s'", self.dtype, nix_path)
+        LOG.info("Loading %s dependencies referenced by '%s'", self.dtype, nix_path)
         drv_path = _find_deriver(nix_path)
         self.nix_store_path = _get_nix_store_path(drv_path)
         if buildtime:
@@ -266,21 +263,21 @@ def __init__(self, nix_path, buildtime=False):
             self.start_path = _find_outpath(drv_path)
             self._parse_runtime_dependencies(drv_path)
         if len(self.dependencies) <= 0:
-            _LOG.info("No %s dependencies", self.dtype)
+            LOG.info("No %s dependencies", self.dtype)

     def _parse_runtime_dependencies(self, drv_path):
         # nix-store -u -q --graph outputs runtime dependencies.
         # We need to use -f (--force-realise) since runtime-only dependencies
         # can not be determined unless the output paths are realised.
         nix_query_out = exec_cmd(["nix-store", "-u", "-f", "-q", "--graph", drv_path])
-        _LOG.log(LOG_SPAM, "nix_query_out: %s", nix_query_out)
+        LOG.log(LOG_SPAM, "nix_query_out: %s", nix_query_out)
         self._parse_nix_query_out(nix_query_out)

     def _parse_buildtime_dependencies(self, drv_path):
         # nix-store -q --graph outputs buildtime dependencies when applied
         # to derivation path
         nix_query_out = exec_cmd(["nix-store", "-q", "--graph", drv_path])
-        _LOG.log(LOG_SPAM, "nix_query_out: %s", nix_query_out)
+        LOG.log(LOG_SPAM, "nix_query_out: %s", nix_query_out)
         self._parse_nix_query_out(nix_query_out)

     def _parse_nix_query_out(self, nix_query_out):
@@ -315,7 +312,7 @@ def to_dataframe(self):
             by=["src_pname", "src_path", "target_pname", "target_path"],
             inplace=True,
         )
-        if _LOG.level <= logging.DEBUG:
+        if LOG.level <= logging.DEBUG:
             df_to_csv_file(df, f"nixgraph_deps_{self.dtype}.csv")
         return df

@@ -338,16 +335,16 @@ def _get_nix_store_path(nix_path):
     store_path_match = re_nix_store_path.match(nix_path)
     if store_path_match:
         store_path = store_path_match.group("store_path")
-        _LOG.debug("Using nix store path: '%s'", store_path)
+        LOG.debug("Using nix store path: '%s'", store_path)
         return store_path


 def _find_deriver(nix_path):
     drv_path = find_deriver(nix_path)
     if not drv_path:
-        _LOG.fatal("No deriver found for: '%s", nix_path)
+        LOG.fatal("No deriver found for: '%s", nix_path)
         sys.exit(1)
-    _LOG.debug("nix_drv: %s", drv_path)
+    LOG.debug("nix_drv: %s", drv_path)
     return drv_path


@@ -362,9 +359,9 @@ def _find_outpath(nix_path):
         ]
     ).strip()
     if not out_path:
-        _LOG.fatal("No outpath found for: '%s'", nix_path)
+        LOG.fatal("No outpath found for: '%s'", nix_path)
         sys.exit(1)
-    _LOG.debug("out_path: %s", out_path)
+    LOG.debug("out_path: %s", out_path)
     return out_path
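The change in this file is almost entirely mechanical: each module-level `_LOG = logging.getLogger(LOGGER_NAME)` is dropped, and all logging goes through a single `LOG` object imported from `sbomnix.utils`. For readers unfamiliar with the pattern, a minimal sketch of such a shared-logger module follows; the imported names (`LOG`, `LOG_SPAM`, `set_log_verbosity`) come from the diff, but the bodies below are assumptions, not sbomnix's actual implementation.

```python
import logging

# Custom level below DEBUG for very chatty output; the diff shows LOG_SPAM is
# passed to LOG.log(), so it must be a numeric level. The value is an assumption.
LOG_SPAM = logging.DEBUG - 1
logging.addLevelName(LOG_SPAM, "SPAM")

# Assumed logger name; the diff only shows that sbomnix.utils exports LOG.
LOG = logging.getLogger("sbomnix")


def set_log_verbosity(verbosity=1):
    """Map the numeric --verbose argument to a log level (assumed mapping)."""
    level = {0: logging.NOTSET, 1: logging.INFO, 2: logging.DEBUG}.get(verbosity, LOG_SPAM)
    logging.basicConfig(format="%(levelname)s %(message)s")
    LOG.setLevel(level)
```

With one shared logger, verbosity is configured once in `main()` via `set_log_verbosity(args.verbose)` and applies everywhere, instead of each module resolving its own logger by name.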
18 changes: 10 additions & 8 deletions nixgraph/main.py
@@ -7,15 +7,16 @@
 """ Python script to query and visualize nix package dependencies """

 import argparse
-import logging
 import pathlib
 import sys
 from nixgraph.graph import NixDependencies
-from sbomnix.utils import setup_logging, get_py_pkg_version, check_positive, LOGGER_NAME
-
-###############################################################################
-
-_LOG = logging.getLogger(LOGGER_NAME)
+from sbomnix.utils import (
+    LOG,
+    set_log_verbosity,
+    get_py_pkg_version,
+    check_positive,
+    exit_unless_nix_artifact,
+)

 ###############################################################################

@@ -81,11 +82,12 @@ def getargs():
 def main():
     """main entry point"""
     args = getargs()
-    setup_logging(args.verbose)
+    set_log_verbosity(args.verbose)
     if not args.NIX_PATH.exists():
-        _LOG.fatal("Invalid path: '%s'", args.NIX_PATH)
+        LOG.fatal("Invalid path: '%s'", args.NIX_PATH)
         sys.exit(1)
     target_path = args.NIX_PATH.resolve().as_posix()
+    exit_unless_nix_artifact(target_path)
     deps = NixDependencies(target_path, args.buildtime)
     deps.graph(args)
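Besides switching to the shared `LOG` and `set_log_verbosity`, `main()` gains an early guard: `exit_unless_nix_artifact(target_path)` now runs before any dependency parsing. The diff shows only the call site, so the following is a hedged sketch under the assumption that the helper verifies the path is something `nix-store` can resolve and exits otherwise; it is not the actual sbomnix implementation.

```python
import subprocess
import sys


def exit_unless_nix_artifact(path):
    """Exit unless 'path' is a nix artifact (assumed check, illustrative only)."""
    # One plausible check: ask nix-store for the path's deriver and treat a
    # non-zero exit status as "not a nix artifact".
    ret = subprocess.run(
        ["nix-store", "-q", "--deriver", path],
        capture_output=True,
        text=True,
        check=False,
    )
    if ret.returncode != 0:
        print(f"Not a nix artifact: '{path}'", file=sys.stderr)
        sys.exit(1)
```

Failing fast here gives a clear error for non-nix inputs instead of a later failure deep inside `NixDependencies`.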
39 changes: 18 additions & 21 deletions sbomnix/cpe.py
@@ -6,7 +6,6 @@

 """ Generate CPE (Common Platform Enumeration) identifiers"""

-import logging
 import sys
 import pathlib
 import string
@@ -15,16 +14,14 @@
 import requests

 from sbomnix.utils import (
-    LOGGER_NAME,
+    LOG,
     LOG_SPAM,
     df_from_csv_file,
     df_log,
 )

 ###############################################################################

-_LOG = logging.getLogger(LOGGER_NAME)
-
 CACHE_DIR = "~/.cache/sbomnix"

 ###############################################################################
@@ -43,44 +40,44 @@ class _CPE:
     _instance = None

     def __init__(self):
-        _LOG.debug("")
+        LOG.debug("")
         self.cpedict = pathlib.PosixPath(CACHE_DIR).expanduser() / "cpes.csv"
         self.cpedict.parent.mkdir(parents=True, exist_ok=True)
         self.df_cpedict = self._load_cpedict()
         if self.df_cpedict is not None:
             # Verify the loaded cpedict contains at least the following columns
             required_cols = {"vendor", "product"}
             if not required_cols.issubset(self.df_cpedict):
-                _LOG.fatal(
+                LOG.fatal(
                     "Missing required columns %s from cpedict, manually check: '%s'",
                     required_cols,
                     self.cpedict,
                 )
                 sys.exit(1)

     def _load_cpedict(self):
-        _LOG.debug("")
+        LOG.debug("")
         if not self.cpedict.exists() or self.cpedict.stat().st_size <= 0:
             # Try updating cpe dictionary if it's not cached
             if not self._update_cpedict():
-                _LOG.warning(
+                LOG.warning(
                     "Missing '%s': CPE identifiers will be inaccurate", self.cpedict
                 )
                 return None
         cpe_updated = datetime.datetime.fromtimestamp(self.cpedict.lstat().st_mtime)
         week_ago = datetime.datetime.now() - datetime.timedelta(days=7)
         if cpe_updated < week_ago:
             # Try updating cpe dictionary if it wasn't recently updated
-            _LOG.debug("Attempting periodic update of cpe dictionary")
+            LOG.debug("Attempting periodic update of cpe dictionary")
             if not self._update_cpedict():
-                _LOG.warning(
+                LOG.warning(
                     "CPE data is not up-to-date: CPE identifiers will be inaccurate"
                 )
         return df_from_csv_file(self.cpedict)

     def _update_cpedict(self):
         """Updates local cpe dictionary"""
-        _LOG.debug("")
+        LOG.debug("")
         cpedict_bak = None
         if self.cpedict.exists() and self.cpedict.stat().st_size > 0:
             # Backup the original cpedict to be able to rollback in case the update
@@ -93,34 +90,34 @@ def _update_cpedict(self):
             f.write(requests.get(url, stream=True, timeout=10).content)
             return True
         except requests.exceptions.RequestException as e:
-            _LOG.warning("CPE data update failed: %s", e)
+            LOG.warning("CPE data update failed: %s", e)
             if cpedict_bak:
-                _LOG.debug("Rollback earlier cpedict after failed update")
+                LOG.debug("Rollback earlier cpedict after failed update")
                 shutil.copy(cpedict_bak, self.cpedict)
             return False

     def _cpedict_vendor(self, product):
         if not product or len(product) == 1:
-            _LOG.debug("invalid product name '%s'", product)
+            LOG.debug("invalid product name '%s'", product)
             return None
         if self.df_cpedict is None:
-            _LOG.log(LOG_SPAM, "missing cpedict")
+            LOG.log(LOG_SPAM, "missing cpedict")
             return None
         df = self.df_cpedict[self.df_cpedict["product"] == product]
         if len(df) == 0:
-            _LOG.log(LOG_SPAM, "no matches for product '%s'", product)
+            LOG.log(LOG_SPAM, "no matches for product '%s'", product)
             return None
         if len(df) != 1:
             # If there is more than one product with the same name,
             # we cannot determine which vendor name should be used for the CPE.
             # Therefore, if more than one product names match, treat it the
             # same way as if there were no matches (returning None).
-            _LOG.log(LOG_SPAM, "more than one match for product '%s':", product)
+            LOG.log(LOG_SPAM, "more than one match for product '%s':", product)
             df_log(df, LOG_SPAM)
             return None

         vendor = df["vendor"].values[0]
-        _LOG.log(LOG_SPAM, "found vendor for product '%s': '%s'", product, vendor)
+        LOG.log(LOG_SPAM, "found vendor for product '%s': '%s'", product, vendor)
         return vendor

     def _candidate_vendor(self, product):
@@ -137,12 +134,12 @@ def _candidate_vendor(self, product):
         # possible trailing digits from the original product name
         product_mod = product.rstrip(string.digits)
         if product != product_mod:
-            _LOG.log(LOG_SPAM, "re-trying with product name '%s'", product_mod)
+            LOG.log(LOG_SPAM, "re-trying with product name '%s'", product_mod)
             vendor = self._cpedict_vendor(product_mod)
         if not vendor:
             # Fallback: use the product name as vendor name
             vendor = product
-            _LOG.log(LOG_SPAM, "fallback: use product name as vendor '%s'", vendor)
+            LOG.log(LOG_SPAM, "fallback: use product name as vendor '%s'", vendor)
         return vendor

     def generate(self, name, version):
@@ -152,7 +149,7 @@ def generate(self, name, version):
         cpe_version = version.strip()
         cpe_end = "*:*:*:*:*:*:*"
         ret = f"cpe:2.3:a:{cpe_vendor}:{cpe_product}:{cpe_version}:{cpe_end}"
-        _LOG.log(LOG_SPAM, "CPE: '%s'", ret)
+        LOG.log(LOG_SPAM, "CPE: '%s'", ret)
         return ret
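For context on what `generate()` emits: the f-string visible in the last hunk assembles a CPE 2.3 formatted string, with the seven fields after the version left as wildcards. A self-contained sketch of the format, reproduced from the diff; the helper name `generate_cpe` and the example package are illustrative, not part of the module.

```python
def generate_cpe(vendor, product, version):
    """Assemble a CPE 2.3 formatted string, mirroring the f-string in the diff."""
    cpe_end = "*:*:*:*:*:*:*"
    return f"cpe:2.3:a:{vendor}:{product}:{version.strip()}:{cpe_end}"


# With no cpedict match, _candidate_vendor() falls back to using the product
# name as the vendor, so a hypothetical "curl" 8.1.2 would produce:
print(generate_cpe("curl", "curl", "8.1.2"))
# cpe:2.3:a:curl:curl:8.1.2:*:*:*:*:*:*:*
```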