From 29277924ee682116e1d3b9527a9bbe8100370206 Mon Sep 17 00:00:00 2001 From: marcosschroh Date: Wed, 4 Sep 2024 16:21:01 +0200 Subject: [PATCH] docs: large projects documentation/tutorial added --- docs/large_project_structure.md | 107 ++++++++++++++++++++++++++++++++ mkdocs.yml | 1 + 2 files changed, 108 insertions(+) create mode 100644 docs/large_project_structure.md diff --git a/docs/large_project_structure.md b/docs/large_project_structure.md new file mode 100644 index 00000000..5b47e5aa --- /dev/null +++ b/docs/large_project_structure.md @@ -0,0 +1,107 @@ +If you have a large project with maybe multiple `streams` we recommend the following project structure: + +``` +├── my-project +│ ├── my_project +│ │   ├── __init__.py +│ │   ├── app.py +│ │   ├── resources.py +│ │   └── streams.py +│ │── pyproject.toml +│ │── README.md +``` + +- The file `my_project/resources.py` contains the creation of the `StreamEngine` +- The file `my_project/app.py` contains the entrypoint of your program +- The file `my_project/streams.py` contains all the `Streams` + +A full project example ready to use can be found [here](https://github.com/kpn/kstreams/tree/master/examples/recommended-worker-app) + +!!! note + This is just a recommendation, there are many ways to structure your project + +## Resources + +This python module contains any global resource that will be used later in the application, for example `DB connections` or the `StreamEngine`. Typically we will have the following: + +```python +import kstreams + +stream_engine = kstreams.create_engine(title="my-stream-engine") +``` + +Then later `stream_engine` can be reused to start the application + +## Streams + +When starting your project you can have `N` number of `Streams`, let's say in `streams.py` module. All of the `Streams` will run next to each other and because they are in the same project it is really easy to share common code. 
However, this comes with a downside of `scalability` as it is not possible to take advantage of `kafka` and scale up `Streams` individually. In future versions the `StreamEngine` will be able to select which `Stream/s` should run to mitigate this issue. Typically, your `streams.py` will look like: + +```python +import logging + +from kstreams import ConsumerRecord, Send, stream + +logger = logging.getLogger(__name__) + + +@stream("local--hello-world", group_id="example-group") +async def my_stream(cr: ConsumerRecord, send: Send) -> None: + logger.info(f"showing bytes: {cr.value}") + value = f"Event confirmed. {cr.value}" + + await send( + "another-topic-to-wink", + value=value.encode(), + key="1", + ) +``` + +It is worth noting three things: + +- We create the `Stream` using the `stream` factory +- If you need to produce events inside a `Stream` add the `send coroutine` using [dependency-injection](https://kpn.github.io/kstreams/stream/#dependency-injection) +- We are not using `StreamEngine` at all to avoid `circular import` errors + +## Application + +The `entrypoint` is usually in `app.py`. 
The module contains the import of `stream_engine`, its `hooks` and the `streams` to be added to the `engine`: + +```python +import asyncio +import logging + +from kstreams.stream_utils import StreamErrorPolicy + +import aiorun + +from .resources import stream_engine +from .streams import my_stream + +logger = logging.getLogger(__name__) + + +# hooks +@stream_engine.after_startup +async def init_events(): + await stream_engine.send("local--hello-world", value="Hi Kstreams!") + + +# add the stream to the stream_engine +stream_engine.add_stream(my_stream, error_policy=StreamErrorPolicy.RESTART) + + +async def start(): + await stream_engine.start() + + +async def stop(loop: asyncio.AbstractEventLoop): + await stream_engine.stop() + + +def main(): + logging.basicConfig(level=logging.INFO) + logger.info("Starting application...") + aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=stop) +``` + +To run it we recommend [aiorun](https://github.com/cjrh/aiorun). It can also be run with `asyncio` directly but `aiorun` does all the boilerplate for us. diff --git a/mkdocs.yml b/mkdocs.yml index 0925568b..1f863e78 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -36,6 +36,7 @@ nav: - Testing: 'test_client.md' - Middleware: 'middleware.md' - Utils: 'utils.md' + - Large Projects: 'large_project_structure.md' markdown_extensions: - pymdownx.highlight