Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: new event bus config format #272

Merged
merged 24 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ Change Log

Unreleased
----------

[9.0.0] - 2023-10-04
--------------------
Changed
~~~~~~~
* Re-licensed this repository from AGPL 3.0 to Apache 2.0
rgraber marked this conversation as resolved.
Show resolved Hide resolved
* **Breaking change**: Restructured EVENT_BUS_PRODUCER_CONFIG

[8.9.0] - 2023-10-04
--------------------
Expand Down
65 changes: 65 additions & 0 deletions docs/decisions/0014-new-event-bus-producer-config.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
13. Change event producer config settings
#########################################

Status
******

**Accepted**

Context
*******

In a previous ADR, we set the structure for the event bus publishing configuration to a dictionary like the following:
robrap marked this conversation as resolved.
Show resolved Hide resolved

.. code-block:: python

{ 'org.openedx.content_authoring.xblock.published.v1': [
{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key', 'enabled': True},
{'topic': 'content-authoring-xblock-published', 'event_key_field': 'xblock_info.usage_key', 'enabled': False},
],
'org.openedx.content_authoring.xblock.deleted.v1': [
{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key', 'enabled': True},
],
}

While attempting to implement this for edx-platform, we came across some problems with using this structure. In particular, it results in ambiguity
because maintainers can accidentally add something like
``{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key', 'enabled': True}`` and
``{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key', 'enabled': False}`` to the same event_type.
Moreover, enabling/disabling an existing event/topic pair requires reaching into the structure, searching for the dictionary with the correct topic, and modifying
the existing object, which is awkward.

This ADR aims to propose a new structure that will provide greater flexibility in using this configuration.

Decision
********

The new EVENT_BUS_PRODUCER_CONFIG will have the following configuration format:

.. code-block:: python

# .. setting_name: EVENT_BUS_PRODUCER_CONFIG
# .. setting_default: {}
# .. setting_description: Dictionary of event_types to dictionaries for topic related configuration.
# Each topic configuration dictionary uses the topic as a key and contains a flag called `enabled`
# denoting whether the event will be and `event_key_field` which is a period-delimited string path
# to event data field to use as event key.
# Note: The topic names should not include environment prefix as it will be dynamically added based on
# EVENT_BUS_TOPIC_PREFIX setting.
EVENT_BUS_PRODUCER_CONFIG = {
'org.openedx.content_authoring.xblock.published.v1': {
'content-authoring-xblock-lifecycle': {'event_key_field': 'xblock_info.usage_key', 'enabled': False},
'content-authoring-xblock-published': {'event_key_field': 'xblock_info.usage_key', 'enabled': True}
},
'org.openedx.content_authoring.xblock.deleted.v1': {
'content-authoring-xblock-lifecycle': {'event_key_field': 'xblock_info.usage_key', 'enabled': True},
},
}

A new ``merge_publisher_configs`` method will be added to openedx-events.event_bus to make it easier to correctly determine the config map from multiple sources.
robrap marked this conversation as resolved.
Show resolved Hide resolved

Consequences
************

* As long as the implementing IDA calls ``merge_publisher_configs``, maintainers can add existing topics to new event_types without having to recreate the whole dictionary
* There is no ambiguity about whether an event/topic pair is enabled or disabled
1 change: 1 addition & 0 deletions docs/decisions/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ Architectural Decision Records (ADRs)
0011-depending-on-multiple-event-bus-implementations
0012-producing-to-event-bus-via-settings
0013-special-exam-submission-and-review-events
0014-new-event-bus-producer-config
2 changes: 1 addition & 1 deletion openedx_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
more information about the project.
"""

__version__ = "8.9.0"
__version__ = "9.0.0"
55 changes: 39 additions & 16 deletions openedx_events/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@

def general_signal_handler(sender, signal, **kwargs): # pylint: disable=unused-argument
"""
Signal handler for publishing events to configured event bus.
Signal handler for producing events to configured event bus.
"""
configurations = getattr(settings, "EVENT_BUS_PRODUCER_CONFIG", {}).get(signal.event_type, ())
event_type_producer_configs = getattr(settings, "EVENT_BUS_PRODUCER_CONFIG", {}).get(signal.event_type, {})
# event_type_producer_configs should look something like
# {
# "topic_a": { "event_key_field": "my.key.field", "enabled": True },
# "topic_b": { "event_key_field": "my.key.field", "enabled": False }
# }"
event_data = {key: kwargs.get(key) for key in signal.init_data}
for configuration in configurations:
if configuration["enabled"]:

for topic in event_type_producer_configs.keys():
if event_type_producer_configs[topic]["enabled"] is True:
Comment on lines +25 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for topic in event_type_producer_configs.keys():
if event_type_producer_configs[topic]["enabled"] is True:
for topic, config in event_type_producer_configs.items():
if config["enabled"] is True:

get_producer().send(
signal=signal,
topic=configuration["topic"],
event_key_field=configuration["event_key_field"],
topic=topic,
event_key_field=event_type_producer_configs[topic]["event_key_field"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event_key_field=event_type_producer_configs[topic]["event_key_field"],
event_key_field=config["event_key_field"],

event_data=event_data,
event_metadata=kwargs["metadata"],
)
Expand All @@ -34,47 +40,64 @@ class OpenedxEventsConfig(AppConfig):

name = "openedx_events"

def _get_validated_signal_config(self, event_type, configurations):
def _get_validated_signal_config(self, event_type, configuration):
"""
Validate signal configuration format.

Example expected signal configuration:
{
"topic_a": { "event_key_field": "my.key.field", "enabled": True },
"topic_b": { "event_key_field": "my.key.field", "enabled": False }
}

Raises:
ProducerConfigurationError: If configuration is not valid.
"""
if not isinstance(configurations, list) and not isinstance(configurations, tuple):
if not isinstance(configuration, dict):
raise ProducerConfigurationError(
event_type=event_type,
message="Configuration for event_types should be a list or a tuple of dictionaries"
message="Configuration for event_types should be a dict"
)
try:
signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
except KeyError as exc:
raise ProducerConfigurationError(message=f"No OpenEdxPublicSignal of type: '{event_type}'.") from exc
for configuration in configurations:
if not isinstance(configuration, dict):
for _, topic_configuration in configuration.items():
if not isinstance(topic_configuration, dict):
raise ProducerConfigurationError(
event_type=event_type,
message="One of the configuration object is not a dictionary"
message="One of the configuration objects is not a dictionary"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imagine everything is working fine, and someone adds a new bad config detail. Do we want that to break all working publishing? Seems scary. Something to consider.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'm going to punt on this since it requires a little more thought than I want to put it while trying to land this but it will be a fast follow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[punt]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it better to fail early rather than allowing a back config detail? This error will stop the application from starting.

)
expected_keys = {"topic": str, "event_key_field": str, "enabled": bool}
expected_keys = {"event_key_field": str, "enabled": bool}
for expected_key, expected_type in expected_keys.items():
if expected_key not in configuration:
if expected_key not in topic_configuration.keys():
raise ProducerConfigurationError(
event_type=event_type,
message=f"One of the configuration object is missing '{expected_key}' key."
)
if not isinstance(configuration[expected_key], expected_type):
if not isinstance(topic_configuration[expected_key], expected_type):
raise ProducerConfigurationError(
event_type=event_type,
message=(f"Expected type: {expected_type} for '{expected_key}', "
f"found: {type(configuration[expected_key])}")
f"found: {type(topic_configuration[expected_key])}")
)
return signal

def ready(self):
"""
Read `EVENT_BUS_PRODUCER_CONFIG` setting and connects appropriate handlers to the events based on it.

Example expected configuration:
{
"org.openedx.content_authoring.xblock.deleted.v1" : {
"topic_a": { "event_key_field": "xblock_info.usage_key", "enabled": True },
"topic_b": { "event_key_field": "xblock_info.usage_key", "enabled": False }
},
"org.openedx.content_authoring.course.catalog_info.changed.v1" : {
"topic_c": {"event_key_field": "course_info.course_key", "enabled": True }
}
}

Raises:
ProducerConfigurationError: If `EVENT_BUS_PRODUCER_CONFIG` is not valid.
"""
Expand Down
26 changes: 26 additions & 0 deletions openedx_events/event_bus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
``EVENT_BUS_CONSUMER``.
"""

import copy
import warnings
from abc import ABC, abstractmethod
from functools import lru_cache
Expand Down Expand Up @@ -183,3 +184,28 @@ def make_single_consumer(*, topic: str, group_id: str,
def _reset_state(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches when settings change during unit tests."""
get_producer.cache_clear()


def merge_producer_configs(producer_config_original, producer_config_overrides):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: renaming these variables to original and overrides would make it easier to understand as producer_config prefix should not be needed.

"""
Merge two EVENT_BUS_PRODUCER_CONFIG maps.

Arguments:
producer_config_original: An EVENT_BUS_PRODUCER_CONFIG-structured map
producer_config_overrides: An EVENT_BUS_PRODUCER_CONFIG-structured map

Returns:
A new EVENT_BUS_PRODUCER_CONFIG map created by combining the two maps. All event_type/topic pairs in
producer_config_overrides are added to the producer_config_original. If there is a conflict on whether a
particular event_type/topic pair is enabled, producer_config_overrides wins out.
"""
combined = copy.deepcopy(producer_config_original)
for event_type, event_type_config_overrides in producer_config_overrides.items():
event_type_config_combined = combined.get(event_type, {})
for topic, topic_config_overrides in event_type_config_overrides.items():
topic_config_combined = event_type_config_combined.get(topic, {})
topic_config_combined['enabled'] = topic_config_overrides['enabled']
topic_config_combined['event_key_field'] = topic_config_overrides['event_key_field']
event_type_config_combined[topic] = topic_config_combined
combined[event_type] = event_type_config_combined
return combined
61 changes: 60 additions & 1 deletion openedx_events/event_bus/tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
Tests for event bus implementation loader.
"""

import copy
import warnings
from contextlib import contextmanager
from unittest import TestCase

from django.test import override_settings

from openedx_events.data import EventsMetadata
from openedx_events.event_bus import _try_load, get_producer, make_single_consumer
from openedx_events.event_bus import _try_load, get_producer, make_single_consumer, merge_producer_configs
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED


Expand Down Expand Up @@ -126,3 +127,61 @@ def test_default_does_nothing(self):
with assert_warnings([]):
# Nothing thrown, no warnings.
assert consumer.consume_indefinitely() is None


class TestSettings(TestCase):
def test_merge_configs(self):
dict_a = {
'event_type_0': {
'topic_a': {'event_key_field': 'field', 'enabled': True},
'topic_b': {'event_key_field': 'field', 'enabled': True}
},
'event_type_1': {
'topic_c': {'event_key_field': 'field', 'enabled': True},
}
}
# for ensuring we didn't change the original dict
dict_a_copy = copy.deepcopy(dict_a)
dict_b = {
'event_type_0': {
# disable an existing event/topic pairing
'topic_a': {'event_key_field': 'field', 'enabled': False},
# add a new topic to an existing topic
'topic_d': {'event_key_field': 'field', 'enabled': True},
},
# add a new event_type
'event_type_2': {
'topic_e': {'event_key_field': 'field', 'enabled': True},
}
}
dict_b_copy = copy.deepcopy(dict_b)
result = merge_producer_configs(dict_a, dict_b)
self.assertDictEqual(result, {
'event_type_0': {
'topic_a': {'event_key_field': 'field', 'enabled': False},
'topic_b': {'event_key_field': 'field', 'enabled': True},
'topic_d': {'event_key_field': 'field', 'enabled': True},
},
'event_type_1': {
'topic_c': {'event_key_field': 'field', 'enabled': True},
},
'event_type_2': {
'topic_e': {'event_key_field': 'field', 'enabled': True},
}
})
self.assertDictEqual(dict_a, dict_a_copy)
self.assertDictEqual(dict_b, dict_b_copy)

def test_merge_configs_with_empty(self):
dict_a = {
'event_type_0': {
'topic_a': {'event_key_field': 'field', 'enabled': True},
'topic_b': {'event_key_field': 'field', 'enabled': True}
},
'event_type_1': {
'topic_c': {'event_key_field': 'field', 'enabled': True},
}
}
dict_b = {}
result = merge_producer_configs(dict_a, dict_b)
self.assertDictEqual(result, dict_a)
25 changes: 14 additions & 11 deletions openedx_events/tests/test_producer_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def setUp(self) -> None:
@patch('openedx_events.apps.get_producer')
def test_enabled_disabled_events(self, mock_producer):
"""
Check whether XBLOCK_PUBLISHED is connected to the handler and the handler only publishes enabled events.
Check whether XBLOCK_PUBLISHED is connected to the handler and the handler only produces enabled events.

Args:
mock_producer: mock get_producer to inspect the arguments.
Expand All @@ -46,12 +46,12 @@ def test_enabled_disabled_events(self, mock_producer):
# check that call_args_list only consists of enabled topics.
call_args = mock_send.send.call_args_list[0][1]
self.assertDictContainsSubset(
{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key'},
{'topic': 'enabled_topic_a', 'event_key_field': 'xblock_info.usage_key'},
call_args
)
call_args = mock_send.send.call_args_list[1][1]
self.assertDictContainsSubset(
{'topic': 'content-authoring-all-status', 'event_key_field': 'xblock_info.usage_key'},
{'topic': 'enabled_topic_b', 'event_key_field': 'xblock_info.usage_key'},
call_args
)

Expand All @@ -78,31 +78,34 @@ def test_configuration_is_validated(self):
with pytest.raises(ProducerConfigurationError, match="should be a dictionary"):
apps.get_app_config("openedx_events").ready()

with override_settings(EVENT_BUS_PRODUCER_CONFIG={"invalid.event.type": []}):
with override_settings(EVENT_BUS_PRODUCER_CONFIG={"invalid.event.type": {}}):
with pytest.raises(ProducerConfigurationError, match="No OpenEdxPublicSignal of type"):
apps.get_app_config("openedx_events").ready()

with override_settings(EVENT_BUS_PRODUCER_CONFIG={"org.openedx.content_authoring.xblock.deleted.v1": ""}):
with pytest.raises(ProducerConfigurationError, match="should be a list or a tuple"):
with pytest.raises(ProducerConfigurationError, match="should be a dict"):
apps.get_app_config("openedx_events").ready()

with override_settings(EVENT_BUS_PRODUCER_CONFIG={"org.openedx.content_authoring.xblock.deleted.v1": [""]}):
with pytest.raises(ProducerConfigurationError, match="object is not a dictionary"):
with override_settings(EVENT_BUS_PRODUCER_CONFIG={"org.openedx.content_authoring.xblock.deleted.v1":
{"topic": ""}}):
with pytest.raises(ProducerConfigurationError, match="One of the configuration objects is not a"
" dictionary"):
apps.get_app_config("openedx_events").ready()

with override_settings(
EVENT_BUS_PRODUCER_CONFIG={
"org.openedx.content_authoring.xblock.deleted.v1": [{"topic": "some", "enabled": True}]
"org.openedx.content_authoring.xblock.deleted.v1": {"topic": {"enabled": True}}
}
):
with pytest.raises(ProducerConfigurationError, match="missing 'event_key_field' key."):
apps.get_app_config("openedx_events").ready()

with override_settings(
EVENT_BUS_PRODUCER_CONFIG={
"org.openedx.content_authoring.xblock.deleted.v1": [
{"topic": "some", "enabled": 1, "event_key_field": "some"}
]
"org.openedx.content_authoring.xblock.deleted.v1":
{
"some": {"enabled": 1, "event_key_field": "some"}
}
}
):
with pytest.raises(
Expand Down
Loading