Skip to content

Commit

Permalink
Merge pull request #49 from kpn/47-update-examples
Browse files Browse the repository at this point in the history
47 update examples
  • Loading branch information
woile authored Aug 23, 2022
2 parents f4b7c2c + 6a27315 commit e499268
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 62 deletions.
37 changes: 0 additions & 37 deletions examples/fastapi-example/fastapi_example/app.py

This file was deleted.

Empty file.
10 changes: 0 additions & 10 deletions examples/fastapi-example/fastapi_example/streaming/streams.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
## FastAPI example
## FastAPI webapp example

Simple [`FastAPI`](https://fastapi.tiangolo.com/) example with `kstreams`

### Requirements

python 3.8+, poetry, docker-compose

### Installation

```bash
poetry install
```

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start/`
2. Inside the `fastapi-example` folder exeute `poetry run python -m fastapi_example`
1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Run `cd examples/fastapi-webapp` and execute `poetry install`
3. Run `poetry run app`

Then you should see something similar to the following logs:

Expand All @@ -36,7 +31,7 @@ INFO: Application startup complete.
- The application also has a `stream` that consumes from the topic `local--kstream`.
- The application `metrics` are exposed with the endpoint `/metrics`. To see the do a `GET` `/metrics`.
After doing a `GET` to `http://localhost:8000` you should see the following logs:
After doing a `GET` to `http://localhost:8000/events` you should see the following logs:
```bash
Event consumed: headers: (), payload: b'{"message": "hello world!"}'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@

import uvicorn

if __name__ == "__main__":

def main():
uvicorn.run(
app="fastapi_example.app:app",
app="fastapi_webapp.app:app",
host="localhost",
port=8000,
log_level=logging.INFO,
reload=True,
debug=True,
)


if __name__ == "__main__":
main()
24 changes: 24 additions & 0 deletions examples/fastapi-webapp/fastapi_webapp/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from fastapi import FastAPI
from starlette_prometheus import PrometheusMiddleware, metrics

from .resources import stream_engine
from .streams import consume
from .views import router

app = FastAPI()


@app.on_event("startup")
async def startup_event():
await stream_engine.start()


@app.on_event("shutdown")
async def shutdown_event():
await stream_engine.stop()


stream_engine.add_stream(consume)
app.include_router(router)
app.add_middleware(PrometheusMiddleware, filter_unhandled_paths=True)
app.add_api_route("/metrics", metrics) # type: ignore
8 changes: 8 additions & 0 deletions examples/fastapi-webapp/fastapi_webapp/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from kstreams import Stream, stream


@stream("local--kstream")
async def consume(stream: Stream):
print("consuming.....")
async for cr in stream:
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
28 changes: 28 additions & 0 deletions examples/fastapi-webapp/fastapi_webapp/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from fastapi import APIRouter
from fastapi.responses import JSONResponse

from .resources import stream_engine

router = APIRouter()


@router.get("/events")
async def produce_event():
"""Send an event to the cluster.
This should be a POST, but like this it can be
easily executed in the browser.
"""
payload = '{"message": "hello world!"}'

metadata = await stream_engine.send(
"local--kstream",
value=payload.encode(),
)

msg = {
"topic": metadata.topic,
"partition": metadata.partition,
"offset": metadata.offset,
}
return JSONResponse(content=msg)
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tool.poetry]
name = "fastapi-example"
name = "fastapi_webapp"
version = "0.1.0"
description = ""
authors = ["Marcos Schroh <[email protected]>"]
Expand All @@ -14,3 +14,6 @@ uvicorn = "^0.18.2"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.scripts]
app = "fastapi_webapp.__main__:main"
3 changes: 1 addition & 2 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ async def __anext__(self) -> structs.ConsumerRecord:
await self.start()

try:
# value is a ConsumerRecord:
# namedtuple["topic", "partition", "offset", "key", "value"]
# value is a ConsumerRecord, which is a dataclass
consumer_record: structs.ConsumerRecord = (
await self.consumer.getone() # type: ignore
)
Expand Down

0 comments on commit e499268

Please sign in to comment.