Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 20, 2023
1 parent 5dc5e01 commit 28df9a6
Show file tree
Hide file tree
Showing 36 changed files with 496 additions and 394 deletions.
10 changes: 5 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
"php": ">=8.1",
"amphp/amp": "^3",
"amphp/pipeline": "^1",
"amphp/sql": "^2",
"amphp/sql-common": "^2"
"amphp/sql": "dev-refactor-transactions as 2.0",
"amphp/sql-common": "dev-refactor-transactions as 2.0"
},
"require-dev": {
"ext-pgsql": "*",
"ext-pq": "*",
"amphp/phpunit-util": "^3",
"phpunit/phpunit": "^9",
"amphp/php-cs-fixer-config": "^2-dev",
"psalm/phar": "^5.4"
"amphp/php-cs-fixer-config": "^2",
"psalm/phar": "^5.11"
},
"minimum-stability": "dev",
"minimum-stability": "beta",
"prefer-stable": true,
"autoload": {
"psr-4": {
Expand Down
5 changes: 3 additions & 2 deletions src/DefaultPostgresConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
namespace Amp\Postgres;

use Amp\Cancellation;
use Amp\Postgres\Internal\PostgresHandleConnection;
use Amp\Sql\SqlConfig;
use Amp\Sql\SqlConnector;
use Amp\Sql\SqlException;

/**
* @implements SqlConnector<PostgresConfig, PostgresConnection>
* @implements SqlConnector<PostgresConfig, PostgresHandleConnection>
*/
final class DefaultPostgresConnector implements SqlConnector
{
Expand All @@ -17,7 +18,7 @@ final class DefaultPostgresConnector implements SqlConnector
*
* @throws \Error If neither ext-pgsql nor pecl-pq is loaded.
*/
public function connect(SqlConfig $config, ?Cancellation $cancellation = null): PostgresConnection
public function connect(SqlConfig $config, ?Cancellation $cancellation = null): PostgresHandleConnection
{
if (!$config instanceof PostgresConfig) {
throw new \TypeError(\sprintf("Must provide an instance of %s to Postgres connectors", PostgresConfig::class));
Expand Down
8 changes: 7 additions & 1 deletion src/Internal/AbstractHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Amp\DeferredFuture;
use Amp\Pipeline\Queue;
use Amp\Postgres\ByteA;
use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresConfig;
use Amp\Sql\ConnectionException;
use Revolt\EventLoop;

Expand All @@ -22,6 +22,7 @@ abstract class AbstractHandle implements PostgresHandle
protected int $lastUsedAt = 0;

public function __construct(
private readonly PostgresConfig $config,
protected readonly string $poll,
protected readonly string $await,
private readonly DeferredFuture $onClose,
Expand All @@ -36,6 +37,11 @@ public function __destruct()
}
}

public function getConfig(): PostgresConfig
{
return $this->config;
}

public function getLastUsedAt(): int
{
return $this->lastUsedAt;
Expand Down
12 changes: 8 additions & 4 deletions src/Internal/PgSqlHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Amp\DeferredFuture;
use Amp\Future;
use Amp\Pipeline\Queue;
use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresListener;
use Amp\Postgres\PostgresNotification;
use Amp\Postgres\PostgresResult;
Expand Down Expand Up @@ -54,8 +54,12 @@ final class PgSqlHandle extends AbstractHandle
* @param resource $socket PostgreSQL connection stream socket.
* @param string $id Connection identifier for determining which cached type table to use.
*/
public function __construct(\PgSql\Connection $handle, $socket, string $id)
{
public function __construct(
\PgSql\Connection $handle,
$socket,
string $id,
PostgresConfig $config,
) {
$this->handle = $handle;

$handle = &$this->handle;
Expand Down Expand Up @@ -151,7 +155,7 @@ public function __construct(\PgSql\Connection $handle, $socket, string $id)
EventLoop::unreference($poll);
EventLoop::disable($await);

parent::__construct($poll, $await, $onClose);
parent::__construct($config, $poll, $await, $onClose);

$this->types = (self::$typeCache[$id] ??= self::fetchTypes($handle));
}
Expand Down
1 change: 0 additions & 1 deletion src/Internal/PostgresConnectionStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Amp\Postgres\Internal;

use Amp\DeferredFuture;
use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresStatement;
use Amp\Sql\Result;
use Amp\Sql\SqlException;
Expand Down
36 changes: 33 additions & 3 deletions src/Internal/PostgresConnectionTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Amp\Postgres\Internal;

use Amp\DeferredFuture;
use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Postgres\PostgresTransaction;
Expand All @@ -13,7 +12,7 @@
use Revolt\EventLoop;

/** @internal */
final class PostgresConnectionTransaction implements PostgresTransaction
final class PostgresConnectionTransaction implements PostgresNestableTransaction
{
/** @var \Closure():void */
private readonly \Closure $release;
Expand All @@ -31,7 +30,7 @@ final class PostgresConnectionTransaction implements PostgresTransaction
public function __construct(
private readonly PostgresHandle $handle,
\Closure $release,
private readonly TransactionIsolation $isolation
private readonly TransactionIsolation $isolation,
) {
$refCount = &$this->refCount;
$statements = &$this->statements;
Expand Down Expand Up @@ -73,6 +72,11 @@ public function getLastUsedAt(): int
return $this->handle->getLastUsedAt();
}

public function isNestedTransaction(): bool
{
return false;
}

private function assertOpen(): void
{
if ($this->isClosed()) {
Expand Down Expand Up @@ -169,6 +173,22 @@ public function execute(string $sql, array $params = []): PostgresResult
return new PostgresPooledResult($result, $this->release);
}

public function beginTransaction(): PostgresTransaction
{
$this->assertOpen();

++$this->refCount;
try {
$identifier = bin2hex(\random_bytes(4));
$this->createSavepoint($identifier);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

return new PostgresNestedTransaction($this, $identifier, $this->release);
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
Expand Down Expand Up @@ -238,6 +258,16 @@ public function releaseSavepoint(string $identifier): void
$this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier));
}

public function onCommit(\Closure $onCommit): void
{
// TODO: Implement onCommit() method.
}

public function onRollback(\Closure $onRollback): void
{
// TODO: Implement onRollback() method.
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
Expand Down
46 changes: 46 additions & 0 deletions src/Internal/PostgresHandle.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php declare(strict_types=1);

namespace Amp\Postgres\Internal;

use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresListener;
use Amp\Postgres\PostgresQuoter;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
use Amp\Sql\TransientResource;

/** @internal */
interface PostgresHandle extends PostgresQuoter, TransientResource
{
public const STATEMENT_NAME_PREFIX = "amp_";

public function getConfig(): PostgresConfig;

public function query(string $sql): PostgresResult;

public function execute(string $sql, array $params = []): PostgresResult;

public function prepare(string $sql): PostgresStatement;

/**
* @param non-empty-string $channel
*/
public function notify(string $channel, string $payload = ""): PostgresResult;

/**
* @param non-empty-string $channel
*/
public function listen(string $channel): PostgresListener;

/**
* Execute the statement with the given name and parameters.
*
* @param list<mixed> $params List of statement parameters, indexed starting at 0.
*/
public function statementExecute(string $name, array $params): PostgresResult;

/**
* Deallocate the statement with the given name.
*/
public function statementDeallocate(string $name): void;
}
Loading

0 comments on commit 28df9a6

Please sign in to comment.