Skip to content

Commit

Permalink
DataWriter/Reader get_matched_publication/subscription...() Feature…
Browse files Browse the repository at this point in the history
… implementation (#5284)

* Refs #21711: RTPSReader matched_writers_guids() implementation

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21711: RTPSWriter matched_readers_guids() implementation

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21711: ProxyDataConverters

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21711: RTPSParticipant get_subscription/publication_info() implementation

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21711: DataReader get_matched_publication_data() get_matched_publications() implementation

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21711: DataWriter get_matched_subscription_data() get_matched_subscriptions() implementation

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21711: versions.md

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21711: Update unsupported api tests and fastrtps_deprecated api

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21711: Apply Ricardo's rev

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21711: doxygen docs test fix

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
  • Loading branch information
Mario-DL authored Oct 9, 2024
1 parent 9729667 commit 065776a
Show file tree
Hide file tree
Showing 35 changed files with 723 additions and 83 deletions.
12 changes: 8 additions & 4 deletions include/fastdds/dds/publisher/DataWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,9 @@ class DataWriter : public DomainEntity
*
* @param[out] subscription_data subscription data struct
* @param subscription_handle InstanceHandle_t of the subscription
* @return RETCODE_OK
* @return RETCODE_BAD_PARAMETER if the DataWriter is not matched with
* the given subscription handle, RETCODE_OK otherwise.
*
* @warning Not supported yet. Currently returns RETCODE_UNSUPPORTED
*/
RTPS_DllAPI ReturnCode_t get_matched_subscription_data(
builtin::SubscriptionBuiltinTopicData& subscription_data,
Expand All @@ -529,16 +529,20 @@ class DataWriter : public DomainEntity
* @brief Fills the given vector with the InstanceHandle_t of matched DataReaders
*
* @param[out] subscription_handles Vector where the InstanceHandle_t are returned
* @return RETCODE_OK
* @return RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns RETCODE_OK.
*
* @warning Not supported yet. Currently returns RETCODE_UNSUPPORTED
*/
RTPS_DllAPI ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const;

#ifndef DOXYGEN_SHOULD_SKIP_THIS
FASTDDS_DEPRECATED_UNTIL(3, "eprosima::fastdds::dds:DataWriter::get_matched_subscriptions()",
"In favor of version using std::vector<fastrtps::rtps::InstanceHandle_t>.")
/**
* @note User is responsible for the memory deallocation of the returned vector.
*/
RTPS_DllAPI ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t*>& subscription_handles) const;
#endif // DOXYGEN_SHOULD_SKIP_THIS
Expand Down
9 changes: 5 additions & 4 deletions include/fastdds/dds/subscriber/DataReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,9 +977,9 @@ class DataReader : public DomainEntity
*
* @param[out] publication_data publication data struct
* @param publication_handle InstanceHandle_t of the publication
* @return RETCODE_OK
* @return RETCODE_BAD_PARAMETER if the DataReader is not matched with
* the given publication handle, RETCODE_OK otherwise.
*
* @warning Not supported yet. Currently returns RETCODE_UNSUPPORTED
*/
RTPS_DllAPI ReturnCode_t get_matched_publication_data(
builtin::PublicationBuiltinTopicData& publication_data,
Expand All @@ -989,9 +989,10 @@ class DataReader : public DomainEntity
* @brief Fills the given vector with the InstanceHandle_t of matched DataReaders
*
* @param[out] publication_handles Vector where the InstanceHandle_t are returned
* @return RETCODE_OK
* @return RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns RETCODE_OK.
*
* @warning Not supported yet. Currently returns RETCODE_UNSUPPORTED
*/
RTPS_DllAPI ReturnCode_t get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const;
Expand Down
8 changes: 4 additions & 4 deletions include/fastdds/rtps/participant/RTPSParticipant.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ class RTPS_DllAPI RTPSParticipant
std::vector<fastdds::rtps::TransportNetmaskFilterInfo> get_netmask_filter_info() const;

/**
* @brief Fills the provided fastdds::dds::builtin::PublicationBuiltinTopicData with the information of the
* @brief Fills the provided publication data with the information of the
* writer identified by writer_guid.
*
* @param[out] data fastdds::dds::builtin::PublicationBuiltinTopicData to fill.
* @param[out] data publication data to fill.
* @param[in] writer_guid GUID of the writer to get the information from.
* @return True if the writer was found and the data was filled.
*/
Expand All @@ -321,10 +321,10 @@ class RTPS_DllAPI RTPSParticipant
const GUID_t& writer_guid) const;

/**
* @brief Fills the provided fastdds::dds::builtin::SubscriptionBuiltinTopicData with the information of the
* @brief Fills the provided subscription discovery data with the information of the
* reader identified by reader_guid.
*
* @param[out] data fastdds::dds::builtin::SubscriptionBuiltinTopicData to fill.
* @param[out] data subscription discovery data to fill.
* @param[in] reader_guid GUID of the reader to get the information from.
* @return True if the reader was found and the data was filled.
*/
Expand Down
4 changes: 2 additions & 2 deletions include/fastdds/rtps/reader/RTPSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ class RTPSReader
* @param[out] guids Vector to be filled with the GUIDs of the matched writers.
* @return True if the operation was successful.
*/
RTPS_DllAPI bool matched_writers_guids(
std::vector<GUID_t>& guids) const;
RTPS_DllAPI virtual bool matched_writers_guids(
std::vector<GUID_t>& guids) const = 0;

/*!
* @brief Returns there is a clean state with all Writers.
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/reader/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,15 @@ class StatefulReader : public RTPSReader
WriterProxy* writer,
bool mark_as_read = true) override;

/**
* @brief Fills the provided vector with the GUIDs of the matched writers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched writers.
* @return True if the operation was successful.
*/
bool matched_writers_guids(
std::vector<GUID_t>& guids) const override;

#ifdef FASTDDS_STATISTICS
bool get_connections(
fastdds::statistics::rtps::ConnectionList& connection_list) override;
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/reader/StatelessReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,15 @@ class StatelessReader : public RTPSReader
WriterProxy* writer,
bool mark_as_read = true) override;

/**
* @brief Fills the provided vector with the GUIDs of the matched writers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched writers.
* @return True if the operation was successful.
*/
bool matched_writers_guids(
std::vector<GUID_t>& guids) const override;

#ifdef FASTDDS_STATISTICS
bool get_connections(
fastdds::statistics::rtps::ConnectionList& connection_list) override;
Expand Down
4 changes: 2 additions & 2 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ class RTPSWriter
* @param[out] guids Vector to be filled with the GUIDs of the matched readers.
* @return True if the operation was successful.
*/
RTPS_DllAPI bool matched_readers_guids(
std::vector<GUID_t>& guids) const;
RTPS_DllAPI virtual bool matched_readers_guids(
std::vector<GUID_t>& guids) const = 0;

/**
* Tries to remove a change waiting a maximum of the provided microseconds.
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,15 @@ class StatefulWriter : public RTPSWriter
return locator_selector_async_;
}

/**
* @brief Fills the provided vector with the GUIDs of the matched readers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched readers.
* @return True if the operation was successful.
*/
bool matched_readers_guids(
std::vector<GUID_t>& guids) const override;

#ifdef FASTDDS_STATISTICS
bool get_connections(
fastdds::statistics::rtps::ConnectionList& connection_list) override;
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ class StatelessWriter : public RTPSWriter
return locator_selector_;
}

/**
* @brief Fills the provided vector with the GUIDs of the matched readers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched readers.
* @return True if the operation was successful.
*/
bool matched_readers_guids(
std::vector<GUID_t>& guids) const override;

#ifdef FASTDDS_STATISTICS
bool get_connections(
fastdds::statistics::rtps::ConnectionList& connection_list) override;
Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ set(${PROJECT_NAME}_source_files
rtps/builtin/liveliness/WLP.cpp
rtps/builtin/liveliness/WLPListener.cpp
rtps/builtin/data/ParticipantProxyData.cpp
rtps/builtin/data/ProxyDataConverters.cpp
rtps/builtin/data/WriterProxyData.cpp
rtps/builtin/data/ReaderProxyData.cpp
rtps/flowcontrol/ThroughputControllerDescriptor.cpp
Expand Down
19 changes: 3 additions & 16 deletions src/cpp/fastdds/publisher/DataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,32 +302,19 @@ ReturnCode_t DataWriter::get_matched_subscription_data(
builtin::SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const
{
static_cast<void> (subscription_data);
static_cast<void> (subscription_handle);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_data, subscription_handle);
*/
return impl_->get_matched_subscription_data(subscription_data, subscription_handle);
}

ReturnCode_t DataWriter::get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const
{
static_cast<void> (subscription_handles);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_handles);
*/
return impl_->get_matched_subscriptions(subscription_handles);
}

ReturnCode_t DataWriter::get_matched_subscriptions(
std::vector<InstanceHandle_t*>& subscription_handles) const
{
static_cast<void> (subscription_handles);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_handles);
*/
return impl_->get_matched_subscriptions(subscription_handles);
}

ReturnCode_t DataWriter::clear_history(
Expand Down
63 changes: 63 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,69 @@ void DataWriterImpl::filter_is_being_removed(
}
}

ReturnCode_t DataWriterImpl::get_matched_subscription_data(
builtin::SubscriptionBuiltinTopicData& subscription_data,
const fastrtps::rtps::InstanceHandle_t& subscription_handle) const
{
fastrtps::types::ReturnCode_t ret = ReturnCode_t::RETCODE_BAD_PARAMETER;
GUID_t reader_guid = iHandle2GUID(subscription_handle);

if (writer_ && writer_->matched_reader_is_matched(reader_guid))
{
if (publisher_)
{
RTPSParticipant* rtps_participant = publisher_->rtps_participant();
if (rtps_participant &&
rtps_participant->get_subscription_info(subscription_data, reader_guid))
{
ret = ReturnCode_t::RETCODE_OK;
}
}
}

return ret;
}

ReturnCode_t DataWriterImpl::get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const
{
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::vector<GUID_t> matched_reader_guids;
subscription_handles.clear();

if (writer_ && writer_->matched_readers_guids(matched_reader_guids))
{
for (const GUID_t& guid : matched_reader_guids)
{
subscription_handles.emplace_back(InstanceHandle_t(guid));
}
ret = ReturnCode_t::RETCODE_OK;
}

return ret;
}

ReturnCode_t DataWriterImpl::get_matched_subscriptions(
std::vector<InstanceHandle_t*>& subscription_handles) const
{
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::vector<GUID_t> matched_reader_guids;
subscription_handles.clear();

if (writer_ && writer_->matched_readers_guids(matched_reader_guids))
{
for (const GUID_t& guid : matched_reader_guids)
{
// Note: user is responsible for deleting the InstanceHandle_t objects
subscription_handles.push_back(new InstanceHandle_t(guid));
}

ret = ReturnCode_t::RETCODE_OK;
}

return ret;
}

bool DataWriterImpl::is_relevant(
const fastrtps::rtps::CacheChange_t& change,
const fastrtps::rtps::GUID_t& reader_guid) const
Expand Down
28 changes: 28 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,34 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
void filter_is_being_removed(
const char* filter_class_name);

/**
* @brief Retrieves in a subscription associated with the DataWriter
*
* @param[out] subscription_data subscription data struct
* @param subscription_handle InstanceHandle_t of the subscription
* @return RETCODE_BAD_PARAMETER if the DataWriter is not matched with
* the given subscription handle, RETCODE_OK otherwise.
*
*/
ReturnCode_t get_matched_subscription_data(
builtin::SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const;

/**
* @brief Fills the given vector with the InstanceHandle_t of matched DataReaders
*
* @param[out] subscription_handles Vector where the InstanceHandle_t are returned
* @return RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns RETCODE_OK.
*
*/
ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const;

ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t*>& subscription_handles) const;

protected:

using IChangePool = eprosima::fastrtps::rtps::IChangePool;
Expand Down
13 changes: 2 additions & 11 deletions src/cpp/fastdds/subscriber/DataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,22 +395,13 @@ ReturnCode_t DataReader::get_matched_publication_data(
builtin::PublicationBuiltinTopicData& publication_data,
const fastrtps::rtps::InstanceHandle_t& publication_handle) const
{
static_cast<void> (publication_data);
static_cast<void> (publication_handle);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_publication_data(publication_data, publication_handle);
*/
return impl_->get_matched_publication_data(publication_data, publication_handle);
}

ReturnCode_t DataReader::get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const
{
static_cast<void> (publication_handles);
return ReturnCode_t::RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_publication_data(publication_handles);
*/
return impl_->get_matched_publications(publication_handles);
}

ReadCondition* DataReader::create_readcondition(
Expand Down
42 changes: 42 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,48 @@ ReturnCode_t DataReaderImpl::get_subscription_matched_status(
return ReturnCode_t::RETCODE_OK;
}

ReturnCode_t DataReaderImpl::get_matched_publication_data(
builtin::PublicationBuiltinTopicData& publication_data,
const fastrtps::rtps::InstanceHandle_t& publication_handle) const
{
fastrtps::types::ReturnCode_t ret = ReturnCode_t::RETCODE_BAD_PARAMETER;
GUID_t writer_guid = iHandle2GUID(publication_handle);

if (reader_ && reader_->matched_writer_is_matched(writer_guid))
{
if (subscriber_)
{
RTPSParticipant* rtps_participant = subscriber_->rtps_participant();
if (rtps_participant &&
rtps_participant->get_publication_info(publication_data, writer_guid))
{
ret = ReturnCode_t::RETCODE_OK;
}
}
}

return ret;
}

ReturnCode_t DataReaderImpl::get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const
{
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::vector<GUID_t> matched_writers_guids;
publication_handles.clear();

if (reader_ && reader_->matched_writers_guids(matched_writers_guids))
{
for (const GUID_t& guid : matched_writers_guids)
{
publication_handles.emplace_back(InstanceHandle_t(guid));
}
ret = ReturnCode_t::RETCODE_OK;
}

return ret;
}

bool DataReaderImpl::deadline_timer_reschedule()
{
assert(qos_.deadline().period != c_TimeInfinite);
Expand Down
Loading

0 comments on commit 065776a

Please sign in to comment.