diff --git a/docker-app/qfieldcloud/core/utils2/jobs.py b/docker-app/qfieldcloud/core/utils2/jobs.py index 9384c0cb0..3ec1cb897 100644 --- a/docker-app/qfieldcloud/core/utils2/jobs.py +++ b/docker-app/qfieldcloud/core/utils2/jobs.py @@ -52,6 +52,7 @@ def apply_deltas( status=[ models.Job.Status.PENDING, models.Job.Status.QUEUED, + models.Job.Status.STARTED, ], ) diff --git a/docker-app/worker_wrapper/wrapper.py b/docker-app/worker_wrapper/wrapper.py index f70e97538..0e6847bf0 100644 --- a/docker-app/worker_wrapper/wrapper.py +++ b/docker-app/worker_wrapper/wrapper.py @@ -107,6 +107,12 @@ def after_docker_exception(self) -> None: pass def run(self): + """The main and first method to be called on `JobRun`. + + Should not be overloaded by inheriting classes, + they should use `before_docker_run`, `after_docker_run` + and `after_docker_exception` hooks. + """ feedback = {} try: @@ -114,6 +120,26 @@ def run(self): self.job.started_at = timezone.now() self.job.save(update_fields=["status", "started_at"]) + # # # CONCURRENCY CHECK # # # + # safety check whether there are no concurrent jobs running for that particular project + # if there are, reset the job back to `PENDING` + concurrent_jobs_count = ( + self.job.project.jobs.filter( + status__in=[Job.Status.QUEUED, Job.Status.STARTED], + ) + .exclude(pk=self.job.pk) + .count() + ) + + if concurrent_jobs_count > 0: + self.job.status = Job.Status.PENDING + self.job.started_at = None + self.job.save(update_fields=["status", "started_at"]) + logger.warning(f"Concurrent jobs occured for job {self.job}.") + # sentry_sdk + return + # # # /CONCURRENCY CHECK # # # + self.before_docker_run() command = self.get_command()