Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle archives and preferred content better #219

Open
wants to merge 4 commits into
base: rawhide
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 52 additions & 3 deletions org_fedora_oscap/content_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import pathlib
import shutil
import os
from glob import glob
from typing import List

Expand All @@ -12,6 +13,7 @@
from org_fedora_oscap import data_fetch, utils
from org_fedora_oscap import common
from org_fedora_oscap import content_handling
from org_fedora_oscap.content_handling import CONTENT_TYPES
from org_fedora_oscap import rule_handling

from org_fedora_oscap.common import _
Expand Down Expand Up @@ -191,6 +193,47 @@ def _verify_fingerprint(self, dest_filename, fingerprint=""):
raise content_handling.ContentCheckError(msg)
log.info(f"Integrity check passed using {hash_obj.name} hash")

def allow_one_expected_tailoring_or_no_tailoring(self, labelled_files):
    """
    Restrict tailoring files to the one requested, or drop them all.

    If the addon data carries a tailoring path, the files are passed
    through ``reduce_files`` so that this path is the only tailoring
    entry left. If no tailoring path is set, every entry labelled as
    tailoring is removed.

    :param labelled_files: mapping of file path to its content label
    :return: a filtered mapping of file path to content label

    """
    wanted_tailoring = self._addon_data.tailoring_path
    tailoring_label = CONTENT_TYPES["TAILORING"]

    if not wanted_tailoring:
        # No tailoring was requested, so no tailoring file may get through.
        return {
            fpath: flabel
            for fpath, flabel in labelled_files.items()
            if flabel != tailoring_label
        }

    return self.reduce_files(labelled_files, wanted_tailoring, [tailoring_label])

def filter_discovered_content(self, labelled_files):
    """
    Narrow the discovered files down to those the addon data asks for.

    The filtering runs in three steps: benchmark files (data streams and
    XCCDF checklists), then tailoring files, then CPE dictionaries.
    The benchmark and CPE steps only apply when the respective path
    is set in the addon data.

    :param labelled_files: mapping of file path to its content label
    :return: a filtered mapping of file path to content label

    """
    remaining = labelled_files

    benchmark_path = self._addon_data.content_path
    benchmark_labels = (CONTENT_TYPES["DATASTREAM"], CONTENT_TYPES["XCCDF_CHECKLIST"])
    if benchmark_path:
        remaining = self.reduce_files(remaining, benchmark_path, benchmark_labels)

    remaining = self.allow_one_expected_tailoring_or_no_tailoring(remaining)

    cpe_path = self._addon_data.cpe_path
    cpe_labels = (CONTENT_TYPES["CPE_DICT"], )
    if cpe_path:
        remaining = self.reduce_files(remaining, cpe_path, cpe_labels)

    return remaining

def reduce_files(self, labelled_files, expected_path, categories):
    """
    Keep only the expected file among the files of the given categories.

    Entries whose label is not in ``categories`` are passed through
    untouched. Paths are compared after ``os.path.normpath``, so that
    equivalent spellings of the expected path (``./dir/ds.xml``,
    ``dir//ds.xml``) still match the discovered file.

    :param labelled_files: mapping of file path to its content label
    :param expected_path: path that has to be present among the files
    :param categories: labels for which only the expected path is kept
    :return: a new dict; the passed mapping is not modified
    :raises RuntimeError: if the expected path is not among the files

    """
    wanted = os.path.normpath(expected_path)

    def is_wanted(path):
        return os.path.normpath(path) == wanted

    if not any(is_wanted(path) for path in labelled_files):
        msg = (
            f"Expected a file {expected_path} to be part of the supplied content, "
            f"but it was not the case, got only {list(labelled_files.keys())}"
        )
        raise RuntimeError(msg)

    # Original keys are preserved, only the comparison is normalized.
    return {
        path: label
        for path, label in labelled_files.items()
        if label not in categories or is_wanted(path)
    }

def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
if wait_for:
log.info(f"OSCAP Addon: Waiting for thread {wait_for}")
Expand All @@ -209,9 +252,15 @@ def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_file
if content_type in ("archive", "rpm"):
structured_content.add_content_archive(dest_filename)

labelled_files = content_handling.identify_files(fpaths)
for fname, label in labelled_files.items():
structured_content.add_file(fname, label)
labelled_filenames = content_handling.identify_files(fpaths)
labelled_relative_filenames = {
os.path.relpath(path, self.CONTENT_DOWNLOAD_LOCATION): label
for path, label in labelled_filenames.items()}
labelled_relative_filenames = self.filter_discovered_content(labelled_relative_filenames)

for rel_fname, label in labelled_relative_filenames.items():
fname = self.CONTENT_DOWNLOAD_LOCATION / rel_fname
structured_content.add_file(str(fname), label)

if fingerprint and dest_filename:
structured_content.record_verification(dest_filename)
Expand Down
45 changes: 5 additions & 40 deletions org_fedora_oscap/content_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ def get_doc_type(file_path):
if line.startswith("Document type:"):
_prefix, _sep, type_info = line.partition(":")
content_type = type_info.strip()
if content_type not in CONTENT_TYPES.values():
log.info(
f"File {file_path} labelled by oscap as {content_type}, "
"which is an unexpected type.")
content_type = f"unknown - {content_type}"
break
except OSError:
# 'oscap info' exited with a non-zero exit code -> unknown doc
Expand All @@ -136,43 +141,3 @@ def get_doc_type(file_path):
log.info("OSCAP addon: Identified {file_path} as {content_type}"
.format(file_path=file_path, content_type=content_type))
return content_type


def explore_content_files(fpaths):
    """
    Search a list of file paths for usable content files.

    For each kind of content, the first file of that kind wins. The only
    exception is that a data stream is preferred over a standalone XCCDF
    checklist, even if the checklist was encountered earlier.

    :param fpaths: a list of file paths to search for content files in
    :type fpaths: [str]
    :return: ContentFiles instance containing the file names of the XCCDF file,
             CPE dictionary and tailoring file or "" in place of those items
             if not found
    :rtype: ContentFiles

    """
    benchmark = ""
    cpe_dict = ""
    tailoring = ""
    have_datastream = False

    for candidate in fpaths:
        kind = get_doc_type(candidate)
        if not kind:
            continue

        if kind == "Source Data Stream":
            # a data stream replaces a previously picked standalone XCCDF
            if not (benchmark and have_datastream):
                benchmark = candidate
                have_datastream = True
            continue

        if kind == "XCCDF Checklist":
            if not benchmark:
                benchmark = candidate
            continue

        if kind == "CPE Dictionary":
            if not cpe_dict:
                cpe_dict = candidate
            continue

        if kind == "XCCDF Tailoring" and not tailoring:
            tailoring = candidate

    # TODO: raise exception if no xccdf_file is found?
    return ContentFiles(benchmark, cpe_dict, tailoring)
48 changes: 48 additions & 0 deletions tests/test_content_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pytest

import org_fedora_oscap.content_discovery as tested_module


@pytest.fixture
def labelled_files():
    """
    Provide a sample mapping of file paths to single-letter labels.

    The mapping has three "D" entries, two "X" entries, one "C" entry
    and two "T" entries, spread over several directory levels.
    """
    # Each key is listed exactly once; a repeated key in a dict literal
    # is silently overwritten and only obscures the real fixture size.
    return {
        "dir/datastream": "D",
        "dir/datastream2": "D",
        "dir/dir/datastream3": "D",
        "dir/XCCDF": "X",
        "XCCDF2": "X",
        "cpe": "C",
        "t1": "T",
        "dir3/t2": "T",
    }


def test_reduce(labelled_files):
    """
    Exercise ContentBringer.reduce_files on the sample mapping.

    Within the given categories only the expected path may survive,
    other categories stay untouched, and a path that is not present
    raises RuntimeError mentioning that path.
    """
    bringer = tested_module.ContentBringer(None)

    all_labels = list(labelled_files.values())
    d_count = all_labels.count("D")
    x_count = all_labels.count("X")
    total = len(labelled_files)

    # Only data streams are reduced.
    result = bringer.reduce_files(labelled_files, "dir/datastream", ["D"])
    assert len(result) == total - d_count + 1
    assert "dir/datastream" in result

    # Data streams and checklists are reduced, a data stream survives.
    result = bringer.reduce_files(labelled_files, "dir/datastream", ["D", "X"])
    assert len(result) == total - d_count - x_count + 1
    assert "dir/datastream" in result

    # Data streams and checklists are reduced, a checklist survives.
    result = bringer.reduce_files(labelled_files, "dir/XCCDF", ["D", "X"])
    assert len(result) == total - d_count - x_count + 1
    assert "dir/XCCDF" in result

    with pytest.raises(RuntimeError, match="dir/datastream4"):
        bringer.reduce_files(labelled_files, "dir/datastream4", ["D"])

    # The only file of its category is expected, so nothing is removed.
    result = bringer.reduce_files(labelled_files, "cpe", ["C"])
    assert result == labelled_files