Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature to exclude future jobs from queue depth #68

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ queue_depths = Job.get_queue_depths()
print(queue_depths) # {"default": 1, "other_queue": 1}
```

You can also exclude jobs which exist but are scheduled to be run in the future from the queue depths, where `run_after` is set to a future time from now. To do this set the `exclude_future_jobs` kwarg like so:
```python
queue_depths = Job.get_queue_depths(exclude_future_jobs=True)
```

**Important:** When checking queue depths, do not assume that the key for your queue will always be available. Queue depths of zero won't be included
in the dict returned by this method.

Expand All @@ -312,6 +317,8 @@ manage.py worker [queue_name] [--rate_limit]
If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any
jobs in the "NEW" or "READY" states will be returned.

If you wish to exclude jobs which are scheduled to be run in the future you can add `--exclude_future_jobs` to the command.

**Important:** If you misspell or provide a queue name which does not have any jobs, a depth of 0 will always be returned.

### Gotcha: `bulk_create`
Expand Down
5 changes: 4 additions & 1 deletion django_dbq/management/commands/queue_depth.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument("queue_name", nargs="*", default=["default"], type=str)
parser.add_argument("--exclude_future_jobs", default=False, type=bool)

def handle(self, *args, **options):
queue_names = options["queue_name"]
queue_depths = Job.get_queue_depths()
queue_depths = Job.get_queue_depths(
exclude_future_jobs=options["exclude_future_jobs"]
)

queue_depths_string = " ".join(
[
Expand Down
15 changes: 11 additions & 4 deletions django_dbq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
get_failure_hook_name,
get_creation_hook_name,
)
from django.db.models import JSONField, UUIDField, Count, TextChoices
from django.db.models import JSONField, UUIDField, Count, TextChoices, Q
import datetime
import logging
import uuid
Expand Down Expand Up @@ -173,10 +173,17 @@ def run_creation_hook(self):
creation_hook_function(self)

@staticmethod
def get_queue_depths():
def get_queue_depths(*, exclude_future_jobs=False):
jobs_waiting_in_queue = Job.objects.filter(
state__in=(Job.STATES.READY, Job.STATES.NEW)
)
if exclude_future_jobs:
jobs_waiting_in_queue = jobs_waiting_in_queue.filter(
Q(run_after__isnull=True) | Q(run_after__lte=timezone.now())
)

annotation_dicts = (
Job.objects.filter(state__in=(Job.STATES.READY, Job.STATES.NEW))
.values("queue_name")
jobs_waiting_in_queue.values("queue_name")
.order_by("queue_name")
.annotate(Count("queue_name"))
)
Expand Down
55 changes: 52 additions & 3 deletions django_dbq/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,17 @@ def test_worker_with_queue_name(self):
self.assertTrue("test_queue" in output)


@freezegun.freeze_time("2025-01-01T12:00:00Z")
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
class JobModelMethodTestCase(TestCase):
def test_get_queue_depths(self):
Job.objects.create(name="testjob", queue_name="default")
Job.objects.create(name="testjob", queue_name="testworker")
Job.objects.create(name="testjob", queue_name="testworker")
Job.objects.create(
name="testjob",
queue_name="testworker",
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testworker", state=Job.STATES.FAILED
)
Expand All @@ -79,16 +84,38 @@ def test_get_queue_depths(self):
queue_depths = Job.get_queue_depths()
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 2})

def test_get_queue_depths_exclude_future_jobs(self):
Job.objects.create(name="testjob", queue_name="default")
Job.objects.create(name="testjob", queue_name="testworker")
Job.objects.create(
name="testjob",
queue_name="testworker",
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testworker", state=Job.STATES.FAILED
)
Job.objects.create(
name="testjob", queue_name="testworker", state=Job.STATES.COMPLETE
)

queue_depths = Job.get_queue_depths(exclude_future_jobs=True)
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 1})


@freezegun.freeze_time("2025-01-01T12:00:00Z")
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
class QueueDepthTestCase(TestCase):
def test_queue_depth(self):

Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.NEW)
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
Job.objects.create(name="testjob", state=Job.STATES.READY)
Job.objects.create(
name="testjob",
state=Job.STATES.READY,
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testqueue", state=Job.STATES.READY
)
Expand All @@ -101,6 +128,28 @@ def test_queue_depth(self):
output = stdout.getvalue()
self.assertEqual(output.strip(), "event=queue_depths default=2")

def test_queue_depth_exclude_future_jobs(self):
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.NEW)
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
Job.objects.create(
name="testjob",
state=Job.STATES.READY,
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testqueue", state=Job.STATES.READY
)
Job.objects.create(
name="testjob", queue_name="testqueue", state=Job.STATES.READY
)

stdout = StringIO()
call_command("queue_depth", exclude_future_jobs=True, stdout=stdout)
output = stdout.getvalue()
self.assertEqual(output.strip(), "event=queue_depths default=1")

def test_queue_depth_multiple_queues(self):

Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Expand Down