Commit 7a4b89a (1 parent: cb7e6f4)
feat: Middleware capability introduced
Showing 25 changed files with 1,726 additions and 98 deletions.
# Middleware

Kstreams allows you to include middlewares to add behavior to streams.

A *middleware* is a `callable` that works with every `ConsumerRecord` (CR) *before* it is processed by a specific `stream`, and it can also run code *after* the `CR` has been handled. Middlewares have access to the `stream` and the `send` function.

- It takes each `CR` that arrives to a topic.
- It can then do something with the `CR` or run any needed code.
- Then it passes the `CR` on to be processed by the specific stream.
- Once the `CR` has been processed by the stream, the chain is "completed".
- Any code placed after `self.next_call(cr)` is then executed.
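The flow above can be sketched with plain callables (a simplified model of the chain, not the kstreams internals; `AuditMiddleware` and `processor` are hypothetical names):

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List

calls: List[str] = []


@dataclass
class AuditMiddleware:
    # each middleware only holds a reference to the next callable in the chain
    next_call: Callable[[str], Awaitable[None]]

    async def __call__(self, cr: str) -> None:
        calls.append("before")    # runs before the stream handles the CR
        await self.next_call(cr)  # hand the CR to the next step
        calls.append("after")     # runs once the chain has "completed"


async def processor(cr: str) -> None:
    calls.append(f"processed {cr}")


chain = AuditMiddleware(next_call=processor)
asyncio.run(chain("event-1"))
print(calls)  # ['before', 'processed event-1', 'after']
```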

Kstreams `Middleware` has the following protocol:

::: kstreams.middleware.middleware.Middleware

## Creating a middleware

To create a middleware you have to create a class that inherits from `BaseMiddleware` and define the method `async def __call__`. Let's consider that we want to save the `CR` to `elastic` before it is processed:

```python
from kstreams import ConsumerRecord, middleware


async def save_to_elastic(cr: ConsumerRecord) -> None:
    ...


class ElasticMiddleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord) -> None:
        # save to elastic before calling the next step
        await save_to_elastic(cr)

        # the next call could be another middleware
        await self.next_call(cr)
```

Then, we have to include the middleware. It can be added per `stream` or at the application level (`stream_engine`):

=== "Include middleware in stream"
    ```python
    from kstreams import ConsumerRecord, middleware

    from .engine import stream_engine


    middlewares = [middleware.MiddlewareFactory(ElasticMiddleware)]

    @stream_engine.stream("kstreams-topic", middlewares=middlewares)
    async def processor(cr: ConsumerRecord):
        ...
    ```
=== "Include middleware at application level"
    ```python
    from kstreams import ConsumerRecord, create_engine, middleware

    middlewares = [middleware.MiddlewareFactory(ElasticMiddleware)]

    stream_engine = create_engine(
        title="my-stream-engine",
        middlewares=middlewares,
    )
    ```

!!! note
    Middlewares like `ElasticMiddleware` make sense at the application level, while others, like a Dead Letter Queue (DLQ) middleware, are better added at the stream level.

## Adding options to the middleware

If you want to provide configuration options to the middleware class, you should override the `__init__` method, ensuring that it accepts the keyword arguments `next_call`, `send` and `stream`; any remaining keyword arguments are your own options.

Let's consider that we want to send an event to a specific topic when a `ValueError` is raised inside a `stream` (Dead Letter Queue):

```python
from kstreams import ConsumerRecord, Stream, middleware, types


class DLQMiddleware:
    def __init__(
        self,
        *,
        next_call: types.NextMiddlewareCall,
        send: types.Send,
        stream: Stream,
        topic: str,
    ) -> None:
        self.next_call = next_call
        self.send = send
        self.stream = stream
        self.topic = topic

    async def __call__(self, cr: ConsumerRecord) -> None:
        try:
            await self.next_call(cr)
        except ValueError:
            await self.send(self.topic, key=cr.key, value=cr.value)


# Create the middlewares
middlewares = [
    middleware.MiddlewareFactory(
        DLQMiddleware, topic="kstreams-dlq-topic"
    )
]

@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
    if cr.value == b"joker":
        raise ValueError("Joker received...")
```
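Because the middleware only depends on the callables it receives, it can be exercised in isolation without a running stream. A minimal sketch with hypothetical stubs (`FakeRecord`, `fake_send` and `failing_next_call` are stand-ins, not kstreams APIs):

```python
import asyncio
from dataclasses import dataclass


# minimal stand-in for a ConsumerRecord with only the fields the middleware uses
@dataclass
class FakeRecord:
    key: bytes
    value: bytes


sent = []


async def failing_next_call(cr):
    raise ValueError("boom")  # simulate the stream rejecting the event


async def fake_send(topic, key=None, value=None):
    sent.append((topic, key, value))  # record what would be produced to Kafka


class DLQMiddleware:
    # same shape as the documented middleware, without the kstreams type hints
    def __init__(self, *, next_call, send, stream, topic):
        self.next_call = next_call
        self.send = send
        self.stream = stream
        self.topic = topic

    async def __call__(self, cr):
        try:
            await self.next_call(cr)
        except ValueError:
            await self.send(self.topic, key=cr.key, value=cr.value)


dlq = DLQMiddleware(
    next_call=failing_next_call, send=fake_send, stream=None, topic="my-dlq"
)
asyncio.run(dlq(FakeRecord(key=b"k", value=b"joker")))
print(sent)  # [('my-dlq', b'k', b'joker')]
```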

## Middleware by default

Kstreams includes one middleware by default, `ExceptionMiddleware`. This middleware adds exception handlers for particular types of expected exceptions, for example when the `Consumer` stops (Kafka disconnects), when the user presses `CTRL+C`, or any other exception that could cause the `stream` to crash.

::: kstreams.middleware.middleware.ExceptionMiddleware

## Middleware chain

It is possible to add as many middlewares as you want in order to split and reuse business logic; the downside is extra complexity, and the code might become slower. There are also some points to take into account:

- The order in which middlewares are added is important.
- If middlewares are added both to a `stream` and to the `stream_engine`, then the middleware stack is built first with the `stream` middlewares and then with the `stream_engine` middlewares. This means the `stream` middlewares are evaluated first.
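The build order can be sketched with plain callables: wrapping the handler from the inside out means the first middleware in the combined list ends up outermost and runs first (a simplified model, not the kstreams internals):

```python
import asyncio

order = []


def make_middleware(name):
    # returns a factory that wraps the next callable in the chain
    def factory(next_call):
        async def mw(cr):
            order.append(name)  # record when this layer runs
            await next_call(cr)
        return mw
    return factory


async def handler(cr):
    order.append("handler")


# wrap from the last middleware to the first, so list order == execution order
chain = handler
for factory in reversed([make_middleware("dlq"), make_middleware("elastic")]):
    chain = factory(chain)

asyncio.run(chain("event"))
print(order)  # ['dlq', 'elastic', 'handler']
```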

In the example we are adding three middlewares in the following order: `DLQMiddleware`, `ElasticMiddleware`, and `S3Middleware`. The execution chain will be:

`topic event` --> `ExceptionMiddleware` --> `DLQMiddleware` --> `ElasticMiddleware` --> `S3Middleware` --> `processor`

```python title="Multiple middlewares example"
from kstreams import ConsumerRecord, Stream, middleware


class DLQMiddleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord) -> None:
        try:
            await self.next_call(cr)
        except ValueError:
            await dlq(cr.value)


class ElasticMiddleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord) -> None:
        await save_to_elastic(cr.value)
        await self.next_call(cr)


class S3Middleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord) -> None:
        await backup_to_s3(cr.value)
        await self.next_call(cr)


middlewares = [
    middleware.MiddlewareFactory(DLQMiddleware),
    middleware.MiddlewareFactory(ElasticMiddleware),
    middleware.MiddlewareFactory(S3Middleware),
]


@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
    if cr.value == event_2:
        raise ValueError("Error from stream...")
    await save_to_db(cr.value)
```

!!! note
    In this example the `cr` is always saved to `elastic` and `s3`, regardless of whether an error occurs.

## Executing Code after the CR was processed

As mentioned in the introduction, it is possible to execute code after the `CR` is handled. To do this, place code after `next_call` is called:

```python title="Execute code after CR is handled"
from kstreams import ConsumerRecord, Stream, middleware


class DLQMiddleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord) -> None:
        try:
            await self.next_call(cr)
        except ValueError:
            await dlq(cr.value)


class ElasticMiddleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord) -> None:
        await self.next_call(cr)
        # This will be called after the whole chain has finished
        await save_to_elastic(cr.value)


middlewares = [
    middleware.MiddlewareFactory(DLQMiddleware),
    middleware.MiddlewareFactory(ElasticMiddleware),
]


@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
    if cr.value == event_2:
        raise ValueError("Error from stream...")
    await save_to_db(cr.value)
```

!!! note
    In this example the event is saved to `elastic` only when no error is raised.
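Another use of the post-`next_call` slot is measuring processing latency. A minimal sketch assuming the same `__call__` pattern (`TimingMiddleware` is hypothetical, not provided by kstreams):

```python
import asyncio
import time

durations = []


class TimingMiddleware:
    # same shape as a BaseMiddleware subclass: store next_call, time the chain
    def __init__(self, *, next_call, **kwargs):
        self.next_call = next_call

    async def __call__(self, cr):
        start = time.monotonic()
        await self.next_call(cr)  # run the rest of the chain
        # executed only after the CR has been fully handled
        durations.append(time.monotonic() - start)


async def processor(cr):
    await asyncio.sleep(0.01)  # stand-in for real work


mw = TimingMiddleware(next_call=processor)
asyncio.run(mw("event"))
print(len(durations))  # 1
```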
# Dead Letter Queue (DLQ) Middleware Example

## Requirements

Python 3.8+, Poetry, docker-compose

## Installation

```bash
poetry install
```

## Explanation

This example shows how the `Middleware` concept can be applied to implement a `DLQ`. Every time a `ValueError` is raised inside the `stream`, meaning that the event was not processed, the middleware sends an event to the topic `kstreams--dlq-topic`.

## Usage

1. Start the kafka cluster: from the `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`
3. From the `kstreams` project root, use `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open; enter messages to send. The command is:
```bash
./scripts/cluster/events/send "local--hello-world"
```

Then, on the consumer side, you should see something similar to the following logs:

```bash
❯ me@me-pc middleware-example % poetry run app

Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Consumer started
Event consumed: headers: (), payload: ConsumerRecord(topic='local--hello-world', partition=0, offset=0, timestamp=1660733921761, timestamp_type=0, key=None, value=b'boo', checksum=None, serialized_key_size=-1, serialized_value_size=3, headers=())
```

4. Consume from the topic `kstreams--dlq-topic`: `./scripts/cluster/events/read local--kstreams`
5. Produce the event `joker` using the terminal opened in step `3`. Then check the terminal opened in the previous step; the event `joker` should appear.

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` is pointing to the parent folder. You will have to set the latest version.
import aiorun

from kstreams import ConsumerRecord, Stream, create_engine, middleware, types


class DLQMiddleware:
    def __init__(
        self,
        *,
        next_call: types.NextMiddlewareCall,
        send: types.Send,
        stream: Stream,
        topic: str,
    ) -> None:
        self.next_call = next_call
        self.send = send
        self.stream = stream
        self.topic = topic

    async def __call__(self, cr: ConsumerRecord) -> None:
        try:
            await self.next_call(cr)
        except ValueError:
            print(f"\n Producing event {cr.value} to DLQ topic {self.topic} \n")
            await self.send(self.topic, key=cr.key, value=cr.value)


middlewares = [
    middleware.MiddlewareFactory(DLQMiddleware, topic="kstreams--dlq-topic"),
]
stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(topics=["local--hello-world"], middlewares=middlewares)
async def consume(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr}")
    if cr.value == b"joker":
        raise ValueError("Stream crashed 🤡 🤡 🤡 🤡 🤡 🤡 🤡 🤡")
    print("event has been processed")


async def start():
    await stream_engine.start()


async def shutdown(loop):
    await stream_engine.stop()


def main():
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)