Skip to content

Commit 21cd393

Browse files
authored
fix: chunk initial symlinks on re-ingest (#882)
* fix: chunk initial symlinks on re-ingest
* fix: rollback on bad chunk
1 parent 0a2c5a9 commit 21cd393

File tree

1 file changed

+74
-51
lines changed

1 file changed

+74
-51
lines changed

src/program/program.py

+74-51
Original file line number | Diff line number | Diff line change
@@ -393,63 +393,86 @@ def _init_db_from_symlinks(self):
393393
return
394394

395395
logger.log("PROGRAM", "Collecting items from symlinks, this may take a while depending on library size")
396-
items = self.services[SymlinkLibrary].run()
397-
errors = []
398-
added_items = set()
399-
400-
progress, console = create_progress_bar(len(items))
401-
task = progress.add_task("Enriching items with metadata", total=len(items), log="")
402-
403-
with Live(progress, console=console, refresh_per_second=10):
404-
workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4))
405-
with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
406-
future_to_item = {
407-
executor.submit(self._enhance_item, item): item
408-
for item in items
409-
if isinstance(item, (Movie, Show))
410-
}
411-
412-
for future in as_completed(future_to_item):
413-
item = future_to_item[future]
414-
log_message = ""
415-
396+
try:
397+
items = self.services[SymlinkLibrary].run()
398+
errors = []
399+
added_items = set()
400+
401+
# Convert items to list and get total count
402+
items_list = [item for item in items if isinstance(item, (Movie, Show))]
403+
total_items = len(items_list)
404+
405+
progress, console = create_progress_bar(total_items)
406+
task = progress.add_task("Enriching items with metadata", total=total_items, log="")
407+
408+
# Process in chunks of 100 items
409+
chunk_size = 100
410+
with Live(progress, console=console, refresh_per_second=10):
411+
workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4))
412+
413+
for i in range(0, total_items, chunk_size):
414+
chunk = items_list[i:i + chunk_size]
415+
416416
try:
417-
if not item or item.imdb_id in added_items:
418-
errors.append(f"Duplicate symlink directory found for {item.log_string}")
419-
continue
420-
421-
# Check for existing item using your db_functions
422-
if db_functions.get_item_by_id(item.id, session=session):
423-
errors.append(f"Duplicate item found in database for id: {item.id}")
424-
continue
425-
426-
enhanced_item = future.result()
427-
if not enhanced_item:
428-
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
429-
continue
430-
431-
enhanced_item.store_state()
432-
session.add(enhanced_item)
433-
added_items.add(item.imdb_id)
434-
435-
log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
436-
except NotADirectoryError:
437-
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
417+
with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
418+
future_to_item = {
419+
executor.submit(self._enhance_item, item): item
420+
for item in chunk
421+
}
422+
423+
for future in as_completed(future_to_item):
424+
item = future_to_item[future]
425+
log_message = ""
426+
427+
try:
428+
if not item or item.imdb_id in added_items:
429+
errors.append(f"Duplicate symlink directory found for {item.log_string}")
430+
continue
431+
432+
if db_functions.get_item_by_id(item.id, session=session):
433+
errors.append(f"Duplicate item found in database for id: {item.id}")
434+
continue
435+
436+
enhanced_item = future.result()
437+
if not enhanced_item:
438+
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
439+
continue
440+
441+
enhanced_item.store_state()
442+
session.add(enhanced_item)
443+
added_items.add(item.imdb_id)
444+
445+
log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
446+
except NotADirectoryError:
447+
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
448+
except Exception as e:
449+
logger.exception(f"Error processing {item.log_string}: {e}")
450+
raise # Re-raise to trigger rollback
451+
finally:
452+
progress.update(task, advance=1, log=log_message)
453+
454+
# Only commit if the entire chunk was successful
455+
session.commit()
456+
438457
except Exception as e:
439-
logger.exception(f"Error processing {item.log_string}: {e}")
440-
finally:
441-
progress.update(task, advance=1, log=log_message)
442-
458+
session.rollback()
459+
logger.error(f"Failed to process chunk {i//chunk_size + 1}, rolling back all changes: {str(e)}")
460+
raise # Re-raise to abort the entire process
461+
443462
progress.update(task, log="Finished Indexing Symlinks!")
444-
session.commit()
445463

446-
if errors:
447-
logger.error("Errors encountered during initialization")
448-
for error in errors:
449-
logger.error(error)
464+
if errors:
465+
logger.error("Errors encountered during initialization")
466+
for error in errors:
467+
logger.error(error)
468+
469+
except Exception as e:
470+
session.rollback()
471+
logger.error(f"Failed to initialize database from symlinks: {str(e)}")
472+
return
450473

451474
elapsed_time = datetime.now() - start_time
452475
total_seconds = elapsed_time.total_seconds()
453476
hours, remainder = divmod(total_seconds, 3600)
454477
minutes, seconds = divmod(remainder, 60)
455-
logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")
478+
logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")

0 commit comments

Comments
 (0)