
Merge pull request #8 from chordify/remotejob
Add Database.Redis.Schema.RemoteJob
isomorpheme authored Jul 15, 2024
2 parents b7c327d + 9b84cd8 commit fad7520
Showing 5 changed files with 457 additions and 4 deletions.
75 changes: 73 additions & 2 deletions README.md
@@ -894,8 +894,79 @@ how a library can be implemented on top of `Database.Redis.Schema`.

### Remote jobs

Sadly, this library has not been published yet.
We'd like to, though.
`Database.Redis.Schema.RemoteJob` implements a Redis-based worker queue for running
CPU-intensive jobs on remote machines. The queue is strongly typed and can contain several
different kinds of jobs, with priorities, for workers to pick up.

As an example, we define a queue that can contain three types of jobs:
```haskell
-- This assumes the DataKinds, TypeFamilies and GeneralizedNewtypeDeriving
-- extensions are enabled (and TypeApplications for the callers below).
newtype ComputeFactorial = CF Integer deriving ( Binary )
newtype ComputeSquare = CS Integer deriving ( Binary )

data MyQueue
instance JobQueue MyQueue where
  type RPC MyQueue =
    '[ ComputeFactorial -> Integer
     , ComputeSquare -> Integer
     , String -> String
     ]
  keyPrefix = "myqueue"
```
Here the `MyQueue` type is used only at compile time, to let the compiler find the right
instances. To distinguish between the two `Integer -> Integer` functions, we wrap their
inputs in newtypes. A `Binary` instance must exist for all inputs and outputs, so that
they can be stored in Redis.
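
If you prefer not to derive these instances, a hand-written equivalent is a two-liner; this
sketch uses only the standard `Data.Binary` class and would replace the `deriving` clause
above, not supplement it:
```haskell
import Data.Binary (Binary (..))

-- Serialize the wrapped Integer directly; equivalent to the derived instance.
instance Binary ComputeFactorial where
  put (CF n) = put n
  get = CF <$> get
```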

Based on this queue, we can now define a worker that executes the jobs. The worker must
define a function for each of the types in `RPC`, and runs in a monadic context (which
we fix to `IO` for this example).

```haskell
fac :: ComputeFactorial -> IO Integer
fac (CF n) = do
  putStrLn $ "Computing the factorial of " ++ show n
  pure $ product [1..n]

sm :: ComputeSquare -> IO Integer
sm (CS n) = pure $ n * n

runWorker :: IO ()
runWorker = do
  pool <- connect "redis:///" 10
  let myId = "localworker"
  let err e = error $ "Something went wrong: " ++ show e
  remoteJobWorker @MyQueue myId pool err fac sm (pure . reverse)
```
The arguments to `remoteJobWorker` are a unique identifier for this worker (used for
counting workers; executing jobs works fine even with overlapping ids), a connection pool,
a logging function for exceptions, and then one handler function for each element of `RPC`.

Now if we call `runWorker`, it blocks until work needs to be done, and it never returns
except when an async exception is thrown. In production it is advised to use
`withRemoteJobWorker` instead, which forks off a worker thread and provides a `WorkerHandle`
to its continuation; the handle can be passed to `gracefulShutdown` to finish the currently
running job and then return gracefully.
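
As a minimal sketch of that pattern, assuming `withRemoteJobWorker` takes the same arguments
as `remoteJobWorker` followed by a continuation (check the module's documentation for the
exact signature):
```haskell
import Control.Concurrent (threadDelay)

runWorkerGracefully :: IO ()
runWorkerGracefully = do
  pool <- connect "redis:///" 10
  let err e = error $ "Something went wrong: " ++ show e
  withRemoteJobWorker @MyQueue "localworker" pool err fac sm (pure . reverse) $ \handle -> do
    -- In a real service we would wait for a shutdown signal here;
    -- this sketch simply serves jobs for one minute.
    threadDelay 60000000
    gracefulShutdown handle
```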

Now, from another process or even another machine, we can execute jobs, i.e. add them to the
queue and synchronously wait for their result. For example:
```haskell
runJobs :: IO ()
runJobs = do
  pool <- connect "redis:///" 10
  a <- runRemoteJob @MyQueue @String @String False pool 1 "test"
  print a
  b <- runRemoteJob @MyQueue @ComputeFactorial @Integer False pool 1 (CF 5)
  print b
```
This will print:
```
ghci> runJobs
Right "tset"
Right 120
```
The underlying Redis implementation is based on blocking reads from sorted sets (`BZPOPMIN`),
which is concurrency-safe and requires no polling. An arbitrary number of workers can be run,
and jobs can be executed from arbitrary machines. Only the `countWorkers` implementation
relies on a keep-alive loop in the workers, to deal properly with TCP connection losses.
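
For example, a monitoring task could use it like this (a sketch; we assume `countWorkers`
only needs the pool and returns a count):
```haskell
checkWorkers :: IO ()
checkWorkers = do
  pool <- connect "redis:///" 10
  -- 'countWorkers' is assumed here to report the number of live workers,
  -- based on their keep-alive entries.
  n <- countWorkers @MyQueue pool
  putStrLn $ "Workers currently alive: " ++ show n
```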

## Future work

1 change: 1 addition & 0 deletions package.yaml
@@ -21,6 +21,7 @@ dependencies:
- bytestring
- time
- mtl
- monadIO
- random
- exceptions
- hedis
6 changes: 4 additions & 2 deletions redis-schema.cabal
@@ -1,10 +1,10 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.34.7.
-- This file has been generated from package.yaml by hpack version 0.35.0.
--
-- see: https://github.com/sol/hpack
--
-- hash: 8ad979f047b1d31267791ddddc75577141d0b1f972f265c586894ab5f99c498c
-- hash: 1876630159ac153904237e5ab7ec2b440a07213ac8ca4b6420a02fff46e1524d

name: redis-schema
version: 0.1.0
@@ -31,6 +31,7 @@ library
exposed-modules:
Database.Redis.Schema
Database.Redis.Schema.Lock
Database.Redis.Schema.RemoteJob
other-modules:
Paths_redis_schema
hs-source-dirs:
@@ -45,6 +46,7 @@ library
, containers
, exceptions
, hedis
, monadIO
, mtl
, numeric-limits
, random