Skip to content

Commit

Permalink
ecal_thread back again :-) (now based on std::thread)
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky committed Nov 28, 2023
1 parent a238c1c commit aa39d7c
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 67 deletions.
1 change: 1 addition & 0 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ set(ecal_util_src
src/util/convert_utf.cpp
src/util/convert_utf.h
src/util/ecal_expmap.h
src/util/ecal_thread.h
src/util/getenvvar.h
src/util/sys_usage.cpp
src/util/sys_usage.h
Expand Down
23 changes: 10 additions & 13 deletions ecal/core/src/io/udp/rcv_logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,27 @@ namespace eCAL
m_msg_buffer.resize(MSG_BUFFER_SIZE);

// start receiver thread
m_receive_thread = std::thread(&CUDPLoggingReceiver::ReceiveThread, this);
m_udp_receiver_thread = std::make_shared<CallbackThread>(std::bind(&CUDPLoggingReceiver::ReceiveThread, this));
m_udp_receiver_thread->start(std::chrono::milliseconds(0));
}

CUDPLoggingReceiver::~CUDPLoggingReceiver()
{
m_receive_thread_stop.store(true, std::memory_order_release);
m_receive_thread.join();
m_udp_receiver_thread->stop();
}

void CUDPLoggingReceiver::ReceiveThread()
{
while (!m_receive_thread_stop.load(std::memory_order_acquire))
// wait for any incoming message
const size_t recv_len = m_udp_receiver.Receive(m_msg_buffer.data(), m_msg_buffer.size(), CMN_UDP_RECEIVE_THREAD_CYCLE_TIME_MS);
if (recv_len > 0)
{
// wait for any incoming message
const size_t recv_len = m_udp_receiver.Receive(m_msg_buffer.data(), m_msg_buffer.size(), CMN_UDP_RECEIVE_THREAD_CYCLE_TIME_MS);
if (recv_len > 0)
m_log_message.Clear();
if (m_log_message.ParseFromArray(m_msg_buffer.data(), static_cast<int>(recv_len)))
{
m_log_message.Clear();
if (m_log_message.ParseFromArray(m_msg_buffer.data(), static_cast<int>(recv_len)))
if (IsLocalHost(m_log_message) || m_network_mode)
{
if (IsLocalHost(m_log_message) || m_network_mode)
{
m_log_message_callback(m_log_message);
}
m_log_message_callback(m_log_message);
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions ecal/core/src/io/udp/rcv_logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@

#include "ecal_def.h"
#include "udp_receiver.h"
#include "util/ecal_thread.h"

#include <string>
#include <thread>
#include <memory>
#include <vector>

#ifdef _MSC_VER
Expand All @@ -51,14 +52,13 @@ namespace eCAL
protected:
void ReceiveThread();

bool m_network_mode;
LogMessageCallbackT m_log_message_callback;
bool m_network_mode;
LogMessageCallbackT m_log_message_callback;

CUDPReceiver m_udp_receiver;
std::thread m_receive_thread;
std::atomic<bool> m_receive_thread_stop;
CUDPReceiver m_udp_receiver;
std::shared_ptr<CallbackThread> m_udp_receiver_thread;

std::vector<char> m_msg_buffer;
eCAL::pb::LogMessage m_log_message;
std::vector<char> m_msg_buffer;
eCAL::pb::LogMessage m_log_message;
};
}
17 changes: 7 additions & 10 deletions ecal/core/src/io/udp/rcv_sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ CUDPSampleReceiver::CUDPSampleReceiver(const eCAL::SReceiverAttr& attr_, HasSamp
m_msg_buffer.resize(MSG_BUFFER_SIZE);

// start receiver thread
m_receive_thread = std::thread(&CUDPSampleReceiver::ReceiveThread, this);
m_udp_receiver_thread = std::make_shared<eCAL::CallbackThread>(std::bind(&CUDPSampleReceiver::ReceiveThread, this));
m_udp_receiver_thread->start(std::chrono::milliseconds(0));

m_cleanup_start = std::chrono::steady_clock::now();
}

CUDPSampleReceiver::~CUDPSampleReceiver()
{
m_receive_thread_stop.store(true, std::memory_order_release);
m_receive_thread.join();
m_udp_receiver_thread->stop();
}

bool CUDPSampleReceiver::AddMultiCastGroup(const char* ipaddr_)
Expand All @@ -125,14 +125,11 @@ bool CUDPSampleReceiver::RemMultiCastGroup(const char* ipaddr_)

void CUDPSampleReceiver::ReceiveThread()
{
while (!m_receive_thread_stop.load(std::memory_order_acquire))
// wait for any incoming message
const size_t recv_len = m_udp_receiver.Receive(m_msg_buffer.data(), m_msg_buffer.size(), CMN_UDP_RECEIVE_THREAD_CYCLE_TIME_MS);
if (recv_len > 0)
{
// wait for any incoming message
const size_t recv_len = m_udp_receiver.Receive(m_msg_buffer.data(), m_msg_buffer.size(), CMN_UDP_RECEIVE_THREAD_CYCLE_TIME_MS);
if (recv_len > 0)
{
Process(m_msg_buffer.data(), recv_len);
}
Process(m_msg_buffer.data(), recv_len);
}
}

Expand Down
20 changes: 10 additions & 10 deletions ecal/core/src/io/udp/rcv_sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
#include "ecal_def.h"
#include "udp_receiver.h"
#include "rcv_sample_slot.h"
#include "util/ecal_thread.h"

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -71,17 +72,16 @@ class CUDPSampleReceiver
void ReceiveThread();
void Process(const char* sample_buffer_, size_t sample_buffer_len_);

HasSampleCallbackT m_has_sample_callback;
ApplySampleCallbackT m_apply_sample_callback;
HasSampleCallbackT m_has_sample_callback;
ApplySampleCallbackT m_apply_sample_callback;

eCAL::CUDPReceiver m_udp_receiver;
std::thread m_receive_thread;
std::atomic<bool> m_receive_thread_stop;
eCAL::CUDPReceiver m_udp_receiver;
std::shared_ptr<eCAL::CallbackThread> m_udp_receiver_thread;

using ReceiveSlotMapT = std::unordered_map<int32_t, std::shared_ptr<CSampleReceiveSlot>>;
ReceiveSlotMapT m_receive_slot_map;
std::vector<char> m_msg_buffer;
eCAL::pb::Sample m_ecal_sample;
ReceiveSlotMapT m_receive_slot_map;
std::vector<char> m_msg_buffer;
eCAL::pb::Sample m_ecal_sample;

std::chrono::steady_clock::time_point m_cleanup_start;
std::chrono::steady_clock::time_point m_cleanup_start;
};
37 changes: 17 additions & 20 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ namespace eCAL
CDataReader::InitializeLayers();

// start timeout thread
m_subtimeout_thread = std::thread(&CSubGate::CheckTimeouts, this);

m_subtimeout_thread = std::make_shared<CallbackThread>(std::bind(&CSubGate::CheckTimeouts, this));
m_subtimeout_thread->start(std::chrono::milliseconds(CMN_DATAREADER_TIMEOUT_RESOLUTION_MS));

m_created = true;
}

Expand All @@ -87,8 +88,7 @@ namespace eCAL
if(!m_created) return;

// stop timeout thread
m_subtimeout_thread_stop.store(true, std::memory_order_release);
m_subtimeout_thread.join();
m_subtimeout_thread->stop();

// destroy all remaining subscriber
const std::unique_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
Expand Down Expand Up @@ -372,25 +372,22 @@ namespace eCAL

void CSubGate::CheckTimeouts()
{
while (!m_subtimeout_thread_stop.load(std::memory_order_acquire))
// check subscriber timeouts
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
for (auto iter = m_topic_name_datareader_map.begin(); iter != m_topic_name_datareader_map.end(); ++iter)
{
// check subscriber timeouts
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
for (auto iter = m_topic_name_datareader_map.begin(); iter != m_topic_name_datareader_map.end(); ++iter)
{
iter->second->CheckReceiveTimeout();
}

// signal shutdown if eCAL is not okay
const bool ecal_is_ok = (g_globals_ctx != nullptr) && !gWaitForEvent(ShutdownProcEvent(), 0);
if (!ecal_is_ok)
{
g_shutdown = 1;
}
iter->second->CheckReceiveTimeout();
}

// idle thread
std::this_thread::sleep_for(std::chrono::milliseconds(CMN_DATAREADER_TIMEOUT_RESOLUTION_MS));
// signal shutdown if eCAL is not okay
const bool ecal_is_ok = (g_globals_ctx != nullptr) && !gWaitForEvent(ShutdownProcEvent(), 0);
if (!ecal_is_ok)
{
g_shutdown = 1;
}

// idle thread
std::this_thread::sleep_for(std::chrono::milliseconds(CMN_DATAREADER_TIMEOUT_RESOLUTION_MS));
}

bool CSubGate::ApplyTopicToDescGate(const std::string& topic_name_, const SDataTypeInformation& topic_info_)
Expand Down
12 changes: 6 additions & 6 deletions ecal/core/src/pubsub/ecal_subgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
#pragma once

#include "readwrite/ecal_reader.h"
#include "util/ecal_thread.h"

#include <atomic>
#include <shared_mutex>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>

namespace eCAL
Expand Down Expand Up @@ -61,14 +62,13 @@ namespace eCAL
void CheckTimeouts();
bool ApplyTopicToDescGate(const std::string& topic_name_, const SDataTypeInformation& topic_info_);

static std::atomic<bool> m_created;
static std::atomic<bool> m_created;

// database data reader
using TopicNameDataReaderMapT = std::unordered_multimap<std::string, std::shared_ptr<CDataReader>>;
std::shared_timed_mutex m_topic_name_datareader_sync;
TopicNameDataReaderMapT m_topic_name_datareader_map;
std::shared_timed_mutex m_topic_name_datareader_sync;
TopicNameDataReaderMapT m_topic_name_datareader_map;

std::thread m_subtimeout_thread;
std::atomic<bool> m_subtimeout_thread_stop;
std::shared_ptr<CallbackThread> m_subtimeout_thread;
};
}
112 changes: 112 additions & 0 deletions ecal/core/src/util/ecal_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

/**
* @brief eCAL threading helper class
**/

#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

#pragma once

namespace eCAL
{
/**
* @brief A class that encapsulates threaded functionality with a callback interface.
*/
class CallbackThread {
public:
/**
* @brief Constructor for the CallbackThread class.
* @param callback A callback function to be executed in the CallbackThread thread.
*/
CallbackThread(std::function<void()> callback)
: callback_(callback), stopThread_(false) {}

/**
* @brief Start the callback thread with a specified timeout.
* @param timeout The timeout duration for waiting in the callback thread.
*/
template <typename DurationType>
void start(DurationType timeout)
{
callbackThread_ = std::thread(&CallbackThread::callbackFunction<DurationType>, this, timeout);
}

/**
* @brief Stop the callback thread.
* Waits for the callback thread to finish its work.
*/
void stop()
{
{
std::unique_lock<std::mutex> lock(mtx_);
// Set the flag to signal the callback thread to stop
stopThread_ = true;
// Notify the callback thread to wake up and check the flag
cv_.notify_one();
}

// Wait for the callback thread to finish
if (callbackThread_.joinable()) {
callbackThread_.join();
}
}

private:
std::thread callbackThread_; /**< The callback thread object. */
std::function<void()> callback_; /**< The callback function to be executed in the callback thread. */
std::mutex mtx_; /**< Mutex for thread synchronization. */
std::condition_variable cv_; /**< Condition variable for signaling between threads. */
bool stopThread_; /**< Flag to indicate whether the callback thread should stop. */

/**
* @brief Callback function that runs in the callback thread.
* Periodically checks the stopThread flag and executes the callback function.
* @tparam DurationType The type of the timeout duration (e.g., std::chrono::seconds, std::chrono::milliseconds).
* @param timeout The timeout duration for waiting in the callback thread.
*/
template <typename DurationType>
void callbackFunction(DurationType timeout)
{
while (true)
{
{
std::unique_lock<std::mutex> lock(mtx_);
// Wait for a signal or a timeout
if (cv_.wait_for(lock, timeout, [this] { return stopThread_; }))
{
// If the stopThread flag is true, break out of the loop
break;
}
}

// Do some work in the callback thread
if (callback_)
{
callback_();
}
}
}
};
}

0 comments on commit aa39d7c

Please sign in to comment.