-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathipld_performance_benchmark.py
More file actions
516 lines (401 loc) · 17.2 KB
/
ipld_performance_benchmark.py
File metadata and controls
516 lines (401 loc) · 17.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
#!/usr/bin/env python3
"""
IPLD Performance Benchmark
This script demonstrates and benchmarks the performance improvements
from the optimized IPLD codec for high-throughput processing.
It creates test datasets of various sizes and benchmarks encoding,
decoding, and CAR file operations using both standard and optimized
implementations.
Usage:
python ipld_performance_benchmark.py [--size small|medium|large]
Example:
python ipld_performance_benchmark.py --size medium
"""
import argparse
import json
import os
import random
import shutil
import sys
import tempfile
import time
from typing import Any, Dict, List, Tuple
# Add parent directory to path to import the module
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
# Import required modules
from ipfs_datasets_py.ipld.storage import IPLDStorage
from ipfs_datasets_py.ipld.optimized_codec import (
OptimizedEncoder, OptimizedDecoder, BatchProcessor,
create_batch_processor, PBNode
)
def create_test_data(size: str) -> Tuple[List[Dict], List[bytes]]:
    """
    Create test data of specified size.

    Args:
        size (str): Size category ('small', 'medium', 'large').
            Unknown values fall back to the 'small' count of 100.

    Returns:
        Tuple of (json_objects, binary_data), both of length `count`:
        json_objects are nested dicts with randomized metadata, binary_data
        are random byte blobs of 100-10000 bytes each.
    """
    # Map size category to item count; default to 100 for unknown categories.
    sizes = {
        "small": 100,
        "medium": 1000,
        "large": 10000
    }
    count = sizes.get(size, 100)
    print(f"Creating test data with {count} items...")

    # Create JSON objects with nested metadata to exercise structured encoding
    json_objects = []
    for i in range(count):
        obj = {
            "id": i,
            "name": f"Item {i}",
            "created": time.time(),
            "metadata": {
                "type": random.choice(["document", "image", "video", "audio"]),
                "tags": [f"tag{random.randint(1, 10)}" for _ in range(random.randint(1, 5))],
                "size": random.randint(1000, 10000000),
                "attributes": {
                    "width": random.randint(100, 4000),
                    "height": random.randint(100, 4000),
                    "color": f"#{random.randint(0, 0xFFFFFF):06x}"
                }
            },
            "content": "x" * random.randint(100, 1000)  # Random content length
        }
        json_objects.append(obj)

    # Create binary data blobs of random length.
    # Fix: the blob length was previously assigned to `size`, shadowing the
    # function's `size` parameter; renamed to `blob_size`.
    binary_data = []
    for _ in range(count):
        blob_size = random.randint(100, 10000)
        data = bytes(random.getrandbits(8) for _ in range(blob_size))
        binary_data.append(data)

    return json_objects, binary_data
def benchmark_json_encoding(storage: IPLDStorage, json_objects: List[Dict]) -> Tuple[float, float]:
    """
    Benchmark JSON encoding using standard and batch methods.

    Args:
        storage (IPLDStorage): Storage instance
        json_objects (List[Dict]): JSON objects to encode

    Returns:
        Tuple of (standard_time, batch_time) in seconds
    """
    print(f"Benchmarking JSON encoding with {len(json_objects)} objects...")

    # Baseline: store each object individually.
    t0 = time.time()
    per_item_cids = [storage.store_json(obj) for obj in json_objects]
    standard_time = time.time() - t0
    print(f"Standard JSON encoding: {standard_time:.2f} seconds")

    # Optimized path: store the whole collection in one batch call.
    t0 = time.time()
    batched_cids = storage.store_json_batch(json_objects)
    batch_time = time.time() - t0
    print(f"Batch JSON encoding: {batch_time:.2f} seconds")

    # Sanity check: both paths must produce one CID per input object.
    assert len(per_item_cids) == len(batched_cids), "CID count mismatch"
    return standard_time, batch_time
def benchmark_binary_encoding(storage: IPLDStorage, binary_data: List[bytes]) -> Tuple[float, float]:
    """
    Benchmark binary data encoding using standard and batch methods.

    Args:
        storage (IPLDStorage): Storage instance
        binary_data (List[bytes]): Binary data to encode

    Returns:
        Tuple of (standard_time, batch_time) in seconds
    """
    print(f"Benchmarking binary encoding with {len(binary_data)} blocks...")

    # Baseline: store each blob with an individual call.
    t0 = time.time()
    per_item_cids = [storage.store(blob) for blob in binary_data]
    standard_time = time.time() - t0
    print(f"Standard binary encoding: {standard_time:.2f} seconds")

    # Optimized path: single batch call for all blobs.
    t0 = time.time()
    batched_cids = storage.store_batch(binary_data)
    batch_time = time.time() - t0
    print(f"Batch binary encoding: {batch_time:.2f} seconds")

    # Sanity check: both paths must yield one CID per input block.
    assert len(per_item_cids) == len(batched_cids), "CID count mismatch"
    return standard_time, batch_time
def benchmark_retrieval(storage: IPLDStorage, cids: List[str]) -> Tuple[float, float]:
    """
    Benchmark data retrieval using standard and batch methods.

    Args:
        storage (IPLDStorage): Storage instance
        cids (List[str]): CIDs to retrieve

    Returns:
        Tuple of (standard_time, batch_time) in seconds
    """
    print(f"Benchmarking retrieval with {len(cids)} CIDs...")

    # Baseline: fetch each CID with an individual call.
    t0 = time.time()
    per_item_results = [storage.get(cid) for cid in cids]
    standard_time = time.time() - t0
    print(f"Standard retrieval: {standard_time:.2f} seconds")

    # Optimized path: single batch lookup for all CIDs.
    t0 = time.time()
    batched_results = storage.get_batch(cids)
    batch_time = time.time() - t0
    print(f"Batch retrieval: {batch_time:.2f} seconds")

    # Sanity check: both paths must return one result per CID.
    assert len(per_item_results) == len(batched_results), "Result count mismatch"
    return standard_time, batch_time
def benchmark_car_operations(storage: IPLDStorage, cids: List[str]) -> Tuple[float, float, float, float]:
    """
    Benchmark CAR file export/import using standard and streaming methods.

    Args:
        storage (IPLDStorage): Storage instance
        cids (List[str]): CIDs to include in CAR

    Returns:
        Tuple of (standard_export_time, stream_export_time,
        standard_import_time, stream_import_time) in seconds
    """
    print(f"Benchmarking CAR operations with {len(cids)} CIDs...")

    # Create temp directory for CAR files; the try/finally below guarantees
    # it is cleaned up even if a benchmark step raises (the original version
    # only cleaned up on the success path, and used a bare `except:`).
    temp_dir = tempfile.mkdtemp()
    try:
        standard_car_path = os.path.join(temp_dir, "standard.car")
        stream_car_path = os.path.join(temp_dir, "stream.car")

        # Standard export
        start_time = time.time()
        storage.export_to_car(cids, standard_car_path)
        standard_export_time = time.time() - start_time
        print(f"Standard CAR export: {standard_export_time:.2f} seconds")

        # Streaming export
        start_time = time.time()
        with open(stream_car_path, 'wb') as f:
            storage.export_to_car_stream(cids, f)
        stream_export_time = time.time() - start_time
        print(f"Streaming CAR export: {stream_export_time:.2f} seconds")

        # Fresh storage instances so imports are measured cold (nothing cached)
        standard_import_storage = IPLDStorage(base_dir=os.path.join(temp_dir, "standard_import"))
        stream_import_storage = IPLDStorage(base_dir=os.path.join(temp_dir, "stream_import"))

        # Standard import
        start_time = time.time()
        standard_import_storage.import_from_car(standard_car_path)
        standard_import_time = time.time() - start_time
        print(f"Standard CAR import: {standard_import_time:.2f} seconds")

        # Streaming import
        start_time = time.time()
        with open(stream_car_path, 'rb') as f:
            stream_import_storage.import_from_car_stream(f)
        stream_import_time = time.time() - start_time
        print(f"Streaming CAR import: {stream_import_time:.2f} seconds")

        return standard_export_time, stream_export_time, standard_import_time, stream_import_time
    finally:
        # Best-effort cleanup: only filesystem errors are swallowed.
        try:
            shutil.rmtree(temp_dir)
        except OSError:
            print(f"Failed to remove temp directory: {temp_dir}")
def benchmark_optimized_codec(binary_data: List[bytes]) -> Dict[str, float]:
    """
    Benchmark the optimized codec directly, bypassing the storage layer.

    Args:
        binary_data (List[bytes]): Binary data to encode/decode

    Returns:
        Dict with benchmark results: encode/decode/cache timings in seconds
    """
    print(f"Benchmarking optimized codec with {len(binary_data)} blocks...")

    # Wrap each blob in a PBNode for encoding.
    nodes = [PBNode(data=blob) for blob in binary_data]

    # Baseline codec: caching and parallelism disabled.
    baseline_encoder = OptimizedEncoder(use_cache=False, max_workers=1)
    baseline_decoder = OptimizedDecoder(use_cache=False, max_workers=1)
    # Optimized codec: caching enabled, worker count auto-selected.
    fast_encoder = OptimizedEncoder(use_cache=True, max_workers=None)
    fast_decoder = OptimizedDecoder(use_cache=True, max_workers=None)

    # Baseline encoding, one node at a time.
    t0 = time.time()
    baseline_encoded = [baseline_encoder.encode_node(node) for node in nodes]
    standard_encode_time = time.time() - t0
    print(f"Standard encoding: {standard_encode_time:.2f} seconds")

    # Optimized encoding, whole batch at once.
    t0 = time.time()
    fast_encoder.encode_batch(nodes)
    optimized_encode_time = time.time() - t0
    print(f"Optimized encoding: {optimized_encode_time:.2f} seconds")

    # Reuse the baseline-encoded (data, cid) pairs for the decode tests.
    encoded_data = list(baseline_encoded)

    # Baseline decoding, one block at a time.
    t0 = time.time()
    for blob, cid in encoded_data:
        baseline_decoder.decode_block(blob, cid)
    standard_decode_time = time.time() - t0
    print(f"Standard decoding: {standard_decode_time:.2f} seconds")

    # Optimized decoding, whole batch at once.
    t0 = time.time()
    fast_decoder.decode_batch(encoded_data)
    optimized_decode_time = time.time() - t0
    print(f"Optimized decoding: {optimized_decode_time:.2f} seconds")

    # Measure the effect of the encode cache on repeated inputs.
    cache_encoder = OptimizedEncoder(use_cache=True, max_workers=1)
    warmup = nodes[:10]
    for node in warmup:
        # First pass populates the cache.
        cache_encoder.encode_node(node)

    # Time 5 repeated passes over the warmed-up nodes (50 operations).
    t0 = time.time()
    for _ in range(5):
        for node in warmup:
            cache_encoder.encode_node(node)
    cache_encode_time = time.time() - t0
    print(f"Cached encoding (50 operations): {cache_encode_time:.2f} seconds")

    if cache_encoder.stats:
        hits = cache_encoder.stats.cache_hits
        misses = cache_encoder.stats.cache_misses
        lookups = hits + misses
        cache_hit_rate = hits / lookups if lookups > 0 else 0
        print(f"Cache hit rate: {cache_hit_rate:.2%}")

    return {
        "standard_encode_time": standard_encode_time,
        "optimized_encode_time": optimized_encode_time,
        "standard_decode_time": standard_decode_time,
        "optimized_decode_time": optimized_decode_time,
        "cache_encode_time": cache_encode_time
    }
def run_full_benchmark(size: str) -> Dict[str, Any]:
    """
    Run a full set of benchmarks and return the results.

    Args:
        size (str): Size category ('small', 'medium', 'large')

    Returns:
        Dict keyed by benchmark name; each entry holds raw timings plus an
        "improvement" speedup ratio (standard / optimized, 0 when the
        optimized time is 0 to avoid division by zero).
    """
    # Create test data
    json_objects, binary_data = create_test_data(size)

    # Create storage backed by a throwaway directory; the try/finally below
    # guarantees cleanup even if a benchmark raises (the original version
    # only cleaned up on the success path, and used a bare `except:`).
    temp_dir = tempfile.mkdtemp()
    try:
        storage = IPLDStorage(base_dir=temp_dir)
        results = {}

        # JSON encoding benchmark
        standard_json_time, batch_json_time = benchmark_json_encoding(storage, json_objects)
        results["json_encoding"] = {
            "standard_time": standard_json_time,
            "batch_time": batch_json_time,
            "improvement": standard_json_time / batch_json_time if batch_json_time > 0 else 0
        }

        # Binary encoding benchmark
        standard_binary_time, batch_binary_time = benchmark_binary_encoding(storage, binary_data)
        results["binary_encoding"] = {
            "standard_time": standard_binary_time,
            "batch_time": batch_binary_time,
            "improvement": standard_binary_time / batch_binary_time if batch_binary_time > 0 else 0
        }

        # Store data for retrieval benchmark
        print("Storing data for retrieval benchmark...")
        cids = storage.store_batch(binary_data)

        # Retrieval benchmark
        standard_retrieval_time, batch_retrieval_time = benchmark_retrieval(storage, cids)
        results["retrieval"] = {
            "standard_time": standard_retrieval_time,
            "batch_time": batch_retrieval_time,
            "improvement": standard_retrieval_time / batch_retrieval_time if batch_retrieval_time > 0 else 0
        }

        # CAR operations benchmark.
        # Use a subset of CIDs for larger datasets to keep benchmark reasonable.
        car_cids = cids[:min(len(cids), 100)]
        standard_export_time, stream_export_time, standard_import_time, stream_import_time = (
            benchmark_car_operations(storage, car_cids)
        )
        results["car_operations"] = {
            "standard_export_time": standard_export_time,
            "stream_export_time": stream_export_time,
            "standard_import_time": standard_import_time,
            "stream_import_time": stream_import_time,
            "export_improvement": standard_export_time / stream_export_time if stream_export_time > 0 else 0,
            "import_improvement": standard_import_time / stream_import_time if stream_import_time > 0 else 0
        }

        # Optimized codec benchmark; derive speedup ratios from its raw timings
        codec_results = benchmark_optimized_codec(binary_data)
        results["codec"] = codec_results
        results["codec"]["encode_improvement"] = (
            codec_results["standard_encode_time"] / codec_results["optimized_encode_time"]
            if codec_results["optimized_encode_time"] > 0 else 0
        )
        results["codec"]["decode_improvement"] = (
            codec_results["standard_decode_time"] / codec_results["optimized_decode_time"]
            if codec_results["optimized_decode_time"] > 0 else 0
        )

        return results
    finally:
        # Best-effort cleanup: only filesystem errors are swallowed.
        try:
            shutil.rmtree(temp_dir)
        except OSError:
            print(f"Failed to remove temp directory: {temp_dir}")
def print_results(results: Dict[str, Any]) -> None:
    """
    Print formatted benchmark results.

    Args:
        results (Dict): Benchmark results as produced by run_full_benchmark()
    """
    divider = "=" * 60
    print("\n" + divider)
    print("BENCHMARK RESULTS")
    print(divider)

    json_res = results['json_encoding']
    print("\nJSON Encoding:")
    print(f" Standard: {json_res['standard_time']:.2f}s")
    print(f" Batch: {json_res['batch_time']:.2f}s")
    print(f" Improvement: {json_res['improvement']:.2f}x")

    bin_res = results['binary_encoding']
    print("\nBinary Encoding:")
    print(f" Standard: {bin_res['standard_time']:.2f}s")
    print(f" Batch: {bin_res['batch_time']:.2f}s")
    print(f" Improvement: {bin_res['improvement']:.2f}x")

    get_res = results['retrieval']
    print("\nData Retrieval:")
    print(f" Standard: {get_res['standard_time']:.2f}s")
    print(f" Batch: {get_res['batch_time']:.2f}s")
    print(f" Improvement: {get_res['improvement']:.2f}x")

    car_res = results['car_operations']
    print("\nCAR Operations:")
    print(f" Standard Export: {car_res['standard_export_time']:.2f}s")
    print(f" Stream Export: {car_res['stream_export_time']:.2f}s")
    print(f" Export Improvement: {car_res['export_improvement']:.2f}x")
    print(f" Standard Import: {car_res['standard_import_time']:.2f}s")
    print(f" Stream Import: {car_res['stream_import_time']:.2f}s")
    print(f" Import Improvement: {car_res['import_improvement']:.2f}x")

    codec_res = results['codec']
    print("\nOptimized Codec:")
    print(f" Standard Encoding: {codec_res['standard_encode_time']:.2f}s")
    print(f" Optimized Encoding: {codec_res['optimized_encode_time']:.2f}s")
    print(f" Encoding Improvement: {codec_res['encode_improvement']:.2f}x")
    print(f" Standard Decoding: {codec_res['standard_decode_time']:.2f}s")
    print(f" Optimized Decoding: {codec_res['optimized_decode_time']:.2f}s")
    print(f" Decoding Improvement: {codec_res['decode_improvement']:.2f}x")

    # Average of all per-benchmark speedup ratios.
    print("\nOverall Improvement:")
    speedups = [
        json_res['improvement'],
        bin_res['improvement'],
        get_res['improvement'],
        car_res['export_improvement'],
        car_res['import_improvement'],
        codec_res['encode_improvement'],
        codec_res['decode_improvement'],
    ]
    print(f" Average: {sum(speedups) / len(speedups):.2f}x")
    print(divider)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='IPLD Performance Benchmark')
parser.add_argument('--size', choices=['small', 'medium', 'large'], default='small',
help='Size of the test dataset (default: small)')
args = parser.parse_args()
print(f"Starting IPLD performance benchmark with {args.size} dataset...")
start_time = time.time()
results = run_full_benchmark(args.size)
total_time = time.time() - start_time
print(f"\nBenchmark completed in {total_time:.2f} seconds")
print_results(results)
# Save results to file
result_file = f"ipld_benchmark_{args.size}_{int(time.time())}.json"
with open(result_file, 'w') as f:
json.dump(results, f, indent=2)
print(f"\nDetailed results saved to {result_file}")