From 540216be91cfa145f3783a412f7313c0a5f39131 Mon Sep 17 00:00:00 2001 From: Santiago Fraire Willemoes Date: Wed, 27 Nov 2024 12:31:35 +0100 Subject: [PATCH] fix: typing issues --- kstreams/streams.py | 3 + kstreams/types.py | 3 +- tests/test_monitor.py | 137 +++++++++++++++++++++++------------------- 3 files changed, 78 insertions(+), 65 deletions(-) diff --git a/kstreams/streams.py b/kstreams/streams.py index bf17682e..edad0362 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -177,6 +177,9 @@ def __init__( self.subscribe_by_pattern = subscribe_by_pattern self.error_policy = error_policy + def __name__(self) -> str: + return self.name + def _create_consumer(self) -> Consumer: if self.backend is None: raise BackendNotSet("A backend has not been set for this stream") diff --git a/kstreams/types.py b/kstreams/types.py index 3562f3b6..90107722 100644 --- a/kstreams/types.py +++ b/kstreams/types.py @@ -8,8 +8,7 @@ Headers = typing.Dict[str, str] EncodedHeaders = typing.Sequence[typing.Tuple[str, bytes]] -StreamFunc = typing.Callable - +StreamFunc = typing.Callable[..., typing.Any] EngineHooks = typing.Sequence[typing.Callable[[], typing.Any]] diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 0b94361b..bb013d98 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -20,62 +20,68 @@ async def my_coroutine(_): stream_engine.add_stream(stream=stream) await stream.start() + assert stream.consumer is not None await stream_engine.monitor.generate_consumer_metrics(stream.consumer) consumer = stream.consumer for topic_partition in consumer.assignment(): # super ugly notation but for now is the only way to get the metrics met_committed = ( - stream_engine.monitor.MET_COMMITTED.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_COMMITTED.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position = ( - stream_engine.monitor.MET_POSITION.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_highwater = ( - stream_engine.monitor.MET_HIGHWATER.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_HIGHWATER.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_lag = ( - stream_engine.monitor.MET_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position_lag = ( - stream_engine.monitor.MET_POSITION_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) @@ -135,56 +141,61 @@ async def my_coroutine(_): for topic_partition in consumer.assignment(): # super ugly notation but for now is the only way to get the metrics met_committed = ( - stream_engine.monitor.MET_COMMITTED.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_COMMITTED.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position = ( - stream_engine.monitor.MET_POSITION.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_highwater = ( - stream_engine.monitor.MET_HIGHWATER.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_HIGHWATER.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_lag = ( - stream_engine.monitor.MET_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position_lag = ( - stream_engine.monitor.MET_POSITION_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) @@ -200,9 +211,9 @@ async def my_coroutine(_): met_position_lag == consumer.highwater(topic_partition) - consumer_position ) - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 2 + assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 2 await stream_engine.remove_stream(stream) - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0 + assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0 @pytest.mark.asyncio @@ -223,6 +234,6 @@ async def my_coroutine(_): stream_engine.add_stream(stream=stream) await stream.start() - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0 + assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0 await stream_engine.remove_stream(stream) assert "Metrics for consumer with group-id: my-group not found" in caplog.text