Skip to content

Commit

Permalink
Merge pull request OpenDDS#4808 from mitza-oci/relay-handler-thread-pool
Browse files Browse the repository at this point in the history
Use a configurable thread pool for the RtpsRelay event handlers
  • Loading branch information
mitza-oci authored Sep 30, 2024
2 parents 182c3be + 35e4ef3 commit 290031d
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 34 deletions.
4 changes: 4 additions & 0 deletions docs/devguide/internet_enabled_rtps.rst
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ The command-line options for the RtpsRelay:
Amount of time to reject messages from client participants that show suspicious behavior, e.g., those that send messages from the RtpsRelay back to the RtpsRelay.
The default is 0 (disabled).

.. option:: -HandlerThreads <threads>

Use a thread pool with this many threads (default 1) to handle input/output/timer events.

.. _internet_enabled_rtps--deployment-considerations:

Deployment Considerations
Expand Down
8 changes: 8 additions & 0 deletions docs/news.d/relay-handler-threadpool.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.. news-prs: 4808
.. news-start-section: Additions
- Use a configurable thread pool for the RtpsRelay event handlers

- See :option:`RtpsRelay -HandlerThreads`

.. news-end-section
2 changes: 1 addition & 1 deletion docs/news.d/relay-max-participants.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
.. news-prs: 4774
.. news-start-section: Additions
- Added -AdmissionMaxParticipantsRange to RtpsRelay options
- Added :option:`RtpsRelay -AdmissionMaxParticipantsRange` to RtpsRelay options

- This option provides another mechanism for detecting load on each RtpsRelay instance

Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions tools/rtpsrelay/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ add_executable(RtpsRelay
ParticipantStatisticsReporter.cpp
PublicationListener.cpp
RelayAddressListener.cpp
RelayEventLoop.cpp
RelayHandler.cpp
RelayHttpMetaDiscovery.cpp
RelayParticipantStatusReporter.cpp
Expand Down
12 changes: 12 additions & 0 deletions tools/rtpsrelay/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Config {
, max_ips_per_client_(0)
, admission_max_participants_high_water_(0)
, admission_max_participants_low_water_(0)
, handler_threads_(1)
{}

void relay_id(const std::string& value)
Expand Down Expand Up @@ -338,6 +339,16 @@ class Config {
return admission_max_participants_low_water_;
}

void handler_threads(size_t count)
{
handler_threads_ = count;
}

size_t handler_threads() const
{
return handler_threads_;
}

private:
std::string relay_id_;
OpenDDS::DCPS::GUID_t application_participant_guid_;
Expand Down Expand Up @@ -369,6 +380,7 @@ class Config {
OpenDDS::DCPS::TimeDuration rejected_address_duration_;
size_t admission_max_participants_high_water_;
size_t admission_max_participants_low_water_;
size_t handler_threads_;
};

}
Expand Down
115 changes: 115 additions & 0 deletions tools/rtpsrelay/RelayEventLoop.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Distributed under the OpenDDS License.
* See: http://www.opendds.org/license.html
*/

#include "RelayEventLoop.h"

#include <ace/Select_Reactor.h>
#include <ace/TP_Reactor.h>
#include <ace/Task.h>

#include <cstdlib>

namespace RtpsRelay {

struct ThreadPool : ACE_Task_Base {

ThreadPool(const Config& config, ACE_Reactor& reactor, RelayThreadMonitor& monitor)
: config_(config)
, reactor_(reactor)
, monitor_(monitor)
{}

int svc() override;
int run();

const Config& config_;
ACE_Reactor& reactor_;
RelayThreadMonitor& monitor_;
OpenDDS::DCPS::ThreadStatusManager& thread_status_manager_ = TheServiceParticipant->get_thread_status_manager();
};

struct RunThreadMonitor {

RunThreadMonitor(OpenDDS::DCPS::ThreadStatusManager& thread_status_manager, RelayThreadMonitor& monitor)
: should_run_(thread_status_manager.update_thread_status())
, monitor_(monitor)
, status_(should_run_ ? monitor.start() : EXIT_SUCCESS)
{}

~RunThreadMonitor()
{
if (should_run_ && status_ == EXIT_SUCCESS) {
monitor_.stop();
}
}

const bool should_run_;
RelayThreadMonitor& monitor_;
const int status_;
};

int ThreadPool::run()
{
RunThreadMonitor rtm{thread_status_manager_, monitor_};
if (rtm.status_ != EXIT_SUCCESS) {
ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run: failed to start Relay Thread Monitor\n"));
return EXIT_FAILURE;
}

const auto threads = config_.handler_threads();
if (threads == 1) {
return svc();
}

const auto status = activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, static_cast<int>(threads));
if (status != EXIT_SUCCESS) {
ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run: failed to start thread pool: %m\n"));
return status;
}

return wait();
}

int ThreadPool::svc()
{
const auto has_run_time = !config_.run_time().is_zero();
const auto end_time = OpenDDS::DCPS::MonotonicTimePoint::now() + config_.run_time();

if (thread_status_manager_.update_thread_status()) {
OpenDDS::DCPS::ThreadStatusManager::Start thread_status_monitoring_active(thread_status_manager_, "RtpsRelay Event Loop");

while (!has_run_time || OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) {
auto t = thread_status_manager_.thread_status_interval().value();
OpenDDS::DCPS::ThreadStatusManager::Sleeper s(thread_status_manager_);
if (reactor_.run_reactor_event_loop(t, 0) != 0) {
break;
}
}

} else if (has_run_time) {
while (OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) {
auto t = (end_time - OpenDDS::DCPS::MonotonicTimePoint::now()).value();
if (reactor_.run_reactor_event_loop(t, 0) != 0) {
break;
}
}

} else {
reactor_.run_reactor_event_loop();
}
return EXIT_SUCCESS;
}

ACE_Reactor_Impl* RelayEventLoop::make_reactor_impl(const Config& config)
{
return config.handler_threads() == 1 ? new ACE_Select_Reactor : new ACE_TP_Reactor;
}

int RelayEventLoop::run(const Config& config, ACE_Reactor& reactor, RelayThreadMonitor& monitor)
{
return ThreadPool{config, reactor, monitor}.run();
}

}
23 changes: 23 additions & 0 deletions tools/rtpsrelay/RelayEventLoop.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Distributed under the OpenDDS License.
* See: http://www.opendds.org/license.html
*/
#ifndef RTPSRELAY_RELAY_EVENT_LOOP_H_
#define RTPSRELAY_RELAY_EVENT_LOOP_H_

#include "Config.h"
#include "RelayThreadMonitor.h"

#include <ace/Reactor.h>

namespace RtpsRelay {
namespace RelayEventLoop {

ACE_Reactor_Impl* make_reactor_impl(const Config& config);

int run(const Config& config, ACE_Reactor& reactor, RelayThreadMonitor& monitor);

}
}

#endif
41 changes: 8 additions & 33 deletions tools/rtpsrelay/RtpsRelay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "ParticipantStatisticsReporter.h"
#include "PublicationListener.h"
#include "RelayAddressListener.h"
#include "RelayEventLoop.h"
#include "RelayHandler.h"
#include "RelayHttpMetaDiscovery.h"
#include "RelayPartitionTable.h"
Expand Down Expand Up @@ -38,7 +39,6 @@
#include <ace/Arg_Shifter.h>
#include <ace/Argv_Type_Converter.h>
#include <ace/Reactor.h>
#include <ace/Select_Reactor.h>

#include <cstdlib>
#include <algorithm>
Expand Down Expand Up @@ -229,6 +229,9 @@ int run(int argc, ACE_TCHAR* argv[])
} else if ((arg = args.get_the_parameter("-RunTime"))) {
config.run_time(OpenDDS::DCPS::TimeDuration(ACE_OS::atoi(arg)));
args.consume_arg();
} else if ((arg = args.get_the_parameter("-HandlerThreads"))) {
config.handler_threads(std::atoi(arg));
args.consume_arg();
} else if ((arg = args.get_the_parameter("-MaxIpsPerClient"))) {
config.max_ips_per_client(ACE_OS::atoi(arg));
args.consume_arg();
Expand Down Expand Up @@ -737,7 +740,7 @@ int run(int argc, ACE_TCHAR* argv[])
RelayParticipantStatusReporter relay_participant_status_reporter(config, relay_participant_status_writer, relay_statistics_reporter);
RelayThreadMonitor* relay_thread_monitor = new RelayThreadMonitor(config);
GuidAddrSet guid_addr_set(config, rtps_discovery, relay_participant_status_reporter, relay_statistics_reporter, *relay_thread_monitor);
ACE_Reactor reactor_(new ACE_Select_Reactor, true);
ACE_Reactor reactor_(RelayEventLoop::make_reactor_impl(config), true);
const auto reactor = &reactor_;
GuidPartitionTable guid_partition_table(config, spdp_horizontal_addr, relay_partitions_writer, spdp_replay_writer);
RelayPartitionTable relay_partition_table;
Expand Down Expand Up @@ -961,37 +964,9 @@ int run(int argc, ACE_TCHAR* argv[])
}
ACE_DEBUG((LM_INFO, "(%P|%t) INFO: Meta Discovery listening on %C\n", OpenDDS::DCPS::LogAddr(meta_discovery_addr).c_str()));

const bool has_run_time = !config.run_time().is_zero();
const OpenDDS::DCPS::MonotonicTimePoint end_time = OpenDDS::DCPS::MonotonicTimePoint::now() + config.run_time();

OpenDDS::DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
if (thread_status_manager.update_thread_status()) {
if (relay_thread_monitor->start() == -1) {
ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: failed to start Relay Thread Monitor\n"));
return EXIT_FAILURE;
}

OpenDDS::DCPS::ThreadStatusManager::Start thread_status_monitoring_active(thread_status_manager, "RtpsRelay Main");

while (!has_run_time || OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) {
ACE_Time_Value t = thread_status_manager.thread_status_interval().value();
OpenDDS::DCPS::ThreadStatusManager::Sleeper s(thread_status_manager);
if (reactor->run_reactor_event_loop(t, 0) != 0) {
break;
}
}

relay_thread_monitor->stop();
} else if (has_run_time) {
while (OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) {
ACE_Time_Value t = (end_time - OpenDDS::DCPS::MonotonicTimePoint::now()).value();
if (reactor->run_reactor_event_loop(t, 0) != 0) {
break;
}
}

} else {
reactor->run_reactor_event_loop();
const auto status = RelayEventLoop::run(config, *reactor, *relay_thread_monitor);
if (status != EXIT_SUCCESS) {
return status;
}

application_participant->delete_contained_entities();
Expand Down

0 comments on commit 290031d

Please sign in to comment.