diff --git a/.benchmarks/Darwin-CPython-3.11-64bit/0002_81eb783e807835e049815241011f694a54404471_20241127_140629_uncommited-changes.json b/.benchmarks/Darwin-CPython-3.11-64bit/0002_81eb783e807835e049815241011f694a54404471_20241127_140629_uncommited-changes.json new file mode 100644 index 00000000..442e2b1a --- /dev/null +++ b/.benchmarks/Darwin-CPython-3.11-64bit/0002_81eb783e807835e049815241011f694a54404471_20241127_140629_uncommited-changes.json @@ -0,0 +1,148 @@ +{ + "machine_info": { + "node": "Woile-MacBook-Pro.local", + "processor": "arm", + "machine": "arm64", + "python_compiler": "Clang 16.0.6 ", + "python_implementation": "CPython", + "python_implementation_version": "3.11.10", + "python_version": "3.11.10", + "python_build": [ + "main", + "Sep 7 2024 01:03:31" + ], + "release": "24.1.0", + "system": "Darwin", + "cpu": { + "python_version": "3.11.10.final.0 (64 bit)", + "cpuinfo_version": [ + 9, + 0, + 0 + ], + "cpuinfo_version_string": "9.0.0", + "arch": "ARM_8", + "bits": 64, + "count": 12, + "arch_string_raw": "arm64", + "brand_raw": "Apple M3 Pro" + } + }, + "commit_info": { + "id": "81eb783e807835e049815241011f694a54404471", + "time": "2024-11-27T14:40:28+01:00", + "author_time": "2024-11-27T14:40:28+01:00", + "dirty": true, + "project": "kstreams", + "branch": "fix/generic-consumer-record" + }, + "benchmarks": [ + { + "group": null, + "name": "test_startup_and_processing_single_consumer_record", + "fullname": "tests/test_benchmarks.py::test_startup_and_processing_single_consumer_record", + "params": null, + "param": null, + "extra_info": {}, + "options": { + "disable_gc": false, + "timer": "perf_counter", + "min_rounds": 5, + "max_time": 1.0, + "min_time": 5e-06, + "warmup": false + }, + "stats": { + "min": 4.199999966658652e-05, + "max": 0.012051916011841968, + "mean": 9.800166512286035e-05, + "stddev": 0.0001816612665091389, + "rounds": 4471, + "median": 9.42080223467201e-05, + "iqr": 4.929174610879272e-05, + "q1": 6.958325684536248e-05, + "q3": 0.0001188750029541552, + "iqr_outliers": 14, + "stddev_outliers": 9, + "outliers": "9;14", + "ld15iqr": 4.199999966658652e-05, + "hd15iqr": 0.00020029101870022714, + "ops": 10203.90825754179, + "total": 0.43816544476430863, + "iterations": 1 + } + }, + { + "group": null, + "name": "test_startup_and_inject_all", + "fullname": "tests/test_benchmarks.py::test_startup_and_inject_all", + "params": null, + "param": null, + "extra_info": {}, + "options": { + "disable_gc": false, + "timer": "perf_counter", + "min_rounds": 5, + "max_time": 1.0, + "min_time": 5e-06, + "warmup": false + }, + "stats": { + "min": 4.3625012040138245e-05, + "max": 0.01566049997927621, + "mean": 0.00022631162852908016, + "stddev": 0.0002073705067274279, + "rounds": 15019, + "median": 0.0002195000124629587, + "iqr": 0.0001750317678670399, + "q1": 0.00013429098180495203, + "q3": 0.0003093227496719919, + "iqr_outliers": 19, + "stddev_outliers": 133, + "outliers": "133;19", + "ld15iqr": 4.3625012040138245e-05, + "hd15iqr": 0.0005731249984819442, + "ops": 4418.685891217931, + "total": 3.398974348878255, + "iterations": 1 + } + }, + { + "group": null, + "name": "test_consume_many", + "fullname": "tests/test_benchmarks.py::test_consume_many", + "params": null, + "param": null, + "extra_info": {}, + "options": { + "disable_gc": false, + "timer": "perf_counter", + "min_rounds": 5, + "max_time": 1.0, + "min_time": 5e-06, + "warmup": false + }, + "stats": { + "min": 0.0006483749893959612, + "max": 0.0008417500066570938, + "mean": 0.0006934208655618783, + "stddev": 2.4773934466283123e-05, + "rounds": 943, + "median": 0.0006997500022407621, + "iqr": 3.609351551858708e-05, + "q1": 0.0006723854967276566, + "q3": 0.0007084790122462437, + "iqr_outliers": 16, + "stddev_outliers": 246, + "outliers": "246;16", + "ld15iqr": 0.0006483749893959612, + "hd15iqr": 0.0007627500162925571, + "ops": 1442.1256262453264, + "total": 0.6538958762248512, + "iterations": 1 + } + } + ], + "datetime": "2024-11-27T14:06:35.464861+00:00", + "version": "5.1.0" +} \ No newline at end of file diff --git a/.github/workflows/bench-release.yml b/.github/workflows/bench-release.yml index 1d15de8d..4bea8502 100644 --- a/.github/workflows/bench-release.yml +++ b/.github/workflows/bench-release.yml @@ -1,4 +1,4 @@ -name: Bump version +name: Benchmark latest release on: push: @@ -46,5 +46,5 @@ jobs: git config --global user.email "action@github.com" git config --global user.name "GitHub Action" git add .benchmarks/ - git commit -m "bench: bench: add benchmark current release" + git commit -m "bench: current release" git push origin master diff --git a/.github/workflows/pr-tests.yaml b/.github/workflows/pr-tests.yaml index e4920cdb..9b2c7169 100644 --- a/.github/workflows/pr-tests.yaml +++ b/.github/workflows/pr-tests.yaml @@ -17,7 +17,7 @@ on: required: true jobs: - build_test_bench: + test: runs-on: ubuntu-latest strategy: matrix: @@ -31,7 +31,6 @@ jobs: with: python-version: ${{ matrix.python-version }} architecture: x64 - - name: Set Cache uses: actions/cache@v4 id: cache # name for referring later @@ -42,7 +41,6 @@ jobs: restore-keys: | ${{ runner.os }}-cache- ${{ runner.os }}- - - name: Install Dependencies # if: steps.cache.outputs.cache-hit != 'true' run: | @@ -50,17 +48,11 @@ jobs: poetry --version poetry config --local virtualenvs.in-project true poetry install - - name: Test and Lint run: | git config --global user.email "action@github.com" git config --global user.name "GitHub Action" ./scripts/test - - - name: Benchmark regression test - run: | - ./scripts/bench-compare - - name: Upload coverage to Codecov uses: codecov/codecov-action@v5.0.2 with: @@ -68,3 +60,34 @@ jobs: name: kstreams fail_ci_if_error: true token: ${{secrets.CODECOV_TOKEN}} + bench: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Setup python + uses: actions/setup-python@v5 + with: + python-version: '3.13' + architecture: x64 + - name: Set Cache + uses: actions/cache@v4 + id: cache # name for referring later + with: + path: .venv/ + # The cache key depends on poetry.lock + key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }} + restore-keys: | + ${{ runner.os }}-cache- + ${{ runner.os }}- + - name: Install Dependencies + # if: steps.cache.outputs.cache-hit != 'true' + run: | + python -m pip install -U pip poetry + poetry --version + poetry config --local virtualenvs.in-project true + poetry install + - name: Benchmark regression test + run: | + ./scripts/bench-compare \ No newline at end of file diff --git a/kstreams/consts.py b/kstreams/consts.py index 49d31d0a..ecf9f844 100644 --- a/kstreams/consts.py +++ b/kstreams/consts.py @@ -1,3 +1,5 @@ +import enum + APPLICATION_X_AVRO_BINARY = "application/x-avro-binary" APPLICATION_X_AVRO_JSON = "application/x-avro-json" APPLICATION_X_JSON_SCHEMA = "application/x-json-schema" @@ -11,3 +13,15 @@ APPLICATION_X_JSON_SCHEMA, APPLICATION_JSON, ) + + +class UDFType(str, enum.Enum): + NO_TYPING = "NO_TYPING" + WITH_TYPING = "WITH_TYPING" + + +class StreamErrorPolicy(str, enum.Enum): + RESTART = "RESTART" + STOP = "STOP" + STOP_ENGINE = "STOP_ENGINE" + STOP_APPLICATION = "STOP_APPLICATION" diff --git a/kstreams/engine.py b/kstreams/engine.py index b69095f6..a50ff623 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -9,6 +9,7 @@ from .backends.kafka import Kafka from .clients import Consumer, Producer +from .consts import StreamErrorPolicy, UDFType from .exceptions import DuplicateStreamException, EngineNotStartedException from .middleware import Middleware from .middleware.udf_middleware import UdfHandler @@ -17,7 +18,6 @@ from .serializers import Deserializer, Serializer from .streams import Stream, StreamFunc from .streams import stream as stream_func -from .streams_utils import StreamErrorPolicy, UDFType from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall from .utils import encode_headers, execute_hooks diff --git a/kstreams/middleware/middleware.py b/kstreams/middleware/middleware.py index f5b164bd..8338a09c 100644 --- a/kstreams/middleware/middleware.py +++ b/kstreams/middleware/middleware.py @@ -4,7 +4,7 @@ import typing from kstreams import types -from kstreams.streams_utils import StreamErrorPolicy +from kstreams.consts import StreamErrorPolicy, UDFType if typing.TYPE_CHECKING: from kstreams import Stream, StreamEngine # pragma: no cover @@ -14,6 +14,10 @@ class MiddlewareProtocol(typing.Protocol): + next_call: types.NextMiddlewareCall + send: types.Send + stream: "Stream" + def __init__( self, *, @@ -45,6 +49,10 @@ def __repr__(self) -> str: class BaseMiddleware: + next_call: types.NextMiddlewareCall + send: types.Send + stream: "Stream" + def __init__( self, *, @@ -92,7 +100,7 @@ async def __call__(self, cr: types.ConsumerRecord) -> typing.Any: async def cleanup_policy(self, exc: Exception) -> None: """ - Execute clenup policicy according to the Stream configuration. + Execute cleanup policy according to the Stream configuration. At this point we are inside the asyncio.Lock `is_processing` as an event is being processed and an exeption has occured. @@ -145,3 +153,15 @@ async def cleanup_policy(self, exc: Exception) -> None: await self.engine.stop() await self.stream.is_processing.acquire() signal.raise_signal(signal.SIGTERM) + + +class BaseDependcyMiddleware(MiddlewareProtocol, typing.Protocol): + """Base class for Dependency Injection Middleware. + + `get_type` is used to identify the way to call the user defined function, + whether to use DI or not. + + On top of that, this middleware helps **avoid circular dependencies**. + """ + + def get_type(self) -> UDFType: ... diff --git a/kstreams/middleware/udf_middleware.py b/kstreams/middleware/udf_middleware.py index 2bc1f295..7ba4d2d0 100644 --- a/kstreams/middleware/udf_middleware.py +++ b/kstreams/middleware/udf_middleware.py @@ -18,9 +18,15 @@ class UdfHandler(BaseMiddleware): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) signature = inspect.signature(self.next_call) - self.params = list(signature.parameters.values()) + self.params: typing.List[typing.Any] = [ + typing.get_origin(param.annotation) or param.annotation + for param in signature.parameters.values() + ] self.type: UDFType = setup_type(self.params) + def get_type(self) -> UDFType: + return self.type + def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List: # NOTE: When `no typing` support is deprecated then this can # be more eficient as the CR will be always there. @@ -30,7 +36,7 @@ def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List: types.Send: self.send, } - return [ANNOTATIONS_TO_PARAMS[param.annotation] for param in self.params] + return [ANNOTATIONS_TO_PARAMS[param_type] for param_type in self.params] async def __call__(self, cr: types.ConsumerRecord) -> typing.Any: """ diff --git a/kstreams/streams.py b/kstreams/streams.py index bf17682e..54570941 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -10,15 +10,18 @@ from kstreams import TopicPartition from kstreams.exceptions import BackendNotSet -from kstreams.middleware.middleware import ExceptionMiddleware +from kstreams.middleware.middleware import ( + BaseDependcyMiddleware, + ExceptionMiddleware, + Middleware, +) from kstreams.structs import TopicPartitionOffset from .backends.kafka import Kafka from .clients import Consumer -from .middleware import Middleware, udf_middleware +from .consts import StreamErrorPolicy, UDFType from .rebalance_listener import RebalanceListener from .serializers import Deserializer -from .streams_utils import StreamErrorPolicy, UDFType from .types import ConsumerRecord, Deprecated, StreamFunc if typing.TYPE_CHECKING: @@ -172,11 +175,14 @@ def __init__( self.seeked_initial_offsets = False self.rebalance_listener = rebalance_listener self.middlewares = middlewares or [] - self.udf_handler: typing.Optional[udf_middleware.UdfHandler] = None + self.udf_handler: typing.Optional[BaseDependcyMiddleware] = None self.topics = [topics] if isinstance(topics, str) else topics self.subscribe_by_pattern = subscribe_by_pattern self.error_policy = error_policy + def __name__(self) -> str: + return self.name + def _create_consumer(self) -> Consumer: if self.backend is None: raise BackendNotSet("A backend has not been set for this stream") @@ -342,7 +348,7 @@ async def start(self) -> None: self.running = True if self.udf_handler is not None: - if self.udf_handler.type == UDFType.NO_TYPING: + if self.udf_handler.get_type() == UDFType.NO_TYPING: # deprecated use case msg = ( "Streams with `async for in` loop approach are deprecated.\n" @@ -436,7 +442,7 @@ async def __anext__(self) -> ConsumerRecord: if ( self.udf_handler is not None - and self.udf_handler.type == UDFType.NO_TYPING + and self.udf_handler.get_type() == UDFType.NO_TYPING ): return cr return await self.func(cr) diff --git a/kstreams/streams_utils.py b/kstreams/streams_utils.py index ad36f226..d2c0841c 100644 --- a/kstreams/streams_utils.py +++ b/kstreams/streams_utils.py @@ -1,20 +1,10 @@ -import enum import inspect from typing import List -# NOTE: remove this module when Stream with `no typing` support is deprecated - - -class UDFType(str, enum.Enum): - NO_TYPING = "NO_TYPING" - WITH_TYPING = "WITH_TYPING" - +from kstreams.consts import UDFType +from kstreams.streams import Stream -class StreamErrorPolicy(str, enum.Enum): - RESTART = "RESTART" - STOP = "STOP" - STOP_ENGINE = "STOP_ENGINE" - STOP_APPLICATION = "STOP_APPLICATION" +# NOTE: remove this module when Stream with `no typing` support is deprecated def setup_type(params: List[inspect.Parameter]) -> UDFType: @@ -56,12 +46,10 @@ async def consume(cr: ConsumerRecord, stream: Stream): async def consume(cr: ConsumerRecord, stream: Stream, send: Send): ... """ - from .streams import Stream - - first_annotation = params[0].annotation + # from .streams import Stream - if first_annotation in (inspect._empty, Stream) and len(params) < 2: - # use case 1 NO_TYPING + no_type = len(params) == 1 and params[0] in (inspect._empty, Stream) + if no_type: return UDFType.NO_TYPING - # typing cases + return UDFType.WITH_TYPING diff --git a/kstreams/types.py b/kstreams/types.py index 3562f3b6..90107722 100644 --- a/kstreams/types.py +++ b/kstreams/types.py @@ -8,8 +8,7 @@ Headers = typing.Dict[str, str] EncodedHeaders = typing.Sequence[typing.Tuple[str, bytes]] -StreamFunc = typing.Callable - +StreamFunc = typing.Callable[..., typing.Any] EngineHooks = typing.Sequence[typing.Callable[[], typing.Any]] diff --git a/scripts/bench-compare b/scripts/bench-compare index 99e44829..0c93be75 100755 --- a/scripts/bench-compare +++ b/scripts/bench-compare @@ -16,4 +16,4 @@ if [ -d '.venv' ] ; then fi # Commented out until after merge, so there will be date to compare with. -# ${PREFIX}pytest tests/test_benchmarks.py --benchmark-compare --benchmark-compare-fail=min:5% +${PREFIX}pytest tests/test_benchmarks.py --benchmark-compare --benchmark-compare-fail=mean:10% diff --git a/scripts/test b/scripts/test index a20ebb12..6f26e659 100755 --- a/scripts/test +++ b/scripts/test @@ -8,4 +8,4 @@ fi ${PREFIX}pytest -x --cov-report term-missing --cov-report=xml:coverage.xml --cov=kstreams ${1-"./tests"} $2 ${PREFIX}ruff check kstreams tests ${PREFIX}ruff format --check kstreams tests examples -${PREFIX}mypy kstreams/ +${PREFIX}mypy kstreams/ tests/ diff --git a/tests/test_benchmarks.py b/tests/test_benchmarks.py index 592a99eb..0d839c00 100644 --- a/tests/test_benchmarks.py +++ b/tests/test_benchmarks.py @@ -18,6 +18,24 @@ in one second (calculated as 1 / Mean). - Rounds: The number of times the test was run. - Iterations: Number of iterations per round. + +Performance may be affected by: +- Power-saving modes +- CPU frequency scaling +- Background Processes + +To get accurate results, run benchmarks on a dedicated machine with no other +applications running. + +## Profiling + +Profile and visualize your code with `py-spy`: + +```python +pip install py-spy +sudo py-spy record -o profile.svg -- python tests/test_benchmarks.py +``` + """ from typing import Callable, List diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 0b94361b..bb013d98 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -20,62 +20,68 @@ async def my_coroutine(_): stream_engine.add_stream(stream=stream) await stream.start() + assert stream.consumer is not None await stream_engine.monitor.generate_consumer_metrics(stream.consumer) consumer = stream.consumer for topic_partition in consumer.assignment(): # super ugly notation but for now is the only way to get the metrics met_committed = ( - stream_engine.monitor.MET_COMMITTED.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_COMMITTED.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position = ( - stream_engine.monitor.MET_POSITION.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_highwater = ( - stream_engine.monitor.MET_HIGHWATER.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_HIGHWATER.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_lag = ( - stream_engine.monitor.MET_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position_lag = ( - stream_engine.monitor.MET_POSITION_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) @@ -135,56 +141,61 @@ async def my_coroutine(_): for topic_partition in consumer.assignment(): # super ugly notation but for now is the only way to get the metrics met_committed = ( - stream_engine.monitor.MET_COMMITTED.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_COMMITTED.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position = ( - stream_engine.monitor.MET_POSITION.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_highwater = ( - stream_engine.monitor.MET_HIGHWATER.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_HIGHWATER.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_lag = ( - stream_engine.monitor.MET_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position_lag = ( - stream_engine.monitor.MET_POSITION_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) @@ -200,9 +211,9 @@ async def my_coroutine(_): met_position_lag == consumer.highwater(topic_partition) - consumer_position ) - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 2 + assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 2 await stream_engine.remove_stream(stream) - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0 + assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0 @pytest.mark.asyncio @@ -223,6 +234,6 @@ async def my_coroutine(_): stream_engine.add_stream(stream=stream) await stream.start() - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0 + assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0 await stream_engine.remove_stream(stream) assert "Metrics for consumer with group-id: my-group not found" in caplog.text diff --git a/tests/test_streams.py b/tests/test_streams.py index 9f05a5ae..8546069e 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -70,7 +70,44 @@ async def getone(_): @stream_engine.stream(topic_name) async def stream(cr: ConsumerRecord): assert cr.value == value - await asyncio.sleep(0.2) + await asyncio.sleep(0.1) + + assert stream.consumer is None + assert stream.topics == [topic_name] + + with contextlib.suppress(TimeoutErrorException): + # now it is possible to run a stream directly, so we need + # to stop the `forever` consumption + await asyncio.wait_for(stream.start(), timeout=0.1) + + assert stream.consumer + Consumer.subscribe.assert_called_once_with( + topics=[topic_name], listener=stream.rebalance_listener, pattern=None + ) + await stream.stop() + + +@pytest.mark.asyncio +async def test_stream_generic_cr_with_typing( + stream_engine: StreamEngine, consumer_record_factory +): + topic_name = "local--kstreams" + value = b"test" + + async def getone(_): + return consumer_record_factory(value=value) + + with mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + subscribe=mock.DEFAULT, + getone=getone, + ): + + @stream_engine.stream(topic_name) + async def stream(cr: ConsumerRecord[str, bytes]): + assert cr.value == value + await asyncio.sleep(0.1) assert stream.consumer is None assert stream.topics == [topic_name] @@ -107,7 +144,7 @@ async def getone(_): async def stream(cr: ConsumerRecord, stream: Stream): assert cr.value == value assert isinstance(stream, Stream) - await asyncio.sleep(0.2) + await asyncio.sleep(0.1) with contextlib.suppress(TimeoutErrorException): # now it is possible to run a stream directly, so we need @@ -137,7 +174,7 @@ async def stream(cr: ConsumerRecord, send: Send, stream: Stream): assert cr.value == value assert isinstance(stream, Stream) assert send == stream_engine.send - await asyncio.sleep(0.2) + await asyncio.sleep(0.1) assert stream.consumer is None assert stream.topics == [topic_name] @@ -176,7 +213,7 @@ async def stream(stream: Stream, cr: ConsumerRecord, send: Send): assert cr.value == value assert isinstance(stream, Stream) assert send == stream_engine.send - await asyncio.sleep(0.2) + await asyncio.sleep(0.1) assert stream.consumer is None assert stream.topics == [topic_name] diff --git a/tests/test_streams_error_policy.py b/tests/test_streams_error_policy.py index 276598ac..9996b1a6 100644 --- a/tests/test_streams_error_policy.py +++ b/tests/test_streams_error_policy.py @@ -4,7 +4,7 @@ import pytest from kstreams import ConsumerRecord, StreamEngine, TestStreamClient -from kstreams.streams_utils import StreamErrorPolicy +from kstreams.consts import StreamErrorPolicy @pytest.mark.asyncio