Skip to content

Commit

Permalink
Feature: Extended incompatible QoS for monitor service (#5385)
Browse files Browse the repository at this point in the history
* Extended Incompatible QoS for Monitor Service Tests implementation (#5294)

* Refs #21756: Update Monitor Service IDL

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21756: Update Monitor Service Types

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21756: BB Tests

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21756: little bugfix in guids collections

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21756: Make processed msgs count cleaner

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>

* Extended Incompatible QoS for Monitor Service Feature implementation (#5345)

* Refs #21841: Update IProxyObserver

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Add event hooks in PDP and EDP

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Add new IProxyObserver implementor methods in MonitorServiceListener

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Update MonitorService class

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Update unittests

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Linter

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Apply Miguels review

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Add mmissing methods to mock

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Apply second round of suggestions

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Apply third rev round

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21841: Apply last suggestions

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21706: versions.md

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
  • Loading branch information
Mario-DL authored Nov 8, 2024
1 parent 7458386 commit d607eef
Show file tree
Hide file tree
Showing 29 changed files with 2,632 additions and 88 deletions.
29 changes: 20 additions & 9 deletions include/fastdds/statistics/monitorservice_types.idl
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,28 @@ module statistics {
typedef BaseStatus_s InconsistentTopicStatus_s;
typedef BaseStatus_s SampleLostStatus_s;

struct ExtendedIncompatibleQoSStatus_s
{
detail::GUID_s remote_guid;
sequence<unsigned long> current_incompatible_policies;
};

typedef sequence<ExtendedIncompatibleQoSStatus_s> ExtendedIncompatibleQoSStatusSeq_s;

module StatusKind
{
typedef unsigned long StatusKind;

const StatusKind PROXY = 0;
const StatusKind CONNECTION_LIST = 1;
const StatusKind INCOMPATIBLE_QOS = 2;
const StatusKind INCONSISTENT_TOPIC = 3;
const StatusKind LIVELINESS_LOST = 4;
const StatusKind LIVELINESS_CHANGED = 5;
const StatusKind DEADLINE_MISSED = 6;
const StatusKind SAMPLE_LOST = 7;
const StatusKind STATUSES_SIZE = 8;
const StatusKind PROXY = 0;
const StatusKind CONNECTION_LIST = 1;
const StatusKind INCOMPATIBLE_QOS = 2;
const StatusKind INCONSISTENT_TOPIC = 3;
const StatusKind LIVELINESS_LOST = 4;
const StatusKind LIVELINESS_CHANGED = 5;
const StatusKind DEADLINE_MISSED = 6;
const StatusKind SAMPLE_LOST = 7;
const StatusKind EXTENDED_INCOMPATIBLE_QOS = 8;
const StatusKind STATUSES_SIZE = 9;
}; // module StatusKind

union MonitorServiceData switch(StatusKind::StatusKind)
Expand All @@ -107,6 +116,8 @@ module statistics {
DeadlineMissedStatus_s deadline_missed_status;
case StatusKind::SAMPLE_LOST:
SampleLostStatus_s sample_lost_status;
case StatusKind::EXTENDED_INCOMPATIBLE_QOS:
ExtendedIncompatibleQoSStatusSeq_s extended_incompatible_qos_status;
case StatusKind::STATUSES_SIZE:
octet statuses_size;
};
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,7 @@ bool EDP::pairingReader(
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && reader->get_listener() != nullptr)
{
reader->get_listener()->on_requested_incompatible_qos(reader, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(R->getGuid(), wdatait->guid(), incompatible_qos);
}

//EPROSIMA_LOG_INFO(RTPS_EDP,RTPS_CYAN<<"Valid Matching to writerProxy: "<<wdatait->m_guid<<RTPS_DEF<<endl);
Expand Down Expand Up @@ -964,6 +965,7 @@ bool EDP::pairingWriter(
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && writer->get_listener() != nullptr)
{
writer->get_listener()->on_offered_incompatible_qos(writer, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(W->getGuid(), rdatait->guid(), incompatible_qos);
}

//EPROSIMA_LOG_INFO(RTPS_EDP,RTPS_CYAN<<"Valid Matching to writerProxy: "<<wdatait->m_guid<<RTPS_DEF<<endl);
Expand Down Expand Up @@ -1039,6 +1041,7 @@ bool EDP::pairing_reader_proxy_with_any_local_writer(
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && w.get_listener() != nullptr)
{
w.get_listener()->on_offered_incompatible_qos(&w, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(w.getGuid(), rdata->guid(), incompatible_qos);
}

if (w.matched_reader_is_matched(reader_guid)
Expand Down Expand Up @@ -1107,6 +1110,7 @@ bool EDP::pairing_reader_proxy_with_local_writer(
w.get_listener() != nullptr)
{
w.get_listener()->on_offered_incompatible_qos(&w, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(local_writer, rdata.guid(), incompatible_qos);
}

if (w.matched_reader_is_matched(reader_guid)
Expand Down Expand Up @@ -1230,6 +1234,7 @@ bool EDP::pairing_writer_proxy_with_any_local_reader(
if (no_match_reason.test(MatchingFailureMask::incompatible_qos) && r.get_listener() != nullptr)
{
r.get_listener()->on_requested_incompatible_qos(&r, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(r.getGuid(), wdata->guid(), incompatible_qos);
}

if (r.matched_writer_is_matched(writer_guid)
Expand Down Expand Up @@ -1298,6 +1303,7 @@ bool EDP::pairing_writer_proxy_with_local_reader(
r.get_listener() != nullptr)
{
r.get_listener()->on_requested_incompatible_qos(&r, incompatible_qos);
mp_PDP->notify_incompatible_qos_matching(local_reader, wdata.guid(), incompatible_qos);
}

if (r.matched_writer_is_matched(writer_guid)
Expand Down
1 change: 0 additions & 1 deletion src/cpp/rtps/builtin/discovery/endpoint/EDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ class EDP
bool checkDataRepresentationQos(
const WriterProxyData* wdata,
const ReaderProxyData* rdata) const;

};

} // namespace rtps
Expand Down
37 changes: 37 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,15 @@ bool PDP::removeReaderProxyData(
listener->on_reader_discovery(participant, reason, info, should_be_ignored);
}

#ifdef FASTDDS_STATISTICS
auto proxy_observer = get_proxy_observer();
// notify monitor service
if (nullptr != proxy_observer)
{
proxy_observer->on_remote_proxy_data_removed(pR->guid());
}
#endif // ifdef FASTDDS_STATISTICS

// Clear reader proxy data and move to pool in order to allow reuse
pR->clear();
pit->m_readers->erase(rit);
Expand Down Expand Up @@ -848,6 +857,15 @@ bool PDP::removeWriterProxyData(
listener->on_writer_discovery(participant, status, info, should_be_ignored);
}

#ifdef FASTDDS_STATISTICS
auto proxy_observer = get_proxy_observer();
// notify monitor service
if (nullptr != get_proxy_observer())
{
proxy_observer->on_remote_proxy_data_removed(pW->guid());
}
#endif // ifdef FASTDDS_STATISTICS

// Clear writer proxy data and move to pool in order to allow reuse
pW->clear();
pit->m_writers->erase(wit);
Expand Down Expand Up @@ -1746,6 +1764,25 @@ void PDP::local_participant_attributes_update_nts(
}
}

void PDP::notify_incompatible_qos_matching(
const GUID_t& local_guid,
const GUID_t& remote_guid,
const fastdds::dds::PolicyMask& incompatible_qos) const
{
#ifdef FASTDDS_STATISTICS
auto proxy_observer = get_proxy_observer();
// Notify the IProxyObserver implementor of a qos incompatibility
if (nullptr != proxy_observer)
{
proxy_observer->on_incompatible_qos_matching(local_guid, remote_guid, incompatible_qos);
}
#else
static_cast<void>(local_guid);
static_cast<void>(remote_guid);
static_cast<void>(incompatible_qos);
#endif // FASTDDS_STATISTICS
}

void PDP::update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
Expand Down
15 changes: 14 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable
void set_proxy_observer(
const fastdds::statistics::rtps::IProxyObserver* proxy_observer);

const fastdds::statistics::rtps::IProxyObserver* get_proxy_observer()
const fastdds::statistics::rtps::IProxyObserver* get_proxy_observer() const
{
return proxy_observer_.load();
}
Expand Down Expand Up @@ -500,6 +500,19 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts);

/**
* @brief Notify monitor the IProxyObserver implementor about
* any incompatible QoS matching between a local and a remote entity.
*
* @param local_guid GUID of the local entity.
* @param remote_guid GUID of the remote entity.
* @param incompatible_qos The PolicyMask with the incompatible QoS.
*/
void notify_incompatible_qos_matching(
const GUID_t& local_guid,
const GUID_t& remote_guid,
const fastdds::dds::PolicyMask& incompatible_qos) const;

protected:

//!Pointer to the builtin protocols object.
Expand Down
132 changes: 132 additions & 0 deletions src/cpp/statistics/rtps/monitor-service/MonitorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,26 @@ bool MonitorService::disable_monitor_service()
bool MonitorService::remove_local_entity(
const fastdds::rtps::EntityId_t& entity_id)
{
// Remove the entity from the extended incompatible QoS collection
{
std::lock_guard<std::mutex> lock(extended_incompatible_qos_mtx_);
GUID_t entity_guid = {local_participant_guid_.guidPrefix, entity_id};
extended_incompatible_qos_collection_.erase(entity_guid);
}

// Remove the entity from the local entities
{
std::lock_guard<std::mutex> lock (mtx_);

//! Add the entity to the changed entities if was not already present
if (!local_entities_[entity_id].second)
{
changed_entities_.push_back(entity_id);
if (!timer_active_.load())
{
event_->restart_timer();
timer_active_.store(true);
}
}

//! But remove it from the collection of entities
Expand Down Expand Up @@ -326,6 +339,12 @@ bool MonitorService::write_status(
status_retrieved = status_queryable_.get_monitoring_status(local_entity_guid, data);
break;
}
case StatusKind::EXTENDED_INCOMPATIBLE_QOS:
{
std::lock_guard<std::mutex> lock(extended_incompatible_qos_mtx_);
data.extended_incompatible_qos_status(extended_incompatible_qos_collection_[local_entity_guid]);
break;
}
default:
{
EPROSIMA_LOG_ERROR(MONITOR_SERVICE, "Referring to an unknown status");
Expand Down Expand Up @@ -570,6 +589,119 @@ bool MonitorService::spin_queue()
return re_schedule;
}

void MonitorService::on_incompatible_qos_matching(
const fastdds::rtps::GUID_t& local_guid,
const fastdds::rtps::GUID_t& remote_guid,
const fastdds::dds::PolicyMask& incompatible_qos_policies)
{
// Convert the PolicyMask to a vector of policy ids
std::vector<uint32_t> incompatible_policies;
for (uint32_t id = 1; id < dds::NEXT_QOS_POLICY_ID; ++id)
{
if (incompatible_qos_policies.test(id))
{
incompatible_policies.push_back(id);
}
}

std::lock_guard<std::mutex> lock(extended_incompatible_qos_mtx_);

if (!incompatible_policies.empty())
{
// Check if the local_guid is already in the collection. If not, create a new entry
auto local_entity_incompatibilites =
extended_incompatible_qos_collection_.insert({local_guid, {}});

bool first_incompatibility_with_remote = false;

// Local entity already in the collection (has any incompatible QoS with any remote entity)
if (!local_entity_incompatibilites.second)
{
// Check if the local entitiy already had an incompatibility with this remote entity
auto it = std::find_if(
local_entity_incompatibilites.first->second.begin(),
local_entity_incompatibilites.first->second.end(),
[&remote_guid](const ExtendedIncompatibleQoSStatus_s& status)
{
return to_fastdds_type(status.remote_guid()) == remote_guid;
});

if (it == local_entity_incompatibilites.first->second.end())
{
// First incompatibility with that remote entity
first_incompatibility_with_remote = true;
}
else
{
// Already had an incompatibility with that remote entity.
// Update them
it->current_incompatible_policies(incompatible_policies);
push_entity_update(local_guid.entityId, StatusKind::EXTENDED_INCOMPATIBLE_QOS);
}
}
else
{
// This will be the first incompatibility of this entity
first_incompatibility_with_remote = true;
}

if (first_incompatibility_with_remote)
{
ExtendedIncompatibleQoSStatus_s status;
status.remote_guid(to_statistics_type(remote_guid));
status.current_incompatible_policies(incompatible_policies);
local_entity_incompatibilites.first->second.emplace_back(status);
push_entity_update(local_guid.entityId, StatusKind::EXTENDED_INCOMPATIBLE_QOS);
}
}
else
{
// Remove remote guid from the local guid incompatibilities collection
auto it = extended_incompatible_qos_collection_.find(local_guid);

if (it != extended_incompatible_qos_collection_.end())
{
auto it_remote = std::find_if(
it->second.begin(),
it->second.end(),
[&remote_guid](const ExtendedIncompatibleQoSStatus_s& status)
{
return to_fastdds_type(status.remote_guid()) == remote_guid;
});

if (it_remote != it->second.end())
{
it->second.erase(it_remote);
push_entity_update(local_guid.entityId, StatusKind::EXTENDED_INCOMPATIBLE_QOS);
}
}
}
}

void MonitorService::on_remote_proxy_data_removed(
const fastdds::rtps::GUID_t& removed_proxy_guid)
{
auto& ext_incompatible_qos_collection = extended_incompatible_qos_collection_;
std::lock_guard<std::mutex> lock(extended_incompatible_qos_mtx_);

for (auto& local_entity : ext_incompatible_qos_collection)
{
auto it = std::find_if(
local_entity.second.begin(),
local_entity.second.end(),
[&removed_proxy_guid](const ExtendedIncompatibleQoSStatus_s& status)
{
return to_fastdds_type(status.remote_guid()) == removed_proxy_guid;
});

if (it != local_entity.second.end())
{
local_entity.second.erase(it);
push_entity_update(local_entity.first.entityId, StatusKind::EXTENDED_INCOMPATIBLE_QOS);
}
}
}

} // namespace rtps
} // namespace statistics
} // namespace fastdds
Expand Down
31 changes: 31 additions & 0 deletions src/cpp/statistics/rtps/monitor-service/MonitorService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,30 @@ class MonitorService
const fastdds::rtps::EntityId_t& entity_id,
const uint32_t& status_id);

/**
* @brief Process any updates regarding
* remote entities incompatible QoS matching.
*
* @param local_guid The GUID_t identifying the local entity
* @param remote_guid The GUID_t identifying the remote entity
* @param incompatible_qos The PolicyMask with the incompatible QoS
*
*/
void on_incompatible_qos_matching(
const fastdds::rtps::GUID_t& local_guid,
const fastdds::rtps::GUID_t& remote_guid,
const fastdds::dds::PolicyMask& incompatible_qos_policies);

/**
* @brief Notifies that a remote proxy data has been removed.
* This is interesting to notify proxy removals independently
* of the remote entity being matched or not.
*
* @param removed_proxy_guid GUID of the removed proxy.
*/
void on_remote_proxy_data_removed(
const fastdds::rtps::GUID_t& removed_proxy_guid);

private:

/**
Expand Down Expand Up @@ -257,6 +281,13 @@ class MonitorService
endpoint_registrator_t endpoint_registrator_;

MonitorServiceStatusDataPubSubType type_;

// Stores the current extended incompatible qos status
// of local entities with remote entities and their policies.
std::map<fastdds::rtps::GUID_t, ExtendedIncompatibleQoSStatusSeq_s>
extended_incompatible_qos_collection_;

std::mutex extended_incompatible_qos_mtx_;
};

#endif // FASTDDS_STATISTICS
Expand Down
Loading

0 comments on commit d607eef

Please sign in to comment.