Skip to content

Commit

Permalink
Update to use ConnectionTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 4, 2023
1 parent 4c53678 commit 8efff4b
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 283 deletions.
10 changes: 10 additions & 0 deletions src/Internal/AbstractHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ protected function escapeParams(array $params): array
}, $params);
}

public function commit(): void
{
$this->query("COMMIT");
}

public function rollback(): void
{
$this->query("ROLLBACK");
}

public function createSavepoint(string $identifier): void
{
$this->query("SAVEPOINT " . $this->quoteName($identifier));
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/PgSqlHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public function prepare(string $sql): PostgresStatement

$modifiedSql = parseNamedParams($sql, $names);

$name = PostgresHandle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
$name = self::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);

while (isset($this->statements[$name])) {
$storage = $this->statements[$name];
Expand Down
277 changes: 22 additions & 255 deletions src/Internal/PostgresConnectionTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,276 +2,43 @@

namespace Amp\Postgres\Internal;

use Amp\DeferredFuture;
use Amp\Postgres\PostgresExecutor;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Postgres\PostgresTransaction;
use Amp\Sql\SqlException;
use Amp\Sql\TransactionError;
use Amp\Sql\Common\ConnectionTransaction;
use Amp\Sql\Common\NestableTransactionExecutor;
use Amp\Sql\Transaction;
use Amp\Sql\TransactionIsolation;
use Revolt\EventLoop;

/** @internal */
final class PostgresConnectionTransaction implements PostgresTransaction
/**
* @internal
* @extends ConnectionTransaction<PostgresResult, PostgresStatement, PostgresTransaction, PostgresHandle>
*/
final class PostgresConnectionTransaction extends ConnectionTransaction implements PostgresTransaction
{
/** @var \Closure():void */
private readonly \Closure $release;
use PostgresTransactionDelegate;

private int $refCount = 1;

private readonly DeferredFuture $onClose;

private ?DeferredFuture $busy = null;

/** @var array<int, PostgresStatement> Reference statements so de-allocation occurs after commit/rollback. */
private array $statements = [];

/**
* @param \Closure():void $release
*/
public function __construct(
private readonly PostgresHandle $handle,
\Closure $release,
private readonly TransactionIsolation $isolation,
TransactionIsolation $isolation
) {
$busy = &$this->busy;
$refCount = &$this->refCount;
$statements = &$this->statements;
$this->release = static function () use (&$busy, &$refCount, &$statements, $release): void {
$busy?->complete();
$busy = null;

if (--$refCount === 0) {
$release();
$statements = [];
}
};

$this->onClose = new DeferredFuture();
$this->onClose($this->release);
}

public function __destruct()
{
if ($this->onClose->isComplete()) {
return;
}

$this->onClose->complete();

if ($this->handle->isClosed()) {
return;
}

$handle = $this->handle;
EventLoop::queue(static function () use ($handle): void {
try {
!$handle->isClosed() && $handle->query('ROLLBACK');
} catch (SqlException) {
// Ignore failure if connection closes during query.
}
});
}

public function getLastUsedAt(): int
{
return $this->handle->getLastUsedAt();
}

public function isNestedTransaction(): bool
{
return false;
}

private function assertOpen(): void
{
if ($this->isClosed()) {
throw new TransactionError("The transaction has been committed or rolled back");
}
}

/**
* Rolls back all changes in the transaction if it has not been committed.
*/
public function close(): void
{
if (!$this->isClosed()) {
$this->rollback(); // Invokes $this->release callback.
}
}

public function isClosed(): bool
{
return $this->onClose->isComplete();
}

public function onClose(\Closure $onClose): void
{
$this->onClose->getFuture()->finally($onClose);
}

/**
* @return bool True if the transaction is active, false if it has been committed or rolled back.
*/
public function isActive(): bool
{
return !$this->isClosed();
}

public function getIsolationLevel(): TransactionIsolation
{
return $this->isolation;
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function query(string $sql): PostgresResult
{
$this->awaitPendingNestedTransaction();

++$this->refCount;
try {
$result = $this->handle->query($sql);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

return new PostgresPooledResult($result, $this->release);
parent::__construct($handle, $release, $isolation);
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function prepare(string $sql): PostgresStatement
{
$this->awaitPendingNestedTransaction();

++$this->refCount;
try {
$statement = $this->handle->prepare($sql);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

$this->statements[\spl_object_id($statement)] ??= $statement;

return new PostgresPooledStatement($statement, $this->release);
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function execute(string $sql, array $params = []): PostgresResult
{
$this->awaitPendingNestedTransaction();

++$this->refCount;
try {
$result = $this->handle->execute($sql, $params);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

return new PostgresPooledResult($result, $this->release);
}

public function beginTransaction(): PostgresTransaction
{
$this->awaitPendingNestedTransaction();

++$this->refCount;
$this->busy = new DeferredFuture();
try {
$identifier = \bin2hex(\random_bytes(8));
$this->handle->createSavepoint($identifier);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

return new PostgresNestedTransaction($this, $this->handle, $identifier, $this->release);
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function notify(string $channel, string $payload = ""): PostgresResult
{
$this->awaitPendingNestedTransaction();
return $this->handle->notify($channel, $payload);
}

/**
* Commits the transaction and makes it inactive.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function commit(): void
{
$this->awaitPendingNestedTransaction();
$this->onClose->complete();
$this->handle->query("COMMIT");
}

/**
* Rolls back the transaction and makes it inactive.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function rollback(): void
{
$this->awaitPendingNestedTransaction();
$this->onClose->complete();
$this->handle->query("ROLLBACK");
}

public function onCommit(\Closure $onCommit): void
{
// TODO: Implement onCommit() method.
}

public function onRollback(\Closure $onRollback): void
{
// TODO: Implement onRollback() method.
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function quoteString(string $data): string
{
$this->assertOpen();
return $this->handle->quoteString($data);
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function quoteName(string $name): string
{
$this->assertOpen();
return $this->handle->quoteName($name);
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function escapeByteA(string $data): string
{
$this->assertOpen();
return $this->handle->escapeByteA($data);
protected function createNestedTransaction(
Transaction $transaction,
NestableTransactionExecutor $executor,
string $identifier,
\Closure $release,
): PostgresTransaction {
\assert($executor instanceof PostgresHandle);
return new PostgresNestedTransaction($this, $executor, $identifier, $release);
}

private function awaitPendingNestedTransaction(): void
protected function getExecutor(): PostgresExecutor
{
while ($this->busy) {
$this->busy->getFuture()->await();
}

$this->assertOpen();
return $this->handle;
}
}
15 changes: 2 additions & 13 deletions src/Internal/PostgresHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
namespace Amp\Postgres\Internal;

use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresExecutor;
use Amp\Postgres\PostgresListener;
use Amp\Postgres\PostgresQuoter;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Sql\Common\NestableTransactionExecutor;
Expand All @@ -13,23 +13,12 @@
* @internal
* @extends NestableTransactionExecutor<PostgresResult, PostgresStatement>
*/
interface PostgresHandle extends PostgresQuoter, NestableTransactionExecutor
interface PostgresHandle extends PostgresExecutor, NestableTransactionExecutor
{
public const STATEMENT_NAME_PREFIX = "amp_";

public function getConfig(): PostgresConfig;

public function query(string $sql): PostgresResult;

public function execute(string $sql, array $params = []): PostgresResult;

public function prepare(string $sql): PostgresStatement;

/**
* @param non-empty-string $channel
*/
public function notify(string $channel, string $payload = ""): PostgresResult;

/**
* @param non-empty-string $channel
*/
Expand Down
6 changes: 2 additions & 4 deletions src/Internal/PostgresNestedTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Postgres\Internal;

use Amp\Postgres\PostgresExecutor;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Postgres\PostgresTransaction;
Expand Down Expand Up @@ -30,10 +31,7 @@ public function __construct(
parent::__construct($transaction, $handle, $identifier, $release);
}

/**
* Switches return type to this library's return type.
*/
protected function getTransaction(): PostgresTransaction
protected function getExecutor(): PostgresExecutor
{
return $this->transaction;
}
Expand Down
3 changes: 2 additions & 1 deletion src/Internal/PostgresPooledTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Postgres\Internal;

use Amp\Postgres\PostgresExecutor;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Postgres\PostgresTransaction;
Expand All @@ -24,7 +25,7 @@ public function __construct(private readonly PostgresTransaction $transaction, \
parent::__construct($transaction, $release);
}

protected function getTransaction(): PostgresTransaction
protected function getExecutor(): PostgresExecutor
{
return $this->transaction;
}
Expand Down
Loading

0 comments on commit 8efff4b

Please sign in to comment.