Skip to content

Commit

Permalink
Additional concurrency checks on jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
suricactus committed Sep 4, 2023
1 parent 5885c49 commit 99a6d99
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
1 change: 1 addition & 0 deletions docker-app/qfieldcloud/core/utils2/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def apply_deltas(
status=[
models.Job.Status.PENDING,
models.Job.Status.QUEUED,
models.Job.Status.STARTED,
],
)

Expand Down
26 changes: 26 additions & 0 deletions docker-app/worker_wrapper/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,39 @@ def after_docker_exception(self) -> None:
pass

def run(self):
"""The main and first method to be called on `JobRun`.
Should not be overloaded by inheriting classes,
they should use `before_docker_run`, `after_docker_run`
and `after_docker_exception` hooks.
"""
feedback = {}

try:
self.job.status = Job.Status.STARTED
self.job.started_at = timezone.now()
self.job.save(update_fields=["status", "started_at"])

# # # CONCURRENCY CHECK # # #
# safety check whether there are no concurrent jobs running for that particular project
# if there are, reset the job back to `PENDING`
concurrent_jobs_count = (
self.job.project.jobs.filter(
status__in=[Job.Status.QUEUED, Job.Status.STARTED],
)
.exclude(pk=self.job.pk)
.count()
)

if concurrent_jobs_count > 0:
self.job.status = Job.Status.PENDING
self.job.started_at = None
self.job.save(update_fields=["status", "started_at"])
logger.warning(f"Concurrent jobs occured for job {self.job}.")
# sentry_sdk
return
# # # /CONCURRENCY CHECK # # #

self.before_docker_run()

command = self.get_command()
Expand Down

0 comments on commit 99a6d99

Please sign in to comment.