feat: subscribe topics by pattern #199

Merged
1 change: 1 addition & 0 deletions README.md
@@ -63,6 +63,7 @@ if __name__ == "__main__":

- [x] Produce events
- [x] Consume events with `Streams`
- [x] Subscribe to topics by `pattern`
- [x] `Prometheus` metrics and custom monitoring
- [x] TestClient
- [x] Custom Serialization and Deserialization
15 changes: 1 addition & 14 deletions docs/stream.md
@@ -8,7 +8,7 @@ Consuming can be done using `kstreams.Stream`. You only need to decorate a `coro
options:
show_root_heading: true
docstring_section_style: table
show_signature_annotations: false
show_source: false
members:
-

@@ -187,19 +187,6 @@ Traceback (most recent call last):
AttributeError: 'ConsumerRecord' object has no attribute 'payload'
```

## Consuming from multiple topics

Consuming from multiple topics using one `stream` is possible. A `List[str]` of topics must be provided.

```python title="Consume from multiple topics"
from kstreams import ConsumerRecord, create_engine

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(["local--kstreams", "local--hello-world"], group_id="example-group")
async def consume(cr: ConsumerRecord) -> None:
print(f"Event consumed from topic {cr.topic}: headers: {cr.headers}, payload: {cr.value}")
```

## Changing consumer behavior

Most of the time you will only set the `topic` and the `group_id` on the `consumer`, but sometimes you might want more control over it, for example to change the `policy for resetting offsets on OffsetOutOfRange errors` or the `session timeout`. To do this, use the same `kwargs` as the [aiokafka consumer](https://aiokafka.readthedocs.io/en/stable/api.html#aiokafkaconsumer-class) API.
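
As a minimal sketch (the option names below are aiokafka's; check them against the aiokafka version you use), the extra settings can be collected as plain keyword arguments before handing them to the stream decorator:

```python
# Sketch: extra settings that would be forwarded to the underlying
# aiokafka consumer. `auto_offset_reset` controls the policy for
# resetting offsets on OffsetOutOfRange errors; `session_timeout_ms`
# controls the session timeout.
consumer_kwargs = {
    "auto_offset_reset": "earliest",
    "session_timeout_ms": 30_000,
}
```

These would then be passed along, e.g. `@stream_engine.stream(topic, group_id="example-group", **consumer_kwargs)`.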
55 changes: 55 additions & 0 deletions docs/test_client.md
@@ -353,6 +353,61 @@ async with client:
assert event.key == key
```

## Topics subscribed by pattern

When a `Stream` uses a `pattern` subscription, it is not possible to know beforehand which topics the `Stream` will consume from.
To solve this, the `topics` must be predefined using the `extra topics` feature of the `TestClient`:

In the following example we have a `Stream` that will consume from topics that match the regular expression `^dev--customer-.*$`, for example `dev--customer-invoice` and `dev--customer-profile`.

```python
# app.py
from kstreams import ConsumerRecord, create_engine

# `customer_invoice_topic`, `customer_profile_topic` and the expected
# events are assumed to be defined elsewhere in the application.
stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(topics="^dev--customer-.*$", subscribe_by_pattern=True)
async def stream(cr: ConsumerRecord):
if cr.topic == customer_invoice_topic:
assert cr.value == invoice_event
elif cr.topic == customer_profile_topic:
assert cr.value == profile_event
else:
raise ValueError(f"Invalid topic {cr.topic}")
```
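
To see which topics such a pattern actually captures, here is a quick check with Python's standard `re` module (the topic `dev--orders` is a hypothetical non-matching counterexample, not part of the example above):

```python
import re

# The same regular expression used in the pattern subscription above.
pattern = re.compile(r"^dev--customer-.*$")

topics = ["dev--customer-invoice", "dev--customer-profile", "dev--orders"]
matching = [t for t in topics if pattern.match(t)]
# only the two dev--customer-* topics match the subscription pattern
```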

Then, to test our `Stream`, we need to predefine the topics:

```python
# test_stream.py
import asyncio

import pytest
from kstreams.test_utils import TestStreamClient, TopicManager

from app import stream_engine


@pytest.mark.asyncio
async def test_consume_events_topics_by_pattern():
"""
    Show that a stream subscribed by pattern consumes from all matching topics
"""
customer_invoice_topic = "dev--customer-invoice"
customer_profile_topic = "dev--customer-profile"

client = TestStreamClient(
stream_engine, topics=[customer_invoice_topic, customer_profile_topic]
)

async with client:
await client.send(customer_invoice_topic, value=b"invoice-1", key="1")
await client.send(customer_profile_topic, value=b"profile-1", key="1")

# give some time to consume all the events
await asyncio.sleep(0.1)
assert TopicManager.all_messages_consumed()
```

## Disabling monitoring during testing

Monitoring streams and producers is vital for streaming applications, but it requires extra effort. Sometimes during testing,
47 changes: 47 additions & 0 deletions examples/subscribe-topics-by-pattern/README.md
@@ -0,0 +1,47 @@
# Subscribe topics by pattern

In the following example we have a `Stream` that will consume from topics that match the regular expression `^local--customer-.*$`, for example
`local--customer-invoice` and `local--customer-profile`.

## Requirements

python 3.8+, poetry, docker-compose

### Installation

```bash
poetry install
```

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`

The app publishes events to the topics `local--customer-invoice` and `local--customer-profile`; the events are then consumed by the `stream` that subscribed to them using the pattern `^local--customer-.*$`.
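
As a sanity check (pure standard library, independent of kstreams), both published topics do match the subscription pattern:

```python
import re

PATTERN = r"^local--customer-.*$"
published = ["local--customer-invoice", "local--customer-profile"]

matched = [t for t in published if re.match(PATTERN, t)]
# every published topic matches, so the stream receives events from each
```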

You should see something similar to the following logs:

```bash
❯ poetry run app

INFO:aiokafka.consumer.consumer:Subscribed to topic pattern: re.compile('^local--customer-.*$')
INFO:kstreams.prometheus.monitor:Starting Prometheus Monitoring started...
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--customer-profile', 'local--customer-invoice'})
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1 for group topics-by-pattern-group
INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group topics-by-pattern-group
INFO:aiokafka.consumer.group_coordinator:(Re-)joining group topics-by-pattern-group
INFO:aiokafka.consumer.group_coordinator:Joined group 'topics-by-pattern-group' (generation 7) with member_id aiokafka-0.11.0-d4e8d901-666d-4286-8c6c-621a12b7216f
INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using roundrobin
INFO:aiokafka.consumer.group_coordinator:Successfully synced group topics-by-pattern-group with generation 7
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--customer-profile', partition=0), TopicPartition(topic='local--customer-invoice', partition=0)} for group topics-by-pattern-group
INFO:subscribe_topics_by_pattern.app:Event b'profile-1' consumed from topic local--customer-profile
INFO:subscribe_topics_by_pattern.app:Event b'profile-1' consumed from topic local--customer-profile
INFO:subscribe_topics_by_pattern.app:Event b'invoice-1' consumed from topic local--customer-invoice
INFO:subscribe_topics_by_pattern.app:Event b'invoice-1' consumed from topic local--customer-invoice
```

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` points to the parent folder. You will have to set it to the latest released version instead.