Skip to content

Commit

Permalink
[Bench] Replace worker Proactor use with EventDispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
simpsont-oci committed Oct 1, 2024
1 parent 290031d commit ba4b450
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 141 deletions.
10 changes: 5 additions & 5 deletions performance-tests/bench/worker/ForwardAction.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "ForwardAction.h"

#include "MemFunHandler.h"
#include "MemFunEvent.h"
#include "util.h"

namespace {
Expand All @@ -12,8 +12,8 @@ const size_t DEFAULT_QUEUE_SIZE(10u);

namespace Bench {

ForwardAction::ForwardAction(ACE_Proactor& proactor)
: proactor_(proactor)
ForwardAction::ForwardAction(OpenDDS::DCPS::EventDispatcher_rch event_dispatcher)
: event_dispatcher_(event_dispatcher)
, started_(false)
, stopped_(false)
, write_task_active_(false)
Expand Down Expand Up @@ -108,7 +108,7 @@ bool ForwardAction::init(const ActionConfig& config, ActionReport& report,
// hence 0 and 1 are not valid vector sizes and will not work
data_queue_.resize(queue_size > 0 ? queue_size + 1 : 2);

handler_.reset(new MemFunHandler<ForwardAction>(&ForwardAction::do_writes, *this));
event_ = OpenDDS::DCPS::make_rch<MemFunEvent<ForwardAction> >(shared_from_this(), &ForwardAction::do_writes);

return true;
}
Expand Down Expand Up @@ -159,7 +159,7 @@ void ForwardAction::on_data(const Data& data)
queue_last_ = (queue_last_ + 1) % data_queue_.size();
if (!write_task_active_) {
write_task_active_ = true;
proactor_.schedule_timer(*handler_, nullptr, ZERO);
event_dispatcher_->dispatch(event_);
}
} else {
for (auto it = data_dws_.begin(); it != data_dws_.end(); ++it) {
Expand Down
8 changes: 4 additions & 4 deletions performance-tests/bench/worker/ForwardAction.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "ace/Proactor.h"
#include "dds/DCPS/EventDispatcher.h"

#include "Action.h"
#include "DataHandler.h"
Expand All @@ -15,7 +15,7 @@ namespace Bench {

class ForwardAction : public virtual Action, public virtual DataHandler, public std::enable_shared_from_this<ForwardAction> {
public:
explicit ForwardAction(ACE_Proactor& proactor);
explicit ForwardAction(OpenDDS::DCPS::EventDispatcher_rch event_dispatcher);

bool init(const ActionConfig& config, ActionReport& report, Builder::ReaderMap& readers,
Builder::WriterMap& writers, const Builder::ContentFilteredTopicMap& cft_map) override;
Expand All @@ -38,7 +38,7 @@ class ForwardAction : public virtual Action, public virtual DataHandler, public
};

std::mutex mutex_;
ACE_Proactor& proactor_;
OpenDDS::DCPS::EventDispatcher_rch event_dispatcher_;
bool started_, stopped_;
bool write_task_active_;
std::vector<std::shared_ptr<Registration> > registrations_;
Expand All @@ -52,7 +52,7 @@ class ForwardAction : public virtual Action, public virtual DataHandler, public
size_t queue_first_, queue_last_;
std::condition_variable queue_not_full_;
DDS::InstanceHandle_t instance_;
std::shared_ptr<ACE_Handler> handler_;
OpenDDS::DCPS::EventBase_rch event_;
};

}
36 changes: 36 additions & 0 deletions performance-tests/bench/worker/MemFunEvent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include "dds/DCPS/EventDispatcher.h"

#include <memory>

namespace Bench {

/**
* MemFunEvent is a helper class for adapting void member funtions of existing
* classes into dispatchable events
*/
template <typename Delegate>
class MemFunEvent : public OpenDDS::DCPS::EventBase {
public:
typedef void (Delegate::*PMF)();

MemFunEvent(std::shared_ptr<Delegate> delegate, PMF function)
: delegate_(delegate)
, function_(function)
{}

void handle_event()
{
std::shared_ptr<Delegate> handle = delegate_.lock();
if (handle) {
((*handle).*function_)();
}
}

private:
std::weak_ptr<Delegate> delegate_;
PMF function_;
};

}
23 changes: 0 additions & 23 deletions performance-tests/bench/worker/MemFunHandler.h

This file was deleted.

14 changes: 7 additions & 7 deletions performance-tests/bench/worker/ReadAction.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#include "ReadAction.h"

#include "MemFunHandler.h"
#include "MemFunEvent.h"
#include "util.h"

#include "dds/DCPS/WaitSet.h"

namespace Bench {

ReadAction::ReadAction(ACE_Proactor& proactor)
: proactor_(proactor)
ReadAction::ReadAction(OpenDDS::DCPS::EventDispatcher_rch event_dispatcher)
: event_dispatcher_(event_dispatcher)
, started_(false)
, stopped_(false)
, stop_condition_(new DDS::GuardCondition())
Expand Down Expand Up @@ -65,7 +65,7 @@ bool ReadAction::init(const ActionConfig& config, ActionReport& report, Builder:
read_period_ = ACE_Time_Value(read_period_prop->value.time_prop().sec, static_cast<suseconds_t>(read_period_prop->value.time_prop().nsec / 1000u));
}

handler_.reset(new MemFunHandler<ReadAction>(&ReadAction::do_read, *this));
event_ = OpenDDS::DCPS::make_rch<MemFunEvent<ReadAction> >(shared_from_this(), &ReadAction::do_read);

return true;
}
Expand All @@ -79,7 +79,7 @@ void ReadAction::test_start()
ws_ = new DDS::WaitSet();
ws_->attach_condition(stop_condition_);
ws_->attach_condition(read_condition_);
proactor_.schedule_timer(*handler_, nullptr, ZERO_TIME, ZERO_TIME);
event_dispatcher_->dispatch(event_);
}
}

Expand All @@ -92,7 +92,7 @@ void ReadAction::action_stop()
std::unique_lock<std::mutex> lock(mutex_);
if (started_ && !stopped_) {
stopped_ = true;
proactor_.cancel_timer(*handler_);
event_dispatcher_->cancel(event_);
ws_->detach_condition(stop_condition_);
ws_->detach_condition(read_condition_);
data_dr_->delete_readcondition(read_condition_);
Expand Down Expand Up @@ -126,7 +126,7 @@ void ReadAction::do_read()
}
}

proactor_.schedule_timer(*handler_, nullptr, ZERO_TIME, ZERO_TIME);
event_dispatcher_->dispatch(event_);
}
}

Expand Down
10 changes: 5 additions & 5 deletions performance-tests/bench/worker/ReadAction.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

#include "dds/DCPS/GuardCondition.h"

#include "ace/Proactor.h"
#include "dds/DCPS/EventDispatcher.h"

namespace Bench {

class ReadAction : public Action {
class ReadAction : public virtual Action, public std::enable_shared_from_this<ReadAction> {
public:
explicit ReadAction(ACE_Proactor& proactor);
explicit ReadAction(OpenDDS::DCPS::EventDispatcher_rch event_dispatcher);

bool init(const ActionConfig& config, ActionReport& report, Builder::ReaderMap& readers,
Builder::WriterMap& writers, const Builder::ContentFilteredTopicMap& cft_map) override;
Expand All @@ -26,15 +26,15 @@ class ReadAction : public Action {

protected:
std::mutex mutex_;
ACE_Proactor& proactor_;
OpenDDS::DCPS::EventDispatcher_rch event_dispatcher_;
bool started_, stopped_;
DDS::GuardCondition_var stop_condition_;
DDS::ReadCondition_var read_condition_;
DataDataReader_var data_dr_;
DDS::WaitSet_var ws_;
WorkerDataReaderListener* dr_listener_;
ACE_Time_Value read_period_;
std::shared_ptr<ACE_Handler> handler_;
OpenDDS::DCPS::EventBase_rch event_;
};

}
15 changes: 9 additions & 6 deletions performance-tests/bench/worker/SetCftParametersAction.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include "SetCftParametersAction.h"

#include "MemFunHandler.h"
#include "MemFunEvent.h"
#include "util.h"

namespace Bench {

SetCftParametersAction::SetCftParametersAction(ACE_Proactor& proactor)
: proactor_(proactor)
SetCftParametersAction::SetCftParametersAction(OpenDDS::DCPS::EventDispatcher_rch event_dispatcher)
: event_dispatcher_(event_dispatcher)
, started_(false)
, stopped_(false)
, set_period_(1, 0)
Expand Down Expand Up @@ -113,7 +113,7 @@ bool SetCftParametersAction::init(const ActionConfig& config, ActionReport& repo
set_period_ = ACE_Time_Value(set_period_prop->value.time_prop().sec, static_cast<suseconds_t>(set_period_prop->value.time_prop().nsec / 1000u));
}

handler_.reset(new MemFunHandler<SetCftParametersAction>(&SetCftParametersAction::do_set_expression_parameters, *this));
event_ = OpenDDS::DCPS::make_rch<MemFunEvent<SetCftParametersAction> >(shared_from_this(), &SetCftParametersAction::do_set_expression_parameters);

return true;
}
Expand All @@ -123,7 +123,8 @@ void SetCftParametersAction::test_start()
std::unique_lock<std::mutex> lock(mutex_);
if (!started_) {
started_ = true;
proactor_.schedule_timer(*handler_, nullptr, ZERO_TIME, set_period_);
last_scheduled_time_ = OpenDDS::DCPS::MonotonicTimePoint::now();
event_dispatcher_->dispatch(event_);
}
}

Expand All @@ -132,7 +133,7 @@ void SetCftParametersAction::test_stop()
std::unique_lock<std::mutex> lock(mutex_);
if (started_ && !stopped_) {
stopped_ = true;
proactor_.cancel_timer(*handler_);
event_dispatcher_->cancel(event_);
}
}

Expand All @@ -156,6 +157,8 @@ void SetCftParametersAction::do_set_expression_parameters()
}
#endif
}
last_scheduled_time_ += OpenDDS::DCPS::TimeDuration(set_period_);
event_dispatcher_->schedule(event_, last_scheduled_time_);
}
}

Expand Down
11 changes: 6 additions & 5 deletions performance-tests/bench/worker/SetCftParametersAction.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
#include "dds/DdsDcpsTopicC.h"

#include "Action.h"
#include "ace/Proactor.h"
#include "dds/DCPS/EventDispatcher.h"
#include "BenchTypeSupportImpl.h"

#include <random>
#include <vector>

namespace Bench {

class SetCftParametersAction : public Action {
class SetCftParametersAction : public virtual Action, public std::enable_shared_from_this<SetCftParametersAction> {
public:
explicit SetCftParametersAction(ACE_Proactor& proactor);
explicit SetCftParametersAction(OpenDDS::DCPS::EventDispatcher_rch event_dispatcher);

bool init(const ActionConfig& config, ActionReport& report, Builder::ReaderMap& readers,
Builder::WriterMap& writers, const Builder::ContentFilteredTopicMap& cft_map) override;
Expand All @@ -25,14 +25,15 @@ class SetCftParametersAction : public Action {

protected:
std::mutex mutex_;
ACE_Proactor& proactor_;
OpenDDS::DCPS::EventDispatcher_rch event_dispatcher_;
bool started_, stopped_;
ACE_Time_Value set_period_;
size_t max_count_;
size_t param_count_;
bool random_order_;
DDS::InstanceHandle_t instance_;
std::shared_ptr<ACE_Handler> handler_;
OpenDDS::DCPS::MonotonicTimePoint last_scheduled_time_;
OpenDDS::DCPS::EventBase_rch event_;
std::mt19937_64 mt_;
size_t set_call_count_;
#ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
Expand Down
Loading

0 comments on commit ba4b450

Please sign in to comment.