From 950e97e706822680bce361839ab1e7a4dd16cdeb Mon Sep 17 00:00:00 2001 From: Mahmoud Al-Qudsi Date: Sun, 27 Jun 2021 18:42:19 -0500 Subject: [PATCH] Make `WaitForMultipleEvents` atomic when `waitAll` is true This patch significantly changes the behavior (and to an extent, the performance) of pevents to more closely mimic the documented behavior of `WaitForMultipleObjects` and `WaitForMultipleObjectsEx`. As reported in #9, the previous behavior did not make any atomicity guarantees when a call to `WaitForMultipleEvents()` was made with `waitAll = true`, and WFME would attempt to serially obtain the events in question, which could lead to a deadlock in case of circular locking and auto-reset events. The WFMO behavior documented on MSDN makes it clear that the Windows implementation does not modify the signalled state of any of the manual or auto reset events being awaited until the WFMO call is ready to return, at which point either the one event in question or all the events being awaited (dependent on `waitAll`) are atomically awaited. --- src/pevents.cpp | 411 ++++++++++++++++++++++++++++-------------------- 1 file changed, 243 insertions(+), 168 deletions(-) diff --git a/src/pevents.cpp b/src/pevents.cpp index 5934356..45ccd62 100644 --- a/src/pevents.cpp +++ b/src/pevents.cpp @@ -9,11 +9,12 @@ #include "pevents.h" #include +#include #include +#include #include #include -#include -#include +#include #ifdef WFMO #include #include @@ -30,7 +31,7 @@ namespace neosmart { std::atomic RefCount; union { int FiredEvent; // WFSO - int EventsLeft; // WFMO + std::atomic EventsLeft; // WFMO } Status; bool WaitAll; std::atomic StillWaiting; @@ -47,6 +48,7 @@ namespace neosmart { struct neosmart_wfmo_info_t_ { neosmart_wfmo_t Waiter; int WaitIndex; + bool Signalled; }; typedef neosmart_wfmo_info_t_ *neosmart_wfmo_info_t; #endif // WFMO @@ -135,7 +137,7 @@ namespace neosmart { result = 0; } else { - // We're trying to obtain a manual reset event with a signaled state; don't do anything + // We're trying to obtain a manual reset event with a signaled state; don't do anything. } if (result == 0 && event->AutoReset) { @@ -145,6 +147,24 @@ namespace neosmart { event->State.store(false, std::memory_order_relaxed); } +#ifdef WFMO + // Un-signal any registered waiters in case of an auto-reset event + // memory_order_relaxed: we are only trying to observe previous writes in this same thread. + if (!event->State.load(std::memory_order_relaxed)) { + for (auto &wfmo : event->RegisteredWaits) { + // We don't need to lock the WFMO mutex because the event mutex is required to + // change the event-specific `Signalled` flag, and `Status.EventsLeft` is + // synchronized separately. + if (wfmo.Signalled) { + wfmo.Signalled = false; + if (wfmo.Waiter->WaitAll) { + wfmo.Waiter->Status.EventsLeft.fetch_add(1, std::memory_order_acq_rel); + } + } + } + } +#endif + return result; } @@ -198,18 +218,20 @@ namespace neosmart { waitInfo.WaitIndex = -1; if (waitAll) { - wfmo->Status.EventsLeft = count; + // memory_order_release: make the initial value visible + wfmo->Status.EventsLeft.store(count, std::memory_order_release); } else { wfmo->Status.FiredEvent = -1; } wfmo->WaitAll = waitAll; + // memory_order_release: these are the initial values other threads should see wfmo->StillWaiting.store(true, std::memory_order_release); - // memory_order_release: this is the initial value other threads should see wfmo->RefCount.store(1 + count, std::memory_order_release); // Separately keep track of how many refs to decrement after the initialization loop, to // avoid repeatedly clearing the cache line. int skipped_refs = 0; + int events_skipped = 0; tempResult = pthread_mutex_lock(&wfmo->Mutex); assert(tempResult == 0); @@ -217,13 +239,13 @@ namespace neosmart { bool done = false; waitIndex = -1; - for (int i = 0; i < count; ++i) { + for (int i = 0; !done && i < count; ++i) { waitInfo.WaitIndex = i; - // Skip obtaining the mutex for manual reset events. This requires a memory barrier to - // ensure correctness. + // Skip obtaining the mutex for manual reset events. This is only possible for waitAny + // and requires a memory barrier to ensure correctness. bool skipLock = false; - if (!events[i]->AutoReset) { + if (!waitAll && !events[i]->AutoReset) { if (events[i]->State.load(std::memory_order_relaxed) && events[i]->State.load(std::memory_order_acquire)) { skipLock = true; @@ -243,66 +265,116 @@ namespace neosmart { events[i]->RegisteredWaits.end()); } - if (skipLock || UnlockedWaitForEvent(events[i], 0) == 0) { - if (!skipLock) { - tempResult = pthread_mutex_unlock(&events[i]->Mutex); - assert(tempResult == 0); - } - + // Set the signalled flag without modifying (i.e. consuming) the event + // memory_order_relaxed: this field is only changed with the mutex obtained. + waitInfo.Signalled = events[i]->State.load(std::memory_order_relaxed); + if (skipLock || waitInfo.Signalled) { if (waitAll) { - ++skipped_refs; - --wfmo->Status.EventsLeft; - assert(wfmo->Status.EventsLeft >= 0); + ++events_skipped; } else { skipped_refs += (count - i); + + if (!skipLock) { + // Consume the event because we don't need to atomically wait for all + tempResult = UnlockedWaitForEvent(events[i], 0); + assert(tempResult == 0); + } + wfmo->Status.FiredEvent = i; waitIndex = i; done = true; - break; } - } else { + } + + if (!done) { + // Register this wait with the event in question events[i]->RegisteredWaits.push_back(waitInfo); + } + if (!skipLock) { tempResult = pthread_mutex_unlock(&events[i]->Mutex); assert(tempResult == 0); } } - // We set the `done` flag above in case of WaitAny and at least one event was set. - // But we need to check again here if we were doing a WaitAll or else we'll incorrectly - // return WAIT_TIMEOUT. - if (waitAll && wfmo->Status.EventsLeft == 0) { - done = true; + if (waitAll) { + // Decrement EventsLeft in one go to avoid cache line pressure. + int left = wfmo->Status.EventsLeft.fetch_sub(events_skipped, std::memory_order_acq_rel); + assert(left > 0); } timespec ts; - if (!done) { - if (milliseconds == 0) { - result = WAIT_TIMEOUT; - done = true; - } else if (milliseconds != WAIT_INFINITE) { - timeval tv; - gettimeofday(&tv, NULL); + if (!done && milliseconds != WAIT_INFINITE && milliseconds != 0) { + timeval tv; + gettimeofday(&tv, NULL); - uint64_t nanoseconds = ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 + - milliseconds * 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000; + uint64_t nanoseconds = ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 + + milliseconds * 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000; - ts.tv_sec = (time_t) (nanoseconds / 1000 / 1000 / 1000); - ts.tv_nsec = (long) (nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000); - } + ts.tv_sec = (time_t)(nanoseconds / 1000 / 1000 / 1000); + ts.tv_nsec = (long)(nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000); } while (!done) { // One (or more) of the events we're monitoring has been triggered? - // If we're waiting for all events, assume we're done and check if there's an event that - // hasn't fired But if we're waiting for just one event, assume we're not done until we - // find a fired event - done = (waitAll && wfmo->Status.EventsLeft == 0) || - (!waitAll && wfmo->Status.FiredEvent != -1); + if (!waitAll) { + done = wfmo->Status.FiredEvent != -1; + } + // memory_order_acquire: `EventsLeft` is modified without the WFMO mutex. + else if (wfmo->Status.EventsLeft.load(std::memory_order_acquire) == 0) { + // All events are currently signalled, but we must atomically obtain them before + // returning. + + retry: + bool lockedAtomically = true; + for (int i = 0; i < count; ++i) { + tempResult = pthread_mutex_trylock(&events[i]->Mutex); + if (tempResult == EBUSY) { + // The event state is locked; we can't continue without knowing for sure if + // all the events can be atomically claimed because we risk missing a wake + // otherwise. To avoid a deadlock here, we return all the locks and try + // again. + for (int j = i - 1; j >= 0; --j) { + tempResult = pthread_mutex_unlock(&events[j]->Mutex); + assert(tempResult == 0); + } + goto retry; + } + + assert(tempResult == 0); + // memory_order_relaxed: `State` isn't changed without the mutex held. + if (!events[i]->State.load(std::memory_order_relaxed)) { + // The event has been stolen from under us; since we hold the WFMO lock, it + // should be safe to sleep until a relevant SetEvent() call is made. But + // first, release all the locks we've accumulated. + for (int j = i; j >= 0; --j) { + tempResult = pthread_mutex_unlock(&events[j]->Mutex); + assert(tempResult == 0); + } + lockedAtomically = false; + break; + } + } + + if (lockedAtomically) { + // We have all the locks, so we can atomically consume all the events + for (int i = 0; i < count; ++i) { + tempResult = UnlockedWaitForEvent(events[i], 0); + assert(tempResult == 0); + + tempResult = pthread_mutex_unlock(&events[i]->Mutex); + assert(tempResult == 0); + } + done = true; + } + } if (!done) { - if (milliseconds != WAIT_INFINITE) { + if (milliseconds == 0) { + result = WAIT_TIMEOUT; + done = true; + } else if (milliseconds != WAIT_INFINITE) { result = pthread_cond_timedwait(&wfmo->CVariable, &wfmo->Mutex, &ts); } else { result = pthread_cond_wait(&wfmo->CVariable, &wfmo->Mutex); @@ -364,171 +436,174 @@ namespace neosmart { int result = pthread_mutex_lock(&event->Mutex); assert(result == 0); - // Depending on the event type, we either trigger everyone or only one - if (event->AutoReset) { #ifdef WFMO - while (!event->RegisteredWaits.empty()) { - neosmart_wfmo_info_t i = &event->RegisteredWaits.front(); - - // memory_order_relaxed: this is just an optimization to see if it is OK to skip - // this waiter, and if it's observed to be false then it's OK to bypass the mutex at - // that point. - if (!i->Waiter->StillWaiting.load(std::memory_order_relaxed)) { - int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel); - assert(ref_count > 0); - if (ref_count == 1) { - i->Waiter->Destroy(); - delete i->Waiter; - } - - event->RegisteredWaits.pop_front(); - continue; + bool consumed = false; + for (std::deque::iterator i = event->RegisteredWaits.begin(); + !consumed && i != event->RegisteredWaits.end();) { + + // memory_order_relaxed: this is just an optimization to see if it is OK to skip + // this waiter, and if it's observed to be false then it's OK to bypass the mutex at + // that point. + if (!i->Waiter->StillWaiting.load(std::memory_order_relaxed)) { + int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel); + if (ref_count == 1) { + i->Waiter->Destroy(); + delete i->Waiter; } + i = event->RegisteredWaits.erase(i); + continue; + } - result = pthread_mutex_lock(&i->Waiter->Mutex); + result = pthread_mutex_lock(&i->Waiter->Mutex); + assert(result == 0); + + // We have to check `Waiter->StillWaiting` twice, once before locking as an + // optimization to bypass the mutex altogether, and then again after locking the + // WFMO mutex because we could have !waitAll and another event could have ended the + // wait, in which case we must not unlock the same waiter or else a SetEvent() call + // on an auto-reset event may end up with a lost wakeup. + // memory_order_relaxed: this is only changed with the lock acquired + if (!i->Waiter->StillWaiting.load(std::memory_order_relaxed)) { + result = pthread_mutex_unlock(&i->Waiter->Mutex); assert(result == 0); - // We have to check `Waiter->StillWaiting` twice, once before locking as an - // optimization to bypass the mutex altogether, and then again after locking the - // WFMO mutex because we could have !waitAll and another event could have ended the - // wait, in which case we must not unlock the same waiter or else a SetEvent() call - // on an auto-reset event may end up with a lost wakeup. - if (!i->Waiter->StillWaiting.load(std::memory_order_relaxed)) { + // memory_order_seq_cst: Ensure this is run after the mutex is unlocked + int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_seq_cst); + assert(ref_count > 0); + + if (ref_count == 1) { + i->Waiter->Destroy(); + delete i->Waiter; + } + i = event->RegisteredWaits.erase(i); + continue; + } + + if (!i->Signalled) { + i->Signalled = true; + if (i->Waiter->WaitAll) { + int left = i->Waiter->Status.EventsLeft.fetch_sub(1, std::memory_order_acq_rel); + assert(left > 0); + + if (left == 1) { + // Wake the waiter but don't consume our event + result = pthread_cond_signal(&i->Waiter->CVariable); + assert(result == 0); + } + } else { + i->Waiter->Status.FiredEvent = i->WaitIndex; + + // If the waiter is waiting on any single event, just consume the call to + // SetEvent() that brought us here (in case of an auto-reset event) and + // stop. + if (event->AutoReset) { + consumed = true; + } + + // Explictly set `StillWaiting` to false ourselves rather than waiting for the + // waiter to do it so another `SetEvent()` on a different autoreset event also + // being awaited by the same waiter isn't also consumed if scheduled before the + // waiter gets around to setting StillWaiting to false. + // memory_order_relaxed: this field is only lazily checked without a mutex to + // bypass loading said mutex for a defunct waiter. + i->Waiter->StillWaiting.store(false, std::memory_order_relaxed); + result = pthread_mutex_unlock(&i->Waiter->Mutex); assert(result == 0); + result = pthread_cond_signal(&i->Waiter->CVariable); + assert(result == 0); + + // Delete the registered wait since we're definitely done with it. // memory_order_seq_cst: Ensure this is run after the wfmo mutex is unlocked int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_seq_cst); assert(ref_count > 0); + if (ref_count == 1) { i->Waiter->Destroy(); delete i->Waiter; } - event->RegisteredWaits.pop_front(); + i = event->RegisteredWaits.erase(i); continue; } + } - if (i->Waiter->WaitAll) { - --i->Waiter->Status.EventsLeft; - assert(i->Waiter->Status.EventsLeft >= 0); - // We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft - // != 0 but the only time it'll be equal to zero is if we're the last event, so - // no one else will be checking the StillWaiting flag. We're good to go without - // it. - } else { - i->Waiter->Status.FiredEvent = i->WaitIndex; - // memory_order_relaxed: The flip to false is only lazily observed as an - // optimization to bypass the mutex for cleanup. - i->Waiter->StillWaiting.store(false, std::memory_order_relaxed); - } - - result = pthread_mutex_unlock(&i->Waiter->Mutex); - assert(result == 0); - - result = pthread_cond_signal(&i->Waiter->CVariable); - assert(result == 0); - - // memory_order_seq_cst: Ensure this is run after the wfmo mutex is unlocked - int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_seq_cst); - assert(ref_count > 0); - if (ref_count == 1) { - i->Waiter->Destroy(); - delete i->Waiter; - } - - event->RegisteredWaits.pop_front(); - - result = pthread_mutex_unlock(&event->Mutex); - assert(result == 0); + result = pthread_mutex_unlock(&i->Waiter->Mutex); + assert(result == 0); - return 0; - } -#endif // WFMO - // memory_order_release: this is the synchronization point for any threads spin-waiting - // for the event to become available. - event->State.store(true, std::memory_order_release); + i = ++i; + } + if (consumed) { result = pthread_mutex_unlock(&event->Mutex); assert(result == 0); + return 0; + } +#endif // WFMO + + // memory_order_release: Unblock threads waiting for the event + event->State.store(true, std::memory_order_release); + + result = pthread_mutex_unlock(&event->Mutex); + assert(result == 0); + // Depending on the event type, we either trigger everyone or only one + if (event->AutoReset) { result = pthread_cond_signal(&event->CVariable); assert(result == 0); - - return 0; } else { // this is a manual reset event - // memory_order_release: this is the synchronization point for any threads spin-waiting - // for the event to become available. - event->State.store(true, std::memory_order_release); -#ifdef WFMO - for (size_t i = 0; i < event->RegisteredWaits.size(); ++i) { - neosmart_wfmo_info_t info = &event->RegisteredWaits[i]; - - // memory_order_relaxed: this is just an optimization to see if it is OK to skip - // this waiter, and if it's observed to be false then it's OK to bypass the mutex at - // that point. - if (!info->Waiter->StillWaiting.load(std::memory_order_relaxed)) { - int ref_count = info->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel); - if (ref_count == 1) { - info->Waiter->Destroy(); - delete info->Waiter; - } - continue; - } + result = pthread_cond_broadcast(&event->CVariable); + assert(result == 0); + } - result = pthread_mutex_lock(&info->Waiter->Mutex); - assert(result == 0); + return 0; + } - // Waiter->StillWaiting may have become true by now, but we're just going to pretend - // it hasn't. So long as we hold a reference to the WFMO, this is safe since manual - // reset events are not one-time use. - - if (info->Waiter->WaitAll) { - --info->Waiter->Status.EventsLeft; - assert(info->Waiter->Status.EventsLeft >= 0); - // We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft - // != 0 but the only time it'll be equal to zero is if we're the last event, so - // no one else will be checking the StillWaiting flag. We're good to go without - // it. - } else { - info->Waiter->Status.FiredEvent = info->WaitIndex; - // memory_order_relaxed: The flip to false is only lazily observed as an - // optimization to bypass the mutex for cleanup. - info->Waiter->StillWaiting.store(false, std::memory_order_relaxed); - } + int ResetEvent(neosmart_event_t event) { + int result = pthread_mutex_lock(&event->Mutex); + assert(result == 0); - result = pthread_mutex_unlock(&info->Waiter->Mutex); - assert(result == 0); + // memory_order_relaxed: we never act on `State == true` without first obtaining the mutex, + // in which case we'll see the actual value. Additionally, ResetEvent() is a racy call and + // we make no guarantees about its effect on outstanding WFME calls. + event->State.store(false, std::memory_order_relaxed); - result = pthread_cond_signal(&info->Waiter->CVariable); - assert(result == 0); +#ifdef WFMO + for (std::deque::iterator i = event->RegisteredWaits.begin(); + i != event->RegisteredWaits.end();) { - // memory_order_seq_cst: Ensure this is run after the wfmo mutex is unlocked - int ref_count = info->Waiter->RefCount.fetch_sub(1, std::memory_order_seq_cst); + if (!i->Waiter->StillWaiting.load(std::memory_order_relaxed)) { + int ref_count = i->Waiter->RefCount.fetch_sub(1, std::memory_order_acq_rel); assert(ref_count > 0); + if (ref_count == 1) { - info->Waiter->Destroy(); - delete info->Waiter; + i->Waiter->Destroy(); + delete i->Waiter; } + i = event->RegisteredWaits.erase(i); continue; } - event->RegisteredWaits.clear(); -#endif // WFMO - result = pthread_mutex_unlock(&event->Mutex); - assert(result == 0); - result = pthread_cond_broadcast(&event->CVariable); - assert(result == 0); + // We don't need to lock the WFMO mutex because the event mutex (which we hold) is + // required to change the event-specific `Signalled` flag, and `Status.EventsLeft` is + // synchronized separately. + if (i->Signalled) { + if (i->Waiter->WaitAll) { + i->Signalled = false; + i->Waiter->Status.EventsLeft.fetch_add(1, std::memory_order_acq_rel); + } else { + // if !WaitAll then it's too late for this `ResetEvent()` to take effect. + } + } + + ++i; } +#endif // WFMO - return 0; - } + result = pthread_mutex_unlock(&event->Mutex); + assert(result == 0); - int ResetEvent(neosmart_event_t event) { - // memory_order_relaxed and no mutex: there can't be any guarantees about concurrent calls - // to either of WFMO()/SetEvent() and ResetEvent() because they're racy by nature. Only the - // behavior of concurrent WFMO() and SetEvent() calls is strongly defined. - event->State.store(false, std::memory_order_relaxed); return 0; }