
Commit f3f8017

Rework some of the async to allow for parallel hash computation
1 parent f0c69e5 commit f3f8017

File tree: 4 files changed, +379 -4 lines

src/analyzeMFT/cli.py

Lines changed: 7 additions & 1 deletion
@@ -57,6 +57,10 @@ async def main():
                                  help="Number of records to process in each chunk (default: 1000)")
     performance_group.add_option("-H", "--hash", action="store_true", dest="compute_hashes",
                                  help="Compute hashes (MD5, SHA256, SHA512, CRC32)", default=False)
+    performance_group.add_option("--no-multiprocessing-hashes", action="store_false", dest="multiprocessing_hashes",
+                                 help="Disable multiprocessing for hash computation", default=True)
+    performance_group.add_option("--hash-processes", dest="hash_processes", type="int",
+                                 help="Number of processes for hash computation (default: auto-detect)")
     parser.add_option_group(performance_group)
 
     # Configuration options
@@ -213,7 +217,9 @@ async def main():
         options.compute_hashes,
         options.export_format,
         profile,
-        options.chunk_size
+        options.chunk_size,
+        options.multiprocessing_hashes,
+        options.hash_processes
     )
 
     await analyzer.analyze()
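
With these options wired in, parallel hashing can be toggled and tuned from the command line. A hypothetical invocation (the `-f`/`-o` input and output options are assumed from the rest of the CLI and do not appear in this hunk; the paths are placeholders):

    python analyzeMFT.py -f MFT.raw -o results.csv -H --hash-processes 4
    python analyzeMFT.py -f MFT.raw -o results.csv -H --no-multiprocessing-hashes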

src/analyzeMFT/hash_processor.py

Lines changed: 317 additions & 0 deletions
@@ -0,0 +1,317 @@
"""
Multiprocessing-capable hash computation for MFT records
"""

import hashlib
import zlib
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List, Dict, Tuple, Optional, Any
import logging
import time
from dataclasses import dataclass


@dataclass
class HashResult:
    """Result container for hash computation"""
    record_index: int
    md5: str
    sha256: str
    sha512: str
    crc32: str
    processing_time: float


def compute_hashes_for_record(data: Tuple[int, bytes]) -> HashResult:
    """
    Compute all hashes for a single MFT record.

    Args:
        data: Tuple of (record_index, raw_record_bytes)

    Returns:
        HashResult containing all computed hashes
    """
    record_index, raw_record = data
    start_time = time.time()

    # Compute all hashes over the raw record
    md5 = hashlib.md5()
    sha256 = hashlib.sha256()
    sha512 = hashlib.sha512()

    md5.update(raw_record)
    sha256.update(raw_record)
    sha512.update(raw_record)

    md5_hash = md5.hexdigest()
    sha256_hash = sha256.hexdigest()
    sha512_hash = sha512.hexdigest()
    crc32_hash = format(zlib.crc32(raw_record) & 0xFFFFFFFF, '08x')

    processing_time = time.time() - start_time

    return HashResult(
        record_index=record_index,
        md5=md5_hash,
        sha256=sha256_hash,
        sha512=sha512_hash,
        crc32=crc32_hash,
        processing_time=processing_time
    )


class HashProcessor:
    """
    Multiprocessing-capable hash computation processor for MFT records
    """

    def __init__(self, num_processes: Optional[int] = None, logger: Optional[logging.Logger] = None):
        """
        Initialize the hash processor.

        Args:
            num_processes: Number of processes to use. If None, uses optimal count.
            logger: Logger instance for debugging and progress reporting.
        """
        self.num_processes = num_processes or min(mp.cpu_count(), 8)  # Cap at 8 to avoid overhead
        self.logger = logger or logging.getLogger('analyzeMFT.hash_processor')
        self.stats = {
            'total_records': 0,
            'total_processing_time': 0.0,
            'multiprocessing_overhead': 0.0,
            'average_time_per_record': 0.0,
            'processes_used': self.num_processes
        }

    def compute_hashes_single_threaded(self, raw_records: List[bytes]) -> List[HashResult]:
        """
        Compute hashes using single-threaded processing.

        Args:
            raw_records: List of raw MFT record bytes

        Returns:
            List of HashResult objects
        """
        start_time = time.time()
        results = []

        for i, raw_record in enumerate(raw_records):
            result = compute_hashes_for_record((i, raw_record))
            results.append(result)

        total_time = time.time() - start_time
        self.stats.update({
            'total_records': len(raw_records),
            'total_processing_time': total_time,
            'average_time_per_record': total_time / len(raw_records) if raw_records else 0,
            'processes_used': 1
        })

        self.logger.debug(f"Single-threaded hash computation: {len(raw_records)} records in {total_time:.3f}s")
        return results

    def compute_hashes_multiprocessed(self, raw_records: List[bytes]) -> List[HashResult]:
        """
        Compute hashes using multiprocessing.

        Args:
            raw_records: List of raw MFT record bytes

        Returns:
            List of HashResult objects in original order
        """
        if len(raw_records) < 10:  # Use single-threaded for small batches
            return self.compute_hashes_single_threaded(raw_records)

        start_time = time.time()

        # Prepare data for multiprocessing
        indexed_records = [(i, record) for i, record in enumerate(raw_records)]

        results = []
        with ProcessPoolExecutor(max_workers=self.num_processes) as executor:
            # Submit all tasks
            mp_start = time.time()
            future_to_index = {
                executor.submit(compute_hashes_for_record, data): data[0]
                for data in indexed_records
            }

            # Collect results as they complete
            temp_results = {}
            for future in as_completed(future_to_index):
                result = future.result()
                temp_results[result.record_index] = result

            # Sort results back to original order
            results = [temp_results[i] for i in range(len(raw_records))]

            mp_overhead = time.time() - mp_start

        total_time = time.time() - start_time

        self.stats.update({
            'total_records': len(raw_records),
            'total_processing_time': total_time,
            'multiprocessing_overhead': mp_overhead,
            'average_time_per_record': total_time / len(raw_records) if raw_records else 0,
            'processes_used': self.num_processes
        })

        self.logger.debug(f"Multiprocessed hash computation: {len(raw_records)} records in {total_time:.3f}s using {self.num_processes} processes")
        return results

    def compute_hashes_adaptive(self, raw_records: List[bytes]) -> List[HashResult]:
        """
        Adaptively choose between single-threaded and multiprocessed computation
        based on the size of the batch and available resources.

        Args:
            raw_records: List of raw MFT record bytes

        Returns:
            List of HashResult objects
        """
        if not raw_records:
            return []

        # Adaptive thresholds
        mp_threshold = 50  # Use multiprocessing for batches larger than this
        cpu_count = mp.cpu_count()

        # Use multiprocessing if:
        # 1. We have enough records to justify the overhead
        # 2. We have multiple CPU cores available
        # 3. The batch size is large enough per core
        use_multiprocessing = (
            len(raw_records) >= mp_threshold and
            cpu_count > 1 and
            len(raw_records) >= (cpu_count * 10)  # At least 10 records per core
        )

        if use_multiprocessing:
            self.logger.info(f"Using multiprocessing for {len(raw_records)} records with {self.num_processes} processes")
            return self.compute_hashes_multiprocessed(raw_records)
        else:
            self.logger.debug(f"Using single-threaded processing for {len(raw_records)} records")
            return self.compute_hashes_single_threaded(raw_records)

    def get_performance_stats(self) -> Dict[str, Any]:
        """
        Get performance statistics from the last computation.

        Returns:
            Dictionary containing performance metrics
        """
        return self.stats.copy()

    def log_performance_summary(self):
        """Log a summary of the last computation performance."""
        if self.stats['total_records'] == 0:
            return

        records_per_second = self.stats['total_records'] / self.stats['total_processing_time']
        avg_time_ms = self.stats['average_time_per_record'] * 1000

        self.logger.info("Hash computation performance:")
        self.logger.info(f"  Records processed: {self.stats['total_records']}")
        self.logger.info(f"  Total time: {self.stats['total_processing_time']:.3f}s")
        self.logger.info(f"  Records/second: {records_per_second:.1f}")
        self.logger.info(f"  Average time per record: {avg_time_ms:.2f}ms")
        self.logger.info(f"  Processes used: {self.stats['processes_used']}")

        if self.stats['processes_used'] > 1:
            efficiency = (self.stats['total_processing_time'] - self.stats['multiprocessing_overhead']) / self.stats['total_processing_time'] * 100
            self.logger.info(f"  Multiprocessing efficiency: {efficiency:.1f}%")


def get_optimal_process_count() -> int:
    """
    Determine the optimal number of processes for hash computation
    based on system resources and hash computation characteristics.

    Returns:
        Optimal number of processes
    """
    cpu_count = mp.cpu_count()

    # Hash computation is CPU-bound, so we can use more processes,
    # but cap it to avoid excessive context-switching overhead
    if cpu_count <= 4:
        return cpu_count
    elif cpu_count <= 8:
        return min(cpu_count, 6)  # Leave some cores for other tasks
    else:
        return min(cpu_count - 2, 8)  # Cap at 8, leave 2 cores free


def benchmark_hash_methods(raw_records: List[bytes], logger: Optional[logging.Logger] = None) -> Dict[str, Any]:
    """
    Benchmark both single-threaded and multiprocessed hash computation
    to compare performance.

    Args:
        raw_records: List of raw MFT record bytes to benchmark
        logger: Optional logger for output

    Returns:
        Dictionary with benchmark results
    """
    if not raw_records:
        return {}

    logger = logger or logging.getLogger('analyzeMFT.hash_benchmark')

    # Test single-threaded
    processor_st = HashProcessor(num_processes=1, logger=logger)
    start_st = time.time()
    results_st = processor_st.compute_hashes_single_threaded(raw_records)
    time_st = time.time() - start_st

    # Test multiprocessed
    processor_mp = HashProcessor(logger=logger)
    start_mp = time.time()
    results_mp = processor_mp.compute_hashes_multiprocessed(raw_records)
    time_mp = time.time() - start_mp

    # Verify results are identical
    verification_passed = True
    if len(results_st) == len(results_mp):
        for i, (st_result, mp_result) in enumerate(zip(results_st, results_mp)):
            if (st_result.md5 != mp_result.md5 or
                    st_result.sha256 != mp_result.sha256 or
                    st_result.sha512 != mp_result.sha512 or
                    st_result.crc32 != mp_result.crc32):
                verification_passed = False
                logger.error(f"Hash mismatch at record {i}")
                break
    else:
        verification_passed = False

    speedup = time_st / time_mp if time_mp > 0 else 0
    efficiency = speedup / processor_mp.num_processes * 100

    benchmark_results = {
        'records_tested': len(raw_records),
        'single_threaded_time': time_st,
        'multiprocessed_time': time_mp,
        'speedup_factor': speedup,
        'efficiency_percent': efficiency,
        'processes_used': processor_mp.num_processes,
        'verification_passed': verification_passed,
        'recommended_method': 'multiprocessing' if speedup > 1.2 else 'single_threaded'
    }

    logger.info("Hash computation benchmark results:")
    logger.info(f"  Records: {benchmark_results['records_tested']}")
    logger.info(f"  Single-threaded: {time_st:.3f}s")
    logger.info(f"  Multiprocessed: {time_mp:.3f}s")
    logger.info(f"  Speedup: {speedup:.2f}x")
    logger.info(f"  Efficiency: {efficiency:.1f}%")
    logger.info(f"  Recommended: {benchmark_results['recommended_method']}")

    return benchmark_results
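
A minimal usage sketch of the new module, exercising only names defined above. The records are synthetic buffers standing in for raw MFT records (typically 1024 bytes each), and the import path assumes the package is importable as `analyzeMFT`:

    import logging
    from analyzeMFT.hash_processor import (
        HashProcessor, benchmark_hash_methods, get_optimal_process_count
    )

    if __name__ == '__main__':  # guard needed for ProcessPoolExecutor on spawn-based platforms
        logging.basicConfig(level=logging.INFO)

        # Synthetic stand-ins for raw MFT record buffers
        raw_records = [bytes([i % 256]) * 1024 for i in range(500)]

        processor = HashProcessor(num_processes=get_optimal_process_count())
        results = processor.compute_hashes_adaptive(raw_records)  # picks single-threaded vs. multiprocessed
        print(results[0].md5, results[0].crc32)
        processor.log_performance_summary()

        # Optionally compare both code paths on this workload
        print(benchmark_hash_methods(raw_records)['recommended_method'])

Because `compute_hashes_for_record` is a module-level function, it pickles cleanly for `ProcessPoolExecutor`, and the adaptive path falls back to the single-threaded implementation when a batch is too small to amortize the process-pool overhead.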
