diff --git a/src/Internal/AbstractHandle.php b/src/Internal/AbstractHandle.php index 0e2fc78..a47a282 100644 --- a/src/Internal/AbstractHandle.php +++ b/src/Internal/AbstractHandle.php @@ -94,6 +94,16 @@ protected function escapeParams(array $params): array }, $params); } + public function commit(): void + { + $this->query("COMMIT"); + } + + public function rollback(): void + { + $this->query("ROLLBACK"); + } + public function createSavepoint(string $identifier): void { $this->query("SAVEPOINT " . $this->quoteName($identifier)); diff --git a/src/Internal/PgSqlHandle.php b/src/Internal/PgSqlHandle.php index b6f988c..33a64fd 100644 --- a/src/Internal/PgSqlHandle.php +++ b/src/Internal/PgSqlHandle.php @@ -391,7 +391,7 @@ public function prepare(string $sql): PostgresStatement $modifiedSql = parseNamedParams($sql, $names); - $name = PostgresHandle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql); + $name = self::STATEMENT_NAME_PREFIX . \sha1($modifiedSql); while (isset($this->statements[$name])) { $storage = $this->statements[$name]; diff --git a/src/Internal/PostgresConnectionTransaction.php b/src/Internal/PostgresConnectionTransaction.php index cd3b053..3a4fefc 100644 --- a/src/Internal/PostgresConnectionTransaction.php +++ b/src/Internal/PostgresConnectionTransaction.php @@ -2,276 +2,43 @@ namespace Amp\Postgres\Internal; -use Amp\DeferredFuture; +use Amp\Postgres\PostgresExecutor; use Amp\Postgres\PostgresResult; use Amp\Postgres\PostgresStatement; use Amp\Postgres\PostgresTransaction; -use Amp\Sql\SqlException; -use Amp\Sql\TransactionError; +use Amp\Sql\Common\ConnectionTransaction; +use Amp\Sql\Common\NestableTransactionExecutor; +use Amp\Sql\Transaction; use Amp\Sql\TransactionIsolation; -use Revolt\EventLoop; -/** @internal */ -final class PostgresConnectionTransaction implements PostgresTransaction +/** + * @internal + * @extends ConnectionTransaction + */ +final class PostgresConnectionTransaction extends ConnectionTransaction implements PostgresTransaction { - /** @var \Closure():void */ - private readonly \Closure $release; + use PostgresTransactionDelegate; - private int $refCount = 1; - - private readonly DeferredFuture $onClose; - - private ?DeferredFuture $busy = null; - - /** @var array Reference statements so de-allocation occurs after commit/rollback. */ - private array $statements = []; - - /** - * @param \Closure():void $release - */ public function __construct( private readonly PostgresHandle $handle, \Closure $release, - private readonly TransactionIsolation $isolation, + TransactionIsolation $isolation ) { - $busy = &$this->busy; - $refCount = &$this->refCount; - $statements = &$this->statements; - $this->release = static function () use (&$busy, &$refCount, &$statements, $release): void { - $busy?->complete(); - $busy = null; - - if (--$refCount === 0) { - $release(); - $statements = []; - } - }; - - $this->onClose = new DeferredFuture(); - $this->onClose($this->release); - } - - public function __destruct() - { - if ($this->onClose->isComplete()) { - return; - } - - $this->onClose->complete(); - - if ($this->handle->isClosed()) { - return; - } - - $handle = $this->handle; - EventLoop::queue(static function () use ($handle): void { - try { - !$handle->isClosed() && $handle->query('ROLLBACK'); - } catch (SqlException) { - // Ignore failure if connection closes during query. - } - }); - } - - public function getLastUsedAt(): int - { - return $this->handle->getLastUsedAt(); - } - - public function isNestedTransaction(): bool - { - return false; - } - - private function assertOpen(): void - { - if ($this->isClosed()) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - } - - /** - * Rolls back all changes in the transaction if it has not been committed. - */ - public function close(): void - { - if (!$this->isClosed()) { - $this->rollback(); // Invokes $this->release callback. - } - } - - public function isClosed(): bool - { - return $this->onClose->isComplete(); - } - - public function onClose(\Closure $onClose): void - { - $this->onClose->getFuture()->finally($onClose); - } - - /** - * @return bool True if the transaction is active, false if it has been committed or rolled back. - */ - public function isActive(): bool - { - return !$this->isClosed(); - } - - public function getIsolationLevel(): TransactionIsolation - { - return $this->isolation; - } - - /** - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function query(string $sql): PostgresResult - { - $this->awaitPendingNestedTransaction(); - - ++$this->refCount; - try { - $result = $this->handle->query($sql); - } catch (\Throwable $exception) { - EventLoop::queue($this->release); - throw $exception; - } - - return new PostgresPooledResult($result, $this->release); + parent::__construct($handle, $release, $isolation); } - /** - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function prepare(string $sql): PostgresStatement - { - $this->awaitPendingNestedTransaction(); - - ++$this->refCount; - try { - $statement = $this->handle->prepare($sql); - } catch (\Throwable $exception) { - EventLoop::queue($this->release); - throw $exception; - } - - $this->statements[\spl_object_id($statement)] ??= $statement; - - return new PostgresPooledStatement($statement, $this->release); - } - - /** - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function execute(string $sql, array $params = []): PostgresResult - { - $this->awaitPendingNestedTransaction(); - - ++$this->refCount; - try { - $result = $this->handle->execute($sql, $params); - } catch (\Throwable $exception) { - EventLoop::queue($this->release); - throw $exception; - } - - return new PostgresPooledResult($result, $this->release); - } - - public function beginTransaction(): PostgresTransaction - { - $this->awaitPendingNestedTransaction(); - - ++$this->refCount; - $this->busy = new DeferredFuture(); - try { - $identifier = \bin2hex(\random_bytes(8)); - $this->handle->createSavepoint($identifier); - } catch (\Throwable $exception) { - EventLoop::queue($this->release); - throw $exception; - } - - return new PostgresNestedTransaction($this, $this->handle, $identifier, $this->release); - } - - /** - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function notify(string $channel, string $payload = ""): PostgresResult - { - $this->awaitPendingNestedTransaction(); - return $this->handle->notify($channel, $payload); - } - - /** - * Commits the transaction and makes it inactive. - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function commit(): void - { - $this->awaitPendingNestedTransaction(); - $this->onClose->complete(); - $this->handle->query("COMMIT"); - } - - /** - * Rolls back the transaction and makes it inactive. - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function rollback(): void - { - $this->awaitPendingNestedTransaction(); - $this->onClose->complete(); - $this->handle->query("ROLLBACK"); - } - - public function onCommit(\Closure $onCommit): void - { - // TODO: Implement onCommit() method. - } - - public function onRollback(\Closure $onRollback): void - { - // TODO: Implement onRollback() method. - } - - /** - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function quoteString(string $data): string - { - $this->assertOpen(); - return $this->handle->quoteString($data); - } - - /** - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function quoteName(string $name): string - { - $this->assertOpen(); - return $this->handle->quoteName($name); - } - - /** - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function escapeByteA(string $data): string - { - $this->assertOpen(); - return $this->handle->escapeByteA($data); + protected function createNestedTransaction( + Transaction $transaction, + NestableTransactionExecutor $executor, + string $identifier, + \Closure $release, + ): PostgresTransaction { + \assert($executor instanceof PostgresHandle); + return new PostgresNestedTransaction($this, $executor, $identifier, $release); } - private function awaitPendingNestedTransaction(): void + protected function getExecutor(): PostgresExecutor { - while ($this->busy) { - $this->busy->getFuture()->await(); - } - - $this->assertOpen(); + return $this->handle; } } diff --git a/src/Internal/PostgresHandle.php b/src/Internal/PostgresHandle.php index befebe0..eb3b634 100644 --- a/src/Internal/PostgresHandle.php +++ b/src/Internal/PostgresHandle.php @@ -3,8 +3,8 @@ namespace Amp\Postgres\Internal; use Amp\Postgres\PostgresConfig; +use Amp\Postgres\PostgresExecutor; use Amp\Postgres\PostgresListener; -use Amp\Postgres\PostgresQuoter; use Amp\Postgres\PostgresResult; use Amp\Postgres\PostgresStatement; use Amp\Sql\Common\NestableTransactionExecutor; @@ -13,23 +13,12 @@ * @internal * @extends NestableTransactionExecutor */ -interface PostgresHandle extends PostgresQuoter, NestableTransactionExecutor +interface PostgresHandle extends PostgresExecutor, NestableTransactionExecutor { public const STATEMENT_NAME_PREFIX = "amp_"; public function getConfig(): PostgresConfig; - public function query(string $sql): PostgresResult; - - public function execute(string $sql, array $params = []): PostgresResult; - - public function prepare(string $sql): PostgresStatement; - - /** - * @param non-empty-string $channel - */ - public function notify(string $channel, string $payload = ""): PostgresResult; - /** * @param non-empty-string $channel */ diff --git a/src/Internal/PostgresNestedTransaction.php b/src/Internal/PostgresNestedTransaction.php index b8cf3ef..bdd631b 100644 --- a/src/Internal/PostgresNestedTransaction.php +++ b/src/Internal/PostgresNestedTransaction.php @@ -2,6 +2,7 @@ namespace Amp\Postgres\Internal; +use Amp\Postgres\PostgresExecutor; use Amp\Postgres\PostgresResult; use Amp\Postgres\PostgresStatement; use Amp\Postgres\PostgresTransaction; @@ -30,10 +31,7 @@ public function __construct( parent::__construct($transaction, $handle, $identifier, $release); } - /** - * Switches return type to this library's return type. - */ - protected function getTransaction(): PostgresTransaction + protected function getExecutor(): PostgresExecutor { return $this->transaction; } diff --git a/src/Internal/PostgresPooledTransaction.php b/src/Internal/PostgresPooledTransaction.php index 1e758b6..1a162a2 100644 --- a/src/Internal/PostgresPooledTransaction.php +++ b/src/Internal/PostgresPooledTransaction.php @@ -2,6 +2,7 @@ namespace Amp\Postgres\Internal; +use Amp\Postgres\PostgresExecutor; use Amp\Postgres\PostgresResult; use Amp\Postgres\PostgresStatement; use Amp\Postgres\PostgresTransaction; @@ -24,7 +25,7 @@ public function __construct(private readonly PostgresTransaction $transaction, \ parent::__construct($transaction, $release); } - protected function getTransaction(): PostgresTransaction + protected function getExecutor(): PostgresExecutor { return $this->transaction; } diff --git a/src/Internal/PostgresTransactionDelegate.php b/src/Internal/PostgresTransactionDelegate.php index 9daaaa0..a7874b6 100644 --- a/src/Internal/PostgresTransactionDelegate.php +++ b/src/Internal/PostgresTransactionDelegate.php @@ -2,6 +2,7 @@ namespace Amp\Postgres\Internal; +use Amp\Postgres\PostgresExecutor; use Amp\Postgres\PostgresResult; use Amp\Postgres\PostgresStatement; use Amp\Postgres\PostgresTransaction; @@ -11,15 +12,18 @@ /** @internal */ trait PostgresTransactionDelegate { - abstract protected function getTransaction(): PostgresTransaction; + abstract protected function getExecutor(): PostgresExecutor; /** * @param \Closure():void $release */ - protected function createStatement(Statement $statement, \Closure $release): PostgresStatement - { + protected function createStatement( + Statement $statement, + \Closure $release, + ?\Closure $awaitBusyResource = null, + ): PostgresStatement { \assert($statement instanceof PostgresStatement); - return new PostgresPooledStatement($statement, $release); + return new PostgresPooledStatement($statement, $release, $awaitBusyResource); } /** @@ -68,21 +72,21 @@ public function beginTransaction(): PostgresTransaction */ public function notify(string $channel, string $payload = ""): PostgresResult { - return $this->getTransaction()->notify($channel, $payload); + return $this->getExecutor()->notify($channel, $payload); } public function quoteString(string $data): string { - return $this->getTransaction()->quoteString($data); + return $this->getExecutor()->quoteString($data); } public function quoteName(string $name): string { - return $this->getTransaction()->quoteName($name); + return $this->getExecutor()->quoteName($name); } public function escapeByteA(string $data): string { - return $this->transaction->escapeByteA($data); + return $this->getExecutor()->escapeByteA($data); } } diff --git a/src/Internal/PqHandle.php b/src/Internal/PqHandle.php index a366ad2..0e22976 100644 --- a/src/Internal/PqHandle.php +++ b/src/Internal/PqHandle.php @@ -379,7 +379,7 @@ public function prepare(string $sql): PostgresStatement $modifiedSql = parseNamedParams($sql, $names); - $name = PostgresHandle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql); + $name = self::STATEMENT_NAME_PREFIX . \sha1($modifiedSql); while (isset($this->statements[$name])) { $storage = $this->statements[$name];