Skip to content

Commit

Permalink
eCAL::CThread replaced finally by std::thread
Browse files Browse the repository at this point in the history
CReceiveSlot logic in separate unit rcv_sample_slot.cpp
shm monitoring registration simplified
  • Loading branch information
rex-schilasky committed Nov 26, 2023
1 parent 7892840 commit e6c2458
Show file tree
Hide file tree
Showing 28 changed files with 487 additions and 641 deletions.
6 changes: 3 additions & 3 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,13 @@ set(ecal_io_udp_src
src/io/udp/rcv_logging.h
src/io/udp/rcv_sample.cpp
src/io/udp/rcv_sample.h
src/io/udp/rcv_sample_slot.cpp
src/io/udp/rcv_sample_slot.h
src/io/udp/snd_raw_buffer.cpp
src/io/udp/snd_raw_buffer.h
src/io/udp/snd_sample.cpp
src/io/udp/snd_sample.h
src/io/udp/topic2mcast.h
src/io/udp/udp_configurations.cpp
src/io/udp/udp_configurations.h
src/io/udp/udp_init.cpp
Expand Down Expand Up @@ -297,8 +300,6 @@ set(ecal_util_src
src/util/convert_utf.cpp
src/util/convert_utf.h
src/util/ecal_expmap.h
src/util/ecal_thread.cpp
src/util/ecal_thread.h
src/util/getenvvar.h
src/util/sys_usage.cpp
src/util/sys_usage.h
Expand All @@ -323,7 +324,6 @@ set(ecal_cmn_src
src/ecal_global_accessors.h
src/ecal_globals.h
src/ecal_sample_to_topicinfo.h
src/topic2mcast.h
)
if (WIN32)
list (APPEND
Expand Down
43 changes: 8 additions & 35 deletions ecal/core/src/io/udp/rcv_logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,8 @@
* @brief UDP logging receiver to receive messages of type eCAL::pb::LogMessage
**/

#include <ecal/ecal.h>
#include <ecal/ecal_config.h>

#include "ecal_def.h"
#include "io/udp/msg_type.h"
#include "io/udp/udp_configurations.h"

#include "rcv_logging.h"
#include "io/udp/msg_type.h"

namespace
{
Expand All @@ -43,33 +37,17 @@ namespace

namespace eCAL
{
static bool IsLocalHost(const eCAL::pb::LogMessage& ecal_message_)
{
const std::string host_name = ecal_message_.hname();
if (host_name.empty()) return(false);
if (host_name == Process::GetHostName()) return(true);
return(false);
}

CUDPLoggingReceiver::CUDPLoggingReceiver(LogMessageCallbackT log_cb_) :
m_network_mode(false), m_log_message_callback(log_cb_)
CUDPLoggingReceiver::CUDPLoggingReceiver(const eCAL::SReceiverAttr& attr_, LogMessageCallbackT log_message_callback_) :
m_network_mode(!attr_.broadcast), m_log_message_callback(log_message_callback_)
{
// set network attributes
SReceiverAttr attr;
attr.address = UDP::GetLoggingAddress();
attr.port = UDP::GetLoggingPort();
attr.broadcast = !Config::IsNetworkEnabled();
attr.loopback = true;
attr.rcvbuf = Config::GetUdpMulticastRcvBufSizeBytes();

// create udp logging receiver
m_udp_receiver.Create(attr);

// start logging receiver thread
m_receive_thread = std::thread(&CUDPLoggingReceiver::ReceiveThread, this);
// create udp receiver
m_udp_receiver.Create(attr_);

// allocate receive buffer
m_msg_buffer.resize(MSG_BUFFER_SIZE);

// start receiver thread
m_receive_thread = std::thread(&CUDPLoggingReceiver::ReceiveThread, this);
}

CUDPLoggingReceiver::~CUDPLoggingReceiver()
Expand All @@ -78,11 +56,6 @@ namespace eCAL
m_receive_thread.join();
}

void CUDPLoggingReceiver::SetNetworkMode(bool network_mode_)
{
m_network_mode = network_mode_;
}

void CUDPLoggingReceiver::ReceiveThread()
{
while (!m_receive_thread_stop.load(std::memory_order_acquire))
Expand Down
8 changes: 1 addition & 7 deletions ecal/core/src/io/udp/rcv_logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@

#include "ecal_def.h"
#include "udp_receiver.h"
#include "msg_type.h"

#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#ifdef _MSC_VER
Expand All @@ -49,10 +45,8 @@ namespace eCAL
public:
using LogMessageCallbackT = std::function<void(const eCAL::pb::LogMessage&)>;

CUDPLoggingReceiver(LogMessageCallbackT log_cb_);
CUDPLoggingReceiver(const eCAL::SReceiverAttr& attr_, LogMessageCallbackT log_message_callback_);
virtual ~CUDPLoggingReceiver();

void SetNetworkMode(bool network_mode_);

protected:
void ReceiveThread();
Expand Down
148 changes: 10 additions & 138 deletions ecal/core/src/io/udp/rcv_sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,130 +21,8 @@
* @brief UDP sample receiver to receive messages of type eCAL::pb::Sample
**/

#include <ecal/ecal.h>

#include <algorithm>

#include "ecal_def.h"
#include "rcv_sample.h"


CReceiveSlot::CReceiveSlot()
: m_timeout(0.0)
, m_recv_mode(rcm_waiting)
, m_message_id(0)
, m_message_total_num(0)
, m_message_total_len(0)
, m_message_curr_num(0)
, m_message_curr_len(0)
{
}

CReceiveSlot::~CReceiveSlot() = default;

int CReceiveSlot::ApplyMessage(const struct SUDPMessage& ecal_message_)
{
// reset timeout
m_timeout = std::chrono::duration<double>(0.0);

// process current packet
switch(ecal_message_.header.type)
{
// new message started
case msg_type_header:
OnMessageStart(ecal_message_);
break;
// message data package
case msg_type_content:
if(m_recv_mode == rcm_reading)
{
OnMessageData(ecal_message_);
}
break;
}

// we have a complete message in the receive buffer
if(m_recv_mode == rcm_completed)
{
// call complete event
OnMessageCompleted(std::move(m_recv_buffer));
}

return(0);
}

int CReceiveSlot::OnMessageStart(const struct SUDPMessage& ecal_message_)
{
// store header info
m_message_id = ecal_message_.header.id;
m_message_total_num = ecal_message_.header.num;
m_message_total_len = ecal_message_.header.len;

// reset current message states
m_message_curr_num = 0;
m_message_curr_len = 0;

// prepare receive buffer
m_recv_buffer.reserve(static_cast<size_t>(m_message_total_len));

// switch to reading mode
m_recv_mode = rcm_reading;

return(0);
}

int CReceiveSlot::OnMessageData(const struct SUDPMessage& ecal_message_)
{
// check message id
if(ecal_message_.header.id != m_message_id)
{
#ifndef NDEBUG
// log it
eCAL::Logging::Log(log_level_debug3, "UDP Sample OnMessageData - WRONG MESSAGE PACKET ID " + std::to_string(ecal_message_.header.id));
#endif
m_recv_mode = rcm_aborted;
return(-1);
}

// check current packet counter
if(ecal_message_.header.num != m_message_curr_num)
{
#ifndef NDEBUG
// log it
eCAL::Logging::Log(log_level_debug3, "UDP Sample OnMessageData - WRONG MESSAGE PACKET NUMBER " + std::to_string(ecal_message_.header.num) + " / " + std::to_string(m_message_curr_num));
#endif
m_recv_mode = rcm_aborted;
return(-1);
}

// check current packet length
if(ecal_message_.header.len <= 0)
{
#ifndef NDEBUG
// log it
eCAL::Logging::Log(log_level_debug3, "UDP Sample OnMessageData - WRONG MESSAGE PACKET LENGTH " + std::to_string(ecal_message_.header.len));
#endif
m_recv_mode = rcm_aborted;
return(-1);
}

// copy the message part to the receive message buffer
m_recv_buffer.resize(m_recv_buffer.size() + static_cast<size_t>(ecal_message_.header.len));
memcpy(m_recv_buffer.data() + m_recv_buffer.size() - static_cast<size_t>(ecal_message_.header.len), ecal_message_.payload, static_cast<size_t>(ecal_message_.header.len));

// increase packet counter
m_message_curr_num++;

// increase current length
m_message_curr_len += ecal_message_.header.len;

// last message packet ? -> switch to completed mode
if(m_message_curr_num == m_message_total_num) m_recv_mode = rcm_completed;

return(0);
}


CUDPSampleReceiver::CSampleReceiveSlot::CSampleReceiveSlot(CUDPSampleReceiver* sample_receiver_)
: m_sample_receiver(sample_receiver_)
{
Expand Down Expand Up @@ -214,28 +92,22 @@ int CUDPSampleReceiver::CSampleReceiveSlot::OnMessageCompleted(std::vector<char>
return(0);
}

CUDPSampleReceiver::CUDPSampleReceiver()
{
m_msg_buffer.resize(MSG_BUFFER_SIZE);
m_cleanup_start = std::chrono::steady_clock::now();
}

CUDPSampleReceiver::~CUDPSampleReceiver()
{
Stop();
}

void CUDPSampleReceiver::Start(const eCAL::SReceiverAttr& attr_, HasSampleCallbackT has_sample_callback_, ApplySampleCallbackT apply_sample_callback_)
CUDPSampleReceiver::CUDPSampleReceiver(const eCAL::SReceiverAttr& attr_, HasSampleCallbackT has_sample_callback_, ApplySampleCallbackT apply_sample_callback_) :
m_has_sample_callback(has_sample_callback_), m_apply_sample_callback(apply_sample_callback_)
{
m_has_sample_callback = has_sample_callback_;
m_apply_sample_callback = apply_sample_callback_;

// create udp receiver
m_udp_receiver.Create(attr_);

// allocate receive buffer
m_msg_buffer.resize(MSG_BUFFER_SIZE);

// start receiver thread
m_receive_thread = std::thread(&CUDPSampleReceiver::ReceiveThread, this);

m_cleanup_start = std::chrono::steady_clock::now();
}

void CUDPSampleReceiver::Stop()
CUDPSampleReceiver::~CUDPSampleReceiver()
{
m_receive_thread_stop.store(true, std::memory_order_release);
m_receive_thread.join();
Expand Down
Loading

0 comments on commit e6c2458

Please sign in to comment.