Skip to content

Commit

Permalink
Merge pull request #5 from alansaviolobo/master
Browse files Browse the repository at this point in the history
Support for PostgreSQL
  • Loading branch information
n0nag0n authored Jun 14, 2024
2 parents 6029357 + d270238 commit fd219cd
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 33 deletions.
24 changes: 22 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ $Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));
```

#### PostgreSQL
```php
<?php

use n0nag0n\Job_Queue

// default is mysql based job queue
$Job_Queue = new Job_Queue('pgsql', [
'pgsql' => [
'table_name' => 'new_table_name', // default is job_queue_jobs
]
]);

$PDO = new PDO('pgsql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));
```

#### SQLite3
```php
<?php
Expand Down Expand Up @@ -72,7 +92,7 @@ See `example_worker.php` for file or see below:
$Job_Queue = new n0nag0n\Job_Queue('mysql');
$PDO = new PDO('mysql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);
$Job_Queue->watchPipeline('some_cool_pipeline_name');
$Job_Queue->watchPipeline('send_important_emails');
while(true) {
$job = $Job_Queue->getNextJobAndReserve();

Expand Down Expand Up @@ -107,4 +127,4 @@ Supervisord is going to be your jam. Look up the many, many articles on how to i
PHPUnit Tests with sqlite3 examples for the time being.
```
vendor/bin/phpunit
```
```
90 changes: 59 additions & 31 deletions src/Job_Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
class Job_Queue {

const QUEUE_TYPE_MYSQL = 'mysql';
const QUEUE_TYPE_PGSQL = 'pgsql';
const QUEUE_TYPE_SQLITE = 'sqlite';
const QUEUE_TYPE_BEANSTALKD = 'beanstalkd';

Expand Down Expand Up @@ -51,6 +52,7 @@ class Job_Queue {
*
* @param string $queue_type - self::QUEUE_TYPE_MYSQL is default
* @param array $options
* @throws Exception
*/
public function __construct(string $queue_type = self::QUEUE_TYPE_MYSQL, array $options = []) {

Expand Down Expand Up @@ -127,7 +129,7 @@ public function flushCache(): void {
/**
* Adds a generic connection for the queue type selected
*
* @param mixed $db
* @param mixed $connection
* @return void
*/
public function addQueueConnection($connection) {
Expand All @@ -144,6 +146,7 @@ public function selectPipeline(string $pipeline): Job_Queue {
$this->pipeline = $pipeline;
switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
// do nothing
break;
Expand All @@ -166,6 +169,7 @@ public function watchPipeline(string $pipeline) {
$this->pipeline = $pipeline;
switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
// do nothing
break;
Expand All @@ -176,10 +180,11 @@ public function watchPipeline(string $pipeline) {
}
}

/**
/**
* Runs necessary checks to make sure the queue will work properly
*
* @return void
* @throws Exception
*/
protected function runPreChecks() {

Expand All @@ -191,25 +196,28 @@ protected function runPreChecks() {
throw new Exception('You need to add the connection for this queue type via the addQueueConnection() method first.');
}

if($this->isMysqlQueueType() || $this->isSqliteQueueType()) {
if($this->isMysqlQueueType() || $this->isPgsqlQueueType() || $this->isSqliteQueueType()) {
$this->checkAndIfNecessaryCreateJobQueueTable();
}

}

/**
/**
* Adds a new job to the job queue
*
* @param string $payload
* @param integer $delay
* @param integer $priority
* @return void
* @param integer $time_to_retry
* @return array []
* @throws Exception
*/
public function addJob(string $payload, int $delay = 0, int $priority = 1024, int $time_to_retry = 60) {
$this->runPreChecks();

switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
$table_name = $this->getSqlTableName();
$field_value = $this->isMysqlQueueType() && $this->options['mysql']['use_compression'] === true ? 'COMPRESS(?)' : '?';
Expand Down Expand Up @@ -240,16 +248,18 @@ public function addJob(string $payload, int $delay = 0, int $priority = 1024, in
return $job;
}

/**
/**
* Gets the next available job and reserves it. Sorted by delay and priority
*
* @return mixed
* @return array
* @throws Exception
*/
public function getNextJobAndReserve() {
$this->runPreChecks();
$job = [];
switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
$table_name = $this->getSqlTableName();
$field = $this->isMysqlQueueType() && $this->options['mysql']['use_compression'] === true ? 'UNCOMPRESS(payload) payload' : 'payload';
Expand Down Expand Up @@ -281,17 +291,19 @@ public function getNextJobAndReserve() {
return $job;
}

/**
/**
* Gets the next available job. Sorted by delay and priority
* Requires `selectPipeline()` to be set.
*
* @return mixed
* @return array
* @throws Exception
*/
public function getNextBuriedJob() {
$this->runPreChecks();
$job = [];
switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
$table_name = $this->getSqlTableName();
$field = $this->isMysqlQueueType() && $this->options['mysql']['use_compression'] === true ? 'UNCOMPRESS(payload) payload' : 'payload';
Expand Down Expand Up @@ -325,11 +337,13 @@ public function getNextBuriedJob() {
*
* @param mixed $job
* @return void
* @throws Exception
*/
public function deleteJob($job): void {
$this->runPreChecks();
switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
$table_name = $this->getSqlTableName();
$statement = $this->connection->prepare("DELETE FROM {$table_name} WHERE id = ?");
Expand All @@ -347,11 +361,13 @@ public function deleteJob($job): void {
*
* @param mixed $job
* @return void
* @throws Exception
*/
public function buryJob($job): void {
$this->runPreChecks();
switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
$table_name = $this->getSqlTableName();
$buried_dt = gmdate('Y-m-d H:i:s');
Expand All @@ -370,11 +386,13 @@ public function buryJob($job): void {
*
* @param mixed $job
* @return void
* @throws Exception
*/
public function kickJob($job): void {
$this->runPreChecks();
switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
$table_name = $this->getSqlTableName();
$statement = $this->connection->prepare("UPDATE {$table_name} SET is_buried = 0, buried_dt = NULL WHERE id = ?");
Expand All @@ -396,39 +414,39 @@ public function kickJob($job): void {
public function getJobId($job) {
switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
return $job['id'];

case self::QUEUE_TYPE_BEANSTALKD:
return $job->getId();
break;
}
}

/**
* Gets the job payload from given job
*
* @param mixed $job
* @return string
* @return mixed
*/
public function getJobPayload($job): string {
public function getJobPayload($job) {
switch($this->queue_type) {
case self::QUEUE_TYPE_MYSQL:
case self::QUEUE_TYPE_PGSQL:
case self::QUEUE_TYPE_SQLITE:
return $job['payload'];

case self::QUEUE_TYPE_BEANSTALKD:
return $job->getData();
break;
}
}

/**
* Return quoted identifier name
* @return string
* @param $key
* @param bool $split
**/
* Return quoted identifier name
* @return string
* @param $key
* @param bool $split
*/
protected function quoteDatabaseKey(string $key, bool $split = true): string {
$delims = [
'sqlite2?|mysql'=>'``',
Expand All @@ -450,6 +468,10 @@ public function isMysqlQueueType(): bool {
return $this->queue_type === self::QUEUE_TYPE_MYSQL;
}

public function isPgsqlQueueType(): bool {
return $this->queue_type === self::QUEUE_TYPE_PGSQL;
}

public function isSqliteQueueType(): bool {
return $this->queue_type === self::QUEUE_TYPE_SQLITE;
}
Expand All @@ -462,6 +484,8 @@ protected function getSqlTableName(): string {
$table_name = 'job_queue_jobs';
if($this->isMysqlQueueType() && isset($this->options['mysql']['table_name'])) {
$table_name = $this->options['mysql']['table_name'];
} else if($this->isPgsqlQueueType() && isset($this->options['pgsql']['table_name'])) {
$table_name = $this->options['pgsql']['table_name'];
} else if($this->isSqliteQueueType() && isset($this->options['sqlite']['table_name'])) {
$table_name = $this->options['sqlite']['table_name'];
}
Expand All @@ -477,6 +501,9 @@ protected function checkAndIfNecessaryCreateJobQueueTable(): void {
// Doesn't like this in a prepared statement...
$escaped_table_name = $this->connection->quote($table_name);
$statement = $this->connection->query("SHOW TABLES LIKE {$escaped_table_name}");
} else if($this->isPgsqlQueueType()) {
$escaped_table_name = $this->connection->quote($table_name);
$statement = $this->connection->query("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' and tablename like {$escaped_table_name}");
} else {
$statement = $this->connection->prepare("SELECT name FROM sqlite_master WHERE type='table' AND name = ?");
$statement->execute([ $table_name ]);
Expand All @@ -503,22 +530,23 @@ protected function checkAndIfNecessaryCreateJobQueueTable(): void {
KEY `pipeline_send_dt_is_buried_is_reserved` (`pipeline`(75), `send_dt`, `is_buried`, `is_reserved`)
);");
} else {
$field_type = $this->isSqliteQueueType() ? 'INTEGER PRIMARY KEY AUTOINCREMENT' : 'serial';
$this->connection->exec("CREATE TABLE IF NOT EXISTS {$table_name} (
'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
'pipeline' TEXT NOT NULL,
'payload' TEXT NOT NULL,
'added_dt' TEXT NOT NULL, -- COMMENT 'In UTC'
'send_dt' TEXT NOT NULL, -- COMMENT 'In UTC'
'priority' INTEGER NOT NULL,
'is_reserved' INTEGER NOT NULL,
'reserved_dt' TEXT NULL, -- COMMENT 'In UTC'
'is_buried' INTEGER NOT NULL,
'buried_dt' TEXT NULL, -- COMMENT 'In UTC'
'time_to_retry_dt' TEXT NOT NULL,
'attempts' INTEGER NOT NULL
id {$field_type} NOT NULL,
pipeline TEXT NOT NULL,
payload TEXT NOT NULL,
added_dt TEXT NOT NULL, -- COMMENT 'In UTC'
send_dt TEXT NOT NULL, -- COMMENT 'In UTC'
priority INTEGER NOT NULL,
is_reserved INTEGER NOT NULL,
reserved_dt TEXT NULL, -- COMMENT 'In UTC'
is_buried INTEGER NOT NULL,
buried_dt TEXT NULL, -- COMMENT 'In UTC'
time_to_retry_dt TEXT NOT NULL,
attempts INTEGER NOT NULL
);");

$this->connection->exec("CREATE INDEX pipeline_send_dt_is_buried_is_reserved ON {$table_name} ('pipeline', 'send_dt', 'is_buried', 'is_reserved')");
$this->connection->exec("CREATE INDEX IF NOT EXISTS pipeline_send_dt_is_buried_is_reserved ON {$table_name} (pipeline, send_dt, is_buried, is_reserved)");
}
}
$cache['job-queue-table-check'] = true;
Expand Down

0 comments on commit fd219cd

Please sign in to comment.