diff --git a/kstreams/engine.py b/kstreams/engine.py index 3d4affee..b69095f6 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -18,7 +18,7 @@ from .streams import Stream, StreamFunc from .streams import stream as stream_func from .streams_utils import StreamErrorPolicy, UDFType -from .types import EngineHooks, Headers, NextMiddlewareCall +from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall from .utils import encode_headers, execute_hooks logger = logging.getLogger(__name__) @@ -67,7 +67,7 @@ def __init__( producer_class: typing.Type[Producer], monitor: PrometheusMonitor, title: typing.Optional[str] = None, - deserializer: typing.Optional[Deserializer] = None, + deserializer: Deprecated[typing.Optional[Deserializer]] = None, serializer: typing.Optional[Serializer] = None, on_startup: typing.Optional[EngineHooks] = None, on_stop: typing.Optional[EngineHooks] = None, @@ -424,7 +424,7 @@ def stream( topics: typing.Union[typing.List[str], str], *, name: typing.Optional[str] = None, - deserializer: typing.Optional[Deserializer] = None, + deserializer: Deprecated[typing.Optional[Deserializer]] = None, initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None, rebalance_listener: typing.Optional[RebalanceListener] = None, middlewares: typing.Optional[typing.List[Middleware]] = None, diff --git a/kstreams/streams.py b/kstreams/streams.py index 503d03bd..42f2cd55 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -3,6 +3,7 @@ import logging import typing import uuid +import warnings from functools import update_wrapper from aiokafka import errors @@ -18,7 +19,7 @@ from .rebalance_listener import RebalanceListener from .serializers import Deserializer from .streams_utils import StreamErrorPolicy, UDFType -from .types import StreamFunc +from .types import Deprecated, StreamFunc if typing.TYPE_CHECKING: from kstreams import StreamEngine @@ -152,7 +153,7 @@ def __init__( consumer_class: typing.Type[Consumer] = Consumer, name: typing.Optional[str] = None, config: typing.Optional[typing.Dict] = None, - deserializer: typing.Optional[Deserializer] = None, + deserializer: Deprecated[typing.Optional[Deserializer]] = None, initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None, rebalance_listener: typing.Optional[RebalanceListener] = None, middlewares: typing.Optional[typing.List[Middleware]] = None, @@ -278,10 +279,11 @@ async def getone(self) -> ConsumerRecord: # call deserializer if there is one regarless consumer_record.value # as the end user might want to do something extra with headers or metadata if self.deserializer is not None: - logger.warning( - "Deserializers will be deprecated in the future, " - "use middlewares instead: https://kpn.github.io/kstreams/middleware/" + msg = ( + "Deserializers are deprecated and don't have any support.\n" + "Consider using middlewares instead:\nhttps://kpn.github.io/kstreams/middleware/" ) + warnings.warn(msg, DeprecationWarning, stacklevel=2) return await self.deserializer.deserialize(consumer_record) return consumer_record @@ -341,11 +343,14 @@ async def start(self) -> None: if self.udf_handler is not None: if self.udf_handler.type == UDFType.NO_TYPING: - # normal use case - logging.warning( - "Streams with `async for in` loop approach might " - "be deprecated. Consider migrating to a typing approach." + # deprecated use case + msg = ( + "Streams with `async for in` loop approach are be deprecated. " + "Consider migrating to the typed approach.\n" + "Read more:\n\n" + "\thttps://kpn.github.io/kstreams/stream/#dependency-injection" ) + warnings.warn(msg, DeprecationWarning, stacklevel=2) func = self.udf_handler.next_call(self) await func @@ -441,7 +446,7 @@ def stream( *, subscribe_by_pattern: bool = False, name: typing.Optional[str] = None, - deserializer: typing.Optional[Deserializer] = None, + deserializer: Deprecated[typing.Optional[Deserializer]] = None, initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None, rebalance_listener: typing.Optional[RebalanceListener] = None, middlewares: typing.Optional[typing.List[Middleware]] = None, diff --git a/kstreams/types.py b/kstreams/types.py index 63951727..2b836367 100644 --- a/kstreams/types.py +++ b/kstreams/types.py @@ -24,3 +24,7 @@ def __call__( serializer: typing.Optional["Serializer"] = None, serializer_kwargs: typing.Optional[typing.Dict] = None, ) -> typing.Awaitable[RecordMetadata]: ... + + +D = typing.TypeVar("D") +Deprecated = typing.Annotated[D, "deprecated"] diff --git a/pyproject.toml b/pyproject.toml index ade9d5ab..e1d17a26 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,10 @@ major_version_zero = true [tool.pytest.ini_options] asyncio_mode = "auto" log_level = "DEBUG" +filterwarnings = [ + "ignore::DeprecationWarning", + "ignore::PendingDeprecationWarning", +] [[tool.mypy.overrides]] module = "aiokafka.*"