diff --git a/src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.cpp b/src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.cpp index 29dd81a74d4..23c4156e7f3 100644 --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.cpp +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.cpp @@ -62,8 +62,7 @@ void EDPServerPUBListener::onNewCacheChangeAdded( EPROSIMA_LOG_WARNING(RTPS_EDP_LISTENER, "Received change with no Key"); } - // Get writer's GUID and EDP publications' reader history - GUID_t auxGUID = iHandle2GUID(change->instanceHandle); + // Get EDP publications' reader history ReaderHistory* reader_history = reader->getHistory(); // Related_sample_identity could be lost in message delivered, so we set as sample_identity @@ -78,7 +77,6 @@ void EDPServerPUBListener::onNewCacheChangeAdded( change->writer_info.previous = nullptr; change->writer_info.num_sent_submessages = 0; - // DATA(w) case: new writer or updated information about an existing writer if (change->kind == ALIVE) { @@ -89,6 +87,9 @@ void EDPServerPUBListener::onNewCacheChangeAdded( // return it to the pool add_writer_from_change(reader, reader_history, change, sedp_, false, writer_added_callback); + // DATA(w) case: Retrieve the topic after creating the WriterProxyData (in add_writer_from_change()). This way, not matter + // whether the DATA(w) is a new one or an update, the WriterProxyData exists, and so the topic can be retrieved + // Stop and wait for callback in case of TypeLookupService needed time to process the types return; } @@ -97,29 +98,24 @@ void EDPServerPUBListener::onNewCacheChangeAdded( { EPROSIMA_LOG_INFO(RTPS_EDP_LISTENER, "Disposed Remote Writer, removing..."); + // DATA(Uw) case: Retrieve the topic before removing the WriterProxyData. We need it to add the DATA(Uw) to the database + GUID_t auxGUID = iHandle2GUID(change->instanceHandle); + std::string topic_name = get_writer_proxy_topic_name(auxGUID); + // Remove WriterProxy data information get_pdp()->removeWriterProxyData(auxGUID); - // Removing change from history, not returning the change to the pool, since the ownership will be yielded to - // the database + // Removing change from history, not returning the change to the pool, since the ownership will be yielded to the database reader_history->remove_change(reader_history->find_change(change), false); - // Continue without waiting - continue_with_writer(reader, change); + notify_discoverydatabase(topic_name, reader, change); } } -void EDPServerPUBListener::continue_with_writer( - RTPSReader* reader, - CacheChange_t* change) +std::string EDPServerPUBListener::get_writer_proxy_topic_name( + GUID_t auxGUID) { - GUID_t auxGUID = iHandle2GUID(change->instanceHandle); - // String to store the topic of the writer std::string topic_name = ""; - - // DATA(w) case: Retrieve the topic after creating the WriterProxyData (in add_writer_from_change()). This way, not matter - // whether the DATA(w) is a new one or an update, the WriterProxyData exists, and so the topic can be retrieved - // DATA(Uw) case: Retrieve the topic before removing the WriterProxyData. We need it to add the DATA(Uw) to the database auto temp_writer_data = get_pdp()->get_temporary_writer_proxies_pool().get(); if (get_pdp()->lookupWriterProxyData(auxGUID, *temp_writer_data)) { @@ -129,7 +125,14 @@ void EDPServerPUBListener::continue_with_writer( { EPROSIMA_LOG_WARNING(RTPS_EDP_LISTENER, "Writer Proxy Data missing for change " << auxGUID); } + return topic_name; +} +void EDPServerPUBListener::notify_discoverydatabase( + std::string topic_name, + RTPSReader* reader, + CacheChange_t* change) +{ // Notify the DiscoveryDataBase if it is enabled already // In case it is not enable, the change should not be updated or released because it is been // updated from a backup @@ -156,6 +159,14 @@ void EDPServerPUBListener::continue_with_writer( EPROSIMA_LOG_INFO(RTPS_EDP_LISTENER, ""); } +void EDPServerPUBListener::continue_with_writer( + RTPSReader* reader, + CacheChange_t* change) +{ + std::string topic_name = get_writer_proxy_topic_name(iHandle2GUID(change->instanceHandle)); + notify_discoverydatabase(topic_name, reader, change); +} + PDPServer* EDPServerSUBListener::get_pdp() { return sedp_->get_pdp(); @@ -198,12 +209,9 @@ void EDPServerSUBListener::onNewCacheChangeAdded( change->writer_info.previous = nullptr; change->writer_info.num_sent_submessages = 0; - // Get readers's GUID and EDP subscriptions' reader history - GUID_t auxGUID = iHandle2GUID(change->instanceHandle); + // Get EDP subscriptions' reader history ReaderHistory* reader_history = reader->getHistory(); - - // DATA(r) case: new reader or updated information about an existing reader if (change->kind == ALIVE) { @@ -214,6 +222,9 @@ void EDPServerSUBListener::onNewCacheChangeAdded( // return it to the pool add_reader_from_change(reader, reader_history, change, sedp_, false, reader_added_callback); + // DATA(w) case: Retrieve the topic after creating the ReaderProxyData (in add_reader_from_change()). This way, not matter + // whether the DATA(r) is a new one or an update, the ReaderProxyData exists, and so the topic can be retrieved + // Stop and wait for callback in case of TypeLookupService needed time to process the types return; } @@ -223,6 +234,10 @@ void EDPServerSUBListener::onNewCacheChangeAdded( //REMOVE WRITER FROM OUR READERS: EPROSIMA_LOG_INFO(RTPS_EDP_LISTENER, "Disposed Remote Reader, removing..."); + // DATA(Uw) case: Retrieve the topic before removing the ReaderProxyData. We need it to add the DATA(Ur) to the database + GUID_t auxGUID = iHandle2GUID(change->instanceHandle); + std::string topic_name = get_reader_proxy_topic_name(auxGUID); + // Remove ReaderProxy data information get_pdp()->removeReaderProxyData(auxGUID); @@ -230,22 +245,14 @@ void EDPServerSUBListener::onNewCacheChangeAdded( // the database reader_history->remove_change(reader_history->find_change(change), false); - // Continue without waiting - continue_with_reader(reader, change); + notify_discoverydatabase(topic_name, reader, change); } } -void EDPServerSUBListener::continue_with_reader( - RTPSReader* reader, - CacheChange_t* change) +std::string EDPServerSUBListener::get_reader_proxy_topic_name( + GUID_t auxGUID) { - GUID_t auxGUID = iHandle2GUID(change->instanceHandle); - // String to store the topic of the reader std::string topic_name = ""; - - // DATA(w) case: Retrieve the topic after creating the ReaderProxyData (in add_reader_from_change()). This way, not matter - // whether the DATA(r) is a new one or an update, the ReaderProxyData exists, and so the topic can be retrieved - // DATA(Uw) case: Retrieve the topic before removing the ReaderProxyData. We need it to add the DATA(Ur) to the database auto temp_reader_data = get_pdp()->get_temporary_reader_proxies_pool().get(); if (get_pdp()->lookupReaderProxyData(auxGUID, *temp_reader_data)) { @@ -255,7 +262,14 @@ void EDPServerSUBListener::continue_with_reader( { EPROSIMA_LOG_WARNING(RTPS_EDP_LISTENER, "Reader Proxy Data missing for change " << auxGUID); } + return topic_name; +} +void EDPServerSUBListener::notify_discoverydatabase( + std::string topic_name, + RTPSReader* reader, + CacheChange_t* change) +{ // Notify the DiscoveryDataBase if it is enabled already // In case it is not enable, the change should not be updated or released because it is been // updated from a backup @@ -282,6 +296,14 @@ void EDPServerSUBListener::continue_with_reader( EPROSIMA_LOG_INFO(RTPS_EDP_LISTENER, ""); } +void EDPServerSUBListener::continue_with_reader( + RTPSReader* reader, + CacheChange_t* change) +{ + std::string topic_name = get_reader_proxy_topic_name(iHandle2GUID(change->instanceHandle)); + notify_discoverydatabase(topic_name, reader, change); +} + } /* namespace rtps */ } // namespace fastdds } /* namespace eprosima */ diff --git a/src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.hpp b/src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.hpp index c796e0a0847..5dd370ec426 100644 --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.hpp +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.hpp @@ -72,11 +72,19 @@ class EDPServerPUBListener : public fastrtps::rtps::EDPBasePUBListener fastrtps::rtps::RTPSReader* reader, const fastrtps::rtps::CacheChange_t* const change) override; - void continue_with_writer( +private: + + std::string get_writer_proxy_topic_name( + fastrtps::rtps::GUID_t auxGUID); + + void notify_discoverydatabase( + std::string topic_name, fastrtps::rtps::RTPSReader* reader, fastrtps::rtps::CacheChange_t* change); -private: + void continue_with_writer( + fastrtps::rtps::RTPSReader* reader, + fastrtps::rtps::CacheChange_t* change); //!Pointer to the EDPServer EDPServer* sedp_; @@ -110,11 +118,19 @@ class EDPServerSUBListener : public fastrtps::rtps::EDPBaseSUBListener fastrtps::rtps::RTPSReader* reader, const fastrtps::rtps::CacheChange_t* const change) override; - void continue_with_reader( +private: + + std::string get_reader_proxy_topic_name( + fastrtps::rtps::GUID_t auxGUID); + + void notify_discoverydatabase( + std::string topic_name, fastrtps::rtps::RTPSReader* reader, fastrtps::rtps::CacheChange_t* change); -private: + void continue_with_reader( + fastrtps::rtps::RTPSReader* reader, + fastrtps::rtps::CacheChange_t* change); //!Pointer to the EDPServer EDPServer* sedp_; diff --git a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.h b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.h index 6dcce1c9267..9c9c34b54ac 100644 --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.h +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.h @@ -34,7 +34,7 @@ namespace fastrtps { namespace rtps { using EndpointAddedCallback = std::function< - void (RTPSReader* reader, const CacheChange_t* const change)>; + void (RTPSReader* reader, const CacheChange_t* change)>; class RTPSReader; struct CacheChange_t; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 7328573aade..fba0cf377fe 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -27,7 +27,6 @@ #include #include -#include #include #include #include @@ -37,6 +36,7 @@ #include #include +#include #include #include #include diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index d6814035965..8a4cd1e3247 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -33,6 +32,7 @@ #include #include +#include #include #include #include