From b130791c4ba50fc1ca43b4a25ce37be5a5c5742c Mon Sep 17 00:00:00 2001 From: Rex Schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 29 Aug 2024 14:03:02 +0200 Subject: [PATCH] [core] add publisher registration event callback API (#1717) * add/rem publisher registration event callback API * add/rem subscriber registration event callback API --- ecal/core/include/ecal/ecal_registration.h | 48 +++++ ecal/core/src/ecal_descgate.cpp | 108 +++++++++-- ecal/core/src/ecal_descgate.h | 26 ++- .../src/registration/ecal_registration.cpp | 24 +++ .../ecal_registration_provider.cpp | 12 +- .../shm/ecal_registration_sender_shm.cpp | 8 +- .../shm/ecal_registration_sender_shm.h | 3 + .../massive_pub_sub/src/massive_pub_sub.cpp | 60 +++++- .../registration_test_public/CMakeLists.txt | 2 + .../src/registration_getpublisherids.cpp | 174 ++++++++++++++++++ .../src/registration_getsubscriberids.cpp | 174 ++++++++++++++++++ .../src/registration_gettopics.cpp | 66 ------- 12 files changed, 606 insertions(+), 99 deletions(-) create mode 100644 ecal/tests/cpp/registration_test_public/src/registration_getpublisherids.cpp create mode 100644 ecal/tests/cpp/registration_test_public/src/registration_getsubscriberids.cpp diff --git a/ecal/core/include/ecal/ecal_registration.h b/ecal/core/include/ecal/ecal_registration.h index 5afd1ffdec..4222efa840 100644 --- a/ecal/core/include/ecal/ecal_registration.h +++ b/ecal/core/include/ecal/ecal_registration.h @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -94,6 +95,17 @@ namespace eCAL using QualityServiceInfoMultimap = std::multimap; using SQualityServiceInfoSet = std::set; + using CallbackToken = std::size_t; + + enum class RegistrationEventType + { + new_entity, //!< Represents a new entity registration + deleted_entity //!< Represents a deletion of an entity + }; + + using TopicIDCallbackT = std::function; + using ServiceIDCallbackT = std::function; + /** * @brief Get complete snapshot of all known publisher. * @@ -108,6 +120,24 @@ namespace eCAL **/ ECAL_API bool GetPublisherInfo(const STopicId& id_, SQualityTopicInfo& topic_info_); + /** + * @brief Register a callback function to be notified when a new publisher becomes available. + * + * @param callback_ The callback function to be called with the STopicId of the new publisher. + * The callback function must not be blocked for a longer period of time, + * otherwise timeout mechanisms of the eCAL registration would be triggered. + * + * @return CallbackToken Token that can be used to unregister the callback. + */ + ECAL_API CallbackToken AddPublisherEventCallback(const TopicIDCallbackT& callback_); + + /** + * @brief Unregister the publisher callback using the provided token. + * + * @param token The token returned by AddPublisherCallback. + */ + ECAL_API void RemPublisherEventCallback(CallbackToken token_); + /** * @brief Get complete snapshot of all known subscriber. * @@ -122,6 +152,24 @@ namespace eCAL **/ ECAL_API bool GetSubscriberInfo(const STopicId& id_, SQualityTopicInfo& topic_info_); + /** + * @brief Register a callback function to be notified when a new subscriber becomes available. + * + * @param callback_ The callback function to be called with the STopicId of the new subscriber. + * The callback function must not be blocked for a longer period of time, + * otherwise timeout mechanisms of the eCAL registration would be triggered. + * + * @return CallbackToken Token that can be used to unregister the callback. + */ + ECAL_API CallbackToken AddSubscriberEventCallback(const TopicIDCallbackT& callback_); + + /** + * @brief Unregister the subscriber callback using the provided token. + * + * @param token The token returned by AddSubscriberCallback. + */ + ECAL_API void RemSubscriberEventCallback(CallbackToken token_); + /** * @brief Get complete snapshot of all known services. * diff --git a/ecal/core/src/ecal_descgate.cpp b/ecal/core/src/ecal_descgate.cpp index 2c62df3113..6f87e7ac62 100644 --- a/ecal/core/src/ecal_descgate.cpp +++ b/ecal/core/src/ecal_descgate.cpp @@ -64,6 +64,22 @@ namespace eCAL return GetTopic(id_, m_publisher_info_map, topic_info_); } + Registration::CallbackToken CDescGate::AddPublisherEventCallback(const Registration::TopicIDCallbackT& callback_) + { + const std::lock_guard lock(m_publisher_callback_map.mtx); + + const Registration::CallbackToken new_token = CreateToken(); + m_publisher_callback_map.map[new_token] = callback_; + + return new_token; + } + + void CDescGate::RemPublisherEventCallback(Registration::CallbackToken token_) + { + const std::lock_guard lock(m_publisher_callback_map.mtx); + m_publisher_callback_map.map.erase(token_); + } + std::set CDescGate::GetSubscriberIDs() const { return GetTopicIDs(m_subscriber_info_map); @@ -74,6 +90,22 @@ namespace eCAL return GetTopic(id_, m_subscriber_info_map, topic_info_); } + Registration::CallbackToken CDescGate::AddSubscriberEventCallback(const Registration::TopicIDCallbackT& callback_) + { + const std::lock_guard lock(m_subscriber_callback_map.mtx); + + const Registration::CallbackToken new_token = CreateToken(); + m_subscriber_callback_map.map[new_token] = callback_; + + return new_token; + } + + void CDescGate::RemSubscriberEventCallback(Registration::CallbackToken token_) + { + const std::lock_guard lock(m_subscriber_callback_map.mtx); + m_subscriber_callback_map.map.erase(token_); + } + std::set CDescGate::GetServiceIDs() const { return GetServiceIDs(m_service_info_map); @@ -126,7 +158,7 @@ namespace eCAL std::set service_id_set; const std::lock_guard lock(service_method_info_map_.mtx); - for (const auto& service_method_info_map_it : service_method_info_map_.map) + for (const auto& service_method_info_map_it : service_method_info_map_.id_map) { service_id_set.insert(service_method_info_map_it.first); } @@ -136,8 +168,8 @@ namespace eCAL bool CDescGate::GetService(const Registration::SServiceId& id_, const SQualityServiceIdMap& service_method_info_map_, Registration::SQualityServiceInfo& service_method_info_) { const std::lock_guard lock(service_method_info_map_.mtx); - auto iter = service_method_info_map_.map.find(id_); - if (iter == service_method_info_map_.map.end()) + auto iter = service_method_info_map_.id_map.find(id_); + if (iter == service_method_info_map_.id_map.end()) { return false; } @@ -194,16 +226,16 @@ namespace eCAL RemServiceDescription(m_client_info_map, sample_.identifier, sample_.client.sname); break; case bct_reg_publisher: - ApplyTopicDescription(m_publisher_info_map, sample_.identifier, sample_.topic.tname, sample_.topic.tdatatype, GetDataTypeInfoQuality(sample_.topic.tdatatype, true)); + ApplyTopicDescription(m_publisher_info_map, m_publisher_callback_map, sample_.identifier, sample_.topic.tname, sample_.topic.tdatatype, GetDataTypeInfoQuality(sample_.topic.tdatatype, true)); break; case bct_unreg_publisher: - RemTopicDescription(m_publisher_info_map, sample_.identifier, sample_.topic.tname); + RemTopicDescription(m_publisher_info_map, m_publisher_callback_map, sample_.identifier, sample_.topic.tname); break; case bct_reg_subscriber: - ApplyTopicDescription(m_subscriber_info_map, sample_.identifier, sample_.topic.tname, sample_.topic.tdatatype, GetDataTypeInfoQuality(sample_.topic.tdatatype, false)); + ApplyTopicDescription(m_subscriber_info_map, m_subscriber_callback_map, sample_.identifier, sample_.topic.tname, sample_.topic.tdatatype, GetDataTypeInfoQuality(sample_.topic.tdatatype, false)); break; case bct_unreg_subscriber: - RemTopicDescription(m_subscriber_info_map, sample_.identifier, sample_.topic.tname); + RemTopicDescription(m_subscriber_info_map, m_subscriber_callback_map, sample_.identifier, sample_.topic.tname); break; default: { @@ -214,6 +246,7 @@ namespace eCAL } void CDescGate::ApplyTopicDescription(SQualityTopicIdMap& topic_info_map_, + const STopicIdCallbackMap& topic_callback_map_, const Registration::SampleIdentifier& topic_id_, const std::string& topic_name_, const SDataTypeInformation& topic_info_, @@ -225,16 +258,55 @@ namespace eCAL topic_quality_info.info = topic_info_; topic_quality_info.quality = topic_quality_; - const std::unique_lock lock(topic_info_map_.mtx); - topic_info_map_.map[topic_info_key] = topic_quality_info; + // update topic info + bool new_topic_info(false); + { + const std::unique_lock lock(topic_info_map_.mtx); + const auto iter = topic_info_map_.map.find(topic_info_key); + new_topic_info = iter == topic_info_map_.map.end(); + topic_info_map_.map[topic_info_key] = topic_quality_info; + } + + // notify publisher / subscriber registration callbacks about new entity + if(new_topic_info) + { + const std::unique_lock lock(topic_callback_map_.mtx); + for (const auto& callback_iter : topic_callback_map_.map) + { + if (callback_iter.second) + { + callback_iter.second(topic_info_key, Registration::RegistrationEventType::new_entity); + } + } + } } void CDescGate::RemTopicDescription(SQualityTopicIdMap& topic_info_map_, + const STopicIdCallbackMap& topic_callback_map_, const Registration::SampleIdentifier& topic_id_, const std::string& topic_name_) { - const std::unique_lock lock(topic_info_map_.mtx); - topic_info_map_.map.erase(Registration::STopicId{ ConvertToEntityId(topic_id_) , topic_name_}); + const auto topic_info_key = Registration::STopicId{ ConvertToEntityId(topic_id_), topic_name_ }; + + // delete topic info + bool deleted_topic_info(false); + { + const std::unique_lock lock(topic_info_map_.mtx); + deleted_topic_info = topic_info_map_.map.erase(topic_info_key) > 0; + } + + // notify publisher / subscriber registration callbacks about deleted entity + if (deleted_topic_info) + { + const std::unique_lock lock(topic_callback_map_.mtx); + for (const auto& callback_iter : topic_callback_map_.map) + { + if (callback_iter.second) + { + callback_iter.second(topic_info_key, Registration::RegistrationEventType::deleted_entity); + } + } + } } void CDescGate::ApplyServiceDescription(SQualityServiceIdMap& service_method_info_map_, @@ -255,7 +327,7 @@ namespace eCAL service_quality_info.response_quality = response_type_quality_; const std::lock_guard lock(service_method_info_map_.mtx); - service_method_info_map_.map[service_method_info_key] = service_quality_info; + service_method_info_map_.id_map[service_method_info_key] = service_quality_info; } void CDescGate::RemServiceDescription(SQualityServiceIdMap& service_method_info_map_, @@ -266,7 +338,7 @@ namespace eCAL const std::lock_guard lock(service_method_info_map_.mtx); - for (auto&& service_it : service_method_info_map_.map) + for (auto&& service_it : service_method_info_map_.id_map) { const auto service_method_info_key = service_it.first; if ((service_method_info_key.service_name == service_name_) @@ -278,7 +350,15 @@ namespace eCAL for (const auto& service_method_info_key : service_method_info_keys_to_remove) { - service_method_info_map_.map.erase(service_method_info_key); + service_method_info_map_.id_map.erase(service_method_info_key); } } + + Registration::CallbackToken CDescGate::CreateToken() + { + // Atomically increment m_callback_token using fetch_add to ensure thread safety. + // fetch_add returns the value before increment, so we add 1 to get the new token value. + // memory_order_relaxed is used to optimize performance without additional synchronization. + return m_callback_token.fetch_add(1, std::memory_order_relaxed) + 1; + } } diff --git a/ecal/core/src/ecal_descgate.h b/ecal/core/src/ecal_descgate.h index 6224e4940d..76262e45e5 100644 --- a/ecal/core/src/ecal_descgate.h +++ b/ecal/core/src/ecal_descgate.h @@ -29,6 +29,7 @@ #include "serialization/ecal_struct_sample_registration.h" +#include #include #include #include @@ -49,10 +50,14 @@ namespace eCAL // get publisher information std::set GetPublisherIDs() const; bool GetPublisherInfo(const Registration::STopicId& id_, Registration::SQualityTopicInfo& topic_info_) const; + Registration::CallbackToken AddPublisherEventCallback(const Registration::TopicIDCallbackT& callback_); + void RemPublisherEventCallback(Registration::CallbackToken token_); // get subscriber information std::set GetSubscriberIDs() const; bool GetSubscriberInfo(const Registration::STopicId& id_, Registration::SQualityTopicInfo& topic_info_) const; + Registration::CallbackToken AddSubscriberEventCallback(const Registration::TopicIDCallbackT& callback_); + void RemSubscriberEventCallback(Registration::CallbackToken token_); // get service information std::set GetServiceIDs() const; @@ -71,18 +76,25 @@ namespace eCAL CDescGate& operator=(CDescGate&&) = delete; protected: - using QualityTopicIdMap = std::map; + using QualityTopicIdMap = std::map; struct SQualityTopicIdMap { mutable std::mutex mtx; QualityTopicIdMap map; }; + using TopicIdCallbackMap = std::map; + struct STopicIdCallbackMap + { + mutable std::mutex mtx; + TopicIdCallbackMap map; + }; + using QualityServiceIdMap = std::map; struct SQualityServiceIdMap { mutable std::mutex mtx; - QualityServiceIdMap map; + QualityServiceIdMap id_map; }; static std::set GetTopicIDs(const SQualityTopicIdMap& topic_info_map_); @@ -92,12 +104,14 @@ namespace eCAL static bool GetService (const Registration::SServiceId& id_, const SQualityServiceIdMap& service_method_info_map_, Registration::SQualityServiceInfo& service_method_info_); static void ApplyTopicDescription(SQualityTopicIdMap& topic_info_map_, + const STopicIdCallbackMap& topic_callback_map_, const Registration::SampleIdentifier& topic_id_, const std::string& topic_name_, const SDataTypeInformation& topic_info_, Registration::DescQualityFlags topic_quality_); static void RemTopicDescription(SQualityTopicIdMap& topic_info_map_, + const STopicIdCallbackMap& topic_callback_map_, const Registration::SampleIdentifier& topic_id_, const std::string& topic_name_); @@ -114,12 +128,20 @@ namespace eCAL const Registration::SampleIdentifier& service_id_, const std::string& service_name_); + Registration::CallbackToken CreateToken(); + // internal quality topic info publisher/subscriber maps SQualityTopicIdMap m_publisher_info_map; + STopicIdCallbackMap m_publisher_callback_map; + SQualityTopicIdMap m_subscriber_info_map; + STopicIdCallbackMap m_subscriber_callback_map; // internal quality service info service/client maps SQualityServiceIdMap m_service_info_map; SQualityServiceIdMap m_client_info_map; + + mutable std::mutex m_callback_token_mtx; + std::atomic m_callback_token{ 0 }; }; } diff --git a/ecal/core/src/registration/ecal_registration.cpp b/ecal/core/src/registration/ecal_registration.cpp index 21b7bc4ed2..1a7108b155 100644 --- a/ecal/core/src/registration/ecal_registration.cpp +++ b/ecal/core/src/registration/ecal_registration.cpp @@ -173,6 +173,18 @@ namespace eCAL return g_descgate()->GetPublisherInfo(id_, topic_info_); } + ECAL_API CallbackToken AddPublisherEventCallback(const TopicIDCallbackT& callback_) + { + if (g_descgate() == nullptr) return CallbackToken(); + return g_descgate()->AddPublisherEventCallback(callback_); + } + + ECAL_API void RemPublisherEventCallback(CallbackToken token_) + { + if (g_descgate() == nullptr) return; + return g_descgate()->RemPublisherEventCallback(token_); + } + std::set GetSubscriberIDs() { if (g_descgate() == nullptr) return std::set(); @@ -185,6 +197,18 @@ namespace eCAL return g_descgate()->GetSubscriberInfo(id_, topic_info_); } + ECAL_API CallbackToken AddSubscriberEventCallback(const TopicIDCallbackT& callback_) + { + if (g_descgate() == nullptr) return CallbackToken(); + return g_descgate()->AddSubscriberEventCallback(callback_); + } + + ECAL_API void RemSubscriberEventCallback(CallbackToken token_) + { + if (g_descgate() == nullptr) return; + return g_descgate()->RemSubscriberEventCallback(token_); + } + std::set GetServiceIDs() { if (g_descgate() == nullptr) return std::set(); diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index c953023a40..8121c33b4a 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -167,15 +167,15 @@ namespace eCAL if (g_clientgate() != nullptr) g_clientgate()->GetRegistrations(sample_list); #endif - // send collected registration sample list - m_reg_sender->SendSampleList(sample_list); - - // send asynchronously applied samples at the end of the registration loop + // append applied samples list to sample list + if (!m_applied_sample_list.samples.empty()) { const std::lock_guard lock(m_applied_sample_list_mtx); - m_reg_sender->SendSampleList(m_applied_sample_list); - m_applied_sample_list.samples.clear(); + sample_list.samples.splice(sample_list.samples.end(), m_applied_sample_list.samples); } + + // send collected registration sample list + m_reg_sender->SendSampleList(sample_list); } } } diff --git a/ecal/core/src/registration/shm/ecal_registration_sender_shm.cpp b/ecal/core/src/registration/shm/ecal_registration_sender_shm.cpp index a3f6e05f2b..c27eb9a306 100644 --- a/ecal/core/src/registration/shm/ecal_registration_sender_shm.cpp +++ b/ecal/core/src/registration/shm/ecal_registration_sender_shm.cpp @@ -54,13 +54,13 @@ bool eCAL::CRegistrationSenderSHM::SendSampleList(const Registration::SampleList { bool return_value{true}; // serialize whole sample list - std::vector sample_list_buffer; - if (SerializeToBuffer(sample_list, sample_list_buffer)) + m_sample_list_buffer.clear(); + if (SerializeToBuffer(sample_list, m_sample_list_buffer)) { - if (!sample_list_buffer.empty()) + if (!m_sample_list_buffer.empty()) { // broadcast sample list over shm - return_value &= m_memfile_broadcast_writer.Write(sample_list_buffer.data(), sample_list_buffer.size()); + return_value &= m_memfile_broadcast_writer.Write(m_sample_list_buffer.data(), m_sample_list_buffer.size()); } } return return_value; diff --git a/ecal/core/src/registration/shm/ecal_registration_sender_shm.h b/ecal/core/src/registration/shm/ecal_registration_sender_shm.h index 8cda6a9e67..f67ba0de80 100644 --- a/ecal/core/src/registration/shm/ecal_registration_sender_shm.h +++ b/ecal/core/src/registration/shm/ecal_registration_sender_shm.h @@ -35,6 +35,8 @@ #include "attributes/registration_shm_attributes.h" +#include + namespace eCAL { class CRegistrationSenderSHM : public CRegistrationSender @@ -55,5 +57,6 @@ namespace eCAL private: CMemoryFileBroadcast m_memfile_broadcast; CMemoryFileBroadcastWriter m_memfile_broadcast_writer; + std::vector m_sample_list_buffer; }; } \ No newline at end of file diff --git a/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp b/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp index fb30f34784..dc2b46e6e4 100644 --- a/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp +++ b/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp @@ -25,14 +25,14 @@ #include #include -const int subscriber_number (10000); +const int subscriber_number (5000); -const int publisher_number (10000); +const int publisher_number (5000); const int publisher_type_encoding_size_bytes (10*1024); const int publisher_type_descriptor_size_bytes (10*1024); const int in_between_sleep_sec (5); -const int final_sleep_sec (0); +const int final_sleep_sec (5); std::string GenerateSizedString(const std::string& name, size_t totalSize) { @@ -56,13 +56,38 @@ std::string GenerateSizedString(const std::string& name, size_t totalSize) int main(int argc, char** argv) { - // initialize eCAL API with shm monitoring + // set eCAL configuration eCAL::Configuration configuration; - configuration.registration.layer.shm.enable = true; - configuration.registration.layer.udp.enable = false; + configuration.registration.registration_timeout = 10000; // registration timeout == 10 sec + configuration.registration.layer.shm.enable = true; // switch shm registration on and + configuration.registration.layer.udp.enable = false; // switch udp registration off + + // initialize eCAL API eCAL::Initialize(configuration, "massive_pub_sub"); - eCAL::Util::EnableLoopback(true); + // publisher registration event callback + size_t created_publisher_num(0); + size_t deleted_publisher_num(0); + std::set created_publisher_ids; + std::set deleted_publisher_ids; + eCAL::Registration::AddPublisherEventCallback( + [&](const eCAL::Registration::STopicId& id_, eCAL::Registration::RegistrationEventType event_type_) + { + switch (event_type_) + { + case eCAL::Registration::RegistrationEventType::new_entity: + created_publisher_num++; + created_publisher_ids.insert(id_); + //std::cout << "Publisher created" << std::endl; + break; + case eCAL::Registration::RegistrationEventType::deleted_entity: + deleted_publisher_num++; + deleted_publisher_ids.insert(id_); + //std::cout << "Publisher deleted" << std::endl; + break; + } + } + ); // create subscriber std::vector vector_of_subscriber; @@ -174,7 +199,28 @@ int main(int argc, char** argv) std::cout << num_pub << ")" << std::endl << "Time taken to get publisher information: " << duration << " milliseconds" << std::endl << std::endl; } + // check creation events + const std::set publisher_ids = eCAL::Registration::GetPublisherIDs(); + std::cout << "Number of publisher creation events " << created_publisher_num << std::endl; + std::cout << "Size of publisher creation id set " << created_publisher_ids.size() << std::endl; + //std::cout << "Publisher creation id sets are equal " << (publisher_ids == created_publisher_ids) << std::endl; + std::cout << std::endl; + + // delete all publisher + std::cout << "Delete all publisher .." << std::endl; + vector_of_publisher.clear(); + std::cout << "Deletion done." << std::endl; + std::cout << std::endl; + // sleep for a few seconds + std::this_thread::sleep_for(std::chrono::seconds(in_between_sleep_sec)); + + // check deletion events + std::cout << "Number of publisher deletion events " << deleted_publisher_num << std::endl; + std::cout << "Size of publisher deletion id set " << deleted_publisher_ids.size() << std::endl; + //std::cout << "Publisher deleteion id sets are equal " << (publisher_ids == deleted_publisher_ids) << std::endl; + + // sleep final seconds std::this_thread::sleep_for(std::chrono::seconds(final_sleep_sec)); // finalize eCAL API diff --git a/ecal/tests/cpp/registration_test_public/CMakeLists.txt b/ecal/tests/cpp/registration_test_public/CMakeLists.txt index c20365afca..9605dc3ced 100644 --- a/ecal/tests/cpp/registration_test_public/CMakeLists.txt +++ b/ecal/tests/cpp/registration_test_public/CMakeLists.txt @@ -23,6 +23,8 @@ find_package(GTest REQUIRED) if(ECAL_CORE_PUBLISHER AND ECAL_CORE_SUBSCRIBER) set(registration_test_topics_src + src/registration_getpublisherids.cpp + src/registration_getsubscriberids.cpp src/registration_gettopics.cpp ) endif() diff --git a/ecal/tests/cpp/registration_test_public/src/registration_getpublisherids.cpp b/ecal/tests/cpp/registration_test_public/src/registration_getpublisherids.cpp new file mode 100644 index 0000000000..7f20b44bf3 --- /dev/null +++ b/ecal/tests/cpp/registration_test_public/src/registration_getpublisherids.cpp @@ -0,0 +1,174 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2024 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include + +#include + +#include +#include +#include + +// struct to hold the test parameters +struct TestParams +{ + int publisher_count = 0; + eCAL::Configuration configuration; +}; + +// test class that accepts TestParams as a parameter +class EcalTest : public ::testing::TestWithParam +{ +protected: + void SetUp() override + { + // set configuration from the test parameters + auto params = GetParam(); + eCAL::Initialize(params.configuration, "core_cpp_registration_publisherids"); + } + + void TearDown() override + { + // clean up + eCAL::Finalize(); + } +}; + +TEST_P(EcalTest, GetPublisherIDsReturnsCorrectNumber) +{ + { + // create publishers for testing + std::vector publisher_vec; + for (int i = 0; i < GetParam().publisher_count; ++i) + { + std::stringstream tname; + tname << "topic_" << i; + + eCAL::SDataTypeInformation data_type_info; + data_type_info.name = tname.str() + "_type_name"; + data_type_info.encoding = tname.str() + "_type_encoding"; + data_type_info.descriptor = tname.str() + "_type_descriptor"; + + publisher_vec.emplace_back(tname.str(), data_type_info); + } + + // let's register + eCAL::Process::SleepMS(2 * GetParam().configuration.registration.registration_refresh); + + // get the list of publisher IDs + const auto pub_ids1 = eCAL::Registration::GetPublisherIDs(); + + // verify the number of publishers created + ASSERT_EQ(pub_ids1.size(), GetParam().publisher_count); + } + + // let's finally timeout + eCAL::Process::SleepMS(GetParam().configuration.registration.registration_timeout); + + // get the list of publisher IDs + const auto pub_ids2 = eCAL::Registration::GetPublisherIDs(); + + // all publisher should be timeouted + ASSERT_EQ(pub_ids2.size(), 0); +} + +TEST_P(EcalTest, PublisherEventCallbackIsTriggered) +{ + std::atomic created_publisher_num(0); + std::atomic deleted_publisher_num(0); + std::set created_publisher_ids; + std::set deleted_publisher_ids; + + // register the callback + auto callback_token = eCAL::Registration::AddPublisherEventCallback( + [&](const eCAL::Registration::STopicId& id, eCAL::Registration::RegistrationEventType event_type) + { + if (event_type == eCAL::Registration::RegistrationEventType::new_entity) + { + created_publisher_num++; + created_publisher_ids.insert(id); + } + else if (event_type == eCAL::Registration::RegistrationEventType::deleted_entity) + { + deleted_publisher_num++; + deleted_publisher_ids.insert(id); + } + }); + + { + // create publishers for testing + std::vector publisher_vec; + for (int i = 0; i < GetParam().publisher_count; ++i) + { + std::stringstream tname; + tname << "topic_" << i; + + eCAL::SDataTypeInformation data_type_info; + data_type_info.name = tname.str() + "_type_name"; + data_type_info.encoding = tname.str() + "_type_encoding"; + data_type_info.descriptor = tname.str() + "_type_descriptor"; + + publisher_vec.emplace_back(tname.str(), data_type_info); + } + + // let's register + eCAL::Process::SleepMS(2 * GetParam().configuration.registration.registration_refresh); + + // verify the number of publishers created through the callback + ASSERT_EQ(created_publisher_num.load(), GetParam().publisher_count); + + // clear publishers to trigger deletion events + publisher_vec.clear(); + + // let's register the deletion events + eCAL::Process::SleepMS(2 * GetParam().configuration.registration.registration_refresh); + + // verify the number of publishers deleted through the callback + ASSERT_EQ(deleted_publisher_num.load(), GetParam().publisher_count); + } + + // unregister the callback + eCAL::Registration::RemPublisherEventCallback(callback_token); +} + +// instantiate the test suite with different configurations and publisher counts +INSTANTIATE_TEST_SUITE_P( + GetPublisherIdTests, + EcalTest, + ::testing::Values( + TestParams{ 10, []() { + // shm + eCAL::Configuration config; + config.registration.registration_refresh = 100; + config.registration.registration_timeout = 200; + config.registration.layer.shm.enable = true; + config.registration.layer.udp.enable = false; + return config; + }() }, + TestParams{ 10, []() { + // udp + eCAL::Configuration config; + config.registration.registration_refresh = 100; + config.registration.registration_timeout = 200; + config.registration.layer.shm.enable = false; + config.registration.layer.udp.enable = true; + return config; + }() } + ) +); diff --git a/ecal/tests/cpp/registration_test_public/src/registration_getsubscriberids.cpp b/ecal/tests/cpp/registration_test_public/src/registration_getsubscriberids.cpp new file mode 100644 index 0000000000..40ba6a95be --- /dev/null +++ b/ecal/tests/cpp/registration_test_public/src/registration_getsubscriberids.cpp @@ -0,0 +1,174 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2024 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include + +#include + +#include +#include +#include + +// struct to hold the test parameters +struct TestParams +{ + int subscriber_count = 0; + eCAL::Configuration configuration; +}; + +// test class that accepts TestParams as a parameter +class EcalTest : public ::testing::TestWithParam +{ +protected: + void SetUp() override + { + // set configuration from the test parameters + auto params = GetParam(); + eCAL::Initialize(params.configuration, "core_cpp_registration_subscriberids"); + } + + void TearDown() override + { + // clean up + eCAL::Finalize(); + } +}; + +TEST_P(EcalTest, GetSubscriberIDsReturnsCorrectNumber) +{ + { + // create subscribers for testing + std::vector subscriber_vec; + for (int i = 0; i < GetParam().subscriber_count; ++i) + { + std::stringstream tname; + tname << "topic_" << i; + + eCAL::SDataTypeInformation data_type_info; + data_type_info.name = tname.str() + "_type_name"; + data_type_info.encoding = tname.str() + "_type_encoding"; + data_type_info.descriptor = tname.str() + "_type_descriptor"; + + subscriber_vec.emplace_back(tname.str(), data_type_info); + } + + // let's register + eCAL::Process::SleepMS(2 * GetParam().configuration.registration.registration_refresh); + + // get the list of subscriber IDs + const auto sub_ids1 = eCAL::Registration::GetSubscriberIDs(); + + // verify the number of subscribers created + ASSERT_EQ(sub_ids1.size(), GetParam().subscriber_count); + } + + // let's finally timeout + eCAL::Process::SleepMS(2 * GetParam().configuration.registration.registration_timeout); + + // get the list of subscriber IDs + const auto sub_ids2 = eCAL::Registration::GetSubscriberIDs(); + + // all subscriber should be timeouted + ASSERT_EQ(sub_ids2.size(), 0); +} + +TEST_P(EcalTest, SubscriberEventCallbackIsTriggered) +{ + std::atomic created_subscriber_num(0); + std::atomic deleted_subscriber_num(0); + std::set created_subscriber_ids; + std::set deleted_subscriber_ids; + + // register the callback + auto callback_token = eCAL::Registration::AddSubscriberEventCallback( + [&](const eCAL::Registration::STopicId& id, eCAL::Registration::RegistrationEventType event_type) + { + if (event_type == eCAL::Registration::RegistrationEventType::new_entity) + { + created_subscriber_num++; + created_subscriber_ids.insert(id); + } + else if (event_type == eCAL::Registration::RegistrationEventType::deleted_entity) + { + deleted_subscriber_num++; + deleted_subscriber_ids.insert(id); + } + }); + + { + // create subscribers for testing + std::vector subscriber_vec; + for (int i = 0; i < GetParam().subscriber_count; ++i) + { + std::stringstream tname; + tname << "topic_" << i; + + eCAL::SDataTypeInformation data_type_info; + data_type_info.name = tname.str() + "_type_name"; + data_type_info.encoding = tname.str() + "_type_encoding"; + data_type_info.descriptor = tname.str() + "_type_descriptor"; + + subscriber_vec.emplace_back(tname.str(), data_type_info); + } + + // let's register + eCAL::Process::SleepMS(2 * GetParam().configuration.registration.registration_refresh); + + // verify the number of subscribers created through the callback + ASSERT_EQ(created_subscriber_num.load(), GetParam().subscriber_count); + + // clear subscribers to trigger deletion events + subscriber_vec.clear(); + + // let's register the deletion events + eCAL::Process::SleepMS(2 * GetParam().configuration.registration.registration_refresh); + + // verify the number of subscribers deleted through the callback + ASSERT_EQ(deleted_subscriber_num.load(), GetParam().subscriber_count); + } + + // unregister the callback + eCAL::Registration::RemSubscriberEventCallback(callback_token); +} + +// instantiate the test suite with different configurations and subscriber counts +INSTANTIATE_TEST_SUITE_P( + GetSubscriberIdTests, + EcalTest, + ::testing::Values( + TestParams{ 10, []() { + // shm + eCAL::Configuration config; + config.registration.registration_refresh = 100; + config.registration.registration_timeout = 200; + config.registration.layer.shm.enable = true; + config.registration.layer.udp.enable = false; + return config; + }() }, + TestParams{ 10, []() { + // udp + eCAL::Configuration config; + config.registration.registration_refresh = 100; + config.registration.registration_timeout = 200; + config.registration.layer.shm.enable = false; + config.registration.layer.udp.enable = true; + return config; + }() } + ) +); diff --git a/ecal/tests/cpp/registration_test_public/src/registration_gettopics.cpp b/ecal/tests/cpp/registration_test_public/src/registration_gettopics.cpp index 2f65bc9143..f88db3801e 100644 --- a/ecal/tests/cpp/registration_test_public/src/registration_gettopics.cpp +++ b/ecal/tests/cpp/registration_test_public/src/registration_gettopics.cpp @@ -201,69 +201,3 @@ TEST(core_cpp_registration_public, GetTopicsParallel) // finalize eCAL API eCAL::Finalize(); } - -TEST(core_cpp_registration_public, GetTopicIDs) -{ - // initialize eCAL API - eCAL::Initialize(0, nullptr, "core_cpp_registration_public"); - - // enable loop back communication in the same process - eCAL::Util::EnableLoopback(true); - - // create and check a few pub/sub entities - { - eCAL::SDataTypeInformation info_A1{ "typeA1" ,"", "descA1" }; - eCAL::SDataTypeInformation info_A2{ "typeA2" ,"", "descA2" }; - eCAL::SDataTypeInformation info_A3{ "typeA3" ,"", "descA3" }; - - eCAL::SDataTypeInformation info_B1{ "typeB1" ,"", "descB1" }; - eCAL::SDataTypeInformation info_B2{ "typeB2" ,"", "descB2" }; - - // create 3 publisher - eCAL::CPublisher pub1("A1", info_A1); - eCAL::CPublisher pub2("A2", info_A2); - eCAL::CPublisher pub3("A3", info_A3); - - // create 2 subscriber - eCAL::CSubscriber sub1("B1", info_B1); - eCAL::CSubscriber sub2("B2", info_B2); - - // let's register - eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); - - // get publisher - { - auto id_set = eCAL::Registration::GetPublisherIDs(); - EXPECT_EQ(3, id_set.size()); - - // check publisher datatype information - for (const auto& id : id_set) - { - eCAL::Registration::SQualityTopicInfo info; - EXPECT_TRUE(eCAL::Registration::GetPublisherInfo(id, info)); - EXPECT_EQ(std::string("type") + id.topic_name, info.info.name); - EXPECT_EQ("", info.info.encoding); - EXPECT_EQ(std::string("desc") + id.topic_name, info.info.descriptor); - } - } - - // get subscriber - { - auto id_set = eCAL::Registration::GetSubscriberIDs(); - EXPECT_EQ(2, id_set.size()); - - // check subscriber datatype information - for (const auto& id : id_set) - { - eCAL::Registration::SQualityTopicInfo info; - EXPECT_TRUE(eCAL::Registration::GetSubscriberInfo(id, info)); - EXPECT_EQ(std::string("type") + id.topic_name, info.info.name); - EXPECT_EQ("", info.info.encoding); - EXPECT_EQ(std::string("desc") + id.topic_name, info.info.descriptor); - } - } - } - - // finalize eCAL API - eCAL::Finalize(); -}