Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 24, 2023
1 parent 5dc5e01 commit a22029f
Show file tree
Hide file tree
Showing 35 changed files with 524 additions and 411 deletions.
10 changes: 5 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
"php": ">=8.1",
"amphp/amp": "^3",
"amphp/pipeline": "^1",
"amphp/sql": "^2",
"amphp/sql-common": "^2"
"amphp/sql": "dev-refactor-transactions as 2.0",
"amphp/sql-common": "dev-refactor-transactions as 2.0"
},
"require-dev": {
"ext-pgsql": "*",
"ext-pq": "*",
"amphp/phpunit-util": "^3",
"phpunit/phpunit": "^9",
"amphp/php-cs-fixer-config": "^2-dev",
"psalm/phar": "^5.4"
"amphp/php-cs-fixer-config": "^2",
"psalm/phar": "^5.11"
},
"minimum-stability": "dev",
"minimum-stability": "beta",
"prefer-stable": true,
"autoload": {
"psr-4": {
Expand Down
5 changes: 3 additions & 2 deletions src/DefaultPostgresConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
namespace Amp\Postgres;

use Amp\Cancellation;
use Amp\Postgres\Internal\PostgresHandleConnection;
use Amp\Sql\SqlConfig;
use Amp\Sql\SqlConnector;
use Amp\Sql\SqlException;

/**
* @implements SqlConnector<PostgresConfig, PostgresConnection>
* @implements SqlConnector<PostgresConfig, PostgresHandleConnection>
*/
final class DefaultPostgresConnector implements SqlConnector
{
Expand All @@ -17,7 +18,7 @@ final class DefaultPostgresConnector implements SqlConnector
*
* @throws \Error If neither ext-pgsql nor pecl-pq is loaded.
*/
public function connect(SqlConfig $config, ?Cancellation $cancellation = null): PostgresConnection
public function connect(SqlConfig $config, ?Cancellation $cancellation = null): PostgresHandleConnection
{
if (!$config instanceof PostgresConfig) {
throw new \TypeError(\sprintf("Must provide an instance of %s to Postgres connectors", PostgresConfig::class));
Expand Down
23 changes: 22 additions & 1 deletion src/Internal/AbstractHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Amp\DeferredFuture;
use Amp\Pipeline\Queue;
use Amp\Postgres\ByteA;
use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresConfig;
use Amp\Sql\ConnectionException;
use Revolt\EventLoop;

Expand All @@ -22,6 +22,7 @@ abstract class AbstractHandle implements PostgresHandle
protected int $lastUsedAt = 0;

public function __construct(
private readonly PostgresConfig $config,
protected readonly string $poll,
protected readonly string $await,
private readonly DeferredFuture $onClose,
Expand All @@ -36,6 +37,11 @@ public function __destruct()
}
}

public function getConfig(): PostgresConfig
{
return $this->config;
}

public function getLastUsedAt(): int
{
return $this->lastUsedAt;
Expand Down Expand Up @@ -87,4 +93,19 @@ protected function escapeParams(array $params): array
default => $param,
}, $params);
}

public function createSavepoint(string $identifier): void
{
$this->query("SAVEPOINT " . $this->quoteName($identifier));
}

public function rollbackTo(string $identifier): void
{
$this->query("ROLLBACK TO " . $this->quoteName($identifier));
}

public function releaseSavepoint(string $identifier): void
{
$this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier));
}
}
12 changes: 8 additions & 4 deletions src/Internal/PgSqlHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Amp\DeferredFuture;
use Amp\Future;
use Amp\Pipeline\Queue;
use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresListener;
use Amp\Postgres\PostgresNotification;
use Amp\Postgres\PostgresResult;
Expand Down Expand Up @@ -54,8 +54,12 @@ final class PgSqlHandle extends AbstractHandle
* @param resource $socket PostgreSQL connection stream socket.
* @param string $id Connection identifier for determining which cached type table to use.
*/
public function __construct(\PgSql\Connection $handle, $socket, string $id)
{
public function __construct(
\PgSql\Connection $handle,
$socket,
string $id,
PostgresConfig $config,
) {
$this->handle = $handle;

$handle = &$this->handle;
Expand Down Expand Up @@ -151,7 +155,7 @@ public function __construct(\PgSql\Connection $handle, $socket, string $id)
EventLoop::unreference($poll);
EventLoop::disable($await);

parent::__construct($poll, $await, $onClose);
parent::__construct($config, $poll, $await, $onClose);

$this->types = (self::$typeCache[$id] ??= self::fetchTypes($handle));
}
Expand Down
1 change: 0 additions & 1 deletion src/Internal/PostgresConnectionStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Amp\Postgres\Internal;

use Amp\DeferredFuture;
use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresStatement;
use Amp\Sql\Result;
use Amp\Sql\SqlException;
Expand Down
88 changes: 49 additions & 39 deletions src/Internal/PostgresConnectionTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Amp\Postgres\Internal;

use Amp\DeferredFuture;
use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Postgres\PostgresTransaction;
Expand All @@ -22,6 +21,8 @@ final class PostgresConnectionTransaction implements PostgresTransaction

private readonly DeferredFuture $onClose;

private ?DeferredFuture $busy = null;

/** @var array<int, PostgresStatement> Reference statements so de-allocation occurs after commit/rollback. */
private array $statements = [];

Expand All @@ -31,11 +32,15 @@ final class PostgresConnectionTransaction implements PostgresTransaction
public function __construct(
private readonly PostgresHandle $handle,
\Closure $release,
private readonly TransactionIsolation $isolation
private readonly TransactionIsolation $isolation,
) {
$busy = &$this->busy;
$refCount = &$this->refCount;
$statements = &$this->statements;
$this->release = static function () use (&$refCount, &$statements, $release): void {
$this->release = static function () use (&$busy, &$refCount, &$statements, $release): void {
$busy?->complete();
$busy = null;

if (--$refCount === 0) {
$release();
$statements = [];
Expand Down Expand Up @@ -73,6 +78,11 @@ public function getLastUsedAt(): int
return $this->handle->getLastUsedAt();
}

public function isNestedTransaction(): bool
{
return false;
}

private function assertOpen(): void
{
if ($this->isClosed()) {
Expand Down Expand Up @@ -118,7 +128,7 @@ public function getIsolationLevel(): TransactionIsolation
*/
public function query(string $sql): PostgresResult
{
$this->assertOpen();
$this->awaitPendingNestedTransaction();

++$this->refCount;
try {
Expand All @@ -136,7 +146,7 @@ public function query(string $sql): PostgresResult
*/
public function prepare(string $sql): PostgresStatement
{
$this->assertOpen();
$this->awaitPendingNestedTransaction();

++$this->refCount;
try {
Expand All @@ -156,7 +166,7 @@ public function prepare(string $sql): PostgresStatement
*/
public function execute(string $sql, array $params = []): PostgresResult
{
$this->assertOpen();
$this->awaitPendingNestedTransaction();

++$this->refCount;
try {
Expand All @@ -169,12 +179,29 @@ public function execute(string $sql, array $params = []): PostgresResult
return new PostgresPooledResult($result, $this->release);
}

public function beginTransaction(): PostgresTransaction
{
$this->awaitPendingNestedTransaction();

++$this->refCount;
$this->busy = new DeferredFuture();
try {
$identifier = bin2hex(\random_bytes(4));
$this->handle->createSavepoint($identifier);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

return new PostgresNestedTransaction($this, $this->handle, $identifier, $this->release);
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function notify(string $channel, string $payload = ""): PostgresResult
{
$this->assertOpen();
$this->awaitPendingNestedTransaction();
return $this->handle->notify($channel, $payload);
}

Expand All @@ -185,7 +212,7 @@ public function notify(string $channel, string $payload = ""): PostgresResult
*/
public function commit(): void
{
$this->assertOpen();
$this->awaitPendingNestedTransaction();
$this->onClose->complete();
$this->handle->query("COMMIT");
}
Expand All @@ -197,45 +224,19 @@ public function commit(): void
*/
public function rollback(): void
{
$this->assertOpen();
$this->awaitPendingNestedTransaction();
$this->onClose->complete();
$this->handle->query("ROLLBACK");
}

/**
* Creates a savepoint with the given identifier.
*
* @param string $identifier Savepoint identifier.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function createSavepoint(string $identifier): void
{
$this->query("SAVEPOINT " . $this->quoteName($identifier));
}

/**
* Rolls back to the savepoint with the given identifier.
*
* @param string $identifier Savepoint identifier.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function rollbackTo(string $identifier): void
public function onCommit(\Closure $onCommit): void
{
$this->query("ROLLBACK TO " . $this->quoteName($identifier));
// TODO: Implement onCommit() method.
}

/**
* Releases the savepoint with the given identifier.
*
* @param string $identifier Savepoint identifier.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function releaseSavepoint(string $identifier): void
public function onRollback(\Closure $onRollback): void
{
$this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier));
// TODO: Implement onRollback() method.
}

/**
Expand Down Expand Up @@ -264,4 +265,13 @@ public function escapeByteA(string $data): string
$this->assertOpen();
return $this->handle->escapeByteA($data);
}

private function awaitPendingNestedTransaction(): void
{
while ($this->busy) {
$this->busy->getFuture()->await();
}

$this->assertOpen();
}
}
46 changes: 46 additions & 0 deletions src/Internal/PostgresHandle.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php declare(strict_types=1);

namespace Amp\Postgres\Internal;

use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresListener;
use Amp\Postgres\PostgresQuoter;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Sql\Common\NestableTransactionExecutor;

/** @internal */
interface PostgresHandle extends PostgresQuoter, NestableTransactionExecutor
{
public const STATEMENT_NAME_PREFIX = "amp_";

public function getConfig(): PostgresConfig;

public function query(string $sql): PostgresResult;

public function execute(string $sql, array $params = []): PostgresResult;

public function prepare(string $sql): PostgresStatement;

/**
* @param non-empty-string $channel
*/
public function notify(string $channel, string $payload = ""): PostgresResult;

/**
* @param non-empty-string $channel
*/
public function listen(string $channel): PostgresListener;

/**
* Execute the statement with the given name and parameters.
*
* @param list<mixed> $params List of statement parameters, indexed starting at 0.
*/
public function statementExecute(string $name, array $params): PostgresResult;

/**
* Deallocate the statement with the given name.
*/
public function statementDeallocate(string $name): void;
}
Loading

0 comments on commit a22029f

Please sign in to comment.