Skip to content

Commit

Permalink
fix: fixed merge timestamp_key for nodes that get messages by take() …
Browse files Browse the repository at this point in the history
…on user code (#545)

* fix: remove rmw_take column

* refactor

Signed-off-by: Koudai Yamasaki <[email protected]>

* fix: rename column name for take impl node records

Signed-off-by: Koudai Yamasaki <[email protected]>

* fix communication latency records

Signed-off-by: Koudai Yamasaki <[email protected]>

* fix pytest

Signed-off-by: Koudai Yamasaki <[email protected]>

* adjust columns for the case that the message is not taken by callback

Signed-off-by: Koudai Yamasaki <[email protected]>

* fix take impl test case

Signed-off-by: Koudai Yamasaki <[email protected]>

---------

Signed-off-by: Koudai Yamasaki <[email protected]>
  • Loading branch information
ymski authored Jan 27, 2025
1 parent 9fb70d9 commit 54a5d6c
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 12 deletions.
22 changes: 13 additions & 9 deletions src/caret_analyze/infra/lttng/records_provider_lttng.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,12 +921,10 @@ def _compose_inter_proc_comm_records(
columns.append(COLUMN_NAME.RCL_PUBLISH_TIMESTAMP)
if COLUMN_NAME.DDS_WRITE_TIMESTAMP in records.columns:
columns.append(COLUMN_NAME.DDS_WRITE_TIMESTAMP)
columns.append(COLUMN_NAME.SOURCE_TIMESTAMP)

sub_records = self._source.sub_records(callback_object, None)
is_take_node = len(sub_records) == 0
if not is_take_node:
columns.append(COLUMN_NAME.CALLBACK_START_TIMESTAMP)
columns += [
COLUMN_NAME.SOURCE_TIMESTAMP,
COLUMN_NAME.CALLBACK_START_TIMESTAMP,
]

self._format(records, columns)

Expand Down Expand Up @@ -1379,9 +1377,15 @@ def fill_source_timestamp_with_latest_timestamp(records):
f'{self._node_path.publish_topic_name}/rclcpp_publish_timestamp',
]
left_key = sub_records.columns[0]
if COLUMN_NAME.RMW_TAKE_TIMESTAMP in columns:
columns.remove(COLUMN_NAME.RMW_TAKE_TIMESTAMP)
left_key = COLUMN_NAME.RMW_TAKE_TIMESTAMP

# Set left_key to rmw_take timestamp
# if sub_records are obtained by RecordsProviderLttng.subscription_take_records()
if is_take_node:
for column in sub_records.columns:
if column.endswith(COLUMN_NAME.RMW_TAKE_TIMESTAMP):
columns.remove(column)
left_key = column
break

pub_sub_records = merge_sequential(
left_records=sub_records,
Expand Down
4 changes: 4 additions & 0 deletions src/caret_analyze/runtime/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ def is_match_column(column: str, target_name: str) -> bool:
right_records)
right_records.rename_columns(rename_rule)

# adjust the columns for the case that the message is not taken by callback
if is_match_column(right_records.columns[0], 'source_timestamp'):
left_records.drop_columns([left_records.columns[-1]])

if left_records.columns[-1] != right_records.columns[0]:
raise InvalidRecordsError('left columns[-1] != right columns[0]')

Expand Down
1 change: 1 addition & 0 deletions src/test/infra/lttng/test_latency_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,7 @@ def test_inter_proc_empty_data(
columns=[
f'{communication.topic_name}/rclcpp_publish_timestamp',
f'{communication.topic_name}/source_timestamp',
f'{callback.callback_name}/callback_start_timestamp',
],
dtype='Int64'
)
Expand Down
6 changes: 3 additions & 3 deletions src/test/infra/lttng/test_records_provider_lttng.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,18 +608,18 @@ def noop(*args, **kwargs):
take_records_data = [
RecordCppImpl({
f'{topic_name_1}/{COLUMN_NAME.SOURCE_TIMESTAMP}': 1,
COLUMN_NAME.RMW_TAKE_TIMESTAMP: 2,
f'{topic_name_1}/{COLUMN_NAME.RMW_TAKE_TIMESTAMP}': 2,
}),
RecordCppImpl({
f'{topic_name_1}/{COLUMN_NAME.SOURCE_TIMESTAMP}': 6,
COLUMN_NAME.RMW_TAKE_TIMESTAMP: 7,
f'{topic_name_1}/{COLUMN_NAME.RMW_TAKE_TIMESTAMP}': 7,
}),
]
take_records = RecordsCppImpl(
take_records_data,
[
ColumnValue(f'{topic_name_1}/{COLUMN_NAME.SOURCE_TIMESTAMP}'),
ColumnValue(COLUMN_NAME.RMW_TAKE_TIMESTAMP),
ColumnValue(f'{topic_name_1}/{COLUMN_NAME.RMW_TAKE_TIMESTAMP}'),
]
)

Expand Down
1 change: 1 addition & 0 deletions src/test/runtime/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ def test_take_impl_case(self, mocker):
ColumnValue(f'{topic0}/rcl_publish_timestamp'),
ColumnValue(f'{topic0}/dds_write_timestamp'),
ColumnValue(f'{topic0}/source_timestamp'),
ColumnValue(f'{topic0}/callback_start_timestamp'),
]
)
)
Expand Down

0 comments on commit 54a5d6c

Please sign in to comment.