Skip to content

Commit

Permalink
Prevent deadlocks in apply_deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
faebebin committed Aug 29, 2023
1 parent d863373 commit 92697a4
Showing 1 changed file with 67 additions and 58 deletions.
125 changes: 67 additions & 58 deletions docker-app/qfieldcloud/core/utils2/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
from django.conf import settings
from django.db import transaction
from django.db.models import Q
from django_pglocks import advisory_lock
from qfieldcloud.core import exceptions

logger = logging.getLogger(__name__)

# Any number
DELTAS_APPLY_LOCK_ID = 1001


@transaction.atomic
def apply_deltas(
Expand All @@ -28,71 +32,76 @@ def apply_deltas(
if not project.owner_can_create_job:
return []

# 2. Check if there are any pending deltas.
# We need to call .select_for_update() to make sure there would not be a concurrent
# request that will try to apply these deltas.
pending_deltas = models.Delta.objects.select_for_update().filter(
project=project,
last_status=models.Delta.Status.PENDING,
)

# 2.1. Filter only the deltas of interest.
if len(delta_ids) > 0:
pending_deltas = pending_deltas.filter(pk__in=delta_ids)

# 3. If there are no pending deltas, do not create a new job and return.
if not pending_deltas.exists():
return []

# 4. Find all the pending or queued jobs in the queue.
# If an "apply_delta" job is in a "started" status, we don't know how far the execution reached
# so we better assume the deltas will reach a non-"pending" status.
apply_jobs = models.ApplyJob.objects.filter(
project=project,
status=[
models.Job.Status.PENDING,
models.Job.Status.QUEUED,
],
)

# 5. Check whether there are jobs found in the queue and exclude all deltas that are part of any pending job.
if apply_jobs.exists():
pending_deltas = pending_deltas.exclude(jobs_to_apply__in=apply_jobs)
with advisory_lock(DELTAS_APPLY_LOCK_ID):
# Ensure there is no concurrent request trying to apply these deltas.
# This block of code is only executed when the pg lock is released from previous transaction.
#
# NOTE `select_for_update` seem to create deadlocks sometimes
# probably due to race conditions during the locking.

# 6. If there are no pending deltas, do not create a new job and return.
deltas_count = pending_deltas.count()
if deltas_count == 0:
return []
# 2. Check if there are any pending deltas.
pending_deltas = models.Delta.objects.filter(
project=project,
last_status=models.Delta.Status.PENDING,
)

# 7. There are pending deltas that are not part of any pending job. So we create one.
apply_jobs = []
for i in range(deltas_count // settings.APPLY_DELTAS_LIMIT + 1):
offset = settings.APPLY_DELTAS_LIMIT * i
limit = max(settings.APPLY_DELTAS_LIMIT * (i + 1), deltas_count)
# 2.1. Filter only the deltas of interest.
if len(delta_ids) > 0:
pending_deltas = pending_deltas.filter(pk__in=delta_ids)

if offset == limit:
break
# 3. If there are no pending deltas, do not create a new job and return.
if not pending_deltas.exists():
return []

apply_job = models.ApplyJob.objects.create(
# 4. Find all the pending or queued jobs in the queue.
# If an "apply_delta" job is in a "started" status, we don't know how far the execution reached
# so we better assume the deltas will reach a non-"pending" status.
apply_jobs = models.ApplyJob.objects.filter(
project=project,
created_by=user,
overwrite_conflicts=overwrite_conflicts,
status=[
models.Job.Status.PENDING,
models.Job.Status.QUEUED,
],
)

models.ApplyJobDelta.objects.bulk_create(
[
models.ApplyJobDelta(
apply_job=apply_job,
delta=delta,
)
for delta in pending_deltas.order_by("created_at")[offset:limit]
]
)

apply_jobs.append(apply_job)

# 8. return the created job
return apply_jobs
# 5. Check whether there are jobs found in the queue and exclude all deltas that are part of any pending job.
if apply_jobs.exists():
pending_deltas = pending_deltas.exclude(jobs_to_apply__in=apply_jobs)

# 6. If there are no pending deltas, do not create a new job and return.
deltas_count = pending_deltas.count()
if deltas_count == 0:
return []

# 7. There are pending deltas that are not part of any pending job. So we create one.
apply_jobs = []
for i in range(deltas_count // settings.APPLY_DELTAS_LIMIT + 1):
offset = settings.APPLY_DELTAS_LIMIT * i
limit = max(settings.APPLY_DELTAS_LIMIT * (i + 1), deltas_count)

if offset == limit:
break

apply_job = models.ApplyJob.objects.create(
project=project,
created_by=user,
overwrite_conflicts=overwrite_conflicts,
)

models.ApplyJobDelta.objects.bulk_create(
[
models.ApplyJobDelta(
apply_job=apply_job,
delta=delta,
)
for delta in pending_deltas.order_by("created_at")[offset:limit]
]
)

apply_jobs.append(apply_job)

# 8. return the created job
return apply_jobs


def repackage(project: "models.Project", user: "models.User") -> "models.PackageJob":
Expand Down

0 comments on commit 92697a4

Please sign in to comment.