A library for simple PHP job queueing, execution and management, used by Openclerk and live on CryptFolio.
While cron jobs are a simple approach to running regular tasks,
openclerk/jobs
allows tasks to be defined, executed and managed in reliable ways.
Include openclerk/jobs
as a requirement in your project's composer.json,
and run composer update
to install it into your project:
{
"require": {
"openclerk/jobs": "dev-master"
}
}
- Queue up jobs immediately for execution later
- Jobs report success by returning normally, or failure by throwing an exception
- Repeatedly failing jobs can be removed from the job execution queue
- Define your own job selection algorithms
- Any exceptions thrown during job execution are stored in the
job_exceptions
table
use \Openclerk\Jobs\Job;
use \Db\Connection;
use \Monolog\Logger;
/**
 * Example job: passes when the row referenced by the job's `arg_id` exists,
 * and fails (by throwing) when it does not.
 */
class MyJob implements Job {
    /**
     * The `job` instance this object was created for.
     *
     * Declared explicitly because assigning to an undeclared (dynamic) property
     * is deprecated as of PHP 8.2. It stays public, since the dynamic property
     * that used to be created implicitly was public as well.
     *
     * @var array
     */
    public $job;

    /**
     * @param array $job the `job` instance, an array of `job_type`, `arg_id` and optionally `user_id`
     */
    public function __construct($job) {
        $this->job = $job;
    }

    /**
     * Execute the job by looking up the row identified by the job's `arg_id`.
     *
     * @param Connection $db database connection used for the lookup
     * @param Logger $logger logger supplied for this run (not used by this example)
     * @throws \Exception if no matching row is found; throwing is how a job reports failure
     */
    public function run(Connection $db, Logger $logger) {
        $q = $db->prepare("SELECT * FROM table WHERE id=?");
        $q->execute(array($this->job['arg_id']));

        // a falsy fetch() result means there was no row for this arg_id
        if (!$q->fetch()) {
            throw new \Exception("Could not find that instance");
        }
    }

    /**
     * (optional) Hook for when the job passed.
     */
    public function passed(Connection $db, Logger $logger) {
        // (optional) the job passed
    }

    /**
     * (optional) Hook for when the job failed.
     *
     * @param \Exception $runtime_exception the exception describing the failure
     */
    public function failed(\Exception $runtime_exception, Connection $db, Logger $logger) {
        // (optional) the job failed
    }
}
use \Openclerk\Jobs\JobQueuer;
use \Openclerk\Jobs\Job;
use \Db\Connection;
use \Monolog\Logger;
/**
 * Example queuer: every row of `table` that has not been queued yet
 * (`is_queued=0`) becomes one job of type `table`.
 */
class MyJobQueuer extends JobQueuer {
    /**
     * Collect all jobs that still need to be queued.
     *
     * @return array list of associative arrays with (job_type, arg_id, [user_id])
     */
    public function findJobs(Connection $db, Logger $logger) {
        $statement = $db->prepare("SELECT * FROM table WHERE is_queued=0");
        $statement->execute();

        $pending = array();
        while ($row = $statement->fetch()) {
            $pending[] = array(
                'job_type' => 'table',
                'arg_id' => $row['id'],
                // optional: user_id
            );
        }

        return $pending;
    }

    /**
     * Called once the given job has been queued up: flag its source row
     * (matched on the job's `arg_id`) as queued.
     *
     * @param array $job the job that was queued; only `arg_id` is read here
     */
    public function jobQueued(Connection $db, Logger $logger, $job) {
        $markQueued = $db->prepare("UPDATE table SET is_queued=1 WHERE id=?");
        $markQueued->execute(array($job['arg_id']));
    }
}
use \Openclerk\Jobs\JobRunner;
use \Openclerk\Jobs\Job;
use \Db\Connection;
use \Monolog\Logger;
/**
 * Example runner: maps a job's `job_type` onto the {@link Job} class that executes it.
 */
class MyJobRunner extends JobRunner {
    /**
     * Create the {@link Job} to run for this job type.
     *
     * Only the `table` job type is recognised here; for any other type
     * nothing is created and null is returned.
     *
     * @param array $job the job to create an instance for; `job_type` selects the class
     */
    public function createJob($job, Connection $db, Logger $logger) {
        // loose comparison, the same way a switch statement matches its cases
        if ($job['job_type'] == 'table') {
            return new MyJob($job);
        }

        return null;
    }
}
For example, a batch script to queue up new jobs:
// Logger for this batch script, writing through the project's own handler
$logger = new \Monolog\Logger("batch_queue");
$logger->pushHandler(new \Core\MyLogger());

// Queue up new jobs
$queuer = new MyJobQueuer();
$queuer->queue(db(), $logger);
Or, a batch script to run a single job:
// Logger for this batch script, writing through the project's own handler
$logger = new \Monolog\Logger("batch_run");
$logger->pushHandler(new \Core\MyLogger());

// Run a single job
$jobRunner = new MyJobRunner();
$jobRunner->runOne(db(), $logger);
These batch scripts can then be set up with cron, etc.
- Using
require()
for running jobs instead of classes
- Run jobs only of a certain type
- Run jobs only of users with particular properties
- Run jobs only with a particular job ID
- Run jobs from a web admin interface
- Capture jobs that time out
- job_failed events through openclerk/events
- More tests