diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 8bcb059..95495a1 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -316,6 +316,10 @@ extern const INT cbCacheLine; extern BOOL g_fSyncProcessAbort; +// system max spin count + +extern INT g_cSpinMax; + // Atomic Memory Manipulations @@ -1974,40 +1978,28 @@ class CSemaphoreState // ctors / dtors - CSemaphoreState( const CSyncStateInitNull& null ) : m_cAvail( 0 ) {} - CSemaphoreState( const INT cAvail ); - CSemaphoreState( const INT cWait, const INT irksem ); - CSemaphoreState( const CSemaphoreState& state ) - { - // This copy constructor is required because the default copy constructor tries to copy all members of the union - // on top of each other, which is a problem because atomicity is a requirement when copying sync state objects. - C_ASSERT( OffsetOf( CSemaphoreState, m_irksem ) == OffsetOf( CSemaphoreState, m_cAvail ) ); - C_ASSERT( OffsetOf( CSemaphoreState, m_cWaitNeg ) > OffsetOf( CSemaphoreState, m_cAvail ) ); - C_ASSERT( ( OffsetOf( CSemaphoreState, m_cWaitNeg ) + sizeof ( m_cWaitNeg ) ) <= ( OffsetOf( CSemaphoreState, m_cAvail ) + sizeof ( m_cAvail ) ) ); - m_cAvail = state.m_cAvail; - } + CSemaphoreState( const CSyncStateInitNull& null ) : m_cAvail( 0 ), m_cWait( 0 ) {} + CSemaphoreState( const INT cAvail, const INT cWait ) : m_cAvail( cAvail ), m_cWait( cWait ) {} + CSemaphoreState( const CSemaphoreState& state ) : m_qwState( AtomicRead( (QWORD*)&state.m_qwState ) ) {} ~CSemaphoreState() {} // operators - CSemaphoreState& operator=( CSemaphoreState& state ) { m_cAvail = state.m_cAvail; return *this; } + CSemaphoreState& operator=( CSemaphoreState& state ) { m_qwState = AtomicRead( (QWORD*)&state.m_qwState ); return *this; } // manipulators const BOOL FChange( const CSemaphoreState& stateCur, const CSemaphoreState& stateNew ); - const BOOL FIncAvail( const INT cToInc ); + void IncAvail( const INT cToInc ); const BOOL FDecAvail(); + void 
IncWait(); + void DecWait(); // accessors - const BOOL FNoWait() const { return m_cAvail >= 0; } - const BOOL FWait() const { return m_cAvail < 0; } - const BOOL FAvail() const { return m_cAvail > 0; } - const BOOL FNoWaitAndNoAvail() const { return m_cAvail == 0; } - - const INT CAvail() const { OSSYNCAssert( FNoWait() ); return m_cAvail; } - const INT CWait() const { OSSYNCAssert( FWait() ); return -m_cWaitNeg; } - const CKernelSemaphorePool::IRKSEM Irksem() const { OSSYNCAssert( FWait() ); return CKernelSemaphorePool::IRKSEM( m_irksem ); } + const INT CAvail() const { return m_cAvail; } + const INT CWait() const { return m_cWait; } + volatile LONG * GetAvailAddress() { return &m_cAvail; } // debugging support @@ -2017,129 +2009,33 @@ class CSemaphoreState // data members - // transacted state representation (switched on bit 31) + // transacted state representation union { - // Mode 0: no waiters - - volatile LONG m_cAvail; // 0 <= m_cAvail <= ( 1 << 31 ) - 1 - - // Mode 1: waiters + volatile QWORD m_qwState; struct { - volatile USHORT m_irksem; // 0 <= m_irksem <= ( 1 << 16 ) - 2 - volatile SHORT m_cWaitNeg; // -( ( 1 << 15 ) - 1 ) <= m_cWaitNeg <= -1 + volatile LONG m_cAvail; + volatile LONG m_cWait; }; }; }; -// ctor - -inline CSemaphoreState::CSemaphoreState( const INT cAvail ) -{ - // validate IN args - - OSSYNCAssert( cAvail >= 0 ); - OSSYNCAssert( cAvail <= 0x7FFFFFFF ); - - // set available count - - m_cAvail = LONG( cAvail ); -} - -// ctor - -inline CSemaphoreState::CSemaphoreState( const INT cWait, const INT irksem ) -{ - // validate IN args - - OSSYNCAssert( cWait > 0 ); - OSSYNCAssert( cWait <= 0x7FFF ); - OSSYNCAssert( irksem >= 0 ); - OSSYNCAssert( irksem <= 0xFFFE ); - - // set waiter count - - m_cWaitNeg = SHORT( -cWait ); - - // set semaphore - - m_irksem = (USHORT) irksem; -} - // changes the transacted state of the semaphore using a transacted memory // compare/exchange operation, returning fFalse on failure inline const BOOL 
CSemaphoreState::FChange( const CSemaphoreState& stateCur, const CSemaphoreState& stateNew ) { - return AtomicCompareExchange( (LONG*)&m_cAvail, stateCur.m_cAvail, stateNew.m_cAvail ) == stateCur.m_cAvail; + return AtomicCompareExchange( (QWORD*)&m_qwState, stateCur.m_qwState, stateNew.m_qwState ) == stateCur.m_qwState; } -// tries to increase the available count on the semaphore by the count -// given using a transacted memory compare/exchange operation, returning fFalse -// on failure +// atomically increase the available count on the semaphore by the count given -__forceinline const BOOL CSemaphoreState::FIncAvail( const INT cToInc ) +__forceinline void CSemaphoreState::IncAvail( const INT cToInc ) { - // try forever to change the state of the semaphore - - OSSYNC_FOREVER - { - // get current value - - const LONG cAvail = m_cAvail; - - // munge start value such that the transaction will only work if we are in - // mode 0 (we do this to save a branch) - - const LONG cAvailStart = cAvail & 0x7FFFFFFF; - - // compute end value relative to munged start value - - const LONG cAvailEnd = cAvailStart + cToInc; - - // validate transaction - - OSSYNCAssert( cAvail < 0 || ( cAvailStart >= 0 && cAvailEnd <= 0x7FFFFFFF && cAvailEnd == cAvailStart + cToInc ) ); - - // attempt the transaction - - const LONG cAvailOld = AtomicCompareExchange( (LONG*)&m_cAvail, cAvailStart, cAvailEnd ); - - // the transaction succeeded - - if ( cAvailOld == cAvailStart ) - { - // return success - - return fTrue; - } - - // the transaction failed - - else - { - // the transaction failed because of a collision with another context - - if ( cAvailOld >= 0 ) - { - // try again - - continue; - } - - // the transaction failed because there are waiters - - else - { - // return failure - - return fFalse; - } - } - } + AtomicExchangeAdd( (LONG*)&m_cAvail, cToInc ); } // tries to decrease the available count on the semaphore by one using a @@ -2155,61 +2051,38 @@ __forceinline const BOOL 
CSemaphoreState::FDecAvail() const LONG cAvail = m_cAvail; - // this function has no effect on 0x80000000, so this MUST be an illegal - // value! - - OSSYNCAssert( cAvail != 0x80000000 ); - - // munge end value such that the transaction will only work if we are in - // mode 0 and we have at least one available count (we do this to save a - // branch) + // see if we have an available count - const LONG cAvailEnd = ( cAvail - 1 ) & 0x7FFFFFFF; - - // compute start value relative to munged end value - - const LONG cAvailStart = cAvailEnd + 1; - - // validate transaction - - OSSYNCAssert( cAvail <= 0 || ( cAvailStart > 0 && cAvailEnd >= 0 && cAvailEnd == cAvail - 1 ) ); - - // attempt the transaction - - const LONG cAvailOld = AtomicCompareExchange( (LONG*)&m_cAvail, cAvailStart, cAvailEnd ); - - // the transaction succeeded - - if ( cAvailOld == cAvailStart ) + if ( cAvail == 0 ) { - // return success + // we do not have an available count, return failure - return fTrue; + return fFalse; } - // the transaction failed + // we have an available count, attempt the transaction - else + else if ( AtomicCompareExchange( (LONG*)&m_cAvail, cAvail, cAvail - 1 ) == cAvail ) { - // the transaction failed because of a collision with another context + // the transaction succeeded, return success - if ( cAvailOld > 0 ) - { - // try again + return fTrue; + } + } +} - continue; - } +// atomically increment the number of waiters - // the transaction failed because there are no available counts +__forceinline void CSemaphoreState::IncWait() +{ + AtomicIncrement( (LONG*)&m_cWait ); +} - else - { - // return failure +// atomically decrement the number of waiters - return fFalse; - } - } - } +__forceinline void CSemaphoreState::DecWait() +{ + AtomicDecrement( (LONG*)&m_cWait ); } @@ -2257,9 +2130,15 @@ class CSemaphore // manipulators - const BOOL _FAcquire( const INT cmsecTimeout ); - const BOOL _FWait( const INT cmsecTimeout ); + static const DWORD _DwOSTimeout( const INT cmsecTimeout 
); + + // NOTE: all private methods use the OS level (DWORD) timeout values + + const BOOL _FTryAcquire( const INT cSpin ); + const BOOL _FAcquire( const DWORD dwTimeout ); void _Release( const INT cToRelease ); + const BOOL _FWait( const LONG cAvail, const DWORD dwTimeout ); + const BOOL _FOSWait( const LONG cAvail, const DWORD dwTimeout ); }; // acquire one count of the semaphore, waiting forever if necessary @@ -2288,29 +2167,20 @@ inline void CSemaphore::Wait() inline const BOOL CSemaphore::FTryAcquire() { - // only try to perform a simple decrement of the available count - - const BOOL fAcquire = State().FDecAvail(); - - // we did not acquire the semaphore - - if ( !fAcquire ) + if ( _FTryAcquire( 0 ) ) { - // this is a contention + // we successfully acquired the semaphore - State().SetContend(); + State().SetAcquire(); + return fTrue; } - - // we did acquire the semaphore - else { - // note that we acquired a count + // we did not acquire the semaphore, this is a contention - State().SetAcquire(); + State().SetContend(); + return fFalse; } - - return fAcquire; } // acquire one count of the semaphore, waiting only for the specified interval. @@ -2321,7 +2191,20 @@ inline const BOOL CSemaphore::FAcquire( const INT cmsecTimeout ) // first try to quickly grab an available count. if that doesn't work, // retry acquiring using the full state machine - return FTryAcquire() || _FAcquire( cmsecTimeout ); + if ( _FTryAcquire( g_cSpinMax ) || _FAcquire( _DwOSTimeout( cmsecTimeout ) ) ) + { + // we successfully acquired the semaphore + + State().SetAcquire(); + return fTrue; + } + else + { + // we did not acquire the semaphore, this is a contention + + State().SetContend(); + return fFalse; + } } // wait for the semaphore to become available without acquiring it, waiting only for the specified interval. 
@@ -2330,10 +2213,32 @@ inline const BOOL CSemaphore::FAcquire( const INT cmsecTimeout ) inline const BOOL CSemaphore::FWait( const INT cmsecTimeout ) { - // first try to quickly check for an available count. if that doesn't work, - // retry waiting using the full state machine + // first try to quickly check for an available count + + if ( State().CAvail() > 0 ) + { + return fTrue; + } + + // if that doesn't work, try to grab an available count without spinning. + // if that also doesn't work, attempt acquiring using the full state machine - return ( CAvail() > 0 ) || _FWait( cmsecTimeout ); + if ( _FTryAcquire( 0 ) || _FAcquire( _DwOSTimeout( cmsecTimeout ) ) ) + { + // we successfully acquired the semaphore, release it + + _Release( 1 ); + + State().SetAcquire(); + return fTrue; + } + else + { + // we did not acquire the semaphore, this is a contention + + State().SetContend(); + return fFalse; + } } // releases the given number of counts to the semaphore, waking the appropriate @@ -2341,40 +2246,91 @@ inline const BOOL CSemaphore::FWait( const INT cmsecTimeout ) inline void CSemaphore::Release( const INT cToRelease ) { - // we failed to perform a simple increment of the available count - - if ( !State().FIncAvail( cToRelease ) ) - { - // retry release using the full state machine - - _Release( cToRelease ); - } + _Release( cToRelease ); } // returns the number of execution contexts waiting on the semaphore inline const INT CSemaphore::CWait() const { - // read the current state of the semaphore + // try forever until we get a non-transitional state - const CSemaphoreState stateCur = (CSemaphoreState&) State(); + OSSYNC_FOREVER + { + // read the current state of the semaphore + + const CSemaphoreState stateCur = (CSemaphoreState&) State(); + + if ( stateCur.CAvail() > 0 && stateCur.CWait() > 0 ) + { + // the existing waiters are in transition, retry - // return the waiter count + continue; + } + else + { + // return the waiter count - return stateCur.FWait() ? 
stateCur.CWait() : 0; + return stateCur.CWait(); + } + } } // returns the number of available counts on the semaphore inline const INT CSemaphore::CAvail() const { - // read the current state of the semaphore + return State().CAvail(); +} - const CSemaphoreState stateCur = (CSemaphoreState&) State(); +// try to acquire one count of the semaphore, entering a loop which iterates +// up to cSpin times. returns fFalse if a count could not be acquired - // return the available count +inline const BOOL CSemaphore::_FTryAcquire( const INT cSpin ) +{ + INT cSpinRemaining = cSpin; + + // try forever to acquire the semaphore + + OSSYNC_FOREVER + { + // read the current state of the semaphore + + const CSemaphoreState stateCur = (CSemaphoreState&) State(); + + // see if we have an available count + // + // NOTE: we do not acquire the semaphore with waiting threads to + // avoid stealing it from those waiting threads themselves + + if ( stateCur.CAvail() == 0 || stateCur.CWait() > 0 ) + { + if ( cSpinRemaining ) + { + // we do not have an available count, but can keep spinning - return stateCur.FNoWait() ? 
stateCur.CAvail() : 0; + cSpinRemaining--; + + continue; + } + else + { + // we do not have an available count and have reached the + // spin limit, return failure + + return fFalse; + } + } + + // we have an available count, attempt the transaction + + else if ( State().FDecAvail() ) + { + // the transaction succeeded, return success + + return fTrue; + } + } } diff --git a/dev/ese/src/inc/_bf.hxx b/dev/ese/src/inc/_bf.hxx index 926a7a7..ee34ae0 100644 --- a/dev/ese/src/inc/_bf.hxx +++ b/dev/ese/src/inc/_bf.hxx @@ -409,7 +409,7 @@ C_ASSERT( sizeof( BF::bfbitfield ) == sizeof( FLAG32 ) ); #ifdef _WIN64 C_ASSERT( sizeof(BF) == 192 ); #else // !_WIN64 -C_ASSERT( sizeof(BF) == 160 ); +C_ASSERT( sizeof(BF) == 176 ); #endif // _WIN64 // Buffer Manager Global Flags diff --git a/dev/ese/src/inc/pib.hxx b/dev/ese/src/inc/pib.hxx index 377d7a0..9ef9a88 100644 --- a/dev/ese/src/inc/pib.hxx +++ b/dev/ese/src/inc/pib.hxx @@ -521,7 +521,7 @@ public: #ifdef _WIN64 C_ASSERT( sizeof(PIB) == 608 ); #else // !_WIN64 -C_ASSERT( sizeof(PIB) == 504 ); +C_ASSERT( sizeof(PIB) == 528 ); #endif // _WIN64 INLINE SIZE_T OffsetOfTrxOldestILE() { return OffsetOf( PIB, m_ileTrxOldest ); } diff --git a/dev/ese/src/inc/tdb.hxx b/dev/ese/src/inc/tdb.hxx index 0fc4729..9101e59 100644 --- a/dev/ese/src/inc/tdb.hxx +++ b/dev/ese/src/inc/tdb.hxx @@ -483,8 +483,6 @@ class TDB #ifdef _AMD64_ BYTE m_bReserved2[8]; // for alignment. fileopen.cxx: C_ASSERT( sizeof(TDB) % 16 == 0 ); -#else - BYTE m_bReserved2[4]; // for alignment. 
fileopen.cxx: C_ASSERT( sizeof(TDB) % 16 == 0 ); #endif // 208 / 272 bytes (amd64) diff --git a/dev/ese/src/sync/CMakeLists.txt b/dev/ese/src/sync/CMakeLists.txt index da2e8da..0dc560f 100644 --- a/dev/ese/src/sync/CMakeLists.txt +++ b/dev/ese/src/sync/CMakeLists.txt @@ -10,4 +10,8 @@ add_library(sync STATIC target_include_directories(sync PRIVATE ${INC_OUTPUT_DIRECTORY}/jet +) + +target_link_libraries(sync + synchronization ) \ No newline at end of file diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index b54096a..1e5cfd8 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -332,492 +332,6 @@ CSyncPerfAcquire::~CSyncPerfAcquire() } -// Semaphore - -// ctor - -CSemaphore::CSemaphore( const CSyncBasicInfo& sbi ) - : CEnhancedStateContainer< CSemaphoreState, CSyncStateInitNull, CSemaphoreInfo, CSyncBasicInfo >( syncstateNull, sbi ) -{ - // further init of CSyncBasicInfo - - State().SetTypeName( "CSemaphore" ); - State().SetInstance( (CSyncObject*)this ); -} - -// dtor - -CSemaphore::~CSemaphore() -{ -#ifdef SYNC_ANALYZE_PERFORMANCE -#ifdef SYNC_DUMP_PERF_DATA - - // dump performance data - - OSSyncStatsDump( State().SzTypeName(), - State().SzInstanceName(), - State().Instance(), - (DWORD)-1, - State().CWaitTotal(), - State().CsecWaitElapsed(), - State().CAcquireTotal(), - State().CContendTotal(), - 0, - 0 ); - -#endif // SYNC_DUMP_PERF_DATA -#endif // SYNC_ANALYZE_PERFORMANCE -} - -// releases all waiters on the semaphore - -void CSemaphore::ReleaseAllWaiters() -{ - // try forever until we successfully change the state of the semaphore - - OSSYNC_FOREVER - { - // read the current state of the semaphore - - const CSemaphoreState stateCur = State(); - - // there are no waiters - - if ( stateCur.FNoWait() ) - { - // we're done - - return; - } - - // there are waiters - - else - { - OSSYNCAssert( stateCur.FWait() ); - - // we successfully changed the semaphore to have an available count - // of zero - - if ( State().FChange( 
stateCur, CSemaphoreState( 0 ) ) ) - { - // release all waiters - - g_ksempoolGlobal.Ksem( stateCur.Irksem(), this ).Release( stateCur.CWait() ); - - // we're done - - return; - } - } - } -} - -// attempts to acquire a count from the semaphore, returning fFalse if unsuccessful -// in the time permitted. Infinite and Test-Only timeouts are supported. - -const BOOL CSemaphore::_FAcquire( const INT cmsecTimeout ) -{ - // if we spin, we will spin for the full amount recommended by the OS - - INT cSpin = g_cSpinMax; - - // we start with no kernel semaphore allocated - - CKernelSemaphorePool::IRKSEM irksemAlloc = CKernelSemaphorePool::irksemNil; - - // try forever until we successfully change the state of the semaphore - - OSSYNC_FOREVER - { - // read the current state of the semaphore - - const CSemaphoreState stateCur = (CSemaphoreState&) State(); - - // there is an available count - - if ( stateCur.FAvail() ) - { - // we successfully took a count - - if ( State().FChange( stateCur, CSemaphoreState( stateCur.CAvail() - 1 ) ) ) - { - // if we allocated a kernel semaphore, release it - - if ( irksemAlloc != CKernelSemaphorePool::irksemNil ) - { - g_ksempoolGlobal.Unreference( irksemAlloc ); - } - - // return success - - State().SetAcquire(); - return fTrue; - } - } - - // there is no available count and we still have spins left - - else if ( cSpin ) - { - // spin once and try again - - cSpin--; - continue; - } - - // there are no waiters and no available counts - - else if ( stateCur.FNoWaitAndNoAvail() ) - { - // allocate and reference a kernel semaphore if we haven't already - - if ( irksemAlloc == CKernelSemaphorePool::irksemNil ) - { - irksemAlloc = g_ksempoolGlobal.Allocate( this ); - } - - // we successfully installed ourselves as the first waiter - - if ( State().FChange( stateCur, CSemaphoreState( 1, irksemAlloc ) ) ) - { - // wait for next available count on semaphore - - State().StartWait(); - const BOOL fCompleted = g_ksempoolGlobal.Ksem( irksemAlloc, this 
).FAcquire( cmsecTimeout ); - State().StopWait(); - - // our wait completed - - if ( fCompleted ) - { - // unreference the kernel semaphore - - g_ksempoolGlobal.Unreference( irksemAlloc ); - - // we successfully acquired a count - - State().SetAcquire(); - return fTrue; - } - - // our wait timed out - - else - { - // try forever until we successfully change the state of the semaphore - - OSSYNC_INNER_FOREVER - { - // read the current state of the semaphore - - const CSemaphoreState stateAfterWait = (CSemaphoreState&) State(); - - // there are no waiters or the kernel semaphore currently - // in the semaphore is not the same as the one we allocated - - if ( stateAfterWait.FNoWait() || stateAfterWait.Irksem() != irksemAlloc ) - { - // the kernel semaphore we allocated is no longer in - // use, so another context released it. this means that - // there is a count on the kernel semaphore that we must - // absorb, so we will - - // NOTE: we could end up blocking because the releasing - // context may not have released the semaphore yet - - g_ksempoolGlobal.Ksem( irksemAlloc, this ).Acquire(); - - // unreference the kernel semaphore - - g_ksempoolGlobal.Unreference( irksemAlloc ); - - // we successfully acquired a count - - return fTrue; - } - - // there is one waiter and the kernel semaphore currently - // in the semaphore is the same as the one we allocated - - else if ( stateAfterWait.CWait() == 1 ) - { - OSSYNCAssert( stateAfterWait.FWait() ); - OSSYNCAssert( stateAfterWait.Irksem() == irksemAlloc ); - - // we successfully changed the semaphore to have no - // available counts and no waiters - - if ( State().FChange( stateAfterWait, CSemaphoreState( 0 ) ) ) - { - // unreference the kernel semaphore - - g_ksempoolGlobal.Unreference( irksemAlloc ); - - // we did not successfully acquire a count - - return fFalse; - } - } - - // there are many waiters and the kernel semaphore currently - // in the semaphore is the same as the one we allocated - - else - { - 
OSSYNCAssert( stateAfterWait.CWait() > 1 ); - OSSYNCAssert( stateAfterWait.FWait() ); - OSSYNCAssert( stateAfterWait.Irksem() == irksemAlloc ); - - // we successfully reduced the number of waiters on the - // semaphore by one - - if ( State().FChange( stateAfterWait, CSemaphoreState( stateAfterWait.CWait() - 1, stateAfterWait.Irksem() ) ) ) - { - // unreference the kernel semaphore - - g_ksempoolGlobal.Unreference( irksemAlloc ); - - // we did not successfully acquire a count - - return fFalse; - } - } - } - } - } - } - - // there are waiters - - else - { - OSSYNCAssert( stateCur.FWait() ); - - // reference the kernel semaphore already in use - - g_ksempoolGlobal.Reference( stateCur.Irksem() ); - - // we successfully added ourself as another waiter - - if ( State().FChange( stateCur, CSemaphoreState( stateCur.CWait() + 1, stateCur.Irksem() ) ) ) - { - // if we allocated a kernel semaphore, unreference it - - if ( irksemAlloc != CKernelSemaphorePool::irksemNil ) - { - g_ksempoolGlobal.Unreference( irksemAlloc ); - } - - // wait for next available count on semaphore - - State().StartWait(); - const BOOL fCompleted = g_ksempoolGlobal.Ksem( stateCur.Irksem(), this ).FAcquire( cmsecTimeout ); - State().StopWait(); - - // our wait completed - - if ( fCompleted ) - { - // unreference the kernel semaphore - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - - // we successfully acquired a count - - State().SetAcquire(); - return fTrue; - } - - // our wait timed out - - else - { - // try forever until we successfully change the state of the semaphore - - OSSYNC_INNER_FOREVER - { - // read the current state of the semaphore - - const CSemaphoreState stateAfterWait = (CSemaphoreState&) State(); - - // there are no waiters or the kernel semaphore currently - // in the semaphore is not the same as the one we waited on - - if ( stateAfterWait.FNoWait() || stateAfterWait.Irksem() != stateCur.Irksem() ) - { - // the kernel semaphore we waited on is no longer in - // use, so 
another context released it. this means that - // there is a count on the kernel semaphore that we must - // absorb, so we will - - // NOTE: we could end up blocking because the releasing - // context may not have released the semaphore yet - - g_ksempoolGlobal.Ksem( stateCur.Irksem(), this ).Acquire(); - - // unreference the kernel semaphore - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - - // we successfully acquired a count - - return fTrue; - } - - // there is one waiter and the kernel semaphore currently - // in the semaphore is the same as the one we waited on - - else if ( stateAfterWait.CWait() == 1 ) - { - OSSYNCAssert( stateAfterWait.FWait() ); - OSSYNCAssert( stateAfterWait.Irksem() == stateCur.Irksem() ); - - // we successfully changed the semaphore to have no - // available counts and no waiters - - if ( State().FChange( stateAfterWait, CSemaphoreState( 0 ) ) ) - { - // unreference the kernel semaphore - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - - // we did not successfully acquire a count - - return fFalse; - } - } - - // there are many waiters and the kernel semaphore currently - // in the semaphore is the same as the one we waited on - - else - { - OSSYNCAssert( stateAfterWait.CWait() > 1 ); - OSSYNCAssert( stateAfterWait.FWait() ); - OSSYNCAssert( stateAfterWait.Irksem() == stateCur.Irksem() ); - - // we successfully reduced the number of waiters on the - // semaphore by one - - if ( State().FChange( stateAfterWait, CSemaphoreState( stateAfterWait.CWait() - 1, stateAfterWait.Irksem() ) ) ) - { - // unreference the kernel semaphore - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - - // we did not successfully acquire a count - - return fFalse; - } - } - } - } - } - - // unreference the kernel semaphore - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - } - } -} - -// attempts to wait for an available a count from the semaphore, returning -// fFalse if unsuccessful in the time permitted or fTrue if one is available. 
-// Infinite and Test-Only timeouts are supported. This method does not acquire -// a resource, just waits for an available one. CAvail() may decrease for short -// amount of time if we need to go for the full state machine to wait on a kernel -// semaphore. - -const BOOL CSemaphore::_FWait( const INT cmsecTimeout ) -{ - if ( _FAcquire( cmsecTimeout ) ) - { - Release(); - return fTrue; - } - - return fFalse; -} - -// releases the given number of counts to the semaphore, waking the appropriate -// number of waiters - -void CSemaphore::_Release( const INT cToRelease ) -{ - // try forever until we successfully change the state of the semaphore - - OSSYNC_FOREVER - { - // read the current state of the semaphore - - const CSemaphoreState stateCur = State(); - - // there are no waiters - - if ( stateCur.FNoWait() ) - { - // we successfully added the count to the semaphore - - if ( State().FChange( stateCur, CSemaphoreState( stateCur.CAvail() + cToRelease ) ) ) - { - // we're done - - return; - } - } - - // there are waiters - - else - { - OSSYNCAssert( stateCur.FWait() ); - - // we are releasing more counts than waiters (or equal to) - - if ( stateCur.CWait() <= cToRelease ) - { - // we successfully changed the semaphore to have an available count - // that is equal to the specified release count minus the number of - // waiters to release - - if ( State().FChange( stateCur, CSemaphoreState( cToRelease - stateCur.CWait() ) ) ) - { - // release all waiters - - g_ksempoolGlobal.Ksem( stateCur.Irksem(), this ).Release( stateCur.CWait() ); - - // we're done - - return; - } - } - - // we are releasing less counts than waiters - - else - { - OSSYNCAssert( stateCur.CWait() > cToRelease ); - - // we successfully reduced the number of waiters on the semaphore by - // the number specified - - if ( State().FChange( stateCur, CSemaphoreState( stateCur.CWait() - cToRelease, stateCur.Irksem() ) ) ) - { - // release the specified number of waiters - - g_ksempoolGlobal.Ksem( 
stateCur.Irksem(), this ).Release( cToRelease ); - - // we're done - - return; - } - } - } - } -} - - // Auto-Reset Signal // ctor @@ -4303,6 +3817,325 @@ void CKernelSemaphore::Release( const INT cToRelease ) } +// Semaphore + +// ctor + +CSemaphore::CSemaphore( const CSyncBasicInfo& sbi ) + : CEnhancedStateContainer< CSemaphoreState, CSyncStateInitNull, CSemaphoreInfo, CSyncBasicInfo >( syncstateNull, sbi ) +{ + // further init of CSyncBasicInfo + + State().SetTypeName( "CSemaphore" ); + State().SetInstance( (CSyncObject*)this ); +} + +// dtor + +CSemaphore::~CSemaphore() +{ +#ifdef SYNC_ANALYZE_PERFORMANCE +#ifdef SYNC_DUMP_PERF_DATA + + // dump performance data + + OSSyncStatsDump( State().SzTypeName(), + State().SzInstanceName(), + State().Instance(), + (DWORD)-1, + State().CWaitTotal(), + State().CsecWaitElapsed(), + State().CAcquireTotal(), + State().CContendTotal(), + 0, + 0 ); + +#endif // SYNC_DUMP_PERF_DATA +#endif // SYNC_ANALYZE_PERFORMANCE +} + +// converts the internal timeout value to an OS level timeout + +const DWORD CSemaphore::_DwOSTimeout( const INT cmsecTimeout ) +{ + if ( cmsecTimeout == cmsecInfinite || cmsecTimeout == cmsecInfiniteNoDeadlock ) + { + return INFINITE; + } + else if ( cmsecTimeout >= 0 ) + { + return cmsecTimeout; + } + else + { + return 0; + } +} + +// attempts to acquire a count from the semaphore, returning fFalse if unsuccessful +// in the time permitted + +const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) +{ + const DWORD dwStart = DwOSSyncITickTime(); + DWORD dwRemaining = dwTimeout; + + // try forever until we successfully change the state of the semaphore + + State().IncWait(); + OSSYNC_FOREVER + { + // read the current state of the semaphore + + const CSemaphoreState stateCur = (CSemaphoreState&) State(); + + // see if we have an available count + + if ( stateCur.CAvail() > 0 ) + { + // we ourselves are waiting on the semaphore, so there had better be at least + // one waiter in the state + + OSSYNCAssert( 
stateCur.CWait() > 0 ); + + // try to atomically acquire the semaphore and decrement the number of waiters + + const CSemaphoreState stateNew( stateCur.CAvail() - 1, stateCur.CWait() - 1 ); + + if ( State().FChange( stateCur, stateNew ) ) + { + // the transaction succeeded, return success + + return fTrue; + } + } + else if ( dwRemaining == 0 ) + { + // we were unable to acquire the semaphore in the time permitted + + break; + } + else + { + // check and update the remaining time + + const DWORD dwElapsed = DwOSSyncITickTime() - dwStart; + + if ( dwElapsed > dwTimeout ) + { + // we were unable to acquire the semaphore in the time permitted + + break; + } + + dwRemaining = dwTimeout - dwElapsed; + + // wait for the semaphore counter value to change + // + // NOTE: we may have exited the wait due to a spurious OS level wake up, + // so the state machine ensures that the available count is re-checked on + // each iteration + + if ( !_FWait( stateCur.CAvail(), dwRemaining ) ) + { + // we were unable to acquire the semaphore in the time permitted + + break; + } + } + } + State().DecWait(); + + return fFalse; +} + +// releases all waiters on the semaphore + +void CSemaphore::ReleaseAllWaiters() +{ + // try forever until we successfully change the state of the semaphore + + OSSYNC_FOREVER + { + // read the current state of the semaphore + + const CSemaphoreState stateCur = (CSemaphoreState&) State(); + + if ( stateCur.CAvail() > 0 && stateCur.CWait() > 0 ) + { + // the existing waiters are in transition, retry + + continue; + } + else if ( stateCur.CWait() <= 0 ) + { + // there are no waiters, we're done + + return; + } + else + { + // attempt to change the semaphore to have an available count equal + // to the number of waiters + + const CSemaphoreState stateNew( stateCur.CWait(), stateCur.CWait() ); + + if ( State().FChange( stateCur, stateNew ) ) + { + volatile LONG *pcAvail = State().GetAvailAddress(); + + // wake all waiting threads + + WakeByAddressAll( 
(void*)pcAvail ); + + // we're done + + return; + } + } + } +} + +// releases the given number of counts to the semaphore, waking the appropriate +// number of waiters + +void CSemaphore::_Release( const INT cToRelease ) +{ + if ( cToRelease <= 0 ) + { + return; + } + + // release the required number of counts + + State().IncAvail( cToRelease ); + + // check to see if we have any waiting threads to wake + + const INT cWait = State().CWait(); + + if ( cWait == 0 ) + { + // no one is waiting + } + else if ( cWait <= cToRelease ) + { + volatile LONG *pcAvail = State().GetAvailAddress(); + + // no more waiting threads than cToRelease, wake everyone + + WakeByAddressAll( (void*)pcAvail ); + } + else + { + volatile LONG *pcAvail = State().GetAvailAddress(); + + // wake at most cToRelease threads, as the benefit from not waking + // unnecessary threads is expected to be greater than the loss on + // extra calls below + + for ( INT i = 0; i < cToRelease; i++ ) + { + WakeByAddressSingle( (void*)pcAvail ); + } + } +} + +// waits until the semaphore counter value changes. this method has the same +// semantics as the WaitOnAddress() function and is guaranteed to return when +// the address is signaled, but it is also allowed to return for other reasons. +// the caller should compare the new value with the original + +const BOOL CSemaphore::_FWait( const LONG cAvail, const DWORD dwTimeout ) +{ + PERFOpt( AtomicIncrement( (LONG*)&g_cOSSYNCThreadBlock ) ); + State().StartWait(); + + // wait for semaphore + + BOOL fSuccess; + +#ifdef SYNC_DEADLOCK_DETECTION + if (dwTimeout > cmsecDeadlock ) + { + fSuccess = _FOSWait( cAvail, cmsecDeadlock ); + if ( !fSuccess ) + { +#ifdef DEBUG + SYNCDeadLockTimeOutState sdltosStatePre = sdltosEnabled; + OSSYNC_FOREVER // spin until we get a non- check-in-progress state ... 
+ { + C_ASSERT( sizeof(g_sdltosState) == sizeof(LONG) ); + sdltosStatePre = (SYNCDeadLockTimeOutState)AtomicCompareExchange( (LONG*)&g_sdltosState, sdltosEnabled, sdltosCheckInProgress ); + if ( sdltosStatePre != sdltosCheckInProgress ) + { + break; + } + Sleep( 16 ); + } +#else + SYNCDeadLockTimeOutState sdltosStatePre = sdltosEnabled; +#endif + + OSSYNCAssertSzRTL( fSuccess /* superfluous */ || sdltosStatePre == sdltosDisabled, "Potential Deadlock Detected (Timeout); FYI ed [dll]!OSSYNC::g_sdltosState to 0 to disable." ); + +#ifdef DEBUG + if ( sdltosStatePre != sdltosDisabled ) + { + // needs re-enabling (if SOMEONE/debugger didn't play with state) + OSSYNCAssert( sdltosStatePre != sdltosCheckInProgress ); // that would mean the convergence loop above is wrong. + + // while it seems really simple this should always be reset, this is designed to allow the user to kill + // timeout detection dynamically in the debugger by setting g_sdltosState = 0 (i.e. sdltosDisabled) via + // debugger, thus g_sdltosState won't == sdltosCheckInProgress and we won't reset it to sdltosEnabled / + // i.e. disabling deadlock detection for all subsequent hits. + + const SYNCDeadLockTimeOutState sdltosCheck = (SYNCDeadLockTimeOutState)AtomicCompareExchange( (LONG*)&g_sdltosState, sdltosCheckInProgress, sdltosEnabled ); + OSSYNCAssertSzRTL( sdltosCheck != sdltosDisabled, "Devs, the debugger used to set g_sdltosState to 0 and disables further timeout detection asserts!? Just an FYI. If you did not, then code is confused." ); + } +#endif + + DWORD dwNewTimeout = dwTimeout; + + if ( dwNewTimeout < INFINITE ) + { + dwNewTimeout -= cmsecDeadlock; + } + + fSuccess = _FOSWait( cAvail, dwNewTimeout ); + } + } + else +#endif // SYNC_DEADLOCK_DETECTION + { + fSuccess = _FOSWait( cAvail, dwTimeout ); + } + + State().StopWait(); + PERFOpt( AtomicIncrement( (LONG*)&g_cOSSYNCThreadResume ) ); + + return fSuccess; +} + +// performs an OS level wait until the available count of the semaphore changes. 
+// this method is guaranteed to return when the corresponding address is signaled, +// but it is also allowed to return for other reasons. the caller should compare +// the new value with the original. returns the result of WaitOnAddress() + +const BOOL CSemaphore::_FOSWait( const LONG cAvail, const DWORD dwTimeout ) +{ + volatile LONG *pcAvail = State().GetAvailAddress(); + + static_assert( sizeof(*pcAvail) == sizeof(cAvail), "Should be of the same size." ); + + OnThreadWaitBegin(); + BOOL fSuccess = WaitOnAddress( pcAvail, (void*)&cAvail, sizeof(cAvail), dwTimeout ); + OnThreadWaitEnd(); + + return fSuccess; +} + + // performance data dumping #include @@ -5768,15 +5601,8 @@ void CKernelSemaphore::Dump( const CDumpContext& dc ) const void CSemaphoreState::Dump( const CDumpContext& dc ) const { - if ( FNoWait() ) - { - DumpMember( dc, m_cAvail ); - } - else - { - DumpMember( dc, m_irksem ); - DumpMember( dc, m_cWaitNeg ); - } + DumpMember( dc, m_cAvail ); + DumpMember( dc, m_cWait ); } void CSemaphoreInfo::Dump( const CDumpContext& dc ) const diff --git a/test/ese/src/devlibtest/sync/syncunit/semaphore.cxx b/test/ese/src/devlibtest/sync/syncunit/semaphore.cxx index 5628570..09a416f 100644 --- a/test/ese/src/devlibtest/sync/syncunit/semaphore.cxx +++ b/test/ese/src/devlibtest/sync/syncunit/semaphore.cxx @@ -675,3 +675,29 @@ ERR SyncSemaphorePerformsBasicUncontendedSemaphoreReleaseAndAcquireWithSemaphore SyncBasicTestTerm; return err; } + +CUnitTest( SyncSemaphoreNoopReleaseAllWaiters, 0x0, "" ); +ERR SyncSemaphoreNoopReleaseAllWaiters::ErrTest() +{ + SyncBasicTestInit; + + CSemaphore* psemaphore = new CSemaphore( CSyncBasicInfo( "CSemaphore test." 
) ); + + TestCheck( psemaphore->CWait() == 0 ); + TestCheck( psemaphore->CAvail() == 0 ); + + psemaphore->Release(); + + TestCheck( psemaphore->CWait() == 0 ); + TestCheck( psemaphore->CAvail() == 1 ); + + psemaphore->ReleaseAllWaiters(); /* no waiters: the checks below expect both counts unchanged */ + + TestCheck( psemaphore->CWait() == 0 ); + TestCheck( psemaphore->CAvail() == 1 ); + +HandleError: + delete psemaphore; + SyncBasicTestTerm; + return err; +} diff --git a/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx b/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx index 738fb57..94f8a9a 100644 --- a/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx +++ b/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx @@ -751,4 +751,148 @@ ERR CSemaphorePerfTest::ErrTest() return JET_errSuccess; } +// Test fixture for the CSemaphore fairness test. + +class CSemaphoreFairnessTest : public UNITTEST +{ +private: + static CSemaphoreFairnessTest s_instance; + +public: + const char * SzName() const; + const char * SzDescription() const; + bool FRunUnderESE98() const; + bool FRunUnderESENT() const; + bool FRunUnderESE97() const; + ERR ErrTest(); + void TestCase( const LONG cThreads ); +}; + +CSemaphoreFairnessTest CSemaphoreFairnessTest::s_instance; + +const char * CSemaphoreFairnessTest::SzName() const +{ + return "CSemaphoreFairnessTest"; +}; + +const char * CSemaphoreFairnessTest::SzDescription() const +{ + return "Tests the CSemaphore for fairness."; +} + +bool CSemaphoreFairnessTest::FRunUnderESE98() const +{ + return true; +} + +bool CSemaphoreFairnessTest::FRunUnderESENT() const +{ + return true; +} + +bool CSemaphoreFairnessTest::FRunUnderESE97() const +{ + return true; +} + +struct CSemaphoreFairnessTestContext +{ + HANDLE hThread; + CSemaphore *pSemaphore; + BOOL bAggressive; /* selects which loop body FairnessTestThread runs */ + volatile ULONG *pulExit; /* FairnessTestThread loops until this reads non-zero */ + volatile ULONG ulAcquire; /* incremented by FairnessTestThread after each Acquire() */ +}; + +static DWORD WINAPI FairnessTestThread( LPVOID pvContext ) +{ + CSemaphoreFairnessTestContext *pContext = (CSemaphoreFairnessTestContext*)pvContext; + + while ( !InterlockedCompareExchange( pContext->pulExit, 
0, 0 ) ) + { + if ( pContext->bAggressive ) + { /* aggressive: keep holding the semaphore across the sleep */ + pContext->pSemaphore->Acquire(); + InterlockedIncrement( &pContext->ulAcquire ); + Sleep(1); + pContext->pSemaphore->Release(); + } + else + { /* non-aggressive: sleep first, then acquire and release right away */ + Sleep(1); + pContext->pSemaphore->Acquire(); + InterlockedIncrement( &pContext->ulAcquire ); + pContext->pSemaphore->Release(); + } + } + + return ERROR_SUCCESS; +} + +ERR CSemaphoreFairnessTest::ErrTest() +{ + TestAssert( FOSSyncPreinit() ); + + TestCase( 2 ); + TestCase( 3 ); + TestCase( 4 ); + TestCase( 6 ); + TestCase( 8 ); + + OSSyncPostterm(); + + return JET_errSuccess; +} + +void CSemaphoreFairnessTest::TestCase( const LONG cThreads ) +{ + CSemaphore semaphore( CSyncBasicInfo( "CSemaphoreFairnessTest" ) ); + volatile ULONG ulExit = 0; + + CSemaphoreFairnessTestContext *pContexts = new CSemaphoreFairnessTestContext[cThreads]; + for ( LONG i = 0; i < cThreads; i++ ) + { + CSemaphoreFairnessTestContext *pContext = &pContexts[i]; + + memset( pContext, 0, sizeof(*pContext) ); + pContext->pSemaphore = &semaphore; + pContext->pulExit = &ulExit; + pContext->bAggressive = i % 2; /* odd-indexed threads run the aggressive loop */ + pContext->hThread = CreateThread( NULL, 0, FairnessTestThread, pContext, 0, NULL ); + TestAssert( pContext->hThread ); + } + + semaphore.Release( 1 ); /* release one count for the threads to contend on */ + Sleep( 3 * 1000 ); /* let the threads run for 3 seconds */ + InterlockedExchange( &ulExit, 1 ); /* tell the threads to leave their loops */ + + ULONG ulAcquireMin = ULONG_MAX; + ULONG ulAcquireMax = 0; + for ( LONG i = 0; i < cThreads; i++ ) + { + CSemaphoreFairnessTestContext *pContext = &pContexts[i]; + + TestAssert( WaitForSingleObject( pContext->hThread, INFINITE ) == WAIT_OBJECT_0 ); + TestAssert( CloseHandle( pContext->hThread ) ); + + if ( pContext->ulAcquire < ulAcquireMin ) + { + ulAcquireMin = pContext->ulAcquire; + } + + if ( pContext->ulAcquire > ulAcquireMax ) + { + ulAcquireMax = pContext->ulAcquire; + } + + wprintf( L"\t%lu", pContext->ulAcquire ); + } + + wprintf( L"\n" ); + + TestAssert( ulAcquireMax < ulAcquireMin * 4 ); /* fairness criterion: the highest per-thread acquire count must be below 4x the lowest */ + + delete[] pContexts; +} +