Skip to content

Commit

Permalink
FIX: Update DeprecationWarnings
Browse files Browse the repository at this point in the history
  • Loading branch information
woile committed Oct 9, 2024
1 parent ee8e610 commit df840fb
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 13 deletions.
6 changes: 3 additions & 3 deletions kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from .streams import Stream, StreamFunc
from .streams import stream as stream_func
from .streams_utils import StreamErrorPolicy, UDFType
from .types import EngineHooks, Headers, NextMiddlewareCall
from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall
from .utils import encode_headers, execute_hooks

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -67,7 +67,7 @@ def __init__(
producer_class: typing.Type[Producer],
monitor: PrometheusMonitor,
title: typing.Optional[str] = None,
deserializer: typing.Optional[Deserializer] = None,
deserializer: Deprecated[typing.Optional[Deserializer]] = None,
serializer: typing.Optional[Serializer] = None,
on_startup: typing.Optional[EngineHooks] = None,
on_stop: typing.Optional[EngineHooks] = None,
Expand Down Expand Up @@ -424,7 +424,7 @@ def stream(
topics: typing.Union[typing.List[str], str],
*,
name: typing.Optional[str] = None,
deserializer: typing.Optional[Deserializer] = None,
deserializer: Deprecated[typing.Optional[Deserializer]] = None,
initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None,
rebalance_listener: typing.Optional[RebalanceListener] = None,
middlewares: typing.Optional[typing.List[Middleware]] = None,
Expand Down
25 changes: 15 additions & 10 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import typing
import uuid
import warnings
from functools import update_wrapper

from aiokafka import errors
Expand All @@ -18,7 +19,7 @@
from .rebalance_listener import RebalanceListener
from .serializers import Deserializer
from .streams_utils import StreamErrorPolicy, UDFType
from .types import StreamFunc
from .types import Deprecated, StreamFunc

if typing.TYPE_CHECKING:
from kstreams import StreamEngine
Expand Down Expand Up @@ -152,7 +153,7 @@ def __init__(
consumer_class: typing.Type[Consumer] = Consumer,
name: typing.Optional[str] = None,
config: typing.Optional[typing.Dict] = None,
deserializer: typing.Optional[Deserializer] = None,
deserializer: Deprecated[typing.Optional[Deserializer]] = None,
initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None,
rebalance_listener: typing.Optional[RebalanceListener] = None,
middlewares: typing.Optional[typing.List[Middleware]] = None,
Expand Down Expand Up @@ -278,10 +279,11 @@ async def getone(self) -> ConsumerRecord:
# call deserializer if there is one regarless consumer_record.value
# as the end user might want to do something extra with headers or metadata
if self.deserializer is not None:
logger.warning(
"Deserializers will be deprecated in the future, "
"use middlewares instead: https://kpn.github.io/kstreams/middleware/"
msg = (
"Deserializers are deprecated and don't have any support.\n"
"Use middlewares instead:\nhttps://kpn.github.io/kstreams/middleware/"
)
warnings.warn(msg, DeprecationWarning, stacklevel=2)
return await self.deserializer.deserialize(consumer_record)

return consumer_record
Expand Down Expand Up @@ -341,11 +343,14 @@ async def start(self) -> None:

if self.udf_handler is not None:
if self.udf_handler.type == UDFType.NO_TYPING:
# normal use case
logging.warning(
"Streams with `async for in` loop approach might "
"be deprecated. Consider migrating to a typing approach."
# deprecated use case
msg = (
"Streams with `async for in` loop approach are deprecated.\n"
"Migrate to typed function.\n"
"Read more:\n\n"
"\thttps://kpn.github.io/kstreams/stream/#dependency-injection"
)
warnings.warn(msg, DeprecationWarning, stacklevel=2)

func = self.udf_handler.next_call(self)
await func
Expand Down Expand Up @@ -441,7 +446,7 @@ def stream(
*,
subscribe_by_pattern: bool = False,
name: typing.Optional[str] = None,
deserializer: typing.Optional[Deserializer] = None,
deserializer: Deprecated[typing.Optional[Deserializer]] = None,
initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None,
rebalance_listener: typing.Optional[RebalanceListener] = None,
middlewares: typing.Optional[typing.List[Middleware]] = None,
Expand Down
4 changes: 4 additions & 0 deletions kstreams/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ def __call__(
serializer: typing.Optional["Serializer"] = None,
serializer_kwargs: typing.Optional[typing.Dict] = None,
) -> typing.Awaitable[RecordMetadata]: ...


D = typing.TypeVar("D")
Deprecated = typing.Annotated[D, "deprecated"]
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ major_version_zero = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
log_level = "DEBUG"
filterwarnings = [
"ignore::DeprecationWarning",
"ignore::PendingDeprecationWarning",
]

[[tool.mypy.overrides]]
module = "aiokafka.*"
Expand Down

0 comments on commit df840fb

Please sign in to comment.