-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: migrate handler to eventtracking
- Loading branch information
Showing
3 changed files
with
202 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
""" | ||
This module contains the handlers for signals emitted by the analytics app. | ||
""" | ||
import json | ||
import logging | ||
|
||
from django.dispatch import receiver | ||
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED | ||
from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS | ||
|
||
from eventtracking.backends.event_bus import EventBusRoutingBackend | ||
from eventtracking.processors.exceptions import EventEmissionExit | ||
from eventtracking.tracker import get_tracker | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@receiver(TRACKING_EVENT_EMITTED) | ||
def send_tracking_log_to_backends( | ||
sender, signal, **kwargs | ||
): # pylint: disable=unused-argument | ||
""" | ||
Listen for the TRACKING_EVENT_EMITTED signal and send the event to the enabled backends. | ||
The process is the following: | ||
1. Unserialize the tracking log from the signal. | ||
2. Get the tracker instance to get the enabled backends (mongo, event_bus, logger, etc). | ||
3. Get the event bus backends that are the interested in the signals (multiple can be configured). | ||
4. Transform the event with the configured processors. | ||
5. Send the transformed event to the different event bus backends. | ||
This allows us to only send the tracking log to the event bus once and the event bus will send | ||
the transformed event to the different configured backends. | ||
""" | ||
if not kwargs.get(SIGNAL_PROCESSED_FROM_EVENT_BUS, False): | ||
logger.debug("Event received from a non-event bus backend, skipping...") | ||
return | ||
tracking_log = kwargs.get("tracking_log") | ||
|
||
event = { | ||
"name": tracking_log.name, | ||
"timestamp": tracking_log.timestamp, | ||
"data": json.loads(tracking_log.data), | ||
"context": json.loads(tracking_log.context), | ||
} | ||
|
||
tracker = get_tracker() | ||
|
||
engines = { | ||
name: engine | ||
for name, engine in tracker.backends.items() | ||
if isinstance(engine, EventBusRoutingBackend) | ||
} | ||
for name, engine in engines.items(): | ||
try: | ||
processed_event = engine.process_event(event) | ||
logger.info('Successfully processed event "{}"'.format(event["name"])) | ||
engine.send_to_backends(processed_event.copy()) | ||
except EventEmissionExit: | ||
logger.info("[EventEmissionExit] skipping event {}".format(event["name"])) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
""" | ||
Test handlers for signals emitted by the analytics app | ||
""" | ||
|
||
from unittest.mock import Mock, patch | ||
|
||
from django.test import TestCase | ||
from django.test.utils import override_settings | ||
from openedx_events.analytics.data import TrackingLogData | ||
from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS | ||
|
||
from eventtracking.django.django_tracker import DjangoTracker | ||
from eventtracking.handlers import send_tracking_log_to_backends | ||
|
||
|
||
class TestHandlers(TestCase): | ||
""" | ||
Tests handlers for signals emitted by the analytics app | ||
""" | ||
|
||
@override_settings( | ||
EVENT_TRACKING_BACKENDS={ | ||
"event_bus": { | ||
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", | ||
"OPTIONS": {}, | ||
}, | ||
} | ||
) | ||
@patch("eventtracking.handlers.get_tracker") | ||
def test_send_tracking_log_to_backends( | ||
self, mock_get_tracker | ||
): | ||
""" | ||
Test for send_tracking_log_to_backends | ||
""" | ||
tracker = DjangoTracker() | ||
mock_get_tracker.return_value = tracker | ||
mock_backend = Mock() | ||
tracker.backends["event_bus"].send_to_backends = mock_backend | ||
kwargs = { | ||
SIGNAL_PROCESSED_FROM_EVENT_BUS: True, | ||
} | ||
|
||
send_tracking_log_to_backends( | ||
sender=None, | ||
signal=None, | ||
tracking_log=TrackingLogData( | ||
name="test_name", | ||
timestamp="test_timestamp", | ||
data="{}", | ||
context="{}", | ||
), | ||
**kwargs | ||
) | ||
|
||
mock_backend.assert_called_once_with( | ||
{ | ||
"name": "test_name", | ||
"timestamp": "test_timestamp", | ||
"data": {}, | ||
"context": {}, | ||
} | ||
) | ||
|
||
@override_settings( | ||
EVENT_TRACKING_BACKENDS={ | ||
"event_bus": { | ||
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", | ||
"OPTIONS": { | ||
"processors": [ | ||
{ | ||
"ENGINE": "eventtracking.processors.whitelist.NameWhitelistProcessor", | ||
"OPTIONS": { | ||
"whitelist": ["no_test_name"] | ||
} | ||
} | ||
], | ||
}, | ||
}, | ||
} | ||
) | ||
@patch("eventtracking.handlers.get_tracker") | ||
@patch("eventtracking.handlers.isinstance") | ||
@patch("eventtracking.handlers.logger") | ||
def test_send_tracking_log_to_backends_error( | ||
self, mock_logger, mock_is_instance, mock_get_tracker | ||
): | ||
""" | ||
Test for send_tracking_log_to_backends | ||
""" | ||
tracker = DjangoTracker() | ||
mock_get_tracker.return_value = tracker | ||
mock_is_instance.return_value = True | ||
kwargs = { | ||
SIGNAL_PROCESSED_FROM_EVENT_BUS: True, | ||
} | ||
|
||
x = send_tracking_log_to_backends( | ||
sender=None, | ||
signal=None, | ||
tracking_log=TrackingLogData( | ||
name="test_name", | ||
timestamp="test_timestamp", | ||
data="{}", | ||
context="{}", | ||
), | ||
**kwargs | ||
) | ||
|
||
assert x is None | ||
|
||
mock_logger.info.assert_called_once_with( | ||
"[EventEmissionExit] skipping event {}".format("test_name") | ||
) | ||
|
||
@patch("eventtracking.handlers.logger") | ||
def test_send_tracking_log_to_backends_in_same_runtime( | ||
self, mock_logger | ||
): | ||
""" | ||
Test for send_tracking_log_to_backends | ||
""" | ||
|
||
x = send_tracking_log_to_backends( | ||
sender=None, | ||
signal=None, | ||
tracking_log=TrackingLogData( | ||
name="test_name", | ||
timestamp="test_timestamp", | ||
data="{}", | ||
context="{}", | ||
) | ||
) | ||
|
||
assert x is None | ||
|
||
mock_logger.debug.assert_called_once_with( | ||
"Event received from a non-event bus backend, skipping..." | ||
) |