Skip to content

Commit

Permalink
Merge pull request OpenDDS#4693 from mitza-oci/participant-location
Browse files Browse the repository at this point in the history
Publish the ParticipantLocation BIT before participant discovery completes
  • Loading branch information
mitza-oci authored Jun 14, 2024
2 parents 211ad7a + 814a4a1 commit a74c9b4
Show file tree
Hide file tree
Showing 11 changed files with 302 additions and 206 deletions.
8 changes: 0 additions & 8 deletions dds/DCPS/RTPS/Spdp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,6 @@ void Spdp::process_location_updates_i(const DiscoveredParticipantIter& iter, con
return;
}

if (iter->second.bit_ih_ == DDS::HANDLE_NIL) {
// Do not process updates until the participant exists in the built-in topics.
if (DCPS::log_bits) {
ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::process_location_updates_i: %@ %C does not exist in participant bit, returning\n", this, LogGuid(iter->first).c_str()));
}
return;
}

DiscoveredParticipant::LocationUpdateList location_updates;
std::swap(iter->second.location_updates_, location_updates);

Expand Down
1 change: 1 addition & 0 deletions docs/devguide/built_in_topics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ OpenDDSParticipantLocation Topic
The built-in topic ``OpenDDSParticipantLocation`` is published by the DDSI-RTPS discovery implementation to give applications visibility into the details of how each remote participant is connected over the network.
If the RtpsRelay (:ref:`internet_enabled_rtps--the-rtpsrelay`) and/or IETF ICE (:ref:`internet_enabled_rtps--interactive-connectivity-establishment-ice-for-rtps`) are enabled, their usage is reflected in the OpenDDSParticipantLocation topic data.
Instances of this built-in topic are published before participant discovery is complete so that applications can be notified that discovery is in progress.
The topic type ParticipantLocationBuiltinTopicData is defined in :ghfile:`dds/OpenddsDcpsExt.idl` in the ``OpenDDS::DCPS`` module:

* ``guid`` (key) -- The GUID of the remote participant.
Expand Down
7 changes: 7 additions & 0 deletions docs/news.d/participant-location.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.. news-prs: 4693
.. news-start-section: Additions
- The ParticipantLocation BIT instance is now published before participant discovery completes.

- Applications can use ParticipantLocation to get notified that discovery is in progress. The spec-defined Participant BIT won't be published until participant discovery is complete.
.. news-end-section
48 changes: 42 additions & 6 deletions java/tests/participant_location/ParticipantLocationListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@

import DDS.*;
import OpenDDS.DCPS.*;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

import java.util.concurrent.CountDownLatch;

class ParticipantLocationListener extends DDS._DataReaderListenerLocalBase {

private String id;
private boolean noIce;
private boolean noRelay;
private Map<String, Integer> map = new HashMap<String, Integer>();
private Set<String> set = new HashSet<String>();

private CountDownLatch latch;

Expand All @@ -39,22 +44,32 @@ public ParticipantLocationListener(String id, boolean noIce, boolean noRelay, Co
}

public synchronized void on_data_available(DDS.DataReader reader) {
ParticipantLocationBuiltinTopicDataDataReader bitDataReader = ParticipantLocationBuiltinTopicDataDataReaderHelper
.narrow(reader);
if (reader._is_a(ParticipantLocationBuiltinTopicDataDataReaderHelper.id())) {
ParticipantLocationBuiltinTopicDataDataReader bitDataReader = ParticipantLocationBuiltinTopicDataDataReaderHelper.narrow(reader);

if (bitDataReader == null) {
on_data_available_i(bitDataReader);
return;
}

ParticipantBuiltinTopicDataDataReader participantBitDataReader = ParticipantBuiltinTopicDataDataReaderHelper.narrow(reader);
if (participantBitDataReader != null) {
on_data_available_i(participantBitDataReader);
} else {
System.err.println("ParticipantLocationListener on_data_available: narrow failed.");
System.exit(1);
}
}

private void on_data_available_i(ParticipantLocationBuiltinTopicDataDataReader bitDataReader) {
ParticipantLocationBuiltinTopicDataHolder participant = new ParticipantLocationBuiltinTopicDataHolder(
new ParticipantLocationBuiltinTopicData(new byte[16], 0, 0, "", new DDS.Time_t(), "", new DDS.Time_t(), "",
new DDS.Time_t(), "", new DDS.Time_t(), "", new DDS.Time_t(), "", new DDS.Time_t(), new DDS.Duration_t()));
SampleInfoHolder si = new SampleInfoHolder(
new SampleInfo(0, 0, 0, new DDS.Time_t(), 0, 0, 0, 0, 0, 0, 0, false, 0));

for (int status = bitDataReader.read_next_sample(participant,
si); status == DDS.RETCODE_OK.value; status = bitDataReader.read_next_sample(participant, si)) {
for (int status = bitDataReader.read_next_sample(participant, si);
status == DDS.RETCODE_OK.value;
status = bitDataReader.read_next_sample(participant, si)) {

System.out.println("== " + id + " Participant Location ==");
System.out.println(" valid: " + si.value.valid_data);
Expand Down Expand Up @@ -111,6 +126,24 @@ public synchronized void on_data_available(DDS.DataReader reader) {
}
}

private void on_data_available_i(ParticipantBuiltinTopicDataDataReader bitDataReader) {
ParticipantBuiltinTopicDataHolder participant = new ParticipantBuiltinTopicDataHolder(
new ParticipantBuiltinTopicData(new BuiltinTopicKey_t(new byte[0]), new UserDataQosPolicy(new byte[0])));
SampleInfoHolder si = new SampleInfoHolder(
new SampleInfo(0, 0, 0, new DDS.Time_t(), 0, 0, 0, 0, 0, 0, 0, false, 0));

for (int status = bitDataReader.read_next_sample(participant, si);
status == DDS.RETCODE_OK.value;
status = bitDataReader.read_next_sample(participant, si)) {
if (si.value.valid_data) {
set.add(guidFormatter(participant.value.key.value));
if (check(false)) {
latch.countDown();
}
}
}
}

public void on_requested_deadline_missed(DDS.DataReader reader, DDS.RequestedDeadlineMissedStatus status) {
System.err.println("ParticipantLocationListener.on_requested_deadline_missed");
}
Expand Down Expand Up @@ -152,6 +185,9 @@ public boolean check(boolean print) {
boolean found = false;
for (Map.Entry entry : map.entrySet()) {
String key = (String) entry.getKey();
if (!set.contains(key)) {
continue;
}
Integer mask = (Integer) entry.getValue();
if (print) {
System.out.println(id + " " + key + ((mask & OpenDDS.DCPS.LOCATION_LOCAL.value) != 0 ? " LOCAL" : "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ public Boolean call() throws Exception {

DataReader dr = builtinSubscriber.lookup_datareader(BuiltinTopicUtils.BUILT_IN_PARTICIPANT_LOCATION_TOPIC);
if (dr == null) {
System.err.println("ERROR: subscriber could not lookup datareader");
System.err.println("ERROR: subscriber could not lookup datareader for BUILT_IN_PARTICIPANT_LOCATION_TOPIC");
return false;
}

DataReader dr2 = builtinSubscriber.lookup_datareader(BuiltinTopicUtils.BUILT_IN_PARTICIPANT_TOPIC);
if (dr2 == null) {
System.err.println("ERROR: Publisher could not lookup datareader for BUILT_IN_PARTICIPANT_TOPIC");
return false;
}

Expand All @@ -54,7 +60,10 @@ public Boolean call() throws Exception {

int ret = dr.set_listener(locationListener, OpenDDS.DCPS.DEFAULT_STATUS_MASK.value);
assert (ret == DDS.RETCODE_OK.value);
ret = dr2.set_listener(locationListener, OpenDDS.DCPS.DEFAULT_STATUS_MASK.value);
assert (ret == DDS.RETCODE_OK.value);
locationListener.on_data_available(dr);
locationListener.on_data_available(dr2);

Subscriber sub = participant.create_subscriber(SUBSCRIBER_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
if (sub == null) {
Expand Down
13 changes: 11 additions & 2 deletions java/tests/participant_location/ParticipantLocationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParticipantLocationTest {
private static final int N_MSGS = 20;
private static final int DOMAIN_ID = 42;
Expand All @@ -29,7 +30,7 @@ public class ParticipantLocationTest {
private static boolean security = false;
private static boolean ipv6 = false;

public static void main (String[] args) throws Exception {
public static void main(String[] args) throws Exception {

for (String s: args) {
if (s.equals("-n")) {
Expand Down Expand Up @@ -143,7 +144,13 @@ else if (s.equals("-s")) {

DataReader pubDr = pubBuiltinSubscriber.lookup_datareader(BuiltinTopicUtils.BUILT_IN_PARTICIPANT_LOCATION_TOPIC);
if (pubDr == null) {
System.err.println("ERROR: Publisher could not lookup datareader");
System.err.println("ERROR: Publisher could not lookup datareader for BUILT_IN_PARTICIPANT_LOCATION_TOPIC");
return;
}

DataReader pubDr2 = pubBuiltinSubscriber.lookup_datareader(BuiltinTopicUtils.BUILT_IN_PARTICIPANT_TOPIC);
if (pubDr2 == null) {
System.err.println("ERROR: Publisher could not lookup datareader for BUILT_IN_PARTICIPANT_TOPIC");
return;
}

Expand All @@ -153,6 +160,8 @@ else if (s.equals("-s")) {

int ret = pubDr.set_listener(pubLocationListener, OpenDDS.DCPS.DEFAULT_STATUS_MASK.value);
assert (ret == DDS.RETCODE_OK.value);
ret = pubDr2.set_listener(pubLocationListener, OpenDDS.DCPS.DEFAULT_STATUS_MASK.value);
assert (ret == DDS.RETCODE_OK.value);
// Don't need to invoke the listener because the subscriber doesn't exist.

Publisher pub = pubParticipant.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
* See: http://www.opendds.org/license.html
*/

#include "ParticipantLocationListenerImpl.h"
#include <dds/OpenddsDcpsExtTypeSupportImpl.h>
#include "BitListener.h"

#include <ace/streams.h>

#include <string>

// Implementation skeleton constructor
ParticipantLocationListenerImpl::ParticipantLocationListenerImpl(const std::string& id,
bool noice,
bool ipv6,
callback_t done_callback)
BitListener::BitListener(const std::string& id,
bool noice,
bool ipv6,
callback_t done_callback)
: id_(id)
, no_ice_(noice)
, ipv6_(ipv6)
Expand All @@ -23,35 +23,43 @@ ParticipantLocationListenerImpl::ParticipantLocationListenerImpl(const std::stri
{
}

// Implementation skeleton destructor
ParticipantLocationListenerImpl::~ParticipantLocationListenerImpl()
BitListener::~BitListener()
{
}

void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr reader)
void BitListener::on_data_available(DDS::DataReader_ptr reader)
{
ACE_Guard<ACE_Thread_Mutex> g(mutex_);

// 1. Narrow the DataReader to an ParticipantLocationBuiltinTopicDataDataReader
// 2. Read the samples from the data reader
// 3. Print out the contents of the samples
OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader_var builtin_dr =
OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader::_narrow(reader);
if (0 == builtin_dr)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_data_available: _narrow failed." << std::endl;
ACE_OS::exit(1);
}

if (builtin_dr) {
on_data_available_i(builtin_dr);
return;
}

DDS::ParticipantBuiltinTopicDataDataReader_var participant_dr =
DDS::ParticipantBuiltinTopicDataDataReader::_narrow(reader);

if (participant_dr) {
on_data_available_i(participant_dr);
} else {
std::cerr << "BitListener::"
<< "on_data_available: _narrow failed." << std::endl;
ACE_OS::exit(1);
}
}

void BitListener::on_data_available_i(OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader_var builtin_dr)
{
OpenDDS::DCPS::ParticipantLocationBuiltinTopicData participant;
DDS::SampleInfo si;

for (DDS::ReturnCode_t status = builtin_dr->read_next_sample(participant, si);
status == DDS::RETCODE_OK;
status = builtin_dr->read_next_sample(participant, si)) {

// copy octet[] to guid
OpenDDS::DCPS::GUID_t guid;
std::memcpy(&guid, &participant.guid, sizeof(guid));

Expand Down Expand Up @@ -90,8 +98,7 @@ void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr read
<< " lease: " << OpenDDS::DCPS::TimeDuration(participant.lease_duration).str(9) << std::endl;

// update locations if SampleInfo is valid.
if (si.valid_data == 1)
{
if (si.valid_data) {
std::pair<LocationMapType::iterator, bool> p = location_map.insert(std::make_pair(guid, 0));
p.first->second |= participant.location;
}
Expand All @@ -104,55 +111,72 @@ void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr read
}
}

void ParticipantLocationListenerImpl::on_requested_deadline_missed(
void BitListener::on_data_available_i(DDS::ParticipantBuiltinTopicDataDataReader_var builtin_dr)
{
DDS::ParticipantBuiltinTopicData participant;
DDS::SampleInfo si;

for (DDS::ReturnCode_t status = builtin_dr->read_next_sample(participant, si);
status == DDS::RETCODE_OK;
status = builtin_dr->read_next_sample(participant, si)) {

if (si.valid_data) {
OpenDDS::DCPS::GUID_t guid;
std::memcpy(&guid, &participant.key, sizeof(guid));
participants_seen_.insert(guid);

if (!done_ && check(true)) {
done_ = true;
std::cout << "== " << id_ << " Participant received all expected locations" << std::endl;
done_callback_();
}
}
}
}

void BitListener::on_requested_deadline_missed(
DDS::DataReader_ptr,
const DDS::RequestedDeadlineMissedStatus &)
const DDS::RequestedDeadlineMissedStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_requested_deadline_missed" << std::endl;
std::cerr << "BitListener::on_requested_deadline_missed" << std::endl;
}

void ParticipantLocationListenerImpl::on_requested_incompatible_qos(
void BitListener::on_requested_incompatible_qos(
DDS::DataReader_ptr,
const DDS::RequestedIncompatibleQosStatus &)
const DDS::RequestedIncompatibleQosStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_requested_incompatible_qos" << std::endl;
std::cerr << "BitListener::on_requested_incompatible_qos" << std::endl;
}

void ParticipantLocationListenerImpl::on_liveliness_changed(
void BitListener::on_liveliness_changed(
DDS::DataReader_ptr,
const DDS::LivelinessChangedStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_liveliness_changed" << std::endl;
std::cerr << "BitListener::on_liveliness_changed" << std::endl;
}

void ParticipantLocationListenerImpl::on_subscription_matched(
void BitListener::on_subscription_matched(
DDS::DataReader_ptr,
const DDS::SubscriptionMatchedStatus &)
const DDS::SubscriptionMatchedStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_subscription_matched" << std::endl;
std::cerr << "BitListener::on_subscription_matched" << std::endl;
}

void ParticipantLocationListenerImpl::on_sample_rejected(
void BitListener::on_sample_rejected(
DDS::DataReader_ptr,
const DDS::SampleRejectedStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_sample_rejected" << std::endl;
std::cerr << "BitListener::on_sample_rejected" << std::endl;
}

void ParticipantLocationListenerImpl::on_sample_lost(
void BitListener::on_sample_lost(
DDS::DataReader_ptr,
const DDS::SampleLostStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_sample_lost" << std::endl;
std::cerr << "BitListener::on_sample_lost" << std::endl;
}

bool ParticipantLocationListenerImpl::check(bool print_results)
bool BitListener::check(bool print_results)
{
const unsigned long expected =
OpenDDS::DCPS::LOCATION_LOCAL
Expand Down Expand Up @@ -180,7 +204,10 @@ bool ParticipantLocationListenerImpl::check(bool print_results)

bool found = false;
for (LocationMapType::const_iterator pos = location_map.begin(), limit = location_map.end();
pos != limit; ++ pos) {
pos != limit; ++pos) {
if (participants_seen_.count(pos->first) == 0) {
continue;
}
if (print_results) {
std::cout << id_ << " " << pos->first
<< ((pos->second & OpenDDS::DCPS::LOCATION_LOCAL) ? " LOCAL" : "")
Expand Down
Loading

0 comments on commit a74c9b4

Please sign in to comment.