Skip to content

Commit

Permalink
feat(transactions): Transaction added to garantee only once paradigm.…
Browse files Browse the repository at this point in the history
… Related to kpn#265
  • Loading branch information
marcosschroh committed Mar 6, 2025
1 parent d6b2a51 commit b79b267
Show file tree
Hide file tree
Showing 33 changed files with 2,507 additions and 445 deletions.
136 changes: 134 additions & 2 deletions docs/test_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ async def test_e2e_example():

## Producer only

In some scenarios, your application will only produce events and other application/s will consume it, but you want to make sure that
the event was procuced in a proper way and the `topic` contains that `event`.
In some scenarios, your application will only produce events and other application/s will consume it, but you want to make sure that the event was procuced in a proper way and the `topic` contains that `event`.

```python
# producer_example.py
Expand All @@ -206,6 +205,7 @@ stream_engine = create_engine(title="my-stream-engine")

async def produce(topic: str, value: bytes, key: str):
# This could be a complicated function or something like a FastAPI view
# we can also use transactions here
await stream_engine.send(topic, value=value, key=key)


Expand Down Expand Up @@ -256,6 +256,9 @@ async def test_event_produced():
for example a `FastAPI` view.
Then you don't want to use `client.send` directly, just called the function that contains `stream_engine.send(...)`

!!! note
The example is also applicable for a `transactions`

## Defining extra topics

For some uses cases is required to produce an event to a topic (`target topic`) after it was consumed (`source topic`). We are in control of the `source topic`
Expand Down Expand Up @@ -408,6 +411,135 @@ async def test_consume_events_topics_by_pattern():
assert TopicManager.all_messages_consumed()
```

## Transactions

To test application with transctions you have to also use the `TestStreamClient`. Again, we can have three scenarions: `producer only`, `consumer only` or a combination of `both`

- For `producer only` applications with transactions follow the [testing producer only](https://kpn.github.io/kstreams/test_client/#producer-only) guide.
- For `consume only` applications we have to produce events with a transaction using the `StreamEngine`

If we have the following application:

```python
from kstreams import (
ConsumerRecord,
Stream,
TopicPartition,
)
from .engine import stream_engine


@stream_engine.stream(
"local--kstreams-transactional,
enable_auto_commit=False,
isolation_level="read_committed", # <-- This will filter aborted txn's
name="transactional-stream",
)
async def consume_from_transaction(cr: ConsumerRecord, stream: Stream):
logger.info(
f"Event consumed from topic {transactional_topic} with value: {cr.value} \n\n"
)
tp = TopicPartition(
topic=transactional_topic,
partition=cr.partition,
)
await stream.commit({tp: cr.offset + 1})

```

Then we can test the application as the following:

```python
import pytest
from kstreams.test_utils import TestStreamClient

from .engine import stram_engine


@pytest.mark.asyncio
async def test_consume_from_transactional_topic():
client = TestStreamClient(stream_engine)

tp = TopicPartition(
topic="local--kstreams-transactional",
partition=1,
)

async with client:
async with client.transaction() as t:
await t.send(
"local--kstreams-transactional",
value=b"Some event in transaction",
partition=1,
)

stream = stream_engine.get_stream("transactional-stream")

# give some time to the streams to consume all the events
await asyncio.sleep(0.1)
assert (await stream.consumer.committed(tp)) == 1
```

- For applications that are produce and consume from/to `transactional topics`, we can use the `e2e` stretegy.

```python
from kstreams import (
ConsumerRecord,
Stream,
TopicPartition,
Transactional,
)

from .engine import stream_engine


async def save_to_db():
...


@stream_engine.stream("local--kstreams-transactional", isolation_level="read_committed")
async def stream_transactional(cr: ConsumerRecord):
save_to_db(cr.value)


@stream_engine.stream("local--kstreams", group_id="my-group-id")
async def my_stream(cr: ConsumerRecord, transaction: Transactional):
async with transaction(transaction_id="transaction_id") as t:
await t.send("local--kstreams-transactional", value=cr.value)
tp = TopicPartition(
topic="local--kstreams",
partition=cr.partition,
)
await t.commit_offsets(offsets={tp: cr.offset}, group_id="my-group-id")

```

Then we can test the application as the following:

```python
from unittest import mock

import pytest
from kstreams.test_utils import TestStreamClient

from .engine import stram_engine


@pytest.mark.asyncio
async def test_e2et():
client = TestStreamClient(stream_engine)

with mock.patch("transactions.streams.save_to_db") as save_to_db:
async with client:
await client.send(
topic="local--kstreams",
value=b"Hello world!",
partition=10,
)

save_to_db.assert_awaited_once_with(b"Hello world!")
```

## Disabling monitoring during testing

Monitoring streams and producers is vital for streaming application but it requires extra effort. Sometimes during testing,
Expand Down
Loading

0 comments on commit b79b267

Please sign in to comment.