diff --git a/dds/DCPS/RTPS/Spdp.cpp b/dds/DCPS/RTPS/Spdp.cpp index 4a3cb6f7f6b..71e4a9524ce 100644 --- a/dds/DCPS/RTPS/Spdp.cpp +++ b/dds/DCPS/RTPS/Spdp.cpp @@ -519,14 +519,6 @@ void Spdp::process_location_updates_i(const DiscoveredParticipantIter& iter, con return; } - if (iter->second.bit_ih_ == DDS::HANDLE_NIL) { - // Do not process updates until the participant exists in the built-in topics. - if (DCPS::log_bits) { - ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::process_location_updates_i: %@ %C does not exist in participant bit, returning\n", this, LogGuid(iter->first).c_str())); - } - return; - } - DiscoveredParticipant::LocationUpdateList location_updates; std::swap(iter->second.location_updates_, location_updates); diff --git a/docs/devguide/built_in_topics.rst b/docs/devguide/built_in_topics.rst index f6f3ba16d2e..3184ae40b12 100644 --- a/docs/devguide/built_in_topics.rst +++ b/docs/devguide/built_in_topics.rst @@ -229,6 +229,7 @@ OpenDDSParticipantLocation Topic The built-in topic ``OpenDDSParticipantLocation`` is published by the DDSI-RTPS discovery implementation to give applications visibility into the details of how each remote participant is connected over the network. If the RtpsRelay (:ref:`internet_enabled_rtps--the-rtpsrelay`) and/or IETF ICE (:ref:`internet_enabled_rtps--interactive-connectivity-establishment-ice-for-rtps`) are enabled, their usage is reflected in the OpenDDSParticipantLocation topic data. +Instances of this built-in topic are published before participant discovery is complete so that applications can be notified that discovery is in progress. The topic type ParticipantLocationBuiltinTopicData is defined in :ghfile:`dds/OpenddsDcpsExt.idl` in the ``OpenDDS::DCPS`` module: * ``guid`` (key) -- The GUID of the remote participant. diff --git a/docs/news.d/participant-location.rst b/docs/news.d/participant-location.rst new file mode 100644 index 00000000000..e5360cf4eaf --- /dev/null +++ b/docs/news.d/participant-location.rst @@ -0,0 +1,7 @@ +.. news-prs: 4693 + +.. news-start-section: Additions +- The ParticipantLocation BIT instance is now published before participant discovery completes. + + - Applications can use ParticipantLocation to get notified that discovery is in progress. The spec-defined Participant BIT won't be published until participant discovery is complete. +.. news-end-section diff --git a/java/tests/participant_location/ParticipantLocationListener.java b/java/tests/participant_location/ParticipantLocationListener.java index 28a5225d3c6..abe8698bec0 100644 --- a/java/tests/participant_location/ParticipantLocationListener.java +++ b/java/tests/participant_location/ParticipantLocationListener.java @@ -7,9 +7,13 @@ import DDS.*; import OpenDDS.DCPS.*; + import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +import java.util.concurrent.CountDownLatch; class ParticipantLocationListener extends DDS._DataReaderListenerLocalBase { @@ -17,6 +21,7 @@ class ParticipantLocationListener extends DDS._DataReaderListenerLocalBase { private boolean noIce; private boolean noRelay; private Map map = new HashMap(); + private Set set = new HashSet(); private CountDownLatch latch; @@ -39,22 +44,32 @@ public ParticipantLocationListener(String id, boolean noIce, boolean noRelay, Co } public synchronized void on_data_available(DDS.DataReader reader) { - ParticipantLocationBuiltinTopicDataDataReader bitDataReader = ParticipantLocationBuiltinTopicDataDataReaderHelper - .narrow(reader); + if (reader._is_a(ParticipantLocationBuiltinTopicDataDataReaderHelper.id())) { + ParticipantLocationBuiltinTopicDataDataReader bitDataReader = ParticipantLocationBuiltinTopicDataDataReaderHelper.narrow(reader); - if (bitDataReader == null) { + on_data_available_i(bitDataReader); + return; + } + + ParticipantBuiltinTopicDataDataReader participantBitDataReader = ParticipantBuiltinTopicDataDataReaderHelper.narrow(reader); + if (participantBitDataReader != null) { + on_data_available_i(participantBitDataReader); + } else { System.err.println("ParticipantLocationListener on_data_available: narrow failed."); System.exit(1); } + } + private void on_data_available_i(ParticipantLocationBuiltinTopicDataDataReader bitDataReader) { ParticipantLocationBuiltinTopicDataHolder participant = new ParticipantLocationBuiltinTopicDataHolder( new ParticipantLocationBuiltinTopicData(new byte[16], 0, 0, "", new DDS.Time_t(), "", new DDS.Time_t(), "", new DDS.Time_t(), "", new DDS.Time_t(), "", new DDS.Time_t(), "", new DDS.Time_t(), new DDS.Duration_t())); SampleInfoHolder si = new SampleInfoHolder( new SampleInfo(0, 0, 0, new DDS.Time_t(), 0, 0, 0, 0, 0, 0, 0, false, 0)); - for (int status = bitDataReader.read_next_sample(participant, - si); status == DDS.RETCODE_OK.value; status = bitDataReader.read_next_sample(participant, si)) { + for (int status = bitDataReader.read_next_sample(participant, si); + status == DDS.RETCODE_OK.value; + status = bitDataReader.read_next_sample(participant, si)) { System.out.println("== " + id + " Participant Location =="); System.out.println(" valid: " + si.value.valid_data); @@ -111,6 +126,24 @@ public synchronized void on_data_available(DDS.DataReader reader) { } } + private void on_data_available_i(ParticipantBuiltinTopicDataDataReader bitDataReader) { + ParticipantBuiltinTopicDataHolder participant = new ParticipantBuiltinTopicDataHolder( + new ParticipantBuiltinTopicData(new BuiltinTopicKey_t(new byte[0]), new UserDataQosPolicy(new byte[0]))); + SampleInfoHolder si = new SampleInfoHolder( + new SampleInfo(0, 0, 0, new DDS.Time_t(), 0, 0, 0, 0, 0, 0, 0, false, 0)); + + for (int status = bitDataReader.read_next_sample(participant, si); + status == DDS.RETCODE_OK.value; + status = bitDataReader.read_next_sample(participant, si)) { + if (si.value.valid_data) { + set.add(guidFormatter(participant.value.key.value)); + if (check(false)) { + latch.countDown(); + } + } + } + } + public void on_requested_deadline_missed(DDS.DataReader reader, DDS.RequestedDeadlineMissedStatus status) { System.err.println("ParticipantLocationListener.on_requested_deadline_missed"); } @@ -152,6 +185,9 @@ public boolean check(boolean print) { boolean found = false; for (Map.Entry entry : map.entrySet()) { String key = (String) entry.getKey(); + if (!set.contains(key)) { + continue; + } Integer mask = (Integer) entry.getValue(); if (print) { System.out.println(id + " " + key + ((mask & OpenDDS.DCPS.LOCATION_LOCAL.value) != 0 ? " LOCAL" : "") diff --git a/java/tests/participant_location/ParticipantLocationSubscriber.java b/java/tests/participant_location/ParticipantLocationSubscriber.java index 1d1ebb0d84c..1d63618cd63 100644 --- a/java/tests/participant_location/ParticipantLocationSubscriber.java +++ b/java/tests/participant_location/ParticipantLocationSubscriber.java @@ -44,7 +44,13 @@ public Boolean call() throws Exception { DataReader dr = builtinSubscriber.lookup_datareader(BuiltinTopicUtils.BUILT_IN_PARTICIPANT_LOCATION_TOPIC); if (dr == null) { - System.err.println("ERROR: subscriber could not lookup datareader"); + System.err.println("ERROR: subscriber could not lookup datareader for BUILT_IN_PARTICIPANT_LOCATION_TOPIC"); + return false; + } + + DataReader dr2 = builtinSubscriber.lookup_datareader(BuiltinTopicUtils.BUILT_IN_PARTICIPANT_TOPIC); + if (dr2 == null) { + System.err.println("ERROR: Publisher could not lookup datareader for BUILT_IN_PARTICIPANT_TOPIC"); return false; } @@ -54,7 +60,10 @@ public Boolean call() throws Exception { int ret = dr.set_listener(locationListener, OpenDDS.DCPS.DEFAULT_STATUS_MASK.value); assert (ret == DDS.RETCODE_OK.value); + ret = dr2.set_listener(locationListener, OpenDDS.DCPS.DEFAULT_STATUS_MASK.value); + assert (ret == DDS.RETCODE_OK.value); locationListener.on_data_available(dr); + locationListener.on_data_available(dr2); Subscriber sub = participant.create_subscriber(SUBSCRIBER_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value); if (sub == null) { diff --git a/java/tests/participant_location/ParticipantLocationTest.java b/java/tests/participant_location/ParticipantLocationTest.java index bb6fde2c752..81db54ef3cc 100644 --- a/java/tests/participant_location/ParticipantLocationTest.java +++ b/java/tests/participant_location/ParticipantLocationTest.java @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; + public class ParticipantLocationTest { private static final int N_MSGS = 20; private static final int DOMAIN_ID = 42; @@ -29,7 +30,7 @@ public class ParticipantLocationTest { private static boolean security = false; private static boolean ipv6 = false; - public static void main (String[] args) throws Exception { + public static void main(String[] args) throws Exception { for (String s: args) { if (s.equals("-n")) { @@ -143,7 +144,13 @@ else if (s.equals("-s")) { DataReader pubDr = pubBuiltinSubscriber.lookup_datareader(BuiltinTopicUtils.BUILT_IN_PARTICIPANT_LOCATION_TOPIC); if (pubDr == null) { - System.err.println("ERROR: Publisher could not lookup datareader"); + System.err.println("ERROR: Publisher could not lookup datareader for BUILT_IN_PARTICIPANT_LOCATION_TOPIC"); + return; + } + + DataReader pubDr2 = pubBuiltinSubscriber.lookup_datareader(BuiltinTopicUtils.BUILT_IN_PARTICIPANT_TOPIC); + if (pubDr2 == null) { + System.err.println("ERROR: Publisher could not lookup datareader for BUILT_IN_PARTICIPANT_TOPIC"); return; } @@ -153,6 +160,8 @@ else if (s.equals("-s")) { int ret = pubDr.set_listener(pubLocationListener, OpenDDS.DCPS.DEFAULT_STATUS_MASK.value); assert (ret == DDS.RETCODE_OK.value); + ret = pubDr2.set_listener(pubLocationListener, OpenDDS.DCPS.DEFAULT_STATUS_MASK.value); + assert (ret == DDS.RETCODE_OK.value); // Don't need to invoke the listener because the subscriber doesn't exist. Publisher pub = pubParticipant.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value); diff --git a/tests/DCPS/ParticipantLocationTopic/ParticipantLocationListenerImpl.cpp b/tests/DCPS/ParticipantLocationTopic/BitListener.cpp similarity index 67% rename from tests/DCPS/ParticipantLocationTopic/ParticipantLocationListenerImpl.cpp rename to tests/DCPS/ParticipantLocationTopic/BitListener.cpp index eb866192995..c777cf4f3a3 100644 --- a/tests/DCPS/ParticipantLocationTopic/ParticipantLocationListenerImpl.cpp +++ b/tests/DCPS/ParticipantLocationTopic/BitListener.cpp @@ -5,16 +5,16 @@ * See: http://www.opendds.org/license.html */ -#include "ParticipantLocationListenerImpl.h" -#include +#include "BitListener.h" + #include + #include -// Implementation skeleton constructor -ParticipantLocationListenerImpl::ParticipantLocationListenerImpl(const std::string& id, - bool noice, - bool ipv6, - callback_t done_callback) +BitListener::BitListener(const std::string& id, + bool noice, + bool ipv6, + callback_t done_callback) : id_(id) , no_ice_(noice) , ipv6_(ipv6) @@ -23,27 +23,36 @@ ParticipantLocationListenerImpl::ParticipantLocationListenerImpl(const std::stri { } -// Implementation skeleton destructor -ParticipantLocationListenerImpl::~ParticipantLocationListenerImpl() +BitListener::~BitListener() { } -void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr reader) +void BitListener::on_data_available(DDS::DataReader_ptr reader) { ACE_Guard g(mutex_); - // 1. Narrow the DataReader to an ParticipantLocationBuiltinTopicDataDataReader - // 2. Read the samples from the data reader - // 3. Print out the contents of the samples OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader_var builtin_dr = OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader::_narrow(reader); - if (0 == builtin_dr) - { - std::cerr << "ParticipantLocationListenerImpl::" - << "on_data_available: _narrow failed." << std::endl; - ACE_OS::exit(1); - } + if (builtin_dr) { + on_data_available_i(builtin_dr); + return; + } + + DDS::ParticipantBuiltinTopicDataDataReader_var participant_dr = + DDS::ParticipantBuiltinTopicDataDataReader::_narrow(reader); + + if (participant_dr) { + on_data_available_i(participant_dr); + } else { + std::cerr << "BitListener::" + << "on_data_available: _narrow failed." << std::endl; + ACE_OS::exit(1); + } +} + +void BitListener::on_data_available_i(OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader_var builtin_dr) +{ OpenDDS::DCPS::ParticipantLocationBuiltinTopicData participant; DDS::SampleInfo si; @@ -51,7 +60,6 @@ void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr read status == DDS::RETCODE_OK; status = builtin_dr->read_next_sample(participant, si)) { - // copy octet[] to guid OpenDDS::DCPS::GUID_t guid; std::memcpy(&guid, &participant.guid, sizeof(guid)); @@ -90,8 +98,7 @@ void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr read << " lease: " << OpenDDS::DCPS::TimeDuration(participant.lease_duration).str(9) << std::endl; // update locations if SampleInfo is valid. - if (si.valid_data == 1) - { + if (si.valid_data) { std::pair p = location_map.insert(std::make_pair(guid, 0)); p.first->second |= participant.location; } @@ -104,55 +111,72 @@ void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr read } } -void ParticipantLocationListenerImpl::on_requested_deadline_missed( +void BitListener::on_data_available_i(DDS::ParticipantBuiltinTopicDataDataReader_var builtin_dr) +{ + DDS::ParticipantBuiltinTopicData participant; + DDS::SampleInfo si; + + for (DDS::ReturnCode_t status = builtin_dr->read_next_sample(participant, si); + status == DDS::RETCODE_OK; + status = builtin_dr->read_next_sample(participant, si)) { + + if (si.valid_data) { + OpenDDS::DCPS::GUID_t guid; + std::memcpy(&guid, &participant.key, sizeof(guid)); + participants_seen_.insert(guid); + + if (!done_ && check(true)) { + done_ = true; + std::cout << "== " << id_ << " Participant received all expected locations" << std::endl; + done_callback_(); + } + } + } +} + +void BitListener::on_requested_deadline_missed( DDS::DataReader_ptr, - const DDS::RequestedDeadlineMissedStatus &) + const DDS::RequestedDeadlineMissedStatus&) { - std::cerr << "ParticipantLocationListenerImpl::" - << "on_requested_deadline_missed" << std::endl; + std::cerr << "BitListener::on_requested_deadline_missed" << std::endl; } -void ParticipantLocationListenerImpl::on_requested_incompatible_qos( +void BitListener::on_requested_incompatible_qos( DDS::DataReader_ptr, - const DDS::RequestedIncompatibleQosStatus &) + const DDS::RequestedIncompatibleQosStatus&) { - std::cerr << "ParticipantLocationListenerImpl::" - << "on_requested_incompatible_qos" << std::endl; + std::cerr << "BitListener::on_requested_incompatible_qos" << std::endl; } -void ParticipantLocationListenerImpl::on_liveliness_changed( +void BitListener::on_liveliness_changed( DDS::DataReader_ptr, const DDS::LivelinessChangedStatus&) { - std::cerr << "ParticipantLocationListenerImpl::" - << "on_liveliness_changed" << std::endl; + std::cerr << "BitListener::on_liveliness_changed" << std::endl; } -void ParticipantLocationListenerImpl::on_subscription_matched( +void BitListener::on_subscription_matched( DDS::DataReader_ptr, - const DDS::SubscriptionMatchedStatus &) + const DDS::SubscriptionMatchedStatus&) { - std::cerr << "ParticipantLocationListenerImpl::" - << "on_subscription_matched" << std::endl; + std::cerr << "BitListener::on_subscription_matched" << std::endl; } -void ParticipantLocationListenerImpl::on_sample_rejected( +void BitListener::on_sample_rejected( DDS::DataReader_ptr, const DDS::SampleRejectedStatus&) { - std::cerr << "ParticipantLocationListenerImpl::" - << "on_sample_rejected" << std::endl; + std::cerr << "BitListener::on_sample_rejected" << std::endl; } -void ParticipantLocationListenerImpl::on_sample_lost( +void BitListener::on_sample_lost( DDS::DataReader_ptr, const DDS::SampleLostStatus&) { - std::cerr << "ParticipantLocationListenerImpl::" - << "on_sample_lost" << std::endl; + std::cerr << "BitListener::on_sample_lost" << std::endl; } -bool ParticipantLocationListenerImpl::check(bool print_results) +bool BitListener::check(bool print_results) { const unsigned long expected = OpenDDS::DCPS::LOCATION_LOCAL @@ -180,7 +204,10 @@ bool ParticipantLocationListenerImpl::check(bool print_results) bool found = false; for (LocationMapType::const_iterator pos = location_map.begin(), limit = location_map.end(); - pos != limit; ++ pos) { + pos != limit; ++pos) { + if (participants_seen_.count(pos->first) == 0) { + continue; + } if (print_results) { std::cout << id_ << " " << pos->first << ((pos->second & OpenDDS::DCPS::LOCATION_LOCAL) ? " LOCAL" : "") diff --git a/tests/DCPS/ParticipantLocationTopic/BitListener.h b/tests/DCPS/ParticipantLocationTopic/BitListener.h new file mode 100644 index 00000000000..7305d4ceb4d --- /dev/null +++ b/tests/DCPS/ParticipantLocationTopic/BitListener.h @@ -0,0 +1,74 @@ +/* + * + * + * Distributed under the OpenDDS License. + * See: http://www.opendds.org/license.html + */ + +#ifndef BIT_LISTENER +#define BIT_LISTENER + +#include +#include +#include + +#include +#include + +#include + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +typedef void (*callback_t)(); + +class BitListener + : public virtual OpenDDS::DCPS::LocalObject { +public: + BitListener(const std::string& id, bool no_ice, bool ipv6, callback_t done_callback); + + virtual ~BitListener(); + + virtual void on_requested_deadline_missed(DDS::DataReader_ptr reader, + const DDS::RequestedDeadlineMissedStatus& status); + + virtual void on_requested_incompatible_qos(DDS::DataReader_ptr reader, + const DDS::RequestedIncompatibleQosStatus& status); + + virtual void on_liveliness_changed(DDS::DataReader_ptr reader, + const DDS::LivelinessChangedStatus& status); + + virtual void on_subscription_matched(DDS::DataReader_ptr reader, + const DDS::SubscriptionMatchedStatus& status); + + virtual void on_sample_rejected(DDS::DataReader_ptr reader, + const DDS::SampleRejectedStatus& status); + + virtual void on_data_available(DDS::DataReader_ptr reader); + + virtual void on_sample_lost(DDS::DataReader_ptr reader, + const DDS::SampleLostStatus& status); + + bool check(bool print_results = true); + +private: + ACE_Thread_Mutex mutex_; + + typedef std::map LocationMapType; + LocationMapType location_map; + + const std::string id_; + bool no_ice_; + bool ipv6_; + callback_t done_callback_; + bool done_; + + typedef std::set GuidSetType; + GuidSetType participants_seen_; + + void on_data_available_i(OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader_var builtin_dr); + void on_data_available_i(DDS::ParticipantBuiltinTopicDataDataReader_var builtin_dr); +}; + +#endif diff --git a/tests/DCPS/ParticipantLocationTopic/ParticipantLocation.mpc b/tests/DCPS/ParticipantLocationTopic/ParticipantLocation.mpc index 5174e27abe6..2821389ea2e 100644 --- a/tests/DCPS/ParticipantLocationTopic/ParticipantLocation.mpc +++ b/tests/DCPS/ParticipantLocationTopic/ParticipantLocation.mpc @@ -1,9 +1,4 @@ project(ParticipantLocationTopic): dcps_test, dcps_cm, opendds_security, dcps_rtps_udp { exename = ParticipantLocationTest requires += built_in_topics - - Source_Files { - ParticipantLocationTest.cpp - ParticipantLocationListenerImpl.cpp - } } diff --git a/tests/DCPS/ParticipantLocationTopic/ParticipantLocationListenerImpl.h b/tests/DCPS/ParticipantLocationTopic/ParticipantLocationListenerImpl.h deleted file mode 100644 index 833cdb4c9fc..00000000000 --- a/tests/DCPS/ParticipantLocationTopic/ParticipantLocationListenerImpl.h +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * - * Distributed under the OpenDDS License. - * See: http://www.opendds.org/license.html - */ - -#ifndef PARTICIPANT_LOCATION_LISTENER_IMPL -#define PARTICIPANT_LOCATION_LISTENER_IMPL - -#include -#include -#include - -#include - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -#pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -typedef void (*callback_t)(); - -class ParticipantLocationListenerImpl - : public virtual OpenDDS::DCPS::LocalObject -{ -public: - //Constructor - explicit ParticipantLocationListenerImpl(const std::string& id, bool no_ice, - bool ipv6, callback_t done_callback); - - //Destructor - virtual ~ParticipantLocationListenerImpl(); - - virtual void on_requested_deadline_missed( - DDS::DataReader_ptr reader, - const DDS::RequestedDeadlineMissedStatus & status); - - virtual void on_requested_incompatible_qos( - DDS::DataReader_ptr reader, - const DDS::RequestedIncompatibleQosStatus & status); - - virtual void on_liveliness_changed( - DDS::DataReader_ptr reader, - const DDS::LivelinessChangedStatus & status); - - virtual void on_subscription_matched( - DDS::DataReader_ptr reader, - const DDS::SubscriptionMatchedStatus & status); - - virtual void on_sample_rejected( - DDS::DataReader_ptr reader, - const DDS::SampleRejectedStatus& status); - - virtual void on_data_available( - DDS::DataReader_ptr reader); - - virtual void on_sample_lost( - DDS::DataReader_ptr reader, - const DDS::SampleLostStatus& status); - - bool check(bool print_results = true); - - typedef std::map LocationMapType; - LocationMapType location_map; - -private: - ACE_Thread_Mutex mutex_; - const std::string id_; - bool no_ice_; - bool ipv6_; - callback_t done_callback_; - bool done_; - -}; - -#endif /* PARTICIPANT_LOCATION_LISTENER_IMPL */ diff --git a/tests/DCPS/ParticipantLocationTopic/ParticipantLocationTest.cpp b/tests/DCPS/ParticipantLocationTopic/ParticipantLocationTest.cpp index dcfe73e62c5..2bd2b0567e1 100644 --- a/tests/DCPS/ParticipantLocationTopic/ParticipantLocationTest.cpp +++ b/tests/DCPS/ParticipantLocationTopic/ParticipantLocationTest.cpp @@ -3,7 +3,7 @@ * See: http://www.opendds.org/license.html */ -#include "ParticipantLocationListenerImpl.h" +#include "BitListener.h" #include #include @@ -43,29 +43,27 @@ const char sub_permissions_file[] = "file:./permissions_subscriber_signed.p7s"; bool no_ice = false; bool ipv6 = false; -int parse_args (int argc, ACE_TCHAR *argv[]) +int parse_args(int argc, ACE_TCHAR* argv[]) { - ACE_Get_Opt get_opts (argc, argv, ACE_TEXT ("n6")); + ACE_Get_Opt get_opts(argc, argv, ACE_TEXT("n6")); int c; - while ((c = get_opts ()) != -1) - switch (c) - { - case 'n': - no_ice = true; - break; - case '6': - ipv6 = true; - break; - case '?': - default: - ACE_ERROR_RETURN ((LM_ERROR, - "usage: %s " - "-n do not check for ICE connections" - "\n", - argv [0]), - -1); - } + while ((c = get_opts()) != -1) + switch (c) { + case 'n': + no_ice = true; + break; + case '6': + ipv6 = true; + break; + case '?': + default: + ACE_ERROR_RETURN((LM_ERROR, "usage: %s\n" + "\t-n do not check for ICE connections\n" + "\t-6 use IPv6\n", + argv[0]), + -1); + } // Indicates successful parsing of the command line return 0; } @@ -89,7 +87,7 @@ void participants_done_callback() participants_done_cond.signal(); } -int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) +int ACE_TMAIN(int argc, ACE_TCHAR* argv[]) { int status = EXIT_SUCCESS; @@ -97,10 +95,9 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) std::cout << "Starting publisher" << std::endl; - // Initialize DomainParticipantFactory DDS::DomainParticipantFactory_var dpf = TheParticipantFactoryWithArgs(argc, argv); - if( parse_args(argc, argv) != 0) + if (parse_args(argc, argv) != 0) return 1; DDS::DomainParticipantQos pub_qos; @@ -110,7 +107,6 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) dpf->get_default_participant_qos(sub_qos); #if OPENDDS_CONFIG_SECURITY - // Determine the path to the keys OPENDDS_STRING path_to_tests; const char* dds_root = ACE_OS::getenv("DDS_ROOT"); if (dds_root && dds_root[0]) { @@ -147,47 +143,43 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) #endif - // Create Publisher DomainParticipant DDS::DomainParticipant_var participant = dpf->create_participant(42, pub_qos, DDS::DomainParticipantListener::_nil(), OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil(participant.in())) { + if (!participant) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l: main()") ACE_TEXT(" ERROR: create_participant failed!\n")), EXIT_FAILURE); } - // Register TypeSupport (Messenger::Message) Messenger::MessageTypeSupport_var mts = new Messenger::MessageTypeSupportImpl(); - if (mts->register_type(participant.in(), "") != DDS::RETCODE_OK) { + if (mts->register_type(participant, "") != DDS::RETCODE_OK) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l: main()") ACE_TEXT(" ERROR: register_type failed!\n")), EXIT_FAILURE); } - // Create Topic CORBA::String_var type_name = mts->get_type_name(); DDS::Topic_var topic = participant->create_topic("Movie Discussion List", - type_name.in(), + type_name, TOPIC_QOS_DEFAULT, DDS::TopicListener::_nil(), OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil(topic.in())) { + if (!topic) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l: main()") ACE_TEXT(" ERROR: create_topic failed!\n")), EXIT_FAILURE); } - // Create Publisher DDS::PublisherQos publisher_qos; participant->get_default_publisher_qos(publisher_qos); publisher_qos.partition.name.length(1); @@ -198,7 +190,7 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) DDS::PublisherListener::_nil(), OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil(pub.in())) { + if (!pub) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l: main()") ACE_TEXT(" ERROR: create_publisher failed!\n")), @@ -213,21 +205,19 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) qos.history.kind = DDS::KEEP_ALL_HISTORY_QOS; qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS; - // Create DataWriter DDS::DataWriter_var dw = - pub->create_datawriter(topic.in(), + pub->create_datawriter(topic, qos, DDS::DataWriterListener::_nil(), OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil(dw.in())) { + if (!dw) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l: main()") ACE_TEXT(" ERROR: create_datawriter failed!\n")), EXIT_FAILURE); } - // Get the Built-In Subscriber for Built-In Topics DDS::Subscriber_var bit_subscriber = participant->get_builtin_subscriber(); DDS::DataReader_var pub_loc_dr = bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_LOCATION_TOPIC); @@ -239,7 +229,7 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) ACE_OS::exit(EXIT_FAILURE); } - ParticipantLocationListenerImpl* listener = new ParticipantLocationListenerImpl("Publisher", no_ice, ipv6, participants_done_callback); + BitListener* listener = new BitListener("Publisher", no_ice, ipv6, participants_done_callback); DDS::DataReaderListener_var listener_var(listener); CORBA::Long retcode = @@ -254,47 +244,61 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) } // No need to call on_data_available since subscriber doesn't exist. - // Create Subscriber DomainParticipant + DDS::DataReader_var part_dr = bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC); + if (!part_dr) { + ACE_ERROR((LM_ERROR, + ACE_TEXT("%N:%l main()") + ACE_TEXT(" ERROR: Could not get %C DataReader\n"), + OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC)); + ACE_OS::exit(EXIT_FAILURE); + } + + retcode = part_dr->set_listener(listener, OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (retcode != DDS::RETCODE_OK) { + ACE_ERROR((LM_ERROR, + ACE_TEXT("%N:%l main()") + ACE_TEXT(" ERROR: set_listener for %C failed\n"), + OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC)); + ACE_OS::exit(EXIT_FAILURE); + } + std::cout << "Starting subscriber" << std::endl; DDS::DomainParticipant_var sub_participant = dpf->create_participant(42, - sub_qos, - DDS::DomainParticipantListener::_nil(), - OpenDDS::DCPS::DEFAULT_STATUS_MASK); + sub_qos, + DDS::DomainParticipantListener::_nil(), + OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil(sub_participant.in())) { + if (!sub_participant) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l: main()") ACE_TEXT(" ERROR: subscriber create_participant failed!\n")), EXIT_FAILURE); } - // Register Type (Messenger::Message) Messenger::MessageTypeSupport_var sub_ts = new Messenger::MessageTypeSupportImpl(); - if (sub_ts->register_type(sub_participant.in(), "") != DDS::RETCODE_OK) { + if (sub_ts->register_type(sub_participant, "") != DDS::RETCODE_OK) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l main()") ACE_TEXT(" ERROR: register_type() failed!\n")), EXIT_FAILURE); } - // Create Topic (Movie Discussion List) CORBA::String_var sub_type_name = sub_ts->get_type_name(); DDS::Topic_var sub_topic = sub_participant->create_topic("Movie Discussion List", - sub_type_name.in(), - TOPIC_QOS_DEFAULT, - DDS::TopicListener::_nil(), - OpenDDS::DCPS::DEFAULT_STATUS_MASK); + sub_type_name, + TOPIC_QOS_DEFAULT, + DDS::TopicListener::_nil(), + OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil(sub_topic.in())) { + if (!sub_topic) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l main()") ACE_TEXT(" ERROR: create_topic() failed!\n")), EXIT_FAILURE); } - // Create Subscriber DDS::SubscriberQos subscriber_qos; sub_participant->get_default_subscriber_qos(subscriber_qos); subscriber_qos.partition.name.length(1); @@ -305,7 +309,7 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) DDS::SubscriberListener::_nil(), OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil(sub.in())) { + if (!sub) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l main()") ACE_TEXT(" ERROR: create_subscriber() failed!\n")), EXIT_FAILURE); @@ -319,18 +323,17 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) dr_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS; DDS::DataReader_var reader = - sub->create_datareader(sub_topic.in(), + sub->create_datareader(sub_topic, dr_qos, 0, OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil(reader.in())) { + if (!reader) { ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%N:%l main()") ACE_TEXT(" ERROR: create_datareader() failed!\n")), EXIT_FAILURE); } - // Get the Built-In Subscriber for Built-In Topics DDS::Subscriber_var sub_bit_subscriber = sub_participant->get_builtin_subscriber(); DDS::DataReader_var sub_loc_dr = sub_bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_LOCATION_TOPIC); @@ -342,7 +345,7 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) ACE_OS::exit(EXIT_FAILURE); } - ParticipantLocationListenerImpl* sub_listener = new ParticipantLocationListenerImpl("Subscriber", no_ice, ipv6, participants_done_callback); + BitListener* sub_listener = new BitListener("Subscriber", no_ice, ipv6, participants_done_callback); DDS::DataReaderListener_var sub_listener_var(sub_listener); retcode = @@ -359,6 +362,27 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) // Call on_data_available in case there are samples which are waiting sub_listener->on_data_available(sub_loc_dr); + DDS::DataReader_var sub_part_dr = sub_bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC); + if (!sub_loc_dr) { + ACE_ERROR((LM_ERROR, + ACE_TEXT("%N:%l main()") + ACE_TEXT(" ERROR: Could not get %C DataReader\n"), + OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC)); + ACE_OS::exit(EXIT_FAILURE); + } + + retcode = sub_part_dr->set_listener(sub_listener, OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (retcode != DDS::RETCODE_OK) { + ACE_ERROR((LM_ERROR, + ACE_TEXT("%N:%l main()") + ACE_TEXT(" ERROR: set_listener for %C failed\n"), + OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC)); + ACE_OS::exit(EXIT_FAILURE); + } + + // Call on_data_available in case there are samples which are waiting + sub_listener->on_data_available(sub_part_dr); + while (participants_done != 2) { participants_done_cond.wait(); } @@ -378,7 +402,6 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) status = EXIT_FAILURE; } - // Clean-up! ACE_DEBUG((LM_DEBUG, ACE_TEXT("%N:%l main()") ACE_TEXT(" publisher participant deleting contained entities\n"))); @@ -390,8 +413,8 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) ACE_DEBUG((LM_DEBUG, ACE_TEXT("%N:%l main()") ACE_TEXT(" domain participant factory deleting participants\n"))); - dpf->delete_participant(participant.in()); - dpf->delete_participant(sub_participant.in()); + dpf->delete_participant(participant); + dpf->delete_participant(sub_participant); ACE_DEBUG((LM_DEBUG, ACE_TEXT("%N:%l main()") ACE_TEXT(" shutdown service participant\n"))); @@ -400,8 +423,7 @@ int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) } catch (const CORBA::Exception& e) { e._tao_print_exception("Exception caught in main():"); status = EXIT_FAILURE; - } catch (const OpenDDS::DCPS::Transport::Exception& ex) { - ACE_UNUSED_ARG(ex); + } catch (const OpenDDS::DCPS::Transport::Exception&) { ACE_ERROR((LM_ERROR, ACE_TEXT("%N:%l main()") ACE_TEXT(" ERROR: Transport Exception\n")));