Skip to content

Commit

Permalink
atomic Initialization
Browse files Browse the repository at this point in the history
reader Create/Destroy replaced by Constructor/Destructor
  • Loading branch information
rex-schilasky committed May 8, 2024
1 parent 0f79cd1 commit c2970dd
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 89 deletions.
5 changes: 1 addition & 4 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ namespace eCAL

// destroy all remaining subscriber
const std::unique_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
for (auto iter = m_topic_name_datareader_map.begin(); iter != m_topic_name_datareader_map.end(); ++iter)
{
iter->second->Destroy();
}
m_topic_name_datareader_map.clear();

m_created = false;
}
Expand Down
23 changes: 4 additions & 19 deletions ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,8 @@ namespace eCAL
}

// create data reader
m_datareader = std::make_shared<CDataReader>();
// create it
if (!m_datareader->Create(topic_name_, topic_info_))
{
#ifndef NDEBUG
// log it
if (g_log() != nullptr) g_log()->Log(log_level_debug1, std::string(topic_name_ + "::CSubscriber::Create - FAILED"));
#endif
return(false);
}
#ifndef NDEBUG
// log it
if (g_log() != nullptr) g_log()->Log(log_level_debug1, std::string(topic_name_ + "::CSubscriber::Create - SUCCESS"));
#endif
m_datareader = std::make_shared<CDataReader>(topic_name_, topic_info_);

// register to subscriber gateway for publisher memory file receive thread
g_subgate()->Register(topic_name_, m_datareader);

Expand All @@ -137,12 +125,9 @@ namespace eCAL
if (g_log() != nullptr) g_log()->Log(log_level_debug1, std::string(m_datareader->GetTopicName() + "::CSubscriber::Destroy"));
#endif

// destroy local data reader
m_datareader->Destroy();

// free datareader
// destroy data reader
m_datareader.reset();

// we made it :-)
m_created = false;

Expand Down
48 changes: 14 additions & 34 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,35 +59,25 @@ namespace eCAL
////////////////////////////////////////
// CDataReader
////////////////////////////////////////
CDataReader::CDataReader() :
CDataReader::CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_) :
m_host_name(Process::GetHostName()),
m_host_group_name(Process::GetHostGroupName()),
m_pid(Process::GetProcessID()),
m_pname(Process::GetProcessName()),
m_frequency_calculator(0.0f)
m_topic_name(topic_name_),
m_topic_info(topic_info_),
m_topic_size(0),
m_connected(false),
m_receive_time(0),
m_clock(0),
m_frequency_calculator(0.0f),
m_created(false)
{
}

CDataReader::~CDataReader()
{
Destroy();
}

bool CDataReader::Create(const std::string& topic_name_, const SDataTypeInformation& topic_info_)
{
if(m_created) return(false);

// set defaults
m_topic_name = topic_name_;
m_topic_id.clear();
m_topic_info = topic_info_;
m_clock = 0;
m_message_drops = 0;
m_created = false;
#ifndef NDEBUG
// log it
Logging::Log(log_level_debug1, m_topic_name + "::CDataReader::Create");
Logging::Log(log_level_debug1, m_topic_name + "::CDataReader::Constructor");
#endif

// build topic id
std::stringstream counter;
counter << std::chrono::steady_clock::now().time_since_epoch().count();
Expand All @@ -111,17 +101,13 @@ namespace eCAL

// register
Register(false);

return(true);
}

bool CDataReader::Destroy()
CDataReader::~CDataReader()
{
if (!m_created) return(false);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug1, m_topic_name + "::CDataReader::Destroy");
Logging::Log(log_level_debug1, m_topic_name + "::CDataReader::Destructor");
#endif

// stop transport layers
Expand All @@ -144,14 +130,8 @@ namespace eCAL

// and unregister
Unregister();

// reset defaults
m_clock = 0;
m_message_drops = 0;

return(true);
}

void CDataReader::InitializeLayers()
{
// initialize udp multicast layer
Expand Down
32 changes: 13 additions & 19 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,28 @@

#pragma once

#include <atomic>
#include <chrono>
#include <cstddef>
#include <deque>
#include <ecal/ecal.h>
#include <ecal/ecal_callback.h>
#include <ecal/ecal_types.h>

#include "serialization/ecal_serialize_sample_payload.h"
#include "serialization/ecal_serialize_sample_registration.h"
#include "util/ecal_expmap.h"
#include "util/frequency_calculator.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <map>
#include <mutex>
#include <atomic>
#include <set>
#include <string>
#include <tuple>
#include <queue>

#include <string>
#include <unordered_map>

#include <util/frequency_calculator.h>

namespace eCAL
{
class CDataReader
Expand All @@ -73,14 +70,11 @@ namespace eCAL
}
};

CDataReader();
CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_);
~CDataReader();

static void InitializeLayers();

bool Create(const std::string& topic_name_, const SDataTypeInformation& topic_info_);
bool Destroy();

bool Receive(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ms_ = 0);

bool AddReceiveCallback(ReceiveCallbackT callback_);
Expand Down Expand Up @@ -138,15 +132,15 @@ namespace eCAL

std::string m_host_name;
std::string m_host_group_name;
int m_pid;
int m_pid = 0;
std::string m_pname;
std::string m_topic_name;
std::string m_topic_id;
SDataTypeInformation m_topic_info;
std::map<std::string, std::string> m_attr;
std::atomic<size_t> m_topic_size = 0;
std::atomic<size_t> m_topic_size;

std::atomic<bool> m_connected = false;
std::atomic<bool> m_connected;
using PublicationMapT = Util::CExpMap<SPublicationInfo, std::tuple<SDataTypeInformation, SLayerStates>>;
mutable std::mutex m_pub_map_mtx;
PublicationMapT m_pub_map;
Expand All @@ -159,15 +153,15 @@ namespace eCAL

std::mutex m_receive_callback_mtx;
ReceiveCallbackT m_receive_callback;
std::atomic<int> m_receive_time = 0;
std::atomic<int> m_receive_time;

std::deque<size_t> m_sample_hash_queue;

using EventCallbackMapT = std::map<eCAL_Subscriber_Event, SubEventCallbackT>;
std::mutex m_event_callback_map_mtx;
EventCallbackMapT m_event_callback_map;

std::atomic<long long> m_clock = 0;
std::atomic<long long> m_clock;

std::mutex m_frequency_calculator_mtx;
ResettableFrequencyCalculator<std::chrono::steady_clock> m_frequency_calculator;
Expand All @@ -182,6 +176,6 @@ namespace eCAL
bool m_share_tdesc = false;

SLayerStates m_confirmed_layers;
std::atomic<bool> m_created = false;
std::atomic<bool> m_created;
};
}
14 changes: 8 additions & 6 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,15 @@ namespace eCAL
m_topic_name(topic_name_),
m_topic_info(topic_info_),
m_config(config_),
m_frequency_calculator(0.0f)
m_connected(false),
m_frequency_calculator(0.0f),
m_created(false)
{
#ifndef NDEBUG
// log it
Logging::Log(log_level_debug1, m_topic_name + "::CDataWriter::Constructor");
#endif

// build topic id
std::stringstream counter;
counter << std::chrono::steady_clock::now().time_since_epoch().count();
Expand All @@ -100,11 +107,6 @@ namespace eCAL

// create tcp layer
ActivateTcpLayer(config_.tcp.activate);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug1, m_topic_name + "::CDataWriter::Constructor");
#endif
}

CDataWriter::~CDataWriter()
Expand Down
12 changes: 5 additions & 7 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,10 @@

#pragma once

#include <atomic>
#include <chrono>
#include <cstddef>
#include <ecal/ecal_callback.h>
#include <ecal/ecal_payload_writer.h>
#include <ecal/ecal_publisher_config.h>
#include <ecal/ecal_types.h>
#include <tuple>

#include "util/ecal_expmap.h"
#include <util/frequency_calculator.h>
Expand All @@ -47,10 +43,12 @@
#include "tcp/ecal_writer_tcp.h"
#endif

#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <atomic>
#include <map>
#include <tuple>
#include <vector>
Expand Down Expand Up @@ -148,7 +146,7 @@ namespace eCAL

std::vector<char> m_payload_buffer;

std::atomic<bool> m_connected = false;
std::atomic<bool> m_connected;

using SSubscriptionMapT = Util::CExpMap<SSubscriptionInfo, std::tuple<SDataTypeInformation, SLayerStates>>;
mutable std::mutex m_sub_map_mtx;
Expand All @@ -175,6 +173,6 @@ namespace eCAL
#endif

SLayerStates m_confirmed_layers;
std::atomic<bool> m_created = false;
std::atomic<bool> m_created;
};
}

0 comments on commit c2970dd

Please sign in to comment.