Skip to content

Commit

Permalink
Prevent overlapping jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
faebebin committed Aug 29, 2023
1 parent 969bb6e commit d863373
Showing 1 changed file with 33 additions and 27 deletions.
60 changes: 33 additions & 27 deletions docker-app/qfieldcloud/core/management/commands/dequeue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand
from django.db import connection, transaction
from django_pglocks import advisory_lock
from qfieldcloud.core.models import Job
from worker_wrapper.wrapper import (
DeltaApplyJobRun,
Expand All @@ -16,6 +17,9 @@

SECONDS = 5

# Any number
JOBS_DEQUEUE_LOCK_ID = 1000


class GracefulKiller:
alive = True
Expand Down Expand Up @@ -59,33 +63,35 @@ def handle(self, *args, **options):

queued_job = None

with transaction.atomic():
with connection.cursor() as cursor:
cursor.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")

busy_projects_ids_qs = Job.objects.filter(
status__in=[
Job.Status.QUEUED,
Job.Status.STARTED,
]
).values("project_id")

# select all the pending jobs, that their project has no other active job
jobs_qs = (
Job.objects.select_for_update(skip_locked=True)
.filter(status=Job.Status.PENDING)
.exclude(project_id__in=busy_projects_ids_qs)
.order_by("created_at")
)

# each `worker_wrapper` or `dequeue.py` script can handle only one job and we handle the oldest
queued_job = jobs_qs.first()

# there might be no jobs in the queue
if queued_job:
logging.info(f"Dequeued job {queued_job.id}, run!")
queued_job.status = Job.Status.QUEUED
queued_job.save(update_fields=["status"])
with advisory_lock(JOBS_DEQUEUE_LOCK_ID):
# Ensure dequeueing is done sequentially.
# This block of code is only executed when the pg lock is released from previous transaction.
#
# NOTE `select_for_update` seem to not always work regardless the isolation level
# probably due to race conditions during the locking.
with transaction.atomic():
busy_projects_ids_qs = Job.objects.filter(
status__in=[
Job.Status.QUEUED,
Job.Status.STARTED,
]
).values("project_id")

# select all the pending jobs, that their project has no other active job
jobs_qs = (
Job.objects.filter(status=Job.Status.PENDING)
.exclude(project_id__in=busy_projects_ids_qs)
.order_by("created_at")
)

# each `worker_wrapper` or `dequeue.py` script can handle only one job and we handle the oldest
queued_job = jobs_qs.first()

# there might be no jobs in the queue
if queued_job:
logging.info(f"Dequeued job {queued_job.id}, run!")
queued_job.status = Job.Status.QUEUED
queued_job.save(update_fields=["status"])

if queued_job:
self._run(queued_job)
Expand Down

0 comments on commit d863373

Please sign in to comment.