From 7a40b77fe5469be6c3008a2ecfe5414e3d17b490 Mon Sep 17 00:00:00 2001
From: Jeroen Bransen
Date: Thu, 23 May 2024 13:45:09 +0200
Subject: [PATCH 1/5] Add Database.Redis.Schema.RemoteJob

---
 package.yaml                           |   1 +
 redis-schema.cabal                     |   6 +-
 src/Database/Redis/Schema/RemoteJob.hs | 356 +++++++++++++++++++++++++
 stack.yaml                             |   2 +
 4 files changed, 363 insertions(+), 2 deletions(-)
 create mode 100644 src/Database/Redis/Schema/RemoteJob.hs

diff --git a/package.yaml b/package.yaml
index c2b31eb..d0c7cc1 100644
--- a/package.yaml
+++ b/package.yaml
@@ -21,6 +21,7 @@ dependencies:
 - bytestring
 - time
 - mtl
+- monadIO
 - random
 - exceptions
 - hedis
diff --git a/redis-schema.cabal b/redis-schema.cabal
index 11407ff..cb0782c 100644
--- a/redis-schema.cabal
+++ b/redis-schema.cabal
@@ -1,10 +1,10 @@
 cabal-version: 1.12

--- This file has been generated from package.yaml by hpack version 0.34.7.
+-- This file has been generated from package.yaml by hpack version 0.35.0.
 --
 -- see: https://github.com/sol/hpack
 --
--- hash: 8ad979f047b1d31267791ddddc75577141d0b1f972f265c586894ab5f99c498c
+-- hash: 1876630159ac153904237e5ab7ec2b440a07213ac8ca4b6420a02fff46e1524d

 name: redis-schema
 version: 0.1.0
@@ -31,6 +31,7 @@ library
   exposed-modules:
       Database.Redis.Schema
       Database.Redis.Schema.Lock
+      Database.Redis.Schema.RemoteJob
   other-modules:
       Paths_redis_schema
   hs-source-dirs:
@@ -45,6 +46,7 @@ library
     , containers
     , exceptions
     , hedis
+    , monadIO
     , mtl
     , numeric-limits
     , random
diff --git a/src/Database/Redis/Schema/RemoteJob.hs b/src/Database/Redis/Schema/RemoteJob.hs
new file mode 100644
index 0000000..1056626
--- /dev/null
+++ b/src/Database/Redis/Schema/RemoteJob.hs
@@ -0,0 +1,356 @@
+{-# LANGUAGE AllowAmbiguousTypes #-} -- for remoteJobWorker
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
module Database.Redis.Schema.RemoteJob (
  -- * Types
  WorkerId (..),
  WorkerHandle,
  RemoteJobError (..),
  JobQueue (..),

  -- * Main functionality
  runRemoteJob,
  runRemoteJobAsync,
  remoteJobWorker,
  withRemoteJobWorker,
  gracefulShutdown,

  -- * Inspection
  queueLength,
  countRunningJobs,
) where

import Data.Binary ( decode, encode, Binary(..) )
import Data.Bifunctor as BF ( second )
import Data.Kind ( Type )
import Data.Maybe ( isJust )
import Data.Proxy ( Proxy(..) )
import Data.String ( IsString )
import Data.Text ( Text )
import Data.Time ( UTCTime, getCurrentTime, addUTCTime )
import Data.Time.Clock.POSIX ( utcTimeToPOSIXSeconds )
import Data.UUID ( UUID )
import Data.UUID.V4 ( nextRandom )
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Set as Set

import Database.Redis ( ConnectionLostException )
import Database.Redis.Schema as Redis

import Control.Concurrent.MonadIO
import Control.Monad ( when, forever )
import Control.Monad.Catch
import Control.Exception ( SomeAsyncException )

import GHC.Generics ( Generic )


-- | Errors that can occur while running a remote job.
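--
-- Meaning of the constructors, as produced by the code in this module:
--
--   * 'RemoteJobException': the remote handler threw an exception, rendered via 'show'.
--   * 'NoActiveWorkers': no worker has recently checked in, and waiting for one was not requested.
--   * 'Timeout': the blocking wait for the result box yielded nothing.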
data RemoteJobError
  = RemoteJobException String
  | NoActiveWorkers
  | Timeout
  deriving ( Show, Generic )
instance Binary RemoteJobError

instance Serializable RemoteJobError where
  fromBS = readBinary
  toBS = showBinary

-- | Identifier for the worker process; only used for inspecting the queue.
newtype WorkerId = WorkerId { unWorkerId :: Text }
  deriving newtype ( Show, Eq, Ord, IsString, Serializable, Binary )

-- | Handle of the worker process, which can be used in the graceful shutdown procedure
data WorkerHandle = WorkerHandle (MVar ()) ThreadId

class JobQueue jq where
  -- | The remote job protocol, a list of 'i -> o' entries indicating
  -- this queue can contain jobs taking type 'i' as input and
  -- returning type 'o'. Both 'i' and 'o' must have a 'Binary' instance.
  type RPC jq :: [Type]

  -- | Prefix for the Redis keys
  keyPrefix :: BS.ByteString

  -- | Which Redis instance the queue lives in.
  type RedisInstance jq :: Instance
  type RedisInstance jq = DefaultInstance

-- | This queue contains many requests.
-- There is only one request queue and it's read by all workers.
data RequestQueue jq = RequestQueue
instance JobQueue jq => Ref (RequestQueue jq) where
  type RefInstance (RequestQueue jq) = RedisInstance jq
  type ValueType (RequestQueue jq) = [(Priority, (UUID, Int, BSL.ByteString))]
  toIdentifier RequestQueue =
    colonSep [keyPrefix @jq, "requests"]

-- | This set contains the requests that are currently being processed.
data RunningJobs jq = RunningJobs
instance JobQueue jq => Ref (RunningJobs jq) where
  type RefInstance (RunningJobs jq) = RedisInstance jq
  type ValueType (RunningJobs jq) = Set.Set (WorkerId, (UUID, Int, BSL.ByteString))
  toIdentifier RunningJobs =
    colonSep [keyPrefix @jq, "running"]

-- | A box that contains only one response.
-- For every response, a unique box is created (tagged with job ID).
newtype ResultBox jq = ResultBox UUID
instance JobQueue jq => Ref (ResultBox jq) where
  type RefInstance (ResultBox jq) = RedisInstance jq
  type ValueType (ResultBox jq) = [Either RemoteJobError BSL.ByteString]
  toIdentifier (ResultBox uuid) =
    colonSep [keyPrefix @jq, "result", toBS uuid]

-- | A registry of all active workers
data Workers jq = Workers
instance JobQueue jq => Ref (Workers jq) where
  type RefInstance (Workers jq) = RedisInstance jq
  type ValueType (Workers jq) = [(Priority, WorkerId)]
  toIdentifier Workers =
    Redis.colonSep [keyPrefix @jq, "workers"]

-- | Type class to check where in the 'RPC' list an i->o job occurs, which
-- is then used together with 'CanHandle' to use the right handler.
class FindHandler (i :: Type) (o :: Type) (xs :: [Type]) where
  handlerIdx :: Proxy (i -> o) -> Proxy xs -> Int

instance (FindHandler' i o xs (IsHead (i -> o) xs)) => FindHandler i o xs where
  handlerIdx = handlerIdx' (Proxy @(IsHead (i -> o) xs))

class FindHandler' (i :: Type) (o :: Type) (xs :: [Type]) (isHead :: Bool) where
  handlerIdx' :: Proxy isHead -> Proxy (i -> o) -> Proxy xs -> Int

instance FindHandler' i o ((i -> o) ': xs) 'True where
  handlerIdx' _ _ _ = 0

instance FindHandler i o xs => FindHandler' i o (x ': xs) 'False where
  handlerIdx' _ _ _ = 1 + handlerIdx (Proxy @(i -> o)) (Proxy @xs)

type family IsHead (x :: Type) (xs :: [Type]) :: Bool where
  IsHead x (x ': _) = True
  IsHead x _ = 'False


-- | An instance 'CanHandle m xs' means that the list xs of i->o jobs
-- can be handled in monad m, i.e.
-- there exist 'Binary' instances for all
-- i and o, and the instances take care of encoding and decoding as the
-- right type.
class CanHandle (m :: Type -> Type) (xs :: [Type]) where
  type HandleTy m xs r :: Type
  doHandle :: Proxy m -> Proxy xs -> ((Int -> BSL.ByteString -> m BSL.ByteString) -> m r) -> HandleTy m xs r

instance CanHandle m '[] where
  type HandleTy m '[] r = m r
  doHandle Proxy Proxy cont = cont $ \_ _ -> error "remoteJobWorker: protocol broken"

instance (Monad m, Binary i, Binary o, CanHandle m xs) => CanHandle m ((i -> o) ': xs) where
  type HandleTy m ((i -> o) ': xs) r = (i -> m o) -> HandleTy m xs r
  doHandle Proxy Proxy cont f = doHandle (Proxy @m) (Proxy @xs) (cont . g) where
    g handler i bsi
      | i == 0 = encode <$> f (decode bsi)
      | otherwise = handler (i - 1) bsi

-- | Run a job on a remote worker. This will block until a 'remoteJobWorker' process picks up the
-- task. The 'Priority' argument determines scheduling: jobs with a lower priority value are picked up earlier.
runRemoteJob ::
  forall q i o m.
  (MonadCatch m, MonadIO m, JobQueue q, FindHandler i o (RPC q), Binary i, Binary o) =>
  Bool -> Pool (RedisInstance q) -> Priority -> i -> m (Either RemoteJobError o)
runRemoteJob waitForWorkers pool prio a = do
  -- Check that there are active workers
  abort <-
    if waitForWorkers
    then return False
    else (==0) <$> run pool (countWorkers @q)

  if abort then return $ Left NoActiveWorkers
  else do
    -- Add the job
    jobId <- liftIO nextRandom
    let job = (jobId, handlerIdx (Proxy @(i -> o)) (Proxy @(RPC q)), encode a)

    -- Add to the queue and wait for the result. If any exception occurs at this point
    -- (which is then likely an async exception), we remove the element from the queue,
    -- because we will not listen to the result anymore anyway.
    popResult <- run pool
      ( do zInsert (RequestQueue @q) [(prio,job)]
           lPopRightBlocking 0 (ResultBox @q jobId)
      ) `onException`
        run pool (zDelete (RequestQueue @q) job)

    -- Now look at the result and decode it.
    return $ case popResult of
      Just r -> BF.second decode r
      Nothing -> Left Timeout

-- | Run a job on a remote worker but do not wait for any results. This assumes the remote
-- job has some side-effect, which is executed by a 'remoteJobWorker' process that picks
-- up this task.
runRemoteJobAsync ::
  forall q i m.
  (MonadCatch m, MonadIO m, JobQueue q, FindHandler i () (RPC q), Binary i) =>
  Pool (RedisInstance q) -> Priority -> i -> m ()
runRemoteJobAsync pool prio a = do
  -- Add to the queue and forget about it.
  jobId <- liftIO nextRandom
  let job = (jobId, handlerIdx (Proxy @(i -> ())) (Proxy @(RPC q)), encode a)
  run pool $ zInsert (RequestQueue @q) [(prio,job)]

-- | The actual worker loop; this generalizes over 'remoteJobWorker' and 'withRemoteJobWorker'.
remoteJobWorker' :: forall q m r. (MonadIO m, MonadCatch m, MonadMask m, JobQueue q, CanHandle m (RPC q)) =>
  (MVar () -> m () -> m r) -> WorkerId -> Pool (RedisInstance q) -> (SomeException -> m ()) -> HandleTy m (RPC q) r
remoteJobWorker' cont wid pool logger = doHandle (Proxy @m) (Proxy @(RPC q)) $ \handler -> do
  -- MVar that is used for the graceful shutdown procedure. When it is full, the worker
  -- thread is not doing anything and can be killed. As soon as the worker starts working
  -- it takes the value and puts it back when the work is done.
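  -- This is the same MVar that 'gracefulShutdown' takes: since the worker holds the
  -- value while a job runs, shutdown blocks until that job is done, and the taken
  -- value stops the worker from starting new work (up to the small race noted at
  -- 'gracefulShutdown').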
  workerFree <- liftIO $ newMVar ()
  let
    -- Main loop, pop elements from the queue and handle them
    loop :: m ()
    loop = run pool (bzPopMin (RequestQueue @q) 0) >>= \case
      Just (_, it@(jobId, idx, bsa)) -> do
        -- Update the RunningJobs queue at the start and end of this block,
        -- and keep the workerFree var up to date
        bracket_
          (run pool (sInsert (RunningJobs @q) [(wid,it)]) >> liftIO (takeMVar workerFree))
          (run pool (sDelete (RunningJobs @q) [(wid,it)]) >> liftIO (putMVar workerFree ())) $ do
            -- Call the actual handler
            resp <- fmap Right (handler idx bsa)
              `catchAll`
                (return . Left)

            -- Send back the result
            let bso = case resp of
                  Left e -> Left $ RemoteJobException $ show e
                  Right b -> Right b
            run pool $ do
              let box = ResultBox @q jobId
              lPushLeft box [bso]
              -- set ttl to ensure the data is not left behind in case of crashes,
              -- the caller should be awaiting this already, so it's either read
              -- directly or it is never read.
              setTTLIfExists_ box (5 * Redis.second)

            -- Check for exceptions
            case resp of
              Right _ -> return ()
              Left e -> do
                -- Call the parent logger
                logger e

                -- And in case of an async exception, rethrow
                let mbAsync :: Maybe SomeAsyncException
                    mbAsync = fromException e
                when (isJust mbAsync) $ throwM e

        -- Sleep for a tiny bit, to allow the graceful shutdown procedure to interrupt when needed
        liftIO $ threadDelay 1000 -- 1ms
        loop

      -- With BZPOPMIN and no timeout there should always be a result
      _ -> error "remoteJobWorker: Got no result with BZPOPMIN and timeout 0"

    -- Fork a keep-alive loop that updates our latest-seen time every
    -- 5 seconds. We use the current UTC timestamp as priority, so
    -- that we can efficiently count the workers that checked in recently.
    signup = liftIO $ forkIO $ forever $ do
      t <- liftIO getCurrentTime
      run pool $ zInsert (Workers @q) [(utcTimeToPriority t, wid)]
      threadDelay 5_000_000 -- 5s

    -- Kill the keep-alive loop and remove ourselves from the list.
    signout tid = do
      liftIO $ killThread tid
      run pool $ zDelete (Workers @q) wid

  -- Signup and signout in an exception-safe way.
  -- When the connection is lost (which will also throw exceptions
  -- in signup/signout), we sleep for a while and try again
  let outerLoop = bracket signup signout (const loop)
        `catch`
        \(e :: ConnectionLostException) -> do
          -- Make sure to log the exception
          logger (toException e)

          -- Sleep for 10s
          liftIO $ threadDelay 10_000_000

          -- And run the loop again
          outerLoop

  -- Pass the continuation the function to start up the outer loop
  cont workerFree outerLoop


-- | Worker for handling jobs from the queue. The first function that matches the given input/output types
-- will be executed. Multiple workers can be run in parallel.
-- When an exception occurs in the handling function, it is sent back to the caller
-- as a String and passed to this worker's logger; async exceptions are additionally rethrown.
remoteJobWorker :: forall q m. (MonadIO m, MonadCatch m, MonadMask m, JobQueue q, CanHandle m (RPC q)) =>
  WorkerId -> Pool (RedisInstance q) -> (SomeException -> m ()) -> HandleTy m (RPC q) ()
remoteJobWorker = remoteJobWorker' @q cont where
  cont :: MVar () -> m () -> m ()
  cont _ runLoop = runLoop

-- | Forking version of 'remoteJobWorker', which forks a thread to do the work and returns a 'WorkerHandle'
-- that can be passed to 'gracefulShutdown' to end the worker thread in a graceful way.
-- This function ensures that on exceptions, the worker is cleaned up properly.
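--
-- A minimal usage sketch (assuming the @MyQueue@ protocol and handlers from the
-- README example; @logErr@ and @waitForShutdownSignal@ are illustrative
-- placeholders for application-specific logging and shutdown logic):
--
-- > withRemoteJobWorker @MyQueue "worker-1" pool logErr
-- >   (\handle -> waitForShutdownSignal >> gracefulShutdown handle)
-- >   fac sm (pure . reverse)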
withRemoteJobWorker :: forall q m a. (HasFork m, MonadIO m, MonadCatch m, MonadMask m, JobQueue q, CanHandle m (RPC q)) =>
  WorkerId -> Pool (RedisInstance q) -> (SomeException -> m ()) -> (WorkerHandle -> m a) -> HandleTy m (RPC q) a
withRemoteJobWorker wid pool logger outerCont = remoteJobWorker' @q cont wid pool logger where
  cont :: MVar () -> m () -> m a
  cont workerFree runLoop = do
    tid <- fork runLoop
    let hd = WorkerHandle workerFree tid
    outerCont hd
      `finally` -- on exceptions or when the outer thread finishes, make sure to clean up
        hardShutdown hd

-- | Gracefully shut down the worker, which means waiting for the current job to complete.
-- There is a tiny race condition: there is a small chance the worker has just taken
-- a new job when it is killed.
gracefulShutdown :: MonadIO m => WorkerHandle -> m ()
gracefulShutdown (WorkerHandle workerFree tid) = liftIO $ do
  takeMVar workerFree
  killThread tid

-- | Send an async exception to the worker thread to kill it and clean up. The remote job
-- caller will receive a 'RemoteJobException' if a job is running.
hardShutdown :: MonadIO m => WorkerHandle -> m ()
hardShutdown (WorkerHandle _ tid) = liftIO $ killThread tid

-- | Returns the number of workers that are currently connected to the job queue.
countWorkers :: forall jq. JobQueue jq => RedisM (RedisInstance jq) Integer
countWorkers = do
  -- Count workers that checked in at most 10s ago. Workers are supposed to
  -- do this every 5 seconds, so we allow missing one beat.
  t <- liftIO getCurrentTime
  zCount (Workers @jq) (utcTimeToPriority $ addUTCTime (-10) t) maxBound

-- | Helper for the worker list, which converts a UTC timestamp to a priority.
utcTimeToPriority :: UTCTime -> Priority
utcTimeToPriority = Priority . realToFrac . utcTimeToPOSIXSeconds

-- | Returns the number of jobs that are currently queued
queueLength :: forall jq. JobQueue jq => RedisM (RedisInstance jq) Integer
queueLength = zSize (RequestQueue @jq)

-- | Returns the number of jobs that are currently being processed
countRunningJobs :: forall jq. JobQueue jq => RedisM (RedisInstance jq) Integer
countRunningJobs = sSize (RunningJobs @jq)
diff --git a/stack.yaml b/stack.yaml
index a5e4e03..5e48d9d 100644
--- a/stack.yaml
+++ b/stack.yaml
@@ -1 +1,3 @@
 resolver: lts-18.23
+extra-deps:
+  - monadIO-0.11.1.0@sha256:2407c8aee3a74f3eba897f7c87f702f502394aec8cd412f3d2334cc353f54f13,964

From 753c248e7c66c14b097b1427c0ba909e653c6ac7 Mon Sep 17 00:00:00 2001
From: Jeroen Bransen
Date: Sat, 25 May 2024 17:11:30 +0200
Subject: [PATCH 2/5] Add RemoteJob example to readme

---
 README.md | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 73 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 2f458f9..4b3731e 100644
--- a/README.md
+++ b/README.md
@@ -894,8 +894,79 @@ how a library can be implemented on top of `Database.Redis.Schema`.

 ### Remote jobs

-Sadly, this library has not been published yet.
-We'd like to, though.
+In `Database.Redis.Schema.RemoteJob` a Redis-based worker queue is implemented for running
CPU-intensive jobs on remote machines. The queue is strongly typed and can contain multiple
kinds of jobs, each with a priority, which workers pick up.
As an example, we define a queue that can contain three types of jobs:
```haskell
newtype ComputeFactorial = CF Integer deriving ( Binary )
newtype ComputeSquare = CS Integer deriving ( Binary )

data MyQueue
instance JobQueue MyQueue where
  type RPC MyQueue =
    '[ ComputeFactorial -> Integer
     , ComputeSquare -> Integer
     , String -> String
     ]
  keyPrefix = "myqueue"
```
Here the `MyQueue` type is used only during compile time to let the compile find the right
instances. To distinguish between the two `Integer -> Integer` functions, we wrap them in
newtypes. A `Binary` instance must exist for all inputs and outputs, so that they can be put
into Redis.

Based on this queue, we can now define a worker that executes the jobs. This worker must
define a function for each of the types in `RPC`, and runs in a monadic context (which
we fix to `IO` in this example).

```haskell
fac :: ComputeFactorial -> IO Integer
fac (CF n) = do
  putStrLn $ "Computing the factorial of " ++ show n
  pure $ product [1..n]

sm :: ComputeSquare -> IO Integer
sm (CS n) = pure $ n * n

runWorker :: IO ()
runWorker = do
  pool <- connect "redis:///" 10
  let myId = "localworker"
  let err e = error $ "Something went wrong: " ++ show e
  remoteJobWorker @MyQueue myId pool err fac sm (pure . reverse)
```
The arguments to `remoteJobWorker` are a unique identifier for this worker (used for counting
the workers; executing jobs works fine even with overlapping ids), a connection pool,
a logging function for exceptions, and then one handler function for each element in `RPC`.

Now if we call `runWorker` it will block until work needs to be done, and it will only
return when an async exception is thrown. In production it is advised to use
`withRemoteJobWorker` instead, which forks off a worker thread and provides a `WorkerHandle`
to its continuation; this handle can be passed to `gracefulShutdown` to let the currently
running job finish and then return gracefully.

Now from another process or even another machine we can 'execute jobs', i.e. add them to the
queue and synchronously wait for their result. For example:
```haskell
runJobs :: IO ()
runJobs = do
  pool <- connect "redis:///" 10
  a <- runRemoteJob @MyQueue @String @String False pool 1 "test"
  print a
  b <- runRemoteJob @MyQueue @ComputeFactorial @Integer False pool 1 (CF 5)
  print b
```
This will print:
```
ghci> runJobs
Right "tset"
Right 120
```
The underlying Redis implementation is based on blocking reads from sorted sets (`BZPOPMIN`),
which is concurrency-safe and needs no polling. An arbitrary number of workers can be run
and jobs can be executed from arbitrary machines. Only the `countWorkers` implementation
is based on a keep-alive loop on the workers, to properly deal with TCP connection losses.
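
As a sketch of the graceful variant described above, a long-running worker could be set up
as follows (the shutdown trigger, here a plain `threadDelay`, is illustrative; note that
`withRemoteJobWorker` takes the continuation before the handler functions):
```haskell
import Control.Concurrent (threadDelay)

runGracefulWorker :: IO ()
runGracefulWorker = do
  pool <- connect "redis:///" 10
  let err e = putStrLn $ "Worker exception: " ++ show e
  withRemoteJobWorker @MyQueue "localworker" pool err
    (\handle -> do
      -- Serve jobs until shutdown is requested; a fixed delay is
      -- used here only for illustration.
      threadDelay (60 * 1000 * 1000)
      gracefulShutdown handle)
    fac sm (pure . reverse)
```
The inspection functions run in `RedisM`, so the queue state can be checked with, for
example, `run pool (queueLength @MyQueue)`.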
## Future work

From ac7790a5e1dd76590311aaacc2c20723838b1d05 Mon Sep 17 00:00:00 2001
From: Jeroen Bransen
Date: Sat, 25 May 2024 18:30:23 +0200
Subject: [PATCH 3/5] Use tick for promoted constructor everywhere

---
 src/Database/Redis/Schema/RemoteJob.hs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Database/Redis/Schema/RemoteJob.hs b/src/Database/Redis/Schema/RemoteJob.hs
index 1056626..3b4c580 100644
--- a/src/Database/Redis/Schema/RemoteJob.hs
+++ b/src/Database/Redis/Schema/RemoteJob.hs
@@ -143,7 +143,7 @@ instance FindHandler i o xs => FindHandler' i o (x ': xs) 'False where
   handlerIdx' _ _ _ = 1 + handlerIdx (Proxy @(i -> o)) (Proxy @xs)

 type family IsHead (x :: Type) (xs :: [Type]) :: Bool where
-  IsHead x (x ': _) = True
+  IsHead x (x ': _) = 'True
   IsHead x _ = 'False

From 46b0728ef0b238e051a0f8f4ece9f3e967cbee54 Mon Sep 17 00:00:00 2001
From: Jeroen Bransen
Date: Fri, 21 Jun 2024 10:09:27 +0200
Subject: [PATCH 4/5] Review comments

---
 README.md                              |  2 +-
 src/Database/Redis/Schema/RemoteJob.hs | 58 +++++++++++++++++---------
 2 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/README.md b/README.md
index 4b3731e..4b2fe04 100644
--- a/README.md
+++ b/README.md
@@ -912,7 +912,7 @@ instance JobQueue MyQueue where
      ]
   keyPrefix = "myqueue"
 ```
-Here the `MyQueue` type is used only during compile time to let the compile find the right
+Here the `MyQueue` type is used only during compile time to let the compiler find the right
 instances. To distinguish between the two `Integer -> Integer` functions, we wrap them in
 newtypes. A `Binary` instance must exist for all inputs and outputs, so that they can be put
 into Redis.
diff --git a/src/Database/Redis/Schema/RemoteJob.hs b/src/Database/Redis/Schema/RemoteJob.hs
index 3b4c580..ee25694 100644
--- a/src/Database/Redis/Schema/RemoteJob.hs
+++ b/src/Database/Redis/Schema/RemoteJob.hs
@@ -91,12 +91,24 @@ class JobQueue jq where
   type RedisInstance jq :: Instance
   type RedisInstance jq = DefaultInstance

+
+-- | Type for representing a job request in Redis
+data Job = Job
+  { jobId :: UUID
+  , jobHandlerIdx :: Int
+  , jobInput :: BSL.ByteString
+  } deriving ( Eq, Ord )
+
+instance Serializable Job where
+  toBS j = toBS (jobId j, jobHandlerIdx j, jobInput j)
+  fromBS = fmap (\(jid,jidx,jinp) -> Job jid jidx jinp) . fromBS
+
 -- | This queue contains many requests.
 -- There is only one request queue and it's read by all workers.
 data RequestQueue jq = RequestQueue
 instance JobQueue jq => Ref (RequestQueue jq) where
   type RefInstance (RequestQueue jq) = RedisInstance jq
-  type ValueType (RequestQueue jq) = [(Priority, (UUID, Int, BSL.ByteString))]
+  type ValueType (RequestQueue jq) = [(Priority, Job)]
   toIdentifier RequestQueue =
     colonSep [keyPrefix @jq, "requests"]

@@ -104,7 +116,7 @@ data RunningJobs jq = RunningJobs
 instance JobQueue jq => Ref (RunningJobs jq) where
   type RefInstance (RunningJobs jq) = RedisInstance jq
-  type ValueType (RunningJobs jq) = Set.Set (WorkerId, (UUID, Int, BSL.ByteString))
+  type ValueType (RunningJobs jq) = Set.Set (WorkerId, Job)
   toIdentifier RunningJobs =
     colonSep [keyPrefix @jq, "running"]

@@ -127,19 +139,19 @@ instance JobQueue jq => Ref (Workers jq) where
   type RefInstance (Workers jq) = RedisInstance jq
   type ValueType (Workers jq) = [(Priority, WorkerId)]
   toIdentifier Workers =
     Redis.colonSep [keyPrefix @jq, "workers"]

 -- | Type class to check where in the 'RPC' list an i->o job occurs, which
 -- is then used together with 'CanHandle' to use the right handler.
-class FindHandler (i :: Type) (o :: Type) (xs :: [Type]) where
+class HasHandler (i :: Type) (o :: Type) (xs :: [Type]) where
   handlerIdx :: Proxy (i -> o) -> Proxy xs -> Int

-instance (FindHandler' i o xs (IsHead (i -> o) xs)) => FindHandler i o xs where
+instance (HasHandler' i o xs (IsHead (i -> o) xs)) => HasHandler i o xs where
   handlerIdx = handlerIdx' (Proxy @(IsHead (i -> o) xs))

-class FindHandler' (i :: Type) (o :: Type) (xs :: [Type]) (isHead :: Bool) where
+class HasHandler' (i :: Type) (o :: Type) (xs :: [Type]) (isHead :: Bool) where
   handlerIdx' :: Proxy isHead -> Proxy (i -> o) -> Proxy xs -> Int

-instance FindHandler' i o ((i -> o) ': xs) 'True where
+instance HasHandler' i o ((i -> o) ': xs) 'True where
   handlerIdx' _ _ _ = 0

-instance FindHandler i o xs => FindHandler' i o (x ': xs) 'False where
+instance HasHandler i o xs => HasHandler' i o (x ': xs) 'False where
   handlerIdx' _ _ _ = 1 + handlerIdx (Proxy @(i -> o)) (Proxy @xs)

 type family IsHead (x :: Type) (xs :: [Type]) :: Bool where
@@ -170,7 +182,7 @@ instance (Monad m, Binary i, Binary o, CanHandle m xs) => CanHandle m ((i -> o)
 -- | Run a job on a remote worker. This will block until a 'remoteJobWorker' process picks up the
 -- task. The 'Priority' argument determines scheduling: jobs with a lower priority value are picked up earlier.
 runRemoteJob ::
   forall q i o m.
-  (MonadCatch m, MonadIO m, JobQueue q, FindHandler i o (RPC q), Binary i, Binary o) =>
+  (MonadCatch m, MonadIO m, JobQueue q, HasHandler i o (RPC q), Binary i, Binary o) =>
   Bool -> Pool (RedisInstance q) -> Priority -> i -> m (Either RemoteJobError o)
 runRemoteJob waitForWorkers pool prio a = do
   -- Check that there are active workers
@@ -182,15 +194,19 @@ runRemoteJob waitForWorkers pool prio a = do
   if abort then return $ Left NoActiveWorkers
   else do
     -- Add the job
-    jobId <- liftIO nextRandom
-    let job = (jobId, handlerIdx (Proxy @(i -> o)) (Proxy @(RPC q)), encode a)
+    jid <- liftIO nextRandom
+    let job = Job
+          { jobId = jid
+          , jobHandlerIdx = handlerIdx (Proxy @(i -> o)) (Proxy @(RPC q))
+          , jobInput = encode a
+          }

     -- Add to the queue and wait for the result. If any exception occurs at this point
     -- (which is then likely an async exception), we remove the element from the queue,
     -- because we will not listen to the result anymore anyway.
     popResult <- run pool
       ( do zInsert (RequestQueue @q) [(prio,job)]
-           lPopRightBlocking 0 (ResultBox @q jobId)
+           lPopRightBlocking 0 (ResultBox @q jid)
       ) `onException`
         run pool (zDelete (RequestQueue @q) job)
@@ -204,12 +220,16 @@ runRemoteJob waitForWorkers pool prio a = do
 -- | Run a job on a remote worker but do not wait for any results. This assumes the remote
 -- job has some side-effect, which is executed by a 'remoteJobWorker' process that picks
 -- up this task.
 runRemoteJobAsync ::
   forall q i m.
-  (MonadCatch m, MonadIO m, JobQueue q, FindHandler i () (RPC q), Binary i) =>
+  (MonadCatch m, MonadIO m, JobQueue q, HasHandler i () (RPC q), Binary i) =>
   Pool (RedisInstance q) -> Priority -> i -> m ()
 runRemoteJobAsync pool prio a = do
   -- Add to the queue and forget about it.
-  jobId <- liftIO nextRandom
-  let job = (jobId, handlerIdx (Proxy @(i -> ())) (Proxy @(RPC q)), encode a)
+  jid <- liftIO nextRandom
+  let job = Job
+        { jobId = jid
+        , jobHandlerIdx = handlerIdx (Proxy @(i -> ())) (Proxy @(RPC q))
+        , jobInput = encode a
+        }
   run pool $ zInsert (RequestQueue @q) [(prio,job)]

 -- | The actual worker loop; this generalizes over 'remoteJobWorker' and 'withRemoteJobWorker'.
 remoteJobWorker' :: forall q m r. (MonadIO m, MonadCatch m, MonadMask m, JobQueue q, CanHandle m (RPC q)) =>
   (MVar () -> m () -> m r) -> WorkerId -> Pool (RedisInstance q) -> (SomeException -> m ()) -> HandleTy m (RPC q) r
@@ -224,14 +244,14 @@ remoteJobWorker' cont wid pool logger = doHandle (Proxy @m) (Proxy @(RPC q)) $
     -- Main loop, pop elements from the queue and handle them
     loop :: m ()
     loop = run pool (bzPopMin (RequestQueue @q) 0) >>= \case
-      Just (_, it@(jobId, idx, bsa)) -> do
+      Just (_, job) -> do
         -- Update the RunningJobs queue at the start and end of this block,
         -- and keep the workerFree var up to date
         bracket_
-          (run pool (sInsert (RunningJobs @q) [(wid,it)]) >> liftIO (takeMVar workerFree))
-          (run pool (sDelete (RunningJobs @q) [(wid,it)]) >> liftIO (putMVar workerFree ())) $ do
+          (run pool (sInsert (RunningJobs @q) [(wid,job)]) >> liftIO (takeMVar workerFree))
+          (run pool (sDelete (RunningJobs @q) [(wid,job)]) >> liftIO (putMVar workerFree ())) $ do
             -- Call the actual handler
-            resp <- fmap Right (handler idx bsa)
+            resp <- fmap Right (handler (jobHandlerIdx job) (jobInput job))
               `catchAll`
                 (return . Left)
@@ -240,7 +260,7 @@ remoteJobWorker' cont wid pool logger = doHandle (Proxy @m) (Proxy @(RPC q)) $
                   Left e -> Left $ RemoteJobException $ show e
                   Right b -> Right b
             run pool $ do
-              let box = ResultBox @q jobId
+              let box = ResultBox @q (jobId job)
               lPushLeft box [bso]
               -- set ttl to ensure the data is not left behind in case of crashes,
               -- the caller should be awaiting this already, so it's either read
               -- directly or it is never read.

From 9b84cd8cb18b0b2af5f67a9fe530aea33b9c2eb2 Mon Sep 17 00:00:00 2001
From: Jeroen Bransen
Date: Fri, 28 Jun 2024 09:19:41 +0200
Subject: [PATCH 5/5] Export countWorkers

---
 src/Database/Redis/Schema/RemoteJob.hs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/Database/Redis/Schema/RemoteJob.hs b/src/Database/Redis/Schema/RemoteJob.hs
index ee25694..cc9c830 100644
--- a/src/Database/Redis/Schema/RemoteJob.hs
+++ b/src/Database/Redis/Schema/RemoteJob.hs
@@ -29,6 +29,7 @@ module Database.Redis.Schema.RemoteJob (
   gracefulShutdown,

   -- * Inspection
+  countWorkers,
   queueLength,
   countRunningJobs,
 ) where