Skip to content

Commit

Permalink
feat: migrate handler to eventtracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Feb 13, 2024
1 parent 7347639 commit b0ce5b9
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 0 deletions.
2 changes: 2 additions & 0 deletions eventtracking/django/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ def ready(self):
# pylint: disable=import-outside-toplevel
from eventtracking.django.django_tracker import override_default_tracker
override_default_tracker()

from eventtracking import handlers # noqa: F401
60 changes: 60 additions & 0 deletions eventtracking/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
This module contains the handlers for signals emitted by the analytics app.
"""
import json
import logging

from django.dispatch import receiver
from eventtracking.backends.event_bus import EventBusRoutingBackend
from eventtracking.processors.exceptions import EventEmissionExit
from eventtracking.tracker import get_tracker
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED
from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS

logger = logging.getLogger(__name__)


@receiver(TRACKING_EVENT_EMITTED)
def send_tracking_log_to_backends(
sender, signal, **kwargs
): # pylint: disable=unused-argument
"""
Listen for the TRACKING_EVENT_EMITTED signal and send the event to the enabled backends.
The process is the following:
1. Unserialize the tracking log from the signal.
2. Get the tracker instance to get the enabled backends (mongo, event_bus, logger, etc).
3. Get the event bus backends that are the interested in the signals (multiple can be configured).
4. Transform the event with the configured processors.
5. Send the transformed event to the different event bus backends.
This allows us to only send the tracking log to the event bus once and the event bus will send
the transformed event to the different configured backends.
"""
if not kwargs.get(SIGNAL_PROCESSED_FROM_EVENT_BUS, False):
logger.debug("Event received from a non-event bus backend, skipping...")
return
tracking_log = kwargs.get("tracking_log")

event = {
"name": tracking_log.name,
"timestamp": tracking_log.timestamp,
"data": json.loads(tracking_log.data),
"context": json.loads(tracking_log.context),
}

tracker = get_tracker()

engines = {
name: engine
for name, engine in tracker.backends.items()
if isinstance(engine, EventBusRoutingBackend)
}
for name, engine in engines.items():
try:
processed_event = engine.process_event(event)
logger.info('Successfully processed event "{}"'.format(event["name"]))
engine.send_to_backends(processed_event.copy())
except EventEmissionExit:
logger.info("[EventEmissionExit] skipping event {}".format(event["name"]))
138 changes: 138 additions & 0 deletions eventtracking/tests/test_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
Test handlers for signals emitted by the analytics app
"""

from unittest.mock import Mock, patch

from django.test import TestCase
from django.test.utils import override_settings
from eventtracking.django.django_tracker import DjangoTracker
from eventtracking.handlers import send_tracking_log_to_backends
from openedx_events.analytics.data import TrackingLogData
from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS


class TestHandlers(TestCase):
"""
Tests handlers for signals emitted by the analytics app
"""

@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {},
},
}
)
@patch("eventtracking.handlers.get_tracker")
def test_send_tracking_log_to_backends(
self, mock_get_tracker
):
"""
Test for send_tracking_log_to_backends
"""
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker
mock_backend = Mock()
tracker.backends["event_bus"].send_to_backends = mock_backend
kwargs = {
SIGNAL_PROCESSED_FROM_EVENT_BUS: True,
}

send_tracking_log_to_backends(
sender=None,
signal=None,
tracking_log=TrackingLogData(
name="test_name",
timestamp="test_timestamp",
data="{}",
context="{}",
),
**kwargs
)

mock_backend.assert_called_once_with(
{
"name": "test_name",
"timestamp": "test_timestamp",
"data": {},
"context": {},
}
)

@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {
"processors": [
{
"ENGINE": "eventtracking.processors.whitelist.NameWhitelistProcessor",
"OPTIONS": {
"whitelist": ["no_test_name"]
}
}
],
},
},
}
)
@patch("eventtracking.handlers.get_tracker")
@patch("eventtracking.handlers.isinstance")
@patch("eventtracking.handlers.logger")
def test_send_tracking_log_to_backends_error(
self, mock_logger, mock_is_instance, mock_get_tracker
):
"""
Test for send_tracking_log_to_backends
"""
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker
mock_is_instance.return_value = True
kwargs = {
SIGNAL_PROCESSED_FROM_EVENT_BUS: True,
}

x = send_tracking_log_to_backends(
sender=None,
signal=None,
tracking_log=TrackingLogData(
name="test_name",
timestamp="test_timestamp",
data="{}",
context="{}",
),
**kwargs
)

assert x is None

mock_logger.info.assert_called_once_with(
"[EventEmissionExit] skipping event {}".format("test_name")
)

@patch("eventtracking.handlers.logger")
def test_send_tracking_log_to_backends_in_same_runtime(
self, mock_logger
):
"""
Test for send_tracking_log_to_backends
"""

x = send_tracking_log_to_backends(
sender=None,
signal=None,
tracking_log=TrackingLogData(
name="test_name",
timestamp="test_timestamp",
data="{}",
context="{}",
)
)

assert x is None

mock_logger.debug.assert_called_once_with(
"Event received from a non-event bus backend, skipping..."
)

0 comments on commit b0ce5b9

Please sign in to comment.