Skip to content

Commit

Permalink
Merge pull request #25 from doug1234/GetAddressInfo
Browse files Browse the repository at this point in the history
Added functions to get address information from readers/writers and p…
  • Loading branch information
jrw972 authored Feb 29, 2024
2 parents 3290f93 + df15500 commit 8d1a4f9
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 7 deletions.
45 changes: 45 additions & 0 deletions src/dds_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/RTPS/RtpsDiscovery.h>
#include <dds/DCPS/ServiceEventDispatcher.h>
#include <dds/DCPS/DataWriterImpl.h>
#include <dds/DCPS/DataReaderImpl.h>
#include <dds/DCPS/LogAddr.h>

#ifdef WIN32
#pragma warning(pop)
Expand All @@ -35,6 +38,26 @@
#include <dds/DCPS/transport/rtps_udp/RtpsUdp.h>
#endif

//Helper function to get the address list for a sequence
std::string GetAddressInfo(const OpenDDS::DCPS::TransportLocatorSeq& info)
{
std::string strAddress;
for (unsigned int idx = 0; idx != info.length(); ++idx) {
const auto locators = OpenDDS::RTPS::transport_locator_to_locator_seq(info[idx]);
for (unsigned int idx2 = 0; idx2 != locators.length(); ++idx2) {
ACE_INET_Addr addr;
if (locator_to_address(addr, locators[idx2], false) == 0) {
if (!strAddress.empty()) {
strAddress += ",";
}
strAddress += OpenDDS::DCPS::LogAddr(addr).c_str();
}
}
}

return strAddress;
}

std::map<int, int> g_transportInstances;

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -1149,6 +1172,28 @@ DDS::DataReader_var DDSManager::getReader(const std::string& topicName,
}


//------------------------------------------------------------------------------
std::string DDSManager::getWriterAddress(const std::string& topicName) const
{
DDS::DataWriter_var writer = getWriter(topicName);
auto dwi = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(writer.in());
if (dwi == nullptr) {
return "Invalid Writer";
}
return GetAddressInfo(dwi->connection_info());
}

//------------------------------------------------------------------------------
std::string DDSManager::getReaderAddress(const std::string& topicName, const std::string& readerName) const
{
DDS::DataReader_var reader = getReader(topicName, readerName);
auto dri = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(reader.in());
if (dri == nullptr) {
return "Invalid Reader";
}
return GetAddressInfo(dri->connection_info());
}

//------------------------------------------------------------------------------
DDS::DataWriter_var DDSManager::getWriter(const std::string& topicName) const
{
Expand Down
15 changes: 15 additions & 0 deletions src/dds_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,21 @@ class DDSManager
DDS::DataReader_var getReader(const std::string& topicName,
const std::string& readerName) const;

/**
* @brief Get the addresses associated with a topic.
* @param[in] topicName The name of the topic.
* @return String containing the addresses used by the writer.
*/
std::string getWriterAddress(const std::string& topicName) const;

/**
* @brief Get the addresses associated with a topic.
* @param[in] topicName The name of the topic.
* @param[in] readerName The name of the reader.
* @return String containing the addresses used by the reader.
*/
std::string getReaderAddress(const std::string& topicName, const std::string& readerName) const;

/**
* @brief Get the data writer associated with a topic.
* @param[in] topicName The name of the topic.
Expand Down
64 changes: 57 additions & 7 deletions src/dds_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,22 +242,22 @@ class DDSSimpleManager : public DDSManager
// This function waits until it finds one (or more) Subscriber of topic T OR secondsToWait seconds expires.
// Useful when making sure your messages are being Published on a certain topic.
template <class T>
bool WaitForSubscriber(std::chrono::milliseconds timeToWait = std::chrono::seconds(2))
bool WaitForSubscriber(std::chrono::milliseconds timeToWait = std::chrono::seconds(15))
{
return GetNumberOfSubscribers<T>(1, timeToWait) > 0;
}

//Call WaitForPublisher(0) if you have already been discovered and want to see if you've lost all connection to publishers.
// readerName is not required unless user specifies a readerName when creating their Subscriber / Callback
template <class T>
bool WaitForPublisher(std::chrono::milliseconds timeToWait = std::chrono::seconds(2), std::string readerName = "")
bool WaitForPublisher(std::chrono::milliseconds timeToWait = std::chrono::seconds(15), std::string readerName = "")
{
return GetNumberOfPublishers<T>(1, timeToWait, readerName) > 0;
}

// Function that will wait until [max_wait] passes or until we find [min_count] number of Subscribers, whichever is faster
template<class T>
int GetNumberOfSubscribers(int min_count, std::chrono::milliseconds max_wait = std::chrono::seconds(2))
int GetNumberOfSubscribers(int min_count, std::chrono::milliseconds max_wait = std::chrono::seconds(15))
{
const std::chrono::milliseconds waitIncriment(100);
std::chrono::milliseconds timeWaited(0);
Expand Down Expand Up @@ -309,7 +309,8 @@ class DDSSimpleManager : public DDSManager
}
}

sstr << "Failed to find " << min_count << " Subscribers(s)... Only found " << pubStatus.current_count;
std::string addressInfo = getWriterAddress(temp);
sstr << "Failed to find " << min_count << " on " << addressInfo << ". Subscribers(s)... Only found " << pubStatus.current_count;
m_messageHandler(LogMessageType::DDS_INFO, sstr.str());

return pubStatus.current_count;
Expand All @@ -322,9 +323,31 @@ class DDSSimpleManager : public DDSManager
return 0;
}

template <class T>
std::string GetSubscriberAddress()
{
std::string topic_name = typeid(T).name();
try {
std::string temp;
{
decltype(m_sharedLock) lck(mutex_shr);
auto iter = m_pubMap.find(topic_name);
if (iter == m_pubMap.end()) {
return std::string("Invalid Publisher for ") + topic_name;
}

temp = iter->second;
}
return getWriterAddress(temp);
}
catch (...) {
}
return std::string("Invalid Publisher for ") + topic_name;
}

// Function that will wait until [max_wait] passes or until we find [min_count] number of Publishers, whichever is faster
template <class T>
int GetNumberOfPublishers(int min_count, std::chrono::milliseconds max_wait = std::chrono::seconds(2), std::string reader_name = "")
int GetNumberOfPublishers(int min_count, std::chrono::milliseconds max_wait = std::chrono::seconds(15), std::string reader_name = "")
{
const std::chrono::milliseconds waitIncriment(100);
std::chrono::milliseconds timeWaited(0);
Expand All @@ -349,7 +372,9 @@ class DDSSimpleManager : public DDSManager
std::string temp = iter->second;
lck.unlock();

auto dr = getReader(temp, GenerateReaderName(temp, reader_name)); // Reader name == Topic name + "Reader", unless user-specified
auto genReaderName = GenerateReaderName(temp, reader_name);

auto dr = getReader(temp, genReaderName); // Reader name == Topic name + "Reader", unless user-specified

if (dr == nullptr) {
sstr << "No reader found for: " << topic_name << ".";
Expand All @@ -375,7 +400,9 @@ class DDSSimpleManager : public DDSManager
return subStatus.current_count;
}
}
sstr << "Failed to find " << min_count << " Publisher(s)... Only found " << subStatus.current_count;

std::string addressInfo = getReaderAddress(temp, genReaderName);
sstr << "Failed to find " << min_count << " on " << addressInfo << ". Publisher(s)... Only found " << subStatus.current_count;
m_messageHandler(LogMessageType::DDS_INFO, sstr.str());

return subStatus.current_count;
Expand All @@ -389,6 +416,29 @@ class DDSSimpleManager : public DDSManager
return 0;
}

template <class T>
std::string GetPublisherAddress(std::string reader_name = "")
{
std::string topic_name = typeid(T).name();
try {
std::string temp;
{
decltype(m_sharedLock) lck(mutex_shr);
auto iter = m_subMap.find(topic_name);
if (iter == m_subMap.end()) {
return std::string("Invalid Subscriber for ") + topic_name;
}
temp = iter->second;
}

return getReaderAddress(temp, GenerateReaderName(temp, reader_name)); // Reader name == Topic name + "Reader", unless user-specified
}
catch (...) {
}

return std::string("Invalid subscriber for ") + topic_name;
}

void EventID(int id) { m_eventID = id; }

int EventID() { return m_eventID; }
Expand Down

0 comments on commit 8d1a4f9

Please sign in to comment.