-
Notifications
You must be signed in to change notification settings - Fork 1
Feature/382 add nox task for dependencies update #393
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e09ced4
5b02cc6
74d9f89
ccf9963
d4ffc3f
a1734f5
864a525
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2,17 +2,28 @@ | |||||
|
||||||
import argparse | ||||||
import json | ||||||
import re | ||||||
import subprocess | ||||||
import tempfile | ||||||
from dataclasses import dataclass | ||||||
from contextlib import contextmanager | ||||||
from dataclasses import ( | ||||||
dataclass, | ||||||
field, | ||||||
) | ||||||
from inspect import cleandoc | ||||||
from json import loads | ||||||
from pathlib import Path | ||||||
from subprocess import CompletedProcess | ||||||
|
||||||
import nox | ||||||
import tomlkit | ||||||
from nox import Session | ||||||
|
||||||
from exasol.toolbox.security import ( | ||||||
GitHubVulnerabilityIssue, | ||||||
from_pip_audit, | ||||||
) | ||||||
|
||||||
|
||||||
@dataclass(frozen=True) | ||||||
class Package: | ||||||
|
@@ -30,15 +41,11 @@ def _dependencies(toml_str: str) -> dict[str, list]: | |||||
|
||||||
packages = poetry.get("dependencies", {}) | ||||||
if packages: | ||||||
dependencies["project"] = [] | ||||||
for package in packages: | ||||||
dependencies["project"].append(package) | ||||||
dependencies["project"] = [package for package in packages] | ||||||
|
||||||
packages = poetry.get("dev", {}).get("dependencies", {}) | ||||||
if packages: | ||||||
dependencies["dev"] = [] | ||||||
for package in packages: | ||||||
dependencies["dev"].append(package) | ||||||
dependencies["dev"] = [package for package in packages] | ||||||
|
||||||
groups = poetry.get("group", {}) | ||||||
for group in groups: | ||||||
|
@@ -260,13 +267,15 @@ def _parse_args(session) -> argparse.Namespace: | |||||
) | ||||||
return parser.parse_args(args=session.posargs) | ||||||
|
||||||
def run(self, session: Session) -> None: | ||||||
args = self._parse_args(session) | ||||||
|
||||||
command = ["poetry", "run", "pip-audit", "-f", "json"] | ||||||
def audit(self) -> tuple[dict, CompletedProcess]:
    """Run ``pip-audit`` through poetry and return its filtered findings.

    The process is started without ``check=True``, so a non-zero exit code
    does not raise here; it stays available on the returned process object.

    Returns:
        A 2-tuple of the value produced by ``_filter_json_for_vulnerabilities``
        for the captured stdout, and the ``CompletedProcess`` of the call.
    """
    process = subprocess.run(
        ("poetry", "run", "pip-audit", "-f", "json"),
        capture_output=True,
    )
    return self._filter_json_for_vulnerabilities(process.stdout), process
|
||||||
def run(self, session: Session) -> None: | ||||||
args = self._parse_args(session) | ||||||
audit_json, output = self.audit() | ||||||
if args.output: | ||||||
with open(args.output, "w") as file: | ||||||
json.dump(audit_json, file) | ||||||
|
@@ -275,8 +284,226 @@ def run(self, session: Session) -> None: | |||||
|
||||||
if output.returncode != 0: | ||||||
session.warn( | ||||||
f"Command {' '.join(command)} failed with exit code {output.returncode}", | ||||||
f"Command {' '.join(output.args)} failed with exit code {output.returncode}", | ||||||
) | ||||||
|
||||||
|
||||||
@dataclass(frozen=True)
class PackageVersion:
    """Immutable, hashable pairing of a package name with a version string."""

    # Name of the package.
    name: str
    # Version string recorded for that package.
    version: str
|
||||||
|
||||||
@dataclass | ||||||
class PackageVersionTracker: | ||||||
""" | ||||||
Tracks direct dependencies for package versions before & after updates | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am confused by this docstring, do you keep the package versions for direct dependencies, that would make more sense for me |
||||||
|
||||||
Assumption: | ||||||
- The dependency ranges in the pyproject.toml often allow users to update | ||||||
transitive dependencies on their own. It is, therefore, more important for us to | ||||||
track the changes of direct dependencies and, if present, the resolution of | ||||||
vulnerabilities for both direct and transitive dependencies. | ||||||
""" | ||||||
|
||||||
before_env: set[PackageVersion] = field(default_factory=set) | ||||||
after_env: set[PackageVersion] = field(default_factory=set) | ||||||
|
||||||
@staticmethod | ||||||
def _obtain_version_set() -> set[PackageVersion]: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about naming the method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it is a function it should contain a verb |
||||||
def _get_package_version(line: str) -> PackageVersion: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As this is a nested function, I think we can omit the underscore prefix I propose |
||||||
pattern = r"\s+(\d+(?:\.\d+)*)\s+" | ||||||
groups = re.split(pattern, line) | ||||||
return PackageVersion(name=groups[0], version=groups[1]) | ||||||
|
||||||
command = ("poetry", "show", "--top-level") | ||||||
result = subprocess.run(command, capture_output=True, check=True) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can use argument |
||||||
return { | ||||||
_get_package_version(line) | ||||||
for line in result.stdout.decode("utf-8").splitlines() | ||||||
} | ||||||
|
||||||
@property | ||||||
def changes(self) -> tuple: | ||||||
before_update_dict = {pkg.name: pkg for pkg in self.before_env} | ||||||
after_update_dict = {pkg.name: pkg for pkg in self.after_env} | ||||||
|
||||||
def _get_change_str(pkg_name: str) -> str | None: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As this is a nested function, I think we can omit the underscore prefix There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please take a look at the Project keeper: Maybe it's a good idea to maintain three separate lists of updated, added, and removed dependencies and only merge or resp. render them later on when creating a report? References in PK: |
||||||
if pkg_name not in after_update_dict.keys(): | ||||||
entry = before_update_dict[pkg_name] | ||||||
return f"* Removed {entry.name} ({entry.version})" | ||||||
if pkg_name not in before_update_dict.keys(): | ||||||
entry = after_update_dict[pkg_name] | ||||||
return f"* Added {entry.name} ({entry.version})" | ||||||
before_entry = before_update_dict[pkg_name] | ||||||
after_entry = after_update_dict[pkg_name] | ||||||
if before_entry.version != after_entry.version: | ||||||
return f"* Updated {pkg_name} ({before_entry.version} → {after_entry.version})" | ||||||
return None | ||||||
|
||||||
all_packages = before_update_dict.keys() | after_update_dict.keys() | ||||||
return tuple( | ||||||
change_str | ||||||
for pkg_name in all_packages | ||||||
if (change_str := _get_change_str(pkg_name)) | ||||||
) | ||||||
|
||||||
@property | ||||||
def packages(self) -> set[str]: | ||||||
return {pkg.name for pkg in self.before_env} | ||||||
|
||||||
def __enter__(self) -> PackageVersionTracker:
    """Store a version snapshot in ``before_env`` and return the tracker."""
    snapshot = self._obtain_version_set()
    self.before_env = snapshot
    return self
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
    """Store a version snapshot in ``after_env`` when the ``with`` block ends.

    Nothing is returned, so an exception raised inside the block is not
    suppressed.
    """
    snapshot = self._obtain_version_set()
    self.after_env = snapshot
|
||||||
|
||||||
@contextmanager | ||||||
def managed_file(file_obj: argparse.FileType): | ||||||
ArBridgeman marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
"""Context manager to manage a file provided by argparse""" | ||||||
yield file_obj | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class VulnerabilityTracker: | ||||||
"""Tracks the resolution of GitHubVulnerabilityIssues before & after updates""" | ||||||
|
||||||
to_resolve: set[GitHubVulnerabilityIssue] = field(default_factory=set) | ||||||
resolved: set[GitHubVulnerabilityIssue] = field(default_factory=set) | ||||||
not_resolved: set[GitHubVulnerabilityIssue] = field(default_factory=set) | ||||||
|
||||||
def __init__(self, vulnerability_issues: argparse.FileType | None): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I recommend not executing time-consuming or potentially error raising tasks in the constructor, but rather to have a static or classmethod doing this and then only calling the constructor with prepared arguments. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please add some more information to the docstring either of the class or the proposed static method explaining the input data and/or its expected format. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Try to change to a set or more explicit start than init (not pytest-friendly) |
||||||
self.to_resolve: set[GitHubVulnerabilityIssue] = self._set_to_resolve( | ||||||
vulnerability_issues | ||||||
) | ||||||
|
||||||
@staticmethod | ||||||
def _set_to_resolve( | ||||||
vulnerability_issues: argparse.FileType | None, | ||||||
) -> set[GitHubVulnerabilityIssue]: | ||||||
if not vulnerability_issues: | ||||||
return set() | ||||||
with managed_file(vulnerability_issues) as f: | ||||||
lines = f.readlines() | ||||||
Comment on lines
+388
to
+389
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I don't understand this part. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Awkward doing iteration twice Check if can pass f directly to the extract_from_json. Or generator passing through to single to get. |
||||||
return set(GitHubVulnerabilityIssue.extract_from_jsonl(lines)) | ||||||
|
||||||
def _split_resolution_status(self) -> None:
    """Partition ``to_resolve`` into ``resolved`` and ``not_resolved``.

    Runs a fresh audit and treats every issue whose CVE is still reported by
    that audit as not resolved; all remaining issues in ``to_resolve`` count
    as resolved. Both result sets are stored on the instance.
    """
    audit_json, _ = Audit().audit()
    # from_pip_audit is called with the audit serialized back to a JSON string.
    cves_still_reported = {
        vuln.cve for vuln in from_pip_audit(json.dumps(audit_json))
    }

    # Filter the issues directly instead of going through a CVE-keyed dict:
    # such a dict keeps only one issue per CVE, so additional issues sharing
    # a still-reported CVE would be dropped from ``not_resolved`` and would
    # then wrongly be reported as resolved.
    self.not_resolved = {
        vuln for vuln in self.to_resolve if vuln.cve in cves_still_reported
    }
    self.resolved = self.to_resolve - self.not_resolved
|
||||||
def get_packages(self) -> set[str]: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about? @property
def packages(self) -> set[str]: |
||||||
return {vuln.coordinates.split(":")[0] for vuln in self.to_resolve} | ||||||
|
||||||
@property | ||||||
def issues_not_resolved(self) -> tuple[str, ...]: | ||||||
return tuple( | ||||||
f"* Did NOT resolve {vuln.issue_url} ({vuln.cve})" | ||||||
for vuln in self.not_resolved | ||||||
) | ||||||
|
||||||
@property | ||||||
def issues_resolved(self) -> tuple[str, ...]: | ||||||
return tuple( | ||||||
f"* Closes {vuln.issue_url} ({vuln.cve})" for vuln in self.resolved | ||||||
) | ||||||
|
||||||
@property | ||||||
def summary(self) -> tuple[str, ...]: | ||||||
return tuple( | ||||||
cleandoc( | ||||||
f"""{vuln.cve} in dependency `{vuln.coordinates}`\n {vuln.description} | ||||||
""" | ||||||
Comment on lines
+407
to
+425
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these properties / methods can then be moved to a rendering instance. I propose having a class |
||||||
) | ||||||
for vuln in self.resolved | ||||||
) | ||||||
|
||||||
@property | ||||||
def vulnerabilities_resolved(self) -> tuple[str, ...]: | ||||||
def get_issue_number(issue_url: str) -> str | None: | ||||||
pattern = r"/issues/(\d+)$" | ||||||
match = re.search(pattern, issue_url) | ||||||
return match.group(1) if match else None | ||||||
|
||||||
return tuple( | ||||||
f"* #{get_issue_number(vuln.issue_url)} Fixed vulnerability {vuln.cve} in `{vuln.coordinates}`" | ||||||
for vuln in self.resolved | ||||||
) | ||||||
|
||||||
def __enter__(self) -> VulnerabilityTracker: | ||||||
return self | ||||||
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
    """Classify the tracked issues when the ``with`` block ends.

    The split runs regardless of whether the block raised; nothing is
    returned, so such an exception is not suppressed.
    """
    self._split_resolution_status()
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about making the changes to the attributes transparent: self.not_resolved, self.resolved = _split_vulnerabilities(self.to_resolve) This way we can even make the split method independent of the class. |
||||||
|
||||||
|
||||||
@dataclass(frozen=True)
class DependencyChanges:
    """Immutable bundle of text lines describing the outcome of a dependency update."""

    # Lines describing changed packages.
    package_changes: tuple[str, ...]
    # Lines for issues that were resolved.
    issues_resolved: tuple[str, ...]
    # Lines for issues that were not resolved.
    issues_not_resolved: tuple[str, ...]
    # Lines naming each fixed vulnerability.
    vulnerabilities_resolved: tuple[str, ...]
    # Summary text for each fixed vulnerability.
    vulnerabilities_resolved_summary: tuple[str, ...]
|
||||||
|
||||||
class DependencyUpdate: | ||||||
"""Update dependencies""" | ||||||
|
||||||
@staticmethod | ||||||
def _parse_args(session) -> argparse.Namespace: | ||||||
parser = argparse.ArgumentParser( | ||||||
description="Updates dependencies & returns changes", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please be a bit more specific or add a reference: What are "changes"? A report? A section for the changelog file? And what is meant by update dependencies? Is it in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also see a mismatch between the name of the nox task Would it be possible to stick to a single naming convention? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. check hint text -> should be update |
||||||
usage="nox -s dependency:audit -- -- [options]", | ||||||
) | ||||||
parser.add_argument( | ||||||
"-v", | ||||||
"--vulnerability-issues", | ||||||
type=argparse.FileType("r"), | ||||||
default=None, | ||||||
help="JSONL of vulnerabilities (of type `GitHubVulnerabilityIssue`)", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a reference to JSONL format, e.g. https://jsonlines.org/ |
||||||
) | ||||||
return parser.parse_args(args=session.posargs) | ||||||
|
||||||
@staticmethod | ||||||
def _perform_basic_vulnerability_update( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is basic and what is it not? I propose to either rename the method (to not contain basic) or at least to add a docstring, informing about potential future additions or current limitations rendering the update to only be basic. |
||||||
pkg_tracker: PackageVersionTracker, vuln_tracker: VulnerabilityTracker | ||||||
) -> None: | ||||||
vuln_packages = vuln_tracker.get_packages() | ||||||
|
||||||
# vulnerabilities of direct dependencies require a pyproject.toml update | ||||||
vuln_direct_dependencies = vuln_packages.intersection(pkg_tracker.packages) | ||||||
if vuln_direct_dependencies: | ||||||
command = ("poetry", "up") + tuple(vuln_direct_dependencies) | ||||||
subprocess.run(command, capture_output=True) | ||||||
|
||||||
command = ("poetry", "update") + tuple(vuln_packages) | ||||||
subprocess.run(command, capture_output=True) | ||||||
|
||||||
def run(self, session: Session) -> DependencyChanges: | ||||||
"""Update the dependencies associated with GitHubVulnerabilityIssues""" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please ignore, if affected is incorrect in this context, and we need to stick to associated, here.
Suggested change
|
||||||
args = self._parse_args(session) | ||||||
with PackageVersionTracker() as pkg_tracker: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would enjoy a literate phrasing like the following with Dependencies() as dependencies:
with Vulnerabilities(args.vulnerability_issues) as vulnerabilities: Would that be possible / make sense? |
||||||
with VulnerabilityTracker(args.vulnerability_issues) as vuln_tracker: | ||||||
self._perform_basic_vulnerability_update( | ||||||
pkg_tracker=pkg_tracker, vuln_tracker=vuln_tracker | ||||||
) | ||||||
|
||||||
return DependencyChanges( | ||||||
package_changes=pkg_tracker.changes, | ||||||
issues_resolved=vuln_tracker.issues_resolved, | ||||||
issues_not_resolved=vuln_tracker.issues_not_resolved, | ||||||
vulnerabilities_resolved=vuln_tracker.vulnerabilities_resolved, | ||||||
vulnerabilities_resolved_summary=vuln_tracker.summary, | ||||||
) | ||||||
|
||||||
|
||||||
@nox.session(name="dependency:licenses", python=False) | ||||||
|
@@ -292,3 +519,19 @@ def dependency_licenses(session: Session) -> None: | |||||
def audit(session: Session) -> None: | ||||||
"""Check for known vulnerabilities""" | ||||||
Audit().run(session=session) | ||||||
|
||||||
|
||||||
@nox.session(name="dependency:update", python=False) | ||||||
def update(session: Session) -> None: | ||||||
"""Updates dependencies & returns changes""" | ||||||
dependency_changes = DependencyUpdate().run(session) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we see, that actually There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Additionally, class There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe rather in a module -> |
||||||
print("Resolved issues") | ||||||
print(*dependency_changes.issues_resolved, sep="\n") | ||||||
print("\nNot resolved issues") | ||||||
print(*dependency_changes.issues_not_resolved, sep="\n") | ||||||
print("\nSummary") | ||||||
print(*dependency_changes.vulnerabilities_resolved_summary, sep="\n") | ||||||
print("\nSecurity fixes") | ||||||
print(*dependency_changes.vulnerabilities_resolved, sep="\n") | ||||||
print("\nDependencies") | ||||||
print(*dependency_changes.package_changes, sep="\n") |
Uh oh!
There was an error while loading. Please reload this page.