Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iox-#2177 Update SPSC SoFi #2214

Merged
merged 8 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/website/release-notes/iceoryx-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@
- Listener examples need to take all samples in the callback [#2251](https://github.com/eclipse-iceoryx/iceoryx/issues/2251)
- 'iox::string' tests can exceed the translation unit compilation timeout [#2278](https://github.com/eclipse-iceoryx/iceoryx/issues/2278)
- Building iceoryx with bazel on Windows is broken [#2320](https://github.com/eclipse-iceoryx/iceoryx/issues/2320)
- Fix wrong memory orders in SpscSoFi [#2177](https://github.com/eclipse-iceoryx/iceoryx/issues/2177)
-

**Refactoring:**

Expand Down
166 changes: 84 additions & 82 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,117 +25,118 @@

#include <cstdint>
#include <cstring>
#include <utility>

namespace iox
{
namespace concurrent
{
/// @brief
/// Thread safe producer and consumer queue with a safe overflowing behavior.
/// SpscSofi is designed in a FIFO Manner but prevents data loss when pushing into
/// a full SpscSofi. When SpscSofi is full and a Sender tries to push, the data at the
/// current read position will be returned. SpscSofi is threadsafe without using
/// locks. When the buffer is filled, new data is written starting at the
/// beginning of the buffer and overwriting the old.The SpscSofi is especially
/// designed to provide fixed capacity storage. When its capacity is exhausted,
/// newly inserted elements will cause elements either at the beginning
/// to be overwritten.The SpscSofi only allocates memory when
/// created , capacity can be is adjusted explicitly.
///
/// @brief Thread safe lock-free single producer and single consumer queue with a safe
/// overflowing behavior
/// @note When SpscSoFi is full and a sender tries to push, the data at the current read pos will be
/// returned. This behavior mimics a FiFo queue but prevents resource leaks when pushing into
/// a full SpscSoFi.
/// SpscSoFi is especially designed to provide fixed capacity storage.
/// It's an expected behavior that when push/pop are called concurrently and SpscSoFi is full, as
/// many elements as specified with 'CapacityValue' can be removed
/// @param[in] ValueType DataType to be stored, must be trivially copyable
/// @param[in] CapacityValue Capacity of the SpscSofi
template <class ValueType, uint64_t CapacityValue>
class SpscSofi
{
static_assert(std::is_trivially_copyable<ValueType>::value,
"SpscSofi can handle only trivially copyable data types");
"SpscSofi can only handle trivially copyable data types since 'memcpy' is used internally");
/// @brief Check if Atomic integer is lockfree on platform
/// ATOMIC_INT_LOCK_FREE = 2 - is always lockfree
/// ATOMIC_INT_LOCK_FREE = 1 - is sometimes lockfree
/// ATOMIC_INT_LOCK_FREE = 0 - is never lockfree
static_assert(2 <= ATOMIC_INT_LOCK_FREE, "SpscSofi is not able to run lock free on this data type");

/// @brief Internal size needs to be bigger than the size desirred by the user
/// This is because of buffer empty detection and overflow handling
static constexpr uint32_t INTERNAL_SIZE_ADD_ON = 1;

/// @brief This is the resulting internal size on creation
static constexpr uint32_t INTERNAL_SPSC_SOFI_SIZE = CapacityValue + INTERNAL_SIZE_ADD_ON;
// To ensure a consumer gets at least the amount of capacity of data when a queue is full, an additional free
// slot (add-on) is required.
// ========================================================================
// Consider the following scenario when there is no "capacity add-on":
// 1. CapacityValue = 2
// |--A--|--B--|
// ^
// w=2, r=0
// 2. The producer thread pushes a new element
// 3. Increment the read position (this effectively reduces the capacity and is the reason the internal capacity
// needs to be larger;
// |--A--|--B--|
// ^ ^
// w=2 r=1
// 4. The producer thread is suspended, the consumer thread pops a value
// |--A--|-----|
// ^
// w=2, r=2
// 5. The consumer tries to pop another value but the queue looks empty as
// write position == read position: the consumer cannot pop
// out CAPACITY amount of samples even though the queue was full
// ========================================================================
// With "capacity add-on"
// 1. CapacityValue = 2, InternalCapacity = 3
// |--A--|--B--|----|
// ^ ^
// r=0 w=2
// 2. The producer threads pushes a new element
// 3. First write the element at index 2 % capacity and increment the write index
// |--A--|--B--|--C--|
// ^
// w=3, r=0,
// 4. Then increment the read position and return the overflowing 'A'
// |-----|--B--|--C--|
// ^ ^
// w=3 r=1
// 5. The producer thread is suspended, the consumer thread pops a value
// |--A--|-----|--C--|
// ^ ^
// w=3 r=2
// 6. The consumer thread pops another value
// |--A--|-----|-----|
// ^
// w=3, r=3
// 7. Now, write position == read position so we cannot pop another element: the queue looks empty. We managed to
// pop CapacityValue elements
// ========================================================================
static constexpr uint32_t INTERNAL_CAPACITY_ADDON = 1;

/// @brief Internal capacity of the queue at creation
static constexpr uint32_t INTERNAL_SPSC_SOFI_CAPACITY = CapacityValue + INTERNAL_CAPACITY_ADDON;

public:
/// @brief default constructor which constructs an empty sofi
/// @brief default constructor which constructs an empty SpscSofi
SpscSofi() noexcept = default;

/// @brief pushs an element into SpscSofi. if SpscSofi is full the oldest data will be
/// @brief push an element into SpscSofi. if SpscSofi is full the oldest data will be
/// returned and the pushed element is stored in its place instead.
/// @param[in] valueIn value which should be stored
/// @param[out] valueOut if SpscSofi is overflowing the value of the overridden value
/// @param[in] value_in value which should be stored
/// @param[out] value_out if SpscSofi is overflowing the value of the overridden value
/// is stored here
/// @concurrent restricted thread safe: single pop, single push no
/// push calls from multiple contexts
/// @return return true if push was sucessfull else false.
/// @code
/// (initial situation, SpscSofi is FULL)
/// Start|-----A-------|
/// |-----B-------|
/// |-----C-------|
/// |-----D-------|
///
///
/// (calling push with data ’E’)
/// Start|-----E-------|
/// |-----A-------|
/// |-----B-------|
/// |-----C-------|
/// (’D’ is returned as valueOut)
///
/// ###################################################################
///
/// (if SpscSofi is not FULL , calling push() add new data)
/// Start|-------------|
/// |-------------| ( Initial SpscSofi )
/// (push() Called two times)
///
/// |-------------|
/// (New Data)
/// |-------------|
/// (New Data)
/// @endcode
/// @note restricted thread safe: can only be called from one thread. The authorization to push into the
/// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used.
/// @return return true if push was successful else false.
/// @remarks
/// 1. SpscSofi is empty |-----|-----|
/// 2. push an element |--A--|-----|
/// 3. push an element |--A--|--B--|
/// 5. SpscSofi is full
/// 6. push an element |--C--|--B--| -> value_out is set to 'A'
bool push(const ValueType& valueIn, ValueType& valueOut) noexcept;

/// @brief pop the oldest element
/// @param[out] valueOut storage of the pop'ed value
/// @concurrent restricted thread safe: single pop, single push no
/// pop or popIf calls from multiple contexts
/// @concurrent restricted thread safe: can only be called from one thread. The authorization to pop from the
/// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used.
/// @return false if SpscSofi is empty, otherwise true
bool pop(ValueType& valueOut) noexcept;

/// @brief conditional pop call to provide an alternative for a peek
/// and pop approach. If the verificator returns true the
/// peeked element is returned.
/// @param[out] valueOut storage of the pop'ed value
/// @param[in] verificator callable of type bool(const ValueType& peekValue)
/// which takes the value which would be pop'ed as argument and returns
/// true if it should be pop'ed, otherwise false
/// @code
/// int limit = 7128;
/// mysofi.popIf(value, [=](const ValueType & peek)
/// {
/// return peek < limit; // pop only when peek is smaller than limit
/// }
/// ); // pop's a value only if it is smaller than 9012
/// @endcode
/// @concurrent restricted thread safe: single pop, single push no
/// pop or popIf calls from multiple contexts
/// @return false if SpscSofi is empty or when verificator returns false, otherwise true
template <typename Verificator_T>
bool popIf(ValueType& valueOut, const Verificator_T& verificator) noexcept;

/// @brief returns true if SpscSofi is empty, otherwise false
/// @note the use of this function is limited in the concurrency case. if you
/// call this and in another thread pop is called the result can be out
/// of date as soon as you require it
/// @concurrent unrestricted thread safe
/// @concurrent unrestricted thread safe (the result might already be outdated when used). Expected to be called
/// from either the producer or the consumer thread but not from a third thread
bool empty() const noexcept;

/// @brief resizes SpscSofi
Expand All @@ -150,15 +151,16 @@ class SpscSofi
uint64_t capacity() const noexcept;

/// @brief returns the current size of SpscSofi
/// @concurrent unrestricted thread safe
/// @concurrent unrestricted thread safe (the result might already be outdated when used). Expected to be called
/// from either the producer or the consumer thread but not from a third thread
uint64_t size() const noexcept;

private:
UninitializedArray<ValueType, INTERNAL_SPSC_SOFI_SIZE> m_data;
uint64_t m_size = INTERNAL_SPSC_SOFI_SIZE;
std::pair<uint64_t, uint64_t> getReadWritePositions() const noexcept;

/// @brief the write/read pointers are "atomic pointers" so that they are not
/// reordered (read or written too late)
private:
UninitializedArray<ValueType, INTERNAL_SPSC_SOFI_CAPACITY> m_data;
uint64_t m_size = INTERNAL_SPSC_SOFI_CAPACITY;
Atomic<uint64_t> m_readPosition{0};
Atomic<uint64_t> m_writePosition{0};
};
Expand Down
Loading
Loading