Skip to content

Commit

Permalink
attempt locking granularity improvement for ReadAction
Browse files Browse the repository at this point in the history
  • Loading branch information
simpsont-oci committed Oct 1, 2024
1 parent 230ed7d commit 1642f9b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
43 changes: 34 additions & 9 deletions performance-tests/bench/worker/ReadAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ReadAction::ReadAction(OpenDDS::DCPS::EventDispatcher_rch event_dispatcher)
, stop_condition_(new DDS::GuardCondition())
, dr_listener_(0)
, read_period_(1, 0)
, in_do_read_(false)
{
}

Expand Down Expand Up @@ -85,52 +86,76 @@ void ReadAction::test_start()

void ReadAction::test_stop()
{
Builder::Log::log() << Bench::iso8601() << ": ReadAction::test_stop" << std::endl;
//Builder::Log::log() << Bench::iso8601() << ": ReadAction::test_stop" << std::endl;
}

void ReadAction::action_stop()
{
std::unique_lock<std::mutex> lock(mutex_);
if (started_ && !stopped_) {
Builder::Log::log() << Bench::iso8601() << ": ReadAction::action_stop - stopping" << std::endl;
//Builder::Log::log() << Bench::iso8601() << ": ReadAction::action_stop - stopping" << std::endl;
stopped_ = true;
event_dispatcher_->cancel(event_);
stop_condition_->set_trigger_value(true);
while (in_do_read_) {
cv_.wait(lock);
}
ws_->detach_condition(stop_condition_);
ws_->detach_condition(read_condition_);
data_dr_->delete_readcondition(read_condition_);
stop_condition_->set_trigger_value(true);
}
}

namespace {

struct bool_guard {
explicit bool_guard(bool& val, std::condition_variable& cv) : val_(val), cv_(cv) { val_ = true; }
~bool_guard() { val_ = false; cv_.notify_all(); }
bool& val_;
std::condition_variable& cv_;
};

}

void ReadAction::do_read()
{
std::unique_lock<std::mutex> lock(mutex_);
bool_guard bg(in_do_read_, cv_);
if (started_ && !stopped_) {
DDS::ConditionSeq active;
const DDS::Duration_t duration = {static_cast<CORBA::Long>(read_period_.sec()), static_cast<CORBA::ULong>(read_period_.usec() * 1000)};
Builder::Log::log() << Bench::iso8601() << ": ReadAction::do_read START" << std::endl;
DDS::ReturnCode_t ret = ws_->wait(active, duration);
Builder::Log::log() << Bench::iso8601() << ": ReadAction::do_read STOP" << std::endl;
DDS::WaitSet_var ws_copy= ws_;
DDS::ReturnCode_t ret;

lock.unlock();
//Builder::Log::log() << Bench::iso8601() << ": ReadAction::do_read START" << std::endl;
ret = ws_->wait(active, duration);
//Builder::Log::log() << Bench::iso8601() << ": ReadAction::do_read STOP" << std::endl;
lock.lock();

if (stopped_) {
return;
}

if (ret == DDS::RETCODE_OK) {
for (CORBA::ULong i = 0; i < active.length(); ++i) {
for (CORBA::ULong i = 0; !stopped_ && i < active.length(); ++i) {
if (active[i] == read_condition_) {
Bench::Data data;
DDS::SampleInfo si;
while ((ret = data_dr_->take_next_sample(data, si)) == DDS::RETCODE_OK) {
while (!stopped_ && (ret = data_dr_->take_next_sample(data, si)) == DDS::RETCODE_OK) {
if (si.valid_data && dr_listener_) {
lock.unlock();
dr_listener_->on_valid_data(data, si);
lock.lock();
}
}
}
}
}

event_dispatcher_->dispatch(event_);
if (!stopped_) {
event_dispatcher_->dispatch(event_);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions performance-tests/bench/worker/ReadAction.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ReadAction : public virtual Action, public std::enable_shared_from_this<Re

protected:
std::mutex mutex_;
std::condition_variable cv_;
OpenDDS::DCPS::EventDispatcher_rch event_dispatcher_;
bool started_, stopped_;
DDS::GuardCondition_var stop_condition_;
Expand All @@ -35,6 +36,7 @@ class ReadAction : public virtual Action, public std::enable_shared_from_this<Re
WorkerDataReaderListener* dr_listener_;
ACE_Time_Value read_period_;
OpenDDS::DCPS::EventBase_rch event_;
bool in_do_read_;
};

}

0 comments on commit 1642f9b

Please sign in to comment.