Closed
26 commits
9f59950
Initial structure for refactoring exports to native and to COCO Roboflow
dtronmans Oct 30, 2025
7408af1
the extract function which is the same for every dataset type
dtronmans Oct 30, 2025
3431980
temp changes for now to compare to initial behavior
dtronmans Oct 30, 2025
9b4fde3
Temp changes and fixes, cleanup
dtronmans Oct 31, 2025
7a0e63e
Format avoid circular import
dtronmans Oct 31, 2025
5ff2f39
More moving of files and logic
dtronmans Oct 31, 2025
48edb64
More moving and cleaning up of export functions
dtronmans Nov 3, 2025
201c91d
Refactor COCO format enum re-usable by coco parser and coco exporter
dtronmans Nov 3, 2025
c46ea68
transform function cleaned with _row_to_record method
dtronmans Nov 3, 2025
af18bed
Support for annotation formats where there is one annotation file per…
dtronmans Nov 3, 2025
b261a37
Checkpoint: safer flow and fix both pyright and pre-commit
dtronmans Nov 3, 2025
ae29e7e
More comment reformatting and reorder of BaseExporter class
dtronmans Nov 3, 2025
506b308
move groupby away from function, coco exporter with custom logic per …
dtronmans Nov 3, 2025
25d96af
Working COCO exporter for both formats, some cleaning required
dtronmans Nov 4, 2025
2d55ea7
working coco and native exporters, minus refactor
dtronmans Nov 4, 2025
4cfff78
Refactor of the transform() method: move row-by-row processing away
dtronmans Nov 4, 2025
f6aa64c
Remove support for COCO exporter for now
dtronmans Nov 4, 2025
e5b2e57
Cleaned luxonis_dataset.py only accept NativeExporter for now
dtronmans Nov 4, 2025
8856957
Remove COCO Exporter file from tracking for now
dtronmans Nov 4, 2025
db77a01
temp changes: bring back intermediate saving
dtronmans Nov 4, 2025
7994413
Proper intermediate saving
dtronmans Nov 4, 2025
8740a92
More changes for cleaner prepare_ldf
dtronmans Nov 4, 2025
d9599a4
Fixed precommit and types and passing tests
dtronmans Nov 4, 2025
d5ddde7
Docformat CI
dtronmans Nov 4, 2025
5dfbeb1
Pre-commit
dtronmans Nov 4, 2025
c59275a
Revert COCOFormat changes: each parser/exporter defines its own Forma…
dtronmans Nov 5, 2025
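
Taken together, these commits swap the inline export logic in `LuxonisDataset.export()` for an exporter abstraction: the dataframe is prepared once via `PreparedLDF`, a format-specific exporter (only `NativeExporter` for now) writes the output, and partitioning plus zipping move behind the exporter API. A minimal sketch of that dispatch, assuming the names shown in the diff below keep their signatures (the `DatasetType` import path is assumed):

```python
# Sketch of the dispatch pattern from the export() diff below; not the
# verbatim implementation. EXPORTER_MAP and the exporter method names are
# taken from the diff, everything else is illustrative.
from pathlib import Path

from luxonis_ml.data.exporters import BaseExporter, NativeExporter, PreparedLDF
from luxonis_ml.enums import DatasetType  # import path assumed

EXPORTER_MAP: dict[DatasetType, type[BaseExporter]] = {
    DatasetType.NATIVE: NativeExporter,
    # More exporters (e.g. COCO) to be registered here later.
}


def export(dataset, dataset_type: DatasetType, output_path: Path) -> Path:
    exporter_cls = EXPORTER_MAP.get(dataset_type)
    if exporter_cls is None:
        raise NotImplementedError(f"Unsupported export format: {dataset_type}")

    prepared = PreparedLDF.from_dataset(dataset)  # resolve files, splits, order
    exporter: BaseExporter = exporter_cls(dataset.identifier)
    exporter.transform(
        prepared_ldf=prepared,
        output_path=output_path,
        max_partition_size_gb=None,  # no partitioning in this sketch
    )
    return output_path
```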
283 changes: 44 additions & 239 deletions luxonis_ml/data/datasets/luxonis_dataset.py
@@ -1,7 +1,6 @@
import json
import math
import shutil
import sys
from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from concurrent.futures import ThreadPoolExecutor
@@ -18,6 +17,7 @@
from semver.version import Version
from typing_extensions import Self, override

from luxonis_ml.data.exporters import BaseExporter, NativeExporter, PreparedLDF
from luxonis_ml.data.utils import (
BucketStorage,
BucketType,
@@ -1497,266 +1497,71 @@ def export(
partition) after export. Default is False.
@rtype: Union[Path, List[Path]]
@return: Path(s) to the ZIP file(s) containing the exported
dataset.
dataset (if zip_output=True). Otherwise, the output
directory.
"""

def _dump_annotations(
annotations: dict[str, list[Any]],
output_path: Path,
identifier: str,
part: int | None = None,
) -> None:
for split_name, annotation_data in annotations.items():
if part is not None:
split_path = (
output_path / f"{identifier}_part{part}" / split_name
)
else:
split_path = output_path / identifier / split_name
split_path.mkdir(parents=True, exist_ok=True)
with open(split_path / "annotations.json", "w") as f:
json.dump(annotation_data, f, indent=4)

def resolve_path(
img_path: str | Path, uuid: str, media_path: str
) -> str:
img_path = Path(img_path)
if img_path.exists():
return str(img_path)

ext = img_path.suffix.lstrip(".")
fallback = Path(media_path) / f"{uuid}.{ext}"
if not fallback.exists():
raise FileNotFoundError(f"Missing image: {fallback}")
return str(fallback)

if dataset_type is not DatasetType.NATIVE:
EXPORTER_MAP = {
DatasetType.NATIVE: NativeExporter
} # More exporters to be defined here
exporter_cls = EXPORTER_MAP.get(dataset_type)
if exporter_cls is None:
raise NotImplementedError(
"Only 'NATIVE' dataset export is supported at the moment"
f"Unsupported export format: {dataset_type}"
)

logger.info(
f"Exporting '{self.identifier}' to '{dataset_type.name}' format"
)

splits = self.get_splits()
if splits is None:
raise ValueError("Cannot export dataset without splits")

output_path = Path(output_path)
if output_path.exists():
out_path = Path(output_path)
if out_path.exists():
raise ValueError(
f"Export path '{output_path}' already exists. Please remove it first."
f"Export path '{out_path}' already exists. Please remove it first."
)
output_path.mkdir(parents=True)
image_indices = {}
annotations = {"train": [], "val": [], "test": []}
df = self._load_df_offline(raise_when_empty=True)

# Capture the original order. Assume annotations are ordered if instance_id's were not specified.
df = df.with_row_count("row_idx").with_columns(
pl.col("row_idx").min().over("file").alias("first_occur")
)
out_path.mkdir(parents=True)

# Resolve file paths to ensure they are absolute and exist
df = df.with_columns(
pl.struct(["file", "uuid"])
.map_elements(
lambda row: resolve_path(
row["file"], row["uuid"], str(self.media_path)
),
return_dtype=pl.Utf8,
)
.alias("file")
)
prepared_ldf = PreparedLDF.from_dataset(self)
exporter: BaseExporter = exporter_cls(self.identifier)

grouped_image_sources = df.select(
"group_id", "source_name", "file"
).unique()

# Filter out rows without annotations and ensure we have at least one row per group_id (images without annotations)
df = (
df.with_columns(
[
pl.col("annotation").is_not_null().alias("has_annotation"),
pl.col("group_id")
.cumcount()
.over("group_id")
.alias("first_occur"),
]
)
.pipe(
lambda df: (
df.filter(pl.col("has_annotation")).vstack(
df.filter(
~pl.col("group_id").is_in(
df.filter(pl.col("has_annotation"))
.select("group_id")
.unique()["group_id"]
)
).unique(subset=["group_id"], keep="first")
)
)
)
.sort(["row_idx"])
.select(
[
col
for col in df.columns
if col not in ["has_annotation", "row_idx", "first_occur"]
]
)
exporter.transform(
prepared_ldf=prepared_ldf,
output_path=out_path,
max_partition_size_gb=max_partition_size_gb,
)

splits = self.get_splits()
assert splits is not None

current_size = 0
part = 0 if max_partition_size_gb else None
max_partition_size = (
max_partition_size_gb * 1024**3 if max_partition_size_gb else None
)

# Group the full dataframe by group_id
df = df.group_by("group_id", maintain_order=True)
copied_files = set()

for group_id, group_df in df:
matched_df = grouped_image_sources.filter(
pl.col("group_id") == group_id
)
group_files = matched_df.get_column("file").to_list()
group_source_names = matched_df.get_column("source_name").to_list()

split = next(
(
s
for s, group_ids in splits.items()
if group_id in group_ids
),
None,
)
assert split is not None

group_total_size = sum(Path(f).stat().st_size for f in group_files)
annotation_records = []

for row in group_df.iter_rows(named=True):
task_name = row["task_name"]
class_name = row["class_name"]
instance_id = row["instance_id"]
task_type = row["task_type"]
ann_str = row["annotation"]

source_to_file = {
name: str(
(
Path("images")
/ f"{image_indices.setdefault(Path(f), len(image_indices))}{Path(f).suffix}"
).as_posix()
)
for name, f in zip(
group_source_names, group_files, strict=True
# Detect whether partitioned export was produced and the max part index
def _detect_last_part(base: Path, ds_id: str) -> int | None:
max_idx: int | None = None
prefix = f"{ds_id}_part"
for p in base.iterdir():
if p.is_dir() and p.name.startswith(prefix):
try:
idx = int(p.name[len(prefix) :])
except ValueError:
continue
max_idx = (
idx if (max_idx is None or idx > max_idx) else max_idx
)
}

record = {
"files" if len(group_source_names) > 1 else "file": (
source_to_file
if len(group_source_names) > 1
else source_to_file[group_source_names[0]]
),
"task_name": task_name,
}

if ann_str is not None:
data = json.loads(ann_str)
annotation_base = {
"instance_id": instance_id,
"class": class_name,
}
if task_type in {
"instance_segmentation",
"segmentation",
"boundingbox",
"keypoints",
}:
annotation_base[task_type] = data
elif task_type.startswith("metadata/"):
annotation_base["metadata"] = {task_type[9:]: data}
record["annotation"] = annotation_base

annotation_records.append(record)

annotations_size = sum(
sys.getsizeof(r) for r in annotation_records
)

if (
max_partition_size
and part is not None
and current_size + group_total_size + annotations_size
> max_partition_size
):
_dump_annotations(
annotations, output_path, self.identifier, part
)
current_size = 0
part += 1
annotations = {"train": [], "val": [], "test": []}

if max_partition_size:
data_path = (
output_path
/ f"{self.identifier}_part{part}"
/ split
/ "images"
)
else:
data_path = output_path / self.identifier / split / "images"
data_path.mkdir(parents=True, exist_ok=True)
return max_idx

for file in group_files:
file_path = Path(file)
if file_path not in copied_files:
copied_files.add(file_path)
image_index = image_indices[file_path]
dest_file = data_path / f"{image_index}{file_path.suffix}"
shutil.copy(file_path, dest_file)
current_size += file_path.stat().st_size

annotations[split].extend(annotation_records)
current_size += annotations_size

_dump_annotations(annotations, output_path, self.identifier, part)
last_part = _detect_last_part(out_path, self.identifier)

if zip_output:
archives = []
if max_partition_size:
assert part is not None
for i in range(part + 1):
folder = output_path / f"{self.identifier}_part{i}"
if folder.exists():
archive_file = shutil.make_archive(
str(folder), "zip", root_dir=folder
)
archives.append(Path(archive_file))
else:
folder = output_path / self.identifier
if folder.exists():
archive_file = shutil.make_archive(
str(folder), "zip", root_dir=folder
)
archives.append(Path(archive_file))
if len(archives) > 1:
archives = exporter.create_zip_output(
max_partition_size=max_partition_size_gb,
output_path=out_path,
part=last_part,
)
if isinstance(archives, list):
logger.info(
f"Dataset successfully exported to: {[str(p) for p in archives]}"
)
return archives
logger.info(f"Dataset successfully exported to: {archives[0]}")
return archives[0]
logger.info(f"Dataset successfully exported to: {archives}")
return archives

logger.info(f"Dataset successfully exported to: {output_path}")
return output_path
logger.info(f"Dataset successfully exported to: {out_path}")
return out_path

def get_statistics(
self, sample_size: int | None = None, view: str | None = None
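
The `_detect_last_part` helper added above infers the highest `_part<N>` suffix from the directories the exporter actually wrote, which is what lets the zipping step drop the old running `part` counter. A self-contained sketch of that behavior (the logic mirrors the hunk above; the directory names are throwaway examples):

```python
# Re-statement of the _detect_last_part logic from the hunk above, exercised
# against a temporary layout; the directory names are made up for the example.
import tempfile
from pathlib import Path


def detect_last_part(base: Path, ds_id: str) -> int | None:
    """Return the highest N among '<ds_id>_partN' subdirectories, or None."""
    max_idx: int | None = None
    prefix = f"{ds_id}_part"
    for p in base.iterdir():
        if p.is_dir() and p.name.startswith(prefix):
            try:
                idx = int(p.name[len(prefix):])
            except ValueError:
                continue  # e.g. 'ds_part_old' does not parse as an index
            if max_idx is None or idx > max_idx:
                max_idx = idx
    return max_idx


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    for name in ("ds_part0", "ds_part1", "ds_part2", "ds"):  # 'ds' = unpartitioned dir
        (base / name).mkdir()
    assert detect_last_part(base, "ds") == 2        # partitioned export -> last index
    assert detect_last_part(base, "other") is None  # no partitions for this identifier
```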
5 changes: 5 additions & 0 deletions luxonis_ml/data/exporters/__init__.py
@@ -0,0 +1,5 @@
from .base_exporter import BaseExporter
from .native_exporter import NativeExporter
from .prepared_ldf import PreparedLDF

__all__ = ["BaseExporter", "NativeExporter", "PreparedLDF"]
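
With this `__init__.py`, the exporter machinery becomes importable straight from `luxonis_ml.data.exporters`. A hypothetical end-to-end use of that surface, mirroring the call sequence in the `export()` diff above; the dataset name and output directory are placeholders, calling `create_zip_output` outside of `export()` is an assumption, and only the class and method names visible in this PR are taken from the source:

```python
from pathlib import Path

from luxonis_ml.data import LuxonisDataset
from luxonis_ml.data.exporters import NativeExporter, PreparedLDF

# Placeholder dataset name; any existing LuxonisDataset would do.
dataset = LuxonisDataset("my-dataset")

# Same sequence LuxonisDataset.export() runs for DatasetType.NATIVE in this PR.
prepared = PreparedLDF.from_dataset(dataset)
exporter = NativeExporter(dataset.identifier)
exporter.transform(
    prepared_ldf=prepared,
    output_path=Path("export_output"),
    max_partition_size_gb=None,
)

# Optional zipping step, matching the kwargs used in the diff.
archives = exporter.create_zip_output(
    max_partition_size=None,
    output_path=Path("export_output"),
    part=None,
)
```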