Address reviewer's comments
albtam committed Apr 23, 2024
1 parent 0367bb3 commit eedfddc
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl
@@ -117,30 +117,28 @@ inline bool SpscSofi<ValueType, CapacityValue>::pop(ValueType& valueOut) noexcept
// 3. The consumer thread loads m_readPosition => 3. The pop method returns false
// => Whereas the queue was full, pop returned false giving the impression that the queue is empty
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire);
uint64_t nextReadPosition{0};

do
{
// SYNC POINT READ: m_data
// See explanation of the corresponding synchronization point in push()
if (currentReadPosition == m_writePosition.load(std::memory_order_acquire))
{
nextReadPosition = currentReadPosition;
return false;
// We don't need to check if read has changed, as it is enough to know that the empty
// state was valid in the past. The same race can also happen after the while loop and
// before the return operation
}
else
{
// we use memcpy here, to ensure that there is no logic in copying the data
std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType));
}

// we use memcpy here, to ensure that there is no logic in copying the data
std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType));


// We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted
// =============================================
// While memory synchronization is not needed for m_readPosition, we need to ensure that the
// 'memcpy' happens before the CAS operation.
// Corresponding m_readPosition.load(acquire) is in the push method
// 'memcpy' happens before updating m_readPosition.
// Corresponding m_readPosition load/acquire is in the push method
// =============================================
// ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz
// while this thread is blocked, we would still need more than 500 years to overflow
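To make the ordering argument above concrete, here is a minimal sketch of a simplified single-producer/single-consumer ring (hypothetical names such as TinyRing; not the iceoryx implementation, and it assumes a trivially copyable T). The only point it illustrates is that the memcpy must be ordered before the read-position update, since an overflowing push may reuse the slot once the read position has moved:

#include <atomic>
#include <cstdint>
#include <cstring>

template <typename T, uint64_t Capacity>
class TinyRing
{
  public:
    bool pop(T& valueOut) noexcept
    {
        uint64_t readPos = m_readPosition.load(std::memory_order_acquire);

        // it is enough to know that the queue was empty at some point in time
        if (readPos == m_writePosition.load(std::memory_order_acquire))
        {
            return false;
        }

        // copy the payload out first ...
        std::memcpy(&valueOut, &m_data[readPos % SIZE], sizeof(T));

        // ... then publish the new read position; the acq_rel CAS keeps the memcpy from
        // moving below this point and detects a concurrent overflowing push that already
        // advanced m_readPosition (in that case valueOut may be stale, so report failure;
        // a real queue would retry instead)
        return m_readPosition.compare_exchange_strong(
            readPos, readPos + 1U, std::memory_order_acq_rel, std::memory_order_relaxed);
    }

  private:
    static constexpr uint64_t SIZE{Capacity + 1U}; // one extra internal slot
    T m_data[SIZE]{};
    std::atomic<uint64_t> m_readPosition{0U};
    std::atomic<uint64_t> m_writePosition{0U};
};

If the read-position update were allowed to complete before the memcpy, a producer running in the overflow path could treat the slot as free and overwrite it while the copy is still in progress.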
@@ -184,8 +182,10 @@ inline bool SpscSofi<ValueType, CapacityValue>::push(const ValueType& valueIn, ValueType& valueOut) noexcept
// 5. The consumer thread missed the chance to pop the element in the blink of an eye
m_writePosition.store(nextWritePosition, std::memory_order_release);

// We need to establish a happens-before relationship with 'm_writePosition.load(std::memory_order_relaxed)'
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire);
// Memory order relaxed is enough since:
// - no synchronization needed when loading
// - the operation cannot move below without observable changes
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_relaxed);

// Check if queue is full: since we have an extra element (INTERNAL_CAPACITY_ADD_ON), we need to
// check if there is a free position for the *next* write position
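The producer side of the same simplified sketch (again hypothetical names, a single producer assumed, trivially copyable T, overflow handling omitted) shows the release store that publishes the copied data together with m_writePosition, and the fullness check against the *next* write position that the extra internal slot makes necessary:

#include <atomic>
#include <cstdint>
#include <cstring>

template <typename T, uint64_t Capacity>
class TinyRingProducerSide
{
  public:
    // returns true if the element fit without exceeding the logical capacity; a real SoFi
    // would resolve the overflow case by advancing m_readPosition (CAS) and handing the
    // displaced element back to the caller
    bool push(const T& valueIn) noexcept
    {
        // relaxed is fine: only this (single) producer ever writes m_writePosition
        uint64_t writePos = m_writePosition.load(std::memory_order_relaxed);

        // store the payload before publishing the new write position
        std::memcpy(&m_data[writePos % SIZE], &valueIn, sizeof(T));

        // the release store makes the memcpy above visible to a consumer that
        // loads m_writePosition with acquire
        m_writePosition.store(writePos + 1U, std::memory_order_release);

        // with one extra internal slot the queue is over its logical capacity when the
        // *next* write position has no free slot left relative to the read position
        uint64_t readPos = m_readPosition.load(std::memory_order_relaxed);
        return (writePos + 1U) - readPos <= Capacity;
    }

  private:
    static constexpr uint64_t SIZE{Capacity + 1U}; // INTERNAL_CAPACITY_ADD_ON analogue
    T m_data[SIZE]{};
    std::atomic<uint64_t> m_readPosition{0U};
    std::atomic<uint64_t> m_writePosition{0U};
};

Without the overflow handling that the real implementation performs after this check, repeatedly pushing past the capacity would of course overwrite unread data in this sketch.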
@@ -206,14 +206,14 @@ inline bool SpscSofi<ValueType, CapacityValue>::push(const ValueType& valueIn, ValueType& valueOut) noexcept
// Another issue might be that two consecutive pushes (not concurrent) happen on different CPU
// cores without synchronization, then the memory also needs to be synchronized for the overflow
// case.
// Memory order failure is memory_order_relaxed since there is no further synchronization ////
// Memory order failure is memory_order_relaxed since there is no further synchronization
// needed if there is no overflow
// ======================================
// ABA problem: m_readPosition is an uint64_t. Assuming a thread is popping at a rate of 1 GHz while
// this thread is blocked, we would still need more than 500 years to overflow m_readPosition and
// encounter the ABA problem
if (m_readPosition.compare_exchange_strong(
currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_relaxed))
currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_acquire))
{
// Since INTERNAL_SOFI_CAPACITY = CapacityValue + 1, it can happen that we return more
// elements than the CapacityValue by calling push and pop concurrently (in case of an
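As a side note, the "more than 500 years" figure in the ABA comments is easy to verify with a quick back-of-the-envelope calculation under the same assumptions (64-bit position counter, one increment per nanosecond, i.e. 1 GHz):

#include <cstdint>
#include <cstdio>

int main()
{
    // time until a uint64_t position counter wraps at one increment per nanosecond
    constexpr double incrementsUntilWrap = 18446744073709551616.0; // 2^64
    constexpr double incrementsPerSecond = 1.0e9;                  // 1 GHz
    constexpr double secondsPerYear = 365.0 * 24.0 * 3600.0;

    constexpr double yearsUntilWrap = incrementsUntilWrap / incrementsPerSecond / secondsPerYear;
    std::printf("years until the position counter wraps: %.0f\n", yearsUntilWrap); // ~585
    return 0;
}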
