diff --git a/kstreams/__init__.py b/kstreams/__init__.py index 1db148a9..30499878 100644 --- a/kstreams/__init__.py +++ b/kstreams/__init__.py @@ -1,6 +1,6 @@ from aiokafka.structs import ConsumerRecord, RecordMetadata, TopicPartition -from .clients import Consumer, ConsumerType, Producer, ProducerType +from .clients import Consumer, Producer from .create import StreamEngine, create_engine from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType from .rebalance_listener import ( @@ -14,9 +14,7 @@ __all__ = [ "Consumer", - "ConsumerType", "Producer", - "ProducerType", "StreamEngine", "create_engine", "PrometheusMonitor", diff --git a/kstreams/clients.py b/kstreams/clients.py index a6011605..bdeacb28 100644 --- a/kstreams/clients.py +++ b/kstreams/clients.py @@ -1,5 +1,5 @@ import logging -from typing import Callable, Optional, TypeVar +from typing import Callable, Optional import aiokafka @@ -40,7 +40,3 @@ def key_serializer(key): return key.encode("utf-8") super().__init__(*args, key_serializer=key_serializer, **self.config) - - -ConsumerType = TypeVar("ConsumerType", bound=Consumer) -ProducerType = TypeVar("ProducerType", bound=Producer) diff --git a/kstreams/create.py b/kstreams/create.py index eda4b339..6ee24aa4 100644 --- a/kstreams/create.py +++ b/kstreams/create.py @@ -1,7 +1,7 @@ from typing import Optional, Type from .backends.kafka import Kafka -from .clients import Consumer, ConsumerType, Producer, ProducerType +from .clients import Consumer, Producer from .engine import StreamEngine from .prometheus.monitor import PrometheusMonitor from .serializers import Deserializer, Serializer @@ -10,8 +10,8 @@ def create_engine( title: Optional[str] = None, backend: Optional[Kafka] = None, - consumer_class: Type[ConsumerType] = Consumer, - producer_class: Type[ProducerType] = Producer, + consumer_class: Type[Consumer] = Consumer, + producer_class: Type[Producer] = Producer, serializer: Optional[Serializer] = None, deserializer: Optional[Deserializer] = None, monitor: Optional[PrometheusMonitor] = None, diff --git a/kstreams/engine.py b/kstreams/engine.py index 04f49bad..907b2903 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -7,7 +7,7 @@ from kstreams.structs import TopicPartitionOffset from .backends.kafka import Kafka -from .clients import ConsumerType, ProducerType +from .clients import Consumer, Producer from .exceptions import DuplicateStreamException, EngineNotStartedException from .middleware import ExceptionMiddleware, Middleware from .prometheus.monitor import PrometheusMonitor @@ -61,8 +61,8 @@ def __init__( self, *, backend: Kafka, - consumer_class: typing.Type[ConsumerType], - producer_class: typing.Type[ProducerType], + consumer_class: typing.Type[Consumer], + producer_class: typing.Type[Producer], monitor: PrometheusMonitor, title: typing.Optional[str] = None, deserializer: typing.Optional[Deserializer] = None, @@ -75,7 +75,7 @@ def __init__( self.deserializer = deserializer self.serializer = serializer self.monitor = monitor - self._producer: typing.Optional[typing.Type[ProducerType]] = None + self._producer: typing.Optional[typing.Type[Producer]] = None self._streams: typing.List[Stream] = [] async def send( diff --git a/kstreams/prometheus/monitor.py b/kstreams/prometheus/monitor.py index 7079233c..d475c86a 100644 --- a/kstreams/prometheus/monitor.py +++ b/kstreams/prometheus/monitor.py @@ -5,7 +5,7 @@ from prometheus_client import Gauge from kstreams import TopicPartition -from kstreams.clients import ConsumerType +from kstreams.clients import Consumer from kstreams.streams import Stream logger = logging.getLogger(__name__) @@ -164,7 +164,7 @@ def add_producer(self, producer): def add_streams(self, streams): self._streams = streams - async def generate_consumer_metrics(self, consumer: ConsumerType): + async def generate_consumer_metrics(self, consumer: Consumer): """ Generate Consumer Metrics for Prometheus diff --git a/kstreams/streams.py b/kstreams/streams.py index 95719e21..e8c87463 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -13,7 +13,7 @@ from kstreams.structs import TopicPartitionOffset from .backends.kafka import Kafka -from .clients import Consumer, ConsumerType +from .clients import Consumer from .middleware import Middleware from .rebalance_listener import RebalanceListener from .serializers import Deserializer @@ -86,7 +86,7 @@ def __init__( *, func: StreamFunc, backend: typing.Optional[Kafka] = None, - consumer_class: typing.Type[ConsumerType] = Consumer, + consumer_class: typing.Type[Consumer] = Consumer, name: typing.Optional[str] = None, config: typing.Optional[typing.Dict] = None, deserializer: typing.Optional[Deserializer] = None, @@ -97,7 +97,7 @@ def __init__( self.func = func self.backend = backend self.consumer_class = consumer_class - self.consumer: typing.Optional[ConsumerType] = None + self.consumer: typing.Optional[Consumer] = None self.config = config or {} self._consumer_task: typing.Optional[asyncio.Task] = None self.name = name or str(uuid.uuid4()) @@ -114,7 +114,7 @@ def __init__( # so we always create a list and then we expand it with *topics self.topics = [topics] if isinstance(topics, str) else topics - def _create_consumer(self) -> ConsumerType: + def _create_consumer(self) -> Consumer: if self.backend is None: raise BackendNotSet("A backend has not been set for this stream") config = {**self.backend.model_dump(), **self.config} @@ -240,8 +240,8 @@ async def func_wrapper_with_typing(self) -> None: cr = await self.getone() await self.func(cr) - def seek_to_initial_offsets(self): - if not self.seeked_initial_offsets: + def seek_to_initial_offsets(self) -> None: + if not self.seeked_initial_offsets and self.consumer is not None: assignments: typing.Set[TopicPartition] = self.consumer.assignment() if self.initial_offsets is not None: topicPartitionOffset: TopicPartitionOffset diff --git a/kstreams/utils.py b/kstreams/utils.py index ce41a505..f1794fe6 100644 --- a/kstreams/utils.py +++ b/kstreams/utils.py @@ -1,7 +1,7 @@ import contextlib import ssl from tempfile import NamedTemporaryFile -from typing import Optional, Union +from typing import Any, Optional, Union from aiokafka.helpers import create_ssl_context as aiokafka_create_ssl_context @@ -53,13 +53,13 @@ def create_ssl_context_from_mem( def create_ssl_context( *, - cafile: str = None, - capath: str = None, - cadata: Union[str, bytes] = None, - certfile: str = None, - keyfile: str = None, - password: str = None, - crlfile=None, + cafile: Optional[str] = None, + capath: Optional[str] = None, + cadata: Union[str, bytes, None] = None, + certfile: Optional[str] = None, + keyfile: Optional[str] = None, + password: Optional[str] = None, + crlfile: Any = None, ): """Wrapper of [aiokafka.helpers.create_ssl_context]( https://aiokafka.readthedocs.io/en/stable/api.html#helpers diff --git a/poetry.lock b/poetry.lock index 9c28db8a..b7e3779d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -675,44 +675,49 @@ mkdocstrings = ">=0.20" [[package]] name = "mypy" -version = "0.971" +version = "1.8.0" description = "Optional static typing for Python" optional = false -python-versions = ">=3.6" +python-versions = ">=3.8" files = [ - {file = "mypy-0.971-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:f2899a3cbd394da157194f913a931edfd4be5f274a88041c9dc2d9cdcb1c315c"}, - {file = "mypy-0.971-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:98e02d56ebe93981c41211c05adb630d1d26c14195d04d95e49cd97dbc046dc5"}, - {file = "mypy-0.971-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:19830b7dba7d5356d3e26e2427a2ec91c994cd92d983142cbd025ebe81d69cf3"}, - {file = "mypy-0.971-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:02ef476f6dcb86e6f502ae39a16b93285fef97e7f1ff22932b657d1ef1f28655"}, - {file = "mypy-0.971-cp310-cp310-win_amd64.whl", hash = "sha256:25c5750ba5609a0c7550b73a33deb314ecfb559c350bb050b655505e8aed4103"}, - {file = "mypy-0.971-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:d3348e7eb2eea2472db611486846742d5d52d1290576de99d59edeb7cd4a42ca"}, - {file = "mypy-0.971-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:3fa7a477b9900be9b7dd4bab30a12759e5abe9586574ceb944bc29cddf8f0417"}, - {file = "mypy-0.971-cp36-cp36m-win_amd64.whl", hash = "sha256:2ad53cf9c3adc43cf3bea0a7d01a2f2e86db9fe7596dfecb4496a5dda63cbb09"}, - {file = "mypy-0.971-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:855048b6feb6dfe09d3353466004490b1872887150c5bb5caad7838b57328cc8"}, - {file = "mypy-0.971-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:23488a14a83bca6e54402c2e6435467a4138785df93ec85aeff64c6170077fb0"}, - {file = "mypy-0.971-cp37-cp37m-win_amd64.whl", hash = "sha256:4b21e5b1a70dfb972490035128f305c39bc4bc253f34e96a4adf9127cf943eb2"}, - {file = "mypy-0.971-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:9796a2ba7b4b538649caa5cecd398d873f4022ed2333ffde58eaf604c4d2cb27"}, - {file = "mypy-0.971-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:5a361d92635ad4ada1b1b2d3630fc2f53f2127d51cf2def9db83cba32e47c856"}, - {file = "mypy-0.971-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:b793b899f7cf563b1e7044a5c97361196b938e92f0a4343a5d27966a53d2ec71"}, - {file = "mypy-0.971-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:d1ea5d12c8e2d266b5fb8c7a5d2e9c0219fedfeb493b7ed60cd350322384ac27"}, - {file = "mypy-0.971-cp38-cp38-win_amd64.whl", hash = "sha256:23c7ff43fff4b0df93a186581885c8512bc50fc4d4910e0f838e35d6bb6b5e58"}, - {file = "mypy-0.971-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:1f7656b69974a6933e987ee8ffb951d836272d6c0f81d727f1d0e2696074d9e6"}, - {file = "mypy-0.971-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:d2022bfadb7a5c2ef410d6a7c9763188afdb7f3533f22a0a32be10d571ee4bbe"}, - {file = "mypy-0.971-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ef943c72a786b0f8d90fd76e9b39ce81fb7171172daf84bf43eaf937e9f220a9"}, - {file = "mypy-0.971-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:d744f72eb39f69312bc6c2abf8ff6656973120e2eb3f3ec4f758ed47e414a4bf"}, - {file = "mypy-0.971-cp39-cp39-win_amd64.whl", hash = "sha256:77a514ea15d3007d33a9e2157b0ba9c267496acf12a7f2b9b9f8446337aac5b0"}, - {file = "mypy-0.971-py3-none-any.whl", hash = "sha256:0d054ef16b071149917085f51f89555a576e2618d5d9dd70bd6eea6410af3ac9"}, - {file = "mypy-0.971.tar.gz", hash = "sha256:40b0f21484238269ae6a57200c807d80debc6459d444c0489a102d7c6a75fa56"}, + {file = "mypy-1.8.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:485a8942f671120f76afffff70f259e1cd0f0cfe08f81c05d8816d958d4577d3"}, + {file = "mypy-1.8.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:df9824ac11deaf007443e7ed2a4a26bebff98d2bc43c6da21b2b64185da011c4"}, + {file = "mypy-1.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2afecd6354bbfb6e0160f4e4ad9ba6e4e003b767dd80d85516e71f2e955ab50d"}, + {file = "mypy-1.8.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8963b83d53ee733a6e4196954502b33567ad07dfd74851f32be18eb932fb1cb9"}, + {file = "mypy-1.8.0-cp310-cp310-win_amd64.whl", hash = "sha256:e46f44b54ebddbeedbd3d5b289a893219065ef805d95094d16a0af6630f5d410"}, + {file = "mypy-1.8.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:855fe27b80375e5c5878492f0729540db47b186509c98dae341254c8f45f42ae"}, + {file = "mypy-1.8.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4c886c6cce2d070bd7df4ec4a05a13ee20c0aa60cb587e8d1265b6c03cf91da3"}, + {file = "mypy-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d19c413b3c07cbecf1f991e2221746b0d2a9410b59cb3f4fb9557f0365a1a817"}, + {file = "mypy-1.8.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:9261ed810972061388918c83c3f5cd46079d875026ba97380f3e3978a72f503d"}, + {file = "mypy-1.8.0-cp311-cp311-win_amd64.whl", hash = "sha256:51720c776d148bad2372ca21ca29256ed483aa9a4cdefefcef49006dff2a6835"}, + {file = "mypy-1.8.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:52825b01f5c4c1c4eb0db253ec09c7aa17e1a7304d247c48b6f3599ef40db8bd"}, + {file = "mypy-1.8.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f5ac9a4eeb1ec0f1ccdc6f326bcdb464de5f80eb07fb38b5ddd7b0de6bc61e55"}, + {file = "mypy-1.8.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:afe3fe972c645b4632c563d3f3eff1cdca2fa058f730df2b93a35e3b0c538218"}, + {file = "mypy-1.8.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:42c6680d256ab35637ef88891c6bd02514ccb7e1122133ac96055ff458f93fc3"}, + {file = "mypy-1.8.0-cp312-cp312-win_amd64.whl", hash = "sha256:720a5ca70e136b675af3af63db533c1c8c9181314d207568bbe79051f122669e"}, + {file = "mypy-1.8.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:028cf9f2cae89e202d7b6593cd98db6759379f17a319b5faf4f9978d7084cdc6"}, + {file = "mypy-1.8.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4e6d97288757e1ddba10dd9549ac27982e3e74a49d8d0179fc14d4365c7add66"}, + {file = "mypy-1.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7f1478736fcebb90f97e40aff11a5f253af890c845ee0c850fe80aa060a267c6"}, + {file = "mypy-1.8.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:42419861b43e6962a649068a61f4a4839205a3ef525b858377a960b9e2de6e0d"}, + {file = "mypy-1.8.0-cp38-cp38-win_amd64.whl", hash = "sha256:2b5b6c721bd4aabaadead3a5e6fa85c11c6c795e0c81a7215776ef8afc66de02"}, + {file = "mypy-1.8.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5c1538c38584029352878a0466f03a8ee7547d7bd9f641f57a0f3017a7c905b8"}, + {file = "mypy-1.8.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4ef4be7baf08a203170f29e89d79064463b7fc7a0908b9d0d5114e8009c3a259"}, + {file = "mypy-1.8.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7178def594014aa6c35a8ff411cf37d682f428b3b5617ca79029d8ae72f5402b"}, + {file = "mypy-1.8.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:ab3c84fa13c04aeeeabb2a7f67a25ef5d77ac9d6486ff33ded762ef353aa5592"}, + {file = "mypy-1.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:99b00bc72855812a60d253420d8a2eae839b0afa4938f09f4d2aa9bb4654263a"}, + {file = "mypy-1.8.0-py3-none-any.whl", hash = "sha256:538fd81bb5e430cc1381a443971c0475582ff9f434c16cd46d2c66763ce85d9d"}, + {file = "mypy-1.8.0.tar.gz", hash = "sha256:6ff8b244d7085a0b425b56d327b480c3b29cafbd2eff27316a004f9a7391ae07"}, ] [package.dependencies] -mypy-extensions = ">=0.4.3" +mypy-extensions = ">=1.0.0" tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} -typing-extensions = ">=3.10" +typing-extensions = ">=4.1.0" [package.extras] dmypy = ["psutil (>=4.0)"] -python2 = ["typed-ast (>=1.4.0,<2)"] +install-types = ["pip"] +mypyc = ["setuptools (>=50)"] reports = ["lxml"] [[package]] @@ -1454,4 +1459,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "22094be5d548ba5d5816d373ebe75a818bb72f67c9803917a6b120c379014095" +content-hash = "e3b934f7f9c99b60875cd358bc3792400be3d38e22f363794603bc967bdfb2f9" diff --git a/pyproject.toml b/pyproject.toml index 2243f16a..55e75c72 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ pydantic = ">=2.0.0,<3.0.0" [tool.poetry.dev-dependencies] pytest = "^6.1" -mypy = "^0.971" +mypy = "^1.8.0" pytest-httpserver = "<1.0" pytest-cov = "^2.11.1" jedi = "0.17.2"