Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge using the source timestamp #516

Merged
merged 77 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
a0ccba5
add test
ymski Jul 11, 2024
58ad355
add node
ymski Jul 11, 2024
094d27e
Fix: code style
h-suzuki-isp Jul 23, 2024
2f5efee
Fix: code style
h-suzuki-isp Jul 23, 2024
cf63f43
ci(pre-commit): autofix
pre-commit-ci[bot] Jul 23, 2024
a155df3
Delete: test_accept
h-suzuki-isp Jul 23, 2024
acdf821
Fix: rmw_subscription_handle
h-suzuki-isp Jul 23, 2024
fb0a8dd
Fix: record mergre
h-suzuki-isp Jul 23, 2024
90cc3cb
Fix: test use_latest_message
h-suzuki-isp Jul 24, 2024
219f145
Fix: flake8
h-suzuki-isp Jul 24, 2024
46ac570
Fix: corespond to mypy
h-suzuki-isp Jul 24, 2024
c5e5c58
Fix: spellcheck-differential
h-suzuki-isp Jul 24, 2024
33a6232
Delete: _get_rmw_handle_from_callback_object
h-suzuki-isp Jul 25, 2024
9f8f2d4
Add: test for NodeUseLatest when take impl
h-suzuki-isp Jul 26, 2024
a3acb40
Revert "Delete: _get_rmw_handle_from_callback_object"
h-suzuki-isp Jul 26, 2024
173413b
feat: get rmw_handle from callback_object
h-suzuki-isp Jul 26, 2024
f700e5e
feat: add test for NodeRecordsUseLatest normal flow
h-suzuki-isp Jul 26, 2024
b1a18d7
Fix: delete map_sub_handle_to_rmw_handle related
h-suzuki-isp Jul 26, 2024
23b26fa
feat: add test for subscription_take_records
h-suzuki-isp Jul 26, 2024
681c366
feat: add test for MergeRecords take_impl case
h-suzuki-isp Jul 26, 2024
44113f4
feat: add test for test_take_imple_case_include_first_callback
h-suzuki-isp Jul 29, 2024
b3c3151
Fix: test_subscription_take_records
h-suzuki-isp Jul 29, 2024
0c953fd
Fix: spellcheck differential
h-suzuki-isp Jul 29, 2024
6a7e200
Fix: flake8 error
h-suzuki-isp Jul 29, 2024
3c934ad
Update src/caret_analyze/infra/lttng/records_provider_lttng.py
h-suzuki-isp Jul 30, 2024
1b03dc7
Fix: I combined rmw_handle_to_node_name into a single loop
h-suzuki-isp Jul 30, 2024
5bf865c
feat: add description for using subscription_take_records
h-suzuki-isp Jul 30, 2024
3ffbc57
Fix: remove unused variable
h-suzuki-isp Jul 30, 2024
b426d64
feat: add docstring for _compose_intra_proc_comm_records
h-suzuki-isp Jul 30, 2024
3bcff16
Fix: if take impl, merge key is rmw_take_timestamp
h-suzuki-isp Jul 31, 2024
3cb32af
Fix: how to specify left_key
h-suzuki-isp Jul 31, 2024
ceb69bc
Fix: change if conversation
h-suzuki-isp Jul 31, 2024
ec28837
feat: add exeption when len(handle)!= 1
h-suzuki-isp Jul 31, 2024
c2749b9
Fix: remove unused variable
h-suzuki-isp Jul 31, 2024
aca2310
Update src/caret_analyze/infra/lttng/records_provider_lttng.py
h-suzuki-isp Jul 31, 2024
f9945f3
delete: docctring line
h-suzuki-isp Jul 31, 2024
b200996
Fix: remove comment
h-suzuki-isp Jul 31, 2024
79680ea
chore: add docstring for_compose_inter_proc_comm_records()
h-suzuki-isp Aug 1, 2024
81e906d
chore: add docstring for rmw_take_records
h-suzuki-isp Aug 1, 2024
fd7b407
Fix: warm message
h-suzuki-isp Aug 1, 2024
06d1ef4
Fix: delete unused implementation
h-suzuki-isp Aug 1, 2024
e9bd1af
feat: DataModelService move the location to init().
h-suzuki-isp Aug 1, 2024
581ae7e
feat: add lttng.data mock
h-suzuki-isp Aug 1, 2024
c903840
Fix: type check for _get_rmw_handle_from_callback_objec
h-suzuki-isp Aug 1, 2024
e599f6d
Fix: flake8
h-suzuki-isp Aug 1, 2024
704c4df
chore: warning message
h-suzuki-isp Aug 2, 2024
c4e44c1
chore: change message
h-suzuki-isp Aug 2, 2024
2a32afa
chore: add error message useful information
h-suzuki-isp Aug 2, 2024
477661b
Fix: the pattern that was not considered
h-suzuki-isp Aug 2, 2024
77d0f1d
Fix: mypy
h-suzuki-isp Aug 2, 2024
bf93822
Fix: flake8 & mypy
h-suzuki-isp Aug 2, 2024
9982cc7
chore: Change uppercase to lowercase.
h-suzuki-isp Aug 2, 2024
1ecf689
chore: message
h-suzuki-isp Aug 2, 2024
b985512
Update src/caret_analyze/runtime/path.py
h-suzuki-isp Aug 5, 2024
05980af
Fix: mistake docstring
h-suzuki-isp Aug 5, 2024
17aef41
delete: TODO
h-suzuki-isp Aug 5, 2024
edad585
feat: add test for test_take_impl_case_include_last_callback
h-suzuki-isp Aug 5, 2024
f55e0f8
feat: correspond to cases that source_timestamp is 0.
h-suzuki-isp Aug 5, 2024
e43e772
Fix: variable name
h-suzuki-isp Aug 6, 2024
2984716
Fix: subscription_take_records
h-suzuki-isp Aug 6, 2024
97cfc0a
Fix: retrun value of subscription_take_records
h-suzuki-isp Aug 6, 2024
0fb6cc1
Update src/test/infra/lttng/test_records_provider_lttng.py
h-suzuki-isp Aug 6, 2024
9bffd92
Update src/test/infra/lttng/test_records_provider_lttng.py
h-suzuki-isp Aug 6, 2024
5f1b21d
Update src/test/infra/lttng/test_records_provider_lttng.py
h-suzuki-isp Aug 6, 2024
d813288
Fix: test for uselatest
h-suzuki-isp Aug 6, 2024
79f95e7
Fix: use patch.object
h-suzuki-isp Aug 6, 2024
0e92039
chore: delete comment
h-suzuki-isp Aug 6, 2024
cf83bb5
Fix: grouped_rmw_records
h-suzuki-isp Aug 6, 2024
dec8851
Fix: rmw_handle value
h-suzuki-isp Aug 6, 2024
a5e0830
Fix: use_latest_message
h-suzuki-isp Aug 6, 2024
1d535c4
feat: add test for case that source_timestamp is 0
h-suzuki-isp Aug 7, 2024
19b3af6
Fix: has not records.clone
h-suzuki-isp Aug 7, 2024
0e07fd4
Fix: docstring
h-suzuki-isp Aug 7, 2024
d389b76
Fix: patch object
h-suzuki-isp Aug 7, 2024
625e506
Fix: variable name
h-suzuki-isp Aug 7, 2024
ef23c31
Fix: delete unused varialbel
h-suzuki-isp Aug 7, 2024
4525f2d
Merge branch 'main' into take_use_latest_message
h-suzuki-isp Aug 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/caret_analyze/infra/lttng/event_counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ def _build_count_df(data: Ros2DataModel) -> pd.DataFrame:
sub_cb_to_topic_name: dict[int, str] = {}
sub_to_topic_name: dict[int, str] = {}
sub_to_node_name: dict[int, str] = {}
rmw_handle_to_node_name: dict[int, str] = {}
rmw_handle_to_topic_name: dict[int, str] = {}

def ns_and_node_name(ns: str, name: str) -> str:
if ns[-1] == '/':
Expand All @@ -251,6 +253,9 @@ def ns_and_node_name(ns: str, name: str) -> str:
sub_handle_to_node_name[handler] = \
node_handle_to_node_name.get(row['node_handle'], '-')
sub_handle_to_topic_name[handler] = row['topic_name']
rmw_handle_to_node_name[row['rmw_handle']] = \
node_handle_to_node_name[row['node_handle']]
rmw_handle_to_topic_name[row['rmw_handle']] = row['topic_name']

for handler, row in data.timer_node_links.df.iterrows():
timer_handle_to_node_name[handler] = \
Expand Down Expand Up @@ -284,7 +289,7 @@ def ns_and_node_name(ns: str, name: str) -> str:
count_dict = []
group_keys = [
'callback_object', 'publisher_handle', 'subscription_handle',
'tilde_publisher', 'tilde_subscription'
'tilde_publisher', 'tilde_subscription', 'rmw_subscription_handle'
]
for trace_point, df in trace_point_and_df.items():
df = df.reset_index()
Expand All @@ -305,6 +310,8 @@ def ns_and_node_name(ns: str, name: str) -> str:
df['publisher_handle'] = '-'
if 'subscription_handle' not in df.columns:
df['subscription_handle'] = '-'
if 'rmw_subscription_handle' not in df.columns:
df['rmw_subscription_handle'] = '-'

if trace_point in ['ros2_caret:tilde_publish', 'ros2_caret:tilde_publisher_init']:
df['tilde_publisher'] = df['publisher']
Expand Down Expand Up @@ -342,6 +349,10 @@ def ns_and_node_name(ns: str, name: str) -> str:
key[4] in tilde_sub_to_topic_name:
topic_name = tilde_sub_to_topic_name.get(key[4], '-')
node_name = tilde_sub_to_node_name.get(key[4], '-')
elif key[5] in rmw_handle_to_node_name or \
key[5] in rmw_handle_to_topic_name:
topic_name = rmw_handle_to_topic_name.get(key[5], '-')
node_name = rmw_handle_to_node_name.get(key[5], '-')

count_dict.append(
{
Expand Down
5 changes: 5 additions & 0 deletions src/caret_analyze/infra/lttng/lttng.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,11 @@ def compose_subscribe_records(
) -> RecordsInterface:
return self._source.subscribe_records.clone()

def compose_rmw_take_records(
self,
) -> RecordsInterface:
return self._source.rmw_take_records.clone()

def create_timer_events_factory(
self,
timer_callback: TimerCallbackValueLttng
Expand Down
124 changes: 122 additions & 2 deletions src/caret_analyze/infra/lttng/records_provider_lttng.py
ymski marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .bridge import LttngBridge
from .column_names import COLUMN_NAME
from .lttng import Lttng
from .ros2_tracing.data_model_service import DataModelService
from .value_objects import (PublisherValueLttng,
SubscriptionCallbackValueLttng,
TimerCallbackValueLttng)
Expand Down Expand Up @@ -67,6 +68,7 @@ def __init__(
self._lttng = lttng
self._source = FilteredRecordsSource(lttng)
self._helper = RecordsProviderLttngHelper(lttng)
self._srv = DataModelService(lttng.data)

def communication_records(
self,
Expand All @@ -88,6 +90,7 @@ def communication_records(
- [topic_name]/rclcpp_publish_timestamp
- [topic_name]/rcl_publish_timestamp (Optional)
- [topic_name]/dds_publish_timestamp (Optional)
- [topic_name]/source_timestamp (only inter process)
- [callback_name]/callback_start_timestamp

"""
Expand Down Expand Up @@ -195,6 +198,76 @@ def subscribe_records(

return self._subscribe_records_with_tilde(subscription)

def subscription_take_records(
self,
subscription: SubscriptionStructValue
) -> RecordsInterface:
"""
Provide subscription records.

This method is implemented for a node with a specific subscription usage.
Use this function when you get the message
using 'subscription->take' at any arbitrary timing
instead of using 'subscription_callback'.

Parameters
----------
subscription : SubscriptionStructValue
Target subscription value.
ymski marked this conversation as resolved.
Show resolved Hide resolved

Returns
-------
RecordsInterface
Columns

- [topic_name]/source_timestamp
- rmw_take_timestamp

"""
callback = subscription.callback
if callback is not None:
callback_objects = self._helper.get_subscription_callback_objects(callback)

try:
rmw_handle = self._srv._get_rmw_handle_from_callback_object(callback_objects[0])
except InvalidArgumentError:
rmw_handle = None

# get rmw_records, which relates to callback_object
rmw_records: RecordsInterface
if rmw_handle is not None:
rmw_records = self._source._grouped_rmw_records[rmw_handle].clone()
else:
rmw_records = RecordsFactory.create_instance(
None,
columns=[
ColumnValue(COLUMN_NAME.TID),
ColumnValue(COLUMN_NAME.RMW_TAKE_TIMESTAMP),
ColumnValue(COLUMN_NAME.RMW_SUBSCRIPTION_HANDLE),
ColumnValue(COLUMN_NAME.MESSAGE),
ColumnValue(COLUMN_NAME.SOURCE_TIMESTAMP)
]
)

# drop columns
columns = rmw_records.columns
drop_columns = list(set(columns) - {'source_timestamp', 'rmw_take_timestamp'})
rmw_records.drop_columns(drop_columns)

# reindex
rmw_records.reindex(['source_timestamp', 'rmw_take_timestamp'])

# add prefix to columns; e.g. [topic_name]/source_timestamp
if callback is not None:
self._rename_column(
rmw_records,
callback.callback_name,
subscription.topic_name,
None
)

return rmw_records

def _subscribe_records(
self,
subscription: SubscriptionStructValue
Expand Down Expand Up @@ -819,6 +892,7 @@ def _compose_inter_proc_comm_records(
- [topic_name]/rclcpp_publish_timestamp
- [topic_name]/rcl_publish_timestamp (Optional)
- [topic_name]/dds_write_timestamp (Optional)
- [topic_name]/source_timestamp
- [callback_name_name]/callback_start_timestamp

"""
Expand All @@ -838,6 +912,7 @@ def _compose_inter_proc_comm_records(
columns.append(COLUMN_NAME.RCL_PUBLISH_TIMESTAMP)
if COLUMN_NAME.DDS_WRITE_TIMESTAMP in records.columns:
columns.append(COLUMN_NAME.DDS_WRITE_TIMESTAMP)
columns.append(COLUMN_NAME.SOURCE_TIMESTAMP)
columns.append(COLUMN_NAME.CALLBACK_START_TIMESTAMP)

self._format(records, columns)
Expand Down Expand Up @@ -1241,17 +1316,56 @@ def to_records(self):
assert self._node_path.subscription is not None and self._node_path.publisher is not None

sub_records = self._provider.subscribe_records(self._node_path.subscription)

# If explicitly take message by user, there are cases that source_timestamp is 0.
def fill_source_timestamp_with_latest_timestamp(records):
source_columns = [s for s in records.columns if 'source_timestamp' in s]
if len(source_columns) != 1:
return records
source_column = source_columns[0]

columns = []
for column in records.columns:
columns += [ColumnValue(column)]

records_data = []
latest_timestamp = 0

for record in records.data:
source_timestamp = record.data[source_column]
if source_timestamp == 0:
record_dict = record.data
record_dict[source_column] = latest_timestamp
records_data.append(record_dict)
else:
latest_timestamp = source_timestamp
records_data.append(record.data)

new_records = RecordsFactory.create_instance(
records_data,
columns=columns
)
return new_records

is_take_node = len(sub_records) == 0
if is_take_node:
sub_records = self._provider.subscription_take_records(self._node_path.subscription)
sub_records = fill_source_timestamp_with_latest_timestamp(sub_records)
pub_records = self._provider.publish_records(self._node_path.publisher)

columns = [
sub_records.columns[0],
*sub_records.columns,
f'{self._node_path.publish_topic_name}/rclcpp_publish_timestamp',
]
left_key = sub_records.columns[0]
if 'rmw_take_timestamp' in columns:
columns.remove('rmw_take_timestamp')
left_key = 'rmw_take_timestamp'

pub_sub_records = merge_sequential(
left_records=sub_records,
right_records=pub_records,
left_stamp_key=sub_records.columns[0],
left_stamp_key=left_key,
right_stamp_key=pub_records.columns[0],
join_left_key=None,
join_right_key=None,
Expand Down Expand Up @@ -1848,6 +1962,12 @@ def _grouped_sub_records(self) -> dict[int, RecordsInterface]:
group = records.groupby([COLUMN_NAME.CALLBACK_OBJECT])
return self._expand_key_tuple(group)

@cached_property
def _grouped_rmw_records(self) -> dict[int, RecordsInterface]:
records = self._lttng.compose_rmw_take_records()
group = records.groupby([COLUMN_NAME.RMW_SUBSCRIPTION_HANDLE])
return self._expand_key_tuple(group)

@cached_property
def _grouped_tilde_pub_records(self) -> dict[int, RecordsInterface]:
records = self._lttng.compose_tilde_publish_records()
Expand Down
19 changes: 19 additions & 0 deletions src/caret_analyze/infra/lttng/records_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,25 @@ def subscribe_records(self) -> RecordsInterface:

return subscribe

@cached_property
def rmw_take_records(self) -> RecordsInterface:
"""
Compose rmw_take records.

Returns
-------
RecordsInterface
columns:
- tid
- rmw_take_timestamp
- rmw_subscription_handle
- message
- source_timestamp

"""
rmw_take_records = self._data.rmw_take_instances.clone()
return rmw_take_records

@cached_property
def intra_proc_comm_records(self) -> RecordsInterface:
"""
Expand Down
84 changes: 84 additions & 0 deletions src/caret_analyze/infra/lttng/ros2_tracing/data_model_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pandas as pd

from .data_model import Ros2DataModel
from ....exceptions import InvalidArgumentError


logger = getLogger(__name__)
Expand Down Expand Up @@ -183,6 +184,89 @@ def _get_callback_symbols_from_handle(
except KeyError:
return [None]

def _get_rmw_handle_from_callback_object(
self,
cb_addr: int
) -> int | None:
try:
sub = self._get_sub_from_callback_object(cb_addr)
if sub is not None:
sub_handle = self._get_sub_handle_from_sub(sub)
if sub_handle is not None:
rmw_handle = self._get_rmw_handle_from_sub_handle(sub_handle)
return rmw_handle
except KeyError:
return None

def _get_sub_from_callback_object(
self,
cb_addr: int
) -> int | None:
try:
target_df = self._ensure_dataframe(
self._data.callback_objects.df.reset_index()[['reference', 'callback_object']])
sub = target_df[target_df['callback_object'] == cb_addr]['reference'].values
if len(sub) == 1:
return sub[0]
elif len(sub) == 0:
msg = f'There is no subscription that corresponds to callback_addr: {cb_addr}.'
raise InvalidArgumentError(msg)
else:
msg = f'Duplicated subscription : [{sub}] \
that corresponds to callback_addr: {cb_addr}'
raise InvalidArgumentError(msg)
except KeyError:
return None

def _get_sub_handle_from_sub(
self,
subscription: int
) -> int | None:
try:
target_df = self._ensure_dataframe(
self._data.subscription_objects.df.reset_index()[
['subscription', 'subscription_handle']
]
)
sub_handle = target_df[target_df['subscription'] == subscription][
'subscription_handle'
].values
if len(sub_handle) == 1:
return sub_handle[0]
elif len(sub_handle) == 0:
msg = f'There is no subscription_handle that \
corresponds to subscription : {subscription}.'
raise InvalidArgumentError(msg)
else:
msg = f'Duplicated subscription_handle: [{sub_handle}] that \
corresponds to subscription : {subscription}.'
raise InvalidArgumentError(msg)
except KeyError:
return None

def _get_rmw_handle_from_sub_handle(
self,
subscription_handle: int
) -> int | None:
try:
target_df = self._ensure_dataframe(
self._data.subscriptions.df.reset_index()[['subscription_handle', 'rmw_handle']])
rmw_handle = target_df[
target_df['subscription_handle'] == subscription_handle
]['rmw_handle'].values
if len(rmw_handle) == 1:
return rmw_handle[0]
elif len(rmw_handle) == 0:
msg = f'There is no rmw_handle that \
corresponds to subscription_handle: {subscription_handle}.'
raise InvalidArgumentError(msg)
else:
msg = f'Duplicated rmw_handle: [{rmw_handle}] that \
corresponds to subscription_handle: {subscription_handle}.'
raise InvalidArgumentError('len(rmw_handle) != 1')
except KeyError:
return None

@staticmethod
def _ensure_dataframe(
dataframe_or_series: pd.DataFrame | pd.Series
Expand Down
Loading
Loading