Skip to content

Commit

Permalink
Merge pull request #26 from kpn/fix/add-mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
woile authored Jul 27, 2022
2 parents 3eb4051 + 60b18a4 commit 4f5b5ed
Show file tree
Hide file tree
Showing 33 changed files with 285 additions and 247 deletions.
6 changes: 3 additions & 3 deletions examples/consume_multiple_topics.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from kstreams import create_engine, Stream

import asyncio
import json

topics = ["local--kstreams", "local--hello-world"]
from kstreams import Stream, create_engine

topics = ["local--kstreams-2", "local--hello-world"]

stream_engine = create_engine(title="my-stream-engine")

Expand Down
1 change: 1 addition & 0 deletions examples/fastapi_example/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging

import uvicorn


Expand Down
5 changes: 3 additions & 2 deletions examples/fastapi_example/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from .streaming.streams import stream_engine
from fastapi import FastAPI
from starlette.responses import Response
from starlette_prometheus import metrics, PrometheusMiddleware
from starlette_prometheus import PrometheusMiddleware, metrics

from .streaming.streams import stream_engine


def create_app():
Expand Down
3 changes: 2 additions & 1 deletion examples/fastapi_example/streaming/streams.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .engine import stream_engine
from kstreams import Stream

from .engine import stream_engine


@stream_engine.stream("local--kstream")
async def stream(stream: Stream):
Expand Down
9 changes: 5 additions & 4 deletions examples/json_serialization.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from kstreams import consts, create_engine, Stream
from kstreams.custom_types import Headers
import asyncio
import json
from typing import Any, Dict, Optional

import aiokafka
import asyncio
import json

from kstreams import Stream, consts, create_engine
from kstreams.custom_types import Headers


class JsonSerializer:
Expand Down
5 changes: 3 additions & 2 deletions examples/simple.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio

from aiokafka.structs import RecordMetadata

from kstreams import create_engine
from kstreams.streams import Stream

import asyncio

topic = "local--py-streams"

stream_engine = create_engine(title="my-stream-engine")
Expand Down
2 changes: 1 addition & 1 deletion kstreams/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .clients import Consumer, ConsumerType, Producer, ProducerType # noqa: F401
from .create import create_engine, StreamEngine # noqa: F401
from .create import StreamEngine, create_engine # noqa: F401
from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType # noqa: F401
from .streams import KafkaConsumer, KafkaStream, Stream # noqa: F401
7 changes: 4 additions & 3 deletions kstreams/clients.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from . import conf, utils
from pkgsettings import PrefixedSettings
import logging
from typing import Callable, Optional, TypeVar

import aiokafka
import logging
from pkgsettings import PrefixedSettings

from . import conf, utils

logger = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion kstreams/create.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Optional

from .clients import Consumer, ConsumerType, Producer, ProducerType
from .engine import StreamEngine
from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType
from .serializers import ValueDeserializer, ValueSerializer
from typing import Optional


def create_engine(
Expand Down
15 changes: 8 additions & 7 deletions kstreams/engine.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import asyncio
import inspect
import logging
from typing import Any, Coroutine, Dict, List, Optional, Type, Union

from kstreams.clients import ConsumerType, ProducerType
from kstreams.utils import encode_headers

from .custom_types import DecoratedCallable, Headers
from .exceptions import DuplicateStreamException
from .prometheus.monitor import PrometheusMonitorType
from .prometheus.tasks import metrics_task
from .serializers import ValueDeserializer, ValueSerializer
from .singlenton import Singleton
from .streams import KafkaStream, Stream
from kstreams.clients import ConsumerType, ProducerType
from kstreams.utils import encode_headers
from typing import Any, Coroutine, Dict, List, Optional, Type, Union

import asyncio
import inspect
import logging

logger = logging.getLogger(__name__)

Expand Down
6 changes: 4 additions & 2 deletions kstreams/prometheus/monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from kstreams.singlenton import Singleton
from prometheus_client import Gauge
from typing import Dict, TypeVar

from prometheus_client import Gauge

from kstreams.singlenton import Singleton

PrometheusMonitorType = TypeVar("PrometheusMonitorType", bound="PrometheusMonitor")


Expand Down
9 changes: 5 additions & 4 deletions kstreams/prometheus/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from .monitor import PrometheusMonitorType
import asyncio
from typing import Any, DefaultDict, List

from kstreams.clients import Consumer
from kstreams.streams import KafkaStream
from typing import DefaultDict, List

import asyncio
from .monitor import PrometheusMonitorType


async def metrics_task(streams: List[KafkaStream], monitor: PrometheusMonitorType):
Expand Down Expand Up @@ -43,7 +44,7 @@ async def generate_consumer_metrics(consumer: Consumer, monitor: PrometheusMonit
}
"""

metrics = DefaultDict(str)
metrics: DefaultDict[Any, dict] = DefaultDict(dict)

topic_partitions = consumer.assignment()
for topic_partition in topic_partitions:
Expand Down
3 changes: 2 additions & 1 deletion kstreams/serializers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from .custom_types import Headers
from typing import Any, Dict, Optional, Protocol

import aiokafka

from .custom_types import Headers


class ValueDeserializer(Protocol):
async def deserialize(
Expand Down
5 changes: 4 additions & 1 deletion kstreams/singlenton.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from typing import Dict


class Singleton(type):
_instances = {}
_instances: Dict = {}

def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
Expand Down
15 changes: 8 additions & 7 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .clients import Consumer, ConsumerType
from .serializers import ValueDeserializer
from aiokafka import errors, structs
import asyncio
import inspect
import logging
import uuid
from typing import (
Any,
AsyncGenerator,
Expand All @@ -13,10 +14,10 @@
Union,
)

import asyncio
import inspect
import logging
import uuid
from aiokafka import errors, structs

from .clients import Consumer, ConsumerType
from .serializers import ValueDeserializer

logger = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion kstreams/test_utils/structs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from dataclasses import dataclass
from kstreams.custom_types import KafkaHeaders
from typing import Generic, NamedTuple, Optional, TypeVar

from kstreams.custom_types import KafkaHeaders

KT = TypeVar("KT")
VT = TypeVar("VT")

Expand Down
10 changes: 7 additions & 3 deletions kstreams/test_utils/test_clients.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from .structs import ConsumerRecord, RecordMetadata, TopicPartition
from .topics import TopicManager
from datetime import datetime
from typing import Any, Coroutine, Dict, List, Optional, Tuple, Union

from kstreams.clients import Consumer, Producer
from kstreams.custom_types import Headers
from kstreams.serializers import ValueSerializer
from typing import Any, Coroutine, Dict, List, Optional, Tuple, Union

from .structs import ConsumerRecord, RecordMetadata, TopicPartition
from .topics import TopicManager


class Base:
Expand Down Expand Up @@ -84,6 +86,8 @@ async def getone(
) -> Union[bytes, Dict]: # The return type must be fixed later on
for topic_partition in self._assigments:
topic = TopicManager.get_topic(topic_partition.topic)
if topic is None:
raise AttributeError("There should be a topic")

if not topic.consumed:
break
Expand Down
13 changes: 7 additions & 6 deletions kstreams/test_utils/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from .structs import RecordMetadata
from .test_clients import TestConsumer, TestProducer
from .topics import TopicManager
import asyncio
from types import TracebackType
from typing import Any, Dict, List, Optional, Type

from kstreams.create import create_engine
from kstreams.custom_types import Headers
from kstreams.serializers import ValueSerializer
from kstreams.streams import Stream
from types import TracebackType
from typing import Any, Dict, List, Optional, Type

import asyncio
from .structs import RecordMetadata
from .test_clients import TestConsumer, TestProducer
from .topics import TopicManager


class TestStreamClient:
Expand Down
4 changes: 2 additions & 2 deletions kstreams/test_utils/topics.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from . import test_clients
import asyncio
from dataclasses import dataclass
from typing import Any, ClassVar, Dict, Optional

import asyncio
from . import test_clients


@dataclass
Expand Down
8 changes: 4 additions & 4 deletions kstreams/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from kstreams import custom_types
from kstreams.conf import settings
from tempfile import NamedTemporaryFile

import contextlib
import ssl
import typing
from tempfile import NamedTemporaryFile

from kstreams import custom_types
from kstreams.conf import settings


class EmptySSLDataException(Exception):
Expand Down
Loading

0 comments on commit 4f5b5ed

Please sign in to comment.