Commit fe8fc0e

Extend dataset export functionality to all different DatasetTypes (#374)
1 parent 2fa12ae commit fe8fc0e

24 files changed: +3603 -258 lines changed

luxonis_ml/data/datasets/luxonis_dataset.py

Lines changed: 91 additions & 239 deletions
@@ -1,7 +1,6 @@
 import json
 import math
 import shutil
-import sys
 from collections import defaultdict
 from collections.abc import Iterable, Mapping, Sequence
 from concurrent.futures import ThreadPoolExecutor
@@ -18,9 +17,31 @@
 from semver.version import Version
 from typing_extensions import Self, override

+from luxonis_ml.data.exporters import (
+    BaseExporter,
+    ClassificationDirectoryExporter,
+    CocoExporter,
+    CreateMLExporter,
+    DarknetExporter,
+    NativeExporter,
+    PreparedLDF,
+    SegmentationMaskDirectoryExporter,
+    TensorflowCSVExporter,
+    VOCExporter,
+    YoloV4Exporter,
+    YoloV6Exporter,
+    YoloV8Exporter,
+    YoloV8InstanceSegmentationExporter,
+    YoloV8KeypointsExporter,
+)
+from luxonis_ml.data.exporters.exporter_utils import (
+    ExporterSpec,
+    create_zip_output,
+)
 from luxonis_ml.data.utils import (
     BucketStorage,
     BucketType,
+    COCOFormat,
     ParquetFileManager,
     UpdateMode,
     get_class_distributions,
@@ -1497,266 +1518,97 @@ def export(
             partition) after export. Default is False.
         @rtype: Union[Path, List[Path]]
         @return: Path(s) to the ZIP file(s) containing the exported
-            dataset.
+            dataset (if zip_output=True). Otherwise, the output
+            directory.
         """
-
-        def _dump_annotations(
-            annotations: dict[str, list[Any]],
-            output_path: Path,
-            identifier: str,
-            part: int | None = None,
-        ) -> None:
-            for split_name, annotation_data in annotations.items():
-                if part is not None:
-                    split_path = (
-                        output_path / f"{identifier}_part{part}" / split_name
-                    )
-                else:
-                    split_path = output_path / identifier / split_name
-                split_path.mkdir(parents=True, exist_ok=True)
-                with open(split_path / "annotations.json", "w") as f:
-                    json.dump(annotation_data, f, indent=4)
-
-        def resolve_path(
-            img_path: str | Path, uuid: str, media_path: str
-        ) -> str:
-            img_path = Path(img_path)
-            if img_path.exists():
-                return str(img_path)
-
-            ext = img_path.suffix.lstrip(".")
-            fallback = Path(media_path) / f"{uuid}.{ext}"
-            if not fallback.exists():
-                raise FileNotFoundError(f"Missing image: {fallback}")
-            return str(fallback)
-
-        if dataset_type is not DatasetType.NATIVE:
+        EXPORTER_MAP: dict[DatasetType, ExporterSpec] = {
+            DatasetType.NATIVE: ExporterSpec(NativeExporter, {}),
+            DatasetType.COCO: ExporterSpec(
+                CocoExporter,
+                {
+                    "format": COCOFormat.ROBOFLOW,
+                    "skeletons": getattr(self.metadata, "skeletons", None),
+                },
+            ),
+            DatasetType.YOLOV8BOUNDINGBOX: ExporterSpec(YoloV8Exporter, {}),
+            DatasetType.YOLOV8INSTANCESEGMENTATION: ExporterSpec(
+                YoloV8InstanceSegmentationExporter, {}
+            ),
+            DatasetType.YOLOV8KEYPOINTS: ExporterSpec(
+                YoloV8KeypointsExporter, {}
+            ),
+            DatasetType.YOLOV6: ExporterSpec(YoloV6Exporter, {}),
+            DatasetType.YOLOV4: ExporterSpec(YoloV4Exporter, {}),
+            DatasetType.DARKNET: ExporterSpec(DarknetExporter, {}),
+            DatasetType.CLSDIR: ExporterSpec(
+                ClassificationDirectoryExporter, {}
+            ),
+            DatasetType.SEGMASK: ExporterSpec(
+                SegmentationMaskDirectoryExporter, {}
+            ),
+            DatasetType.VOC: ExporterSpec(VOCExporter, {}),
+            DatasetType.CREATEML: ExporterSpec(CreateMLExporter, {}),
+            DatasetType.TFCSV: ExporterSpec(TensorflowCSVExporter, {}),
+        }
+        spec = EXPORTER_MAP.get(dataset_type)
+        if spec is None:
             raise NotImplementedError(
-                "Only 'NATIVE' dataset export is supported at the moment"
+                f"Unsupported export format: {dataset_type}"
             )
+
         logger.info(
            f"Exporting '{self.identifier}' to '{dataset_type.name}' format"
        )

-        splits = self.get_splits()
-        if splits is None:
-            raise ValueError("Cannot export dataset without splits")
-
-        output_path = Path(output_path)
-        if output_path.exists():
+        out_path = Path(output_path)
+        if out_path.exists():
             raise ValueError(
-                f"Export path '{output_path}' already exists. Please remove it first."
+                f"Export path '{out_path}' already exists. Please remove it first."
             )
-        output_path.mkdir(parents=True)
-        image_indices = {}
-        annotations = {"train": [], "val": [], "test": []}
-        df = self._load_df_offline(raise_when_empty=True)
-
-        # Capture the original order. Assume annotations are ordered if instance_id's were not specified.
-        df = df.with_row_count("row_idx").with_columns(
-            pl.col("row_idx").min().over("file").alias("first_occur")
-        )
+        out_path.mkdir(parents=True)

-        # Resolve file paths to ensure they are absolute and exist
-        df = df.with_columns(
-            pl.struct(["file", "uuid"])
-            .map_elements(
-                lambda row: resolve_path(
-                    row["file"], row["uuid"], str(self.media_path)
-                ),
-                return_dtype=pl.Utf8,
-            )
-            .alias("file")
-        )
-
-        grouped_image_sources = df.select(
-            "group_id", "source_name", "file"
-        ).unique()
+        prepared_ldf = PreparedLDF.from_dataset(self)

-        # Filter out rows without annotations and ensure we have at least one row per group_id (images without annotations)
-        df = (
-            df.with_columns(
-                [
-                    pl.col("annotation").is_not_null().alias("has_annotation"),
-                    pl.col("group_id")
-                    .cumcount()
-                    .over("group_id")
-                    .alias("first_occur"),
-                ]
-            )
-            .pipe(
-                lambda df: (
-                    df.filter(pl.col("has_annotation")).vstack(
-                        df.filter(
-                            ~pl.col("group_id").is_in(
-                                df.filter(pl.col("has_annotation"))
-                                .select("group_id")
-                                .unique()["group_id"]
-                            )
-                        ).unique(subset=["group_id"], keep="first")
-                    )
-                )
-            )
-            .sort(["row_idx"])
-            .select(
-                [
-                    col
-                    for col in df.columns
-                    if col not in ["has_annotation", "row_idx", "first_occur"]
-                ]
-            )
+        exporter: BaseExporter = spec.cls(
+            self.identifier, out_path, max_partition_size_gb, **spec.kwargs
         )

-        splits = self.get_splits()
-        assert splits is not None
-
-        current_size = 0
-        part = 0 if max_partition_size_gb else None
-        max_partition_size = (
-            max_partition_size_gb * 1024**3 if max_partition_size_gb else None
-        )
-
-        # Group the full dataframe by group_id
-        df = df.group_by("group_id", maintain_order=True)
-        copied_files = set()
-
-        for group_id, group_df in df:
-            matched_df = grouped_image_sources.filter(
-                pl.col("group_id") == group_id
-            )
-            group_files = matched_df.get_column("file").to_list()
-            group_source_names = matched_df.get_column("source_name").to_list()
-
-            split = next(
-                (
-                    s
-                    for s, group_ids in splits.items()
-                    if group_id in group_ids
-                ),
-                None,
-            )
-            assert split is not None
-
-            group_total_size = sum(Path(f).stat().st_size for f in group_files)
-            annotation_records = []
-
-            for row in group_df.iter_rows(named=True):
-                task_name = row["task_name"]
-                class_name = row["class_name"]
-                instance_id = row["instance_id"]
-                task_type = row["task_type"]
-                ann_str = row["annotation"]
-
-                source_to_file = {
-                    name: str(
-                        (
-                            Path("images")
-                            / f"{image_indices.setdefault(Path(f), len(image_indices))}{Path(f).suffix}"
-                        ).as_posix()
-                    )
-                    for name, f in zip(
-                        group_source_names, group_files, strict=True
+        exporter.export(prepared_ldf=prepared_ldf)
+
+        # Detect whether partitioned export was produced and the max part index
+        def _detect_last_part(base: Path, ds_id: str) -> int | None:
+            max_idx: int | None = None
+            prefix = f"{ds_id}_part"
+            for p in base.iterdir():
+                if p.is_dir() and p.name.startswith(prefix):
+                    try:
+                        idx = int(p.name[len(prefix) :])
+                    except ValueError:
+                        continue
+                    max_idx = (
+                        idx if (max_idx is None or idx > max_idx) else max_idx
                     )
-                }
-
-                record = {
-                    "files" if len(group_source_names) > 1 else "file": (
-                        source_to_file
-                        if len(group_source_names) > 1
-                        else source_to_file[group_source_names[0]]
-                    ),
-                    "task_name": task_name,
-                }
+            return max_idx

-                if ann_str is not None:
-                    data = json.loads(ann_str)
-                    annotation_base = {
-                        "instance_id": instance_id,
-                        "class": class_name,
-                    }
-                    if task_type in {
-                        "instance_segmentation",
-                        "segmentation",
-                        "boundingbox",
-                        "keypoints",
-                    }:
-                        annotation_base[task_type] = data
-                    elif task_type.startswith("metadata/"):
-                        annotation_base["metadata"] = {task_type[9:]: data}
-                    record["annotation"] = annotation_base
-
-                annotation_records.append(record)
-
-            annotations_size = sum(
-                sys.getsizeof(r) for r in annotation_records
-            )
-
-            if (
-                max_partition_size
-                and part is not None
-                and current_size + group_total_size + annotations_size
-                > max_partition_size
-            ):
-                _dump_annotations(
-                    annotations, output_path, self.identifier, part
-                )
-                current_size = 0
-                part += 1
-                annotations = {"train": [], "val": [], "test": []}
-
-            if max_partition_size:
-                data_path = (
-                    output_path
-                    / f"{self.identifier}_part{part}"
-                    / split
-                    / "images"
-                )
-            else:
-                data_path = output_path / self.identifier / split / "images"
-            data_path.mkdir(parents=True, exist_ok=True)
-
-            for file in group_files:
-                file_path = Path(file)
-                if file_path not in copied_files:
-                    copied_files.add(file_path)
-                    image_index = image_indices[file_path]
-                    dest_file = data_path / f"{image_index}{file_path.suffix}"
-                    shutil.copy(file_path, dest_file)
-                    current_size += file_path.stat().st_size
-
-            annotations[split].extend(annotation_records)
-            current_size += annotations_size
-
-        _dump_annotations(annotations, output_path, self.identifier, part)
+        last_part = _detect_last_part(out_path, self.identifier)

         if zip_output:
-            archives = []
-            if max_partition_size:
-                assert part is not None
-                for i in range(part + 1):
-                    folder = output_path / f"{self.identifier}_part{i}"
-                    if folder.exists():
-                        archive_file = shutil.make_archive(
-                            str(folder), "zip", root_dir=folder
-                        )
-                        archives.append(Path(archive_file))
-            else:
-                folder = output_path / self.identifier
-                if folder.exists():
-                    archive_file = shutil.make_archive(
-                        str(folder), "zip", root_dir=folder
-                    )
-                    archives.append(Path(archive_file))
-            if len(archives) > 1:
+            archives = create_zip_output(
+                max_partition_size=max_partition_size_gb,
+                output_path=out_path,
+                part=last_part,
+                dataset_identifier=self.identifier,
+            )
+            if isinstance(archives, list):
                 logger.info(
                     f"Dataset successfully exported to: {[str(p) for p in archives]}"
                 )
                 return archives
-            logger.info(f"Dataset successfully exported to: {archives[0]}")
-            return archives[0]
+            logger.info(f"Dataset successfully exported to: {archives}")
+            return archives

-        logger.info(f"Dataset successfully exported to: {output_path}")
-        return output_path
+        logger.info(f"Dataset successfully exported to: {out_path}")
+        return out_path

     def get_statistics(
         self, sample_size: int | None = None, view: str | None = None
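Before this change, export() raised NotImplementedError for anything other than DatasetType.NATIVE; it now dispatches every supported DatasetType to a dedicated exporter. A minimal usage sketch follows. Only the argument names visible in the diff (output_path, dataset_type, zip_output, max_partition_size_gb) come from this commit; the import paths, the LuxonisDataset constructor call, and the default values are assumptions about the surrounding library and may differ.

    # Hypothetical usage sketch, not taken from this commit.
    from luxonis_ml.data import LuxonisDataset
    from luxonis_ml.enums import DatasetType

    dataset = LuxonisDataset("my_dataset")  # an existing dataset that already has splits

    # Export to COCO instead of the previously hard-coded NATIVE format.
    result = dataset.export(
        output_path="export/coco",
        dataset_type=DatasetType.COCO,
        zip_output=True,              # return Path(s) to the ZIP archive(s)
        max_partition_size_gb=None,   # set a number to split large exports into parts
    )
    print(result)  # a single Path, or a list of Paths for a partitioned export

With zip_output=True and a max_partition_size_gb limit, the method can return several archive paths (one per partition); otherwise it returns the output directory, as the updated docstring notes.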

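A note on the design: the refactor replaces one monolithic, NATIVE-only code path with a table of ExporterSpec entries, each pairing an exporter class with its per-format keyword arguments, and moves image and annotation handling into PreparedLDF and the exporter classes. The snippet below is a generic, self-contained illustration of that spec-table dispatch pattern; the names Format, DummyExporter, Spec, and REGISTRY are invented for the example and are not part of luxonis-ml.

    # Stand-alone illustration of spec-table dispatch (stdlib only).
    from dataclasses import dataclass, field
    from enum import Enum, auto
    from typing import Any


    class Format(Enum):
        NATIVE = auto()
        COCO = auto()


    class DummyExporter:
        def __init__(self, identifier: str, **kwargs: Any) -> None:
            self.identifier = identifier
            self.kwargs = kwargs

        def export(self) -> str:
            return f"exported '{self.identifier}' with options {self.kwargs}"


    @dataclass
    class Spec:
        cls: type[DummyExporter]
        kwargs: dict[str, Any] = field(default_factory=dict)


    REGISTRY: dict[Format, Spec] = {
        Format.NATIVE: Spec(DummyExporter),
        Format.COCO: Spec(DummyExporter, {"format": "roboflow"}),
    }


    def export(identifier: str, fmt: Format) -> str:
        spec = REGISTRY.get(fmt)
        if spec is None:
            # Unknown formats fail fast, mirroring the NotImplementedError in the diff.
            raise NotImplementedError(f"Unsupported export format: {fmt}")
        # Instantiate the selected exporter with its per-format kwargs and run it.
        return spec.cls(identifier, **spec.kwargs).export()


    print(export("my_dataset", Format.COCO))

Supporting a new format then only means registering another spec entry, instead of threading an extra branch through the body of export().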