Skip to content

Commit

Permalink
fix: typing issues
Browse files Browse the repository at this point in the history
  • Loading branch information
woile committed Nov 27, 2024
1 parent 6396b57 commit 540216b
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 65 deletions.
3 changes: 3 additions & 0 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def __init__(
self.subscribe_by_pattern = subscribe_by_pattern
self.error_policy = error_policy

def __name__(self) -> str:
return self.name

def _create_consumer(self) -> Consumer:
if self.backend is None:
raise BackendNotSet("A backend has not been set for this stream")
Expand Down
3 changes: 1 addition & 2 deletions kstreams/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

Headers = typing.Dict[str, str]
EncodedHeaders = typing.Sequence[typing.Tuple[str, bytes]]
StreamFunc = typing.Callable

StreamFunc = typing.Callable[..., typing.Any]
EngineHooks = typing.Sequence[typing.Callable[[], typing.Any]]


Expand Down
137 changes: 74 additions & 63 deletions tests/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,62 +20,68 @@ async def my_coroutine(_):
stream_engine.add_stream(stream=stream)
await stream.start()

assert stream.consumer is not None
await stream_engine.monitor.generate_consumer_metrics(stream.consumer)
consumer = stream.consumer

for topic_partition in consumer.assignment():
# super ugly notation but for now is the only way to get the metrics
met_committed = (
stream_engine.monitor.MET_COMMITTED.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_COMMITTED.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_position = (
stream_engine.monitor.MET_POSITION.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_POSITION.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_highwater = (
stream_engine.monitor.MET_HIGHWATER.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_HIGHWATER.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_lag = (
stream_engine.monitor.MET_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_position_lag = (
stream_engine.monitor.MET_POSITION_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_POSITION_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)
Expand Down Expand Up @@ -135,56 +141,61 @@ async def my_coroutine(_):
for topic_partition in consumer.assignment():
# super ugly notation but for now is the only way to get the metrics
met_committed = (
stream_engine.monitor.MET_COMMITTED.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_COMMITTED.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_position = (
stream_engine.monitor.MET_POSITION.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_POSITION.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_highwater = (
stream_engine.monitor.MET_HIGHWATER.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_HIGHWATER.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_lag = (
stream_engine.monitor.MET_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_position_lag = (
stream_engine.monitor.MET_POSITION_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_POSITION_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)
Expand All @@ -200,9 +211,9 @@ async def my_coroutine(_):
met_position_lag == consumer.highwater(topic_partition) - consumer_position
)

assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 2
assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 2
await stream_engine.remove_stream(stream)
assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0
assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0


@pytest.mark.asyncio
Expand All @@ -223,6 +234,6 @@ async def my_coroutine(_):
stream_engine.add_stream(stream=stream)
await stream.start()

assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0
assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0
await stream_engine.remove_stream(stream)
assert "Metrics for consumer with group-id: my-group not found" in caplog.text

0 comments on commit 540216b

Please sign in to comment.