-
-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathpubsub.ts
106 lines (93 loc) · 2.83 KB
/
pubsub.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import { BufReader, BufWriter } from "https://deno.land/[email protected]/io/bufio.ts";
import { createRequest, readArrayReply, sendCommand } from "./io.ts";
/**
 * Handle for a connection that has been put into Redis Pub/Sub mode.
 *
 * Channels and patterns can be added or removed over the lifetime of the
 * subscription; incoming messages are consumed through {@link receive}.
 */
export type RedisSubscription = {
/** `true` once `close()` has been called on this subscription. */
readonly isClosed: boolean;
/** Returns an async iterator over the messages delivered to this subscription. */
receive(): AsyncIterableIterator<RedisPubSubMessage>;
/** Subscribes to the given glob-style patterns (`PSUBSCRIBE`). */
psubscribe(...patterns: string[]): Promise<void>;
/** Subscribes to the given channels (`SUBSCRIBE`). */
subscribe(...channels: string[]): Promise<void>;
/** Unsubscribes from the given patterns (`PUNSUBSCRIBE`). */
punsubscribe(...patterns: string[]): Promise<void>;
/** Unsubscribes from the given channels (`UNSUBSCRIBE`). */
unsubscribe(...channels: string[]): Promise<void>;
/** Ends the subscription; afterwards `isClosed` is `true`. */
close(): Promise<void>;
};
/** A single message delivered to a {@link RedisSubscription}. */
export type RedisPubSubMessage = {
/** The pattern that matched; present only for pattern (`pmessage`) deliveries. */
pattern?: string;
/** The channel the message was published to. */
channel: string;
/** The message payload. */
message: string;
};
/**
 * {@link RedisSubscription} implementation on top of a buffered
 * reader/writer pair.
 *
 * The instance tracks which channels and which patterns it has subscribed to
 * so that {@link close} can unsubscribe from all of them.
 */
class RedisSubscriptionImpl implements RedisSubscription {
  private _isClosed = false;

  /** `true` once {@link close} has finished, whether or not it threw. */
  get isClosed(): boolean {
    return this._isClosed;
  }

  // Prototype-less objects used as string sets, so channel/pattern names such
  // as "constructor" or "__proto__" cannot collide with inherited properties.
  private channels: Record<string, true> = Object.create(null);
  private patterns: Record<string, true> = Object.create(null);

  /**
   * @param writer Buffered writer the commands are sent through.
   * @param reader Buffered reader the replies are read from.
   */
  constructor(private writer: BufWriter, private reader: BufReader) {}

  /**
   * Sends `PSUBSCRIBE` for the given patterns and remembers them.
   *
   * @param patterns Patterns to subscribe to.
   */
  async psubscribe(...patterns: string[]): Promise<void> {
    await sendCommand(this.writer, this.reader, "PSUBSCRIBE", ...patterns);
    for (const pat of patterns) {
      // Patterns are tracked separately from channels: punsubscribe() and
      // close() look them up in `this.patterns`, not `this.channels`.
      this.patterns[pat] = true;
    }
  }

  /**
   * Sends `PUNSUBSCRIBE` for the given patterns and forgets them.
   *
   * @param patterns Patterns to unsubscribe from.
   */
  async punsubscribe(...patterns: string[]): Promise<void> {
    await sendCommand(this.writer, this.reader, "PUNSUBSCRIBE", ...patterns);
    for (const pat of patterns) {
      delete this.patterns[pat];
    }
  }

  /**
   * Sends `SUBSCRIBE` for the given channels and remembers them.
   *
   * @param channels Channels to subscribe to.
   */
  async subscribe(...channels: string[]): Promise<void> {
    await sendCommand(this.writer, this.reader, "SUBSCRIBE", ...channels);
    for (const chan of channels) {
      this.channels[chan] = true;
    }
  }

  /**
   * Sends `UNSUBSCRIBE` for the given channels and forgets them.
   *
   * @param channels Channels to unsubscribe from.
   */
  async unsubscribe(...channels: string[]): Promise<void> {
    await sendCommand(this.writer, this.reader, "UNSUBSCRIBE", ...channels);
    for (const chan of channels) {
      delete this.channels[chan];
    }
  }

  /**
   * Reads array replies from the reader until the subscription is closed and
   * yields one {@link RedisPubSubMessage} per `message` / `pmessage` reply.
   *
   * Replies of any other kind, or with an unexpected element count, are
   * skipped without being yielded.
   */
  async *receive(): AsyncIterableIterator<RedisPubSubMessage> {
    while (!this._isClosed) {
      const rep = (await readArrayReply(this.reader)) as string[];
      const ev = rep[0];
      if (ev === "message" && rep.length === 3) {
        // ["message", channel, payload]
        yield {
          channel: rep[1],
          message: rep[2],
        };
      } else if (ev === "pmessage" && rep.length === 4) {
        // ["pmessage", pattern, channel, payload]
        yield {
          pattern: rep[1],
          channel: rep[2],
          message: rep[3],
        };
      }
    }
  }

  /**
   * Unsubscribes from every tracked channel and pattern, then marks the
   * subscription as closed.
   *
   * The closed flag is set in a `finally` block, so it is set even when one
   * of the unsubscribe commands throws; the error still propagates.
   *
   * NOTE(review): when nothing is tracked the commands are still sent with no
   * arguments — confirm `sendCommand` copes with the reply in that case.
   */
  async close(): Promise<void> {
    try {
      await this.unsubscribe(...Object.keys(this.channels));
      await this.punsubscribe(...Object.keys(this.patterns));
    } finally {
      this._isClosed = true;
    }
  }
}
/**
 * Creates a subscription over the given reader/writer pair and subscribes it
 * to `channels` before handing it back.
 *
 * @param writer Buffered writer used to send commands.
 * @param reader Buffered reader used to read replies.
 * @param channels Channels to subscribe to up front.
 * @returns The subscription, after the initial subscribe call has resolved.
 */
export async function subscribe(
  writer: BufWriter,
  reader: BufReader,
  ...channels: string[]
): Promise<RedisSubscription> {
  const subscription: RedisSubscription = new RedisSubscriptionImpl(
    writer,
    reader,
  );
  await subscription.subscribe(...channels);
  return subscription;
}
/**
 * Creates a subscription over the given reader/writer pair and subscribes it
 * to `patterns` before handing it back.
 *
 * @param writer Buffered writer used to send commands.
 * @param reader Buffered reader used to read replies.
 * @param patterns Patterns to subscribe to up front.
 * @returns The subscription, after the initial psubscribe call has resolved.
 */
export async function psubscribe(
  writer: BufWriter,
  reader: BufReader,
  ...patterns: string[]
): Promise<RedisSubscription> {
  const subscription: RedisSubscription = new RedisSubscriptionImpl(
    writer,
    reader,
  );
  await subscription.psubscribe(...patterns);
  return subscription;
}