Merge pull request #41 from RegioHelden/23-blocking-retries
23 blocking retries
stefan-cardnell-rh authored Nov 28, 2024
2 parents 9124954 + 50a6681 commit 3239e17
Showing 50 changed files with 908 additions and 393 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.toml
@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.5.8"
current_version = "0.5.9"
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)"
serialize = ["{major}.{minor}.{patch}"]
search = "{current_version}"
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# Changelog

## 0.5.9 (2024-11-28)
* Add `RETRY_SETTINGS` configuration setting.
* Add blocking retry behaviour and make it the default.

## 0.5.8 (2024-11-19)
* `ModelTopicConsumer.get_defaults` will skip fields not defined on the model.

51 changes: 47 additions & 4 deletions README.md
@@ -178,9 +178,13 @@ A few notes:
2. [Deletions](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html#extract-new-record-state-delete-tombstone-handling-mode) are detected automatically based on a null message value or the presence of a `__deleted` field.
3. The message key is assumed to contain the model PK as a field, [which is the default behaviour](https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-message-key-columns) for Debezium source connectors. If you need more complicated lookup behaviour, override `get_lookup_kwargs`.

## Non-Blocking Retries:
## Dead Letter Topic:

Add non-blocking retry behaviour to a topic by using the `retry` decorator:
Any message that fails to be consumed is sent to the dead letter topic. The dead letter topic name is composed of the consumer group id, the original topic name, and a `.dlt` suffix (configurable with the `DEAD_LETTER_TOPIC_SUFFIX` setting). So for a failed message in topic `topic` received by consumer group `group`, the dead letter topic name would be `group.topic.dlt`.

## Retries:

Add retry behaviour to a topic by using the `retry` decorator:

```python
from django_kafka import kafka
@@ -193,15 +197,25 @@ class RetryableTopic(Topic):
...
```

When the consumption of a message in a retryable topic fails, the message is re-sent to a topic with a name combined of the consumer group id, the original topic name, a `.retry` suffix, and the retry number. Subsequent failed retries will then be sent to retry topics of incrementing retry number until the maximum attempts are reached, after which it will be sent to a dead letter topic suffixed by `.dlt`. So for a failed message in topic `topic` received by consumer group `group`, the expected topic sequence would be:
You can also configure retry behaviour globally for all topics with the `RETRY_SETTINGS` configuration (see [settings](#settings)).

Retries can be either blocking or non-blocking, controlled by the `blocking` boolean parameter. By default, all retries are blocking.

### Blocking Retries:

When the consumption of a message fails in a blocking retryable topic, the consumer process will pause the partition and retry the message at a later time. Therefore, messages in that partition will be blocked until the failing message succeeds or the maximum retry attempts are reached, after which the message is sent to the dead letter topic.
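
The pause-and-resume bookkeeping behind blocking retries can be modelled roughly as follows. This is a simplified sketch, not the library's implementation: `PartitionPauses` and its methods are hypothetical names chosen for illustration, and a real consumer would additionally pause and resume the partition on the Kafka client.

```python
from datetime import datetime, timedelta


class PartitionPauses:
    """Toy bookkeeping of paused partitions (hypothetical, for illustration)."""

    def __init__(self):
        # Maps (topic, partition) to the time at which it may be resumed.
        self._until = {}

    def set(self, topic: str, partition: int, until: datetime) -> None:
        """Record that a partition is paused until the given time."""
        self._until[(topic, partition)] = until

    def pop_ready(self, now: datetime) -> list:
        """Remove and return every partition whose pause has expired."""
        ready = [key for key, until in self._until.items() if until <= now]
        for key in ready:
            del self._until[key]
        return ready


pauses = PartitionPauses()
start = datetime(2024, 11, 28, 12, 0, 0)

# A message on partition 0 failed; block the partition for 30 seconds.
pauses.set("topic", 0, until=start + timedelta(seconds=30))

# Checked too early: nothing is ready, so the partition stays blocked.
early = pauses.pop_ready(now=start + timedelta(seconds=10))

# Checked after the delay: the partition is resumed and the message re-polled.
late = pauses.pop_ready(now=start + timedelta(seconds=30))
```

While a partition is tracked as paused, later messages in that partition are not consumed, which is what makes the retry blocking.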

### Non-blocking Retries:

When the consumption of a message fails in a non-blocking retryable topic, the message is re-sent to a topic whose name is composed of the consumer group id, the original topic name, a `.retry` suffix (configurable with the `RETRY_TOPIC_SUFFIX` setting), and the retry number. Subsequent failed retries are sent to retry topics with an incrementing retry number until the maximum number of attempts is reached, after which the message is sent to a dead letter topic. So for a failed message in topic `topic`, with a maximum of 3 retry attempts and received by consumer group `group`, the expected topic sequence would be:

1. `topic`
2. `group.topic.retry.1`
3. `group.topic.retry.2`
4. `group.topic.retry.3`
5. `group.topic.dlt`
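
The naming rule behind this sequence can be sketched in a few lines of plain Python. The helper `retry_topic_sequence` is hypothetical and only mirrors the description above; it is not part of the library's API.

```python
def retry_topic_sequence(
    group_id: str,
    topic: str,
    max_retries: int,
    retry_suffix: str = "retry",  # mirrors the RETRY_TOPIC_SUFFIX default
    dlt_suffix: str = "dlt",  # mirrors the DEAD_LETTER_TOPIC_SUFFIX default
) -> list:
    """Return the topics a repeatedly failing message would pass through."""
    names = [topic]
    # One retry topic per attempt, numbered from 1.
    names += [
        f"{group_id}.{topic}.{retry_suffix}.{attempt}"
        for attempt in range(1, max_retries + 1)
    ]
    # After the final retry fails, the message goes to the dead letter topic.
    names.append(f"{group_id}.{topic}.{dlt_suffix}")
    return names


sequence = retry_topic_sequence("group", "topic", max_retries=3)
for name in sequence:
    print(name)
```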

When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic.
When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a non-blocking retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic.

## Connectors:

@@ -282,6 +296,7 @@ DJANGO_KAFKA = {
"enable.auto.offset.store": False,
"topic.metadata.refresh.interval.ms": 10000,
},
"RETRY_SETTINGS": None,
"RETRY_TOPIC_SUFFIX": "retry",
"DEAD_LETTER_TOPIC_SUFFIX": "dlt",
"POLLING_FREQUENCY": 1, # seconds
@@ -326,6 +341,34 @@ Default: `{}`
Defines configurations of the consumer. See [configs marked with `C`](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
#### `RETRY_CONSUMER_CONFIG`
Default:
```py
{
"auto.offset.reset": "earliest",
"enable.auto.offset.store": False,
"topic.metadata.refresh.interval.ms": 10000,
}
```
Defines configuration for the retry consumer. See [Non-blocking retries](#non-blocking-retries).
#### `RETRY_TOPIC_SUFFIX`
Default: `retry`
Defines the retry topic suffix. See [Non-blocking retries](#non-blocking-retries).
#### `RETRY_SETTINGS`
Default: `None`
Defines the default retry settings. See [retries](#retries).
#### `DEAD_LETTER_TOPIC_SUFFIX`
Default: `dlt`
Defines the dead letter topic suffix. See [Dead Letter Topic](#dead-letter-topic).
#### `POLLING_FREQUENCY`
Default: 1 # second
2 changes: 1 addition & 1 deletion django_kafka/__init__.py
@@ -18,7 +18,7 @@

logger = logging.getLogger(__name__)

__version__ = "0.5.8"
__version__ = "0.5.9"

__all__ = [
"autodiscover",
1 change: 1 addition & 0 deletions django_kafka/conf.py
@@ -14,6 +14,7 @@
"enable.auto.offset.store": False,
"topic.metadata.refresh.interval.ms": 10000,
},
"RETRY_SETTINGS": None,
"RETRY_TOPIC_SUFFIX": "retry",
"DEAD_LETTER_TOPIC_SUFFIX": "dlt",
"POLLING_FREQUENCY": 1, # seconds
7 changes: 7 additions & 0 deletions django_kafka/consumer/__init__.py
@@ -0,0 +1,7 @@
from .consumer import Consumer
from .topics import Topics

__all__ = [
"Consumer",
"Topics",
]
132 changes: 88 additions & 44 deletions django_kafka/consumer.py → django_kafka/consumer/consumer.py
@@ -1,45 +1,25 @@
import logging
import traceback
from datetime import datetime
from pydoc import locate
from typing import TYPE_CHECKING, Optional

from confluent_kafka import Consumer as ConfluentConsumer
from confluent_kafka import cimpl

from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError

from .managers import PauseManager, RetryManager
from .topics import Topics

if TYPE_CHECKING:
from confluent_kafka import cimpl

from django_kafka.retry.settings import RetrySettings
from django_kafka.topic import TopicConsumer

logger = logging.getLogger(__name__)


class Topics:
_topic_consumers: tuple["TopicConsumer", ...]
_match: dict[str, "TopicConsumer"]

def __init__(self, *topic_consumers: "TopicConsumer"):
self._topic_consumers = topic_consumers
self._match: dict[str, TopicConsumer] = {}

def get(self, topic_name: str) -> "TopicConsumer":
if topic_name not in self._match:
topic_consumer = next((t for t in self if t.matches(topic_name)), None)
if not topic_consumer:
raise DjangoKafkaError(f"No topic registered for `{topic_name}`")
self._match[topic_name] = topic_consumer

return self._match[topic_name]

@property
def names(self) -> list[str]:
return [topic.name for topic in self]

def __iter__(self):
yield from self._topic_consumers


class Consumer:
"""
Available settings of the producers (P) and consumers (C)
@@ -62,6 +42,8 @@ class Consumer:
def __init__(self):
self.config = self.build_config()
self._consumer = ConfluentConsumer(self.config)
self._retries = RetryManager() # blocking retry manager
self._pauses = PauseManager()

def __getattr__(self, name):
"""proxy consumer methods."""
@@ -82,64 +64,125 @@ def build_config(cls):
def group_id(self) -> str:
return self.config["group.id"]

def commit_offset(self, msg: cimpl.Message):
def commit_offset(self, msg: "cimpl.Message"):
if not self.config.get("enable.auto.offset.store"):
# Store the offset associated with msg to a local cache.
# Stored offsets are committed to Kafka by a background
# thread every 'auto.commit.interval.ms'.
# Explicitly storing offsets after processing gives at-least once semantics.
self.store_offsets(msg)

def retry_msg(self, msg: cimpl.Message, exc: Exception) -> bool:
def pause_partition(self, msg: "cimpl.Message", until: datetime):
"""pauses message partition to process the message at a later time
note: pausing is only retained within the python consumer class, and is not
retained between separate consumer processes.
"""
partition = self._pauses.set(msg, until)
self.seek(partition) # seek back to message offset to re-poll on unpause
self.pause([partition])

def resume_partitions(self):
"""resumes any paused partitions that are now ready"""
for partition in self._pauses.pop_ready():
self.resume([partition])

def blocking_retry(
self,
retry_settings: "RetrySettings",
msg: "cimpl.Message",
exc: Exception,
) -> bool:
"""
blocking retry, managed within the same consumer process
:return: whether the message will be retried
"""
attempt = self._retries.next(msg)
if retry_settings.can_retry(attempt, exc):
until = retry_settings.get_retry_time(attempt)
self.pause_partition(msg, until)
self.log_error(exc)
return True
return False

def non_blocking_retry(
self,
retry_settings: "RetrySettings",
msg: "cimpl.Message",
exc: Exception,
):
"""
non-blocking retry, managed by a separate topic and consumer process
:return: whether the message will be retried
"""
from django_kafka.retry.topic import RetryTopicProducer

topic_consumer = self.get_topic_consumer(msg)
if not topic_consumer.retry_settings:
return False

return RetryTopicProducer(
retry_settings=retry_settings,
group_id=self.group_id,
retry_settings=topic_consumer.retry_settings,
msg=msg,
).retry(exc=exc)

def dead_letter_msg(self, msg: cimpl.Message, exc: Exception):
def retry_msg(self, msg: "cimpl.Message", exc: Exception) -> tuple[bool, bool]:
"""
:return tuple: The first element indicates if the message was retried and the
second indicates if the retry was blocking.
"""
retry_settings = self.get_topic(msg).retry_settings
if not retry_settings:
return False, False
if retry_settings.blocking:
return self.blocking_retry(retry_settings, msg, exc), True
return self.non_blocking_retry(retry_settings, msg, exc), False

def dead_letter_msg(self, msg: "cimpl.Message", exc: Exception):
"""publishes a message to the dead letter topic, with exception details"""
from django_kafka.dead_letter.topic import DeadLetterTopicProducer

DeadLetterTopicProducer(group_id=self.group_id, msg=msg).produce_for(
header_message=str(exc),
header_detail=traceback.format_exc(),
)

def handle_exception(self, msg: cimpl.Message, exc: Exception):
retried = self.retry_msg(msg, exc)
def handle_exception(self, msg: "cimpl.Message", exc: Exception) -> bool:
"""
return bool: Indicates if the message was processed and offset can be committed.
"""
retried, blocking = self.retry_msg(msg, exc)
if not retried:
self.dead_letter_msg(msg, exc)
self.log_error(exc)
return True
return not blocking

def get_topic_consumer(self, msg: cimpl.Message) -> "TopicConsumer":
def get_topic(self, msg: "cimpl.Message") -> "TopicConsumer":
return self.topics.get(topic_name=msg.topic())

def log_error(self, error):
logger.error(error, exc_info=True)

def consume(self, msg):
self.get_topic_consumer(msg).consume(msg)
self.get_topic(msg).consume(msg)

def process_message(self, msg: cimpl.Message):
def process_message(self, msg: "cimpl.Message"):
if msg_error := msg.error():
self.log_error(msg_error)
return

try:
self.consume(msg)
# ruff: noqa: BLE001 (we do not want consumer to stop if message consumption fails in any circumstances)
except Exception as exc:
self.handle_exception(msg, exc)
# ruff: noqa: BLE001 (do not stop consumer if message consumption fails in any circumstances)
processed = self.handle_exception(msg, exc)
else:
processed = True

self.commit_offset(msg)
if processed:
self.commit_offset(msg)

def poll(self) -> Optional[cimpl.Message]:
def poll(self) -> Optional["cimpl.Message"]:
# poll for self.polling_freq seconds
return self._consumer.poll(timeout=self.polling_freq)

@@ -150,6 +193,7 @@ def run(self):
try:
self.start()
while True:
self.resume_partitions()
if (msg := self.poll()) is not None:
self.process_message(msg)
except Exception as exc:
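
The offset-commit decision that `retry_msg`, `handle_exception` and `process_message` make together in this diff can be condensed into one pure function. This is an illustrative sketch only: `should_commit_offset` is a hypothetical name that does not appear in the commit.

```python
def should_commit_offset(retried: bool, blocking: bool) -> bool:
    """Mirror the value handle_exception returns after a failed consume.

    retried:  whether the message was scheduled for another attempt
    blocking: whether the topic's retry settings are blocking
    """
    if not retried:
        # No retry (none configured, or attempts exhausted): the message is
        # dead-lettered, so the consumer can move past it.
        return True
    # A non-blocking retry hands the message to a retry topic, so the offset
    # can be committed. A blocking retry pauses the partition and re-polls
    # the same message later, so the offset must not be committed yet.
    return not blocking


decisions = {
    (retried, blocking): should_commit_offset(retried, blocking)
    for retried in (False, True)
    for blocking in (False, True)
}
```

Only the combination of a scheduled retry and blocking settings leaves the offset uncommitted.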