From 0a3fee2ac2352833bd95b8c3b946e02bf1ebd034 Mon Sep 17 00:00:00 2001 From: Adam Mitz Date: Tue, 20 Aug 2024 19:52:32 +0000 Subject: [PATCH 1/2] Updated RelayThreadMonitor error reporting All threads that have missed the deadline are reported before the process dies. These log messages include the timestamp of the most recent update for each thread that has missed the deadline. --- tools/rtpsrelay/RelayThreadMonitor.cpp | 87 ++++++++++++++------------ 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/tools/rtpsrelay/RelayThreadMonitor.cpp b/tools/rtpsrelay/RelayThreadMonitor.cpp index 8346b16cebe..f14aee28e1e 100644 --- a/tools/rtpsrelay/RelayThreadMonitor.cpp +++ b/tools/rtpsrelay/RelayThreadMonitor.cpp @@ -7,6 +7,9 @@ #include +#include +#include + using namespace OpenDDS::DCPS; namespace RtpsRelay { @@ -49,49 +52,55 @@ int RelayThreadMonitor::svc() ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, -1); - int count = 0; - - while (running_) { + for (auto count = 0; running_; count = (count + 1) % safety_factor) { condition_.wait_until(MonotonicTimePoint::now() + thread_status_interval, thread_status_manager); - if (running_) { - OpenDDS::DCPS::InternalThreadBuiltinTopicDataSeq datas; - DDS::SampleInfoSeq infos; - const DDS::ReturnCode_t ret = thread_status_reader_->read(datas, - infos, - DDS::LENGTH_UNLIMITED, - DDS::ANY_SAMPLE_STATE, - DDS::ANY_VIEW_STATE, - DDS::ANY_INSTANCE_STATE); - if (ret != DDS::RETCODE_OK) { - ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RelayThreadMonitor::svc failed to read %C\n"), OpenDDS::DCPS::retcode_to_string(ret))); - continue; + if (!running_) { + break; + } + OpenDDS::DCPS::InternalThreadBuiltinTopicDataSeq datas; + DDS::SampleInfoSeq infos; + const DDS::ReturnCode_t ret = thread_status_reader_->read(datas, + infos, + DDS::LENGTH_UNLIMITED, + DDS::ANY_SAMPLE_STATE, + DDS::ANY_VIEW_STATE, + DDS::ANY_INSTANCE_STATE); + if (ret != DDS::RETCODE_OK) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RelayThreadMonitor::svc failed to read %C\n", OpenDDS::DCPS::retcode_to_string(ret))); + continue; + } + + const SystemTimePoint expire = SystemTimePoint::now() - thread_status_interval * safety_factor; + const auto log_all_threads = count == 0 && config_.log_thread_status(); + std::vector late_thread_indexes; + + for (CORBA::ULong idx = 0; idx != infos.length(); ++idx) { + if (infos[idx].valid_data) { + utilization_[datas[idx].thread_id.in()] = datas[idx].utilization; + } else if (infos[idx].instance_state != DDS::ALIVE_INSTANCE_STATE) { + utilization_.erase(datas[idx].thread_id.in()); } - const SystemTimePoint expire = SystemTimePoint::now() - thread_status_interval * safety_factor; - - for (CORBA::ULong idx = 0; idx != infos.length(); ++idx) { - if (infos[idx].valid_data) { - utilization_[datas[idx].thread_id.in()] = datas[idx].utilization; - } else if (infos[idx].instance_state != DDS::ALIVE_INSTANCE_STATE) { - utilization_.erase(datas[idx].thread_id.in()); - } - - if (count == 0 && config_.log_thread_status()) { - ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) INFO: Thread Status %C %C\n"), - to_json(datas[idx]).c_str(), - to_json(infos[idx]).c_str())); - } - - const SystemTimePoint timestamp(infos[idx].source_timestamp); - if (infos[idx].instance_state == DDS::ALIVE_INSTANCE_STATE && timestamp < expire) { - ACE_ERROR((LM_ERROR, - ACE_TEXT("(%P|%t) ERROR: RelayThreadMonitor::svc thread %C failed to update status. Aborting...\n"), - datas[idx].thread_id.in())); - abort(); - } + if (log_all_threads) { + ACE_DEBUG((LM_INFO, "(%P|%t) INFO: Thread Status %C %C\n", + infos[idx].valid_data ? to_json(datas[idx]).c_str() : datas[idx].thread_id.in(), + to_json(infos[idx]).c_str())); } - count = (count + 1) % safety_factor; + if (infos[idx].instance_state == DDS::ALIVE_INSTANCE_STATE && SystemTimePoint(infos[idx].source_timestamp) < expire) { + late_thread_indexes.push_back(idx); + } + } + + for (const auto idx : late_thread_indexes) { + const SystemTimePoint timestamp(infos[idx].source_timestamp); + ACE_ERROR((LM_ERROR, + "(%P|%t) ERROR: RelayThreadMonitor::svc thread %C last update %#T. Aborting...\n", + datas[idx].thread_id.in(), ×tamp.value())); + } + + if (!late_thread_indexes.empty()) { + std::abort(); } } @@ -116,7 +125,7 @@ void RelayThreadMonitor::on_data_available(DDS::DataReader_ptr /*reader*/) } if (ret != DDS::RETCODE_OK) { - ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RelayThreadMonitor::on_data_available failed to read %C\n"), OpenDDS::DCPS::retcode_to_string(ret))); + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RelayThreadMonitor::on_data_available failed to read %C\n", OpenDDS::DCPS::retcode_to_string(ret))); return; } From d3b36d4ae3ff953936f320a2d23bf4e62b5f990a Mon Sep 17 00:00:00 2001 From: Adam Mitz Date: Tue, 20 Aug 2024 20:00:33 +0000 Subject: [PATCH 2/2] NEWS --- docs/news.d/relay-thread-monitor.rst | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 docs/news.d/relay-thread-monitor.rst diff --git a/docs/news.d/relay-thread-monitor.rst b/docs/news.d/relay-thread-monitor.rst new file mode 100644 index 00000000000..9254c772e2b --- /dev/null +++ b/docs/news.d/relay-thread-monitor.rst @@ -0,0 +1,8 @@ +.. news-prs: 4778 + +.. news-start-section: Additions +- Improved RelayThreadMonitor error reporting + + - All threads that have missed the deadline are reported before the process dies. These log messages include the timestamp of the most recent update for each thread that has missed the deadline. + +.. news-end-section