Skip to content

Commit

Permalink
Merge pull request #85 from cactusdynamics/lockless
Browse files Browse the repository at this point in the history
Basic lockless algorithms
  • Loading branch information
shuhaowu authored Jul 13, 2024
2 parents efb41a5 + b09fce0 commit 2fc51d2
Show file tree
Hide file tree
Showing 14 changed files with 740 additions and 35 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ add_library(cactus_rt
src/cactus_rt/thread.cc
src/cactus_rt/cyclic_thread.cc
src/cactus_rt/signal_handler.cc
src/cactus_rt/experimental/lockless/atomic_bitset.cc
)

target_include_directories(cactus_rt
Expand Down Expand Up @@ -174,8 +175,9 @@ if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME})

if (ENABLE_EXAMPLES)
message(STATUS "Building example programs. Turn it off via ENABLE_EXAMPLES=OFF")
add_subdirectory(examples/message_passing_example)
add_subdirectory(examples/lockless_example)
add_subdirectory(examples/logging_example)
add_subdirectory(examples/message_passing_example)
add_subdirectory(examples/mutex_example)
add_subdirectory(examples/signal_handling_example)
add_subdirectory(examples/simple_deadline_example)
Expand Down
10 changes: 10 additions & 0 deletions examples/lockless_example/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
add_executable(rt_lockless_example
main.cc
)

target_link_libraries(rt_lockless_example
PRIVATE
cactus_rt
)

setup_cactus_rt_target_options(rt_lockless_example)
145 changes: 145 additions & 0 deletions examples/lockless_example/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#include <cactus_rt/experimental/lockless.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <iostream>

using cactus_rt::App;
using cactus_rt::CyclicThread;
using cactus_rt::Thread;
using cactus_rt::experimental::lockless::AtomicMessage;
using cactus_rt::experimental::lockless::spsc::RealtimeReadableValue;
using namespace std::chrono_literals;

struct Pose {
// We want default constructed values to have a flag showing it is default
// constructed. This is because the RealtimeReadableValue will default
// construct a value and it can immediately be read. We need to tell the writer
// it is invalid. It may not be necessary to do this in general.
bool valid = false;
double x = 0.0;
double y = 0.0;
double z = 0.0;
double roll = 0.0;
double pitch = 0.0;
double yaw = 0.0;

Pose() = default;

Pose(double xx, double yy, double zz, double ro, double pi, double ya) : valid(true),
x(xx),
y(yy),
z(zz),
roll(ro),
pitch(pi),
yaw(ya) {}
};

bool operator==(const Pose& p1, const Pose& p2) {
return p1.x == p2.x &&
p1.y == p2.y &&
p1.z == p2.z &&
p1.roll == p2.roll &&
p1.pitch == p2.pitch &&
p1.yaw == p2.yaw;
}

bool operator!=(const Pose& p1, const Pose& p2) {
return !(p1 == p2);
}

/**
* A struct that holds all the shared data so it can be passed to both the real-time and non-real-time threads
*/
struct Context {
AtomicMessage<bool> done = false;
RealtimeReadableValue<Pose> target_pose = {};
};

/**
* This is a real-time thread
*/
class RTThread : public CyclicThread {
Context& ctx_;
Pose current_target_pose_ = {};

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

return thread_config;
}

public:
RTThread(Context& ctx) : CyclicThread("RTThread", CreateThreadConfig()), ctx_(ctx) {}

protected:
bool Loop(int64_t /*now*/) noexcept final {
if (ctx_.done.Read()) {
return true;
}

const Pose new_pose = ctx_.target_pose.Read();
if (!new_pose.valid) {
return false;
}

if (new_pose != current_target_pose_) {
current_target_pose_ = new_pose;
LOG_INFO(
Logger(),
"detected new pose: {} {} {} {} {} {}",
current_target_pose_.x,
current_target_pose_.y,
current_target_pose_.z,
current_target_pose_.roll,
current_target_pose_.pitch,
current_target_pose_.yaw
);
}

return false;
}
};

class NonRTThread : public Thread {
Context& ctx_;

static cactus_rt::ThreadConfig CreateThreadConfig() {
cactus_rt::ThreadConfig thread_config;
thread_config.SetOtherScheduler();
return thread_config;
}

public:
NonRTThread(Context& ctx) : Thread("NonRTThread", CreateThreadConfig()), ctx_(ctx) {}

void Run() final {
ctx_.target_pose.Write(Pose(1.5, 2.5, 3.5, 4.5, 5.5, 6.5));
std::this_thread::sleep_for(1s);

// Realistically only one of these values should be visible on the real-time thread.
ctx_.target_pose.Write(Pose(1.5, 2.5, 3.5, 4.5, 5.5, 7.5));
ctx_.target_pose.Write(Pose(1.5, 2.5, 3.5, 4.5, 5.5, 8.5));
std::this_thread::sleep_for(1s);

ctx_.done.Write(true);
}
};

int main() {
Context ctx;
auto rt_thread = std::make_shared<RTThread>(ctx);
auto non_rt_thread = std::make_shared<NonRTThread>(ctx);

App app;
app.RegisterThread(rt_thread);
app.RegisterThread(non_rt_thread);

app.Start();
app.Join();

return 0;
}
8 changes: 8 additions & 0 deletions include/cactus_rt/experimental/lockless.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_H_
#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_H_

#include "lockless/atomic_message.h"
#include "lockless/spsc/realtime_readable_value.h"
#include "lockless/spsc/realtime_writable_value.h"

#endif
54 changes: 54 additions & 0 deletions include/cactus_rt/experimental/lockless/atomic_bitset.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_ATOMIC_BITSET_H_
#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_ATOMIC_BITSET_H_

#include <atomic>
#include <cstddef>
#include <initializer_list>
#include <limits>

namespace cactus_rt::experimental::lockless {

template <typename T>
class AtomicBitset {
static_assert(std::atomic<T>::is_always_lock_free);
std::atomic<T> data_;

// Avoid any casting that might occur during bit shifting later.
static constexpr T kOne = 1;

public:
static constexpr size_t kCapacity = std::numeric_limits<T>::digits;

/**
* Always initialize the bitset to be 0 at the start.
*/
AtomicBitset() : data_(0) {}

void Set(size_t i, std::memory_order order = std::memory_order_seq_cst);

void SetRange(std::initializer_list<size_t> indices, std::memory_order order = std::memory_order_seq_cst);

void Reset(size_t i, std::memory_order order = std::memory_order_seq_cst);

void ResetRange(std::initializer_list<size_t> indices, std::memory_order order = std::memory_order_seq_cst);

void Flip(size_t i, std::memory_order order = std::memory_order_seq_cst);

void FlipRange(std::initializer_list<size_t> indices, std::memory_order order = std::memory_order_seq_cst);

void SetValue(size_t i, bool value, std::memory_order order = std::memory_order_seq_cst);

bool Test(size_t i, std::memory_order order = std::memory_order_seq_cst) const;

T Value(const std::memory_order order = std::memory_order_seq_cst) const {
return data_.load(order);
}

bool operator[](const size_t i) const {
return Test(i);
}
};

} // namespace cactus_rt::experimental::lockless

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_READABLE_VALUE_
#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_READABLE_VALUE_

#include <atomic>
#include <memory>

namespace cactus_rt::experimental::lockless::spsc {

/**
* This uses the CAS exchange algorithm to allow a single (real-time) thread to
* atomically read a value of arbitrary size shared by a different
* (non-real-time) thread.
*
* The original algorithm was proposed [here][1] by Dave Rowland and Fabian
* Renn-Giles.
*
* The reader for this algorithm is wait-free while the writer is lock-free. The
* reader is unable to modify the value and transmit it back to the writer with
* this algorithm as the value will be deleted. Both read and write will incur
* a copy.
*
* TODO: scope type T to be trivially copyable and default constructable?
* TODO: at least make it so that we can initialize a struct without having to do default construction.
* TODO: write doesn't need to be a copy if we can write directly into the new storage pointer.
* TODO: read also potentially doesn't need a copy. However by doing that, we have to keep the writer waiting for a bit longer.
*
* [1]: https://www.youtube.com/watch?v=PoZAo2Vikbo
*/
template <typename T>
class RealtimeReadableValue {
std::unique_ptr<T> storage_ptr_ = std::make_unique<T>();
std::atomic<T*> atomic_ptr_ = storage_ptr_.get();

public:
/**
* This atomically reads the value. It returns a copy of the data.
*/
T Read() {
// TODO: need to figure out the atomic memory order here!
T* data_ptr = atomic_ptr_.exchange(nullptr);
T data = *data_ptr;
atomic_ptr_.store(data_ptr);
return data;
}

/**
* This atomically writes the value. It will copy the value into the storage
* and free a previous storage pointer.
*/
void Write(const T& new_value) {
auto new_ptr = std::make_unique<T>(new_value);
T* expected;

do {
expected = storage_ptr_.get();
// TODO: sequential consistency is probably too strict here. Need to
// understand if acq_rel is sufficient.
} while (!atomic_ptr_.compare_exchange_weak(expected, new_ptr.get()));

// This moves new_ptr to storage_ptr, which causes storage_ptr to free up
// whatever value was in it from before.
storage_ptr_ = std::move(new_ptr);
}
};

} // namespace cactus_rt::experimental::lockless::spsc

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_WRITABLE_VALUE_
#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_WRITABLE_VALUE_

#include <array>
#include <atomic>

namespace cactus_rt::experimental::lockless::spsc {

/**
* This uses a double buffer algorithm to allow a single (real-time) thread to
* atomically write a value of arbitrary size so it can be read by a different
* (non-real-time) thread.
*
* The original algorithm was proposed [here][1] by Dave Rowland and Fabian
* Renn-Giles.
*
* The writer for this algorithm is wait-free while the reader is lock-free.
* Both read and write will incur a copy.
*
* TODO: write doesn't need to be copied if we write in place.
*
* [1]: https://www.youtube.com/watch?v=PoZAo2Vikbo
*/
template <typename T>
class RealtimeWritableValue {
std::array<T, 2> buf_;
std::atomic<uint32_t> idx_ = 0;

static constexpr uint32_t kIdxMask = 1 << 0;
static constexpr uint32_t kNewDataMask = 1 << 1;
static constexpr uint32_t kBusyMask = 1 << 2;

public:
T Read() {
auto current = idx_.load();
if ((current & kNewDataMask) != 0) {
uint32_t new_value;
do {
current &= ~kBusyMask; // idx only change if not busy.
new_value = (current ^ kIdxMask) & kIdxMask;
} while (!idx_.compare_exchange_weak(current, new_value));

current = new_value;
}

return buf_[(current & kIdxMask) ^ 1];
}

void Write(const T& new_value) {
auto i = idx_.fetch_or(kBusyMask) & kIdxMask;
buf_[i] = new_value;
idx_.store((i & kIdxMask) | kNewDataMask);
}
};

} // namespace cactus_rt::experimental::lockless::spsc

#endif
Loading

0 comments on commit 2fc51d2

Please sign in to comment.