diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 1f53ffc530..79a586b1b4 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -66,4 +66,5 @@ jobs:
         uses: ./.github/actions/setup-git-lfs
       - name: Run pytest
+        timeout-minutes: 4
         run: poetry run pytest -vvv
diff --git a/setup.cfg b/setup.cfg
index 1a74d71706..1a135bddd0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,9 @@
 [flake8]
 # We start with a strict setting, this is ranked as A in radon
 max-complexity = 8
-ignore = E501,W503
+ignore =
+    E501  # line too long
+    W503  # line break before binary operator
+    E203  # whitespace before ':' - known black/flake8 incompatibility
 exclude = __init__.py
diff --git a/tests/test_handlers.py b/tests/test_handlers.py
index fbb43161a7..507a99979a 100644
--- a/tests/test_handlers.py
+++ b/tests/test_handlers.py
@@ -7,10 +7,13 @@
 - The expected output in the __output__ folder.
 """
 
+import hashlib
 import inspect
+from collections import Counter
 from pathlib import Path
 from typing import Type
 
+import attr
 import pytest
 
 from unblob import handlers
@@ -39,6 +42,111 @@ def test_all_handlers(
     check_output_is_the_same(output_dir, extraction_config.extract_root)
 
 
+BLOCK_SHIFTING_PREFIX = bytes([0]) + b"unique unknown prefix" + bytes([0])
+PADDING_CHECK_SUFFIX = bytes([0] * 511) + b"unique unknown suffix" + bytes([0])
+
+
+@pytest.mark.parametrize(
+    "input_dir, output_dir", gather_integration_tests(TEST_DATA_PATH)
+)
+@pytest.mark.parametrize(
+    "prefix, suffix",
+    [
+        # pytest.param(b"", b"", id="no-extras"),
+        pytest.param(BLOCK_SHIFTING_PREFIX, b"", id="block-shifted"),
+        pytest.param(b"", PADDING_CHECK_SUFFIX, id="padding-check"),
+        pytest.param(
+            BLOCK_SHIFTING_PREFIX,
+            PADDING_CHECK_SUFFIX,
+            id="block-shifted-padding-check",
+        ),
+    ],
+)
+def test_all_handlers_chunk_stability(
+    input_dir: Path,
+    output_dir: Path,
+    extraction_config: ExtractionConfig,
+    tmp_path: Path,
+    prefix: bytes,
+    suffix: bytes,
+):
+    """Test that handlers tolerate a non-empty unknown chunk prefix/suffix"""
+    altered_input_file = tmp_path / "input_file"
+
+    for input_file in input_dir.iterdir():
+        altered_input_file.write_bytes(prefix + input_file.read_bytes() + suffix)
+
+        config = attr.evolve(
+            extraction_config,
+            extract_root=extraction_config.extract_root / input_file.name,
+        )
+        reports = process_file(config, altered_input_file)
+        check_result(reports)
+
+        check_output_do_not_change_much_due_to_extras(
+            input_file,
+            expected=output_dir / (input_file.name + config.extract_suffix),
+            actual=config.extract_root,
+        )
+
+
+def hash_bytes(data: bytes) -> str:
+    return hashlib.sha512(data).hexdigest()
+
+
+def hash_dir(root: Path, print=lambda *_: None) -> Counter:
+    """Hash all files under an unblob extraction directory :root:, ignoring test prefix/suffix.
+
+    Directory structures having the same set of files will result in the same output,
+    even if the file names are different or the directory structure is different.
+
+    Test prefix/suffix is excluded from hash calculation, so that unknown chunks extended
+    with them will produce the same hash.
+
+    Returns: count of each hash found
+    """
+    hash_counter = Counter()
+    for path in sorted(root.rglob("*")):
+        if not path.is_file() or path.name == ".gitkeep":
+            continue
+
+        content = path.read_bytes()
+        # ignore newly introduced unknown chunk files
+        if content in (BLOCK_SHIFTING_PREFIX, PADDING_CHECK_SUFFIX):
+            continue
+
+        # remove extras introduced before hashing
+        if content.startswith(BLOCK_SHIFTING_PREFIX):
+            content = content[len(BLOCK_SHIFTING_PREFIX) :]
+        if content.endswith(PADDING_CHECK_SUFFIX):
+            content = content[: -len(PADDING_CHECK_SUFFIX)]
+
+        hash = hash_bytes(content)
+        hash_counter[hash] += 1
+        # Output for debugging failures
+        print(f" {path} =\n {hash}")
+    return hash_counter
+
+
+def check_output_do_not_change_much_due_to_extras(
+    original_input_file: Path, expected: Path, actual: Path
+):
+    print(f"{expected=}")
+    expected_counts = hash_dir(expected, print=print)
+    print(f"{actual=}")
+    actual_counts = hash_dir(actual, print=print)
+
+    # original input will show up in the extraction of the modified input due to the extra unknown chunks
+    # but it might not show up in the expected output if it was a "whole file chunk"
+    hash_original_input = hash_bytes(original_input_file.read_bytes())
+    if hash_original_input not in expected_counts:
+        print(f"Warn: hash of original input: {hash_original_input} not in expected")
+        assert actual_counts[hash_original_input] <= 1
+        del actual_counts[hash_original_input]
+
+    assert expected_counts == actual_counts
+
+
 @pytest.mark.parametrize(
     "handler",
     (pytest.param(handler, id=handler.NAME) for handler in handlers.BUILTIN_HANDLERS),
diff --git a/unblob/handlers/filesystem/romfs.py b/unblob/handlers/filesystem/romfs.py
index d6d1abdddf..09c349dbbb 100644
--- a/unblob/handlers/filesystem/romfs.py
+++ b/unblob/handlers/filesystem/romfs.py
@@ -10,7 +10,7 @@
 
 from unblob.extractor import is_safe_path
 
-from ...file_utils import Endian, InvalidInputFormat, read_until_past, round_up
+from ...file_utils import Endian, InvalidInputFormat, round_up
 from ...models import Extractor, File, HexString, StructHandler, ValidChunk
 
 logger = get_logger()
@@ -25,6 +25,7 @@
 WORLD_RWX = 0o777
 ROMFS_HEADER_SIZE = 512
 ROMFS_SIGNATURE = b"-rom1fs-"
+PADDING_ALIGNMENT = 1024
 
 
 @unique
@@ -361,29 +362,31 @@ class RomFSFSHandler(StructHandler):
     def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
-        if not valid_checksum(file.read(512)):
+        checksum = file.read(512)
+        if not valid_checksum(checksum):
             raise InvalidInputFormat("Invalid RomFS checksum.")
 
-        file.seek(-512, io.SEEK_CUR)
+        file.seek(-len(checksum), io.SEEK_CUR)
 
         # Every multi byte value must be in big endian order.
         header = self.parse_header(file, Endian.BIG)
 
-        # The zero terminated name of the volume, padded to 16 byte boundary.
-        get_string(file)
-
-        # seek filesystem size (number of accessible bytes in this fs)
-        # from the actual end of the header
-        file.seek(header.full_size, io.SEEK_CUR)
-
         # Another thing to note is that romfs works on file headers and data
         # aligned to 16 byte boundaries, but most hardware devices and the block
        # device drivers are unable to cope with smaller than block-sized data.
         # To overcome this limitation, the whole size of the file system must be
         # padded to an 1024 byte boundary.
-        read_until_past(file, b"\x00")
+        end_offset = start_offset + round_up(
+            len(header) + header.full_size, PADDING_ALIGNMENT
+        )
+        fs_size = len(header) + header.full_size
+        padding_size = round_up(fs_size, PADDING_ALIGNMENT) - fs_size
+        if padding_size:
+            file.seek(start_offset + fs_size, io.SEEK_SET)
+            if file.read(padding_size) != bytes([0]) * padding_size:
+                raise InvalidInputFormat("Invalid padding.")
 
         return ValidChunk(
             start_offset=start_offset,
-            end_offset=file.tell(),
+            end_offset=end_offset,
         )
diff --git a/unblob/testing.py b/unblob/testing.py
index e77957e1a3..f513f30b99 100644
--- a/unblob/testing.py
+++ b/unblob/testing.py
@@ -41,7 +41,7 @@ def gather_integration_tests(test_data_path: Path):
 @pytest.fixture
 def extraction_config(tmp_path: Path):
     config = ExtractionConfig(
-        extract_root=tmp_path,
+        extract_root=tmp_path / "extract_root",
         entropy_depth=0,
         keep_extracted_chunks=True,
     )
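
Note on the romfs change: the chunk end offset is now derived arithmetically from the parsed header instead of scanning past trailing NUL bytes with read_until_past. This is what keeps the chunk boundary stable when the new padding-check test appends PADDING_CHECK_SUFFIX (511 zero bytes plus a marker) directly after the filesystem, since a zero-scan would swallow the suffix's leading zeros into the chunk. The sketch below is illustrative only: round_up is a local stand-in assumed to behave like unblob's file_utils.round_up (round a size up to the next multiple of the alignment), and romfs_chunk_bounds is a hypothetical helper mirroring the arithmetic, not code from this diff.

PADDING_ALIGNMENT = 1024


def round_up(size: int, alignment: int) -> int:
    # Stand-in for unblob's file_utils.round_up (assumption): the smallest
    # multiple of `alignment` that is >= `size`.
    return alignment * ((size + alignment - 1) // alignment)


def romfs_chunk_bounds(start_offset: int, header_len: int, full_size: int):
    """Hypothetical helper mirroring the arithmetic in calculate_chunk().

    fs_size covers the parsed header plus the accessible filesystem bytes
    (header.full_size); the filesystem is zero-padded up to the next
    1024-byte boundary, so the chunk end no longer depends on how many zero
    bytes happen to follow it in the scanned file.
    """
    fs_size = header_len + full_size
    end_offset = start_offset + round_up(fs_size, PADDING_ALIGNMENT)
    padding_size = round_up(fs_size, PADDING_ALIGNMENT) - fs_size
    return end_offset, padding_size


# Example with made-up numbers: header_len=512, full_size=988 -> fs_size=1500,
# which is padded with 548 zero bytes; the chunk spans exactly 2048 bytes
# (2 * 1024) and ends at offset 4096 + 2048 = 6144.
assert romfs_chunk_bounds(4096, 512, 988) == (6144, 548)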