From cfffb33e9619c6e441f16e9e80897b1d4c7804ab Mon Sep 17 00:00:00 2001
From: Quentin Kaiser
Date: Fri, 14 Apr 2023 14:07:04 +0200
Subject: [PATCH] feat(reporting): report metadata about chunks.

Allow handlers to provide a dict value as part of a ValidChunk metadata
attribute. That dictionary can contain any metadata the handler
considers relevant, but we advise handler writers to report parsed
information such as header values.

This metadata dict is later reported as part of our ChunkReports and is
available in the JSON report file if the user requested one.

The idea is to expose metadata to further analysis steps through the
unblob report. For example, a binary analysis toolkit would read the
load address and architecture from a uImage chunk to analyze the file
extracted from that chunk with the right settings (two illustrative
usage sketches follow the diff below).

A note on the 'as_dict' implementation: the initial idea was to
implement it in dissect.cstruct (see
https://github.com/fox-it/dissect.cstruct/pull/29), but due to expected
changes in that project's API I chose to implement it in unblob, so we
are not dependent on another project.
---
 tests/test_report.py                | 83 ++++++++++++++---------------
 unblob/handlers/archive/sevenzip.py |  4 +-
 unblob/models.py                    | 28 +++++++++-
 unblob/report.py                    |  1 +
 4 files changed, 71 insertions(+), 45 deletions(-)

diff --git a/tests/test_report.py b/tests/test_report.py
index d27cd5d1a8..30816dc523 100644
--- a/tests/test_report.py
+++ b/tests/test_report.py
@@ -86,6 +86,7 @@ def test_simple_conversion(self):
                 end_offset=384,
                 size=384,
                 is_encrypted=False,
+                metadata={},
                 extraction_reports=[],
             )
         )
@@ -135,6 +136,7 @@ def test_simple_conversion(self):
                        "handler_name": "zip",
                        "chunk_id": "test_basic_conversion:id",
                        "is_encrypted": False,
+                        "metadata": {},
                        "size": 384,
                        "start_offset": 0,
                    },
@@ -180,63 +182,58 @@ def test_exotic_command_output(self):
         json_text = ProcessResult(results=[task_result]).to_json()
         decoded_report = json.loads(json_text)
 
-
         assert decoded_report == [
             {
-                "__typename__": "TaskResult",
+                "task": {
+                    "path": "/nonexistent",
+                    "depth": 0,
+                    "chunk_id": "",
+                    "__typename__": "Task",
+                },
                 "reports": [
                     {
-                        "__typename__": "ChunkReport",
+                        "chunk_id": "test",
+                        "handler_name": "fail",
+                        "start_offset": 0,
                         "end_offset": 256,
+                        "size": 256,
+                        "is_encrypted": False,
+                        "metadata": {},
                         "extraction_reports": [
                             {
-                                "__typename__": "ExtractCommandFailedReport",
-                                "command": "dump all bytes",
-                                "exit_code": 1,
                                 "severity": "WARNING",
+                                "command": "dump all bytes",
+                                "stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08"
+                                "\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15"
+                                '\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$'
+                                "%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQR"
+                                "STUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~"
+                                "\x7f\udc80\udc81\udc82\udc83\udc84\udc85\udc86"
+                                "\udc87\udc88\udc89\udc8a\udc8b\udc8c\udc8d\udc8e"
+                                "\udc8f\udc90\udc91\udc92\udc93\udc94\udc95\udc96"
+                                "\udc97\udc98\udc99\udc9a\udc9b\udc9c\udc9d\udc9e"
+                                "\udc9f\udca0\udca1\udca2\udca3\udca4\udca5\udca6"
+                                "\udca7\udca8\udca9\udcaa\udcab\udcac\udcad\udcae"
+                                "\udcaf\udcb0\udcb1\udcb2\udcb3\udcb4\udcb5\udcb6"
+                                "\udcb7\udcb8\udcb9\udcba\udcbb\udcbc\udcbd\udcbe"
+                                "\udcbf\udcc0\udcc1\udcc2\udcc3\udcc4\udcc5\udcc6"
+                                "\udcc7\udcc8\udcc9\udcca\udccb\udccc\udccd\udcce"
+                                "\udccf\udcd0\udcd1\udcd2\udcd3\udcd4\udcd5\udcd6"
+                                "\udcd7\udcd8\udcd9\udcda\udcdb\udcdc\udcdd\udcde"
+                                "\udcdf\udce0\udce1\udce2\udce3\udce4\udce5\udce6"
+                                "\udce7\udce8\udce9\udcea\udceb\udcec\udced\udcee"
"\udcef\udcf0\udcf1\udcf2\udcf3\udcf4\udcf5\udcf6" + "\udcf7\udcf8\udcf9\udcfa\udcfb\udcfc\udcfd\udcfe\udcff", "stderr": "stdout is pretty strange ;)", - "stdout": ( - "b'\\x00\\x01\\x02\\x03\\x04\\x05\\x06\\x07" - "\\x08\\t\\n\\x0b\\x0c\\r\\x0e\\x0f" - "\\x10\\x11\\x12\\x13\\x14\\x15\\x16\\x17" - '\\x18\\x19\\x1a\\x1b\\x1c\\x1d\\x1e\\x1f !"#' - "$%&\\'()*+,-./0123456789:;<=>?@AB" - "CDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]^_`a" - "bcdefghijklmnopqrstuvwxyz{|}~\\x7f" - "\\x80\\x81\\x82\\x83\\x84\\x85\\x86\\x87" - "\\x88\\x89\\x8a\\x8b\\x8c\\x8d\\x8e\\x8f" - "\\x90\\x91\\x92\\x93\\x94\\x95\\x96\\x97" - "\\x98\\x99\\x9a\\x9b\\x9c\\x9d\\x9e\\x9f" - "\\xa0\\xa1\\xa2\\xa3\\xa4\\xa5\\xa6\\xa7" - "\\xa8\\xa9\\xaa\\xab\\xac\\xad\\xae\\xaf" - "\\xb0\\xb1\\xb2\\xb3\\xb4\\xb5\\xb6\\xb7" - "\\xb8\\xb9\\xba\\xbb\\xbc\\xbd\\xbe\\xbf" - "\\xc0\\xc1\\xc2\\xc3\\xc4\\xc5\\xc6\\xc7" - "\\xc8\\xc9\\xca\\xcb\\xcc\\xcd\\xce\\xcf" - "\\xd0\\xd1\\xd2\\xd3\\xd4\\xd5\\xd6\\xd7" - "\\xd8\\xd9\\xda\\xdb\\xdc\\xdd\\xde\\xdf" - "\\xe0\\xe1\\xe2\\xe3\\xe4\\xe5\\xe6\\xe7" - "\\xe8\\xe9\\xea\\xeb\\xec\\xed\\xee\\xef" - "\\xf0\\xf1\\xf2\\xf3\\xf4\\xf5\\xf6\\xf7" - "\\xf8\\xf9\\xfa\\xfb\\xfc\\xfd\\xfe\\xff" - "'" - ), + "exit_code": 1, + "__typename__": "ExtractCommandFailedReport", } ], - "handler_name": "fail", - "chunk_id": "test", - "is_encrypted": False, - "size": 256, - "start_offset": 0, + "__typename__": "ChunkReport", } ], "subtasks": [], - "task": { - "__typename__": "Task", - "chunk_id": "", - "depth": 0, - "path": "/nonexistent", - }, + "__typename__": "TaskResult", } ] diff --git a/unblob/handlers/archive/sevenzip.py b/unblob/handlers/archive/sevenzip.py index 040b409293..511e8a350f 100644 --- a/unblob/handlers/archive/sevenzip.py +++ b/unblob/handlers/archive/sevenzip.py @@ -70,4 +70,6 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk] # We read the signature header here to get the offset to the header database first_db_header = start_offset + len(header) + header.next_header_offset end_offset = first_db_header + header.next_header_size - return ValidChunk(start_offset=start_offset, end_offset=end_offset) + return ValidChunk( + start_offset=start_offset, end_offset=end_offset, metadata=header + ) diff --git a/unblob/models.py b/unblob/models.py index 2b8431fa73..0822ded784 100644 --- a/unblob/models.py +++ b/unblob/models.py @@ -6,6 +6,7 @@ from typing import List, Optional, Tuple, Type import attr +from dissect.cstruct import Instance from structlog import get_logger from .file_utils import Endian, File, InvalidInputFormat, StructParser @@ -21,6 +22,29 @@ # +def as_dict(obj) -> dict: + """Convert a Python class instance to a dictionary.""" + if isinstance(obj, dict): + return obj + if isinstance(obj, list): + return [as_dict(item) for item in obj] # type: ignore + if isinstance(obj, Instance): + result = {} + for k, v in obj._values.items(): # noqa: SLF001 + result[k] = v + return result + + result = {} + for key, value in obj.__dict__.items(): + if key.startswith("_"): + continue + if isinstance(value, (list, tuple)): + result[key] = [as_dict(item) for item in value] + else: + result[key] = as_dict(value) + return result + + @attr.define(frozen=True) class Task: path: Path @@ -88,6 +112,7 @@ class ValidChunk(Chunk): handler: "Handler" = attr.ib(init=False, eq=False) is_encrypted: bool = attr.ib(default=False) + metadata: dict = attr.ib(factory=dict, converter=as_dict) def extract(self, inpath: Path, outdir: Path): if self.is_encrypted: @@ -108,6 +133,7 @@ def as_report(self, 
             size=self.size,
             handler_name=self.handler.NAME,
             is_encrypted=self.is_encrypted,
+            metadata=self.metadata,
             extraction_reports=extraction_reports,
         )
 
@@ -190,7 +216,7 @@ def default(self, obj):
             try:
                 return obj.decode()
             except UnicodeDecodeError:
-                return str(obj)
+                return obj.decode("utf-8", errors="surrogateescape")
 
         logger.error("JSONEncoder met a non-JSON encodable value", obj=obj)
         # the usual fail path of custom JSONEncoders is to call the parent and let it fail
diff --git a/unblob/report.py b/unblob/report.py
index 1b5bed1e71..967ef070fd 100644
--- a/unblob/report.py
+++ b/unblob/report.py
@@ -181,6 +181,7 @@ class ChunkReport(Report):
     end_offset: int
     size: int
     is_encrypted: bool
+    metadata: dict = attr.ib(factory=dict)
     extraction_reports: List[Report]
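
Usage sketch 1 (handler side). This is a minimal, illustrative sketch of
what the new attribute accepts, assuming only what the diff shows:
ValidChunk keyword construction (as in the sevenzip change) and the
as_dict converter added to unblob/models.py. A plain dict passes through
the converter unchanged; a dissect.cstruct Instance, as passed by the
sevenzip handler, is flattened from its parsed field values. The
load_address/arch names below are made-up placeholders, not fields any
real handler is guaranteed to report.

    # Illustrative only: a handler-side view of ValidChunk metadata.
    # ValidChunk keyword construction and as_dict come from the patch above;
    # the "load_address"/"arch" field names are hypothetical examples.
    from unblob.models import ValidChunk, as_dict

    parsed_header = {"load_address": 0x80000000, "arch": "arm"}

    # dicts are passed through the converter untouched
    assert as_dict(parsed_header) == parsed_header

    # the converter runs when the chunk is built, so chunk.metadata is
    # already a dict, ready to be copied into ChunkReport.metadata
    chunk = ValidChunk(start_offset=0, end_offset=0x100, metadata=parsed_header)
    print(chunk.metadata)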
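
Usage sketch 2 (consumer side, the case the commit message describes).
The metadata ends up under each ChunkReport entry of the JSON report;
the layout assumed here ("__typename__", "handler_name" and "metadata"
keys inside each task result's "reports" list) is the one asserted in
tests/test_report.py above. The "uimage" handler name and the
load_address/arch keys are hypothetical stand-ins for whatever a given
handler reports.

    # Illustrative only: read chunk metadata back out of an unblob JSON report.
    import json
    from pathlib import Path

    def chunk_metadata(report_path: Path, handler_name: str):
        """Yield the metadata dict of every chunk produced by one handler."""
        for task_result in json.loads(report_path.read_text()):
            for report in task_result.get("reports", []):
                if (
                    report.get("__typename__") == "ChunkReport"
                    and report.get("handler_name") == handler_name
                ):
                    yield report.get("metadata", {})

    for metadata in chunk_metadata(Path("report.json"), "uimage"):
        # e.g. pick analysis settings from the parsed uImage header
        print(metadata.get("load_address"), metadata.get("arch"))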