Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
kelvin-muchiri committed Jan 24, 2025
1 parent 639576a commit 14584ab
Showing 1 changed file with 30 additions and 71 deletions.
101 changes: 30 additions & 71 deletions onadata/libs/utils/logger_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
ValidationError,
)
from django.core.files.storage import get_storage_class
from django.db import DataError, IntegrityError, connection, transaction
from django.db import DataError, IntegrityError, transaction
from django.db.models import F, Q
from django.db.models.query import QuerySet
from django.http import (
Expand Down Expand Up @@ -1526,101 +1526,60 @@ def delete_xform_submissions(
)


def _register_instance_repeat_columns(instance: Instance, register_pk: int) -> None:
"""
Add Instance repeat columns to the export columns register.
def _register_instance_repeat_columns(instance: Instance, register: MetaData) -> None:
"""Add Instance repeat columns to the export columns register
:param instance: Instance object
:param register_pk: Primary key of the export columns register
:param metadata: MetaData object that stores the export repeat register
"""
# Avoid cyclic import by using importlib
csv_builder_module = importlib.import_module("onadata.libs.utils.csv_builder")

def get_existing_columns(register):
with connection.cursor() as cursor:
cursor.execute(
"""
SELECT
extra_data->'merged_multiples' AS merged_multiples,
extra_data->'split_multiples' AS split_multiples
FROM main_metadata
WHERE id = %s
""",
[register.pk],
)
row = cursor.fetchone()

merged_multiples = (
json.loads(row[0], object_pairs_hook=OrderedDict)
if row[0]
else OrderedDict()
with transaction.atomic():
# We use select_for_update to acquire a row-level lock
# Only one process updates it at a time. This prevents race conditions
# and updates extra_data atomically
register = MetaData.objects.select_for_update().get(pk=register.pk)
merged_multiples = json.loads(
register.extra_data["merged_multiples"], object_pairs_hook=OrderedDict
)
split_multiples = (
json.loads(row[1], object_pairs_hook=OrderedDict)
if row[1]
else OrderedDict()
split_multiples = json.loads(
register.extra_data["split_multiples"], object_pairs_hook=OrderedDict
)

return merged_multiples, split_multiples

def process_and_update_columns(merged_multiples, split_multiples):
from onadata.libs.utils.csv_builder import CSVDataFrameBuilder

xform = instance.xform
csv_builder = CSVDataFrameBuilder(
csv_builder_module = csv_builder_module.CSVDataFrameBuilder(
xform=xform, username=xform.user.username, id_string=xform.id_string
)
data = instance.get_full_dict()
changes = {
"merged_multiples": merged_multiples,
"split_multiples": split_multiples,
}

for key, value in data.items():
csv_builder._reindex(
# Reindex split multiples
csv_builder_module._reindex(
key,
value,
split_multiples,
changes["split_multiples"],
data,
xform,
include_images=[],
split_select_multiples=True,
)
csv_builder._reindex(
# Reindex merged multiples
csv_builder_module._reindex(
key,
value,
merged_multiples,
changes["merged_multiples"],
data,
xform,
include_images=[],
split_select_multiples=False,
)

def update_extra_data(register, merged_multiples, split_multiples):
with connection.cursor() as cursor:
cursor.execute(
"""
UPDATE main_metadata
SET extra_data = jsonb_set(
jsonb_set(
COALESCE(extra_data, '{}'::jsonb),
'{merged_multiples}',
%s::jsonb,
true
),
'{split_multiples}',
%s::jsonb,
true
)
WHERE id = %s;
""",
[
json.dumps(merged_multiples),
json.dumps(split_multiples),
register.pk,
],
)

with transaction.atomic():
# Acquire a row-level lock to prevent race conditions
register = MetaData.objects.select_for_update().get(pk=register_pk)
merged_multiples, split_multiples = get_existing_columns(register)
process_and_update_columns(merged_multiples, split_multiples)
update_extra_data(register, merged_multiples, split_multiples)
register.extra_data = {key: json.dumps(value) for key, value in changes.items()}
register.save()


@transaction.atomic()
Expand All @@ -1641,7 +1600,7 @@ def register_instance_repeat_columns(instance: Instance) -> None:
except MetaData.DoesNotExist:
return

_register_instance_repeat_columns(instance, register.pk)
_register_instance_repeat_columns(instance, register)


def update_or_create_export_register(xform: XForm) -> tuple[MetaData, bool]:
Expand Down Expand Up @@ -1680,4 +1639,4 @@ def reconstruct_xform_export_register(xform: XForm) -> None:
instance_qs = xform.instances.filter(deleted_at__isnull=True)

for instance in queryset_iterator(instance_qs, chunksize=500):
_register_instance_repeat_columns(instance, register.pk)
_register_instance_repeat_columns(instance, register)

0 comments on commit 14584ab

Please sign in to comment.