Skip to content

Commit

Permalink
refactor(perception_rviz_plugin): apply thread pool to manage detache…
Browse files Browse the repository at this point in the history
…d thread (autowarefoundation#5418)

* apply thread pool to manage detached thread

Signed-off-by: Owen-Liuyuxuan <[email protected]>

* style(pre-commit): autofix

* clean up the destructor

Signed-off-by: Owen-Liuyuxuan <[email protected]>

* style(pre-commit): autofix

* use function object in the queue instead

Signed-off-by: Owen-Liuyuxuan <[email protected]>

* style(pre-commit): autofix

* fix condition variable naming problem

Signed-off-by: Owen-Liuyuxuan <[email protected]>

* add utility include for CI

Signed-off-by: Owen-Liuyuxuan <[email protected]>

---------

Signed-off-by: Owen-Liuyuxuan <[email protected]>
Co-authored-by: Owen-Liuyuxuan <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored and TomohitoAndo committed Nov 17, 2023
1 parent b137419 commit 70b3698
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

#include <condition_variable>
#include <list>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

namespace autoware
Expand All @@ -45,10 +47,31 @@ class AUTOWARE_AUTO_PERCEPTION_RVIZ_PLUGIN_PUBLIC PredictedObjectsDisplay
using PredictedObjects = autoware_auto_perception_msgs::msg::PredictedObjects;

PredictedObjectsDisplay();
~PredictedObjectsDisplay()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
should_terminate = true;
}
condition.notify_all();
for (std::thread & active_thread : threads) {
active_thread.join();
}
threads.clear();
}

private:
void processMessage(PredictedObjects::ConstSharedPtr msg) override;

void queueJob(std::function<void()> job)
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
jobs.push(std::move(job));
}
condition.notify_one();
}

boost::uuids::uuid to_boost_uuid(const unique_identifier_msgs::msg::UUID & uuid_msg)
{
const std::string uuid_str = uuid_to_string(uuid_msg);
Expand Down Expand Up @@ -100,6 +123,8 @@ class AUTOWARE_AUTO_PERCEPTION_RVIZ_PLUGIN_PUBLIC PredictedObjectsDisplay
PredictedObjects::ConstSharedPtr msg);
void workerThread();

void messageProcessorThreadJob();

void update(float wall_dt, float ros_dt) override;

std::unordered_map<boost::uuids::uuid, int32_t, boost::hash<boost::uuids::uuid>> id_map;
Expand All @@ -108,6 +133,14 @@ class AUTOWARE_AUTO_PERCEPTION_RVIZ_PLUGIN_PUBLIC PredictedObjectsDisplay
int32_t marker_id = 0;
const int32_t PATH_ID_CONSTANT = 1e3;

// max_num_threads: number of threads created in the thread pool, hard-coded to be 1;
int max_num_threads;

bool should_terminate{false};
std::mutex queue_mutex;
std::vector<std::thread> threads;
std::queue<std::function<void()>> jobs;

PredictedObjects::ConstSharedPtr msg;
bool consumed{false};
std::mutex mutex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,44 @@ namespace object_detection
{
PredictedObjectsDisplay::PredictedObjectsDisplay() : ObjectPolygonDisplayBase("tracks")
{
std::thread worker(&PredictedObjectsDisplay::workerThread, this);
worker.detach();
max_num_threads = 1; // hard code the number of threads to be created

for (int ii = 0; ii < max_num_threads; ++ii) {
threads.emplace_back(std::thread(&PredictedObjectsDisplay::workerThread, this));
}
}

void PredictedObjectsDisplay::workerThread()
{
{ // A standard working thread that waiting for jobs
while (true) {
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [this] { return this->msg; });
std::function<void()> job;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] { return !jobs.empty() || should_terminate; });
if (should_terminate) {
return;
}
job = jobs.front();
jobs.pop();
}
job();
}
}

auto tmp_msg = this->msg;
this->msg.reset();
void PredictedObjectsDisplay::messageProcessorThreadJob()
{
// Receiving
std::unique_lock<std::mutex> lock(mutex);
auto tmp_msg = this->msg;
this->msg.reset();
lock.unlock();

lock.unlock();
auto tmp_markers = createMarkers(tmp_msg);

auto tmp_markers = createMarkers(tmp_msg);
lock.lock();
markers = tmp_markers;
lock.lock();
markers = tmp_markers;

consumed = true;
}
consumed = true;
}

std::vector<visualization_msgs::msg::Marker::SharedPtr> PredictedObjectsDisplay::createMarkers(
Expand Down Expand Up @@ -188,7 +205,7 @@ void PredictedObjectsDisplay::processMessage(PredictedObjects::ConstSharedPtr ms
std::unique_lock<std::mutex> lock(mutex);

this->msg = msg;
condition.notify_one();
queueJob(std::bind(&PredictedObjectsDisplay::messageProcessorThreadJob, this));
}

void PredictedObjectsDisplay::update(float wall_dt, float ros_dt)
Expand Down

0 comments on commit 70b3698

Please sign in to comment.