Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(structured-concurrency): first step to introduce structured conc… #190

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions examples/confluent-example/confluent_example/app.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import asyncio
import logging

import aiorun

from .schemas import country_schema, deployment_schema
from .streaming.streams import stream_engine

logger = logging.getLogger(__name__)

deployment_topic = "local--deployment"
country_topic = "local--country"

Expand All @@ -23,7 +26,7 @@ async def produce():
"schema": deployment_schema,
},
)
print(f"Event produced on topic {deployment_topic}. Metadata: {metadata}")
logger.info(f"Event produced on topic {deployment_topic}. Metadata: {metadata}")

metadata = await stream_engine.send(
country_topic,
Expand All @@ -35,7 +38,7 @@ async def produce():
"schema": country_schema,
},
)
print(f"Event produced on topic {country_topic}. Metadata: {metadata}")
logger.info(f"Event produced on topic {country_topic}. Metadata: {metadata}")

await asyncio.sleep(3)

Expand All @@ -45,9 +48,10 @@ async def start():
await produce()


async def shutdown(loop):
async def shutdown(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


def main():
logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
685 changes: 335 additions & 350 deletions examples/confluent-example/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/confluent-example/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ authors = ["Marcos Schroh <[email protected]>"]
python = "^3.8"
kstreams = { path = "../../.", develop = true }
python-schema-registry-client = "^2.4.0"
aiorun = "^2022.4.1"
aiorun = "^2024.5.1"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import asyncio
import json
import logging

import aiorun

from kstreams import ConsumerRecord, create_engine

logger = logging.getLogger(__name__)

topics = ["local--kstreams-2", "local--hello-world"]

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(topics, group_id="example-group")
async def consume(cr: ConsumerRecord) -> None:
print(
logger.info(
f"Event consumed from topic: {cr.topic}, "
f"headers: {cr.headers}, payload: {cr.value}"
)
Expand All @@ -21,7 +26,7 @@ async def produce(events_per_topic: int = 5, delay_seconds: int = 1) -> None:
for topic in topics:
payload = json.dumps({"message": f"Hello world from topic {topic}!"})
metadata = await stream_engine.send(topic, value=payload.encode(), key="1")
print(f"Message sent: {metadata}")
logger.info(f"Message sent: {metadata}")
await asyncio.sleep(delay_seconds)


Expand All @@ -30,15 +35,10 @@ async def start():
await produce()


async def shutdown():
async def shutdown(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
loop.run_forever()
finally:
loop.run_until_complete(shutdown())
loop.close()
logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
333 changes: 333 additions & 0 deletions examples/consume-multiple-topics-example/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/consume-multiple-topics-example/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Marcos Schroh <[email protected]>, Santiago Fraire <santiago.fra

[tool.poetry.dependencies]
python = "^3.8"
aiorun = "^2022.4.1"
aiorun = "^2024.5.1"
kstreams = { path = "../../.", develop = true }

[tool.poetry.dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async def start():
await produce()


async def shutdown(loop):
async def shutdown(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


Expand Down
374 changes: 193 additions & 181 deletions examples/dataclasses-avroschema-example/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/dataclasses-avroschema-example/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ authors = ["Marcos Schroh <[email protected]>"]
[tool.poetry.dependencies]
python = "^3.8"
kstreams = { path = "../../.", develop = true }
aiorun = "^2023.7.2"
aiorun = "^2024.5.1"
dataclasses-avroschema = {version = "^0.56.1", extras = ["faker"]}

[build-system]
Expand Down
4 changes: 3 additions & 1 deletion examples/dlq-middleware/dlq_middleware/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

import aiorun

from kstreams import ConsumerRecord, create_engine, middleware
Expand Down Expand Up @@ -35,7 +37,7 @@ async def start():
await stream_engine.start()


async def shutdown(loop):
async def shutdown(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


Expand Down
319 changes: 152 additions & 167 deletions examples/dlq-middleware/poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions examples/dlq-middleware/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ packages = [{include = "dlq_middleware"}]
[tool.poetry.dependencies]
python = "^3.8"
kstreams = { path = "../../.", develop = true }
aiorun = "^2024.5.1"

[build-system]
requires = ["poetry-core"]
Expand Down
335 changes: 155 additions & 180 deletions examples/fastapi-sse/poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions examples/fastapi-webapp/fastapi_webapp/streams.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import logging

from kstreams import ConsumerRecord, stream

logger = logging.getLogger(__name__)


@stream("local--kstream", group_id="kstreams--group-id")
async def consume(cr: ConsumerRecord):
Expand Down
283 changes: 182 additions & 101 deletions examples/fastapi-webapp/poetry.lock

Large diffs are not rendered by default.

94 changes: 94 additions & 0 deletions examples/graceful-shutdown-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Graceful Shutdown Example

The idea is to demostrate that a `Graceful Shutdown` is possbile when a stream crashes. In this example we have two streams, one consuming from `local--hello-world`
topic and the other one consuming from `local--kstream`.

The stream consuming from `local--kstream` has a delay of 20 seconds after an `event` is received (this is to simulate a super slow consumption process).
The stream consuming from `local--hello-world` will raise a `ValueError("error....")` exception when the event value is `error`.
If an event was send to `local--kstream` in a time `t` and later an event with the value `error` was send to `local--hello-world` in a windows of less than `20 seconds`, then the stoping program process will be delay `20 seconds - t seconds`

Example:

1. Send an event to topic `local--kstream` now
2. Send an event to topic `local--hello-world` 5 seconds after sending the event in the previous step
3. You will see that after `15 seconds` the program stops, because it must wait that the event on `step 1` is processed.

## Requirements

python 3.8+, poetry, docker-compose

### Installation

```bash
poetry install
```

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`
3. From `kstreams` project root, you can use the `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open. Enter messages to send. The command is:
```bash
./scripts/cluster/events/send "local--kstream"
```
Then, on the consume side, you should see something similar to the following logs:

```bash
❯ me@me-pc simple-consumer-example % poetry run app

Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Consumer started
Event consumed: headers: (), payload: ConsumerRecord(topic='local--hello-world', partition=0, offset=0, timestamp=1660733921761, timestamp_type=0, key=None, value=b'boo', checksum=None, serialized_key_size=-1, serialized_value_size=3, headers=())
```
4. In another terminal repeat the same to send events to the other topic and send the event `error`
```bash
./scripts/cluster/events/send "local--hello-world"
```
5. Then, on the consume side, you should see something similar to the following logs:
```bash
INFO:graceful_shutdown_example.app:Event finished...
INFO:aiokafka.consumer.group_coordinator:LeaveGroup request succeeded
INFO:kstreams.streams:Stream consuming from topics ['local--kstream'] has stopped!!!


INFO:kstreams.engine:Streams have STOPPED....
INFO:aiorun:Cancelling pending tasks.
INFO:aiorun:Running pending tasks till complete
INFO:aiorun:Waiting for executor shutdown.
INFO:aiorun:Shutting down async generators
INFO:aiorun:Closing the loop.
INFO:aiorun:Leaving. Bye!
INFO:aiorun:Reraising unhandled exception
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/kstreams/examples/graceful-shutdown-example/graceful_shutdown_example/app.py", line 38, in main
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=stop)
File "/kstreams/examples/graceful-shutdown-example/.venv/lib/python3.12/site-packages/aiorun.py", line 370, in run
raise pending_exception_to_raise
File "/kstreams/kstreams/streams.py", line 231, in start
await self.func_wrapper_with_typing()
File "/kstreams/kstreams/streams.py", line 239, in func_wrapper_with_typing
await self.func(cr)
File "/kstreams/kstreams/middleware/middleware.py", line 80, in __call__
raise exc
File "/kstreams/kstreams/middleware/middleware.py", line 66, in __call__
return await self.next_call(cr)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/kstreams/kstreams/streams.py", line 348, in __call__
return await self.handler(*params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/kstreams/examples/graceful-shutdown-example/graceful_shutdown_example/app.py", line 18, in consume
raise ValueError("error....")
ValueError: error....
Handler: <kstreams.middleware.middleware.ExceptionMiddleware object at 0x10361dd00>
Topics: ['local--hello-world']
```

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` is pointing to the parent folder. You will have to set the latest version.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
import logging

import aiorun

import kstreams

logger = logging.getLogger(__name__)

stream_engine = kstreams.create_engine(title="my-stream-engine")


@stream_engine.stream(topics=["local--hello-world"], group_id="example-group")
async def consume(cr: kstreams.ConsumerRecord):
logger.info(f"Event consumed: headers: {cr.headers}, payload: {cr}")

if cr.value == b"error":
raise ValueError("error....")


@stream_engine.stream(topics=["local--kstream"], group_id="example-group-2")
async def consume_2(cr: kstreams.ConsumerRecord):
logger.info(f"Event consumed: headers: {cr.headers}, payload: {cr}")
await asyncio.sleep(20)
logger.info("Event finished...")


async def start():
await stream_engine.start()


async def stop(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


def main():
logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=stop)
Loading
Loading