From b87211cf61a57ce7e14a27c9e81e29279eb8bd45 Mon Sep 17 00:00:00 2001 From: tim738745 <98717409+tim738745@users.noreply.github.com> Date: Fri, 7 Jun 2024 12:10:22 -0700 Subject: [PATCH] fix: remove func_timeout (#331) --- django/workers/scheduled_jobs.py | 2 +- django/workers/tasks.py | 106 +++++++++++++------------------ 2 files changed, 44 insertions(+), 64 deletions(-) diff --git a/django/workers/scheduled_jobs.py b/django/workers/scheduled_jobs.py index 28d878c7..9d280832 100644 --- a/django/workers/scheduled_jobs.py +++ b/django/workers/scheduled_jobs.py @@ -35,7 +35,7 @@ def schedule_batch_decode_vins(): 50, name="batch_decode_vins", schedule_type="C", - cron="* * * * *", + cron="*/2 * * * *", q_options={"timeout": 60, "ack_failure": True}, ) except IntegrityError: diff --git a/django/workers/tasks.py b/django/workers/tasks.py index 280a6c8a..3f13d1dd 100644 --- a/django/workers/tasks.py +++ b/django/workers/tasks.py @@ -18,6 +18,7 @@ def create_minio_bucket(): client.make_bucket(bucket_name) +@transaction.atomic def read_uploaded_vins_file(): # TODO: this job will probably have to become more involved; it currently just uploads whatever is in the file while skipping records # that encounter uniqueness conflicts. @@ -26,17 +27,6 @@ def read_uploaded_vins_file(): # then we'll have to compare the (vin, postal_code) keys to existing records in the database, and # determine which ones need to get bulk-inserted, and which ones bulk-updated. # also have to keep in mind the memory used by any data structures we use - def close_file_response(file_response): - if file_response is not None: - file_response.close() - file_response.release_conn() - - @transaction.atomic - def inner(vins_file, file_response): - if vins_file is not None and file_response is not None: - parse_and_save(vins_file, file_response) - - file_response = None vins_file = ( UploadedVinsFile.objects.filter(processed=False) .order_by("create_timestamp") @@ -44,60 +34,50 @@ def inner(vins_file, file_response): ) if vins_file is not None: file_response = get_minio_object(vins_file.filename) - try: - func_timeout(600, inner, args=(vins_file, file_response)) - close_file_response(file_response) - except FunctionTimedOut: - print("reading vins file job timed out") - close_file_response(file_response) - raise Exception - except Exception: - close_file_response(file_response) - raise Exception + if file_response is not None: + parse_and_save(vins_file, file_response) + try: + file_response.close() + file_response.release_conn() + except Exception: + pass def batch_decode_vins(service_name, batch_size=50): - def inner(): - max_decode_attempts = settings.MAX_DECODE_ATTEMPTS - service = get_service(service_name) - if service: - decoded_vin_model = service.MODEL.value - filters = { - service.CURRENT_DECODE_SUCCESSFUL.value: False, - service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value - + "__lt": max_decode_attempts, - } - order_by = [ - service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value, - "create_timestamp", - ] - uploaded_vin_records = UploadedVinRecord.objects.filter(**filters).order_by( - *order_by - )[:batch_size] - uploaded_vins = set() - for uploaded_record in uploaded_vin_records: - uploaded_vins.add(uploaded_record.vin) - vins_to_update = set() - decoded_records_to_update_map = get_map( - "vin", decoded_vin_model.objects.filter(vin__in=uploaded_vins) - ) - for decoded_vin in decoded_records_to_update_map: - vins_to_update.add(decoded_vin) - vins_to_insert = uploaded_vins.difference(vins_to_update) - - decoder = service.BATCH_DECODER.value - decoded_data = decoder(uploaded_vin_records) + max_decode_attempts = settings.MAX_DECODE_ATTEMPTS + service = get_service(service_name) + if service: + decoded_vin_model = service.MODEL.value + filters = { + service.CURRENT_DECODE_SUCCESSFUL.value: False, + service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value + + "__lt": max_decode_attempts, + } + order_by = [ + service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value, + "create_timestamp", + ] + uploaded_vin_records = UploadedVinRecord.objects.filter(**filters).order_by( + *order_by + )[:batch_size] + uploaded_vins = set() + for uploaded_record in uploaded_vin_records: + uploaded_vins.add(uploaded_record.vin) + vins_to_update = set() + decoded_records_to_update_map = get_map( + "vin", decoded_vin_model.objects.filter(vin__in=uploaded_vins) + ) + for decoded_vin in decoded_records_to_update_map: + vins_to_update.add(decoded_vin) + vins_to_insert = uploaded_vins.difference(vins_to_update) - save_decoded_data( - uploaded_vin_records, - vins_to_insert, - decoded_records_to_update_map, - service_name, - decoded_data, - ) + decoder = service.BATCH_DECODER.value + decoded_data = decoder(uploaded_vin_records) - try: - func_timeout(45, inner) - except FunctionTimedOut: - print("batch decode vins job timed out") - raise Exception + save_decoded_data( + uploaded_vin_records, + vins_to_insert, + decoded_records_to_update_map, + service_name, + decoded_data, + )