From d8d033838bac53a47794666bfa57e8fa0f54f5bc Mon Sep 17 00:00:00 2001 From: Adam Mitz Date: Thu, 26 Sep 2024 20:08:17 +0000 Subject: [PATCH 1/3] Use a configurable thread pool for the RtpsRelay event handlers --- tools/rtpsrelay/CMakeLists.txt | 1 + tools/rtpsrelay/Config.h | 12 +++ tools/rtpsrelay/RelayEventLoop.cpp | 115 +++++++++++++++++++++++++++++ tools/rtpsrelay/RelayEventLoop.h | 23 ++++++ tools/rtpsrelay/RtpsRelay.cpp | 41 ++-------- 5 files changed, 159 insertions(+), 33 deletions(-) create mode 100644 tools/rtpsrelay/RelayEventLoop.cpp create mode 100644 tools/rtpsrelay/RelayEventLoop.h diff --git a/tools/rtpsrelay/CMakeLists.txt b/tools/rtpsrelay/CMakeLists.txt index 3f9c02a507f..51b9576d4d8 100644 --- a/tools/rtpsrelay/CMakeLists.txt +++ b/tools/rtpsrelay/CMakeLists.txt @@ -16,6 +16,7 @@ add_executable(RtpsRelay ParticipantStatisticsReporter.cpp PublicationListener.cpp RelayAddressListener.cpp + RelayEventLoop.cpp RelayHandler.cpp RelayHttpMetaDiscovery.cpp RelayParticipantStatusReporter.cpp diff --git a/tools/rtpsrelay/Config.h b/tools/rtpsrelay/Config.h index 9150eb2eecc..c304f62af33 100644 --- a/tools/rtpsrelay/Config.h +++ b/tools/rtpsrelay/Config.h @@ -36,6 +36,7 @@ class Config { , max_ips_per_client_(0) , admission_max_participants_high_water_(0) , admission_max_participants_low_water_(0) + , handler_threads_(1) {} void relay_id(const std::string& value) @@ -338,6 +339,16 @@ class Config { return admission_max_participants_low_water_; } + void handler_threads(size_t count) + { + handler_threads_ = count; + } + + size_t handler_threads() const + { + return handler_threads_; + } + private: std::string relay_id_; OpenDDS::DCPS::GUID_t application_participant_guid_; @@ -369,6 +380,7 @@ class Config { OpenDDS::DCPS::TimeDuration rejected_address_duration_; size_t admission_max_participants_high_water_; size_t admission_max_participants_low_water_; + size_t handler_threads_; }; } diff --git a/tools/rtpsrelay/RelayEventLoop.cpp b/tools/rtpsrelay/RelayEventLoop.cpp new file mode 100644 index 00000000000..e5d52c85f6b --- /dev/null +++ b/tools/rtpsrelay/RelayEventLoop.cpp @@ -0,0 +1,115 @@ +/* + * Distributed under the OpenDDS License. + * See: http://www.opendds.org/license.html + */ + +#include "RelayEventLoop.h" + +#include +#include +#include + +#include + +namespace RtpsRelay { + +struct ThreadPool : ACE_Task_Base { + + ThreadPool(const Config& config, ACE_Reactor& reactor, RelayThreadMonitor& monitor) + : config_(config) + , reactor_(reactor) + , monitor_(monitor) + {} + + int svc() override; + int run(); + + const Config& config_; + ACE_Reactor& reactor_; + RelayThreadMonitor& monitor_; + OpenDDS::DCPS::ThreadStatusManager& thread_status_manager_ = TheServiceParticipant->get_thread_status_manager(); +}; + +struct RunThreadMonitor { + + RunThreadMonitor(OpenDDS::DCPS::ThreadStatusManager& thread_status_manager, RelayThreadMonitor& monitor) + : should_run_(thread_status_manager.update_thread_status()) + , monitor_(monitor) + , status_(should_run_ ? monitor.start() : EXIT_SUCCESS) + {} + + ~RunThreadMonitor() + { + if (should_run_ && status_ == EXIT_SUCCESS) { + monitor_.stop(); + } + } + + const bool should_run_; + RelayThreadMonitor& monitor_; + const int status_; +}; + +int ThreadPool::run() +{ + RunThreadMonitor rtm{thread_status_manager_, monitor_}; + if (rtm.status_ != EXIT_SUCCESS) { + ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run - failed to start Relay Thread Monitor\n")); + return EXIT_FAILURE; + } + + const auto threads = config_.handler_threads(); + if (threads == 1) { + return svc(); + } + + const auto status = activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, static_cast(threads)); + if (status != EXIT_SUCCESS) { + ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run - failed to start thread pool: %m\n")); + return status; + } + + return wait(); +} + +int ThreadPool::svc() +{ + const auto has_run_time = !config_.run_time().is_zero(); + const auto end_time = OpenDDS::DCPS::MonotonicTimePoint::now() + config_.run_time(); + + if (thread_status_manager_.update_thread_status()) { + OpenDDS::DCPS::ThreadStatusManager::Start thread_status_monitoring_active(thread_status_manager_, "RtpsRelay Event Loop"); + + while (!has_run_time || OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) { + auto t = thread_status_manager_.thread_status_interval().value(); + OpenDDS::DCPS::ThreadStatusManager::Sleeper s(thread_status_manager_); + if (reactor_.run_reactor_event_loop(t, 0) != 0) { + break; + } + } + + } else if (has_run_time) { + while (OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) { + auto t = (end_time - OpenDDS::DCPS::MonotonicTimePoint::now()).value(); + if (reactor_.run_reactor_event_loop(t, 0) != 0) { + break; + } + } + + } else { + reactor_.run_reactor_event_loop(); + } + return EXIT_SUCCESS; +} + +ACE_Reactor_Impl* RelayEventLoop::make_reactor_impl(const Config& config) +{ + return config.handler_threads() == 1 ? new ACE_Select_Reactor : new ACE_TP_Reactor; +} + +int RelayEventLoop::run(const Config& config, ACE_Reactor& reactor, RelayThreadMonitor& monitor) +{ + return ThreadPool{config, reactor, monitor}.run(); +} + +} diff --git a/tools/rtpsrelay/RelayEventLoop.h b/tools/rtpsrelay/RelayEventLoop.h new file mode 100644 index 00000000000..25e11f273e8 --- /dev/null +++ b/tools/rtpsrelay/RelayEventLoop.h @@ -0,0 +1,23 @@ +/* + * Distributed under the OpenDDS License. + * See: http://www.opendds.org/license.html + */ +#ifndef RTPSRELAY_RELAY_EVENT_LOOP_H_ +#define RTPSRELAY_RELAY_EVENT_LOOP_H_ + +#include "Config.h" +#include "RelayThreadMonitor.h" + +#include + +namespace RtpsRelay { +namespace RelayEventLoop { + +ACE_Reactor_Impl* make_reactor_impl(const Config& config); + +int run(const Config& config, ACE_Reactor& reactor, RelayThreadMonitor& monitor); + +} +} + +#endif diff --git a/tools/rtpsrelay/RtpsRelay.cpp b/tools/rtpsrelay/RtpsRelay.cpp index f1946bb57ca..245e1299500 100644 --- a/tools/rtpsrelay/RtpsRelay.cpp +++ b/tools/rtpsrelay/RtpsRelay.cpp @@ -10,6 +10,7 @@ #include "ParticipantStatisticsReporter.h" #include "PublicationListener.h" #include "RelayAddressListener.h" +#include "RelayEventLoop.h" #include "RelayHandler.h" #include "RelayHttpMetaDiscovery.h" #include "RelayPartitionTable.h" @@ -38,7 +39,6 @@ #include #include #include -#include #include #include @@ -229,6 +229,9 @@ int run(int argc, ACE_TCHAR* argv[]) } else if ((arg = args.get_the_parameter("-RunTime"))) { config.run_time(OpenDDS::DCPS::TimeDuration(ACE_OS::atoi(arg))); args.consume_arg(); + } else if ((arg = args.get_the_parameter("-HandlerThreads"))) { + config.handler_threads(std::atoi(arg)); + args.consume_arg(); } else if ((arg = args.get_the_parameter("-MaxIpsPerClient"))) { config.max_ips_per_client(ACE_OS::atoi(arg)); args.consume_arg(); @@ -737,7 +740,7 @@ int run(int argc, ACE_TCHAR* argv[]) RelayParticipantStatusReporter relay_participant_status_reporter(config, relay_participant_status_writer, relay_statistics_reporter); RelayThreadMonitor* relay_thread_monitor = new RelayThreadMonitor(config); GuidAddrSet guid_addr_set(config, rtps_discovery, relay_participant_status_reporter, relay_statistics_reporter, *relay_thread_monitor); - ACE_Reactor reactor_(new ACE_Select_Reactor, true); + ACE_Reactor reactor_(RelayEventLoop::make_reactor_impl(config), true); const auto reactor = &reactor_; GuidPartitionTable guid_partition_table(config, spdp_horizontal_addr, relay_partitions_writer, spdp_replay_writer); RelayPartitionTable relay_partition_table; @@ -961,37 +964,9 @@ int run(int argc, ACE_TCHAR* argv[]) } ACE_DEBUG((LM_INFO, "(%P|%t) INFO: Meta Discovery listening on %C\n", OpenDDS::DCPS::LogAddr(meta_discovery_addr).c_str())); - const bool has_run_time = !config.run_time().is_zero(); - const OpenDDS::DCPS::MonotonicTimePoint end_time = OpenDDS::DCPS::MonotonicTimePoint::now() + config.run_time(); - - OpenDDS::DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager(); - if (thread_status_manager.update_thread_status()) { - if (relay_thread_monitor->start() == -1) { - ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: failed to start Relay Thread Monitor\n")); - return EXIT_FAILURE; - } - - OpenDDS::DCPS::ThreadStatusManager::Start thread_status_monitoring_active(thread_status_manager, "RtpsRelay Main"); - - while (!has_run_time || OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) { - ACE_Time_Value t = thread_status_manager.thread_status_interval().value(); - OpenDDS::DCPS::ThreadStatusManager::Sleeper s(thread_status_manager); - if (reactor->run_reactor_event_loop(t, 0) != 0) { - break; - } - } - - relay_thread_monitor->stop(); - } else if (has_run_time) { - while (OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) { - ACE_Time_Value t = (end_time - OpenDDS::DCPS::MonotonicTimePoint::now()).value(); - if (reactor->run_reactor_event_loop(t, 0) != 0) { - break; - } - } - - } else { - reactor->run_reactor_event_loop(); + const auto status = RelayEventLoop::run(config, *reactor, *relay_thread_monitor); + if (status != EXIT_SUCCESS) { + return status; } application_participant->delete_contained_entities(); From 7c7b4fcf47e6b57ac9d58647e32714e7c8f42471 Mon Sep 17 00:00:00 2001 From: Adam Mitz Date: Mon, 30 Sep 2024 09:44:26 -0500 Subject: [PATCH 2/3] Corrected formatting of log messages Co-authored-by: Justin Wilson --- tools/rtpsrelay/RelayEventLoop.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/rtpsrelay/RelayEventLoop.cpp b/tools/rtpsrelay/RelayEventLoop.cpp index e5d52c85f6b..90b0def19fd 100644 --- a/tools/rtpsrelay/RelayEventLoop.cpp +++ b/tools/rtpsrelay/RelayEventLoop.cpp @@ -54,7 +54,7 @@ int ThreadPool::run() { RunThreadMonitor rtm{thread_status_manager_, monitor_}; if (rtm.status_ != EXIT_SUCCESS) { - ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run - failed to start Relay Thread Monitor\n")); + ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run: failed to start Relay Thread Monitor\n")); return EXIT_FAILURE; } @@ -65,7 +65,7 @@ int ThreadPool::run() const auto status = activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, static_cast(threads)); if (status != EXIT_SUCCESS) { - ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run - failed to start thread pool: %m\n")); + ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run: failed to start thread pool: %m\n")); return status; } From 35e4ef3def41fe1e9bcd39480c982503c60dd2fa Mon Sep 17 00:00:00 2001 From: Adam Mitz Date: Mon, 30 Sep 2024 14:58:41 +0000 Subject: [PATCH 3/3] Updated documentation --- docs/devguide/internet_enabled_rtps.rst | 4 ++++ docs/news.d/relay-handler-threadpool.rst | 8 ++++++++ docs/news.d/relay-max-participants.rst | 2 +- .../{release-reservatons.rst => release-reservations.rst} | 0 4 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 docs/news.d/relay-handler-threadpool.rst rename docs/news.d/{release-reservatons.rst => release-reservations.rst} (100%) diff --git a/docs/devguide/internet_enabled_rtps.rst b/docs/devguide/internet_enabled_rtps.rst index a49e5fc847c..d7af6ac8f99 100644 --- a/docs/devguide/internet_enabled_rtps.rst +++ b/docs/devguide/internet_enabled_rtps.rst @@ -331,6 +331,10 @@ The command-line options for the RtpsRelay: Amount of time to reject messages from client participants that show suspicious behavior, e.g., those that send messages from the RtpsRelay back to the RtpsRelay. The default is 0 (disabled). +.. option:: -HandlerThreads + + Use a thread pool with this many threads (default 1) to handle input/output/timer events. + .. _internet_enabled_rtps--deployment-considerations: Deployment Considerations diff --git a/docs/news.d/relay-handler-threadpool.rst b/docs/news.d/relay-handler-threadpool.rst new file mode 100644 index 00000000000..3e08d734787 --- /dev/null +++ b/docs/news.d/relay-handler-threadpool.rst @@ -0,0 +1,8 @@ +.. news-prs: 4808 + +.. news-start-section: Additions +- Use a configurable thread pool for the RtpsRelay event handlers + + - See :option:`RtpsRelay -HandlerThreads` + +.. news-end-section diff --git a/docs/news.d/relay-max-participants.rst b/docs/news.d/relay-max-participants.rst index 5afe8016a6f..b5d585f979d 100644 --- a/docs/news.d/relay-max-participants.rst +++ b/docs/news.d/relay-max-participants.rst @@ -1,7 +1,7 @@ .. news-prs: 4774 .. news-start-section: Additions -- Added -AdmissionMaxParticipantsRange to RtpsRelay options +- Added :option:`RtpsRelay -AdmissionMaxParticipantsRange` to RtpsRelay options - This option provides another mechanism for detecting load on each RtpsRelay instance diff --git a/docs/news.d/release-reservatons.rst b/docs/news.d/release-reservations.rst similarity index 100% rename from docs/news.d/release-reservatons.rst rename to docs/news.d/release-reservations.rst