Add priority queue support
hylje committed Jan 28, 2025
1 parent e4443b3 commit 29cdf72
Showing 9 changed files with 513 additions and 17 deletions.
42 changes: 42 additions & 0 deletions README.md
@@ -50,6 +50,48 @@ job = get_job(redis, job_id)
$ minique -u redis://localhost:6379/4 -q work -q anotherqueue -q thirdqueue --allow-callable 'my_jobs.*'
```

### Priority Queues

Minique supports priority queueing as an optional feature using the `enqueue_priority` API.

Priority queues are compatible with standard workers. However, priority is implemented using a
helper data structure, so the client needs to call `job.cleanup()` after each job and/or
`PriorityQueue(...).periodic_clean()` to prune jobs that have already been processed from this
structure.

The priority queue feature requires Lua scripting permissions on the Redis server.
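
If you want to verify up front that the connection may run scripts, a quick probe works (a
minimal sketch; a denied `EVAL` raises immediately):

```python
from redis import StrictRedis

redis = StrictRedis.from_url('redis://localhost:6379/4')

# A trivial EVAL fails fast (redis.exceptions.NoPermissionError) if the
# connecting user is not allowed to run Lua scripts.
assert redis.eval('return 1', 0) == 1
```

Enqueueing a prioritized job then looks like this: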

```python
from redis import StrictRedis
from minique.api import enqueue_priority, get_job

# Get a Redis connection, somehow.
redis = StrictRedis.from_url('redis://localhost:6379/4')

job = enqueue_priority(
    redis=redis,
    queue_name='urgent_work',
    callable='my_jobs.calcumacalate',  # Dotted path to your callable.
    kwargs={'a': 5, 'b': 5},  # Only kwargs supported.
    priority=1,  # Integer; higher-priority jobs are processed first.
    # You can also set a `job_id` yourself (but it must be unique).
)

job_id = job.id # Save the job ID somewhere, maybe?

while not job.has_finished:
    pass  # Twiddle thumbs...

print(job.result) # Okay!

# Job priorities are stored in a helper hash table, which should be cleaned
# using this method after the job has left the queue.
job.cleanup()

# Get the same job later (by default, no later than 7 days):
job = get_job(redis, job_id)
```
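
If individual `job.cleanup()` calls are ever skipped, the priority lookup hash can also be
pruned in bulk, e.g. from a periodic task (a minimal sketch; the queue name mirrors the example
above):

```python
from redis import StrictRedis
from minique.models.priority_queue import PriorityQueue

redis = StrictRedis.from_url('redis://localhost:6379/4')

# Rebuild the priority lookup hash from the jobs still queued; returns the
# number of stale entries removed, or -1 if there was nothing to clean.
removed = PriorityQueue(redis, 'urgent_work').periodic_clean()
print(removed)
```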

## Sentry Support

Minique automatically integrates with the [Sentry](https://sentry.io/welcome/)
70 changes: 61 additions & 9 deletions minique/api.py
@@ -8,6 +8,7 @@
from minique.excs import DuplicateJob
from minique.models.job import Job
from minique.models.queue import Queue
from minique.models.priority_queue import PriorityQueue
from minique.utils import get_random_pronounceable_string


@@ -35,6 +36,7 @@ def enqueue(
    :raises minique.excs.DuplicateJob: If a job with the same ID already exists.
    :raises minique.excs.NoSuchJob: If the job does not exist right after creation.
    """
    queue = Queue(redis, queue_name)
    job = _define_and_store_job(
        redis=redis,
        callable=callable,
@@ -43,7 +45,48 @@
        job_ttl=job_ttl,
        result_ttl=result_ttl,
        encoding_name=encoding_name,
        queue_name=queue_name,
        queue=queue,
    )
    return job


def enqueue_priority(
    redis: "Redis[bytes]",
    queue_name: str,
    callable: Union[Callable[..., Any], str],
    kwargs: Optional[Dict[str, Any]] = None,
    job_id: Optional[str] = None,
    job_ttl: int = 0,
    result_ttl: int = 86400 * 7,
    encoding_name: Optional[str] = None,
    priority: int = 0,
) -> Job:
    """
    Queue up a callable as a job, placing the job in the last position for its priority.
    :param redis: Redis connection
    :param queue_name: Name of the queue to enqueue the job in.
    :param callable: A dotted path to the callable to execute on the worker.
    :param kwargs: Keyword arguments to pass to the callable.
    :param job_id: An identifier for the job; defaults to a random string.
    :param job_ttl: Time-to-live for the job in seconds; defaults to never expire.
    :param result_ttl: Time-to-live for the result in seconds; defaults to 7 days.
    :param encoding_name: Name of the encoding to use for the job payload; defaults to JSON.
    :param priority: Priority number of this job; defaults to zero.
    :raises minique.excs.DuplicateJob: If a job with the same ID already exists.
    :raises minique.excs.NoSuchJob: If the job does not exist right after creation.
    """
    queue = PriorityQueue(redis, queue_name)
    job = _define_and_store_job(
        redis=redis,
        callable=callable,
        kwargs=kwargs,
        job_id=job_id,
        job_ttl=job_ttl,
        result_ttl=result_ttl,
        encoding_name=encoding_name,
        queue=queue,
        priority=priority,
    )
    return job

@@ -116,8 +159,14 @@ def cancel_job(
        p.hset(job.redis_key, "status", JobStatus.CANCELLED.value)
        queue_name = job.get_queue_name()
        if queue_name:
            queue = Queue(redis, name=queue_name)
            p.lrem(queue.redis_key, 0, job.id)
            if job.has_priority:
                queue = job.get_priority_queue()
                p.lrem(queue.redis_key, 0, job_id)
                p.hdel(queue.prio_key, job_id)
            else:
                queue = job.get_queue()
                p.lrem(queue.redis_key, 0, job.id)

        if expire_time:
            p.expire(job.redis_key, expire_time)
        p.execute()
@@ -134,7 +183,8 @@ def _define_and_store_job(
    job_ttl: int = 0,
    result_ttl: int = 86400 * 7,
    encoding_name: Optional[str] = None,
    queue_name: Optional[str] = None,
    queue: Optional[Queue] = None,
    priority: Optional[int] = None,
) -> Job:
    if not encoding_name:
        encoding_name = encoding.default_encoding_name
@@ -159,17 +209,19 @@
"job_ttl": int(job_ttl),
"result_ttl": int(result_ttl),
}
if queue_name:
payload["queue"] = queue_name
if queue:
payload["queue"] = queue.name
if priority is not None:
payload["priority"] = priority

with redis.pipeline() as p:
p.hset(job.redis_key, mapping=payload) # type: ignore[arg-type]
if payload["job_ttl"] > 0:
p.expire(job.redis_key, payload["job_ttl"])
if queue_name:
queue = Queue(redis, name=queue_name)
p.rpush(queue.redis_key, job.id)
p.execute()

if queue:
queue.add_job(job)

job.ensure_exists()
return job
4 changes: 4 additions & 0 deletions minique/excs.py
@@ -24,3 +24,7 @@ class InvalidJob(ValueError):

class MissingJobData(ValueError):
    pass


class Retry(Exception):
    pass
46 changes: 43 additions & 3 deletions minique/models/job.py
@@ -17,6 +17,7 @@

if TYPE_CHECKING:
    from minique.models.queue import Queue
    from minique.models.priority_queue import PriorityQueue


class Job:
@@ -47,14 +48,33 @@ def ensure_enqueued(self) -> Tuple[bool, int]:
        status = self.status
        if status in (JobStatus.SUCCESS, JobStatus.FAILED, JobStatus.CANCELLED):
            raise InvalidStatus(f"Job {self.id} has status {status}, will not enqueue")
        return self.get_queue().ensure_enqueued(self)
        if self.has_priority:
            queue = self.get_priority_queue()
        else:
            queue = self.get_queue()

        return queue.ensure_enqueued(self)

    def dequeue(self) -> bool:
        """
        Remove the job from the queue without changing its status.
        """
        num_removed = self.redis.lrem(self.get_queue().redis_key, 0, self.id)
        return num_removed > 0
        queue: Queue
        if self.has_priority:
            queue = self.get_priority_queue()
        else:
            queue = self.get_queue()

        return queue.dequeue_job(self.id)

    def cleanup(self) -> bool:
        """Remove this job's entry from its queue's priority lookup hash.

        :return: True if the job had a priority entry to clean up.
        """
        has_priority = self.redis.hget(self.redis_key, "priority")
        if has_priority is not None:
            queue = self.get_priority_queue()
            queue.clean_job(self)
            return True

        return False

    @property
    def redis_key(self) -> str:
@@ -140,6 +160,21 @@ def duration(self) -> float:
            raise MissingJobData(f"Job {self.id} has no duration")
        return float(duration)

    @property
    def has_priority(self) -> bool:
        return bool(self.redis.hget(self.redis_key, "priority"))

    @property
    def priority(self) -> int:
        priority = self.redis.hget(self.redis_key, "priority")
        if priority is None:
            raise MissingJobData(f"Job {self.id} has no priority")
        return int(priority)

    @priority.setter
    def priority(self, new_priority: int) -> None:
        self.redis.hset(self.redis_key, "priority", new_priority)

    @property
    def queue_name(self) -> str:
        return self.get_queue_name(missing_ok=False)  # type:ignore[return-value]
@@ -183,6 +218,11 @@ def get_queue(self) -> "Queue":

        return Queue(redis=self.redis, name=self.queue_name)

    def get_priority_queue(self) -> "PriorityQueue":
        from minique.models.priority_queue import PriorityQueue

        return PriorityQueue(redis=self.redis, name=self.queue_name)

    def set_meta(self, meta: Any) -> None:
        """
        Set the "in-band" progress metadata for this job.
137 changes: 137 additions & 0 deletions minique/models/priority_queue.py
@@ -0,0 +1,137 @@
from typing import TYPE_CHECKING, cast

from redis import Redis

from minique.models.queue import Queue

if TYPE_CHECKING:
    from minique.models.job import Job


ADD_JOB_SCRIPT = """
local queue_key = ARGV[1]
local queue_prio_lookup = ARGV[2]
local job_key = ARGV[3]
local job_id = ARGV[4]
local job_priority = tonumber(redis.call("HGET", job_key, "priority")) or 0
local job_index = 1
local insert_status = -1
-- Check the priority of the last job in the queue; we might become the new last job.
-- This is generally more efficient than iterating the queue.
local last_job_id = redis.call("LRANGE", queue_key, -1, -1)[1]
if last_job_id ~= nil then
    -- Default to 0 for jobs that have no entry in the lookup hash.
    local last_job_priority = tonumber(redis.call("HGET", queue_prio_lookup, last_job_id)) or 0
    if last_job_priority >= job_priority then
        insert_status = redis.call("RPUSH", queue_key, job_id)
        -- Insert status is the length of the queue
        job_index = insert_status
    end
else
    -- The queue is empty.
    insert_status = redis.call("RPUSH", queue_key, job_id)
end
if insert_status == -1 then
    -- Our place in the queue is somewhere in the middle, search for it
    for i, queued_job_id in ipairs(redis.call("LRANGE", queue_key, 0, -1)) do
        local queued_job_priority = tonumber(redis.call("HGET", queue_prio_lookup, queued_job_id)) or 0
        if queued_job_priority < job_priority then
            -- Found the first job with a lower priority, insert before it
            insert_status = redis.call("LINSERT", queue_key, "BEFORE", queued_job_id, job_id)
            job_index = i
            break
        end
    end
end
if insert_status == -1 then
    -- No lower-priority job found, insert at the end
    redis.call("RPUSH", queue_key, job_id)
end
-- Introduce the newly added job to the priority lookup hash
redis.call("HSET", queue_prio_lookup, job_id, job_priority)
return job_index
"""

PRIO_HASH_CLEANER_SCRIPT = """
local queue_key = ARGV[1]
local queue_prio_lookup = ARGV[2]
local queue_len = tonumber(redis.call("LLEN", queue_key))
local hash_len = tonumber(redis.call("HLEN", queue_prio_lookup))
-- Tolerate some excess
if queue_len + 100 >= hash_len then
    return -1
end
-- Rebuild the lookup hash from the jobs currently in the queue
local new_hash = {}
local count = 0
for _, queued_job_id in ipairs(redis.call("LRANGE", queue_key, 0, -1)) do
    local priority = redis.call("HGET", queue_prio_lookup, queued_job_id)
    if priority then
        new_hash[queued_job_id] = priority
        count = count + 1
    end
end
redis.call("DEL", queue_prio_lookup)
for queued_job_id, priority in pairs(new_hash) do
    redis.call("HSET", queue_prio_lookup, queued_job_id, priority)
end
return hash_len - count
"""


class PriorityQueue(Queue):
    """Alternate Queue implementation that uses the optional `Job.priority` attribute
    to sort the queue when adding jobs.

    When using the priority queue, ensure the `PriorityQueue.clean_job()` method is called
    for each job after it leaves the queue to trim the job priority lookup hash.
    This can be performed by the consumer, the job manager, or both.
    Alternatively, or in addition, periodically call `PriorityQueue.periodic_clean()` to
    remove stale keys from the priority lookup hash.
    """

    def __init__(self, redis: "Redis[bytes]", name: str):
        super().__init__(redis, name)
        self.add_job_script = redis.register_script(ADD_JOB_SCRIPT)
        self.hash_clean_script = redis.register_script(PRIO_HASH_CLEANER_SCRIPT)

    @property
    def prio_key(self) -> str:
        return f"{self.redis_key}prio"

    def add_job(self, job: "Job") -> int:
        """Insert the job into the queue according to its priority.

        :return: The 0-based index the job was inserted at.
        """
        script_response = self.add_job_script(
            keys=[self.redis_key, self.prio_key, job.redis_key],
            args=[self.redis_key, self.prio_key, job.redis_key, job.id],
        )
        return cast(int, script_response) - 1

    def dequeue_job(self, job_id: str) -> bool:
        """Dequeue a job from any position in the queue, cleaning it from the priority
        lookup hash.
        """
        self.redis.hdel(self.prio_key, job_id)
        num_removed = self.redis.lrem(self.redis_key, 0, job_id)
        return num_removed > 0

    def clean_job(self, job: "Job") -> None:
        """Clean up job data after the job has exited the queue."""
        self.redis.hdel(self.prio_key, job.id)

    def periodic_clean(self) -> int:
        """Perform occasional maintenance on the data structures.

        :return: Number of cleaned-up values, or -1 if cleaning was skipped.
        """
        script_response = self.hash_clean_script(
            keys=[self.redis_key, self.prio_key],
            args=[self.redis_key, self.prio_key],
        )
        return cast(int, script_response)
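
For orientation, the pieces above fit together like this at the model level (a sketch; in
normal use `minique.api.enqueue_priority` constructs the `PriorityQueue` for you):

```python
from redis import StrictRedis

from minique.api import enqueue_priority
from minique.models.priority_queue import PriorityQueue

redis = StrictRedis.from_url('redis://localhost:6379/4')

job = enqueue_priority(
    redis, 'urgent_work', 'my_jobs.calcumacalate',
    kwargs={'a': 5, 'b': 5}, priority=3,
)

queue = PriorityQueue(redis, 'urgent_work')
# ... once a worker has processed the job:
queue.clean_job(job)    # drop its entry from the priority lookup hash
queue.periodic_clean()  # and/or prune stale entries in bulk
```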
