diff --git a/src/analyzeMFT/cli.py b/src/analyzeMFT/cli.py index 05158a3..fa95ae4 100644 --- a/src/analyzeMFT/cli.py +++ b/src/analyzeMFT/cli.py @@ -55,28 +55,38 @@ async def main(): if not options.export_format: options.export_format = "csv" - - analyzer = MftAnalyzer(options.filename, options.output_file, options.debug, options.verbosity, options.compute_hashes, options.export_format) - await analyzer.analyze() - print(f"Analysis complete. Results written to {options.output_file}") - try: - analyzer = MftAnalyzer(options.filename, options.output_file, options.debug, options.compute_hashes, options.export_format) + analyzer = MftAnalyzer(options.filename, options.output_file, options.debug, options.verbosity, options.compute_hashes, options.export_format) + await analyzer.analyze() + print(f"Analysis complete. Results written to {options.output_file}") + except FileNotFoundError: + print(f"Error: The file '{options.filename}' was not found.") sys.exit(1) + except PermissionError: + print(f"Error: Permission denied when trying to read '{options.filename}' or write to '{options.output_file}'.") sys.exit(1) + except Exception as e: + print(f"An unexpected error occurred: {str(e)}") + if options.debug: import traceback traceback.print_exc() + sys.exit(1) if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nScript terminated by user.") + except Exception as e: + print(f"An unexpected error occurred: {e}") \ No newline at end of file diff --git a/src/analyzeMFT/mft_analyzer.py b/src/analyzeMFT/mft_analyzer.py index 919644e..aee2c8a 100644 --- a/src/analyzeMFT/mft_analyzer.py +++ b/src/analyzeMFT/mft_analyzer.py @@ -1,6 +1,7 @@ import asyncio import csv import io +import signal import sys import traceback from typing import Dict, Set, List, Optional, Any @@ -9,24 +10,20 @@ from .file_writers import FileWriters class MftAnalyzer: - - def __init__(self, mft_file: str, output_file: str, debug: bool = False, very_debug: bool = False, - verbosity: int = 0, compute_hashes: bool = False, export_format: str = "csv") -> None: + def __init__(self, mft_file: str, output_file: str, debug: int = 0, verbosity: int = 0, + compute_hashes: bool = False, export_format: str = "csv") -> None: self.mft_file = mft_file self.output_file = output_file self.debug = debug - self.very_debug = very_debug - self.verbosity = verbosity - self.compute_hashes = compute_hashes - self.export_format = export_format + self.verbosity = int(verbosity) self.compute_hashes = compute_hashes self.export_format = export_format - self.mft_records = {} - self.interrupt_flag = asyncio.Event() - self.csvfile = None self.csv_writer = None - + self.interrupt_flag = asyncio.Event() + self.setup_interrupt_handler() + + self.mft_records = {} self.stats = { 'total_records': 0, 'active_records': 0, @@ -41,12 +38,26 @@ def __init__(self, mft_file: str, output_file: str, debug: bool = False, very_de 'unique_crc32': set(), }) + def setup_interrupt_handler(self): + def interrupt_handler(signum, frame): + self.log("Interrupt received. Cleaning up...", 1) + self.interrupt_flag.set() + + if sys.platform == "win32": # Windows is evil ... + import win32api + win32api.SetConsoleCtrlHandler(lambda x: interrupt_handler(None, None), True) + + else: # On a proper operating system ... + signal.signal(signal.SIGINT, interrupt_handler) + signal.signal(signal.SIGTERM, interrupt_handler) + def log(self, message: str, level: int = 0): if level <= self.debug or level <= self.verbosity: print(message) async def analyze(self) -> None: try: + self.log("Starting MFT analysis...", 1) self.initialize_csv_writer() await self.process_mft() await self.write_output() @@ -57,6 +68,10 @@ async def analyze(self) -> None: finally: if self.csvfile: self.csvfile.close() + if self.interrupt_flag.is_set(): + self.log("Analysis interrupted by user.", 1) + else: + self.log("Analysis complete.", 1) self.print_statistics() @@ -65,12 +80,14 @@ async def process_mft(self) -> None: try: with open(self.mft_file, 'rb') as f: while not self.interrupt_flag.is_set(): - raw_record = f.read(MFT_RECORD_SIZE) + raw_record = await self.read_record(f) if not raw_record: break try: + self.log(f"Processing record {self.stats['total_records']}", 2) record = MftRecord(raw_record, self.compute_hashes) + self.log(f"Record parsed, recordnum: {record.recordnum}", 2) self.stats['total_records'] += 1 if record.flags & FILE_RECORD_IN_USE: @@ -90,9 +107,14 @@ async def process_mft(self) -> None: if self.stats['total_records'] % 1000 == 0: await self.write_csv_block() self.mft_records.clear() + + if self.interrupt_flag.is_set(): + self.log("Interrupt detected. Stopping processing.", 1) + break except Exception as e: self.log(f"Error processing record {self.stats['total_records']}: {str(e)}", 1) + self.log(f"Raw record (first 100 bytes): {raw_record[:100].hex()}", 2) if self.debug >= 2: traceback.print_exc() continue @@ -104,6 +126,8 @@ async def process_mft(self) -> None: self.log(f"MFT processing complete. Total records processed: {self.stats['total_records']}", 0) + async def read_record(self, file): + return file.read(MFT_RECORD_SIZE) def handle_interrupt(self) -> None: if sys.platform == "win32": @@ -146,11 +170,11 @@ async def write_csv_block(self) -> None: csv_row = [str(item) for item in csv_row] self.csv_writer.writerow(csv_row) - if self.very_debug: + if self.debug: self.log(f"Wrote record {record.recordnum} to CSV", 2) except Exception as e: self.log(f"Error writing record {record.recordnum}: {str(e)}", 1) - if self.very_debug: + if self.debug: traceback.print_exc() if self.csvfile: @@ -158,7 +182,7 @@ async def write_csv_block(self) -> None: self.log(f"CSV block written. Current file size: {self.csvfile.tell() if self.csvfile else 0} bytes", 2) except Exception as e: self.log(f"Error in write_csv_block: {str(e)}", 0) - if self.debug or self.very_debug: + if self.debug: traceback.print_exc() @@ -222,4 +246,10 @@ async def write_output(self) -> None: elif self.export_format == "excel": await FileWriters.write_excel(list(self.mft_records.values()), self.output_file) else: - print(f"Unsupported export format: {self.export_format}") \ No newline at end of file + print(f"Unsupported export format: {self.export_format}") + + async def cleanup(self): + self.log("Performing cleanup...", 1) + # to-do add more cleanup after database stuff is integrated. + await self.write_remaining_records() + self.log("Cleanup complete.", 1) \ No newline at end of file diff --git a/src/analyzeMFT/mft_record.py b/src/analyzeMFT/mft_record.py index cc22053..68be128 100644 --- a/src/analyzeMFT/mft_record.py +++ b/src/analyzeMFT/mft_record.py @@ -10,8 +10,10 @@ class MftRecord: - def __init__(self, raw_record: bytes, compute_hashes: bool = False) -> None: + def __init__(self, raw_record: bytes, compute_hashes: bool = False, debug_level: int = 0, logger=None): self.raw_record = raw_record + self.debug_level = debug_level + self.logger = logger or self._default_logger self.magic = 0 self.upd_off = 0 self.upd_cnt = 0 @@ -65,6 +67,12 @@ def __init__(self, raw_record: bytes, compute_hashes: bool = False) -> None: self.ea = None self.logged_utility_stream = None + def _default_logger(self, message: str, level: int = 0): + if level <= self.debug_level: + print(message) + + def log(self, message: str, level: int = 0): + self.logger(message, level) def parse_record(self) -> None: try: @@ -87,16 +95,20 @@ def parse_record(self) -> None: if hasattr(self, 'debug') and self.debug: print(f"Error parsing MFT record header for record {self.recordnum}") - def parse_attributes(self) -> None: - offset = self.attr_off + def parse_attributes(self): + offset = int(self.attr_off) while offset < len(self.raw_record) - 8: try: - attr_type = struct.unpack(" None: self.parse_logged_utility_stream(offset) offset += attr_len - except struct.error: + + except Exception as e: + print(f"Error processing record {self.recordnum}: {str(e)}") + print(f"attr_type: {attr_type} (type: {type(attr_type)})") + print(f"attr_len: {attr_len} (type: {type(attr_len)})") + print(f"offset: {offset}") + if self.debug >= 2: + traceback.print_exc() offset += 1 def parse_si_attribute(self, offset: int) -> None: