diff --git a/composer.json b/composer.json
index 030922c..d6a4c33 100644
--- a/composer.json
+++ b/composer.json
@@ -22,18 +22,18 @@
"php": ">=8.1",
"amphp/amp": "^3",
"amphp/pipeline": "^1",
- "amphp/sql": "^2",
- "amphp/sql-common": "^2"
+ "amphp/sql": "dev-refactor-transactions as 2.0",
+ "amphp/sql-common": "dev-refactor-transactions as 2.0"
},
"require-dev": {
"ext-pgsql": "*",
"ext-pq": "*",
"amphp/phpunit-util": "^3",
"phpunit/phpunit": "^9",
- "amphp/php-cs-fixer-config": "^2-dev",
- "psalm/phar": "^5.4"
+ "amphp/php-cs-fixer-config": "^2",
+ "psalm/phar": "^5.11"
},
- "minimum-stability": "dev",
+ "minimum-stability": "beta",
"prefer-stable": true,
"autoload": {
"psr-4": {
diff --git a/psalm.xml b/psalm.xml
index 7df024c..6aa023a 100644
--- a/psalm.xml
+++ b/psalm.xml
@@ -20,6 +20,13 @@
+
+
+
+
+
+
+
diff --git a/src/DefaultPostgresConnector.php b/src/DefaultPostgresConnector.php
index adc2e40..569cad3 100644
--- a/src/DefaultPostgresConnector.php
+++ b/src/DefaultPostgresConnector.php
@@ -3,12 +3,13 @@
namespace Amp\Postgres;
use Amp\Cancellation;
+use Amp\Postgres\Internal\PostgresHandleConnection;
use Amp\Sql\SqlConfig;
use Amp\Sql\SqlConnector;
use Amp\Sql\SqlException;
/**
- * @implements SqlConnector
+ * @implements SqlConnector
*/
final class DefaultPostgresConnector implements SqlConnector
{
@@ -17,7 +18,7 @@ final class DefaultPostgresConnector implements SqlConnector
*
* @throws \Error If neither ext-pgsql nor pecl-pq is loaded.
*/
- public function connect(SqlConfig $config, ?Cancellation $cancellation = null): PostgresConnection
+ public function connect(SqlConfig $config, ?Cancellation $cancellation = null): PostgresHandleConnection
{
if (!$config instanceof PostgresConfig) {
throw new \TypeError(\sprintf("Must provide an instance of %s to Postgres connectors", PostgresConfig::class));
diff --git a/src/Internal/AbstractHandle.php b/src/Internal/AbstractHandle.php
index abc6d3c..0e2fc78 100644
--- a/src/Internal/AbstractHandle.php
+++ b/src/Internal/AbstractHandle.php
@@ -5,7 +5,7 @@
use Amp\DeferredFuture;
use Amp\Pipeline\Queue;
use Amp\Postgres\ByteA;
-use Amp\Postgres\PostgresHandle;
+use Amp\Postgres\PostgresConfig;
use Amp\Sql\ConnectionException;
use Revolt\EventLoop;
@@ -22,6 +22,7 @@ abstract class AbstractHandle implements PostgresHandle
protected int $lastUsedAt = 0;
public function __construct(
+ private readonly PostgresConfig $config,
protected readonly string $poll,
protected readonly string $await,
private readonly DeferredFuture $onClose,
@@ -36,6 +37,11 @@ public function __destruct()
}
}
+ public function getConfig(): PostgresConfig
+ {
+ return $this->config;
+ }
+
public function getLastUsedAt(): int
{
return $this->lastUsedAt;
@@ -87,4 +93,19 @@ protected function escapeParams(array $params): array
default => $param,
}, $params);
}
+
+ public function createSavepoint(string $identifier): void
+ {
+ $this->query("SAVEPOINT " . $this->quoteName($identifier));
+ }
+
+ public function rollbackTo(string $identifier): void
+ {
+ $this->query("ROLLBACK TO " . $this->quoteName($identifier));
+ }
+
+ public function releaseSavepoint(string $identifier): void
+ {
+ $this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier));
+ }
}
diff --git a/src/Internal/PgSqlHandle.php b/src/Internal/PgSqlHandle.php
index 060b432..eb002a4 100644
--- a/src/Internal/PgSqlHandle.php
+++ b/src/Internal/PgSqlHandle.php
@@ -5,7 +5,7 @@
use Amp\DeferredFuture;
use Amp\Future;
use Amp\Pipeline\Queue;
-use Amp\Postgres\PostgresHandle;
+use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresListener;
use Amp\Postgres\PostgresNotification;
use Amp\Postgres\PostgresResult;
@@ -54,8 +54,12 @@ final class PgSqlHandle extends AbstractHandle
* @param resource $socket PostgreSQL connection stream socket.
* @param string $id Connection identifier for determining which cached type table to use.
*/
- public function __construct(\PgSql\Connection $handle, $socket, string $id)
- {
+ public function __construct(
+ \PgSql\Connection $handle,
+ $socket,
+ string $id,
+ PostgresConfig $config,
+ ) {
$this->handle = $handle;
$handle = &$this->handle;
@@ -151,7 +155,7 @@ public function __construct(\PgSql\Connection $handle, $socket, string $id)
EventLoop::unreference($poll);
EventLoop::disable($await);
- parent::__construct($poll, $await, $onClose);
+ parent::__construct($config, $poll, $await, $onClose);
$this->types = (self::$typeCache[$id] ??= self::fetchTypes($handle));
}
diff --git a/src/Internal/PostgresConnectionStatement.php b/src/Internal/PostgresConnectionStatement.php
index c3dae9f..aa0e470 100644
--- a/src/Internal/PostgresConnectionStatement.php
+++ b/src/Internal/PostgresConnectionStatement.php
@@ -3,7 +3,6 @@
namespace Amp\Postgres\Internal;
use Amp\DeferredFuture;
-use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresStatement;
use Amp\Sql\Result;
use Amp\Sql\SqlException;
diff --git a/src/Internal/PostgresConnectionTransaction.php b/src/Internal/PostgresConnectionTransaction.php
index cd19895..117cc50 100644
--- a/src/Internal/PostgresConnectionTransaction.php
+++ b/src/Internal/PostgresConnectionTransaction.php
@@ -3,7 +3,6 @@
namespace Amp\Postgres\Internal;
use Amp\DeferredFuture;
-use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Postgres\PostgresTransaction;
@@ -22,6 +21,8 @@ final class PostgresConnectionTransaction implements PostgresTransaction
private readonly DeferredFuture $onClose;
+ private ?DeferredFuture $busy = null;
+
/** @var array Reference statements so de-allocation occurs after commit/rollback. */
private array $statements = [];
@@ -31,11 +32,15 @@ final class PostgresConnectionTransaction implements PostgresTransaction
public function __construct(
private readonly PostgresHandle $handle,
\Closure $release,
- private readonly TransactionIsolation $isolation
+ private readonly TransactionIsolation $isolation,
) {
+ $busy = &$this->busy;
$refCount = &$this->refCount;
$statements = &$this->statements;
- $this->release = static function () use (&$refCount, &$statements, $release): void {
+ $this->release = static function () use (&$busy, &$refCount, &$statements, $release): void {
+ $busy?->complete();
+ $busy = null;
+
if (--$refCount === 0) {
$release();
$statements = [];
@@ -73,6 +78,11 @@ public function getLastUsedAt(): int
return $this->handle->getLastUsedAt();
}
+ public function isNestedTransaction(): bool
+ {
+ return false;
+ }
+
private function assertOpen(): void
{
if ($this->isClosed()) {
@@ -118,7 +128,7 @@ public function getIsolationLevel(): TransactionIsolation
*/
public function query(string $sql): PostgresResult
{
- $this->assertOpen();
+ $this->awaitPendingNestedTransaction();
++$this->refCount;
try {
@@ -136,7 +146,7 @@ public function query(string $sql): PostgresResult
*/
public function prepare(string $sql): PostgresStatement
{
- $this->assertOpen();
+ $this->awaitPendingNestedTransaction();
++$this->refCount;
try {
@@ -156,7 +166,7 @@ public function prepare(string $sql): PostgresStatement
*/
public function execute(string $sql, array $params = []): PostgresResult
{
- $this->assertOpen();
+ $this->awaitPendingNestedTransaction();
++$this->refCount;
try {
@@ -169,12 +179,29 @@ public function execute(string $sql, array $params = []): PostgresResult
return new PostgresPooledResult($result, $this->release);
}
+ public function beginTransaction(): PostgresTransaction
+ {
+ $this->awaitPendingNestedTransaction();
+
+ ++$this->refCount;
+ $this->busy = new DeferredFuture();
+ try {
+ $identifier = bin2hex(\random_bytes(4));
+ $this->handle->createSavepoint($identifier);
+ } catch (\Throwable $exception) {
+ EventLoop::queue($this->release);
+ throw $exception;
+ }
+
+ return new PostgresNestedTransaction($this, $this->handle, $identifier, $this->release);
+ }
+
/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function notify(string $channel, string $payload = ""): PostgresResult
{
- $this->assertOpen();
+ $this->awaitPendingNestedTransaction();
return $this->handle->notify($channel, $payload);
}
@@ -185,7 +212,7 @@ public function notify(string $channel, string $payload = ""): PostgresResult
*/
public function commit(): void
{
- $this->assertOpen();
+ $this->awaitPendingNestedTransaction();
$this->onClose->complete();
$this->handle->query("COMMIT");
}
@@ -197,45 +224,19 @@ public function commit(): void
*/
public function rollback(): void
{
- $this->assertOpen();
+ $this->awaitPendingNestedTransaction();
$this->onClose->complete();
$this->handle->query("ROLLBACK");
}
- /**
- * Creates a savepoint with the given identifier.
- *
- * @param string $identifier Savepoint identifier.
- *
- * @throws TransactionError If the transaction has been committed or rolled back.
- */
- public function createSavepoint(string $identifier): void
- {
- $this->query("SAVEPOINT " . $this->quoteName($identifier));
- }
-
- /**
- * Rolls back to the savepoint with the given identifier.
- *
- * @param string $identifier Savepoint identifier.
- *
- * @throws TransactionError If the transaction has been committed or rolled back.
- */
- public function rollbackTo(string $identifier): void
+ public function onCommit(\Closure $onCommit): void
{
- $this->query("ROLLBACK TO " . $this->quoteName($identifier));
+ // TODO: Implement onCommit() method.
}
- /**
- * Releases the savepoint with the given identifier.
- *
- * @param string $identifier Savepoint identifier.
- *
- * @throws TransactionError If the transaction has been committed or rolled back.
- */
- public function releaseSavepoint(string $identifier): void
+ public function onRollback(\Closure $onRollback): void
{
- $this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier));
+ // TODO: Implement onRollback() method.
}
/**
@@ -264,4 +265,13 @@ public function escapeByteA(string $data): string
$this->assertOpen();
return $this->handle->escapeByteA($data);
}
+
+ private function awaitPendingNestedTransaction(): void
+ {
+ while ($this->busy) {
+ $this->busy->getFuture()->await();
+ }
+
+ $this->assertOpen();
+ }
}
diff --git a/src/Internal/PostgresHandle.php b/src/Internal/PostgresHandle.php
new file mode 100644
index 0000000..befebe0
--- /dev/null
+++ b/src/Internal/PostgresHandle.php
@@ -0,0 +1,49 @@
+
+ */
+interface PostgresHandle extends PostgresQuoter, NestableTransactionExecutor
+{
+ public const STATEMENT_NAME_PREFIX = "amp_";
+
+ public function getConfig(): PostgresConfig;
+
+ public function query(string $sql): PostgresResult;
+
+ public function execute(string $sql, array $params = []): PostgresResult;
+
+ public function prepare(string $sql): PostgresStatement;
+
+ /**
+ * @param non-empty-string $channel
+ */
+ public function notify(string $channel, string $payload = ""): PostgresResult;
+
+ /**
+ * @param non-empty-string $channel
+ */
+ public function listen(string $channel): PostgresListener;
+
+ /**
+ * Execute the statement with the given name and parameters.
+ *
+ * @param list $params List of statement parameters, indexed starting at 0.
+ */
+ public function statementExecute(string $name, array $params): PostgresResult;
+
+ /**
+ * Deallocate the statement with the given name.
+ */
+ public function statementDeallocate(string $name): void;
+}
diff --git a/src/Internal/PostgresHandleConnection.php b/src/Internal/PostgresHandleConnection.php
new file mode 100644
index 0000000..137846e
--- /dev/null
+++ b/src/Internal/PostgresHandleConnection.php
@@ -0,0 +1,157 @@
+handle->getConfig();
+ }
+
+ final public function getLastUsedAt(): int
+ {
+ return $this->handle->getLastUsedAt();
+ }
+
+ final public function close(): void
+ {
+ $this->handle->close();
+ }
+
+ final public function isClosed(): bool
+ {
+ return $this->handle->isClosed();
+ }
+
+ final public function onClose(\Closure $onClose): void
+ {
+ $this->handle->onClose($onClose);
+ }
+
+ private function awaitPending(): void
+ {
+ while ($this->busy) {
+ $this->busy->getFuture()->await();
+ }
+ }
+
+ /**
+ * Reserves the connection for a transaction.
+ */
+ private function reserve(): void
+ {
+ \assert($this->busy === null);
+ $this->busy = new DeferredFuture;
+ }
+
+ /**
+ * Releases the transaction lock.
+ */
+ private function release(): void
+ {
+ \assert($this->busy !== null);
+
+ $this->busy->complete();
+ $this->busy = null;
+ }
+
+ final public function query(string $sql): PostgresResult
+ {
+ $this->awaitPending();
+ return $this->handle->query($sql);
+ }
+
+ final public function execute(string $sql, array $params = []): PostgresResult
+ {
+ $this->awaitPending();
+ return $this->handle->execute($sql, $params);
+ }
+
+ final public function prepare(string $sql): PostgresStatement
+ {
+ $this->awaitPending();
+ return $this->handle->prepare($sql);
+ }
+
+ final public function notify(string $channel, string $payload = ""): PostgresResult
+ {
+ $this->awaitPending();
+ return $this->handle->notify($channel, $payload);
+ }
+
+ final public function listen(string $channel): PostgresListener
+ {
+ $this->awaitPending();
+ return $this->handle->listen($channel);
+ }
+
+ final public function beginTransaction(): PostgresTransaction
+ {
+ $this->reserve();
+
+ try {
+ $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL " . $this->transactionIsolation->toSql());
+ } catch (\Throwable $exception) {
+ $this->release();
+ throw $exception;
+ }
+
+ return new Internal\PostgresConnectionTransaction(
+ $this->handle,
+ $this->release(...),
+ $this->transactionIsolation,
+ );
+ }
+
+ final public function setTransactionIsolation(TransactionIsolation $isolation): void
+ {
+ $this->transactionIsolation = $isolation;
+ }
+
+ final public function quoteString(string $data): string
+ {
+ return $this->handle->quoteString($data);
+ }
+
+ final public function quoteName(string $name): string
+ {
+ return $this->handle->quoteName($name);
+ }
+
+ final public function escapeByteA(string $data): string
+ {
+ return $this->handle->escapeByteA($data);
+ }
+}
diff --git a/src/Internal/PostgresNestedTransaction.php b/src/Internal/PostgresNestedTransaction.php
index 26d807f..dac2ee4 100644
--- a/src/Internal/PostgresNestedTransaction.php
+++ b/src/Internal/PostgresNestedTransaction.php
@@ -1,4 +1,4 @@
-
+ * @extends NestedTransaction
*/
final class PostgresNestedTransaction extends NestedTransaction implements PostgresTransaction
{
use PostgresTransactionDelegate;
+ /**
+ * @param non-empty-string $identifier
+ * @param \Closure():void $release
+ */
+ public function __construct(
+ private readonly PostgresTransaction $transaction,
+ PostgresHandle $handle,
+ string $identifier,
+ \Closure $release,
+ ) {
+ parent::__construct($transaction, $handle, $identifier, $release);
+ }
+
+ /**
+ * Switches return type to this library's return type.
+ */
protected function getTransaction(): PostgresTransaction
{
return $this->transaction;
}
+
+ protected function createNestedTransaction(
+ Transaction $transaction,
+ Executor $executor,
+ string $identifier,
+ \Closure $release,
+ ): Transaction {
+ return new PostgresNestedTransaction($transaction, $executor, $identifier, $release);
+ }
+
+ public function prepare(string $sql): PostgresStatement
+ {
+ $statement = parent::prepare($sql);
+
+ // Defer statement deallocation until parent is committed or rolled back.
+ $this->transaction->onClose(static fn () => $statement);
+
+ return $statement;
+ }
}
diff --git a/src/Internal/PostgresPooledTransaction.php b/src/Internal/PostgresPooledTransaction.php
index cfa9f66..1e758b6 100644
--- a/src/Internal/PostgresPooledTransaction.php
+++ b/src/Internal/PostgresPooledTransaction.php
@@ -6,6 +6,7 @@
use Amp\Postgres\PostgresStatement;
use Amp\Postgres\PostgresTransaction;
use Amp\Sql\Common\PooledTransaction;
+use Amp\Sql\Transaction;
/**
* @internal
@@ -15,8 +16,22 @@ final class PostgresPooledTransaction extends PooledTransaction implements Postg
{
use PostgresTransactionDelegate;
+ /**
+ * @param \Closure():void $release
+ */
+ public function __construct(private readonly PostgresTransaction $transaction, \Closure $release)
+ {
+ parent::__construct($transaction, $release);
+ }
+
protected function getTransaction(): PostgresTransaction
{
return $this->transaction;
}
+
+ protected function createTransaction(Transaction $transaction, \Closure $release): PostgresTransaction
+ {
+ \assert($transaction instanceof PostgresTransaction);
+ return new PostgresPooledTransaction($transaction, $release);
+ }
}
diff --git a/src/Internal/PostgresStatementPool.php b/src/Internal/PostgresStatementPool.php
index 97767f7..fd4c68c 100644
--- a/src/Internal/PostgresStatementPool.php
+++ b/src/Internal/PostgresStatementPool.php
@@ -2,6 +2,7 @@
namespace Amp\Postgres\Internal;
+use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Postgres\PostgresTransaction;
@@ -10,7 +11,7 @@
/**
* @internal
- * @extends SqlStatementPool
+ * @extends SqlStatementPool
*/
final class PostgresStatementPool extends SqlStatementPool implements PostgresStatement
{
diff --git a/src/Internal/PostgresTransactionDelegate.php b/src/Internal/PostgresTransactionDelegate.php
index edb310f..9daaaa0 100644
--- a/src/Internal/PostgresTransactionDelegate.php
+++ b/src/Internal/PostgresTransactionDelegate.php
@@ -55,6 +55,14 @@ public function execute(string $sql, array $params = []): PostgresResult
return parent::execute($sql, $params);
}
+ /**
+ * Changes return type to this library's Transaction type.
+ */
+ public function beginTransaction(): PostgresTransaction
+ {
+ return parent::beginTransaction();
+ }
+
/**
* @param non-empty-string $channel
*/
diff --git a/src/Internal/PqHandle.php b/src/Internal/PqHandle.php
index 3694e4e..a366ad2 100644
--- a/src/Internal/PqHandle.php
+++ b/src/Internal/PqHandle.php
@@ -5,7 +5,7 @@
use Amp\DeferredFuture;
use Amp\Future;
use Amp\Pipeline\Queue;
-use Amp\Postgres\PostgresHandle;
+use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresListener;
use Amp\Postgres\PostgresNotification;
use Amp\Postgres\PostgresResult;
@@ -28,7 +28,7 @@ final class PqHandle extends AbstractHandle
/** @var array> */
private array $statements = [];
- public function __construct(pq\Connection $handle)
+ public function __construct(pq\Connection $handle, PostgresConfig $config)
{
$this->handle = $handle;
@@ -103,7 +103,7 @@ public function __construct(pq\Connection $handle)
EventLoop::unreference($poll);
EventLoop::disable($await);
- parent::__construct($poll, $await, $onClose);
+ parent::__construct($config, $poll, $await, $onClose);
}
public function isClosed(): bool
diff --git a/src/PgSqlConnection.php b/src/PgSqlConnection.php
index 0ddc91c..c15023b 100644
--- a/src/PgSqlConnection.php
+++ b/src/PgSqlConnection.php
@@ -7,12 +7,12 @@
use Amp\Sql\ConnectionException;
use Revolt\EventLoop;
-final class PgSqlConnection extends PostgresConnection implements PostgresLink
+final class PgSqlConnection extends Internal\PostgresHandleConnection implements PostgresConnection
{
/**
* @throws \Error If pecl-ev is used as a loop extension.
*/
- public static function connect(PostgresConfig $connectionConfig, ?Cancellation $cancellation = null): self
+ public static function connect(PostgresConfig $config, ?Cancellation $cancellation = null): self
{
// @codeCoverageIgnoreStart
/** @psalm-suppress UndefinedClass */
@@ -20,7 +20,7 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $
throw new \Error('ext-pgsql is not compatible with pecl-ev; use pecl-pq or a different loop extension');
} // @codeCoverageIgnoreEnd
- if (!$connection = \pg_connect($connectionConfig->getConnectionString(), \PGSQL_CONNECT_ASYNC | \PGSQL_CONNECT_FORCE_NEW)) {
+ if (!$connection = \pg_connect($config->getConnectionString(), \PGSQL_CONNECT_ASYNC | \PGSQL_CONNECT_FORCE_NEW)) {
throw new ConnectionException("Failed to create connection resource");
}
@@ -32,11 +32,11 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $
throw new ConnectionException("Failed to access connection socket");
}
- $hash = \sha1($connectionConfig->getHost() . $connectionConfig->getPort() . $connectionConfig->getUser());
+ $hash = \sha1($config->getHost() . $config->getPort() . $config->getUser());
$deferred = new DeferredFuture();
/** @psalm-suppress MissingClosureParamType $resource is a resource and cannot be inferred in this context */
- $callback = static function (string $callbackId, $resource) use (&$poll, &$await, $connection, $deferred, $hash): void {
+ $callback = static function (string $callbackId, $resource) use (&$poll, &$await, $connection, $config, $deferred, $hash): void {
switch ($result = \pg_connect_poll($connection)) {
case \PGSQL_POLLING_READING:
case \PGSQL_POLLING_WRITING:
@@ -47,7 +47,7 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $
break;
case \PGSQL_POLLING_OK:
- $deferred->complete(new self($connection, $resource, $hash));
+ $deferred->complete(new self($connection, $resource, $hash, $config));
break;
default:
@@ -75,8 +75,8 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $
* @param resource $socket PostgreSQL connection stream socket.
* @param string $id Connection identifier for determining which cached type table to use.
*/
- protected function __construct(\PgSql\Connection $handle, $socket, string $id)
+ protected function __construct(\PgSql\Connection $handle, $socket, string $id, PostgresConfig $config)
{
- parent::__construct(new Internal\PgSqlHandle($handle, $socket, $id));
+ parent::__construct(new Internal\PgSqlHandle($handle, $socket, $id, $config));
}
}
diff --git a/src/PostgresConnection.php b/src/PostgresConnection.php
index 16557c6..a840d73 100644
--- a/src/PostgresConnection.php
+++ b/src/PostgresConnection.php
@@ -2,133 +2,27 @@
namespace Amp\Postgres;
-use Amp\Cancellation;
-use Amp\DeferredFuture;
+use Amp\Sql\Connection;
use Amp\Sql\ConnectionException;
-use Amp\Sql\TransactionIsolation;
-use Amp\Sql\TransactionIsolationLevel;
+use Amp\Sql\QueryError;
+use Amp\Sql\SqlException;
-abstract class PostgresConnection implements PostgresLink, PostgresReceiver
+/**
+ * @extends Connection
+ */
+interface PostgresConnection extends Connection, PostgresLink
{
- /** @var DeferredFuture|null Used to only allow one transaction at a time. */
- private ?DeferredFuture $busy = null;
-
/**
- * @throws ConnectionException
+ * @return PostgresConfig Config object specific to this library.
*/
- abstract public static function connect(
- PostgresConfig $connectionConfig,
- ?Cancellation $cancellation = null,
- ): self;
-
- protected function __construct(private readonly PostgresHandle $handle)
- {
- }
-
- final public function getLastUsedAt(): int
- {
- return $this->handle->getLastUsedAt();
- }
-
- final public function close(): void
- {
- $this->handle->close();
- }
-
- final public function isClosed(): bool
- {
- return $this->handle->isClosed();
- }
-
- final public function onClose(\Closure $onClose): void
- {
- $this->handle->onClose($onClose);
- }
-
- private function awaitPending(): void
- {
- while ($this->busy) {
- $this->busy->getFuture()->await();
- }
- }
+ public function getConfig(): PostgresConfig;
/**
- * Reserves the connection for a transaction.
+ * @param non-empty-string $channel Channel name.
+ *
+ * @throws SqlException If the operation fails due to unexpected condition.
+ * @throws ConnectionException If the connection to the database is lost.
+ * @throws QueryError If the operation fails due to an error in the query (such as a syntax error).
*/
- private function reserve(): void
- {
- \assert($this->busy === null);
- $this->busy = new DeferredFuture;
- }
-
- /**
- * Releases the transaction lock.
- */
- private function release(): void
- {
- \assert($this->busy !== null);
-
- $this->busy->complete();
- $this->busy = null;
- }
-
- final public function query(string $sql): PostgresResult
- {
- $this->awaitPending();
- return $this->handle->query($sql);
- }
-
- final public function execute(string $sql, array $params = []): PostgresResult
- {
- $this->awaitPending();
- return $this->handle->execute($sql, $params);
- }
-
- final public function prepare(string $sql): PostgresStatement
- {
- $this->awaitPending();
- return $this->handle->prepare($sql);
- }
-
- final public function notify(string $channel, string $payload = ""): PostgresResult
- {
- $this->awaitPending();
- return $this->handle->notify($channel, $payload);
- }
-
- final public function listen(string $channel): PostgresListener
- {
- $this->awaitPending();
- return $this->handle->listen($channel);
- }
-
- final public function beginTransaction(
- TransactionIsolation $isolation = TransactionIsolationLevel::Committed
- ): PostgresTransaction {
- $this->reserve();
-
- try {
- $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL " . $isolation->toSql());
- } catch (\Throwable $exception) {
- $this->release();
- throw $exception;
- }
-
- return new Internal\PostgresConnectionTransaction($this->handle, $this->release(...), $isolation);
- }
-
- final public function quoteString(string $data): string
- {
- return $this->handle->quoteString($data);
- }
-
- final public function quoteName(string $name): string
- {
- return $this->handle->quoteName($name);
- }
-
- final public function escapeByteA(string $data): string
- {
- return $this->handle->escapeByteA($data);
- }
+ public function listen(string $channel): PostgresListener;
}
diff --git a/src/PostgresConnectionPool.php b/src/PostgresConnectionPool.php
index e92d0f8..0711593 100644
--- a/src/PostgresConnectionPool.php
+++ b/src/PostgresConnectionPool.php
@@ -3,21 +3,20 @@
namespace Amp\Postgres;
use Amp\Future;
+use Amp\Postgres\Internal\PostgresHandleConnection;
use Amp\Sql\Common\ConnectionPool;
use Amp\Sql\Result;
use Amp\Sql\SqlConnector;
use Amp\Sql\Statement;
use Amp\Sql\Transaction;
-use Amp\Sql\TransactionIsolation;
-use Amp\Sql\TransactionIsolationLevel;
use function Amp\async;
/**
- * @extends ConnectionPool
+ * @extends ConnectionPool
*/
-final class PostgresConnectionPool extends ConnectionPool implements PostgresLink, PostgresReceiver
+final class PostgresConnectionPool extends ConnectionPool implements PostgresConnection
{
- /** @var Future|null Connection used for notification listening. */
+ /** @var Future|null Connection used for notification listening. */
private Future|null $listeningConnection = null;
/** @var int Number of listeners on listening connection. */
@@ -27,7 +26,7 @@ final class PostgresConnectionPool extends ConnectionPool implements PostgresLin
* @param positive-int $maxConnections
* @param positive-int $idleTimeout
* @param bool $resetConnections True to automatically execute DISCARD ALL on a connection before use.
- * @param SqlConnector|null $connector
+ * @param SqlConnector|null $connector
*/
public function __construct(
PostgresConfig $config,
@@ -65,16 +64,7 @@ protected function createTransaction(Transaction $transaction, \Closure $release
return new Internal\PostgresPooledTransaction($transaction, $release);
}
- /**
- * Changes return type to this library's Transaction type.
- */
- public function beginTransaction(
- TransactionIsolation $isolation = TransactionIsolationLevel::Committed
- ): PostgresTransaction {
- return parent::beginTransaction($isolation);
- }
-
- protected function pop(): PostgresConnection
+ protected function pop(): PostgresHandleConnection
{
$connection = parent::pop();
@@ -109,6 +99,22 @@ public function execute(string $sql, array $params = []): PostgresResult
return parent::execute($sql, $params);
}
+ /**
+ * Changes return type to this library's Transaction type.
+ */
+ public function beginTransaction(): PostgresTransaction
+ {
+ return parent::beginTransaction();
+ }
+
+ /**
+ * Changes return type to this library's configuration type.
+ */
+ public function getConfig(): PostgresConfig
+ {
+ return parent::getConfig();
+ }
+
public function notify(string $channel, string $payload = ""): PostgresResult
{
$connection = $this->pop();
diff --git a/src/PostgresExecutor.php b/src/PostgresExecutor.php
index 96b9652..6039b82 100644
--- a/src/PostgresExecutor.php
+++ b/src/PostgresExecutor.php
@@ -11,20 +11,22 @@
*/
interface PostgresExecutor extends Executor, PostgresQuoter
{
- /**
- * @return PostgresResult Result object specific to this library.
- */
- public function query(string $sql): PostgresResult;
+// /**
+// * @return PostgresResult Result object specific to this library.
+// */
+// public function query(string $sql): PostgresResult;
+//
+// /**
+// * @return PostgresStatement Statement object specific to this library.
+// */
+// public function prepare(string $sql): PostgresStatement;
+//
+// /**
+// * @return PostgresResult Result object specific to this library.
+// */
+// public function execute(string $sql, array $params = []): PostgresResult;
+//
- /**
- * @return PostgresStatement Statement object specific to this library.
- */
- public function prepare(string $sql): PostgresStatement;
-
- /**
- * @return PostgresResult Result object specific to this library.
- */
- public function execute(string $sql, array $params = []): PostgresResult;
/**
* @param non-empty-string $channel Channel name.
diff --git a/src/PostgresHandle.php b/src/PostgresHandle.php
deleted file mode 100644
index 79ea9a0..0000000
--- a/src/PostgresHandle.php
+++ /dev/null
@@ -1,20 +0,0 @@
- $params List of statement parameters, indexed starting at 0.
- */
- public function statementExecute(string $name, array $params): PostgresResult;
-
- /**
- * Deallocate the statement with the given name.
- */
- public function statementDeallocate(string $name): void;
-}
diff --git a/src/PostgresLink.php b/src/PostgresLink.php
index de5a08f..bc9f256 100644
--- a/src/PostgresLink.php
+++ b/src/PostgresLink.php
@@ -3,18 +3,14 @@
namespace Amp\Postgres;
use Amp\Sql\Link;
-use Amp\Sql\TransactionIsolation;
-use Amp\Sql\TransactionIsolationLevel;
/**
* @extends Link
*/
-interface PostgresLink extends Link, PostgresQuoter
+interface PostgresLink extends Link, PostgresExecutor
{
- /**
- * @return PostgresTransaction Transaction object specific to this library.
- */
- public function beginTransaction(
- TransactionIsolation $isolation = TransactionIsolationLevel::Committed
- ): PostgresTransaction;
+// /**
+// * @return PostgresTransaction Transaction object specific to this library.
+// */
+// public function beginTransaction(): PostgresTransaction;
}
diff --git a/src/PostgresNestableTransaction.php b/src/PostgresNestableTransaction.php
deleted file mode 100644
index 05023f2..0000000
--- a/src/PostgresNestableTransaction.php
+++ /dev/null
@@ -1,46 +0,0 @@
-
- */
-final class PostgresNestableTransaction extends NestableTransaction implements PostgresLink
-{
- protected function createNestedTransaction(
- Transaction $transaction,
- \Closure $release,
- string $identifier,
- ): Transaction {
- return new Internal\PostgresNestedTransaction($transaction, $release, $identifier);
- }
-
- /**
- * Changes return type to this library's Transaction type.
- */
- public function beginTransaction(
- TransactionIsolation $isolation = TransactionIsolationLevel::Committed
- ): PostgresTransaction {
- return parent::beginTransaction($isolation);
- }
-
- public function quoteString(string $data): string
- {
- return $this->transaction->quoteString($data);
- }
-
- public function quoteName(string $name): string
- {
- return $this->transaction->quoteName($name);
- }
-
- public function escapeByteA(string $data): string
- {
- return $this->transaction->escapeByteA($data);
- }
-}
diff --git a/src/PostgresReceiver.php b/src/PostgresReceiver.php
deleted file mode 100644
index e7c4803..0000000
--- a/src/PostgresReceiver.php
+++ /dev/null
@@ -1,19 +0,0 @@
-
+ * @extends Transaction
*/
-interface PostgresTransaction extends PostgresExecutor, Transaction
+interface PostgresTransaction extends Transaction, PostgresLink
{
}
diff --git a/src/PqConnection.php b/src/PqConnection.php
index 77b1960..23a94c2 100644
--- a/src/PqConnection.php
+++ b/src/PqConnection.php
@@ -8,14 +8,14 @@
use pq;
use Revolt\EventLoop;
-final class PqConnection extends PostgresConnection implements PostgresLink
+final class PqConnection extends Internal\PostgresHandleConnection implements PostgresConnection
{
private readonly Internal\PqHandle $handle;
- public static function connect(PostgresConfig $connectionConfig, ?Cancellation $cancellation = null): self
+ public static function connect(PostgresConfig $config, ?Cancellation $cancellation = null): self
{
try {
- $connection = new pq\Connection($connectionConfig->getConnectionString(), pq\Connection::ASYNC);
+ $connection = new pq\Connection($config->getConnectionString(), pq\Connection::ASYNC);
} catch (pq\Exception $exception) {
throw new ConnectionException("Could not connect to PostgreSQL server", 0, $exception);
}
@@ -24,7 +24,7 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $
$connection->unbuffered = true;
$deferred = new DeferredFuture();
- $callback = function () use (&$poll, &$await, $connection, $deferred): void {
+ $callback = function () use (&$poll, &$await, $connection, $config, $deferred): void {
switch ($result = $connection->poll()) {
case pq\Connection::POLLING_READING:
case pq\Connection::POLLING_WRITING:
@@ -35,7 +35,7 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $
break;
case pq\Connection::POLLING_OK:
- $deferred->complete(new self($connection));
+ $deferred->complete(new self($connection, $config));
break;
default:
@@ -58,9 +58,9 @@ public static function connect(PostgresConfig $connectionConfig, ?Cancellation $
}
}
- protected function __construct(pq\Connection $handle)
+ protected function __construct(pq\Connection $handle, PostgresConfig $config)
{
- $this->handle = new Internal\PqHandle($handle);
+ $this->handle = new Internal\PqHandle($handle, $config);
parent::__construct($this->handle);
}
diff --git a/src/functions.php b/src/functions.php
index c1c7886..9bf65ad 100644
--- a/src/functions.php
+++ b/src/functions.php
@@ -3,15 +3,16 @@
namespace Amp\Postgres;
use Amp\Cancellation;
+use Amp\Postgres\Internal\PostgresHandleConnection;
use Amp\Sql\Common\RetrySqlConnector;
use Amp\Sql\SqlConnector;
use Amp\Sql\SqlException;
use Revolt\EventLoop;
/**
- * @param SqlConnector|null $connector
+ * @param SqlConnector|null $connector
*
- * @return SqlConnector
+ * @return SqlConnector
*/
function postgresConnector(?SqlConnector $connector = null): SqlConnector
{
@@ -25,7 +26,7 @@ function postgresConnector(?SqlConnector $connector = null): SqlConnector
/**
* @psalm-suppress InvalidArgument
- * @var SqlConnector
+ * @var SqlConnector
*/
return $map[$driver] ??= new RetrySqlConnector(new DefaultPostgresConnector());
}
@@ -37,7 +38,7 @@ function postgresConnector(?SqlConnector $connector = null): SqlConnector
*
* @throws \Error If neither ext-pgsql or pecl-pq is loaded.
*/
-function connect(PostgresConfig $config, ?Cancellation $cancellation = null): PostgresConnection
+function connect(PostgresConfig $config, ?Cancellation $cancellation = null): PostgresHandleConnection
{
return postgresConnector()->connect($config, $cancellation);
}
diff --git a/test/AbstractConnectTest.php b/test/AbstractConnectTest.php
index 3ea0fed..6e35b91 100644
--- a/test/AbstractConnectTest.php
+++ b/test/AbstractConnectTest.php
@@ -4,13 +4,13 @@
use Amp\Cancellation;
use Amp\PHPUnit\AsyncTestCase;
+use Amp\Postgres\Internal\PostgresHandleConnection;
use Amp\Postgres\PostgresConfig;
-use Amp\Postgres\PostgresConnection;
abstract class AbstractConnectTest extends AsyncTestCase
{
abstract public function connect(
PostgresConfig $connectionConfig,
?Cancellation $cancellation = null
- ): PostgresConnection;
+ ): PostgresHandleConnection;
}
diff --git a/test/AbstractConnectionTest.php b/test/AbstractConnectionTest.php
index 3426966..b860310 100644
--- a/test/AbstractConnectionTest.php
+++ b/test/AbstractConnectionTest.php
@@ -16,13 +16,13 @@ abstract class AbstractConnectionTest extends AbstractLinkTest
{
public function testIsClosed()
{
- $this->assertFalse($this->link->isClosed());
+ $this->assertFalse($this->executor->isClosed());
}
public function testConnectionCloseDuringQuery(): void
{
- $query = async($this->link->execute(...), 'SELECT pg_sleep(10)');
- $close = async($this->link->close(...));
+ $query = async($this->executor->execute(...), 'SELECT pg_sleep(10)');
+ $close = async($this->executor->close(...));
$start = \microtime(true);
@@ -41,14 +41,14 @@ public function testConnectionCloseDuringQuery(): void
public function testListen()
{
$channel = "test";
- $listener = $this->link->listen($channel);
+ $listener = $this->executor->listen($channel);
$this->assertInstanceOf(PostgresListener::class, $listener);
$this->assertSame($channel, $listener->getChannel());
EventLoop::delay(0.1, function () use ($channel): void {
- $this->link->query(\sprintf("NOTIFY %s, '%s'", $channel, '0'));
- $this->link->query(\sprintf("NOTIFY %s, '%s'", $channel, '1'));
+ $this->executor->query(\sprintf("NOTIFY %s, '%s'", $channel, '0'));
+ $this->executor->query(\sprintf("NOTIFY %s, '%s'", $channel, '1'));
});
$count = 0;
@@ -68,11 +68,11 @@ public function testListen()
public function testNotify()
{
$channel = "test";
- $listener = $this->link->listen($channel);
+ $listener = $this->executor->listen($channel);
EventLoop::delay(0.1, function () use ($channel) {
- $this->link->notify($channel, '0');
- $this->link->notify($channel, '1');
+ $this->executor->notify($channel, '0');
+ $this->executor->notify($channel, '1');
});
$count = 0;
@@ -95,18 +95,18 @@ public function testListenOnSameChannel()
$this->expectExceptionMessage('Already listening on channel');
$channel = "test";
- Future\await([$this->link->listen($channel), $this->link->listen($channel)]);
+ Future\await([$this->executor->listen($channel), $this->executor->listen($channel)]);
}
public function testQueryAfterErroredQuery()
{
try {
- $result = $this->link->query("INSERT INTO test VALUES ('github', 'com', '{1, 2, 3}', true, 4.2)");
+ $result = $this->executor->query("INSERT INTO test VALUES ('github', 'com', '{1, 2, 3}', true, 4.2)");
} catch (QueryExecutionError $exception) {
// Expected exception due to duplicate key.
}
- $result = $this->link->query("INSERT INTO test VALUES ('gitlab', 'com', '{1, 2, 3}', true, 4.2)");
+ $result = $this->executor->query("INSERT INTO test VALUES ('gitlab', 'com', '{1, 2, 3}', true, 4.2)");
$this->assertSame(1, $result->getRowCount());
}
diff --git a/test/AbstractLinkTest.php b/test/AbstractLinkTest.php
index b897597..05cb070 100644
--- a/test/AbstractLinkTest.php
+++ b/test/AbstractLinkTest.php
@@ -5,7 +5,8 @@
use Amp\Future;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Postgres\ByteA;
-use Amp\Postgres\PostgresConnection;
+use Amp\Postgres\Internal\PostgresHandleConnection;
+use Amp\Postgres\PostgresExecutor;
use Amp\Postgres\PostgresLink;
use Amp\Postgres\PostgresTransaction;
use Amp\Postgres\QueryExecutionError;
@@ -13,7 +14,6 @@
use Amp\Sql\Result;
use Amp\Sql\Statement;
use Amp\Sql\TransactionError;
-use Amp\Sql\TransactionIsolationLevel;
use function Amp\async;
abstract class AbstractLinkTest extends AsyncTestCase
@@ -31,7 +31,7 @@ abstract class AbstractLinkTest extends AsyncTestCase
protected const INSERT_QUERY = 'INSERT INTO test VALUES ($1, $2, $3, $4, $5, $6, $7, $8)';
protected const FIELD_COUNT = 8;
- protected PostgresLink $link;
+ protected PostgresExecutor $executor;
private ?array $data = null;
@@ -74,12 +74,12 @@ protected function verifyResult(Result $result, array $data): void
}
/**
- * @return PostgresLink Connection or Link object to be tested.
+ * @return PostgresLink Executor object to be tested.
*/
abstract public function createLink(string $connectionString): PostgresLink;
/**
- * Helper method to invoke the protected constructor of classes extending {@see PostgresConnection}.
+ * Helper method to invoke the protected constructor of classes extending {@see PostgresHandleConnection}.
*
* @template T extends Connection
*
@@ -88,7 +88,7 @@ abstract public function createLink(string $connectionString): PostgresLink;
*
* @return T
*/
- protected function newConnection(string $className, mixed ...$args): PostgresConnection
+ protected function newConnection(string $className, mixed ...$args): PostgresHandleConnection
{
$reflection = new \ReflectionClass($className);
$connection = $reflection->newInstanceWithoutConstructor();
@@ -99,18 +99,18 @@ protected function newConnection(string $className, mixed ...$args): PostgresCon
public function setUp(): void
{
parent::setUp();
- $this->link = $this->createLink('host=localhost user=postgres password=postgres');
+ $this->executor = $this->createLink('host=localhost user=postgres password=postgres');
}
public function tearDown(): void
{
parent::tearDown();
- $this->link->close();
+ $this->executor->close();
}
public function testQueryFetchRow(): void
{
- $result = $this->link->query("SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test");
$data = $this->getData();
while ($row = $result->fetchRow()) {
@@ -122,7 +122,7 @@ public function testQueryFetchRow(): void
public function testQueryWithTupleResult()
{
- $result = $this->link->query("SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test");
$data = $this->getData();
@@ -133,7 +133,7 @@ public function testQueryWithTupleResult()
public function testMultipleQueryWithTupleResult()
{
- $result = $this->link->query("SELECT * FROM test; SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test; SELECT * FROM test");
$data = $this->getData();
@@ -150,7 +150,7 @@ public function testMultipleQueryWithTupleResult()
public function testMultipleQueryWithCommandResultFirst()
{
- $result = $this->link->query("INSERT INTO test VALUES ('canon', 'jp', '{1}', true, 4.2, null, null, '3.1415926'); SELECT * FROM test");
+ $result = $this->executor->query("INSERT INTO test VALUES ('canon', 'jp', '{1}', true, 4.2, null, null, '3.1415926'); SELECT * FROM test");
$this->assertSame(1, $result->getRowCount());
@@ -168,7 +168,7 @@ public function testMultipleQueryWithCommandResultFirst()
public function testMultipleQueryWithCommandResultSecond()
{
- $result = $this->link->query("SELECT * FROM test; INSERT INTO test VALUES ('canon', 'jp', '{1}', true, 4.2)");
+ $result = $this->executor->query("SELECT * FROM test; INSERT INTO test VALUES ('canon', 'jp', '{1}', true, 4.2)");
$data = $this->getData();
@@ -183,13 +183,13 @@ public function testMultipleQueryWithCommandResultSecond()
public function testQueryWithUnconsumedTupleResult()
{
- $result = $this->link->query("SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test");
$this->assertInstanceOf(Result::class, $result);
unset($result); // Force destruction of result object.
- $result = $this->link->query("SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test");
$this->assertInstanceOf(Result::class, $result);
@@ -200,7 +200,7 @@ public function testQueryWithUnconsumedTupleResult()
public function testQueryWithCommandResult(): void
{
- $result = $this->link->query("INSERT INTO test VALUES ('canon', 'jp', '{1}', true, 4.2)");
+ $result = $this->executor->query("INSERT INTO test VALUES ('canon', 'jp', '{1}', true, 4.2)");
$this->assertSame(1, $result->getRowCount());
}
@@ -208,14 +208,14 @@ public function testQueryWithCommandResult(): void
public function testQueryWithEmptyQuery(): void
{
$this->expectException(QueryError::class);
- $this->link->query('');
+ $this->executor->query('');
}
public function testQueryWithSyntaxError()
{
/** @var Result $result */
try {
- $result = $this->link->query("SELECT & FROM test");
+ $result = $this->executor->query("SELECT & FROM test");
$this->fail(\sprintf("An instance of %s was expected to be thrown", QueryExecutionError::class));
} catch (QueryExecutionError $exception) {
$diagnostics = $exception->getDiagnostics();
@@ -227,7 +227,7 @@ public function testPrepare()
{
$query = "SELECT * FROM test WHERE domain=\$1";
- $statement = $this->link->prepare($query);
+ $statement = $this->executor->prepare($query);
$this->assertSame($query, $statement->getQuery());
@@ -242,7 +242,7 @@ public function testPrepareWithCommandResult()
{
$query = "INSERT INTO test (domain, tld, keys, enabled, number, json) VALUES (:domain, :tld, :keys, :enabled, :number, :json)";
- $statement = $this->link->prepare($query);
+ $statement = $this->executor->prepare($query);
$fields = [
'domain' => 'canon',
@@ -259,7 +259,7 @@ public function testPrepareWithCommandResult()
$this->assertSame(1, $result->getRowCount());
- $result = $this->link->execute('SELECT * FROM test WHERE domain=? AND tld=?', ['canon', 'jp']);
+ $result = $this->executor->execute('SELECT * FROM test WHERE domain=? AND tld=?', ['canon', 'jp']);
$this->verifyResult($result, [\array_values($fields)]);
}
@@ -271,7 +271,7 @@ public function testPrepareWithNamedParams()
{
$query = "SELECT * FROM test WHERE domain=:domain AND tld=:tld";
- $statement = $this->link->prepare($query);
+ $statement = $this->executor->prepare($query);
$data = $this->getData()[0];
@@ -289,7 +289,7 @@ public function testPrepareWithUnnamedParams()
{
$query = "SELECT * FROM test WHERE domain=? AND tld=?";
- $statement = $this->link->prepare($query);
+ $statement = $this->executor->prepare($query);
$data = $this->getData()[0];
@@ -307,7 +307,7 @@ public function testPrepareWithNamedParamsWithDataAppearingAsNamedParam()
{
$query = "SELECT * FROM test WHERE domain=:domain OR domain=':domain'";
- $statement = $this->link->prepare($query);
+ $statement = $this->executor->prepare($query);
$data = $this->getData()[0];
@@ -328,7 +328,7 @@ public function testPrepareInvalidQuery()
$query = "SELECT * FROM test WHERE invalid=\$1";
- $statement = $this->link->prepare($query);
+ $statement = $this->executor->prepare($query);
$statement->execute(['param']);
}
@@ -340,9 +340,9 @@ public function testPrepareSameQuery()
{
$sql = "SELECT * FROM test WHERE domain=\$1";
- $statement1 = $this->link->prepare($sql);
+ $statement1 = $this->executor->prepare($sql);
- $statement2 = $this->link->prepare($sql);
+ $statement2 = $this->executor->prepare($sql);
$this->assertInstanceOf(Statement::class, $statement1);
$this->assertInstanceOf(Statement::class, $statement2);
@@ -363,8 +363,8 @@ public function testSimultaneousPrepareSameQuery()
{
$sql = "SELECT * FROM test WHERE domain=\$1";
- $statement1 = async(fn () => $this->link->prepare($sql));
- $statement2 = async(fn () => $this->link->prepare($sql));
+ $statement1 = async(fn () => $this->executor->prepare($sql));
+ $statement2 = async(fn () => $this->executor->prepare($sql));
[$statement1, $statement2] = Future\await([$statement1, $statement2]);
@@ -383,9 +383,9 @@ public function testSimultaneousPrepareSameQuery()
public function testPrepareSimilarQueryReturnsDifferentStatements()
{
- $statement1 = async(fn () => $this->link->prepare("SELECT * FROM test WHERE domain=\$1"));
+ $statement1 = async(fn () => $this->executor->prepare("SELECT * FROM test WHERE domain=\$1"));
- $statement2 = async(fn () => $this->link->prepare("SELECT * FROM test WHERE domain=:domain"));
+ $statement2 = async(fn () => $this->executor->prepare("SELECT * FROM test WHERE domain=:domain"));
[$statement1, $statement2] = Future\await([$statement1, $statement2]);
@@ -412,7 +412,7 @@ public function testPrepareSimilarQueryReturnsDifferentStatements()
public function testPrepareThenExecuteWithUnconsumedTupleResult()
{
- $statement = $this->link->prepare("SELECT * FROM test");
+ $statement = $this->executor->prepare("SELECT * FROM test");
$result = $statement->execute();
@@ -429,7 +429,7 @@ public function testExecute()
{
$data = $this->getData()[0];
- $result = $this->link->execute("SELECT * FROM test WHERE domain=\$1", [$data[0]]);
+ $result = $this->executor->execute("SELECT * FROM test WHERE domain=\$1", [$data[0]]);
$this->verifyResult($result, [$data]);
}
@@ -441,7 +441,7 @@ public function testExecuteWithNamedParams()
{
$data = $this->getData()[0];
- $result = $this->link->execute(
+ $result = $this->executor->execute(
"SELECT * FROM test WHERE domain=:domain",
['domain' => $data[0]]
);
@@ -457,7 +457,7 @@ public function testExecuteWithInvalidParams()
$this->expectException(\Error::class);
$this->expectExceptionMessage("Value for unnamed parameter at position 0 missing");
- $this->link->execute("SELECT * FROM test WHERE domain=\$1");
+ $this->executor->execute("SELECT * FROM test WHERE domain=\$1");
}
/**
@@ -468,7 +468,7 @@ public function testExecuteWithInvalidNamedParams()
$this->expectException(\Error::class);
$this->expectExceptionMessage("Value for named parameter 'domain' missing");
- $this->link->execute("SELECT * FROM test WHERE domain=:domain", ['tld' => 'com']);
+ $this->executor->execute("SELECT * FROM test WHERE domain=:domain", ['tld' => 'com']);
}
/**
@@ -477,7 +477,7 @@ public function testExecuteWithInvalidNamedParams()
public function testSimultaneousQuery()
{
$callback = fn (int $value) => async(function () use ($value): void {
- $result = $this->link->query("SELECT {$value} as value");
+ $result = $this->executor->query("SELECT {$value} as value");
foreach ($result as $row) {
$this->assertEquals($value, $row['value']);
@@ -493,7 +493,7 @@ public function testSimultaneousQuery()
public function testSimultaneousQueryWithOneFailing()
{
$callback = fn (string $query) => async(function () use ($query): Result {
- $result = $this->link->query($query);
+ $result = $this->executor->query($query);
$data = $this->getData();
@@ -527,13 +527,13 @@ public function testSimultaneousQueryAndPrepare()
{
$promises = [];
$promises[] = async(function () {
- $result = $this->link->query("SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test");
$data = $this->getData();
$this->verifyResult($result, $data);
});
$promises[] = async(function () {
- $statement = ($this->link->prepare("SELECT * FROM test"));
+ $statement = ($this->executor->prepare("SELECT * FROM test"));
$result = $statement->execute();
$data = $this->getData();
$this->verifyResult($result, $data);
@@ -545,14 +545,14 @@ public function testSimultaneousQueryAndPrepare()
public function testSimultaneousPrepareAndExecute()
{
$promises[] = async(function () {
- $statement = $this->link->prepare("SELECT * FROM test");
+ $statement = $this->executor->prepare("SELECT * FROM test");
$result = $statement->execute();
$data = $this->getData();
$this->verifyResult($result, $data);
});
$promises[] = async(function () {
- $result = $this->link->execute("SELECT * FROM test");
+ $result = $this->executor->execute("SELECT * FROM test");
$data = $this->getData();
$this->verifyResult($result, $data);
});
@@ -562,9 +562,7 @@ public function testSimultaneousPrepareAndExecute()
public function testTransaction()
{
- $isolation = TransactionIsolationLevel::Committed;
-
- $transaction = $this->link->beginTransaction($isolation);
+ $transaction = $this->executor->beginTransaction();
$this->assertInstanceOf(PostgresTransaction::class, $transaction);
@@ -572,20 +570,20 @@ public function testTransaction()
$this->assertFalse($transaction->isClosed());
$this->assertTrue($transaction->isActive());
- $this->assertSame($isolation, $transaction->getIsolationLevel());
- $transaction->createSavepoint('test');
+ $nested = $transaction->beginTransaction();
- $statement = $transaction->prepare("SELECT * FROM test WHERE domain=:domain");
+ $statement = $nested->prepare("SELECT * FROM test WHERE domain=:domain");
$result = $statement->execute(['domain' => $data[0]]);
unset($result); // Force destruction of result object.
+ unset($statement);
- $result = $transaction->execute("SELECT * FROM test WHERE domain=\$1 FOR UPDATE", [$data[0]]);
+ $result = $nested->execute("SELECT * FROM test WHERE domain=\$1 FOR UPDATE", [$data[0]]);
unset($result); // Force destruction of result object.
- $transaction->rollbackTo('test');
+ $nested->rollback();
$transaction->commit();
@@ -630,13 +628,13 @@ public function testStatementInsertByteA(
array $params,
array $expected
): void {
- $statement = $this->link->prepare($insertSql);
+ $statement = $this->executor->prepare($insertSql);
$result = $statement->execute($params);
$this->assertSame(1, $result->getRowCount());
- $result = $this->link->execute($selectSql, $params);
+ $result = $this->executor->execute($selectSql, $params);
$this->assertSame($expected, $result->fetchRow());
}
@@ -649,11 +647,11 @@ public function testExecuteInsertByteA(
array $params,
array $expected
): void {
- $result = $this->link->execute($insertSql, $params);
+ $result = $this->executor->execute($insertSql, $params);
$this->assertSame(1, $result->getRowCount());
- $result = $this->link->execute($selectSql, $params);
+ $result = $this->executor->execute($selectSql, $params);
$this->assertSame($expected, $result->fetchRow());
}
}
diff --git a/test/AbstractNestedTransactionTest.php b/test/AbstractNestedTransactionTest.php
new file mode 100644
index 0000000..b175886
--- /dev/null
+++ b/test/AbstractNestedTransactionTest.php
@@ -0,0 +1,43 @@
+connect($connectionConfig);
+
+ $connection->query(self::DROP_QUERY);
+
+ $connection->query(self::CREATE_QUERY);
+
+ foreach ($this->getParams() as $row) {
+ $connection->execute(self::INSERT_QUERY, $row);
+ }
+
+ $this->transaction = $connection->beginTransaction();
+ $this->nested = $this->transaction->beginTransaction();
+
+ return $this->nested;
+ }
+
+ public function tearDown(): void
+ {
+ $this->nested->rollback();
+ $this->transaction->rollback();
+
+ parent::tearDown();
+ }
+}
diff --git a/test/AbstractQuoteTest.php b/test/AbstractQuoteTest.php
index eac1a9d..07a0566 100644
--- a/test/AbstractQuoteTest.php
+++ b/test/AbstractQuoteTest.php
@@ -2,12 +2,12 @@
namespace Amp\Postgres\Test;
+use Amp\Postgres\Internal\PostgresHandleConnection;
use Amp\Postgres\PostgresConfig;
-use Amp\Postgres\PostgresConnection;
abstract class AbstractQuoteTest extends AbstractConnectTest
{
- private PostgresConnection $connection;
+ private PostgresHandleConnection $connection;
public function setUp(): void
{
diff --git a/test/FunctionsTest.php b/test/FunctionsTest.php
index a612ff1..aef26e5 100644
--- a/test/FunctionsTest.php
+++ b/test/FunctionsTest.php
@@ -5,8 +5,8 @@
use Amp\CancelledException;
use Amp\DeferredCancellation;
use Amp\PHPUnit\AsyncTestCase;
+use Amp\Postgres\Internal\PostgresHandleConnection;
use Amp\Postgres\PostgresConfig;
-use Amp\Postgres\PostgresConnection;
use function Amp\Postgres\connect;
class FunctionsTest extends AsyncTestCase
@@ -23,7 +23,7 @@ public function setUp(): void
public function testConnect()
{
$connection = connect(PostgresConfig::fromString('host=localhost user=postgres password=postgres'));
- $this->assertInstanceOf(PostgresConnection::class, $connection);
+ $this->assertInstanceOf(PostgresHandleConnection::class, $connection);
}
/**
diff --git a/test/PgSqlConnectionTest.php b/test/PgSqlConnectionTest.php
index 61d1657..b33f297 100644
--- a/test/PgSqlConnectionTest.php
+++ b/test/PgSqlConnectionTest.php
@@ -4,6 +4,7 @@
use Amp\Postgres\ByteA;
use Amp\Postgres\PgSqlConnection;
+use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresLink;
use Revolt\EventLoop;
use function Amp\Postgres\Internal\cast;
@@ -39,7 +40,13 @@ public function createLink(string $connectionString): PostgresLink
}
}
- return $this->newConnection(PgSqlConnection::class, $this->handle, $socket, 'mock-connection');
+ return $this->newConnection(
+ PgSqlConnection::class,
+ $this->handle,
+ $socket,
+ 'mock-connection',
+ PostgresConfig::fromString($connectionString),
+ );
}
private function cast(mixed $param): mixed
diff --git a/test/PgSqlNestedTransactionTest.php b/test/PgSqlNestedTransactionTest.php
index 36fda6f..bd88286 100644
--- a/test/PgSqlNestedTransactionTest.php
+++ b/test/PgSqlNestedTransactionTest.php
@@ -4,41 +4,15 @@
use Amp\Postgres\PgSqlConnection;
use Amp\Postgres\PostgresConfig;
-use Amp\Postgres\PostgresLink;
-use Amp\Postgres\PostgresNestableTransaction;
-use Amp\Postgres\PostgresTransaction;
+use Amp\Postgres\PostgresConnection;
/**
* @requires extension pgsql
*/
-class PgSqlNestedTransactionTest extends AbstractLinkTest
+class PgSqlNestedTransactionTest extends AbstractNestedTransactionTest
{
- protected PgSqlConnection $connection;
- protected PostgresTransaction $transaction;
-
- public function createLink(string $connectionString): PostgresLink
- {
- $connectionConfig = PostgresConfig::fromString($connectionString);
- $connection = PgSqlConnection::connect($connectionConfig);
-
- $connection->query(self::DROP_QUERY);
-
- $connection->query(self::CREATE_QUERY);
-
- foreach ($this->getParams() as $row) {
- $connection->execute(self::INSERT_QUERY, $row);
- }
-
- $this->connection = $connection;
- $this->transaction = $connection->beginTransaction();
-
- return new PostgresNestableTransaction($this->transaction);
- }
-
- public function tearDown(): void
+ public function connect(PostgresConfig $connectionConfig): PostgresConnection
{
- $this->transaction->rollback();
-
- parent::tearDown();
+ return PgSqlConnection::connect($connectionConfig);
}
}
diff --git a/test/PgSqlPoolTest.php b/test/PgSqlPoolTest.php
index 91843a5..304c9a1 100644
--- a/test/PgSqlPoolTest.php
+++ b/test/PgSqlPoolTest.php
@@ -32,16 +32,24 @@ public function createLink(string $connectionString): PostgresLink
$this->handles[] = \pg_connect($connectionString, \PGSQL_CONNECT_FORCE_NEW);
}
+ $config = PostgresConfig::fromString($connectionString);
+
$connector = $this->createMock(SqlConnector::class);
$connector->method('connect')
- ->willReturnCallback(function (): PgSqlConnection {
+ ->willReturnCallback(function () use ($config): PgSqlConnection {
static $count = 0;
if (!isset($this->handles[$count])) {
$this->fail("createConnection called too many times");
}
$handle = $this->handles[$count];
++$count;
- return $this->newConnection(PgSqlConnection::class, $handle, \pg_socket($handle), 'mock-connection');
+ return $this->newConnection(
+ PgSqlConnection::class,
+ $handle,
+ \pg_socket($handle),
+ 'mock-connection',
+ $config,
+ );
});
$pool = new PostgresConnectionPool(new PostgresConfig('localhost'), \count($this->handles), ConnectionPool::DEFAULT_IDLE_TIMEOUT, true, $connector);
diff --git a/test/PqConnectionTest.php b/test/PqConnectionTest.php
index 10a26e4..dc2248e 100644
--- a/test/PqConnectionTest.php
+++ b/test/PqConnectionTest.php
@@ -5,6 +5,7 @@
use Amp\Postgres\ByteA;
use Amp\Postgres\Internal\PqBufferedResultSet;
use Amp\Postgres\Internal\PqUnbufferedResultSet;
+use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresLink;
use Amp\Postgres\PqConnection;
use function Amp\Postgres\Internal\cast;
@@ -39,7 +40,7 @@ public function createLink(string $connectionString): PostgresLink
}
}
- return $this->newConnection(PqConnection::class, $this->handle);
+ return $this->newConnection(PqConnection::class, $this->handle, PostgresConfig::fromString($connectionString));
}
private function cast(mixed $param): mixed
@@ -60,12 +61,12 @@ public function tearDown(): void
public function testBufferedResults(): void
{
- \assert($this->link instanceof PqConnection);
- $this->link->shouldBufferResults();
+ \assert($this->executor instanceof PqConnection);
+ $this->executor->shouldBufferResults();
- $this->assertTrue($this->link->isBufferingResults());
+ $this->assertTrue($this->executor->isBufferingResults());
- $result = $this->link->query("SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test");
\assert($result instanceof PqBufferedResultSet);
$data = $this->getData();
@@ -77,12 +78,12 @@ public function testBufferedResults(): void
*/
public function testUnbufferedResults(): void
{
- \assert($this->link instanceof PqConnection);
- $this->link->shouldNotBufferResults();
+ \assert($this->executor instanceof PqConnection);
+ $this->executor->shouldNotBufferResults();
- $this->assertFalse($this->link->isBufferingResults());
+ $this->assertFalse($this->executor->isBufferingResults());
- $result = $this->link->query("SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test");
\assert($result instanceof PqUnbufferedResultSet);
$data = $this->getData();
@@ -91,7 +92,7 @@ public function testUnbufferedResults(): void
public function testNextResultBeforeConsumption()
{
- $result = $this->link->query("SELECT * FROM test; SELECT * FROM test;");
+ $result = $this->executor->query("SELECT * FROM test; SELECT * FROM test;");
$result = $result->getNextResult();
@@ -100,11 +101,11 @@ public function testNextResultBeforeConsumption()
public function testUnconsumedMultiResult()
{
- $result = $this->link->query("SELECT * FROM test; SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test; SELECT * FROM test");
unset($result);
- $result = $this->link->query("SELECT * FROM test; SELECT * FROM test");
+ $result = $this->executor->query("SELECT * FROM test; SELECT * FROM test");
$this->verifyResult($result, $this->getData());
diff --git a/test/PqNestedTransactionTest.php b/test/PqNestedTransactionTest.php
index edc9178..d7e417b 100644
--- a/test/PqNestedTransactionTest.php
+++ b/test/PqNestedTransactionTest.php
@@ -3,40 +3,16 @@
namespace Amp\Postgres\Test;
use Amp\Postgres\PostgresConfig;
-use Amp\Postgres\PostgresLink;
-use Amp\Postgres\PostgresNestableTransaction;
-use Amp\Postgres\PostgresTransaction;
+use Amp\Postgres\PostgresConnection;
use Amp\Postgres\PqConnection;
/**
* @requires extension pq
*/
-class PqNestedTransactionTest extends AbstractLinkTest
+class PqNestedTransactionTest extends AbstractNestedTransactionTest
{
- protected PostgresTransaction $transaction;
-
- public function createLink(string $connectionString): PostgresLink
- {
- $connectionConfig = PostgresConfig::fromString($connectionString);
- $connection = PqConnection::connect($connectionConfig);
-
- $connection->query(self::DROP_QUERY);
-
- $connection->query(self::CREATE_QUERY);
-
- foreach ($this->getParams() as $row) {
- $connection->execute(self::INSERT_QUERY, $row);
- }
-
- $this->transaction = $connection->beginTransaction();
-
- return new PostgresNestableTransaction($this->transaction);
- }
-
- public function tearDown(): void
+ public function connect(PostgresConfig $connectionConfig): PostgresConnection
{
- //$this->transaction->rollback();
-
- parent::tearDown();
+ return PqConnection::connect($connectionConfig);
}
}
diff --git a/test/PqPoolTest.php b/test/PqPoolTest.php
index e16d307..c2e88d0 100644
--- a/test/PqPoolTest.php
+++ b/test/PqPoolTest.php
@@ -29,16 +29,18 @@ public function createLink(string $connectionString): PostgresLink
$handle->unbuffered = true;
}
+ $config = PostgresConfig::fromString($connectionString);
+
$connector = $this->createMock(SqlConnector::class);
$connector->method('connect')
- ->willReturnCallback(function (): PqConnection {
+ ->willReturnCallback(function () use ($config): PqConnection {
static $count = 0;
if (!isset($this->handles[$count])) {
$this->fail("createConnection called too many times");
}
$handle = $this->handles[$count];
++$count;
- return $this->newConnection(PqConnection::class, $handle);
+ return $this->newConnection(PqConnection::class, $handle, $config);
});
$pool = new PostgresConnectionPool(new PostgresConfig('localhost'), \count($this->handles), ConnectionPool::DEFAULT_IDLE_TIMEOUT, true, $connector);