From c2970dd98041d1a7751fcb486c6d45539f9e79f9 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Wed, 8 May 2024 17:19:58 +0200 Subject: [PATCH] atomic Initialization reader Create/Destroy replaced by Constructor/Destructor --- ecal/core/src/pubsub/ecal_subgate.cpp | 5 +-- ecal/core/src/pubsub/ecal_subscriber.cpp | 23 ++---------- ecal/core/src/readwrite/ecal_reader.cpp | 48 +++++++----------------- ecal/core/src/readwrite/ecal_reader.h | 32 +++++++--------- ecal/core/src/readwrite/ecal_writer.cpp | 14 ++++--- ecal/core/src/readwrite/ecal_writer.h | 12 +++--- 6 files changed, 45 insertions(+), 89 deletions(-) diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 2c40e9ebd7..3015068c79 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -65,10 +65,7 @@ namespace eCAL // destroy all remaining subscriber const std::unique_lock lock(m_topic_name_datareader_sync); - for (auto iter = m_topic_name_datareader_map.begin(); iter != m_topic_name_datareader_map.end(); ++iter) - { - iter->second->Destroy(); - } + m_topic_name_datareader_map.clear(); m_created = false; } diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index 898af14e3b..4e6b2753a6 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -99,20 +99,8 @@ namespace eCAL } // create data reader - m_datareader = std::make_shared(); - // create it - if (!m_datareader->Create(topic_name_, topic_info_)) - { -#ifndef NDEBUG - // log it - if (g_log() != nullptr) g_log()->Log(log_level_debug1, std::string(topic_name_ + "::CSubscriber::Create - FAILED")); -#endif - return(false); - } -#ifndef NDEBUG - // log it - if (g_log() != nullptr) g_log()->Log(log_level_debug1, std::string(topic_name_ + "::CSubscriber::Create - SUCCESS")); -#endif + m_datareader = std::make_shared(topic_name_, topic_info_); + // register to subscriber gateway for publisher memory file receive thread g_subgate()->Register(topic_name_, m_datareader); @@ -137,12 +125,9 @@ namespace eCAL if (g_log() != nullptr) g_log()->Log(log_level_debug1, std::string(m_datareader->GetTopicName() + "::CSubscriber::Destroy")); #endif - // destroy local data reader - m_datareader->Destroy(); - - // free datareader + // destroy data reader m_datareader.reset(); - + // we made it :-) m_created = false; diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 2507690852..25d716b206 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -59,35 +59,25 @@ namespace eCAL //////////////////////////////////////// // CDataReader //////////////////////////////////////// - CDataReader::CDataReader() : + CDataReader::CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_) : m_host_name(Process::GetHostName()), m_host_group_name(Process::GetHostGroupName()), m_pid(Process::GetProcessID()), m_pname(Process::GetProcessName()), - m_frequency_calculator(0.0f) + m_topic_name(topic_name_), + m_topic_info(topic_info_), + m_topic_size(0), + m_connected(false), + m_receive_time(0), + m_clock(0), + m_frequency_calculator(0.0f), + m_created(false) { - } - - CDataReader::~CDataReader() - { - Destroy(); - } - - bool CDataReader::Create(const std::string& topic_name_, const SDataTypeInformation& topic_info_) - { - if(m_created) return(false); - - // set defaults - m_topic_name = topic_name_; - m_topic_id.clear(); - m_topic_info = topic_info_; - m_clock = 0; - m_message_drops = 0; - m_created = false; #ifndef NDEBUG // log it - Logging::Log(log_level_debug1, m_topic_name + "::CDataReader::Create"); + Logging::Log(log_level_debug1, m_topic_name + "::CDataReader::Constructor"); #endif + // build topic id std::stringstream counter; counter << std::chrono::steady_clock::now().time_since_epoch().count(); @@ -111,17 +101,13 @@ namespace eCAL // register Register(false); - - return(true); } - bool CDataReader::Destroy() + CDataReader::~CDataReader() { - if (!m_created) return(false); - #ifndef NDEBUG // log it - Logging::Log(log_level_debug1, m_topic_name + "::CDataReader::Destroy"); + Logging::Log(log_level_debug1, m_topic_name + "::CDataReader::Destructor"); #endif // stop transport layers @@ -144,14 +130,8 @@ namespace eCAL // and unregister Unregister(); - - // reset defaults - m_clock = 0; - m_message_drops = 0; - - return(true); } - + void CDataReader::InitializeLayers() { // initialize udp multicast layer diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index c8c6b19371..9b504a85a5 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -23,10 +23,6 @@ #pragma once -#include -#include -#include -#include #include #include #include @@ -34,20 +30,21 @@ #include "serialization/ecal_serialize_sample_payload.h" #include "serialization/ecal_serialize_sample_registration.h" #include "util/ecal_expmap.h" +#include "util/frequency_calculator.h" +#include +#include #include +#include +#include #include #include -#include #include +#include #include #include - -#include #include -#include - namespace eCAL { class CDataReader @@ -73,14 +70,11 @@ namespace eCAL } }; - CDataReader(); + CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_); ~CDataReader(); static void InitializeLayers(); - bool Create(const std::string& topic_name_, const SDataTypeInformation& topic_info_); - bool Destroy(); - bool Receive(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ms_ = 0); bool AddReceiveCallback(ReceiveCallbackT callback_); @@ -138,15 +132,15 @@ namespace eCAL std::string m_host_name; std::string m_host_group_name; - int m_pid; + int m_pid = 0; std::string m_pname; std::string m_topic_name; std::string m_topic_id; SDataTypeInformation m_topic_info; std::map m_attr; - std::atomic m_topic_size = 0; + std::atomic m_topic_size; - std::atomic m_connected = false; + std::atomic m_connected; using PublicationMapT = Util::CExpMap>; mutable std::mutex m_pub_map_mtx; PublicationMapT m_pub_map; @@ -159,7 +153,7 @@ namespace eCAL std::mutex m_receive_callback_mtx; ReceiveCallbackT m_receive_callback; - std::atomic m_receive_time = 0; + std::atomic m_receive_time; std::deque m_sample_hash_queue; @@ -167,7 +161,7 @@ namespace eCAL std::mutex m_event_callback_map_mtx; EventCallbackMapT m_event_callback_map; - std::atomic m_clock = 0; + std::atomic m_clock; std::mutex m_frequency_calculator_mtx; ResettableFrequencyCalculator m_frequency_calculator; @@ -182,6 +176,6 @@ namespace eCAL bool m_share_tdesc = false; SLayerStates m_confirmed_layers; - std::atomic m_created = false; + std::atomic m_created; }; } diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index a7da2f9e25..6be1e87f73 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -75,8 +75,15 @@ namespace eCAL m_topic_name(topic_name_), m_topic_info(topic_info_), m_config(config_), - m_frequency_calculator(0.0f) + m_connected(false), + m_frequency_calculator(0.0f), + m_created(false) { +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug1, m_topic_name + "::CDataWriter::Constructor"); +#endif + // build topic id std::stringstream counter; counter << std::chrono::steady_clock::now().time_since_epoch().count(); @@ -100,11 +107,6 @@ namespace eCAL // create tcp layer ActivateTcpLayer(config_.tcp.activate); - -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug1, m_topic_name + "::CDataWriter::Constructor"); -#endif } CDataWriter::~CDataWriter() diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index c53d8b7ea0..896ab13970 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -23,14 +23,10 @@ #pragma once -#include -#include -#include #include #include #include #include -#include #include "util/ecal_expmap.h" #include @@ -47,10 +43,12 @@ #include "tcp/ecal_writer_tcp.h" #endif +#include +#include +#include #include #include #include -#include #include #include #include @@ -148,7 +146,7 @@ namespace eCAL std::vector m_payload_buffer; - std::atomic m_connected = false; + std::atomic m_connected; using SSubscriptionMapT = Util::CExpMap>; mutable std::mutex m_sub_map_mtx; @@ -175,6 +173,6 @@ namespace eCAL #endif SLayerStates m_confirmed_layers; - std::atomic m_created = false; + std::atomic m_created; }; }