Commit b5c8fda

fix: mypy updated from version 0.9 to 1.8.0. Some typing fixed. Closes

marcosschroh authored Jan 25, 2024
1 parent e6d81d2 · commit b5c8fda

Showing 9 changed files with 60 additions and 61 deletions.
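Almost all of the typing changes follow one pattern: the ConsumerType and ProducerType TypeVars are removed, and every annotation that used them now names the concrete Consumer and Producer classes. The commit message does not state why, but a plausible trigger is that newer mypy rejects a concrete class as the default for a parameter annotated with a bound TypeVar, since the default cannot be valid for every binding of the variable. A minimal sketch of the old and new patterns (simplified names, not the actual kstreams code):

from typing import Type, TypeVar

class Consumer: ...
class MyConsumer(Consumer): ...

ConsumerType = TypeVar("ConsumerType", bound=Consumer)

# Old pattern: newer mypy flags the default, because `Type[ConsumerType]`
# must hold for every binding of the TypeVar (e.g. MyConsumer), which a
# plain `Consumer` default cannot guarantee:
def create_engine_old(consumer_class: Type[ConsumerType] = Consumer) -> None: ...

# New pattern used throughout the commit: annotate with the concrete base
# class; any subclass still satisfies `Type[Consumer]`:
def create_engine_new(consumer_class: Type[Consumer] = Consumer) -> None: ...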
4 changes: 1 addition & 3 deletions kstreams/__init__.py
@@ -1,6 +1,6 @@
 from aiokafka.structs import ConsumerRecord, RecordMetadata, TopicPartition

-from .clients import Consumer, ConsumerType, Producer, ProducerType
+from .clients import Consumer, Producer
 from .create import StreamEngine, create_engine
 from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType
 from .rebalance_listener import (
@@ -14,9 +14,7 @@

 __all__ = [
     "Consumer",
-    "ConsumerType",
     "Producer",
-    "ProducerType",
     "StreamEngine",
     "create_engine",
     "PrometheusMonitor",
6 changes: 1 addition & 5 deletions kstreams/clients.py
@@ -1,5 +1,5 @@
 import logging
-from typing import Callable, Optional, TypeVar
+from typing import Callable, Optional

 import aiokafka

@@ -40,7 +40,3 @@ def key_serializer(key):
             return key.encode("utf-8")

         super().__init__(*args, key_serializer=key_serializer, **self.config)
-
-
-ConsumerType = TypeVar("ConsumerType", bound=Consumer)
-ProducerType = TypeVar("ProducerType", bound=Producer)
6 changes: 3 additions & 3 deletions kstreams/create.py
@@ -1,7 +1,7 @@
 from typing import Optional, Type

 from .backends.kafka import Kafka
-from .clients import Consumer, ConsumerType, Producer, ProducerType
+from .clients import Consumer, Producer
 from .engine import StreamEngine
 from .prometheus.monitor import PrometheusMonitor
 from .serializers import Deserializer, Serializer
@@ -10,8 +10,8 @@
 def create_engine(
     title: Optional[str] = None,
     backend: Optional[Kafka] = None,
-    consumer_class: Type[ConsumerType] = Consumer,
-    producer_class: Type[ProducerType] = Producer,
+    consumer_class: Type[Consumer] = Consumer,
+    producer_class: Type[Producer] = Producer,
     serializer: Optional[Serializer] = None,
     deserializer: Optional[Deserializer] = None,
     monitor: Optional[PrometheusMonitor] = None,
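The narrower-looking signature costs no flexibility at the call site, since Type[Consumer] accepts any subclass of Consumer. A hypothetical usage sketch (MyConsumer is illustrative and not part of kstreams):

from kstreams import create_engine
from kstreams.clients import Consumer

class MyConsumer(Consumer):
    """Hypothetical consumer subclass, e.g. with custom logging."""

# A subclass still type-checks against `Type[Consumer]` under mypy 1.8:
engine = create_engine(title="example", consumer_class=MyConsumer)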
8 changes: 4 additions & 4 deletions kstreams/engine.py
@@ -7,7 +7,7 @@
 from kstreams.structs import TopicPartitionOffset

 from .backends.kafka import Kafka
-from .clients import ConsumerType, ProducerType
+from .clients import Consumer, Producer
 from .exceptions import DuplicateStreamException, EngineNotStartedException
 from .middleware import ExceptionMiddleware, Middleware
 from .prometheus.monitor import PrometheusMonitor
@@ -61,8 +61,8 @@ def __init__(
         self,
         *,
         backend: Kafka,
-        consumer_class: typing.Type[ConsumerType],
-        producer_class: typing.Type[ProducerType],
+        consumer_class: typing.Type[Consumer],
+        producer_class: typing.Type[Producer],
         monitor: PrometheusMonitor,
         title: typing.Optional[str] = None,
         deserializer: typing.Optional[Deserializer] = None,
@@ -75,7 +75,7 @@ def __init__(
         self.deserializer = deserializer
         self.serializer = serializer
         self.monitor = monitor
-        self._producer: typing.Optional[typing.Type[ProducerType]] = None
+        self._producer: typing.Optional[typing.Type[Producer]] = None
         self._streams: typing.List[Stream] = []

     async def send(
4 changes: 2 additions & 2 deletions kstreams/prometheus/monitor.py
@@ -5,7 +5,7 @@
 from prometheus_client import Gauge

 from kstreams import TopicPartition
-from kstreams.clients import ConsumerType
+from kstreams.clients import Consumer
 from kstreams.streams import Stream

 logger = logging.getLogger(__name__)
@@ -164,7 +164,7 @@ def add_producer(self, producer):
     def add_streams(self, streams):
         self._streams = streams

-    async def generate_consumer_metrics(self, consumer: ConsumerType):
+    async def generate_consumer_metrics(self, consumer: Consumer):
         """
         Generate Consumer Metrics for Prometheus
12 changes: 6 additions & 6 deletions kstreams/streams.py
@@ -13,7 +13,7 @@
 from kstreams.structs import TopicPartitionOffset

 from .backends.kafka import Kafka
-from .clients import Consumer, ConsumerType
+from .clients import Consumer
 from .middleware import Middleware
 from .rebalance_listener import RebalanceListener
 from .serializers import Deserializer
@@ -86,7 +86,7 @@ def __init__(
         *,
         func: StreamFunc,
         backend: typing.Optional[Kafka] = None,
-        consumer_class: typing.Type[ConsumerType] = Consumer,
+        consumer_class: typing.Type[Consumer] = Consumer,
         name: typing.Optional[str] = None,
         config: typing.Optional[typing.Dict] = None,
         deserializer: typing.Optional[Deserializer] = None,
@@ -97,7 +97,7 @@ def __init__(
         self.func = func
         self.backend = backend
         self.consumer_class = consumer_class
-        self.consumer: typing.Optional[ConsumerType] = None
+        self.consumer: typing.Optional[Consumer] = None
         self.config = config or {}
         self._consumer_task: typing.Optional[asyncio.Task] = None
         self.name = name or str(uuid.uuid4())
@@ -114,7 +114,7 @@ def __init__(
         # so we always create a list and then we expand it with *topics
         self.topics = [topics] if isinstance(topics, str) else topics

-    def _create_consumer(self) -> ConsumerType:
+    def _create_consumer(self) -> Consumer:
         if self.backend is None:
             raise BackendNotSet("A backend has not been set for this stream")
         config = {**self.backend.model_dump(), **self.config}
@@ -240,8 +240,8 @@ async def func_wrapper_with_typing(self) -> None:
         cr = await self.getone()
         await self.func(cr)

-    def seek_to_initial_offsets(self):
-    if not self.seeked_initial_offsets:
+    def seek_to_initial_offsets(self) -> None:
+    if not self.seeked_initial_offsets and self.consumer is not None:
             assignments: typing.Set[TopicPartition] = self.consumer.assignment()
             if self.initial_offsets is not None:
                 topicPartitionOffset: TopicPartitionOffset
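The seek_to_initial_offsets hunk carries two fixes: an explicit -> None return annotation and an added self.consumer is not None check. The check is what lets mypy narrow Optional[Consumer] before .assignment() is called. A reduced, self-contained illustration of the narrowing (stand-in classes, not the real ones):

import typing

class Consumer:
    def assignment(self) -> typing.Set[str]:  # simplified stand-in
        return set()

class Stream:
    def __init__(self) -> None:
        self.consumer: typing.Optional[Consumer] = None

    def seek_to_initial_offsets(self) -> None:
        # Without the `is not None` guard, mypy reports:
        #   Item "None" of "Optional[Consumer]" has no attribute "assignment"
        if self.consumer is not None:
            assignments = self.consumer.assignment()
            print(assignments)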
16 changes: 8 additions & 8 deletions kstreams/utils.py
@@ -1,7 +1,7 @@
 import contextlib
 import ssl
 from tempfile import NamedTemporaryFile
-from typing import Optional, Union
+from typing import Any, Optional, Union

 from aiokafka.helpers import create_ssl_context as aiokafka_create_ssl_context

@@ -53,13 +53,13 @@ def create_ssl_context_from_mem(

 def create_ssl_context(
     *,
-    cafile: str = None,
-    capath: str = None,
-    cadata: Union[str, bytes] = None,
-    certfile: str = None,
-    keyfile: str = None,
-    password: str = None,
-    crlfile=None,
+    cafile: Optional[str] = None,
+    capath: Optional[str] = None,
+    cadata: Union[str, bytes, None] = None,
+    certfile: Optional[str] = None,
+    keyfile: Optional[str] = None,
+    password: Optional[str] = None,
+    crlfile: Any = None,
 ):
     """Wrapper of [aiokafka.helpers.create_ssl_context](
     https://aiokafka.readthedocs.io/en/stable/api.html#helpers
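The create_ssl_context changes track mypy's implicit-Optional switch: since mypy 0.990, no_implicit_optional is the default, so cafile: str = None is an error rather than a shorthand for Optional[str]. A minimal before/after sketch:

from typing import Optional

# Rejected by mypy >= 0.990 (a None default no longer implies Optional):
#   Incompatible default for argument "cafile"
#   (default has type "None", argument has type "str")
def old_style(cafile: str = None) -> None: ...

# The fix applied in this file: spell out the Optional explicitly.
def new_style(cafile: Optional[str] = None) -> None: ...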
63 changes: 34 additions & 29 deletions poetry.lock

Generated lock file; the diff is not rendered.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -28,7 +28,7 @@ pydantic = ">=2.0.0,<3.0.0"

 [tool.poetry.dev-dependencies]
 pytest = "^6.1"
-mypy = "^0.971"
+mypy = "^1.8.0"
 pytest-httpserver = "<1.0"
 pytest-cov = "^2.11.1"
 jedi = "0.17.2"
