-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Middleware capability introduced
- Loading branch information
1 parent
3ae25c3
commit e472b0d
Showing
25 changed files
with
1,678 additions
and
100 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,207 @@ | ||
# Middleware | ||
|
||
Kstreams allows you to include middlewares for adding behavior to streams. | ||
|
||
A *middleware* is a `callable` that works with every `ConsumerRecord` (CR) *before* and *after* it is processed by a specific `stream`. `Middlewares` also have access to the `stream` and `send` function. | ||
|
||
- It takes each `CR` that arrives to a `kafka topic`. | ||
- Then it can do something to the `CR` or run any needed code. | ||
- Then it passes the `CR` to be processed by another `callable` (other middleware or stream). | ||
- Once the `CR` is processed by the stream, the chain is "completed". | ||
- If there is code after the `self.next_call(cr)` then it will be executed. | ||
|
||
Kstreams `Middleware` have the following protocol: | ||
|
||
::: kstreams.middleware.middleware.MiddlewareProtocol | ||
|
||
!!! note | ||
The `__call__` method can return anything so previous calls can use the returned value. Make sure that the line `return await self.next_call(cr)` is in your method | ||
|
||
!!! warning | ||
Middlewares only work with the new [Dependency Injection approach](https://kpn.github.io/kstreams/stream/#dependency-injection-and-typing) | ||
|
||
## Creating a middleware | ||
|
||
To create a middleware you have to create a class that inherits from `BaseMiddleware`. Then, the method `async def __call__` must be defined. Let's consider that we want to save the CR to `elastic` before it is processed: | ||
|
||
```python | ||
import typing | ||
|
||
from kstreams import ConsumerRecord, middleware | ||
|
||
async def save_to_elastic(cr: ConsumerRecord) -> None: | ||
... | ||
|
||
|
||
class ElasticMiddleware(middleware.BaseMiddleware): | ||
async def __call__(self, cr: ConsumerRecord) -> typing.Any: | ||
# save to elastic before calling the next | ||
await save_to_elastic(cr) | ||
|
||
# the next call could be another middleware | ||
return await self.next_call(cr) | ||
``` | ||
|
||
Then, we have to include the middleware: | ||
|
||
```python | ||
from kstreams import ConsumerRecord, middleware | ||
|
||
from .engine import stream_engine | ||
|
||
|
||
middlewares = [middleware.Middleware(ElasticMiddleware)] | ||
|
||
@stream_engine.stream("kstreams-topic", middlewares=middlewares) | ||
async def processor(cr: ConsumerRecord): | ||
... | ||
``` | ||
|
||
!!! note | ||
The `Middleware` concept also applies for `async generators` (yield from a stream) | ||
|
||
## Adding extra configuration to middlewares | ||
|
||
If you want to provide extra configuration to middleware you should override the __init__ method with the extra options as `keywargs` and then call `super().__init__(**kwargs)` | ||
|
||
Let's consider that we want to send an event to a spcific `topic` when a `ValueError` is raised inside a `stream` (Dead Letter Queue) | ||
|
||
```python | ||
from kstreams import ConsumerRecord, types, Stream, middleware | ||
|
||
|
||
class DLQMiddleware(middleware.BaseMiddleware): | ||
def __init__(self, *, topic: str, **kwargs) -> None: | ||
super().__init__(**kwargs) | ||
self.topic = topic | ||
|
||
async def __call__(self, cr: ConsumerRecord): | ||
try: | ||
return await self.next_call(cr) | ||
except ValueError: | ||
await self.send(self.topic, key=cr.key, value=cr.value) | ||
|
||
|
||
# Create the middlewares | ||
middlewares = [ | ||
middleware.Middleware( | ||
DLQMiddleware, topic="kstreams-dlq-topic" | ||
) | ||
] | ||
|
||
@stream_engine.stream("kstreams-topic", middlewares=middlewares) | ||
async def processor(cr: ConsumerRecord): | ||
if cr.value == b"joker": | ||
raise ValueError("Joker received...") | ||
``` | ||
|
||
## Middleware by default | ||
|
||
Kstreams includes one middleware by default, `ExceptionMiddleware`. This middleware adds exception handlers, for particular types of expected exception cases, for example when the `Consumer` stops (kafka disconnects), user presses `CTRL+C` or any other exception that could cause the `stream` to crash. | ||
|
||
::: kstreams.middleware.middleware.ExceptionMiddleware | ||
|
||
## Middleware chain | ||
|
||
It is possible to add as many middlewares as you want to split and reuse business logic, however the downside is extra complexity and the code might become slower. The middleware order is important as they are evaluated in the order that were placed in the stream. | ||
|
||
In the following example we are adding three middelwares in the following order: `DLQMiddleware`, `ElasticMiddleware`, and `S3Middleware`. The code chain execution will be: | ||
|
||
```mermaid | ||
sequenceDiagram | ||
autonumber | ||
ExceptionMiddleware->>DLQMiddleware: | ||
Note left of ExceptionMiddleware: Event received | ||
alt No Processing Error | ||
DLQMiddleware->>ElasticMiddleware: | ||
Note right of ElasticMiddleware: Store CR on Elastic | ||
ElasticMiddleware->>S3Middleware: | ||
Note right of S3Middleware: Store CR on S3 | ||
S3Middleware->>Stream: | ||
Note right of Stream: CR processed | ||
Stream-->>S3Middleware: | ||
S3Middleware-->>ElasticMiddleware: | ||
ElasticMiddleware-->>DLQMiddleware: | ||
DLQMiddleware-->>ExceptionMiddleware: | ||
end | ||
``` | ||
|
||
```python title="Multiple middlewares example" | ||
from kstreams import ConsumerRecord, Stream, middleware | ||
|
||
|
||
class DLQMiddleware(middleware.BaseMiddleware): | ||
async def __call__(self, cr: ConsumerRecord): | ||
try: | ||
return await self.next_call(cr) | ||
except ValueError: | ||
await dlq(cr.value) | ||
|
||
|
||
class ElasticMiddleware(middleware.BaseMiddleware): | ||
async def __call__(self, cr: ConsumerRecord): | ||
await save_to_elastic(cr.value) | ||
return await self.next_call(cr) | ||
|
||
|
||
class S3Middleware(middleware.BaseMiddleware): | ||
async def __call__(self, cr: ConsumerRecord): | ||
await backup_to_s3(cr.value) | ||
return await self.next_call(cr) | ||
|
||
|
||
middlewares = [ | ||
middleware.Middleware(DLQMiddleware), | ||
middleware.Middleware(ElasticMiddleware), | ||
middleware.Middleware(S3Middleware), | ||
] | ||
|
||
|
||
@stream_engine.stream("kstreams-topic", middlewares=middlewares) | ||
async def processor(cr: ConsumerRecord): | ||
if cr.value == event_2: | ||
raise ValueError("Error from stream...") | ||
await save_to_db(cr.value) | ||
``` | ||
|
||
!!! note | ||
In the example we can see that always the `cr` will be save into `elastic` and `s3` regardless an error | ||
|
||
## Executing Code after the CR was processed | ||
|
||
As mentioned in the introduction, it is possible to execute code after the `CR` is handled. To do this, we need to place code after `next_call` is called: | ||
|
||
```python title="Execute code after CR is handled" | ||
from kstreams import ConsumerRecord, Stream, middleware | ||
|
||
|
||
class DLQMiddleware(middleware.BaseMiddleware): | ||
async def __call__(self, cr: ConsumerRecord): | ||
try: | ||
return await self.next_call(cr) | ||
except ValueError: | ||
await dlq(cr.value) | ||
|
||
|
||
class ElasticMiddleware(middleware.BaseMiddleware): | ||
async def __call__(self, cr: ConsumerRecord): | ||
return await self.next_call(cr) | ||
# This will be called after the whole chain has finished | ||
await save_to_elastic(cr.value) | ||
|
||
|
||
middlewares = [ | ||
middleware.Middleware(DLQMiddleware), | ||
middleware.Middleware(ElasticMiddleware), | ||
] | ||
|
||
|
||
@stream_engine.stream("kstreams-topic", middlewares=middlewares) | ||
async def processor(cr: ConsumerRecord): | ||
if cr.value == event_2: | ||
raise ValueError("Error from stream...") | ||
await save_to_db(cr.value) | ||
``` | ||
|
||
!!! note | ||
In the example we can see that only if there is not an `error` the event is saved to `elastic` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# Dead Letter Queue (DLQ) middleware Example | ||
|
||
## Requirements | ||
|
||
python 3.8+, poetry, docker-compose | ||
|
||
## Installation | ||
|
||
```bash | ||
poetry install | ||
``` | ||
|
||
## Explanation | ||
|
||
This shows how the `Middleware` concept can be applied to use a `DLQ`. In this example every time that the `ValueError` exception is raised inside the `stream`, meaning that the event was nor processed, the middleware will send an event to the topic `kstreams--dlq-topic`. | ||
|
||
## Usage | ||
|
||
1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start` | ||
2. Inside this folder execute `poetry run app` | ||
3. From `kstreams` project root, you can use the `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open. Enter messages to send. The command is: | ||
```bash | ||
./scripts/cluster/events/send "local--hello-world" | ||
``` | ||
|
||
Then, on the consume side, you should see something similar to the following logs: | ||
|
||
```bash | ||
❯ me@me-pc middleware-example % poetry run app | ||
|
||
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError | ||
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError | ||
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError | ||
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError | ||
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError | ||
Consumer started | ||
Event consumed: headers: (), payload: ConsumerRecord(topic='local--hello-world', partition=0, offset=0, timestamp=1660733921761, timestamp_type=0, key=None, value=b'boo', checksum=None, serialized_key_size=-1, serialized_value_size=3, headers=()) | ||
``` | ||
|
||
4. Consume from the topic `kstreams--dlq-topic`: `./scripts/cluster/events/read kstreams--dlq-topic` | ||
5. Produce the event `joker` using the terminal opened in step `3`. Then check the terminal opened in the previous step and the event `joker` must appear | ||
|
||
## Note | ||
|
||
If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where | ||
`kstreams` is pointing to the parent folder. You will have to set the latest version. |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import aiorun | ||
|
||
from kstreams import ConsumerRecord, create_engine, middleware | ||
|
||
|
||
class DLQMiddleware(middleware.BaseMiddleware): | ||
def __init__(self, *, topic: str, **kwargs) -> None: | ||
super().__init__(**kwargs) | ||
self.topic = topic | ||
|
||
async def __call__(self, cr: ConsumerRecord): | ||
try: | ||
return await self.next_call(cr) | ||
except ValueError as exc: | ||
print(f"\n Event crashed because {str(exc)} \n") | ||
print(f"\n Producing event {cr.value} to DLQ topic {self.topic} \n") | ||
await self.send(self.topic, key=cr.key, value=cr.value) | ||
|
||
|
||
middlewares = [ | ||
middleware.Middleware(DLQMiddleware, topic="kstreams--dlq-topic"), | ||
] | ||
stream_engine = create_engine(title="my-stream-engine") | ||
|
||
|
||
@stream_engine.stream(topics=["local--hello-world"], middlewares=middlewares) | ||
async def consume(cr: ConsumerRecord): | ||
print(f"Event consumed: headers: {cr.headers}, payload: {cr}") | ||
if cr.value == b"joker": | ||
raise ValueError("🤡 🤡 🤡 🤡 🤡 🤡 🤡 🤡") | ||
print("Event has been proccesses") | ||
|
||
|
||
async def start(): | ||
await stream_engine.start() | ||
|
||
|
||
async def shutdown(loop): | ||
await stream_engine.stop() | ||
|
||
|
||
def main(): | ||
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown) |
Oops, something went wrong.