forked from denodrivers/redis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdefault_pool.ts
103 lines (92 loc) · 2.52 KB
/
default_pool.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import type { Pool } from "./pool.ts";
class AlreadyRemovedFromPoolError extends Error {
constructor() {
super("This connection has already been removed from the pool.");
}
}
const kDefaultTimeout = 5_000;
class DefaultPool<T extends Disposable> implements Pool<T> {
readonly #idle: Array<T> = [];
readonly #connections: Array<T> = [];
#connectionCount: number = 0;
readonly #deferredQueue: Array<PromiseWithResolvers<T>> = [];
readonly #options: Required<PoolOptions<T>>;
constructor(
{
maxConnections = 8,
acquire,
}: PoolOptions<T>,
) {
this.#options = {
acquire,
maxConnections,
};
}
async acquire(signal?: AbortSignal): Promise<T> {
signal ||= AbortSignal.timeout(kDefaultTimeout);
signal.throwIfAborted();
if (this.#idle.length > 0) {
const conn = this.#idle.shift()!;
return Promise.resolve(conn);
}
if (this.#connectionCount < this.#options.maxConnections) {
this.#connectionCount++;
try {
const connection = await this.#options.acquire();
this.#connections.push(connection);
return connection;
} catch (error) {
this.#connectionCount--;
throw error;
}
}
const deferred = Promise.withResolvers<T>();
this.#deferredQueue.push(deferred);
const { promise, reject } = deferred;
const onAbort = () => {
const i = this.#deferredQueue.indexOf(deferred);
if (i === -1) return;
this.#deferredQueue.splice(i, 1);
reject(signal.reason);
};
signal.addEventListener("abort", onAbort, { once: true });
return promise;
}
#has(conn: T): boolean {
return this.#connections.includes(conn);
}
release(conn: T): void {
if (!this.#has(conn)) {
throw new AlreadyRemovedFromPoolError();
} else if (this.#deferredQueue.length > 0) {
const i = this.#deferredQueue.shift()!;
i.resolve(conn);
} else {
this.#idle.push(conn);
}
}
close() {
const errors: Array<unknown> = [];
for (const x of [...this.#connections]) {
try {
x[Symbol.dispose]();
} catch (error) {
errors.push(error);
}
}
this.#connections.length = 0;
this.#idle.length = 0;
if (errors.length > 0) {
throw new AggregateError(errors);
}
}
}
export interface PoolOptions<T extends Disposable> {
maxConnections?: number;
acquire(): Promise<T>;
}
export function createDefaultPool<T extends Disposable>(
options: PoolOptions<T>,
): Pool<T> {
return new DefaultPool<T>(options);
}