Skip to content

Conversation

@HonzaCuhel
Copy link
Contributor

@HonzaCuhel HonzaCuhel commented Mar 24, 2025

Purpose

Adding TwoStageSync node

Specification

  • Adding TwoStageSync and unittests

Dependencies & Potential Impact

None / not applicable

Deployment Plan

None / not applicable

Testing & Validation

None / not applicable

@HonzaCuhel HonzaCuhel self-assigned this Mar 24, 2025
@HonzaCuhel HonzaCuhel marked this pull request as draft March 24, 2025 14:49
@github-actions github-actions bot added the enhancement New feature or request label Mar 24, 2025
@codecov-commenter
Copy link

codecov-commenter commented Mar 27, 2025

Codecov Report

Attention: Patch coverage is 85.16129% with 23 lines in your changes missing coverage. Please review.

Project coverage is 51.44%. Comparing base (e8aeb49) to head (9e7a4e5).
Report is 1 commits behind head on main.

✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
depthai_nodes/node/two_stage_sync.py 85.12% 18 Missing ⚠️
depthai_nodes/message/detected_recognitions.py 84.37% 5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #194      +/-   ##
==========================================
+ Coverage   50.29%   51.44%   +1.15%     
==========================================
  Files          79       81       +2     
  Lines        4575     4731     +156     
==========================================
+ Hits         2301     2434     +133     
- Misses       2274     2297      +23     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@klemen1999 klemen1999 requested a review from dominik737 March 27, 2025 13:46
@kkeroo
Copy link
Collaborator

kkeroo commented Mar 27, 2025

Can we come up with a better name? Because the node can sync any other message not only recognitions?

@dominik737
Copy link
Contributor

dominik737 commented Mar 27, 2025

This node actually can sync only ImgDetections and ImgDetectionsExtended. It relies on the len(ImgDetections.detections) to determine for how many (recognition - NNData) messages to wait.

There is another GatheredData node that can sync any messages, the drawback is that it's not as plug&play and you have to have some type of a converter to get the number of messages you need to wait for.

We've agreed that this node will in the future inherit from the GatheredData since the logic is the same. But for now it is not necessary, because the API will stay the same when we decide for the refactor.

@kkeroo
Copy link
Collaborator

kkeroo commented Mar 27, 2025

I meant that this node can also sync detections with Keypoints msg. for example, right?

@dominik737
Copy link
Contributor

Actually, you are right. You can supply other objects beside the NNData. We just assume and typehint for that.

If we want to make it more generic we should change the typehints and then the name should follow.

@HonzaCuhel HonzaCuhel marked this pull request as ready for review April 5, 2025 10:55
@HonzaCuhel HonzaCuhel changed the title [Draft] Add DetectionsRecognitionsSync Add TwoStageSync node Apr 5, 2025
@HonzaCuhel HonzaCuhel requested review from jkbmrz and kkeroo April 5, 2025 10:59
from .img_detections import ImgDetectionsExtended


class DetectedRecognitions(dai.Buffer):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this message more general, i.e. to store arbitrary message types?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could yea. Preferably using generics.

class TwoStageSync(dai.node.ThreadedHostNode):
FPS_TOLERANCE_DIVISOR = 2.0
INPUT_CHECKS_PER_FPS = 100
"""A class for synchronizing detections and recognitions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"synchronizing detections and recognitions" - I thought we want to make this node more general. If that's the case, the docstring should adapt to the change in the node name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And same for parameter names below - they should adapt to the changes that are not aiming to only sync recognitions and detections.

self.out = self.createOutput()

def build(self, camera_fps: int) -> "TwoStageSync":
if camera_fps <= 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rather move this check to the self.set_camera_fps() setter.

self._camera_fps = camera_fps
return self

def set_camera_fps(self, fps: int) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to setCameraFPS() to better match the naming in other nodes.

Output for detected recognitions.
"""

def __init__(self, *args, **kwargs) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rather define the expected params? Might be more convenient for the user to know exactly what is expected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no params the node itself needs. You can delete the (k)args. I think I just passed it down to dai.ThreadedHostNode in case in future there is passed something important, but doesn't seem like it.

kwargs: Keyword arguments to be passed to the ThreadedHostNode class.
"""
super().__init__(*args, **kwargs)
self._camera_fps = 30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we set a default _camera_fps value if we enforce setting it inside the build() method?

for unmatched_recognition in unmatched_recognitions_to_remove:
self._unmatched_recognitions.remove(unmatched_recognition)

def _get_total_seconds_ts(self, buffer_like: dai.Buffer) -> float:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this method? It's only abstracts the .getTimestamp().total_seconds() call, which is quite straight-forward to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why the .total_seconds() is needed. Cannot we simply compare the timestamps?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getTimestamp gives you datetime.timedelta object. With the object you cannot really easily do arithmetics like substracting plain number. For that reason the .total_seconds() is used instead.

When you reference something that is repeating (e.g. multiple method calls) you should create a method, otherwise you violate DRY principle. The _get_total_seconds_ts is referenced several times in the code. If the way to obtain total seconds would change somehow the replacement would have to be made in all cases. Also you would have to write these two methods every time you want to get the timestamp total seconds.

The camera FPS.
_unmatched_recognitions: List[dai.Buffer]
List of unmatched recognitions.
_recognitions_by_detection_ts: Dict[float, List[dai.Buffer]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a suggestion, might it be more clear to have:

  • _unmatched_detections ([float, List[dai.Buffer]])
  • _unmatched_recognitions ([float, List[dai.Buffer]])
  • _matched_messages ([float, Tuple[dai.Buffer, dai.Buffer]]

This way, the parameter names are more straight-forward, and the _update_ready_timestamps parameter is not needed anymore.


def _timestamps_in_tolerance(self, timestamp1: float, timestamp2: float) -> bool:
difference = abs(timestamp1 - timestamp2)
return difference < (1 / self._camera_fps / self.FPS_TOLERANCE_DIVISOR)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow the option to modify the self.FPS_TOLERANCE_DIVISOR param? Because, if understanding correctly, the delay depends on the "recognition" model which might be lower/higher depending on the specific architecture used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FPS_TOLERANCE_DIVISOR is basically saying how much deviation there can be in the timestamps to consider the detection/recognition belong together.

I've set it such because that is exactly splitting the interval into 2, meaning that consecutive detections do not have any overlapping timestamps. If you would set it to 3 you would have some timestamp space where the recognitions would not be matched to any detection. If set to 1.5 there would be overlap between two consecutive detections.

Hence, I view this as a constant. I doubt there would be a good reason to change it.


def _update_ready_timestamps(self, timestamp: float) -> None:
if not self._timestamp_ready(timestamp):
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this return needed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a guard clause. It can be used to increase readability, although works well in my experience only if the function is short.

In this case if there is not ready timestamp then there is nothing to update so the return makes the executor jump out of the function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be much simpler (and also had same efficiency) if written as:

def _update_ready_timestamps(self, timestamp: float) -> None:
    if self._timestamp_ready(timestamp):
        self._ready_timestamps.put(timestamp)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Efficiency is the same and negligible in this case, let's put that aside.

Simplicity is quite subjective. The one who is used to guard clauses, looks at the method and sees there are clauses that prevent further execution of the method. The method logic is less horizontally nested with the clause, which increases the readability.

If in the future there are more conditions why the ready timestamps should not be updated, it can be solved by simply adding another guard clause. Another guard clause will not make the condition more complicated and won't claim more horizontal space, this makes it quite extensible.

@HonzaCuhel HonzaCuhel requested a review from aljazkonec1 April 7, 2025 09:56
Copy link
Contributor

@aljazkonec1 aljazkonec1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some small comments about the structure that dont affect the logic.

The logic is sound, so LGTM

"""Initializes the DetectedRecognitions object."""
super().__init__()
self._img_detections = None
self._recognitions_data = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add type hints


self._ready_timestamps.put(timestamp)

def _timestamp_ready(self, timestamp: float) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function is called once in line 143. The code can just be put in that function instead. Makes the code more readable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use-case for functions is not to only make parts of code reusable. It's also great tool to create hierarchy and abstractions, as demonstrated here.

When you read the _update_ready_timestamps you can quickly see that the function is checking whether there are any timestamps ready and then it updates ready timestamps. The implementation logic of retrieving ready timestamps is just a detail of lower hierarchy - it has nothing to do with updating ready timestamps.

self._clear_unmatched_recognitions(current_timestamp)
self._clear_old_detections(current_timestamp)

def _clear_unmatched_recognitions(self, current_timestamp) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Called once 3 lines above. I would move it there to make the function _clear_old_data better readable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment above.

In the abstraction hierarchy level of clearing old data there are two tasks:

  • clearing unmatched recognitions
  • clearing old detections

The implementation details of either are lower abstraction hierarchy level.

from .img_detections import ImgDetectionsExtended


class DetectedRecognitions(dai.Buffer):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could yea. Preferably using generics.

self._recognitions_data = []

@property
def img_detections(self) -> Union[dai.ImgDetections, ImgDetectionsExtended]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might consider using typing.Protocol rather than Union. We basically only need the detections: list property. This way we won't have to add any other types to the Union in future. Also if we need to change the property name we depend upon it will be in once place (in the Protocol).

Comment on lines +21 to +30
_camera_fps: int
The camera FPS.
_unmatched_recognitions: List[dai.Buffer]
List of unmatched recognitions.
_recognitions_by_detection_ts: Dict[float, List[dai.Buffer]]
Dictionary of recognitions by detection timestamp.
_detections: Dict[float, Union[dai.ImgDetections, dai.SpatialImgDetections, ImgDetectionsExtended]]
Dictionary of detections.
_ready_timestamps: PriorityQueue
Priority queue of ready timestamps.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think private attributes should not be part of the docstrings.

Output for detected recognitions.
"""

def __init__(self, *args, **kwargs) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no params the node itself needs. You can delete the (k)args. I think I just passed it down to dai.ThreadedHostNode in case in future there is passed something important, but doesn't seem like it.


def _timestamps_in_tolerance(self, timestamp1: float, timestamp2: float) -> bool:
difference = abs(timestamp1 - timestamp2)
return difference < (1 / self._camera_fps / self.FPS_TOLERANCE_DIVISOR)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FPS_TOLERANCE_DIVISOR is basically saying how much deviation there can be in the timestamps to consider the detection/recognition belong together.

I've set it such because that is exactly splitting the interval into 2, meaning that consecutive detections do not have any overlapping timestamps. If you would set it to 3 you would have some timestamp space where the recognitions would not be matched to any detection. If set to 1.5 there would be overlap between two consecutive detections.

Hence, I view this as a constant. I doubt there would be a good reason to change it.


def _update_ready_timestamps(self, timestamp: float) -> None:
if not self._timestamp_ready(timestamp):
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a guard clause. It can be used to increase readability, although works well in my experience only if the function is short.

In this case if there is not ready timestamp then there is nothing to update so the return makes the executor jump out of the function.


self._ready_timestamps.put(timestamp)

def _timestamp_ready(self, timestamp: float) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use-case for functions is not to only make parts of code reusable. It's also great tool to create hierarchy and abstractions, as demonstrated here.

When you read the _update_ready_timestamps you can quickly see that the function is checking whether there are any timestamps ready and then it updates ready timestamps. The implementation logic of retrieving ready timestamps is just a detail of lower hierarchy - it has nothing to do with updating ready timestamps.

self._clear_unmatched_recognitions(current_timestamp)
self._clear_old_detections(current_timestamp)

def _clear_unmatched_recognitions(self, current_timestamp) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment above.

In the abstraction hierarchy level of clearing old data there are two tasks:

  • clearing unmatched recognitions
  • clearing old detections

The implementation details of either are lower abstraction hierarchy level.

for unmatched_recognition in unmatched_recognitions_to_remove:
self._unmatched_recognitions.remove(unmatched_recognition)

def _get_total_seconds_ts(self, buffer_like: dai.Buffer) -> float:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getTimestamp gives you datetime.timedelta object. With the object you cannot really easily do arithmetics like substracting plain number. For that reason the .total_seconds() is used instead.

When you reference something that is repeating (e.g. multiple method calls) you should create a method, otherwise you violate DRY principle. The _get_total_seconds_ts is referenced several times in the code. If the way to obtain total seconds would change somehow the replacement would have to be made in all cases. Also you would have to write these two methods every time you want to get the timestamp total seconds.

@dominik737
Copy link
Contributor

Closing the PR with successor #203.

@dominik737 dominik737 closed this Apr 11, 2025
@klemen1999 klemen1999 deleted the feat/add-detections-recognitions-sync branch June 5, 2025 10:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants