Skip to content

Commit

Permalink
Merge pull request OpenDDS#4513 from mitza-oci/thread-status
Browse files Browse the repository at this point in the history
Minor updates to ThreadStatusManager
  • Loading branch information
jrw972 authored Mar 12, 2024
2 parents 750e54a + 307e673 commit a7eb6ef
Show file tree
Hide file tree
Showing 15 changed files with 36 additions and 54 deletions.
4 changes: 2 additions & 2 deletions dds/DCPS/RTPS/Spdp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4445,8 +4445,8 @@ void Spdp::SpdpTransport::thread_status_task(const DCPS::MonotonicTimePoint& now
typedef DCPS::ThreadStatusManager::List List;
List running;
List removed;
TheServiceParticipant->get_thread_status_manager().harvest(last_harvest, running, removed);
last_harvest = now;
TheServiceParticipant->get_thread_status_manager().harvest(last_thread_status_harvest_, running, removed);
last_thread_status_harvest_ = now;
for (List::const_iterator i = removed.begin(); i != removed.end(); ++i) {
DCPS::InternalThreadBuiltinTopicData data;
data.thread_id = i->bit_key().c_str();
Expand Down
2 changes: 1 addition & 1 deletion dds/DCPS/RTPS/Spdp.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ class OpenDDS_Rtps_Export Spdp
bool network_is_unreachable_;
bool ice_endpoint_added_;

DCPS::MonotonicTimePoint last_harvest;
DCPS::MonotonicTimePoint last_thread_status_harvest_;
DCPS::ConfigReader_rch config_reader_;
void on_data_available(DCPS::ConfigReader_rch reader);
};
Expand Down
3 changes: 1 addition & 2 deletions dds/DCPS/ReactorTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ void ReactorTask::cleanup()
timer_queue_ = 0;
}

int ReactorTask::open_reactor_task(void*,
ThreadStatusManager* thread_status_manager,
int ReactorTask::open_reactor_task(ThreadStatusManager* thread_status_manager,
const String& name)
{
GuardType guard(lock_);
Expand Down
10 changes: 4 additions & 6 deletions dds/DCPS/ReactorTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,18 @@ namespace OpenDDS {
namespace DCPS {

class OpenDDS_Dcps_Export ReactorTask : public virtual ACE_Task_Base,
public virtual RcObject {
public virtual RcObject {

public:

explicit ReactorTask(bool useAsyncSend);
virtual ~ReactorTask();

public:
int open_reactor_task(void*,
ThreadStatusManager* thread_status_manager = 0,
int open_reactor_task(ThreadStatusManager* thread_status_manager = 0,
const String& name = "");
virtual int open(void* ptr) {
return open_reactor_task(ptr);
}

virtual int open(void*) { return open_reactor_task(); }
virtual int svc();
virtual int close(u_long flags = 0);

Expand Down
4 changes: 1 addition & 3 deletions dds/DCPS/Service_Participant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,7 @@ Service_Participant::get_domain_participant_factory(int &argc,

dp_factory_servant_ = make_rch<DomainParticipantFactoryImpl>();

reactor_task_.open_reactor_task(0,
&thread_status_manager_,
"Service_Participant");
reactor_task_.open_reactor_task(&thread_status_manager_, "Service_Participant");

job_queue_ = make_rch<JobQueue>(reactor_task_.get_reactor());

Expand Down
26 changes: 10 additions & 16 deletions dds/DCPS/ThreadStatusManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,30 @@ void ThreadStatusManager::Thread::update(const MonotonicTimePoint& m_now,
timestamp_ = s_now;

if (nested) {
switch(next_status) {
case ThreadStatus_Active:
++nesting_depth_;
break;
case ThreadStatus_Idle:
--nesting_depth_;
break;
}
nesting_depth_ += (next_status == ThreadStatus_Active) ? 1 : -1;
}

if (!nested ||
(next_status == ThreadStatus_Active && nesting_depth_ == 1) ||
(next_status == ThreadStatus_Idle && nesting_depth_ == 0)) {
if (bucket_[current_bucket_].active_time + bucket_[current_bucket_].idle_time > bucket_limit) {
if (buckets_[current_bucket_].total_time() > bucket_limit) {
current_bucket_ = (current_bucket_ + 1) % bucket_count;
total_.active_time -= bucket_[current_bucket_].active_time;
bucket_[current_bucket_].active_time = 0;
total_.idle_time -= bucket_[current_bucket_].idle_time;
bucket_[current_bucket_].idle_time = 0;
Bucket& current = buckets_[current_bucket_];
total_.active_time -= current.active_time;
current.active_time = 0;
total_.idle_time -= current.idle_time;
current.idle_time = 0;
}

const TimeDuration t = m_now - last_update_;

switch (status_) {
case ThreadStatus_Active:
bucket_[current_bucket_].active_time += t;
buckets_[current_bucket_].active_time += t;
total_.active_time += t;
break;
case ThreadStatus_Idle:
bucket_[current_bucket_].idle_time += t;
buckets_[current_bucket_].idle_time += t;
total_.idle_time += t;
break;
}
Expand All @@ -68,7 +62,7 @@ double ThreadStatusManager::Thread::utilization(const MonotonicTimePoint& now) c
{
const TimeDuration active_bonus = (now > last_update_ && status_ == ThreadStatus_Active) ? (now - last_update_) : TimeDuration::zero_value;
const TimeDuration idle_bonus = (now > last_update_ && status_ == ThreadStatus_Idle) ? (now - last_update_) : TimeDuration::zero_value;
const TimeDuration denom = total_.active_time + active_bonus + total_.idle_time + idle_bonus;
const TimeDuration denom = total_.total_time() + active_bonus + idle_bonus;

if (!denom.is_zero()) {
return (total_.active_time + active_bonus) / denom;
Expand Down
7 changes: 4 additions & 3 deletions dds/DCPS/ThreadStatusManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ class OpenDDS_Dcps_Export ThreadStatusManager {
struct OpenDDS_Dcps_Export Bucket {
TimeDuration active_time;
TimeDuration idle_time;
TimeDuration total_time() const { return active_time + idle_time; }
};
MonotonicTimePoint last_update_;
Bucket total_;
Bucket bucket_[bucket_count];
Bucket buckets_[bucket_count];
size_t current_bucket_;
size_t nesting_depth_;
};
Expand All @@ -93,7 +94,7 @@ class OpenDDS_Dcps_Export ThreadStatusManager {
return thread_status_interval_ > TimeDuration::zero_value;
}

/// Add the calling thread with the manager.
/// Add the calling thread to the manager.
/// name is for a more human-friendly name that will be appended to the BIT key.
/// Implicitly makes the thread active and finishes the thread on destruction.
class Start {
Expand All @@ -115,7 +116,7 @@ class OpenDDS_Dcps_Export ThreadStatusManager {

class Event {
public:
Event(ThreadStatusManager& thread_status_manager)
explicit Event(ThreadStatusManager& thread_status_manager)
: thread_status_manager_(thread_status_manager)
{
thread_status_manager_.active(true);
Expand Down
4 changes: 1 addition & 3 deletions dds/DCPS/transport/framework/TransportImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ TransportImpl::create_reactor_task(bool useAsyncSend, const OPENDDS_STRING& name

this->reactor_task_= make_rch<ReactorTask>(useAsyncSend);

if (reactor_task_->open_reactor_task(0,
&TheServiceParticipant->get_thread_status_manager(),
name)) {
if (reactor_task_->open_reactor_task(&TheServiceParticipant->get_thread_status_manager(), name)) {
throw Transport::MiscProblem(); // error already logged by TRT::open()
}
}
Expand Down
4 changes: 2 additions & 2 deletions docs/devguide/built_in_topics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Four separate topics are defined for each domain.
Each is dedicated to a particular entity (domain participant :ref:`built_in_topics--dcpsparticipant-topic`, topic :ref:`built_in_topics--dcpsparticipant-topic`, data writer :ref:`built_in_topics--dcpspublication-topic`, data reader :ref:`built_in_topics--dcpssubscription-topic`) and publishes instances describing the state for each entity in the domain.

Subscriptions to built-in topics are automatically created for each domain participant.
A participant's support for Built-In-Topics can be toggled via the ``DCPSBit`` configuration option (see the table in :ref:`run_time_configuration--common-configuration-options`) (Note: this option cannot be used for RTPS discovery).
A participant's support for Built-In-Topics can be toggled via the ``DCPSBit`` configuration option (see the table in :ref:`run_time_configuration--common-configuration-options`).
To view the built-in topic data, simply obtain the built-in Subscriber and then use it to access the Data Reader for the built-in topic of interest.
The Data Reader can then be used like any other Data Reader.

Expand Down Expand Up @@ -284,7 +284,7 @@ OpenDDSInternalThread Topic
..
Sect<6.8.3>
The Built-In Topic "OpenDDSInternalThread" is published when OpenDDS is configured with DCPSThreadStatusInterval (:ref:`run_time_configuration--common-configuration-options`).
The Built-In Topic "OpenDDSInternalThread" is published by the DDSI-RTPS discovery implementation when OpenDDS is configured with DCPSThreadStatusInterval (:ref:`run_time_configuration--common-configuration-options`).
When enabled, the DataReader for this Built-In Topic will report the status of threads created and managed by OpenDDS within the current process.
The timestamp associated with samples can be used to determine the health (responsiveness) of the thread.

Expand Down
7 changes: 3 additions & 4 deletions docs/devguide/internet_enabled_rtps.rst
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,17 @@ The command-line options for the RtpsRelay:
``-Permissions PATH``

Provide paths to the DDS Security documents.
Requires a security-enabled build.

* ``-RestartDetection 0|1``

Setting RestartDetction to 1 causes the relay to track clients by the first 6 bytes of their RTPS GUID and source IP address and clean up older sessions with the same key.
The default is 0 (false).

* ``-LogWarnings0|1``
* ``-LogWarnings 0|1``

``-LogDiscovery0|1``
``-LogDiscovery 0|1``

``-LogActivity0|1``
``-LogActivity 0|1``

Enable/disable logging of the various event types.

Expand Down
2 changes: 1 addition & 1 deletion tests/stress-tests/dds/DCPS/MultiTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ TEST(dds_DCPS_MultiTask, TimingChecker)

ThreadStatusManager tsm;
ReactorTask reactor_task(false);
reactor_task.open_reactor_task(0, &tsm);
reactor_task.open_reactor_task(&tsm);

RcHandle<TestObj> obj = make_rch<TestObj>(reactor_task.interceptor());
obj->multi_->enable(TimeDuration::from_msec(2000)); // 2.0 seconds
Expand Down
2 changes: 1 addition & 1 deletion tests/stress-tests/dds/DCPS/SporadicTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ TEST(dds_DCPS_SporadicTask, TimingChecker)
TimeSource time_source;
ThreadStatusManager tsm;
ReactorTask reactor_task(false);
reactor_task.open_reactor_task(0, &tsm);
reactor_task.open_reactor_task(&tsm);

// Note: This test is modeled directly on the MultiTask stress test, which has a "fallback" timer
// Since SporadicTask doesn't have this, we expect the number of total executions to be different
Expand Down
2 changes: 1 addition & 1 deletion tests/stress-tests/dds/DCPS/TimingChecker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool TimingChecker::check_timing(const OpenDDS::DCPS::TimeDuration& epsilon, con

ThreadStatusManager tsm;
ReactorTask reactor_task(false);
reactor_task.open_reactor_task(0, &tsm);
reactor_task.open_reactor_task(&tsm);

for (int i = 0; result && i < 5; ++i) {
RcHandle<TimingChecker> checker = make_rch<TimingChecker>();
Expand Down
3 changes: 0 additions & 3 deletions tools/rtpsrelay/GuidPartitionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ class GuidPartitionTable {
};

GuidPartitionTable(const Config& config,
GuidAddrSet& guid_addr_set,
const ACE_INET_Addr& address,
RelayPartitionsDataWriter_var relay_partitions_writer,
SpdpReplayDataWriter_var spdp_replay_writer)
: config_(config)
, guid_addr_set_(guid_addr_set)
, address_(OpenDDS::DCPS::LogAddr(address).c_str())
, relay_partitions_writer_(relay_partitions_writer)
, spdp_replay_writer_(spdp_replay_writer)
Expand Down Expand Up @@ -244,7 +242,6 @@ class GuidPartitionTable {
}

const Config& config_;
GuidAddrSet& guid_addr_set_;
const std::string address_;
RelayPartitionsDataWriter_var relay_partitions_writer_;

Expand Down
10 changes: 4 additions & 6 deletions tools/rtpsrelay/RtpsRelay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,7 @@ int run(int argc, ACE_TCHAR* argv[])
// Set up the relay participant.
DDS::DomainParticipantQos participant_qos;
factory->get_default_participant_qos(participant_qos);
DDS::PropertySeq& relay_properties = participant_qos.property.value;
append(relay_properties, OpenDDS::RTPS::RTPS_REFLECT_HEARTBEAT_COUNT, "true");
append(participant_qos.property.value, OpenDDS::RTPS::RTPS_REFLECT_HEARTBEAT_COUNT, "true");

DDS::DomainParticipant_var relay_participant = factory->create_participant(relay_domain, participant_qos, nullptr,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
Expand Down Expand Up @@ -718,7 +717,7 @@ int run(int argc, ACE_TCHAR* argv[])
GuidAddrSet guid_addr_set(config, rtps_discovery, relay_participant_status_reporter, relay_statistics_reporter, *relay_thread_monitor);
ACE_Reactor reactor_(new ACE_Select_Reactor, true);
const auto reactor = &reactor_;
GuidPartitionTable guid_partition_table(config, guid_addr_set, spdp_horizontal_addr, relay_partitions_writer, spdp_replay_writer);
GuidPartitionTable guid_partition_table(config, spdp_horizontal_addr, relay_partitions_writer, spdp_replay_writer);
RelayPartitionTable relay_partition_table;
relay_statistics_reporter.report();

Expand Down Expand Up @@ -937,8 +936,6 @@ int run(int argc, ACE_TCHAR* argv[])

RelayStatusReporter relay_status_reporter(config, guid_addr_set, relay_status_writer, reactor);

OpenDDS::DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();

RelayHttpMetaDiscovery relay_http_meta_discovery(config, meta_discovery_content_type, meta_discovery_content, guid_addr_set);
if (relay_http_meta_discovery.open(meta_discovery_addr, reactor) != 0) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: could not open RelayHttpMetaDiscovery\n")));
Expand All @@ -949,9 +946,10 @@ int run(int argc, ACE_TCHAR* argv[])
const bool has_run_time = !config.run_time().is_zero();
const OpenDDS::DCPS::MonotonicTimePoint end_time = OpenDDS::DCPS::MonotonicTimePoint::now() + config.run_time();

OpenDDS::DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
if (thread_status_manager.update_thread_status()) {
if (relay_thread_monitor->start() == -1) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P:%t) ERROR: failed to activate Thread Load Monitor\n")));
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P:%t) ERROR: failed to start Relay Thread Monitor\n")));
return EXIT_FAILURE;
}

Expand Down

0 comments on commit a7eb6ef

Please sign in to comment.