From c4c20e0d7abd1f02e502510e2c3380c46065c479 Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Wed, 20 Sep 2023 17:21:16 +0000 Subject: [PATCH 1/2] feat: Reset request cache before handling event This is a just-in-case measure -- we don't know of any specific code that handles events and relies on the cache. But it's likely to come up at some point. Ticket: https://github.com/openedx/openedx-events/issues/235 --- CHANGELOG.rst | 6 +++++ edx_event_bus_kafka/__init__.py | 2 +- edx_event_bus_kafka/internal/consumer.py | 32 ++++++++++++++++++++++-- 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 30fed93..6cf1162 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,12 @@ Change Log Unreleased ********** +[5.5.0] - 2023-09-21 +******************** +Changed +======= +* Reset edx-django-utils RequestCache before handling each event + [5.4.0] - 2023-08-28 ******************** Changed diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index be17940..aa63692 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -9,4 +9,4 @@ from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer -__version__ = '5.4.0' +__version__ = '5.5.0' diff --git a/edx_event_bus_kafka/internal/consumer.py b/edx_event_bus_kafka/internal/consumer.py index 9cb050a..9e037fd 100644 --- a/edx_event_bus_kafka/internal/consumer.py +++ b/edx_event_bus_kafka/internal/consumer.py @@ -10,6 +10,7 @@ from django.db import connection from django.dispatch import receiver from django.test.signals import setting_changed +from edx_django_utils.cache import RequestCache from edx_django_utils.monitoring import function_trace, record_exception, set_custom_attribute from edx_toggles.toggles import SettingToggle from openedx_events.event_bus import EventBusConsumer @@ -106,6 +107,31 @@ def _reconnect_to_db_if_needed(): connection.connect() +def _clear_request_cache(): + """ + Clear the RequestCache so that each event consumption starts fresh. + + Signal handlers may be written with the assumption that they are called in the context + of a web request, so we clear the request cache just in case. + """ + RequestCache.clear_all_namespaces() + + +def _prepare_for_new_work_cycle(): + """ + Ensure that the application state is appropriate for performing a new unit of work. + + This mimics some setup/teardown that is normally performed by Django in its + request/response based architecture and that is needed for ensuring a clean and + usable state in this worker-based application. + """ + # Ensure that the database connection is active and usable. + _reconnect_to_db_if_needed() + + # Clear the request cache, in case anything in the signal handlers rely on it. + _clear_request_cache() + + class KafkaEventConsumer(EventBusConsumer): """ Construct consumer for the given topic and group. The consumer can then @@ -278,8 +304,10 @@ def _consume_indefinitely(self): msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT) if msg is not None: with function_trace('_consume_indefinitely_consume_single_message'): - # Before processing, make sure our db connection is still active - _reconnect_to_db_if_needed() + # Before processing, make sure our application state is similar to + # that of a new Django request. (Mimic setup/teardown.) + _prepare_for_new_work_cycle() + signal = self.determine_signal(msg) msg.set_value(self._deserialize_message_value(msg, signal)) self.emit_signals_from_message(msg, signal) From 3d5822ce2da1ee7f3ad2863f1bce0c0c6c6f8080 Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Wed, 20 Sep 2023 20:03:23 +0000 Subject: [PATCH 2/2] fixup! Link to broader ticket for application state resetting --- edx_event_bus_kafka/internal/consumer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/edx_event_bus_kafka/internal/consumer.py b/edx_event_bus_kafka/internal/consumer.py index 9e037fd..50c963f 100644 --- a/edx_event_bus_kafka/internal/consumer.py +++ b/edx_event_bus_kafka/internal/consumer.py @@ -304,8 +304,9 @@ def _consume_indefinitely(self): msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT) if msg is not None: with function_trace('_consume_indefinitely_consume_single_message'): - # Before processing, make sure our application state is similar to - # that of a new Django request. (Mimic setup/teardown.) + # Before processing, try to make sure our application state is cleaned + # up as would happen at the start of a Django request/response cycle. + # See https://github.com/openedx/openedx-events/issues/236 for details. _prepare_for_new_work_cycle() signal = self.determine_signal(msg)