From 1642f9bdfc38cf642d4d8f74f0022f8460433968 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Tue, 1 Oct 2024 21:49:30 +0100 Subject: [PATCH] attempt locking granularity improvement for ReadAction --- performance-tests/bench/worker/ReadAction.cpp | 43 +++++++++++++++---- performance-tests/bench/worker/ReadAction.h | 2 + 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/performance-tests/bench/worker/ReadAction.cpp b/performance-tests/bench/worker/ReadAction.cpp index f46c8e621e..68e4b11032 100644 --- a/performance-tests/bench/worker/ReadAction.cpp +++ b/performance-tests/bench/worker/ReadAction.cpp @@ -14,6 +14,7 @@ ReadAction::ReadAction(OpenDDS::DCPS::EventDispatcher_rch event_dispatcher) , stop_condition_(new DDS::GuardCondition()) , dr_listener_(0) , read_period_(1, 0) +, in_do_read_(false) { } @@ -85,52 +86,76 @@ void ReadAction::test_start() void ReadAction::test_stop() { - Builder::Log::log() << Bench::iso8601() << ": ReadAction::test_stop" << std::endl; + //Builder::Log::log() << Bench::iso8601() << ": ReadAction::test_stop" << std::endl; } void ReadAction::action_stop() { std::unique_lock lock(mutex_); if (started_ && !stopped_) { - Builder::Log::log() << Bench::iso8601() << ": ReadAction::action_stop - stopping" << std::endl; + //Builder::Log::log() << Bench::iso8601() << ": ReadAction::action_stop - stopping" << std::endl; stopped_ = true; event_dispatcher_->cancel(event_); + stop_condition_->set_trigger_value(true); + while (in_do_read_) { + cv_.wait(lock); + } ws_->detach_condition(stop_condition_); ws_->detach_condition(read_condition_); data_dr_->delete_readcondition(read_condition_); - stop_condition_->set_trigger_value(true); } } +namespace { + +struct bool_guard { + explicit bool_guard(bool& val, std::condition_variable& cv) : val_(val), cv_(cv) { val_ = true; } + ~bool_guard() { val_ = false; cv_.notify_all(); } + bool& val_; + std::condition_variable& cv_; +}; + +} + void ReadAction::do_read() { std::unique_lock lock(mutex_); + bool_guard bg(in_do_read_, cv_); if (started_ && !stopped_) { DDS::ConditionSeq active; const DDS::Duration_t duration = {static_cast(read_period_.sec()), static_cast(read_period_.usec() * 1000)}; - Builder::Log::log() << Bench::iso8601() << ": ReadAction::do_read START" << std::endl; - DDS::ReturnCode_t ret = ws_->wait(active, duration); - Builder::Log::log() << Bench::iso8601() << ": ReadAction::do_read STOP" << std::endl; + DDS::WaitSet_var ws_copy= ws_; + DDS::ReturnCode_t ret; + + lock.unlock(); + //Builder::Log::log() << Bench::iso8601() << ": ReadAction::do_read START" << std::endl; + ret = ws_->wait(active, duration); + //Builder::Log::log() << Bench::iso8601() << ": ReadAction::do_read STOP" << std::endl; + lock.lock(); if (stopped_) { return; } if (ret == DDS::RETCODE_OK) { - for (CORBA::ULong i = 0; i < active.length(); ++i) { + for (CORBA::ULong i = 0; !stopped_ && i < active.length(); ++i) { if (active[i] == read_condition_) { Bench::Data data; DDS::SampleInfo si; - while ((ret = data_dr_->take_next_sample(data, si)) == DDS::RETCODE_OK) { + while (!stopped_ && (ret = data_dr_->take_next_sample(data, si)) == DDS::RETCODE_OK) { if (si.valid_data && dr_listener_) { + lock.unlock(); dr_listener_->on_valid_data(data, si); + lock.lock(); } } } } } - event_dispatcher_->dispatch(event_); + if (!stopped_) { + event_dispatcher_->dispatch(event_); + } } } diff --git a/performance-tests/bench/worker/ReadAction.h b/performance-tests/bench/worker/ReadAction.h index 345fd06ec7..5fea3a4e6e 100644 --- a/performance-tests/bench/worker/ReadAction.h +++ b/performance-tests/bench/worker/ReadAction.h @@ -26,6 +26,7 @@ class ReadAction : public virtual Action, public std::enable_shared_from_this