Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
feat: use django serializer for sink serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Aug 17, 2023
1 parent e74568c commit 40b760f
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 100 deletions.
2 changes: 1 addition & 1 deletion event_sink_clickhouse/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ def ready(self):
super().ready()

from . import tasks # pylint: disable=import-outside-toplevel, unused-import
from event_sink_clickhouse.sinks import user_profile_sink
from event_sink_clickhouse import sinks
from event_sink_clickhouse import signals
49 changes: 49 additions & 0 deletions event_sink_clickhouse/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from rest_framework import serializers

import uuid
from django.utils import timezone

from event_sink_clickhouse.utils import get_model


class BaseSinkSerializer(serializers.Serializer):
    """
    Base serializer that stamps each serialized item with dump metadata.

    Declares two method-backed fields, ``dump_id`` and ``dump_timestamp``,
    whose values come from the ``get_dump_id`` and ``get_dump_timestamp``
    methods below.
    """

    dump_id = serializers.SerializerMethodField()
    dump_timestamp = serializers.SerializerMethodField()

    class Meta:
        fields = [
            "dump_id",
            "dump_timestamp",
        ]

    def get_dump_id(self, instance):  # pylint: disable=unused-argument
        """
        Return a newly generated random (version 4) UUID for this dump.
        """
        return uuid.uuid4()

    def get_dump_timestamp(self, instance):  # pylint: disable=unused-argument
        """
        Return the current time as given by ``django.utils.timezone.now``.
        """
        return timezone.now()


class UserProfileSerializer(BaseSinkSerializer, serializers.ModelSerializer):
    """
    Serializer for the model resolved by ``get_model("user_profile")``.

    Exposes the profile columns listed in ``Meta.fields`` together with the
    ``dump_id`` and ``dump_timestamp`` fields declared on
    ``BaseSinkSerializer``.
    """

    class Meta:
        # The model is looked up once, when this class body is executed.
        model = get_model("user_profile")
        fields = [
            "id",
            "name",
            "meta",
            "courseware",
            "language",
            "location",
            "year_of_birth",
            "gender",
            "level_of_education",
            "mailing_address",
            "city",
            "country",
            "state",
            "goals",
            "bio",
            "profile_image_uploaded_at",
            "phone_number",
            "dump_id",
            "dump_timestamp",
        ]
2 changes: 1 addition & 1 deletion event_sink_clickhouse/signals.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Signal handler functions, mapped to specific signals in apps.py.
"""
from event_sink_clickhouse.sinks.utils import get_model
from event_sink_clickhouse.utils import get_model

from django.db.models.signals import post_save
from django.dispatch import receiver
Expand Down
79 changes: 38 additions & 41 deletions event_sink_clickhouse/sinks/base_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from django.conf import settings
from django.utils import timezone

from event_sink_clickhouse.sinks.utils import get_model
from event_sink_clickhouse.utils import get_model

ClickHouseAuth = namedtuple("ClickHouseAuth", ["username", "password"])

Expand Down Expand Up @@ -83,7 +83,6 @@ class ModelBaseSink(BaseSink):
This class is used for the model based event sink, which uses the Django ORM to write
events to ClickHouse.
"""
fields = None
unique_key = None
clickhouse_table_name = None
queryset = None
Expand All @@ -97,16 +96,14 @@ def __init__(self, connection_overrides, log):

required_fields = [
self.model,
self.fields,
self.clickhouse_table_name,
self.timestamp_field,
self.unique_key,
self.name,
self.serializer_class,
]

if not all(required_fields):
raise NotImplementedError("ModelBaseSink needs to be subclassed with model, fields, clickhouse_table_name, timestamp_field, unique_key, name, and serializer_class")
raise NotImplementedError("ModelBaseSink needs to be subclassed with model, clickhouse_table_name, timestamp_field, unique_key, and name")

def get_model(self):
"""
Expand All @@ -120,18 +117,6 @@ def get_queryset(self):
"""
return self.get_model().objects.all()

def get_fields(self):
"""
Return the fields to be used for the insert
"""
return self.fields

def fetch_model_data(self):
"""
Fetch the data from the model queryset
"""
return self.get_queryset().values(*self.get_fields())

def dump(self, item_id):
"""
Do the serialization and send to ClickHouse
Expand All @@ -156,14 +141,15 @@ def serialize_item(self, item):
"""
Serialize the data to be sent to ClickHouse
"""
serializer = self.get_serializer()
return serializer.serialize(item)
Serializer = self.get_serializer()
serializer = Serializer(item)
return serializer.data

def get_serializer(self):
"""
Return the serializer to be used for the insert
"""
return self.serializer_class()
return self.serializer_class

def send_item(self, serialized_item):
"""
Expand Down Expand Up @@ -191,6 +177,32 @@ def send_item(self, serialized_item):

self._send_clickhouse_request(request, expected_insert_rows=1)

def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False):
    """
    Yield a dump decision for every candidate item.

    Arguments:
        ids: optional iterable of raw item ids. Each one is passed through
            ``self.convert_id``. When it is empty or None, the ``id`` of
            every object in ``self.get_queryset()`` is used instead.
        skip_ids: optional collection of item keys that must not be dumped.
            None (the default) means nothing is skipped.
        force_dump: when True, every non-skipped item is marked for dumping
            without consulting ``self.should_dump_item``.

    Yields:
        ``(item_key, should_be_dumped, reason)`` tuples, one per item.
    """
    # Membership tests against None raise TypeError, so normalise the
    # default to an empty collection.
    skip_ids = skip_ids or ()

    if ids:
        # Convert each element of ``ids``; the previous comprehension
        # converted the builtin ``id`` function instead of the element.
        item_keys = [self.convert_id(item_id) for item_id in ids]
    else:
        item_keys = [item.id for item in self.get_queryset()]

    for item_key in item_keys:
        if item_key in skip_ids:
            yield item_key, False, f"{self.name} is explicitly skipped"
        elif force_dump:
            yield item_key, True, "Force is set"
        else:
            should_be_dumped, reason = self.should_dump_item(item_key)
            yield item_key, should_be_dumped, reason

def should_dump_item(self, unique_key):  # pylint: disable=unused-argument
    """
    Decide whether the item identified by ``unique_key`` should be dumped.

    Returns a ``(should_be_dumped, reason)`` tuple, because
    ``fetch_target_items`` unpacks the result into exactly those two values.
    This default implementation always asks for the item to be dumped.
    """
    return True, "No dump criteria defined"


def get_last_dumped_timestamp(self, item_id):
"""
Expand Down Expand Up @@ -220,32 +232,17 @@ def get_last_dumped_timestamp(self, item_id):
# Item has never been dumped, return None
return None


def should_dump_item(self, unique_key):
def get_fields(self):
"""
Return True if the item should be dumped to ClickHouse, False otherwise
Return the fields to be used for the insert
"""
return True
return self.fields

def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False):
def fetch_model_data(self):
"""
Fetch the items that should be dumped to ClickHouse
Fetch the data from the model queryset
"""
if ids:
item_keys = [self.convert_id(id) for ids in ids]
else:
item_keys = [
item.id for item in self.get_queryset()
]

for item_key in item_keys:
if item_key in skip_ids:
yield item_key, False, f"{self.name} is explicitly skipped"
elif force_dump:
yield item_key, True, "Force is set"
else:
should_be_dumped, reason = self.should_dump_item(item_key)
yield item_key, should_be_dumped, reason
return self.get_queryset().values(*self.get_fields())


class ItemSerializer:
Expand Down
47 changes: 2 additions & 45 deletions event_sink_clickhouse/sinks/user_profile_sink.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,12 @@
from event_sink_clickhouse.sinks.base_sink import ModelBaseSink, ItemSerializer

class UserProfileSerializer(ItemSerializer):
"""
Serializer for user profile events
"""
def serialize_item(self, user_profile):
return {
"id": user_profile.id,
"name": user_profile.name,
"meta": user_profile.meta,
"courseware": user_profile.courseware,
"language": user_profile.language,
"location": user_profile.location,
"year_of_birth": user_profile.year_of_birth,
"gender": user_profile.gender,
"level_of_education": user_profile.level_of_education,
"mailing_address": user_profile.mailing_address,
"city": user_profile.city,
"country": user_profile.country,
"state": user_profile.state,
"goals": user_profile.goals,
"bio": user_profile.bio,
"profile_image_uploaded_at": user_profile.profile_image_uploaded_at,
"phone_number": user_profile.phone_number,
}
from event_sink_clickhouse.sinks.base_sink import ModelBaseSink
from event_sink_clickhouse.serializers import UserProfileSerializer


class UserProfileSink(ModelBaseSink):
"""
Sink for user profile events
"""
model = "user_profile"
fields = [
"id",
"name",
"meta",
"courseware",
"language",
"location",
"year_of_birth",
"gender",
"level_of_education",
"mailing_address",
"city",
"country",
"state",
"goals",
"bio",
"profile_image_uploaded_at",
"phone_number",
]
unique_key= "id"
clickhouse_table_name = "user_profile"
timestamp_field = "time_last_dumped"
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ Django # Web application framework
requests # HTTP request library
edx-django-utils # Django utilities, we use caching and monitoring
edx-opaque-keys # Parsing library for course and usage keys
django-rest-framework # REST API framework
11 changes: 9 additions & 2 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ cffi==1.15.1
# via pynacl
charset-normalizer==3.2.0
# via requests
click==8.1.6
click==8.1.7
# via
# celery
# click-didyoumean
Expand All @@ -41,11 +41,16 @@ django==3.2.20
# -r requirements/base.in
# django-crum
# django-waffle
# djangorestframework
# edx-django-utils
django-crum==0.7.9
# via edx-django-utils
django-rest-framework==0.1.0
# via -r requirements/base.in
django-waffle==4.0.0
# via edx-django-utils
djangorestframework==3.14.0
# via django-rest-framework
edx-django-utils==5.7.0
# via -r requirements/base.in
edx-opaque-keys==2.4.0
Expand All @@ -71,7 +76,9 @@ pynacl==1.5.0
python-dateutil==2.8.2
# via celery
pytz==2023.3
# via django
# via
# django
# djangorestframework
requests==2.31.0
# via -r requirements/base.in
six==1.16.0
Expand Down
12 changes: 10 additions & 2 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ charset-normalizer==3.2.0
# via
# -r requirements/quality.txt
# requests
click==8.1.6
click==8.1.7
# via
# -r requirements/pip-tools.txt
# -r requirements/quality.txt
Expand Down Expand Up @@ -99,16 +99,23 @@ django==3.2.20
# -r requirements/quality.txt
# django-crum
# django-waffle
# djangorestframework
# edx-django-utils
# edx-i18n-tools
django-crum==0.7.9
# via
# -r requirements/quality.txt
# edx-django-utils
django-rest-framework==0.1.0
# via -r requirements/quality.txt
django-waffle==4.0.0
# via
# -r requirements/quality.txt
# edx-django-utils
djangorestframework==3.14.0
# via
# -r requirements/quality.txt
# django-rest-framework
edx-django-utils==5.7.0
# via -r requirements/quality.txt
edx-i18n-tools==1.1.0
Expand All @@ -117,7 +124,7 @@ edx-lint==5.3.4
# via -r requirements/quality.txt
edx-opaque-keys==2.4.0
# via -r requirements/quality.txt
exceptiongroup==1.1.2
exceptiongroup==1.1.3
# via
# -r requirements/quality.txt
# pytest
Expand Down Expand Up @@ -269,6 +276,7 @@ pytz==2023.3
# via
# -r requirements/quality.txt
# django
# djangorestframework
pyyaml==6.0.1
# via
# -r requirements/quality.txt
Expand Down
Loading

0 comments on commit 40b760f

Please sign in to comment.