Skip to content

Commit

Permalink
test: add test for event bus backend
Browse files Browse the repository at this point in the history
fix: only generate metadata

fix: assert for subset ignoring runtime generated only metadata

chore: quality fixes

chore: improve docstring

fix: convert timestamp str to datetime object

test: add test for str datetime

fix: rename tracking log event emitted

fix: rename tracking log event emitted config

fix: rename tracking event

chore: handle comments

feat: add settings for allowed events in the event bus

chore: use opt_in for tracking logs event bus toggle

fix: serialize tracking log data and context dates as logger

chore: use send_event instead of custom metadata

test: update unit test

docs: add event bus routing documentation
  • Loading branch information
Ian2012 committed Feb 13, 2024
1 parent 31c1b54 commit fa5b0ac
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 68 deletions.
55 changes: 55 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,61 @@ An example configuration for ``AsyncRoutingBackend`` is provided below::
}


Event Bus Routing
-----------------

``event-tracking`` provides a solution for routing events to the Event Bus
using the ``EventBusBackend``. It extends ``RoutingBackend`` but sends events
to the Event Bus.

It can:

* Process event through the configured processors.
* If the event is allowed via `EVENT_BUS_TRACKING_LOGS`, send it to the Event Bus.

Make sure to enable the setting: ``SEND_TRACKING_EVENT_EMITTED_SIGNAL`` to allow the
``EventBusBackend`` to send events to the Event Bus.

An example configuration for ``EventBusBackend`` is provided below::

EVENT_TRACKING_BACKENDS = {
'xapi': {
'ENGINE': 'eventtracking.backends.event_bus.EventBusBackend',
'OPTIONS': {
'backend_name': 'xapi',
'processors': [
{
'ENGINE': 'eventtracking.processors.regex_filter.RegexFilter',
'OPTIONS':{
'filter_type': 'allowlist',
'regular_expressions': [
'edx.course.enrollment.activated',
'edx.course.enrollment.deactivated',
]
}
}
],
'backends': {
'xapi': {
'ENGINE': 'dummy.backend.engine',
'OPTIONS': {
...
}
}
},
},
},
'tracking_logs': {
...
}
...
}

EVENT_BUS_TRACKING_LOGS = [
'edx.course.enrollment.activated',
'edx.course.enrollment.deactivated',
]

Roadmap
-------

Expand Down
8 changes: 8 additions & 0 deletions doc/user_guide/api/eventtracking.backends.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ eventtracking.backends.segment
:undoc-members:
:show-inheritance:


eventtracking.backends.event_bus
------------------------------

.. automodule:: eventtracking.backends.event_bus
:members:
:undoc-members:
:show-inheritance:
55 changes: 24 additions & 31 deletions eventtracking/backends/event_bus.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,52 @@
"""Event tracker backend that emits events to the event-bus."""

import json
import logging
from datetime import datetime

from django.conf import settings
from openedx_events.analytics.data import TrackingLogData
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED

from eventtracking.backends.logger import DateTimeJSONEncoder
from eventtracking.backends.routing import RoutingBackend
from eventtracking.config import SEND_TRACKING_EVENT_EMITTED_SIGNAL
from openedx_events.data import EventsMetadata
from openedx_events.event_bus import get_producer
from attrs import asdict
import logging

logger = logging.getLogger(__name__)

EVENT_BUS_SOURCE = "openedx/eventtracking"

class EventBusRoutingBackend(RoutingBackend):
"""
Event tracker backend that emits an Open edX public signal.
Event tracker backend for the event bus.
"""

def send(self, event):
"""
Emit the TRACKING_EVENT_EMITTED Open edX public signal to allow
other apps to listen for tracking events.
Send the tracking log event to the event bus by emitting the
TRACKING_EVENT_EMITTED signal using custom metadata.
"""
if not SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled():
return

data = json.dumps(event.get("data"))
context = json.dumps(event.get("context"))
name = event.get("name")

if name not in getattr(settings, "EVENT_BUS_TRACKING_LOGS", []):
return

data = json.dumps(event.get("data"), cls=DateTimeJSONEncoder)
context = json.dumps(event.get("context"), cls=DateTimeJSONEncoder)

timestamp = event.get("timestamp")

tracking_log=TrackingLogData(
if isinstance(timestamp, str):
timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")

tracking_log = TrackingLogData(
name=event.get("name"),
timestamp=event.get("timestamp"),
timestamp=timestamp,
data=data,
context=context,
)
TRACKING_EVENT_EMITTED.send_event(tracking_log=tracking_log)

logger.info(f"Sending tracking event emitted signal for event for {tracking_log.name}")
get_producer().send(
signal=TRACKING_EVENT_EMITTED,
topic="analytics",
event_key_field="tracking_log.name",
event_data={"tracking_log": tracking_log},
event_metadata=generate_signal_metadata()
)


def generate_signal_metadata():
"""
Generate the metadata for the signal with a custom source.
"""
metadata = TRACKING_EVENT_EMITTED.generate_signal_metadata()
medata_dict = asdict(metadata)
medata_dict["source"] = EVENT_BUS_SOURCE
metadata = EventsMetadata(**medata_dict)
return metadata
logger.info(f"Tracking log {tracking_log.name} emitted to the event bus.")
4 changes: 2 additions & 2 deletions eventtracking/backends/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def process_event(self, event):
processed_event = modified_event
except EventEmissionExit:
raise
except Exception: # pylint: disable=broad-except
except Exception:
LOG.exception(
'Failed to execute processor: %s', str(processor)
)
Expand All @@ -142,7 +142,7 @@ def send_to_backends(self, event):
LOG.info('[send_to_backends] Failed to send edx event "%s" to "%s" backend. "%s" backend has'
' not been enabled, [%s]', event["name"], name, name, repr(exc)
)
except Exception: # pylint: disable=broad-except
except Exception:
LOG.exception(
'Unable to send edx event "%s" to backend: %s', event["name"], name
)
60 changes: 44 additions & 16 deletions eventtracking/backends/tests/test_event_bus.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
"""
Test the async routing backend.
"""

import json
from datetime import datetime
from unittest import TestCase
from unittest.mock import patch

from unittest.mock import sentinel, patch
from eventtracking.backends.event_bus import EventBusRoutingBackend
from django.test import override_settings
from openedx_events.analytics.data import TrackingLogData

from eventtracking.backends.event_bus import EventBusRoutingBackend


class TestAsyncRoutingBackend(TestCase):
"""
Test the async routing backend.
Expand All @@ -15,27 +21,49 @@ class TestAsyncRoutingBackend(TestCase):
def setUp(self):
super().setUp()
self.sample_event = {
'name': str(sentinel.name),
'data': 'data',
'timestamp': '2020-01-01T12:12:12.000000+00:00',
'context': {},
"name": "sample_event",
"data": {"foo": "bar"},
"timestamp": "2020-01-01T12:12:12.000000+00:00",
"context": {"baz": "qux"},
}

@patch('eventtracking.backends.event_bus.EventBusRoutingBackend.send')
@patch("eventtracking.backends.event_bus.EventBusRoutingBackend.send")
def test_successful_send(self, mocked_send_event):
backend = EventBusRoutingBackend()
backend.send(self.sample_event)
mocked_send_event.assert_called_once_with(self.sample_event)

@patch('eventtracking.backends.event_bus.TRACKING_EVENT_EMITTED.send_event')
def test_successful_send_event(self, mocked_send_event):
@override_settings(
SEND_TRACKING_EVENT_EMITTED_SIGNAL=True,
EVENT_BUS_TRACKING_LOGS=["sample_event"],
)
@patch("eventtracking.backends.event_bus.TRACKING_EVENT_EMITTED.send_event")
def test_successful_send_event(self, mock_send_event):
backend = EventBusRoutingBackend()
backend.send(self.sample_event)
mocked_send_event.assert_called_once_with(
tracking_log=TrackingLogData(
name=self.sample_event['name'],
timestamp=self.sample_event['timestamp'],
data=self.sample_event['data'],
context=self.sample_event['context']
)

mock_send_event.assert_called()
self.assertDictContainsSubset(
{
"tracking_log": TrackingLogData(
name=self.sample_event["name"],
timestamp=datetime.strptime(
self.sample_event["timestamp"], "%Y-%m-%dT%H:%M:%S.%f%z"
),
data=json.dumps(self.sample_event["data"]),
context=json.dumps(self.sample_event["context"]),
)
},
mock_send_event.call_args.kwargs,
)

@patch(
"eventtracking.backends.event_bus.SEND_TRACKING_EVENT_EMITTED_SIGNAL.is_enabled"
)
@patch("eventtracking.backends.event_bus.TRACKING_EVENT_EMITTED.send_event")
def test_event_is_disabled(self, mock_send_event, mock_is_enabled):
mock_is_enabled.return_value = False
backend = EventBusRoutingBackend()
backend.send(self.sample_event)
mock_is_enabled.assert_called_once()
mock_send_event.assert_not_called()
11 changes: 8 additions & 3 deletions eventtracking/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
This module contains various configuration settings via
waffle switches for the Certificates app.
waffle switches for the event-tracking app.
"""

from edx_toggles.toggles import SettingToggle
Expand All @@ -10,5 +10,10 @@
# .. toggle_default: False
# .. toggle_description: When True, the system will publish `TRACKING_EVENT_EMITTED` signals to the event bus. The
# `TRACKING_EVENT_EMITTED` signal is emit when a tracking log is emitted.
# .. toggle_use_cases: publish
SEND_TRACKING_EVENT_EMITTED_SIGNAL = SettingToggle('SEND_TRACKING_EVENT_EMITTED_SIGNAL', default=True, module_name=__name__)
# .. toggle_use_cases: opt_in
# .. toggle_creation_date: 2023-10-26
SEND_TRACKING_EVENT_EMITTED_SIGNAL = SettingToggle(
'SEND_TRACKING_EVENT_EMITTED_SIGNAL',
default=False,
module_name=__name__
)
2 changes: 0 additions & 2 deletions eventtracking/django/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,3 @@ def ready(self):
# pylint: disable=import-outside-toplevel
from eventtracking.django.django_tracker import override_default_tracker
override_default_tracker()

import eventtracking.handlers # pylint: disable=unused-import
32 changes: 28 additions & 4 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,53 @@ click==8.1.7
# click-didyoumean
# click-plugins
# click-repl
# code-annotations
# edx-django-utils
click-didyoumean==0.3.0
# via celery
click-plugins==1.1.1
# via celery
click-repl==0.3.0
# via celery
code-annotations==1.6.0
# via edx-toggles
django==3.2.24
# via
# -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt
# -r requirements/base.in
# django-crum
# django-waffle
# edx-django-utils
# edx-toggles
# openedx-events
django-crum==0.7.9
# via edx-django-utils
# via
# edx-django-utils
# edx-toggles
django-waffle==4.1.0
# via edx-django-utils
# via
# edx-django-utils
# edx-toggles
edx-django-utils==5.10.1
# via -r requirements/base.in
# via
# -r requirements/base.in
# edx-toggles
# openedx-events
edx-opaque-keys[django]==2.5.1
# via openedx-events
edx-toggles==5.1.1
# via -r requirements/base.in
fastavro==1.9.3
# via openedx-events
jinja2==3.1.3
# via code-annotations
kombu==5.3.5
# via celery
markupsafe==2.1.5
# via jinja2
newrelic==9.6.0
# via edx-django-utils
openedx-events==9.4.0
openedx-events==9.5.1
# via -r requirements/base.in
pbr==6.0.0
# via stevedore
Expand All @@ -75,10 +92,14 @@ pynacl==1.5.0
# via edx-django-utils
python-dateutil==2.8.2
# via celery
python-slugify==8.0.4
# via code-annotations
pytz==2024.1
# via
# -r requirements/base.in
# django
pyyaml==6.0.1
# via code-annotations
six==1.16.0
# via
# -r requirements/base.in
Expand All @@ -87,8 +108,11 @@ sqlparse==0.4.4
# via django
stevedore==5.1.0
# via
# code-annotations
# edx-django-utils
# edx-opaque-keys
text-unidecode==1.3
# via python-slugify
typing-extensions==4.9.0
# via
# asgiref
Expand Down
Loading

0 comments on commit fa5b0ac

Please sign in to comment.