diff --git a/.canon.yaml b/.canon.yaml new file mode 100644 index 0000000..add89d7 --- /dev/null +++ b/.canon.yaml @@ -0,0 +1,4 @@ +# This file provides project-level configuration for the canon dev environment utility. https://github.com/viamrobotics/canon +oak: + default: true + image_arm64: ghcr.io/viamrobotics/cloud-build-base:arm64 diff --git a/.gitignore b/.gitignore index ede0945..dcdb48b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,9 @@ # top level -.build-venv -.canon.yaml +.build-env .DS_Store .pytest_cache .venv +build module.tar.gz oak-integration-tests diff --git a/integration-tests/tests/oak_d_test.go b/integration-tests/tests/oak_d_test.go index 2587fdc..e08b01c 100755 --- a/integration-tests/tests/oak_d_test.go +++ b/integration-tests/tests/oak_d_test.go @@ -40,11 +40,12 @@ func TestCameraServer(t *testing.T) { test.That(t, err, test.ShouldBeNil) }) - t.Run("Get images method", func(t *testing.T) { + t.Run("Get images method (two images)", func(t *testing.T) { images, metadata, err := cam.Images(timeoutCtx) test.That(t, err, test.ShouldBeNil) test.That(t, images, test.ShouldNotBeNil) test.That(t, metadata, test.ShouldNotBeNil) + test.That(t, len(images), test.ShouldEqual, 2) for _, img := range images { test.That(t, img.SourceName, test.ShouldEqual, componentName) bounds := img.Image.Bounds() @@ -64,7 +65,7 @@ func TestCameraServer(t *testing.T) { t.Run("Reconfigure module", func(t *testing.T) { cfg := resource.Config{ Attributes: utils.AttributeMap{ - "sensors": []string{"color"}, + "sensors": []string{"depth"}, }, } err := cam.Reconfigure(timeoutCtx, resource.Dependencies{}, cfg) diff --git a/src/components/helpers/shared.py b/src/components/helpers/shared.py index e6bef7b..c323972 100644 --- a/src/components/helpers/shared.py +++ b/src/components/helpers/shared.py @@ -1,15 +1,30 @@ from typing import Dict, List, Literal, Optional, Tuple from numpy.typing import NDArray -from depthai_sdk.components.camera_component import CameraComponent +from depthai import CameraBoardSocket + + +def get_socket_from_str(s: str) -> CameraBoardSocket: + if s == "cam_a": + return CameraBoardSocket.CAM_A + elif s == "cam_b": + return CameraBoardSocket.CAM_B + elif s == "cam_c": + return CameraBoardSocket.CAM_C + else: + raise Exception(f"Camera socket '{s}' is not recognized or supported.") class Sensor: - component: Optional[CameraComponent] = None + def get_unique_name(self) -> str: + if self.sensor_type == "color": + return f"{self.socket_str}_rgb" + else: + return f"{self.socket_str}_mono" def __init__( self, - socket: Literal["cam_a", "cam_b", "cam_c"], + socket_str: Literal["cam_a", "cam_b", "cam_c"], sensor_type: Literal["color", "depth"], width: int, height: int, @@ -17,7 +32,8 @@ def __init__( color_order: Literal["rgb", "bgr"] = "rgb", interleaved: bool = False, ): - self.socket = socket + self.socket_str = socket_str + self.socket = get_socket_from_str(socket_str) self.sensor_type = sensor_type self.width = width self.height = height @@ -35,7 +51,7 @@ class Sensors: def __init__(self, sensors: List[Sensor]): self._mapping = dict() for sensor in sensors: - self._mapping[sensor.socket] = sensor + self._mapping[sensor.socket_str] = sensor self.color_sensors = self._find_color_sensors() self.stereo_pair = self._find_stereo_pair() diff --git a/src/components/oak.py b/src/components/oak.py index 9d3bafb..8805364 100644 --- a/src/components/oak.py +++ b/src/components/oak.py @@ -170,6 +170,8 @@ def reconfigure( config (ComponentConfig) dependencies (Mapping[ResourceName, ResourceBase]) """ + self._close() + if self.model == self._oak_d_model: self.oak_cfg = OakDConfig(config) elif self.model == self._oak_ffc_3p_model: @@ -199,10 +201,16 @@ async def close(self) -> None: Implements `close` to free resources on shutdown. """ LOGGER.info("Closing OAK component.") - self.worker_manager.stop() - self.worker_manager.join() + self._close() LOGGER.debug("Closed OAK component.") + def _close(self) -> None: + if self.worker_manager: + self.worker_manager.stop() + self.worker_manager.join() + if self.worker: + self.worker.stop() + async def get_image( self, mime_type: str = "", @@ -278,7 +286,7 @@ async def get_images( """ LOGGER.debug("get_images called") - self._wait_until_worker_running() + await self._wait_until_worker_running() # Split logic into helpers for OAK-D and OAK-D like cameras with FFC-like cameras # Use MessageSynchronizer only for OAK-D-like @@ -352,15 +360,15 @@ async def get_point_cloud( details = "Cannot process PCD. OAK camera not configured for stereo depth outputs. See README for details" raise MethodNotAllowed(method_name="get_point_cloud", details=details) - self._wait_until_worker_running() + await self._wait_until_worker_running() # By default, we do not get point clouds even when color and depth are both requested - # We have to reinitialize the worker/OakCamera to start making point clouds + # We have to reinitialize the worker/pipeline+device to start making point clouds if not self.worker.user_wants_pc: self.worker.user_wants_pc = True self.worker.reset() - await self.worker.configure() - self.worker.start() + self.worker.configure() + await self.worker.start() while not self.worker.running: LOGGER.info("Waiting for worker to restart with pcd configured...") diff --git a/src/components/worker/worker.py b/src/components/worker/worker.py index 0f6501a..04f7f14 100644 --- a/src/components/worker/worker.py +++ b/src/components/worker/worker.py @@ -3,7 +3,6 @@ import math from queue import Empty from threading import Lock -import time from typing import ( Dict, List, @@ -18,11 +17,8 @@ import cv2 import depthai as dai -from depthai_sdk import OakCamera -from depthai_sdk.classes.packet_handlers import BasePacket, QueuePacketHandler -from depthai_sdk.components.camera_component import CameraComponent -from depthai_sdk.components.stereo_component import StereoComponent from numpy.typing import NDArray +import numpy as np from src.components.helpers.shared import CapturedData, Sensor from src.components.helpers.config import OakConfig @@ -40,6 +36,9 @@ } # color camera component only accepts this subset of depthai_sdk.components.camera_helper.colorResolutions MAX_GRPC_MESSAGE_BYTE_COUNT = 4194304 # Update this if the gRPC config ever changes +MAX_COLOR_DEPTH_QUEUE_SIZE = 30 +MAX_PC_QUEUE_SIZE = 5 +MAX_MSG_SYCHRONIZER_MSGS_SIZE = 50 LOGGER = getLogger("viam-oak-worker-logger") @@ -68,22 +67,22 @@ def get_closest_dai_resolution( Union[dai.ColorCameraProperties.SensorResolution, dai.MonoCameraProperties.SensorResolution] """ - def euclidian_distance(width_and_height: Tuple[int, int]) -> float: + def euclidean_distance(width_and_height: Tuple[int, int]) -> float: w1, h1 = width_and_height w2, h2 = width, height return math.sqrt((w1 - w2) ** 2 + (h1 - h2) ** 2) closest = min( dimensions_to_resolution.keys(), - key=euclidian_distance, + key=euclidean_distance, ) return dimensions_to_resolution[closest] -class SensorAndQueueHandler: - def __init__(self, sensor: Sensor, queue_handler: QueuePacketHandler): +class SensorAndQueue: + def __init__(self, sensor: Sensor, q: dai.DataOutputQueue): self.sensor = sensor - self.queue_handler = queue_handler + self.queue = q class Worker: @@ -91,15 +90,19 @@ class Worker: oak.py <-> worker.py <-> DepthAI SDK <-> DepthAI API (C++ core) <-> actual camera """ - oak: Optional[OakCamera] - color_handlers: Optional[List[SensorAndQueueHandler]] - stereo_queue_handler: Optional[QueuePacketHandler] - pc_queue_handler: Optional[QueuePacketHandler] + pipeline: Optional[dai.Pipeline] + device: Optional[dai.Device] + color_sensor_queues: Optional[List[SensorAndQueue]] + depth_queue: Optional[dai.DataOutputQueue] + pcd_queue: Optional[dai.DataOutputQueue] should_exec: bool configured: bool running: bool + depth_stream_name = "depth" + pc_stream_name = "pc" + def __init__( self, oak_config: OakConfig, @@ -109,10 +112,11 @@ def __init__( self.cfg = oak_config self.user_wants_pc = user_wants_pc - self.oak = None - self.color_handlers = None - self.stereo_queue_handler = None - self.pc_queue_handler = None + self.device = None + self.pipeline = None + self.color_sensor_queues = None + self.depth_queue = None + self.pcd_queue = None # Flag for stopping execution and busy loops self.should_exec = True @@ -120,54 +124,195 @@ def __init__( self.configured = False self.running = False - async def configure(self): - await self._init_oak_camera() + def configure(self): + def configure_color() -> Optional[dai.node.ColorCamera]: + """ + Creates and configures color cameras or doesn't + (based on the config). + """ + primary_color_cam = None + for sensor in self.cfg.sensors.color_sensors: + if sensor.sensor_type != "color": + continue + LOGGER.debug("Creating color camera component.") + resolution = get_closest_dai_resolution( + sensor.width, sensor.height, DIMENSIONS_TO_COLOR_RES + ) + LOGGER.debug( + f"Closest color resolution to inputted height & width is: {resolution}" + ) + + # Define source and output + color_cam = self.pipeline.create(dai.node.ColorCamera) + xout_color = self.pipeline.create(dai.node.XLinkOut) - if ( - not self.should_exec - ): # possible SIGTERM between initializing OakCamera and here - LOGGER.debug("Stopping configuration due to termination signal") - return + xout_color.setStreamName(sensor.get_unique_name()) - self._config_oak_camera() + # Properties + color_cam.setPreviewSize(sensor.width, sensor.height) + color_cam.setInterleaved(sensor.interleaved) + color_cam.setBoardSocket(sensor.socket) + color_cam.setFps(sensor.frame_rate) + + # Linking + color_cam.preview.link(xout_color.input) + + if primary_color_cam is None: + primary_color_cam = color_cam + return primary_color_cam + + def configure_stereo() -> Optional[dai.node.StereoDepth]: + """ + Creates and configures the pipeline for stereo depth— or doesn't + (based on the config). + """ + + def make_mono_cam(sensor: Sensor) -> dai.node.MonoCamera: + resolution = get_closest_dai_resolution( + sensor.width, sensor.height, DIMENSIONS_TO_MONO_RES + ) + LOGGER.debug( + f"Closest mono resolution: {resolution}. Inputted width & height: ({sensor.width}, {sensor.height})" + ) + + mono_cam = self.pipeline.create(dai.node.MonoCamera) + mono_cam.setResolution(resolution) + mono_cam.setBoardSocket(sensor.socket) + mono_cam.setFps(sensor.frame_rate) + return mono_cam + + stereo_pair = self.cfg.sensors.stereo_pair + depth = None + if stereo_pair: + LOGGER.debug("Creating stereo depth component.") + mono1, mono2 = stereo_pair + mono_cam_1 = make_mono_cam(mono1) + mono_cam_2 = make_mono_cam(mono2) + + depth = self.pipeline.create(dai.node.StereoDepth) + depth.setDefaultProfilePreset( + dai.node.StereoDepth.PresetMode.HIGH_DENSITY + ) + depth.initialConfig.setMedianFilter(dai.MedianFilter.KERNEL_7x7) + + mono_cam_1.out.link(depth.left) + mono_cam_2.out.link(depth.right) + + xout_depth = self.pipeline.create(dai.node.XLinkOut) + xout_depth.setStreamName(self.depth_stream_name) + depth.disparity.link(xout_depth.input) + + color_sensors = self.cfg.sensors.color_sensors + if color_sensors and len(color_sensors) > 0: + sensor_color_align = color_sensors[0] + depth.setDepthAlign(sensor_color_align.socket) + return depth + + def configure_pc( + color_node: Optional[dai.node.ColorCamera], + depth_node: Optional[dai.node.StereoDepth], + ) -> None: + """ + Creates and configures point clouds— or doesn't + (based on the config) + """ + if depth_node and self.user_wants_pc: + pointcloud = self.pipeline.create(dai.node.PointCloud) + sync = self.pipeline.create(dai.node.Sync) + xOut = self.pipeline.create(dai.node.XLinkOut) + + xOut.setStreamName(self.pc_stream_name) + xOut.input.setBlocking(False) + + # Link the nodes + depth_node.depth.link(pointcloud.inputDepth) + if color_node: + color_node.isp.link(sync.inputs["rgb"]) + pointcloud.outputPointCloud.link(sync.inputs["pcl"]) + sync.out.link(xOut.input) + + self.pipeline = dai.Pipeline() + try: + stage = "color" + color_node = configure_color() + + stage = "stereo" + depth_node = configure_stereo() + + stage = "point cloud" + configure_pc(color_node, depth_node) + + LOGGER.info("Successfully configured pipeline.") + except Exception as e: + msg = f"Error configuring pipeline at stage '{stage}'. Note the error following this log" + resolution_err_substr = "bigger than maximum at current sensor resolution" + calibration_err_substr = "no Camera data available" + if resolution_err_substr in str(e): + msg += ". Please adjust 'height_px' and 'width_px' in your config to an accepted resolution." + elif calibration_err_substr in str(e): + msg += ". If using a non-integrated model, please check that the camera is calibrated properly." + LOGGER.error(msg) + raise e self.message_synchronizer = MessageSynchronizer() self.configured = True - def start(self): - if self.oak: - self.oak.start() - LOGGER.info("Started OakCamera") - else: - raise AttributeError( - "oak.start() called before oak was assigned. Must configure worker first." + async def start(self): + self.device = None + while not self.device and self.should_exec: + try: + self.device = dai.Device(self.pipeline) + LOGGER.debug("Successfully initialized device.") + except Exception as e: + LOGGER.error(f"Error initializing device: {e}") + await asyncio.sleep(1) + + self.color_sensor_queues: List[SensorAndQueue] = [] + for cs in self.cfg.sensors.color_sensors: + q = self.device.getOutputQueue( + cs.get_unique_name(), MAX_COLOR_DEPTH_QUEUE_SIZE, blocking=False + ) + self.color_sensor_queues.append(SensorAndQueue(cs, q)) + + if self.cfg.sensors.stereo_pair: + self.depth_queue = self.device.getOutputQueue( + self.depth_stream_name, MAX_COLOR_DEPTH_QUEUE_SIZE, blocking=False + ) + if self.user_wants_pc: + self.pcd_queue = self.device.getOutputQueue( + self.pc_stream_name, MAX_PC_QUEUE_SIZE, blocking=False + ) + + should_get_synced_color_depth_outputs = len( + self.color_sensor_queues + ) == 1 and bool(self.depth_queue) + if should_get_synced_color_depth_outputs: + self.color_sensor_queues[0].queue.addCallback( + self.message_synchronizer.add_color_msg ) + self.depth_queue.addCallback(self.message_synchronizer.add_depth_msg) + self.message_synchronizer.callbacks_set = True + self.running = True + LOGGER.info("Successfully started camera worker.") async def get_synced_color_depth_data(self) -> Tuple[CapturedData, CapturedData]: if not self.running: raise Exception( "Error getting camera output: Worker is not currently running." ) - if len(self.color_handlers) != 1 or not self.stereo_queue_handler: + if len(self.color_sensor_queues) != 1 or not self.depth_queue: raise Exception( "Precondition error: must have exactly 1 color sensor and 2 mono sensors for synced data" ) while self.should_exec: - self.message_synchronizer._add_msgs_from_queue( - "color", self.color_handlers[0].queue_handler - ) - self.message_synchronizer._add_msgs_from_queue( - "depth", self.stereo_queue_handler - ) - synced_msgs = self.message_synchronizer.get_synced_msgs() if synced_msgs: color_frame, depth_frame, timestamp = ( - synced_msgs["color"].frame, - synced_msgs["depth"].frame, - synced_msgs["color"].get_timestamp().total_seconds(), + synced_msgs["color"].getCvFrame(), + synced_msgs["depth"].getCvFrame(), + synced_msgs["color"].getTimestamp().total_seconds(), ) if self.cfg.sensors.primary_sensor.color_order == "rgb": color_frame = cv2.cvtColor(color_frame, cv2.COLOR_BGR2RGB) @@ -193,20 +338,21 @@ def get_color_output(self, requested_sensor: Sensor): "Error getting color frame: requested sensor is not a color sensor" ) - for sensor_and_queue in self.color_handlers: - sensor = sensor_and_queue.sensor - q = sensor_and_queue.queue_handler - if requested_sensor.socket != sensor.socket: + for sensor_and_queue in self.color_sensor_queues: + this_sensor = sensor_and_queue.sensor + if requested_sensor.socket_str != this_sensor.socket_str: continue + + q = sensor_and_queue.queue try: - msg = q.get_queue().get(block=True, timeout=5) - except Empty: + msg = q.get() + frame = msg.getCvFrame() + except Exception: raise Empty("Error getting color frame: frame queue is empty.") - color_output = msg.frame - if sensor.color_order == "rgb": - color_output = cv2.cvtColor(msg.frame, cv2.COLOR_BGR2RGB) - timestamp = msg.get_timestamp().total_seconds() - return CapturedData(color_output, timestamp) + if this_sensor.color_order == "rgb": + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + timestamp = msg.getTimestamp().total_seconds() + return CapturedData(frame, timestamp) raise Exception( "Error getting color frame: requested sensor not registered in worker" @@ -217,31 +363,41 @@ def get_depth_output(self): raise Exception( "Error getting depth frame: Worker is not currently running." ) - if self.stereo_queue_handler is None: + if not self.depth_queue: raise Exception( - "Error getting depth frame: stereo depth component is not configured" + "Error getting depth frame: stereo depth frame queue is not configured" ) try: - msg = self.stereo_queue_handler.get_queue().get(block=True, timeout=5) + msg = self.depth_queue.get() except Empty: raise Empty("Error getting depth frame: frame queue is empty.") depth_output = self._process_depth_frame( - self.cfg.sensors.stereo_pair[0], msg.frame + self.cfg.sensors.stereo_pair[0], msg.getCvFrame() ) - timestamp = msg.get_timestamp().total_seconds() + timestamp = msg.getTimestamp().total_seconds() return CapturedData(depth_output, timestamp) def get_pcd(self) -> CapturedData: if not self.running: raise Exception("Error getting PCD: Worker is not currently running.") + if not self.user_wants_pc: + raise Exception( + "Critical dev logic error getting PCD: get_pcd() called without toggling user_wants_pc." + ) + if not self.depth_queue: + raise Exception( + "Error getting PCD: depth frame queue not configured for current OAK camera." + ) try: - pc_msg = self.pc_queue_handler.get_queue().get(block=True, timeout=5) - if pc_msg.points.nbytes > MAX_GRPC_MESSAGE_BYTE_COUNT: - pc_output = self._downsample_pcd(pc_msg.points, pc_msg.points.nbytes) + pc_msg = self.pcd_queue.get() + pc_obj = pc_msg["pcl"] + points = pc_obj.getPoints().astype(np.float64) + if points.nbytes > MAX_GRPC_MESSAGE_BYTE_COUNT: + pc_output = self._downsample_pcd(points, points.nbytes) else: - pc_output = pc_msg.points - timestamp = pc_msg.get_timestamp().total_seconds() + pc_output = points + timestamp = pc_msg.getTimestamp().total_seconds() return CapturedData(pc_output, timestamp) except Empty: raise Exception("Error getting PCD: timed out waiting for PCD from queue.") @@ -254,151 +410,20 @@ def stop(self) -> None: self.should_exec = False self.configured = False self.running = False - if self.oak and self.oak.device and not self.oak.device.isClosed(): - self.oak.close() + if self.device: + self.device.close() + if self.pipeline: + self.pipeline = None def reset(self) -> None: LOGGER.debug("Resetting worker.") + self.stop() self.should_exec = True - self.configured = False - self.running = False - if self.oak and self.oak.device and not self.oak.device.isClosed(): - self.oak.close() - self.oak = None - - async def _init_oak_camera(self): - """ - Blocks until the OakCamera is successfully initialized. - """ - self.oak = None - while not self.oak and self.should_exec: - try: - self.oak = OakCamera(device=self.cfg.device_info) - LOGGER.info("Successfully initialized OakCamera.") - except Exception as e: - LOGGER.error(f"Error initializing OakCamera: {e}") - await asyncio.sleep(1) - - def _config_oak_camera(self): - """ - Safely configures the OakCamera. - """ - try: - stage = "color" - color_component = self._configure_color() - - stage = "stereo" - stereo_component = self._configure_stereo(color_component) - - stage = "point cloud" - self._configure_pc(color_component, stereo_component) - - LOGGER.info("Successfully configured OakCamera") - except Exception as e: - msg = f"Error configuring OakCamera at stage '{stage}': {e}" - resolution_err_substr = "bigger than maximum at current sensor resolution" - calibration_err_substr = "no Camera data available" - if resolution_err_substr in str(e): - msg += ". Please adjust 'height_px' and 'width_px' in your config to an accepted resolution." - elif calibration_err_substr in str(e): - msg += ". If using a non-integrated model, please check that the camera is calibrated properly." - LOGGER.error(msg) - - def _configure_color(self) -> Optional[CameraComponent]: - """ - Creates and configures color components— or doesn't - (based on the config). - """ - primary_color_component = None - color_handlers = [] - for sensor in self.cfg.sensors.color_sensors: - if sensor.sensor_type != "color": - continue - LOGGER.debug("Creating color camera component.") - resolution = get_closest_dai_resolution( - sensor.width, sensor.height, DIMENSIONS_TO_COLOR_RES - ) - LOGGER.debug( - f"Closest color resolution to inputted height & width is: {resolution}" - ) - color = self.oak.camera( - f"{sensor.socket},c", fps=sensor.frame_rate, resolution=resolution - ) - if not isinstance(color.node, dai.node.ColorCamera): - raise Exception( - f"Underlying DepthAI or hardware configuration error: expected node to be ColorCamera, got {type(color)} for socket {sensor.socket}" - ) - if primary_color_component is None: - primary_color_component = color - color.node.setPreviewSize(sensor.width, sensor.height) - color.node.setVideoSize(sensor.width, sensor.height) - color.node.setInterleaved(sensor.interleaved) - if sensor.color_order == "bgr": - color.node.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR) - elif sensor.color_order == "rgb": - color.node.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB) - color_handlers.append( - SensorAndQueueHandler(sensor, self.oak.queue(color, 30)) - ) - self.color_handlers = color_handlers - return primary_color_component # stereo and pc will be aligned to the primary color sensor if requested & possible - - def _configure_stereo( - self, color_component: CameraComponent - ) -> Optional[StereoComponent]: - """ - Creates and configures stereo depth component— or doesn't - (based on the config). - """ - - def make_mono_cam( - mono: Sensor, - ) -> Tuple[CameraComponent, dai.MonoCameraProperties.SensorResolution]: - resolution = get_closest_dai_resolution( - mono.width, mono.height, DIMENSIONS_TO_MONO_RES - ) - LOGGER.debug( - f"Closest mono resolution: {resolution}. Inputted width & height: ({mono.width}, {mono.height})" - ) - mono_component = self.oak.camera( - mono.socket, - resolution, - mono.frame_rate, - ) - return mono_component, resolution - - stereo_pair = self.cfg.sensors.stereo_pair - if stereo_pair: - LOGGER.debug("Creating stereo depth component.") - mono1, mono2 = stereo_pair - mono1_component, mono1_res = make_mono_cam(mono1) - mono2_component, mono2_res = make_mono_cam(mono2) - if mono1_res != mono2_res: - LOGGER.warn( - f"Stereo depth pair mono cams configured with different resolutions. Defaulting to {mono1_res}" - ) - stereo = self.oak.stereo( - mono1_res, mono1.frame_rate, mono1_component, mono2_component - ) - if color_component: - # Ensures camera alignment between color and depth - stereo.config_stereo(align=color_component) - self.stereo_queue_handler = self.oak.queue(stereo, max_size=30) - return stereo - - def _configure_pc( - self, color_component: CameraComponent, stereo_component: StereoComponent - ): - """ - Creates and configures point cloud component— or doesn't - (based on the config) - """ - if stereo_component: - LOGGER.debug("Creating point cloud component.") - pc_component = self.oak.create_pointcloud(stereo_component, color_component) - self.pc_queue_handler = self.oak.queue(pc_component, 5) def _process_depth_frame(self, sensor: Sensor, arr: NDArray) -> NDArray: + if arr.dtype != np.uint16: + arr = arr.astype(np.uint16) + if arr.shape[0] > sensor.height and arr.shape[1] > sensor.width: LOGGER.debug( f"Outputted depth map's shape is greater than specified in config: {arr.shape}; Manually resizing to {(sensor.height, sensor.width)}." @@ -416,27 +441,47 @@ def _downsample_pcd(self, arr: NDArray, byte_count: int) -> NDArray: LOGGER.warn( f"PCD bytes ({byte_count}) > max gRPC bytes count ({MAX_GRPC_MESSAGE_BYTE_COUNT}). Subsampling by 1/{factor}." ) - arr = arr[::factor, ::factor, :] + if arr.ndim == 2: + arr = arr[::factor, :] + else: + raise ValueError(f"Unexpected point cloud array dimensions: {arr.ndim}") return arr class MessageSynchronizer: """ - MessageSynchronizer manages synchronization of frame messages for color and depth data from OakCamera packet queues, + MessageSynchronizer manages synchronization of frame messages for color and depth data packet queues, maintaining an ordered dictionary of messages keyed chronologically by sequence number. """ - MAX_MSGS_SIZE = 50 - msgs: OrderedDict[int, Dict[str, BasePacket]] + msgs: OrderedDict[int, Dict[str, dai.ADatatype]] write_lock: Lock + callbacks_set: bool def __init__(self): # msgs maps frame sequence number to a dictionary that maps frame_type (i.e. "color" or "depth") to a data packet self.msgs = OrderedDict() self.write_lock = Lock() + self.callbacks_set = False + + def get_synced_msgs(self) -> Optional[Dict[str, dai.ADatatype]]: + if not self.callbacks_set: + raise Exception( + "Data queue callbacks were not set. Cannot get synced messages." + ) + for sync_msgs in self.msgs.values(): + if len(sync_msgs) == 2: # has both color and depth + return sync_msgs + return None - def add_msg( - self, msg: BasePacket, frame_type: Literal["color", "depth"], seq: int + def add_color_msg(self, msg: dai.ADatatype) -> None: + self._add_msg(msg, "color", msg.getSequenceNum()) + + def add_depth_msg(self, msg: dai.ADatatype) -> None: + self._add_msg(msg, "depth", msg.getSequenceNum()) + + def _add_msg( + self, msg: dai.ADatatype, frame_type: Literal["color", "depth"], seq: int ) -> None: with self.write_lock: # Update recency if previously already stored in dict @@ -446,41 +491,6 @@ def add_msg( self.msgs.setdefault(seq, {})[frame_type] = msg self._cleanup_msgs() - def get_synced_msgs(self) -> Optional[Dict[str, BasePacket]]: - for sync_msgs in self.msgs.values(): - if len(sync_msgs) == 2: # has both color and depth - return sync_msgs - return None - - def _add_msgs_from_queue( - self, frame_type: Literal["color", "depth"], queue_handler: QueuePacketHandler - ) -> None: - queue_obj = queue_handler.get_queue() - with queue_obj.mutex: - q_snapshot = list(queue_obj.queue) - - for msg in q_snapshot: - self.add_msg(msg, frame_type, msg.get_sequence_num()) - - async def get_most_recent_msg( - self, q_handler: QueuePacketHandler, frame_type: Literal["color", "depth"] - ) -> Optional[BasePacket]: - self._add_msgs_from_queue(frame_type, q_handler) - attempts = 0 - while len(self.msgs) < 1: - if attempts > 3: - raise Exception( - "Error getting messages from queue: exceeded max attempts, yet still no messages received" - ) - self._add_msgs_from_queue(frame_type, q_handler) - attempts += 1 - await asyncio.sleep(0.1) - # Traverse in reverse to get the most recent - for msg_dict in reversed(self.msgs.values()): - if frame_type in msg_dict: - return msg_dict[frame_type] - raise Exception(f"No message of type '{frame_type}' in frame queue.") - def _cleanup_msgs(self): - while len(self.msgs) > self.MAX_MSGS_SIZE: + while len(self.msgs) > MAX_MSG_SYCHRONIZER_MSGS_SIZE: self.msgs.popitem(last=False) # remove oldest item diff --git a/src/components/worker/worker_manager.py b/src/components/worker/worker_manager.py index 5c5f9d0..2183e70 100644 --- a/src/components/worker/worker_manager.py +++ b/src/components/worker/worker_manager.py @@ -30,20 +30,16 @@ def run(self): async def check_health(self) -> None: self.logger.debug("Starting worker manager.") - await self.worker.configure() - self.worker.start() + self.worker.configure() + await self.worker.start() while not self._stop_event.is_set(): self.logger.debug("Checking if worker must be restarted.") - if ( - self.worker.oak - and self.worker.oak.device - and self.worker.oak.device.isClosed() - ): + if self.worker.device and self.worker.device.isClosed(): self.logger.info("Camera is closed. Stopping and restarting worker.") self.worker.reset() - await self.worker.configure() - self.worker.start() + self.worker.configure() + await self.worker.start() await asyncio.sleep(3) def stop(self):