diff --git a/connection.ts b/connection.ts index 68b6b08d..fd8b3988 100644 --- a/connection.ts +++ b/connection.ts @@ -34,10 +34,12 @@ export interface SendCommandOptions { } export interface Connection extends TypedEventTarget { + /** @deprecated */ name: string | null; isClosed: boolean; isConnected: boolean; close(): void; + [Symbol.dispose](): void; connect(): Promise; reconnect(): Promise; sendCommand( @@ -95,7 +97,15 @@ interface PendingCommand { reject: (error: unknown) => void; } -export class RedisConnection +export function createRedisConnection( + hostname: string, + port: number | string | undefined, + options: RedisConnectionOptions, +): Connection { + return new RedisConnection(hostname, port ?? 6379, options); +} + +class RedisConnection implements Connection, TypedEventTarget { name: string | null = null; private maxRetryCount = 10; @@ -306,7 +316,11 @@ export class RedisConnection } close() { - this.#close(false); + return this[Symbol.dispose](); + } + + [Symbol.dispose](): void { + return this.#close(false); } #close(canReconnect = false) { diff --git a/deno.json b/deno.json index e9602e04..d040b638 100644 --- a/deno.json +++ b/deno.json @@ -3,6 +3,7 @@ "version": "0.37.1", "exports": { ".": "./mod.ts", + "./experimental/pool": "./experimental/pool/mod.ts", "./experimental/cluster": "./experimental/cluster/mod.ts", "./experimental/web-streams-connection": "./experimental/web_streams_connection/mod.ts" }, diff --git a/executor.ts b/executor.ts index fc444725..3ab952fe 100644 --- a/executor.ts +++ b/executor.ts @@ -2,6 +2,9 @@ import type { Connection, SendCommandOptions } from "./connection.ts"; import type { RedisReply, RedisValue } from "./protocol/shared/types.ts"; export interface CommandExecutor { + /** + * @deprecated + */ readonly connection: Connection; /** * @deprecated diff --git a/experimental/pool/mod.ts b/experimental/pool/mod.ts new file mode 100644 index 00000000..a93d3de5 --- /dev/null +++ b/experimental/pool/mod.ts @@ -0,0 +1 @@ +export * from "../../pool/mod.ts"; diff --git a/pool/default_pool.ts b/pool/default_pool.ts new file mode 100644 index 00000000..09a12f6e --- /dev/null +++ b/pool/default_pool.ts @@ -0,0 +1,103 @@ +import type { Pool } from "./pool.ts"; + +class AlreadyRemovedFromPoolError extends Error { + constructor() { + super("This connection has already been removed from the pool."); + } +} + +const kDefaultTimeout = 5_000; +class DefaultPool implements Pool { + readonly #idle: Array = []; + readonly #connections: Array = []; + #connectionCount: number = 0; + readonly #deferredQueue: Array> = []; + readonly #options: Required>; + + constructor( + { + maxConnections = 8, + acquire, + }: PoolOptions, + ) { + this.#options = { + acquire, + maxConnections, + }; + } + + async acquire(signal?: AbortSignal): Promise { + signal ||= AbortSignal.timeout(kDefaultTimeout); + signal.throwIfAborted(); + if (this.#idle.length > 0) { + const conn = this.#idle.shift()!; + return Promise.resolve(conn); + } + + if (this.#connectionCount < this.#options.maxConnections) { + this.#connectionCount++; + try { + const connection = await this.#options.acquire(); + this.#connections.push(connection); + return connection; + } catch (error) { + this.#connectionCount--; + throw error; + } + } + + const deferred = Promise.withResolvers(); + this.#deferredQueue.push(deferred); + const { promise, reject } = deferred; + const onAbort = () => { + const i = this.#deferredQueue.indexOf(deferred); + if (i === -1) return; + this.#deferredQueue.splice(i, 1); + reject(signal.reason); + }; + signal.addEventListener("abort", onAbort, { once: true }); + return promise; + } + + #has(conn: T): boolean { + return this.#connections.includes(conn); + } + + release(conn: T): void { + if (!this.#has(conn)) { + throw new AlreadyRemovedFromPoolError(); + } else if (this.#deferredQueue.length > 0) { + const i = this.#deferredQueue.shift()!; + i.resolve(conn); + } else { + this.#idle.push(conn); + } + } + + close() { + const errors: Array = []; + for (const x of [...this.#connections]) { + try { + x[Symbol.dispose](); + } catch (error) { + errors.push(error); + } + } + this.#connections.length = 0; + this.#idle.length = 0; + if (errors.length > 0) { + throw new AggregateError(errors); + } + } +} + +export interface PoolOptions { + maxConnections?: number; + acquire(): Promise; +} + +export function createDefaultPool( + options: PoolOptions, +): Pool { + return new DefaultPool(options); +} diff --git a/pool/default_pool_test.ts b/pool/default_pool_test.ts new file mode 100644 index 00000000..290b0b46 --- /dev/null +++ b/pool/default_pool_test.ts @@ -0,0 +1,85 @@ +import { assert, assertEquals, assertRejects } from "../deps/std/assert.ts"; +import { createDefaultPool } from "./default_pool.ts"; + +class FakeConnection implements Disposable { + #isClosed = false; + isClosed() { + return this.#isClosed; + } + [Symbol.dispose]() { + if (this.#isClosed) { + throw new Error("Already closed"); + } + this.#isClosed = true; + } +} + +Deno.test({ + name: "DefaultPool", + permissions: "none", + fn: async () => { + const openConnections: Array = []; + const pool = createDefaultPool({ + acquire: () => { + const connection = new FakeConnection(); + openConnections.push(connection); + return Promise.resolve(connection); + }, + maxConnections: 2, + }); + assertEquals(openConnections, []); + + const signal = AbortSignal.timeout(200); + + const conn1 = await pool.acquire(signal); + assertEquals(openConnections, [conn1]); + assert(openConnections.every((x) => !x.isClosed())); + assert(!signal.aborted); + + const conn2 = await pool.acquire(signal); + assertEquals(openConnections, [conn1, conn2]); + assert(!conn2.isClosed()); + assert(openConnections.every((x) => !x.isClosed())); + assert(!signal.aborted); + + { + // Tests timeout handling + await assertRejects( + () => pool.acquire(signal), + "Intentionally aborted", + ); + assert(signal.aborted); + assertEquals(openConnections, [conn1, conn2]); + assert(openConnections.every((x) => !x.isClosed())); + } + + { + // Tests `release()` + pool.release(conn2); + assertEquals(openConnections, [conn1, conn2]); + + const conn = await pool.acquire(new AbortController().signal); + assert(conn === conn2, "A new connection should not be created"); + assertEquals(openConnections, [conn1, conn2]); + } + + { + // `Pool#acquire` should wait for an active connection to be released. + const signal = AbortSignal.timeout(3_000); + const promise = pool.acquire(signal); + setTimeout(() => { + pool.release(conn1); + }, 50); + const conn = await promise; + assert(conn === conn1, "A new connection should not be created"); + assertEquals(openConnections, [conn1, conn2]); + assert(!signal.aborted); + } + + { + // `Pool#close` closes all connections + pool.close(); + assert(openConnections.every((x) => x.isClosed())); + } + }, +}); diff --git a/pool/executor.ts b/pool/executor.ts new file mode 100644 index 00000000..034a95bb --- /dev/null +++ b/pool/executor.ts @@ -0,0 +1,51 @@ +import type { Connection, SendCommandOptions } from "../connection.ts"; +import type { Pool } from "./pool.ts"; +import type { CommandExecutor } from "../executor.ts"; +import { DefaultExecutor } from "../executor.ts"; +import type { RedisReply, RedisValue } from "../protocol/shared/types.ts"; + +export function createPooledExecutor(pool: Pool): CommandExecutor { + return new PooledExecutor(pool); +} + +class PooledExecutor implements CommandExecutor { + readonly #pool: Pool; + constructor(pool: Pool) { + this.#pool = pool; + } + + get connection(): Connection { + throw new Error("PooledExecutor.connection is not supported"); + } + + async exec( + command: string, + ...args: RedisValue[] + ): Promise { + const connection = await this.#pool.acquire(); + try { + const executor = new DefaultExecutor(connection); + return await executor.exec(command, ...args); + } finally { + this.#pool.release(connection); + } + } + + async sendCommand( + command: string, + args?: RedisValue[], + options?: SendCommandOptions, + ): Promise { + const connection = await this.#pool.acquire(); + try { + const executor = new DefaultExecutor(connection); + return await executor.sendCommand(command, args, options); + } finally { + this.#pool.release(connection); + } + } + + close(): void { + return this.#pool.close(); + } +} diff --git a/pool/mod.ts b/pool/mod.ts new file mode 100644 index 00000000..693f4888 --- /dev/null +++ b/pool/mod.ts @@ -0,0 +1,30 @@ +import type { Redis, RedisConnectOptions } from "../redis.ts"; +import { create } from "../redis.ts"; +import type { Connection } from "../connection.ts"; +import { createRedisConnection } from "../connection.ts"; +import { createDefaultPool } from "./default_pool.ts"; +import { createPooledExecutor } from "./executor.ts"; + +export interface CreatePoolClientOptions { + connection: RedisConnectOptions; + maxConnections?: number; +} + +export function createPoolClient( + options: CreatePoolClientOptions, +): Promise { + const pool = createDefaultPool({ + acquire, + maxConnections: options.maxConnections ?? 8, + }); + const executor = createPooledExecutor(pool); + const client = create(executor); + return Promise.resolve(client); + + async function acquire(): Promise { + const { hostname, port, ...connectionOptions } = options.connection; + const connection = createRedisConnection(hostname, port, connectionOptions); + await connection.connect(); + return connection; + } +} diff --git a/pool/pool.ts b/pool/pool.ts new file mode 100644 index 00000000..c6fe1e18 --- /dev/null +++ b/pool/pool.ts @@ -0,0 +1,5 @@ +export interface Pool { + acquire(signal?: AbortSignal): Promise; + release(item: T): void; + close(): void; +} diff --git a/redis.ts b/redis.ts index 64dba1d5..23208254 100644 --- a/redis.ts +++ b/redis.ts @@ -43,7 +43,7 @@ import type { ZScanOpts, ZUnionstoreOpts, } from "./command.ts"; -import { RedisConnection } from "./connection.ts"; +import { createRedisConnection } from "./connection.ts"; import type { Connection, SendCommandOptions } from "./connection.ts"; import type { RedisConnectionOptions } from "./connection.ts"; import type { CommandExecutor } from "./executor.ts"; @@ -2452,7 +2452,8 @@ export interface RedisConnectOptions extends RedisConnectionOptions { * ``` */ export async function connect(options: RedisConnectOptions): Promise { - const connection = createRedisConnection(options); + const { hostname, port, ...connectionOptions } = options; + const connection = createRedisConnection(hostname, port, connectionOptions); await connection.connect(); const executor = new DefaultExecutor(connection); return create(executor); @@ -2471,7 +2472,8 @@ export async function connect(options: RedisConnectOptions): Promise { * ``` */ export function createLazyClient(options: RedisConnectOptions): Redis { - const connection = createRedisConnection(options); + const { hostname, port, ...connectionOptions } = options; + const connection = createRedisConnection(hostname, port, connectionOptions); const executor = createLazyExecutor(connection); return create(executor); } @@ -2519,11 +2521,6 @@ export function parseURL(url: string): RedisConnectOptions { }; } -function createRedisConnection(options: RedisConnectOptions): Connection { - const { hostname, port = 6379, ...opts } = options; - return new RedisConnection(hostname, port, opts); -} - function createLazyExecutor(connection: Connection): CommandExecutor { let executor: CommandExecutor | null = null; return { diff --git a/tests/pool_test.ts b/tests/pool_test.ts new file mode 100644 index 00000000..0d3960ea --- /dev/null +++ b/tests/pool_test.ts @@ -0,0 +1,43 @@ +import { createPoolClient } from "../pool/mod.ts"; +import { assertEquals } from "../deps/std/assert.ts"; +import { afterAll, beforeAll, describe, it } from "../deps/std/testing.ts"; +import type { TestServer } from "./test_util.ts"; +import { nextPort, startRedis, stopRedis } from "./test_util.ts"; + +describe("createPoolClient", () => { + let port!: number; + let server!: TestServer; + beforeAll(async () => { + port = nextPort(); + server = await startRedis({ port }); + }); + afterAll(() => stopRedis(server)); + + it("supports distributing commands to pooled connections", async () => { + const client = await createPoolClient({ + connection: { + hostname: "127.0.0.1", + port, + }, + }); + try { + const blpopPromise = client.blpop(500, "list"); + setTimeout(() => { + client.lpush("list", "foo"); + }, 100); + const existsPromise = client.exists("list"); + const replies = await Promise.all([ + blpopPromise, + existsPromise, + ]); + assertEquals( + replies, + [["list", "foo"], 0], + "BLPOP should not block subsequent EXISTS", + ); + } finally { + await client.flushdb(); + client.close(); + } + }); +});