diff --git a/src/Internal/PgSqlHandle.php b/src/Internal/PgSqlHandle.php index b0b5fb8..fd0ec41 100644 --- a/src/Internal/PgSqlHandle.php +++ b/src/Internal/PgSqlHandle.php @@ -17,9 +17,19 @@ use Revolt\EventLoop; use function Amp\async; -/** @internal */ +/** + * @internal + * + * @psalm-type PgSqlTypeMap = array Map of OID to corresponding PgSqlType. + */ final class PgSqlHandle extends AbstractHandle { + private const TYPE_QUERY = << "severity", \PGSQL_DIAG_SQLSTATE => "sqlstate", @@ -35,7 +45,7 @@ final class PgSqlHandle extends AbstractHandle \PGSQL_DIAG_SOURCE_FUNCTION => "source_function", ]; - /** @var array> */ + /** @var array> */ private static array $typeCache; private static ?\Closure $errorHandler = null; @@ -43,8 +53,8 @@ final class PgSqlHandle extends AbstractHandle /** @var \PgSql\Connection PostgreSQL connection handle. */ private ?\PgSql\Connection $handle; - /** @var array */ - private readonly array $types; + /** @var PgSqlTypeMap|null */ + private ?array $types = null; /** @var array> */ private array $statements = []; @@ -57,13 +67,11 @@ final class PgSqlHandle extends AbstractHandle public function __construct( \PgSql\Connection $handle, $socket, - string $id, + private readonly string $id, PostgresConfig $config, ) { $this->handle = $handle; - $this->types = (self::$typeCache[$id] ??= self::fetchTypes($handle)); - $handle = &$this->handle; $lastUsedAt = &$this->lastUsedAt; $deferred = &$this->pendingOperation; @@ -171,35 +179,50 @@ public function __construct( } /** - * @return array + * @return Future */ - private static function fetchTypes(\PgSql\Connection $handle): array + private function fetchTypes(): Future { - $result = \pg_query($handle, "SELECT t.oid, t.typcategory, t.typname, t.typdelim, t.typelem - FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON t.typnamespace=n.oid - WHERE t.typisdefined AND n.nspname IN ('pg_catalog', 'public') ORDER BY t.oid"); + if ($this->handle === null) { + throw new \Error("The connection to the database has been closed"); + } + $result = \pg_send_query($this->handle, self::TYPE_QUERY); if ($result === false) { - throw new SqlException(\pg_last_error($handle)); + $this->close(); + throw new SqlException(\pg_last_error($this->handle)); } - $types = []; - while ($row = \pg_fetch_array($result, mode: \PGSQL_NUM)) { - [$oid, $typeCategory, $typeName, $delimiter, $element] = $row; + $this->pendingOperation = $queryDeferred = new DeferredFuture(); + $typesDeferred = new DeferredFuture(); - \assert( - \is_numeric($oid) && \is_numeric($element), - "OID and element type expected to be integers", - ); - \assert( - \is_string($typeCategory) && \is_string($typeName) && \is_string($delimiter), - "Unexpected types in type catalog query results", - ); - - $types[(int) $oid] = new PgSqlType($typeCategory, $typeName, $delimiter, (int) $element); + EventLoop::reference($this->poll); + if ($result === 0) { + EventLoop::enable($this->await); } - return $types; + EventLoop::queue(function () use ($queryDeferred, $typesDeferred): void { + try { + $result = $queryDeferred->getFuture()->await(); + if (\pg_result_status($result) !== \PGSQL_TUPLES_OK) { + throw new SqlException(\pg_result_error($result)); + } + + $types = []; + while ($row = \pg_fetch_array($result, mode: \PGSQL_NUM)) { + [$oid, $typeCategory, $typeName, $delimiter, $element] = $row; + $types[(int) $oid] = new PgSqlType($typeCategory, $typeName, $delimiter, (int) $element); + } + + $typesDeferred->complete($types); + } catch (\Throwable $exception) { + $this->close(); + $typesDeferred->error($exception); + unset(self::$typeCache[$this->id]); + } + }); + + return $typesDeferred->getFuture(); } private static function getErrorHandler(): \Closure @@ -224,12 +247,12 @@ public function isClosed(): bool * @param \Closure $function Function to execute. * @param mixed ...$args Arguments to pass to function. * - * @return \PgSql\Result - * * @throws SqlException */ private function send(\Closure $function, mixed ...$args): mixed { + $this->types ??= (self::$typeCache[$this->id] ??= $this->fetchTypes())->await(); + while ($this->pendingOperation) { try { $this->pendingOperation->getFuture()->await(); @@ -275,6 +298,8 @@ private function createResult(\PgSql\Result $result, string $sql): PostgresResul throw new \Error("The connection to the database has been closed"); } + \assert($this->types !== null, 'Expected type array to be populated before creating a result'); + switch (\pg_result_status($result)) { case \PGSQL_EMPTY_QUERY: throw new SqlQueryError("Empty query string");