From 060475c4a8bf28204c24694cbb0447c6ecbfd083 Mon Sep 17 00:00:00 2001 From: Tomohito ANDO Date: Mon, 20 Nov 2023 10:33:45 +0900 Subject: [PATCH] refactor(perception_rviz_plugin): apply thread pool to manage detached thread (#1023) refactor(perception_rviz_plugin): apply thread pool to manage detached thread (#5418) * apply thread pool to manage detached thread * style(pre-commit): autofix * clean up the destructor * style(pre-commit): autofix * use function object in the queue instead * style(pre-commit): autofix * fix condition variable naming problem * add utility include for CI --------- Signed-off-by: Owen-Liuyuxuan Co-authored-by: Yuxuan Liu <619684051@qq.com> Co-authored-by: Owen-Liuyuxuan Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .../predicted_objects_display.hpp | 33 ++++++++++++++ .../predicted_objects_display.cpp | 45 +++++++++++++------ 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/common/autoware_auto_perception_rviz_plugin/include/object_detection/predicted_objects_display.hpp b/common/autoware_auto_perception_rviz_plugin/include/object_detection/predicted_objects_display.hpp index 5493f1dd594ce..2896286970217 100644 --- a/common/autoware_auto_perception_rviz_plugin/include/object_detection/predicted_objects_display.hpp +++ b/common/autoware_auto_perception_rviz_plugin/include/object_detection/predicted_objects_display.hpp @@ -24,9 +24,11 @@ #include #include +#include #include #include #include +#include #include namespace autoware @@ -45,10 +47,31 @@ class AUTOWARE_AUTO_PERCEPTION_RVIZ_PLUGIN_PUBLIC PredictedObjectsDisplay using PredictedObjects = autoware_auto_perception_msgs::msg::PredictedObjects; PredictedObjectsDisplay(); + ~PredictedObjectsDisplay() + { + { + std::unique_lock lock(queue_mutex); + should_terminate = true; + } + condition.notify_all(); + for (std::thread & active_thread : threads) { + active_thread.join(); + } + threads.clear(); + } private: void processMessage(PredictedObjects::ConstSharedPtr msg) override; + void queueJob(std::function job) + { + { + std::unique_lock lock(queue_mutex); + jobs.push(std::move(job)); + } + condition.notify_one(); + } + boost::uuids::uuid to_boost_uuid(const unique_identifier_msgs::msg::UUID & uuid_msg) { const std::string uuid_str = uuid_to_string(uuid_msg); @@ -100,6 +123,8 @@ class AUTOWARE_AUTO_PERCEPTION_RVIZ_PLUGIN_PUBLIC PredictedObjectsDisplay PredictedObjects::ConstSharedPtr msg); void workerThread(); + void messageProcessorThreadJob(); + void update(float wall_dt, float ros_dt) override; std::unordered_map> id_map; @@ -108,6 +133,14 @@ class AUTOWARE_AUTO_PERCEPTION_RVIZ_PLUGIN_PUBLIC PredictedObjectsDisplay int32_t marker_id = 0; const int32_t PATH_ID_CONSTANT = 1e3; + // max_num_threads: number of threads created in the thread pool, hard-coded to be 1; + int max_num_threads; + + bool should_terminate{false}; + std::mutex queue_mutex; + std::vector threads; + std::queue> jobs; + PredictedObjects::ConstSharedPtr msg; bool consumed{false}; std::mutex mutex; diff --git a/common/autoware_auto_perception_rviz_plugin/src/object_detection/predicted_objects_display.cpp b/common/autoware_auto_perception_rviz_plugin/src/object_detection/predicted_objects_display.cpp index 24e67a6f44e95..2cc5397d18721 100644 --- a/common/autoware_auto_perception_rviz_plugin/src/object_detection/predicted_objects_display.cpp +++ b/common/autoware_auto_perception_rviz_plugin/src/object_detection/predicted_objects_display.cpp @@ -25,27 +25,44 @@ namespace object_detection { PredictedObjectsDisplay::PredictedObjectsDisplay() : ObjectPolygonDisplayBase("tracks") { - std::thread worker(&PredictedObjectsDisplay::workerThread, this); - worker.detach(); + max_num_threads = 1; // hard code the number of threads to be created + + for (int ii = 0; ii < max_num_threads; ++ii) { + threads.emplace_back(std::thread(&PredictedObjectsDisplay::workerThread, this)); + } } void PredictedObjectsDisplay::workerThread() -{ +{ // A standard working thread that waiting for jobs while (true) { - std::unique_lock lock(mutex); - condition.wait(lock, [this] { return this->msg; }); + std::function job; + { + std::unique_lock lock(queue_mutex); + condition.wait(lock, [this] { return !jobs.empty() || should_terminate; }); + if (should_terminate) { + return; + } + job = jobs.front(); + jobs.pop(); + } + job(); + } +} - auto tmp_msg = this->msg; - this->msg.reset(); +void PredictedObjectsDisplay::messageProcessorThreadJob() +{ + // Receiving + std::unique_lock lock(mutex); + auto tmp_msg = this->msg; + this->msg.reset(); + lock.unlock(); - lock.unlock(); + auto tmp_markers = createMarkers(tmp_msg); - auto tmp_markers = createMarkers(tmp_msg); - lock.lock(); - markers = tmp_markers; + lock.lock(); + markers = tmp_markers; - consumed = true; - } + consumed = true; } std::vector PredictedObjectsDisplay::createMarkers( @@ -188,7 +205,7 @@ void PredictedObjectsDisplay::processMessage(PredictedObjects::ConstSharedPtr ms std::unique_lock lock(mutex); this->msg = msg; - condition.notify_one(); + queueJob(std::bind(&PredictedObjectsDisplay::messageProcessorThreadJob, this)); } void PredictedObjectsDisplay::update(float wall_dt, float ros_dt)