diff --git a/.bumpversion.toml b/.bumpversion.toml index 46d0b93..7640993 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -1,5 +1,5 @@ [tool.bumpversion] -current_version = "0.5.8" +current_version = "0.5.9" parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)" serialize = ["{major}.{minor}.{patch}"] search = "{current_version}" diff --git a/CHANGELOG.md b/CHANGELOG.md index 877563d..1c794a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.5.9 (2024-11-28) +* Add `RETRY_SETTINGS` configuration setting. +* Add blocking retry behaviour and make it the default. + ## 0.5.8 (2024-11-19) * `ModelTopicConsumer.get_defaults` will skip fields not defined on the model. diff --git a/README.md b/README.md index f521d06..ea0d483 100644 --- a/README.md +++ b/README.md @@ -178,9 +178,13 @@ A few notes: 2. [Deletions](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html#extract-new-record-state-delete-tombstone-handling-mode) are detected automatically based on a null message value or the presence of a `__deleted` field. 3. The message key is assumed to contain the model PK as a field, [which is the default behaviour](https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-message-key-columns) for Debezium source connectors. If you need more complicated lookup behaviour, override `get_lookup_kwargs`. -## Non-Blocking Retries: +## Dead Letter Topic: -Add non-blocking retry behaviour to a topic by using the `retry` decorator: +Any message which fails to be consumed will be sent to the dead letter topic. The dead letter topic name is composed of the consumer group id, the original topic name, and a `.dlt` suffix (configurable with the `DEAD_LETTER_TOPIC_SUFFIX` setting). So for a failed message in topic `topic` received by consumer group `group`, the dead letter topic name would be `group.topic.dlt`. + +## Retries: + +Add retry behaviour to a topic by using the `retry` decorator: ```python from django_kafka import kafka @@ -193,7 +197,17 @@ class RetryableTopic(Topic): ... ``` -When the consumption of a message in a retryable topic fails, the message is re-sent to a topic with a name combined of the consumer group id, the original topic name, a `.retry` suffix, and the retry number. Subsequent failed retries will then be sent to retry topics of incrementing retry number until the maximum attempts are reached, after which it will be sent to a dead letter topic suffixed by `.dlt`. So for a failed message in topic `topic` received by consumer group `group`, the expected topic sequence would be: +You can also configure retry behaviour globally for all topics with the `RETRY_SETTINGS` configuration (see [settings](#settings)). + +Retries can be either blocking or non-blocking, controlled by the `blocking` boolean parameter. By default, all retries are blocking. + +### Blocking Retries: + +When the consumption of a message fails in a blocking retryable topic, the consumer process will pause the partition and retry the message at a later time. Messages in that partition will therefore be blocked until the failing message succeeds or the maximum retry attempts are reached, after which the message is sent to the dead letter topic.
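+For illustration, here is a minimal sketch of the two modes side by side. It relies only on what this PR shows (`RetrySettings` lives in `django_kafka.retry.settings` and is applied as a class decorator, exactly as the test suite below does); the topic classes and names are hypothetical:
+
+```python
+from django_kafka.retry.settings import RetrySettings
+from django_kafka.topic import Topic
+
+
+# Blocking (the default): the consumer pauses the partition and re-polls
+# the same message, backing off exponentially between attempts.
+@RetrySettings(max_retries=3, delay=10, backoff=True)
+class PaymentTopic(Topic):
+    name = "payments"
+
+
+# Non-blocking: failed messages are produced to `group.orders.retry.N`
+# topics and handled by the parallel retry consumer described below.
+@RetrySettings(max_retries=3, delay=10, blocking=False)
+class OrderTopic(Topic):
+    name = "orders"
+```
+
+Either decorator only attaches `retry_settings` to the topic class, so it composes with any other topic configuration.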
+ +### Non-blocking Retries: + +When the consumption of a message fails in a non-blocking retryable topic, the message is re-sent to a topic with a name composed of the consumer group id, the original topic name, a `.retry` suffix (configurable with the `RETRY_TOPIC_SUFFIX` setting), and the retry number. Subsequent failed retries will then be sent to retry topics of incrementing retry number until the maximum attempts are reached, after which it will be sent to a dead letter topic. So for a failed message in topic `topic`, with a maximum of 3 retry attempts and received by consumer group `group`, the expected topic sequence would be: 1. `topic` 2. `group.topic.retry.1` @@ -201,7 +215,7 @@ When the consumption of a message in a retryable topic fails, the message is re- 4. `group.topic.retry.3` 5. `group.topic.dlt` -When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic. +When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a non-blocking retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic. ## Connectors: @@ -282,6 +296,7 @@ DJANGO_KAFKA = { "enable.auto.offset.store": False, "topic.metadata.refresh.interval.ms": 10000, }, + "RETRY_SETTINGS": None, "RETRY_TOPIC_SUFFIX": "retry", "DEAD_LETTER_TOPIC_SUFFIX": "dlt", "POLLING_FREQUENCY": 1, # seconds @@ -326,6 +341,34 @@ Default: `{}` Defines configurations of the consumer. See [configs marked with `C`](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). +#### `RETRY_CONSUMER_CONFIG` +Default: + +```py +{ + "auto.offset.reset": "earliest", + "enable.auto.offset.store": False, + "topic.metadata.refresh.interval.ms": 10000, +} +``` + +Defines configuration for the retry consumer. See [Non-blocking retries](#non-blocking-retries). + +#### `RETRY_TOPIC_SUFFIX` +Default: `retry` + +Defines the retry topic suffix. See [Non-blocking retries](#non-blocking-retries). + +#### `RETRY_SETTINGS` +Default: `None` + +Defines the default retry settings for all topics; see [Retries](#retries) and the example configuration below. + +#### `DEAD_LETTER_TOPIC_SUFFIX` +Default: `dlt` + +Defines the dead letter topic suffix. See [Dead Letter Topic](#dead-letter-topic).
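+As an illustration of how these retry-related settings fit together, here is a sketch of a global configuration. One assumption to flag: this diff only shows that `RETRY_SETTINGS` defaults to `None`, so passing a `RetrySettings` instance is inferred rather than documented:
+
+```python
+# settings.py (sketch)
+from django_kafka.retry.settings import RetrySettings
+
+DJANGO_KAFKA = {
+    # ...
+    # assumed to accept a RetrySettings instance (the diff shows only the None default)
+    "RETRY_SETTINGS": RetrySettings(max_retries=5, delay=60, backoff=True),
+    "RETRY_TOPIC_SUFFIX": "retry",  # non-blocking retry topics: group.topic.retry.N
+    "DEAD_LETTER_TOPIC_SUFFIX": "dlt",  # exhausted messages: group.topic.dlt
+}
+```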
+ #### `POLLING_FREQUENCY` Default: 1 # second diff --git a/django_kafka/__init__.py b/django_kafka/__init__.py index f83ecb0..efb5190 100644 --- a/django_kafka/__init__.py +++ b/django_kafka/__init__.py @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) -__version__ = "0.5.8" +__version__ = "0.5.9" __all__ = [ "autodiscover", diff --git a/django_kafka/conf.py b/django_kafka/conf.py index 0b7845b..07f9e7b 100644 --- a/django_kafka/conf.py +++ b/django_kafka/conf.py @@ -14,6 +14,7 @@ "enable.auto.offset.store": False, "topic.metadata.refresh.interval.ms": 10000, }, + "RETRY_SETTINGS": None, "RETRY_TOPIC_SUFFIX": "retry", "DEAD_LETTER_TOPIC_SUFFIX": "dlt", "POLLING_FREQUENCY": 1, # seconds diff --git a/django_kafka/consumer/__init__.py b/django_kafka/consumer/__init__.py new file mode 100644 index 0000000..8e951b5 --- /dev/null +++ b/django_kafka/consumer/__init__.py @@ -0,0 +1,7 @@ +from .consumer import Consumer +from .topics import Topics + +__all__ = [ + "Consumer", + "Topics", +] diff --git a/django_kafka/consumer.py b/django_kafka/consumer/consumer.py similarity index 50% rename from django_kafka/consumer.py rename to django_kafka/consumer/consumer.py index 5347786..3908fd7 100644 --- a/django_kafka/consumer.py +++ b/django_kafka/consumer/consumer.py @@ -1,45 +1,25 @@ import logging import traceback +from datetime import datetime from pydoc import locate from typing import TYPE_CHECKING, Optional from confluent_kafka import Consumer as ConfluentConsumer -from confluent_kafka import cimpl from django_kafka.conf import settings -from django_kafka.exceptions import DjangoKafkaError + +from .managers import PauseManager, RetryManager +from .topics import Topics if TYPE_CHECKING: + from confluent_kafka import cimpl + + from django_kafka.retry.settings import RetrySettings from django_kafka.topic import TopicConsumer logger = logging.getLogger(__name__) -class Topics: - _topic_consumers: tuple["TopicConsumer", ...] - _match: dict[str, "TopicConsumer"] - - def __init__(self, *topic_consumers: "TopicConsumer"): - self._topic_consumers = topic_consumers - self._match: dict[str, TopicConsumer] = {} - - def get(self, topic_name: str) -> "TopicConsumer": - if topic_name not in self._match: - topic_consumer = next((t for t in self if t.matches(topic_name)), None) - if not topic_consumer: - raise DjangoKafkaError(f"No topic registered for `{topic_name}`") - self._match[topic_name] = topic_consumer - - return self._match[topic_name] - - @property - def names(self) -> list[str]: - return [topic.name for topic in self] - - def __iter__(self): - yield from self._topic_consumers - - class Consumer: """ Available settings of the producers (P) and consumers (C) @@ -62,6 +42,8 @@ class Consumer: def __init__(self): self.config = self.build_config() self._consumer = ConfluentConsumer(self.config) + self._retries = RetryManager() # blocking retry manager + self._pauses = PauseManager() def __getattr__(self, name): """proxy consumer methods.""" @@ -82,7 +64,7 @@ def build_config(cls): def group_id(self) -> str: return self.config["group.id"] - def commit_offset(self, msg: cimpl.Message): + def commit_offset(self, msg: "cimpl.Message"): if not self.config.get("enable.auto.offset.store"): # Store the offset associated with msg to a local cache. # Stored offsets are committed to Kafka by a background @@ -90,20 +72,73 @@ def commit_offset(self, msg: cimpl.Message): # Explicitly storing offsets after processing gives at-least once semantics. 
self.store_offsets(msg) - def retry_msg(self, msg: cimpl.Message, exc: Exception) -> bool: + def pause_partition(self, msg: "cimpl.Message", until: datetime): + """pauses message partition to process the message at a later time + + note: pausing is only retained within the python consumer class, and is not + retained between separate consumer processes. + """ + partition = self._pauses.set(msg, until) + self.seek(partition) # seek back to message offset to re-poll on unpause + self.pause([partition]) + + def resume_partitions(self): + """resumes any paused partitions that are now ready""" + for partition in self._pauses.pop_ready(): + self.resume([partition]) + + def blocking_retry( + self, + retry_settings: "RetrySettings", + msg: "cimpl.Message", + exc: Exception, + ) -> bool: + """ + blocking retry, managed within the same consumer process + + :return: whether the message will be retried + """ + attempt = self._retries.next(msg) + if retry_settings.can_retry(attempt, exc): + until = retry_settings.get_retry_time(attempt) + self.pause_partition(msg, until) + self.log_error(exc) + return True + return False + + def non_blocking_retry( + self, + retry_settings: "RetrySettings", + msg: "cimpl.Message", + exc: Exception, + ) -> bool: + """ + non-blocking retry, managed by a separate topic and consumer process + + :return: whether the message will be retried + """ from django_kafka.retry.topic import RetryTopicProducer - topic_consumer = self.get_topic_consumer(msg) - if not topic_consumer.retry_settings: - return False - return RetryTopicProducer( + retry_settings=retry_settings, group_id=self.group_id, - retry_settings=topic_consumer.retry_settings, msg=msg, ).retry(exc=exc) - def dead_letter_msg(self, msg: cimpl.Message, exc: Exception): + def retry_msg(self, msg: "cimpl.Message", exc: Exception) -> tuple[bool, bool]: + """ + :return tuple: The first element indicates if the message was retried and the + second indicates if the retry was blocking. + """ + retry_settings = self.get_topic(msg).retry_settings + if not retry_settings: + return False, False + if retry_settings.blocking: + return self.blocking_retry(retry_settings, msg, exc), True + return self.non_blocking_retry(retry_settings, msg, exc), False + + def dead_letter_msg(self, msg: "cimpl.Message", exc: Exception): + """publishes a message to the dead letter topic, with exception details""" from django_kafka.dead_letter.topic import DeadLetterTopicProducer DeadLetterTopicProducer(group_id=self.group_id, msg=msg).produce_for( @@ -111,35 +146,43 @@ def dead_letter_msg(self, msg: cimpl.Message, exc: Exception): header_detail=traceback.format_exc(), ) - def handle_exception(self, msg: cimpl.Message, exc: Exception): - retried = self.retry_msg(msg, exc) + def handle_exception(self, msg: "cimpl.Message", exc: Exception) -> bool: + """ + return bool: Indicates if the message was processed and offset can be committed.
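+ + A blocking retry returns False, so the offset is not committed and the message is re-polled once its partition resumes; a dead-lettered or non-blocking-retried message counts as processed.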
+ """ + retried, blocking = self.retry_msg(msg, exc) if not retried: self.dead_letter_msg(msg, exc) self.log_error(exc) + return True + return not blocking - def get_topic_consumer(self, msg: cimpl.Message) -> "TopicConsumer": + def get_topic(self, msg: "cimpl.Message") -> "TopicConsumer": return self.topics.get(topic_name=msg.topic()) def log_error(self, error): logger.error(error, exc_info=True) def consume(self, msg): - self.get_topic_consumer(msg).consume(msg) + self.get_topic(msg).consume(msg) - def process_message(self, msg: cimpl.Message): + def process_message(self, msg: "cimpl.Message"): if msg_error := msg.error(): self.log_error(msg_error) return try: self.consume(msg) - # ruff: noqa: BLE001 (we do not want consumer to stop if message consumption fails in any circumstances) except Exception as exc: - self.handle_exception(msg, exc) + # ruff: noqa: BLE001 (do not stop consumer if message consumption fails in any circumstances) + processed = self.handle_exception(msg, exc) + else: + processed = True - self.commit_offset(msg) + if processed: + self.commit_offset(msg) - def poll(self) -> Optional[cimpl.Message]: + def poll(self) -> Optional["cimpl.Message"]: # poll for self.polling_freq seconds return self._consumer.poll(timeout=self.polling_freq) @@ -150,6 +193,7 @@ def run(self): try: self.start() while True: + self.resume_partitions() if (msg := self.poll()) is not None: self.process_message(msg) except Exception as exc: diff --git a/django_kafka/consumer/managers.py b/django_kafka/consumer/managers.py new file mode 100644 index 0000000..328acfd --- /dev/null +++ b/django_kafka/consumer/managers.py @@ -0,0 +1,71 @@ +from datetime import datetime +from typing import TYPE_CHECKING, Iterator + +from confluent_kafka import TopicPartition +from django.utils import timezone + +if TYPE_CHECKING: + from confluent_kafka import cimpl + + +class PauseManager: + """Manager for partition pauses""" + + __pauses: dict[TopicPartition, datetime] + + def __init__(self): + self.__pauses = {} + + @staticmethod + def get_msg_partition(msg: "cimpl.Message") -> TopicPartition: + """returns the message topic partition""" + return TopicPartition(msg.topic(), msg.partition()) + + def set(self, msg: "cimpl.Message", until: datetime) -> TopicPartition: + """adds message partition to the pause list, returning the partition""" + tp = self.get_msg_partition(msg) + self.__pauses[tp] = until + return tp + + def pop_ready(self) -> Iterator[TopicPartition]: + """returns the partitions ready to resume, removing them from the pause list""" + now = timezone.now() + for tp, pause in list(self.__pauses.items()): + if now >= pause: + yield tp + del self.__pauses[tp] + + def reset(self): + self.__pauses = {} + + +class RetryManager: + """Manager for blocking message retry attempts""" + + __retries: dict[TopicPartition, int] + + def __init__(self): + self.__retries = {} + + @staticmethod + def get_msg_partition(msg: "cimpl.Message") -> TopicPartition: + """returns the message topic partition, set to the message offset + + Note: TopicPartition hashes based on topic/partition, but not the offset. 
+ """ + return TopicPartition(msg.topic(), msg.partition(), msg.offset()) + + def next(self, msg: "cimpl.Message"): + """increments and returns the partition attempt count""" + msg_tp = self.get_msg_partition(msg) + for tp in self.__retries: + if tp == msg_tp and tp.offset != msg_tp.offset: + del self.__retries[tp] # new offset encountered, reset entry + break + + next_attempt = self.__retries.get(msg_tp, 0) + 1 + self.__retries[msg_tp] = next_attempt + return next_attempt + + def reset(self): + self.__retries = {} diff --git a/django_kafka/consumer/topics.py b/django_kafka/consumer/topics.py new file mode 100644 index 0000000..1393f60 --- /dev/null +++ b/django_kafka/consumer/topics.py @@ -0,0 +1,34 @@ +from django_kafka.exceptions import DjangoKafkaError +from django_kafka.topic import TopicConsumer + + +class Topics: + _topic_consumers: tuple["TopicConsumer", ...] + _match: dict[str, "TopicConsumer"] + + def __init__(self, *topic_consumers: "TopicConsumer"): + self._topic_consumers = topic_consumers + self._match: dict[str, TopicConsumer] = {} + + def get(self, topic_name: str) -> "TopicConsumer": + if topic_name not in self._match: + topic_consumer = next((t for t in self if t.matches(topic_name)), None) + if not topic_consumer: + raise DjangoKafkaError(f"No topic registered for `{topic_name}`") + self._match[topic_name] = topic_consumer + + return self._match[topic_name] + + def get_retryable(self, blocking=False) -> list["TopicConsumer"]: + return [ + topic + for topic in self._topic_consumers + if topic.retry_settings and topic.retry_settings.blocking == blocking + ] + + @property + def names(self) -> list[str]: + return [topic.name for topic in self] + + def __iter__(self): + yield from self._topic_consumers diff --git a/django_kafka/dead_letter/header.py b/django_kafka/dead_letter/header.py index 1698625..73c948e 100644 --- a/django_kafka/dead_letter/header.py +++ b/django_kafka/dead_letter/header.py @@ -1,6 +1,6 @@ -from enum import StrEnum +from django_kafka.utils.message import Header -class DeadLetterHeader(StrEnum): +class DeadLetterHeader(Header): MESSAGE = "DEAD_LETTER_MESSAGE" DETAIL = "DEAD_LETTER_DETAIL" diff --git a/django_kafka/dead_letter/topic.py b/django_kafka/dead_letter/topic.py index a96c219..6227744 100644 --- a/django_kafka/dead_letter/topic.py +++ b/django_kafka/dead_letter/topic.py @@ -33,12 +33,11 @@ def name(self) -> str: return f"{self.group_id}.{topic}.{self.suffix()}" def produce_for(self, header_message, header_detail): - headers = [ - (DeadLetterHeader.MESSAGE, header_message), - (DeadLetterHeader.DETAIL, header_detail), - ] self.produce( key=self.msg.key(), value=self.msg.value(), - headers=headers, + headers=[ + (DeadLetterHeader.MESSAGE, header_message), + (DeadLetterHeader.DETAIL, header_detail), + ], ) diff --git a/django_kafka/management/commands/kafka_connect.py b/django_kafka/management/commands/kafka_connect.py index e8edb5f..3ff6ba2 100644 --- a/django_kafka/management/commands/kafka_connect.py +++ b/django_kafka/management/commands/kafka_connect.py @@ -6,8 +6,8 @@ from requests.exceptions import RetryError from django_kafka import kafka -from django_kafka.exceptions import DjangoKafkaError from django_kafka.connect.connector import Connector, ConnectorStatus +from django_kafka.exceptions import DjangoKafkaError from django_kafka.management.commands.errors import substitute_error from django_kafka.utils import retry @@ -54,7 +54,7 @@ def add_arguments(self, parser): action="store_true", default=False, help="The command wont fail if failures 
were detected. " - "By default if any failures were detected the command exist with error status.", + "By default if any failures were detected the command exist with error status.", ) def __init__(self, *args, **kwargs): @@ -68,7 +68,14 @@ def handle(self, connector, **options): self.list_connectors() return - if not any((connector, options["validate"], options["publish"], options["check_status"])): + if not any( + ( + connector, + options["validate"], + options["publish"], + options["check_status"], + ), + ): self.print_help("manage.py", "kafka_connect") return @@ -99,14 +106,16 @@ def list_connectors(self): def handle_validate(self): self.stdout.write(self.style.SUCCESS("Validating connectors...")) - + for connector_path in self.connectors: self.stdout.write(f"{connector_path}: ", ending="") connector = kafka.connectors[connector_path]() if connector.mark_for_removal: - self.stdout.write(self.style.WARNING("skip (REASON: marked for removal)")) + self.stdout.write( + self.style.WARNING("skip (REASON: marked for removal)"), + ) continue try: @@ -184,7 +193,9 @@ def handle_delete(self, connector: Connector): if deleted: self.stdout.write(self.style.SUCCESS("deleted")) else: - self.stdout.write(self.style.WARNING("does not exist (already deleted)")) + self.stdout.write( + self.style.WARNING("does not exist (already deleted)"), + ) def handle_submit(self, connector: Connector): try: diff --git a/django_kafka/retry/consumer.py b/django_kafka/retry/consumer.py index 4a1890f..46ac4cd 100644 --- a/django_kafka/retry/consumer.py +++ b/django_kafka/retry/consumer.py @@ -1,8 +1,6 @@ import traceback -from datetime import datetime -from typing import TYPE_CHECKING, Optional, Type, cast +from typing import TYPE_CHECKING, Optional, Type -from confluent_kafka import TopicPartition, cimpl from django.utils import timezone from django_kafka.conf import settings @@ -12,6 +10,8 @@ from django_kafka.retry.topic import RetryTopicConsumer if TYPE_CHECKING: + from confluent_kafka import cimpl + from django_kafka.topic import TopicConsumer @@ -27,17 +27,12 @@ def __init__(self, group_id: str, *topic_consumers: "TopicConsumer"): class RetryConsumer(Consumer): topics: RetryTopics - resume_times: dict[TopicPartition, datetime] - - def __init__(self): - super().__init__() - self.resume_times = {} @classmethod def build(cls, consumer_cls: Type["Consumer"]) -> Optional[Type["RetryConsumer"]]: """Generates RetryConsumer subclass linked to consumer class retryable topics""" - retryable_tcs = [t for t in consumer_cls.topics if t.retry_settings] - if not retryable_tcs: + retryable_topics = consumer_cls.topics.get_retryable(blocking=False) + if not retryable_topics: return None group_id = consumer_cls.build_config()["group.id"] @@ -46,7 +41,7 @@ def build(cls, consumer_cls: Type["Consumer"]) -> Optional[Type["RetryConsumer"] f"{consumer_cls.__name__}Retry", (cls, consumer_cls), { - "topics": RetryTopics(group_id, *retryable_tcs), + "topics": RetryTopics(group_id, *retryable_topics), "config": { **getattr(cls, "config", {}), "group.id": f"{group_id}.retry", @@ -62,44 +57,20 @@ def build_config(cls): **getattr(cls, "config", {}), } - def retry_msg(self, msg: cimpl.Message, exc: Exception) -> bool: - rt_consumer = cast(RetryTopicConsumer, self.get_topic_consumer(msg)) - return rt_consumer.producer_for(msg).retry(exc) + def retry_msg(self, msg: "cimpl.Message", exc: Exception) -> (bool, bool): + retry_topic: RetryTopicConsumer = self.get_topic(msg) + return retry_topic.producer_for(msg).retry(exc), False - def 
dead_letter_msg(self, msg: cimpl.Message, exc: Exception): - rt_consumer = cast(RetryTopicConsumer, self.get_topic_consumer(msg)) - DeadLetterTopicProducer(group_id=rt_consumer.group_id, msg=msg).produce_for( + def dead_letter_msg(self, msg: "cimpl.Message", exc: Exception): + retry_topic: RetryTopicConsumer = self.get_topic(msg) + DeadLetterTopicProducer(group_id=retry_topic.group_id, msg=msg).produce_for( header_message=str(exc), header_detail=traceback.format_exc(), ) - def pause_partition(self, msg, until: datetime): - """pauses the partition and stores the resumption time""" - tp = TopicPartition(msg.topic(), msg.partition(), msg.offset()) - self.seek(tp) # seek back to message offset, so it is re-polled on unpause - self.pause([tp]) - self.resume_times[tp] = until - - def resume_ready_partitions(self): - """resumes any partitions that were paused""" - now = timezone.now() - for tp, until in list(self.resume_times.items()): - if now < until: - continue - self.resume([tp]) - del self.resume_times[tp] - - def poll(self): - self.resume_ready_partitions() - return super().poll() - - def process_message(self, msg: cimpl.Message): + def process_message(self, msg: "cimpl.Message"): retry_time = RetryHeader.get_retry_time(msg.headers()) if retry_time and retry_time > timezone.now(): self.pause_partition(msg, retry_time) return super().process_message(msg) - - def stop(self): - self.resume_times = {} - super().stop() diff --git a/django_kafka/retry/header.py b/django_kafka/retry/header.py index 0271a11..a12b459 100644 --- a/django_kafka/retry/header.py +++ b/django_kafka/retry/header.py @@ -1,25 +1,19 @@ from datetime import datetime -from enum import StrEnum from typing import Optional from django.utils import timezone +from django_kafka.utils.message import Header -class RetryHeader(StrEnum): + +class RetryHeader(Header): MESSAGE = "RETRY_MESSAGE" TIMESTAMP = "RETRY_TIMESTAMP" - @staticmethod - def get_header(headers: list[tuple], header: "RetryHeader") -> Optional[str]: - return next((v for k, v in headers if k == header), None) - - @staticmethod - def get_retry_time(headers: Optional[list[tuple]]) -> Optional["datetime"]: + @classmethod + def get_retry_time(cls, headers: Optional[list[tuple]]) -> Optional["datetime"]: """returns the retry time from the message headers""" - if not headers: - return None - - header = RetryHeader.get_header(headers, RetryHeader.TIMESTAMP) + header = RetryHeader.get(headers, RetryHeader.TIMESTAMP) try: epoch = float(header) except (TypeError, ValueError): diff --git a/django_kafka/retry/settings.py b/django_kafka/retry/settings.py index a6d0cac..ad5114c 100644 --- a/django_kafka/retry/settings.py +++ b/django_kafka/retry/settings.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta from typing import TYPE_CHECKING, Optional, Type from django.utils import timezone @@ -14,16 +15,18 @@ def __init__( backoff: bool = False, include: Optional[list[Type[Exception]]] = None, exclude: Optional[list[Type[Exception]]] = None, + blocking: bool = True, ): """ - :param max_retries: maximum number of retry attempts + :param max_retries: maximum number of retry attempts (use -1 for infinite) :param delay: delay (seconds) :param backoff: exponential backoff :param include: exception types to retry for :param exclude: exception types to exclude from retry + :param blocking: block consumer process during retry """ - if max_retries <= 0: - raise ValueError("max_retries must be greater than zero") + if max_retries < -1: + raise ValueError("max_retries must be greater than or equal to
-1") if delay <= 0: raise ValueError("delay must be greater than zero") if include is not None and exclude is not None: @@ -34,22 +37,28 @@ def __init__( self.backoff = backoff self.include = include self.exclude = exclude + self.blocking = blocking def __call__(self, topic_cls: Type["TopicConsumer"]): topic_cls.retry_settings = self return topic_cls def attempts_exceeded(self, attempt): + if self.max_retries == -1: + return False return attempt > self.max_retries - def should_retry(self, exc: Exception) -> bool: + def can_retry(self, attempt: int, exc: Exception) -> bool: + if self.attempts_exceeded(attempt): + return False if self.include is not None: return any(isinstance(exc, e) for e in self.include) if self.exclude is not None: return not any(isinstance(exc, e) for e in self.exclude) - return True - def get_retry_timestamp(self, attempt: int) -> str: - delay = self.delay * 2 ** (attempt - 1) if self.backoff else self.delay - return str(timezone.now().timestamp() + delay) + def get_retry_delay(self, attempt: int) -> int: + return self.delay * (2 ** (attempt - 1)) if self.backoff else self.delay + + def get_retry_time(self, attempt: int) -> datetime: + return timezone.now() + timedelta(seconds=self.get_retry_delay(attempt)) diff --git a/django_kafka/retry/topic.py b/django_kafka/retry/topic.py index a14e54c..3fa8b73 100644 --- a/django_kafka/retry/topic.py +++ b/django_kafka/retry/topic.py @@ -19,10 +19,15 @@ class RetryTopicProducer(TopicProducer): def __init__( self, - group_id: str, retry_settings: "RetrySettings", + group_id: str, msg: "cimpl.Message", ): + if retry_settings.blocking: + raise DjangoKafkaError( + "RetryTopicProducer requires non-blocking RetrySettings", + ) + self.settings = retry_settings self.group_id = group_id self.msg = msg @@ -54,21 +59,17 @@ def name(self) -> str: return f"{self.group_id}.{self.msg.topic()}.{suffix}" def retry(self, exc: Exception) -> bool: - if not self.settings.should_retry(exc=exc): + if not self.settings.can_retry(attempt=self.attempt, exc=exc): return False - if self.settings.attempts_exceeded(attempt=self.attempt): - return False + retry_timestamp = self.settings.get_retry_time(self.attempt).timestamp() self.produce( key=self.msg.key(), value=self.msg.value(), headers=[ (RetryHeader.MESSAGE, str(exc)), - ( - RetryHeader.TIMESTAMP, - self.settings.get_retry_timestamp(self.attempt), - ), + (RetryHeader.TIMESTAMP, str(retry_timestamp)), ], ) return True @@ -79,12 +80,13 @@ class RetryTopicConsumer(TopicConsumer): value_deserializer = NoOpSerializer def __init__(self, group_id: str, topic_consumer: TopicConsumer): - if not topic_consumer.retry_settings: + retry_settings = topic_consumer.retry_settings + if not retry_settings or retry_settings.blocking: raise DjangoKafkaError( - f"TopicConsumer {topic_consumer} is not marked for retry", + f"TopicConsumer {topic_consumer} is not marked for non-blocking retry", ) - self.topic_consumer = topic_consumer self.group_id = group_id + self.topic_consumer = topic_consumer super().__init__() @property @@ -103,7 +105,7 @@ def consume(self, msg: "cimpl.Message"): def producer_for(self, msg: "cimpl.Message") -> RetryTopicProducer: return RetryTopicProducer( - group_id=self.group_id, retry_settings=self.topic_consumer.retry_settings, + group_id=self.group_id, msg=msg, ) diff --git a/django_kafka/tests/__init__.py b/django_kafka/tests/__init__.py index e69de29..5a8e144 100644 --- a/django_kafka/tests/__init__.py +++ b/django_kafka/tests/__init__.py @@ -0,0 +1,14 @@ +from unittest.mock import Mock + + +def 
message_mock(topic="topic", partition=0, offset=0, error=None, headers=None): + """mocking utility for confluent_kafka.cimpl.Message""" + return Mock( + **{ + "topic.return_value": topic, + "partition.return_value": partition, + "offset.return_value": offset, + "headers.return_value": headers, + "error.return_value": error, + }, + ) diff --git a/django_kafka/tests/retry/test_retry.py b/django_kafka/tests/retry/test_retry.py deleted file mode 100644 index 6edf86d..0000000 --- a/django_kafka/tests/retry/test_retry.py +++ /dev/null @@ -1,70 +0,0 @@ -from unittest import mock - -from django.test import TestCase - -from django_kafka.retry.settings import RetrySettings -from django_kafka.topic import Topic - - -class RetrySettingTestCase(TestCase): - def test_should_retry__include(self): - settings = RetrySettings(max_retries=5, delay=60, include=[ValueError]) - - self.assertEqual(settings.should_retry(ValueError()), True) - self.assertEqual(settings.should_retry(IndexError()), False) - - def test_should_retry__exclude(self): - settings = RetrySettings(max_retries=5, delay=60, exclude=[ValueError]) - - self.assertEqual(settings.should_retry(ValueError()), False) - self.assertEqual(settings.should_retry(IndexError()), True) - - def test_should_retry__no_include_exclude(self): - settings = RetrySettings(max_retries=5, delay=60) - - self.assertEqual(settings.should_retry(ValueError()), True) - - def test_attempts_exceeded(self): - settings = RetrySettings(max_retries=5, delay=60) - - self.assertEqual(settings.attempts_exceeded(6), True) - self.assertEqual(settings.attempts_exceeded(5), False) - self.assertEqual(settings.attempts_exceeded(4), False) - - @mock.patch("django.utils.timezone.now") - def test_get_retry_time__fixed_delay(self, mock_now): - settings = RetrySettings(max_retries=5, delay=60, backoff=False) - - timestamp = 1000 - mock_now.return_value.timestamp.return_value = timestamp - self.assertEqual(settings.get_retry_timestamp(1), str(timestamp + 60)) - self.assertEqual(settings.get_retry_timestamp(2), str(timestamp + 60)) - self.assertEqual(settings.get_retry_timestamp(3), str(timestamp + 60)) - self.assertEqual(settings.get_retry_timestamp(4), str(timestamp + 60)) - - @mock.patch("django.utils.timezone.now") - def test_get_retry_time__backoff_delay(self, mock_now): - settings = RetrySettings(max_retries=5, delay=60, backoff=True) - - timestamp = 1000 - mock_now.return_value.timestamp.return_value = timestamp - self.assertEqual(settings.get_retry_timestamp(1), str(timestamp + 60)) - self.assertEqual(settings.get_retry_timestamp(2), str(timestamp + 120)) - self.assertEqual(settings.get_retry_timestamp(3), str(timestamp + 240)) - self.assertEqual(settings.get_retry_timestamp(4), str(timestamp + 480)) - - -class RetryDecoratorTestCase(TestCase): - def test_retry_decorator(self): - class TopicA(Topic): - pass - - retry = RetrySettings(max_retries=5, delay=60) - result = retry(TopicA) - - self.assertIs(result, TopicA) - self.assertIsNotNone(TopicA.retry_settings) - self.assertEqual( - [TopicA.retry_settings.max_retries, TopicA.retry_settings.delay], - [5, 60], - ) diff --git a/django_kafka/tests/connect/__init__.py b/django_kafka/tests/test_connect/__init__.py similarity index 100% rename from django_kafka/tests/connect/__init__.py rename to django_kafka/tests/test_connect/__init__.py diff --git a/django_kafka/tests/connect/test_client.py b/django_kafka/tests/test_connect/test_client.py similarity index 100% rename from django_kafka/tests/connect/test_client.py rename to 
django_kafka/tests/test_connect/test_client.py diff --git a/django_kafka/tests/connect/test_connector.py b/django_kafka/tests/test_connect/test_connector.py similarity index 100% rename from django_kafka/tests/connect/test_connector.py rename to django_kafka/tests/test_connect/test_connector.py diff --git a/django_kafka/tests/connect/test_kafka_connect_command.py b/django_kafka/tests/test_connect/test_kafka_connect_command.py similarity index 100% rename from django_kafka/tests/connect/test_kafka_connect_command.py rename to django_kafka/tests/test_connect/test_kafka_connect_command.py diff --git a/django_kafka/tests/connect/test_models.py b/django_kafka/tests/test_connect/test_models.py similarity index 100% rename from django_kafka/tests/connect/test_models.py rename to django_kafka/tests/test_connect/test_models.py diff --git a/django_kafka/tests/connect/test_substitute_error.py b/django_kafka/tests/test_connect/test_substitute_error.py similarity index 100% rename from django_kafka/tests/connect/test_substitute_error.py rename to django_kafka/tests/test_connect/test_substitute_error.py diff --git a/django_kafka/tests/dead_letter/__init__.py b/django_kafka/tests/test_consumer/__init__.py similarity index 100% rename from django_kafka/tests/dead_letter/__init__.py rename to django_kafka/tests/test_consumer/__init__.py diff --git a/django_kafka/tests/test_consumer.py b/django_kafka/tests/test_consumer/test_consumer.py similarity index 51% rename from django_kafka/tests/test_consumer.py rename to django_kafka/tests/test_consumer/test_consumer.py index befdb14..c206b56 100644 --- a/django_kafka/tests/test_consumer.py +++ b/django_kafka/tests/test_consumer/test_consumer.py @@ -1,11 +1,16 @@ +import datetime import traceback from contextlib import suppress from unittest.mock import MagicMock, Mock, call, patch +from confluent_kafka import TopicPartition from django.test import TestCase, override_settings +from django.utils import timezone from django_kafka.conf import SETTINGS_KEY, settings from django_kafka.consumer import Consumer, Topics +from django_kafka.retry.settings import RetrySettings +from django_kafka.tests.utils import message_mock class StopWhileTrue: @@ -17,7 +22,7 @@ def __init__(self, stop_on): def __call__(self, arg): if arg == self.stop_on: - raise self.Error() + raise self.Error class ConsumerTestCase(TestCase): @@ -34,7 +39,7 @@ class SomeConsumer(Consumer): side_effect=StopWhileTrue(stop_on=False), ) @patch( - "django_kafka.consumer.ConfluentConsumer", + "django_kafka.consumer.consumer.ConfluentConsumer", **{ # first iteration - no message (None) # second iteration - 1 non-empty message represented as True for testing @@ -48,6 +53,7 @@ class SomeConsumer(Consumer): log_error = Mock() consumer = SomeConsumer() + consumer.resume_partitions = Mock() # hack to break infinite loop # `consumer.start` is using `while True:` loop which never ends @@ -55,6 +61,9 @@ class SomeConsumer(Consumer): with suppress(StopWhileTrue.Error): consumer.run() + # assert partitions were resumed for each poll + self.assertEqual(consumer.resume_partitions.call_count, 3) + # subscribed to the topics defined by consumer class mock_consumer_client.return_value.subscribe.assert_called_once_with( topics=consumer.topics.names, @@ -68,28 +77,81 @@ class SomeConsumer(Consumer): # process_message called twice as None message is ignored self.assertEqual(mock_process_message.call_args_list, [call(True), call(False)]) + @patch("django_kafka.consumer.consumer.ConfluentConsumer") + def 
test_pause_partition(self, mock_confluent_consumer): + class SomeConsumer(Consumer): + topics = MagicMock() + + consumer = SomeConsumer() + mock_msg = message_mock() + partition = TopicPartition( + mock_msg.topic(), + mock_msg.partition(), + mock_msg.offset(), + ) + consumer._pauses.set = Mock(return_value=partition) + retry_time = timezone.now() + + consumer.pause_partition(mock_msg, retry_time) + + consumer._pauses.set.assert_called_once_with(mock_msg, retry_time) + mock_confluent_consumer.return_value.seek.assert_called_once_with(partition) + mock_confluent_consumer.return_value.pause.assert_called_once_with([partition]) + + @patch("django_kafka.consumer.consumer.ConfluentConsumer") + def test_resume_partitions__before_time(self, mock_confluent_consumer): + class SomeConsumer(Consumer): + topics = MagicMock() + + consumer = SomeConsumer() + mock_msg = message_mock() + retry_time = timezone.now() + datetime.timedelta(minutes=1) + + consumer.pause_partition(mock_msg, retry_time) + consumer.resume_partitions() + + mock_confluent_consumer.return_value.resume.assert_not_called() + + @patch("django_kafka.consumer.consumer.ConfluentConsumer") + def test_resume_partitions__after_time(self, mock_confluent_consumer): + class SomeConsumer(Consumer): + topics = MagicMock() + + consumer = SomeConsumer() + mock_msg = message_mock() + partition = TopicPartition( + mock_msg.topic(), + mock_msg.partition(), + mock_msg.offset(), + ) + retry_time = timezone.now() - datetime.timedelta(minutes=1) + + consumer.pause_partition(mock_msg, retry_time) + consumer.resume_partitions() + + mock_confluent_consumer.return_value.resume.assert_called_once_with([partition]) + @patch("django_kafka.consumer.Consumer.commit_offset") - @patch("django_kafka.consumer.ConfluentConsumer") + @patch("django_kafka.consumer.consumer.ConfluentConsumer") def test_process_message_success(self, mock_consumer_client, mock_commit_offset): class SomeConsumer(Consumer): topics = MagicMock() - msg = Mock(error=Mock(return_value=False)) - + msg = message_mock() consumer = SomeConsumer() consumer.process_message(msg) - # check if msg has error before processing + # checks msg had error before processing msg.error.assert_called_once_with() # Topic.consume called - consumer.get_topic_consumer(msg.topic()).consume.assert_called_once_with(msg) + consumer.get_topic(msg).consume.assert_called_once_with(msg) # commit_offset triggered mock_commit_offset.assert_called_once_with(msg) @patch("django_kafka.consumer.Consumer.log_error") @patch("django_kafka.consumer.Consumer.commit_offset") - @patch("django_kafka.consumer.ConfluentConsumer") + @patch("django_kafka.consumer.consumer.ConfluentConsumer") def test_process_message_msg_error_logged( self, mock_consumer_client, @@ -99,13 +161,12 @@ def test_process_message_msg_error_logged( class SomeConsumer(Consumer): topics = MagicMock() - msg = Mock(error=Mock(return_value=True)) - + msg = message_mock(error=True) consumer = SomeConsumer() consumer.process_message(msg) - # check if msg has error before processing + # checks msg had error before processing msg.error.assert_called_once_with() # error handler was triggered log_error.assert_called_once_with(msg.error.return_value) @@ -114,25 +175,23 @@ class SomeConsumer(Consumer): # Consumer.commit_offset is not called mock_commit_offset.assert_not_called() - @patch("django_kafka.consumer.Consumer.handle_exception") + @patch("django_kafka.consumer.Consumer.handle_exception", return_value=True) @patch("django_kafka.consumer.Consumer.commit_offset") - 
@patch("django_kafka.consumer.ConfluentConsumer") - def test_process_message_exception( + @patch("django_kafka.consumer.consumer.ConfluentConsumer") + def test_process_message__processed_exception( self, mock_consumer_client, mock_commit_offset, - handle_exception, + mock_handle_exception, ): topic_consume_side_effect = TypeError("test") topic_consumer = Mock( **{ - "name": "topic", "consume.side_effect": topic_consume_side_effect, }, ) - msg = Mock( - **{"topic.return_value": topic_consumer.name, "error.return_value": False}, - ) + topic_consumer.name = "topic" + msg = message_mock(topic_consumer.name) class SomeConsumer(Consumer): topics = Topics(topic_consumer) @@ -141,22 +200,46 @@ class SomeConsumer(Consumer): consumer.process_message(msg) - # check if msg has error before processing - msg.error.assert_called_once_with() - # Topic.consume was triggered - topic_consumer.consume.assert_called_once_with(msg) - # error handler was triggered on exception - handle_exception.assert_called_once_with(msg, topic_consume_side_effect) - # Consumer.commit_offset is not called + mock_handle_exception.assert_called_once_with(msg, topic_consume_side_effect) + # Consumer.commit_offset is called mock_commit_offset.assert_called_once() + @patch("django_kafka.consumer.Consumer.handle_exception", return_value=False) + @patch("django_kafka.consumer.Consumer.commit_offset") + @patch("django_kafka.consumer.consumer.ConfluentConsumer") + def test_process_message__unprocessed_exception( + self, + mock_consumer_client, + mock_commit_offset, + mock_handle_exception, + ): + topic_consume_side_effect = TypeError("test") + topic_consumer = Mock( + **{ + "consume.side_effect": topic_consume_side_effect, + }, + ) + topic_consumer.name = "topic" + msg = message_mock(topic_consumer.name) + + class SomeConsumer(Consumer): + topics = Topics(topic_consumer) + + consumer = SomeConsumer() + + consumer.process_message(msg) + + mock_handle_exception.assert_called_once_with(msg, topic_consume_side_effect) + # Consumer.commit_offset was not called + mock_commit_offset.assert_not_called() + def test_handle_exception(self): - msg = Mock() + msg = message_mock() class SomeConsumer(Consumer): topics = Topics() config = {"group.id": "group_id"} - retry_msg = Mock(return_value=True) # successful retry + retry_msg = Mock(return_value=(True, False)) # successful retry dead_letter_msg = Mock() log_error = Mock() @@ -170,12 +253,12 @@ class SomeConsumer(Consumer): consumer.log_error.assert_not_called() def test_handle_exception__failed_retry(self): - msg = Mock() + msg = message_mock() class SomeConsumer(Consumer): topics = Topics() config = {"group.id": "group_id"} - retry_msg = Mock(return_value=False) # failed retry + retry_msg = Mock(return_value=(False, False)) # failed retry dead_letter_msg = Mock() log_error = Mock() @@ -188,52 +271,153 @@ class SomeConsumer(Consumer): consumer.dead_letter_msg.assert_called_once_with(msg, exc) consumer.log_error.assert_called_once_with(exc) + def test_blocking_retry(self): + retry_time = timezone.now() + retry_settings = RetrySettings(max_retries=5, delay=60, blocking=True) + retry_settings.get_retry_time = Mock(return_value=retry_time) + msg_mock = message_mock() + + class SomeConsumer(Consumer): + topics = Topics() + config = {"group.id": "group_id"} + + consumer = SomeConsumer() + consumer.pause_partition = Mock() + consumer.log_error = Mock() + exc = ValueError() + + retried = consumer.blocking_retry(retry_settings, msg_mock, exc) + + consumer.pause_partition.assert_called_once_with(msg_mock, 
retry_time) + consumer.log_error.assert_called_once_with(exc) + self.assertEqual(retried, True) + + @patch("django_kafka.consumer.Consumer.log_error") + def test_blocking_retry__maximum_attempts(self, log_error): + retry_settings = RetrySettings(max_retries=2, delay=60, blocking=True) + msg_mock = message_mock() + + class SomeConsumer(Consumer): + topics = Topics() + config = {"group.id": "group_id"} + + consumer = SomeConsumer() + consumer.pause_partition = Mock() + consumer.log_error = Mock() + exc = ValueError() + + retry_1 = consumer.blocking_retry(retry_settings, msg_mock, exc) + retry_2 = consumer.blocking_retry(retry_settings, msg_mock, exc) + retry_3 = consumer.blocking_retry(retry_settings, msg_mock, exc) + + self.assertTrue(retry_1) + self.assertTrue(retry_2) + self.assertFalse(retry_3) + self.assertEqual(consumer.pause_partition.call_count, 2) + self.assertEqual(consumer.log_error.call_count, 2) + @patch("django_kafka.retry.topic.RetryTopicProducer") - def test_retry_msg(self, mock_rt_producer_cls): - mock_topic_consumer = Mock(retry_settings=Mock()) + def test_non_blocking_retry(self, mock_rt_producer_cls): mock_retry = mock_rt_producer_cls.return_value.retry - msg_mock = Mock() + retry_settings = RetrySettings(max_retries=5, delay=60, blocking=False) + msg_mock = message_mock() class SomeConsumer(Consumer): topics = Topics() config = {"group.id": "group_id"} consumer = SomeConsumer() - consumer.get_topic_consumer = Mock(return_value=mock_topic_consumer) exc = ValueError() - retried = consumer.retry_msg(msg_mock, exc) + retried = consumer.non_blocking_retry( + retry_settings, + msg_mock, + exc, + ) mock_rt_producer_cls.assert_called_once_with( + retry_settings=retry_settings, group_id=consumer.group_id, - retry_settings=mock_topic_consumer.retry_settings, msg=msg_mock, ) mock_retry.assert_called_once_with(exc=exc) self.assertEqual(retried, mock_retry.return_value) - @patch("django_kafka.retry.topic.RetryTopicProducer") - def test_retry_msg__no_retry(self, mock_rt_producer_cls): + def test_retry__respects_blocking(self): + mock_topic_consumer = Mock(**{"retry_settings.blocking": True}) + msg_mock = message_mock() + + class SomeConsumer(Consumer): + topics = Topics() + config = {"group.id": "group_id"} + + consumer = SomeConsumer() + consumer.blocking_retry = Mock() + consumer.non_blocking_retry = Mock() + consumer.get_topic = Mock(return_value=mock_topic_consumer) + exc = ValueError() + + retried, blocking = consumer.retry_msg(msg_mock, exc) + + consumer.non_blocking_retry.assert_not_called() + consumer.blocking_retry.assert_called_once_with( + mock_topic_consumer.retry_settings, + msg_mock, + exc, + ) + self.assertEqual(retried, consumer.blocking_retry.return_value) + self.assertEqual(blocking, True) + + def test_retry__respects_non_blocking(self): + mock_topic_consumer = Mock(**{"retry_settings.blocking": False}) + msg_mock = message_mock() + + class SomeConsumer(Consumer): + topics = Topics() + config = {"group.id": "group_id"} + + consumer = SomeConsumer() + consumer.blocking_retry = Mock() + consumer.non_blocking_retry = Mock() + consumer.get_topic = Mock(return_value=mock_topic_consumer) + exc = ValueError() + + retried, blocking = consumer.retry_msg(msg_mock, exc) + + consumer.blocking_retry.assert_not_called() + consumer.non_blocking_retry.assert_called_once_with( + mock_topic_consumer.retry_settings, + msg_mock, + exc, + ) + self.assertEqual(retried, consumer.non_blocking_retry.return_value) + self.assertEqual(blocking, False) + + def test_retry_msg__no_retry(self): 
mock_topic_consumer = Mock(retry_settings=None) - msg_mock = Mock() + msg_mock = message_mock() class SomeConsumer(Consumer): topics = Topics() config = {"group.id": "group_id"} consumer = SomeConsumer() - consumer.get_topic_consumer = Mock(return_value=mock_topic_consumer) + consumer.blocking_retry = Mock() + consumer.non_blocking_retry = Mock() + consumer.get_topic = Mock(return_value=mock_topic_consumer) exc = ValueError() - retried = consumer.retry_msg(msg_mock, exc) + retried, blocking = consumer.retry_msg(msg_mock, exc) - mock_rt_producer_cls.assert_not_called() + consumer.blocking_retry.assert_not_called() + consumer.non_blocking_retry.assert_not_called() self.assertEqual(retried, False) + self.assertEqual(blocking, False) @patch("django_kafka.dead_letter.topic.DeadLetterTopicProducer") def test_dead_letter_msg(self, mock_dead_letter_topic_cls): mock_produce_for = mock_dead_letter_topic_cls.return_value.produce_for - msg_mock = Mock() + msg_mock = message_mock() class SomeConsumer(Consumer): topics = Topics() @@ -253,19 +437,19 @@ class SomeConsumer(Consumer): header_detail=traceback.format_exc(), ) - @patch("django_kafka.consumer.ConfluentConsumer") + @patch("django_kafka.consumer.consumer.ConfluentConsumer") def test_auto_offset_false(self, mock_consumer_client): class SomeConsumer(Consumer): config = {"enable.auto.offset.store": False} consumer = SomeConsumer() - msg = Mock() + msg = message_mock() consumer.commit_offset(msg) mock_consumer_client.return_value.store_offsets.assert_called_once_with(msg) - @patch("django_kafka.consumer.ConfluentConsumer") + @patch("django_kafka.consumer.consumer.ConfluentConsumer") def test_auto_offset_true(self, mock_consumer_client): class SomeConsumer(Consumer): config = {"enable.auto.offset.store": True} @@ -296,7 +480,7 @@ def test_settings_are_correctly_assigned(self): }, }, ) - @patch("django_kafka.consumer.ConfluentConsumer") + @patch("django_kafka.consumer.consumer.ConfluentConsumer") @patch("django_kafka.error_handlers.ClientErrorHandler") def test_config_merge_override(self, mock_error_handler, mock_consumer_client): """ diff --git a/django_kafka/tests/test_consumer/test_managers.py b/django_kafka/tests/test_consumer/test_managers.py new file mode 100644 index 0000000..1bac3bf --- /dev/null +++ b/django_kafka/tests/test_consumer/test_managers.py @@ -0,0 +1,72 @@ +import datetime + +from django.test import TestCase +from django.utils import timezone + +from django_kafka.consumer.managers import PauseManager, RetryManager +from django_kafka.tests.utils import message_mock + + +class PauseManagerTestCase(TestCase): + def test_get_msg_partition(self): + mock_msg = message_mock() + manager = PauseManager() + + tp = manager.get_msg_partition(mock_msg) + + self.assertEqual(tp.topic, mock_msg.topic()) + self.assertEqual(tp.partition, mock_msg.partition()) + + def test_set(self): + mock_msg = message_mock() + manager = PauseManager() + + tp = manager.set(mock_msg, timezone.now()) + + self.assertEqual(manager.get_msg_partition(mock_msg), tp) + + def test_pop_ready(self): + manager = PauseManager() + mock_msg_1 = message_mock(partition=1) + mock_msg_2 = message_mock(partition=2) + + manager.set(mock_msg_1, timezone.now() - datetime.timedelta(minutes=1)) + manager.set(mock_msg_2, timezone.now() + datetime.timedelta(minutes=1)) + + self.assertEqual( + list(manager.pop_ready()), + [manager.get_msg_partition(mock_msg_1)], + ) + self.assertEqual(list(manager.pop_ready()), []) # empty the second time + + +class RetryManagerTestCase(TestCase): + def 
test_get_msg_partition(self): + mock_msg = message_mock() + manager = RetryManager() + + tp = manager.get_msg_partition(mock_msg) + + self.assertEqual(tp.topic, mock_msg.topic()) + self.assertEqual(tp.partition, mock_msg.partition()) + self.assertEqual(tp.offset, mock_msg.offset()) + + def test_next(self): + mock_msg = message_mock() + manager = RetryManager() + + self.assertEqual(manager.next(mock_msg), 1) + self.assertEqual(manager.next(mock_msg), 2) + self.assertEqual(manager.next(mock_msg), 3) + + def test_next__resets_for_new_offset(self): + """tests retry attempt resets if requested for a new offset""" + mock_msg = message_mock(offset=0) + manager = RetryManager() + + self.assertEqual(manager.next(mock_msg), 1) + + mock_msg.offset.return_value = 1 + + self.assertEqual(manager.next(mock_msg), 1) + self.assertEqual(manager.next(mock_msg), 2) diff --git a/django_kafka/tests/test_consumer/test_topics.py b/django_kafka/tests/test_consumer/test_topics.py new file mode 100644 index 0000000..3784c67 --- /dev/null +++ b/django_kafka/tests/test_consumer/test_topics.py @@ -0,0 +1,21 @@ +from unittest.mock import Mock + +from django.test import TestCase + +from django_kafka.consumer import Topics +from django_kafka.retry.settings import RetrySettings + + +class TopicsTestCase(TestCase): + def test_get_retryable(self): + blocking_tc = Mock( + retry_settings=RetrySettings(delay=1, max_retries=1, blocking=True), + ) + non_blocking_tc = Mock( + retry_settings=RetrySettings(delay=1, max_retries=1, blocking=False), + ) + + topics = Topics(blocking_tc, non_blocking_tc) + + self.assertEqual(topics.get_retryable(blocking=True), [blocking_tc]) + self.assertEqual(topics.get_retryable(blocking=False), [non_blocking_tc]) diff --git a/django_kafka/tests/retry/__init__.py b/django_kafka/tests/test_dead_letter/__init__.py similarity index 100% rename from django_kafka/tests/retry/__init__.py rename to django_kafka/tests/test_dead_letter/__init__.py diff --git a/django_kafka/tests/dead_letter/test_topic.py b/django_kafka/tests/test_dead_letter/test_topic.py similarity index 95% rename from django_kafka/tests/dead_letter/test_topic.py rename to django_kafka/tests/test_dead_letter/test_topic.py index e8593fe..c4f4851 100644 --- a/django_kafka/tests/dead_letter/test_topic.py +++ b/django_kafka/tests/test_dead_letter/test_topic.py @@ -5,6 +5,7 @@ from django_kafka.conf import SETTINGS_KEY from django_kafka.dead_letter.header import DeadLetterHeader from django_kafka.dead_letter.topic import DeadLetterTopicProducer +from django_kafka.tests.utils import message_mock @override_settings( @@ -36,7 +37,7 @@ def test_name(self): self.assertEqual(dlt_producer_2.name, "group.id.topic.name.test-dlt") def test_produce_for(self): - msg_mock = mock.Mock(**{"topic.return_value": "msg_topic"}) + msg_mock = message_mock() dlt_producer = DeadLetterTopicProducer(group_id="group.id", msg=msg_mock) dlt_producer.produce = mock.Mock() header_message = "header message" diff --git a/django_kafka/tests/topic/__init__.py b/django_kafka/tests/test_retry/__init__.py similarity index 100% rename from django_kafka/tests/topic/__init__.py rename to django_kafka/tests/test_retry/__init__.py diff --git a/django_kafka/tests/retry/test_consumer.py b/django_kafka/tests/test_retry/test_consumer.py similarity index 62% rename from django_kafka/tests/retry/test_consumer.py rename to django_kafka/tests/test_retry/test_consumer.py index 361fa93..c2de6b2 100644 --- a/django_kafka/tests/retry/test_consumer.py +++ 
b/django_kafka/tests/test_retry/test_consumer.py @@ -3,7 +3,6 @@ from typing import Type from unittest.mock import Mock, patch -from confluent_kafka import TopicPartition from django.test import TestCase, override_settings from django.utils import timezone @@ -12,6 +11,7 @@ from django_kafka.retry.consumer import RetryConsumer, RetryTopics from django_kafka.retry.header import RetryHeader from django_kafka.retry.settings import RetrySettings +from django_kafka.tests.utils import message_mock from django_kafka.topic import TopicConsumer @@ -26,7 +26,7 @@ def _get_retryable_topic_consumer(self): class RetryableTopicConsumer(TopicConsumer): name = "retry_topic" - retry = RetrySettings(max_retries=5, delay=60) + retry = RetrySettings(max_retries=5, delay=60, blocking=False) retry(RetryableTopicConsumer) return RetryableTopicConsumer() @@ -57,7 +57,7 @@ def _get_retry_consumer(self, consumer_group_id="group_id") -> RetryConsumer: }, }, ) - @patch("django_kafka.consumer.ConfluentConsumer") + @patch("django_kafka.consumer.consumer.ConfluentConsumer") @patch("django_kafka.retry.consumer.Consumer.build_config") def test_config_merge_override( self, @@ -110,7 +110,7 @@ def test_build(self): ) self.assertIsInstance(retry_consumer_cls.topics, RetryTopics) self.assertCountEqual( - [t for t in consumer_cls.topics if t.retry_settings], + list(consumer_cls.topics.get_retryable(blocking=False)), [t.topic_consumer for t in retry_consumer_cls.topics], ) @@ -124,26 +124,27 @@ def test_retry_msg(self): mock_retry_topic_consumer = Mock() mock_producer_for = mock_retry_topic_consumer.producer_for mock_retry = mock_producer_for.return_value.retry - msg_mock = Mock() + msg_mock = message_mock() retry_consumer = self._get_retry_consumer() - retry_consumer.get_topic_consumer = Mock(return_value=mock_retry_topic_consumer) + retry_consumer.get_topic = Mock(return_value=mock_retry_topic_consumer) exc = ValueError() - retried = retry_consumer.retry_msg(msg_mock, exc) + retried, blocking = retry_consumer.retry_msg(msg_mock, exc) mock_producer_for.assert_called_once_with(msg_mock) mock_retry.assert_called_once_with(exc) self.assertEqual(retried, mock_retry.return_value) + self.assertEqual(blocking, False) @patch("django_kafka.retry.consumer.DeadLetterTopicProducer") def test_dead_letter_msg(self, mock_dlt_topic_producer_cls): mock_retry_topic_consumer = Mock() mock_produce_for = mock_dlt_topic_producer_cls.return_value.produce_for - msg_mock = Mock() + msg_mock = message_mock() retry_consumer = self._get_retry_consumer() - retry_consumer.get_topic_consumer = Mock(return_value=mock_retry_topic_consumer) + retry_consumer.get_topic = Mock(return_value=mock_retry_topic_consumer) exc = ValueError() retry_consumer.dead_letter_msg(msg_mock, exc) @@ -157,82 +158,8 @@ def test_dead_letter_msg(self, mock_dlt_topic_producer_cls): header_detail=traceback.format_exc(), ) - @patch("django_kafka.consumer.ConfluentConsumer") - def test_pause_partition(self, mock_confluent_consumer): - retry_consumer = self._get_retry_consumer() - mock_msg = Mock( - **{ - "topic.return_value": "msg_topic", - "partition.return_value": 0, - "offset.return_value": 0, - }, - ) - partition = TopicPartition( - mock_msg.topic(), - mock_msg.partition(), - mock_msg.offset(), - ) - retry_time = timezone.now() - - retry_consumer.pause_partition(mock_msg, retry_time) - - mock_confluent_consumer.return_value.seek.assert_called_once_with(partition) - mock_confluent_consumer.return_value.pause.assert_called_once_with([partition]) - - 
@patch("django_kafka.consumer.ConfluentConsumer") - def test_resume_partition__before_retry_time(self, mock_confluent_consumer): - retry_consumer = self._get_retry_consumer() - mock_msg = Mock( - **{ - "topic.return_value": "msg_topic", - "partition.return_value": 0, - "offset.return_value": 0, - }, - ) - retry_time = timezone.now() + datetime.timedelta(minutes=1) - - retry_consumer.pause_partition(mock_msg, retry_time) - retry_consumer.resume_ready_partitions() - - mock_confluent_consumer.return_value.resume.assert_not_called() - - @patch("django_kafka.consumer.ConfluentConsumer") - def test_resume_ready_partitions__after_retry_time(self, mock_confluent_consumer): - retry_consumer = self._get_retry_consumer() - mock_msg = Mock( - **{ - "topic.return_value": "msg_topic", - "partition.return_value": 0, - "offset.return_value": 0, - }, - ) - partition = TopicPartition( - mock_msg.topic(), - mock_msg.partition(), - mock_msg.offset(), - ) - retry_time = timezone.now() - datetime.timedelta(minutes=1) - - retry_consumer.pause_partition(mock_msg, retry_time) - retry_consumer.resume_ready_partitions() - - mock_confluent_consumer.return_value.resume.assert_called_once_with([partition]) - - @patch("django_kafka.consumer.ConfluentConsumer") - def test_poll(self, mock_confluent_consumer): - """tests poll resumes partitions""" - retry_consumer = self._get_retry_consumer() - retry_consumer.resume_ready_partitions = Mock() - mock_msg = Mock() - mock_confluent_consumer.return_value.poll.return_value = mock_msg - - msg = retry_consumer.poll() - - self.assertEqual(msg, mock_msg) - retry_consumer.resume_ready_partitions.assert_called_once() # always called - @patch("django_kafka.consumer.Consumer.process_message") - @patch("django_kafka.consumer.ConfluentConsumer") + @patch("django_kafka.consumer.consumer.ConfluentConsumer") def test_process_message__before_retry_time( self, mock_confluent_consumer, @@ -241,13 +168,10 @@ def test_process_message__before_retry_time( retry_consumer = self._get_retry_consumer() retry_consumer.pause_partition = Mock() retry_time = timezone.now() + datetime.timedelta(minutes=1) - mock_msg = Mock( - **{ - "error.return_value": None, - "headers.return_value": [ - (RetryHeader.TIMESTAMP, str(retry_time.timestamp())), - ], - }, + mock_msg = message_mock( + headers=[ + (RetryHeader.TIMESTAMP, str(retry_time.timestamp())), + ], ) retry_consumer.process_message(mock_msg) @@ -255,7 +179,7 @@ def test_process_message__before_retry_time( mock_consumer_process_message.process_message.assert_not_called() @patch("django_kafka.consumer.Consumer.process_message") - @patch("django_kafka.consumer.ConfluentConsumer") + @patch("django_kafka.consumer.consumer.ConfluentConsumer") def test_process_message__after_retry_time( self, mock_confluent_consumer, @@ -264,13 +188,10 @@ def test_process_message__after_retry_time( retry_consumer = self._get_retry_consumer() retry_consumer.pause_partition = Mock() retry_time = timezone.now() - datetime.timedelta(minutes=1) - mock_msg = Mock( - **{ - "error.return_value": None, - "headers.return_value": [ - (RetryHeader.TIMESTAMP, str(retry_time.timestamp())), - ], - }, + mock_msg = message_mock( + headers=[ + (RetryHeader.TIMESTAMP, str(retry_time.timestamp())), + ], ) retry_consumer.process_message(mock_msg) diff --git a/django_kafka/tests/retry/test_headers.py b/django_kafka/tests/test_retry/test_headers.py similarity index 73% rename from django_kafka/tests/retry/test_headers.py rename to django_kafka/tests/test_retry/test_headers.py index bf01038..a0ef320 
diff --git a/django_kafka/tests/test_retry/test_retry.py b/django_kafka/tests/test_retry/test_retry.py
new file mode 100644
index 0000000..0200287
--- /dev/null
+++ b/django_kafka/tests/test_retry/test_retry.py
@@ -0,0 +1,95 @@
+import datetime
+from unittest import mock
+
+from django.test import TestCase
+
+from django_kafka.retry.settings import RetrySettings
+from django_kafka.topic import Topic
+
+
+class RetrySettingTestCase(TestCase):
+    def test_should_retry__include(self):
+        settings = RetrySettings(max_retries=5, delay=60, include=[ValueError])
+
+        self.assertEqual(settings.can_retry(attempt=0, exc=ValueError()), True)
+        self.assertEqual(settings.can_retry(attempt=0, exc=IndexError()), False)
+
+    def test_should_retry__exclude(self):
+        settings = RetrySettings(max_retries=5, delay=60, exclude=[ValueError])
+
+        self.assertEqual(settings.can_retry(attempt=0, exc=ValueError()), False)
+        self.assertEqual(settings.can_retry(attempt=0, exc=IndexError()), True)
+
+    def test_should_retry__no_include_exclude(self):
+        settings = RetrySettings(max_retries=5, delay=60)
+
+        self.assertEqual(settings.can_retry(attempt=0, exc=ValueError()), True)
+
+    def test_attempts_exceeded(self):
+        settings = RetrySettings(max_retries=5, delay=60)
+
+        self.assertEqual(settings.attempts_exceeded(6), True)
+        self.assertEqual(settings.attempts_exceeded(5), False)
+        self.assertEqual(settings.attempts_exceeded(4), False)
+
+    @mock.patch("django.utils.timezone.now")
+    def test_get_retry_time__fixed_delay(self, mock_now):
+        settings = RetrySettings(max_retries=5, delay=60, backoff=False)
+        now = datetime.datetime.fromtimestamp(1000, datetime.UTC)
+        mock_now.return_value = now
+
+        self.assertEqual(
+            settings.get_retry_time(1),
+            now + datetime.timedelta(seconds=60),
+        )
+        self.assertEqual(
+            settings.get_retry_time(2),
+            now + datetime.timedelta(seconds=60),
+        )
+        self.assertEqual(
+            settings.get_retry_time(3),
+            now + datetime.timedelta(seconds=60),
+        )
+        self.assertEqual(
+            settings.get_retry_time(4),
+            now + datetime.timedelta(seconds=60),
+        )
+
+    @mock.patch("django.utils.timezone.now")
+    def test_get_retry_time__backoff_delay(self, mock_now):
+        settings = RetrySettings(max_retries=5, delay=60, backoff=True)
+        now = datetime.datetime.fromtimestamp(1000, datetime.UTC)
+        mock_now.return_value = now
+
+        self.assertEqual(
+            settings.get_retry_time(1),
+            now + datetime.timedelta(seconds=60),
+        )
+        self.assertEqual(
+            settings.get_retry_time(2),
+            now + datetime.timedelta(seconds=120),
+        )
+        self.assertEqual(
+            settings.get_retry_time(3),
+            now + datetime.timedelta(seconds=240),
+        )
+        self.assertEqual(
+            settings.get_retry_time(4),
+            now + datetime.timedelta(seconds=480),
+        )
+
+
+class RetryDecoratorTestCase(TestCase):
+    def test_retry_decorator(self):
+        class TopicA(Topic):
+            pass
+
+        retry = RetrySettings(max_retries=5, delay=60)
+        result = retry(TopicA)
+
+        self.assertIs(result, TopicA)
+        self.assertIsNotNone(TopicA.retry_settings)
+        self.assertEqual(
+            [TopicA.retry_settings.max_retries, TopicA.retry_settings.delay],
+            [5, 60],
+        )
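The two `get_retry_time` tests above fix the retry schedule: with `backoff=False` every attempt waits `delay` seconds, while `backoff=True` doubles the wait per attempt, i.e. `delay * 2 ** (attempt - 1)` seconds. A self-contained check of the asserted values (the helper function here is illustrative, not the library's):

```python
# Illustrative reimplementation of the schedule the tests assert;
# RetrySettings.get_retry_time adds this many seconds to timezone.now().
def retry_delay_seconds(delay: int, attempt: int, backoff: bool) -> int:
    return delay * 2 ** (attempt - 1) if backoff else delay

assert [retry_delay_seconds(60, n, backoff=False) for n in (1, 2, 3, 4)] == [60, 60, 60, 60]
assert [retry_delay_seconds(60, n, backoff=True) for n in (1, 2, 3, 4)] == [60, 120, 240, 480]
```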
diff --git a/django_kafka/tests/retry/test_topic.py b/django_kafka/tests/test_retry/test_topic.py
similarity index 82%
rename from django_kafka/tests/retry/test_topic.py
rename to django_kafka/tests/test_retry/test_topic.py
index 227c03a..e2a3c46 100644
--- a/django_kafka/tests/retry/test_topic.py
+++ b/django_kafka/tests/test_retry/test_topic.py
@@ -1,6 +1,7 @@
 from unittest import mock
 
 from django.test import TestCase, override_settings
+from django.utils import timezone
 
 from django_kafka.conf import SETTINGS_KEY
 from django_kafka.exceptions import DjangoKafkaError
@@ -10,6 +11,7 @@
     RetryTopicConsumer,
     RetryTopicProducer,
 )
+from django_kafka.tests.utils import message_mock
 from django_kafka.topic import TopicConsumer
 
 
@@ -39,7 +41,7 @@ def test__get_next_attempt(self):
         )
 
     def test_init(self):
-        retry_settings = RetrySettings(max_retries=5, delay=60)
+        retry_settings = RetrySettings(max_retries=5, delay=60, blocking=False)
         mock_msg_topic_consumer = mock.Mock(**{"topic.return_value": "topic.name"})
 
         rt_producer = RetryTopicProducer(
@@ -54,7 +56,7 @@ def test_init(self):
         self.assertEqual(rt_producer.attempt, 1)
 
     def test_name(self):
-        retry_settings = RetrySettings(max_retries=5, delay=60)
+        retry_settings = RetrySettings(max_retries=5, delay=60, blocking=False)
         mock_msg_topic_consumer = mock.Mock(**{"topic.return_value": "topic.name"})
         mock_msg_rt_producer = mock.Mock(
             **{
@@ -79,7 +81,7 @@ def test_name(self):
 
     @override_settings(**{SETTINGS_KEY: {"RETRY_TOPIC_SUFFIX": "test-retry"}})
     def test_name__uses_settings(self):
-        retry_settings = RetrySettings(max_retries=5, delay=60)
+        retry_settings = RetrySettings(max_retries=5, delay=60, blocking=False)
         mock_msg_topic_consumer = mock.Mock(**{"topic.return_value": "topic.name"})
 
         rt_producer = RetryTopicProducer(
@@ -90,10 +92,12 @@
 
         self.assertEqual(rt_producer.name, "group.id.topic.name.test-retry.1")
 
-    @mock.patch("django_kafka.retry.settings.RetrySettings.get_retry_timestamp")
-    def test_retry__first_retry(self, mock_get_retry_timestamp: mock.Mock):
-        mock_msg = mock.Mock(**{"topic.return_value": "msg_topic"})
-        retry_settings = RetrySettings(max_retries=5, delay=60)
+    @mock.patch("django_kafka.retry.settings.RetrySettings.get_retry_time")
+    def test_retry__first_retry(self, mock_get_retry_time):
+        retry_time = timezone.now()
+        mock_get_retry_time.return_value = retry_time
+        mock_msg = message_mock()
+        retry_settings = RetrySettings(max_retries=5, delay=60, blocking=False)
         rt_producer = RetryTopicProducer(
             group_id="group.id",
             retry_settings=retry_settings,
@@ -109,19 +113,19 @@ def test_retry__first_retry(self, mock_get_retry_timestamp: mock.Mock):
             value=mock_msg.value(),
             headers=[
                 (RetryHeader.MESSAGE, "error message"),
-                (RetryHeader.TIMESTAMP, mock_get_retry_timestamp.return_value),
+                (RetryHeader.TIMESTAMP, str(retry_time.timestamp())),
             ],
         )
-        mock_get_retry_timestamp.assert_called_once_with(1)
+        mock_get_retry_time.assert_called_once_with(1)
 
-    @mock.patch("django_kafka.retry.settings.RetrySettings.get_retry_timestamp")
-    def test_retry__last_retry(self, mock_get_retry_timestamp):
-        mock_msg = mock.Mock(
-            **{"topic.return_value": "group.id.msg_topic.test-retry.4"},
-        )
+    @mock.patch("django_kafka.retry.settings.RetrySettings.get_retry_time")
+    def test_retry__last_retry(self, mock_get_retry_time):
+        retry_time = timezone.now()
+        mock_get_retry_time.return_value = retry_time
+        mock_msg = message_mock(topic="group.id.msg_topic.test-retry.4")
         rt_producer = RetryTopicProducer(
             group_id="group.id",
-            retry_settings=RetrySettings(max_retries=5, delay=60),
+            retry_settings=RetrySettings(max_retries=5, delay=60, blocking=False),
             msg=mock_msg,
         )
         rt_producer.produce = mock.Mock()
@@ -134,16 +138,16 @@ def test_retry__last_retry(self, mock_get_retry_timestamp):
             value=mock_msg.value(),
             headers=[
                 (RetryHeader.MESSAGE, "error message"),
-                (RetryHeader.TIMESTAMP, mock_get_retry_timestamp.return_value),
+                (RetryHeader.TIMESTAMP, str(retry_time.timestamp())),
             ],
         )
-        mock_get_retry_timestamp.assert_called_once_with(5)
+        mock_get_retry_time.assert_called_once_with(5)
 
     def test_retry__no_more_retries(self):
         rt_producer = RetryTopicProducer(
             group_id="group.id",
-            retry_settings=RetrySettings(max_retries=5, delay=60),
-            msg=mock.Mock(**{"topic.return_value": "group.id.msg_topic.test-retry.5"}),
+            retry_settings=RetrySettings(max_retries=5, delay=60, blocking=False),
+            msg=message_mock(topic="group.id.msg_topic.test-retry.5"),
         )
         rt_producer.produce = mock.Mock()
 
@@ -155,8 +159,13 @@
     def test_retry__no_retry_excluded_error(self):
         rt_producer = RetryTopicProducer(
             group_id="group.id",
-            retry_settings=RetrySettings(max_retries=5, delay=60, exclude=[ValueError]),
-            msg=mock.Mock(**{"topic.return_value": "msg_topic"}),
+            retry_settings=RetrySettings(
+                max_retries=5,
+                delay=60,
+                exclude=[ValueError],
+                blocking=False,
+            ),
+            msg=message_mock(),
         )
         rt_producer.produce = mock.Mock()
 
@@ -176,7 +185,9 @@ def _get_retryable_topic_consumer(
         class SomeTopicConsumer(TopicConsumer):
             name = topic_name
 
-            retry = RetrySettings(**{"max_retries": 5, "delay": 60, **retry_kwargs})
+            retry = RetrySettings(
+                **{"max_retries": 5, "delay": 60, **retry_kwargs, "blocking": False},
+            )
 
         topic_consumer_cls = retry(SomeTopicConsumer)
 
         return topic_consumer_cls()
@@ -219,7 +230,7 @@ def test_consume(self):
         """tests RetryTopic uses the main topic consume method"""
         topic_consumer = self._get_retryable_topic_consumer()
         topic_consumer.consume = mock.Mock()
-        mock_msg = mock.Mock()
+        mock_msg = message_mock()
 
         RetryTopicConsumer(group_id="group.id", topic_consumer=topic_consumer).consume(
             mock_msg,
         )
@@ -230,7 +241,7 @@
     @mock.patch("django_kafka.retry.topic.RetryTopicProducer")
     def test_producer_for(self, mock_rt_producer):
         topic_consumer = self._get_retryable_topic_consumer()
-        mock_msg = mock.Mock()
+        mock_msg = message_mock()
 
         rt_consumer = RetryTopicConsumer(
             group_id="group.id",
@@ -240,7 +251,7 @@
         rt_producer = rt_consumer.producer_for(mock_msg)
 
         self.assertEqual(rt_producer, mock_rt_producer.return_value)
         mock_rt_producer.assert_called_once_with(
-            group_id=rt_consumer.group_id, retry_settings=topic_consumer.retry_settings,
+            group_id=rt_consumer.group_id,
             msg=mock_msg,
         )
diff --git a/django_kafka/tests/test_settings.py b/django_kafka/tests/test_settings.py
index 65f7b33..944fff8 100644
--- a/django_kafka/tests/test_settings.py
+++ b/django_kafka/tests/test_settings.py
@@ -1,6 +1,6 @@
 from unittest.mock import patch
 
-from django.test import override_settings, SimpleTestCase
+from django.test import SimpleTestCase, override_settings
 
 from django_kafka.conf import DEFAULTS, SETTINGS_KEY, settings
 
@@ -24,13 +24,13 @@ class SettingsTestCase(SimpleTestCase):
         "CONNECTOR_NAME_PREFIX",
     )
 
-    @patch("django_kafka.consumer.ConfluentConsumer")
+    @patch("django_kafka.consumer.consumer.ConfluentConsumer")
     def test_defaults(self, mock_consumer_client):
         # make sure defaults are assigned
         for key in self.settings_keys:
             self.assertEqual(getattr(settings, key), DEFAULTS[key])
 
-    @patch("django_kafka.consumer.ConfluentConsumer")
+    @patch("django_kafka.consumer.consumer.ConfluentConsumer")
     def test_user_settings(self, mock_consumer_client):
         # make sure settings defined by user pulled up
         user_settings = {
@@ -62,7 +62,7 @@ def test_user_settings(self, mock_consumer_client):
                 "status_forcelist": [502, 503, 504],
             },
             "CONNECT_REQUESTS_TIMEOUT": 60,
-            "CONNECTOR_NAME_PREFIX": "project_name"
+            "CONNECTOR_NAME_PREFIX": "project_name",
         }
         with override_settings(**{SETTINGS_KEY: user_settings}):
             for key in self.settings_keys:
diff --git a/django_kafka/tests/test_topic/__init__.py b/django_kafka/tests/test_topic/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/django_kafka/tests/topic/test_avro.py b/django_kafka/tests/test_topic/test_avro.py
similarity index 100%
rename from django_kafka/tests/topic/test_avro.py
rename to django_kafka/tests/test_topic/test_avro.py
diff --git a/django_kafka/tests/topic/test_debezium.py b/django_kafka/tests/test_topic/test_debezium.py
similarity index 100%
rename from django_kafka/tests/topic/test_debezium.py
rename to django_kafka/tests/test_topic/test_debezium.py
diff --git a/django_kafka/tests/topic/test_model.py b/django_kafka/tests/test_topic/test_model.py
similarity index 100%
rename from django_kafka/tests/topic/test_model.py
rename to django_kafka/tests/test_topic/test_model.py
diff --git a/django_kafka/tests/topic/test_topic.py b/django_kafka/tests/test_topic/test_topic.py
similarity index 100%
rename from django_kafka/tests/topic/test_topic.py
rename to django_kafka/tests/test_topic/test_topic.py
diff --git a/django_kafka/tests/topic/test_transforms.py b/django_kafka/tests/test_topic/test_transforms.py
similarity index 100%
rename from django_kafka/tests/topic/test_transforms.py
rename to django_kafka/tests/test_topic/test_transforms.py
diff --git a/django_kafka/tests/test_utils/__init__.py b/django_kafka/tests/test_utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/django_kafka/tests/test_utils/test_message.py b/django_kafka/tests/test_utils/test_message.py
new file mode 100644
index 0000000..3f651ad
--- /dev/null
+++ b/django_kafka/tests/test_utils/test_message.py
@@ -0,0 +1,17 @@
+from django.test import TestCase
+
+from django_kafka.utils.message import Header
+
+
+class HeaderTestCase(TestCase):
+    def test_get(self):
+        headers = [("header", "abc"), ("header", "def")]
+
+        self.assertEqual(Header.get(headers, "header"), "abc")
+        self.assertEqual(Header.get(None, "header"), None)
+
+    def test_list(self):
+        headers = [("header", "abc"), ("header", "def")]
+
+        self.assertEqual(Header.list(headers, "header"), ["abc", "def"])
+        self.assertEqual(Header.list(None, "header"), [])
diff --git a/django_kafka/tests/utils.py b/django_kafka/tests/utils.py
new file mode 100644
index 0000000..5a8e144
--- /dev/null
+++ b/django_kafka/tests/utils.py
@@ -0,0 +1,14 @@
+from unittest.mock import Mock
+
+
+def message_mock(topic="topic", partition=0, offset=0, error=None, headers=None):
+    """mocking utility for confluent_kafka.cimpl.Message"""
+    return Mock(
+        **{
+            "topic.return_value": topic,
+            "partition.return_value": partition,
+            "offset.return_value": offset,
+            "headers.return_value": headers,
+            "error.return_value": error,
+        },
+    )
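`message_mock` replaces the ad-hoc `Mock(**{"topic.return_value": ...})` constructions that were scattered through the renamed test modules. Typical usage, mirroring the call sites updated in this diff:

```python
from django_kafka.tests.utils import message_mock

# Stand-in for confluent_kafka.cimpl.Message: only the accessor methods
# the code under test calls are stubbed out.
msg = message_mock(topic="group.topic.retry.1", offset=7)

assert msg.topic() == "group.topic.retry.1"
assert msg.offset() == 7
assert msg.error() is None  # a successfully delivered message carries no error
```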
diff --git a/django_kafka/topic/__init__.py b/django_kafka/topic/__init__.py
index aa370f1..bbe1c2b 100644
--- a/django_kafka/topic/__init__.py
+++ b/django_kafka/topic/__init__.py
@@ -13,8 +13,8 @@
 )
 
 from django_kafka import kafka
+from django_kafka.conf import settings
 from django_kafka.exceptions import DjangoKafkaError
-from django_kafka.retry.settings import RetrySettings
 
 if TYPE_CHECKING:
     from confluent_kafka import cimpl
@@ -95,8 +95,7 @@ def produce(self, value: any, **kwargs):
 class TopicConsumer(ABC):
     key_deserializer: Type[Deserializer] = StringDeserializer
     value_deserializer: Type[Deserializer] = StringDeserializer
-
-    retry_settings: Optional["RetrySettings"] = None
+    retry_settings = settings.RETRY_SETTINGS
 
     @property
     @abstractmethod
diff --git a/django_kafka/utils/__init__.py b/django_kafka/utils/__init__.py
new file mode 100644
index 0000000..db16946
--- /dev/null
+++ b/django_kafka/utils/__init__.py
@@ -0,0 +1,5 @@
+from .retry import retry
+
+__all__ = [
+    "retry",
+]
diff --git a/django_kafka/utils/message.py b/django_kafka/utils/message.py
new file mode 100644
index 0000000..7747834
--- /dev/null
+++ b/django_kafka/utils/message.py
@@ -0,0 +1,19 @@
+from typing import Optional
+
+MessageHeaders = list[tuple[str, str]]
+
+
+class Header:
+    @classmethod
+    def list(cls, headers: Optional[MessageHeaders], header: str) -> list[str]:
+        """returns all occurrences for the header"""
+        if headers is not None:
+            return [v for k, v in headers if k == header]
+        return []
+
+    @classmethod
+    def get(cls, headers: Optional[MessageHeaders], header: str) -> Optional[str]:
+        """returns the first encountered value for the header"""
+        if headers is not None:
+            return next((v for k, v in headers if k == header), None)
+        return None
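`Header` generalises the header lookup that previously lived on `RetryHeader.get_header` (whose test was deleted from `test_headers.py` above; the new `HeaderTestCase` covers it instead). For illustration:

```python
from django_kafka.utils.message import Header

headers = [("attempt", "1"), ("attempt", "2"), ("error", "boom")]

assert Header.get(headers, "attempt") == "1"          # first occurrence wins
assert Header.list(headers, "attempt") == ["1", "2"]  # every occurrence, in order
assert Header.get(None, "attempt") is None            # tolerates absent headers
```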
diff --git a/django_kafka/utils/retry.py b/django_kafka/utils/retry.py
new file mode 100644
index 0000000..422f374
--- /dev/null
+++ b/django_kafka/utils/retry.py
@@ -0,0 +1,28 @@
+import time
+from functools import wraps
+from typing import Type
+
+
+def retry(
+    exceptions: tuple[Type[Exception]] = (Exception,),
+    tries: int = -1,
+    delay: int = 0,
+    backoff: int = 1,
+):
+    def decorator(f):
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            _tries = tries
+            _delay = delay
+
+            while _tries:
+                try:
+                    return f(*args, **kwargs)
+                except exceptions:
+                    _tries -= 1
+                    if not _tries:
+                        raise
+                    time.sleep(_delay)
+                    _delay *= backoff
+        return wrapper
+    return decorator
diff --git a/pyproject.toml b/pyproject.toml
index ee95aa4..1f7aa2b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "django-kafka"
-version = "0.5.8"
+version = "0.5.9"
 dependencies = [
     "django>=4.2,<6.0",
     "confluent-kafka[avro, schema-registry]==2.6.0"
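The general-purpose `retry` decorator added in `django_kafka/utils/retry.py` above is re-exported from `django_kafka.utils`. A hypothetical application (the decorated function is made up for illustration):

```python
from django_kafka.utils import retry

# Retry up to 3 times on ConnectionError, sleeping 1s and then 2s between
# attempts: the delay is multiplied by backoff after each failed try, and
# the final failure is re-raised once the tries are exhausted.
@retry(exceptions=(ConnectionError,), tries=3, delay=1, backoff=2)
def fetch_connector_status():
    ...
```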