Skip to content

Commit

Permalink
fix: remove func_timeout (#331)
Browse files (browse the repository at this point in the history)
  • Loading branch information
tim738745 authored Jun 7, 2024
1 parent 08d2791 commit b87211c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 64 deletions.
2 changes: 1 addition & 1 deletion django/workers/scheduled_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def schedule_batch_decode_vins():
50,
name="batch_decode_vins",
schedule_type="C",
cron="* * * * *",
cron="*/2 * * * *",
q_options={"timeout": 60, "ack_failure": True},
)
except IntegrityError:
Expand Down
106 changes: 43 additions & 63 deletions django/workers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def create_minio_bucket():
client.make_bucket(bucket_name)


@transaction.atomic
def read_uploaded_vins_file():
# TODO: this job will probably have to become more involved; it currently just uploads whatever is in the file while skipping records
# that encounter uniqueness conflicts.
Expand All @@ -26,78 +27,57 @@ def read_uploaded_vins_file():
# then we'll have to compare the (vin, postal_code) keys to existing records in the database, and
# determine which ones need to get bulk-inserted, and which ones bulk-updated.
# also have to keep in mind the memory used by any data structures we use
def close_file_response(file_response):
if file_response is not None:
file_response.close()
file_response.release_conn()

@transaction.atomic
def inner(vins_file, file_response):
if vins_file is not None and file_response is not None:
parse_and_save(vins_file, file_response)

file_response = None
vins_file = (
UploadedVinsFile.objects.filter(processed=False)
.order_by("create_timestamp")
.first()
)
if vins_file is not None:
file_response = get_minio_object(vins_file.filename)
try:
func_timeout(600, inner, args=(vins_file, file_response))
close_file_response(file_response)
except FunctionTimedOut:
print("reading vins file job timed out")
close_file_response(file_response)
raise Exception
except Exception:
close_file_response(file_response)
raise Exception
if file_response is not None:
parse_and_save(vins_file, file_response)
try:
file_response.close()
file_response.release_conn()
except Exception:
pass


def batch_decode_vins(service_name, batch_size=50):
def inner():
max_decode_attempts = settings.MAX_DECODE_ATTEMPTS
service = get_service(service_name)
if service:
decoded_vin_model = service.MODEL.value
filters = {
service.CURRENT_DECODE_SUCCESSFUL.value: False,
service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value
+ "__lt": max_decode_attempts,
}
order_by = [
service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value,
"create_timestamp",
]
uploaded_vin_records = UploadedVinRecord.objects.filter(**filters).order_by(
*order_by
)[:batch_size]
uploaded_vins = set()
for uploaded_record in uploaded_vin_records:
uploaded_vins.add(uploaded_record.vin)
vins_to_update = set()
decoded_records_to_update_map = get_map(
"vin", decoded_vin_model.objects.filter(vin__in=uploaded_vins)
)
for decoded_vin in decoded_records_to_update_map:
vins_to_update.add(decoded_vin)
vins_to_insert = uploaded_vins.difference(vins_to_update)

decoder = service.BATCH_DECODER.value
decoded_data = decoder(uploaded_vin_records)
max_decode_attempts = settings.MAX_DECODE_ATTEMPTS
service = get_service(service_name)
if service:
decoded_vin_model = service.MODEL.value
filters = {
service.CURRENT_DECODE_SUCCESSFUL.value: False,
service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value
+ "__lt": max_decode_attempts,
}
order_by = [
service.NUMBER_OF_CURRENT_DECODE_ATTEMPTS.value,
"create_timestamp",
]
uploaded_vin_records = UploadedVinRecord.objects.filter(**filters).order_by(
*order_by
)[:batch_size]
uploaded_vins = set()
for uploaded_record in uploaded_vin_records:
uploaded_vins.add(uploaded_record.vin)
vins_to_update = set()
decoded_records_to_update_map = get_map(
"vin", decoded_vin_model.objects.filter(vin__in=uploaded_vins)
)
for decoded_vin in decoded_records_to_update_map:
vins_to_update.add(decoded_vin)
vins_to_insert = uploaded_vins.difference(vins_to_update)

save_decoded_data(
uploaded_vin_records,
vins_to_insert,
decoded_records_to_update_map,
service_name,
decoded_data,
)
decoder = service.BATCH_DECODER.value
decoded_data = decoder(uploaded_vin_records)

try:
func_timeout(45, inner)
except FunctionTimedOut:
print("batch decode vins job timed out")
raise Exception
save_decoded_data(
uploaded_vin_records,
vins_to_insert,
decoded_records_to_update_map,
service_name,
decoded_data,
)

0 comments on commit b87211c

Please sign in to comment.