Skip to content

Commit

Permalink
split off xml-parsing code from class Datastream
Browse files Browse the repository at this point in the history
Signed-off-by: Jiri Jaburek <[email protected]>
  • Loading branch information
comps committed Aug 30, 2024
1 parent 8c16a4d commit 5856401
Showing 1 changed file with 101 additions and 86 deletions.
187 changes: 101 additions & 86 deletions lib/oscap.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,46 @@
from lib import util, results


def parse_xml(path):
"""
Parse an XML file, yielding tuples of
(frames, elements)
where each is an ordered list of namespace-free tag names ('frames') and the
actual ElementTree objects ('elements') as it appears during a top-down
recursive traversal.
The yielded tuples are returned as child-first (as the parser *exits* the
elements) in order to return complete Element objects.
Ie. for a <Tag1> containing <Tag2>, this would yield:
(['Tag1', 'Tag2'], [Element <Tag1> at 0x...>, <Element 'Tag2' at 0x...>])
(['Tag1'], [Element <Tag1> at 0x...>])
The intention is for the caller to match a specific part of the XML file
by comparing the last N members of the frames list, and/or the element list,
extracting further details from the last element.
"""
# parse input XML tream in 10KB binary chunks (arbitrary reasonable value),
# pass them to ElementTree parser, which returns element start/end events
parser = ET.XMLPullParser(events=['start', 'end'])
frames = []
elements = []
with open(path, 'rb') as f:
while True:
chunk = f.read(10000)
if not chunk:
break
parser.feed(chunk)
for event, elem in parser.read_events():
if event == 'start':
frames.append(elem.tag.partition('}')[2] or elem.tag)
elements.append(elem)
else:
yield (frames, elements)
frames.pop()
elements.pop()


class Datastream:
FixType = enum.Flag('FixType', ['bash', 'ansible'])

Expand Down Expand Up @@ -37,94 +77,69 @@ def make_profile():
def make_rule():
return types.SimpleNamespace(fixes=self.FixType(0))
self.rules = collections.defaultdict(make_rule)
self._parse_xml(xml_file)
self._parse_datastream_xml(xml_file)
self.path = Path(xml_file)

def _parse_xml(self, xml_file):
# parse input XML datastream in 10KB binary chunks (arbitrary
# reasonable value), pass them to the ElementTree parser, which
# returns element start/end events
parser = ET.XMLPullParser(events=['start', 'end'])
# this is a stack of parsed XML elements - it gets filled up as we
# recurse deeper into the XML element tree
stack = []
with open(xml_file, 'rb') as f:
while True:
chunk = f.read(10000)
if not chunk:
break
parser.feed(chunk)

for event, elem in parser.read_events():
if event == 'start':
stack.append(elem)
else:
# optimize a bit - filter out elements too shallow for anything below
if len(stack) < 4:
stack.pop()
continue

# the logic below tries to match the last one/two elements
# in the stack, to hopefully limit false positive matches
# elsewhere in the XML tree (if only element name was used)
# - note that this runs on the 'end' parser event, so child
# elements will appear before parent ones

# transform stack elements into namespace-free tag names, for
# easier tag name matching
frames = [elem.tag.partition('}')[2] or elem.tag for elem in stack]

# profiles
if frames[-1] == 'Profile':
profile = stack[-1].get('id')
# TODO: use str.removeprefix on python 3.9+
profile = re.sub('^xccdf_org.ssgproject.content_profile_', '', profile)
self.profiles[profile] # let defaultdict fill in the values

# profile contents
elif frames[-2] == 'Profile':
profile = stack[-2].get('id')
# TODO: use str.removeprefix on python 3.9+
profile = re.sub('^xccdf_org.ssgproject.content_profile_', '', profile)
# title
if frames[-1] == 'title':
text = stack[-1].text
self.profiles[profile].title = text
# rule selection
elif frames[-1] == 'select':
if elem.get('selected') == 'true':
rule = stack[-1].get('idref')
# TODO: use str.removeprefix on python 3.9+
rule = re.sub('^xccdf_org.ssgproject.content_rule_', '', rule)
self.profiles[profile].rules.add(rule)
# variable refinement
elif frames[-1] == 'refine-value':
name = stack[-1].get('idref')
# TODO: use str.removeprefix on python 3.9+
name = re.sub('^xccdf_org.ssgproject.content_value_', '', name)
contents = stack[-1].get('selector')
self.profiles[profile].values.add((name, contents))

# rules
elif frames[-1] == 'Rule':
rule_id = stack[-1].get('id')
# TODO: use str.removeprefix on python 3.9+
rule_id = re.sub('^xccdf_org.ssgproject.content_rule_', '', rule_id)
self.rules[rule_id] # let defaultdict fill in the values

# fixes / remediations
elif frames[-2:] == ['Rule', 'fix']:
system = stack[-1].get('system')
if system == 'urn:xccdf:fix:script:sh':
fix_type = self.FixType.bash
elif system == 'urn:xccdf:fix:script:ansible':
fix_type = self.FixType.ansible
else:
fix_type = None
if fix_type:
for_rule = stack[-1].get('id')
self.rules[for_rule].fixes |= fix_type
stack.pop()
def _parse_datastream_xml(self, xml_file):
for frames, elements in parse_xml(xml_file):
# optimize a bit - filter out elements too shallow for anything below
if len(frames) < 4:
continue

# the logic below tries to match the last one/two elements
# in the stack, to hopefully limit false positive matches
# elsewhere in the XML tree (if only element name was used)

# profiles
if frames[-1] == 'Profile':
profile = elements[-1].get('id')
# TODO: use str.removeprefix on python 3.9+
profile = re.sub('^xccdf_org.ssgproject.content_profile_', '', profile)
self.profiles[profile] # let defaultdict fill in the values

# profile contents
elif frames[-2] == 'Profile':
profile = elements[-2].get('id')
# TODO: use str.removeprefix on python 3.9+
profile = re.sub('^xccdf_org.ssgproject.content_profile_', '', profile)
# title
if frames[-1] == 'title':
text = elements[-1].text
self.profiles[profile].title = text
# rule selection
elif frames[-1] == 'select':
if elements[-1].get('selected') == 'true':
rule = elements[-1].get('idref')
# TODO: use str.removeprefix on python 3.9+
rule = re.sub('^xccdf_org.ssgproject.content_rule_', '', rule)
self.profiles[profile].rules.add(rule)
# variable refinement
elif frames[-1] == 'refine-value':
name = elements[-1].get('idref')
# TODO: use str.removeprefix on python 3.9+
name = re.sub('^xccdf_org.ssgproject.content_value_', '', name)
contents = elements[-1].get('selector')
self.profiles[profile].values.add((name, contents))

# rules
elif frames[-1] == 'Rule':
rule_id = elements[-1].get('id')
# TODO: use str.removeprefix on python 3.9+
rule_id = re.sub('^xccdf_org.ssgproject.content_rule_', '', rule_id)
self.rules[rule_id] # let defaultdict fill in the values

# fixes / remediations
elif frames[-2:] == ['Rule', 'fix']:
system = elements[-1].get('system')
if system == 'urn:xccdf:fix:script:sh':
fix_type = self.FixType.bash
elif system == 'urn:xccdf:fix:script:ansible':
fix_type = self.FixType.ansible
else:
fix_type = None
if fix_type:
for_rule = elements[-1].get('id')
self.rules[for_rule].fixes |= fix_type

def has_remediation(self, rule):
"""
Expand Down

0 comments on commit 5856401

Please sign in to comment.