diff --git a/isic/core/services/__init__.py b/isic/core/services/__init__.py
index fecbf896..fe44c532 100644
--- a/isic/core/services/__init__.py
+++ b/isic/core/services/__init__.py
@@ -3,6 +3,7 @@
 import operator
 from typing import Any
 
+from cachalot.api import cachalot_disabled
 from django.db.models import Func
 from django.db.models.aggregates import Count
 from django.db.models.query import QuerySet
@@ -73,69 +74,73 @@ def staff_image_metadata_csv(
         + sorted([f"unstructured.{key}" for key in used_unstructured_metadata_keys])
     )
 
-    # Note this uses .values because populating django ORM objects is very slow, and doing this on
-    # large querysets can add ~5s per 100k images to the request time.
-    for image in (
-        qs.order_by("isic_id")
-        .values(
-            "accession__original_blob_name",
-            "isic_id",
-            "accession__cohort_id",
-            "accession__cohort__name",
-            "accession__cohort__attribution",
-            "accession__copyright_license",
-            "public",
-            *[f"accession__{key}" for key in used_metadata_keys],
-            *[f"accession__{field.csv_field_name}" for field in Accession.remapped_internal_fields],
-            *[f"accession__{field.input_field_name}" for field in Accession.computed_fields],
-            *[
-                f"accession__{field.relation_name}__{field.internal_id_name}"
-                for field in Accession.remapped_internal_fields
-            ],
-            "accession__unstructured_metadata__value",
-        )
-        .iterator()
-    ):
-        value = {
-            "original_filename": image["accession__original_blob_name"],
-            "isic_id": image["isic_id"],
-            "cohort_id": image["accession__cohort_id"],
-            "cohort": image["accession__cohort__name"],
-            "attribution": image["accession__cohort__attribution"],
-            "copyright_license": image["accession__copyright_license"],
-            "public": image["public"],
-            **{
-                k.replace("accession__", ""): v
-                for k, v in image.items()
-                if k.replace("accession__", "") in Accession.metadata_keys()
-            },
-            **{
-                field.internal_id_name: image[
+    with cachalot_disabled():
+        # Note this uses .values because populating django ORM objects is very slow, and doing this
+        # on large querysets can add ~5s per 100k images to the request time.
+        for image in (
+            qs.order_by("isic_id")
+            .values(
+                "accession__original_blob_name",
+                "isic_id",
+                "accession__cohort_id",
+                "accession__cohort__name",
+                "accession__cohort__attribution",
+                "accession__copyright_license",
+                "public",
+                *[f"accession__{key}" for key in used_metadata_keys],
+                *[
+                    f"accession__{field.csv_field_name}"
+                    for field in Accession.remapped_internal_fields
+                ],
+                *[f"accession__{field.input_field_name}" for field in Accession.computed_fields],
+                *[
                     f"accession__{field.relation_name}__{field.internal_id_name}"
-                ]
-                for field in Accession.remapped_internal_fields
-            },
-            **{
-                field.csv_field_name: image[f"accession__{field.csv_field_name}"]
-                for field in Accession.remapped_internal_fields
-            },
-            **{
-                f"unstructured.{k}": v
-                for k, v in image["accession__unstructured_metadata__value"].items()
-            },
-        }
-
-        for field in Accession.computed_fields:
-            computed_output_fields = field.transformer(
-                image[f"accession__{field.input_field_name}"]
-                if image.get(f"accession__{field.input_field_name}")
-                else None
+                    for field in Accession.remapped_internal_fields
+                ],
+                "accession__unstructured_metadata__value",
             )
-
-            if computed_output_fields:
-                value.update(computed_output_fields)
-
-        yield value
+            .iterator()
+        ):
+            value = {
+                "original_filename": image["accession__original_blob_name"],
+                "isic_id": image["isic_id"],
+                "cohort_id": image["accession__cohort_id"],
+                "cohort": image["accession__cohort__name"],
+                "attribution": image["accession__cohort__attribution"],
+                "copyright_license": image["accession__copyright_license"],
+                "public": image["public"],
+                **{
+                    k.replace("accession__", ""): v
+                    for k, v in image.items()
+                    if k.replace("accession__", "") in Accession.metadata_keys()
+                },
+                **{
+                    field.internal_id_name: image[
+                        f"accession__{field.relation_name}__{field.internal_id_name}"
+                    ]
+                    for field in Accession.remapped_internal_fields
+                },
+                **{
+                    field.csv_field_name: image[f"accession__{field.csv_field_name}"]
+                    for field in Accession.remapped_internal_fields
+                },
+                **{
+                    f"unstructured.{k}": v
+                    for k, v in image["accession__unstructured_metadata__value"].items()
+                },
+            }
+
+            for field in Accession.computed_fields:
+                computed_output_fields = field.transformer(
+                    image[f"accession__{field.input_field_name}"]
+                    if image.get(f"accession__{field.input_field_name}")
+                    else None
+                )
+
+                if computed_output_fields:
+                    value.update(computed_output_fields)
+
+            yield value
 
 
 def image_metadata_csv(
@@ -167,28 +172,34 @@ def image_metadata_csv(
     fieldnames = headers + sorted(used_metadata_keys)
     yield fieldnames
 
-    # Note this uses .values because populating django ORM objects is very slow, and doing this on
-    # large querysets can add ~5s per 100k images to the request time.
-    for image in (
-        qs.order_by("isic_id")
-        .values(
-            "isic_id",
-            "accession__cohort__attribution",
-            "accession__copyright_license",
-            *[f"accession__{key}" for key in Accession.metadata_keys()],
-            *[f"accession__{field.csv_field_name}" for field in Accession.remapped_internal_fields],
-        )
-        .iterator()
-    ):
-        image = {k.replace("accession__", ""): v for k, v in image.items()}  # noqa: PLW2901
-
-        image["attribution"] = image.pop("cohort__attribution")
-
-        for computed_field in Accession.computed_fields:
-            if image[computed_field.input_field_name]:
-                computed_fields = computed_field.transformer(image[computed_field.input_field_name])
-                if computed_fields:
-                    image.update(computed_fields)
-            del image[computed_field.input_field_name]
-
-        yield {k: v for k, v in image.items() if k in fieldnames}
+    with cachalot_disabled():
+        # Note this uses .values because populating django ORM objects is very slow, and doing this
+        # on large querysets can add ~5s per 100k images to the request time.
+        for image in (
+            qs.order_by("isic_id")
+            .values(
+                "isic_id",
+                "accession__cohort__attribution",
+                "accession__copyright_license",
+                *[f"accession__{key}" for key in Accession.metadata_keys()],
+                *[
+                    f"accession__{field.csv_field_name}"
+                    for field in Accession.remapped_internal_fields
+                ],
+            )
+            .iterator()
+        ):
+            image = {k.replace("accession__", ""): v for k, v in image.items()}  # noqa: PLW2901
+
+            image["attribution"] = image.pop("cohort__attribution")
+
+            for computed_field in Accession.computed_fields:
+                if image[computed_field.input_field_name]:
+                    computed_fields = computed_field.transformer(
+                        image[computed_field.input_field_name]
+                    )
+                    if computed_fields:
+                        image.update(computed_fields)
+                del image[computed_field.input_field_name]
+
+            yield {k: v for k, v in image.items() if k in fieldnames}
diff --git a/isic/core/services/collection/image.py b/isic/core/services/collection/image.py
index fc5ee3ca..6919590f 100644
--- a/isic/core/services/collection/image.py
+++ b/isic/core/services/collection/image.py
@@ -1,5 +1,6 @@
 import itertools
 
+from cachalot.api import cachalot_disabled
 from django.contrib.auth.models import User
 from django.core.exceptions import ValidationError
 from django.db import transaction
@@ -31,7 +32,7 @@ def collection_add_images(
     if collection.public and qs.private().exists():
         raise ValidationError("Can't add private images to a public collection.")
 
-    with transaction.atomic():
+    with transaction.atomic(), cachalot_disabled():
         CollectionImageM2M = Collection.images.through  # noqa: N806
         for image_batch in itertools.batched(qs.iterator(), 5_000):
             # ignore_conflicts is necessary to make this method idempotent (consistent with
diff --git a/isic/core/services/image/__init__.py b/isic/core/services/image/__init__.py
index e1009bef..5bb27f39 100644
--- a/isic/core/services/image/__init__.py
+++ b/isic/core/services/image/__init__.py
@@ -1,5 +1,6 @@
 import itertools
 
+from cachalot.api import cachalot_disabled
 from django.contrib.auth.models import User
 from django.db import transaction
 from django.db.models import QuerySet
@@ -30,7 +31,7 @@ def image_share(
     if image:
         qs = Image.objects.filter(pk=image.pk)
 
-    with transaction.atomic():
+    with transaction.atomic(), cachalot_disabled():
         ImageShareM2M = Image.shares.through  # noqa: N806
         for image_batch in itertools.batched(qs.iterator(), 5_000):
             # ignore_conflicts is necessary to make this method idempotent. ignore_conflicts only
diff --git a/isic/core/tasks.py b/isic/core/tasks.py
index 9f79bc0f..d9bdbe67 100644
--- a/isic/core/tasks.py
+++ b/isic/core/tasks.py
@@ -5,6 +5,7 @@
 from typing import cast
 import uuid
 
+from cachalot.api import cachalot_disabled
 from celery import shared_task
 from django.conf import settings
 from django.contrib.auth.models import User
@@ -59,7 +60,8 @@ def share_collection_with_users_task(collection_pk: int, grantor_pk: int, user_p
     retry_kwargs={"max_retries": 15},
 )
 def sync_elasticsearch_index_task():
-    bulk_add_to_search_index(Image.objects.with_elasticsearch_properties().iterator())
+    with cachalot_disabled():
+        bulk_add_to_search_index(Image.objects.with_elasticsearch_properties().iterator())
 
 
 @shared_task(soft_time_limit=1800, time_limit=1810)
diff --git a/isic/ingest/admin.py b/isic/ingest/admin.py
index 2c30da7c..ea258b7b 100644
--- a/isic/ingest/admin.py
+++ b/isic/ingest/admin.py
@@ -2,6 +2,7 @@
 from datetime import UTC, datetime
 import logging
 
+from cachalot.api import cachalot_disabled
 from django.contrib import admin
 from django.contrib.humanize.templatetags.humanize import intcomma
 from django.db import models
@@ -136,17 +137,18 @@ def export_file_mapping(self, request, queryset):
         writer.writeheader()
 
         for cohort in queryset.select_related("contributor"):
-            for accession in cohort.accessions.values(
-                "original_blob_name",
-                "image__isic_id",
-            ).iterator():
-                d = {
-                    "contributor": cohort.contributor.institution_name,
-                    "cohort": cohort.name,
-                    "filename": accession["original_blob_name"],
-                    "isic_id": accession["image__isic_id"],
-                }
-                writer.writerow(d)
+            with cachalot_disabled():
+                for accession in cohort.accessions.values(
+                    "original_blob_name",
+                    "image__isic_id",
+                ).iterator():
+                    d = {
+                        "contributor": cohort.contributor.institution_name,
+                        "cohort": cohort.name,
+                        "filename": accession["original_blob_name"],
+                        "isic_id": accession["image__isic_id"],
+                    }
+                    writer.writerow(d)
 
         return response
 
diff --git a/isic/ingest/services/cohort/__init__.py b/isic/ingest/services/cohort/__init__.py
index 20e13d73..44068bf9 100644
--- a/isic/ingest/services/cohort/__init__.py
+++ b/isic/ingest/services/cohort/__init__.py
@@ -1,5 +1,6 @@
 import logging
 
+from cachalot.api import cachalot_disabled
 from django.contrib.auth.models import User
 from django.core.exceptions import ValidationError
 from django.db import transaction
@@ -56,7 +57,7 @@ def cohort_publish(
     )
 
     # this creates a transaction
-    with lock_table_for_writes(IsicId):
+    with lock_table_for_writes(IsicId), cachalot_disabled():
         for accession in cohort.accessions.publishable().iterator():
             image = image_create(creator=publisher, accession=accession, public=public)
             collection_add_images(collection=cohort.collection, image=image, ignore_lock=True)
diff --git a/isic/ingest/tasks.py b/isic/ingest/tasks.py
index b142b338..7a3f7187 100644
--- a/isic/ingest/tasks.py
+++ b/isic/ingest/tasks.py
@@ -2,6 +2,7 @@
 import itertools
 import time
 
+from cachalot.api import cachalot_disabled
 from celery import shared_task
 from celery.exceptions import SoftTimeLimitExceeded
 from celery.utils.log import get_task_logger
@@ -63,12 +64,13 @@ def extract_zip_task(zip_pk: int):
     # rmq can only handle ~500msg/s - throttle significantly in places
     # where we could be putting many messages onto the queue at once.
     def generate_blobs():
-        for accession_id in throttled_iterator(
-            zip_upload.accessions.values_list("id", flat=True).iterator()
-        ):
-            # avoid .delay since we want to avoid putting tens of thousands of elements
-            # into the transaction.on_commit list.
-            accession_generate_blob_task.apply_async(args=[accession_id])
+        with cachalot_disabled():
+            for accession_id in throttled_iterator(
+                zip_upload.accessions.values_list("id", flat=True).iterator()
+            ):
+                # avoid .delay since we want to avoid putting tens of thousands of elements
+                # into the transaction.on_commit list.
+                accession_generate_blob_task.apply_async(args=[accession_id])
 
     transaction.on_commit(generate_blobs)
 
diff --git a/isic/ingest/utils/metadata.py b/isic/ingest/utils/metadata.py
index af609ae1..32f8e4ce 100644
--- a/isic/ingest/utils/metadata.py
+++ b/isic/ingest/utils/metadata.py
@@ -4,6 +4,7 @@
 import itertools
 from typing import Any
 
+from cachalot.api import cachalot_disabled
 from django.forms.models import ModelForm
 from isic_metadata.metadata import IGNORE_RCM_MODEL_CHECKS, MetadataBatch, MetadataRow
 from pydantic import ValidationError as PydanticValidationError
@@ -50,11 +51,12 @@ def validate_csv_format_and_filenames(
             )
         )
 
-    matching_accessions = set(
-        Accession.objects.filter(cohort=cohort, original_blob_name__in=filenames.keys())
-        .values_list("original_blob_name", flat=True)
-        .iterator()
-    )
+    with cachalot_disabled():
+        matching_accessions = set(
+            Accession.objects.filter(cohort=cohort, original_blob_name__in=filenames.keys())
+            .values_list("original_blob_name", flat=True)
+            .iterator()
+        )
 
     unknown_images = set(filenames.keys()) - matching_accessions
     if unknown_images:
@@ -188,7 +190,8 @@ def accession_values_to_metadata_dict(accession_values: dict[str, Any]) -> dict[
             else:
                 yield row
 
-        for row in accessions.exclude(original_blob_name__in=yielded_filenames).iterator():
-            yield accession_values_to_metadata_dict(row)
+        with cachalot_disabled():
+            for row in accessions.exclude(original_blob_name__in=yielded_filenames).iterator():
+                yield accession_values_to_metadata_dict(row)
 
     return _validate_df_consistency(cohort_df_merged_metadata_rows())
diff --git a/isic/studies/models.py b/isic/studies/models.py
index 11e39122..3c58df97 100644
--- a/isic/studies/models.py
+++ b/isic/studies/models.py
@@ -1,6 +1,7 @@
 import csv
 from datetime import timedelta
 
+from cachalot.api import cachalot_disabled
 from django.contrib.auth.models import User
 from django.contrib.postgres.fields import ArrayField
 from django.core.exceptions import ValidationError
@@ -372,53 +373,54 @@ def annotation_duration(self) -> timedelta | None:
 
 class ResponseQuerySet(models.QuerySet):
     def for_display(self) -> list:
-        for response in (
-            self.annotate(
-                value_answer=Cast(F("value"), CharField()),
-                choice_answer=F("choice__text"),
-                study_id=F("annotation__study__id"),
-                study=F("annotation__study__name"),
-                image=F("annotation__image__isic_id"),
-                annotator=F("annotation__annotator__profile__hash_id"),
-                annotation_duration=F("annotation__created") - F("annotation__start_time"),
-                question_prompt=F("question__prompt"),
-                answer=Case(
-                    When(
-                        question__type=Question.QuestionType.SELECT,
-                        then=F("choice_answer"),
+        with cachalot_disabled():
+            for response in (
+                self.annotate(
+                    value_answer=Cast(F("value"), CharField()),
+                    choice_answer=F("choice__text"),
+                    study_id=F("annotation__study__id"),
+                    study=F("annotation__study__name"),
+                    image=F("annotation__image__isic_id"),
+                    annotator=F("annotation__annotator__profile__hash_id"),
+                    annotation_duration=F("annotation__created") - F("annotation__start_time"),
+                    question_prompt=F("question__prompt"),
+                    answer=Case(
+                        When(
+                            question__type=Question.QuestionType.SELECT,
+                            then=F("choice_answer"),
+                        ),
+                        default=F("value_answer"),
+                        output_field=CharField(),
                     ),
-                    default=F("value_answer"),
-                    output_field=CharField(),
-                ),
-            )
-            .order_by("annotation__image__isic_id")
-            .values(
-                "study_id",
-                "study",
-                "image",
-                "annotator",
-                "annotation_duration",
-                "question_prompt",
-                "answer",
-            )
-            .iterator()
-        ):
-            if response["annotation_duration"] is None:
-                annotation_duration = ""
-            else:
-                # formatting as total seconds is easier, otherwise long durations get printed as
-                # 2 days, H:M:S.ms
-                annotation_duration = response["annotation_duration"].total_seconds()
-
-            yield {
-                "study_id": response["study_id"],
-                "study": response["study"],
-                "image": response["image"],
-                "annotator": response["annotator"],
-                "annotation_duration": annotation_duration,
-                "question": response["question_prompt"],
-                "answer": response["answer"],
-            }
+                )
+                .order_by("annotation__image__isic_id")
+                .values(
+                    "study_id",
+                    "study",
+                    "image",
+                    "annotator",
+                    "annotation_duration",
+                    "question_prompt",
+                    "answer",
+                )
+                .iterator()
+            ):
+                if response["annotation_duration"] is None:
+                    annotation_duration = ""
+                else:
+                    # formatting as total seconds is easier, otherwise long durations get printed as
+                    # 2 days, H:M:S.ms
+                    annotation_duration = response["annotation_duration"].total_seconds()
+
+                yield {
+                    "study_id": response["study_id"],
+                    "study": response["study"],
+                    "image": response["image"],
+                    "annotator": response["annotator"],
+                    "annotation_duration": annotation_duration,
+                    "question": response["question_prompt"],
+                    "answer": response["answer"],
+                }
 
 
 class Response(TimeStampedModel):
diff --git a/isic/zip_download/api.py b/isic/zip_download/api.py
index a82f07e5..aae4df18 100644
--- a/isic/zip_download/api.py
+++ b/isic/zip_download/api.py
@@ -6,6 +6,7 @@
 import logging
 
 from botocore.signers import CloudFrontSigner
+from cachalot.api import cachalot_disabled
 from django.conf import settings
 from django.contrib.sites.models import Site
 from django.core.signing import BadSignature, TimestampSigner
@@ -124,21 +125,23 @@ def zip_file_listing(
             date_less_than=datetime.now(tz=UTC) + timedelta(days=1),
         )
         signed_url = signer.generate_presigned_url(url, policy=policy)
-        files = [
-            {
-                "url": signed_url.replace("*", image["accession__blob"]),
-                "zipPath": f"{image['isic_id']}.jpg",
-            }
-            for image in qs.values("accession__blob", "isic_id").iterator()
-        ]
+        with cachalot_disabled():
+            files = [
+                {
+                    "url": signed_url.replace("*", image["accession__blob"]),
+                    "zipPath": f"{image['isic_id']}.jpg",
+                }
+                for image in qs.values("accession__blob", "isic_id").iterator()
+            ]
     else:
         # development doesn't have any cloudfront frontend so we need to sign each individual url.
         # this is considerably slower because of the signing and the hydrating of the related
         # objects instead of being able to utilize .values.
-        files = [
-            {"url": image.accession.blob.url, "zipPath": f"{image.isic_id}.jpg"}
-            for image in qs.iterator()
-        ]
+        with cachalot_disabled():
+            files = [
+                {"url": image.accession.blob.url, "zipPath": f"{image.isic_id}.jpg"}
+                for image in qs.iterator()
+            ]
 
     # initialize files with metadata and attribution files
     logger.info(
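
A note on the pattern applied at each of these call sites: django-cachalot's cachalot_disabled() (importable from cachalot.api) is a context manager that keeps the results of queries executed inside its scope out of cachalot's query cache, which is the goal when streaming very large .iterator() result sets. A minimal sketch of the usage under that assumption; the Image model import and the export_isic_ids() helper below are illustrative only, not taken from any call site in this patch:

    from cachalot.api import cachalot_disabled

    from isic.core.models import Image  # illustrative; substitute whatever model is being streamed


    def export_isic_ids():
        # Queries run inside this block are not written to cachalot's cache, so a
        # large streaming .values().iterator() result set never lands in the cache.
        with cachalot_disabled():
            for row in Image.objects.values("isic_id").iterator():
                yield row["isic_id"]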