Skip to content

Commit

Permalink
Merge pull request #6 from chordify/lock-acquire-exceptions
Browse files Browse the repository at this point in the history
Throw more informative exceptions when lock acquisition times out
  • Loading branch information
isomorpheme authored Apr 15, 2024
2 parents b1f0895 + f0d2c77 commit b7c327d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 15 deletions.
3 changes: 2 additions & 1 deletion src/Database/Redis/Schema.hs
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,13 @@ data RedisException
| TransactionAborted
| TransactionError String
| CouldNotDecodeValue (Maybe ByteString)
| LockAcquireTimeout
| UnexpectedStatus String Hedis.Status
| EmptyAlternative -- for 'instance Alternative Tx'
deriving (Show, Exception)

-- | Time-To-Live for Redis values. The Num instance works in (integral) seconds.
newtype TTL = TTLSec { ttlToSeconds :: Integer }
deriving stock (Show)
deriving newtype (Eq, Ord, Num)

run :: MonadIO m => Pool inst -> RedisM inst a -> m a
Expand Down Expand Up @@ -403,6 +403,7 @@ class Value inst val where
data SimpleValueIdentifier
= SviTopLevel ByteString -- ^ Stored in a top-level key.
| SviHash ByteString ByteString -- ^ Stored in a hash field.
deriving stock (Show)

-- | Simple values, like strings, integers or enums,
-- that be represented as a single bytestring.
Expand Down
73 changes: 59 additions & 14 deletions src/Database/Redis/Schema/Lock.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ module Database.Redis.Schema.Lock
, defaultMetaParams
, ExclusiveLock, withExclusiveLock
, ShareableLock, withShareableLock, LockSharing(..)
, ExclusiveLockAcquireTimeout(..)
, ShareableLockAcquireTimeout(..)
)
where

Expand All @@ -39,24 +41,49 @@ import qualified Data.ByteString.Char8 as BS
import System.Random ( randomIO )

import Control.Concurrent ( threadDelay, myThreadId )
import Control.Exception ( Exception )
import Control.Monad.Fix ( fix )
import Control.Monad.Catch ( MonadThrow(..), MonadCatch(..), MonadMask(..), throwM, finally )
import Control.Monad.IO.Class ( liftIO, MonadIO )

import qualified Database.Redis.Schema as Redis

data ExclusiveLockAcquireTimeout = ExclusiveLockAcquireTimeout
{ elatRefIdentifier :: Redis.SimpleValueIdentifier
, elatLockParams :: LockParams
, elatOurId :: LockOwnerId
}
deriving stock (Show)

instance Exception ExclusiveLockAcquireTimeout

data ShareableLockAcquireTimeout = ShareableLockAcquireTimeout
{ slatRefIdentifier :: ByteString
, slatLockParams :: ShareableLockParams
, slatSharing :: LockSharing
, slatOurId :: LockOwnerId
}
deriving stock (Show)

instance Exception ShareableLockAcquireTimeout

data LockParams = LockParams
{ lpMeanRetryInterval :: NominalDiffTime
, lpAcquireTimeout :: NominalDiffTime
, lpLockTTL :: Redis.TTL
}
deriving stock (Show)

-- | ID of the process that owns the Redis lock.
newtype LockOwnerId = LockOwnerId { _unLockOwnerId :: ByteString }
deriving stock (Show)
deriving newtype (Eq, Ord, Redis.Serializable)
instance Redis.Value inst LockOwnerId
instance Redis.SimpleValue inst LockOwnerId

data AcquireResult = Acquired | AcquireTimeout
deriving stock (Show)

--------------------
-- Exclusive lock --
--------------------
Expand All @@ -82,6 +109,9 @@ instance Redis.SimpleValue inst ExclusiveLock
--
-- * For exclusive locks, 'withExclusiveLock' is more efficient.
--
-- This throws 'ExclusiveLockAcquireTimeout' if we fail to acquire the lock before the
-- timeout specified in the 'LockParams'.
--
withExclusiveLock ::
( MonadCatch m, MonadThrow m, MonadMask m, MonadIO m
, Redis.Ref ref, Redis.ValueType ref ~ ExclusiveLock
Expand All @@ -92,31 +122,36 @@ withExclusiveLock ::
-> m a -- ^ The action to perform under lock
-> m a
withExclusiveLock redis lp ref action = do
exclusiveLockAcquire redis lp ref >>= \case
Nothing -> throwM Redis.LockAcquireTimeout
Just ourId -> action `finally` exclusiveLockRelease redis ref ourId
(result, ourId) <- exclusiveLockAcquire redis lp ref
case result of
AcquireTimeout -> throwM ExclusiveLockAcquireTimeout
{ elatRefIdentifier = Redis.toIdentifier ref
, elatLockParams = lp
, elatOurId = ourId
}
Acquired -> action `finally` exclusiveLockRelease redis ref ourId

-- | Acquire a distributed exclusive lock.
-- Returns Nothing on timeout. Otherwise it returns the unique client ID used for the lock.
exclusiveLockAcquire ::
( MonadCatch m, MonadThrow m, MonadMask m, MonadIO m
, Redis.Ref ref, Redis.ValueType ref ~ ExclusiveLock
)
=> Redis.Pool (Redis.RefInstance ref) -> LockParams -> ref -> m (Maybe LockOwnerId)
=> Redis.Pool (Redis.RefInstance ref) -> LockParams -> ref -> m (AcquireResult, LockOwnerId)
exclusiveLockAcquire redis lp ref = do
-- this is unique only if we have only one instance of HConductor running
ourId <- LockOwnerId . BS.pack . show <$> liftIO myThreadId -- unique client id
tsDeadline <- addUTCTime (lpAcquireTimeout lp) <$> liftIO getCurrentTime
fix $ \ ~retry -> do -- ~ makes the lambda lazy
tsNow <- liftIO getCurrentTime
if tsNow >= tsDeadline
then return Nothing -- didn't manage to acquire the lock before timeout
then return (AcquireTimeout, ourId) -- didn't manage to acquire the lock before timeout
else do
-- set the lock if it does not exist
didNotExist <- Redis.run redis $
Redis.setIfNotExistsTTL ref (ExclusiveLock ourId) (lpLockTTL lp)
if didNotExist
then return (Just ourId) -- everything went well
then return (Acquired, ourId) -- everything went well
else do
-- someone got there first; wait a bit and try again
fuzzySleep (lpMeanRetryInterval lp)
Expand Down Expand Up @@ -225,6 +260,7 @@ data ShareableLockParams = ShareableLockParams
{ slpParams :: LockParams
, slpMetaParams :: LockParams
}
deriving (Show)

defaultMetaParams :: LockParams
defaultMetaParams = LockParams
Expand All @@ -244,6 +280,9 @@ defaultMetaParams = LockParams
--
-- * For exclusive locks, withExclusiveLock is more efficient.
--
-- This throws 'ShareableLockAcquireTimeout' if we fail to acquire the lock before the
-- timeout specified in the 'ShareableLockParams'.
--
-- NOTE: the shareable lock seems to have quite a lot of performance overhead.
-- Always benchmark first whether the exclusive lock would perform better in your scenario,
-- even when a shareable lock would be sufficient in theory.
Expand All @@ -258,26 +297,32 @@ withShareableLock
-> ref -- ^ Lock ref
-> m a -- ^ The action to perform under lock
-> m a
withShareableLock redis slp lockSharing ref action =
shareableLockAcquire redis slp lockSharing ref >>= \case
Nothing -> throwM Redis.LockAcquireTimeout
Just ourId -> action
`finally` shareableLockRelease redis slp ref lockSharing ourId
withShareableLock redis slp lockSharing ref action = do
(result, ourId) <- shareableLockAcquire redis slp lockSharing ref
case result of
AcquireTimeout -> throwM ShareableLockAcquireTimeout
{ slatRefIdentifier = Redis.toIdentifier ref
, slatLockParams = slp
, slatSharing = lockSharing
, slatOurId = ourId
}
Acquired ->
action `finally` shareableLockRelease redis slp ref lockSharing ourId

shareableLockAcquire ::
forall m ref.
( MonadCatch m, MonadThrow m, MonadMask m, MonadIO m
, Redis.Ref ref, Redis.ValueType ref ~ ShareableLock
, Redis.SimpleValue (Redis.RefInstance ref) (MetaLock ref)
) => Redis.Pool (Redis.RefInstance ref) -> ShareableLockParams -> LockSharing -> ref -> m (Maybe LockOwnerId)
) => Redis.Pool (Redis.RefInstance ref) -> ShareableLockParams -> LockSharing -> ref -> m (AcquireResult, LockOwnerId)
shareableLockAcquire redis slp lockSharing ref = do
-- this is unique only if we have only one instance of HConductor running
ourId <- LockOwnerId . BS.pack . show <$> liftIO myThreadId -- unique client id
tsDeadline <- addUTCTime (lpAcquireTimeout $ slpParams slp) <$> liftIO getCurrentTime
fix $ \ ~retry -> do -- ~ makes the lambda lazy
tsNow <- liftIO getCurrentTime
if tsNow >= tsDeadline
then return Nothing -- didn't manage to acquire the lock before timeout
then return (AcquireTimeout, ourId) -- didn't manage to acquire the lock before timeout
else do
-- acquire the lock if possible, using the meta lock to synchronise access
success <- withExclusiveLock redis (slpMetaParams slp) (MetaLock ref) $
Expand All @@ -304,7 +349,7 @@ shareableLockAcquire redis slp lockSharing ref = do
then do
-- everything went well, set ttl and return
Redis.run redis $ Redis.setTTL ref (lpLockTTL $ slpParams slp)
return (Just ourId)
return (Acquired, ourId)
else do
-- someone got there first; wait a bit and try again
fuzzySleep $ lpMeanRetryInterval (slpParams slp)
Expand Down

0 comments on commit b7c327d

Please sign in to comment.