diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f961323b4..a8f8e335a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * Python, Node: Added PTTL command ([#1036](https://github.com/aws/glide-for-redis/pull/1036), [#1082](https://github.com/aws/glide-for-redis/pull/1082)) * Node: Added HVAL command ([#1022](https://github.com/aws/glide-for-redis/pull/1022)) * Node: Added PERSIST command ([#1023](https://github.com/aws/glide-for-redis/pull/1023)) +* Node: Added Xadd, Xtrim commands. ([#1057](https://github.com/aws/glide-for-redis/pull/1057)) #### Features diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 64d5cf18fd..e5f8bfdefe 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -14,6 +14,8 @@ import { ExpireOptions, ScoreLimit, SetOptions, + StreamAddOptions, + StreamTrimOptions, ZaddOptions, createDecr, createDecrBy, @@ -61,6 +63,8 @@ import { createTTL, createType, createUnlink, + createXadd, + createXtrim, createZadd, createZcard, createZcount, @@ -1207,6 +1211,34 @@ export class BaseClient { return this.createWritePromise(createZremRangeByRank(key, start, end)); } + /** + * Adds an entry to the specified stream. + * See https://redis.io/commands/xadd/ for more details. + * + * @param key - The key of the stream. + * @param values - field-value pairs to be added to the entry. + * @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists. + */ + public xadd( + key: string, + values: [string, string][], + options?: StreamAddOptions, + ): Promise { + return this.createWritePromise(createXadd(key, values, options)); + } + + /** + * Trims the stream by evicting older entries. + * See https://redis.io/commands/xtrim/ for more details. + * + * @param key - the key of the stream + * @param options - options detailing how to trim the stream. + * @returns The number of entries deleted from the stream. + */ + public xtrim(key: string, options: StreamTrimOptions): Promise { + return this.createWritePromise(createXtrim(key, options)); + } + private readonly MAP_READ_FROM_STRATEGY: Record< ReadFrom, connection_request.ReadFrom diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 27d47f7e9d..aa8c4fbbf7 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -916,9 +916,116 @@ export function createZremRangeByRank( ]); } +export function createPersist(key: string): redis_request.Command { + return createCommand(RequestType.Persist, [key]); +} + +export type StreamTrimOptions = ( + | { + /** + * Trim the stream according to entry ID. + * Equivalent to `MINID` in the Redis API. + */ + method: "minid"; + threshold: string; + } + | { + /** + * Trim the stream according to length. + * Equivalent to `MAXLEN` in the Redis API. + */ + method: "maxlen"; + threshold: number; + } +) & { + /** + * If `true`, the stream will be trimmed exactly. Equivalent to `=` in the Redis API. Otherwise the stream will be trimmed in a near-exact manner, which is more efficient, equivalent to `~` in the Redis API. + */ + exact: boolean; + /** + * If set, sets the maximal amount of entries that will be deleted. + */ + limit?: number; +}; + +export type StreamAddOptions = { + /** + * If set, the new entry will be added with this ID. + */ + id?: string; + /** + * If set to `false`, a new stream won't be created if no stream matches the given key. + * Equivalent to `NOMKSTREAM` in the Redis API. + */ + makeStream?: boolean; + /** + * If set, the add operation will also trim the older entries in the stream. + */ + trim?: StreamTrimOptions; +}; + +function addTrimOptions(options: StreamTrimOptions, args: string[]) { + if (options.method === "maxlen") { + args.push("MAXLEN"); + } else if (options.method === "minid") { + args.push("MINID"); + } + + if (options.exact) { + args.push("="); + } else { + args.push("~"); + } + + if (options.method === "maxlen") { + args.push(options.threshold.toString()); + } else if (options.method === "minid") { + args.push(options.threshold); + } + + if (options.limit) { + args.push("LIMIT"); + args.push(options.limit.toString()); + } +} + +export function createXadd( + key: string, + values: [string, string][], + options?: StreamAddOptions, +): redis_request.Command { + const args = [key]; + + if (options?.makeStream === false) { + args.push("NOMKSTREAM"); + } + + if (options?.trim) { + addTrimOptions(options.trim, args); + } + + if (options?.id) { + args.push(options.id); + } else { + args.push("*"); + } + + values.forEach(([field, value]) => { + args.push(field); + args.push(value); + }); + + return createCommand(RequestType.XAdd, args); +} + /** * @internal */ -export function createPersist(key: string): redis_request.Command { - return createCommand(RequestType.Persist, [key]); +export function createXtrim( + key: string, + options: StreamTrimOptions, +): redis_request.Command { + const args = [key]; + addTrimOptions(options, args); + return createCommand(RequestType.XTrim, args); } diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 72d371d09b..df0371a41c 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -7,6 +7,8 @@ import { InfoOptions, ScoreLimit, SetOptions, + StreamAddOptions, + StreamTrimOptions, ZaddOptions, createClientGetName, createClientId, @@ -64,6 +66,8 @@ import { createTTL, createType, createUnlink, + createXadd, + createXtrim, createZadd, createZcard, createZcount, @@ -1011,6 +1015,34 @@ export class BaseTransaction> { public lindex(key: string, index: number): T { return this.addAndReturn(createLindex(key, index)); } + + /** + * Adds an entry to the specified stream. + * See https://redis.io/commands/xadd/ for more details. + * + * @param key - The key of the stream. + * @param values - field-value pairs to be added to the entry. + * @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists. + */ + public xadd( + key: string, + values: [string, string][], + options?: StreamAddOptions, + ): T { + return this.addAndReturn(createXadd(key, values, options)); + } + + /** + * Trims the stream by evicting older entries. + * See https://redis.io/commands/xtrim/ for more details. + * + * @param key - the key of the stream + * @param options - options detailing how to trim the stream. + * @returns The number of entries deleted from the stream. + */ + public xtrim(key: string, options: StreamTrimOptions): T { + return this.addAndReturn(createXtrim(key, options)); + } } /** diff --git a/node/tests/RedisClientInternals.test.ts b/node/tests/RedisClientInternals.test.ts index 6e2195f4b7..22f194acaf 100644 --- a/node/tests/RedisClientInternals.test.ts +++ b/node/tests/RedisClientInternals.test.ts @@ -627,4 +627,147 @@ describe("SocketConnectionInternals", () => { expect(result2).toBeNull(); }); }); + + it("should set arguments according to xadd options", async () => { + let counter = 0; + await testWithClusterResources(async (connection, socket) => { + socket.on("data", (data) => { + const reader = Reader.create(data); + const request = + redis_request.RedisRequest.decodeDelimited(reader); + expect(request.singleCommand?.requestType).toEqual( + RequestType.XAdd, + ); + + if (counter === 0) { + expect(request.singleCommand?.argsArray?.args).toEqual([ + "foo", + "*", + "a", + "1", + "b", + "2", + ]); + } else if (counter === 1) { + expect(request.singleCommand?.argsArray?.args).toEqual([ + "bar", + "YOLO", + "a", + "1", + ]); + } else if (counter === 2) { + expect(request.singleCommand?.argsArray?.args).toEqual([ + "baz", + "MAXLEN", + "=", + "1000", + "*", + "c", + "3", + ]); + } else if (counter === 3) { + expect(request.singleCommand?.argsArray?.args).toEqual([ + "foobar", + "NOMKSTREAM", + "MINID", + "~", + "foo", + "LIMIT", + "1000", + "*", + "d", + "4", + ]); + } else { + throw new Error("too many requests! " + counter); + } + + counter = counter + 1; + + sendResponse(socket, ResponseType.Null, request.callbackIdx); + }); + await connection.xadd("foo", [ + ["a", "1"], + ["b", "2"], + ]); + + await connection.xadd("bar", [["a", "1"]], { + id: "YOLO", + makeStream: true, + }); + + await connection.xadd("baz", [["c", "3"]], { + trim: { + method: "maxlen", + threshold: 1000, + exact: true, + }, + }); + + await connection.xadd("foobar", [["d", "4"]], { + makeStream: false, + trim: { + method: "minid", + threshold: "foo", + exact: false, + limit: 1000, + }, + }); + + expect(counter).toEqual(4); + }); + }); + + it("should set arguments according to xtrim options", async () => { + let counter = 0; + await testWithClusterResources(async (connection, socket) => { + socket.on("data", (data) => { + const reader = Reader.create(data); + const request = + redis_request.RedisRequest.decodeDelimited(reader); + expect(request.singleCommand?.requestType).toEqual( + RequestType.XTrim, + ); + + if (counter === 0) { + expect(request.singleCommand?.argsArray?.args).toEqual([ + "foo", + "MAXLEN", + "=", + "1000", + ]); + } else if (counter === 1) { + expect(request.singleCommand?.argsArray?.args).toEqual([ + "bar", + "MINID", + "~", + "foo", + "LIMIT", + "1000", + ]); + } else { + throw new Error("too many requests! " + counter); + } + + counter = counter + 1; + + sendResponse(socket, ResponseType.Null, request.callbackIdx); + }); + + await connection.xtrim("foo", { + method: "maxlen", + threshold: 1000, + exact: true, + }); + + await connection.xtrim("bar", { + method: "minid", + threshold: "foo", + exact: false, + limit: 1000, + }); + + expect(counter).toEqual(2); + }); + }); }); diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 1891f09d4f..84c10c0aa9 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -1742,6 +1742,93 @@ export function runBaseTests(config: { }, config.timeout, ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `streams add and trim test_%p`, + async () => { + await runTest(async (client: BaseClient) => { + const key = uuidv4(); + const field1 = uuidv4(); + const field2 = uuidv4(); + + const nullResult = await client.xadd( + key, + [ + [field1, "foo"], + [field2, "bar"], + ], + { + makeStream: false, + }, + ); + expect(nullResult).toBeNull(); + + const timestamp1 = await client.xadd( + key, + [ + [field1, "foo1"], + [field2, "bar1"], + ], + { id: "0-1" }, + ); + expect(timestamp1).toEqual("0-1"); + expect( + await client.xadd(key, [ + [field1, "foo2"], + [field2, "bar2"], + ]), + ).not.toBeNull(); + expect(await client.customCommand(["XLEN", key])).toEqual(2); + + // this will trim the first entry. + const id = await client.xadd( + key, + [ + [field1, "foo3"], + [field2, "bar3"], + ], + { + trim: { + method: "maxlen", + threshold: 2, + exact: true, + }, + }, + ); + expect(id).not.toBeNull(); + expect(await client.customCommand(["XLEN", key])).toEqual(2); + + // this will trim the 2nd entry. + expect( + await client.xadd( + key, + [ + [field1, "foo4"], + [field2, "bar4"], + ], + { + trim: { + method: "minid", + threshold: id as string, + exact: true, + }, + }, + ), + ).not.toBeNull(); + expect(await client.customCommand(["XLEN", key])).toEqual(2); + + expect( + await client.xtrim(key, { + method: "maxlen", + threshold: 1, + exact: true, + }), + ).toEqual(1); + expect(await client.customCommand(["XLEN", key])).toEqual(1); + }, ProtocolVersion.RESP2); + }, + config.timeout, + ); } export function runCommonTests(config: { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index 2f4b6926ce..fe806a3753 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -61,6 +61,7 @@ export function transactionTest( const key6 = "{key}" + uuidv4(); const key7 = "{key}" + uuidv4(); const key8 = "{key}" + uuidv4(); + const key9 = "{key}" + uuidv4(); const field = uuidv4(); const value = uuidv4(); const args: ReturnType[] = []; @@ -162,6 +163,16 @@ export function transactionTest( args.push({ member4: 4 }); baseTransaction.zremRangeByRank(key8, 0, 1); args.push(1); + baseTransaction.xadd(key9, [["foo", "bar"]], { id: "0-1" }); + args.push("0-1"); + baseTransaction.xadd(key9, [["foo", "bar"]], { id: "0-2" }); + args.push("0-2"); + baseTransaction.xtrim(key9, { + method: "minid", + threshold: "0-2", + exact: true, + }); + args.push(1); return args; }