Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21711] DataWriter/Reader get_matched_publication/subscription...() Feature implementation #5284

Merged
merged 10 commits into from
Oct 9, 2024
12 changes: 8 additions & 4 deletions include/fastdds/dds/publisher/DataWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,9 @@ class DataWriter : public DomainEntity
*
* @param[out] subscription_data subscription data struct
* @param subscription_handle InstanceHandle_t of the subscription
* @return RETCODE_OK
* @return RETCODE_BAD_PARAMETER if the DataWriter is not matched with
* the given subscription handle, RETCODE_OK otherwise.
*
* @warning Not supported yet. Currently returns RETCODE_UNSUPPORTED
*/
RTPS_DllAPI ReturnCode_t get_matched_subscription_data(
builtin::SubscriptionBuiltinTopicData& subscription_data,
Expand All @@ -529,16 +529,20 @@ class DataWriter : public DomainEntity
* @brief Fills the given vector with the InstanceHandle_t of matched DataReaders
*
* @param[out] subscription_handles Vector where the InstanceHandle_t are returned
* @return RETCODE_OK
* @return RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns RETCODE_OK.
*
* @warning Not supported yet. Currently returns RETCODE_UNSUPPORTED
*/
RTPS_DllAPI ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const;

#ifndef DOXYGEN_SHOULD_SKIP_THIS
FASTDDS_DEPRECATED_UNTIL(3, "eprosima::fastdds::dds:DataWriter::get_matched_subscriptions()",
"In favor of version using std::vector<fastrtps::rtps::InstanceHandle_t>.")
/**
* @note User is responsible for the memory deallocation of the returned vector.
*/
RTPS_DllAPI ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t*>& subscription_handles) const;
#endif // DOXYGEN_SHOULD_SKIP_THIS
Expand Down
9 changes: 5 additions & 4 deletions include/fastdds/dds/subscriber/DataReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,9 +977,9 @@ class DataReader : public DomainEntity
*
* @param[out] publication_data publication data struct
* @param publication_handle InstanceHandle_t of the publication
* @return RETCODE_OK
* @return RETCODE_BAD_PARAMETER if the DataReader is not matched with
* the given publication handle, RETCODE_OK otherwise.
*
* @warning Not supported yet. Currently returns RETCODE_UNSUPPORTED
*/
RTPS_DllAPI ReturnCode_t get_matched_publication_data(
builtin::PublicationBuiltinTopicData& publication_data,
Expand All @@ -989,9 +989,10 @@ class DataReader : public DomainEntity
* @brief Fills the given vector with the InstanceHandle_t of matched DataReaders
*
* @param[out] publication_handles Vector where the InstanceHandle_t are returned
* @return RETCODE_OK
* @return RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns RETCODE_OK.
*
* @warning Not supported yet. Currently returns RETCODE_UNSUPPORTED
*/
RTPS_DllAPI ReturnCode_t get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const;
Expand Down
8 changes: 4 additions & 4 deletions include/fastdds/rtps/participant/RTPSParticipant.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ class RTPS_DllAPI RTPSParticipant
std::vector<fastdds::rtps::TransportNetmaskFilterInfo> get_netmask_filter_info() const;

/**
* @brief Fills the provided fastdds::dds::builtin::PublicationBuiltinTopicData with the information of the
* @brief Fills the provided publication data with the information of the
* writer identified by writer_guid.
*
* @param[out] data fastdds::dds::builtin::PublicationBuiltinTopicData to fill.
* @param[out] data publication data to fill.
* @param[in] writer_guid GUID of the writer to get the information from.
* @return True if the writer was found and the data was filled.
*/
Expand All @@ -321,10 +321,10 @@ class RTPS_DllAPI RTPSParticipant
const GUID_t& writer_guid) const;

/**
* @brief Fills the provided fastdds::dds::builtin::SubscriptionBuiltinTopicData with the information of the
* @brief Fills the provided subscription discovery data with the information of the
* reader identified by reader_guid.
*
* @param[out] data fastdds::dds::builtin::SubscriptionBuiltinTopicData to fill.
* @param[out] data subscription discovery data to fill.
* @param[in] reader_guid GUID of the reader to get the information from.
* @return True if the reader was found and the data was filled.
*/
Expand Down
4 changes: 2 additions & 2 deletions include/fastdds/rtps/reader/RTPSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ class RTPSReader
* @param[out] guids Vector to be filled with the GUIDs of the matched writers.
* @return True if the operation was successful.
*/
RTPS_DllAPI bool matched_writers_guids(
std::vector<GUID_t>& guids) const;
RTPS_DllAPI virtual bool matched_writers_guids(
std::vector<GUID_t>& guids) const = 0;

/*!
* @brief Returns there is a clean state with all Writers.
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/reader/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,15 @@ class StatefulReader : public RTPSReader
WriterProxy* writer,
bool mark_as_read = true) override;

/**
* @brief Fills the provided vector with the GUIDs of the matched writers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched writers.
* @return True if the operation was successful.
*/
bool matched_writers_guids(
std::vector<GUID_t>& guids) const override;

#ifdef FASTDDS_STATISTICS
bool get_connections(
fastdds::statistics::rtps::ConnectionList& connection_list) override;
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/reader/StatelessReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,15 @@ class StatelessReader : public RTPSReader
WriterProxy* writer,
bool mark_as_read = true) override;

/**
* @brief Fills the provided vector with the GUIDs of the matched writers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched writers.
* @return True if the operation was successful.
*/
bool matched_writers_guids(
std::vector<GUID_t>& guids) const override;

#ifdef FASTDDS_STATISTICS
bool get_connections(
fastdds::statistics::rtps::ConnectionList& connection_list) override;
Expand Down
4 changes: 2 additions & 2 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ class RTPSWriter
* @param[out] guids Vector to be filled with the GUIDs of the matched readers.
* @return True if the operation was successful.
*/
RTPS_DllAPI bool matched_readers_guids(
std::vector<GUID_t>& guids) const;
RTPS_DllAPI virtual bool matched_readers_guids(
std::vector<GUID_t>& guids) const = 0;

/**
* Tries to remove a change waiting a maximum of the provided microseconds.
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,15 @@ class StatefulWriter : public RTPSWriter
return locator_selector_async_;
}

/**
* @brief Fills the provided vector with the GUIDs of the matched readers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched readers.
* @return True if the operation was successful.
*/
bool matched_readers_guids(
std::vector<GUID_t>& guids) const override;

#ifdef FASTDDS_STATISTICS
bool get_connections(
fastdds::statistics::rtps::ConnectionList& connection_list) override;
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ class StatelessWriter : public RTPSWriter
return locator_selector_;
}

/**
* @brief Fills the provided vector with the GUIDs of the matched readers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched readers.
* @return True if the operation was successful.
*/
bool matched_readers_guids(
std::vector<GUID_t>& guids) const override;

#ifdef FASTDDS_STATISTICS
bool get_connections(
fastdds::statistics::rtps::ConnectionList& connection_list) override;
Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ set(${PROJECT_NAME}_source_files
rtps/builtin/liveliness/WLP.cpp
rtps/builtin/liveliness/WLPListener.cpp
rtps/builtin/data/ParticipantProxyData.cpp
rtps/builtin/data/ProxyDataConverters.cpp
rtps/builtin/data/WriterProxyData.cpp
rtps/builtin/data/ReaderProxyData.cpp
rtps/flowcontrol/ThroughputControllerDescriptor.cpp
Expand Down
19 changes: 3 additions & 16 deletions src/cpp/fastdds/publisher/DataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,32 +302,19 @@ ReturnCode_t DataWriter::get_matched_subscription_data(
builtin::SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const
{
static_cast<void> (subscription_data);
static_cast<void> (subscription_handle);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_data, subscription_handle);
*/
return impl_->get_matched_subscription_data(subscription_data, subscription_handle);
}

ReturnCode_t DataWriter::get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const
{
static_cast<void> (subscription_handles);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_handles);
*/
return impl_->get_matched_subscriptions(subscription_handles);
}

ReturnCode_t DataWriter::get_matched_subscriptions(
std::vector<InstanceHandle_t*>& subscription_handles) const
{
static_cast<void> (subscription_handles);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_handles);
*/
return impl_->get_matched_subscriptions(subscription_handles);
}

ReturnCode_t DataWriter::clear_history(
Expand Down
63 changes: 63 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,69 @@ void DataWriterImpl::filter_is_being_removed(
}
}

ReturnCode_t DataWriterImpl::get_matched_subscription_data(
builtin::SubscriptionBuiltinTopicData& subscription_data,
const fastrtps::rtps::InstanceHandle_t& subscription_handle) const
{
fastrtps::types::ReturnCode_t ret = ReturnCode_t::RETCODE_BAD_PARAMETER;
GUID_t reader_guid = iHandle2GUID(subscription_handle);

if (writer_ && writer_->matched_reader_is_matched(reader_guid))
{
if (publisher_)
{
RTPSParticipant* rtps_participant = publisher_->rtps_participant();
if (rtps_participant &&
rtps_participant->get_subscription_info(subscription_data, reader_guid))
{
ret = ReturnCode_t::RETCODE_OK;
}
}
}

return ret;
}

ReturnCode_t DataWriterImpl::get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const
{
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::vector<GUID_t> matched_reader_guids;
subscription_handles.clear();

if (writer_ && writer_->matched_readers_guids(matched_reader_guids))
{
for (const GUID_t& guid : matched_reader_guids)
{
subscription_handles.emplace_back(InstanceHandle_t(guid));
}
ret = ReturnCode_t::RETCODE_OK;
}

return ret;
}

ReturnCode_t DataWriterImpl::get_matched_subscriptions(
std::vector<InstanceHandle_t*>& subscription_handles) const
{
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::vector<GUID_t> matched_reader_guids;
subscription_handles.clear();

if (writer_ && writer_->matched_readers_guids(matched_reader_guids))
{
for (const GUID_t& guid : matched_reader_guids)
{
// Note: user is responsible for deleting the InstanceHandle_t objects
subscription_handles.push_back(new InstanceHandle_t(guid));
}

ret = ReturnCode_t::RETCODE_OK;
}

return ret;
}

bool DataWriterImpl::is_relevant(
const fastrtps::rtps::CacheChange_t& change,
const fastrtps::rtps::GUID_t& reader_guid) const
Expand Down
28 changes: 28 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,34 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
void filter_is_being_removed(
const char* filter_class_name);

/**
* @brief Retrieves in a subscription associated with the DataWriter
*
* @param[out] subscription_data subscription data struct
* @param subscription_handle InstanceHandle_t of the subscription
* @return RETCODE_BAD_PARAMETER if the DataWriter is not matched with
* the given subscription handle, RETCODE_OK otherwise.
*
*/
ReturnCode_t get_matched_subscription_data(
builtin::SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const;

/**
* @brief Fills the given vector with the InstanceHandle_t of matched DataReaders
*
* @param[out] subscription_handles Vector where the InstanceHandle_t are returned
* @return RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns RETCODE_OK.
*
*/
ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const;

ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t*>& subscription_handles) const;

protected:

using IChangePool = eprosima::fastrtps::rtps::IChangePool;
Expand Down
13 changes: 2 additions & 11 deletions src/cpp/fastdds/subscriber/DataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,22 +395,13 @@ ReturnCode_t DataReader::get_matched_publication_data(
builtin::PublicationBuiltinTopicData& publication_data,
const fastrtps::rtps::InstanceHandle_t& publication_handle) const
{
static_cast<void> (publication_data);
static_cast<void> (publication_handle);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_publication_data(publication_data, publication_handle);
*/
return impl_->get_matched_publication_data(publication_data, publication_handle);
}

ReturnCode_t DataReader::get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const
{
static_cast<void> (publication_handles);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_publication_data(publication_handles);
*/
return impl_->get_matched_publications(publication_handles);
}

ReadCondition* DataReader::create_readcondition(
Expand Down
42 changes: 42 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,48 @@ ReturnCode_t DataReaderImpl::get_subscription_matched_status(
return ReturnCode_t::RETCODE_OK;
}

ReturnCode_t DataReaderImpl::get_matched_publication_data(
builtin::PublicationBuiltinTopicData& publication_data,
const fastrtps::rtps::InstanceHandle_t& publication_handle) const
{
fastrtps::types::ReturnCode_t ret = ReturnCode_t::RETCODE_BAD_PARAMETER;
GUID_t writer_guid = iHandle2GUID(publication_handle);

if (reader_ && reader_->matched_writer_is_matched(writer_guid))
{
if (subscriber_)
{
RTPSParticipant* rtps_participant = subscriber_->rtps_participant();
if (rtps_participant &&
rtps_participant->get_publication_info(publication_data, writer_guid))
{
ret = ReturnCode_t::RETCODE_OK;
}
}
}

return ret;
}

ReturnCode_t DataReaderImpl::get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const
{
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::vector<GUID_t> matched_writers_guids;
publication_handles.clear();

if (reader_ && reader_->matched_writers_guids(matched_writers_guids))
{
for (const GUID_t& guid : matched_writers_guids)
{
publication_handles.emplace_back(InstanceHandle_t(guid));
}
ret = ReturnCode_t::RETCODE_OK;
}

return ret;
}

bool DataReaderImpl::deadline_timer_reschedule()
{
assert(qos_.deadline().period != c_TimeInfinite);
Expand Down
Loading
Loading