diff --git a/src/lidar_visualizer/datasets/helipr.py b/src/lidar_visualizer/datasets/helipr.py index 8874bce..beec336 100644 --- a/src/lidar_visualizer/datasets/helipr.py +++ b/src/lidar_visualizer/datasets/helipr.py @@ -22,6 +22,7 @@ # SOFTWARE. import importlib import os +import struct import sys from pathlib import Path @@ -65,50 +66,18 @@ def __init__(self, data_dir: Path, *_, **__): # Obtain the pointcloud reader for the given data folder if self.sequence_id == "Avia": - self.fields = [ - ("x", np.float32), - ("y", np.float32), - ("z", np.float32), - ("reflectivity", np.uint8), - ("tag", np.uint8), - ("line", np.uint8), - ("offset_time", np.uint32), - ] - + self.format_string = "fffBBBL" + self.intensity_channel = None elif self.sequence_id == "Aeva": - self.fields = [ - ("x", np.float32), - ("y", np.float32), - ("z", np.float32), - ("reflectivity", np.float32), - ("velocity", np.float32), - ("time_offset_ns", np.int32), - ("line_index", np.uint8), - ("intensity", np.float32), - ] - + self.format_string = "ffffflBf" + self.format_string_no_intensity = "ffffflB" + self.intensity_channel = 7 elif self.sequence_id == "Ouster": - self.fields = [ - ("x", np.float32), - ("y", np.float32), - ("z", np.float32), - ("intensity", np.float32), - ("t", np.uint32), - ("reflectivity", np.uint16), - ("ring", np.uint16), - ("ambient", np.uint16), - ] - + self.format_string = "ffffIHHH" + self.intensity_channel = 3 elif self.sequence_id == "Velodyne": - self.fields = [ - ("x", np.float32), - ("y", np.float32), - ("z", np.float32), - ("intensity", np.float32), - ("ring", np.uint16), - ("time", np.float32), - ] - + self.format_string = "ffffHf" + self.intensity_channel = 3 else: print("[ERROR] Unsupported LiDAR Type") sys.exit() @@ -119,45 +88,35 @@ def __len__(self): def __getitem__(self, idx): return self.read_point_cloud(idx) - def get_intensity_channel(self): - matches = [index for index, (name, _) in enumerate(self.fields) if name == "intensity"] - return matches[0] if len(matches) > 0 else None - - def read_point_cloud(self, idx: int): + def get_data(self, idx: int): file_path = self.scan_files[idx] - - intensity_channel = self.get_intensity_channel() - dtype = np.dtype(self.fields) + list_lines = [] # Special case, see https://github.com/minwoo0611/HeLiPR-File-Player/blob/e8d95e390454ece1415ae9deb51515f63730c10a/src/ROSThread.cpp#L632 if self.sequence_id == "Aeva" and int(Path(file_path).stem) <= 1691936557946849179: - intensity_channel = None - dtype = np.dtype( - [(name, np_type) for name, np_type in self.fields if name != "intensity"] - ) + self.intensity_channel = None + format_string = self.format_string_no_intensity + else: + format_string = self.format_string + + chunk_size = struct.calcsize(f"={format_string}") + with open(file_path, "rb") as f: + binary = f.read() + offset = 0 + while offset < len(binary): + list_lines.append(struct.unpack_from(f"={format_string}", binary, offset)) + offset += chunk_size + data = np.stack(list_lines) + return data + def read_point_cloud(self, idx: int): + data = self.get_data(idx) + points = data[:, :3] scan = self.o3d.geometry.PointCloud() - if intensity_channel is not None: - points_xyzi = np.stack( - [ - [line[0], line[1], line[2], line[intensity_channel]] - for line in np.fromfile(file_path, dtype=dtype).tolist() - ] - ) - points = points_xyzi[:, 0:3] - intensity = points_xyzi[:, -1] + scan.points = self.o3d.utility.Vector3dVector(points) + if self.intensity_channel is not None: + intensity = data[:, self.intensity_channel] intensity = (intensity - intensity.min()) / (intensity.max() - intensity.min()) colors = self.cmap(intensity)[:, :3].reshape(-1, 3) - scan.points = self.o3d.utility.Vector3dVector(points) scan.colors = self.o3d.utility.Vector3dVector(colors) - - else: - points = np.stack( - [ - [line[0], line[1], line[2]] - for line in np.fromfile(file_path, dtype=dtype).tolist() - ] - ) - scan.points = self.o3d.utility.Vector3dVector(points) - return scan