feat: allow extending the event sink
Ian2012 committed Aug 17, 2023
1 parent f46bfc7 commit e74568c
Showing 8 changed files with 320 additions and 3 deletions.
2 changes: 2 additions & 0 deletions event_sink_clickhouse/apps.py
@@ -48,3 +48,5 @@ def ready(self):
super().ready()

from . import tasks # pylint: disable=import-outside-toplevel, unused-import
        from event_sink_clickhouse.sinks import user_profile_sink  # pylint: disable=import-outside-toplevel, unused-import
        from event_sink_clickhouse import signals  # pylint: disable=import-outside-toplevel, unused-import
13 changes: 10 additions & 3 deletions event_sink_clickhouse/settings/common.py
@@ -12,8 +12,15 @@ def plugin_settings(settings):
         # http://foo.openedx.org:8123/ . Note that we only support the ClickHouse HTTP interface
         # to avoid pulling in more dependencies to the platform than necessary.
         "url": "http://clickhouse:8123",
-        "username": "changeme",
-        "password": "changeme",
+        "username": "ch_cms",
+        "password": "TYreGozgtDG3vkoWPUHVVM6q",
+        "database": "event_sink",
-        "timeout_secs": 3,
+        "timeout_secs": 5,
     }
 
+    settings.EVENT_SINK_CLICKHOUSE_MODEL_CONFIG = {
+        "user_profile": {
+            "module": "common.djangoapps.student.models",
+            "model": "UserProfile",
+        }
+    }
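
For illustration, a deployment could register additional models in the same dict. This is a hedged sketch, not part of this commit: the "course_enrollment" key is a hypothetical entry (CourseEnrollment does ship with edx-platform in the module shown):

    settings.EVENT_SINK_CLICKHOUSE_MODEL_CONFIG = {
        "user_profile": {
            "module": "common.djangoapps.student.models",
            "model": "UserProfile",
        },
        # Hypothetical second entry, not added by this commit.
        "course_enrollment": {
            "module": "common.djangoapps.student.models",
            "model": "CourseEnrollment",
        },
    }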
14 changes: 14 additions & 0 deletions event_sink_clickhouse/signals.py
@@ -1,6 +1,10 @@
"""
Signal handler functions, mapped to specific signals in apps.py.
"""
from django.db.models.signals import post_save
from django.dispatch import receiver

from event_sink_clickhouse.sinks.utils import get_model


def receive_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument # pragma: no cover
@@ -11,3 +15,13 @@ def receive_course_publish(sender, course_key, **kwargs):  # pylint: disable=unused-argument
from event_sink_clickhouse.tasks import dump_course_to_clickhouse # pylint: disable=import-outside-toplevel

dump_course_to_clickhouse.delay(str(course_key))

@receiver(post_save, sender=get_model("user_profile"))
def on_user_profile_updated(sender, instance, **kwargs): # pylint: disable=unused-argument # pragma: no cover
"""
Receives post save signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from event_sink_clickhouse.tasks import dump_user_profile_to_clickhouse # pylint: disable=import-outside-toplevel

dump_user_profile_to_clickhouse.delay(instance.id)
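
Any other model registered in EVENT_SINK_CLICKHOUSE_MODEL_CONFIG could be wired up the same way. A hedged sketch, assuming a hypothetical "course_enrollment" config entry and a matching Celery task that does not yet exist:

    @receiver(post_save, sender=get_model("course_enrollment"))
    def on_course_enrollment_updated(sender, instance, **kwargs):  # pylint: disable=unused-argument
        """
        Receives post save signal and queues the dump job.
        """
        # Hypothetical task; dump_user_profile_to_clickhouse below is the pattern to copy.
        from event_sink_clickhouse.tasks import dump_course_enrollment_to_clickhouse  # pylint: disable=import-outside-toplevel

        dump_course_enrollment_to_clickhouse.delay(instance.id)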
209 changes: 209 additions & 0 deletions event_sink_clickhouse/sinks/base_sink.py
@@ -1,11 +1,18 @@
"""
Base classes for event sinks
"""
import csv
import datetime
import io
import json
import uuid
from collections import namedtuple

import requests
from django.conf import settings
from django.utils import timezone

from event_sink_clickhouse.sinks.utils import get_model

ClickHouseAuth = namedtuple("ClickHouseAuth", ["username", "password"])

@@ -14,6 +21,11 @@ class BaseSink:
"""
Base class for ClickHouse event sink, allows overwriting of default settings
"""

CLICKHOUSE_BULK_INSERT_PARAMS = {
"input_format_allow_errors_num": 1,
"input_format_allow_errors_ratio": 0.1,
    }

    def __init__(self, connection_overrides, log):
self.log = log
self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"]
@@ -62,3 +74,200 @@ def _send_clickhouse_request(self, request, expected_insert_rows=None):
# ClickHouse can be configured not to return the metadata / summary we check above for
# performance reasons. It's not critical, so we eat those here.
return response


class ModelBaseSink(BaseSink):
"""
    Base class for model-based event sinks.

    Uses the Django ORM to read model data and write it to ClickHouse.
    Subclasses override the class attributes below to describe their model.
"""
fields = None
unique_key = None
clickhouse_table_name = None
queryset = None
name = None
timestamp_field = None
serializer_class = None
model = None

def __init__(self, connection_overrides, log):
super().__init__(connection_overrides, log)

required_fields = [
self.model,
self.fields,
self.clickhouse_table_name,
self.timestamp_field,
self.unique_key,
self.name,
self.serializer_class,
]

if not all(required_fields):
            raise NotImplementedError(
                "ModelBaseSink needs to be subclassed with model, fields, "
                "clickhouse_table_name, timestamp_field, unique_key, name, "
                "and serializer_class"
            )

def get_model(self):
"""
Return the model to be used for the insert
"""
return get_model("user_profile")

def get_queryset(self):
"""
Return the queryset to be used for the insert
"""
return self.get_model().objects.all()

def get_fields(self):
"""
Return the fields to be used for the insert
"""
return self.fields

def fetch_model_data(self):
"""
Fetch the data from the model queryset
"""
return self.get_queryset().values(*self.get_fields())

def dump(self, item_id):
"""
Do the serialization and send to ClickHouse
"""
item = self.get_model().objects.get(id=item_id)
serialized_items = self.serialize_item(item)

self.log.info(
f"Now dumping {self.name} {item_id} to ClickHouse",
)

try:
self.send_item(serialized_items)
self.log.info("Completed dumping %s to ClickHouse", item_id)
except Exception:
self.log.exception(
f"Error trying to dump {self.name} f{str(item_id)} to ClickHouse!",
)
raise

def serialize_item(self, item):
"""
Serialize the data to be sent to ClickHouse
"""
serializer = self.get_serializer()
return serializer.serialize(item)

def get_serializer(self):
"""
Return the serializer to be used for the insert
"""
return self.serializer_class()

def send_item(self, serialized_item):
"""
        Create the insert query and CSV to send the serialized item to ClickHouse.

        We still use a CSV here even though there's only one row because it handles
        type serialization for us and keeps the pattern consistent.
"""
params = self.CLICKHOUSE_BULK_INSERT_PARAMS.copy()

# "query" is a special param for the query, it's the best way to get the FORMAT CSV in there.
params["query"] = f"INSERT INTO {self.ch_database}.{self.clickhouse_table_name} FORMAT CSV"

output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC)
writer.writerow(serialized_item.values())

request = requests.Request(
'POST',
self.ch_url,
data=output.getvalue().encode("utf-8"),
params=params,
auth=self.ch_auth
)

self._send_clickhouse_request(request, expected_insert_rows=1)


def get_last_dumped_timestamp(self, item_id):
"""
Return the last timestamp that was dumped to ClickHouse
"""
params = {
"query": f"SELECT max({self.timestamp_field}) as time_last_dumped "
f"FROM {self.ch_database}.{self.clickhouse_table_name} "
f"WHERE {self.unique_key} = '{item_id}'"
}

request = requests.Request(
'GET',
self.ch_url,
params=params,
auth=self.ch_auth
)

response = self._send_clickhouse_request(request)
response.raise_for_status()
if response.text.strip():
# ClickHouse returns timestamps in the format: "2023-05-03 15:47:39.331024+00:00"
# Our internal comparisons use the str() of a datetime object, this handles that
# transformation so that downstream comparisons will work.
return str(datetime.datetime.fromisoformat(response.text.strip()))

# Item has never been dumped, return None
return None


    def should_dump_item(self, unique_key):  # pylint: disable=unused-argument
        """
        Return a (should_dump, reason) tuple for the given item.
        """
        return True, "No reason to skip"

    def convert_id(self, item_id):
        """
        Convert a raw id into the model's key type; identity by default.
        """
        return item_id

    def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False):
        """
        Fetch the items that should be dumped to ClickHouse
        """
        if ids:
            item_keys = [self.convert_id(item_id) for item_id in ids]
        else:
            item_keys = [item.id for item in self.get_queryset()]

        skip_ids = set(skip_ids) if skip_ids else set()

        for item_key in item_keys:
            if item_key in skip_ids:
                yield item_key, False, f"{self.name} is explicitly skipped"
            elif force_dump:
                yield item_key, True, "Force is set"
            else:
                should_be_dumped, reason = self.should_dump_item(item_key)
                yield item_key, should_be_dumped, reason


class ItemSerializer:
    """
    Base class for serializing items to be sent to ClickHouse.
    """

def base_serializer(self):
"""
Return the base fields for the serializer
"""
return {
'dump_id': str(uuid.uuid4()),
'dump_timestamp': str(timezone.now())
}

def serialize(self, item):
"""
Serialize the item to be sent to ClickHouse
"""
serialized_item = {**self.serialize_item(item), **self.base_serializer()}
return serialized_item

def serialize_item(self, item):
"""
Serialize the item to be sent to ClickHouse
"""
raise NotImplementedError("ItemSerializer needs to be subclassed with serialize_item")
57 changes: 57 additions & 0 deletions event_sink_clickhouse/sinks/user_profile_sink.py
@@ -0,0 +1,57 @@
"""
Sink and serializer for user profile events.
"""
from event_sink_clickhouse.sinks.base_sink import ItemSerializer, ModelBaseSink


class UserProfileSerializer(ItemSerializer):
"""
Serializer for user profile events
"""
def serialize_item(self, user_profile):
return {
"id": user_profile.id,
"name": user_profile.name,
"meta": user_profile.meta,
"courseware": user_profile.courseware,
"language": user_profile.language,
"location": user_profile.location,
"year_of_birth": user_profile.year_of_birth,
"gender": user_profile.gender,
"level_of_education": user_profile.level_of_education,
"mailing_address": user_profile.mailing_address,
"city": user_profile.city,
"country": user_profile.country,
"state": user_profile.state,
"goals": user_profile.goals,
"bio": user_profile.bio,
"profile_image_uploaded_at": user_profile.profile_image_uploaded_at,
"phone_number": user_profile.phone_number,
}


class UserProfileSink(ModelBaseSink):
"""
Sink for user profile events
"""
model = "user_profile"
fields = [
"id",
"name",
"meta",
"courseware",
"language",
"location",
"year_of_birth",
"gender",
"level_of_education",
"mailing_address",
"city",
"country",
"state",
"goals",
"bio",
"profile_image_uploaded_at",
"phone_number",
]
    unique_key = "id"
clickhouse_table_name = "user_profile"
timestamp_field = "time_last_dumped"
name = "User Profile"
serializer_class = UserProfileSerializer
11 changes: 11 additions & 0 deletions event_sink_clickhouse/sinks/utils.py
@@ -0,0 +1,11 @@
"""
Utilities for resolving models configured in EVENT_SINK_CLICKHOUSE_MODEL_CONFIG.
"""
from importlib import import_module

from django.conf import settings


def get_model(model_name):
    """
    Import and return the Django model configured under ``model_name``.
    """
    model_config = settings.EVENT_SINK_CLICKHOUSE_MODEL_CONFIG.get(model_name)
    module_name = model_config["module"]
    class_name = model_config["model"]
    return getattr(import_module(module_name), class_name)
16 changes: 16 additions & 0 deletions event_sink_clickhouse/tasks.py
@@ -9,6 +9,7 @@
from opaque_keys.edx.keys import CourseKey

from event_sink_clickhouse.sinks.course_published import CoursePublishedSink
from event_sink_clickhouse.sinks.user_profile_sink import UserProfileSink

log = logging.getLogger(__name__)
celery_log = logging.getLogger('edx.celery.task')
@@ -28,3 +29,18 @@ def dump_course_to_clickhouse(course_key_string, connection_overrides=None):
course_key = CourseKey.from_string(course_key_string)
sink = CoursePublishedSink(connection_overrides=connection_overrides, log=celery_log)
sink.dump(course_key)


@shared_task
@set_code_owner_attribute
def dump_user_profile_to_clickhouse(user_profile_id, connection_overrides=None):
"""
    Serialize a user profile and write it to ClickHouse.

    Arguments:
user_profile_id: user profile id for the user profile to be exported
connection_overrides (dict): overrides to ClickHouse connection
parameters specified in `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`.
"""
sink = UserProfileSink(connection_overrides=connection_overrides, log=celery_log)
sink.dump(user_profile_id)
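
A hedged usage sketch: the task can also be queued directly with explicit connection overrides (the id and override values shown are illustrative):

    dump_user_profile_to_clickhouse.delay(
        user_profile_id=42,
        connection_overrides={"url": "http://clickhouse:8123", "timeout_secs": 10},
    )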
1 change: 1 addition & 0 deletions setup.py
@@ -115,6 +115,7 @@ def is_requirement(line):
),
entry_points={
"lms.djangoapp": [
"event-sink-clickhouse = event_sink_clickhouse.apps:EventSinkClickhouseConfig",
],
"cms.djangoapp": [
"event-sink-clickhouse = event_sink_clickhouse.apps:EventSinkClickhouseConfig",