Skip to content

Commit 5a2b91c

Browse files
committed
address review comments
1 parent 901f197 commit 5a2b91c

File tree

7 files changed

+45
-121
lines changed

7 files changed

+45
-121
lines changed

packages/client/lib/client/cache.ts

+14-93
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { EventEmitter } from 'stream';
2-
import RedisClient, { RedisClientType } from '.';
2+
import RedisClient from '.';
33
import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types';
44
import { BasicCommandParser } from './parser';
55

66
type CachingClient = RedisClient<any, any, any, any, any>;
7-
type CachingClientType = RedisClientType<any, any, any, any, any>;
87
type CmdFunc = () => Promise<ReplyUnion>;
98

109
export interface ClientSideCacheConfig {
@@ -23,6 +22,17 @@ interface ClientSideCacheEntry {
2322
validate(): boolean;
2423
}
2524

25+
function generateCacheKey(redisArgs: ReadonlyArray<RedisArgument>): string {
26+
const tmp = new Array(redisArgs.length*2);
27+
28+
for (let i = 0; i < redisArgs.length; i++) {
29+
tmp[i] = redisArgs[i].length;
30+
tmp[i+redisArgs.length] = redisArgs[i];
31+
}
32+
33+
return tmp.join('_');
34+
}
35+
2636
abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry {
2737
#invalidated = false;
2838
readonly #expireTime: number;
@@ -125,7 +135,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
125135
) {
126136
let reply: ReplyUnion;
127137

128-
const cacheKey = parser.cacheKey;
138+
const cacheKey = generateCacheKey(parser.redisArgs);
129139

130140
// "2"
131141
let cacheEntry = this.get(cacheKey);
@@ -339,10 +349,6 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
339349
export abstract class PooledClientSideCacheProvider extends BasicClientSideCache {
340350
#disabled = false;
341351

342-
abstract updateRedirect(id: number): void;
343-
abstract addClient(client: CachingClientType): void;
344-
abstract removeClient(client: CachingClientType): void;
345-
346352
disable() {
347353
this.#disabled = true;
348354
}
@@ -367,27 +373,13 @@ export abstract class PooledClientSideCacheProvider extends BasicClientSideCache
367373
return super.has(cacheKey);
368374
}
369375

370-
onPoolConnect(factory: () => CachingClientType) {};
371-
372376
onPoolClose() {
373377
this.clear();
374378
};
375379
}
376380

377381
// doesn't do anything special in pooling, clears cache on every client disconnect
378382
export class BasicPooledClientSideCache extends PooledClientSideCacheProvider {
379-
380-
override updateRedirect(id: number): void {
381-
return;
382-
}
383-
384-
override addClient(client: CachingClientType): void {
385-
return;
386-
}
387-
override removeClient(client: CachingClientType): void {
388-
return;
389-
}
390-
391383
override onError() {
392384
this.clear(false);
393385
}
@@ -459,75 +451,4 @@ export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache
459451
override onError() {}
460452

461453
override onClose() {}
462-
}
463-
464-
// Only clears cache on "management"/"redirect" client disconnect
465-
export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider {
466-
#id?: number;
467-
#clients: Set<CachingClientType> = new Set();
468-
#redirectClient?: CachingClientType;
469-
470-
constructor(config: ClientSideCacheConfig) {
471-
super(config);
472-
this.disable();
473-
}
474-
475-
override trackingOn(): string[] {
476-
if (this.#id) {
477-
return ['CLIENT', 'TRACKING', 'ON', 'REDIRECT', this.#id.toString()];
478-
} else {
479-
return [];
480-
}
481-
}
482-
483-
override updateRedirect(id: number) {
484-
this.#id = id;
485-
for (const client of this.#clients) {
486-
client.sendCommand(this.trackingOn()).catch(() => {});
487-
}
488-
}
489-
490-
override addClient(client: CachingClientType) {
491-
this.#clients.add(client);
492-
}
493-
494-
override removeClient(client: CachingClientType) {
495-
this.#clients.delete(client);
496-
}
497-
498-
override onError(): void {};
499-
500-
override async onPoolConnect(factory: () => CachingClientType) {
501-
const client = factory();
502-
this.#redirectClient = client;
503-
504-
client.on("error", () => {
505-
this.disable();
506-
this.clear();
507-
}).on("ready", async () => {
508-
const clientId = await client.withTypeMapping({}).clientId();
509-
this.updateRedirect(clientId);
510-
this.enable();
511-
})
512-
513-
try {
514-
await client.connect();
515-
} catch (err) {
516-
throw err;
517-
}
518-
}
519-
520-
override onClose() {};
521-
522-
override onPoolClose() {
523-
super.onPoolClose();
524-
525-
if (this.#redirectClient) {
526-
this.#id = undefined;
527-
const client = this.#redirectClient;
528-
this.#redirectClient = undefined;
529-
530-
return client.close();
531-
}
532-
}
533-
}
454+
}

packages/client/lib/client/commands-queue.ts

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ export default class RedisCommandsQueue {
111111
onErrorReply: err => this.#onErrorReply(err),
112112
onPush: push => {
113113
if (!this.#onPush(push)) {
114+
// currently only supporting "invalidate" over RESP3 push messages
114115
switch (push[0].toString()) {
115116
case "invalidate": {
116117
if (this.#invalidateCallback) {

packages/client/lib/client/index.ts

+5-6
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,9 @@ export default class RedisClient<
290290
#watchEpoch?: number;
291291

292292
#clientSideCache?: ClientSideCacheProvider;
293+
get clientSideCache() {
294+
return this._self.#clientSideCache;
295+
}
293296

294297
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
295298
return this._self.#options;
@@ -311,7 +314,6 @@ export default class RedisClient<
311314
return this._self.#socket.socketEpoch;
312315
}
313316

314-
315317
get isWatching() {
316318
return this._self.#watchEpoch !== undefined;
317319
}
@@ -414,10 +416,7 @@ export default class RedisClient<
414416
}
415417

416418
if (this.#clientSideCache) {
417-
const tracking = this.#clientSideCache.trackingOn();
418-
if (tracking) {
419-
commands.push(tracking);
420-
}
419+
commands.push(this.#clientSideCache.trackingOn());
421420
}
422421

423422
return commands;
@@ -855,7 +854,7 @@ export default class RedisClient<
855854
}
856855

857856
const chainId = Symbol('Pipeline Chain'),
858-
promise = Promise.allSettled(
857+
promise = Promise.all(
859858
commands.map(({ args }) => this._self.#queue.addCommand(args, {
860859
chainId,
861860
typeMapping: this._commandOptions?.typeMapping

packages/client/lib/client/parser.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,14 @@ export class BasicCommandParser implements CommandParser {
3434
}
3535

3636
get cacheKey() {
37-
let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_');
38-
return cacheKey + '_' + this.#redisArgs.join('_');
37+
const tmp = new Array(this.#redisArgs.length*2);
38+
39+
for (let i = 0; i < this.#redisArgs.length; i++) {
40+
tmp[i] = this.#redisArgs[i].length;
41+
tmp[i+this.#redisArgs.length] = this.#redisArgs[i];
42+
}
43+
44+
return tmp.join('_');
3945
}
4046

4147
push(...arg: Array<RedisArgument>) {

packages/client/lib/client/pool.ts

+6-16
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { TimeoutError } from '../errors';
77
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
88
import { CommandOptions } from './commands-queue';
99
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
10-
import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from './cache';
10+
import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider } from './cache';
1111
import { BasicCommandParser } from './parser';
1212

1313
export interface RedisPoolOptions {
@@ -215,6 +215,9 @@ export class RedisClientPool<
215215
}
216216

217217
#clientSideCache?: PooledClientSideCacheProvider;
218+
get clientSideCache() {
219+
return this._self.#clientSideCache;
220+
}
218221

219222
/**
220223
* You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`},
@@ -241,8 +244,7 @@ export class RedisClientPool<
241244
} else {
242245
const cscConfig = options.clientSideCache;
243246
this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig);
244-
this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
245-
this.#clientSideCache = clientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig);
247+
// this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
246248
}
247249
}
248250

@@ -312,13 +314,6 @@ export class RedisClientPool<
312314
if (this._self.#isOpen) return; // TODO: throw error?
313315
this._self.#isOpen = true;
314316

315-
try {
316-
this._self.#clientSideCache?.onPoolConnect(this._self.#clientFactory);
317-
} catch (err) {
318-
this.destroy();
319-
throw err;
320-
}
321-
322317
const promises = [];
323318
while (promises.length < this._self.#options.minimum) {
324319
promises.push(this._self.#create());
@@ -334,18 +329,14 @@ export class RedisClientPool<
334329
return this as unknown as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>;
335330
}
336331

337-
async #create(redirect?: boolean) {
332+
async #create() {
338333
const node = this._self.#clientsInUse.push(
339334
this._self.#clientFactory()
340335
.on('error', (err: Error) => this.emit('error', err))
341336
);
342337

343338
try {
344339
const client = node.value;
345-
if (this._self.#clientSideCache) {
346-
this._self.#clientSideCache.addClient(node.value);
347-
}
348-
349340
await client.connect();
350341
} catch (err) {
351342
this._self.#clientsInUse.remove(node);
@@ -436,7 +427,6 @@ export class RedisClientPool<
436427
for (let i = 0; i < toDestroy; i++) {
437428
// TODO: shift vs pop
438429
const client = this.#idleClients.shift()!
439-
this.#clientSideCache?.removeClient(client);
440430
client.destroy();
441431
}
442432
}

packages/client/lib/cluster/index.ts

+4
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ export default class RedisCluster<
270270
return this._self.#slots.slots;
271271
}
272272

273+
get clientSideCache() {
274+
return this._self.#slots.clientSideCache;
275+
}
276+
273277
/**
274278
* An array of the cluster masters.
275279
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node.

packages/client/lib/sentinel/index.ts

+7-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers';
1616
import { WaitQueue } from './wait-queue';
1717
import { TcpNetConnectOpts } from 'node:net';
1818
import { RedisTcpSocketOptions } from '../client/socket';
19-
import { BasicPooledClientSideCache, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from '../client/cache';
19+
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';
2020

2121
interface ClientInfo {
2222
id: number;
@@ -266,6 +266,10 @@ export default class RedisSentinel<
266266
#masterClientCount = 0;
267267
#masterClientInfo?: ClientInfo;
268268

269+
get clientSideCache() {
270+
return this._self.#internal.clientSideCache;
271+
}
272+
269273
constructor(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
270274
super();
271275

@@ -558,7 +562,7 @@ class RedisSentinelInternal<
558562

559563
readonly #name: string;
560564
readonly #nodeClientOptions: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
561-
readonly #sentinelClientOptions: RedisClientOptions<typeof RedisSentinelModule, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
565+
readonly #sentinelClientOptions: RedisClientOptions<typeof RedisSentinelModule, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisTcpSocketOptions>;
562566
readonly #scanInterval: number;
563567
readonly #passthroughClientErrorEvents: boolean;
564568

@@ -619,8 +623,7 @@ class RedisSentinelInternal<
619623
} else {
620624
const cscConfig = options.clientSideCache;
621625
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig);
622-
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
623-
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig);
626+
// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
624627
}
625628
}
626629

0 commit comments

Comments
 (0)