Skip to content

Commit

Permalink
fixed receive timeout (10 ms) in CSampleReceiver::Receive replaced by…
Browse files Browse the repository at this point in the history
… function parameter

separate udp receive thread cycle times introduced for registration, logging and payload
delta time (resolution) for checking subscriber timeouts increased from 10 ms to 100 ms
  • Loading branch information
rex-schilasky committed Nov 20, 2023
1 parent 0f41b3a commit 8460786
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 12 deletions.
15 changes: 12 additions & 3 deletions ecal/core/src/ecal_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,22 @@
/* ecal internal timings */
/**********************************************************************************************/
/* timeout for automatic removing registered topics and memory files in global database in ms */
#define CMN_REGISTRATION_TO (60*1000)
#define CMN_REGISTRATION_TO (60*1000)

/* time for resend registration info from publisher/subscriber in ms */
#define CMN_REGISTRATION_REFRESH 1000
#define CMN_REGISTRATION_REFRESH 1000

/* delta time to check timeout for data readers in ms */
#define CMN_DATAREADER_TIMEOUT_DTIME 10
#define CMN_DATAREADER_TIMEOUT_RESOLUTION_MS 100

/* cylce time udp registration receive thread in ms */
#define CMN_REGISTRATION_RECEIVE_THREAD_CYCLE_TIME_MS 1000

/* cylce time udp logging receive thread in ms */
#define CMN_LOGGING_RECEIVE_THREAD_CYCLE_TIME_MS 1000

/* cylce time udp paylaod receive thread in ms */
#define CMN_PAYLOAD_RECEIVE_THREAD_CYCLE_TIME_MS 1000

/**********************************************************************************************/
/* events */
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/ecal_registration_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ namespace eCAL
attr.rcvbuf = Config::GetUdpMulticastRcvBufSizeBytes();

m_reg_rcv.Create(attr);
m_reg_rcv_thread.Start(0, std::bind(&CUdpRegistrationReceiver::Receive, &m_reg_rcv_process, &m_reg_rcv));
m_reg_rcv_thread.Start(0, std::bind(&CUdpRegistrationReceiver::Receive, &m_reg_rcv_process, &m_reg_rcv, CMN_REGISTRATION_RECEIVE_THREAD_CYCLE_TIME_MS));
}

if (m_use_shm_monitoring)
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/io/rcv_sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ CSampleReceiver::CSampleReceiver()

CSampleReceiver::~CSampleReceiver() = default;

int CSampleReceiver::Receive(eCAL::CUDPReceiver* sample_receiver_)
int CSampleReceiver::Receive(eCAL::CUDPReceiver* sample_receiver_, int timeout_)
{
if(sample_receiver_ == nullptr) return(-1);

// wait for any incoming message
const size_t recv_len = sample_receiver_->Receive(m_msg_buffer.data(), m_msg_buffer.size(), 10);
const size_t recv_len = sample_receiver_->Receive(m_msg_buffer.data(), m_msg_buffer.size(), timeout_);
if(recv_len > 0)
{
return(Process(m_msg_buffer.data(), recv_len));
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/io/rcv_sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class CSampleReceiver
virtual bool HasSample(const std::string& sample_name_) = 0;
virtual bool ApplySample(const eCAL::pb::Sample& ecal_sample_, eCAL::pb::eTLayerType layer_) = 0;

int Receive(eCAL::CUDPReceiver* sample_receiver_);
int Receive(eCAL::CUDPReceiver* sample_receiver_, int timeout_);
int Process(const char* sample_buffer_, size_t sample_buffer_len_);

protected:
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/mon/ecal_monitoring_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace eCAL
int CLoggingReceiveThread::ThreadFun()
{
// wait for any incoming message
const size_t recv_len = m_log_rcv.Receive(m_msg_buffer.data(), m_msg_buffer.size(), 10);
const size_t recv_len = m_log_rcv.Receive(m_msg_buffer.data(), m_msg_buffer.size(), CMN_LOGGING_RECEIVE_THREAD_CYCLE_TIME_MS);
if (recv_len > 0)
{
m_log_ecal_msg.Clear();
Expand All @@ -88,7 +88,7 @@ namespace eCAL
CMonLogPublishingThread::CMonLogPublishingThread(MonitoringCallbackT mon_cb_, LoggingCallbackT log_cb_) :
m_mon_cb(mon_cb_), m_log_cb(log_cb_)
{
m_pub_thread.Start(CMN_REGISTRATION_REFRESH, std::bind(&CMonLogPublishingThread::ThreadFun, this));
m_pub_thread.Start(Config::GetRegistrationRefreshMs(), std::bind(&CMonLogPublishingThread::ThreadFun, this));
}

CMonLogPublishingThread::~CMonLogPublishingThread()
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ namespace eCAL
CDataReader::InitializeLayers();

// start timeout thread
m_subtimeout_thread.Start(CMN_DATAREADER_TIMEOUT_DTIME, std::bind(&CSubGate::CheckTimeouts, this));
m_subtimeout_thread.Start(CMN_DATAREADER_TIMEOUT_RESOLUTION_MS, std::bind(&CSubGate::CheckTimeouts, this));
m_created = true;
}

Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ namespace eCAL
// check receive timeout
if(m_receive_timeout > 0)
{
m_receive_time += CMN_DATAREADER_TIMEOUT_DTIME;
m_receive_time += CMN_DATAREADER_TIMEOUT_RESOLUTION_MS;
if(m_receive_time > m_receive_timeout)
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_sync);
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/readwrite/ecal_reader_udp_mc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace eCAL
{
if (!started)
{
thread.Start(0, std::bind(&CDataReaderUDP::Receive, &reader, &rcv));
thread.Start(0, std::bind(&CDataReaderUDP::Receive, &reader, &rcv, CMN_PAYLOAD_RECEIVE_THREAD_CYCLE_TIME_MS));
started = true;
}
// add topic name based multicast address
Expand Down

0 comments on commit 8460786

Please sign in to comment.