diff --git a/src/pevents.cpp b/src/pevents.cpp
index e23650b..a0a3f3f 100644
--- a/src/pevents.cpp
+++ b/src/pevents.cpp
@@ -33,7 +33,7 @@ namespace neosmart {
             int EventsLeft; // WFMO
         } Status;
         bool WaitAll;
-        bool StillWaiting;
+        std::atomic<bool> StillWaiting;

         void Destroy() {
             pthread_mutex_destroy(&Mutex);
@@ -64,14 +64,14 @@ namespace neosmart {
 #ifdef WFMO
     static bool RemoveExpiredWaitHelper(neosmart_wfmo_info_t_ wait) {
-        if (wait.Waiter->StillWaiting) {
+        if (wait.Waiter->StillWaiting.load(std::memory_order_relaxed)) {
             return false;
         }

-        int ref_count = --wait.Waiter->RefCount;
-        assert(ref_count >= 0);
+        int ref_count = wait.Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel);
+        assert(ref_count > 0);

-        if (ref_count == 0) {
+        if (ref_count == 1) {
             wait.Waiter->Destroy();
             delete wait.Waiter;
         }
@@ -88,21 +88,18 @@ namespace neosmart {
         result = pthread_mutex_init(&event->Mutex, 0);
         assert(result == 0);

-        // memory_order_relaxed: Newly created event is guaranteed to not have any waiters
-        event->State.store(false, std::memory_order_relaxed);
         event->AutoReset = !manualReset;
-
-        if (initialState) {
-            result = SetEvent(event);
-            assert(result == 0);
-        }
+        // memory_order_release: if `initialState == true`, allow a load with acquire semantics to
+        // see the value.
+        event->State.store(initialState, std::memory_order_release);

         return event;
     }

     static int UnlockedWaitForEvent(neosmart_event_t event, uint64_t milliseconds) {
         int result = 0;
-        // memory_order_relaxed: unlocking/ordering is guaranteed prior to calling this function
+        // memory_order_relaxed: `State` is only set to true with the mutex held, and we require
+        // that this function only be called after the mutex is obtained.
         if (!event->State.load(std::memory_order_relaxed)) {
             // Zero-timeout event state check optimization
             if (milliseconds == 0) {
@@ -129,23 +126,24 @@ namespace neosmart {
                 } else {
                     result = pthread_cond_wait(&event->CVariable, &event->Mutex);
                 }
-                // memory_order_relaxed: ordering is guaranteed by the mutex
+                // memory_order_relaxed: ordering is guaranteed by the mutex, as `State = true` is
+                // only ever written with the mutex held.
             } while (result == 0 && !event->State.load(std::memory_order_relaxed));
-
-            if (result == 0 && event->AutoReset) {
-                // We've only accquired the event if the wait succeeded
-                // memory_order_release: Prevent overlapping/interleaved Set/Reset contexts
-                event->State.store(false, std::memory_order_release);
-            }
         } else if (event->AutoReset) {
             // It's an auto-reset event that's currently available;
             // we need to stop anyone else from using it
             result = 0;
-            // memory_order_release: Prevent overlapping/interleaved Set/Reset contexts
-            event->State.store(false, std::memory_order_release);
         }
-        // Else we're trying to obtain a manual reset event with a signaled state;
-        // don't do anything
+        else {
+            // We're trying to obtain a manual reset event with a signaled state; don't do anything
+        }
+
+        if (result == 0 && event->AutoReset) {
+            // We've only acquired the event if the wait succeeded
+            // memory_order_relaxed: we never act on `State == true` without fully synchronizing
+            // or grabbing the mutex, so it's OK to use relaxed semantics here.
+            event->State.store(false, std::memory_order_relaxed);
+        }

         return result;
     }
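The relaxed loads in UnlockedWaitForEvent() hinge on a single invariant: `State` only ever becomes true while `event->Mutex` is held. Below is a minimal sketch of that pattern under the same assumption, using std::mutex and std::condition_variable in place of pthreads; `TinyEvent` and its members are illustrative names, not part of this patch.

    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    // Illustrative only: `state` is set to true exclusively while `m` is held,
    // so a waiter that itself holds `m` may re-check it with relaxed ordering;
    // the mutex acquire/release supplies all the ordering needed.
    struct TinyEvent {
        std::mutex m;
        std::condition_variable cv;
        std::atomic<bool> state{false};

        void set() {
            {
                std::lock_guard<std::mutex> lock(m);
                state.store(true, std::memory_order_release);
            }
            cv.notify_one();
        }

        void wait() {
            std::unique_lock<std::mutex> lock(m);
            // Relaxed is sufficient: every store of true happens under `m`.
            while (!state.load(std::memory_order_relaxed)) {
                cv.wait(lock);
            }
            // Auto-reset consume, relaxed for the same reason as in the patch.
            state.store(false, std::memory_order_relaxed);
        }
    };

Note that the release on the `true` store is not what orders the waiter's relaxed load (the mutex does that); it only matters for readers that check `State` without taking the mutex, which the SetEvent() comments below describe as spin-waiting threads.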
@@ -191,16 +189,20 @@ namespace neosmart {
         waitInfo.Waiter = wfmo;
         waitInfo.WaitIndex = -1;

-        wfmo->WaitAll = waitAll;
-        wfmo->StillWaiting = true;
-        wfmo->RefCount = 1;
-
         if (waitAll) {
             wfmo->Status.EventsLeft = count;
         } else {
             wfmo->Status.FiredEvent = -1;
         }

+        wfmo->WaitAll = waitAll;
+        wfmo->StillWaiting.store(true, std::memory_order_release);
+        // memory_order_release: this is the initial value other threads should see
+        wfmo->RefCount.store(1 + count, std::memory_order_release);
+
+        // Separately keep track of how many refs to decrement after the initialization loop, to
+        // avoid repeatedly dirtying the cache line.
+        int skipped_refs = 0;
+
         tempResult = pthread_mutex_lock(&wfmo->Mutex);
         assert(tempResult == 0);
@@ -226,9 +228,11 @@ namespace neosmart {
                 assert(tempResult == 0);

                 if (waitAll) {
+                    ++skipped_refs;
                     --wfmo->Status.EventsLeft;
                     assert(wfmo->Status.EventsLeft >= 0);
                 } else {
+                    skipped_refs += (count - i);
                     wfmo->Status.FiredEvent = i;
                     waitIndex = i;
                     done = true;
@@ -236,7 +240,6 @@ namespace neosmart {
                 }
             } else {
                 events[i]->RegisteredWaits.push_back(waitInfo);
-                wfmo->RefCount.fetch_add(1, std::memory_order_relaxed);

                 tempResult = pthread_mutex_unlock(&events[i]->Mutex);
                 assert(tempResult == 0);
@@ -290,14 +293,18 @@ namespace neosmart {
         }

         waitIndex = wfmo->Status.FiredEvent;
-        wfmo->StillWaiting = false;
+        // memory_order_relaxed: this is only checked outside the mutex to determine if waiting has
+        // terminated, meaning it's safe to decrement the ref count. If it's true (which we write
+        // with release semantics), then the mutex is always entered.
+        wfmo->StillWaiting.store(false, std::memory_order_relaxed);

         tempResult = pthread_mutex_unlock(&wfmo->Mutex);
         assert(tempResult == 0);

-        int ref_count = --wfmo->RefCount;
-        assert(ref_count >= 0);
-        if (ref_count == 0) {
+        // memory_order_seq_cst: Ensure this is run after the wfmo mutex is unlocked
+        int ref_count = wfmo->RefCount.fetch_sub(1 + skipped_refs, std::memory_order_seq_cst);
+        assert(ref_count > 0);
+        if (ref_count == 1 + skipped_refs) {
             wfmo->Destroy();
             delete wfmo;
         }
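The ref-count arithmetic above is the subtle part of this function: `RefCount` is pre-charged with `1 + count` references (one per registered event, plus one for the waiting thread itself), and `skipped_refs` accumulates the references that were never handed out because an event was already signaled during registration, so they can all be returned in one final fetch_sub. A minimal sketch of the underlying last-reference-destroys pattern; `Node` and `release_refs` are illustrative names, not from the patch.

    #include <atomic>
    #include <cassert>

    struct Node {
        std::atomic<int> ref_count;
        explicit Node(int initial) : ref_count(initial) {}
    };

    // Drop `n` references at once. fetch_sub returns the value held *before*
    // the subtraction, so prev == n means these were the last n references and
    // the caller must clean up; this is the same shape as the patch's
    // `ref_count == 1 + skipped_refs` test.
    void release_refs(Node *node, int n) {
        int prev = node->ref_count.fetch_sub(n, std::memory_order_acq_rel);
        assert(prev >= n);
        if (prev == n) {
            delete node;
        }
    }

    int main() {
        Node *node = new Node(3);
        release_refs(node, 2); // e.g. two registrations that were skipped
        release_refs(node, 1); // last reference: deletes node
        return 0;
    }

acq_rel is the usual shared_ptr-style minimum for such a decrement; the patch strengthens the decrements that follow a pthread_mutex_unlock() to seq_cst so they cannot be reordered before the unlock.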
@@ -335,34 +342,50 @@ namespace neosmart {
         int result = pthread_mutex_lock(&event->Mutex);
         assert(result == 0);

-        // memory_order_release: Unblock threads waiting for the event
-        event->State.store(true, std::memory_order_release);
-
         // Depending on the event type, we either trigger everyone or only one
         if (event->AutoReset) {
 #ifdef WFMO
             while (!event->RegisteredWaits.empty()) {
                 neosmart_wfmo_info_t i = &event->RegisteredWaits.front();

+                // memory_order_relaxed: this is just an optimization to see if it is OK to skip
+                // this waiter, and if it's observed to be false then it's OK to bypass the mutex at
+                // that point.
+                if (!i->Waiter->StillWaiting.load(std::memory_order_relaxed)) {
+                    int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel);
+                    assert(ref_count > 0);
+                    if (ref_count == 1) {
+                        i->Waiter->Destroy();
+                        delete i->Waiter;
+                    }
+
+                    event->RegisteredWaits.pop_front();
+                    continue;
+                }
+
                 result = pthread_mutex_lock(&i->Waiter->Mutex);
                 assert(result == 0);

-                int ref_count = --i->Waiter->RefCount;
-                assert(ref_count >= 0);
-
-                if (!i->Waiter->StillWaiting) {
+                // We have to check `Waiter->StillWaiting` twice, once before locking as an
+                // optimization to bypass the mutex altogether, and then again after locking the
+                // WFMO mutex because we could have !waitAll and another event could have ended the
+                // wait, in which case we must not unlock the same waiter or else a SetEvent() call
+                // on an auto-reset event may end up with a lost wakeup.
+                if (!i->Waiter->StillWaiting.load(std::memory_order_relaxed)) {
                     result = pthread_mutex_unlock(&i->Waiter->Mutex);
                     assert(result == 0);
-                    if (ref_count == 0) {
+
+                    // memory_order_seq_cst: Ensure this is run after the wfmo mutex is unlocked
+                    int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_seq_cst);
+                    assert(ref_count > 0);
+                    if (ref_count == 1) {
                         i->Waiter->Destroy();
                         delete i->Waiter;
                     }
+
                     event->RegisteredWaits.pop_front();
                     continue;
                 }
-                else {
-                    assert(ref_count > 0);
-                }

                 if (i->Waiter->WaitAll) {
                     --i->Waiter->Status.EventsLeft;
@@ -373,7 +396,9 @@ namespace neosmart {
                     // it.
                 } else {
                     i->Waiter->Status.FiredEvent = i->WaitIndex;
-                    i->Waiter->StillWaiting = false;
+                    // memory_order_relaxed: The flip to false is only lazily observed as an
+                    // optimization to bypass the mutex for cleanup.
+                    i->Waiter->StillWaiting.store(false, std::memory_order_relaxed);
                 }

                 result = pthread_mutex_unlock(&i->Waiter->Mutex);
@@ -382,10 +407,15 @@ namespace neosmart {
                 assert(result == 0);

                 result = pthread_cond_signal(&i->Waiter->CVariable);
                 assert(result == 0);

-                event->RegisteredWaits.pop_front();
+                // memory_order_seq_cst: Ensure this is run after the wfmo mutex is unlocked
+                int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_seq_cst);
+                assert(ref_count > 0);
+                if (ref_count == 1) {
+                    i->Waiter->Destroy();
+                    delete i->Waiter;
+                }

-                // memory_order_release: Prevent overlapping of sequential Set/Reset states.
-                event->State.store(false, std::memory_order_release);
+                event->RegisteredWaits.pop_front();

                 result = pthread_mutex_unlock(&event->Mutex);
                 assert(result == 0);
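The two-step check of `StillWaiting` above is a classic unlocked fast path: a stale true outside the mutex costs only an unnecessary lock, while the authoritative re-check runs with the waiter's mutex held so that two auto-reset events can never both claim the same `!waitAll` waiter. A standalone sketch of just that shape, with illustrative names:

    #include <atomic>
    #include <mutex>

    struct WaiterSketch {
        std::mutex m;
        std::atomic<bool> still_waiting{true};
    };

    // Returns true if the caller won the right to wake this waiter. The
    // unlocked load is purely an optimization: observing false lets us skip
    // the mutex, and a stale true is corrected by the locked re-check.
    bool try_claim(WaiterSketch &w) {
        if (!w.still_waiting.load(std::memory_order_relaxed)) {
            return false; // fast path: wait already ended, bypass the mutex
        }
        std::lock_guard<std::mutex> lock(w.m);
        if (!w.still_waiting.load(std::memory_order_relaxed)) {
            return false; // another event ended the wait between the checks
        }
        w.still_waiting.store(false, std::memory_order_relaxed);
        return true; // exactly one caller per waiter reaches this point
    }

The same reasoning explains why the lost-wakeup comment applies only to auto-reset events: a manual-reset event stays signaled, so waking a waiter that has already finished is merely wasted work, not a lost signal.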
@@ -393,39 +423,44 @@ namespace neosmart {
                 return 0;
             }
 #endif // WFMO
-            // event->State can be false if compiled with WFMO support
-            // memory_order_relaxed: ordering is ensured by the mutex
-            if (event->State.load(std::memory_order_relaxed)) {
-                result = pthread_mutex_unlock(&event->Mutex);
-                assert(result == 0);
+            // memory_order_release: this is the synchronization point for any threads spin-waiting
+            // for the event to become available.
+            event->State.store(true, std::memory_order_release);

-                result = pthread_cond_signal(&event->CVariable);
-                assert(result == 0);
+            result = pthread_mutex_unlock(&event->Mutex);
+            assert(result == 0);

-                return 0;
-            }
+            result = pthread_cond_signal(&event->CVariable);
+            assert(result == 0);
+
+            return 0;
         } else { // this is a manual reset event
+            // memory_order_release: this is the synchronization point for any threads spin-waiting
+            // for the event to become available.
+            event->State.store(true, std::memory_order_release);
 #ifdef WFMO
             for (size_t i = 0; i < event->RegisteredWaits.size(); ++i) {
                 neosmart_wfmo_info_t info = &event->RegisteredWaits[i];

-                result = pthread_mutex_lock(&info->Waiter->Mutex);
-                assert(result == 0);
-
-                --info->Waiter->RefCount;
-                assert(info->Waiter->RefCount >= 0);
-
-                if (!info->Waiter->StillWaiting) {
-                    bool destroy = info->Waiter->RefCount == 0;
-                    result = pthread_mutex_unlock(&info->Waiter->Mutex);
-                    assert(result == 0);
-                    if (destroy) {
+                // memory_order_relaxed: this is just an optimization to see if it is OK to skip
+                // this waiter, and if it's observed to be false then it's OK to bypass the mutex at
+                // that point.
+                if (!info->Waiter->StillWaiting.load(std::memory_order_relaxed)) {
+                    int ref_count = info->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel);
+                    if (ref_count == 1) {
                         info->Waiter->Destroy();
                         delete info->Waiter;
                     }
                     continue;
                 }

+                result = pthread_mutex_lock(&info->Waiter->Mutex);
+                assert(result == 0);
+
+                // Waiter->StillWaiting may have become false by now, but we're just going to pretend
+                // it hasn't. So long as we hold a reference to the WFMO, this is safe since manual
+                // reset events are not one-time use.
+
                 if (info->Waiter->WaitAll) {
                     --info->Waiter->Status.EventsLeft;
                     assert(info->Waiter->Status.EventsLeft >= 0);
@@ -435,7 +470,9 @@ namespace neosmart {
                     // it.
                 } else {
                     info->Waiter->Status.FiredEvent = info->WaitIndex;
-                    info->Waiter->StillWaiting = false;
+                    // memory_order_relaxed: The flip to false is only lazily observed as an
+                    // optimization to bypass the mutex for cleanup.
+                    info->Waiter->StillWaiting.store(false, std::memory_order_relaxed);
                 }

                 result = pthread_mutex_unlock(&info->Waiter->Mutex);
@@ -443,6 +480,15 @@ namespace neosmart {
                 assert(result == 0);

                 result = pthread_cond_signal(&info->Waiter->CVariable);
                 assert(result == 0);
+
+                // memory_order_seq_cst: Ensure this is run after the wfmo mutex is unlocked
+                int ref_count = info->Waiter->RefCount.fetch_sub(1, std::memory_order_seq_cst);
+                assert(ref_count > 0);
+                if (ref_count == 1) {
+                    info->Waiter->Destroy();
+                    delete info->Waiter;
+                }
+                continue;
             }
             event->RegisteredWaits.clear();
 #endif // WFMO
@@ -457,14 +503,10 @@ namespace neosmart {
     }

     int ResetEvent(neosmart_event_t event) {
-        int result = pthread_mutex_lock(&event->Mutex);
-        assert(result == 0);
-
-        event->State.store(false, std::memory_order_release);
-
-        result = pthread_mutex_unlock(&event->Mutex);
-        assert(result == 0);
-
+        // memory_order_relaxed and no mutex: there can't be any guarantees about concurrent calls
+        // to either of WFMO()/SetEvent() and ResetEvent() because they're racy by nature. Only the
+        // behavior of concurrent WFMO() and SetEvent() calls is strongly defined.
+        event->State.store(false, std::memory_order_relaxed);
         return 0;
     }
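The rewritten ResetEvent() drops the mutex entirely, which narrows the documented contract: waits racing SetEvent() remain well defined, while anything racing ResetEvent() is inherently unordered. A usage sketch against the public pevents API follows; the function signatures mirror pevents.h, but the surrounding program is illustrative only.

    #include <cassert>
    #include <thread>

    #include "pevents.h"

    int main() {
        // Auto-reset event, initially unset.
        neosmart::neosmart_event_t event = neosmart::CreateEvent(false, false);

        std::thread setter([&] {
            // Well defined: a concurrent WaitForEvent() will observe this set.
            neosmart::SetEvent(event);
        });

        int result = neosmart::WaitForEvent(event, WAIT_INFINITE);
        assert(result == 0);
        setter.join();

        // Racy by nature: had this overlapped a concurrent SetEvent(), either
        // call could win, so only reset from a point where no set is in flight.
        neosmart::ResetEvent(event);

        neosmart::DestroyEvent(event);
        return 0;
    }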