Skip to content

Commit

Permalink
Fix the barrier logic for threads synchronization (#811)
Browse files Browse the repository at this point in the history
  • Loading branch information
LoreMoretti authored Feb 20, 2024
1 parent 0af8ac5 commit eb4b82d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 19 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ All notable changes to this project are documented in this file.
### Changed
- 🤖 [ergoCubSN001] Add logging of the wrist and fix the name of the waist imu (https://github.com/ami-iit/bipedal-locomotion-framework/pull/810)


### Fixed
- Fix the barrier logic for threads synchronization (https://github.com/ami-iit/bipedal-locomotion-framework/pull/811)

### Removed

## [0.18.0] - 2024-01-23
Expand Down
28 changes: 14 additions & 14 deletions src/System/include/BipedalLocomotion/System/AdvanceableRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ template <class _Advanceable> class AdvanceableRunner
* @return a thread of containing the running process. If the runner was not correctly
* initialized the thread is invalid.
*/
std::thread run(std::optional<std::reference_wrapper<Barrier>> barrier = {});
std::thread run(std::shared_ptr<Barrier> barrier = nullptr);

/**
* Stop the thread
Expand Down Expand Up @@ -227,8 +227,7 @@ bool AdvanceableRunner<_Advanceable>::setOutputResource(
}

template <class _Advanceable>
std::thread
AdvanceableRunner<_Advanceable>::run(std::optional<std::reference_wrapper<Barrier>> barrier)
std::thread AdvanceableRunner<_Advanceable>::run(std::shared_ptr<Barrier> barrier)
{
constexpr auto logPrefix = "[AdvanceableRunner::run]";

Expand Down Expand Up @@ -273,8 +272,18 @@ AdvanceableRunner<_Advanceable>::run(std::optional<std::reference_wrapper<Barrie

// run the thread
m_isRunning = true;
auto function = [&]() -> bool {
auto function = [&](std::shared_ptr<Barrier> barrier) -> bool {
constexpr auto logPrefix = "[AdvanceableRunner::run]";

// synchronize the threads
if (!(barrier == nullptr))
{
log()->debug("{} - {} This thread is waiting for the other threads.",
logPrefix,
m_info.name);
barrier->wait();
}

auto time = BipedalLocomotion::clock().now();
auto oldTime = time;
auto wakeUpTime = time;
Expand Down Expand Up @@ -363,16 +372,7 @@ AdvanceableRunner<_Advanceable>::run(std::optional<std::reference_wrapper<Barrie
return this->m_advanceable->close();
};

// if the barrier is passed the run method, synchronization is performed
if (barrier.has_value())
{
log()->debug("{} - {} This thread is waiting for the other threads.",
logPrefix,
m_info.name);
barrier.value().get().wait();
}

return std::thread(function);
return std::thread(function, barrier);
}

template <class _Advanceable> void AdvanceableRunner<_Advanceable>::stop()
Expand Down
11 changes: 8 additions & 3 deletions src/System/include/BipedalLocomotion/System/Barrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ class Barrier
{
public:
/**
* Constructor.
* @param counter initial value of the expected counter
* Calls the constructor. It creates a new Barrier with the given counter.
*/
explicit Barrier(const std::size_t counter);
static std::shared_ptr<Barrier> create(const std::size_t counter);

/**
* Blocks this thread at the phase synchronization point until its phase completion step is run
Expand All @@ -40,6 +39,12 @@ class Barrier
std::size_t m_initialCount;
std::size_t m_count;
std::size_t m_generation;

/**
* Constructor.
* @param counter initial value of the expected counter
*/
explicit Barrier(const std::size_t counter);
};
} // namespace System
} // namespace BipedalLocomotion
Expand Down
12 changes: 11 additions & 1 deletion src/System/src/Barrier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
* distributed under the terms of the BSD-3-Clause license.
*/

#include <memory>
#include <mutex>

#include <BipedalLocomotion/System/Barrier.h>
#include <BipedalLocomotion/TextLogging/Logger.h>

using namespace BipedalLocomotion::System;

Expand All @@ -18,11 +20,18 @@ Barrier::Barrier(const std::size_t counter)
{
}

std::shared_ptr<Barrier> Barrier::create(const std::size_t counter)
{
return std::shared_ptr<Barrier>(new Barrier(counter));
}

void Barrier::wait()
{
constexpr auto logPrefix = "[Barrier::wait]";

std::unique_lock lock{m_mutex};
const auto tempGeneration = m_generation;
if ((--m_count) == 1)
if ((--m_count) == 0)
{
// all threads reached the barrier, so we can consider them synchronized
m_generation++;
Expand All @@ -31,6 +40,7 @@ void Barrier::wait()
m_count = m_initialCount;

// notify the other threads
log()->debug("{} All threads reached the barrier.", logPrefix);
m_cond.notify_all();
} else
{
Expand Down
3 changes: 2 additions & 1 deletion src/System/tests/AdvanceableRunnerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <BipedalLocomotion/System/Barrier.h>
#include <BipedalLocomotion/System/SharedResource.h>
#include <BipedalLocomotion/System/Source.h>
#include <memory>

using namespace BipedalLocomotion::System;
using namespace BipedalLocomotion::ParametersHandler;
Expand Down Expand Up @@ -120,7 +121,7 @@ TEST_CASE("Test Block")
SECTION("With synchronization")
{
constexpr std::size_t numberOfRunners = 2;
Barrier barrier(numberOfRunners);
auto barrier = Barrier::create(numberOfRunners);

// run the block
auto thread0 = runner0.run(barrier);
Expand Down

0 comments on commit eb4b82d

Please sign in to comment.