
Add Database.Redis.Schema.RemoteJob #8

Merged · 5 commits · Jul 15, 2024
75 changes: 73 additions & 2 deletions README.md
@@ -894,8 +894,79 @@ how a library can be implemented on top of `Database.Redis.Schema`.

### Remote jobs

`Database.Redis.Schema.RemoteJob` implements a Redis-based worker queue for running CPU-intensive
jobs on remote machines. The queue is strongly typed and can contain several different kinds
of jobs, with priorities, which workers pick up and execute.

As an example, we define a queue that can contain three types of jobs:
```haskell
newtype ComputeFactorial = CF Integer deriving ( Binary )
newtype ComputeSquare = CS Integer deriving ( Binary )

data MyQueue
instance JobQueue MyQueue where
type RPC MyQueue =
'[ ComputeFactorial -> Integer
, ComputeSquare -> Integer
, String -> String
]
keyPrefix = "myqueue"
```
Here the `MyQueue` type is used only at compile time, to let the compiler find the right
instances. To distinguish between the two `Integer -> Integer` functions, we wrap their inputs
in newtypes. A `Binary` instance must exist for every input and output type, so that the values
can be stored in Redis.

Based on this queue, we can now define a worker that executes the jobs. The worker must
provide a function for each of the types in `RPC`, and runs in a monadic context (fixed
to `IO` in this example).

```haskell
fac :: ComputeFactorial -> IO Integer
fac (CF n) = do
putStrLn $ "Computing the factorial of " ++ show n
pure $ product [1..n]

sm :: ComputeSquare -> IO Integer
sm (CS n) = pure $ n * n

runWorker :: IO ()
runWorker = do
pool <- connect "redis:///" 10
let myId = "localworker"
let err e = error $ "Something went wrong: " ++ show e
remoteJobWorker @MyQueue myId pool err fac sm (pure . reverse)
```
The arguments to `remoteJobWorker` are a unique identifier for this worker (used for counting
workers; job execution works fine even with overlapping ids), a connection pool,
a logging function for exceptions, and then one handler for each element of `RPC`.

A call to `runWorker` blocks until work needs to be done, and it never returns except when
an asynchronous exception is thrown. In production it is advised to use
`withRemoteJobWorker` instead, which forks off a worker thread and passes a `WorkerHandle`
to its continuation; the handle can be given to `gracefulShutdown` to finish the currently
running job and then return gracefully.
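A graceful-shutdown setup could look like the following sketch. The exact argument order of
`withRemoteJobWorker` is an assumption here (mirroring `remoteJobWorker`, with the continuation
last), and `doMainWork` is a hypothetical placeholder for application code:

```haskell
-- Sketch only: argument order assumed to mirror remoteJobWorker,
-- with the continuation as the final argument.
runWorkerGracefully :: IO ()
runWorkerGracefully = do
  pool <- connect "redis:///" 10
  let err e = error $ "Something went wrong: " ++ show e
  withRemoteJobWorker @MyQueue "localworker" pool err fac sm (pure . reverse) $
    \handle -> do
      doMainWork                -- hypothetical application code
      gracefulShutdown handle   -- finish the running job, then stop
```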

From another process, or even another machine, we can now execute jobs, i.e. add them to the
queue and synchronously wait for their results. For example:
```haskell
runJobs :: IO ()
runJobs = do
pool <- connect "redis:///" 10
a <- runRemoteJob @MyQueue @String @String False pool 1 "test"
print a
b <- runRemoteJob @MyQueue @ComputeFactorial @Integer False pool 1 (CF 5)
print b
```
This will print:
```
ghci> runJobs
Right "tset"
Right 120
```
The underlying Redis implementation is based on blocking reads from sorted sets (`BZPOPMIN`),
which is concurrency-safe and requires no polling. An arbitrary number of workers can be run,
and jobs can be executed from arbitrary machines. Only the `countWorkers` implementation
relies on a keep-alive loop on the workers, in order to deal properly with TCP connection losses.
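As an illustration, a client could check for live workers before enqueueing jobs. The signature
of `countWorkers` used below (taking the connection pool and returning a count for the queue
type) is an assumption:

```haskell
import Control.Monad (when)

-- Sketch only: assumes countWorkers @MyQueue takes the connection pool
-- and returns the number of workers currently alive.
ensureWorkers :: Pool -> IO ()
ensureWorkers pool = do
  n <- countWorkers @MyQueue pool
  when (n < 1) $
    putStrLn "Warning: no live workers registered for MyQueue"
```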

## Future work

1 change: 1 addition & 0 deletions package.yaml
@@ -21,6 +21,7 @@ dependencies:
- bytestring
- time
- mtl
- monadIO
- random
- exceptions
- hedis
6 changes: 4 additions & 2 deletions redis-schema.cabal
@@ -1,10 +1,10 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.34.7.
-- This file has been generated from package.yaml by hpack version 0.35.0.
--
-- see: https://github.com/sol/hpack
--
-- hash: 8ad979f047b1d31267791ddddc75577141d0b1f972f265c586894ab5f99c498c
-- hash: 1876630159ac153904237e5ab7ec2b440a07213ac8ca4b6420a02fff46e1524d

name: redis-schema
version: 0.1.0
@@ -31,6 +31,7 @@ library
exposed-modules:
Database.Redis.Schema
Database.Redis.Schema.Lock
Database.Redis.Schema.RemoteJob
other-modules:
Paths_redis_schema
hs-source-dirs:
@@ -45,6 +46,7 @@ library
, containers
, exceptions
, hedis
, monadIO
, mtl
, numeric-limits
, random