feat: first steps to add dependency injection. Inspect udf coroutines… #141

Merged
7 changes: 3 additions & 4 deletions README.md
@@ -28,15 +28,14 @@ pip install aiorun

```python
import aiorun
-from kstreams import create_engine, Stream
+from kstreams import create_engine, ConsumerRecord


stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream("local--kstream")
-async def consume(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
+async def consume(cr: ConsumerRecord):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
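    # NOTE: the real body is collapsed in this diff view; what follows is a
    # hypothetical sketch of a typical produce() in these examples
    await stream_engine.start()
    await stream_engine.send("local--kstream", value=b"hello world!")
    await stream_engine.stop()
```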
21 changes: 9 additions & 12 deletions docs/getting_started.md
@@ -6,15 +6,14 @@ You can start using `kstreams` with simple `producers` and `consumers` and/or

```python title="Simple use case"
import asyncio
from kstreams import create_engine, Stream
from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--py-stream", group_id="de-my-partition")
async def consume(stream: Stream):
async for cr in stream:
print(f"Event consumed: headers: {cr.headers}, payload: {value}")
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {value}")


async def produce():
@@ -54,15 +53,14 @@ so you won't have to worry about `set signal handlers`, `shutdown callbacks`, `gr

```python title="Usage with aiorun"
import aiorun
from kstreams import create_engine, Stream
from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--py-stream", group_id="de-my-partition")
async def consume(stream: Stream):
async for cr in stream:
print(f"Event consumed: headers: {cr.headers}, payload: {value}")
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {value}")


async def produce():
@@ -109,13 +107,12 @@ Define the `streams`:
```python title="Application stream"
# streaming.streams.py
from .engine import stream_engine
from kstreams import Stream
from kstreams import ConsumerRecord


@stream_engine.stream("local--kstream")
async def stream(stream: Stream):
async for cr in stream:
print(f"Event consumed: headers: {cr.headers}, payload: {cr.payload}")
async def stream(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr.payload}")
```

Create the `FastAPI`:
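The `FastAPI` application file is collapsed in this diff. As a rough, hypothetical sketch (not the collapsed file's exact contents), the integration usually comes down to starting and stopping the `stream_engine` together with the app:

```python
# app.py - hypothetical sketch; hook style may differ from the real example
from fastapi import FastAPI

from .streaming.streams import stream_engine

app = FastAPI(
    on_startup=[stream_engine.start],   # start consuming when the app starts
    on_shutdown=[stream_engine.stop],   # stop streams and flush on shutdown
)
```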
7 changes: 3 additions & 4 deletions docs/index.md
@@ -26,15 +26,14 @@ pip install aiorun

```python
import aiorun
-from kstreams import create_engine, Stream
+from kstreams import create_engine, ConsumerRecord


stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream("local--kstream")
-async def consume(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
+async def consume(cr: ConsumerRecord):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
13 changes: 6 additions & 7 deletions docs/monitoring.md
@@ -89,14 +89,13 @@ In our kstreams app, we can:
stream_engine = create_engine(title="my-engine", monitor=MyAppPrometheusMonitor())

@stream_engine.stream("my-special-orders")
async def consume_orders_received(consumer):
for cr, value, _ in consumer:
if value.status == "NEW":
stream_engine.monitor.increase_received()
elif value.status == "SHIPPED":
stream_engine.monitor.increase_shipped()
async def consume_orders_received(cr: ConsumerRecord):
if cr.value.status == "NEW":
stream_engine.monitor.increase_received()
elif cr.value.status == "SHIPPED":
stream_engine.monitor.increase_shipped()
```

Your app's Prometheus instance would expose this data, which you might use to build a stylish ✨dashboard✨.
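For context, `MyAppPrometheusMonitor` is defined earlier in this document, outside the hunk. A minimal sketch of such a monitor, assuming kstreams' `PrometheusMonitor` base class and the `prometheus_client` library (the metric names here are illustrative):

```python
from prometheus_client import Counter

from kstreams import PrometheusMonitor


class MyAppPrometheusMonitor(PrometheusMonitor):
    def __init__(self):
        super().__init__()
        # custom business metrics, exposed next to the built-in consumer metrics
        self.orders_received = Counter("orders_received", "Total orders received")
        self.orders_shipped = Counter("orders_shipped", "Total orders shipped")

    def increase_received(self, amount: int = 1) -> None:
        self.orders_received.inc(amount)

    def increase_shipped(self, amount: int = 1) -> None:
        self.orders_shipped.inc(amount)
```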

For further details, see the [Prometheus python client](https://github.com/prometheus/client_python) documentation.
9 changes: 4 additions & 5 deletions docs/serialization.md
@@ -51,11 +51,10 @@ stream_engine = create_engine(

```python
@stream_engine.stream(topic, deserializer=JsonDeserializer())
-async def hello_stream(stream: Stream):
-    async for event in stream:
-        # remember event.value is now a dict
-        print(event.value["message"])
-        save_to_db(event)
+async def hello_stream(cr: ConsumerRecord):
+    # remember cr.value is now a dict
+    print(cr.value["message"])
+    save_to_db(cr)
```
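The `JsonDeserializer` used above is configured earlier in this document, outside the hunk. As an illustration of the deserializer protocol, a minimal version might look like the sketch below; it is an assumption, not necessarily the library's exact implementation:

```python
import json

from kstreams import ConsumerRecord


class JsonDeserializer:
    async def deserialize(self, consumer_record: ConsumerRecord, **kwargs) -> ConsumerRecord:
        # replace the raw bytes payload with the parsed dict before the stream sees it
        consumer_record.value = json.loads(consumer_record.value)
        return consumer_record
```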

67 changes: 48 additions & 19 deletions docs/stream.md
@@ -10,6 +10,40 @@ Consuming can be done using `kstreams.Stream`. You only need to decorate a `coro
docstring_section_style: table
show_signature_annotations: false

## Dependency Injection and typing

The old way to iterate over a stream is the `async for _ in stream` loop. The iterable approach works, but in most cases end users are interested only in the `ConsumerRecord`, not in the `stream` itself. For this reason it is now possible to drop the loop: every time a new event arrives in the stream, the `coroutine` defined by the end user is awaited with that event. If the `stream` is also needed, for example because `manual` commit is enabled, you can add the `stream` as an argument of the coroutine as well, as the tabs below show.

=== "Use only the ConsumerRecord"
```python
@stream_engine.stream(topic, name="my-stream")
async def my_stream(cr: ConsumerRecord):
save_to_db(cr.value)
```

=== "Use ConsumerRecord and Stream"
```python
@stream_engine.stream(topic, name="my-stream", enable_auto_commit=False)
async def my_stream(cr: ConsumerRecord, stream: Stream):
save_to_db(cr.value)
await stream.commit()
```

=== "Old fashion"
```python
@stream_engine.stream(topic, name="my-stream")
async def consume(stream): # you can specify the type but it will be the same result
async for cr in stream:
save_to_db(cr.value)
# you can do something with the stream as well!!
```

!!! note
    Proper typing is required in order to drop the `async for in` loop. The argument order also matters; this might change in the future.

!!! note
    It is still possible to use the `async for in` loop, but it might be removed in the future.
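As the PR title says, the engine now inspects the user-defined coroutine to decide what to inject. A rough sketch of the mechanism, with hypothetical names that only illustrate the idea (the engine's real code will differ):

```python
import inspect
import typing

from kstreams import ConsumerRecord, Stream


async def call_udf(func, cr: ConsumerRecord, stream: Stream) -> None:
    """Await the user's coroutine with whatever its annotations ask for."""
    hints = typing.get_type_hints(func)
    kwargs = {}
    for name in inspect.signature(func).parameters:
        if hints.get(name) is ConsumerRecord:
            kwargs[name] = cr
        elif hints.get(name) is Stream:
            kwargs[name] = stream

    if kwargs:
        await func(**kwargs)  # new style: awaited once per event
    else:
        await func(stream)  # old style: the coroutine loops over the stream itself
```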

## Creating a Stream instance

If for any reason you need to create `Streams` instances directly, you can do it without using the decorator `stream_engine.stream`.
@@ -27,9 +61,8 @@ class MyDeserializer:
        return consumer_record.value.decode()


-async def stream(stream: Stream) -> None:
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
+async def stream(cr: ConsumerRecord) -> None:
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


stream = Stream(
@@ -110,15 +143,14 @@ As an end user you are responsible for deciding what to do. In future version app

```python title="Crashing example"
import aiorun
from kstreams import create_engine
from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--kstreams", group_id="de-my-partition")
async def stream(stream: Stream) -> None:
async for cr in stream:
print(f"Event consumed. Payload {cr.payload}")
async def stream(cr: ConsumerRecord) -> None:
print(f"Event consumed. Payload {cr.payload}")


async def produce():
@@ -162,9 +194,8 @@ stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(["local--kstreams", "local--hello-world"], group_id="example-group")
async def consume(stream: Stream) -> None:
async for cr in stream:
print(f"Event consumed from topic {cr.topic}: headers: {cr.headers}, payload: {cr.value}")
async def consume(cr: ConsumerRecord) -> None:
print(f"Event consumed from topic {cr.topic}: headers: {cr.headers}, payload: {cr.value}")
```

## Changing consumer behavior
@@ -176,9 +207,8 @@ Most of the time you will only set the `topic` and the `group_id` to the `consum
# On OffsetOutOfRange errors, the offset will move to the oldest available message ('earliest')

@stream_engine.stream("local--kstream", group_id="de-my-partition", session_timeout_ms=500, auto_offset_reset="earliest")
-async def stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
+async def stream(cr: ConsumerRecord):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
```

## Manual commit
@@ -187,13 +217,12 @@ When processing more sensitive data and you want to be sure that the `kafka offe

```python title="Manual commit example"
@stream_engine.stream("local--kstream", group_id="de-my-partition", enable_auto_commit=False)
async def stream(stream: Stream):
async for cr in stream:
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
async def stream(cr: ConsumerRecord, stream: Stream):
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")

# We need to make sure that the pyalod was stored before commiting the kafka offset
await store_in_database(payload)
await stream.consumer.commit() # You need to commit!!!
# We need to make sure that the pyalod was stored before commiting the kafka offset
await store_in_database(payload)
await stream.commit() # You need to commit!!!
```

!!! note
16 changes: 7 additions & 9 deletions docs/test_client.md
@@ -58,9 +58,8 @@ event_store = EventStore()


@stream_engine.stream(topic, group_id="example-group")
async def consume(stream: Stream):
async for cr in stream:
event_store.add(cr)
async def consume(cr: ConsumerRecord):
event_store.add(cr)


async def produce():
@@ -129,7 +128,7 @@ In some cases your stream will commit; in this situation, checking the committed p
```python
import pytest
from kstreams.test_utils import TestStreamClient
-from kstreams import TopicPartition
+from kstreams import ConsumerRecord, Stream, TopicPartition

from .example import produce, stream_engine

@@ -145,10 +144,9 @@ tp = TopicPartition(
total_events = 10

@stream_engine.stream(topic_name, name=name)
-async def my_stream(stream: Stream):
-    async for cr in stream:
-        # commit every time that an event arrives
-        await stream.consumer.commit({tp: cr.offset})
+async def my_stream(cr: ConsumerRecord, stream: Stream):
+    # commit every time that an event arrives
+    await stream.commit({tp: cr.offset})


# test the code
@@ -162,7 +160,7 @@ async def test_consumer_commit(stream_engine: StreamEngine):

# check that everything was committed
stream = stream_engine.get_stream(name)
-assert (await stream.consumer.committed(tp)) == total_events
+assert (await stream.committed(tp)) == total_events
```

### E2E test
4 changes: 2 additions & 2 deletions examples/README.md
@@ -1,8 +1,8 @@
-## Welcome to `kstreams` examples.
+# Welcome to `kstreams` examples

In order to run the examples you need `docker-compose`. In the project root you will find the file `docker-compose.yaml`, which contains a minimal setup to run `kafka` and `zookeeper`.

-### Steps:
+## Steps

1. Activate your `virtualenv`: `poetry shell`
2. Create the kafka cluster: `make kafka-cluster`
12 changes: 5 additions & 7 deletions examples/confluent-example/README.md
@@ -144,22 +144,20 @@ And there are two `streams` that will consume events from `local--deployment` an

```python
from .engine import stream_engine
-from kstreams import Stream
+from kstreams import ConsumerRecord

deployment_topic = "local--deployment"
country_topic = "local--country"


@stream_engine.stream(deployment_topic)
-async def deployment_stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed on topic {deployment_topic}. The user is {cr.value}")
+async def deployment_stream(cr: ConsumerRecord):
+    print(f"Event consumed on topic {deployment_topic}. The user is {cr.value}")


@stream_engine.stream(country_topic)
-async def country_stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed on topic {country_topic}. The Address is {cr.value}")
+async def country_stream(cr: ConsumerRecord):
+    print(f"Event consumed on topic {country_topic}. The Address is {cr.value}")
```

## Note
@@ -1,4 +1,4 @@
-from kstreams import Stream
+from kstreams import ConsumerRecord

from .engine import stream_engine

@@ -7,12 +7,10 @@


@stream_engine.stream(deployment_topic)
-async def deployment_stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed on topic {deployment_topic}. The user is {cr.value}")
+async def deployment_stream(cr: ConsumerRecord):
+    print(f"Event consumed on topic {deployment_topic}. The user is {cr.value}")


@stream_engine.stream(country_topic)
-async def country_stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed on topic {country_topic}. The Address is {cr.value}")
+async def country_stream(cr: ConsumerRecord):
+    print(f"Event consumed on topic {country_topic}. The Address is {cr.value}")