diff --git a/README.md b/README.md index 1baaad3..3bfd6be 100644 --- a/README.md +++ b/README.md @@ -274,11 +274,18 @@ to ensure the jobs table remains at a reasonable size. To start a worker: ``` -manage.py worker [queue_name] [--rate_limit] +manage.py worker [queue_name] [--rate_limit] [--shift_limit] ``` - `queue_name` is optional, and will default to `default` - The `--rate_limit` flag is optional, and will default to `1`. It is the minimum number of seconds that must have elapsed before a subsequent job can be run. +- The `--shift_limit` flag is optional, and will default to `0`. It is the maximum number of seconds for which the worker + will seek new jobs to process. The worker will seek further jobs while time remains in the shift. If the + `--shift_limit` is exceeded after + a job has started, the job will still run to completion, regardless of the time remaining. One use case for `--shift_limit` is to run the worker process via a + cron job. See `tests.py` for the effects of different `--rate_limit` and `--shift_limit` combinations on processed + jobs. If `--shift_limit` is not supplied, the default of `0` will be used and the worker will continue to run until + the process is shut down. 
##### manage.py queue_depth If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any
diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py
index 9215aad..39b7938 100644
--- a/django_dbq/management/commands/worker.py
+++ b/django_dbq/management/commands/worker.py
@@ -15,9 +15,11 @@
 
 
 class Worker:
-    def __init__(self, name, rate_limit_in_seconds):
+    def __init__(self, name, rate_limit_in_seconds, shift_limit_in_seconds):
         self.queue_name = name
         self.rate_limit_in_seconds = rate_limit_in_seconds
+        self.shift_limit_in_seconds = shift_limit_in_seconds
+        self.shift_start = timezone.now()
         self.alive = True
         self.last_job_finished = None
         self.current_job = None
@@ -39,7 +41,7 @@ def shutdown(self, signum, frame):
             self.current_job.save(update_fields=["state"])
 
     def run(self):
-        while self.alive:
+        while self.alive and self._shift_availability():
             self.process_job()
 
     def process_job(self):
@@ -111,6 +113,24 @@ def _process_job(self):
 
             self.current_job = None
 
+    def _shift_availability(self):
+        """
+        Return True while the worker may still pick up new jobs during the current shift.
+
+        A positive shift_limit_in_seconds lets the worker be run from a cron job for a bounded period: it
+        keeps seeking jobs while fewer than that many seconds have elapsed since shift_start. The limit is
+        only checked between jobs, so a job that has already started always runs to completion;
+        consequently the cron interval should be greater than the anticipated duration of the longest job.
+        A limit of 0 (the default), a negative limit or None disables the check, and the worker keeps
+        running until it is shut down.
+        """
+        limit = self.shift_limit_in_seconds
+        if limit is None or limit <= 0:
+            # None arrives when --shift_limit is given without a value (nargs="?" with no const).
+            return True
+        elapsed = (timezone.now() - self.shift_start).total_seconds()
+        return elapsed < limit
+
 
 class Command(BaseCommand):
 
@@ -125,6 +145,15 @@ def add_arguments(self, parser):
             default=1,
             type=int,
         )
+        parser.add_argument(
+            "--shift_limit",
+            help="The time limit in seconds within which the worker can process new jobs. The default shift "
+            "limit is 0 seconds, which disables the limit, allowing the worker to run indefinitely.",
+            nargs="?",
+            default=0,
+            type=int,
+        )
+
         parser.add_argument(
             "--dry-run",
             action="store_true",
@@ -142,13 +171,20 @@ def handle(self, *args, **options):
 
         queue_name = options["queue_name"]
         rate_limit_in_seconds = options["rate_limit"]
+        shift_limit_in_seconds = options["shift_limit"]
 
-        self.stdout.write(
-            'Starting job worker for queue "%s" with rate limit of one job per %s second(s)'
-            % (queue_name, rate_limit_in_seconds)
-        )
+        if shift_limit_in_seconds:
+            self.stdout.write(
+                'Starting job worker for queue "%s" with rate limit of one job per %s second(s) and a shift constraint of %s seconds.'
+                % (queue_name, rate_limit_in_seconds, shift_limit_in_seconds)
+            )
+        else:
+            self.stdout.write(
+                'Starting job worker for queue "%s" with rate limit of one job per %s second(s).'
+                % (queue_name, rate_limit_in_seconds)
+            )
 
-        worker = Worker(queue_name, rate_limit_in_seconds)
+        worker = Worker(queue_name, rate_limit_in_seconds, shift_limit_in_seconds)
 
         if options["dry_run"]:
             return
diff --git a/django_dbq/tests.py b/django_dbq/tests.py
index 3ae7ab9..1affd47 100644
--- a/django_dbq/tests.py
+++ b/django_dbq/tests.py
@@ -26,6 +26,10 @@ def failing_task(job):
     raise Exception("uh oh")
 
 
+def shift_task(job):
+    job.workspace["message"] = f"{job.id} ran"
+
+
 def failure_hook(job, exception):
     job.workspace["output"] = "failure hook ran"
 
@@ -163,7 +167,7 @@ def test_process_job_previous_job_long_time_ago(self, mock_sleep):
 class ShutdownTestCase(TestCase):
     def test_shutdown_sets_state_to_stopping(self):
         job = Job.objects.create(name="testjob")
-        worker = Worker("default", 1)
+        worker = Worker("default", 1, 0)
         worker.current_job = job
 
         worker.shutdown(None, None)
@@ -279,7 +283,7 @@ def test_task_sequence(self):
 class ProcessJobTestCase(TestCase):
     def test_process_job(self):
         job = Job.objects.create(name="testjob")
-        Worker("default", 1)._process_job()
+        Worker("default", 1, 0)._process_job()
         job = Job.objects.get()
         self.assertEqual(job.state, Job.STATES.COMPLETE)
 
@@ -288,7 +292,7 @@ def test_process_job_wrong_queue(self):
         Processing a different queue shouldn't touch our other job
         """
         job = Job.objects.create(name="testjob", queue_name="lol")
-        Worker("default", 1)._process_job()
+        Worker("default", 1, 0)._process_job()
         job = Job.objects.get()
         self.assertEqual(job.state, Job.STATES.NEW)
 
@@ -327,7 +331,7 @@ def test_creation_hook_only_runs_on_create(self):
 class JobFailureHookTestCase(TestCase):
     def test_failure_hook(self):
         job = Job.objects.create(name="testjob")
-        Worker("default", 1)._process_job()
+        Worker("default", 1, 0)._process_job()
         job = Job.objects.get()
         self.assertEqual(job.state, Job.STATES.FAILED)
         self.assertEqual(job.workspace["output"], "failure hook ran")
@@ -361,3 +365,58 @@ def test_delete_old_jobs(self):
         self.assertEqual(Job.objects.count(), 2)
         self.assertTrue(j4 in Job.objects.all())
         self.assertTrue(j5 in Job.objects.all())
+
+
+@override_settings(JOBS={"testshift": {"tasks": ["django_dbq.tests.shift_task"]}})
+class ShiftTestCase(TestCase):
+    """Tests how combinations of rate_limit and shift_limit affect which jobs get processed."""
+    def test_rate_with_shorter_shift_limit(self):
+        Job.objects.create(name="testshift")
+        Job.objects.create(name="testshift")
+        stdout = StringIO()
+        call_command("worker", rate_limit=2, shift_limit=1, stdout=stdout)
+        output = stdout.getvalue()
+        self.assertIn("rate limit of one job per 2 second(s) and a shift constraint of 1 seconds", output)
+        self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1)
+        self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 1)
+
+    def test_rate_with_equal_shift_limit(self):
+        Job.objects.create(name="testshift")
+        Job.objects.create(name="testshift")
+        stdout = StringIO()
+        call_command("worker", rate_limit=1, shift_limit=1, stdout=stdout)
+        output = stdout.getvalue()
+        self.assertIn("rate limit of one job per 1 second(s) and a shift constraint of 1 seconds", output)
+        self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1)
+        self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 1)
+
+    def test_rate_with_longer_shift_limit(self):
+        Job.objects.create(name="testshift")
+        Job.objects.create(name="testshift")
+        stdout = StringIO()
+        call_command("worker", rate_limit=1, shift_limit=2, stdout=stdout)
+        output = stdout.getvalue()
+        self.assertIn("rate limit of one job per 1 second(s) and a shift constraint of 2 seconds", output)
+        self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 0)
+        self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 2)
+
+    def test_rate_with_two_workers(self):
+        Job.objects.create(name="testshift")
+        Job.objects.create(name="testshift")
+        Job.objects.create(name="testshift")
+        Job.objects.create(name="testshift")
+        stdout = StringIO()
+        call_command("worker", rate_limit=1, shift_limit=2, stdout=stdout)
+        output = stdout.getvalue()
+        self.assertIn("rate limit of one job per 1 second(s) and a shift constraint of 2 seconds", output)
+        call_command("worker", rate_limit=1, shift_limit=1, stdout=stdout)
+        output = stdout.getvalue()
+        self.assertIn("rate limit of one job per 1 second(s) and a shift constraint of 1 seconds", output)
+        self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1)
+        self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 3)
+
+    def test_missing_or_non_positive_shift_limit_never_ends_shift(self):
+        for limit in (None, 0, -1):
+            self.assertTrue(Worker("default", 1, limit)._shift_availability())
+
+