diff --git a/README.rst b/README.rst index 5ada9141..6d80a2b4 100644 --- a/README.rst +++ b/README.rst @@ -149,6 +149,61 @@ An example configuration for ``AsyncRoutingBackend`` is provided below:: } +Event Bus Routing +----------------- + +``event-tracking`` provides a solution for routing events to the Event Bus +using the ``EventBusBackend``. It extends ``RoutingBackend`` but sends events +to the Event Bus. + +It can: + +* Process event through the configured processors. +* If the event is allowed via `EVENT_BUS_TRACKING_LOGS`, send it to the Event Bus. + +Make sure to enable the setting: ``SEND_TRACKING_EVENT_EMITTED_SIGNAL`` to allow the +``EventBusBackend`` to send events to the Event Bus. + +An example configuration for ``EventBusBackend`` is provided below:: + + EVENT_TRACKING_BACKENDS = { + 'xapi': { + 'ENGINE': 'eventtracking.backends.event_bus.EventBusBackend', + 'OPTIONS': { + 'backend_name': 'xapi', + 'processors': [ + { + 'ENGINE': 'eventtracking.processors.regex_filter.RegexFilter', + 'OPTIONS':{ + 'filter_type': 'allowlist', + 'regular_expressions': [ + 'edx.course.enrollment.activated', + 'edx.course.enrollment.deactivated', + ] + } + } + ], + 'backends': { + 'xapi': { + 'ENGINE': 'dummy.backend.engine', + 'OPTIONS': { + ... + } + } + }, + }, + }, + 'tracking_logs': { + ... + } + ... + } + + EVENT_BUS_TRACKING_LOGS = [ + 'edx.course.enrollment.activated', + 'edx.course.enrollment.deactivated', + ] + Roadmap ------- diff --git a/doc/user_guide/api/eventtracking.backends.rst b/doc/user_guide/api/eventtracking.backends.rst index 51a037ae..f2f17fce 100644 --- a/doc/user_guide/api/eventtracking.backends.rst +++ b/doc/user_guide/api/eventtracking.backends.rst @@ -42,3 +42,11 @@ eventtracking.backends.segment :undoc-members: :show-inheritance: + +eventtracking.backends.event_bus +------------------------------ + +.. automodule:: eventtracking.backends.event_bus + :members: + :undoc-members: + :show-inheritance: diff --git a/eventtracking/__init__.py b/eventtracking/__init__.py index 054050cd..ffa15514 100644 --- a/eventtracking/__init__.py +++ b/eventtracking/__init__.py @@ -1,3 +1,3 @@ """A simple event tracking library""" -__version__ = '2.2.0' +__version__ = '2.3.0' diff --git a/eventtracking/backends/event_bus.py b/eventtracking/backends/event_bus.py new file mode 100644 index 00000000..7be870d0 --- /dev/null +++ b/eventtracking/backends/event_bus.py @@ -0,0 +1,52 @@ +"""Event tracker backend that emits events to the event-bus.""" + +import json +import logging +from datetime import datetime + +from django.conf import settings +from openedx_events.analytics.data import TrackingLogData +from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED + +from eventtracking.backends.logger import DateTimeJSONEncoder +from eventtracking.backends.routing import RoutingBackend +from eventtracking.config import SEND_TRACKING_EVENT_EMITTED_SIGNAL + +logger = logging.getLogger(__name__) + + +class EventBusRoutingBackend(RoutingBackend): + """ + Event tracker backend for the event bus. + """ + + def send(self, event): + """ + Send the tracking log event to the event bus by emitting the + TRACKING_EVENT_EMITTED signal using custom metadata. + """ + if not SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled(): + return + + name = event.get("name") + + if name not in getattr(settings, "EVENT_BUS_TRACKING_LOGS", []): + return + + data = json.dumps(event.get("data"), cls=DateTimeJSONEncoder) + context = json.dumps(event.get("context"), cls=DateTimeJSONEncoder) + + timestamp = event.get("timestamp") + + if isinstance(timestamp, str): + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") + + tracking_log = TrackingLogData( + name=event.get("name"), + timestamp=timestamp, + data=data, + context=context, + ) + TRACKING_EVENT_EMITTED.send_event(tracking_log=tracking_log) + + logger.info(f"Tracking log {tracking_log.name} emitted to the event bus.") diff --git a/eventtracking/backends/routing.py b/eventtracking/backends/routing.py index c8238a91..d59d941e 100644 --- a/eventtracking/backends/routing.py +++ b/eventtracking/backends/routing.py @@ -118,7 +118,7 @@ def process_event(self, event): processed_event = modified_event except EventEmissionExit: raise - except Exception: # pylint: disable=broad-except + except Exception: LOG.exception( 'Failed to execute processor: %s', str(processor) ) @@ -142,7 +142,7 @@ def send_to_backends(self, event): LOG.info('[send_to_backends] Failed to send edx event "%s" to "%s" backend. "%s" backend has' ' not been enabled, [%s]', event["name"], name, name, repr(exc) ) - except Exception: # pylint: disable=broad-except + except Exception: LOG.exception( 'Unable to send edx event "%s" to backend: %s', event["name"], name ) diff --git a/eventtracking/backends/tests/test_event_bus.py b/eventtracking/backends/tests/test_event_bus.py new file mode 100644 index 00000000..bd170f2f --- /dev/null +++ b/eventtracking/backends/tests/test_event_bus.py @@ -0,0 +1,69 @@ +""" +Test the async routing backend. +""" + +import json +from datetime import datetime +from unittest import TestCase +from unittest.mock import patch + +from django.test import override_settings +from openedx_events.analytics.data import TrackingLogData + +from eventtracking.backends.event_bus import EventBusRoutingBackend + + +class TestAsyncRoutingBackend(TestCase): + """ + Test the async routing backend. + """ + + def setUp(self): + super().setUp() + self.sample_event = { + "name": "sample_event", + "data": {"foo": "bar"}, + "timestamp": "2020-01-01T12:12:12.000000+00:00", + "context": {"baz": "qux"}, + } + + @patch("eventtracking.backends.event_bus.EventBusRoutingBackend.send") + def test_successful_send(self, mocked_send_event): + backend = EventBusRoutingBackend() + backend.send(self.sample_event) + mocked_send_event.assert_called_once_with(self.sample_event) + + @override_settings( + SEND_TRACKING_EVENT_EMITTED_SIGNAL=True, + EVENT_BUS_TRACKING_LOGS=["sample_event"], + ) + @patch("eventtracking.backends.event_bus.TRACKING_EVENT_EMITTED.send_event") + def test_successful_send_event(self, mock_send_event): + backend = EventBusRoutingBackend() + backend.send(self.sample_event) + + mock_send_event.assert_called() + self.assertDictContainsSubset( + { + "tracking_log": TrackingLogData( + name=self.sample_event["name"], + timestamp=datetime.strptime( + self.sample_event["timestamp"], "%Y-%m-%dT%H:%M:%S.%f%z" + ), + data=json.dumps(self.sample_event["data"]), + context=json.dumps(self.sample_event["context"]), + ) + }, + mock_send_event.call_args.kwargs, + ) + + @patch( + "eventtracking.backends.event_bus.SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled" + ) + @patch("eventtracking.backends.event_bus.TRACKING_EVENT_EMITTED.send_event") + def test_event_is_disabled(self, mock_send_event, mock_is_enabled): + mock_is_enabled.return_value = False + backend = EventBusRoutingBackend() + backend.send(self.sample_event) + mock_is_enabled.assert_called_once() + mock_send_event.assert_not_called() diff --git a/eventtracking/config.py b/eventtracking/config.py new file mode 100644 index 00000000..0dd5ef0f --- /dev/null +++ b/eventtracking/config.py @@ -0,0 +1,19 @@ +""" +This module contains various configuration settings via +waffle switches for the event-tracking app. +""" + +from edx_toggles.toggles import SettingToggle + +# .. toggle_name: SEND_TRACKING_EVENT_EMITTED_SIGNAL +# .. toggle_implementation: SettingToggle +# .. toggle_default: False +# .. toggle_description: When True, the system will publish `TRACKING_EVENT_EMITTED` signals to the event bus. The +# `TRACKING_EVENT_EMITTED` signal is emit when a tracking log is emitted. +# .. toggle_use_cases: opt_in +# .. toggle_creation_date: 2023-10-26 +SEND_TRACKING_EVENT_EMITTED_SIGNAL = SettingToggle( + 'SEND_TRACKING_EVENT_EMITTED_SIGNAL', + default=False, + module_name=__name__ +) diff --git a/eventtracking/django/apps.py b/eventtracking/django/apps.py index c2fc320a..2f1bb7b6 100644 --- a/eventtracking/django/apps.py +++ b/eventtracking/django/apps.py @@ -21,3 +21,5 @@ def ready(self): # pylint: disable=import-outside-toplevel from eventtracking.django.django_tracker import override_default_tracker override_default_tracker() + + from eventtracking import handlers # pylint: disable=import-outside-toplevel, unused-import diff --git a/eventtracking/handlers.py b/eventtracking/handlers.py new file mode 100644 index 00000000..22841128 --- /dev/null +++ b/eventtracking/handlers.py @@ -0,0 +1,61 @@ +""" +This module contains the handlers for signals emitted by the analytics app. +""" +import json +import logging + +from django.dispatch import receiver +from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED +from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS + +from eventtracking.backends.event_bus import EventBusRoutingBackend +from eventtracking.processors.exceptions import EventEmissionExit +from eventtracking.tracker import get_tracker + +logger = logging.getLogger(__name__) + + +@receiver(TRACKING_EVENT_EMITTED) +def send_tracking_log_to_backends( + sender, signal, **kwargs +): # pylint: disable=unused-argument + """ + Listen for the TRACKING_EVENT_EMITTED signal and send the event to the enabled backends. + + The process is the following: + + 1. Unserialize the tracking log from the signal. + 2. Get the tracker instance to get the enabled backends (mongo, event_bus, logger, etc). + 3. Get the event bus backends that are the interested in the signals (multiple can be configured). + 4. Transform the event with the configured processors. + 5. Send the transformed event to the different event bus backends. + + This allows us to only send the tracking log to the event bus once and the event bus will send + the transformed event to the different configured backends. + """ + if not kwargs.get(SIGNAL_PROCESSED_FROM_EVENT_BUS, False): + logger.debug("Event received from a non-event bus backend, skipping...") + return + tracking_log = kwargs.get("tracking_log") + + event = { + "name": tracking_log.name, + "timestamp": tracking_log.timestamp, + "data": json.loads(tracking_log.data), + "context": json.loads(tracking_log.context), + } + + tracker = get_tracker() + + engines = { + name: engine + for name, engine in tracker.backends.items() + if isinstance(engine, EventBusRoutingBackend) + } + for name, engine in engines.items(): + try: + processed_event = engine.process_event(event) + logger.info('Successfully processed event "{}"'.format(event["name"])) + engine.send_to_backends(processed_event.copy()) + except EventEmissionExit: + logger.info("[EventEmissionExit] skipping event {}".format(event["name"])) diff --git a/eventtracking/tests/test_handlers.py b/eventtracking/tests/test_handlers.py new file mode 100644 index 00000000..009f713e --- /dev/null +++ b/eventtracking/tests/test_handlers.py @@ -0,0 +1,139 @@ +""" +Test handlers for signals emitted by the analytics app +""" + +from unittest.mock import Mock, patch + +from django.test import TestCase +from django.test.utils import override_settings +from openedx_events.analytics.data import TrackingLogData +from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS + +from eventtracking.django.django_tracker import DjangoTracker +from eventtracking.handlers import send_tracking_log_to_backends + + +class TestHandlers(TestCase): + """ + Tests handlers for signals emitted by the analytics app + """ + + @override_settings( + EVENT_TRACKING_BACKENDS={ + "event_bus": { + "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", + "OPTIONS": {}, + }, + } + ) + @patch("eventtracking.handlers.get_tracker") + def test_send_tracking_log_to_backends( + self, mock_get_tracker + ): + """ + Test for send_tracking_log_to_backends + """ + tracker = DjangoTracker() + mock_get_tracker.return_value = tracker + mock_backend = Mock() + tracker.backends["event_bus"].send_to_backends = mock_backend + kwargs = { + SIGNAL_PROCESSED_FROM_EVENT_BUS: True, + } + + send_tracking_log_to_backends( + sender=None, + signal=None, + tracking_log=TrackingLogData( + name="test_name", + timestamp="test_timestamp", + data="{}", + context="{}", + ), + **kwargs + ) + + mock_backend.assert_called_once_with( + { + "name": "test_name", + "timestamp": "test_timestamp", + "data": {}, + "context": {}, + } + ) + + @override_settings( + EVENT_TRACKING_BACKENDS={ + "event_bus": { + "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", + "OPTIONS": { + "processors": [ + { + "ENGINE": "eventtracking.processors.whitelist.NameWhitelistProcessor", + "OPTIONS": { + "whitelist": ["no_test_name"] + } + } + ], + }, + }, + } + ) + @patch("eventtracking.handlers.get_tracker") + @patch("eventtracking.handlers.isinstance") + @patch("eventtracking.handlers.logger") + def test_send_tracking_log_to_backends_error( + self, mock_logger, mock_is_instance, mock_get_tracker + ): + """ + Test for send_tracking_log_to_backends + """ + tracker = DjangoTracker() + mock_get_tracker.return_value = tracker + mock_is_instance.return_value = True + kwargs = { + SIGNAL_PROCESSED_FROM_EVENT_BUS: True, + } + + x = send_tracking_log_to_backends( + sender=None, + signal=None, + tracking_log=TrackingLogData( + name="test_name", + timestamp="test_timestamp", + data="{}", + context="{}", + ), + **kwargs + ) + + assert x is None + + mock_logger.info.assert_called_once_with( + "[EventEmissionExit] skipping event {}".format("test_name") + ) + + @patch("eventtracking.handlers.logger") + def test_send_tracking_log_to_backends_in_same_runtime( + self, mock_logger + ): + """ + Test for send_tracking_log_to_backends + """ + + x = send_tracking_log_to_backends( + sender=None, + signal=None, + tracking_log=TrackingLogData( + name="test_name", + timestamp="test_timestamp", + data="{}", + context="{}", + ) + ) + + assert x is None + + mock_logger.debug.assert_called_once_with( + "Event received from a non-event bus backend, skipping..." + ) diff --git a/requirements/base.in b/requirements/base.in index a2f491a7..d0f11a51 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -8,3 +8,5 @@ pytz six celery edx-django-utils +openedx-events>=9.5.1 +edx-toggles diff --git a/requirements/base.txt b/requirements/base.txt index 5d77697b..4e16d633 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -8,6 +8,8 @@ amqp==5.2.0 # via kombu asgiref==3.7.2 # via django +attrs==23.2.0 + # via openedx-events backports-zoneinfo[tzdata]==0.2.1 # via # celery @@ -26,6 +28,7 @@ click==8.1.7 # click-didyoumean # click-plugins # click-repl + # code-annotations # edx-django-utils click-didyoumean==0.3.0 # via celery @@ -33,6 +36,8 @@ click-plugins==1.1.1 # via celery click-repl==0.3.0 # via celery +code-annotations==1.6.0 + # via edx-toggles django==3.2.24 # via # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt @@ -40,16 +45,37 @@ django==3.2.24 # django-crum # django-waffle # edx-django-utils + # edx-toggles + # openedx-events django-crum==0.7.9 - # via edx-django-utils + # via + # edx-django-utils + # edx-toggles django-waffle==4.1.0 - # via edx-django-utils + # via + # edx-django-utils + # edx-toggles edx-django-utils==5.10.1 + # via + # -r requirements/base.in + # edx-toggles + # openedx-events +edx-opaque-keys[django]==2.5.1 + # via openedx-events +edx-toggles==5.1.1 # via -r requirements/base.in +fastavro==1.9.3 + # via openedx-events +jinja2==3.1.3 + # via code-annotations kombu==5.3.5 # via celery +markupsafe==2.1.5 + # via jinja2 newrelic==9.6.0 # via edx-django-utils +openedx-events==9.5.1 + # via -r requirements/base.in pbr==6.0.0 # via stevedore prompt-toolkit==3.0.43 @@ -59,15 +85,21 @@ psutil==5.9.8 pycparser==2.21 # via cffi pymongo==3.13.0 - # via -r requirements/base.in + # via + # -r requirements/base.in + # edx-opaque-keys pynacl==1.5.0 # via edx-django-utils python-dateutil==2.8.2 # via celery +python-slugify==8.0.4 + # via code-annotations pytz==2024.1 # via # -r requirements/base.in # django +pyyaml==6.0.1 + # via code-annotations six==1.16.0 # via # -r requirements/base.in @@ -75,10 +107,16 @@ six==1.16.0 sqlparse==0.4.4 # via django stevedore==5.1.0 - # via edx-django-utils + # via + # code-annotations + # edx-django-utils + # edx-opaque-keys +text-unidecode==1.3 + # via python-slugify typing-extensions==4.9.0 # via # asgiref + # edx-opaque-keys # kombu tzdata==2024.1 # via diff --git a/requirements/dev.txt b/requirements/dev.txt index d237deb6..41d8d3b8 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -19,6 +19,10 @@ astroid==3.0.3 # -r requirements/test.txt # pylint # pylint-celery +attrs==23.2.0 + # via + # -r requirements/test.txt + # openedx-events babel==2.14.0 # via sphinx backports-zoneinfo[tzdata]==0.2.1 @@ -88,6 +92,7 @@ code-annotations==1.6.0 # via # -r requirements/test.txt # edx-lint + # edx-toggles colorama==0.4.6 # via # -r requirements/ci.txt @@ -115,26 +120,44 @@ django==3.2.24 # django-crum # django-waffle # edx-django-utils + # edx-toggles + # openedx-events django-crum==0.7.9 # via # -r requirements/test.txt # edx-django-utils + # edx-toggles django-waffle==4.1.0 # via # -r requirements/test.txt # edx-django-utils + # edx-toggles docutils==0.20.1 # via sphinx edx-django-utils==5.10.1 - # via -r requirements/test.txt + # via + # -r requirements/test.txt + # edx-toggles + # openedx-events edx-lint==5.3.6 # via # -r requirements/dev.in # -r requirements/test.txt +edx-opaque-keys[django]==2.5.1 + # via + # -r requirements/test.txt + # edx-opaque-keys + # openedx-events +edx-toggles==5.1.1 + # via -r requirements/test.txt exceptiongroup==1.2.0 # via # -r requirements/test.txt # pytest +fastavro==1.9.3 + # via + # -r requirements/test.txt + # openedx-events filelock==3.13.1 # via # -r requirements/ci.txt @@ -180,6 +203,8 @@ newrelic==9.6.0 # via # -r requirements/test.txt # edx-django-utils +openedx-events==9.5.1 + # via -r requirements/test.txt packaging==23.2 # via # -r requirements/ci.txt @@ -250,7 +275,9 @@ pylint-plugin-utils==0.8.2 # pylint-celery # pylint-django pymongo==3.13.0 - # via -r requirements/test.txt + # via + # -r requirements/test.txt + # edx-opaque-keys pynacl==1.5.0 # via # -r requirements/test.txt @@ -318,6 +345,7 @@ stevedore==5.1.0 # -r requirements/test.txt # code-annotations # edx-django-utils + # edx-opaque-keys text-unidecode==1.3 # via # -r requirements/test.txt @@ -348,6 +376,7 @@ typing-extensions==4.9.0 # -r requirements/test.txt # asgiref # astroid + # edx-opaque-keys # kombu # pylint tzdata==2024.1 diff --git a/requirements/test.txt b/requirements/test.txt index 9ae58e3b..a71e64bf 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -15,6 +15,10 @@ astroid==3.0.3 # via # pylint # pylint-celery +attrs==23.2.0 + # via + # -r requirements/base.txt + # openedx-events backports-zoneinfo[tzdata]==0.2.1 # via # -r requirements/base.txt @@ -54,7 +58,10 @@ click-plugins==1.1.1 # -r requirements/base.txt # celery code-annotations==1.6.0 - # via edx-lint + # via + # -r requirements/base.txt + # edx-lint + # edx-toggles coverage[toml]==7.4.1 # via # coverage @@ -69,31 +76,53 @@ dill==0.3.8 # django-crum # django-waffle # edx-django-utils + # edx-toggles + # openedx-events django-crum==0.7.9 # via # -r requirements/base.txt # edx-django-utils + # edx-toggles django-waffle==4.1.0 # via # -r requirements/base.txt # edx-django-utils + # edx-toggles edx-django-utils==5.10.1 - # via -r requirements/base.txt + # via + # -r requirements/base.txt + # edx-toggles + # openedx-events edx-lint==5.3.6 # via -r requirements/test.in +edx-opaque-keys[django]==2.5.1 + # via + # -r requirements/base.txt + # edx-opaque-keys + # openedx-events +edx-toggles==5.1.1 + # via -r requirements/base.txt exceptiongroup==1.2.0 # via pytest +fastavro==1.9.3 + # via + # -r requirements/base.txt + # openedx-events iniconfig==2.0.0 # via pytest isort==5.13.2 # via pylint jinja2==3.1.3 - # via code-annotations + # via + # -r requirements/base.txt + # code-annotations # via # -r requirements/base.txt # celery markupsafe==2.1.5 - # via jinja2 + # via + # -r requirements/base.txt + # jinja2 mccabe==0.7.0 # via pylint mock==5.1.0 @@ -102,6 +131,8 @@ newrelic==9.6.0 # via # -r requirements/base.txt # edx-django-utils +openedx-events==9.5.1 + # via -r requirements/base.txt packaging==23.2 # via pytest pbr==6.0.0 @@ -140,7 +171,9 @@ pylint-plugin-utils==0.8.2 # pylint-celery # pylint-django pymongo==3.13.0 - # via -r requirements/base.txt + # via + # -r requirements/base.txt + # edx-opaque-keys pynacl==1.5.0 # via # -r requirements/base.txt @@ -154,13 +187,17 @@ python-dateutil==2.8.2 # -r requirements/base.txt # celery python-slugify==8.0.4 - # via code-annotations + # via + # -r requirements/base.txt + # code-annotations pytz==2024.1 # via # -r requirements/base.txt # django pyyaml==6.0.1 - # via code-annotations + # via + # -r requirements/base.txt + # code-annotations six==1.16.0 # via # -r requirements/base.txt @@ -175,8 +212,11 @@ stevedore==5.1.0 # -r requirements/base.txt # code-annotations # edx-django-utils + # edx-opaque-keys text-unidecode==1.3 - # via python-slugify + # via + # -r requirements/base.txt + # python-slugify tomli==2.0.1 # via # coverage @@ -189,6 +229,7 @@ typing-extensions==4.9.0 # -r requirements/base.txt # asgiref # astroid + # edx-opaque-keys # kombu # pylint tzdata==2024.1