diff --git a/composer.json b/composer.json index 030922c..d6a4c33 100644 --- a/composer.json +++ b/composer.json @@ -22,18 +22,18 @@ "php": ">=8.1", "amphp/amp": "^3", "amphp/pipeline": "^1", - "amphp/sql": "^2", - "amphp/sql-common": "^2" + "amphp/sql": "dev-refactor-transactions as 2.0", + "amphp/sql-common": "dev-refactor-transactions as 2.0" }, "require-dev": { "ext-pgsql": "*", "ext-pq": "*", "amphp/phpunit-util": "^3", "phpunit/phpunit": "^9", - "amphp/php-cs-fixer-config": "^2-dev", - "psalm/phar": "^5.4" + "amphp/php-cs-fixer-config": "^2", + "psalm/phar": "^5.11" }, - "minimum-stability": "dev", + "minimum-stability": "beta", "prefer-stable": true, "autoload": { "psr-4": { diff --git a/src/DefaultPostgresConnector.php b/src/DefaultPostgresConnector.php index adc2e40..569cad3 100644 --- a/src/DefaultPostgresConnector.php +++ b/src/DefaultPostgresConnector.php @@ -3,12 +3,13 @@ namespace Amp\Postgres; use Amp\Cancellation; +use Amp\Postgres\Internal\PostgresHandleConnection; use Amp\Sql\SqlConfig; use Amp\Sql\SqlConnector; use Amp\Sql\SqlException; /** - * @implements SqlConnector + * @implements SqlConnector */ final class DefaultPostgresConnector implements SqlConnector { @@ -17,7 +18,7 @@ final class DefaultPostgresConnector implements SqlConnector * * @throws \Error If neither ext-pgsql nor pecl-pq is loaded. */ - public function connect(SqlConfig $config, ?Cancellation $cancellation = null): PostgresConnection + public function connect(SqlConfig $config, ?Cancellation $cancellation = null): PostgresHandleConnection { if (!$config instanceof PostgresConfig) { throw new \TypeError(\sprintf("Must provide an instance of %s to Postgres connectors", PostgresConfig::class)); diff --git a/src/Internal/AbstractHandle.php b/src/Internal/AbstractHandle.php index abc6d3c..ed1d499 100644 --- a/src/Internal/AbstractHandle.php +++ b/src/Internal/AbstractHandle.php @@ -5,7 +5,7 @@ use Amp\DeferredFuture; use Amp\Pipeline\Queue; use Amp\Postgres\ByteA; -use Amp\Postgres\PostgresHandle; +use Amp\Postgres\PostgresConfig; use Amp\Sql\ConnectionException; use Revolt\EventLoop; @@ -22,6 +22,7 @@ abstract class AbstractHandle implements PostgresHandle protected int $lastUsedAt = 0; public function __construct( + private readonly PostgresConfig $config, protected readonly string $poll, protected readonly string $await, private readonly DeferredFuture $onClose, @@ -36,6 +37,11 @@ public function __destruct() } } + public function getConfig(): PostgresConfig + { + return $this->config; + } + public function getLastUsedAt(): int { return $this->lastUsedAt; diff --git a/src/Internal/PgSqlHandle.php b/src/Internal/PgSqlHandle.php index 060b432..eb002a4 100644 --- a/src/Internal/PgSqlHandle.php +++ b/src/Internal/PgSqlHandle.php @@ -5,7 +5,7 @@ use Amp\DeferredFuture; use Amp\Future; use Amp\Pipeline\Queue; -use Amp\Postgres\PostgresHandle; +use Amp\Postgres\PostgresConfig; use Amp\Postgres\PostgresListener; use Amp\Postgres\PostgresNotification; use Amp\Postgres\PostgresResult; @@ -54,8 +54,12 @@ final class PgSqlHandle extends AbstractHandle * @param resource $socket PostgreSQL connection stream socket. * @param string $id Connection identifier for determining which cached type table to use. */ - public function __construct(\PgSql\Connection $handle, $socket, string $id) - { + public function __construct( + \PgSql\Connection $handle, + $socket, + string $id, + PostgresConfig $config, + ) { $this->handle = $handle; $handle = &$this->handle; @@ -151,7 +155,7 @@ public function __construct(\PgSql\Connection $handle, $socket, string $id) EventLoop::unreference($poll); EventLoop::disable($await); - parent::__construct($poll, $await, $onClose); + parent::__construct($config, $poll, $await, $onClose); $this->types = (self::$typeCache[$id] ??= self::fetchTypes($handle)); } diff --git a/src/Internal/PostgresConnectionStatement.php b/src/Internal/PostgresConnectionStatement.php index c3dae9f..aa0e470 100644 --- a/src/Internal/PostgresConnectionStatement.php +++ b/src/Internal/PostgresConnectionStatement.php @@ -3,7 +3,6 @@ namespace Amp\Postgres\Internal; use Amp\DeferredFuture; -use Amp\Postgres\PostgresHandle; use Amp\Postgres\PostgresStatement; use Amp\Sql\Result; use Amp\Sql\SqlException; diff --git a/src/Internal/PostgresConnectionTransaction.php b/src/Internal/PostgresConnectionTransaction.php index cd19895..08a244b 100644 --- a/src/Internal/PostgresConnectionTransaction.php +++ b/src/Internal/PostgresConnectionTransaction.php @@ -3,7 +3,6 @@ namespace Amp\Postgres\Internal; use Amp\DeferredFuture; -use Amp\Postgres\PostgresHandle; use Amp\Postgres\PostgresResult; use Amp\Postgres\PostgresStatement; use Amp\Postgres\PostgresTransaction; @@ -13,7 +12,7 @@ use Revolt\EventLoop; /** @internal */ -final class PostgresConnectionTransaction implements PostgresTransaction +final class PostgresConnectionTransaction implements PostgresNestableTransaction { /** @var \Closure():void */ private readonly \Closure $release; @@ -31,7 +30,7 @@ final class PostgresConnectionTransaction implements PostgresTransaction public function __construct( private readonly PostgresHandle $handle, \Closure $release, - private readonly TransactionIsolation $isolation + private readonly TransactionIsolation $isolation, ) { $refCount = &$this->refCount; $statements = &$this->statements; @@ -73,6 +72,11 @@ public function getLastUsedAt(): int return $this->handle->getLastUsedAt(); } + public function isNestedTransaction(): bool + { + return false; + } + private function assertOpen(): void { if ($this->isClosed()) { @@ -169,6 +173,22 @@ public function execute(string $sql, array $params = []): PostgresResult return new PostgresPooledResult($result, $this->release); } + public function beginTransaction(): PostgresTransaction + { + $this->assertOpen(); + + ++$this->refCount; + try { + $identifier = bin2hex(\random_bytes(4)); + $this->createSavepoint($identifier); + } catch (\Throwable $exception) { + EventLoop::queue($this->release); + throw $exception; + } + + return new PostgresNestedTransaction($this, $identifier, $this->release); + } + /** * @throws TransactionError If the transaction has been committed or rolled back. */ @@ -238,6 +258,16 @@ public function releaseSavepoint(string $identifier): void $this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier)); } + public function onCommit(\Closure $onCommit): void + { + // TODO: Implement onCommit() method. + } + + public function onRollback(\Closure $onRollback): void + { + // TODO: Implement onRollback() method. + } + /** * @throws TransactionError If the transaction has been committed or rolled back. */ diff --git a/src/Internal/PostgresHandle.php b/src/Internal/PostgresHandle.php new file mode 100644 index 0000000..e78bea1 --- /dev/null +++ b/src/Internal/PostgresHandle.php @@ -0,0 +1,46 @@ + $params List of statement parameters, indexed starting at 0. + */ + public function statementExecute(string $name, array $params): PostgresResult; + + /** + * Deallocate the statement with the given name. + */ + public function statementDeallocate(string $name): void; +} diff --git a/src/Internal/PostgresHandleConnection.php b/src/Internal/PostgresHandleConnection.php new file mode 100644 index 0000000..137846e --- /dev/null +++ b/src/Internal/PostgresHandleConnection.php @@ -0,0 +1,157 @@ +handle->getConfig(); + } + + final public function getLastUsedAt(): int + { + return $this->handle->getLastUsedAt(); + } + + final public function close(): void + { + $this->handle->close(); + } + + final public function isClosed(): bool + { + return $this->handle->isClosed(); + } + + final public function onClose(\Closure $onClose): void + { + $this->handle->onClose($onClose); + } + + private function awaitPending(): void + { + while ($this->busy) { + $this->busy->getFuture()->await(); + } + } + + /** + * Reserves the connection for a transaction. + */ + private function reserve(): void + { + \assert($this->busy === null); + $this->busy = new DeferredFuture; + } + + /** + * Releases the transaction lock. + */ + private function release(): void + { + \assert($this->busy !== null); + + $this->busy->complete(); + $this->busy = null; + } + + final public function query(string $sql): PostgresResult + { + $this->awaitPending(); + return $this->handle->query($sql); + } + + final public function execute(string $sql, array $params = []): PostgresResult + { + $this->awaitPending(); + return $this->handle->execute($sql, $params); + } + + final public function prepare(string $sql): PostgresStatement + { + $this->awaitPending(); + return $this->handle->prepare($sql); + } + + final public function notify(string $channel, string $payload = ""): PostgresResult + { + $this->awaitPending(); + return $this->handle->notify($channel, $payload); + } + + final public function listen(string $channel): PostgresListener + { + $this->awaitPending(); + return $this->handle->listen($channel); + } + + final public function beginTransaction(): PostgresTransaction + { + $this->reserve(); + + try { + $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL " . $this->transactionIsolation->toSql()); + } catch (\Throwable $exception) { + $this->release(); + throw $exception; + } + + return new Internal\PostgresConnectionTransaction( + $this->handle, + $this->release(...), + $this->transactionIsolation, + ); + } + + final public function setTransactionIsolation(TransactionIsolation $isolation): void + { + $this->transactionIsolation = $isolation; + } + + final public function quoteString(string $data): string + { + return $this->handle->quoteString($data); + } + + final public function quoteName(string $name): string + { + return $this->handle->quoteName($name); + } + + final public function escapeByteA(string $data): string + { + return $this->handle->escapeByteA($data); + } +} diff --git a/src/Internal/PostgresNestableTransaction.php b/src/Internal/PostgresNestableTransaction.php new file mode 100644 index 0000000..64596ec --- /dev/null +++ b/src/Internal/PostgresNestableTransaction.php @@ -0,0 +1,16 @@ + + */ +interface PostgresNestableTransaction extends NestableTransaction, PostgresTransaction +{ +} diff --git a/src/Internal/PostgresNestedTransaction.php b/src/Internal/PostgresNestedTransaction.php index 26d807f..3218d76 100644 --- a/src/Internal/PostgresNestedTransaction.php +++ b/src/Internal/PostgresNestedTransaction.php @@ -1,17 +1,19 @@ - + * @extends NestedTransaction */ -final class PostgresNestedTransaction extends NestedTransaction implements PostgresTransaction +final class PostgresNestedTransaction extends NestedTransaction implements PostgresNestableTransaction { use PostgresTransactionDelegate; @@ -19,4 +21,12 @@ protected function getTransaction(): PostgresTransaction { return $this->transaction; } + + protected function createNestedTransaction( + NestableTransaction $transaction, + string $identifier, + \Closure $release, + ): Transaction { + return new PostgresNestedTransaction($transaction, $identifier, $release); + } } diff --git a/src/Internal/PostgresPooledTransaction.php b/src/Internal/PostgresPooledTransaction.php index cfa9f66..e1c1ec9 100644 --- a/src/Internal/PostgresPooledTransaction.php +++ b/src/Internal/PostgresPooledTransaction.php @@ -6,6 +6,7 @@ use Amp\Postgres\PostgresStatement; use Amp\Postgres\PostgresTransaction; use Amp\Sql\Common\PooledTransaction; +use Amp\Sql\Transaction; /** * @internal @@ -19,4 +20,10 @@ protected function getTransaction(): PostgresTransaction { return $this->transaction; } + + protected function createTransaction(Transaction $transaction, \Closure $release): PostgresTransaction + { + \assert($transaction instanceof PostgresConnectionTransaction); + return new PostgresPooledTransaction($transaction, $release); + } } diff --git a/src/Internal/PostgresStatementPool.php b/src/Internal/PostgresStatementPool.php index 97767f7..fd4c68c 100644 --- a/src/Internal/PostgresStatementPool.php +++ b/src/Internal/PostgresStatementPool.php @@ -2,6 +2,7 @@ namespace Amp\Postgres\Internal; +use Amp\Postgres\PostgresConfig; use Amp\Postgres\PostgresResult; use Amp\Postgres\PostgresStatement; use Amp\Postgres\PostgresTransaction; @@ -10,7 +11,7 @@ /** * @internal - * @extends SqlStatementPool + * @extends SqlStatementPool */ final class PostgresStatementPool extends SqlStatementPool implements PostgresStatement { diff --git a/src/Internal/PostgresTransactionDelegate.php b/src/Internal/PostgresTransactionDelegate.php index edb310f..9daaaa0 100644 --- a/src/Internal/PostgresTransactionDelegate.php +++ b/src/Internal/PostgresTransactionDelegate.php @@ -55,6 +55,14 @@ public function execute(string $sql, array $params = []): PostgresResult return parent::execute($sql, $params); } + /** + * Changes return type to this library's Transaction type. + */ + public function beginTransaction(): PostgresTransaction + { + return parent::beginTransaction(); + } + /** * @param non-empty-string $channel */ diff --git a/src/Internal/PqHandle.php b/src/Internal/PqHandle.php index 3694e4e..a366ad2 100644 --- a/src/Internal/PqHandle.php +++ b/src/Internal/PqHandle.php @@ -5,7 +5,7 @@ use Amp\DeferredFuture; use Amp\Future; use Amp\Pipeline\Queue; -use Amp\Postgres\PostgresHandle; +use Amp\Postgres\PostgresConfig; use Amp\Postgres\PostgresListener; use Amp\Postgres\PostgresNotification; use Amp\Postgres\PostgresResult; @@ -28,7 +28,7 @@ final class PqHandle extends AbstractHandle /** @var array> */ private array $statements = []; - public function __construct(pq\Connection $handle) + public function __construct(pq\Connection $handle, PostgresConfig $config) { $this->handle = $handle; @@ -103,7 +103,7 @@ public function __construct(pq\Connection $handle) EventLoop::unreference($poll); EventLoop::disable($await); - parent::__construct($poll, $await, $onClose); + parent::__construct($config, $poll, $await, $onClose); } public function isClosed(): bool diff --git a/src/PgSqlConnection.php b/src/PgSqlConnection.php index 0ddc91c..c15023b 100644 --- a/src/PgSqlConnection.php +++ b/src/PgSqlConnection.php @@ -7,12 +7,12 @@ use Amp\Sql\ConnectionException; use Revolt\EventLoop; -final class PgSqlConnection extends PostgresConnection implements PostgresLink +final class PgSqlConnection extends Internal\PostgresHandleConnection implements PostgresConnection { /** * @throws \Error If pecl-ev is used as a loop extension. */ - public static function connect(PostgresConfig $connectionConfig, ?Cancellation $cancellation = null): self + public static function connect(PostgresConfig $config, ?Cancellation $cancellation = null): self { // @codeCoverageIgnoreStart /** @psalm-suppress UndefinedClass */ @@ -20,7 +20,7 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $ throw new \Error('ext-pgsql is not compatible with pecl-ev; use pecl-pq or a different loop extension'); } // @codeCoverageIgnoreEnd - if (!$connection = \pg_connect($connectionConfig->getConnectionString(), \PGSQL_CONNECT_ASYNC | \PGSQL_CONNECT_FORCE_NEW)) { + if (!$connection = \pg_connect($config->getConnectionString(), \PGSQL_CONNECT_ASYNC | \PGSQL_CONNECT_FORCE_NEW)) { throw new ConnectionException("Failed to create connection resource"); } @@ -32,11 +32,11 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $ throw new ConnectionException("Failed to access connection socket"); } - $hash = \sha1($connectionConfig->getHost() . $connectionConfig->getPort() . $connectionConfig->getUser()); + $hash = \sha1($config->getHost() . $config->getPort() . $config->getUser()); $deferred = new DeferredFuture(); /** @psalm-suppress MissingClosureParamType $resource is a resource and cannot be inferred in this context */ - $callback = static function (string $callbackId, $resource) use (&$poll, &$await, $connection, $deferred, $hash): void { + $callback = static function (string $callbackId, $resource) use (&$poll, &$await, $connection, $config, $deferred, $hash): void { switch ($result = \pg_connect_poll($connection)) { case \PGSQL_POLLING_READING: case \PGSQL_POLLING_WRITING: @@ -47,7 +47,7 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $ break; case \PGSQL_POLLING_OK: - $deferred->complete(new self($connection, $resource, $hash)); + $deferred->complete(new self($connection, $resource, $hash, $config)); break; default: @@ -75,8 +75,8 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $ * @param resource $socket PostgreSQL connection stream socket. * @param string $id Connection identifier for determining which cached type table to use. */ - protected function __construct(\PgSql\Connection $handle, $socket, string $id) + protected function __construct(\PgSql\Connection $handle, $socket, string $id, PostgresConfig $config) { - parent::__construct(new Internal\PgSqlHandle($handle, $socket, $id)); + parent::__construct(new Internal\PgSqlHandle($handle, $socket, $id, $config)); } } diff --git a/src/PostgresConnection.php b/src/PostgresConnection.php index 16557c6..0010f75 100644 --- a/src/PostgresConnection.php +++ b/src/PostgresConnection.php @@ -2,133 +2,15 @@ namespace Amp\Postgres; -use Amp\Cancellation; -use Amp\DeferredFuture; -use Amp\Sql\ConnectionException; -use Amp\Sql\TransactionIsolation; -use Amp\Sql\TransactionIsolationLevel; +use Amp\Sql\Connection; -abstract class PostgresConnection implements PostgresLink, PostgresReceiver +/** + * @extends Connection + */ +interface PostgresConnection extends Connection, PostgresLink, PostgresReceiver { - /** @var DeferredFuture|null Used to only allow one transaction at a time. */ - private ?DeferredFuture $busy = null; - - /** - * @throws ConnectionException - */ - abstract public static function connect( - PostgresConfig $connectionConfig, - ?Cancellation $cancellation = null, - ): self; - - protected function __construct(private readonly PostgresHandle $handle) - { - } - - final public function getLastUsedAt(): int - { - return $this->handle->getLastUsedAt(); - } - - final public function close(): void - { - $this->handle->close(); - } - - final public function isClosed(): bool - { - return $this->handle->isClosed(); - } - - final public function onClose(\Closure $onClose): void - { - $this->handle->onClose($onClose); - } - - private function awaitPending(): void - { - while ($this->busy) { - $this->busy->getFuture()->await(); - } - } - - /** - * Reserves the connection for a transaction. - */ - private function reserve(): void - { - \assert($this->busy === null); - $this->busy = new DeferredFuture; - } - /** - * Releases the transaction lock. + * @return PostgresConfig Config object specific to this library. */ - private function release(): void - { - \assert($this->busy !== null); - - $this->busy->complete(); - $this->busy = null; - } - - final public function query(string $sql): PostgresResult - { - $this->awaitPending(); - return $this->handle->query($sql); - } - - final public function execute(string $sql, array $params = []): PostgresResult - { - $this->awaitPending(); - return $this->handle->execute($sql, $params); - } - - final public function prepare(string $sql): PostgresStatement - { - $this->awaitPending(); - return $this->handle->prepare($sql); - } - - final public function notify(string $channel, string $payload = ""): PostgresResult - { - $this->awaitPending(); - return $this->handle->notify($channel, $payload); - } - - final public function listen(string $channel): PostgresListener - { - $this->awaitPending(); - return $this->handle->listen($channel); - } - - final public function beginTransaction( - TransactionIsolation $isolation = TransactionIsolationLevel::Committed - ): PostgresTransaction { - $this->reserve(); - - try { - $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL " . $isolation->toSql()); - } catch (\Throwable $exception) { - $this->release(); - throw $exception; - } - - return new Internal\PostgresConnectionTransaction($this->handle, $this->release(...), $isolation); - } - - final public function quoteString(string $data): string - { - return $this->handle->quoteString($data); - } - - final public function quoteName(string $name): string - { - return $this->handle->quoteName($name); - } - - final public function escapeByteA(string $data): string - { - return $this->handle->escapeByteA($data); - } + public function getConfig(): PostgresConfig; } diff --git a/src/PostgresConnectionPool.php b/src/PostgresConnectionPool.php index e92d0f8..e5076a4 100644 --- a/src/PostgresConnectionPool.php +++ b/src/PostgresConnectionPool.php @@ -3,6 +3,7 @@ namespace Amp\Postgres; use Amp\Future; +use Amp\Postgres\Internal\PostgresHandleConnection; use Amp\Sql\Common\ConnectionPool; use Amp\Sql\Result; use Amp\Sql\SqlConnector; @@ -13,11 +14,11 @@ use function Amp\async; /** - * @extends ConnectionPool + * @extends ConnectionPool */ final class PostgresConnectionPool extends ConnectionPool implements PostgresLink, PostgresReceiver { - /** @var Future|null Connection used for notification listening. */ + /** @var Future|null Connection used for notification listening. */ private Future|null $listeningConnection = null; /** @var int Number of listeners on listening connection. */ @@ -27,7 +28,7 @@ final class PostgresConnectionPool extends ConnectionPool implements PostgresLin * @param positive-int $maxConnections * @param positive-int $idleTimeout * @param bool $resetConnections True to automatically execute DISCARD ALL on a connection before use. - * @param SqlConnector|null $connector + * @param SqlConnector|null $connector */ public function __construct( PostgresConfig $config, @@ -74,7 +75,7 @@ public function beginTransaction( return parent::beginTransaction($isolation); } - protected function pop(): PostgresConnection + protected function pop(): PostgresHandleConnection { $connection = parent::pop(); diff --git a/src/PostgresExecutor.php b/src/PostgresExecutor.php index 96b9652..41b81d5 100644 --- a/src/PostgresExecutor.php +++ b/src/PostgresExecutor.php @@ -7,7 +7,7 @@ use Amp\Sql\SqlException; /** - * @extends Executor + * @extends Executor */ interface PostgresExecutor extends Executor, PostgresQuoter { @@ -26,6 +26,11 @@ public function prepare(string $sql): PostgresStatement; */ public function execute(string $sql, array $params = []): PostgresResult; + /** + * @return PostgresTransaction Transaction object specific to this library. + */ + public function beginTransaction(): PostgresTransaction; + /** * @param non-empty-string $channel Channel name. * @param string $payload Notification payload. diff --git a/src/PostgresHandle.php b/src/PostgresHandle.php deleted file mode 100644 index 79ea9a0..0000000 --- a/src/PostgresHandle.php +++ /dev/null @@ -1,20 +0,0 @@ - $params List of statement parameters, indexed starting at 0. - */ - public function statementExecute(string $name, array $params): PostgresResult; - - /** - * Deallocate the statement with the given name. - */ - public function statementDeallocate(string $name): void; -} diff --git a/src/PostgresLink.php b/src/PostgresLink.php index de5a08f..2df56ee 100644 --- a/src/PostgresLink.php +++ b/src/PostgresLink.php @@ -3,18 +3,10 @@ namespace Amp\Postgres; use Amp\Sql\Link; -use Amp\Sql\TransactionIsolation; -use Amp\Sql\TransactionIsolationLevel; /** * @extends Link */ interface PostgresLink extends Link, PostgresQuoter { - /** - * @return PostgresTransaction Transaction object specific to this library. - */ - public function beginTransaction( - TransactionIsolation $isolation = TransactionIsolationLevel::Committed - ): PostgresTransaction; } diff --git a/src/PostgresNestableTransaction.php b/src/PostgresNestableTransaction.php deleted file mode 100644 index 05023f2..0000000 --- a/src/PostgresNestableTransaction.php +++ /dev/null @@ -1,46 +0,0 @@ - - */ -final class PostgresNestableTransaction extends NestableTransaction implements PostgresLink -{ - protected function createNestedTransaction( - Transaction $transaction, - \Closure $release, - string $identifier, - ): Transaction { - return new Internal\PostgresNestedTransaction($transaction, $release, $identifier); - } - - /** - * Changes return type to this library's Transaction type. - */ - public function beginTransaction( - TransactionIsolation $isolation = TransactionIsolationLevel::Committed - ): PostgresTransaction { - return parent::beginTransaction($isolation); - } - - public function quoteString(string $data): string - { - return $this->transaction->quoteString($data); - } - - public function quoteName(string $name): string - { - return $this->transaction->quoteName($name); - } - - public function escapeByteA(string $data): string - { - return $this->transaction->escapeByteA($data); - } -} diff --git a/src/PostgresTransaction.php b/src/PostgresTransaction.php index add8dc8..b0d787b 100644 --- a/src/PostgresTransaction.php +++ b/src/PostgresTransaction.php @@ -7,7 +7,7 @@ /** * Note that notifications sent during a transaction are not delivered until the transaction has been committed. * - * @extends Transaction + * @extends Transaction */ interface PostgresTransaction extends PostgresExecutor, Transaction { diff --git a/src/PqConnection.php b/src/PqConnection.php index 77b1960..23a94c2 100644 --- a/src/PqConnection.php +++ b/src/PqConnection.php @@ -8,14 +8,14 @@ use pq; use Revolt\EventLoop; -final class PqConnection extends PostgresConnection implements PostgresLink +final class PqConnection extends Internal\PostgresHandleConnection implements PostgresConnection { private readonly Internal\PqHandle $handle; - public static function connect(PostgresConfig $connectionConfig, ?Cancellation $cancellation = null): self + public static function connect(PostgresConfig $config, ?Cancellation $cancellation = null): self { try { - $connection = new pq\Connection($connectionConfig->getConnectionString(), pq\Connection::ASYNC); + $connection = new pq\Connection($config->getConnectionString(), pq\Connection::ASYNC); } catch (pq\Exception $exception) { throw new ConnectionException("Could not connect to PostgreSQL server", 0, $exception); } @@ -24,7 +24,7 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $ $connection->unbuffered = true; $deferred = new DeferredFuture(); - $callback = function () use (&$poll, &$await, $connection, $deferred): void { + $callback = function () use (&$poll, &$await, $connection, $config, $deferred): void { switch ($result = $connection->poll()) { case pq\Connection::POLLING_READING: case pq\Connection::POLLING_WRITING: @@ -35,7 +35,7 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $ break; case pq\Connection::POLLING_OK: - $deferred->complete(new self($connection)); + $deferred->complete(new self($connection, $config)); break; default: @@ -58,9 +58,9 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $ } } - protected function __construct(pq\Connection $handle) + protected function __construct(pq\Connection $handle, PostgresConfig $config) { - $this->handle = new Internal\PqHandle($handle); + $this->handle = new Internal\PqHandle($handle, $config); parent::__construct($this->handle); } diff --git a/src/functions.php b/src/functions.php index c1c7886..9bf65ad 100644 --- a/src/functions.php +++ b/src/functions.php @@ -3,15 +3,16 @@ namespace Amp\Postgres; use Amp\Cancellation; +use Amp\Postgres\Internal\PostgresHandleConnection; use Amp\Sql\Common\RetrySqlConnector; use Amp\Sql\SqlConnector; use Amp\Sql\SqlException; use Revolt\EventLoop; /** - * @param SqlConnector|null $connector + * @param SqlConnector|null $connector * - * @return SqlConnector + * @return SqlConnector */ function postgresConnector(?SqlConnector $connector = null): SqlConnector { @@ -25,7 +26,7 @@ function postgresConnector(?SqlConnector $connector = null): SqlConnector /** * @psalm-suppress InvalidArgument - * @var SqlConnector + * @var SqlConnector */ return $map[$driver] ??= new RetrySqlConnector(new DefaultPostgresConnector()); } @@ -37,7 +38,7 @@ function postgresConnector(?SqlConnector $connector = null): SqlConnector * * @throws \Error If neither ext-pgsql or pecl-pq is loaded. */ -function connect(PostgresConfig $config, ?Cancellation $cancellation = null): PostgresConnection +function connect(PostgresConfig $config, ?Cancellation $cancellation = null): PostgresHandleConnection { return postgresConnector()->connect($config, $cancellation); } diff --git a/test/AbstractConnectTest.php b/test/AbstractConnectTest.php index 3ea0fed..6e35b91 100644 --- a/test/AbstractConnectTest.php +++ b/test/AbstractConnectTest.php @@ -4,13 +4,13 @@ use Amp\Cancellation; use Amp\PHPUnit\AsyncTestCase; +use Amp\Postgres\Internal\PostgresHandleConnection; use Amp\Postgres\PostgresConfig; -use Amp\Postgres\PostgresConnection; abstract class AbstractConnectTest extends AsyncTestCase { abstract public function connect( PostgresConfig $connectionConfig, ?Cancellation $cancellation = null - ): PostgresConnection; + ): PostgresHandleConnection; } diff --git a/test/AbstractLinkTest.php b/test/AbstractLinkTest.php index b897597..8d3a5c8 100644 --- a/test/AbstractLinkTest.php +++ b/test/AbstractLinkTest.php @@ -5,7 +5,7 @@ use Amp\Future; use Amp\PHPUnit\AsyncTestCase; use Amp\Postgres\ByteA; -use Amp\Postgres\PostgresConnection; +use Amp\Postgres\Internal\PostgresHandleConnection; use Amp\Postgres\PostgresLink; use Amp\Postgres\PostgresTransaction; use Amp\Postgres\QueryExecutionError; @@ -79,7 +79,7 @@ protected function verifyResult(Result $result, array $data): void abstract public function createLink(string $connectionString): PostgresLink; /** - * Helper method to invoke the protected constructor of classes extending {@see PostgresConnection}. + * Helper method to invoke the protected constructor of classes extending {@see PostgresHandleConnection}. * * @template T extends Connection * @@ -88,7 +88,7 @@ abstract public function createLink(string $connectionString): PostgresLink; * * @return T */ - protected function newConnection(string $className, mixed ...$args): PostgresConnection + protected function newConnection(string $className, mixed ...$args): PostgresHandleConnection { $reflection = new \ReflectionClass($className); $connection = $reflection->newInstanceWithoutConstructor(); diff --git a/test/AbstractQuoteTest.php b/test/AbstractQuoteTest.php index eac1a9d..07a0566 100644 --- a/test/AbstractQuoteTest.php +++ b/test/AbstractQuoteTest.php @@ -2,12 +2,12 @@ namespace Amp\Postgres\Test; +use Amp\Postgres\Internal\PostgresHandleConnection; use Amp\Postgres\PostgresConfig; -use Amp\Postgres\PostgresConnection; abstract class AbstractQuoteTest extends AbstractConnectTest { - private PostgresConnection $connection; + private PostgresHandleConnection $connection; public function setUp(): void { diff --git a/test/FunctionsTest.php b/test/FunctionsTest.php index a612ff1..aef26e5 100644 --- a/test/FunctionsTest.php +++ b/test/FunctionsTest.php @@ -5,8 +5,8 @@ use Amp\CancelledException; use Amp\DeferredCancellation; use Amp\PHPUnit\AsyncTestCase; +use Amp\Postgres\Internal\PostgresHandleConnection; use Amp\Postgres\PostgresConfig; -use Amp\Postgres\PostgresConnection; use function Amp\Postgres\connect; class FunctionsTest extends AsyncTestCase @@ -23,7 +23,7 @@ public function setUp(): void public function testConnect() { $connection = connect(PostgresConfig::fromString('host=localhost user=postgres password=postgres')); - $this->assertInstanceOf(PostgresConnection::class, $connection); + $this->assertInstanceOf(PostgresHandleConnection::class, $connection); } /**