Skip to content

Commit

Permalink
Add early return to pop method
Browse files Browse the repository at this point in the history
  • Loading branch information
albtam committed Apr 19, 2024
1 parent f15ca92 commit 0367bb3
Showing 1 changed file with 24 additions and 26 deletions.
50 changes: 24 additions & 26 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl
Original file line number Diff line number Diff line change
Expand Up @@ -99,43 +99,42 @@ inline bool SpscSofi<ValueType, CapacityValue>::empty() const noexcept
template <class ValueType, uint64_t CapacityValue>
inline bool SpscSofi<ValueType, CapacityValue>::pop(ValueType& valueOut) noexcept
{
// Memory synchronization is not needed but we need to prevent operation reordering to avoid the following scenario
// where the CPU reorder the load of m_readPosition and m_writePosition:
// We need 'm_readPosition.load(std::memory_order_acquire)' to happen before
// 'm_writePosition.load(std::memory_order_acquire)' to avoid the following scenario where the
// CPU reorders the load of m_readPosition and m_writePosition:
// 0. Initial situation (the queue is full)
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 1. The consumer thread loads m_writePosition => 3
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 2. The producer thread pushes two times
// |--D--|--E--|--C--|
// ^ ^
// r=3 w=5
// 3. The consumer thread loads m_readPosition => 3. The pop method returns false
// => Whereas the queue was full, pop returned false giving the impression that the queue if empty
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire);
uint64_t nextReadPosition{0};

bool popWasSuccessful{true};

do
{
// SYNC POINT READ: m_data
// See explanation of the corresponding synchronization point in push()
if (currentReadPosition == m_writePosition.load(std::memory_order_acquire))
{
nextReadPosition = currentReadPosition;
popWasSuccessful = false;
// We cannot just return false (i.e. we need to continue the loop) to avoid the following situation:
// 0. Initial situation (the queue is full)
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 1. The consumer thread loads m_writePosition => 3
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 2. The producer thread pushes two times
// |--D--|--E--|-----|
// ^ ^
// r=3 w=5
// 3. The consumer thread loads m_readPosition => 3 The pop method returns false
// => Whereas the queue was full, pop returned false giving the impression that the queue if empty
return false;
// We don't need to check if read has changed, as it is enough to know that the empty
// state was valid in the past. The same race can also happen after the while loop and
// before the return operation
}
else
{
// we use memcpy here, to ensure that there is no logic in copying the data
std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType));
nextReadPosition = currentReadPosition + 1U;
popWasSuccessful = true;
}

// We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted
Expand All @@ -147,9 +146,9 @@ inline bool SpscSofi<ValueType, CapacityValue>::pop(ValueType& valueOut) noexcep
// while this thread is blocked, we would still need more than 500 years to overflow
// m_readPosition and encounter the ABA problem
} while (!m_readPosition.compare_exchange_weak(
currentReadPosition, nextReadPosition, std::memory_order_acq_rel, std::memory_order_acquire));
currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_acquire));

return popWasSuccessful;
return true;
}

template <class ValueType, uint64_t CapacityValue>
Expand Down Expand Up @@ -185,8 +184,7 @@ inline bool SpscSofi<ValueType, CapacityValue>::push(const ValueType& valueIn, V
// 5. The consumer thread missed the chance to pop the element in the blink of an eye
m_writePosition.store(nextWritePosition, std::memory_order_release);

// While memory synchronization is not needed with m_readPosition, we need
// memory_order_acquire to avoid the reordering of the operation.
// We need to establish an happens-before relationship with 'm_writePosition.load(std::memory_order_relaxed)'
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire);

// Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADD_ON), we need to
Expand Down

0 comments on commit 0367bb3

Please sign in to comment.