Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21808] DataWriter/Reader get_matched_publication/subscription() Tests & Feature 3.x #5312

Merged
merged 6 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions include/fastdds/rtps/participant/RTPSParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,30 @@ class FASTDDS_EXPORTED_API RTPSParticipant
*/
std::vector<TransportNetmaskFilterInfo> get_netmask_filter_info() const;

/**
* @brief Fills the provided publication discovery data with the information of the
* writer identified by writer_guid.
*
* @param[out] data publication discovery data to fill.
* @param[in] writer_guid GUID of the writer to get the information from.
* @return True if the writer was found and the data was filled.
*/
bool get_publication_info(
fastdds::rtps::PublicationBuiltinTopicData& data,
const GUID_t& writer_guid) const;

/**
* @brief Fills the provided subscription discovery data with the information of the
* reader identified by reader_guid.
*
* @param[out] data subscription discovery data to fill.
* @param[in] reader_guid GUID of the reader to get the information from.
* @return True if the reader was found and the data was filled.
*/
bool get_subscription_info(
fastdds::rtps::SubscriptionBuiltinTopicData& data,
const GUID_t& reader_guid) const;

#if HAVE_SECURITY

/**
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/reader/RTPSReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ class RTPSReader : public Endpoint
FASTDDS_EXPORTED_API virtual void set_content_filter(
eprosima::fastdds::rtps::IReaderDataFilter* filter) = 0;

/**
* @brief Fills the provided vector with the GUIDs of the matched writers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched writers.
* @return True if the operation was successful.
*/
FASTDDS_EXPORTED_API virtual bool matched_writers_guids(
std::vector<GUID_t>& guids) const = 0;

/**
* @brief Read the next unread CacheChange_t from the history.
*
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/rtps/writer/RTPSWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ class RTPSWriter : public Endpoint
*/
FASTDDS_EXPORTED_API virtual bool get_disable_positive_acks() const = 0;

/**
* @brief Fills the provided vector with the GUIDs of the matched readers.
*
* @param[out] guids Vector to be filled with the GUIDs of the matched readers.
* @return True if the operation was successful.
*/
FASTDDS_EXPORTED_API virtual bool matched_readers_guids(
std::vector<GUID_t>& guids) const = 0;

#ifdef FASTDDS_STATISTICS

/**
Expand Down
13 changes: 2 additions & 11 deletions src/cpp/fastdds/publisher/DataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,22 +270,13 @@ ReturnCode_t DataWriter::get_matched_subscription_data(
SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const
{
static_cast<void> (subscription_data);
static_cast<void> (subscription_handle);
return RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_data, subscription_handle);
*/
return impl_->get_matched_subscription_data(subscription_data, subscription_handle);
}

ReturnCode_t DataWriter::get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const
{
static_cast<void> (subscription_handles);
return RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_subscription_data(subscription_handles);
*/
return impl_->get_matched_subscriptions(subscription_handles);
}

ReturnCode_t DataWriter::clear_history(
Expand Down
42 changes: 42 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2335,6 +2335,48 @@ void DataWriterImpl::filter_is_being_removed(
}
}

ReturnCode_t DataWriterImpl::get_matched_subscription_data(
SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const
{
ReturnCode_t ret = RETCODE_BAD_PARAMETER;
fastdds::rtps::GUID_t reader_guid = iHandle2GUID(subscription_handle);

if (writer_ && writer_->matched_reader_is_matched(reader_guid))
{
if (publisher_)
{
RTPSParticipant* rtps_participant = publisher_->rtps_participant();
if (rtps_participant &&
rtps_participant->get_subscription_info(subscription_data, reader_guid))
{
ret = RETCODE_OK;
}
}
}

return ret;
}

ReturnCode_t DataWriterImpl::get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const
{
ReturnCode_t ret = RETCODE_ERROR;
std::vector<rtps::GUID_t> matched_reader_guids;
subscription_handles.clear();

if (writer_ && writer_->matched_readers_guids(matched_reader_guids))
{
for (const rtps::GUID_t& guid : matched_reader_guids)
{
subscription_handles.emplace_back(InstanceHandle_t(guid));
}
ret = RETCODE_OK;
}

return ret;
}

bool DataWriterImpl::is_relevant(
const fastdds::rtps::CacheChange_t& change,
const fastdds::rtps::GUID_t& reader_guid) const
Expand Down
25 changes: 25 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,31 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
void filter_is_being_removed(
const char* filter_class_name);

/**
* @brief Retrieves in a subscription associated with the @ref DataWriter
*
* @param[out] subscription_data subscription data struct
* @param subscription_handle @ref InstanceHandle_t of the subscription
* @return @ref RETCODE_BAD_PARAMETER if the DataWriter is not matched with
* the given subscription handle, @ref RETCODE_OK otherwise.
*
*/
ReturnCode_t get_matched_subscription_data(
SubscriptionBuiltinTopicData& subscription_data,
const InstanceHandle_t& subscription_handle) const;

/**
* @brief Fills the given vector with the @ref InstanceHandle_t of matched DataReaders
*
* @param[out] subscription_handles Vector where the @ref InstanceHandle_t are returned
* @return @ref RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns @ref RETCODE_OK.
*
*/
ReturnCode_t get_matched_subscriptions(
std::vector<InstanceHandle_t>& subscription_handles) const;

/**
* Retrieve the publication data discovery information.
*
Expand Down
13 changes: 2 additions & 11 deletions src/cpp/fastdds/subscriber/DataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,22 +394,13 @@ ReturnCode_t DataReader::get_matched_publication_data(
PublicationBuiltinTopicData& publication_data,
const fastdds::rtps::InstanceHandle_t& publication_handle) const
{
static_cast<void> (publication_data);
static_cast<void> (publication_handle);
return RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_publication_data(publication_data, publication_handle);
*/
return impl_->get_matched_publication_data(publication_data, publication_handle);
}

ReturnCode_t DataReader::get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const
{
static_cast<void> (publication_handles);
return RETCODE_UNSUPPORTED;
/*
return impl_->get_matched_publication_data(publication_handles);
*/
return impl_->get_matched_publications(publication_handles);
}

ReadCondition* DataReader::create_readcondition(
Expand Down
42 changes: 42 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,48 @@ ReturnCode_t DataReaderImpl::get_subscription_matched_status(
return RETCODE_OK;
}

ReturnCode_t DataReaderImpl::get_matched_publication_data(
PublicationBuiltinTopicData& publication_data,
const InstanceHandle_t& publication_handle) const
{
ReturnCode_t ret = RETCODE_BAD_PARAMETER;
fastdds::rtps::GUID_t writer_guid = iHandle2GUID(publication_handle);

if (reader_ && reader_->matched_writer_is_matched(writer_guid))
{
if (subscriber_)
{
RTPSParticipant* rtps_participant = subscriber_->rtps_participant();
if (rtps_participant &&
rtps_participant->get_publication_info(publication_data, writer_guid))
{
ret = RETCODE_OK;
}
}
}

return ret;
}

ReturnCode_t DataReaderImpl::get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const
{
ReturnCode_t ret = RETCODE_ERROR;
std::vector<rtps::GUID_t> matched_writers_guids;
publication_handles.clear();

if (reader_ && reader_->matched_writers_guids(matched_writers_guids))
{
for (const rtps::GUID_t& guid : matched_writers_guids)
{
publication_handles.emplace_back(InstanceHandle_t(guid));
}
ret = RETCODE_OK;
}

return ret;
}

bool DataReaderImpl::deadline_timer_reschedule()
{
assert(qos_.deadline().period != dds::c_TimeInfinite);
Expand Down
25 changes: 25 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,31 @@ class DataReaderImpl
ReturnCode_t get_subscription_matched_status(
SubscriptionMatchedStatus& status);

/**
* @brief Retrieves in a publication associated with the DataWriter
*
* @param[out] publication_data publication data struct
* @param publication_handle @ref InstanceHandle_t of the publication
* @return @ref RETCODE_BAD_PARAMETER if the DataReader is not matched with
* the given publication handle, @ref RETCODE_OK otherwise.
*
*/
ReturnCode_t get_matched_publication_data(
rtps::PublicationBuiltinTopicData& publication_data,
const InstanceHandle_t& publication_handle) const;

/**
* @brief Fills the given vector with the @ref InstanceHandle_t of matched DataReaders
*
* @param[out] publication_handles Vector where the @ref InstanceHandle_t are returned
* @return @ref RETCODE_OK if the operation succeeds.
*
* @note Returning an empty list is not an error, it returns @ref RETCODE_OK.
*
*/
ReturnCode_t get_matched_publications(
std::vector<InstanceHandle_t>& publication_handles) const;

ReturnCode_t get_requested_deadline_missed_status(
RequestedDeadlineMissedStatus& status);

Expand Down
14 changes: 14 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,20 @@ std::vector<TransportNetmaskFilterInfo> RTPSParticipant::get_netmask_filter_info
return mp_impl->get_netmask_filter_info();
}

bool RTPSParticipant::get_publication_info(
PublicationBuiltinTopicData& data,
const GUID_t& writer_guid) const
{
return mp_impl->get_publication_info(data, writer_guid);
}

bool RTPSParticipant::get_subscription_info(
SubscriptionBuiltinTopicData& data,
const GUID_t& reader_guid) const
{
return mp_impl->get_subscription_info(data, reader_guid);
}

#if HAVE_SECURITY

bool RTPSParticipant::is_security_enabled_for_writer(
Expand Down
34 changes: 34 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2791,6 +2791,40 @@ std::vector<TransportNetmaskFilterInfo> RTPSParticipantImpl::get_netmask_filter_
return m_network_Factory.netmask_filter_info();
}

bool RTPSParticipantImpl::get_publication_info(
PublicationBuiltinTopicData& data,
const GUID_t& writer_guid) const
{
bool ret = false;
WriterProxyData wproxy_data(m_att.allocation.locators.max_unicast_locators,
m_att.allocation.locators.max_multicast_locators);

if (mp_builtinProtocols->mp_PDP->lookupWriterProxyData(writer_guid, wproxy_data))
{
from_proxy_to_builtin(wproxy_data, data);
ret = true;
}

return ret;
}

bool RTPSParticipantImpl::get_subscription_info(
SubscriptionBuiltinTopicData& data,
const GUID_t& reader_guid) const
{
bool ret = false;
ReaderProxyData rproxy_data(m_att.allocation.locators.max_unicast_locators,
m_att.allocation.locators.max_multicast_locators);

if (mp_builtinProtocols->mp_PDP->lookupReaderProxyData(reader_guid, rproxy_data))
{
from_proxy_to_builtin(rproxy_data, data);
ret = true;
}

return ret;
}

#ifdef FASTDDS_STATISTICS

bool RTPSParticipantImpl::register_in_writer(
Expand Down
24 changes: 24 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,30 @@ class RTPSParticipantImpl
*/
std::vector<TransportNetmaskFilterInfo> get_netmask_filter_info() const;

/**
* @brief Fills the provided @ref PublicationBuiltinTopicData with the information of the
* writer identified by writer_guid.
*
* @param[out] data @ref PublicationBuiltinTopicData to fill.
* @param[in] writer_guid GUID of the writer to get the information from.
* @return True if the writer was found and the data was filled.
*/
bool get_publication_info(
PublicationBuiltinTopicData& data,
const GUID_t& writer_guid) const;

/**
* @brief Fills the provided @ref SubscriptionBuiltinTopicData with the information of the
* reader identified by reader_guid.
*
* @param[out] data @ref SubscriptionBuiltinTopicData to fill.
* @param[in] reader_guid GUID of the reader to get the information from.
* @return True if the reader was found and the data was filled.
*/
bool get_subscription_info(
SubscriptionBuiltinTopicData& data,
const GUID_t& reader_guid) const;

template <EndpointKind_t kind, octet no_key, octet with_key>
static bool preprocess_endpoint_attributes(
const EntityId_t& entity_id,
Expand Down
13 changes: 13 additions & 0 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,19 @@ void StatefulReader::end_sample_access_nts(
}
}

bool StatefulReader::matched_writers_guids(
std::vector<GUID_t>& guids) const
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
guids.clear();
guids.reserve(matched_writers_.size());
for (WriterProxy* writer : matched_writers_)
{
guids.emplace_back(writer->guid());
}
return true;
}

#ifdef FASTDDS_STATISTICS

bool StatefulReader::get_connections(
Expand Down
Loading
Loading