@@ -464,27 +464,27 @@ def _load_df_offline(
464464 / self .dataset_name
465465 / "annotations"
466466 )
467-
468- if not path . exists () :
467+ files = list ( path . glob ( "*.parquet" ))
468+ if not files :
469469 if raise_when_empty :
470470 raise FileNotFoundError (
471471 f"Dataset '{ self .dataset_name } ' is empty."
472472 )
473473 return None
474474
475+ lazy_df = pl .scan_parquet ([str (f ) for f in files ])
476+
475477 if lazy :
476- dfs = [pl .scan_parquet (file ) for file in path .glob ("*.parquet" )]
477- df = pl .concat (dfs ) if dfs else None
478- else :
479- dfs = [pl .read_parquet (file ) for file in path .glob ("*.parquet" )]
480- df = pl .concat (dfs ) if dfs else None
478+ return lazy_df
481479
482- if df is None and raise_when_empty :
480+ df = lazy_df .collect ()
481+ if df .is_empty () and raise_when_empty :
483482 raise FileNotFoundError (f"Dataset '{ self .dataset_name } ' is empty." )
484483
485- if not attempt_migration or self .version == LDF_VERSION or df is None :
486- return df
487- return migrate_dataframe (df ) # pragma: no cover
484+ if attempt_migration and self .version != LDF_VERSION :
485+ df = migrate_dataframe (df )
486+
487+ return df
488488
489489 @overload
490490 def _get_index (
@@ -514,24 +514,35 @@ def _get_index(
514514 ) -> pl .DataFrame | pl .LazyFrame | None :
515515 """Loads unique file entries from annotation data."""
516516 df = self ._load_df_offline (
517- lazy = lazy , raise_when_empty = raise_when_empty
517+ lazy = True , raise_when_empty = raise_when_empty
518518 )
519519 if df is None :
520520 return None
521521
522- processed = df .select (
523- pl .col ("uuid" ),
524- pl .col ("file" ).str .extract (r"([^\/\\]+)$" ).alias ("file" ),
525- pl .col ("file" )
526- .apply (lambda x : str (Path (x ).resolve ()), return_dtype = pl .Utf8 )
527- .alias ("original_filepath" ),
528- )
522+ if isinstance (df , pl .DataFrame ):
523+ df = df .lazy ()
529524
530- processed = processed .unique (
531- subset = ["uuid" , "original_filepath" ], maintain_order = False
532- )
525+ unique_files = df .select (pl .col ("file" )).unique ().collect ()
526+ files : list [str ] = unique_files ["file" ].to_list ()
527+
528+ def resolve_path (p : str ) -> str :
529+ return str (Path (p ).resolve ())
530+
531+ with ThreadPoolExecutor () as pool :
532+ resolved_paths = list (pool .map (resolve_path , files ))
533+ mapping : dict [str , str ] = dict (zip (files , resolved_paths , strict = True ))
533534
534- if not lazy and isinstance (processed , pl .LazyFrame ):
535+ processed = df .with_columns (
536+ [
537+ pl .col ("uuid" ),
538+ pl .col ("file" ).str .extract (r"([^\/\\]+)$" ).alias ("file" ),
539+ pl .col ("file" )
540+ .map_dict (mapping , default = None )
541+ .alias ("original_filepath" ),
542+ ]
543+ ).unique (subset = ["uuid" , "original_filepath" ], maintain_order = False )
544+
545+ if not lazy :
535546 processed = processed .collect ()
536547
537548 return processed
@@ -717,23 +728,18 @@ def pull_from_cloud(
717728 index = self ._get_index (lazy = False )
718729 missing_media_paths = []
719730 if index is not None :
720- missing = index .filter (
721- pl .col ("original_filepath" ).apply (
722- lambda path : not Path (path ).exists (),
723- return_dtype = pl .Boolean ,
724- )
725- & pl .col ("uuid" ).apply (
726- lambda uid : not (
727- Path ("local_dir" )
728- / "media"
729- / f"{ uid } { Path (str (uid )).suffix } "
730- ).exists (),
731- return_dtype = pl .Boolean ,
732- )
733- )
731+ df_small = index .select (["uuid" , "file" , "original_filepath" ])
732+ uuids = df_small ["uuid" ].to_list ()
733+ files = df_small ["file" ].to_list ()
734+ origps = df_small ["original_filepath" ].to_list ()
735+
736+ media_root = Path (local_dir ) / self .dataset_name / "media"
737+
734738 missing_media_paths = [
735- f"media/{ row [0 ]} { Path (str (row [1 ])).suffix } "
736- for row in missing .select (["uuid" , "file" ]).iter_rows ()
739+ f"media/{ uid } { Path (f ).suffix } "
740+ for uid , f , orig in zip (uuids , files , origps , strict = True )
741+ if not Path (orig ).exists ()
742+ and not (media_root / f"{ uid } { Path (f ).suffix } " ).exists ()
737743 ]
738744
739745 if update_mode == UpdateMode .ALL :
0 commit comments