-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy path09_batch_processing.py
More file actions
379 lines (284 loc) · 11.6 KB
/
09_batch_processing.py
File metadata and controls
379 lines (284 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
"""
Batch Processing - Process Files at Scale
This example demonstrates how to process large numbers of files efficiently
using batch processing with resource limits, progress tracking, and error handling.
Requirements:
- Core ipfs_datasets_py dependencies
Usage:
python examples/09_batch_processing.py
"""
import asyncio
import tempfile
from pathlib import Path
import time
def create_test_dataset(num_files=20):
    """Build a throwaway directory of sample files for the demos.

    Cycles through three flavors by index (i % 3): plain text, JSON,
    and markdown, so the batch examples see a mix of formats and sizes.

    Args:
        num_files: How many files to generate (default 20).

    Returns:
        Tuple of (temp directory Path, list of created file Paths).
    """
    tmpdir = Path(tempfile.mkdtemp())
    print(f"\n📝 Creating {num_files} test files in {tmpdir}...")

    files = []
    for idx in range(num_files):
        flavor = idx % 3
        if flavor == 0:
            # Plain-text documents whose size grows with the index.
            path = tmpdir / f"document_{idx}.txt"
            body = f"This is document {idx}.\n" * (50 + idx * 10)
        elif flavor == 1:
            # Small single-object JSON payloads.
            path = tmpdir / f"data_{idx}.json"
            body = f'{{"id": {idx}, "value": "test_{idx}", "data": [1, 2, 3]}}'
        else:
            # Repeated markdown snippets.
            path = tmpdir / f"note_{idx}.md"
            body = f"# Note {idx}\n\nThis is a markdown note.\n\n" * 5
        path.write_text(body)
        files.append(path)

    print(f" ✅ Created {len(files)} files")
    return tmpdir, files
async def demo_basic_batch_processing():
    """Demo 1: convert a small batch of files and report throughput.

    Builds a 10-file corpus, runs it through BatchProcessor with 3-way
    concurrency, then prints a success/failure summary and sample sizes.
    Any error (including a missing project install) is caught and printed.
    """
    print("\n" + "="*70)
    print("DEMO 1: Basic Batch Processing")
    print("="*70)

    try:
        from ipfs_datasets_py.processors.file_converter import BatchProcessor

        # Small mixed-format corpus for the demo.
        tmpdir, files = create_test_dataset(10)

        print("\n🔄 Initializing batch processor...")
        batch = BatchProcessor(max_concurrent=3)

        print(f"\n📊 Processing {len(files)} files...")
        started = time.time()
        results = await batch.process_batch(
            files=files,
            target_format='txt'
        )
        duration = time.time() - started

        # Tally outcomes for the summary lines.
        ok_count = sum(1 for item in results if item.success)
        bad_count = len(results) - ok_count
        print(f"\n✅ Batch processing complete")
        print(f" Total files: {len(results)}")
        print(f" Successful: {ok_count}")
        print(f" Failed: {bad_count}")
        print(f" Time: {duration:.2f} seconds")
        print(f" Throughput: {len(results)/duration:.2f} files/second")

        # Peek at the first few results.
        print("\n Sample results:")
        for pos, item in enumerate(results[:3], 1):
            marker = "✅" if item.success else "❌"
            size = len(item.content) if item.success else 0
            print(f" {marker} File {pos}: {size} characters")

        # Remove the temp corpus.
        import shutil
        shutil.rmtree(tmpdir)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
async def demo_resource_limits():
    """Demo 2: cap memory, concurrency, and per-file time during a batch run.

    Configures a ResourceLimits object, echoes its settings, and processes
    a 15-file corpus under those caps. Errors are caught and printed.
    """
    print("\n" + "="*70)
    print("DEMO 2: Resource-Limited Batch Processing")
    print("="*70)

    try:
        from ipfs_datasets_py.processors.file_converter import (
            BatchProcessor,
            ResourceLimits
        )

        tmpdir, files = create_test_dataset(15)

        # Build the limits object up front so its settings can be echoed.
        print("\n⚙️ Configuring resource limits...")
        caps = ResourceLimits(
            max_memory_mb=512,   # Max 512MB memory
            max_concurrent=5,    # Max 5 concurrent tasks
            timeout_seconds=30   # 30 second timeout per file
        )
        print(f" Max memory: {caps.max_memory_mb} MB")
        print(f" Max concurrent: {caps.max_concurrent}")
        print(f" Timeout: {caps.timeout_seconds}s")

        runner = BatchProcessor(resource_limits=caps)
        print(f"\n📊 Processing {len(files)} files with limits...")
        outcome = await runner.process_batch(files, target_format='txt')

        ok = sum(1 for entry in outcome if entry.success)
        print(f"\n✅ Completed: {ok}/{len(outcome)} files")

        import shutil
        shutil.rmtree(tmpdir)
    except Exception as e:
        print(f"\n❌ Error: {e}")
async def demo_progress_tracking():
    """Demo 3: stream per-file progress updates through a callback.

    Registers a progress_callback on BatchProcessor that prints running
    completed/successful/failed counts as a 20-file batch is processed.
    """
    print("\n" + "="*70)
    print("DEMO 3: Progress Tracking")
    print("="*70)

    try:
        from ipfs_datasets_py.processors.file_converter import (
            BatchProcessor,
            BatchProgress
        )

        tmpdir, files = create_test_dataset(20)
        print("\n📊 Processing with progress tracking...")

        def report(progress: BatchProgress):
            # Echo completion counts as each file finishes.
            pct = (progress.completed / progress.total) * 100
            print(f" Progress: {progress.completed}/{progress.total} "
                  f"({pct:.1f}%) - "
                  f"Success: {progress.successful}, Failed: {progress.failed}")

        worker = BatchProcessor(
            max_concurrent=4,
            progress_callback=report
        )
        results = await worker.process_batch(files, target_format='txt')
        print(f"\n✅ Processing complete")

        import shutil
        shutil.rmtree(tmpdir)
    except Exception as e:
        print(f"\n❌ Error: {e}")
async def demo_error_handling():
    """Demo 4: keep a batch running past individual file failures.

    Plants one deliberately corrupt file among valid ones, processes with
    continue_on_error=True, then reports which files failed and why.
    """
    print("\n" + "="*70)
    print("DEMO 4: Error Handling")
    print("="*70)

    try:
        from ipfs_datasets_py.processors.file_converter import BatchProcessor

        tmpdir, files = create_test_dataset(10)

        # Plant one unreadable file so at least one conversion fails.
        bad_path = tmpdir / "corrupted.txt"
        bad_path.write_bytes(b'\x00\x00\xff\xff')  # Binary garbage
        files.append(bad_path)

        print(f"\n🔧 Processing {len(files)} files (including 1 invalid)...")
        engine = BatchProcessor(
            max_concurrent=3,
            continue_on_error=True  # Don't stop on errors
        )
        results = await engine.process_batch(files, target_format='txt')

        # Split outcomes for reporting.
        wins = [entry for entry in results if entry.success]
        losses = [entry for entry in results if not entry.success]
        print(f"\n📊 Results:")
        print(f" Total: {len(results)}")
        print(f" Successful: {len(wins)}")
        print(f" Failed: {len(losses)}")
        if losses:
            print("\n Failed files:")
            for entry in losses:
                print(f" - {entry.source_file}: {entry.error}")

        import shutil
        shutil.rmtree(tmpdir)
    except Exception as e:
        print(f"\n❌ Error: {e}")
async def demo_parallel_processing():
    """Demo 5: time a sequential run against a 5-way concurrent run.

    Processes the same 15-file corpus twice — once with max_concurrent=1
    and once with max_concurrent=5 — then prints the measured speedup.
    """
    print("\n" + "="*70)
    print("DEMO 5: Sequential vs Parallel Performance")
    print("="*70)

    try:
        from ipfs_datasets_py.processors.file_converter import BatchProcessor

        tmpdir, files = create_test_dataset(15)

        # Baseline: one file at a time.
        print("\n🐌 Sequential processing...")
        one_at_a_time = BatchProcessor(max_concurrent=1)
        t0 = time.time()
        await one_at_a_time.process_batch(files, target_format='txt')
        sequential_secs = time.time() - t0
        print(f" Time: {sequential_secs:.2f}s")
        print(f" Throughput: {len(files)/sequential_secs:.2f} files/second")

        # Same workload with five files in flight.
        print("\n🚀 Parallel processing (5 concurrent)...")
        five_way = BatchProcessor(max_concurrent=5)
        t1 = time.time()
        await five_way.process_batch(files, target_format='txt')
        parallel_secs = time.time() - t1
        print(f" Time: {parallel_secs:.2f}s")
        print(f" Throughput: {len(files)/parallel_secs:.2f} files/second")

        # Wall-clock ratio between the two runs.
        speedup = sequential_secs / parallel_secs
        print(f"\n⚡ Speedup: {speedup:.2f}x faster with parallel processing")

        import shutil
        shutil.rmtree(tmpdir)
    except Exception as e:
        print(f"\n❌ Error: {e}")
async def demo_caching():
    """Demo 6: show (as a printed code listing) how caching skips repeat work.

    This demo does not execute anything against the processor; it prints an
    illustrative snippet plus the benefits of enabling a CacheManager.
    """
    print("\n" + "="*70)
    print("DEMO 6: Caching for Repeated Processing")
    print("="*70)

    print("\n💾 Caching Example")

    # Illustrative snippet only — printed, never executed.
    snippet = '''
from ipfs_datasets_py.processors.file_converter import (
    BatchProcessor,
    CacheManager
)

# Initialize cache
cache = CacheManager(cache_dir="/tmp/batch_cache")

# Create processor with cache
processor = BatchProcessor(
    max_concurrent=5,
    cache_manager=cache,
    use_cache=True
)

# First run: processes all files
print("First run...")
results1 = await processor.process_batch(files, target_format='txt')
# Time: 10 seconds

# Second run: uses cached results
print("Second run (cached)...")
results2 = await processor.process_batch(files, target_format='txt')
# Time: 0.5 seconds (20x faster!)
'''
    print(snippet)

    print("\n💡 Caching benefits:")
    print(" - Skip reprocessing unchanged files")
    print(" - Much faster on repeated runs")
    print(" - Save compute resources")
def show_tips():
    """Print a numbered cheat-sheet of batch-processing recommendations.

    Output is data-driven: each (heading, bullets) pair below is rendered
    as a blank-line-prefixed heading followed by ' - ' bullet lines.
    """
    print("\n" + "="*70)
    print("TIPS FOR BATCH PROCESSING")
    print("="*70)

    sections = [
        ("1. Concurrency:", [
            "Start with max_concurrent = CPU cores",
            "Adjust based on I/O vs CPU workload",
            "Monitor memory usage and adjust",
        ]),
        ("2. Resource Management:", [
            "Set memory limits to prevent OOM",
            "Use timeouts for long-running tasks",
            "Implement backpressure if needed",
        ]),
        ("3. Error Handling:", [
            "Use continue_on_error=True for resilience",
            "Log errors for later analysis",
            "Retry failed files separately",
        ]),
        ("4. Performance:", [
            "Profile to find bottlenecks",
            "Use caching for repeated processing",
            "Consider distributing across machines",
        ]),
        ("5. Progress Tracking:", [
            "Implement progress callbacks",
            "Persist state for long-running jobs",
            "Allow resumption after interruption",
        ]),
        ("6. Best Practices:", [
            "Test on small batches first",
            "Monitor system resources",
            "Have fallback strategies",
            "Keep audit logs",
        ]),
        ("7. Scaling:", [
            "For >10k files: Consider chunking",
            "For >100k files: Use distributed processing",
            "Consider cloud services for massive scale",
        ]),
    ]
    for heading, bullets in sections:
        print(f"\n{heading}")
        for bullet in bullets:
            print(f" - {bullet}")
async def main():
    """Run every batch-processing demo in order, then print the tips."""
    print("\n" + "="*70)
    print("IPFS DATASETS PYTHON - BATCH PROCESSING")
    print("="*70)

    # Demos run sequentially so their console output stays grouped.
    demos = (
        demo_basic_batch_processing,
        demo_resource_limits,
        demo_progress_tracking,
        demo_error_handling,
        demo_parallel_processing,
        demo_caching,
    )
    for demo in demos:
        await demo()

    show_tips()

    print("\n" + "="*70)
    print("✅ BATCH PROCESSING EXAMPLES COMPLETE")
    print("="*70)
# Script entry point: drive the full async demo suite on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())