Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ros2: support pub-sub message tracking using rmw layer only #24

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.eclipse.tracecompass.incubator.internal.ros2.core.model.HostProcessPointer;
import org.eclipse.tracecompass.incubator.internal.ros2.core.model.HostThread;
import org.eclipse.tracecompass.incubator.internal.ros2.core.model.objects.Ros2ObjectHandle;
import org.eclipse.tracecompass.incubator.internal.ros2.core.trace.Ros2Trace;
import org.eclipse.tracecompass.incubator.internal.ros2.core.trace.layout.IRos2EventLayout;
import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
import org.eclipse.tracecompass.tmf.core.statesystem.AbstractTmfStateProvider;
import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;
import org.eclipse.tracecompass.tmf.ctf.core.trace.CtfTmfTrace;
import org.osgi.framework.Version;

/**
* Abstract ROS 2 state provider with some common utilities.
Expand All @@ -34,9 +36,18 @@
*/
public abstract class AbstractRos2StateProvider extends AbstractTmfStateProvider {

/**
* First tracetools version that includes the source_timestamp for a
* published message in the rmw_publish tracepoint. See:
* https://github.com/ros2/ros2_tracing/pull/74.
*/
private static final @NonNull Version RMW_SOURCE_TIMESTAMP_MINIMUM_VERSION = new Version("8.0.0"); //$NON-NLS-1$

/** The event layout */
protected static final IRos2EventLayout LAYOUT = IRos2EventLayout.getDefault();

private final boolean fIsPubSourceTimestampAvailableFromRmw;

/**
* Constructor
*
Expand All @@ -47,6 +58,16 @@ public abstract class AbstractRos2StateProvider extends AbstractTmfStateProvider
*/
protected AbstractRos2StateProvider(ITmfTrace trace, String id) {
super(Objects.requireNonNull(trace), Objects.requireNonNull(id));
// Version in trace needs to be >= the minimum version
fIsPubSourceTimestampAvailableFromRmw = ((Ros2Trace) getTrace()).getTracetoolsVersion().compareTo(RMW_SOURCE_TIMESTAMP_MINIMUM_VERSION) >= 0;
}

/**
* @return whether the source_timestamp value on the publication side is
* available from the rmw layer; if not, it is available from DDS
*/
protected boolean isPubSourceTimestampAvailableFromRmw() {
return fIsPubSourceTimestampAvailableFromRmw;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class Ros2MessagesStateProvider extends AbstractRos2StateProvider {

// Publications
private Collection<HostProcessPointer> fKnownDdsWriters = new ArrayList<>();
private Collection<Ros2ObjectHandle> fKnownRmwPublishers = new ArrayList<>();
private Map<HostProcessPointer, ITmfEvent> fRclcppPublishEvents = Maps.newHashMap();
private Map<HostProcessPointer, ITmfEvent> fRclPublishEvents = Maps.newHashMap();
private Map<HostProcessPointer, ITmfEvent> fDdsWritePreEvents = Maps.newHashMap();
Expand Down Expand Up @@ -116,62 +117,80 @@ protected void eventHandle(@NonNull ITmfEvent event) {
}

eventHandleHelpers(event);
// If we can get the source_timestamp from rmw, don't process DDS events
if (!isPubSourceTimestampAvailableFromRmw()) {
eventHandlePublishDds(event, ss, timestamp);
}
eventHandlePublish(event, ss, timestamp);
eventHandleTake(ss, event, timestamp);
eventHandleCallback(event, ss, timestamp);
}

private void eventHandleHelpers(@NonNull ITmfEvent event) {
// dds:create_writer
if (isEvent(event, LAYOUT.eventDdsCreateWriter())) {
/**
* Many DDS writers are internal to:
*
* <pre>
* (1) the RMW implementation; and
* (2) the DDS implementation.
* </pre>
*
* To simplify processing of DDS write-related events (i.e.,
* dds:write), we keep a list of known/"valid" DDS writers and only
* consider events concerning those DDS writers.
*
* For (1), we can filter out DDS writers using the topic name. Most
* DDS writer topic names have prefixes. These prefixes are
* implementation details and are not part of the actual ROS 2 topic
* name:
*
* <pre>
* * "rt" for publication/subscription topics
* * "rq" and "rr" for service request and service reply topics, respectively
* </pre>
*
* Some DDS writers with a topic name not starting with those
* prefixes are internal to RMW:
*
* <pre>
* * "ros_discovery_info" for internal discovery in RMW (per RMW context)
* </pre>
*
* For (2), we do not actually get dds:create_writer events for
* those DDS writers (at least currently), so just going with an
* allow-list allows us to filter those out.
*/
String topicName = (String) getField(event, LAYOUT.fieldTopicName());
if (!topicName.equals("ros_discovery_info")) { //$NON-NLS-1$
HostProcessPointer writer = hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldWriter()));
fKnownDdsWriters.add(writer);
/**
* Many DDS writers and rmw publishers are internal to:
*
* <pre>
* (1) the rmw implementation; and
* (2) the DDS implementation.
* </pre>
*
* To simplify processing of DDS write-related and rmw publish-related
* events (i.e., dds:write, ros2:rmw_publish), we keep a list of
* known/"valid" DDS writers and rmw publishers and only consider events
* concerning those DDS writers or rmw publishers.
*/
if (!isPubSourceTimestampAvailableFromRmw()) {
// dds:create_writer
if (isEvent(event, LAYOUT.eventDdsCreateWriter())) {
/**
* For DDS:
*
* For (1), we can filter out DDS writers using the topic name.
* Most DDS writer topic names have prefixes. These prefixes are
* implementation details and are not part of the actual ROS 2
* topic name:
*
* <pre>
* * "rt" for publication/subscription topics
* * "rq" and "rr" for service request and service reply topics, respectively
* </pre>
*
* Some DDS writers with a topic name not starting with those
* prefixes are internal to rmw:
*
* <pre>
* * "ros_discovery_info" for internal discovery in rmw (per rmw context)
* </pre>
*
* For (2), we do not actually get dds:create_writer events for
* those DDS writers (at least currently), so just going with an
* allow-list allows us to filter those out.
*/
String topicName = (String) getField(event, LAYOUT.fieldTopicName());
if (!topicName.equals("ros_discovery_info")) { //$NON-NLS-1$
HostProcessPointer writer = hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldWriter()));
fKnownDdsWriters.add(writer);
}
}
} else {
// rcl_publisher_init
if (isEvent(event, LAYOUT.eventRclPublisherInit())) {
/**
* For rmw:
*
* Build an allow-list of rmw publishers that belong to an rcl
* publisher by looking at ros2:rcl_publisher_init events,
* because then that means they're not implementation details of
* rmw or DDS.
*/
Ros2ObjectHandle rmwPublisher = handleFrom(event, (long) getField(event, LAYOUT.fieldRmwPublisherHandle()));
fKnownRmwPublishers.add(rmwPublisher);
}
}
}

private void eventHandlePublish(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long timestamp) {
eventHandlePublishRos2(event);

eventHandlePublishDds(event, ss, timestamp);
}

private void eventHandlePublishRos2(@NonNull ITmfEvent event) {
// rclcpp_publish
if (isEvent(event, LAYOUT.eventRclcppPublish())) {
HostProcessPointer message = hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldMessage()));
Expand All @@ -186,7 +205,11 @@ else if (isEvent(event, LAYOUT.eventRclPublish())) {
// Add to temporary map
fRclPublishEvents.put(message, event);
}
// TODO rmw_publish, use rmw-level timestamp
// rmw_publish
// We only need these events if we can get the source_timestamp
else if (isPubSourceTimestampAvailableFromRmw() && isEvent(event, LAYOUT.eventRmwPublish())) {
handleRmwPublish(event, ss, timestamp);
}
}

private void eventHandlePublishDds(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long timestamp) {
Expand Down Expand Up @@ -214,7 +237,38 @@ private void eventHandlePublishDdsWrite(@NonNull ITmfEvent event, ITmfStateSyste
if (null == message) {
return;
}
handlePublish(event, ss, timestamp, message);
}

private HostProcessPointer getDdsMessage(@NonNull ITmfEvent event, HostProcessPointer writer) {
/**
* Currently, with the Fast DDS instrumentation, the dds:write event
* does not contain the message/data pointer. We need to get it from a
* separate previous event, dds:write_pre.
*/
if (hasField(event, LAYOUT.fieldData())) {
return hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldData()));
}
ITmfEvent ddsWritePre = fDdsWritePreEvents.remove(writer);
if (null == ddsWritePre) {
Activator.getInstance().logError("could not get corresponding dds:write_pre event for writer=0x" + Long.toHexString(writer.getPointer())); //$NON-NLS-1$
return null;
}
return hostProcessPointerFrom(ddsWritePre, (long) getField(ddsWritePre, LAYOUT.fieldData()));
}

private void handleRmwPublish(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long timestamp) {
// First check if we know the rmw publisher
Ros2ObjectHandle rmwPublisher = handleFrom(event, (long) getField(event, LAYOUT.fieldRmwPublisherHandle()));
if (!fKnownRmwPublishers.contains(rmwPublisher)) {
return;
}

HostProcessPointer message = hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldMessage()));
handlePublish(event, ss, timestamp, message);
}

private void handlePublish(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long endPubTimestamp, @NonNull HostProcessPointer message) {
/**
* Get corresponding rcl_publish event, since it's the main r*_publish
* event.
Expand All @@ -225,7 +279,7 @@ private void eventHandlePublishDdsWrite(@NonNull ITmfEvent event, ITmfStateSyste
return;
}
Ros2ObjectHandle publisherHandle = handleFrom(rclPublish, (long) getField(rclPublish, LAYOUT.fieldPublisherHandle()));
Ros2PublisherObject publisherObject = Ros2ObjectsUtil.getPublisherObjectFromHandle(fObjectsSs, timestamp, publisherHandle);
Ros2PublisherObject publisherObject = Ros2ObjectsUtil.getPublisherObjectFromHandle(fObjectsSs, endPubTimestamp, publisherHandle);
if (null == publisherObject) {
/**
* FIXME this happens with publishers for /rosout for some reason.
Expand All @@ -237,31 +291,6 @@ private void eventHandlePublishDdsWrite(@NonNull ITmfEvent event, ITmfStateSyste

// Get corresponding rclcpp_publish event
ITmfEvent rclcppPublish = fRclcppPublishEvents.remove(message);

addPublicationInstance(event, ss, timestamp, publisherObject, message, rclcppPublish);
}

private HostProcessPointer getDdsMessage(@NonNull ITmfEvent event, HostProcessPointer writer) {
/**
* Currently, with the Fast DDS instrumentation, the dds:write event
* does not contain the message/data pointer. We need to get it from a
* separate previous event, dds:write_pre.
*/
if (hasField(event, LAYOUT.fieldData())) {
return hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldData()));
}
ITmfEvent ddsWritePre = fDdsWritePreEvents.remove(writer);
if (null == ddsWritePre) {
Activator.getInstance().logError("could not get corresponding dds:write_pre event for writer=0x" + Long.toHexString(writer.getPointer())); //$NON-NLS-1$
return null;
}
return hostProcessPointerFrom(ddsWritePre, (long) getField(ddsWritePre, LAYOUT.fieldData()));
}

private void addPublicationInstance(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long timestamp, Ros2PublisherObject publisherObject, @NonNull HostProcessPointer message, ITmfEvent rclcppPublish) {
long sourceTimestamp = (long) getField(event, LAYOUT.fieldTimestamp());
long tid = getTid(event);

/**
* Some rcl_publish events (e.g., for internal rcl-level publisher) do
* not have a corresponding rclcpp_publish event.
Expand All @@ -270,27 +299,34 @@ private void addPublicationInstance(@NonNull ITmfEvent event, ITmfStateSystemBui
// TODO support this, using the rcl_publish timestamp instead
return;
}

long pubTimestamp = rclcppPublish.getTimestamp().toNanos();
long sourceTimestamp = (long) getField(event, LAYOUT.fieldTimestamp());
HostThread thread = hostThreadFrom(event);
Ros2PubInstance pubInstance = new Ros2PubInstance(publisherObject.getHandle(), thread.getTid(), message, sourceTimestamp);
addPublicationInstance(ss, pubTimestamp, endPubTimestamp, thread, publisherObject, pubInstance);
}

Integer pubQuark = Ros2MessagesUtil.getPublisherQuarkAndAdd(ss, fObjectsSs, timestamp, publisherObject.getHandle());
private void addPublicationInstance(ITmfStateSystemBuilder ss, long pubTimestamp, long endPubTimestamp, @NonNull HostThread thread, Ros2PublisherObject publisherObject, @NonNull Ros2PubInstance pubInstance) {
Integer pubQuark = Ros2MessagesUtil.getPublisherQuarkAndAdd(ss, fObjectsSs, endPubTimestamp, publisherObject.getHandle());
if (null == pubQuark) {
return;
}
// Mark publication event using state from rclcpp_pub->dds:write
Ros2PubInstance pubInstance = new Ros2PubInstance(publisherObject.getHandle(), tid, message, sourceTimestamp);
// Mark publication event using state from rclcpp_pub->(dds:write or
// rmw_publish)
ss.modifyAttribute(pubTimestamp, pubInstance, pubQuark);
ss.modifyAttribute(timestamp, null, pubQuark);
ss.modifyAttribute(endPubTimestamp, null, pubQuark);

// Keep pub event for pub-sub links
/**
* TODO match using publisher GID when rmw_cyclonedds correctly supports
* it.
*/
Ros2MessageTimestamp messageSourceTimestamp = new Ros2MessageTimestamp(sourceTimestamp, publisherObject.getTopicName());
fPublications.put(messageSourceTimestamp, new Pair<>(publisherObject.getHandle(), timestamp));
Ros2MessageTimestamp messageSourceTimestamp = new Ros2MessageTimestamp(pubInstance.getSourceTimestamp(), publisherObject.getTopicName());
fPublications.put(messageSourceTimestamp, new Pair<>(publisherObject.getHandle(), endPubTimestamp));

// Add publication to multimap for in-callback links
fCallbackPublications.put(hostThreadFrom(event), new Pair<>(publisherObject.getHandle(), pubTimestamp));
fCallbackPublications.put(thread, new Pair<>(publisherObject.getHandle(), pubTimestamp));
}

private void eventHandleTake(@NonNull ITmfStateSystemBuilder ss, @NonNull ITmfEvent event, long timestamp) {
Expand Down
Loading
Loading