Skip to content

Commit

Permalink
feat: Middleware capability introduced
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh committed Jan 23, 2024
1 parent 96fc6ff commit d79b27a
Show file tree
Hide file tree
Showing 25 changed files with 1,679 additions and 101 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ if __name__ == "__main__":
- [x] Easy to integrate with any `async` framework. No tied to any library!!
- [x] Yield events from streams
- [x] [Opentelemetry Instrumentation](https://github.com/kpn/opentelemetry-instrumentation-kstreams)
- [x] Middlewares
- [ ] Store (kafka streams pattern)
- [ ] Stream Join
- [ ] Windowing
Expand Down
207 changes: 207 additions & 0 deletions docs/middleware.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
# Middleware

Kstreams allows you to include middlewares for adding behavior to streams.

A *middleware* is a `callable` that works with every `ConsumerRecord` (CR) *before* and *after* it is processed by a specific `stream`. `Middlewares` also have access to the `stream` and `send` function.

- It takes each `CR` that arrives to a `kafka topic`.
- Then it can do something to the `CR` or run any needed code.
- Then it passes the `CR` to be processed by another `callable` (other middleware or stream).
- Once the `CR` is processed by the stream, the chain is "completed".
- If there is code after the `self.next_call(cr)` then it will be executed.

Kstreams `Middleware` have the following protocol:

::: kstreams.middleware.middleware.MiddlewareProtocol

!!! note
The `__call__` method can return anything so previous calls can use the returned value. Make sure that the line `return await self.next_call(cr)` is in your method

!!! warning
Middlewares only work with the new [Dependency Injection approach](https://kpn.github.io/kstreams/stream/#dependency-injection-and-typing)

## Creating a middleware

To create a middleware you have to create a class that inherits from `BaseMiddleware`. Then, the method `async def __call__` must be defined. Let's consider that we want to save the CR to `elastic` before it is processed:

```python
import typing

from kstreams import ConsumerRecord, middleware

async def save_to_elastic(cr: ConsumerRecord) -> None:
...


class ElasticMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord) -> typing.Any:
# save to elastic before calling the next
await save_to_elastic(cr)

# the next call could be another middleware
return await self.next_call(cr)
```

Then, we have to include the middleware:

```python
from kstreams import ConsumerRecord, middleware

from .engine import stream_engine


middlewares = [middleware.Middleware(ElasticMiddleware)]

@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
...
```

!!! note
The `Middleware` concept also applies for `async generators` (yield from a stream)

## Adding extra configuration to middlewares

If you want to provide extra configuration to middleware you should override the __init__ method with the extra options as `keywargs` and then call `super().__init__(**kwargs)`

Let's consider that we want to send an event to a spcific `topic` when a `ValueError` is raised inside a `stream` (Dead Letter Queue)

```python
from kstreams import ConsumerRecord, types, Stream, middleware


class DLQMiddleware(middleware.BaseMiddleware):
def __init__(self, *, topic: str, **kwargs) -> None:
super().__init__(**kwargs)
self.topic = topic

async def __call__(self, cr: ConsumerRecord):
try:
return await self.next_call(cr)
except ValueError:
await self.send(self.topic, key=cr.key, value=cr.value)


# Create the middlewares
middlewares = [
middleware.Middleware(
DLQMiddleware, topic="kstreams-dlq-topic"
)
]

@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
if cr.value == b"joker":
raise ValueError("Joker received...")
```

## Middleware by default

Kstreams includes one middleware by default, `ExceptionMiddleware`. This middleware adds exception handlers, for particular types of expected exception cases, for example when the `Consumer` stops (kafka disconnects), user presses `CTRL+C` or any other exception that could cause the `stream` to crash.

::: kstreams.middleware.middleware.ExceptionMiddleware

## Middleware chain

It is possible to add as many middlewares as you want to split and reuse business logic, however the downside is extra complexity and the code might become slower. The middleware order is important as they are evaluated in the order that were placed in the stream.

In the following example we are adding three middelwares in the following order: `DLQMiddleware`, `ElasticMiddleware`, and `S3Middleware`. The code chain execution will be:

```mermaid
sequenceDiagram
autonumber
ExceptionMiddleware->>DLQMiddleware:
Note left of ExceptionMiddleware: Event received
alt No Processing Error
DLQMiddleware->>ElasticMiddleware:
Note right of ElasticMiddleware: Store CR on Elastic
ElasticMiddleware->>S3Middleware:
Note right of S3Middleware: Store CR on S3
S3Middleware->>Stream:
Note right of Stream: CR processed
Stream-->>S3Middleware:
S3Middleware-->>ElasticMiddleware:
ElasticMiddleware-->>DLQMiddleware:
DLQMiddleware-->>ExceptionMiddleware:
end
```

```python title="Multiple middlewares example"
from kstreams import ConsumerRecord, Stream, middleware


class DLQMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord):
try:
return await self.next_call(cr)
except ValueError:
await dlq(cr.value)


class ElasticMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord):
await save_to_elastic(cr.value)
return await self.next_call(cr)


class S3Middleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord):
await backup_to_s3(cr.value)
return await self.next_call(cr)


middlewares = [
middleware.Middleware(DLQMiddleware),
middleware.Middleware(ElasticMiddleware),
middleware.Middleware(S3Middleware),
]


@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
if cr.value == event_2:
raise ValueError("Error from stream...")
await save_to_db(cr.value)
```

!!! note
In the example we can see that always the `cr` will be save into `elastic` and `s3` regardless an error

## Executing Code after the CR was processed

As mentioned in the introduction, it is possible to execute code after the `CR` is handled. To do this, we need to place code after `next_call` is called:

```python title="Execute code after CR is handled"
from kstreams import ConsumerRecord, Stream, middleware


class DLQMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord):
try:
return await self.next_call(cr)
except ValueError:
await dlq(cr.value)


class ElasticMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord):
return await self.next_call(cr)
# This will be called after the whole chain has finished
await save_to_elastic(cr.value)


middlewares = [
middleware.Middleware(DLQMiddleware),
middleware.Middleware(ElasticMiddleware),
]


@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
if cr.value == event_2:
raise ValueError("Error from stream...")
await save_to_db(cr.value)
```

!!! note
In the example we can see that only if there is not an `error` the event is saved to `elastic`
46 changes: 46 additions & 0 deletions examples/dlq-middleware/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Dead Letter Queue (DLQ) middleware Example

## Requirements

python 3.8+, poetry, docker-compose

## Installation

```bash
poetry install
```

## Explanation

This shows how the `Middleware` concept can be applied to use a `DLQ`. In this example every time that the `ValueError` exception is raised inside the `stream`, meaning that the event was nor processed, the middleware will send an event to the topic `kstreams--dlq-topic`.

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`
3. From `kstreams` project root, you can use the `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open. Enter messages to send. The command is:
```bash
./scripts/cluster/events/send "local--hello-world"
```

Then, on the consume side, you should see something similar to the following logs:

```bash
❯ me@me-pc middleware-example % poetry run app

Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Consumer started
Event consumed: headers: (), payload: ConsumerRecord(topic='local--hello-world', partition=0, offset=0, timestamp=1660733921761, timestamp_type=0, key=None, value=b'boo', checksum=None, serialized_key_size=-1, serialized_value_size=3, headers=())
```

4. Consume from the topic `kstreams--dlq-topic`: `./scripts/cluster/events/read kstreams--dlq-topic`
5. Produce the event `joker` using the terminal opened in step `3`. Then check the terminal opened in the previous step and the event `joker` must appear

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` is pointing to the parent folder. You will have to set the latest version.
Empty file.
43 changes: 43 additions & 0 deletions examples/dlq-middleware/dlq_middleware/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import aiorun

from kstreams import ConsumerRecord, create_engine, middleware


class DLQMiddleware(middleware.BaseMiddleware):
def __init__(self, *, topic: str, **kwargs) -> None:
super().__init__(**kwargs)
self.topic = topic

async def __call__(self, cr: ConsumerRecord):
try:
return await self.next_call(cr)
except ValueError as exc:
print(f"\n Event crashed because {str(exc)} \n")
print(f"\n Producing event {cr.value} to DLQ topic {self.topic} \n")
await self.send(self.topic, key=cr.key, value=cr.value)


middlewares = [
middleware.Middleware(DLQMiddleware, topic="kstreams--dlq-topic"),
]
stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(topics=["local--hello-world"], middlewares=middlewares)
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr}")
if cr.value == b"joker":
raise ValueError("🤡 🤡 🤡 🤡 🤡 🤡 🤡 🤡")
print("Event has been proccesses")


async def start():
await stream_engine.start()


async def shutdown(loop):
await stream_engine.stop()


def main():
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
Loading

0 comments on commit d79b27a

Please sign in to comment.