-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
docs: large projects documentation/tutorial added
- Loading branch information
1 parent
e0c03dd
commit 2927792
Showing
2 changed files
with
108 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
If you have multiple a large project with maybe multiple `streams` we recommend the following project structure: | ||
|
||
``` | ||
├── my-project | ||
│ ├── my_project | ||
│ │ ├── __init__.py | ||
│ │ ├── app.py | ||
│ │ ├── resources.py | ||
│ │ └── streams.py | ||
│ │── pyproject.toml | ||
│ │── README.md | ||
``` | ||
|
||
- The file `my_project/resouces.py` contains the creation of the `StreamEngine` | ||
- The file `my_project/app.py` contains the entrypoint of your program | ||
- The file `my_project/streams.py` contains all the `Streams` | ||
|
||
A full project example ready to use can be found [here](https://github.com/kpn/kstreams/tree/master/examples/recommended-worker-app) | ||
|
||
!!! note | ||
This is just a recommendation, there are many ways to structure your project | ||
|
||
## Resources | ||
|
||
This python module contains any global resource that will be used later in the application, for example `DB connections` or the `StreamEngine`. Typically we will have the following: | ||
|
||
```python | ||
import kstreams | ||
|
||
stream_engine = kstreams.create_engine(title="my-stream-engine") | ||
``` | ||
|
||
Then later `stream_engine` can be resused to start the application | ||
|
||
## Streams | ||
|
||
When starting your project you can have `N` number of `Streams`, let's say in `streams.py` module. All of the `Streams` will run next to each other and because they are in the same project it is really easy to share common code. However, this comes with a downside of `scalability` as it is not possible to take the advantages of `kafka` and scale up `Streams` individually. In next versions the `StreamEngine` will be able to select which `Stream/s` should run to mitigate this issue. Typically, your `streams.py` will look like: | ||
|
||
```python | ||
import logging | ||
|
||
from kstreams import ConsumerRecord, Send, stream | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@stream("local--hello-world", group_id="example-group") | ||
async def my_stream(cr: ConsumerRecord, send: Send) -> None: | ||
logger.info(f"showing bytes: {cr.value}") | ||
value = f"Event confirmed. {cr.value}" | ||
|
||
await send( | ||
"another-topic-to-wink", | ||
value=value.encode(), | ||
key="1", | ||
) | ||
``` | ||
|
||
It is worth to note three things: | ||
|
||
- We create the `Stream` using the `stream` factory | ||
- If you need to produce events inside a `Stream` add the `send coroutine` using [dependency-injection](https://kpn.github.io/kstreams/stream/#dependency-injection) | ||
- We are not using `StreamEngine` at all to avoid `circular import` errors | ||
|
||
## Application | ||
|
||
The `entrypoint` is usually in `app.py`. The module contains the import of `stream_engine`, it's `hooks` and the `streams` to be added to the `engine`: | ||
|
||
```python | ||
import asyncio | ||
import logging | ||
|
||
from kstreams.stream_utils import StreamErrorPolicy | ||
|
||
import aiorun | ||
|
||
from .resources import stream_engine | ||
from .streams import my_stream | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
# hooks | ||
@stream_engine.after_startup | ||
async def init_events(): | ||
await stream_engine.send("local--hello-world", value="Hi Kstreams!") | ||
|
||
|
||
# add the stream to the stream_engine | ||
stream_engine.add_stream(my_stream, error_policy=StreamErrorPolicy.RESTART) | ||
|
||
|
||
async def start(): | ||
await stream_engine.start() | ||
|
||
|
||
async def stop(loop: asyncio.AbstractEventLoop): | ||
await stream_engine.stop() | ||
|
||
|
||
def main(): | ||
logging.basicConfig(level=logging.INFO) | ||
logger.info("Starting application...") | ||
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=stop) | ||
``` | ||
|
||
To run it we recommend [aiorun](https://github.com/cjrh/aiorun). It can be also run with `asyncio` directly but `aiorun` does all the boilerplate for us. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters