From 40b760f662dbca69674c3ec60f81a42b89c4537e Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Thu, 17 Aug 2023 14:38:41 -0500 Subject: [PATCH] feat: use django serializer for sink serialization --- event_sink_clickhouse/apps.py | 2 +- event_sink_clickhouse/serializers.py | 49 ++++++++++++ event_sink_clickhouse/signals.py | 2 +- event_sink_clickhouse/sinks/base_sink.py | 79 +++++++++---------- .../sinks/user_profile_sink.py | 47 +---------- event_sink_clickhouse/{sinks => }/utils.py | 0 requirements/base.in | 1 + requirements/base.txt | 11 ++- requirements/dev.txt | 12 ++- requirements/doc.txt | 12 ++- requirements/pip-tools.txt | 2 +- requirements/pip.txt | 2 +- requirements/quality.txt | 12 ++- requirements/test.txt | 12 ++- 14 files changed, 143 insertions(+), 100 deletions(-) create mode 100644 event_sink_clickhouse/serializers.py rename event_sink_clickhouse/{sinks => }/utils.py (100%) diff --git a/event_sink_clickhouse/apps.py b/event_sink_clickhouse/apps.py index e16cd98..dd85b14 100644 --- a/event_sink_clickhouse/apps.py +++ b/event_sink_clickhouse/apps.py @@ -48,5 +48,5 @@ def ready(self): super().ready() from . import tasks # pylint: disable=import-outside-toplevel, unused-import - from event_sink_clickhouse.sinks import user_profile_sink + from event_sink_clickhouse import sinks from event_sink_clickhouse import signals diff --git a/event_sink_clickhouse/serializers.py b/event_sink_clickhouse/serializers.py new file mode 100644 index 0000000..0e9e82c --- /dev/null +++ b/event_sink_clickhouse/serializers.py @@ -0,0 +1,49 @@ +from rest_framework import serializers + +import uuid +from django.utils import timezone + +from event_sink_clickhouse.utils import get_model + + +class BaseSinkSerializer(serializers.Serializer): + dump_id = serializers.SerializerMethodField() + dump_timestamp = serializers.SerializerMethodField() + + class Meta: + fields = [ + "dump_id", + "dump_timestamp", + ] + + def get_dump_id(self, instance): + return uuid.uuid4() + + def get_dump_timestamp(self, instance): + return timezone.now() + + +class UserProfileSerializer(BaseSinkSerializer, serializers.ModelSerializer): + class Meta: + model = get_model("user_profile") + fields = [ + "id", + "name", + "meta", + "courseware", + "language", + "location", + "year_of_birth", + "gender", + "level_of_education", + "mailing_address", + "city", + "country", + "state", + "goals", + "bio", + "profile_image_uploaded_at", + "phone_number", + "dump_id", + "dump_timestamp", + ] diff --git a/event_sink_clickhouse/signals.py b/event_sink_clickhouse/signals.py index 3d07649..2a69f2d 100644 --- a/event_sink_clickhouse/signals.py +++ b/event_sink_clickhouse/signals.py @@ -1,7 +1,7 @@ """ Signal handler functions, mapped to specific signals in apps.py. """ -from event_sink_clickhouse.sinks.utils import get_model +from event_sink_clickhouse.utils import get_model from django.db.models.signals import post_save from django.dispatch import receiver diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py index ae21193..6755adf 100644 --- a/event_sink_clickhouse/sinks/base_sink.py +++ b/event_sink_clickhouse/sinks/base_sink.py @@ -12,7 +12,7 @@ from django.conf import settings from django.utils import timezone -from event_sink_clickhouse.sinks.utils import get_model +from event_sink_clickhouse.utils import get_model ClickHouseAuth = namedtuple("ClickHouseAuth", ["username", "password"]) @@ -83,7 +83,6 @@ class ModelBaseSink(BaseSink): This class is used for the model based event sink, which uses the Django ORM to write events to ClickHouse. """ - fields = None unique_key = None clickhouse_table_name = None queryset = None @@ -97,16 +96,14 @@ def __init__(self, connection_overrides, log): required_fields = [ self.model, - self.fields, self.clickhouse_table_name, self.timestamp_field, self.unique_key, self.name, - self.serializer_class, ] if not all(required_fields): - raise NotImplementedError("ModelBaseSink needs to be subclassed with model, fields, clickhouse_table_name, timestamp_field, unique_key, name, and serializer_class") + raise NotImplementedError("ModelBaseSink needs to be subclassed with model, clickhouse_table_name, timestamp_field, unique_key, and name") def get_model(self): """ @@ -120,18 +117,6 @@ def get_queryset(self): """ return self.get_model().objects.all() - def get_fields(self): - """ - Return the fields to be used for the insert - """ - return self.fields - - def fetch_model_data(self): - """ - Fetch the data from the model queryset - """ - return self.get_queryset().values(*self.get_fields()) - def dump(self, item_id): """ Do the serialization and send to ClickHouse @@ -156,14 +141,15 @@ def serialize_item(self, item): """ Serialize the data to be sent to ClickHouse """ - serializer = self.get_serializer() - return serializer.serialize(item) + Serializer = self.get_serializer() + serializer = Serializer(item) + return serializer.data def get_serializer(self): """ Return the serializer to be used for the insert """ - return self.serializer_class() + return self.serializer_class def send_item(self, serialized_item): """ @@ -191,6 +177,32 @@ def send_item(self, serialized_item): self._send_clickhouse_request(request, expected_insert_rows=1) + def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False): + """ + Fetch the items that should be dumped to ClickHouse + """ + if ids: + item_keys = [self.convert_id(id) for ids in ids] + else: + item_keys = [ + item.id for item in self.get_queryset() + ] + + for item_key in item_keys: + if item_key in skip_ids: + yield item_key, False, f"{self.name} is explicitly skipped" + elif force_dump: + yield item_key, True, "Force is set" + else: + should_be_dumped, reason = self.should_dump_item(item_key) + yield item_key, should_be_dumped, reason + + def should_dump_item(self, unique_key): + """ + Return True if the item should be dumped to ClickHouse, False otherwise + """ + return True + def get_last_dumped_timestamp(self, item_id): """ @@ -220,32 +232,17 @@ def get_last_dumped_timestamp(self, item_id): # Item has never been dumped, return None return None - - def should_dump_item(self, unique_key): + def get_fields(self): """ - Return True if the item should be dumped to ClickHouse, False otherwise + Return the fields to be used for the insert """ - return True + return self.fields - def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False): + def fetch_model_data(self): """ - Fetch the items that should be dumped to ClickHouse + Fetch the data from the model queryset """ - if ids: - item_keys = [self.convert_id(id) for ids in ids] - else: - item_keys = [ - item.id for item in self.get_queryset() - ] - - for item_key in item_keys: - if item_key in skip_ids: - yield item_key, False, f"{self.name} is explicitly skipped" - elif force_dump: - yield item_key, True, "Force is set" - else: - should_be_dumped, reason = self.should_dump_item(item_key) - yield item_key, should_be_dumped, reason + return self.get_queryset().values(*self.get_fields()) class ItemSerializer: diff --git a/event_sink_clickhouse/sinks/user_profile_sink.py b/event_sink_clickhouse/sinks/user_profile_sink.py index 2c13114..de6ed94 100644 --- a/event_sink_clickhouse/sinks/user_profile_sink.py +++ b/event_sink_clickhouse/sinks/user_profile_sink.py @@ -1,29 +1,5 @@ -from event_sink_clickhouse.sinks.base_sink import ModelBaseSink, ItemSerializer - -class UserProfileSerializer(ItemSerializer): - """ - Serializer for user profile events - """ - def serialize_item(self, user_profile): - return { - "id": user_profile.id, - "name": user_profile.name, - "meta": user_profile.meta, - "courseware": user_profile.courseware, - "language": user_profile.language, - "location": user_profile.location, - "year_of_birth": user_profile.year_of_birth, - "gender": user_profile.gender, - "level_of_education": user_profile.level_of_education, - "mailing_address": user_profile.mailing_address, - "city": user_profile.city, - "country": user_profile.country, - "state": user_profile.state, - "goals": user_profile.goals, - "bio": user_profile.bio, - "profile_image_uploaded_at": user_profile.profile_image_uploaded_at, - "phone_number": user_profile.phone_number, - } +from event_sink_clickhouse.sinks.base_sink import ModelBaseSink +from event_sink_clickhouse.serializers import UserProfileSerializer class UserProfileSink(ModelBaseSink): @@ -31,25 +7,6 @@ class UserProfileSink(ModelBaseSink): Sink for user profile events """ model = "user_profile" - fields = [ - "id", - "name", - "meta", - "courseware", - "language", - "location", - "year_of_birth", - "gender", - "level_of_education", - "mailing_address", - "city", - "country", - "state", - "goals", - "bio", - "profile_image_uploaded_at", - "phone_number", - ] unique_key= "id" clickhouse_table_name = "user_profile" timestamp_field = "time_last_dumped" diff --git a/event_sink_clickhouse/sinks/utils.py b/event_sink_clickhouse/utils.py similarity index 100% rename from event_sink_clickhouse/sinks/utils.py rename to event_sink_clickhouse/utils.py diff --git a/requirements/base.in b/requirements/base.in index c08ac0a..c009410 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -6,3 +6,4 @@ Django # Web application framework requests # HTTP request library edx-django-utils # Django utilities, we use caching and monitoring edx-opaque-keys # Parsing library for course and usage keys +django-rest-framework # REST API framework diff --git a/requirements/base.txt b/requirements/base.txt index ab180cb..133dfba 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -22,7 +22,7 @@ cffi==1.15.1 # via pynacl charset-normalizer==3.2.0 # via requests -click==8.1.6 +click==8.1.7 # via # celery # click-didyoumean @@ -41,11 +41,16 @@ django==3.2.20 # -r requirements/base.in # django-crum # django-waffle + # djangorestframework # edx-django-utils django-crum==0.7.9 # via edx-django-utils +django-rest-framework==0.1.0 + # via -r requirements/base.in django-waffle==4.0.0 # via edx-django-utils +djangorestframework==3.14.0 + # via django-rest-framework edx-django-utils==5.7.0 # via -r requirements/base.in edx-opaque-keys==2.4.0 @@ -71,7 +76,9 @@ pynacl==1.5.0 python-dateutil==2.8.2 # via celery pytz==2023.3 - # via django + # via + # django + # djangorestframework requests==2.31.0 # via -r requirements/base.in six==1.16.0 diff --git a/requirements/dev.txt b/requirements/dev.txt index 617bdf2..53b149b 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -46,7 +46,7 @@ charset-normalizer==3.2.0 # via # -r requirements/quality.txt # requests -click==8.1.6 +click==8.1.7 # via # -r requirements/pip-tools.txt # -r requirements/quality.txt @@ -99,16 +99,23 @@ django==3.2.20 # -r requirements/quality.txt # django-crum # django-waffle + # djangorestframework # edx-django-utils # edx-i18n-tools django-crum==0.7.9 # via # -r requirements/quality.txt # edx-django-utils +django-rest-framework==0.1.0 + # via -r requirements/quality.txt django-waffle==4.0.0 # via # -r requirements/quality.txt # edx-django-utils +djangorestframework==3.14.0 + # via + # -r requirements/quality.txt + # django-rest-framework edx-django-utils==5.7.0 # via -r requirements/quality.txt edx-i18n-tools==1.1.0 @@ -117,7 +124,7 @@ edx-lint==5.3.4 # via -r requirements/quality.txt edx-opaque-keys==2.4.0 # via -r requirements/quality.txt -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via # -r requirements/quality.txt # pytest @@ -269,6 +276,7 @@ pytz==2023.3 # via # -r requirements/quality.txt # django + # djangorestframework pyyaml==6.0.1 # via # -r requirements/quality.txt diff --git a/requirements/doc.txt b/requirements/doc.txt index 4b8fb65..44cb553 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -50,7 +50,7 @@ charset-normalizer==3.2.0 # via # -r requirements/test.txt # requests -click==8.1.6 +click==8.1.7 # via # -r requirements/test.txt # celery @@ -85,15 +85,22 @@ django==3.2.20 # -r requirements/test.txt # django-crum # django-waffle + # djangorestframework # edx-django-utils django-crum==0.7.9 # via # -r requirements/test.txt # edx-django-utils +django-rest-framework==0.1.0 + # via -r requirements/test.txt django-waffle==4.0.0 # via # -r requirements/test.txt # edx-django-utils +djangorestframework==3.14.0 + # via + # -r requirements/test.txt + # django-rest-framework doc8==1.1.1 # via -r requirements/doc.in docutils==0.19 @@ -107,7 +114,7 @@ edx-django-utils==5.7.0 # via -r requirements/test.txt edx-opaque-keys==2.4.0 # via -r requirements/test.txt -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via # -r requirements/test.txt # pytest @@ -230,6 +237,7 @@ pytz==2023.3 # -r requirements/test.txt # babel # django + # djangorestframework pyyaml==6.0.1 # via # -r requirements/test.txt diff --git a/requirements/pip-tools.txt b/requirements/pip-tools.txt index 07c0707..3ad245f 100644 --- a/requirements/pip-tools.txt +++ b/requirements/pip-tools.txt @@ -6,7 +6,7 @@ # build==0.10.0 # via pip-tools -click==8.1.6 +click==8.1.7 # via pip-tools packaging==23.1 # via build diff --git a/requirements/pip.txt b/requirements/pip.txt index 5e2f760..7dfc08c 100644 --- a/requirements/pip.txt +++ b/requirements/pip.txt @@ -10,5 +10,5 @@ wheel==0.41.1 # The following packages are considered to be unsafe in a requirements file: pip==23.2.1 # via -r requirements/pip.in -setuptools==68.0.0 +setuptools==68.1.0 # via -r requirements/pip.in diff --git a/requirements/quality.txt b/requirements/quality.txt index 5bfc971..ca811f8 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -39,7 +39,7 @@ charset-normalizer==3.2.0 # via # -r requirements/test.txt # requests -click==8.1.6 +click==8.1.7 # via # -r requirements/test.txt # celery @@ -80,22 +80,29 @@ django==3.2.20 # -r requirements/test.txt # django-crum # django-waffle + # djangorestframework # edx-django-utils django-crum==0.7.9 # via # -r requirements/test.txt # edx-django-utils +django-rest-framework==0.1.0 + # via -r requirements/test.txt django-waffle==4.0.0 # via # -r requirements/test.txt # edx-django-utils +djangorestframework==3.14.0 + # via + # -r requirements/test.txt + # django-rest-framework edx-django-utils==5.7.0 # via -r requirements/test.txt edx-lint==5.3.4 # via -r requirements/quality.in edx-opaque-keys==2.4.0 # via -r requirements/test.txt -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via # -r requirements/test.txt # pytest @@ -204,6 +211,7 @@ pytz==2023.3 # via # -r requirements/test.txt # django + # djangorestframework pyyaml==6.0.1 # via # -r requirements/test.txt diff --git a/requirements/test.txt b/requirements/test.txt index 1772977..c85102b 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -35,7 +35,7 @@ charset-normalizer==3.2.0 # via # -r requirements/base.txt # requests -click==8.1.6 +click==8.1.7 # via # -r requirements/base.txt # celery @@ -65,20 +65,27 @@ coverage[toml]==7.3.0 # -r requirements/base.txt # django-crum # django-waffle + # djangorestframework # edx-django-utils django-crum==0.7.9 # via # -r requirements/base.txt # edx-django-utils +django-rest-framework==0.1.0 + # via -r requirements/base.txt django-waffle==4.0.0 # via # -r requirements/base.txt # edx-django-utils +djangorestframework==3.14.0 + # via + # -r requirements/base.txt + # django-rest-framework edx-django-utils==5.7.0 # via -r requirements/base.txt edx-opaque-keys==2.4.0 # via -r requirements/base.txt -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via pytest idna==3.4 # via @@ -144,6 +151,7 @@ pytz==2023.3 # via # -r requirements/base.txt # django + # djangorestframework pyyaml==6.0.1 # via # code-annotations