diff --git a/django/api/decoder_constants.py b/django/api/decoder_constants.py new file mode 100644 index 00000000..dc6eca6f --- /dev/null +++ b/django/api/decoder_constants.py @@ -0,0 +1,32 @@ +import os +from enum import Enum +from functools import partial +from api.models.decoded_vin_record import VpicDecodedVinRecord, VinpowerDecodedVinRecord +from workers.external_apis.vpic import batch_decode as vpic_batch_decode +from workers.external_apis.vinpower import batch_decode as vinpower_batch_decode + + +class VPIC(Enum): + NAME = "vpic" + CURRENT_DECODE_SUCCESSFUL = "vpic_current_decode_successful" + NUMBER_OF_CURRENT_DECODE_ATTEMPTS = "vpic_number_of_current_decode_attempts" + MODEL = VpicDecodedVinRecord + BATCH_DECODER = partial(vpic_batch_decode) + + +class VINPOWER(Enum): + NAME = "vinpower" + CURRENT_DECODE_SUCCESSFUL = "vinpower_current_decode_successful" + NUMBER_OF_CURRENT_DECODE_ATTEMPTS = "vinpower_number_of_current_decode_attempts" + MODEL = VinpowerDecodedVinRecord + BATCH_DECODER = partial(vinpower_batch_decode) + + +SERVICES = [VPIC, VINPOWER] + + +def get_service(service_name): + for service in SERVICES: + if service.NAME.value == service_name: + return service + return None diff --git a/django/api/logging_filters.py b/django/api/logging_filters.py new file mode 100644 index 00000000..f04b64e2 --- /dev/null +++ b/django/api/logging_filters.py @@ -0,0 +1,8 @@ +import logging + +class HealthcheckFilter(logging.Filter): + def filter(self, record): + msg = record.getMessage() + if "GET /api/healthcheck HTTP/1.1" in msg: + return False + return True \ No newline at end of file diff --git a/django/api/management/commands/create_app_user_and_token.py b/django/api/management/commands/create_app_user_and_token.py new file mode 100644 index 00000000..37fc5538 --- /dev/null +++ b/django/api/management/commands/create_app_user_and_token.py @@ -0,0 +1,21 @@ +from django.core.management.base import BaseCommand, CommandError +from api.models.app_user import AppUser, AppToken +from django.conf import settings +from django.db import transaction + + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument("app_name", type=str) + + @transaction.atomic + def handle(self, *args, **options): + app_name = options["app_name"] + + try: + app_user = AppUser.objects.create(app_name=app_name) + token = AppToken.objects.create(user=app_user) + except Exception: + raise CommandError("Error generating user and token") + + self.stdout.write("Generated token {} for app {}".format(token.key, app_name)) \ No newline at end of file diff --git a/django/api/management/commands/reset_app_token.py b/django/api/management/commands/reset_app_token.py new file mode 100644 index 00000000..644d6336 --- /dev/null +++ b/django/api/management/commands/reset_app_token.py @@ -0,0 +1,24 @@ +from django.core.management.base import BaseCommand, CommandError +from api.models.app_user import AppUser, AppToken +from django.conf import settings +from django.db import transaction + + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument("app_name", type=str) + + @transaction.atomic + def handle(self, *args, **options): + app_name = options["app_name"] + + try: + app_user = AppUser.objects.get(app_name=app_name) + AppToken.objects.get(user=app_user).delete() + new_token = AppToken.objects.create(user=app_user) + except Exception: + raise CommandError("Error resetting token") + + self.stdout.write( + "Generated new token {} for app {}".format(new_token.key, app_name) + ) diff --git a/django/api/migrations/0023_auto_20240514_1721.py b/django/api/migrations/0023_auto_20240514_1721.py new file mode 100644 index 00000000..dd3f0be7 --- /dev/null +++ b/django/api/migrations/0023_auto_20240514_1721.py @@ -0,0 +1,116 @@ +# Generated by Django 3.2.25 on 2024-05-14 17:21 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('api', '0022_auto_20240503_1823'), + ] + + operations = [ + migrations.CreateModel( + name='AppToken', + fields=[ + ('key', models.CharField(max_length=40, primary_key=True, serialize=False, verbose_name='Key')), + ('created', models.DateTimeField(auto_now_add=True, verbose_name='Created')), + ], + options={ + 'db_table': 'app_token', + }, + ), + migrations.CreateModel( + name='AppUser', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('create_timestamp', models.DateTimeField(auto_now_add=True, null=True)), + ('create_user', models.CharField(default='SYSTEM', max_length=130)), + ('update_timestamp', models.DateTimeField(auto_now=True, null=True)), + ('update_user', models.CharField(max_length=130, null=True)), + ('is_active', models.BooleanField(default=True)), + ('app_name', models.CharField(max_length=100, unique=True)), + ], + options={ + 'db_table': 'app_user', + }, + ), + migrations.CreateModel( + name='UploadedVinRecord', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('create_timestamp', models.DateTimeField(auto_now_add=True, null=True)), + ('create_user', models.CharField(default='SYSTEM', max_length=130)), + ('update_timestamp', models.DateTimeField(auto_now=True, null=True)), + ('update_user', models.CharField(max_length=130, null=True)), + ('vin', models.CharField(max_length=17)), + ('postal_code', models.CharField(blank=True, max_length=7, null=True)), + ('data', models.JSONField()), + ('vpic_current_decode_successful', models.BooleanField(default=False)), + ('vpic_number_of_current_decode_attempts', models.IntegerField(default=0)), + ('vinpower_current_decode_successful', models.BooleanField(default=False)), + ('vinpower_number_of_current_decode_attempts', models.IntegerField(default=0)), + ], + options={ + 'db_table': 'uploaded_vin_record', + }, + ), + migrations.CreateModel( + name='UploadedVinsFile', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('create_timestamp', models.DateTimeField(auto_now_add=True, null=True)), + ('create_user', models.CharField(default='SYSTEM', max_length=130)), + ('update_timestamp', models.DateTimeField(auto_now=True, null=True)), + ('update_user', models.CharField(max_length=130, null=True)), + ('filename', models.CharField(max_length=32, unique=True)), + ('chunk_size', models.IntegerField(default=25000)), + ('chunks_per_run', models.IntegerField(default=4)), + ('start_index', models.IntegerField(default=0)), + ('processed', models.BooleanField(default=False)), + ], + options={ + 'db_table': 'uploaded_vins_file', + }, + ), + migrations.CreateModel( + name='VinpowerDecodedVinRecord', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('create_timestamp', models.DateTimeField(auto_now_add=True, null=True)), + ('create_user', models.CharField(default='SYSTEM', max_length=130)), + ('update_timestamp', models.DateTimeField(auto_now=True, null=True)), + ('update_user', models.CharField(max_length=130, null=True)), + ('vin', models.CharField(max_length=17, unique=True)), + ('data', models.JSONField()), + ], + options={ + 'db_table': 'vinpower_decoded_vin_record', + }, + ), + migrations.CreateModel( + name='VpicDecodedVinRecord', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('create_timestamp', models.DateTimeField(auto_now_add=True, null=True)), + ('create_user', models.CharField(default='SYSTEM', max_length=130)), + ('update_timestamp', models.DateTimeField(auto_now=True, null=True)), + ('update_user', models.CharField(max_length=130, null=True)), + ('vin', models.CharField(max_length=17, unique=True)), + ('data', models.JSONField()), + ], + options={ + 'db_table': 'vpic_decoded_vin_record', + }, + ), + migrations.AddConstraint( + model_name='uploadedvinrecord', + constraint=models.UniqueConstraint(fields=('vin', 'postal_code'), name='unique_vin_postal_code'), + ), + migrations.AddField( + model_name='apptoken', + name='user', + field=models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='auth_token', to='api.appuser', verbose_name='User'), + ), + ] diff --git a/django/api/models/__init__.py b/django/api/models/__init__.py index 518b5b7f..79c55108 100644 --- a/django/api/models/__init__.py +++ b/django/api/models/__init__.py @@ -21,3 +21,7 @@ from . import user from . import permission from . import user_permission +from . import app_user +from . import uploaded_vins_file +from . import uploaded_vin_record +from . import decoded_vin_record diff --git a/django/api/models/app_user.py b/django/api/models/app_user.py new file mode 100644 index 00000000..10a75f36 --- /dev/null +++ b/django/api/models/app_user.py @@ -0,0 +1,33 @@ +from django.db import models +from auditable.models import Auditable +from rest_framework.authtoken.models import Token +from django.utils.translation import gettext_lazy as _ + + +class AppUser(Auditable): + is_active = models.BooleanField(default=True) + + app_name = models.CharField(max_length=100, unique=True) + + @property + def is_authenticated(self): + return True + + class Meta: + db_table = "app_user" + + db_table_comment = "represents an external application that integrates this app via API" + + +class AppToken(Token): + user = models.OneToOneField( + AppUser, + related_name="auth_token", + on_delete=models.CASCADE, + verbose_name=_("User"), + ) + + class Meta: + db_table = "app_token" + + db_table_comment = "the token of an external application that integrates this app via API" diff --git a/django/api/models/decoded_vin_record.py b/django/api/models/decoded_vin_record.py new file mode 100644 index 00000000..ab4eb6e6 --- /dev/null +++ b/django/api/models/decoded_vin_record.py @@ -0,0 +1,25 @@ +from django.db import models +from auditable.models import Auditable + + +class DecodedVinRecord(Auditable): + vin = models.CharField(max_length=17, unique=True) + + data = models.JSONField() + + class Meta: + abstract = True + + +class VpicDecodedVinRecord(DecodedVinRecord): + class Meta: + db_table = "vpic_decoded_vin_record" + + db_table_comment = "contains vpic-decoded VIN information" + + +class VinpowerDecodedVinRecord(DecodedVinRecord): + class Meta: + db_table = "vinpower_decoded_vin_record" + + db_table_comment = "contains vinpower-decoded VIN information" diff --git a/django/api/models/uploaded_vin_record.py b/django/api/models/uploaded_vin_record.py new file mode 100644 index 00000000..371b6b38 --- /dev/null +++ b/django/api/models/uploaded_vin_record.py @@ -0,0 +1,28 @@ +from django.db import models +from auditable.models import Auditable + + +class UploadedVinRecord(Auditable): + vin = models.CharField(max_length=17) + + postal_code = models.CharField(max_length=7, null=True, blank=True) + + data = models.JSONField() + + vpic_current_decode_successful = models.BooleanField(default=False) + + vpic_number_of_current_decode_attempts = models.IntegerField(default=0) + + vinpower_current_decode_successful = models.BooleanField(default=False) + + vinpower_number_of_current_decode_attempts = models.IntegerField(default=0) + + class Meta: + db_table = "uploaded_vin_record" + constraints = [ + models.UniqueConstraint( + fields=["vin", "postal_code"], name="unique_vin_postal_code" + ) + ] + + db_table_comment = "represents an uploaded VIN, and associated information" diff --git a/django/api/models/uploaded_vins_file.py b/django/api/models/uploaded_vins_file.py new file mode 100644 index 00000000..5f3345e1 --- /dev/null +++ b/django/api/models/uploaded_vins_file.py @@ -0,0 +1,20 @@ +from django.db import models +from auditable.models import Auditable + + +class UploadedVinsFile(Auditable): + filename = models.CharField(max_length=32, unique=True) + + chunk_size = models.IntegerField(default=25000) + + chunks_per_run = models.IntegerField(default=4) + + start_index = models.IntegerField(default=0) + + processed = models.BooleanField(default=False) + + class Meta: + db_table = "uploaded_vins_file" + + db_table_comment = "represents a file containing VINs, and parsing information" + diff --git a/django/api/services/decoded_vin_record.py b/django/api/services/decoded_vin_record.py new file mode 100644 index 00000000..f8846d3c --- /dev/null +++ b/django/api/services/decoded_vin_record.py @@ -0,0 +1,73 @@ +from api.models.uploaded_vin_record import UploadedVinRecord +from api.decoder_constants import get_service +from api.services.uploaded_vin_record import ( + set_decode_successful, + get_number_of_decode_attempts, + set_number_of_decode_attempts, +) +from django.db import transaction +from django.utils import timezone + + +@transaction.atomic +def save_decoded_data( + uploaded_vin_records, + vins_to_insert, + decoded_records_to_update_map, + service_name, + decoded_data, +): + decoded_records_to_insert = [] + decoded_records_to_update = [] + successful_records = decoded_data["successful_records"] + failed_vins = decoded_data["failed_vins"] + + service = get_service(service_name) + if service: + decoded_vin_model = service.MODEL.value + for uploaded_record in uploaded_vin_records: + vin = uploaded_record.vin + if vin in successful_records: + decoded_datum = successful_records.get(vin) + set_decode_successful(service_name, uploaded_record, True) + if vin in vins_to_insert: + decoded_records_to_insert.append( + decoded_vin_model(vin=vin, data=decoded_datum) + ) + elif vin in decoded_records_to_update_map: + decoded_record_to_update = decoded_records_to_update_map.get(vin) + decoded_record_to_update.update_timestamp = timezone.now() + decoded_record_to_update.data = decoded_datum + decoded_records_to_update.append(decoded_record_to_update) + elif vin in failed_vins: + set_decode_successful(service_name, uploaded_record, False) + + set_number_of_decode_attempts( + service_name, + uploaded_record, + get_number_of_decode_attempts(service_name, uploaded_record) + 1, + ) + + decoded_vin_model.objects.bulk_update( + decoded_records_to_update, ["update_timestamp", "data"] + ) + decoded_vin_model.objects.bulk_create(decoded_records_to_insert) + UploadedVinRecord.objects.bulk_update( + uploaded_vin_records, + [ + "update_timestamp", + service.CURRENT_DECODE_SUCCESSFUL.value, + service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value, + ], + ) + + +def get_decoded_vins(service_name, vins): + result = {} + service = get_service(service_name) + if service: + decoded_records_model = service.MODEL.value + records = decoded_records_model.objects.filter(vin__in=vins) + for record in records: + result[record.vin] = record.data + return result diff --git a/django/api/services/minio.py b/django/api/services/minio.py index de1b0fd5..692225f1 100644 --- a/django/api/services/minio.py +++ b/django/api/services/minio.py @@ -3,12 +3,13 @@ from django.conf import settings -MINIO = Minio( - settings.MINIO_ENDPOINT, - access_key=settings.MINIO_ACCESS_KEY, - secret_key=settings.MINIO_SECRET_KEY, - secure=settings.MINIO_USE_SSL, -) +def get_minio_client(): + return Minio( + settings.MINIO_ENDPOINT, + access_key=settings.MINIO_ACCESS_KEY, + secret_key=settings.MINIO_SECRET_KEY, + secure=settings.MINIO_USE_SSL, + ) def get_refined_object_name(object_name): @@ -19,15 +20,24 @@ def get_refined_object_name(object_name): def minio_get_object(object_name): - return MINIO.presigned_get_object( + return get_minio_client().presigned_get_object( bucket_name=settings.MINIO_BUCKET_NAME, object_name=get_refined_object_name(object_name), expires=timedelta(seconds=3600), ) +def get_minio_object(object_name): + try: + client = get_minio_client() + refined_object_name = get_refined_object_name(object_name) + return client.get_object(settings.MINIO_BUCKET_NAME, refined_object_name) + except: + raise + + def minio_put_object(object_name): - return MINIO.presigned_put_object( + return get_minio_client().presigned_put_object( bucket_name=settings.MINIO_BUCKET_NAME, object_name=get_refined_object_name(object_name), expires=timedelta(seconds=7200), @@ -35,7 +45,7 @@ def minio_put_object(object_name): def minio_remove_object(object_name): - return MINIO.remove_object( + return get_minio_client().remove_object( bucket_name=settings.MINIO_BUCKET_NAME, object_name=get_refined_object_name(object_name), ) diff --git a/django/api/services/uploaded_vin_record.py b/django/api/services/uploaded_vin_record.py new file mode 100644 index 00000000..ae1bea36 --- /dev/null +++ b/django/api/services/uploaded_vin_record.py @@ -0,0 +1,72 @@ +import pandas as pd +from api.models.uploaded_vin_record import UploadedVinRecord +from api.decoder_constants import get_service + + +def parse_and_save(uploaded_vins_file, file_response): + processed = True + number_of_chunks_processed = 0 + number_of_chunks_to_process = uploaded_vins_file.chunks_per_run + chunksize = uploaded_vins_file.chunk_size + start_index = uploaded_vins_file.start_index + chunks = pd.read_csv(file_response, sep="|", chunksize=chunksize) + + for idx, chunk in enumerate(chunks): + if ( + idx >= start_index + and number_of_chunks_processed < number_of_chunks_to_process + ): + vin_records_to_insert = get_vin_records_to_insert(chunk) + UploadedVinRecord.objects.bulk_create( + vin_records_to_insert, + ignore_conflicts=True, + ) + number_of_chunks_processed = number_of_chunks_processed + 1 + elif idx >= start_index + number_of_chunks_processed: + processed = False + break + + new_start_index = start_index + number_of_chunks_processed + uploaded_vins_file.processed = processed + uploaded_vins_file.start_index = new_start_index + uploaded_vins_file.save() + + +def get_vin_records_to_insert(df): + result = [] + df.fillna("", inplace=True) + for _, row in df.iterrows(): + if row["vin"] != "": + vin = row["vin"] + postal_code = row["postal_code"] + data = row.to_dict() + del data["vin"] + del data["postal_code"] + result.append( + UploadedVinRecord(vin=vin, postal_code=postal_code, data=data) + ) + return result + + +def get_decode_successful(service_name, uploaded_record): + service = get_service(service_name) + if service: + return getattr(uploaded_record, service.CURRENT_DECODE_SUCCESSFUL.value) + + +def set_decode_successful(service_name, uploaded_record, value): + service = get_service(service_name) + if service: + setattr(uploaded_record, service.CURRENT_DECODE_SUCCESSFUL.value, value) + + +def get_number_of_decode_attempts(service_name, uploaded_record): + service = get_service(service_name) + if service: + return getattr(uploaded_record, service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value) + + +def set_number_of_decode_attempts(service_name, uploaded_record, value): + service = get_service(service_name) + if service: + setattr(uploaded_record, service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value, value) diff --git a/django/api/services/uploaded_vins_file.py b/django/api/services/uploaded_vins_file.py new file mode 100644 index 00000000..ed169f10 --- /dev/null +++ b/django/api/services/uploaded_vins_file.py @@ -0,0 +1,5 @@ +from api.models.uploaded_vins_file import UploadedVinsFile + + +def create_vins_file(filename, **kwargs): + UploadedVinsFile.objects.create(filename=filename, **kwargs) diff --git a/django/api/settings.py b/django/api/settings.py index 73856b9d..dbf9125a 100644 --- a/django/api/settings.py +++ b/django/api/settings.py @@ -40,6 +40,7 @@ INSTALLED_APPS = [ "api.apps.ApiConfig", + "workers.apps.Config", "tfrs.apps.ApiConfig", "metabase.apps.MetabaseConfig", "corsheaders", @@ -51,8 +52,11 @@ "django.contrib.sessions", "django.contrib.staticfiles", "rest_framework", + "django_q", ] +DEFAULT_AUTO_FIELD = 'django.db.models.AutoField' + MIDDLEWARE = [ "corsheaders.middleware.CorsMiddleware", "django.middleware.security.SecurityMiddleware", @@ -179,3 +183,46 @@ DECODER_ACCESS_KEY = os.getenv("DECODER_ACCESS_KEY") DECODER_SECRET_KEY = os.getenv("DECODER_SECRET_KEY") + +Q_CLUSTER = { + "name": "CTHUB", + "workers": 4, + "timeout": 90, + "retry": 120, + "queue_limit": 50, + "bulk": 10, + "orm": "default", + "save_limit": -1, + "max_attempts": 100, +} + +MAX_DECODE_ATTEMPTS = os.getenv("MAX_DECODE_ATTEMPTS", 5) + +VPIC_ENDPOINT = os.getenv( + "VPIC_ENDPOINT", + "https://vpic.nhtsa.dot.gov/api/vehicles", +) +VPIC_VIN_KEY = os.getenv("VPIC_VIN_KEY", "VIN") +VPIC_ERROR_CODE_NAME = os.getenv("VPIC_ERROR_CODE_NAME", "ErrorCode") +VPIC_SUCCESS_ERROR_CODE = os.getenv("VPIC_SUCCESS_ERROR_CODE", "0") + +LOGGING = { + "version": 1, + "disable_existing_loggers": False, + "filters": { + "healthcheck": { + "()": "api.logging_filters.HealthcheckFilter", + }, + }, + "handlers": { + "console": { + "class": "logging.StreamHandler", + "filters": ["healthcheck"], + } + }, + "loggers": { + "django": { + "handlers": ["console"], + }, + }, +} diff --git a/django/api/token_authentication.py b/django/api/token_authentication.py new file mode 100644 index 00000000..754e1280 --- /dev/null +++ b/django/api/token_authentication.py @@ -0,0 +1,6 @@ +from rest_framework.authentication import TokenAuthentication +from api.models.app_user import AppToken + + +class CustomTokenAuthentication(TokenAuthentication): + model = AppToken diff --git a/django/api/urls.py b/django/api/urls.py index 89abbbf9..fb628427 100644 --- a/django/api/urls.py +++ b/django/api/urls.py @@ -22,6 +22,9 @@ from api.viewsets.minio import MinioViewSet from api.viewsets.upload import UploadViewset from api.viewsets.user import UserViewSet +from api.viewsets.healthcheck import HealthCheckViewset +from api.viewsets.decoded_vin_record import DecodedVinRecordViewset + ROUTER = routers.SimpleRouter(trailing_slash=False) @@ -31,6 +34,8 @@ ROUTER.register(r"minio", MinioViewSet, basename="minio") ROUTER.register(r"users", UserViewSet) +ROUTER.register(r"healthcheck", HealthCheckViewset, basename="healthcheck") +ROUTER.register(r"decoded-vin-records", DecodedVinRecordViewset) urlpatterns = [ path("admin/", admin.site.urls), diff --git a/django/api/utilities/generic.py b/django/api/utilities/generic.py new file mode 100644 index 00000000..20d5a8d7 --- /dev/null +++ b/django/api/utilities/generic.py @@ -0,0 +1,6 @@ +def get_map(key_name, objects): + result = {} + for object in objects: + key = getattr(object, key_name) + result[key] = object + return result diff --git a/django/api/viewsets/decoded_vin_record.py b/django/api/viewsets/decoded_vin_record.py new file mode 100644 index 00000000..96f8dd0a --- /dev/null +++ b/django/api/viewsets/decoded_vin_record.py @@ -0,0 +1,19 @@ +from rest_framework.viewsets import GenericViewSet +from api.token_authentication import CustomTokenAuthentication +from api.models.decoded_vin_record import VpicDecodedVinRecord +from api.services.decoded_vin_record import get_decoded_vins +from rest_framework.decorators import action +from rest_framework.response import Response + + +class DecodedVinRecordViewset(GenericViewSet): + authentication_classes = [CustomTokenAuthentication] + + queryset = VpicDecodedVinRecord.objects.all() + + @action(detail=False, methods=["post"]) + def get_decoded_vins(self, request): + service_name = request.data.get("service_name") + vins = request.data.get("vins") + decoded_vins = get_decoded_vins(service_name, vins) + return Response(decoded_vins) diff --git a/django/api/viewsets/healthcheck.py b/django/api/viewsets/healthcheck.py new file mode 100644 index 00000000..1972bbb8 --- /dev/null +++ b/django/api/viewsets/healthcheck.py @@ -0,0 +1,11 @@ +from rest_framework.viewsets import GenericViewSet +from rest_framework.response import Response +from rest_framework import status + + +class HealthCheckViewset(GenericViewSet): + authentication_classes = [] + permission_classes=[] + + def list(self, request): + return Response(status=status.HTTP_200_OK) \ No newline at end of file diff --git a/django/api/viewsets/upload.py b/django/api/viewsets/upload.py index d81757cd..67971719 100644 --- a/django/api/viewsets/upload.py +++ b/django/api/viewsets/upload.py @@ -16,6 +16,7 @@ from api.services.spreadsheet_uploader import import_from_xls import api.constants as constants from api.services.spreadsheet_uploader_prep import * +from api.services.uploaded_vins_file import create_vins_file class UploadViewset(GenericViewSet): @@ -38,7 +39,9 @@ def datasets_list(self, request): datasets = Datasets.objects.all().exclude(name__in=incomplete_datasets) serializer = DatasetsSerializer(datasets, many=True, read_only=True) - return Response(serializer.data) + serializer_data = serializer.data + serializer_data.append({"id": -1, "name": "ICBC Vins"}) + return Response(serializer_data) @action(detail=False, methods=["post"]) @method_decorator(check_upload_permission()) @@ -48,6 +51,10 @@ def import_data(self, request): dataset_selected = request.data.get("datasetSelected") replace_data = request.data.get("replace", False) + if dataset_selected == "ICBC Vins": + create_vins_file(filename) + return Response({"success": True}, status=status.HTTP_200_OK) + try: url = minio_get_object(filename) urllib.request.urlretrieve(url, filename) diff --git a/django/requirements.txt b/django/requirements.txt index f17c6e41..b8a19ac5 100644 --- a/django/requirements.txt +++ b/django/requirements.txt @@ -1,9 +1,15 @@ +arrow==1.3.0 black==24.3.0 -Django==3.1.6 +blessed==1.20.0 +croniter==2.0.1 +Django==3.2.25 psycopg2-binary==2.8.6 djangorestframework==3.12.2 django-filter==2.4.0 django-cors-headers==3.7.0 +django-picklefield==3.1 +django-q==1.3.9 +func-timeout==4.3.5 coverage==5.4 pycodestyle==2.6.0 whitenoise==5.2.0 diff --git a/django/workers/apps.py b/django/workers/apps.py new file mode 100644 index 00000000..c05d62f6 --- /dev/null +++ b/django/workers/apps.py @@ -0,0 +1,18 @@ +from django.apps import AppConfig +import sys + + +class Config(AppConfig): + name = "workers" + + def ready(self): + from workers.scheduled_jobs import ( + schedule_create_minio_bucket, + schedule_read_uploaded_vins_file, + schedule_batch_decode_vins, + ) + + if "qcluster" in sys.argv: + schedule_create_minio_bucket() + schedule_read_uploaded_vins_file() + schedule_batch_decode_vins() diff --git a/django/workers/external_apis/vinpower.py b/django/workers/external_apis/vinpower.py new file mode 100644 index 00000000..735c918d --- /dev/null +++ b/django/workers/external_apis/vinpower.py @@ -0,0 +1,2 @@ +def batch_decode(uploaded_vin_records): + return {"successful_records": [], "failed_vins": []} diff --git a/django/workers/external_apis/vpic.py b/django/workers/external_apis/vpic.py new file mode 100644 index 00000000..fa941a56 --- /dev/null +++ b/django/workers/external_apis/vpic.py @@ -0,0 +1,38 @@ +import requests +from django.conf import settings + + +def batch_decode(uploaded_vin_records): + vpic_vin_key = settings.VPIC_VIN_KEY + vpic_error_code_name = settings.VPIC_ERROR_CODE_NAME + vpic_success_error_code = settings.VPIC_SUCCESS_ERROR_CODE + successful_records = {} + failed_vins = set() + + url = settings.VPIC_ENDPOINT + "/DecodeVINValuesBatch/" + + request_data = "" + for record in uploaded_vin_records: + request_data = request_data + record.vin + ";" + + body = {"format": "json", "data": request_data} + response = requests.post(url, data=body) + response.raise_for_status + data = response.json()["Results"] + decoded_vins_map = {} + for record in data: + vin = record.get(vpic_vin_key) + decoded_vins_map[vin] = record + + for record in uploaded_vin_records: + vin = record.vin + decoded_record = decoded_vins_map.get(vin) + if ( + decoded_record is not None + and decoded_record[vpic_error_code_name] == vpic_success_error_code + ): + successful_records[vin] = decoded_record + else: + failed_vins.add(vin) + + return {"successful_records": successful_records, "failed_vins": failed_vins} diff --git a/django/workers/scheduled_jobs.py b/django/workers/scheduled_jobs.py new file mode 100644 index 00000000..28d878c7 --- /dev/null +++ b/django/workers/scheduled_jobs.py @@ -0,0 +1,42 @@ +from django_q.tasks import schedule +from django.db import IntegrityError + + +def schedule_create_minio_bucket(): + try: + schedule( + "workers.tasks.create_minio_bucket", + name="create_minio_bucket", + schedule_type="O", + repeats=1, + ) + except IntegrityError: + pass + + +def schedule_read_uploaded_vins_file(): + try: + schedule( + "workers.tasks.read_uploaded_vins_file", + name="read_uploaded_vins_file", + schedule_type="C", + cron="*/15 * * * *", + q_options={"timeout": 660, "ack_failure": True}, + ) + except IntegrityError: + pass + + +def schedule_batch_decode_vins(): + try: + schedule( + "workers.tasks.batch_decode_vins", + "vpic", + 50, + name="batch_decode_vins", + schedule_type="C", + cron="* * * * *", + q_options={"timeout": 60, "ack_failure": True}, + ) + except IntegrityError: + pass diff --git a/django/workers/tasks.py b/django/workers/tasks.py new file mode 100644 index 00000000..1efe4076 --- /dev/null +++ b/django/workers/tasks.py @@ -0,0 +1,92 @@ +from django.conf import settings +from api.services.minio import get_minio_client, get_minio_object +from func_timeout import func_timeout, FunctionTimedOut +from api.models.uploaded_vins_file import UploadedVinsFile +from api.models.uploaded_vin_record import UploadedVinRecord +from api.decoder_constants import get_service +from api.utilities.generic import get_map +from api.services.decoded_vin_record import save_decoded_data +from api.services.uploaded_vin_record import parse_and_save +from django.db import transaction + + +def create_minio_bucket(): + bucket_name = settings.MINIO_BUCKET_NAME + client = get_minio_client() + found = client.bucket_exists(bucket_name) + if not found: + client.make_bucket(bucket_name) + + +def read_uploaded_vins_file(): + # TODO: this job will probably have to become more involved; it currently just uploads whatever is in the file while skipping records + # that encounter uniqueness conflicts. + # we'll probably have to do an initial, chunked read from the + # file in order to build a map of (vin, postal_code) -> (record chunk index, record index within chunk) of unique records (based on snapshot_date?), + # then we'll have to compare the (vin, postal_code) keys to existing records in the database, and + # determine which ones need to get bulk-inserted, and which ones bulk-updated. + # also have to keep in mind the memory used by any data structures we use + @transaction.atomic + def inner(vins_file, file_response): + if vins_file is not None and file_response is not None: + parse_and_save(vins_file, file_response) + + file_response = None + vins_file = ( + UploadedVinsFile.objects.filter(processed=False).order_by("create_timestamp").first() + ) + if vins_file is not None: + file_response = get_minio_object(vins_file.filename) + try: + func_timeout(600, inner, args=(vins_file, file_response)) + except FunctionTimedOut: + print("reading vins file job timed out") + raise Exception + finally: + if file_response is not None: + file_response.close() + file_response.release_conn() + + +def batch_decode_vins(service_name, batch_size=50): + def inner(): + max_decode_attempts = settings.MAX_DECODE_ATTEMPTS + service = get_service(service_name) + if service: + decoded_vin_model = service.MODEL.value + filters = { + service.CURRENT_DECODE_SUCCESSFUL.value: False, + service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value + + "__lt": max_decode_attempts, + } + order_by = [service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value, "create_timestamp"] + uploaded_vin_records = UploadedVinRecord.objects.filter(**filters).order_by( + *order_by + )[:batch_size] + uploaded_vins = set() + for uploaded_record in uploaded_vin_records: + uploaded_vins.add(uploaded_record.vin) + vins_to_update = set() + decoded_records_to_update_map = get_map( + "vin", decoded_vin_model.objects.filter(vin__in=uploaded_vins) + ) + for decoded_vin in decoded_records_to_update_map: + vins_to_update.add(decoded_vin) + vins_to_insert = uploaded_vins.difference(vins_to_update) + + decoder = service.BATCH_DECODER.value + decoded_data = decoder(uploaded_vin_records) + + save_decoded_data( + uploaded_vin_records, + vins_to_insert, + decoded_records_to_update_map, + service_name, + decoded_data, + ) + + try: + func_timeout(45, inner) + except FunctionTimedOut: + print("batch decode vins job timed out") + raise Exception diff --git a/docker-compose.yml b/docker-compose.yml index cd960728..52add316 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,6 +35,12 @@ services: ports: - 9000:9000 - 9001:9001 + healthcheck: + test: "curl -I --fail --max-time 10 http://localhost:9000/minio/health/live" + start_period: 5s + interval: 5s + timeout: 5s + retries: 3 api: build: ./django command: > @@ -59,9 +65,36 @@ services: - ./django:/api ports: - 8000:8000 + healthcheck: + test: "curl --fail --max-time 5 http://localhost:8000/api/healthcheck" + start_period: 15s + interval: 15s + timeout: 10s + retries: 2 depends_on: db: condition: service_healthy + minio: + condition: service_healthy + workers: + build: ./django + command: > + sh -c "python manage.py qcluster" + env_file: + - minio.env + environment: + - DB_ENGINE=django.db.backends.postgresql + - DB_HOST=db + - DB_NAME=postgres + - DB_PASSWORD=postgres + - DB_PORT=5432 + - DB_USER=postgres + - DJANGO_DEBUG=True + volumes: + - ./django:/api + depends_on: + api: + condition: service_healthy web: build: ./frontend command: npm run start