Skip to content

Commit

Permalink
Work around lack of iterator support in django-cachalot
Browse files Browse the repository at this point in the history
  • Loading branch information
danlamanna committed Aug 29, 2024
1 parent 6b44767 commit 70231e9
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 171 deletions.
183 changes: 97 additions & 86 deletions isic/core/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import operator
from typing import Any

from cachalot.api import cachalot_disabled
from django.db.models import Func
from django.db.models.aggregates import Count
from django.db.models.query import QuerySet
Expand Down Expand Up @@ -73,69 +74,73 @@ def staff_image_metadata_csv(
+ sorted([f"unstructured.{key}" for key in used_unstructured_metadata_keys])
)

# Note this uses .values because populating django ORM objects is very slow, and doing this on
# large querysets can add ~5s per 100k images to the request time.
for image in (
qs.order_by("isic_id")
.values(
"accession__original_blob_name",
"isic_id",
"accession__cohort_id",
"accession__cohort__name",
"accession__cohort__attribution",
"accession__copyright_license",
"public",
*[f"accession__{key}" for key in used_metadata_keys],
*[f"accession__{field.csv_field_name}" for field in Accession.remapped_internal_fields],
*[f"accession__{field.input_field_name}" for field in Accession.computed_fields],
*[
f"accession__{field.relation_name}__{field.internal_id_name}"
for field in Accession.remapped_internal_fields
],
"accession__unstructured_metadata__value",
)
.iterator()
):
value = {
"original_filename": image["accession__original_blob_name"],
"isic_id": image["isic_id"],
"cohort_id": image["accession__cohort_id"],
"cohort": image["accession__cohort__name"],
"attribution": image["accession__cohort__attribution"],
"copyright_license": image["accession__copyright_license"],
"public": image["public"],
**{
k.replace("accession__", ""): v
for k, v in image.items()
if k.replace("accession__", "") in Accession.metadata_keys()
},
**{
field.internal_id_name: image[
with cachalot_disabled():
# Note this uses .values because populating django ORM objects is very slow, and doing this
# on large querysets can add ~5s per 100k images to the request time.
for image in (
qs.order_by("isic_id")
.values(
"accession__original_blob_name",
"isic_id",
"accession__cohort_id",
"accession__cohort__name",
"accession__cohort__attribution",
"accession__copyright_license",
"public",
*[f"accession__{key}" for key in used_metadata_keys],
*[
f"accession__{field.csv_field_name}"
for field in Accession.remapped_internal_fields
],
*[f"accession__{field.input_field_name}" for field in Accession.computed_fields],
*[
f"accession__{field.relation_name}__{field.internal_id_name}"
]
for field in Accession.remapped_internal_fields
},
**{
field.csv_field_name: image[f"accession__{field.csv_field_name}"]
for field in Accession.remapped_internal_fields
},
**{
f"unstructured.{k}": v
for k, v in image["accession__unstructured_metadata__value"].items()
},
}

for field in Accession.computed_fields:
computed_output_fields = field.transformer(
image[f"accession__{field.input_field_name}"]
if image.get(f"accession__{field.input_field_name}")
else None
for field in Accession.remapped_internal_fields
],
"accession__unstructured_metadata__value",
)

if computed_output_fields:
value.update(computed_output_fields)

yield value
.iterator()
):
value = {
"original_filename": image["accession__original_blob_name"],
"isic_id": image["isic_id"],
"cohort_id": image["accession__cohort_id"],
"cohort": image["accession__cohort__name"],
"attribution": image["accession__cohort__attribution"],
"copyright_license": image["accession__copyright_license"],
"public": image["public"],
**{
k.replace("accession__", ""): v
for k, v in image.items()
if k.replace("accession__", "") in Accession.metadata_keys()
},
**{
field.internal_id_name: image[
f"accession__{field.relation_name}__{field.internal_id_name}"
]
for field in Accession.remapped_internal_fields
},
**{
field.csv_field_name: image[f"accession__{field.csv_field_name}"]
for field in Accession.remapped_internal_fields
},
**{
f"unstructured.{k}": v
for k, v in image["accession__unstructured_metadata__value"].items()
},
}

for field in Accession.computed_fields:
computed_output_fields = field.transformer(
image[f"accession__{field.input_field_name}"]
if image.get(f"accession__{field.input_field_name}")
else None
)

if computed_output_fields:
value.update(computed_output_fields)

yield value


def image_metadata_csv(
Expand Down Expand Up @@ -167,28 +172,34 @@ def image_metadata_csv(
fieldnames = headers + sorted(used_metadata_keys)
yield fieldnames

# Note this uses .values because populating django ORM objects is very slow, and doing this on
# large querysets can add ~5s per 100k images to the request time.
for image in (
qs.order_by("isic_id")
.values(
"isic_id",
"accession__cohort__attribution",
"accession__copyright_license",
*[f"accession__{key}" for key in Accession.metadata_keys()],
*[f"accession__{field.csv_field_name}" for field in Accession.remapped_internal_fields],
)
.iterator()
):
image = {k.replace("accession__", ""): v for k, v in image.items()} # noqa: PLW2901

image["attribution"] = image.pop("cohort__attribution")

for computed_field in Accession.computed_fields:
if image[computed_field.input_field_name]:
computed_fields = computed_field.transformer(image[computed_field.input_field_name])
if computed_fields:
image.update(computed_fields)
del image[computed_field.input_field_name]

yield {k: v for k, v in image.items() if k in fieldnames}
with cachalot_disabled():
# Note this uses .values because populating django ORM objects is very slow, and doing this
# on large querysets can add ~5s per 100k images to the request time.
for image in (
qs.order_by("isic_id")
.values(
"isic_id",
"accession__cohort__attribution",
"accession__copyright_license",
*[f"accession__{key}" for key in Accession.metadata_keys()],
*[
f"accession__{field.csv_field_name}"
for field in Accession.remapped_internal_fields
],
)
.iterator()
):
image = {k.replace("accession__", ""): v for k, v in image.items()} # noqa: PLW2901

image["attribution"] = image.pop("cohort__attribution")

for computed_field in Accession.computed_fields:
if image[computed_field.input_field_name]:
computed_fields = computed_field.transformer(
image[computed_field.input_field_name]
)
if computed_fields:
image.update(computed_fields)
del image[computed_field.input_field_name]

yield {k: v for k, v in image.items() if k in fieldnames}
3 changes: 2 additions & 1 deletion isic/core/services/collection/image.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools

from cachalot.api import cachalot_disabled
from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django.db import transaction
Expand Down Expand Up @@ -31,7 +32,7 @@ def collection_add_images(
if collection.public and qs.private().exists():
raise ValidationError("Can't add private images to a public collection.")

with transaction.atomic():
with transaction.atomic(), cachalot_disabled():
CollectionImageM2M = Collection.images.through # noqa: N806
for image_batch in itertools.batched(qs.iterator(), 5_000):
# ignore_conflicts is necessary to make this method idempotent (consistent with
Expand Down
3 changes: 2 additions & 1 deletion isic/core/services/image/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools

from cachalot.api import cachalot_disabled
from django.contrib.auth.models import User
from django.db import transaction
from django.db.models import QuerySet
Expand Down Expand Up @@ -30,7 +31,7 @@ def image_share(
if image:
qs = Image.objects.filter(pk=image.pk)

with transaction.atomic():
with transaction.atomic(), cachalot_disabled():
ImageShareM2M = Image.shares.through # noqa: N806
for image_batch in itertools.batched(qs.iterator(), 5_000):
# ignore_conflicts is necessary to make this method idempotent. ignore_conflicts only
Expand Down
4 changes: 3 additions & 1 deletion isic/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import cast
import uuid

from cachalot.api import cachalot_disabled
from celery import shared_task
from django.conf import settings
from django.contrib.auth.models import User
Expand Down Expand Up @@ -59,7 +60,8 @@ def share_collection_with_users_task(collection_pk: int, grantor_pk: int, user_p
retry_kwargs={"max_retries": 15},
)
def sync_elasticsearch_index_task():
bulk_add_to_search_index(Image.objects.with_elasticsearch_properties().iterator())
with cachalot_disabled():
bulk_add_to_search_index(Image.objects.with_elasticsearch_properties().iterator())


@shared_task(soft_time_limit=1800, time_limit=1810)
Expand Down
24 changes: 13 additions & 11 deletions isic/ingest/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import UTC, datetime
import logging

from cachalot.api import cachalot_disabled
from django.contrib import admin
from django.contrib.humanize.templatetags.humanize import intcomma
from django.db import models
Expand Down Expand Up @@ -136,17 +137,18 @@ def export_file_mapping(self, request, queryset):

writer.writeheader()
for cohort in queryset.select_related("contributor"):
for accession in cohort.accessions.values(
"original_blob_name",
"image__isic_id",
).iterator():
d = {
"contributor": cohort.contributor.institution_name,
"cohort": cohort.name,
"filename": accession["original_blob_name"],
"isic_id": accession["image__isic_id"],
}
writer.writerow(d)
with cachalot_disabled():
for accession in cohort.accessions.values(
"original_blob_name",
"image__isic_id",
).iterator():
d = {
"contributor": cohort.contributor.institution_name,
"cohort": cohort.name,
"filename": accession["original_blob_name"],
"isic_id": accession["image__isic_id"],
}
writer.writerow(d)
return response


Expand Down
3 changes: 2 additions & 1 deletion isic/ingest/services/cohort/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from cachalot.api import cachalot_disabled
from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django.db import transaction
Expand Down Expand Up @@ -56,7 +57,7 @@ def cohort_publish(
)

# this creates a transaction
with lock_table_for_writes(IsicId):
with lock_table_for_writes(IsicId), cachalot_disabled():
for accession in cohort.accessions.publishable().iterator():
image = image_create(creator=publisher, accession=accession, public=public)
collection_add_images(collection=cohort.collection, image=image, ignore_lock=True)
Expand Down
14 changes: 8 additions & 6 deletions isic/ingest/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import itertools
import time

from cachalot.api import cachalot_disabled
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger
Expand Down Expand Up @@ -63,12 +64,13 @@ def extract_zip_task(zip_pk: int):
# rmq can only handle ~500msg/s - throttle significantly in places
# where we could be putting many messages onto the queue at once.
def generate_blobs():
for accession_id in throttled_iterator(
zip_upload.accessions.values_list("id", flat=True).iterator()
):
# avoid .delay since we want to avoid putting tens of thousands of elements
# into the transaction.on_commit list.
accession_generate_blob_task.apply_async(args=[accession_id])
with cachalot_disabled():
for accession_id in throttled_iterator(
zip_upload.accessions.values_list("id", flat=True).iterator()
):
# avoid .delay since we want to avoid putting tens of thousands of elements
# into the transaction.on_commit list.
accession_generate_blob_task.apply_async(args=[accession_id])

transaction.on_commit(generate_blobs)

Expand Down
17 changes: 10 additions & 7 deletions isic/ingest/utils/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import itertools
from typing import Any

from cachalot.api import cachalot_disabled
from django.forms.models import ModelForm
from isic_metadata.metadata import IGNORE_RCM_MODEL_CHECKS, MetadataBatch, MetadataRow
from pydantic import ValidationError as PydanticValidationError
Expand Down Expand Up @@ -50,11 +51,12 @@ def validate_csv_format_and_filenames(
)
)

matching_accessions = set(
Accession.objects.filter(cohort=cohort, original_blob_name__in=filenames.keys())
.values_list("original_blob_name", flat=True)
.iterator()
)
with cachalot_disabled():
matching_accessions = set(
Accession.objects.filter(cohort=cohort, original_blob_name__in=filenames.keys())
.values_list("original_blob_name", flat=True)
.iterator()
)

unknown_images = set(filenames.keys()) - matching_accessions
if unknown_images:
Expand Down Expand Up @@ -188,7 +190,8 @@ def accession_values_to_metadata_dict(accession_values: dict[str, Any]) -> dict[
else:
yield row

for row in accessions.exclude(original_blob_name__in=yielded_filenames).iterator():
yield accession_values_to_metadata_dict(row)
with cachalot_disabled():
for row in accessions.exclude(original_blob_name__in=yielded_filenames).iterator():
yield accession_values_to_metadata_dict(row)

return _validate_df_consistency(cohort_df_merged_metadata_rows())
Loading

0 comments on commit 70231e9

Please sign in to comment.