From 122b4ca50f482070f16187269e42214553608422 Mon Sep 17 00:00:00 2001 From: Santiago Fraire Willemoes Date: Wed, 27 Nov 2024 15:07:18 +0100 Subject: [PATCH] fix: circular imports on DI system --- ...1_20241127_140629_uncommited-changes.json} | 106 +++++++++--------- kstreams/consts.py | 14 +++ kstreams/engine.py | 2 +- kstreams/middleware/middleware.py | 2 +- kstreams/streams.py | 2 +- kstreams/streams_utils.py | 18 +-- tests/test_streams_error_policy.py | 2 +- 7 files changed, 75 insertions(+), 71 deletions(-) rename .benchmarks/Darwin-CPython-3.11-64bit/{0002_da9cb25b0f84535841637d318f379657f4a1f632_20241127_132937_uncommited-changes.json => 0002_81eb783e807835e049815241011f694a54404471_20241127_140629_uncommited-changes.json} (53%) diff --git a/.benchmarks/Darwin-CPython-3.11-64bit/0002_da9cb25b0f84535841637d318f379657f4a1f632_20241127_132937_uncommited-changes.json b/.benchmarks/Darwin-CPython-3.11-64bit/0002_81eb783e807835e049815241011f694a54404471_20241127_140629_uncommited-changes.json similarity index 53% rename from .benchmarks/Darwin-CPython-3.11-64bit/0002_da9cb25b0f84535841637d318f379657f4a1f632_20241127_132937_uncommited-changes.json rename to .benchmarks/Darwin-CPython-3.11-64bit/0002_81eb783e807835e049815241011f694a54404471_20241127_140629_uncommited-changes.json index 6eace691..442e2b1a 100644 --- a/.benchmarks/Darwin-CPython-3.11-64bit/0002_da9cb25b0f84535841637d318f379657f4a1f632_20241127_132937_uncommited-changes.json +++ b/.benchmarks/Darwin-CPython-3.11-64bit/0002_81eb783e807835e049815241011f694a54404471_20241127_140629_uncommited-changes.json @@ -1,6 +1,6 @@ { "machine_info": { - "node": "mac.home", + "node": "Woile-MacBook-Pro.local", "processor": "arm", "machine": "arm64", "python_compiler": "Clang 16.0.6 ", @@ -29,9 +29,9 @@ } }, "commit_info": { - "id": "da9cb25b0f84535841637d318f379657f4a1f632", - "time": "2024-11-27T12:34:46+01:00", - "author_time": "2024-11-27T12:34:46+01:00", + "id": "81eb783e807835e049815241011f694a54404471", + "time": "2024-11-27T14:40:28+01:00", + "author_time": "2024-11-27T14:40:28+01:00", "dirty": true, "project": "kstreams", "branch": "fix/generic-consumer-record" @@ -53,22 +53,22 @@ "warmup": false }, "stats": { - "min": 4.629199975170195e-05, - "max": 0.010518291994230822, - "mean": 0.00011184708857215268, - "stddev": 0.00014957945906818188, - "rounds": 5163, - "median": 0.00010833298438228667, - "iqr": 5.845900159329176e-05, - "q1": 7.954100146889687e-05, - "q3": 0.00013800000306218863, - "iqr_outliers": 15, - "stddev_outliers": 14, - "outliers": "14;15", - "ld15iqr": 4.629199975170195e-05, - "hd15iqr": 0.00023312499979510903, - "ops": 8940.77809951127, - "total": 0.5774665182980243, + "min": 4.199999966658652e-05, + "max": 0.012051916011841968, + "mean": 9.800166512286035e-05, + "stddev": 0.0001816612665091389, + "rounds": 4471, + "median": 9.42080223467201e-05, + "iqr": 4.929174610879272e-05, + "q1": 6.958325684536248e-05, + "q3": 0.0001188750029541552, + "iqr_outliers": 14, + "stddev_outliers": 9, + "outliers": "9;14", + "ld15iqr": 4.199999966658652e-05, + "hd15iqr": 0.00020029101870022714, + "ops": 10203.90825754179, + "total": 0.43816544476430863, "iterations": 1 } }, @@ -88,22 +88,22 @@ "warmup": false }, "stats": { - "min": 4.620800609700382e-05, - "max": 0.017021667008521035, - "mean": 0.00021773011284045453, - "stddev": 0.00022200387249880765, - "rounds": 13762, - "median": 0.00021172950800973922, - "iqr": 0.00016691599739715457, - "q1": 0.0001298749994020909, - "q3": 0.0002967909967992455, - "iqr_outliers": 17, - "stddev_outliers": 39, - "outliers": "39;17", - "ld15iqr": 4.620800609700382e-05, - "hd15iqr": 0.0005522920109797269, - "ops": 4592.84196822498, - "total": 2.996401812910335, + "min": 4.3625012040138245e-05, + "max": 0.01566049997927621, + "mean": 0.00022631162852908016, + "stddev": 0.0002073705067274279, + "rounds": 15019, + "median": 0.0002195000124629587, + "iqr": 0.0001750317678670399, + "q1": 0.00013429098180495203, + "q3": 0.0003093227496719919, + "iqr_outliers": 19, + "stddev_outliers": 133, + "outliers": "133;19", + "ld15iqr": 4.3625012040138245e-05, + "hd15iqr": 0.0005731249984819442, + "ops": 4418.685891217931, + "total": 3.398974348878255, "iterations": 1 } }, @@ -123,26 +123,26 @@ "warmup": false }, "stats": { - "min": 0.0007156250067055225, - "max": 0.000864708999870345, - "mean": 0.0007345807816986966, - "stddev": 1.2547058035859212e-05, - "rounds": 1027, - "median": 0.0007324170146603137, - "iqr": 9.614763257559389e-06, - "q1": 0.0007281249927473255, - "q3": 0.0007377397560048848, - "iqr_outliers": 60, - "stddev_outliers": 154, - "outliers": "154;60", - "ld15iqr": 0.0007156250067055225, - "hd15iqr": 0.0007525830005761236, - "ops": 1361.320667398253, - "total": 0.7544144628045615, + "min": 0.0006483749893959612, + "max": 0.0008417500066570938, + "mean": 0.0006934208655618783, + "stddev": 2.4773934466283123e-05, + "rounds": 943, + "median": 0.0006997500022407621, + "iqr": 3.609351551858708e-05, + "q1": 0.0006723854967276566, + "q3": 0.0007084790122462437, + "iqr_outliers": 16, + "stddev_outliers": 246, + "outliers": "246;16", + "ld15iqr": 0.0006483749893959612, + "hd15iqr": 0.0007627500162925571, + "ops": 1442.1256262453264, + "total": 0.6538958762248512, "iterations": 1 } } ], - "datetime": "2024-11-27T13:29:43.583338+00:00", + "datetime": "2024-11-27T14:06:35.464861+00:00", "version": "5.1.0" } \ No newline at end of file diff --git a/kstreams/consts.py b/kstreams/consts.py index 49d31d0a..ecf9f844 100644 --- a/kstreams/consts.py +++ b/kstreams/consts.py @@ -1,3 +1,5 @@ +import enum + APPLICATION_X_AVRO_BINARY = "application/x-avro-binary" APPLICATION_X_AVRO_JSON = "application/x-avro-json" APPLICATION_X_JSON_SCHEMA = "application/x-json-schema" @@ -11,3 +13,15 @@ APPLICATION_X_JSON_SCHEMA, APPLICATION_JSON, ) + + +class UDFType(str, enum.Enum): + NO_TYPING = "NO_TYPING" + WITH_TYPING = "WITH_TYPING" + + +class StreamErrorPolicy(str, enum.Enum): + RESTART = "RESTART" + STOP = "STOP" + STOP_ENGINE = "STOP_ENGINE" + STOP_APPLICATION = "STOP_APPLICATION" diff --git a/kstreams/engine.py b/kstreams/engine.py index b69095f6..a50ff623 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -9,6 +9,7 @@ from .backends.kafka import Kafka from .clients import Consumer, Producer +from .consts import StreamErrorPolicy, UDFType from .exceptions import DuplicateStreamException, EngineNotStartedException from .middleware import Middleware from .middleware.udf_middleware import UdfHandler @@ -17,7 +18,6 @@ from .serializers import Deserializer, Serializer from .streams import Stream, StreamFunc from .streams import stream as stream_func -from .streams_utils import StreamErrorPolicy, UDFType from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall from .utils import encode_headers, execute_hooks diff --git a/kstreams/middleware/middleware.py b/kstreams/middleware/middleware.py index 022767c6..8338a09c 100644 --- a/kstreams/middleware/middleware.py +++ b/kstreams/middleware/middleware.py @@ -4,7 +4,7 @@ import typing from kstreams import types -from kstreams.streams_utils import StreamErrorPolicy, UDFType +from kstreams.consts import StreamErrorPolicy, UDFType if typing.TYPE_CHECKING: from kstreams import Stream, StreamEngine # pragma: no cover diff --git a/kstreams/streams.py b/kstreams/streams.py index d9027eee..54570941 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -19,9 +19,9 @@ from .backends.kafka import Kafka from .clients import Consumer +from .consts import StreamErrorPolicy, UDFType from .rebalance_listener import RebalanceListener from .serializers import Deserializer -from .streams_utils import StreamErrorPolicy, UDFType from .types import ConsumerRecord, Deprecated, StreamFunc if typing.TYPE_CHECKING: diff --git a/kstreams/streams_utils.py b/kstreams/streams_utils.py index a47fa683..d2c0841c 100644 --- a/kstreams/streams_utils.py +++ b/kstreams/streams_utils.py @@ -1,20 +1,10 @@ -import enum import inspect from typing import List -# NOTE: remove this module when Stream with `no typing` support is deprecated - - -class UDFType(str, enum.Enum): - NO_TYPING = "NO_TYPING" - WITH_TYPING = "WITH_TYPING" +from kstreams.consts import UDFType +from kstreams.streams import Stream - -class StreamErrorPolicy(str, enum.Enum): - RESTART = "RESTART" - STOP = "STOP" - STOP_ENGINE = "STOP_ENGINE" - STOP_APPLICATION = "STOP_APPLICATION" +# NOTE: remove this module when Stream with `no typing` support is deprecated def setup_type(params: List[inspect.Parameter]) -> UDFType: @@ -56,7 +46,7 @@ async def consume(cr: ConsumerRecord, stream: Stream): async def consume(cr: ConsumerRecord, stream: Stream, send: Send): ... """ - from .streams import Stream + # from .streams import Stream no_type = len(params) == 1 and params[0] in (inspect._empty, Stream) if no_type: diff --git a/tests/test_streams_error_policy.py b/tests/test_streams_error_policy.py index 276598ac..9996b1a6 100644 --- a/tests/test_streams_error_policy.py +++ b/tests/test_streams_error_policy.py @@ -4,7 +4,7 @@ import pytest from kstreams import ConsumerRecord, StreamEngine, TestStreamClient -from kstreams.streams_utils import StreamErrorPolicy +from kstreams.consts import StreamErrorPolicy @pytest.mark.asyncio