docs: StreamEngine and Streams documentation added. Close #14 (#99)
Co-authored-by: Marcos Schroh <[email protected]>
marcosschroh and marcosschroh authored Feb 21, 2023
1 parent d1ef71f commit ddf0b26
Showing 5 changed files with 128 additions and 32 deletions.
7 changes: 7 additions & 0 deletions docs/engine.md
@@ -0,0 +1,7 @@
# StreamEngine

::: kstreams.engine.StreamEngine
    options:
        show_root_heading: true
        docstring_section_style: table
        show_signature_annotations: false
50 changes: 20 additions & 30 deletions docs/stream.md
@@ -1,33 +1,14 @@
# Streams

A `Stream` in `kstreams` is an extension of [AIOKafkaConsumer](https://aiokafka.readthedocs.io/en/stable/consumer.html)

Consuming can be done using `kstreams.Stream`. You only need to decorate a `coroutine` with `@stream_engine.stream`. The decorator has the same [aiokafka consumer](https://aiokafka.readthedocs.io/en/stable/api.html#aiokafkaconsumer-class) API at initialization; in other words, it accepts the same `args` and `kwargs` that the `aiokafka consumer` accepts.

```python title="Stream usage"
import aiorun
from kstreams import create_engine, Stream

stream_engine = create_engine(title="my-stream-engine")


# here you can add any other AIOKafkaConsumer config, for example auto_offset_reset
@stream_engine.stream("local--kstreams", group_id="de-my-partition")
async def stream(stream: Stream) -> None:
    async for cr in stream:
        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def start():
    await stream_engine.start()
    await produce()  # produce() is assumed to be defined elsewhere in the docs


async def shutdown(loop):
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
```
::: kstreams.streams.Stream
    options:
        show_root_heading: true
        docstring_section_style: table
        show_signature_annotations: false

## Creating a Stream instance

@@ -75,11 +56,13 @@ if __name__ == "__main__":
```
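The full example for this section is collapsed in the diff above. As a rough sketch of the pattern it documents — assuming `stream_engine` was created with `create_engine`, and using `StreamEngine.add_stream` to register a manually instantiated `Stream`:

```python title="Creating a Stream manually (sketch)"
from kstreams import Stream, create_engine

stream_engine = create_engine(title="my-stream-engine")


async def processor(stream: Stream):
    async for cr in stream:
        print(f"Event consumed: {cr.value}")


# instantiate the Stream directly instead of using the decorator
stream = Stream(
    "local--kstreams",
    name="my-stream",
    func=processor,
)
stream_engine.add_stream(stream)
```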

### Removing a stream from the engine

```python title="Removing stream"
stream_engine.remove_stream(stream)
```

### Starting the stream with initial offsets

If you want to start your consumption from certain offsets, you can include that in your stream instantiation.

Use case:
@@ -99,17 +82,24 @@
Also be aware that when your application restarts, it most likely will trigger the `initial_offsets` again. This means that setting `initial_offsets` to be a hardcoded number might not get the results you expect.

```python title="Initial Offsets from Database"
from kstreams import Stream, structs


topic_name = "local--kstreams"
db_table = ExampleDatabase()
initial_offsets: [List[TopicPartitionOffset]] = [TopicPartitionOffset(topic=topic_name, partition=0, offset=db_table.offset)]
initial_offset = structs.TopicPartitionOffset(topic=topic_name, partition=0, offset=db_table.offset)


async def my_stream(stream: Stream):
...


stream = Stream(
topic_name,
name="my-stream"
func=stream, # coroutine or async generator
name="my-stream",
func=my_stream, # coroutine or async generator
deserializer=MyDeserializer(),
initial_offsets=initial_offsets
initial_offsets=[initial_offset],
)
```
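The natural counterpart of this use case is writing offsets back to the database as events are processed, so a restart resumes where the stream left off. A minimal sketch, where `save_offset` is a hypothetical method on the `ExampleDatabase` above:

```python title="Persisting offsets back (sketch)"
async def my_stream(stream: Stream):
    async for cr in stream:
        ...  # process the event
        # hypothetical helper: store the next offset to resume from
        db_table.save_offset(topic=cr.topic, partition=cr.partition, offset=cr.offset + 1)
```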

47 changes: 47 additions & 0 deletions kstreams/engine.py
@@ -21,6 +35,41 @@


class StreamEngine:
"""
Attributes:
backend kstreams.backends.Kafka: Backend to connect. Default `Kafka`
consumer_class kstreams.Consumer: The consumer class to use when
instanciate a consumer. Default kstreams.Consumer
producer_class kstreams.Producer: The producer class to use when
instanciate the producer. Default kstreams.Producer
monitor kstreams.PrometheusMonitor: Prometheus monitor that holds
the [metrics](https://kpn.github.io/kstreams/metrics/)
title str | None: Engine name
serializer kstreams.serializers.Serializer | None: Serializer to
use when an event is produced.
deserializer kstreams.serializers.Deserializer | None: Deserializer
to be used when an event is consumed.
If provided it will be used in all Streams instances as a general one.
To override it per Stream, you can provide one per Stream
!!! Example
```python title="Usage"
import kstreams
stream_engine = kstreams.create_engine(
title="my-stream-engine"
)
@kstreams.stream("local--hello-world", group_id="example-group")
async def consume(stream: kstreams.Stream) -> None:
async for cr in stream:
print(f"showing bytes: {cr.value}")
await stream_engine.start()
```
"""

def __init__(
self,
*,
@@ -54,6 +89,18 @@ async def send(
serializer: Optional[Serializer] = None,
serializer_kwargs: Optional[Dict] = None,
):
"""
Attributes:
topic str: Topic name to send the event to
value Any: Event value
key str | None: Event key
partition int | None: Topic partition
timestamp_ms int | None: Event timestamp in miliseconds
headers Dict[str, str] | None: Event headers
serializer kstreams.serializers.Serializer | None: Serializer to
encode the event
serializer_kwargs Dict[str, Any] | None: Serializer kwargs
"""
        if self._producer is None:
            raise EngineNotStartedException()
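A short usage sketch for `send`, assuming the engine has already been started and the topic exists; the key and header values are illustrative:

```python title="Producing an event (sketch)"
await stream_engine.send(
    "local--hello-world",
    value=b"hello world",  # bytes, or anything the configured serializer can encode
    key="1",
    headers={"event-id": "1"},
)
```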

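The `deserializer` attribute documented in the class docstring can be set engine-wide so every `Stream` decodes events the same way. A minimal sketch, assuming the Deserializer protocol receives the `ConsumerRecord` and returns it with the value decoded (as in the kstreams serialization docs):

```python title="Engine-wide deserializer (sketch)"
import json

import kstreams


class JsonDeserializer:
    # decode the raw bytes payload into a dict before the stream sees it
    async def deserialize(self, consumer_record, **kwargs):
        consumer_record.value = json.loads(consumer_record.value)
        return consumer_record


stream_engine = kstreams.create_engine(
    title="my-stream-engine",
    deserializer=JsonDeserializer(),  # default for every Stream unless overridden
)
```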
55 changes: 53 additions & 2 deletions kstreams/streams.py
Expand Up @@ -30,6 +30,55 @@


class Stream:
"""
Attributes:
name str: Stream name
topics List[str]: List of topics to consume
backend kstreams.backends.Kafka: backend kstreams.backends.kafka.Kafka:
Backend to connect. Default `Kafka`
func Callable[["Stream"], Awaitable[Any]]: Coroutine fucntion or generator
to be called when an event arrives
config Dict[str, Any]: Stream configuration. Here all the
[properties](https://aiokafka.readthedocs.io/en/stable/api.html#consumer-class)
can be passed in the dictionary
deserializer kstreams.serializers.Deserializer: Deserializer to be used
when an event is consumed
initial_offsets List[kstreams.TopicPartitionOffset]: List of
TopicPartitionOffset that will `seek` the initial offsets to
!!! Example
```python title="Usage"
import aiorun
from kstreams import create_engine
stream_engine = create_engine(title="my-stream-engine")
# here you can add any other AIOKafkaConsumer config
@stream_engine.stream("local--kstreams", group_id="de-my-partition")
async def stream(stream: Stream) -> None:
async for cr in stream:
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
async def start():
await stream_engine.start()
await produce()
async def shutdown(loop):
await stream_engine.stop()
if __name__ == "__main__":
aiorun.run(
start(),
stop_on_unhandled_errors=True,
shutdown_callback=shutdown
)
```
"""

def __init__(
self,
topics: Union[List[str], str],
@@ -129,9 +178,10 @@ def _seek_to_initial_offsets(self):

async def __aenter__(self) -> AsyncGenerator:
"""
Start the kafka Consumer and return an async_gen so it can be iterated
Start the kafka Consumer and return an `async_gen` so it can be iterated
Usage:
!!! Example
```python title="Usage"
@stream_engine.stream(topic, group_id=group_id, ...)
async def stream(consumer):
async for cr, value, headers in consumer:
@@ -142,6 +192,7 @@ async def stream(consumer):
            async with stream as stream_flow:
                async for value in stream_flow:
                    ...
            ```
        """
        logger.info("Starting async_gen Stream....")
        async_gen = await self.start()
1 change: 1 addition & 0 deletions mkdocs.yml
Expand Up @@ -27,6 +27,7 @@ extra_css:
nav:
  - Introduction: 'index.md'
  - Getting Started: 'getting_started.md'
  - StreamEngine: 'engine.md'
  - Stream: 'stream.md'
  - Backends: 'backends.md'
  - Metrics: 'metrics.md'
