Skip to content

Commit

Permalink
Merge pull request OpenDDS#4778 from mitza-oci/rtm-errors
Browse files Browse the repository at this point in the history
Updated RelayThreadMonitor error reporting
  • Loading branch information
mitza-oci authored Aug 20, 2024
2 parents e3cee05 + d3b36d4 commit 34b0ea4
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 39 deletions.
8 changes: 8 additions & 0 deletions docs/news.d/relay-thread-monitor.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.. news-prs: 4778
.. news-start-section: Additions
- Improved RelayThreadMonitor error reporting

- All threads that have missed the deadline are reported before the process dies. These log messages include the timestamp of the most recent update for each thread that has missed the deadline.

.. news-end-section
87 changes: 48 additions & 39 deletions tools/rtpsrelay/RelayThreadMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

#include <ace/Thread.h>

#include <cstdlib>
#include <vector>

using namespace OpenDDS::DCPS;
namespace RtpsRelay {

Expand Down Expand Up @@ -49,49 +52,55 @@ int RelayThreadMonitor::svc()

ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, -1);

int count = 0;

while (running_) {
for (auto count = 0; running_; count = (count + 1) % safety_factor) {
condition_.wait_until(MonotonicTimePoint::now() + thread_status_interval, thread_status_manager);
if (running_) {
OpenDDS::DCPS::InternalThreadBuiltinTopicDataSeq datas;
DDS::SampleInfoSeq infos;
const DDS::ReturnCode_t ret = thread_status_reader_->read(datas,
infos,
DDS::LENGTH_UNLIMITED,
DDS::ANY_SAMPLE_STATE,
DDS::ANY_VIEW_STATE,
DDS::ANY_INSTANCE_STATE);
if (ret != DDS::RETCODE_OK) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RelayThreadMonitor::svc failed to read %C\n"), OpenDDS::DCPS::retcode_to_string(ret)));
continue;
if (!running_) {
break;
}
OpenDDS::DCPS::InternalThreadBuiltinTopicDataSeq datas;
DDS::SampleInfoSeq infos;
const DDS::ReturnCode_t ret = thread_status_reader_->read(datas,
infos,
DDS::LENGTH_UNLIMITED,
DDS::ANY_SAMPLE_STATE,
DDS::ANY_VIEW_STATE,
DDS::ANY_INSTANCE_STATE);
if (ret != DDS::RETCODE_OK) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RelayThreadMonitor::svc failed to read %C\n", OpenDDS::DCPS::retcode_to_string(ret)));
continue;
}

const SystemTimePoint expire = SystemTimePoint::now() - thread_status_interval * safety_factor;
const auto log_all_threads = count == 0 && config_.log_thread_status();
std::vector<DDS::UInt32> late_thread_indexes;

for (CORBA::ULong idx = 0; idx != infos.length(); ++idx) {
if (infos[idx].valid_data) {
utilization_[datas[idx].thread_id.in()] = datas[idx].utilization;
} else if (infos[idx].instance_state != DDS::ALIVE_INSTANCE_STATE) {
utilization_.erase(datas[idx].thread_id.in());
}

const SystemTimePoint expire = SystemTimePoint::now() - thread_status_interval * safety_factor;

for (CORBA::ULong idx = 0; idx != infos.length(); ++idx) {
if (infos[idx].valid_data) {
utilization_[datas[idx].thread_id.in()] = datas[idx].utilization;
} else if (infos[idx].instance_state != DDS::ALIVE_INSTANCE_STATE) {
utilization_.erase(datas[idx].thread_id.in());
}

if (count == 0 && config_.log_thread_status()) {
ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) INFO: Thread Status %C %C\n"),
to_json(datas[idx]).c_str(),
to_json(infos[idx]).c_str()));
}

const SystemTimePoint timestamp(infos[idx].source_timestamp);
if (infos[idx].instance_state == DDS::ALIVE_INSTANCE_STATE && timestamp < expire) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("(%P|%t) ERROR: RelayThreadMonitor::svc thread %C failed to update status. Aborting...\n"),
datas[idx].thread_id.in()));
abort();
}
if (log_all_threads) {
ACE_DEBUG((LM_INFO, "(%P|%t) INFO: Thread Status %C %C\n",
infos[idx].valid_data ? to_json(datas[idx]).c_str() : datas[idx].thread_id.in(),
to_json(infos[idx]).c_str()));
}

count = (count + 1) % safety_factor;
if (infos[idx].instance_state == DDS::ALIVE_INSTANCE_STATE && SystemTimePoint(infos[idx].source_timestamp) < expire) {
late_thread_indexes.push_back(idx);
}
}

for (const auto idx : late_thread_indexes) {
const SystemTimePoint timestamp(infos[idx].source_timestamp);
ACE_ERROR((LM_ERROR,
"(%P|%t) ERROR: RelayThreadMonitor::svc thread %C last update %#T. Aborting...\n",
datas[idx].thread_id.in(), &timestamp.value()));
}

if (!late_thread_indexes.empty()) {
std::abort();
}
}

Expand All @@ -116,7 +125,7 @@ void RelayThreadMonitor::on_data_available(DDS::DataReader_ptr /*reader*/)
}

if (ret != DDS::RETCODE_OK) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RelayThreadMonitor::on_data_available failed to read %C\n"), OpenDDS::DCPS::retcode_to_string(ret)));
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RelayThreadMonitor::on_data_available failed to read %C\n", OpenDDS::DCPS::retcode_to_string(ret)));
return;
}

Expand Down

0 comments on commit 34b0ea4

Please sign in to comment.