diff --git a/docker-app/qfieldcloud/core/utils2/jobs.py b/docker-app/qfieldcloud/core/utils2/jobs.py
index 9384c0cb0..261c3bb5b 100644
--- a/docker-app/qfieldcloud/core/utils2/jobs.py
+++ b/docker-app/qfieldcloud/core/utils2/jobs.py
@@ -5,10 +5,14 @@
 from django.conf import settings
 from django.db import transaction
 from django.db.models import Q
+from django_pglocks import advisory_lock
 from qfieldcloud.core import exceptions
 
 logger = logging.getLogger(__name__)
 
+# Any number
+DELTAS_APPLY_LOCK_ID = 1001
+
 
 @transaction.atomic
 def apply_deltas(
@@ -28,71 +32,76 @@ def apply_deltas(
     if not project.owner_can_create_job:
         return []
 
-    # 2. Check if there are any pending deltas.
-    # We need to call .select_for_update() to make sure there would not be a concurrent
-    # request that will try to apply these deltas.
-    pending_deltas = models.Delta.objects.select_for_update().filter(
-        project=project,
-        last_status=models.Delta.Status.PENDING,
-    )
-
-    # 2.1. Filter only the deltas of interest.
-    if len(delta_ids) > 0:
-        pending_deltas = pending_deltas.filter(pk__in=delta_ids)
-
-    # 3. If there are no pending deltas, do not create a new job and return.
-    if not pending_deltas.exists():
-        return []
-
-    # 4. Find all the pending or queued jobs in the queue.
-    # If an "apply_delta" job is in a "started" status, we don't know how far the execution reached
-    # so we better assume the deltas will reach a non-"pending" status.
-    apply_jobs = models.ApplyJob.objects.filter(
-        project=project,
-        status=[
-            models.Job.Status.PENDING,
-            models.Job.Status.QUEUED,
-        ],
-    )
-
-    # 5. Check whether there are jobs found in the queue and exclude all deltas that are part of any pending job.
-    if apply_jobs.exists():
-        pending_deltas = pending_deltas.exclude(jobs_to_apply__in=apply_jobs)
+    with advisory_lock(DELTAS_APPLY_LOCK_ID):
+        # Ensure there is no concurrent request trying to apply these deltas.
+        # This block of code is executed only once the pg lock from the previous transaction is released.
+        #
+        # NOTE `select_for_update` seems to create deadlocks sometimes,
+        # probably due to race conditions during the locking.
 
-    # 6. If there are no pending deltas, do not create a new job and return.
-    deltas_count = pending_deltas.count()
-    if deltas_count == 0:
-        return []
+        # 2. Check if there are any pending deltas.
+        pending_deltas = models.Delta.objects.filter(
+            project=project,
+            last_status=models.Delta.Status.PENDING,
+        )
 
-    # 7. There are pending deltas that are not part of any pending job. So we create one.
-    apply_jobs = []
-    for i in range(deltas_count // settings.APPLY_DELTAS_LIMIT + 1):
-        offset = settings.APPLY_DELTAS_LIMIT * i
-        limit = max(settings.APPLY_DELTAS_LIMIT * (i + 1), deltas_count)
+        # 2.1. Filter only the deltas of interest.
+        if len(delta_ids) > 0:
+            pending_deltas = pending_deltas.filter(pk__in=delta_ids)
 
-        if offset == limit:
-            break
+        # 3. If there are no pending deltas, do not create a new job and return.
+        if not pending_deltas.exists():
+            return []
 
-        apply_job = models.ApplyJob.objects.create(
+        # 4. Find all the pending or queued jobs in the queue.
+        # If an "apply_delta" job is in a "started" status, we don't know how far the execution reached,
+        # so we better assume the deltas will reach a non-"pending" status.
+        apply_jobs = models.ApplyJob.objects.filter(
             project=project,
-            created_by=user,
-            overwrite_conflicts=overwrite_conflicts,
+            status=[
+                models.Job.Status.PENDING,
+                models.Job.Status.QUEUED,
+            ],
         )
 
-        models.ApplyJobDelta.objects.bulk_create(
-            [
-                models.ApplyJobDelta(
-                    apply_job=apply_job,
-                    delta=delta,
-                )
-                for delta in pending_deltas.order_by("created_at")[offset:limit]
-            ]
-        )
-
-        apply_jobs.append(apply_job)
-
-    # 8. return the created job
-    return apply_jobs
+        # 5. Check whether there are jobs found in the queue and exclude all deltas that are part of any pending job.
+        if apply_jobs.exists():
+            pending_deltas = pending_deltas.exclude(jobs_to_apply__in=apply_jobs)
+
+        # 6. If there are no pending deltas, do not create a new job and return.
+        deltas_count = pending_deltas.count()
+        if deltas_count == 0:
+            return []
+
+        # 7. There are pending deltas that are not part of any pending job. So we create one.
+        apply_jobs = []
+        for i in range(deltas_count // settings.APPLY_DELTAS_LIMIT + 1):
+            offset = settings.APPLY_DELTAS_LIMIT * i
+            limit = max(settings.APPLY_DELTAS_LIMIT * (i + 1), deltas_count)
+
+            if offset == limit:
+                break
+
+            apply_job = models.ApplyJob.objects.create(
+                project=project,
+                created_by=user,
+                overwrite_conflicts=overwrite_conflicts,
+            )
+
+            models.ApplyJobDelta.objects.bulk_create(
+                [
+                    models.ApplyJobDelta(
+                        apply_job=apply_job,
+                        delta=delta,
+                    )
+                    for delta in pending_deltas.order_by("created_at")[offset:limit]
+                ]
+            )
+
+            apply_jobs.append(apply_job)
+
+        # 8. Return the created jobs.
+        return apply_jobs
 
 
 def repackage(project: "models.Project", user: "models.User") -> "models.PackageJob":
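For context on the locking change: `django_pglocks.advisory_lock` wraps PostgreSQL's session-level advisory locks and works as a context manager. Below is a minimal sketch of the pattern, assuming the library's documented `advisory_lock(lock_id, shared=False, wait=True, using=None)` signature; the second function shows the non-blocking `wait=False` form, which this diff does not use but which clarifies the semantics. `serialized_apply` and `opportunistic_apply` are hypothetical names for illustration.

```python
# Minimal sketch of the serialization pattern introduced in the diff.
# The lock id mirrors DELTAS_APPLY_LOCK_ID; any integer works as long
# as every cooperating caller uses the same one.
from django_pglocks import advisory_lock

LOCK_ID = 1001


def serialized_apply():
    # Default wait=True: blocks until the current holder releases the
    # lock, so the body runs strictly one caller at a time.
    with advisory_lock(LOCK_ID):
        ...  # select pending deltas and create ApplyJobs here


def opportunistic_apply():
    # wait=False yields an "acquired" flag instead of blocking, letting
    # a caller bail out if another request is already applying deltas.
    with advisory_lock(LOCK_ID, wait=False) as acquired:
        if not acquired:
            return
        ...  # do the work
```

The advisory lock now provides the mutual exclusion that `select_for_update()` used to, without taking row locks that could deadlock; the lock is released on context exit, after the select-then-create sequence completes.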
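One pre-existing detail worth a second look while this block is being moved: the step-4 filter passes a list to `status=`, which Django treats as an exact-match lookup against the list value rather than a membership test, so it is unlikely to match any row. The conventional spelling, shown below as a suggestion rather than as part of this change, uses the `__in` lookup:

```python
# Suggested spelling for the step-4 filter (not part of the diff):
# __in produces SQL membership (status IN (...)), whereas status=[...]
# is an exact-match lookup against the list itself.
apply_jobs = models.ApplyJob.objects.filter(
    project=project,
    status__in=[
        models.Job.Status.PENDING,
        models.Job.Status.QUEUED,
    ],
)
```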
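The step-7 loop is meant to split the pending deltas into jobs of at most `settings.APPLY_DELTAS_LIMIT` deltas each. Here is a standalone sketch of that arithmetic with plain integers and a hypothetical limit of 1000; note that it caps each slice with `min`, whereas the (pre-existing) code computes `limit` with `max`, which makes every slice end at `deltas_count` once the count exceeds the limit and so would assign the same deltas to several jobs:

```python
# Standalone sketch of the step-7 chunking arithmetic, decoupled from
# Django querysets. APPLY_DELTAS_LIMIT is a hypothetical stand-in for
# the real settings value.
APPLY_DELTAS_LIMIT = 1000


def chunk_bounds(deltas_count: int, chunk_size: int = APPLY_DELTAS_LIMIT):
    """Yield (offset, limit) slice bounds that partition deltas_count items."""
    for i in range(deltas_count // chunk_size + 1):
        offset = chunk_size * i
        # min() caps the final slice at deltas_count; with max() the
        # slices overlap as soon as deltas_count exceeds chunk_size.
        limit = min(chunk_size * (i + 1), deltas_count)
        if offset == limit:
            break
        yield offset, limit


# 2500 pending deltas split into three non-overlapping jobs:
assert list(chunk_bounds(2500)) == [(0, 1000), (1000, 2000), (2000, 2500)]
```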