-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #246 from eduNEXT/cag/add-openedx-event-signal
feat: add event bus backend
- Loading branch information
Showing
14 changed files
with
532 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
"""A simple event tracking library""" | ||
|
||
__version__ = '2.2.0' | ||
__version__ = '2.3.0' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
"""Event tracker backend that emits events to the event-bus.""" | ||
|
||
import json | ||
import logging | ||
from datetime import datetime | ||
|
||
from django.conf import settings | ||
from openedx_events.analytics.data import TrackingLogData | ||
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED | ||
|
||
from eventtracking.backends.logger import DateTimeJSONEncoder | ||
from eventtracking.backends.routing import RoutingBackend | ||
from eventtracking.config import SEND_TRACKING_EVENT_EMITTED_SIGNAL | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class EventBusRoutingBackend(RoutingBackend): | ||
""" | ||
Event tracker backend for the event bus. | ||
""" | ||
|
||
def send(self, event): | ||
""" | ||
Send the tracking log event to the event bus by emitting the | ||
TRACKING_EVENT_EMITTED signal using custom metadata. | ||
""" | ||
if not SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled(): | ||
return | ||
|
||
name = event.get("name") | ||
|
||
if name not in getattr(settings, "EVENT_BUS_TRACKING_LOGS", []): | ||
return | ||
|
||
data = json.dumps(event.get("data"), cls=DateTimeJSONEncoder) | ||
context = json.dumps(event.get("context"), cls=DateTimeJSONEncoder) | ||
|
||
timestamp = event.get("timestamp") | ||
|
||
if isinstance(timestamp, str): | ||
timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") | ||
|
||
tracking_log = TrackingLogData( | ||
name=event.get("name"), | ||
timestamp=timestamp, | ||
data=data, | ||
context=context, | ||
) | ||
TRACKING_EVENT_EMITTED.send_event(tracking_log=tracking_log) | ||
|
||
logger.info(f"Tracking log {tracking_log.name} emitted to the event bus.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
""" | ||
Test the async routing backend. | ||
""" | ||
|
||
import json | ||
from datetime import datetime | ||
from unittest import TestCase | ||
from unittest.mock import patch | ||
|
||
from django.test import override_settings | ||
from openedx_events.analytics.data import TrackingLogData | ||
|
||
from eventtracking.backends.event_bus import EventBusRoutingBackend | ||
|
||
|
||
class TestAsyncRoutingBackend(TestCase): | ||
""" | ||
Test the async routing backend. | ||
""" | ||
|
||
def setUp(self): | ||
super().setUp() | ||
self.sample_event = { | ||
"name": "sample_event", | ||
"data": {"foo": "bar"}, | ||
"timestamp": "2020-01-01T12:12:12.000000+00:00", | ||
"context": {"baz": "qux"}, | ||
} | ||
|
||
@patch("eventtracking.backends.event_bus.EventBusRoutingBackend.send") | ||
def test_successful_send(self, mocked_send_event): | ||
backend = EventBusRoutingBackend() | ||
backend.send(self.sample_event) | ||
mocked_send_event.assert_called_once_with(self.sample_event) | ||
|
||
@override_settings( | ||
SEND_TRACKING_EVENT_EMITTED_SIGNAL=True, | ||
EVENT_BUS_TRACKING_LOGS=["sample_event"], | ||
) | ||
@patch("eventtracking.backends.event_bus.TRACKING_EVENT_EMITTED.send_event") | ||
def test_successful_send_event(self, mock_send_event): | ||
backend = EventBusRoutingBackend() | ||
backend.send(self.sample_event) | ||
|
||
mock_send_event.assert_called() | ||
self.assertDictContainsSubset( | ||
{ | ||
"tracking_log": TrackingLogData( | ||
name=self.sample_event["name"], | ||
timestamp=datetime.strptime( | ||
self.sample_event["timestamp"], "%Y-%m-%dT%H:%M:%S.%f%z" | ||
), | ||
data=json.dumps(self.sample_event["data"]), | ||
context=json.dumps(self.sample_event["context"]), | ||
) | ||
}, | ||
mock_send_event.call_args.kwargs, | ||
) | ||
|
||
@patch( | ||
"eventtracking.backends.event_bus.SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled" | ||
) | ||
@patch("eventtracking.backends.event_bus.TRACKING_EVENT_EMITTED.send_event") | ||
def test_event_is_disabled(self, mock_send_event, mock_is_enabled): | ||
mock_is_enabled.return_value = False | ||
backend = EventBusRoutingBackend() | ||
backend.send(self.sample_event) | ||
mock_is_enabled.assert_called_once() | ||
mock_send_event.assert_not_called() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
""" | ||
This module contains various configuration settings via | ||
waffle switches for the event-tracking app. | ||
""" | ||
|
||
from edx_toggles.toggles import SettingToggle | ||
|
||
# .. toggle_name: SEND_TRACKING_EVENT_EMITTED_SIGNAL | ||
# .. toggle_implementation: SettingToggle | ||
# .. toggle_default: False | ||
# .. toggle_description: When True, the system will publish `TRACKING_EVENT_EMITTED` signals to the event bus. The | ||
# `TRACKING_EVENT_EMITTED` signal is emit when a tracking log is emitted. | ||
# .. toggle_use_cases: opt_in | ||
# .. toggle_creation_date: 2023-10-26 | ||
SEND_TRACKING_EVENT_EMITTED_SIGNAL = SettingToggle( | ||
'SEND_TRACKING_EVENT_EMITTED_SIGNAL', | ||
default=False, | ||
module_name=__name__ | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
""" | ||
This module contains the handlers for signals emitted by the analytics app. | ||
""" | ||
import json | ||
import logging | ||
|
||
from django.dispatch import receiver | ||
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED | ||
from openedx_events.tooling import SIGNAL_PROCESSED_FROM_EVENT_BUS | ||
|
||
from eventtracking.backends.event_bus import EventBusRoutingBackend | ||
from eventtracking.processors.exceptions import EventEmissionExit | ||
from eventtracking.tracker import get_tracker | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@receiver(TRACKING_EVENT_EMITTED) | ||
def send_tracking_log_to_backends( | ||
sender, signal, **kwargs | ||
): # pylint: disable=unused-argument | ||
""" | ||
Listen for the TRACKING_EVENT_EMITTED signal and send the event to the enabled backends. | ||
The process is the following: | ||
1. Unserialize the tracking log from the signal. | ||
2. Get the tracker instance to get the enabled backends (mongo, event_bus, logger, etc). | ||
3. Get the event bus backends that are the interested in the signals (multiple can be configured). | ||
4. Transform the event with the configured processors. | ||
5. Send the transformed event to the different event bus backends. | ||
This allows us to only send the tracking log to the event bus once and the event bus will send | ||
the transformed event to the different configured backends. | ||
""" | ||
if not kwargs.get(SIGNAL_PROCESSED_FROM_EVENT_BUS, False): | ||
logger.debug("Event received from a non-event bus backend, skipping...") | ||
return | ||
tracking_log = kwargs.get("tracking_log") | ||
|
||
event = { | ||
"name": tracking_log.name, | ||
"timestamp": tracking_log.timestamp, | ||
"data": json.loads(tracking_log.data), | ||
"context": json.loads(tracking_log.context), | ||
} | ||
|
||
tracker = get_tracker() | ||
|
||
engines = { | ||
name: engine | ||
for name, engine in tracker.backends.items() | ||
if isinstance(engine, EventBusRoutingBackend) | ||
} | ||
for name, engine in engines.items(): | ||
try: | ||
processed_event = engine.process_event(event) | ||
logger.info('Successfully processed event "{}"'.format(event["name"])) | ||
engine.send_to_backends(processed_event.copy()) | ||
except EventEmissionExit: | ||
logger.info("[EventEmissionExit] skipping event {}".format(event["name"])) |
Oops, something went wrong.