This is the set of all the lua scripts that comprise the qless library. We've begun migrating away from the system of having one lua script per command to a more object-oriented approach where all code is contained in a single unified lua script.
There are a few reasons for making this choice, but essentially it was getting too difficult to maintain and there was a lot of duplicated code in different sections. This also happens to have the added benefit of allowing you to build on top of the qless core library within your own lua scripts through composition.
For ease of development, we've broken this unified file into several smaller
submodules that are concatenated together into a qless.lua
script. These are:
base.lua
-- forward declarations and some uncategorized functionsconfig.lua
-- all configuration interactionsjob.lua
-- the regular job classqueue.lua
-- the queue classrecurring.lua
-- the recurring job classworker.lua
-- manage available workersapi.lua
-- exposing the interfaces that the clients invoke, it's a very thin wrapper around these classes
In order to build up the qless.lua
script, we've included a simple Makefile
though all it does is cat these files out in a particular order:
make qless.lua
If you'd like to use just the core library within your lua script, you can get lua script that contains all the classes, but none of the wrapping layer that the qless clients use:
make qless-lib.lua
Historically, tests have appeared only in the language-specific bindings of
qless, but that has become a tedious process. Not to mention the fact that
it's a steep barrier to entry for writing new clients. In light of that, we
now include tests directly in qless-core
, written in python. To run these,
you will need python and the py.test
and redis
libraries. If you have pip
installed:
pip -r requirements.txt
To run the tests, there is a directive included in the makefile:
make test
If you have Redis running somewhere other than localhost:6379
, you can supply
the REDIS_URL
environment variable:
REDIS_URL='redis://host:port' make test
There is docker compose file to run redis and test the library.
docker compose run --rm shell
To get a shell with python installed and redis running.
pip install -r requirements.txt && make test
To run the tests.
When originally developing this, I wrote some functions using the KEYS
portion of the lua scripts, but eventually realized that to do so didn't make
any sense. For just about all operations there's no way to determine a priori
which Redis keys would be touched, and so I abandoned that idea. However, in
many cases there were vestigial KEYS
in use, but that has now changed. No
more KEYS
!
To ease the client logic, every command now takes a timestamp with it. In many
cases this argument is ignored, but it is still required in order to make a
valid call. This requirement only comes through in the exposed script API, but
not in the class interface. At the class function level, only the functions
which require the now
argument list it.
The documentation of the code is present in each of the modules, but it is excluded from the production code to reduce the weight of it.
A worker is given an exclusive lock on a piece of work when it is given that piece of work. That lock may be renewed periodically so long as it's before the provided 'heartbeat' timestamp. Likewise, it may be completed.
If a worker attempts to heartbeat a job, it may optionally provide an updated
JSON blob to describe the job. If the job has been given to another worker,
the heartbeat should return false
and the worker should yield.
When a node attempts to heartbeat, the lua script should check to see if the node attempting to renew the lock is the same node that currently owns the lock. If so, then the lock's expiration should be pushed back accordingly, and the updated expiration returned. If not, an exception is raised.
Qless also collects statistics for job wait time (time popped - time put), and job completion time (time completed - time popped). By 'statistics', I mean average, variange, count and a histogram. Stats for the number of failures and retries for a given queue are also available.
Stats are grouped by day. In the case of job wait time, its stats are aggregated on the day when the job was popped. In the case of completion time, they are grouped by the day it was completed.
Jobs can be tracked, which just means that they are accessible and displayable.
This can be useful if you just want to keep tabs on the progress of jobs
through the pipeline. All the currently-tracked jobs are stored in a sorted
set, ql:tracked
.
Failures are stored in such a way that we can quickly summarize the number of
failures of a given type, but also which items have succumb to that type of
failure. With that in mind, there is a Redis set, ql:failures
whose members
are the names of the various failure lists. Each type of failure then has its
own list of instance ids that encountered such a failure. For example, we
might have:
ql:failures
=============
upload error
widget failure
ql:f:upload error
==================
deadbeef
...
We'll keep a sorted set of workers sorted by the last time they had any
activity. We'll store this set at ql:workers
.
In addition to this list, we'll keep a set of the jids that a worker currently
has locks for at ql:w:<worker>:jobs
. This should be sorted by the time when
we last saw a heartbeat (or pop) for that worker from that job.
TBD We will likely store data about each worker. Perhaps this, too, can be kept by day.
We should delete data about completed jobs periodically. We should prune both
by the policies for the maximum number of retained completed jobs, and by the
maximum age for retained jobs. To accomplish this, we'll use a sorted list to
keep track of which items should be expired. This list should be stored in the
key ql:completed
The configuration should go in the key ql:config
, and here are some of the
configuration options that qless
is meant to support:
heartbeat
(60) -- The default heartbeat in seconds for queuesstats-history
(30) -- The number of days to store summary statshistogram-history
(7) -- The number of days to store histogram datajobs-history-count
(50k) -- How many jobs to keep data for after they're completedjobs-history
(7 _ 24 _ 60 * 60) -- How many seconds to keep jobs after they're completedheartbeat-<queue name>
-- The heartbeat interval (in seconds) for a particular queuemax-worker-age
-- How long before workers are considered disappeared<queue>-max-concurrency
-- The maximum number of jobs that can be running in a queue. If this number is reduced, it does not impact any currently-running jobsmax-job-history
-- The maximum number of items in a job's history. This can be used to help control the size of long-running jobs' history
This section stands to speak to the internal structure and naming conventions.
Each job is stored primarily in a key ql:j:<jid>
, a Redis hash, which
contains most of the keys that describe the job. A set (possibly empty)
of jids on which this job depends is stored in ql:j:<jid>-dependencies
.
A set (also possibly empty) of jids that rely on the completion of this
job is stored in ql:j:<jid>-dependents
. For example, ql:j:<jid>
:
{
# This is the same id as identifies it in the key. It should be
# a hex value of a uuid
'jid' : 'deadbeef...',
# This is a 'type' identifier. Clients may choose to ignore it,
# or use it as a language-specific identifier for determining
# what code to run. For instance, it might be 'foo.bar.FooJob'
'type' : '...',
# This is the priority of the job -- lower means more priority.
# The default is 0
'priority' : 0,
# This is the user data associated with the job. (JSON blob)
'data' : '{"hello": "how are you"}',
# A JSON array of tags associated with this job
'tags' : '["testing", "experimental"]',
# The worker ID of the worker that owns it. Currently the worker
# id is <hostname>-<pid>
'worker' : 'ec2-...-4925',
# This is the time when it must next check in
'expires' : 1352375209,
# The current state of the job: 'waiting', 'pending', 'complete'
'state' : 'waiting',
# The queue that it's associated with. 'null' if complete
'queue' : 'example',
# The maximum number of retries this job is allowed per queue
'retries' : 3,
# The number of retries remaining
'remaining' : 3,
# The jids that depend on this job's completion
'dependents' : [...],
# The jids that this job is dependent upon
'dependencies': [...],
# A list of all the things that have happened to a job. Each entry has
# the keys 'what' and 'when', but it may also have arbitrary keys
# associated with it.
'history' : [
{
'what' : 'Popped',
'when' : 1352075209,
...
}, {
...
}
]
}
A queue is a priority queue and consists of three parts:
ql:q:<name>-scheduled
-- sorted set of all scheduled job idsql:q:<name>-work
-- sorted set (by priority) of all jobs waitingql:q:<name>-locks
-- sorted set of job locks and expirationsql:q:<name>-depends
-- sorted set of jobs in a queue, but waiting on other jobs
When looking for a unit of work, the client should first choose from the
next expired lock. If none are expired, then we should next make sure that
any jobs that should now be considered eligible (the scheduled time is in
the past) are then inserted into the work queue. A sorted set of all the
known queues is maintained at ql:queues
. Currently we're keeping it
sorted based on the time when we first saw the queue, but that's a little
bit at odd with only keeping queues around while they're being used.
When a job is completed, it removes itself as a dependency of all the jobs that depend on it. If it was the last job that a job depended on, it is then inserted into the queue's work.
Stats are grouped by day and queue. The day portion of the stats key is an integer timestamp of midnight for that day:
<day> = time - (time % (24 * 60 * 60))
Stats are stored under two hashes: ql:s:wait:<day>:<queue>
and
ql:s:run:<day>:<queue>
respectively. Each has the keys:
total
-- The total number of data points containedmean
-- The current mean valuevk
-- Not the actual variance, but a number that can be used to both numerically stable-ly find the variance, and compute it in a streaming fashions1
,s2
, ..., -- second-resolution histogram counts for the first minutem1
,m2
, ..., -- minute-resolution for the first hourh1
,h2
, ..., -- hour-resolution for the first dayd1
,d2
, ..., -- day-resolution for the rest
This is also another hash, ql:s:stats:<day>:<queue>
with keys:
failures
-- This is how many failures there have been. If a job is run twice and fails repeatedly, this is incremented twice.failed
-- This is how many are currently failedretries
-- This is how many jobs we've had to retry
All jobs store a JSON array of the tags that are associated with it. In
addition, the keys ql:t:<tag>
store a sorted set of all the jobs associated
with that particular tag. The score of each jid in that tag is the time when
that tag was added to that job. When jobs are tagged a second time with an
existing tag, then it's a no-op.
There are a few nuanced aspects of implementing bindings for your particular language that are worth bringing up. The canonical example for bindings should be the python and ruby bindings.
We recommend using git submodules to keep qless-core in your bindings.
The majority of tests are implemented in qless-core
, and so your bindings
should merely test that they provide sensible access to the functionality. This
should include a notion of queues
, workers
, jobs
, etc.
If your language supports dynamic importing of code, and in particular if a
class can be imported deterministically from a string identifier, then you
should include a worker executable with your release. For example, in Python,
given the class foo.Job
, that string is enough to know what module to import.
As such, a worker binary can just be given a list of queues, a number (and
perhaps type) of workers, wait intervals, etc., and then can import all the
code required to perform work.
Jobs with identical priorities are popped in the order they were inserted. The caveat is that it's only true to the precision of the timestamps your bindings provide. For example, if you provide timestamps to the second granularity, then jobs with the same priority inserted in the same second can be popped in any order. Timestamps at the thousandths of a second granularity will maintain this property better. While for most applications it's likely not important, it is something to be aware of when writing language bindings.
It's intended to be a common usecase that bindings provide a worker script or binary that runs several worker subprocesses. These should run with their working directory as a sandbox.
There are a couple of philosophies regarding how to best fork processes to do work. Certainly, there should be a parent process that manages child processes, if for no other reason than to ensure that child workers are well-behaved. But how exactly the child processes work is less clear. We encourage you to make all models available in your client:
- Fork once for each job -- This has the added benefit of containing potential issues like resource leaks, but it comes at the potentially high cost of forking once for each job.
- Fork long-running processes -- Forking long-running processes means that you will likely to be able to saturate the CPUs on a machine more easily, and reduces the cost per job of forking.
- Coroutines in long-running processes -- Especially for I/O-bound processes this is handy, since you can keep the number of processes relatively small and still get good I/O parallelism.
Each style of worker should be able to listen for worker-specific lock_lost
,
canceled
and put
events, each of which can signal that a worker has lost
its right to process a job. If that's discovered, a parent process could take
the opportunity to stop the child worker that's currently running that job (if
it exists). While qless
ensures correct behavior when taking action on jobs
where a lock has been lost, this is an opportunity to gain efficiency.
Workers are allowed (and encouraged) to pop off of more than one queue. But
then we get into the problem of what order they should be polled. Workers
should support two modes of popping: ordered and round-robin. Consider queues
A
, B
, and C
with job counts:
A: 5
B: 2
C: 3
In an ordered verion, the order in which the queues are specified has
significance in the order in which jobs are popped. For example, if our queued
were ordered C, B, A
in the worker, we'd pop jobs off:
C, C, C, B, B, A, A, A, A, A
In the round-robin implementation, a worker pops off a job from each queue as it progress through all queues:
C, B, A, C, B, A, C, A, A, A
These aren't meant to be stringent, but just to keep myself sane so that when moving between different chunks of code that it's all formatted similarly, and the same variable names have the same meaning.
- Parameter sanitization should be performed as early as possible. This
includes making use of
assert
anderror
based on the number and type of arguments. - Job ids should be referred to as
jid
, both internally and in the clients. - Failure types should be described with
group
. I'm not terribly thrilled with the term, but I thought it was better than 'kind.' After spending some time with a Thesaurus, I didn't find anything that appealed to me more - Job types should be described as
klass
(nod to Resque), because both 'type' and 'class' are commonly used in languages.