Skip to content

Commit

Permalink
feat(structured-concurrency): first step to introduce structured conc…
Browse files Browse the repository at this point in the history
…urrency paradigm. Now StreamEngine will stop on a Stream crash (only when running with aiorun). Closes #60 due to the use of flag `stop_on_unhandled_errors` with aiorun. Related to #162 (#190)

anyio has been introduced as the way to run programs in worker mode
  • Loading branch information
marcosschroh authored Jul 1, 2024
1 parent 78032d2 commit 32e362d
Show file tree
Hide file tree
Showing 53 changed files with 2,873 additions and 2,317 deletions.
10 changes: 7 additions & 3 deletions examples/confluent-example/confluent_example/app.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import asyncio
import logging

import aiorun

from .schemas import country_schema, deployment_schema
from .streaming.streams import stream_engine

logger = logging.getLogger(__name__)

deployment_topic = "local--deployment"
country_topic = "local--country"

Expand All @@ -23,7 +26,7 @@ async def produce():
"schema": deployment_schema,
},
)
print(f"Event produced on topic {deployment_topic}. Metadata: {metadata}")
logger.info(f"Event produced on topic {deployment_topic}. Metadata: {metadata}")

metadata = await stream_engine.send(
country_topic,
Expand All @@ -35,7 +38,7 @@ async def produce():
"schema": country_schema,
},
)
print(f"Event produced on topic {country_topic}. Metadata: {metadata}")
logger.info(f"Event produced on topic {country_topic}. Metadata: {metadata}")

await asyncio.sleep(3)

Expand All @@ -45,9 +48,10 @@ async def start():
await produce()


async def shutdown(loop):
async def shutdown(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


def main():
logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
685 changes: 335 additions & 350 deletions examples/confluent-example/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/confluent-example/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ authors = ["Marcos Schroh <[email protected]>"]
python = "^3.8"
kstreams = { path = "../../.", develop = true }
python-schema-registry-client = "^2.4.0"
aiorun = "^2022.4.1"
aiorun = "^2024.5.1"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import asyncio
import json
import logging

import aiorun

from kstreams import ConsumerRecord, create_engine

logger = logging.getLogger(__name__)

topics = ["local--kstreams-2", "local--hello-world"]

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(topics, group_id="example-group")
async def consume(cr: ConsumerRecord) -> None:
print(
logger.info(
f"Event consumed from topic: {cr.topic}, "
f"headers: {cr.headers}, payload: {cr.value}"
)
Expand All @@ -21,7 +26,7 @@ async def produce(events_per_topic: int = 5, delay_seconds: int = 1) -> None:
for topic in topics:
payload = json.dumps({"message": f"Hello world from topic {topic}!"})
metadata = await stream_engine.send(topic, value=payload.encode(), key="1")
print(f"Message sent: {metadata}")
logger.info(f"Message sent: {metadata}")
await asyncio.sleep(delay_seconds)


Expand All @@ -30,15 +35,10 @@ async def start():
await produce()


async def shutdown():
async def shutdown(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
loop.run_forever()
finally:
loop.run_until_complete(shutdown())
loop.close()
logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
333 changes: 333 additions & 0 deletions examples/consume-multiple-topics-example/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/consume-multiple-topics-example/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Marcos Schroh <[email protected]>, Santiago Fraire <santiago.fra

[tool.poetry.dependencies]
python = "^3.8"
aiorun = "^2022.4.1"
aiorun = "^2024.5.1"
kstreams = { path = "../../.", develop = true }

[tool.poetry.dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async def start():
await produce()


async def shutdown(loop):
async def shutdown(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


Expand Down
374 changes: 193 additions & 181 deletions examples/dataclasses-avroschema-example/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/dataclasses-avroschema-example/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ authors = ["Marcos Schroh <[email protected]>"]
[tool.poetry.dependencies]
python = "^3.8"
kstreams = { path = "../../.", develop = true }
aiorun = "^2023.7.2"
aiorun = "^2024.5.1"
dataclasses-avroschema = {version = "^0.56.1", extras = ["faker"]}

[build-system]
Expand Down
4 changes: 3 additions & 1 deletion examples/dlq-middleware/dlq_middleware/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

import aiorun

from kstreams import ConsumerRecord, create_engine, middleware
Expand Down Expand Up @@ -35,7 +37,7 @@ async def start():
await stream_engine.start()


async def shutdown(loop):
async def shutdown(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


Expand Down
319 changes: 152 additions & 167 deletions examples/dlq-middleware/poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions examples/dlq-middleware/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ packages = [{include = "dlq_middleware"}]
[tool.poetry.dependencies]
python = "^3.8"
kstreams = { path = "../../.", develop = true }
aiorun = "^2024.5.1"

[build-system]
requires = ["poetry-core"]
Expand Down
335 changes: 155 additions & 180 deletions examples/fastapi-sse/poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions examples/fastapi-webapp/fastapi_webapp/streams.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import logging

from kstreams import ConsumerRecord, stream

logger = logging.getLogger(__name__)


@stream("local--kstream", group_id="kstreams--group-id")
async def consume(cr: ConsumerRecord):
Expand Down
283 changes: 182 additions & 101 deletions examples/fastapi-webapp/poetry.lock

Large diffs are not rendered by default.

94 changes: 94 additions & 0 deletions examples/graceful-shutdown-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Graceful Shutdown Example

The idea is to demostrate that a `Graceful Shutdown` is possbile when a stream crashes. In this example we have two streams, one consuming from `local--hello-world`
topic and the other one consuming from `local--kstream`.

The stream consuming from `local--kstream` has a delay of 20 seconds after an `event` is received (this is to simulate a super slow consumption process).
The stream consuming from `local--hello-world` will raise a `ValueError("error....")` exception when the event value is `error`.
If an event was send to `local--kstream` in a time `t` and later an event with the value `error` was send to `local--hello-world` in a windows of less than `20 seconds`, then the stoping program process will be delay `20 seconds - t seconds`

Example:

1. Send an event to topic `local--kstream` now
2. Send an event to topic `local--hello-world` 5 seconds after sending the event in the previous step
3. You will see that after `15 seconds` the program stops, because it must wait that the event on `step 1` is processed.

## Requirements

python 3.8+, poetry, docker-compose

### Installation

```bash
poetry install
```

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`
3. From `kstreams` project root, you can use the `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open. Enter messages to send. The command is:
```bash
./scripts/cluster/events/send "local--kstream"
```
Then, on the consume side, you should see something similar to the following logs:

```bash
❯ me@me-pc simple-consumer-example % poetry run app

Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Consumer started
Event consumed: headers: (), payload: ConsumerRecord(topic='local--hello-world', partition=0, offset=0, timestamp=1660733921761, timestamp_type=0, key=None, value=b'boo', checksum=None, serialized_key_size=-1, serialized_value_size=3, headers=())
```
4. In another terminal repeat the same to send events to the other topic and send the event `error`
```bash
./scripts/cluster/events/send "local--hello-world"
```
5. Then, on the consume side, you should see something similar to the following logs:
```bash
INFO:graceful_shutdown_example.app:Event finished...
INFO:aiokafka.consumer.group_coordinator:LeaveGroup request succeeded
INFO:kstreams.streams:Stream consuming from topics ['local--kstream'] has stopped!!!


INFO:kstreams.engine:Streams have STOPPED....
INFO:aiorun:Cancelling pending tasks.
INFO:aiorun:Running pending tasks till complete
INFO:aiorun:Waiting for executor shutdown.
INFO:aiorun:Shutting down async generators
INFO:aiorun:Closing the loop.
INFO:aiorun:Leaving. Bye!
INFO:aiorun:Reraising unhandled exception
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/kstreams/examples/graceful-shutdown-example/graceful_shutdown_example/app.py", line 38, in main
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=stop)
File "/kstreams/examples/graceful-shutdown-example/.venv/lib/python3.12/site-packages/aiorun.py", line 370, in run
raise pending_exception_to_raise
File "/kstreams/kstreams/streams.py", line 231, in start
await self.func_wrapper_with_typing()
File "/kstreams/kstreams/streams.py", line 239, in func_wrapper_with_typing
await self.func(cr)
File "/kstreams/kstreams/middleware/middleware.py", line 80, in __call__
raise exc
File "/kstreams/kstreams/middleware/middleware.py", line 66, in __call__
return await self.next_call(cr)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/kstreams/kstreams/streams.py", line 348, in __call__
return await self.handler(*params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/kstreams/examples/graceful-shutdown-example/graceful_shutdown_example/app.py", line 18, in consume
raise ValueError("error....")
ValueError: error....
Handler: <kstreams.middleware.middleware.ExceptionMiddleware object at 0x10361dd00>
Topics: ['local--hello-world']
```

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` is pointing to the parent folder. You will have to set the latest version.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
import logging

import aiorun

import kstreams

logger = logging.getLogger(__name__)

stream_engine = kstreams.create_engine(title="my-stream-engine")


@stream_engine.stream(topics=["local--hello-world"], group_id="example-group")
async def consume(cr: kstreams.ConsumerRecord):
logger.info(f"Event consumed: headers: {cr.headers}, payload: {cr}")

if cr.value == b"error":
raise ValueError("error....")


@stream_engine.stream(topics=["local--kstream"], group_id="example-group-2")
async def consume_2(cr: kstreams.ConsumerRecord):
logger.info(f"Event consumed: headers: {cr.headers}, payload: {cr}")
await asyncio.sleep(20)
logger.info("Event finished...")


async def start():
await stream_engine.start()


async def stop(loop: asyncio.AbstractEventLoop):
await stream_engine.stop()


def main():
logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=stop)
Loading

0 comments on commit 32e362d

Please sign in to comment.