From faddd2373efe3d1754f1206062b61f8c91723ad3 Mon Sep 17 00:00:00 2001 From: Kerstin Keller Date: Wed, 24 Jul 2024 13:41:25 +0200 Subject: [PATCH] [core] unregistration of timed out samples in one central place (instead of all over the place). Modifies also CExpirationMap interface to return keys and values of expired items. --- ecal/core/CMakeLists.txt | 2 + ecal/core/src/ecal_descgate.cpp | 14 +- ecal/core/src/ecal_descgate.h | 17 +- ecal/core/src/ecal_globals.cpp | 2 +- .../src/monitoring/ecal_monitoring_impl.cpp | 16 +- .../src/monitoring/ecal_monitoring_impl.h | 26 +- ecal/core/src/readwrite/ecal_reader.cpp | 5 - ecal/core/src/readwrite/ecal_reader.h | 3 +- ecal/core/src/readwrite/ecal_writer.cpp | 5 - ecal/core/src/readwrite/ecal_writer.h | 3 +- .../ecal_registration_receiver.cpp | 29 +- .../registration/ecal_registration_receiver.h | 11 + .../ecal_registration_timeout_provider.cpp | 121 +++++++++ .../ecal_registration_timeout_provider.h | 115 ++++++++ .../ecal_struct_sample_registration.h | 88 ++++++ .../src/serialization/ecal_struct_service.h | 51 ++++ ecal/core/src/util/ecal_expmap.h | 8 +- ecal/tests/CMakeLists.txt | 2 + .../cpp/descgate_test/src/getpublisher.cpp | 93 +++++-- .../tests/cpp/expmap_test/src/expmap_test.cpp | 62 ++++- .../cpp/registration_test/CMakeLists.txt | 46 ++++ .../src/registration_timout_provider_test.cpp | 257 ++++++++++++++++++ 22 files changed, 873 insertions(+), 103 deletions(-) create mode 100644 ecal/core/src/registration/ecal_registration_timeout_provider.cpp create mode 100644 ecal/core/src/registration/ecal_registration_timeout_provider.h create mode 100644 ecal/tests/cpp/registration_test/CMakeLists.txt create mode 100644 ecal/tests/cpp/registration_test/src/registration_timout_provider_test.cpp diff --git a/ecal/core/CMakeLists.txt b/ecal/core/CMakeLists.txt index 09caa9e9f7..f12058c464 100644 --- a/ecal/core/CMakeLists.txt +++ b/ecal/core/CMakeLists.txt @@ -296,6 +296,8 @@ if (ECAL_CORE_REGISTRATION) src/registration/ecal_registration_sample_applier_gates.h src/registration/ecal_registration_sample_applier_user.cpp src/registration/ecal_registration_sample_applier_user.h + src/registration/ecal_registration_timeout_provider.cpp + src/registration/ecal_registration_timeout_provider.h src/registration/ecal_registration_sender.h src/registration/udp/ecal_registration_receiver_udp.cpp src/registration/udp/ecal_registration_receiver_udp.h diff --git a/ecal/core/src/ecal_descgate.cpp b/ecal/core/src/ecal_descgate.cpp index 52f82f5a1a..06d25fea9b 100644 --- a/ecal/core/src/ecal_descgate.cpp +++ b/ecal/core/src/ecal_descgate.cpp @@ -43,13 +43,7 @@ namespace namespace eCAL { - CDescGate::CDescGate(const std::chrono::milliseconds& exp_timeout_) : - m_publisher_info_map (exp_timeout_), - m_subscriber_info_map (exp_timeout_), - m_service_info_map (exp_timeout_), - m_client_info_map (exp_timeout_) - { - } + CDescGate::CDescGate() = default; CDescGate::~CDescGate() = default; Util::QualityTopicInfoMultiMap CDescGate::GetPublishers() @@ -77,7 +71,6 @@ namespace eCAL Util::QualityTopicInfoMultiMap multi_map; const std::lock_guard lock(topic_info_map_.mtx); - topic_info_map_.map.erase_expired(); for (const auto& topic_map_it : topic_info_map_.map) { @@ -92,7 +85,6 @@ namespace eCAL Util::QualityServiceInfoMultimap multi_map; const std::lock_guard lock(service_method_info_map_.mtx); - service_method_info_map_.map.erase_expired(); for (const auto& service_method_info_map_it : service_method_info_map_.map) { @@ -183,14 +175,12 @@ namespace eCAL topic_quality_info.quality = topic_quality_; const std::unique_lock lock(topic_info_map_.mtx); - topic_info_map_.map.erase_expired(); topic_info_map_.map[topic_info_key] = topic_quality_info; } void CDescGate::RemTopicDescription(SQualityTopicIdMap& topic_info_map_, const std::string& topic_name_, const Util::TopicId& topic_id_) { const std::unique_lock lock(topic_info_map_.mtx); - topic_info_map_.map.erase_expired(); topic_info_map_.map.erase(STopicIdKey{ topic_name_, topic_id_ }); } @@ -213,7 +203,6 @@ namespace eCAL service_quality_info.response_quality = response_type_quality_; const std::lock_guard lock(service_method_info_map_.mtx); - service_method_info_map_.map.erase_expired(); service_method_info_map_.map[service_method_info_key] = service_quality_info; } @@ -222,7 +211,6 @@ namespace eCAL std::list service_method_infos_to_remove; const std::lock_guard lock(service_method_info_map_.mtx); - service_method_info_map_.map.erase_expired(); for (auto&& service_it : service_method_info_map_.map) { diff --git a/ecal/core/src/ecal_descgate.h b/ecal/core/src/ecal_descgate.h index fbc1065324..8159075c4d 100644 --- a/ecal/core/src/ecal_descgate.h +++ b/ecal/core/src/ecal_descgate.h @@ -27,7 +27,6 @@ #include #include "serialization/ecal_struct_sample_registration.h" -#include "util/ecal_expmap.h" #include #include @@ -66,7 +65,7 @@ namespace eCAL class CDescGate { public: - CDescGate(const std::chrono::milliseconds& exp_timeout_); + CDescGate(); ~CDescGate(); // apply samples to description gate @@ -89,20 +88,18 @@ namespace eCAL CDescGate& operator=(CDescGate&&) = delete; protected: - using QualityTopicIdExpMap = eCAL::Util::CExpirationMap; + using QualityTopicIdMap = std::map; struct SQualityTopicIdMap { - explicit SQualityTopicIdMap(const std::chrono::milliseconds& timeout_) : map(timeout_) {}; - mutable std::mutex mtx; - QualityTopicIdExpMap map; + mutable std::mutex mtx; + QualityTopicIdMap map; }; - using QualityServiceIdExpMap = eCAL::Util::CExpirationMap; + using QualityServiceIdMap = std::map; struct SQualityServiceIdMap { - explicit SQualityServiceIdMap(const std::chrono::milliseconds& timeout_) : map(timeout_) {}; - mutable std::mutex mtx; - QualityServiceIdExpMap map; + mutable std::mutex mtx; + QualityServiceIdMap map; }; static Util::QualityTopicInfoMultiMap GetTopics (SQualityTopicIdMap& topic_info_map_); diff --git a/ecal/core/src/ecal_globals.cpp b/ecal/core/src/ecal_globals.cpp index 56634748d4..aa4c8ad98c 100644 --- a/ecal/core/src/ecal_globals.cpp +++ b/ecal/core/src/ecal_globals.cpp @@ -74,7 +74,7 @@ namespace eCAL if (descgate_instance == nullptr) { // create description gate with configured expiration timeout - descgate_instance = std::make_unique(std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())); + descgate_instance = std::make_unique(); new_initialization = true; } diff --git a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp index f5fb5774fc..ecd9eace15 100644 --- a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp +++ b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp @@ -42,12 +42,7 @@ namespace eCAL // Monitoring Implementation //////////////////////////////////////// CMonitoringImpl::CMonitoringImpl() : - m_init(false), - m_process_map (std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())), - m_publisher_map (std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())), - m_subscriber_map(std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())), - m_server_map (std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())), - m_clients_map (std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())) + m_init(false) { } @@ -629,7 +624,6 @@ namespace eCAL monitoring_.processes.reserve(m_process_map.map->size()); // iterate map - m_process_map.map->erase_expired(); for (const auto& process : (*m_process_map.map)) { monitoring_.processes.emplace_back(process.second); @@ -647,7 +641,6 @@ namespace eCAL monitoring_.publisher.reserve(m_publisher_map.map->size()); // iterate map - m_publisher_map.map->erase_expired(); for (const auto& publisher : (*m_publisher_map.map)) { monitoring_.publisher.emplace_back(publisher.second); @@ -665,7 +658,6 @@ namespace eCAL monitoring_.subscriber.reserve(m_subscriber_map.map->size()); // iterate map - m_subscriber_map.map->erase_expired(); for (const auto& subscriber : (*m_subscriber_map.map)) { monitoring_.subscriber.emplace_back(subscriber.second); @@ -683,7 +675,6 @@ namespace eCAL monitoring_.server.reserve(m_server_map.map->size()); // iterate map - m_server_map.map->erase_expired(); for (const auto& server : (*m_server_map.map)) { monitoring_.server.emplace_back(server.second); @@ -701,7 +692,6 @@ namespace eCAL monitoring_.clients.reserve(m_clients_map.map->size()); // iterate map - m_clients_map.map->erase_expired(); for (const auto& client : (*m_clients_map.map)) { monitoring_.clients.emplace_back(client.second); @@ -715,7 +705,6 @@ namespace eCAL const std::lock_guard lock(m_process_map.sync); // iterate map - m_process_map.map->erase_expired(); for (const auto& process : (*m_process_map.map)) { // add process @@ -729,7 +718,6 @@ namespace eCAL const std::lock_guard lock(m_server_map.sync); // iterate map - m_server_map.map->erase_expired(); for (const auto& server : (*m_server_map.map)) { // add service @@ -743,7 +731,6 @@ namespace eCAL const std::lock_guard lock(m_clients_map.sync); // iterate map - m_clients_map.map->erase_expired(); for (const auto& client : (*m_clients_map.map)) { // add client @@ -757,7 +744,6 @@ namespace eCAL const std::lock_guard lock(map_.sync); // iterate map - map_.map->erase_expired(); for (const auto& topic : (*map_.map)) { if (direction_ == "publisher") diff --git a/ecal/core/src/monitoring/ecal_monitoring_impl.h b/ecal/core/src/monitoring/ecal_monitoring_impl.h index 703bda8594..35b1f51d8f 100644 --- a/ecal/core/src/monitoring/ecal_monitoring_impl.h +++ b/ecal/core/src/monitoring/ecal_monitoring_impl.h @@ -26,11 +26,11 @@ #include #include "ecal_def.h" -#include "util/ecal_expmap.h" #include "serialization/ecal_serialize_sample_registration.h" #include +#include #include #include #include @@ -81,44 +81,44 @@ namespace eCAL bool RegisterTopic(const Registration::Sample& sample_, enum ePubSub pubsub_type_); bool UnregisterTopic(const Registration::Sample& sample_, enum ePubSub pubsub_type_); - using TopicMonMapT = Util::CExpirationMap; + using TopicMonMapT = std::map; struct STopicMonMap { - explicit STopicMonMap(const std::chrono::milliseconds& timeout_) : - map(std::make_unique(timeout_)) + explicit STopicMonMap() : + map(std::make_unique()) { }; std::mutex sync; std::unique_ptr map; }; - using ProcessMonMapT = Util::CExpirationMap; + using ProcessMonMapT = std::map; struct SProcessMonMap { - explicit SProcessMonMap(const std::chrono::milliseconds& timeout_) : - map(std::make_unique(timeout_)) + explicit SProcessMonMap() : + map(std::make_unique()) { }; std::mutex sync; std::unique_ptr map; }; - using ServerMonMapT = Util::CExpirationMap; + using ServerMonMapT = std::map; struct SServerMonMap { - explicit SServerMonMap(const std::chrono::milliseconds& timeout_) : - map(std::make_unique(timeout_)) + explicit SServerMonMap() : + map(std::make_unique()) { }; std::mutex sync; std::unique_ptr map; }; - using ClientMonMapT = Util::CExpirationMap; + using ClientMonMapT = std::map; struct SClientMonMap { - explicit SClientMonMap(const std::chrono::milliseconds& timeout_) : - map(std::make_unique(timeout_)) + explicit SClientMonMap() : + map(std::make_unique()) { }; std::mutex sync; diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 6b84834379..122db1b657 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -89,10 +89,6 @@ namespace eCAL counter << std::chrono::steady_clock::now().time_since_epoch().count(); m_topic_id = counter.str(); - // set registration expiration - const std::chrono::milliseconds registration_timeout(Config::GetRegistrationTimeoutMs()); - m_pub_map.set_expiration(registration_timeout); - // start transport layers InitializeLayers(); StartTransportLayer(); @@ -547,7 +543,6 @@ namespace eCAL void CDataReader::CheckConnections() { const std::lock_guard lock(m_pub_map_mtx); - m_pub_map.erase_expired(); if (m_pub_map.empty()) { diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index 6c4a0cf401..594b62bd74 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -29,7 +29,6 @@ #include "serialization/ecal_serialize_sample_payload.h" #include "serialization/ecal_serialize_sample_registration.h" -#include "util/ecal_expmap.h" #include "util/frequency_calculator.h" #include @@ -143,7 +142,7 @@ namespace eCAL Subscriber::Configuration m_config; std::atomic m_connected; - using PublicationMapT = Util::CExpirationMap>; + using PublicationMapT = std::map>; mutable std::mutex m_pub_map_mtx; PublicationMapT m_pub_map; diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index a8365a09fd..a4f93d8867 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -115,10 +115,6 @@ namespace eCAL counter << std::chrono::steady_clock::now().time_since_epoch().count(); m_topic_id = counter.str(); - // set registration expiration - const std::chrono::milliseconds registration_timeout(Config::GetRegistrationTimeoutMs()); - m_sub_map.set_expiration(registration_timeout); - // mark as created m_created = true; } @@ -594,7 +590,6 @@ namespace eCAL void CDataWriter::CheckConnections() { const std::lock_guard lock(m_sub_map_mtx); - m_sub_map.erase_expired(); if (m_sub_map.empty()) { diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index acb6c784bc..ddbd630020 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -30,7 +30,6 @@ #include #include "serialization/ecal_serialize_sample_registration.h" -#include "util/ecal_expmap.h" #include "util/frequency_calculator.h" #if ECAL_CORE_TRANSPORT_UDP @@ -156,7 +155,7 @@ namespace eCAL std::atomic m_connected; - using SSubscriptionMapT = Util::CExpirationMap>; + using SSubscriptionMapT = std::map>; mutable std::mutex m_sub_map_mtx; SSubscriptionMapT m_sub_map; diff --git a/ecal/core/src/registration/ecal_registration_receiver.cpp b/ecal/core/src/registration/ecal_registration_receiver.cpp index ed09206072..70e3d5ce89 100644 --- a/ecal/core/src/registration/ecal_registration_receiver.cpp +++ b/ecal/core/src/registration/ecal_registration_receiver.cpp @@ -27,12 +27,15 @@ #include "registration/ecal_registration_receiver.h" +#include "registration/ecal_registration_timeout_provider.h" +#include "util/ecal_thread.h" + #include "registration/udp/ecal_registration_receiver_udp.h" #if ECAL_CORE_REGISTRATION_SHM #include "registration/shm/ecal_registration_receiver_shm.h" #endif - #include "io/udp/ecal_udp_configurations.h" +#include #include #include #include @@ -47,7 +50,11 @@ namespace eCAL std::atomic CRegistrationReceiver::m_created; CRegistrationReceiver::CRegistrationReceiver() - : m_use_registration_udp(false) + : m_timeout_provider(nullptr) + , m_timeout_provider_thread(nullptr) + , m_registration_receiver_udp(nullptr) + , m_registration_receiver_shm(nullptr) + , m_use_registration_udp(false) , m_use_registration_shm(false) , m_sample_applier(Config::IsNetworkEnabled(), false, Process::GetHostGroupName(), Process::GetProcessID()) { @@ -75,6 +82,20 @@ namespace eCAL { if(m_created) return; + m_timeout_provider = std::make_unique>( + std::chrono::milliseconds(Config::GetMonitoringTimeoutMs()), + [this](const Registration::Sample& sample_) + { + return m_sample_applier.ApplySample(sample_); + } + ); + m_sample_applier.SetCustomApplySampleCallback("timeout", [this](const eCAL::Registration::Sample& sample_) + { + m_timeout_provider->ApplySample(sample_); + }); + m_timeout_provider_thread = std::make_unique([this]() {m_timeout_provider->CheckForTimeouts(); }); + m_timeout_provider_thread->start(std::chrono::milliseconds(100)); + // receive registration via udp or shared memory m_use_registration_shm = Config::IsShmRegistrationEnabled(); m_use_registration_udp = !m_use_registration_shm; @@ -111,6 +132,10 @@ namespace eCAL } #endif + m_timeout_provider_thread = nullptr; + m_sample_applier.RemCustomApplySampleCallback("timeout"); + m_timeout_provider = nullptr; + m_created = false; } diff --git a/ecal/core/src/registration/ecal_registration_receiver.h b/ecal/core/src/registration/ecal_registration_receiver.h index 2a6d1c6156..bf94610070 100644 --- a/ecal/core/src/registration/ecal_registration_receiver.h +++ b/ecal/core/src/registration/ecal_registration_receiver.h @@ -48,6 +48,13 @@ namespace eCAL class CRegistrationReceiverUDP; class CRegistrationReceiverSHM; + namespace Registration + { + template + class CTimeoutProvider; + } + class CCallbackThread; + class CRegistrationReceiver { public: @@ -72,6 +79,10 @@ namespace eCAL // why is this a static variable? can someone explain? static std::atomic m_created; + // this class gets samples and tracks them for timouts + std::unique_ptr> m_timeout_provider; + std::unique_ptr m_timeout_provider_thread; + std::unique_ptr m_registration_receiver_udp; #if ECAL_CORE_REGISTRATION_SHM std::unique_ptr m_registration_receiver_shm; diff --git a/ecal/core/src/registration/ecal_registration_timeout_provider.cpp b/ecal/core/src/registration/ecal_registration_timeout_provider.cpp new file mode 100644 index 0000000000..2817190807 --- /dev/null +++ b/ecal/core/src/registration/ecal_registration_timeout_provider.cpp @@ -0,0 +1,121 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2024 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +/** + * @brief eCAL registration receiver + * + * Receives registration information from external eCAL processes and forwards them to + * the internal publisher/subscriber, server/clients. + * +**/ + +#include "registration/ecal_registration_timeout_provider.h" + + +namespace eCAL +{ + namespace Registration + { + bool IsUnregistrationSample(const Registration::Sample& sample_) + { + return sample_.cmd_type == bct_unreg_client || + sample_.cmd_type == bct_unreg_process || + sample_.cmd_type == bct_unreg_publisher || + sample_.cmd_type == bct_unreg_service || + sample_.cmd_type == bct_unreg_subscriber; + } + + Sample CreateUnregisterSample(const Sample& sample_) + { + Sample unregister_sample; + + unregister_sample.cmd_type = GetUnregistrationType(sample_); + unregister_sample.identifier = sample_.identifier; + + if (IsProcessRegistration(unregister_sample)) + { + const auto& sample_process = sample_.process; + auto& unregister_sample_process = unregister_sample.process; + unregister_sample_process.pname = sample_process.pname; + unregister_sample_process.uname = sample_process.uname; + } + + if (IsTopicRegistration(unregister_sample)) + { + const auto& sample_topic = sample_.topic; + auto& unregister_sample_topic = unregister_sample.topic; + unregister_sample_topic.hgname = sample_topic.hgname; + unregister_sample_topic.pname = sample_topic.pname; + unregister_sample_topic.tname = sample_topic.tname; + unregister_sample_topic.uname = sample_topic.uname; + } + + if (unregister_sample.cmd_type == bct_unreg_service) + { + const auto& sample_service = sample_.service; + auto& unregister_sample_service = unregister_sample.service; + unregister_sample_service.pname = sample_service.pname; + unregister_sample_service.sname = sample_service.sname; + unregister_sample_service.uname = sample_service.uname; + unregister_sample_service.version = sample_service.version; + } + + if (unregister_sample.cmd_type == bct_unreg_client) + { + const auto& sample_client = sample_.client; + auto& unregister_sample_client = unregister_sample.client; + + unregister_sample_client.pname = sample_client.pname; + unregister_sample_client.sname = sample_client.sname; + unregister_sample_client.uname = sample_client.uname; + unregister_sample_client.version = sample_client.version; + } + return unregister_sample; + } + + eCmdType GetUnregistrationType(const Registration::Sample& sample_) + { + if (sample_.cmd_type == bct_reg_client) + return bct_unreg_client; + if (sample_.cmd_type == bct_reg_process) + return bct_unreg_process; + if (sample_.cmd_type == bct_reg_publisher) + return bct_unreg_publisher; + if (sample_.cmd_type == bct_reg_service) + return bct_unreg_service; + if (sample_.cmd_type == bct_reg_subscriber) + return bct_unreg_subscriber; + return bct_none; + } + + bool IsProcessRegistration(const Registration::Sample& sample_) + { + return sample_.cmd_type == bct_reg_process || + sample_.cmd_type == bct_unreg_process; + } + + bool IsTopicRegistration(const Registration::Sample& sample_) + { + return sample_.cmd_type == bct_reg_publisher || + sample_.cmd_type == bct_reg_subscriber || + sample_.cmd_type == bct_unreg_publisher || + sample_.cmd_type == bct_unreg_subscriber; + } + } +} diff --git a/ecal/core/src/registration/ecal_registration_timeout_provider.h b/ecal/core/src/registration/ecal_registration_timeout_provider.h new file mode 100644 index 0000000000..8aac623536 --- /dev/null +++ b/ecal/core/src/registration/ecal_registration_timeout_provider.h @@ -0,0 +1,115 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2024 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +/** + * @brief eCAL registration timeout provider + * + * Class that tracks incoming samples. + * It will call an unregistration sample callback, whenenver a sample has "timed out". + * This can be treated the same way if the other process had sent an unregister sample. + * +**/ + +#pragma once + +#include +#include + +#include + +namespace eCAL +{ + namespace Registration + { + bool IsUnregistrationSample(const Registration::Sample& sample_); + + // This function turns a registration sample into an unregistration sample + // This could happen also in another class / namespace + Registration::Sample CreateUnregisterSample(const Registration::Sample& sample_); + + // Returns the corresponding unregistration type + // RegSubscriber -> UnregSubscriber, ... + // Anything else will return bct_none + eCmdType GetUnregistrationType(const Registration::Sample& sample_); + + bool IsProcessRegistration(const Registration::Sample& sample_); + bool IsTopicRegistration(const Registration::Sample& sample_); + + + template < class ClockType = std::chrono::steady_clock> + class CTimeoutProvider + { + public: + CTimeoutProvider(const typename ClockType::duration& timeout_, const RegistrationApplySampleCallbackT& apply_sample_callback_) + : sample_tracker(timeout_) + , apply_sample_callback(apply_sample_callback_) + {} + + bool ApplySample(const Registration::Sample& sample_) { + // Is unregistration sample? + if (IsUnregistrationSample(sample_)) + { + DeleteUnregisterSample(sample_); + } + else + { + UpdateSample(sample_); + } + return true; + } + + // This function checks for timeouts. This means it scans the map for expired samples + // It then applies unregistration samples for all internally expired samples. + void CheckForTimeouts() + { + std::map expired_samples; + + { + std::lock_guard lock(sample_tracker_mutex); + expired_samples = sample_tracker.erase_expired(); + } + + for (const auto& registration_sample : expired_samples) + { + Sample unregistration_sample = CreateUnregisterSample(registration_sample.second); + apply_sample_callback(unregistration_sample); + } + } + + private: + void DeleteUnregisterSample(const Sample& sample_) + { + std::lock_guard lock(sample_tracker_mutex); + sample_tracker.erase(sample_.identifier); + } + + void UpdateSample(const Sample& sample_) + { + std::lock_guard lock(sample_tracker_mutex); + sample_tracker[sample_.identifier] = sample_; + } + + using SampleTrackerMap = Util::CExpirationMap; + SampleTrackerMap sample_tracker; + std::mutex sample_tracker_mutex; + + RegistrationApplySampleCallbackT apply_sample_callback; + }; + } +} \ No newline at end of file diff --git a/ecal/core/src/serialization/ecal_struct_sample_registration.h b/ecal/core/src/serialization/ecal_struct_sample_registration.h index f3a65fd98f..54b0bdcaea 100644 --- a/ecal/core/src/serialization/ecal_struct_sample_registration.h +++ b/ecal/core/src/serialization/ecal_struct_sample_registration.h @@ -34,6 +34,7 @@ #include #include #include +#include namespace eCAL { @@ -69,6 +70,10 @@ namespace eCAL struct OSInfo { std::string osname; // name + + bool operator==(const OSInfo& other) const { + return osname == other.osname; + } }; // eCAL host @@ -76,6 +81,10 @@ namespace eCAL { std::string hname; // host name OSInfo os; // operating system details + + bool operator==(const Host& other) const { + return hname == other.hname && os == other.os; + } }; // Process severity information @@ -84,23 +93,40 @@ namespace eCAL eProcessSeverity severity = proc_sev_unknown; // severity eProcessSeverityLevel severity_level = proc_sev_level_unknown; // severity level std::string info; // info string + + bool operator==(const ProcessState& other) const { + return severity == other.severity && severity_level == other.severity_level && info == other.info; + } }; // Transport layer parameters for ecal udp multicast struct LayerParUdpMC { + bool operator==(const LayerParUdpMC& other) const { + // Assuming there are no member variables to compare + return true; + } }; // Transport layer parameters for ecal tcp struct LayerParTcp { int32_t port = 0; // tcp writers port number + + bool operator==(const LayerParTcp& other) const { + return port == other.port; + } + }; // Transport layer parameters for ecal shm struct LayerParShm { std::list memory_file_list; // list of memory file names + + bool operator==(const LayerParShm& other) const { + return memory_file_list == other.memory_file_list; + } }; // Connection parameter for reader/writer @@ -109,6 +135,12 @@ namespace eCAL LayerParUdpMC layer_par_udpmc; // parameter for ecal udp multicast LayerParTcp layer_par_tcp; // parameter for ecal tcp LayerParShm layer_par_shm; // parameter for ecal shm + + bool operator==(const ConnectionPar& other) const { + return layer_par_udpmc == other.layer_par_udpmc && + layer_par_tcp == other.layer_par_tcp && + layer_par_shm == other.layer_par_shm; + } }; // Transport layer information @@ -119,6 +151,14 @@ namespace eCAL bool enabled = false; // transport layer enabled ? bool active = false; // transport layer in use ? ConnectionPar par_layer; // transport layer parameter + + bool operator==(const TLayer& other) const { + return type == other.type && + version == other.version && + enabled == other.enabled && + active == other.active && + par_layer == other.par_layer; + } }; // Process information @@ -135,6 +175,20 @@ namespace eCAL int32_t component_init_state = 0; // eCAL component initialization state (eCAL::Initialize(..)) std::string component_init_info; // like comp_init_state as a human-readable string (pub|sub|srv|mon|log|time|proc) std::string ecal_runtime_version; // loaded/runtime eCAL version of a component + + bool operator==(const Process& other) const { + return rclock == other.rclock && + hgname == other.hgname && + pname == other.pname && + uname == other.uname && + pparam == other.pparam && + state == other.state && + tsync_state == other.tsync_state && + tsync_mod_name == other.tsync_mod_name && + component_init_state == other.component_init_state && + component_init_info == other.component_init_info && + ecal_runtime_version == other.ecal_runtime_version; + } }; // eCAL topic information @@ -160,6 +214,25 @@ namespace eCAL int32_t dfreq = 0; // data frequency (send / receive registrations per second) [mHz] std::map attr; // generic topic description + + bool operator==(const Topic& other) const { + return rclock == other.rclock && + hgname == other.hgname && + pname == other.pname && + uname == other.uname && + tname == other.tname && + direction == other.direction && + tdatatype == other.tdatatype && + tlayer == other.tlayer && + tsize == other.tsize && + connections_loc == other.connections_loc && + connections_ext == other.connections_ext && + message_drops == other.message_drops && + did == other.did && + dclock == other.dclock && + dfreq == other.dfreq && + attr == other.attr; + } }; struct SampleIdentifier @@ -168,6 +241,11 @@ namespace eCAL int32_t process_id = 0; // process id which produced the sample std::string host_name; // host which produced the sample + bool operator==(const SampleIdentifier& other) const { + return entity_id == other.entity_id && + process_id == other.process_id && + host_name == other.host_name; + } bool operator<(const SampleIdentifier& other) const { @@ -186,6 +264,16 @@ namespace eCAL Service::Service service; // service information Service::Client client ; // client information Topic topic; // topic information + + bool operator==(const Sample& other) const { + return identifier == other.identifier && + cmd_type == other.cmd_type && + host == other.host && + process == other.process && + service == other.service && + client == other.client && + topic == other.topic; + } }; // Registration sample list diff --git a/ecal/core/src/serialization/ecal_struct_service.h b/ecal/core/src/serialization/ecal_struct_service.h index 241234610b..470f029845 100644 --- a/ecal/core/src/serialization/ecal_struct_service.h +++ b/ecal/core/src/serialization/ecal_struct_service.h @@ -27,6 +27,7 @@ #include #include #include +#include namespace eCAL { @@ -50,6 +51,16 @@ namespace eCAL std::string error; // Error message int32_t id = 0; // Session id eMethodCallState state = none; // Method call state + + bool operator==(const ServiceHeader& other) const { + return hname == other.hname && + sname == other.sname && + sid == other.sid && + mname == other.mname && + error == other.error && + id == other.id && + state == other.state; + } }; // Service Request @@ -57,6 +68,11 @@ namespace eCAL { ServiceHeader header; // Common service header std::string request; // Request payload + + bool operator==(const Request& other) const { + return header == other.header && + request == other.request; + } }; // Service Response @@ -65,6 +81,12 @@ namespace eCAL ServiceHeader header; // Common service header std::string response; // Response payload int64_t ret_state = 0; // Callback return state + + bool operator==(const Response& other) const { + return header == other.header && + response == other.response && + ret_state == other.ret_state; + } }; // Service Method @@ -76,6 +98,15 @@ namespace eCAL std::string resp_type; // Response type std::string resp_desc; // Response descriptor int64_t call_count = 0; // Call counter + + bool operator==(const Method& other) const { + return mname == other.mname && + req_type == other.req_type && + req_desc == other.req_desc && + resp_type == other.resp_type && + resp_desc == other.resp_desc && + call_count == other.call_count; + } }; // Service @@ -89,6 +120,17 @@ namespace eCAL uint32_t version = 0; // Service protocol version uint32_t tcp_port_v0 = 0; // The TCP port used for that service (v0) uint32_t tcp_port_v1 = 0; // The TCP port used for that service (v1) + + bool operator==(const Service& other) const { + return rclock == other.rclock && + pname == other.pname && + uname == other.uname && + sname == other.sname && + methods == other.methods && + version == other.version && + tcp_port_v0 == other.tcp_port_v0 && + tcp_port_v1 == other.tcp_port_v1; + } }; // Client @@ -100,6 +142,15 @@ namespace eCAL std::string sname; // Service name std::vector methods; // List of methods uint32_t version = 0; // Client protocol version + + bool operator==(const Client& other) const { + return rclock == other.rclock && + pname == other.pname && + uname == other.uname && + sname == other.sname && + methods == other.methods && + version == other.version; + } }; } } diff --git a/ecal/core/src/util/ecal_expmap.h b/ecal/core/src/util/ecal_expmap.h index fd71dcfca1..496ea80e72 100644 --- a/ecal/core/src/util/ecal_expmap.h +++ b/ecal/core/src/util/ecal_expmap.h @@ -300,18 +300,22 @@ namespace eCAL * This function erases all expired key-value pairs from the internal map / timestamp list. * The CExpirationMap class does not call this function internally, it has to be called explicitly by the user. */ - void erase_expired(std::list* keys_erased_from_expired_map = nullptr) //-V826 + std::map erase_expired() { + std::map erased_values; // To erase timestamps from the map, the time point of the last access is calculated, all older entries will be erased. // Since the list is sorted, we need to remove everything from the first element until the eviction limit. typename ClockType::time_point eviction_limit = get_curr_time() - _timeout; auto it(_access_timestamps_list.begin()); while (it != _access_timestamps_list.end() && it->timestamp < eviction_limit) { - if (keys_erased_from_expired_map != nullptr) keys_erased_from_expired_map->push_back(it->corresponding_map_key); + auto erased_value = _internal_map.find(it->corresponding_map_key); + // for performance reason, we should be able to move from the map, however with C++17 we can use std::map::extract + erased_values[it->corresponding_map_key] = erased_value->second.map_value; _internal_map.erase(it->corresponding_map_key); // erase the element from the map it = _access_timestamps_list.erase(it); // erase the element from the list } + return erased_values; } // Remove specific element from the cache diff --git a/ecal/tests/CMakeLists.txt b/ecal/tests/CMakeLists.txt index d598e1da60..4af1789a3e 100644 --- a/ecal/tests/CMakeLists.txt +++ b/ecal/tests/CMakeLists.txt @@ -32,8 +32,10 @@ add_subdirectory(cpp/descgate_test) add_subdirectory(cpp/config_test) + if(ECAL_CORE_REGISTRATION) add_subdirectory(cpp/util_test) + add_subdirectory(cpp/registration_test) endif() add_subdirectory(cpp/event_test) diff --git a/ecal/tests/cpp/descgate_test/src/getpublisher.cpp b/ecal/tests/cpp/descgate_test/src/getpublisher.cpp index 4d696477eb..3209a19cf2 100644 --- a/ecal/tests/cpp/descgate_test/src/getpublisher.cpp +++ b/ecal/tests/cpp/descgate_test/src/getpublisher.cpp @@ -40,6 +40,13 @@ namespace return reg_sample; } + eCAL::Registration::Sample DestroyPublisher(const std::string& topic_name_, std::uint64_t topic_id_) + { + eCAL::Registration::Sample reg_sample = CreatePublisher(topic_name_, topic_id_); + reg_sample.cmd_type = eCAL::bct_unreg_publisher; + return reg_sample; + } + eCAL::Registration::Sample CreateSubscriber(const std::string& topic_name_, std::uint64_t topic_id_) { eCAL::Registration::Sample reg_sample; @@ -52,6 +59,13 @@ namespace return reg_sample; } + eCAL::Registration::Sample DestroySubscriber(const std::string& topic_name_, std::uint64_t topic_id_) + { + eCAL::Registration::Sample reg_sample = CreateSubscriber(topic_name_, topic_id_); + reg_sample.cmd_type = eCAL::bct_unreg_subscriber; + return reg_sample; + } + eCAL::Registration::Sample CreateService(const std::string& service_name_, std::uint64_t service_id_) { eCAL::Registration::Sample reg_sample; @@ -65,6 +79,13 @@ namespace return reg_sample; } + eCAL::Registration::Sample DestroyService(const std::string& service_name_, std::uint64_t service_id_) + { + eCAL::Registration::Sample reg_sample = CreateService(service_name_, service_id_); + reg_sample.cmd_type = eCAL::bct_unreg_service; + return reg_sample; + } + eCAL::Registration::Sample CreateClient(const std::string& client_name_, std::uint64_t service_id_) { eCAL::Registration::Sample reg_sample; @@ -77,24 +98,29 @@ namespace reg_sample.client.methods.push_back(method); return reg_sample; } + + eCAL::Registration::Sample DestroyClient(const std::string& client_name_, std::uint64_t service_id_) + { + eCAL::Registration::Sample reg_sample = CreateClient(client_name_, service_id_); + reg_sample.cmd_type = eCAL::bct_unreg_client; + return reg_sample; + } } TEST(core_cpp_descgate, PublisherExpiration) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; // apply sample 5 times, sample should not expire auto runs(5); while ((runs--) != 0) { desc_gate.ApplySample(CreatePublisher("pub1", 1), eCAL::tl_none); - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS / 2)); - EXPECT_EQ(1, desc_gate.GetPublishers().size()); } // now let the sample expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + desc_gate.ApplySample(DestroyPublisher("pub1", 1), eCAL::tl_none); // sample should be expired EXPECT_EQ(0, desc_gate.GetPublishers().size()); @@ -102,7 +128,7 @@ TEST(core_cpp_descgate, PublisherExpiration) TEST(core_cpp_descgate, PublisherQualities) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; // create and apply publisher pub1 desc_gate.ApplySample(CreatePublisher("pub1", 1), eCAL::tl_none); @@ -141,7 +167,8 @@ TEST(core_cpp_descgate, PublisherQualities) } // now let the sample expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + desc_gate.ApplySample(DestroyPublisher("pub1", 1), eCAL::tl_none); + desc_gate.ApplySample(DestroyPublisher("pub2", 2), eCAL::tl_none); // sample should be expired EXPECT_EQ(0, desc_gate.GetPublishers().size()); @@ -149,7 +176,7 @@ TEST(core_cpp_descgate, PublisherQualities) TEST(core_cpp_descgate, ManyPublisher) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; constexpr int num_pub(1000); for (auto pub = 0; pub < num_pub; ++pub) @@ -162,7 +189,11 @@ TEST(core_cpp_descgate, ManyPublisher) EXPECT_EQ(num_pub, desc_gate.GetPublishers().size()); // now let the samples expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + for (auto pub = 0; pub < num_pub; ++pub) + { + // create registration sample for pub-xx + desc_gate.ApplySample(DestroyPublisher("pub" + std::to_string(pub), pub), eCAL::tl_none); + } // samples should be expired EXPECT_EQ(0, desc_gate.GetPublishers().size()); @@ -170,20 +201,18 @@ TEST(core_cpp_descgate, ManyPublisher) TEST(core_cpp_descgate, SubscriberExpiration) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; // apply sample 5 times, sample should not expire auto runs(5); while ((runs--) != 0) { desc_gate.ApplySample(CreateSubscriber("sub1", 1), eCAL::tl_none); - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS / 2)); - EXPECT_EQ(1, desc_gate.GetSubscribers().size()); } // now let the sample expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + desc_gate.ApplySample(DestroySubscriber("sub1", 1), eCAL::tl_none); // sample should be expired EXPECT_EQ(0, desc_gate.GetSubscribers().size()); @@ -191,7 +220,7 @@ TEST(core_cpp_descgate, SubscriberExpiration) TEST(core_cpp_descgate, SubscriberQualities) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; // create and apply subscriber sub1 desc_gate.ApplySample(CreateSubscriber("sub1", 1), eCAL::tl_none); @@ -230,7 +259,8 @@ TEST(core_cpp_descgate, SubscriberQualities) } // now let the sample expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + desc_gate.ApplySample(DestroySubscriber("sub1", 1), eCAL::tl_none); + desc_gate.ApplySample(DestroySubscriber("sub2", 2), eCAL::tl_none); // sample should be expired EXPECT_EQ(0, desc_gate.GetSubscribers().size()); @@ -238,7 +268,7 @@ TEST(core_cpp_descgate, SubscriberQualities) TEST(core_cpp_descgate, ManySubscriber) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; constexpr int num_sub(1000); for (auto sub = 0; sub < num_sub; ++sub) @@ -251,7 +281,11 @@ TEST(core_cpp_descgate, ManySubscriber) EXPECT_EQ(num_sub, desc_gate.GetSubscribers().size()); // now let the samples expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + for (auto sub = 0; sub < num_sub; ++sub) + { + // create registration sample for sub-xx + desc_gate.ApplySample(DestroySubscriber("sub" + std::to_string(sub), sub), eCAL::tl_none); + } // samples should be expired EXPECT_EQ(0, desc_gate.GetSubscribers().size()); @@ -259,20 +293,19 @@ TEST(core_cpp_descgate, ManySubscriber) TEST(core_cpp_descgate, ServiceExpiration) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; // apply sample 5 times, sample should not expire auto runs(5); while ((runs--) != 0) { desc_gate.ApplySample(CreateService("service1", 1), eCAL::tl_none); - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS / 2)); EXPECT_EQ(1, desc_gate.GetServices().size()); } // now let the sample expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + desc_gate.ApplySample(DestroyService("service1", 1), eCAL::tl_none); // sample should be expired EXPECT_EQ(0, desc_gate.GetServices().size()); @@ -280,7 +313,7 @@ TEST(core_cpp_descgate, ServiceExpiration) TEST(core_cpp_descgate, ManyService) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; constexpr int num_service(1000); for (auto service = 0; service < num_service; ++service) @@ -293,7 +326,11 @@ TEST(core_cpp_descgate, ManyService) EXPECT_EQ(num_service, desc_gate.GetServices().size()); // now let the samples expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + for (auto service = 0; service < num_service; ++service) + { + // create registration sample for service-xx + desc_gate.ApplySample(DestroyService("service" + std::to_string(service), service), eCAL::tl_none); + } // samples should be expired EXPECT_EQ(0, desc_gate.GetServices().size()); @@ -301,20 +338,18 @@ TEST(core_cpp_descgate, ManyService) TEST(core_cpp_descgate, ClientExpiration) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; // apply sample 5 times, sample should not expire auto runs(5); while ((runs--) != 0) { desc_gate.ApplySample(CreateClient("client1", 1), eCAL::tl_none); - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS / 2)); - EXPECT_EQ(1, desc_gate.GetClients().size()); } // now let the sample expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + desc_gate.ApplySample(DestroyClient("client1", 1), eCAL::tl_none); // sample should be expired EXPECT_EQ(0, desc_gate.GetClients().size()); @@ -322,7 +357,7 @@ TEST(core_cpp_descgate, ClientExpiration) TEST(core_cpp_descgate, ManyClient) { - eCAL::CDescGate desc_gate(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + eCAL::CDescGate desc_gate; constexpr int num_client(1000); for (auto client = 0; client < num_client; ++client) @@ -335,7 +370,11 @@ TEST(core_cpp_descgate, ManyClient) EXPECT_EQ(num_client, desc_gate.GetClients().size()); // now let the samples expire - std::this_thread::sleep_for(std::chrono::milliseconds(DESCGATE_EXPIRATION_MS)); + for (auto client = 0; client < num_client; ++client) + { + // create registration sample for client-xx + desc_gate.ApplySample(DestroyClient("client" + std::to_string(client), client), eCAL::tl_none); + } // samples should be expired EXPECT_EQ(0, desc_gate.GetClients().size()); diff --git a/ecal/tests/cpp/expmap_test/src/expmap_test.cpp b/ecal/tests/cpp/expmap_test/src/expmap_test.cpp index 3a7f57aaef..4d6a6b3292 100644 --- a/ecal/tests/cpp/expmap_test/src/expmap_test.cpp +++ b/ecal/tests/cpp/expmap_test/src/expmap_test.cpp @@ -104,19 +104,69 @@ TEST(core_cpp_core, ExpMap_SetGet) TestingClock::increment_time(std::chrono::milliseconds(150)); expmap["B"] = 2; expmap["C"] = 3; - expmap.erase_expired(); - EXPECT_EQ(3, expmap.size()); + + { + auto erased = expmap.erase_expired(); + EXPECT_EQ(3, expmap.size()); + EXPECT_EQ(0, erased.size()); + } + TestingClock::increment_time(std::chrono::milliseconds(150)); expmap["B"] = 4; - expmap.erase_expired(); - EXPECT_EQ(2, expmap.size()); + + { + auto erased = expmap.erase_expired(); + EXPECT_EQ(2, expmap.size()); + EXPECT_EQ(1, erased.size()); + auto a = erased.find("A"); + EXPECT_NE(a, erased.end()); + EXPECT_EQ(a->second, 1); + } + TestingClock::increment_time(std::chrono::milliseconds(150)); - expmap.erase_expired(); - EXPECT_EQ(1, expmap.size()); + + { + auto erased = expmap.erase_expired(); + EXPECT_EQ(1, expmap.size()); + EXPECT_EQ(1, erased.size()); + auto c = erased.find("C"); + EXPECT_NE(c, erased.end()); + EXPECT_EQ(c->second, 3); + } + // sleep TestingClock::increment_time(std::chrono::milliseconds(150)); } +TEST(core_cpp_core, ExpMap_EraseMultiple) +{ + // create the map with 2500 ms expiration + eCAL::Util::CExpirationMap expmap(std::chrono::milliseconds(200)); + + expmap["A"] = 1; + expmap["B"] = 2; + expmap["C"] = 3; + + TestingClock::increment_time(std::chrono::milliseconds(250)); + + auto erased = expmap.erase_expired(); + EXPECT_EQ(0, expmap.size()); + EXPECT_EQ(3, erased.size()); + + auto a = erased.find("A"); + EXPECT_NE(a, erased.end()); + EXPECT_EQ(a->second, 1); + + auto b = erased.find("B"); + EXPECT_NE(b, erased.end()); + EXPECT_EQ(b->second, 2); + + auto c = erased.find("C"); + EXPECT_NE(c, erased.end()); + EXPECT_EQ(c->second, 3); +} + + TEST(core_cpp_core, ExpMap_EraseEmptyMap) { eCAL::Util::CExpirationMap expmap(std::chrono::milliseconds(200)); diff --git a/ecal/tests/cpp/registration_test/CMakeLists.txt b/ecal/tests/cpp/registration_test/CMakeLists.txt new file mode 100644 index 0000000000..dcb2781346 --- /dev/null +++ b/ecal/tests/cpp/registration_test/CMakeLists.txt @@ -0,0 +1,46 @@ +# ========================= eCAL LICENSE ================================= +# +# Copyright (C) 2016 - 2024 Continental Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ========================= eCAL LICENSE ================================= + +project(test_registration) + +find_package(Threads REQUIRED) +find_package(GTest REQUIRED) + +set(registration_test_src + src/registration_timout_provider_test.cpp + ${ECAL_CORE_PROJECT_ROOT}/core/src/registration/ecal_registration_timeout_provider.cpp +) + +ecal_add_gtest(${PROJECT_NAME} ${registration_test_src}) + +target_include_directories(${PROJECT_NAME} PRIVATE $) + +target_link_libraries(${PROJECT_NAME} + PRIVATE + $<$:dl> + $<$,$>>:rt> + Threads::Threads +) + + +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) +target_compile_definitions(${PROJECT_NAME} PRIVATE ECAL_CORE_TRANSPORT_SHM) + +ecal_install_gtest(${PROJECT_NAME}) + +set_property(TARGET ${PROJECT_NAME} PROPERTY FOLDER tests/cpp/core) diff --git a/ecal/tests/cpp/registration_test/src/registration_timout_provider_test.cpp b/ecal/tests/cpp/registration_test/src/registration_timout_provider_test.cpp new file mode 100644 index 0000000000..ff961d620b --- /dev/null +++ b/ecal/tests/cpp/registration_test/src/registration_timout_provider_test.cpp @@ -0,0 +1,257 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2024 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include + +#include + +#include "registration/ecal_registration_timeout_provider.h" +#include "serialization/ecal_struct_sample_registration.h" + +eCAL::Registration::Sample pub_foo_process_a_unregister; +eCAL::Registration::Sample pub_foo_process_a_register_1; +eCAL::Registration::Sample pub_foo_process_a_register_2; + +eCAL::Registration::Sample sub_foo_process_a_unregister; +eCAL::Registration::Sample sub_foo_process_a_register_1; +eCAL::Registration::Sample sub_foo_process_a_register_2; + +eCAL::Registration::Sample sub_foo_process_b_unregister; +eCAL::Registration::Sample sub_foo_process_b_register_1; +eCAL::Registration::Sample sub_foo_process_b_register_2; + +// make sure we create unique topic IDs for our testcases +std::string getUniqueId() +{ + static int topic_id = 1; + return std::to_string(topic_id++); +} + +eCAL::Registration::Sample UpdateTopicSample(const eCAL::Registration::Sample& input_) +{ + // vary statistical data + eCAL::Registration::Sample updated = input_; + updated.topic.rclock = input_.topic.rclock + 1; + updated.topic.dclock = input_.topic.dclock + 10; + return updated; +} + +void InitializeAllSamples() +{ + // Publisher 1 + pub_foo_process_a_unregister.cmd_type = eCAL::bct_unreg_publisher; + pub_foo_process_a_unregister.identifier.host_name = "host0"; + pub_foo_process_a_unregister.identifier.process_id = 1000; + pub_foo_process_a_unregister.identifier.entity_id = getUniqueId(); + pub_foo_process_a_unregister.topic.hgname = "host0"; + pub_foo_process_a_unregister.topic.pname = "process_a"; + pub_foo_process_a_unregister.topic.tname = "foo"; + pub_foo_process_a_register_1.topic.uname = "abc"; + + pub_foo_process_a_register_1 = pub_foo_process_a_unregister; + pub_foo_process_a_register_1.cmd_type = eCAL::bct_reg_publisher; + pub_foo_process_a_register_1.topic.rclock = 1; + pub_foo_process_a_register_1.topic.direction = "publisher"; + pub_foo_process_a_register_1.topic.tdatatype = { "a", "b", "c" }; + pub_foo_process_a_register_1.topic.tsize = 100; + pub_foo_process_a_register_1.topic.connections_loc = 2; + pub_foo_process_a_register_1.topic.connections_ext = 2; + pub_foo_process_a_register_1.topic.message_drops = 0; + pub_foo_process_a_register_1.topic.did = 0; + pub_foo_process_a_register_1.topic.dclock = 1; + pub_foo_process_a_register_1.topic.dfreq = 10; + + pub_foo_process_a_register_2 = UpdateTopicSample(pub_foo_process_a_register_1); + + // Subscriber 1 + sub_foo_process_a_unregister.cmd_type = eCAL::bct_unreg_subscriber; + sub_foo_process_a_unregister.identifier.host_name = "host0"; + sub_foo_process_a_unregister.identifier.process_id = 1000; + sub_foo_process_a_unregister.identifier.entity_id = getUniqueId(); + sub_foo_process_a_unregister.topic.hgname = "host0"; + sub_foo_process_a_unregister.topic.pname = "process_a"; + sub_foo_process_a_unregister.topic.tname = "foo"; + sub_foo_process_a_register_1.topic.uname = "abc"; + + sub_foo_process_a_register_1 = sub_foo_process_a_unregister; + sub_foo_process_a_register_1.cmd_type = eCAL::bct_reg_subscriber; + sub_foo_process_a_register_1.topic.rclock = 1; + sub_foo_process_a_register_1.topic.direction = "subscriber"; + sub_foo_process_a_register_1.topic.tdatatype = { "a", "b", "c" }; + sub_foo_process_a_register_1.topic.tsize = 100; + sub_foo_process_a_register_1.topic.connections_loc = 2; + sub_foo_process_a_register_1.topic.connections_ext = 2; + sub_foo_process_a_register_1.topic.message_drops = 0; + sub_foo_process_a_register_1.topic.did = 0; + sub_foo_process_a_register_1.topic.dclock = 1; + sub_foo_process_a_register_1.topic.dfreq = 10; + + sub_foo_process_a_register_2 = UpdateTopicSample(sub_foo_process_a_register_1); + + // Subscriber 2 + sub_foo_process_b_unregister.cmd_type = eCAL::bct_unreg_subscriber; + sub_foo_process_b_unregister.identifier.host_name = "host0"; + sub_foo_process_b_unregister.identifier.process_id = 1000; + sub_foo_process_b_unregister.identifier.entity_id = getUniqueId(); + sub_foo_process_b_unregister.topic.hgname = "host0"; + sub_foo_process_b_unregister.topic.pname = "process_b"; + sub_foo_process_b_unregister.topic.tname = "foo"; + sub_foo_process_b_register_1.topic.uname = "abc"; + + sub_foo_process_b_register_1 = sub_foo_process_b_unregister; + sub_foo_process_b_register_1.cmd_type = eCAL::bct_reg_subscriber; + sub_foo_process_b_register_1.topic.rclock = 1; + sub_foo_process_b_register_1.topic.direction = "subscriber"; + sub_foo_process_b_register_1.topic.tdatatype = { "a", "b", "c" }; + sub_foo_process_b_register_1.topic.tsize = 100; + sub_foo_process_b_register_1.topic.connections_loc = 2; + sub_foo_process_b_register_1.topic.connections_ext = 2; + sub_foo_process_b_register_1.topic.message_drops = 0; + sub_foo_process_b_register_1.topic.did = 0; + sub_foo_process_b_register_1.topic.dclock = 1; + sub_foo_process_b_register_1.topic.dfreq = 10; + + sub_foo_process_b_register_2 = UpdateTopicSample(sub_foo_process_b_register_1); +} + +class TestingClock { +public: + // Define the required types for TrivialClock + using duration = std::chrono::milliseconds; + using rep = duration::rep; + using period = duration::period; + using time_point = std::chrono::time_point; + static const bool is_steady = false; + + // Function to get the current time + static time_point now() noexcept { + return time_point(current_time); + } + + // Function to manually set the current time + static void set_time(const time_point& tp) { + current_time = tp.time_since_epoch(); + } + + // Function to manually increment the current time by a given duration + static void increment_time(const duration& d) { + current_time += d; + } + +private: + static duration current_time; +}; + +// Initialize the static member +TestingClock::duration TestingClock::current_time{ 0 }; + +// Create a test fixture class +class core_cpp_registration : public ::testing::Test { +protected: + // Override the SetUp method to initialize the global variable + void SetUp() override { + InitializeAllSamples(); + TestingClock::set_time(std::chrono::time_point(std::chrono::milliseconds(0))); + } + + // You can also override the TearDown method if needed + void TearDown() override { + // Clean up if necessary + } +}; + +TEST_F(core_cpp_registration, IsUnregistrationSamples) +{ + EXPECT_EQ(eCAL::Registration::IsUnregistrationSample(pub_foo_process_a_unregister), true); + EXPECT_EQ(eCAL::Registration::IsUnregistrationSample(pub_foo_process_a_register_1), false); + EXPECT_EQ(eCAL::Registration::IsUnregistrationSample(pub_foo_process_a_register_2), false); + EXPECT_EQ(eCAL::Registration::IsUnregistrationSample(sub_foo_process_a_unregister), true); + EXPECT_EQ(eCAL::Registration::IsUnregistrationSample(sub_foo_process_a_register_1), false); + EXPECT_EQ(eCAL::Registration::IsUnregistrationSample(sub_foo_process_a_register_2), false); + EXPECT_EQ(eCAL::Registration::IsUnregistrationSample(sub_foo_process_b_unregister), true); + EXPECT_EQ(eCAL::Registration::IsUnregistrationSample(sub_foo_process_b_register_1), false); + EXPECT_EQ(eCAL::Registration::IsUnregistrationSample(sub_foo_process_b_register_2), false); +} + + +TEST_F(core_cpp_registration, CreateUnregistrationSamples) +{ + EXPECT_EQ(eCAL::Registration::CreateUnregisterSample(pub_foo_process_a_register_1), pub_foo_process_a_unregister); + EXPECT_EQ(eCAL::Registration::CreateUnregisterSample(pub_foo_process_a_register_2), pub_foo_process_a_unregister); + EXPECT_EQ(eCAL::Registration::CreateUnregisterSample(sub_foo_process_a_register_1), sub_foo_process_a_unregister); + EXPECT_EQ(eCAL::Registration::CreateUnregisterSample(sub_foo_process_a_register_2), sub_foo_process_a_unregister); + EXPECT_EQ(eCAL::Registration::CreateUnregisterSample(sub_foo_process_b_register_1), sub_foo_process_b_unregister); + EXPECT_EQ(eCAL::Registration::CreateUnregisterSample(sub_foo_process_b_register_2), sub_foo_process_b_unregister); +} + +// we apply samples and then unregistration samples +// we need to veryfy no callback is called +TEST_F(core_cpp_registration, TimeOutProviderApplyUnregistration) +{ + int callbacks_called = 0; + eCAL::Registration::CTimeoutProvider timout_provider(std::chrono::seconds(5), [&callbacks_called](const eCAL::Registration::Sample&) {callbacks_called++; return true; }); + + timout_provider.ApplySample(pub_foo_process_a_register_1); + TestingClock::increment_time(std::chrono::seconds(1)); + timout_provider.CheckForTimeouts(); + EXPECT_EQ(callbacks_called, 0); + + TestingClock::increment_time(std::chrono::seconds(1)); + timout_provider.ApplySample(pub_foo_process_a_register_2); + TestingClock::increment_time(std::chrono::seconds(1)); + timout_provider.CheckForTimeouts(); + EXPECT_EQ(callbacks_called, 0); + + TestingClock::increment_time(std::chrono::seconds(1)); + timout_provider.ApplySample(pub_foo_process_a_unregister); + TestingClock::increment_time(std::chrono::seconds(1)); + timout_provider.CheckForTimeouts(); + EXPECT_EQ(callbacks_called, 0); +} + +// we apply samples and then unregistration samples +// we need to veryfy no callback is called +TEST_F(core_cpp_registration, TimeOutProviderApplyTimeout) +{ + int callbacks_called = 0; + eCAL::Registration::Sample sample_from_callback; + eCAL::Registration::CTimeoutProvider timout_provider(std::chrono::seconds(5), + [&sample_from_callback, &callbacks_called](const eCAL::Registration::Sample& s) + { + sample_from_callback = s; + ++callbacks_called; + return true; + }); + + timout_provider.ApplySample(pub_foo_process_a_register_1); + TestingClock::increment_time(std::chrono::seconds(6)); + timout_provider.CheckForTimeouts(); + EXPECT_EQ(sample_from_callback, pub_foo_process_a_unregister); + EXPECT_EQ(callbacks_called, 1); + + // reset sample + sample_from_callback = eCAL::Registration::Sample{}; + callbacks_called = 0; + TestingClock::increment_time(std::chrono::seconds(6)); + timout_provider.ApplySample(pub_foo_process_a_register_2); + TestingClock::increment_time(std::chrono::seconds(6)); + timout_provider.CheckForTimeouts(); + EXPECT_EQ(sample_from_callback, pub_foo_process_a_unregister); + EXPECT_EQ(callbacks_called, 1); +}