diff --git a/lib/oscap.py b/lib/oscap.py index e8dab49..53a1b3b 100644 --- a/lib/oscap.py +++ b/lib/oscap.py @@ -10,6 +10,46 @@ from lib import util, results +def parse_xml(path): + """ + Parse an XML file, yielding tuples of + (frames, elements) + where each is an ordered list of namespace-free tag names ('frames') and the + actual ElementTree objects ('elements') as it appears during a top-down + recursive traversal. + The yielded tuples are returned as child-first (as the parser *exits* the + elements) in order to return complete Element objects. + + Ie. for a containing , this would yield: + + (['Tag1', 'Tag2'], [Element at 0x...>, ]) + (['Tag1'], [Element at 0x...>]) + + The intention is for the caller to match a specific part of the XML file + by comparing the last N members of the frames list, and/or the element list, + extracting further details from the last element. + """ + # parse input XML tream in 10KB binary chunks (arbitrary reasonable value), + # pass them to ElementTree parser, which returns element start/end events + parser = ET.XMLPullParser(events=['start', 'end']) + frames = [] + elements = [] + with open(path, 'rb') as f: + while True: + chunk = f.read(10000) + if not chunk: + break + parser.feed(chunk) + for event, elem in parser.read_events(): + if event == 'start': + frames.append(elem.tag.partition('}')[2] or elem.tag) + elements.append(elem) + else: + yield (frames, elements) + frames.pop() + elements.pop() + + class Datastream: FixType = enum.Flag('FixType', ['bash', 'ansible']) @@ -37,94 +77,69 @@ def make_profile(): def make_rule(): return types.SimpleNamespace(fixes=self.FixType(0)) self.rules = collections.defaultdict(make_rule) - self._parse_xml(xml_file) + self._parse_datastream_xml(xml_file) self.path = Path(xml_file) - def _parse_xml(self, xml_file): - # parse input XML datastream in 10KB binary chunks (arbitrary - # reasonable value), pass them to the ElementTree parser, which - # returns element start/end events - parser = ET.XMLPullParser(events=['start', 'end']) - # this is a stack of parsed XML elements - it gets filled up as we - # recurse deeper into the XML element tree - stack = [] - with open(xml_file, 'rb') as f: - while True: - chunk = f.read(10000) - if not chunk: - break - parser.feed(chunk) - - for event, elem in parser.read_events(): - if event == 'start': - stack.append(elem) - else: - # optimize a bit - filter out elements too shallow for anything below - if len(stack) < 4: - stack.pop() - continue - - # the logic below tries to match the last one/two elements - # in the stack, to hopefully limit false positive matches - # elsewhere in the XML tree (if only element name was used) - # - note that this runs on the 'end' parser event, so child - # elements will appear before parent ones - - # transform stack elements into namespace-free tag names, for - # easier tag name matching - frames = [elem.tag.partition('}')[2] or elem.tag for elem in stack] - - # profiles - if frames[-1] == 'Profile': - profile = stack[-1].get('id') - # TODO: use str.removeprefix on python 3.9+ - profile = re.sub('^xccdf_org.ssgproject.content_profile_', '', profile) - self.profiles[profile] # let defaultdict fill in the values - - # profile contents - elif frames[-2] == 'Profile': - profile = stack[-2].get('id') - # TODO: use str.removeprefix on python 3.9+ - profile = re.sub('^xccdf_org.ssgproject.content_profile_', '', profile) - # title - if frames[-1] == 'title': - text = stack[-1].text - self.profiles[profile].title = text - # rule selection - elif frames[-1] == 'select': - if elem.get('selected') == 'true': - rule = stack[-1].get('idref') - # TODO: use str.removeprefix on python 3.9+ - rule = re.sub('^xccdf_org.ssgproject.content_rule_', '', rule) - self.profiles[profile].rules.add(rule) - # variable refinement - elif frames[-1] == 'refine-value': - name = stack[-1].get('idref') - # TODO: use str.removeprefix on python 3.9+ - name = re.sub('^xccdf_org.ssgproject.content_value_', '', name) - contents = stack[-1].get('selector') - self.profiles[profile].values.add((name, contents)) - - # rules - elif frames[-1] == 'Rule': - rule_id = stack[-1].get('id') - # TODO: use str.removeprefix on python 3.9+ - rule_id = re.sub('^xccdf_org.ssgproject.content_rule_', '', rule_id) - self.rules[rule_id] # let defaultdict fill in the values - - # fixes / remediations - elif frames[-2:] == ['Rule', 'fix']: - system = stack[-1].get('system') - if system == 'urn:xccdf:fix:script:sh': - fix_type = self.FixType.bash - elif system == 'urn:xccdf:fix:script:ansible': - fix_type = self.FixType.ansible - else: - fix_type = None - if fix_type: - for_rule = stack[-1].get('id') - self.rules[for_rule].fixes |= fix_type - stack.pop() + def _parse_datastream_xml(self, xml_file): + for frames, elements in parse_xml(xml_file): + # optimize a bit - filter out elements too shallow for anything below + if len(frames) < 4: + continue + + # the logic below tries to match the last one/two elements + # in the stack, to hopefully limit false positive matches + # elsewhere in the XML tree (if only element name was used) + + # profiles + if frames[-1] == 'Profile': + profile = elements[-1].get('id') + # TODO: use str.removeprefix on python 3.9+ + profile = re.sub('^xccdf_org.ssgproject.content_profile_', '', profile) + self.profiles[profile] # let defaultdict fill in the values + + # profile contents + elif frames[-2] == 'Profile': + profile = elements[-2].get('id') + # TODO: use str.removeprefix on python 3.9+ + profile = re.sub('^xccdf_org.ssgproject.content_profile_', '', profile) + # title + if frames[-1] == 'title': + text = elements[-1].text + self.profiles[profile].title = text + # rule selection + elif frames[-1] == 'select': + if elements[-1].get('selected') == 'true': + rule = elements[-1].get('idref') + # TODO: use str.removeprefix on python 3.9+ + rule = re.sub('^xccdf_org.ssgproject.content_rule_', '', rule) + self.profiles[profile].rules.add(rule) + # variable refinement + elif frames[-1] == 'refine-value': + name = elements[-1].get('idref') + # TODO: use str.removeprefix on python 3.9+ + name = re.sub('^xccdf_org.ssgproject.content_value_', '', name) + contents = elements[-1].get('selector') + self.profiles[profile].values.add((name, contents)) + + # rules + elif frames[-1] == 'Rule': + rule_id = elements[-1].get('id') + # TODO: use str.removeprefix on python 3.9+ + rule_id = re.sub('^xccdf_org.ssgproject.content_rule_', '', rule_id) + self.rules[rule_id] # let defaultdict fill in the values + + # fixes / remediations + elif frames[-2:] == ['Rule', 'fix']: + system = elements[-1].get('system') + if system == 'urn:xccdf:fix:script:sh': + fix_type = self.FixType.bash + elif system == 'urn:xccdf:fix:script:ansible': + fix_type = self.FixType.ansible + else: + fix_type = None + if fix_type: + for_rule = elements[-1].get('id') + self.rules[for_rule].fixes |= fix_type def has_remediation(self, rule): """