From 2d05c7533b33365bd9d2fa9d65932476948a67c0 Mon Sep 17 00:00:00 2001 From: Daniil Pastukhov Date: Tue, 16 May 2023 00:34:50 +0200 Subject: [PATCH 1/8] Use xout.name in RecordAction.setup() --- .../src/depthai_sdk/trigger_action/actions/record_action.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/depthai_sdk/src/depthai_sdk/trigger_action/actions/record_action.py b/depthai_sdk/src/depthai_sdk/trigger_action/actions/record_action.py index 7458b1c86..f3b450d9b 100644 --- a/depthai_sdk/src/depthai_sdk/trigger_action/actions/record_action.py +++ b/depthai_sdk/src/depthai_sdk/trigger_action/actions/record_action.py @@ -137,7 +137,7 @@ def on_new_packets(self, packets: Dict[str, FramePacket]): self.buffers_status['ready']['after_t'].append(buffer_id) def setup(self, device: dai.Device, xouts: List['XoutFrames']): - self.stream_names = [xout.frames.name for xout in xouts] # e.g., [color_video, color_bitstream] + self.stream_names = [xout.name for xout in xouts] # e.g., [color_video, color_bitstream] logging.debug(f'RecordAction: stream_names = {self.stream_names}') self.recorder.update(self.path, device, xouts) self._run_thread() From 4f89ec160e74e7c21b7721900e52ccb2e3e01b77 Mon Sep 17 00:00:00 2001 From: Daniil Pastukhov Date: Tue, 16 May 2023 00:35:10 +0200 Subject: [PATCH 2/8] Use self.name in XoutFrames.new_msg() when available --- depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_frames.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_frames.py b/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_frames.py index 9ab96360c..0ceefae82 100644 --- a/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_frames.py +++ b/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_frames.py @@ -90,7 +90,7 @@ def new_msg(self, name: str, msg) -> None: if self.queue.full(): self.queue.get() # Get one, so queue isn't full - packet = FramePacket(name, + packet = FramePacket(self.name or name, msg, msg.getCvFrame() if cv2 else None, self._visualizer) From 4b788a6550ec8f9320d71936a26c8e22ccce3a34 Mon Sep 17 00:00:00 2001 From: Daniil Pastukhov Date: Mon, 22 May 2023 16:15:39 +0200 Subject: [PATCH 3/8] Add post action callback to RecordAction --- .../trigger_action/actions/record_action.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/depthai_sdk/src/depthai_sdk/trigger_action/actions/record_action.py b/depthai_sdk/src/depthai_sdk/trigger_action/actions/record_action.py index f3b450d9b..e78452d86 100644 --- a/depthai_sdk/src/depthai_sdk/trigger_action/actions/record_action.py +++ b/depthai_sdk/src/depthai_sdk/trigger_action/actions/record_action.py @@ -24,13 +24,16 @@ def __init__(self, inputs: Union[Component, Callable, List[Union[Component, Callable]]], dir_path: str, duration_before_trigger: Union[int, timedelta], - duration_after_trigger: Union[timedelta, int]): + duration_after_trigger: Union[timedelta, int], + on_finish_callback: Callable[[Union[Path, str]], None] = None, + ): """ Args: inputs: Inputs to record video from. Can be a single component or a list of components. dir_path: Path to directory where video files will be saved. Directory will be created if it doesn't exist. duration_before_trigger: Duration of video to record before trigger event. duration_after_trigger: Duration of video to record after trigger event. + on_finish_callback: Callback function that will be called when recording is finished. Should accept a single argument - path to the folder with recorded video files. 
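            (Editor's note, not part of this patch: a minimal usage sketch of the new `on_finish_callback` parameter. It assumes the SDK's existing `DetectionTrigger` and `OakCamera.trigger_action()` API as used in the trigger/action examples; only the `RecordAction` arguments shown here come from this diff.)

                from depthai_sdk import OakCamera
                from depthai_sdk.trigger_action.triggers.detection_trigger import DetectionTrigger
                from depthai_sdk.trigger_action.actions.record_action import RecordAction

                def on_recording_finished(folder_path):
                    # Called once the before/after-trigger buffers have been written to disk
                    print(f'Recording saved to {folder_path}')

                with OakCamera() as oak:
                    color = oak.create_camera('color', encode='H265')
                    nn = oak.create_nn('mobilenet-ssd', color)
                    oak.trigger_action(
                        trigger=DetectionTrigger(input=nn, min_detections={'person': 1}, cooldown=30),
                        action=RecordAction(inputs=[color],
                                            dir_path='./recordings/',
                                            duration_before_trigger=5,
                                            duration_after_trigger=10,
                                            on_finish_callback=on_recording_finished)
                    )
                    oak.start(blocking=True)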
""" super().__init__(inputs) self.path = Path(dir_path).resolve() @@ -47,6 +50,7 @@ def __init__(self, self.recorder = VideoRecorder() self.stream_names = [] # will be assigned during recorder setup self.buffers_status: Dict[str, Dict[str, Union[int, List[int]]]] = {} + self.on_finish_callback = on_finish_callback def _run(self): """ @@ -90,6 +94,10 @@ def _run(self): logging.debug(f'Saved to {str(self.path / subfolder)}') self.recorder.close_files() + # Call on_finish_callback if it is specified + if self.on_finish_callback is not None: + self.on_finish_callback(self.path) + def activate(self): # Setup for the current recording wbt_id = self.buffers_status['writing']['before_t'] From 41d4da7f1b3aab619404612ffe1928953b53a019 Mon Sep 17 00:00:00 2001 From: Erol444 Date: Wed, 24 May 2023 19:03:34 +0200 Subject: [PATCH 4/8] Reduce XLinkIn size (for Camera/Stereo control), which reduces 5MB of RAM per component --- depthai_sdk/src/depthai_sdk/components/camera_component.py | 1 + depthai_sdk/src/depthai_sdk/components/stereo_component.py | 1 + 2 files changed, 2 insertions(+) diff --git a/depthai_sdk/src/depthai_sdk/components/camera_component.py b/depthai_sdk/src/depthai_sdk/components/camera_component.py index a414dbdfd..2e867d4e1 100644 --- a/depthai_sdk/src/depthai_sdk/components/camera_component.py +++ b/depthai_sdk/src/depthai_sdk/components/camera_component.py @@ -206,6 +206,7 @@ def __init__(self, self._control_xlink_in = pipeline.create(dai.node.XLinkIn) self._control_xlink_in.setStreamName(f"{self.node.id}_inputControl") self._control_xlink_in.out.link(self.node.inputControl) + self._control_xlink_in.setMaxDataSize(1) # CameraControl message doesn't use any additional data (only metadata) def on_pipeline_started(self, device: dai.Device): if self._control_xlink_in is not None: diff --git a/depthai_sdk/src/depthai_sdk/components/stereo_component.py b/depthai_sdk/src/depthai_sdk/components/stereo_component.py index 45c1bd8a4..72d5fea46 100644 --- a/depthai_sdk/src/depthai_sdk/components/stereo_component.py +++ b/depthai_sdk/src/depthai_sdk/components/stereo_component.py @@ -172,6 +172,7 @@ def __init__(self, self._control_xlink_in = pipeline.create(dai.node.XLinkIn) self._control_xlink_in.setStreamName(f"{self.node.id}_inputControl") self._control_xlink_in.out.link(self.node.inputConfig) + self._control_xlink_in.setMaxDataSize(1) # CameraControl message doesn't use any additional data (only metadata) def on_pipeline_started(self, device: dai.Device): if self._control_xlink_in is not None: From 5863140708b25ceac03fac1662f685bb59fef479 Mon Sep 17 00:00:00 2001 From: Erol444 Date: Fri, 26 May 2023 22:27:00 +0200 Subject: [PATCH 5/8] Fix 2-stage pipelines, updated colorization for facemesh --- .../depthai_sdk/components/nn_component.py | 4 +++ .../nn_models/facemesh_192x192/handler.py | 29 +++++++++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/depthai_sdk/src/depthai_sdk/components/nn_component.py b/depthai_sdk/src/depthai_sdk/components/nn_component.py index d2b23c60a..29dd98417 100644 --- a/depthai_sdk/src/depthai_sdk/components/nn_component.py +++ b/depthai_sdk/src/depthai_sdk/components/nn_component.py @@ -167,6 +167,7 @@ def __init__(self, # Here, ImageManip will only crop the high-res frame to correct aspect ratio # (without resizing!) and it also acts as a buffer (by default, its pool size is set to 20). 
+ self.image_manip = pipeline.createImageManip() self.image_manip.setNumFramesPool(20) self._input._stream_input.link(self.image_manip.inputImage) @@ -384,6 +385,9 @@ def _change_resize_mode(self, mode: ResizeMode) -> None: Args: mode (ResizeMode): Resize mode to use """ + if self._is_multi_stage(): + return # We need high-res frames for multi-stage NN, so we can crop them later + self._ar_resize_mode = mode # TODO: uncomment this when depthai 2.21.3 is released. In some cases (eg. diff --git a/depthai_sdk/src/depthai_sdk/nn_models/facemesh_192x192/handler.py b/depthai_sdk/src/depthai_sdk/nn_models/facemesh_192x192/handler.py index a1d57c654..63c75747e 100644 --- a/depthai_sdk/src/depthai_sdk/nn_models/facemesh_192x192/handler.py +++ b/depthai_sdk/src/depthai_sdk/nn_models/facemesh_192x192/handler.py @@ -1,5 +1,6 @@ import numpy as np import depthai as dai +from typing import Tuple from depthai_sdk.classes.nn_results import ImgLandmarks THRESHOLD = 0.5 @@ -13,8 +14,32 @@ def decode(data: dai.NNData) -> ImgLandmarks: ldms = np.array(data.getLayerFp16('conv2d_210')).reshape((468, 3)) landmarks = [] - colors = [] + + all_colors = np.array([ldm[2] for ldm in ldms]) + colors = generate_colors_from_z(all_colors) + for ldm in ldms: - colors.append((0, 0, int(ldm[2]) * 5 + 100)) + # colors.append(calculate_color(ldm[2], minz, maxz)) + # colors.append((0, 0, int(ldm[2]) * 5 + 100)) landmarks.append((ldm[0] / 192, ldm[1] / 192)) return ImgLandmarks(data, landmarks=landmarks, colors=colors) + +def generate_colors_from_z(z_values): + """ + Generates BGR colors based on normalized Z-values. + + Parameters: + z_values (numpy.array): Array of Z-values. + + Returns: + List[Tuple]: List of BGR color tuples. + """ + def normalize_z_values(z_values, minZ, maxZ): + return (z_values - minZ) / (maxZ - minZ) + + def map_to_color(normalized_z_values): + return [(255 - int((1 - value) * 255), 0, 255 - int(value * 255)) for value in normalized_z_values] + minZ = min(z_values) + maxZ = max(z_values) + normalized_z_values = normalize_z_values(z_values, minZ, maxZ) + return map_to_color(normalized_z_values) \ No newline at end of file From ac5b94eee908f5ea00c942e185fa135af771f82e Mon Sep 17 00:00:00 2001 From: Filip Jeretina <59307111+zrezke@users.noreply.github.com> Date: Sat, 27 May 2023 00:56:56 +0200 Subject: [PATCH 6/8] Remove problematic depthai_pipeline_graph submodule (#1041) * remove problematic depthai_pipeline_graph submodule * add depthai_pipeline_graph as a dependency rather than submodule. 
* move to correct requirements file, fix depthai_pipeline_graph version, add pyside2 to dependencies --- .gitmodules | 3 --- depthai_sdk/requirements.txt | 4 +++- .../components/integrations/depthai_pipeline_graph | 1 - .../src/depthai_sdk/integrations/depthai_pipeline_graph | 1 - depthai_sdk/src/depthai_sdk/oak_camera.py | 2 +- 5 files changed, 4 insertions(+), 7 deletions(-) delete mode 160000 depthai_sdk/src/depthai_sdk/components/integrations/depthai_pipeline_graph delete mode 160000 depthai_sdk/src/depthai_sdk/integrations/depthai_pipeline_graph diff --git a/.gitmodules b/.gitmodules index 699759294..1c40b6d44 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ -[submodule "depthai_sdk/src/depthai_sdk/integrations/depthai_pipeline_graph"] - path = depthai_sdk/src/depthai_sdk/integrations/depthai_pipeline_graph - url = https://github.com/luxonis/depthai_pipeline_graph [submodule "resources/depthai_boards"] path = resources/depthai_boards url = https://github.com/luxonis/depthai-boards diff --git a/depthai_sdk/requirements.txt b/depthai_sdk/requirements.txt index 354d6a26b..37f0a330f 100644 --- a/depthai_sdk/requirements.txt +++ b/depthai_sdk/requirements.txt @@ -8,4 +8,6 @@ depthai==2.21.2 PyTurboJPEG==1.6.4 marshmallow==3.17.0 xmltodict -sentry-sdk==1.21.0 \ No newline at end of file +sentry-sdk==1.21.0 +depthai-pipeline-graph==0.0.5 +PySide2 diff --git a/depthai_sdk/src/depthai_sdk/components/integrations/depthai_pipeline_graph b/depthai_sdk/src/depthai_sdk/components/integrations/depthai_pipeline_graph deleted file mode 160000 index de3b807f8..000000000 --- a/depthai_sdk/src/depthai_sdk/components/integrations/depthai_pipeline_graph +++ /dev/null @@ -1 +0,0 @@ -Subproject commit de3b807f88833f261f136bae504c76b2516fa1ec diff --git a/depthai_sdk/src/depthai_sdk/integrations/depthai_pipeline_graph b/depthai_sdk/src/depthai_sdk/integrations/depthai_pipeline_graph deleted file mode 160000 index dd1c2a495..000000000 --- a/depthai_sdk/src/depthai_sdk/integrations/depthai_pipeline_graph +++ /dev/null @@ -1 +0,0 @@ -Subproject commit dd1c2a4956d1948753360029064b016d6df62554 diff --git a/depthai_sdk/src/depthai_sdk/oak_camera.py b/depthai_sdk/src/depthai_sdk/oak_camera.py index 2e594d8f3..c8c35425c 100644 --- a/depthai_sdk/src/depthai_sdk/oak_camera.py +++ b/depthai_sdk/src/depthai_sdk/oak_camera.py @@ -517,7 +517,7 @@ def show_graph(self): Shows DepthAI Pipeline graph, which can be useful when debugging. Builds the pipeline (oak.build()). 
""" self.build() - from depthai_sdk.integrations.depthai_pipeline_graph.depthai_pipeline_graph.pipeline_graph import \ + from depthai_pipeline_graph.pipeline_graph import \ PipelineGraph p = PipelineGraph() From 8d31ab581a4e0515b7915eab560afe2442474b06 Mon Sep 17 00:00:00 2001 From: Erol444 Date: Sat, 27 May 2023 15:12:23 +0200 Subject: [PATCH 7/8] Update XoutStream to get node id from output, to reduces boilerplate code --- .../components/camera_component.py | 8 +-- .../depthai_sdk/components/imu_component.py | 6 +-- .../depthai_sdk/components/nn_component.py | 49 ++++++++----------- .../components/pointcloud_component.py | 4 +- .../components/stereo_component.py | 14 +++--- .../depthai_sdk/oak_outputs/xout/xout_base.py | 4 +- 6 files changed, 39 insertions(+), 46 deletions(-) diff --git a/depthai_sdk/src/depthai_sdk/components/camera_component.py b/depthai_sdk/src/depthai_sdk/components/camera_component.py index 0fac45361..a9a9f97f1 100644 --- a/depthai_sdk/src/depthai_sdk/components/camera_component.py +++ b/depthai_sdk/src/depthai_sdk/components/camera_component.py @@ -399,12 +399,12 @@ def get_stream_xout(self) -> StreamXout: if self.is_replay(): return ReplayStream(self._source) elif self.is_mono(): - return StreamXout(self.node.id, self.stream, name=self.name) + return StreamXout(out=self.stream, name=self.name) else: # ColorCamera self.node.setVideoNumFramesPool(self._num_frames_pool) # node.video instead of preview (self.stream) was used to reduce bandwidth # consumption by 2 (3bytes/pixel vs 1.5bytes/pixel) - return StreamXout(self.node.id, self.node.video, name=self.name) + return StreamXout(out=self.node.video, name=self.name) def set_num_frames_pool(self, num_frames: int, preview_num_frames: Optional[int] = None): """ @@ -460,7 +460,7 @@ def encoded(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: """ if self._comp._encoder_profile == dai.VideoEncoderProperties.Profile.MJPEG: out = XoutMjpeg( - frames=StreamXout(self._comp.encoder.id, self._comp.encoder.bitstream, name=self._comp.name), + frames=StreamXout(out=self._comp.encoder.bitstream, name=self._comp.name), color=self._comp.is_color(), lossless=self._comp.encoder.getLossless(), fps=self._comp.encoder.getFrameRate(), @@ -468,7 +468,7 @@ def encoded(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: ) else: out = XoutH26x( - frames=StreamXout(self._comp.encoder.id, self._comp.encoder.bitstream, name=self._comp.name), + frames=StreamXout(out=self._comp.encoder.bitstream, name=self._comp.name), color=self._comp.is_color(), profile=self._comp._encoder_profile, fps=self._comp.encoder.getFrameRate(), diff --git a/depthai_sdk/src/depthai_sdk/components/imu_component.py b/depthai_sdk/src/depthai_sdk/components/imu_component.py index 34451f703..1345f6dc9 100644 --- a/depthai_sdk/src/depthai_sdk/components/imu_component.py +++ b/depthai_sdk/src/depthai_sdk/components/imu_component.py @@ -54,7 +54,7 @@ def main(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: return self.text(pipeline, device) def text(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: - out = self._comp.node.out - out = StreamXout(self._comp.node.id, out) - imu_out = XoutIMU(out) + imu_out = XoutIMU( + imu_xout=StreamXout(out=self._comp.node.out) + ) return self._comp._create_xout(pipeline, imu_out) diff --git a/depthai_sdk/src/depthai_sdk/components/nn_component.py b/depthai_sdk/src/depthai_sdk/components/nn_component.py index 29dd98417..a37707478 100644 --- a/depthai_sdk/src/depthai_sdk/components/nn_component.py 
+++ b/depthai_sdk/src/depthai_sdk/components/nn_component.py @@ -614,10 +614,9 @@ def main(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: """ if self._comp._is_multi_stage(): - det_nn_out = StreamXout(id=self._comp._input.node.id, - out=self._comp._input.node.out, + det_nn_out = StreamXout(out=self._comp._input.node.out, name=self._comp._input.name) - second_nn_out = StreamXout(id=self._comp.node.id, out=self._comp.node.out, name=self._comp.name) + second_nn_out = StreamXout(out=self._comp.node.out, name=self._comp.name) out = XoutTwoStage(det_nn=self._comp._input, second_nn=self._comp, @@ -627,11 +626,10 @@ def main(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: device=device, input_queue_name="input_queue" if self._comp.x_in else None) else: - det_nn_out = StreamXout(id=self._comp.node.id, out=self._comp.node.out, name=self._comp.name) + det_nn_out = StreamXout(out=self._comp.node.out, name=self._comp.name) input_stream = self._comp._stream_input out = XoutNnResults(det_nn=self._comp, - frames=StreamXout(id=input_stream.getParent().id, - out=input_stream, + frames=StreamXout(out=input_stream, name=self._comp.name), nn_results=det_nn_out) @@ -643,13 +641,11 @@ def passthrough(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: Produces DetectionPacket or TwoStagePacket (if it's 2. stage NNComponent). """ if self._comp._is_multi_stage(): - det_nn_out = StreamXout(id=self._comp._input.node.id, - out=self._comp._input.node.out, + det_nn_out = StreamXout(out=self._comp._input.node.out, name=self._comp._input.name) - frames = StreamXout(id=self._comp._input.node.id, - out=self._comp._input.node.passthrough, + frames = StreamXout(out=self._comp._input.node.passthrough, name=self._comp.name) - second_nn_out = StreamXout(self._comp.node.id, self._comp.node.out, name=self._comp.name) + second_nn_out = StreamXout(out=self._comp.node.out, name=self._comp.name) out = XoutTwoStage(det_nn=self._comp._input, second_nn=self._comp, @@ -659,8 +655,8 @@ def passthrough(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: device=device, input_queue_name="input_queue" if self._comp.x_in else None) else: - det_nn_out = StreamXout(id=self._comp.node.id, out=self._comp.node.out, name=self._comp.name) - frames = StreamXout(id=self._comp.node.id, out=self._comp.node.passthrough, name=self._comp.name) + det_nn_out = StreamXout(out=self._comp.node.out, name=self._comp.name) + frames = StreamXout(out=self._comp.node.passthrough, name=self._comp.name) out = XoutNnResults(det_nn=self._comp, frames=frames, @@ -669,14 +665,12 @@ def passthrough(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: return self._comp._create_xout(pipeline, out) def image_manip(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: - out = XoutFrames(frames=StreamXout(id=self._comp.image_manip.id, - out=self._comp.image_manip.out, + out = XoutFrames(frames=StreamXout(out=self._comp.image_manip.out, name=self._comp.name)) return self._comp._create_xout(pipeline, out) def input(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: - out = XoutFrames(frames=StreamXout(id=self._comp._input.node.id, - out=self._comp._stream_input, + out = XoutFrames(frames=StreamXout(out=self._comp._stream_input, name=self._comp.name)) return self._comp._create_xout(pipeline, out) @@ -691,8 +685,8 @@ def spatials(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutSpatialBbM out = XoutSpatialBbMappings( device=device, stereo=self._comp._stereo_node, - 
frames=StreamXout(id=self._comp.node.id, out=self._comp.node.passthroughDepth, name=self._comp.name), - configs=StreamXout(id=self._comp.node.id, out=self._comp.node.out, name=self._comp.name) + frames=StreamXout(out=self._comp.node.passthroughDepth, name=self._comp.name), + configs=StreamXout(out=self._comp.node.out, name=self._comp.name) ) return self._comp._create_xout(pipeline, out) @@ -704,8 +698,7 @@ def twostage_crops(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutFram if not self._comp._is_multi_stage(): raise Exception('SDK tried to output TwoStage crop frames, but this is not a Two-Stage NN component!') - out = XoutFrames(frames=StreamXout(id=self._comp._multi_stage_nn.manip.id, - out=self._comp._multi_stage_nn.manip.out, + out = XoutFrames(frames=StreamXout(out=self._comp._multi_stage_nn.manip.out, name=self._comp.name)) return self._comp._create_xout(pipeline, out) @@ -726,7 +719,7 @@ def tracker(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutTracker: out = XoutTracker(det_nn=self._comp, frames=self._comp._input.get_stream_xout(), # CameraComponent device=device, - tracklets=StreamXout(self._comp.tracker.id, self._comp.tracker.out), + tracklets=StreamXout(out=self._comp.tracker.out), apply_kalman=self._comp.apply_tracking_filter, forget_after_n_frames=self._comp.forget_after_n_frames) @@ -746,8 +739,8 @@ def encoded(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutNnResults: if self._comp._input._encoder_profile == dai.VideoEncoderProperties.Profile.MJPEG: out = XoutNnMjpeg( det_nn=self._comp, - frames=StreamXout(self._comp._input.encoder.id, self._comp._input.encoder.bitstream), - nn_results=StreamXout(self._comp.node.id, self._comp.node.out), + frames=StreamXout(out=self._comp._input.encoder.bitstream), + nn_results=StreamXout(out=self._comp.node.out), color=self._comp._input.is_color(), lossless=self._comp._input.encoder.getLossless(), fps=self._comp._input.encoder.getFrameRate(), @@ -756,8 +749,8 @@ def encoded(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutNnResults: else: out = XoutNnH26x( det_nn=self._comp, - frames=StreamXout(self._comp._input.node.id, self._comp._input.encoder.bitstream), - nn_results=StreamXout(self._comp.node.id, self._comp.node.out), + frames=StreamXout(out=self._comp._input.encoder.bitstream), + nn_results=StreamXout(out=self._comp.node.out), color=self._comp._input.is_color(), profile=self._comp._input._encoder_profile, fps=self._comp._input.encoder.getFrameRate(), @@ -768,9 +761,9 @@ def encoded(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutNnResults: def nn_data(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutNnData: if type(self._comp.node) == dai.node.NeuralNetwork: - out = XoutNnData(xout=StreamXout(self._comp.node.id, self._comp.node.out)) + out = XoutNnData(xout=StreamXout(out=self._comp.node.out)) else: - out = XoutNnData(xout=StreamXout(self._comp.node.id, self._comp.node.outNetwork)) + out = XoutNnData(xout=StreamXout(out=self._comp.node.outNetwork)) return self._comp._create_xout(pipeline, out) # Checks diff --git a/depthai_sdk/src/depthai_sdk/components/pointcloud_component.py b/depthai_sdk/src/depthai_sdk/components/pointcloud_component.py index 8ed847272..902457525 100644 --- a/depthai_sdk/src/depthai_sdk/components/pointcloud_component.py +++ b/depthai_sdk/src/depthai_sdk/components/pointcloud_component.py @@ -102,10 +102,10 @@ def main(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: def pointcloud(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: 
colorize = None if self._comp.colorize_comp is not None: - colorize = StreamXout(self._comp.colorize_comp.node.id, self._comp.colorize_comp.stream, name="Color") + colorize = StreamXout(out=self._comp.colorize_comp.stream, name="Color") out = XoutPointcloud(device, - StreamXout(self._comp.stereo_depth_node.id, self._comp.depth, name=self._comp.name), + StreamXout(out=self._comp.depth, name=self._comp.name), color_frames=colorize, fps=30 ) diff --git a/depthai_sdk/src/depthai_sdk/components/stereo_component.py b/depthai_sdk/src/depthai_sdk/components/stereo_component.py index 0843df08a..1bbdc67fb 100644 --- a/depthai_sdk/src/depthai_sdk/components/stereo_component.py +++ b/depthai_sdk/src/depthai_sdk/components/stereo_component.py @@ -362,7 +362,7 @@ def _mono_frames(self): """ mono_frames = None if self._comp.wls_enabled or self._comp._colorize == StereoColor.RGBD: - mono_frames = StreamXout(self._comp.node.id, self._comp._right_stream, name=self._comp.name) + mono_frames = StreamXout(out=self._comp._right_stream, name=self._comp.name) return mono_frames def main(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: @@ -373,7 +373,7 @@ def disparity(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: fps = self._comp.left.get_fps() if self._comp._replay is None else self._comp._replay.get_fps() out = XoutDisparity( - frames=StreamXout(self._comp.node.id, self._comp.disparity, name=self._comp.name), + frames=StreamXout(out=self._comp.disparity, name=self._comp.name), disp_factor=255.0 / self._comp.node.getMaxDisparity(), fps=fps, mono_frames=self._mono_frames(), @@ -390,7 +390,7 @@ def disparity(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: def rectified_left(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: fps = self._comp.left.get_fps() if self._comp._replay is None else self._comp._replay.get_fps() out = XoutFrames( - frames=StreamXout(self._comp.node.id, self._comp.node.rectifiedLeft), + frames=StreamXout(out=self._comp.node.rectifiedLeft), fps=fps) out.name = 'Rectified left' return self._comp._create_xout(pipeline, out) @@ -398,7 +398,7 @@ def rectified_left(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase def rectified_right(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: fps = self._comp.left.get_fps() if self._comp._replay is None else self._comp._replay.get_fps() out = XoutFrames( - frames=StreamXout(self._comp.node.id, self._comp.node.rectifiedRight), + frames=StreamXout(out=self._comp.node.rectifiedRight), fps=fps) out.name = 'Rectified right' return self._comp._create_xout(pipeline, out) @@ -407,7 +407,7 @@ def depth(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: fps = self._comp.left.get_fps() if self._comp._replay is None else self._comp._replay.get_fps() out = XoutDepth( - frames=StreamXout(self._comp.node.id, self._comp.depth, name=self._comp.name), + frames=StreamXout(out=self._comp.depth, name=self._comp.name), dispScaleFactor=depth_to_disp_factor(device, self._comp.node), fps=fps, mono_frames=self._mono_frames(), @@ -428,13 +428,13 @@ def encoded(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: warnings.warn('WLS filter is enabled, but cannot be applied to encoded frames.') if self._comp._encoderProfile == dai.VideoEncoderProperties.Profile.MJPEG: - out = XoutMjpeg(frames=StreamXout(self._comp.encoder.id, self._comp.encoder.bitstream), + out = XoutMjpeg(frames=StreamXout(out=self._comp.encoder.bitstream), color=self._comp.colormap is not None, 
lossless=self._comp.encoder.getLossless(), fps=self._comp.encoder.getFrameRate(), frame_shape=(1200, 800)) else: - out = XoutH26x(frames=StreamXout(self._comp.encoder.id, self._comp.encoder.bitstream), + out = XoutH26x(frames=StreamXout(out=self._comp.encoder.bitstream), color=self._comp.colormap is not None, profile=self._comp._encoderProfile, fps=self._comp.encoder.getFrameRate(), diff --git a/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_base.py b/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_base.py index 4ca3fa270..63da4cc66 100644 --- a/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_base.py +++ b/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_base.py @@ -10,12 +10,12 @@ class StreamXout: - def __init__(self, id: int, out: dai.Node.Output, name: Optional[str] = None): + def __init__(self, out: dai.Node.Output, name: Optional[str] = None): self.stream = out if name is not None: self.name = f'{name}_{str(out.name)}' else: - self.name = f"{str(id)}_{out.name}" + self.name = f"{str(out.getParent().id)}_{out.name}" class ReplayStream(StreamXout): From 75830133539cb9ddc4a9aa1b89a8893c8004a882 Mon Sep 17 00:00:00 2001 From: Erol444 Date: Sat, 27 May 2023 22:27:45 +0200 Subject: [PATCH 8/8] Started integrating Hand Tracker into SDK --- .../HandTrackerComponent/hand_landmarks.py | 21 + .../src/depthai_sdk/classes/packets.py | 54 +- .../components/hand_tracker/component.py | 268 +++++ .../hand_tracker/mediapipe_utils.py | 953 ++++++++++++++++++ .../components/hand_tracker/renderer.py | 180 ++++ .../template_manager_script_duo.py | 354 +++++++ depthai_sdk/src/depthai_sdk/oak_camera.py | 14 + .../oak_outputs/xout/xout_hand_tracker.py | 173 ++++ 8 files changed, 2013 insertions(+), 4 deletions(-) create mode 100644 depthai_sdk/examples/HandTrackerComponent/hand_landmarks.py create mode 100644 depthai_sdk/src/depthai_sdk/components/hand_tracker/component.py create mode 100644 depthai_sdk/src/depthai_sdk/components/hand_tracker/mediapipe_utils.py create mode 100644 depthai_sdk/src/depthai_sdk/components/hand_tracker/renderer.py create mode 100644 depthai_sdk/src/depthai_sdk/components/hand_tracker/template_manager_script_duo.py create mode 100644 depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_hand_tracker.py diff --git a/depthai_sdk/examples/HandTrackerComponent/hand_landmarks.py b/depthai_sdk/examples/HandTrackerComponent/hand_landmarks.py new file mode 100644 index 000000000..3c857fb38 --- /dev/null +++ b/depthai_sdk/examples/HandTrackerComponent/hand_landmarks.py @@ -0,0 +1,21 @@ +from depthai_sdk import OakCamera +from depthai_sdk.components.hand_tracker.renderer import HandTrackerRenderer +from depthai_sdk.classes.packets import HandTrackerPacket +import cv2 + +with OakCamera() as oak: + color = oak.create_camera('color') + + handtracker = oak.create_hand_tracker(color) + + render = HandTrackerRenderer(handtracker) + def cb(packet: HandTrackerPacket): + render.draw(packet.color_frame, packet.hands) + cv2.imshow("Hand tracking", render.frame) + + oak.callback(handtracker, cb) + oak.visualize(handtracker.out.palm_detection) + oak.visualize(handtracker.out.palm_crop) + + # oak.show_graph() + oak.start(blocking=True) diff --git a/depthai_sdk/src/depthai_sdk/classes/packets.py b/depthai_sdk/src/depthai_sdk/classes/packets.py index d53d61bcd..710de3528 100644 --- a/depthai_sdk/src/depthai_sdk/classes/packets.py +++ b/depthai_sdk/src/depthai_sdk/classes/packets.py @@ -1,8 +1,8 @@ from typing import Tuple, List, Union, Optional - +from 
depthai_sdk.components.hand_tracker.mediapipe_utils import HandRegion import depthai as dai import numpy as np - +import math try: import cv2 except ImportError: @@ -72,11 +72,22 @@ def __init__(self, name: str, points: np.ndarray, depth_map: dai.ImgFrame, - color_frame: Optional[np.ndarray], + color_frame: Optional[np.ndarray] = None, visualizer: 'Visualizer' = None): self.name = name self.points = points - self.depth_imgFrame = dai.ImgFrame + self.depth_imgFrame = depth_map + self.color_frame = color_frame + self.visualizer = visualizer + +class HandTrackerPacket: + def __init__(self, + name: str, + hands: List[HandRegion], + color_frame: Optional[np.ndarray] = None, + visualizer: 'Visualizer' = None): + self.name = name + self.hands = hands self.color_frame = color_frame self.visualizer = visualizer @@ -117,6 +128,41 @@ def __init__(self, self.spatials = spatials +class RotatedDetectionPacket(FramePacket): + def __init__(self, + name: str, + msg: dai.ImgFrame, + rotated_rects: List[dai.RotatedRect], + visualizer: 'Visualizer' = None): + super().__init__(name=name, + msg=msg, + frame=msg.getCvFrame() if cv2 else None, + visualizer=visualizer) + self.rotated_rects = rotated_rects + self.bb_corners = [self.rotated_rect_to_points(rr) for rr in self.rotated_rects] + + def rotated_rect_to_points(self, rr: dai.RotatedRect) -> List[Tuple]: + cx = rr.center.x + cy = rr.center.y + w = rr.size.width / 2 # half-width + h = rr.size.height / 2 # half-height + rotation = math.radians(rr.angle) # convert angle to radians + + b = math.cos(rotation) + a = math.sin(rotation) + + # calculate corners + p0x = cx - a*h - b*w + p0y = cy + b*h - a*w + p1x = cx + a*h - b*w + p1y = cy - b*h - a*w + p2x = 2*cx - p0x + p2y = 2*cy - p0y + p3x = 2*cx - p1x + p3y = 2*cy - p1y + + return [(p0x,p0y), (p1x,p1y), (p2x,p2y), (p3x,p3y)] + class DetectionPacket(FramePacket): """ Output from Detection Network nodes - image frame + image detections. Inherits FramePacket. diff --git a/depthai_sdk/src/depthai_sdk/components/hand_tracker/component.py b/depthai_sdk/src/depthai_sdk/components/hand_tracker/component.py new file mode 100644 index 000000000..712a1574e --- /dev/null +++ b/depthai_sdk/src/depthai_sdk/components/hand_tracker/component.py @@ -0,0 +1,268 @@ +from pathlib import Path +from typing import Union, Dict, Optional +from string import Template + +from depthai_sdk.oak_outputs.xout.xout_hand_tracker import XoutHandTracker, XoutPalmDetection + +import blobconverter +import depthai as dai +from depthai_sdk.components.camera_component import CameraComponent +from depthai_sdk.components.component import Component +from depthai_sdk.classes.enum import ResizeMode +from depthai_sdk.components.stereo_component import StereoComponent +from depthai_sdk.oak_outputs.xout.xout_base import StreamXout, XoutBase +from depthai_sdk.oak_outputs.xout.xout_frames import XoutFrames + +class HandTrackerComponent(Component): + def __init__(self, + device: dai.Device, + pipeline: dai.Pipeline, + input: CameraComponent, + # model: str = 'lite', # "lite", "full", or "sparse" + spatial: Union[None, bool, StereoComponent] = None, + args: Dict = None, # User defined args + name: Optional[str] = None + ) -> None: + """ + This component aims to port https://github.com/geaxgx/depthai_hand_tracker into the DepthAI SDK. It uses Google Mediapipe's hand landmark model. 
+ """ + super().__init__() + + self.name = name + self.out = self.Out(self) + + # Private properties + self._ar_resize_mode: ResizeMode = ResizeMode.LETTERBOX # Default + self._input: CameraComponent = input # Input to the component node passed on initialization + self._stream_input: dai.Node.Output # Node Output that will be used as the input for this NNComponent + + self._args: Optional[Dict] = args + + self._spatial: Optional[Union[bool, StereoComponent]] = spatial + + self.use_lm = True + self.trace = 0 + self.lm_nb_threads = 2 # Number of inference threads for the landmark model + self.xyz = False + self.crop = False + self.use_world_landmarks = False + self.use_gesture = False + self.use_handedness_average = True + self.single_hand_tolerance_thresh = 10 + self.use_same_image = True + self.pd_score_thresh = 0.5 + self.lm_score_thresh = 0.4 + self.pd_resize_mode = ResizeMode.LETTERBOX + + self.img_w, self.img_h = self._input.stream_size + print(self._input.node.getPreviewSize(), self._input.node.getVideoSize(), self._input.node.getIspSize()) + print(self._input.stream_size) + + self.crop_w = 0 + self.frame_size = self.img_w + self.pad_w = 0 + self.pad_h = (self.img_w - self.img_h) // 2 + """ + Internal camera FPS set to: 36 + Internal camera image size: 1152 x 648 - pad_h: 252 + VALS: 252 648 1152 1152 0 + + MY vals: VALS: 0 720 1280 1280 0 + """ + + # _crop_w = self.crop_w, + + self.script = pipeline.create(dai.node.Script) + self.script.setScript(self.build_manager_script()) + self.script.setProcessor(dai.ProcessorType.LEON_CSS) # More stable + + # Palm detection --------- + self.pd_size = (128, 128) + pd_manip = pipeline.create(dai.node.ImageManip) + pd_manip.setMaxOutputFrameSize(self.pd_size[0] * self.pd_size[1] * 3) + pd_manip.setWaitForConfigInput(True) + pd_manip.inputImage.setQueueSize(1) + pd_manip.inputImage.setBlocking(False) + + # if self._ar_resize_mode == ResizeMode.CROP: + # pd_manip.initialConfig.setKeepAspectRatio(False) # Stretch the image to fit the NN input + # self.image_manip.initialConfig.setCenterCrop(1, self._size[0] / self._size[1]) + # elif self._ar_resize_mode == ResizeMode.LETTERBOX: + # pd_manip.initialConfig.setKeepAspectRatio(True) + # elif self._ar_resize_mode == ResizeMode.STRETCH: + # pd_manip.initialConfig.setKeepAspectRatio(False) + + self._input.stream.link(pd_manip.inputImage) + self.script.outputs['pre_pd_manip_cfg'].link(pd_manip.inputConfig) + + self.palm_detection_nn = pipeline.create(dai.node.NeuralNetwork) + self.palm_detection_nn.setBlobPath(blobconverter.from_zoo(name="palm_detection_128x128", + zoo_type='depthai', + shaves=6)) + pd_manip.out.link(self.palm_detection_nn.input) + + # Palm detection decoding ----------- + self.palm_detection_decoding = pipeline.create(dai.node.NeuralNetwork) + self.palm_detection_decoding.setBlobPath(blobconverter.from_zoo(name="palm_detection_128x128_decoding", + zoo_type='depthai', + shaves=6, + # Required, otherwise it will try to convert INT8 to FP16: + compile_params=[] + )) + self.palm_detection_nn.out.link(self.palm_detection_decoding.input) + self.palm_detection_decoding.out.link(self.script.inputs['from_post_pd_nn']) + + # Hand landmarks ------------ + + self.lm_input_length = 224 + self.pre_lm_manip = pipeline.create(dai.node.ImageManip) + self.pre_lm_manip.setMaxOutputFrameSize(self.lm_input_length*self.lm_input_length*3) + self.pre_lm_manip.setWaitForConfigInput(True) + self.pre_lm_manip.inputImage.setQueueSize(1) + self.pre_lm_manip.inputImage.setBlocking(False) + 
self._input.stream.link(self.pre_lm_manip.inputImage) + + self.script.outputs['pre_lm_manip_cfg'].link(self.pre_lm_manip.inputConfig) + + # Define landmark model + lm_nn = pipeline.create(dai.node.NeuralNetwork) + lm_nn.setBlobPath(blobconverter.from_zoo(name="hand_landmark_224x224", + zoo_type='depthai', + shaves=6)) + lm_nn.setNumInferenceThreads(self.lm_nb_threads) + self.pre_lm_manip.out.link(lm_nn.input) + lm_nn.out.link(self.script.inputs['from_lm_nn']) + + + def build_manager_script(self): + ''' + The code of the scripting node 'manager_script' depends on : + - the score threshold, + - the video frame shape + So we build this code from the content of the file template_manager_script_*.py which is a python template + ''' + path = Path(__file__).resolve().parent / "template_manager_script_duo.py" + + # Read the template + with open(path, 'r', encoding='utf-8') as file: + template = Template(file.read()) + + # self.trace = 3 + print('VALS:', self.pad_h, self.img_h, self.img_w, self.frame_size, self.crop_w) + + # Perform the substitution + code = template.substitute( + _TRACE1 = "node.warn" if self.trace & 1 else "#", + _TRACE2 = "node.warn" if self.trace & 2 else "#", + _pd_score_thresh = self.pd_score_thresh, + _lm_score_thresh = self.lm_score_thresh, + _pad_h = self.pad_h, + _img_h = self.img_h, + _img_w = self.img_w, + _frame_size = self.frame_size, + _crop_w = self.crop_w, + _IF_XYZ = "" if self.xyz else '"""', + _IF_USE_HANDEDNESS_AVERAGE = "" if self.use_handedness_average else '"""', + _single_hand_tolerance_thresh= self.single_hand_tolerance_thresh, + _IF_USE_SAME_IMAGE = "" if self.use_same_image else '"""', + _IF_USE_WORLD_LANDMARKS = "" if self.use_world_landmarks else '"""', + ) + # Remove comments and empty lines + import re + code = re.sub(r'"{3}.*?"{3}', '', code, flags=re.DOTALL) + code = re.sub(r'#.*', '', code) + code = re.sub('\n\s*\n', '\n', code) + # For debugging + return code + + # def _change_resize_mode(self, mode: ResizeMode) -> None: + # """ + # Changes the resize mode of the ImageManip node. + + # Args: + # mode (ResizeMode): Resize mode to use + # """ + # self._ar_resize_mode = mode + + # # TODO: uncomment this when depthai 2.21.3 is released. In some cases (eg. + # # setting first crop, then letterbox), the last config isn't used. + # # self.image_manip.initialConfig.set(dai.RawImageManipConfig()) + + # if self._ar_resize_mode == ResizeMode.CROP: + # self.image_manip.initialConfig.setResize(self._size) + # # With .setCenterCrop(1), ImageManip will first crop the frame to the correct aspect ratio, + # # and then resize it to the NN input size + # self.image_manip.initialConfig.setCenterCrop(1, self._size[0] / self._size[1]) + # elif self._ar_resize_mode == ResizeMode.LETTERBOX: + # self.image_manip.initialConfig.setResizeThumbnail(*self._size) + # elif self._ar_resize_mode == ResizeMode.STRETCH: + # # Not keeping aspect ratio -> stretching the image + # self.image_manip.initialConfig.setKeepAspectRatio(False) + # self.image_manip.initialConfig.setResize(self._size) + + """ + Available outputs (to the host) of this component + """ + + class Out: + def __init__(self, nn_component: 'HandTrackerComponent'): + self._comp = nn_component + + def main(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: + """ + Default output. Streams NN results and high-res frames that were downscaled and used for inferencing. + Produces DetectionPacket or TwoStagePacket (if it's 2. stage NNComponent). 
+ """ + # manager_out = pipeline.create(dai.node.XLinkOut) + # manager_out.setStreamName("manager_out") + # manager_.link(manager_out.input) + script_out = StreamXout(out=self._comp.script.outputs['host'], + name='host') + + camera_comp = self._comp._input + frames = StreamXout(out=camera_comp.stream) + + out = XoutHandTracker( + script_stream=script_out, + frames=frames, + component=self._comp + ) + return self._comp._create_xout(pipeline, out) + + def palm_detection(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: + """ + Only palm detection outputs. Send both PD decoded results + passthrough frames (frames + used for inferencing PD) + """ + out = XoutPalmDetection( + frames=StreamXout(out=self._comp.palm_detection_nn.passthrough), + nn_results=StreamXout(out=self._comp.palm_detection_decoding.out, name="Palm detection results"), + hand_tracker=self._comp + ) + return self._comp._create_xout(pipeline, out) + + def palm_crop(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: + """ + Send out the cropped Palm frames, which are sent to the hand landmark NN. Useful for debugging. + """ + out = XoutFrames( + frames=StreamXout(out=self._comp.pre_lm_manip.out) + ) + return self._comp._create_xout(pipeline, out) + + + + # def passthrough(self, pipeline: dai.Pipeline, device: dai.Device) -> XoutBase: + # """ + # Default output. Streams NN results and passthrough frames (frames used for inferencing) + # Produces DetectionPacket or TwoStagePacket (if it's 2. stage NNComponent). + # """ + # det_nn_out = StreamXout(out=self._comp.node.out, name=self._comp.name) + # frames = StreamXout(out=self._comp.node.passthrough, name=self._comp.name) + + # out = XoutNnResults(det_nn=self._comp, + # frames=frames, + # nn_results=det_nn_out) + + # return self._comp._create_xout(pipeline, out) + diff --git a/depthai_sdk/src/depthai_sdk/components/hand_tracker/mediapipe_utils.py b/depthai_sdk/src/depthai_sdk/components/hand_tracker/mediapipe_utils.py new file mode 100644 index 000000000..aa146dfbe --- /dev/null +++ b/depthai_sdk/src/depthai_sdk/components/hand_tracker/mediapipe_utils.py @@ -0,0 +1,953 @@ +import cv2 +import numpy as np +from collections import namedtuple +from math import ceil, sqrt, exp, pi, floor, sin, cos, atan2, gcd + +# To not display: RuntimeWarning: overflow encountered in exp +# in line: scores = 1 / (1 + np.exp(-scores)) +np.seterr(over='ignore') + +class HandRegion: + """ + Attributes: + pd_score : detection score + pd_box : detection box [x, y, w, h], normalized [0,1] in the squared image + pd_kps : detection keypoints coordinates [x, y], normalized [0,1] in the squared image + rect_x_center, rect_y_center : center coordinates of the rotated bounding rectangle, normalized [0,1] in the squared image + rect_w, rect_h : width and height of the rotated bounding rectangle, normalized in the squared image (may be > 1) + rotation : rotation angle of rotated bounding rectangle with y-axis in radian + rect_x_center_a, rect_y_center_a : center coordinates of the rotated bounding rectangle, in pixels in the squared image + rect_w, rect_h : width and height of the rotated bounding rectangle, in pixels in the squared image + rect_points : list of the 4 points coordinates of the rotated bounding rectangle, in pixels + expressed in the squared image during processing, + expressed in the source rectangular image when returned to the user + lm_score: global landmark score + norm_landmarks : 3D landmarks coordinates in the rotated bounding rectangle, normalized [0,1] + landmarks : 
2D landmark coordinates in pixel in the source rectangular image + world_landmarks : 3D landmark coordinates in meter + handedness: float between 0. and 1., > 0.5 for right hand, < 0.5 for left hand, + label: "left" or "right", handedness translated in a string, + xyz: real 3D world coordinates of the wrist landmark, or of the palm center (if landmarks are not used), + xyz_zone: (left, top, right, bottom), pixel coordinates in the source rectangular image + of the rectangular zone used to estimate the depth + gesture: (optional, set in recognize_gesture() when use_gesture==True) string corresponding to recognized gesture ("ONE","TWO","THREE","FOUR","FIVE","FIST","OK","PEACE") + or None if no gesture has been recognized + """ + def __init__(self, pd_score=None, pd_box=None, pd_kps=None): + self.pd_score = pd_score # Palm detection score + self.pd_box = pd_box # Palm detection box [x, y, w, h] normalized + self.pd_kps = pd_kps # Palm detection keypoints + + def get_rotated_world_landmarks(self): + world_landmarks_rotated = self.world_landmarks.copy() + sin_rot = sin(self.rotation) + cos_rot = cos(self.rotation) + rot_m = np.array([[cos_rot, sin_rot], [-sin_rot, cos_rot]]) + world_landmarks_rotated[:,:2] = np.dot(world_landmarks_rotated[:,:2], rot_m) + return world_landmarks_rotated + + def print(self): + attrs = vars(self) + print('\n'.join("%s: %s" % item for item in attrs.items())) + +class HandednessAverage: + """ + Used to store the average handeness + Why ? Handedness inferred by the landmark model is not perfect. For certain poses, it is not rare that the model thinks + that a right hand is a left hand (or vice versa). Instead of using the last inferred handedness, we prefer to use the average + of the inferred handedness on the last frames. This gives more robustness. + """ + def __init__(self): + self._total_handedness = 0 + self._nb = 0 + def update(self, new_handedness): + self._total_handedness += new_handedness + self._nb += 1 + return self._total_handedness / self._nb + def reset(self): + self._total_handedness = self._nb = 0 + + +SSDAnchorOptions = namedtuple('SSDAnchorOptions',[ + 'num_layers', + 'min_scale', + 'max_scale', + 'input_size_height', + 'input_size_width', + 'anchor_offset_x', + 'anchor_offset_y', + 'strides', + 'aspect_ratios', + 'reduce_boxes_in_lowest_layer', + 'interpolated_scale_aspect_ratio', + 'fixed_anchor_size']) + +def calculate_scale(min_scale, max_scale, stride_index, num_strides): + if num_strides == 1: + return (min_scale + max_scale) / 2 + else: + return min_scale + (max_scale - min_scale) * stride_index / (num_strides - 1) + +def generate_anchors(options): + """ + option : SSDAnchorOptions + # https://github.com/google/mediapipe/blob/master/mediapipe/calculators/tflite/ssd_anchors_calculator.cc + """ + anchors = [] + layer_id = 0 + n_strides = len(options.strides) + while layer_id < n_strides: + anchor_height = [] + anchor_width = [] + aspect_ratios = [] + scales = [] + # For same strides, we merge the anchors in the same order. + last_same_stride_layer = layer_id + while last_same_stride_layer < n_strides and \ + options.strides[last_same_stride_layer] == options.strides[layer_id]: + scale = calculate_scale(options.min_scale, options.max_scale, last_same_stride_layer, n_strides) + if last_same_stride_layer == 0 and options.reduce_boxes_in_lowest_layer: + # For first layer, it can be specified to use predefined anchors. 
+ aspect_ratios += [1.0, 2.0, 0.5] + scales += [0.1, scale, scale] + else: + aspect_ratios += options.aspect_ratios + scales += [scale] * len(options.aspect_ratios) + if options.interpolated_scale_aspect_ratio > 0: + if last_same_stride_layer == n_strides -1: + scale_next = 1.0 + else: + scale_next = calculate_scale(options.min_scale, options.max_scale, last_same_stride_layer+1, n_strides) + scales.append(sqrt(scale * scale_next)) + aspect_ratios.append(options.interpolated_scale_aspect_ratio) + last_same_stride_layer += 1 + + for i,r in enumerate(aspect_ratios): + ratio_sqrts = sqrt(r) + anchor_height.append(scales[i] / ratio_sqrts) + anchor_width.append(scales[i] * ratio_sqrts) + + stride = options.strides[layer_id] + feature_map_height = ceil(options.input_size_height / stride) + feature_map_width = ceil(options.input_size_width / stride) + + for y in range(feature_map_height): + for x in range(feature_map_width): + for anchor_id in range(len(anchor_height)): + x_center = (x + options.anchor_offset_x) / feature_map_width + y_center = (y + options.anchor_offset_y) / feature_map_height + # new_anchor = Anchor(x_center=x_center, y_center=y_center) + if options.fixed_anchor_size: + new_anchor = [x_center, y_center, 1.0, 1.0] + # new_anchor.w = 1.0 + # new_anchor.h = 1.0 + else: + new_anchor = [x_center, y_center, anchor_width[anchor_id], anchor_height[anchor_id]] + # new_anchor.w = anchor_width[anchor_id] + # new_anchor.h = anchor_height[anchor_id] + anchors.append(new_anchor) + + layer_id = last_same_stride_layer + return np.array(anchors) + +def generate_handtracker_anchors(input_size_width, input_size_height): + # https://github.com/google/mediapipe/blob/master/mediapipe/modules/palm_detection/palm_detection_cpu.pbtxt + anchor_options = SSDAnchorOptions(num_layers=4, + min_scale=0.1484375, + max_scale=0.75, + input_size_height=input_size_height, + input_size_width=input_size_width, + anchor_offset_x=0.5, + anchor_offset_y=0.5, + strides=[8, 16, 16, 16], + aspect_ratios= [1.0], + reduce_boxes_in_lowest_layer=False, + interpolated_scale_aspect_ratio=1.0, + fixed_anchor_size=True) + return generate_anchors(anchor_options) + +def decode_bboxes(score_thresh, scores, bboxes, anchors, scale=128, best_only=False): + """ + wi, hi : NN input shape + mediapipe/calculators/tflite/tflite_tensors_to_detections_calculator.cc + # Decodes the detection tensors generated by the model, based on + # the SSD anchors and the specification in the options, into a vector of + # detections. Each detection describes a detected object. 
+ + https://github.com/google/mediapipe/blob/master/mediapipe/modules/palm_detection/palm_detection_cpu.pbtxt : + node { + calculator: "TensorsToDetectionsCalculator" + input_stream: "TENSORS:detection_tensors" + input_side_packet: "ANCHORS:anchors" + output_stream: "DETECTIONS:unfiltered_detections" + options: { + [mediapipe.TensorsToDetectionsCalculatorOptions.ext] { + num_classes: 1 + num_boxes: 896 + num_coords: 18 + box_coord_offset: 0 + keypoint_coord_offset: 4 + num_keypoints: 7 + num_values_per_keypoint: 2 + sigmoid_score: true + score_clipping_thresh: 100.0 + reverse_output_order: true + + x_scale: 128.0 + y_scale: 128.0 + h_scale: 128.0 + w_scale: 128.0 + min_score_thresh: 0.5 + } + } + } + node { + calculator: "TensorsToDetectionsCalculator" + input_stream: "TENSORS:detection_tensors" + input_side_packet: "ANCHORS:anchors" + output_stream: "DETECTIONS:unfiltered_detections" + options: { + [mediapipe.TensorsToDetectionsCalculatorOptions.ext] { + num_classes: 1 + num_boxes: 2016 + num_coords: 18 + box_coord_offset: 0 + keypoint_coord_offset: 4 + num_keypoints: 7 + num_values_per_keypoint: 2 + sigmoid_score: true + score_clipping_thresh: 100.0 + reverse_output_order: true + + x_scale: 192.0 + y_scale: 192.0 + w_scale: 192.0 + h_scale: 192.0 + min_score_thresh: 0.5 + } + } + } + + scores: shape = [number of anchors 896 or 2016] + bboxes: shape = [ number of anchors x 18], 18 = 4 (bounding box : (cx,cy,w,h) + 14 (7 palm keypoints) + """ + regions = [] + scores = 1 / (1 + np.exp(-scores)) + if best_only: + best_id = np.argmax(scores) + if scores[best_id] < score_thresh: return regions + det_scores = scores[best_id:best_id+1] + det_bboxes2 = bboxes[best_id:best_id+1] + det_anchors = anchors[best_id:best_id+1] + else: + detection_mask = scores > score_thresh + det_scores = scores[detection_mask] + if det_scores.size == 0: return regions + det_bboxes2 = bboxes[detection_mask] + det_anchors = anchors[detection_mask] + + # scale = 128 # x_scale, y_scale, w_scale, h_scale + # scale = 192 # x_scale, y_scale, w_scale, h_scale + + # cx, cy, w, h = bboxes[i,:4] + # cx = cx * anchor.w / wi + anchor.x_center + # cy = cy * anchor.h / hi + anchor.y_center + # lx = lx * anchor.w / wi + anchor.x_center + # ly = ly * anchor.h / hi + anchor.y_center + det_bboxes = det_bboxes2* np.tile(det_anchors[:,2:4], 9) / scale + np.tile(det_anchors[:,0:2],9) + # w = w * anchor.w / wi (in the prvious line, we add anchor.x_center and anchor.y_center to w and h, we need to substract them now) + # h = h * anchor.h / hi + det_bboxes[:,2:4] = det_bboxes[:,2:4] - det_anchors[:,0:2] + # box = [cx - w*0.5, cy - h*0.5, w, h] + det_bboxes[:,0:2] = det_bboxes[:,0:2] - det_bboxes[:,3:4] * 0.5 + + for i in range(det_bboxes.shape[0]): + score = det_scores[i] + box = det_bboxes[i,0:4] + # Decoded detection boxes could have negative values for width/height due + # to model prediction. 
Filter out those boxes + if box[2] < 0 or box[3] < 0: continue + kps = [] + # 0 : wrist + # 1 : index finger joint + # 2 : middle finger joint + # 3 : ring finger joint + # 4 : little finger joint + # 5 : + # 6 : thumb joint + # for j, name in enumerate(["0", "1", "2", "3", "4", "5", "6"]): + # kps[name] = det_bboxes[i,4+j*2:6+j*2] + for kp in range(7): + kps.append(det_bboxes[i,4+kp*2:6+kp*2]) + regions.append(HandRegion(float(score), box, kps)) + return regions + +# Starting from opencv 4.5.4, cv2.dnn.NMSBoxes output format changed +import re +cv2_version = cv2.__version__.split('.') +v0 = int(cv2_version[0]) +v1 = int(cv2_version[1]) +v2 = int(re.sub(r'\D+', '', cv2_version[2])) +if v0 > 4 or (v0 == 4 and (v1 > 5 or (v1 == 5 and v2 >= 4))): + def non_max_suppression(regions, nms_thresh): + # cv2.dnn.NMSBoxes(boxes, scores, 0, nms_thresh) needs: + # boxes = [ [x, y, w, h], ...] with x, y, w, h of type int + # Currently, x, y, w, h are float between 0 and 1, so we arbitrarily multiply by 1000 and cast to int + # boxes = [r.box for r in regions] + boxes = [ [int(x*1000) for x in r.pd_box] for r in regions] + scores = [r.pd_score for r in regions] + indices = cv2.dnn.NMSBoxes(boxes, scores, 0, nms_thresh) # Not using top_k=2 here because it does not give expected result. Bug ? + return [regions[i] for i in indices] +else: + def non_max_suppression(regions, nms_thresh): + # cv2.dnn.NMSBoxes(boxes, scores, 0, nms_thresh) needs: + # boxes = [ [x, y, w, h], ...] with x, y, w, h of type int + # Currently, x, y, w, h are float between 0 and 1, so we arbitrarily multiply by 1000 and cast to int + # boxes = [r.box for r in regions] + boxes = [ [int(x*1000) for x in r.pd_box] for r in regions] + scores = [r.pd_score for r in regions] + indices = cv2.dnn.NMSBoxes(boxes, scores, 0, nms_thresh) # Not using top_k=2 here because it does not give expected result. Bug ? + return [regions[i[0]] for i in indices] + +def normalize_radians(angle): + return angle - 2 * pi * floor((angle + pi) / (2 * pi)) + +def rot_vec(vec, rotation): + vx, vy = vec + return [vx * cos(rotation) - vy * sin(rotation), vx * sin(rotation) + vy * cos(rotation)] + +def detections_to_rect(regions): + # https://github.com/google/mediapipe/blob/master/mediapipe/modules/hand_landmark/palm_detection_detection_to_roi.pbtxt + # # Converts results of palm detection into a rectangle (normalized by image size) + # # that encloses the palm and is rotated such that the line connecting center of + # # the wrist and MCP of the middle finger is aligned with the Y-axis of the + # # rectangle. + # node { + # calculator: "DetectionsToRectsCalculator" + # input_stream: "DETECTION:detection" + # input_stream: "IMAGE_SIZE:image_size" + # output_stream: "NORM_RECT:raw_roi" + # options: { + # [mediapipe.DetectionsToRectsCalculatorOptions.ext] { + # rotation_vector_start_keypoint_index: 0 # Center of wrist. + # rotation_vector_end_keypoint_index: 2 # MCP of middle finger. 
+ # rotation_vector_target_angle_degrees: 90 + # } + # } + + target_angle = pi * 0.5 # 90 = pi/2 + for region in regions: + + region.rect_w = region.pd_box[2] + region.rect_h = region.pd_box[3] + region.rect_x_center = region.pd_box[0] + region.rect_w / 2 + region.rect_y_center = region.pd_box[1] + region.rect_h / 2 + + x0, y0 = region.pd_kps[0] # wrist center + x1, y1 = region.pd_kps[2] # middle finger + rotation = target_angle - atan2(-(y1 - y0), x1 - x0) + region.rotation = normalize_radians(rotation) + +def rotated_rect_to_points(cx, cy, w, h, rotation): + b = cos(rotation) * 0.5 + a = sin(rotation) * 0.5 + points = [] + p0x = cx - a*h - b*w + p0y = cy + b*h - a*w + p1x = cx + a*h - b*w + p1y = cy - b*h - a*w + p2x = int(2*cx - p0x) + p2y = int(2*cy - p0y) + p3x = int(2*cx - p1x) + p3y = int(2*cy - p1y) + p0x, p0y, p1x, p1y = int(p0x), int(p0y), int(p1x), int(p1y) + return [[p0x,p0y], [p1x,p1y], [p2x,p2y], [p3x,p3y]] + +def rect_transformation(regions, w, h): + """ + w, h : image input shape + """ + # https://github.com/google/mediapipe/blob/master/mediapipe/modules/hand_landmark/palm_detection_detection_to_roi.pbtxt + # # Expands and shifts the rectangle that contains the palm so that it's likely + # # to cover the entire hand. + # node { + # calculator: "RectTransformationCalculator" + # input_stream: "NORM_RECT:raw_roi" + # input_stream: "IMAGE_SIZE:image_size" + # output_stream: "roi" + # options: { + # [mediapipe.RectTransformationCalculatorOptions.ext] { + # scale_x: 2.6 + # scale_y: 2.6 + # shift_y: -0.5 + # square_long: true + # } + # } + # IMHO 2.9 is better than 2.6. With 2.6, it may happen that finger tips stay outside of the bouding rotated rectangle + scale_x = 2.9 + scale_y = 2.9 + shift_x = 0 + shift_y = -0.5 + for region in regions: + width = region.rect_w + height = region.rect_h + rotation = region.rotation + if rotation == 0: + region.rect_x_center_a = (region.rect_x_center + width * shift_x) * w + region.rect_y_center_a = (region.rect_y_center + height * shift_y) * h + else: + x_shift = (w * width * shift_x * cos(rotation) - h * height * shift_y * sin(rotation)) #/ w + y_shift = (w * width * shift_x * sin(rotation) + h * height * shift_y * cos(rotation)) #/ h + region.rect_x_center_a = region.rect_x_center*w + x_shift + region.rect_y_center_a = region.rect_y_center*h + y_shift + + # square_long: true + long_side = max(width * w, height * h) + region.rect_w_a = long_side * scale_x + region.rect_h_a = long_side * scale_y + region.rect_points = rotated_rect_to_points(region.rect_x_center_a, region.rect_y_center_a, region.rect_w_a, region.rect_h_a, region.rotation) + +def hand_landmarks_to_rect(hand): + # Calculates the ROI for the next frame from the current hand landmarks + id_wrist = 0 + id_index_mcp = 5 + id_middle_mcp = 9 + id_ring_mcp =13 + + lms_xy = hand.landmarks[:,:2] + # print(lms_xy) + # Compute rotation + x0, y0 = lms_xy[id_wrist] + x1, y1 = 0.25 * (lms_xy[id_index_mcp] + lms_xy[id_ring_mcp]) + 0.5 * lms_xy[id_middle_mcp] + rotation = 0.5 * pi - atan2(y0 - y1, x1 - x0) + rotation = normalize_radians(rotation) + # Now we work only on a subset of the landmarks + ids_for_bounding_box = [0, 1, 2, 3, 5, 6, 9, 10, 13, 14, 17, 18] + lms_xy = lms_xy[ids_for_bounding_box] + # Find center of the boundaries of landmarks + axis_aligned_center = 0.5 * (np.min(lms_xy, axis=0) + np.max(lms_xy, axis=0)) + # Find boundaries of rotated landmarks + original = lms_xy - axis_aligned_center + c, s = np.cos(rotation), np.sin(rotation) + rot_mat = np.array(((c, -s), (s, c))) + 
projected = original.dot(rot_mat) + min_proj = np.min(projected, axis=0) + max_proj = np.max(projected, axis=0) + projected_center = 0.5 * (min_proj + max_proj) + center = rot_mat.dot(projected_center) + axis_aligned_center + width, height = max_proj - min_proj + next_hand = HandRegion() + next_hand.rect_w_a = next_hand.rect_h_a = 2 * max(width, height) + next_hand.rect_x_center_a = center[0] + 0.1 * height * s + next_hand.rect_y_center_a = center[1] - 0.1 * height * c + next_hand.rotation = rotation + next_hand.rect_points = rotated_rect_to_points(next_hand.rect_x_center_a, next_hand.rect_y_center_a, next_hand.rect_w_a, next_hand.rect_h_a, next_hand.rotation) + return next_hand + +def warp_rect_img(rect_points, img, w, h): + src = np.array(rect_points[1:], dtype=np.float32) # rect_points[0] is left bottom point ! + dst = np.array([(0, 0), (h, 0), (h, w)], dtype=np.float32) + mat = cv2.getAffineTransform(src, dst) + return cv2.warpAffine(img, mat, (w, h)) + +def distance(a, b): + """ + a, b: 2 points (in 2D or 3D) + """ + return np.linalg.norm(a-b) + +def angle(a, b, c): + # https://stackoverflow.com/questions/35176451/python-code-to-calculate-angle-between-three-point-using-their-3d-coordinates + # a, b and c : points as np.array([x, y, z]) + ba = a - b + bc = c - b + cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc)) + angle = np.arccos(cosine_angle) + + return np.degrees(angle) + +def find_isp_scale_params(size, resolution, is_height=True): + """ + Find closest valid size close to 'size' and and the corresponding parameters to setIspScale() + This function is useful to work around a bug in depthai where ImageManip is scrambling images that have an invalid size + resolution: sensor resolution (width, height) + is_height : boolean that indicates if the value 'size' represents the height or the width of the image + Returns: valid size, (numerator, denominator) + """ + # We want size >= 288 (first compatible size > lm_input_size) + if size < 288: + size = 288 + + width, height = resolution + + # We are looking for the list on integers that are divisible by 16 and + # that can be written like n/d where n <= 16 and d <= 63 + if is_height: + reference = height + other = width + else: + reference = width + other = height + size_candidates = {} + for s in range(288,reference,16): + f = gcd(reference, s) + n = s//f + d = reference//f + if n <= 16 and d <= 63 and int(round(other * n / d) % 2 == 0): + size_candidates[s] = (n, d) + + # What is the candidate size closer to 'size' ? 
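+ # For example, with a 1920x1080 sensor and size=720 (is_height=True), 720 itself
+ # is a valid candidate (1080 * 2/3, with 1920 * 2/3 = 1280 even), so the function
+ # returns (720, (2, 3)), i.e. setIspScale(2, 3) yields a 1280x720 ISP output.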
+ min_dist = -1 + for s in size_candidates: + dist = abs(size - s) + if min_dist == -1: + min_dist = dist + candidate = s + else: + if dist > min_dist: break + candidate = s + min_dist = dist + return candidate, size_candidates[candidate] + +def recognize_gesture(hand): + # Finger states + # state: -1=unknown, 0=close, 1=open + d_3_5 = distance(hand.norm_landmarks[3], hand.norm_landmarks[5]) + d_2_3 = distance(hand.norm_landmarks[2], hand.norm_landmarks[3]) + angle0 = angle(hand.norm_landmarks[0], hand.norm_landmarks[1], hand.norm_landmarks[2]) + angle1 = angle(hand.norm_landmarks[1], hand.norm_landmarks[2], hand.norm_landmarks[3]) + angle2 = angle(hand.norm_landmarks[2], hand.norm_landmarks[3], hand.norm_landmarks[4]) + hand.thumb_angle = angle0+angle1+angle2 + if angle0+angle1+angle2 > 460 and d_3_5 / d_2_3 > 1.2: + hand.thumb_state = 1 + else: + hand.thumb_state = 0 + + if hand.norm_landmarks[8][1] < hand.norm_landmarks[7][1] < hand.norm_landmarks[6][1]: + hand.index_state = 1 + elif hand.norm_landmarks[6][1] < hand.norm_landmarks[8][1]: + hand.index_state = 0 + else: + hand.index_state = -1 + + if hand.norm_landmarks[12][1] < hand.norm_landmarks[11][1] < hand.norm_landmarks[10][1]: + hand.middle_state = 1 + elif hand.norm_landmarks[10][1] < hand.norm_landmarks[12][1]: + hand.middle_state = 0 + else: + hand.middle_state = -1 + + if hand.norm_landmarks[16][1] < hand.norm_landmarks[15][1] < hand.norm_landmarks[14][1]: + hand.ring_state = 1 + elif hand.norm_landmarks[14][1] < hand.norm_landmarks[16][1]: + hand.ring_state = 0 + else: + hand.ring_state = -1 + + if hand.norm_landmarks[20][1] < hand.norm_landmarks[19][1] < hand.norm_landmarks[18][1]: + hand.little_state = 1 + elif hand.norm_landmarks[18][1] < hand.norm_landmarks[20][1]: + hand.little_state = 0 + else: + hand.little_state = -1 + + # Gesture + if hand.thumb_state == 1 and hand.index_state == 1 and hand.middle_state == 1 and hand.ring_state == 1 and hand.little_state == 1: + hand.gesture = "FIVE" + elif hand.thumb_state == 0 and hand.index_state == 0 and hand.middle_state == 0 and hand.ring_state == 0 and hand.little_state == 0: + hand.gesture = "FIST" + elif hand.thumb_state == 1 and hand.index_state == 0 and hand.middle_state == 0 and hand.ring_state == 0 and hand.little_state == 0: + hand.gesture = "OK" + elif hand.thumb_state == 0 and hand.index_state == 1 and hand.middle_state == 1 and hand.ring_state == 0 and hand.little_state == 0: + hand.gesture = "PEACE" + elif hand.thumb_state == 0 and hand.index_state == 1 and hand.middle_state == 0 and hand.ring_state == 0 and hand.little_state == 0: + hand.gesture = "ONE" + elif hand.thumb_state == 1 and hand.index_state == 1 and hand.middle_state == 0 and hand.ring_state == 0 and hand.little_state == 0: + hand.gesture = "TWO" + elif hand.thumb_state == 1 and hand.index_state == 1 and hand.middle_state == 1 and hand.ring_state == 0 and hand.little_state == 0: + hand.gesture = "THREE" + elif hand.thumb_state == 0 and hand.index_state == 1 and hand.middle_state == 1 and hand.ring_state == 1 and hand.little_state == 1: + hand.gesture = "FOUR" + else: + hand.gesture = None + + +# Movenet + +class Body: + def __init__(self, scores=None, keypoints_norm=None, keypoints=None, score_thresh=None, crop_region=None, next_crop_region=None): + """ + Attributes: + scores : scores of the keypoints + keypoints_norm : keypoints normalized ([0,1]) coordinates (x,y) in the squared cropped region + keypoints_square : keypoints coordinates (x,y) in pixels in the square padded image + keypoints : keypoints 
coordinates (x,y) in pixels in the source image (not padded)
+ score_thresh : score threshold used
+ crop_region : cropped region on which the current body was inferred
+ next_crop_region : cropping region calculated from the current body keypoints and that will be used on the next frame
+ """
+ self.scores = scores
+ self.keypoints_norm = keypoints_norm
+ self.keypoints = keypoints
+ self.score_thresh = score_thresh
+ self.crop_region = crop_region
+ self.next_crop_region = next_crop_region
+ # self.keypoints_square = (self.keypoints_norm * self.crop_region.size).astype(np.int32)
+ self.keypoints = (np.array([self.crop_region.xmin, self.crop_region.ymin]) + self.keypoints_norm * self.crop_region.size).astype(np.int32)
+
+ def print(self):
+ attrs = vars(self)
+ print('\n'.join("%s: %s" % item for item in attrs.items()))
+
+ def distance_to_wrist(self, hand, wrist_handedness, pad_w=0, pad_h=0):
+ """
+ Calculate the distance between a hand (class HandRegion) wrist position
+ and one of the body wrists given by wrist_handedness (= "left" or "right").
+ As the hand.landmarks coordinates are expressed in the padded image, we must subtract the padding (given by pad_w and pad_h)
+ to be consistent with the body keypoint coordinates, which are expressed in the source image.
+ """
+ return distance(hand.landmarks[0]-np.array([pad_w, pad_h]), self.keypoints[BODY_KP[wrist_handedness+'_wrist']])
+
+CropRegion = namedtuple('CropRegion',['xmin', 'ymin', 'xmax', 'ymax', 'size']) # All values are in pixels. The region is a square of size 'size' pixels
+
+# Dictionary that maps from joint names to keypoint indices.
+BODY_KP = {
+ 'nose': 0,
+ 'left_eye': 1,
+ 'right_eye': 2,
+ 'left_ear': 3,
+ 'right_ear': 4,
+ 'left_shoulder': 5,
+ 'right_shoulder': 6,
+ 'left_elbow': 7,
+ 'right_elbow': 8,
+ 'left_wrist': 9,
+ 'right_wrist': 10,
+ 'left_hip': 11,
+ 'right_hip': 12,
+ 'left_knee': 13,
+ 'right_knee': 14,
+ 'left_ankle': 15,
+ 'right_ankle': 16
+}
+
+class BodyPreFocusing:
+ """
+ Body Pre Focusing with Movenet
+ Contains all that is needed for:
+ - Movenet smart cropping (determines, from the body detected in frame N,
+ the region of frame N+1 on which the Movenet inference is run).
+ - Body Pre Focusing (determining from the Movenet wrist keypoints a smaller zone
+ on which Palm detection is run).
+ Both Smart cropping and Body Pre Focusing are important for model accuracy when
+ the body is far from the camera.
+ """
+ def __init__(self, img_w, img_h, pad_w, pad_h, frame_size, mode="group", score_thresh=0.2, scale=1.0, hands_up_only=True):
+
+ self.img_w = img_w
+ self.img_h = img_h
+ self.pad_w = pad_w
+ self.pad_h = pad_h
+ self.frame_size = frame_size
+ self.mode = mode
+ self.score_thresh = score_thresh
+ self.scale = scale
+ self.hands_up_only = hands_up_only
+ # Defines the default crop region (pads the full image from both sides to make it a square image)
+ # Used when the algorithm cannot reliably determine the crop region from the previous frame.
+ self.init_crop_region = CropRegion(-self.pad_w, -self.pad_h,-self.pad_w+self.frame_size, -self.pad_h+self.frame_size, self.frame_size)
+
+
+ def torso_visible(self, scores):
+ """Checks whether there are enough torso keypoints.
+
+ This function checks whether the model is confident at predicting one of the
+ shoulders/hips which is required to determine a good crop region. 
+ """ + return ((scores[BODY_KP['left_hip']] > self.score_thresh or + scores[BODY_KP['right_hip']] > self.score_thresh) and + (scores[BODY_KP['left_shoulder']] > self.score_thresh or + scores[BODY_KP['right_shoulder']] > self.score_thresh)) + + def determine_torso_and_body_range(self, keypoints, scores, center_x, center_y): + """Calculates the maximum distance from each keypoints to the center location. + + The function returns the maximum distances from the two sets of keypoints: + full 17 keypoints and 4 torso keypoints. The returned information will be + used to determine the crop size. See determine_crop_region for more detail. + """ + torso_joints = ['left_shoulder', 'right_shoulder', 'left_hip', 'right_hip'] + max_torso_yrange = 0.0 + max_torso_xrange = 0.0 + for joint in torso_joints: + dist_y = abs(center_y - keypoints[BODY_KP[joint]][1]) + dist_x = abs(center_x - keypoints[BODY_KP[joint]][0]) + if dist_y > max_torso_yrange: + max_torso_yrange = dist_y + if dist_x > max_torso_xrange: + max_torso_xrange = dist_x + + max_body_yrange = 0.0 + max_body_xrange = 0.0 + for i in range(len(BODY_KP)): + if scores[i] < self.score_thresh: + continue + dist_y = abs(center_y - keypoints[i][1]) + dist_x = abs(center_x - keypoints[i][0]) + if dist_y > max_body_yrange: + max_body_yrange = dist_y + if dist_x > max_body_xrange: + max_body_xrange = dist_x + + return [max_torso_yrange, max_torso_xrange, max_body_yrange, max_body_xrange] + + def determine_crop_region(self, body): + """Determines the region to crop the image for the model to run inference on. + + The algorithm uses the detected joints from the previous frame to estimate + the square region that encloses the full body of the target person and + centers at the midpoint of two hip joints. The crop size is determined by + the distances between each joints and the center point. + When the model is not confident with the four torso joint predictions, the + function returns a default crop which is the full image padded to square. + """ + if self.torso_visible(body.scores): + center_x = (body.keypoints[BODY_KP['left_hip']][0] + body.keypoints[BODY_KP['right_hip']][0]) // 2 + center_y = (body.keypoints[BODY_KP['left_hip']][1] + body.keypoints[BODY_KP['right_hip']][1]) // 2 + max_torso_yrange, max_torso_xrange, max_body_yrange, max_body_xrange = self.determine_torso_and_body_range(body.keypoints, body.scores, center_x, center_y) + crop_length_half = np.amax([max_torso_xrange * 1.9, max_torso_yrange * 1.9, max_body_yrange * 1.2, max_body_xrange * 1.2]) + tmp = np.array([center_x, self.img_w - center_x, center_y, self.img_h - center_y]) + crop_length_half = int(round(np.amin([crop_length_half, np.amax(tmp)]))) + crop_corner = [center_x - crop_length_half, center_y - crop_length_half] + + if crop_length_half > max(self.img_w, self.img_h) / 2: + return self.init_crop_region + else: + crop_length = crop_length_half * 2 + return CropRegion(crop_corner[0], crop_corner[1], crop_corner[0]+crop_length, crop_corner[1]+crop_length,crop_length) + else: + return self.init_crop_region + + """ + Body Pre Focusing stuff + """ + def torso_visible(self, scores): + """Checks whether there are enough torso keypoints. + + This function checks whether the model is confident at predicting one of the + shoulders/hips which is required to determine a good crop region. 
+ """ + return ((scores[BODY_KP['left_hip']] > self.score_thresh or + scores[BODY_KP['right_hip']] > self.score_thresh) and + (scores[BODY_KP['left_shoulder']] > self.score_thresh or + scores[BODY_KP['right_shoulder']] > self.score_thresh)) + + def determine_torso_and_body_range(self, keypoints, scores, center_x, center_y): + """Calculates the maximum distance from each keypoints to the center location. + + The function returns the maximum distances from the two sets of keypoints: + full 17 keypoints and 4 torso keypoints. The returned information will be + used to determine the crop size. See determine_crop_region for more detail. + """ + torso_joints = ['left_shoulder', 'right_shoulder', 'left_hip', 'right_hip'] + max_torso_yrange = 0.0 + max_torso_xrange = 0.0 + for joint in torso_joints: + dist_y = abs(center_y - keypoints[BODY_KP[joint]][1]) + dist_x = abs(center_x - keypoints[BODY_KP[joint]][0]) + if dist_y > max_torso_yrange: + max_torso_yrange = dist_y + if dist_x > max_torso_xrange: + max_torso_xrange = dist_x + + max_body_yrange = 0.0 + max_body_xrange = 0.0 + for i in range(len(BODY_KP)): + if scores[i] < self.score_thresh: + continue + dist_y = abs(center_y - keypoints[i][1]) + dist_x = abs(center_x - keypoints[i][0]) + if dist_y > max_body_yrange: + max_body_yrange = dist_y + if dist_x > max_body_xrange: + max_body_xrange = dist_x + + return [max_torso_yrange, max_torso_xrange, max_body_yrange, max_body_xrange] + + def determine_crop_region(self, body): + """Determines the region to crop the image for the model to run inference on. + + The algorithm uses the detected joints from the previous frame to estimate + the square region that encloses the full body of the target person and + centers at the midpoint of two hip joints. The crop size is determined by + the distances between each joints and the center point. + When the model is not confident with the four torso joint predictions, the + function returns a default crop which is the full image padded to square. + """ + if self.torso_visible(body.scores): + center_x = (body.keypoints[BODY_KP['left_hip']][0] + body.keypoints[BODY_KP['right_hip']][0]) // 2 + center_y = (body.keypoints[BODY_KP['left_hip']][1] + body.keypoints[BODY_KP['right_hip']][1]) // 2 + max_torso_yrange, max_torso_xrange, max_body_yrange, max_body_xrange = self.determine_torso_and_body_range(body.keypoints, body.scores, center_x, center_y) + crop_length_half = np.amax([max_torso_xrange * 1.9, max_torso_yrange * 1.9, max_body_yrange * 1.2, max_body_xrange * 1.2]) + tmp = np.array([center_x, self.img_w - center_x, center_y, self.img_h - center_y]) + crop_length_half = int(round(np.amin([crop_length_half, np.amax(tmp)]))) + crop_corner = [center_x - crop_length_half, center_y - crop_length_half] + + if crop_length_half > max(self.img_w, self.img_h) / 2: + return self.init_crop_region + else: + crop_length = crop_length_half * 2 + return CropRegion(crop_corner[0], crop_corner[1], crop_corner[0]+crop_length, crop_corner[1]+crop_length,crop_length) + else: + return self.init_crop_region + + def estimate_focus_zone_size(self, body): + """ + This function is called if at least the segment "wrist_elbow" is visible. + We calculate the length of every segment from a predefined list. A segment length + is the distance between the 2 endpoints weighted by a coefficient. The weight have been chosen + so that the length of all segments are roughly equal. We take the maximal length to estimate + the size of the focus zone. 
+ If no segments are visible, we consider the body is very close
+ to the camera, and therefore there is no need to focus. Return 0.
+ Not having at least one shoulder and one hip visible means the body is also very close,
+ and the estimated size needs to be adjusted (made bigger).
+ """
+ segments = [
+ ("left_shoulder", "left_elbow", 2.3),
+ ("left_elbow", "left_wrist", 2.3),
+ ("left_shoulder", "left_hip", 1),
+ ("left_shoulder", "right_shoulder", 1.5),
+ ("right_shoulder", "right_elbow", 2.3),
+ ("right_elbow", "right_wrist", 2.3),
+ ("right_shoulder", "right_hip", 1),
+ ]
+ lengths = []
+ for s in segments:
+ if body.scores[BODY_KP[s[0]]] > self.score_thresh and body.scores[BODY_KP[s[1]]] > self.score_thresh:
+ l = np.linalg.norm(body.keypoints[BODY_KP[s[0]]] - body.keypoints[BODY_KP[s[1]]])
+ lengths.append(l)
+ if lengths:
+ if ( body.scores[BODY_KP["left_hip"]] < self.score_thresh and
+ body.scores[BODY_KP["right_hip"]] < self.score_thresh or
+ body.scores[BODY_KP["left_shoulder"]] < self.score_thresh and
+ body.scores[BODY_KP["right_shoulder"]] < self.score_thresh) :
+ coef = 1.5
+ else:
+ coef = 1.0
+ return 2 * int(coef * self.scale * max(lengths) / 2) # The size is made even
+ else:
+ return 0
+
+ def get_focus_zone(self, body):
+ """
+ Return a tuple (focus_zone, label)
+ 'body' = instance of class Body
+ 'focus_zone' is a zone around a hand or hands, depending on the value
+ of self.mode ("left", "right", "higher" or "group") and on the value of self.hands_up_only.
+ - self.mode = "left" (resp "right"): we are looking for the zone around the left (resp right) wrist,
+ - self.mode = "group": the zone encompasses both wrists,
+ - self.mode = "higher": the zone is around the higher wrist (smaller y value),
+ - self.hands_up_only = True: we don't take into consideration the wrist if the corresponding elbow is above the wrist,
+ focus_zone is a list [left, top, right, bottom] defining the top-left and right-bottom corners of a square.
+ Values are expressed in pixels in the source image C.S.
+ The zone is constrained to the squared source image (= source image with padding if necessary).
+ It means that values can be negative.
+ left and right in [-pad_w, img_w + pad_w]
+ top and bottom in [-pad_h, img_h + pad_h]
+ 'label' describes which wrist keypoint(s) were used to build the zone : "left", "right" or "group" (if built from both wrists)
+
+ If the wrist keypoint(s) is(are) not present or is(are) present but self.hands_up_only = True and
+ wrist(s) is(are) below corresponding elbow(s), then focus_zone = None.
+ """
+
+ def zone_from_center_size(x, y, size):
+ """
+ Return zone [left, top, right, bottom]
+ from zone center (x,y) and zone size (the zone is square).
+ """
+ half_size = size // 2
+ size = half_size * 2
+ if size > self.img_w:
+ x = self.img_w // 2
+ x1 = x - half_size
+ if x1 < -self.pad_w:
+ x1 = -self.pad_w
+ elif x1 + size > self.img_w + self.pad_w:
+ x1 = self.img_w + self.pad_w - size
+ x2 = x1 + size
+ if size > self.img_h:
+ y = self.img_h // 2
+ y1 = y - half_size
+ if y1 < -self.pad_h:
+ y1 = -self.pad_h
+ elif y1 + size > self.img_h + self.pad_h:
+ y1 = self.img_h + self.pad_h - size
+ y2 = y1 + size
+ return [x1, y1, x2, y2]
+
+
+ def get_one_hand_zone(hand_label):
+ """
+ Return the zone [left, top, right, bottom] around the hand given by its label "hand_label" ("left" or "right")
+ Values are expressed in pixels in the source image C.S.
+ If the wrist keypoint is not visible, return None. 
+ If self.hands_up_only is True, return None if wrist keypoint is below elbow keypoint. + """ + wrist_kp = hand_label + "_wrist" + wrist_score = body.scores[BODY_KP[wrist_kp]] + if wrist_score < self.score_thresh: + return None + x, y = body.keypoints[BODY_KP[wrist_kp]] + if self.hands_up_only: + # We want to detect only hands where the wrist is above the elbow (when visible) + elbow_kp = hand_label + "_elbow" + if body.scores[BODY_KP[elbow_kp]] > self.score_thresh and \ + body.keypoints[BODY_KP[elbow_kp]][1] < body.keypoints[BODY_KP[wrist_kp]][1]: + return None + # Let's evaluate the size of the focus zone + size = self.estimate_focus_zone_size(body) + if size == 0: return [-self.pad_w, -self.pad_h, self.frame_size-self.pad_w, self.frame_size-self.pad_h] # The hand is too close. No need to focus + return zone_from_center_size(x, y, size) + + if self.mode == "group": + zonel = get_one_hand_zone("left") + if zonel: + zoner = get_one_hand_zone("right") + if zoner: + xl1, yl1, xl2, yl2 = zonel + xr1, yr1, xr2, yr2 = zoner + x1 = min(xl1, xr1) + y1 = min(yl1, yr1) + x2 = max(xl2, xr2) + y2 = max(yl2, yr2) + # Global zone center (x,y) + x = int((x1+x2)/2) + y = int((y1+y2)/2) + size_x = x2-x1 + size_y = y2-y1 + size = 2 * (max(size_x, size_y) // 2) + return (zone_from_center_size(x, y, size), "group") + else: + return (zonel, "left") + else: + return (get_one_hand_zone("right"), "right") + elif self.mode == "higher": + if body.scores[BODY_KP["left_wrist"]] > self.score_thresh: + if body.scores[BODY_KP["right_wrist"]] > self.score_thresh: + if body.keypoints[BODY_KP["left_wrist"]][1] > body.keypoints[BODY_KP["right_wrist"]][1]: + hand_label = "right" + else: + hand_label = "left" + else: + hand_label = "left" + else: + if body.scores[BODY_KP["right_wrist"]] > self.score_thresh: + hand_label = "right" + else: + return (None, None) + return (get_one_hand_zone(hand_label), hand_label) + else: # "left" or "right" + return (get_one_hand_zone(self.mode), self.mode) + + diff --git a/depthai_sdk/src/depthai_sdk/components/hand_tracker/renderer.py b/depthai_sdk/src/depthai_sdk/components/hand_tracker/renderer.py new file mode 100644 index 000000000..99dd092da --- /dev/null +++ b/depthai_sdk/src/depthai_sdk/components/hand_tracker/renderer.py @@ -0,0 +1,180 @@ +import cv2 +import numpy as np + +LINES_HAND = [[0,1],[1,2],[2,3],[3,4], + [0,5],[5,6],[6,7],[7,8], + [5,9],[9,10],[10,11],[11,12], + [9,13],[13,14],[14,15],[15,16], + [13,17],[17,18],[18,19],[19,20],[0,17]] + +# LINES_BODY to draw the body skeleton when Body Pre Focusing is used +LINES_BODY = [[4,2],[2,0],[0,1],[1,3], + [10,8],[8,6],[6,5],[5,7],[7,9], + [6,12],[12,11],[11,5], + [12,14],[14,16],[11,13],[13,15]] + +class HandTrackerRenderer: + def __init__(self, + tracker, + output=None): + + self.tracker = tracker + + self.show_pd_box = False + self.show_pd_kps = False + self.show_rot_rect = False + self.show_handedness = 0 + self.show_landmarks = True + self.show_scores = False + self.show_gesture = self.tracker.use_gesture + + + self.show_xyz_zone = self.show_xyz = self.tracker.xyz + self.show_fps = True + self.show_body = False # self.tracker.body_pre_focusing is not None + self.show_inferences_status = False + + if output is None: + self.output = None + else: + fourcc = cv2.VideoWriter_fourcc(*"MJPG") + self.output = cv2.VideoWriter(output,fourcc,self.tracker.video_fps,(self.tracker.img_w, self.tracker.img_h)) + + def norm2abs(self, x_y): + x = int(x_y[0] * self.tracker.frame_size - self.tracker.pad_w) + y = int(x_y[1] * self.tracker.frame_size 
- self.tracker.pad_h) + return (x, y) + + def draw_hand(self, hand): + + if self.tracker.use_lm: + # (info_ref_x, info_ref_y): coords in the image of a reference point + # relatively to which hands information (score, handedness, xyz,...) are drawn + info_ref_x = hand.landmarks[0,0] + info_ref_y = np.max(hand.landmarks[:,1]) + + # thick_coef is used to adapt the size of the draw landmarks features according to the size of the hand. + thick_coef = hand.rect_w_a / 400 + if hand.lm_score > self.tracker.lm_score_thresh: + if self.show_rot_rect: + cv2.polylines(self.frame, [np.array(hand.rect_points)], True, (0,255,255), 2, cv2.LINE_AA) + if self.show_landmarks: + lines = [np.array([hand.landmarks[point] for point in line]).astype(np.int32) for line in LINES_HAND] + if self.show_handedness == 3: + color = (0,255,0) if hand.handedness > 0.5 else (0,0,255) + else: + color = (255, 0, 0) + cv2.polylines(self.frame, lines, False, color, int(1+thick_coef*3), cv2.LINE_AA) + radius = int(1+thick_coef*5) + if self.tracker.use_gesture: + # color depending on finger state (1=open, 0=close, -1=unknown) + color = { 1: (0,255,0), 0: (0,0,255), -1:(0,255,255)} + cv2.circle(self.frame, (hand.landmarks[0][0], hand.landmarks[0][1]), radius, color[-1], -1) + for i in range(1,5): + cv2.circle(self.frame, (hand.landmarks[i][0], hand.landmarks[i][1]), radius, color[hand.thumb_state], -1) + for i in range(5,9): + cv2.circle(self.frame, (hand.landmarks[i][0], hand.landmarks[i][1]), radius, color[hand.index_state], -1) + for i in range(9,13): + cv2.circle(self.frame, (hand.landmarks[i][0], hand.landmarks[i][1]), radius, color[hand.middle_state], -1) + for i in range(13,17): + cv2.circle(self.frame, (hand.landmarks[i][0], hand.landmarks[i][1]), radius, color[hand.ring_state], -1) + for i in range(17,21): + cv2.circle(self.frame, (hand.landmarks[i][0], hand.landmarks[i][1]), radius, color[hand.little_state], -1) + else: + if self.show_handedness == 2: + color = (0,255,0) if hand.handedness > 0.5 else (0,0,255) + elif self.show_handedness == 3: + color = (255, 0, 0) + else: + color = (0,128,255) + for x,y in hand.landmarks[:,:2]: + cv2.circle(self.frame, (int(x), int(y)), radius, color, -1) + + if self.show_handedness == 1: + cv2.putText(self.frame, f"{hand.label.upper()} {hand.handedness:.2f}", + (info_ref_x-90, info_ref_y+40), + cv2.FONT_HERSHEY_PLAIN, 2, (0,255,0) if hand.handedness > 0.5 else (0,0,255), 2) + if self.show_scores: + cv2.putText(self.frame, f"Landmark score: {hand.lm_score:.2f}", + (info_ref_x-90, info_ref_y+110), + cv2.FONT_HERSHEY_PLAIN, 2, (255,255,0), 2) + if self.tracker.use_gesture and self.show_gesture: + cv2.putText(self.frame, hand.gesture, (info_ref_x-20, info_ref_y-50), + cv2.FONT_HERSHEY_PLAIN, 3, (255,255,255), 3) + + if hand.pd_box is not None: + box = hand.pd_box + box_tl = self.norm2abs((box[0], box[1])) + box_br = self.norm2abs((box[0]+box[2], box[1]+box[3])) + if self.show_pd_box: + cv2.rectangle(self.frame, box_tl, box_br, (0,255,0), 2) + if self.show_pd_kps: + for i,kp in enumerate(hand.pd_kps): + x_y = self.norm2abs(kp) + cv2.circle(self.frame, x_y, 6, (0,0,255), -1) + cv2.putText(self.frame, str(i), (x_y[0], x_y[1]+12), cv2.FONT_HERSHEY_PLAIN, 1.5, (0,255,0), 2) + if self.show_scores: + if self.tracker.use_lm: + x, y = info_ref_x - 90, info_ref_y + 80 + else: + x, y = box_tl[0], box_br[1]+60 + cv2.putText(self.frame, f"Palm score: {hand.pd_score:.2f}", + (x, y), + cv2.FONT_HERSHEY_PLAIN, 2, (255,255,0), 2) + + if self.show_xyz: + if self.tracker.use_lm: + x0, y0 = info_ref_x - 40, 
info_ref_y + 40 + else: + x0, y0 = box_tl[0], box_br[1]+20 + cv2.rectangle(self.frame, (x0,y0), (x0+100, y0+85), (220,220,240), -1) + cv2.putText(self.frame, f"X:{hand.xyz[0]/10:3.0f} cm", (x0+10, y0+20), cv2.FONT_HERSHEY_PLAIN, 1, (20,180,0), 2) + cv2.putText(self.frame, f"Y:{hand.xyz[1]/10:3.0f} cm", (x0+10, y0+45), cv2.FONT_HERSHEY_PLAIN, 1, (255,0,0), 2) + cv2.putText(self.frame, f"Z:{hand.xyz[2]/10:3.0f} cm", (x0+10, y0+70), cv2.FONT_HERSHEY_PLAIN, 1, (0,0,255), 2) + if self.show_xyz_zone: + # Show zone on which the spatial data were calculated + cv2.rectangle(self.frame, tuple(hand.xyz_zone[0:2]), tuple(hand.xyz_zone[2:4]), (180,0,180), 2) + + def draw_body(self, body): + lines = [np.array([body.keypoints[point] for point in line]) for line in LINES_BODY if body.scores[line[0]] > self.tracker.body_score_thresh and body.scores[line[1]] > self.tracker.body_score_thresh] + cv2.polylines(self.frame, lines, False, (255, 144, 30), 2, cv2.LINE_AA) + + def draw_bag(self, bag): + + if self.show_inferences_status: + # Draw inferences status + h = self.frame.shape[0] + u = h // 10 + status="" + if bag.get("bpf_inference", 0): + cv2.rectangle(self.frame, (u, 8*u), (2*u, 9*u), (255,144,30), -1) + if bag.get("pd_inference", 0): + cv2.rectangle(self.frame, (2*u, 8*u), (3*u, 9*u), (0,255,0), -1) + nb_lm_inferences = bag.get("lm_inference", 0) + if nb_lm_inferences: + cv2.rectangle(self.frame, (3*u, 8*u), ((3+nb_lm_inferences)*u, 9*u), (0,0,255), -1) + + body = bag.get("body", False) + if body and self.show_body: + # Draw skeleton + self.draw_body(body) + # Draw Movenet smart cropping rectangle + cv2.rectangle(self.frame, (body.crop_region.xmin, body.crop_region.ymin), (body.crop_region.xmax, body.crop_region.ymax), (0,255,255), 2) + # Draw focus zone + focus_zone= bag.get("focus_zone", None) + if focus_zone: + cv2.rectangle(self.frame, tuple(focus_zone[0:2]), tuple(focus_zone[2:4]), (0,255,0),2) + + def draw(self, frame, hands, bag={}): + self.frame = frame + if bag: + self.draw_bag(bag) + for hand in hands: + self.draw_hand(hand) + return self.frame + + def exit(self): + if self.output: + self.output.release() + cv2.destroyAllWindows() + diff --git a/depthai_sdk/src/depthai_sdk/components/hand_tracker/template_manager_script_duo.py b/depthai_sdk/src/depthai_sdk/components/hand_tracker/template_manager_script_duo.py new file mode 100644 index 000000000..9f09f5ceb --- /dev/null +++ b/depthai_sdk/src/depthai_sdk/components/hand_tracker/template_manager_script_duo.py @@ -0,0 +1,354 @@ +""" +This file is the template of the scripting node source code in edge mode +Substitution is made in HandTrackerEdge.py + +In the following: +rrn_ : normalized [0:1] coordinates in rotated rectangle coordinate systems +sqn_ : normalized [0:1] coordinates in squared input image +""" +import marshal +from math import sin, cos, atan2, pi, degrees, floor, dist +from re import T + +pad_h = ${_pad_h} +img_h = ${_img_h} +img_w = ${_img_w} +frame_size = ${_frame_size} +crop_w = ${_crop_w} + +seq_num = 0 + +${_TRACE1} ("Starting manager script node") + +${_IF_USE_HANDEDNESS_AVERAGE} +class HandednessAverage: + # Used to store the average handeness + # Why ? Handedness inferred by the landmark model is not perfect. For certain poses, it is not rare that the model thinks + # that a right hand is a left hand (or vice versa). Instead of using the last inferred handedness, we prefer to use the average + # of the inferred handedness on the last frames. This gives more robustness. 
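+ # e.g. if the model outputs 0.9, 0.4 and 0.8 for the same hand over 3 consecutive
+ # frames, the average is (0.9 + 0.4 + 0.8) / 3 = 0.7, so the hand keeps being
+ # reported as a right hand (handedness > 0.5) despite the single misprediction.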
+ def __init__(self): + self._total_handedness = 0 + self._nb = 0 + def update(self, new_handedness): + self._total_handedness += new_handedness + self._nb += 1 + return self._total_handedness / self._nb + def reset(self): + self._total_handedness = self._nb = 0 + +handedness_avg = [HandednessAverage(), HandednessAverage()] +nb_hands_in_previous_frame = 0 +${_IF_USE_HANDEDNESS_AVERAGE} + +single_hand_count = 0 + +# BufferMgr is used to statically allocate buffers once +# (replace dynamic allocation). +# These buffers are used for sending result to host +class BufferMgr: + def __init__(self): + self._bufs = {} + def __call__(self, size): + try: + buf = self._bufs[size] + except KeyError: + buf = self._bufs[size] = NNData(size) + ${_TRACE2} (f"New buffer allocated: {size}") + return buf + +buffer_mgr = BufferMgr() + +def send_result(result): + result_serial = marshal.dumps(result) + buffer = buffer_mgr(len(result_serial)) + buffer.getData()[:] = result_serial + buffer.setSequenceNum(seq_num) + node.io['host'].send(buffer) + ${_TRACE2} ("Manager sent result to host") + +# pd_inf: boolean. Has the palm detection run on the frame ? +# nb_lm_inf: 0 or 1 (or 2 in duo mode). Number of landmark regression inferences on the frame. +# pd_inf=True and nb_lm_inf=0 means the palm detection hasn't found any hand +# pd_inf, nb_lm_inf are used for statistics +def send_result_no_hand(pd_inf, nb_lm_inf): + result = dict([("pd_inf", pd_inf), ("nb_lm_inf", nb_lm_inf)]) + send_result(result) + +def send_result_hands(pd_inf, nb_lm_inf, lm_score, handedness, rect_center_x, rect_center_y, rect_size, rotation, rrn_lms, sqn_lms, world_lms, xyz, xyz_zone): + result = dict([("pd_inf", pd_inf), ("nb_lm_inf", nb_lm_inf), ("lm_score", lm_score), ("handedness", handedness), ("rotation", rotation), + ("rect_center_x", rect_center_x), ("rect_center_y", rect_center_y), ("rect_size", rect_size), ("rrn_lms", rrn_lms), ('sqn_lms', sqn_lms), + ("world_lms", world_lms), ("xyz", xyz), ("xyz_zone", xyz_zone)]) + send_result(result) + +def rr2img(rrn_x, rrn_y): + # Convert a point (rrn_x, rrn_y) expressed in normalized rotated rectangle (rrn) + # into (X, Y) expressed in normalized image (sqn) + X = sqn_rr_center_x + sqn_rr_size * ((rrn_x - 0.5) * cos_rot + (0.5 - rrn_y) * sin_rot) + Y = sqn_rr_center_y + sqn_rr_size * ((rrn_y - 0.5) * cos_rot + (rrn_x - 0.5) * sin_rot) + return X, Y + +def normalize_radians(angle): + return angle - 2 * pi * floor((angle + pi) / (2 * pi)) + +# send_new_frame_to_branch defines on which branch new incoming frames are sent +# 1 = palm detection branch +# 2 = hand landmark branch +send_new_frame_to_branch = 1 + +cfg_pre_pd = ImageManipConfig() +cfg_pre_pd.setResizeThumbnail(128, 128, 0, 0, 0) + +id_wrist = 0 +id_index_mcp = 5 +id_middle_mcp = 9 +id_ring_mcp =13 +ids_for_bounding_box = [0, 1, 2, 3, 5, 6, 9, 10, 13, 14, 17, 18] + +lm_input_size = 224 + +detected_hands = [] + + + +while True: + nb_lm_inf = 0 + if send_new_frame_to_branch == 1: # Routing frame to pd branch + hands = [] + node.io['pre_pd_manip_cfg'].send(cfg_pre_pd) + ${_TRACE2} ("Manager sent thumbnail config to pre_pd manip") + # Wait for pd post processing's result + pd_nndata = node.io['from_post_pd_nn'].get() + seq_num = pd_nndata.getSequenceNum() + detection = pd_nndata.getLayerFp16("result") + ${_TRACE2} (f"Manager received pd result (len={len(detection)}) : "+str(detection)) + # detection is list of 2x8 float + # Looping the detection twice to obtain data for 2 hands + for i in range(2): + pd_score, box_x, box_y, box_size, kp0_x, 
kp0_y, kp2_x, kp2_y = detection[i*8:(i+1)*8] + if pd_score >= ${_pd_score_thresh} and box_size > 0: + # scale_center_x = sqn_scale_x - sqn_rr_center_x + # scale_center_y = sqn_scale_y - sqn_rr_center_y + kp02_x = kp2_x - kp0_x + kp02_y = kp2_y - kp0_y + sqn_rr_size = 2.9 * box_size + rotation = 0.5 * pi - atan2(-kp02_y, kp02_x) + rotation = normalize_radians(rotation) + sqn_rr_center_x = box_x + 0.5*box_size*sin(rotation) + sqn_rr_center_y = box_y - 0.5*box_size*cos(rotation) + hands.append([sqn_rr_size, rotation, sqn_rr_center_x, sqn_rr_center_y]) + + ${_TRACE1} (f"Palm detection - nb hands detected: {len(hands)}") + # If list is empty, meaning no hand is detected + if len(hands) == 0: + send_result_no_hand(True, 0) + send_new_frame_to_branch = 1 + ${_IF_USE_HANDEDNESS_AVERAGE} + nb_hands_in_previous_frame = 0 + ${_IF_USE_HANDEDNESS_AVERAGE} + continue + + if not(nb_hands_in_previous_frame == 1 and len(hands) <= 1): + detected_hands = hands + else: + # otherwise detected_hands come from last frame + ${_TRACE1} (f"Keep previous landmarks") + pass + + # Constructing input data for landmark inference, the input data of both hands are sent for inference without + # waiting for inference results. + last_hand = len(detected_hands) - 1 + for i,hand in enumerate(detected_hands): + sqn_rr_size, rotation, sqn_rr_center_x, sqn_rr_center_y = hand + # Tell pre_lm_manip how to crop hand region + rr = RotatedRect() + rr.center.x = sqn_rr_center_x + rr.center.y = (sqn_rr_center_y * frame_size - pad_h) / img_h + rr.size.width = sqn_rr_size + rr.size.height = sqn_rr_size * frame_size / img_h + rr.angle = degrees(rotation) + cfg = ImageManipConfig() + cfg.setCropRotatedRect(rr, True) + cfg.setResize(lm_input_size, lm_input_size) + ${_IF_USE_SAME_IMAGE} + cfg.setReusePreviousImage(False if i == last_hand else True) + ${_IF_USE_SAME_IMAGE} + node.io['pre_lm_manip_cfg'].send(cfg) + nb_lm_inf += 1 + ${_TRACE2} ("Manager sent config to pre_lm manip") + + hand_landmarks = dict([("lm_score", []), ("handedness", []), ("rotation", []), + ("rect_center_x", []), ("rect_center_y", []), ("rect_size", []), ("rrn_lms", []), ('sqn_lms', []), + ("world_lms", []), ("xyz", []), ("xyz_zone", [])]) + + updated_detect_hands = [] + + # Retrieve inference results in here for both hands + for ih, hand in enumerate(detected_hands): + sqn_rr_size, rotation, sqn_rr_center_x, sqn_rr_center_y = hand + # Wait for lm's result + lm_result = node.io['from_lm_nn'].get() + seq_num = lm_result.getSequenceNum() + ${_TRACE2} ("Manager received result from lm nn") + lm_score = lm_result.getLayerFp16("Identity_1")[0] + if lm_score > ${_lm_score_thresh}: + handedness = lm_result.getLayerFp16("Identity_2")[0] + + rrn_lms = lm_result.getLayerFp16("Identity_dense/BiasAdd/Add") + world_lms = 0 + ${_IF_USE_WORLD_LANDMARKS} + world_lms = lm_result.getLayerFp16("Identity_3_dense/BiasAdd/Add") + ${_IF_USE_WORLD_LANDMARKS} + # Retroproject landmarks into the original squared image + sqn_lms = [] + cos_rot = cos(rotation) + sin_rot = sin(rotation) + for i in range(21): + rrn_lms[3*i] /= lm_input_size + rrn_lms[3*i+1] /= lm_input_size + rrn_lms[3*i+2] /= lm_input_size #* 0.4 + sqn_x, sqn_y = rr2img(rrn_lms[3*i], rrn_lms[3*i+1]) + sqn_lms += [sqn_x, sqn_y] + xyz = 0 + xyz_zone = 0 + # Query xyz + ${_IF_XYZ} + cfg = SpatialLocationCalculatorConfig() + conf_data = SpatialLocationCalculatorConfigData() + conf_data.depthThresholds.lowerThreshold = 100 + conf_data.depthThresholds.upperThreshold = 10000 + zone_size = max(int(sqn_rr_size * frame_size / 10), 8) + c_x 
= int(sqn_lms[0] * frame_size -zone_size/2 + crop_w) + c_y = int(sqn_lms[1] * frame_size -zone_size/2 - pad_h) + rect_center = Point2f(c_x, c_y) + rect_size = Size2f(zone_size, zone_size) + conf_data.roi = Rect(rect_center, rect_size) + cfg = SpatialLocationCalculatorConfig() + cfg.addROI(conf_data) + node.io['spatial_location_config'].send(cfg) + ${_TRACE2} ("Manager sent ROI to spatial_location_config") + # Wait xyz response + xyz_data = node.io['spatial_data'].get().getSpatialLocations() + ${_TRACE2} ("Manager received spatial_location") + coords = xyz_data[0].spatialCoordinates + xyz = [coords.x, coords.y, coords.z] + roi = xyz_data[0].config.roi + xyz_zone = [int(roi.topLeft().x - crop_w), int(roi.topLeft().y), int(roi.bottomRight().x - crop_w), int(roi.bottomRight().y)] + ${_IF_XYZ} + + hand_landmarks["lm_score"].append(lm_score) + hand_landmarks["handedness"].append(handedness) + hand_landmarks["rotation"].append(rotation) + hand_landmarks["rect_center_x"].append(sqn_rr_center_x) + hand_landmarks["rect_center_y"].append(sqn_rr_center_y) + hand_landmarks["rect_size"].append(sqn_rr_size) + hand_landmarks["rrn_lms"].append(rrn_lms) + hand_landmarks["sqn_lms"].append(sqn_lms) + hand_landmarks["world_lms"].append(world_lms) + hand_landmarks["xyz"].append(xyz) + hand_landmarks["xyz_zone"].append(xyz_zone) + + # Calculate the ROI for next frame + # Compute rotation + x0 = sqn_lms[0] + y0 = sqn_lms[1] + x1 = 0.25 * (sqn_lms[2*id_index_mcp] + sqn_lms[2*id_ring_mcp]) + 0.5 * sqn_lms[2*id_middle_mcp] + y1 = 0.25 * (sqn_lms[2*id_index_mcp+1] + sqn_lms[2*id_ring_mcp+1]) + 0.5 * sqn_lms[2*id_middle_mcp+1] + rotation = 0.5 * pi - atan2(y0 - y1, x1 - x0) + rotation = normalize_radians(rotation) + # Find boundaries of landmarks + min_x = min_y = 1 + max_x = max_y = 0 + for id in ids_for_bounding_box: + min_x = min(min_x, sqn_lms[2*id]) + max_x = max(max_x, sqn_lms[2*id]) + min_y = min(min_y, sqn_lms[2*id+1]) + max_y = max(max_y, sqn_lms[2*id+1]) + axis_aligned_center_x = 0.5 * (max_x + min_x) + axis_aligned_center_y = 0.5 * (max_y + min_y) + cos_rot = cos(rotation) + sin_rot = sin(rotation) + # Find boundaries of rotated landmarks + min_x = min_y = 1 + max_x = max_y = -1 + for id in ids_for_bounding_box: + original_x = sqn_lms[2*id] - axis_aligned_center_x + original_y = sqn_lms[2*id+1] - axis_aligned_center_y + projected_x = original_x * cos_rot + original_y * sin_rot + projected_y = -original_x * sin_rot + original_y * cos_rot + min_x = min(min_x, projected_x) + max_x = max(max_x, projected_x) + min_y = min(min_y, projected_y) + max_y = max(max_y, projected_y) + projected_center_x = 0.5 * (max_x + min_x) + projected_center_y = 0.5 * (max_y + min_y) + center_x = (projected_center_x * cos_rot - projected_center_y * sin_rot + axis_aligned_center_x) + center_y = (projected_center_x * sin_rot + projected_center_y * cos_rot + axis_aligned_center_y) + width = (max_x - min_x) + height = (max_y - min_y) + sqn_rr_size = 2 * max(width, height) + sqn_rr_center_x = (center_x + 0.1 * height * sin_rot) + sqn_rr_center_y = (center_y - 0.1 * height * cos_rot) + + hand[0] = sqn_rr_size + hand[1] = rotation + hand[2] = sqn_rr_center_x + hand[3] = sqn_rr_center_y + + updated_detect_hands.append(hand) + detected_hands = updated_detect_hands + + ${_TRACE1} (f"Landmarks - nb hands confirmed : {len(detected_hands)}") + + # Check that 2 detected hands do not correspond to the same hand in the image + # That may happen when one hand in the image cross another one + # A simple method is to assure that the center of the 
rotated rectangles are not too close + if len(detected_hands) == 2: + dist_rr_centers = dist([detected_hands[0][2], detected_hands[0][3]], [detected_hands[1][2], detected_hands[1][3]]) + if dist_rr_centers < 0.02: + # Keep the hand with higher landmark score + if hand_landmarks["lm_score"][0] > hand_landmarks["lm_score"][1]: + pop_i = 1 + else: + pop_i = 0 + for k in hand_landmarks: + hand_landmarks[k].pop(pop_i) + detected_hands.pop(pop_i) + ${_TRACE1} ("!!! Removing one hand because too close to the other one") + + nb_hands = len(detected_hands) + + ${_IF_USE_HANDEDNESS_AVERAGE} + if send_new_frame_to_branch == 1 or nb_hands_in_previous_frame != nb_hands: + for i in range(2): + handedness_avg[i].reset() + ${_TRACE2} (f"Reset handedness_avg") + # Replace current inferred handedness by the average handedness + for i in range(nb_hands): + hand_landmarks["handedness"][i] = handedness_avg[i].update(hand_landmarks["handedness"][i]) + ${_IF_USE_HANDEDNESS_AVERAGE} + + # If the 2 hands have the same handedness, we discard the 2nd one + if nb_hands == 2 and (hand_landmarks["handedness"][0] - 0.5) * (hand_landmarks["handedness"][1] - 0.5) > 0: + for k in hand_landmarks: + hand_landmarks[k].pop(1) + nb_hands = 1 + ${_TRACE1} ("!!! Removing one hand because same handedness") + + if nb_hands == 1: + single_hand_count += 1 + else: + single_hand_count = 0 + + send_result_hands(send_new_frame_to_branch==1, nb_lm_inf, hand_landmarks["lm_score"], hand_landmarks["handedness"], hand_landmarks["rect_center_x"], hand_landmarks["rect_center_y"], hand_landmarks["rect_size"], hand_landmarks["rotation"], hand_landmarks["rrn_lms"], hand_landmarks["sqn_lms"], hand_landmarks["world_lms"], hand_landmarks["xyz"], hand_landmarks["xyz_zone"]) + + if nb_hands == 0: + send_new_frame_to_branch = 1 + elif nb_hands == 1 and single_hand_count >= ${_single_hand_tolerance_thresh}: + send_new_frame_to_branch = 1 + single_hand_count = 0 + else: + send_new_frame_to_branch = 2 + + ${_IF_USE_HANDEDNESS_AVERAGE} + nb_hands_in_previous_frame = nb_hands + ${_IF_USE_HANDEDNESS_AVERAGE} \ No newline at end of file diff --git a/depthai_sdk/src/depthai_sdk/oak_camera.py b/depthai_sdk/src/depthai_sdk/oak_camera.py index c8c35425c..089792bce 100644 --- a/depthai_sdk/src/depthai_sdk/oak_camera.py +++ b/depthai_sdk/src/depthai_sdk/oak_camera.py @@ -24,6 +24,7 @@ from depthai_sdk.components.parser import parse_usb_speed from depthai_sdk.components.stereo_component import StereoComponent from depthai_sdk.components.pointcloud_component import PointcloudComponent +from depthai_sdk.components.hand_tracker.component import HandTrackerComponent from depthai_sdk.oak_device import OakDevice from depthai_sdk.record import RecordType, Record from depthai_sdk.replay import Replay @@ -279,6 +280,19 @@ def create_pointcloud(self, self._components.append(comp) return comp + def create_hand_tracker(self, + input: CameraComponent, + name: Optional[str] = None, + ): + comp = HandTrackerComponent( + self._oak.device, + self.pipeline, + input=input, + name=name + ) + self._components.append(comp) + return comp + def _init_device(self, config: dai.Device.Config, device_str: Optional[str] = None, diff --git a/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_hand_tracker.py b/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_hand_tracker.py new file mode 100644 index 000000000..8eefbb1a2 --- /dev/null +++ b/depthai_sdk/src/depthai_sdk/oak_outputs/xout/xout_hand_tracker.py @@ -0,0 +1,173 @@ +from typing import List, Union, Dict + +import depthai as dai +import 
numpy as np + +from depthai_sdk.classes.enum import ResizeMode +from depthai_sdk.classes.packets import HandTrackerPacket, RotatedDetectionPacket +from depthai_sdk.classes.enum import ResizeMode +from depthai_sdk.oak_outputs.xout.xout_base import StreamXout +from depthai_sdk.oak_outputs.xout.xout_frames import XoutFrames +from depthai_sdk.oak_outputs.xout.xout_seq_sync import XoutSeqSync +from depthai_sdk.visualize.visualizer import Visualizer +import depthai_sdk.components.hand_tracker.mediapipe_utils as mpu +import marshal +from depthai_sdk.visualize.bbox import BoundingBox +from math import sin, cos, atan2, pi, degrees, floor, dist +try: + import cv2 +except ImportError: + cv2 = None + +class XoutHandTracker(XoutSeqSync, XoutFrames): + def __init__(self, + script_stream: StreamXout, + frames: StreamXout, + component: 'HandTrackerComponent' + ): + self.script_stream = script_stream + + XoutFrames.__init__(self, frames) + XoutSeqSync.__init__(self, [frames, script_stream]) + self.pad_h = component.pad_h + self.pad_w = component.pad_w + self.name = 'HandTracker' + + def visualize(self, packet: HandTrackerPacket): + print('Visualization of HandTracker is not supported') + + def xstreams(self) -> List[StreamXout]: + return [self.script_stream, self.frames] + + def package(self, msgs: Dict): + frame = msgs['0_preview'].getCvFrame() + + res = marshal.loads(msgs['host_host'].getData()) + hands = [] + pre_det_bb = BoundingBox().resize_to_aspect_ratio(frame.shape, (1, 1), resize_mode='letterbox') + for i in range(len(res.get("lm_score",[]))): + hand = self.extract_hand_data(res, i, frame, pre_det_bb) + hands.append(hand) + + packet = HandTrackerPacket(name=self.name, hands=hands, color_frame=frame) + self.queue.put(packet, block=False) + + def extract_hand_data(self, res, hand_idx, frame: np.ndarray, bb: BoundingBox): + frame_width = frame.shape[1] + frame_height = frame.shape[0] + hand = mpu.HandRegion() + + # center = bb.map_point(res["rect_center_x"][hand_idx], res["rect_center_y"][hand_idx]).denormalize(frame.shape) + hand.rect_x_center_a = res["rect_center_x"][hand_idx] * frame_width # center[0] + hand.rect_y_center_a = res["rect_center_y"][hand_idx] * frame_height # center[1] + hand.rect_w_a = hand.rect_h_a = res["rect_size"][hand_idx] * frame_width + hand.rotation = res["rotation"][hand_idx] + hand.rect_points = mpu.rotated_rect_to_points(hand.rect_x_center_a, hand.rect_y_center_a, hand.rect_w_a, hand.rect_h_a, hand.rotation) + hand.lm_score = res["lm_score"][hand_idx] + hand.handedness = res["handedness"][hand_idx] + hand.label = "right" if hand.handedness > 0.5 else "left" + hand.norm_landmarks = np.array(res['rrn_lms'][hand_idx]).reshape(-1,3) + hand.landmarks = (np.array(res["sqn_lms"][hand_idx]) * frame_width).reshape(-1,2).astype(np.int32) + + if res.get("xyz", None) is not None: + hand.xyz = np.array(res["xyz"][hand_idx]) + hand.xyz_zone = res["xyz_zone"][hand_idx] + + # If we added padding to make the image square, we need to remove this padding from landmark coordinates and from rect_points + if self.pad_h > 0: + hand.landmarks[:,1] -= self.pad_h + for i in range(len(hand.rect_points)): + hand.rect_points[i][1] -= self.pad_h + if self.pad_w > 0: + hand.landmarks[:,0] -= self.pad_w + for i in range(len(hand.rect_points)): + hand.rect_points[i][0] -= self.pad_w + + # World landmarks + if res.get("world_lms", None) is not None: + try: + hand.world_landmarks = np.array(res["world_lms"][hand_idx]).reshape(-1, 3) + except: + pass + + # if self.use_gesture: mpu.recognize_gesture(hand) + 
+ return hand + +THRESHOLD = 0.5 +class XoutPalmDetection(XoutSeqSync, XoutFrames): + def __init__(self, + frames: StreamXout, + nn_results: StreamXout, + hand_tracker: 'HandTrackerComponent' + ): + self.frames = frames + self.nn_results = nn_results + + # self._decode_fn = None + # self._labels = False + self.size = hand_tracker.pd_size + self.resize_mode = hand_tracker.pd_resize_mode + + XoutFrames.__init__(self, frames) + XoutSeqSync.__init__(self, [frames, nn_results]) + + def xstreams(self) -> List[StreamXout]: + return [self.nn_results, self.frames] + + def normalize_radians(self, angle): + return angle - 2 * pi * floor((angle + pi) / (2 * pi)) + + def visualize(self, packet: RotatedDetectionPacket): + """ + Palm detection visualization callback + """ + frame = packet.frame + pre_det_bb = BoundingBox().resize_to_aspect_ratio(frame.shape, self.size, resize_mode=self.resize_mode) + for rr, corners in zip(packet.rotated_rects, packet.bb_corners): + # print(rr.center.x, rr.center.y, rr.size.width, rr.size.height, rr.angle) + corner_points = [pre_det_bb.map_point(*p).denormalize(frame.shape) for p in corners] + cv2.polylines(frame, [np.array(corner_points)], True, (127,255,0), 2, cv2.LINE_AA) + for corner in corner_points: + cv2.circle(frame, center=corner, radius=2, color=(0,127,255), thickness=-1) + + center = pre_det_bb.map_point(rr.center.x, rr.center.y).denormalize(frame.shape) + cv2.circle(frame, center=center, radius=3, color=(255,127,0), thickness=-1) + cv2.imshow('PD', frame) + + def package(self, msgs: Dict): + pd_nndata: dai.NNData = msgs[self.nn_results.name] + imgFrame: dai.ImgFrame = msgs[self.frames.name] + vals = pd_nndata.getLayerFp16("result") + + # detection is list of Nx8 float + # Looping the detection multiple times to obtain data for all hands + rrs = [] + for i in range(0, len(vals), 8): + pd_score, box_x, box_y, box_size, kp0_x, kp0_y, kp2_x, kp2_y = vals[i:i+8] + if pd_score >= THRESHOLD and box_size > 0: + # scale_center_x = sqn_scale_x - sqn_rr_center_x + # scale_center_y = sqn_scale_y - sqn_rr_center_y + kp02_x = kp2_x - kp0_x + kp02_y = kp2_y - kp0_y + sqn_rr_size = 2.9 * box_size + rotation = 0.5 * pi - atan2(-kp02_y, kp02_x) + rotation = self.normalize_radians(rotation) + sqn_rr_center_x = box_x + 0.5*box_size*sin(rotation) + sqn_rr_center_y = box_y - 0.5*box_size*cos(rotation) + + rr = dai.RotatedRect() + rr.center.x = sqn_rr_center_x + rr.center.y = sqn_rr_center_y + rr.size.width = sqn_rr_size + rr.size.height = sqn_rr_size + rr.angle = degrees(rotation) + rrs.append(rr) + + packet = RotatedDetectionPacket( + name = self.get_packet_name(), + msg = imgFrame, + rotated_rects=rrs, + ) + + self.queue.put(packet, block=False)
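+
+# Hypothetical usage sketch (an assumption, not part of this patch): provided
+# HandTrackerComponent exposes a default output like the other SDK components,
+# the create_hand_tracker() helper added to OakCamera could be used like this:
+#
+#   from depthai_sdk import OakCamera
+#
+#   with OakCamera() as oak:
+#       color = oak.create_camera('color')
+#       hands = oak.create_hand_tracker(color)
+#       oak.callback(hands, callback=lambda packet: print(f'{len(packet.hands)} hand(s) detected'))
+#       oak.start(blocking=True)
+#
+# Each HandTrackerPacket carries the decoded HandRegion objects in packet.hands
+# together with the color frame in packet.color_frame.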