feat: add producer.suppress for global message suppression, refs #27
stefan-cardnell-rh committed Oct 17, 2024
1 parent 3a075b8 commit 44bb778
Showing 9 changed files with 229 additions and 46 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## 0.5.2 (2024-10-17)
* Added `producer.suppress` decorator.
* Renamed `KafkaSkipModel` to `KafkaConnectSkipModel`.
* Renamed `KafkaSkipQueryset` to `KafkaConnectSkipQueryset`.

## 0.5.1 (2024-10-16)
* `ModelTopicConsumer.sync` now returns the results of the `update_or_create` method.
* Added `days_from_epoch_to_date` function to convert `io.debezium.time.Date` to Python `datetime.date`.
78 changes: 58 additions & 20 deletions README.md
@@ -203,7 +203,7 @@ When the consumption of a message in a retryable topic fails, the message is re-

When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic.

## Connectors
## Connectors:

Connectors are auto-discovered and are expected to be located in `some_django_app/kafka/connectors.py` or `some_django_app/connectors.py`.

@@ -378,56 +378,94 @@ Prefix which will be added to the connector name when publishing the connector.
Used by `django_kafka.connect.connector.Connector` to initialize `django_kafka.connect.client.KafkaConnectClient`.
## Bidirectional data sync with no infinite event loop.
**For example, you want to keep a User table in sync in multiple systems.**

The idea is to send events from all systems to the same topic, and also consume events from the same topic, marking the record with `kafka_skip=True` at consumption time.
- The producer should respect `kafka_skip=True` and not produce new events when it is `True`.
- Any updates to the User table which happen outside the consumer should set `kafka_skip=False`, which will allow the producer to create an event again.

This way the chronology is strictly kept and the infinite event loop is avoided.

The disadvantage is that each system will still consume its own message.

#### There are 2 classes for django Model and for QuerySet:

#### KafkaSkipModel
Adds the `kafka_skip` boolean field, defaulting to `False`. This also automatically resets `kafka_skip` to `False` when updating model instances (if not explicitly set).

## Suppressing message production:
`django-kafka` provides two ways to suppress message production system-wide:

### `producer.suppress`
Use the `producer.suppress` decorator and context manager to suppress the production of messages generated by the `Producer` class during a particular context.

```python
from django_kafka import producer


@producer.suppress(["topic1"])  # suppress production to topic1
def my_function():
    ...


def my_function_two():
    with producer.suppress(["topic1"]):  # suppress production to topic1
        ...
```

`producer.suppress` can take a list of topic names, or no arguments to suppress production to all topics.

Use `suppress(deactivate=True)` to deactivate any set suppression during a specific context.
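For instance, suppression contexts can be nested, and a `deactivate=True` context temporarily lifts an outer suppression (a minimal sketch of the behaviour exercised by the tests below):

```python
from django_kafka import producer

with producer.suppress():  # suppress production to all topics
    with producer.suppress(deactivate=True):
        ...  # suppression is lifted here; produce() calls go through
    ...  # global suppression applies again
```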
### `KafkaConnectSkipModel.kafka_skip`
Pythonic suppression methods will not suffice when using Kafka Connect to directly produce events from database changes. In this scenario, it's more appropriate to add a flag to the model database table which indicates if the connector should generate an event. Two classes are provided for django's Model and QuerySet to manage this flag:
#### KafkaConnectSkipModel
Adds the `kafka_skip` boolean field, defaulting to `False`. This also automatically resets `kafka_skip` to `False` when saving instances (if not explicitly set).
Usage:
```python
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.models import PermissionsMixin
from django_kafka.models import KafkaSkipModel
from django_kafka.connect.models import KafkaConnectSkipModel


class User(KafkaSkipModel, PermissionsMixin, AbstractBaseUser):
class User(KafkaConnectSkipModel, PermissionsMixin, AbstractBaseUser):
    # ...
```
#### KafkaSkipQueryset
If you have defined a custom manager on your model then you should inherit it from `KafkaSkipQueryset`. It adds `kafka_skip=False` when using `update` method.
#### KafkaConnectSkipQueryset
If you have defined a custom manager on your model then you should inherit it from `KafkaConnectSkipQueryset`. It adds `kafka_skip=False` when using the `update` method.
**Note:** `kafka_skip=False` is only set when it's not provided to the `update` kwargs. E.g. `User.objects.update(first_name="John", kafka_skip=True)` will not be changed to `kafka_skip=False`.
Usage:
```python
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.base_user import BaseUserManager
from django.contrib.auth.models import PermissionsMixin
from django_kafka.models import KafkaSkipModel, KafkaSkipQueryset
from django_kafka.connect.models import KafkaConnectSkipModel, KafkaConnectSkipQueryset


class UserManager(BaseUserManager.from_queryset(KafkaSkipQueryset)):
class UserManager(BaseUserManager.from_queryset(KafkaConnectSkipQueryset)):
    # ...


class User(KafkaSkipModel, PermissionsMixin, AbstractBaseUser):
class User(KafkaConnectSkipModel, PermissionsMixin, AbstractBaseUser):
    # ...
    objects = UserManager()
```
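Given the `User` model above, `update` then behaves as described (a sketch; the lookup and field values are illustrative):

```python
# kafka_skip is added automatically when omitted, so the connector
# will produce an event for this change:
User.objects.filter(pk=1).update(first_name="John")

# an explicitly passed kafka_skip is left untouched, suppressing the
# event (assuming the connector filters on the flag):
User.objects.filter(pk=1).update(first_name="John", kafka_skip=True)
```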
## Bidirectional data sync with no infinite event loop:
**For example, you want to keep a User table in sync in multiple systems.**
### Infinite loop
You are likely to encounter infinite message generation when syncing data between multiple systems. Message suppression helps overcome this issue.
For purely pythonic producers and consumers, the `Consumer.suppress_producers` attribute can be set to suppress all messages produced during consumption.
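A minimal sketch (the consumer name and topic wiring are illustrative assumptions; `Topics(...)` takes the `TopicConsumer` instances to subscribe to, per `django_kafka/consumer.py` below):

```python
from django_kafka.consumer import Consumer, Topics

class UserSyncConsumer(Consumer):
    topics = Topics()  # pass the relevant TopicConsumer instances here
    suppress_producers = True  # consume() is wrapped in producer.suppress(),
                               # so no messages are produced while handling events
```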
If producing with Kafka Connect, the `KafkaConnectSkipModel` provides the `kafka_skip` flag; the record should be manually marked with `kafka_skip=True` at consumption time and the connector should be configured not to send events when this flag is set.
### Global message ordering
To maintain global message ordering between systems, all events for the same database table should be sent to the same topic. The disadvantage is that each system will still consume its own message.
## Making a new release
- [bump-my-version](https://github.com/callowayproject/bump-my-version) is used to manage releases.
28 changes: 14 additions & 14 deletions django_kafka/models.py → django_kafka/connect/models.py
@@ -2,36 +2,36 @@
from django.utils.translation import gettext_lazy as _


class KafkaSkipQueryset(models.QuerySet):
class KafkaConnectSkipQueryset(models.QuerySet):
    def update(self, **kwargs) -> int:
        kwargs.setdefault("kafka_skip", False)
        return super().update(**kwargs)


class KafkaSkipModel(models.Model):
class KafkaConnectSkipModel(models.Model):
    """
    For models (tables) which are synced with other database(s) in both directions.
    For models (tables) which have Kafka Connect source connectors attached and require
    a flag to suppress message production.

    Every update which happens from within the system should set `kafka_skip=False`,
    global producer (kafka connect, django post_save signal, etc.) will then create
    a new event.
    The Kafka Connect connector should filter out events based on the kafka_skip flag
    provided in this model.

    When db update comes from the consumed event, then the row should be manually
    marked for skip `kafka_skip=True`, and kafka connector or global python producer
    should not generate a new one by filtering it out based on `kafka_skip` field.
    Any update to the model instance will reset the kafka_skip flag to False, if not
    explicitly set.

    This flag can help overcome infinite event loops during bidirectional data sync when
    using Kafka. See README.md for more information.
    """

    kafka_skip = models.BooleanField(
        _("Kafka skip"),
        help_text=_(
            "Wont generate an event if `True`."
            "\nThis field is used to filter out the events to break the infinite loop"
            " of message generation when synchronizing 2+ databases."
            "\nGets reset to False on .save() method call.",
            "Used by Kafka Connect to suppress event creation."
            "\nGets reset to False on .save() method call, unless explicitly set.",
        ),
        default=False,
    )
    objects = KafkaSkipQueryset.as_manager()
    objects = KafkaConnectSkipQueryset.as_manager()

    class Meta:
        abstract = True
12 changes: 10 additions & 2 deletions django_kafka/consumer.py
@@ -6,6 +6,7 @@
from confluent_kafka import Consumer as ConfluentConsumer
from confluent_kafka import cimpl

from django_kafka import producer
from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError

@@ -21,7 +22,7 @@ class Topics:

    def __init__(self, *topic_consumers: "TopicConsumer"):
        self._topic_consumers = topic_consumers
        self._match: dict[str, "TopicConsumer"] = {}
        self._match: dict[str, TopicConsumer] = {}

    def get(self, topic_name: str) -> "TopicConsumer":
        if topic_name not in self._match:
@@ -58,6 +59,7 @@ class Consumer:
    polling_freq = settings.POLLING_FREQUENCY
    default_logger = logger
    default_error_handler = settings.ERROR_HANDLER
    suppress_producers = False

    def __init__(self):
        self.config = self.build_config()
@@ -123,13 +125,19 @@ def get_topic_consumer(self, msg: cimpl.Message) -> "TopicConsumer":
    def log_error(self, error):
        logger.error(error, exc_info=True)

    def consume(self, msg):
        consume = self.get_topic_consumer(msg).consume
        if self.suppress_producers:
            consume = producer.suppress(consume)
        consume(msg)

    def process_message(self, msg: cimpl.Message):
        if msg_error := msg.error():
            self.log_error(msg_error)
            return

        try:
            self.get_topic_consumer(msg).consume(msg)
            self.consume(msg)
        # ruff: noqa: BLE001 (we do not want consumer to stop if message consumption fails in any circumstances)
        except Exception as exc:
            self.handle_exception(msg, exc)
46 changes: 45 additions & 1 deletion django_kafka/producer.py
@@ -1,6 +1,8 @@
import logging
from contextlib import ContextDecorator
from contextvars import ContextVar
from pydoc import locate
from typing import Optional
from typing import Callable, Optional

from confluent_kafka import Producer as ConfluentProducer

@@ -41,6 +43,10 @@ def __init__(self, config: Optional[dict] = None, **kwargs):
            **kwargs,
        )

    def produce(self, name, *args, **kwargs):
        if not Suppression.active(name):
            self._producer.produce(name, *args, **kwargs)
    def __getattr__(self, name):
        """
        proxy producer methods.
@@ -51,3 +57,41 @@ def __getattr__(self, name):
        # the initialization will fail because `_producer` is not yet set.
        return getattr(self._producer, name)
        raise AttributeError(f"'{self.__class__.__name__}' has no attribute '{name}'")


class Suppression(ContextDecorator):
    """context manager to help suppress producing messages to desired Kafka topics"""

    _var = ContextVar(f"{__name__}.suppression", default=[])

    @classmethod
    def active(cls, topic: str):
        """returns if suppression is enabled for the given topic"""
        topics = cls._var.get()
        if topics is None:
            return True  # all topics
        return topic in topics

    def __init__(self, topics: Optional[list[str]], deactivate=False):
        current = self._var.get()
        if deactivate:
            self.topics = []
        elif topics is None or current is None:
            self.topics = None  # indicates all topics
        elif isinstance(topics, list):
            self.topics = current + topics
        else:
            raise ValueError(f"invalid producer suppression setting {topics}")

    def __enter__(self):
        self.token = self._var.set(self.topics)
        return self

    def __exit__(self, *args, **kwargs):
        self._var.reset(self.token)


def suppress(topics: Optional[Callable | list[str]] = None, deactivate=False):
    if callable(topics):
        return Suppression(None)(topics)
    return Suppression(topics, deactivate)
@@ -1,12 +1,12 @@
from unittest.mock import patch

from django_kafka.models import KafkaSkipModel
from django_kafka.connect.models import KafkaConnectSkipModel
from django_kafka.tests.models import AbstractModelTestCase


class KafkaSkipModelTestCase(AbstractModelTestCase):
    abstract_model = KafkaSkipModel
    model: type[KafkaSkipModel]
class KafkaConnectSkipModelTestCase(AbstractModelTestCase):
    abstract_model = KafkaConnectSkipModel
    model: type[KafkaConnectSkipModel]

    def test_save__direct_instance_respects_set_kafka_skip(self):
        """test `save` on directly created instances will not ignore set kafka_skip"""
86 changes: 86 additions & 0 deletions django_kafka/tests/test_producers.py
@@ -0,0 +1,86 @@
from unittest import mock

from django.test import TestCase

from django_kafka.producer import Producer, suppress


@mock.patch("django_kafka.producer.ConfluentProducer")
class ProducerSuppressTestCase(TestCase):
    def test_suppress_all(self, mock_confluent_producer):
        producer = Producer()

        with suppress():
            producer.produce("topicA")

        mock_confluent_producer.return_value.produce.assert_not_called()

    def test_suppress_topic_list(self, mock_confluent_producer):
        producer = Producer()

        with suppress(["topicA"]):
            producer.produce("topicA")
            producer.produce("topicB")

        mock_confluent_producer.return_value.produce.assert_called_once_with("topicB")

    def test_suppress_nested_usage(self, mock_confluent_producer):
        """tests that message suppression lists are combined with later contexts"""
        producer = Producer()

        with suppress(["topicA"]):
            with suppress(["topicB"]):
                producer.produce("topicA")
                producer.produce("topicB")
                producer.produce("topicC")

        mock_confluent_producer.return_value.produce.assert_called_once_with("topicC")

    def test_suppress_nested_usage_all(self, mock_confluent_producer):
        """test that global message suppression is maintained by later contexts"""
        producer = Producer()

        with suppress():
            with suppress(["topicA"]):
                producer.produce("topicB")

        mock_confluent_producer.return_value.produce.assert_not_called()

    def test_suppress_nested_usage_deactivate(self, mock_confluent_producer):
        producer = Producer()

        with suppress(["topicA"]):
            with suppress(deactivate=True):
                producer.produce("topicA")

        mock_confluent_producer.return_value.produce.assert_called_once_with("topicA")

    def test_suppress_resets(self, mock_confluent_producer):
        producer = Producer()

        with suppress(["topicA", "topicB"]):
            producer.produce("topicA")

        producer.produce("topicB")

        mock_confluent_producer.return_value.produce.assert_called_once_with("topicB")

    def test_suppress_usable_as_decorator(self, mock_confluent_producer):
        producer = Producer()

        @suppress(["topicA"])
        def _produce_args():
            producer.produce("topicA")

        @suppress()
        def _produce_empty_args():
            producer.produce("topicA")

        @suppress
        def _produce_no_args():
            producer.produce("topicA")

        _produce_args()
        _produce_empty_args()
        _produce_no_args()

        mock_confluent_producer.return_value.produce.assert_not_called()