Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: always subscribe to topics when a Stream is started. Multiple ca… #191

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions examples/simple-example/simple_example/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,28 @@ class EventStore:
def add(self, event: ConsumerRecord) -> None:
self.events.append(event)

def remove(self) -> None:
self.events.pop(0)

@property
def total(self):
def total(self) -> int:
return len(self.events)

def clean(self) -> None:
self.events = []


event_store = EventStore()


@stream_engine.stream(topic, group_id="example-group")
async def consume(cr: ConsumerRecord):
logger.info(f"Event consumed: {cr} \n")
event_store.add(cr)

if cr.value == b"remove-event":
event_store.remove()
else:
event_store.add(cr)

await stream_engine.send("local--hello-world", value=cr.value)

Expand Down
7 changes: 3 additions & 4 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,10 @@ async def stop(self) -> None:

def subscribe(self) -> None:
if self.consumer is None:
# Always create a consumer on stream.start
# Only create a consumer if it was not previously created
self.consumer = self._create_consumer()
self.consumer.subscribe(
topics=self.topics, listener=self.rebalance_listener
)

self.consumer.subscribe(topics=self.topics, listener=self.rebalance_listener)

async def commit(
self, offsets: typing.Optional[typing.Dict[TopicPartition, int]] = None
Expand Down
60 changes: 60 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,66 @@ async def test_e2e_example():
assert TopicManager.all_messages_consumed()


@pytest.mark.asyncio
async def test_repeat_e2e_example():
"""
This test is to show that the same Stream can be tested multiple
times and that all the resources must start from scratch on every unittest:
1. There must not be events in topics from previous tests
2. All extra partitions should be removed from Topics
3. Streams on new test must have only the default partitions (0, 1, 2)
4. Total events per Topic (asyncio.Queue) must be 0
"""
simple_example = importlib.import_module(
"examples.simple-example.simple_example.app"
)

topic_name = simple_example.topic
client = TestStreamClient(simple_example.stream_engine)

# From the previous test, we have produced 5 events
# which still they are on memory. This is the application
# logic
assert simple_example.event_store.total == 5

# clean the application store
simple_example.event_store.clean()
assert simple_example.event_store.total == 0

async with client:
topic = TopicManager.get(topic_name)
assert topic.is_empty()

# Even Though the topic is empty, the counter might be not
assert topic.total_events == 0

# check that all default partitions are empty
assert topic.total_partition_events[0] == -1
assert topic.total_partition_events[1] == -1
assert topic.total_partition_events[2] == -1

# add 1 event to the store
metadata = await client.send(topic=topic_name, value=b"add-event")

# give some time to process the event
await asyncio.sleep(0.1)
assert simple_example.event_store.total == 1
assert metadata.partition == 0

# remove 1 event from the store
metadata = await client.send(
topic=topic_name, value=b"remove-event", partition=1
)

# give some time to process the event
await asyncio.sleep(0.1)
assert simple_example.event_store.total == 0
assert metadata.partition == 1

# check that all events has been consumed
assert TopicManager.all_messages_consumed()


@pytest.mark.asyncio
@pytest.mark.parametrize(
"monitoring_enabled",
Expand Down
Loading