From 17ef648573dee4453ad6863c8e3468b27bb0c4ce Mon Sep 17 00:00:00 2001 From: Alan Lobo Date: Tue, 7 May 2024 20:57:49 +0530 Subject: [PATCH 1/5] Initial commit for supporting pgsql backend. --- src/Job_Queue.php | 145 +++++++++++++++++++++++++++------------------- 1 file changed, 86 insertions(+), 59 deletions(-) diff --git a/src/Job_Queue.php b/src/Job_Queue.php index c1c674d..90f7778 100644 --- a/src/Job_Queue.php +++ b/src/Job_Queue.php @@ -8,6 +8,7 @@ class Job_Queue { const QUEUE_TYPE_MYSQL = 'mysql'; + const QUEUE_TYPE_PGSQL = 'pgsql'; const QUEUE_TYPE_SQLITE = 'sqlite'; const QUEUE_TYPE_BEANSTALKD = 'beanstalkd'; @@ -46,12 +47,13 @@ class Job_Queue { */ protected static $cache = []; - /** - * The construct - * - * @param string $queue_type - self::QUEUE_TYPE_MYSQL is default - * @param array $options - */ + /** + * The construct + * + * @param string $queue_type - self::QUEUE_TYPE_MYSQL is default + * @param array $options + * @throws Exception + */ public function __construct(string $queue_type = self::QUEUE_TYPE_MYSQL, array $options = []) { if(empty($queue_type)) { @@ -127,7 +129,7 @@ public function flushCache(): void { /** * Adds a generic connection for the queue type selected * - * @param mixed $db + * @param mixed $connection * @return void */ public function addQueueConnection($connection) { @@ -144,6 +146,7 @@ public function selectPipeline(string $pipeline): Job_Queue { $this->pipeline = $pipeline; switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: // do nothing break; @@ -165,8 +168,9 @@ public function selectPipeline(string $pipeline): Job_Queue { public function watchPipeline(string $pipeline) { $this->pipeline = $pipeline; switch($this->queue_type) { - case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_SQLITE: + case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_SQLITE: // do nothing break; @@ -176,11 +180,12 @@ public function watchPipeline(string $pipeline) { } } - /** - * Runs necessary checks to make sure the queue will work properly - * - * @return void - */ + /** + * Runs necessary checks to make sure the queue will work properly + * + * @return void + * @throws Exception + */ protected function runPreChecks() { if(empty($this->pipeline)) { @@ -191,25 +196,28 @@ protected function runPreChecks() { throw new Exception('You need to add the connection for this queue type via the addQueueConnection() method first.'); } - if($this->isMysqlQueueType() || $this->isSqliteQueueType()) { + if($this->isMysqlQueueType() || $this->isPgsqlQueueType() || $this->isSqliteQueueType()) { $this->checkAndIfNecessaryCreateJobQueueTable(); } } - /** - * Adds a new job to the job queue - * - * @param string $payload - * @param integer $delay - * @param integer $priority - * @return void - */ + /** + * Adds a new job to the job queue + * + * @param string $payload + * @param integer $delay + * @param integer $priority + * @param integer $time_to_retry + * @return array [] + * @throws Exception + */ public function addJob(string $payload, int $delay = 0, int $priority = 1024, int $time_to_retry = 60) { $this->runPreChecks(); switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $field_value = $this->isMysqlQueueType() && $this->options['mysql']['use_compression'] === true ? 'COMPRESS(?)' : '?'; @@ -240,16 +248,18 @@ public function addJob(string $payload, int $delay = 0, int $priority = 1024, in return $job; } - /** - * Gets the next available job and reserves it. Sorted by delay and priority - * - * @return mixed - */ + /** + * Gets the next available job and reserves it. Sorted by delay and priority + * + * @return array + * @throws Exception + */ public function getNextJobAndReserve() { $this->runPreChecks(); $job = []; switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $field = $this->isMysqlQueueType() && $this->options['mysql']['use_compression'] === true ? 'UNCOMPRESS(payload) payload' : 'payload'; @@ -281,17 +291,19 @@ public function getNextJobAndReserve() { return $job; } - /** - * Gets the next available job. Sorted by delay and priority - * Requires `selectPipeline()` to be set. - * - * @return mixed - */ + /** + * Gets the next available job. Sorted by delay and priority + * Requires `selectPipeline()` to be set. + * + * @return array + * @throws Exception + */ public function getNextBuriedJob() { $this->runPreChecks(); $job = []; switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $field = $this->isMysqlQueueType() && $this->options['mysql']['use_compression'] === true ? 'UNCOMPRESS(payload) payload' : 'payload'; @@ -320,16 +332,18 @@ public function getNextBuriedJob() { return $job; } - /** - * Deletes a job - * - * @param mixed $job - * @return void - */ + /** + * Deletes a job + * + * @param mixed $job + * @return void + * @throws Exception + */ public function deleteJob($job): void { $this->runPreChecks(); switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $statement = $this->connection->prepare("DELETE FROM {$table_name} WHERE id = ?"); @@ -342,16 +356,18 @@ public function deleteJob($job): void { } } - /** - * Buries (hides) a job - * - * @param mixed $job - * @return void - */ + /** + * Buries (hides) a job + * + * @param mixed $job + * @return void + * @throws Exception + */ public function buryJob($job): void { $this->runPreChecks(); switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $buried_dt = gmdate('Y-m-d H:i:s'); @@ -365,16 +381,18 @@ public function buryJob($job): void { } } - /** - * Kicks (releases, unburies) job - * - * @param mixed $job - * @return void - */ + /** + * Kicks (releases, unburies) job + * + * @param mixed $job + * @return void + * @throws Exception + */ public function kickJob($job): void { $this->runPreChecks(); switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $statement = $this->connection->prepare("UPDATE {$table_name} SET is_buried = 0, buried_dt = NULL WHERE id = ?"); @@ -396,12 +414,12 @@ public function kickJob($job): void { public function getJobId($job) { switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: return $job['id']; case self::QUEUE_TYPE_BEANSTALKD: return $job->getId(); - break; } } @@ -409,17 +427,17 @@ public function getJobId($job) { * Gets the job payload from given job * * @param mixed $job - * @return string + * @return mixed */ - public function getJobPayload($job): string { + public function getJobPayload($job) { switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: return $job['payload']; case self::QUEUE_TYPE_BEANSTALKD: return $job->getData(); - break; } } @@ -450,6 +468,10 @@ public function isMysqlQueueType(): bool { return $this->queue_type === self::QUEUE_TYPE_MYSQL; } + public function isPgsqlQueueType(): bool { + return $this->queue_type === self::QUEUE_TYPE_PGSQL; + } + public function isSqliteQueueType(): bool { return $this->queue_type === self::QUEUE_TYPE_SQLITE; } @@ -462,7 +484,9 @@ protected function getSqlTableName(): string { $table_name = 'job_queue_jobs'; if($this->isMysqlQueueType() && isset($this->options['mysql']['table_name'])) { $table_name = $this->options['mysql']['table_name']; - } else if($this->isSqliteQueueType() && isset($this->options['sqlite']['table_name'])) { + } else if($this->isPgsqlQueueType() && isset($this->options['pgsql']['table_name'])) { + $table_name = $this->options['pgsql']['table_name']; + } else if($this->isSqliteQueueType() && isset($this->options['sqlite']['table_name'])) { $table_name = $this->options['sqlite']['table_name']; } return $this->quoteDatabaseKey($table_name); @@ -477,14 +501,17 @@ protected function checkAndIfNecessaryCreateJobQueueTable(): void { // Doesn't like this in a prepared statement... $escaped_table_name = $this->connection->quote($table_name); $statement = $this->connection->query("SHOW TABLES LIKE {$escaped_table_name}"); - } else { + } else if($this->isPgsqlQueueType()) { + $escaped_table_name = $this->connection->quote($table_name); + $statement = $this->connection->query("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' and tablename like {$escaped_table_name}"); + } else { $statement = $this->connection->prepare("SELECT name FROM sqlite_master WHERE type='table' AND name = ?"); $statement->execute([ $table_name ]); } $has_table = !!count($statement->fetchAll(PDO::FETCH_ASSOC)); if(!$has_table) { - if($this->isMysqlQueueType()) { + if($this->isMysqlQueueType() or $this->isPgsqlQueueType()) { $field_type = $this->options['mysql']['use_compression'] ? 'longblob' : 'longtext'; $this->connection->exec("CREATE TABLE IF NOT EXISTS {$table_name} ( `id` int(11) NOT NULL AUTO_INCREMENT, From f2c8cbe2b150f5e858bd1e9d52611f0f13c4486f Mon Sep 17 00:00:00 2001 From: Alan Lobo Date: Tue, 7 May 2024 22:39:27 +0530 Subject: [PATCH 2/5] postgres table creation. --- src/Job_Queue.php | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/Job_Queue.php b/src/Job_Queue.php index 90f7778..03209d8 100644 --- a/src/Job_Queue.php +++ b/src/Job_Queue.php @@ -511,7 +511,7 @@ protected function checkAndIfNecessaryCreateJobQueueTable(): void { $has_table = !!count($statement->fetchAll(PDO::FETCH_ASSOC)); if(!$has_table) { - if($this->isMysqlQueueType() or $this->isPgsqlQueueType()) { + if($this->isMysqlQueueType()) { $field_type = $this->options['mysql']['use_compression'] ? 'longblob' : 'longtext'; $this->connection->exec("CREATE TABLE IF NOT EXISTS {$table_name} ( `id` int(11) NOT NULL AUTO_INCREMENT, @@ -530,22 +530,23 @@ protected function checkAndIfNecessaryCreateJobQueueTable(): void { KEY `pipeline_send_dt_is_buried_is_reserved` (`pipeline`(75), `send_dt`, `is_buried`, `is_reserved`) );"); } else { + $field_type = $this->isSqliteQueueType() ? 'INTEGER PRIMARY KEY AUTOINCREMENT' : 'serial'; $this->connection->exec("CREATE TABLE IF NOT EXISTS {$table_name} ( - 'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, - 'pipeline' TEXT NOT NULL, - 'payload' TEXT NOT NULL, - 'added_dt' TEXT NOT NULL, -- COMMENT 'In UTC' - 'send_dt' TEXT NOT NULL, -- COMMENT 'In UTC' - 'priority' INTEGER NOT NULL, - 'is_reserved' INTEGER NOT NULL, - 'reserved_dt' TEXT NULL, -- COMMENT 'In UTC' - 'is_buried' INTEGER NOT NULL, - 'buried_dt' TEXT NULL, -- COMMENT 'In UTC' - 'time_to_retry_dt' TEXT NOT NULL, - 'attempts' INTEGER NOT NULL + id {$field_type} NOT NULL, + pipeline TEXT NOT NULL, + payload TEXT NOT NULL, + added_dt TEXT NOT NULL, -- COMMENT 'In UTC' + send_dt TEXT NOT NULL, -- COMMENT 'In UTC' + priority INTEGER NOT NULL, + is_reserved INTEGER NOT NULL, + reserved_dt TEXT NULL, -- COMMENT 'In UTC' + is_buried INTEGER NOT NULL, + buried_dt TEXT NULL, -- COMMENT 'In UTC' + time_to_retry_dt TEXT NOT NULL, + attempts INTEGER NOT NULL );"); - $this->connection->exec("CREATE INDEX pipeline_send_dt_is_buried_is_reserved ON {$table_name} ('pipeline', 'send_dt', 'is_buried', 'is_reserved')"); + $this->connection->exec("CREATE INDEX pipeline_send_dt_is_buried_is_reserved ON {$table_name} (pipeline, send_dt, is_buried, is_reserved)"); } } $cache['job-queue-table-check'] = true; From ede292f1e6d5079eb688bd0027786ba6e3ae6dd8 Mon Sep 17 00:00:00 2001 From: Alan Lobo Date: Fri, 7 Jun 2024 09:36:27 +0530 Subject: [PATCH 3/5] bugfix --- src/Job_Queue.php | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/Job_Queue.php b/src/Job_Queue.php index 03209d8..1a9248e 100644 --- a/src/Job_Queue.php +++ b/src/Job_Queue.php @@ -8,7 +8,7 @@ class Job_Queue { const QUEUE_TYPE_MYSQL = 'mysql'; - const QUEUE_TYPE_PGSQL = 'pgsql'; + const QUEUE_TYPE_PGSQL = 'pgsql'; const QUEUE_TYPE_SQLITE = 'sqlite'; const QUEUE_TYPE_BEANSTALKD = 'beanstalkd'; @@ -146,7 +146,7 @@ public function selectPipeline(string $pipeline): Job_Queue { $this->pipeline = $pipeline; switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: // do nothing break; @@ -217,7 +217,7 @@ public function addJob(string $payload, int $delay = 0, int $priority = 1024, in switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $field_value = $this->isMysqlQueueType() && $this->options['mysql']['use_compression'] === true ? 'COMPRESS(?)' : '?'; @@ -259,7 +259,7 @@ public function getNextJobAndReserve() { $job = []; switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $field = $this->isMysqlQueueType() && $this->options['mysql']['use_compression'] === true ? 'UNCOMPRESS(payload) payload' : 'payload'; @@ -303,7 +303,7 @@ public function getNextBuriedJob() { $job = []; switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $field = $this->isMysqlQueueType() && $this->options['mysql']['use_compression'] === true ? 'UNCOMPRESS(payload) payload' : 'payload'; @@ -343,7 +343,7 @@ public function deleteJob($job): void { $this->runPreChecks(); switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $statement = $this->connection->prepare("DELETE FROM {$table_name} WHERE id = ?"); @@ -367,7 +367,7 @@ public function buryJob($job): void { $this->runPreChecks(); switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $buried_dt = gmdate('Y-m-d H:i:s'); @@ -392,7 +392,7 @@ public function kickJob($job): void { $this->runPreChecks(); switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: $table_name = $this->getSqlTableName(); $statement = $this->connection->prepare("UPDATE {$table_name} SET is_buried = 0, buried_dt = NULL WHERE id = ?"); @@ -414,7 +414,7 @@ public function kickJob($job): void { public function getJobId($job) { switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: return $job['id']; @@ -432,7 +432,7 @@ public function getJobId($job) { public function getJobPayload($job) { switch($this->queue_type) { case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_PGSQL: case self::QUEUE_TYPE_SQLITE: return $job['payload']; @@ -546,7 +546,7 @@ protected function checkAndIfNecessaryCreateJobQueueTable(): void { attempts INTEGER NOT NULL );"); - $this->connection->exec("CREATE INDEX pipeline_send_dt_is_buried_is_reserved ON {$table_name} (pipeline, send_dt, is_buried, is_reserved)"); + $this->connection->exec("CREATE INDEX IF NOT EXISTS pipeline_send_dt_is_buried_is_reserved ON {$table_name} (pipeline, send_dt, is_buried, is_reserved)"); } } $cache['job-queue-table-check'] = true; From 51cc2f3bed2de6042f02c026ce06c5e63896c7e4 Mon Sep 17 00:00:00 2001 From: Alan Lobo Date: Sat, 8 Jun 2024 11:24:57 +0530 Subject: [PATCH 4/5] correcting code formatting --- src/Job_Queue.php | 148 +++++++++++++++++++++++----------------------- 1 file changed, 74 insertions(+), 74 deletions(-) diff --git a/src/Job_Queue.php b/src/Job_Queue.php index 1a9248e..33ab896 100644 --- a/src/Job_Queue.php +++ b/src/Job_Queue.php @@ -47,13 +47,13 @@ class Job_Queue { */ protected static $cache = []; - /** - * The construct - * - * @param string $queue_type - self::QUEUE_TYPE_MYSQL is default - * @param array $options - * @throws Exception - */ + /** + * The construct + * + * @param string $queue_type - self::QUEUE_TYPE_MYSQL is default + * @param array $options + * @throws Exception + */ public function __construct(string $queue_type = self::QUEUE_TYPE_MYSQL, array $options = []) { if(empty($queue_type)) { @@ -168,9 +168,9 @@ public function selectPipeline(string $pipeline): Job_Queue { public function watchPipeline(string $pipeline) { $this->pipeline = $pipeline; switch($this->queue_type) { - case self::QUEUE_TYPE_MYSQL: - case self::QUEUE_TYPE_PGSQL: - case self::QUEUE_TYPE_SQLITE: + case self::QUEUE_TYPE_MYSQL: + case self::QUEUE_TYPE_PGSQL: + case self::QUEUE_TYPE_SQLITE: // do nothing break; @@ -180,12 +180,12 @@ public function watchPipeline(string $pipeline) { } } - /** - * Runs necessary checks to make sure the queue will work properly - * - * @return void - * @throws Exception - */ + /** + * Runs necessary checks to make sure the queue will work properly + * + * @return void + * @throws Exception + */ protected function runPreChecks() { if(empty($this->pipeline)) { @@ -202,16 +202,16 @@ protected function runPreChecks() { } - /** - * Adds a new job to the job queue - * - * @param string $payload - * @param integer $delay - * @param integer $priority - * @param integer $time_to_retry - * @return array [] - * @throws Exception - */ + /** + * Adds a new job to the job queue + * + * @param string $payload + * @param integer $delay + * @param integer $priority + * @param integer $time_to_retry + * @return array [] + * @throws Exception + */ public function addJob(string $payload, int $delay = 0, int $priority = 1024, int $time_to_retry = 60) { $this->runPreChecks(); @@ -248,12 +248,12 @@ public function addJob(string $payload, int $delay = 0, int $priority = 1024, in return $job; } - /** - * Gets the next available job and reserves it. Sorted by delay and priority - * - * @return array - * @throws Exception - */ + /** + * Gets the next available job and reserves it. Sorted by delay and priority + * + * @return array + * @throws Exception + */ public function getNextJobAndReserve() { $this->runPreChecks(); $job = []; @@ -291,13 +291,13 @@ public function getNextJobAndReserve() { return $job; } - /** - * Gets the next available job. Sorted by delay and priority - * Requires `selectPipeline()` to be set. - * - * @return array - * @throws Exception - */ + /** + * Gets the next available job. Sorted by delay and priority + * Requires `selectPipeline()` to be set. + * + * @return array + * @throws Exception + */ public function getNextBuriedJob() { $this->runPreChecks(); $job = []; @@ -332,13 +332,13 @@ public function getNextBuriedJob() { return $job; } - /** - * Deletes a job - * - * @param mixed $job - * @return void - * @throws Exception - */ + /** + * Deletes a job + * + * @param mixed $job + * @return void + * @throws Exception + */ public function deleteJob($job): void { $this->runPreChecks(); switch($this->queue_type) { @@ -356,13 +356,13 @@ public function deleteJob($job): void { } } - /** - * Buries (hides) a job - * - * @param mixed $job - * @return void - * @throws Exception - */ + /** + * Buries (hides) a job + * + * @param mixed $job + * @return void + * @throws Exception + */ public function buryJob($job): void { $this->runPreChecks(); switch($this->queue_type) { @@ -381,13 +381,13 @@ public function buryJob($job): void { } } - /** - * Kicks (releases, unburies) job - * - * @param mixed $job - * @return void - * @throws Exception - */ + /** + * Kicks (releases, unburies) job + * + * @param mixed $job + * @return void + * @throws Exception + */ public function kickJob($job): void { $this->runPreChecks(); switch($this->queue_type) { @@ -442,11 +442,11 @@ public function getJobPayload($job) { } /** - * Return quoted identifier name - * @return string - * @param $key - * @param bool $split - **/ + * Return quoted identifier name + * @return string + * @param $key + * @param bool $split + */ protected function quoteDatabaseKey(string $key, bool $split = true): string { $delims = [ 'sqlite2?|mysql'=>'``', @@ -468,9 +468,9 @@ public function isMysqlQueueType(): bool { return $this->queue_type === self::QUEUE_TYPE_MYSQL; } - public function isPgsqlQueueType(): bool { - return $this->queue_type === self::QUEUE_TYPE_PGSQL; - } + public function isPgsqlQueueType(): bool { + return $this->queue_type === self::QUEUE_TYPE_PGSQL; + } public function isSqliteQueueType(): bool { return $this->queue_type === self::QUEUE_TYPE_SQLITE; @@ -485,8 +485,8 @@ protected function getSqlTableName(): string { if($this->isMysqlQueueType() && isset($this->options['mysql']['table_name'])) { $table_name = $this->options['mysql']['table_name']; } else if($this->isPgsqlQueueType() && isset($this->options['pgsql']['table_name'])) { - $table_name = $this->options['pgsql']['table_name']; - } else if($this->isSqliteQueueType() && isset($this->options['sqlite']['table_name'])) { + $table_name = $this->options['pgsql']['table_name']; + } else if($this->isSqliteQueueType() && isset($this->options['sqlite']['table_name'])) { $table_name = $this->options['sqlite']['table_name']; } return $this->quoteDatabaseKey($table_name); @@ -502,9 +502,9 @@ protected function checkAndIfNecessaryCreateJobQueueTable(): void { $escaped_table_name = $this->connection->quote($table_name); $statement = $this->connection->query("SHOW TABLES LIKE {$escaped_table_name}"); } else if($this->isPgsqlQueueType()) { - $escaped_table_name = $this->connection->quote($table_name); - $statement = $this->connection->query("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' and tablename like {$escaped_table_name}"); - } else { + $escaped_table_name = $this->connection->quote($table_name); + $statement = $this->connection->query("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' and tablename like {$escaped_table_name}"); + } else { $statement = $this->connection->prepare("SELECT name FROM sqlite_master WHERE type='table' AND name = ?"); $statement->execute([ $table_name ]); } @@ -530,7 +530,7 @@ protected function checkAndIfNecessaryCreateJobQueueTable(): void { KEY `pipeline_send_dt_is_buried_is_reserved` (`pipeline`(75), `send_dt`, `is_buried`, `is_reserved`) );"); } else { - $field_type = $this->isSqliteQueueType() ? 'INTEGER PRIMARY KEY AUTOINCREMENT' : 'serial'; + $field_type = $this->isSqliteQueueType() ? 'INTEGER PRIMARY KEY AUTOINCREMENT' : 'serial'; $this->connection->exec("CREATE TABLE IF NOT EXISTS {$table_name} ( id {$field_type} NOT NULL, pipeline TEXT NOT NULL, From d270238b02e308a2c29b57ac6d826bb6a34b2414 Mon Sep 17 00:00:00 2001 From: Alan Lobo Date: Sat, 8 Jun 2024 11:32:58 +0530 Subject: [PATCH 5/5] Update README.md for postgres example --- README.md | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index edac3f2..3210f06 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,26 @@ $Job_Queue->selectPipeline('send_important_emails'); $Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ])); ``` +#### PostgreSQL +```php + [ + 'table_name' => 'new_table_name', // default is job_queue_jobs + ] +]); + +$PDO = new PDO('pgsql:dbname=testdb;host=127.0.0.1', 'user', 'pass'); +$Job_Queue->addQueueConnection($PDO); + +$Job_Queue->selectPipeline('send_important_emails'); +$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ])); +``` + #### SQLite3 ```php addQueueConnection($PDO); - $Job_Queue->watchPipeline('some_cool_pipeline_name'); + $Job_Queue->watchPipeline('send_important_emails'); while(true) { $job = $Job_Queue->getNextJobAndReserve(); @@ -107,4 +127,4 @@ Supervisord is going to be your jam. Look up the many, many articles on how to i PHPUnit Tests with sqlite3 examples for the time being. ``` vendor/bin/phpunit -``` \ No newline at end of file +```