From e967e077e8913d548fa03531d822f97c72b347e9 Mon Sep 17 00:00:00 2001 From: jhpung Date: Sun, 5 Jan 2025 21:05:57 +0900 Subject: [PATCH] Node: refactor writeOrBufferCommandRequest (#2675) - seperate createWritePromise by comand_request category - get value from split pointer at processResponse Signed-off-by: jhpung --- node/src/BaseClient.ts | 341 +++++++++++++++++---------------- node/src/GlideClusterClient.ts | 75 +++++++- 2 files changed, 241 insertions(+), 175 deletions(-) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index a9aed75560..df0b36062a 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -2,7 +2,6 @@ * Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ import { - ClusterScanCursor, DEFAULT_CONNECTION_TIMEOUT_IN_MILLISECONDS, DEFAULT_INFLIGHT_REQUESTS_LIMIT, DEFAULT_REQUEST_TIMEOUT_IN_MILLISECONDS, @@ -717,80 +716,6 @@ function getRequestErrorClass( return RequestError; } -/** - * @internal - */ -function toProtobufRoute( - route: Routes | undefined, -): command_request.Routes | undefined { - if (!route) { - return undefined; - } - - if (route === "allPrimaries") { - return command_request.Routes.create({ - simpleRoutes: command_request.SimpleRoutes.AllPrimaries, - }); - } else if (route === "allNodes") { - return command_request.Routes.create({ - simpleRoutes: command_request.SimpleRoutes.AllNodes, - }); - } else if (route === "randomNode") { - return command_request.Routes.create({ - simpleRoutes: command_request.SimpleRoutes.Random, - }); - } else if (route.type === "primarySlotKey") { - return command_request.Routes.create({ - slotKeyRoute: command_request.SlotKeyRoute.create({ - slotType: command_request.SlotTypes.Primary, - slotKey: route.key, - }), - }); - } else if (route.type === "replicaSlotKey") { - return command_request.Routes.create({ - slotKeyRoute: command_request.SlotKeyRoute.create({ - slotType: command_request.SlotTypes.Replica, - slotKey: route.key, - }), - }); - } else if (route.type === "primarySlotId") { - return command_request.Routes.create({ - slotKeyRoute: command_request.SlotIdRoute.create({ - slotType: command_request.SlotTypes.Primary, - slotId: route.id, - }), - }); - } else if (route.type === "replicaSlotId") { - return command_request.Routes.create({ - slotKeyRoute: command_request.SlotIdRoute.create({ - slotType: command_request.SlotTypes.Replica, - slotId: route.id, - }), - }); - } else if (route.type === "routeByAddress") { - let port = route.port; - let host = route.host; - - if (port === undefined) { - const split = host.split(":"); - - if (split.length !== 2) { - throw new RequestError( - "No port provided, expected host to be formatted as `{hostname}:{port}`. Received " + - host, - ); - } - - host = split[0]; - port = Number(split[1]); - } - - return command_request.Routes.create({ - byAddressRoute: { host, port }, - }); - } -} - export interface PubSubMsg { message: GlideString; channel: GlideString; @@ -806,10 +731,9 @@ export type WritePromiseOptions = RouteOption & DecoderOption; export class BaseClient { private socket: net.Socket; - protected readonly promiseCallbackFunctions: [ - PromiseFunction, - ErrorFunction, - ][] = []; + protected readonly promiseCallbackFunctions: + | [PromiseFunction, ErrorFunction, Decoder | undefined][] + | [PromiseFunction, ErrorFunction][] = []; private readonly availableCallbackSlots: number[] = []; private requestWriter = new BufferWriter(); private writeInProgress = false; @@ -904,13 +828,84 @@ export class BaseClient { this.remainingReadData = undefined; } + protected toProtobufRoute( + route: Routes | undefined, + ): command_request.Routes | undefined { + if (!route) { + return undefined; + } + + if (route === "allPrimaries") { + return command_request.Routes.create({ + simpleRoutes: command_request.SimpleRoutes.AllPrimaries, + }); + } else if (route === "allNodes") { + return command_request.Routes.create({ + simpleRoutes: command_request.SimpleRoutes.AllNodes, + }); + } else if (route === "randomNode") { + return command_request.Routes.create({ + simpleRoutes: command_request.SimpleRoutes.Random, + }); + } else if (route.type === "primarySlotKey") { + return command_request.Routes.create({ + slotKeyRoute: command_request.SlotKeyRoute.create({ + slotType: command_request.SlotTypes.Primary, + slotKey: route.key, + }), + }); + } else if (route.type === "replicaSlotKey") { + return command_request.Routes.create({ + slotKeyRoute: command_request.SlotKeyRoute.create({ + slotType: command_request.SlotTypes.Replica, + slotKey: route.key, + }), + }); + } else if (route.type === "primarySlotId") { + return command_request.Routes.create({ + slotKeyRoute: command_request.SlotIdRoute.create({ + slotType: command_request.SlotTypes.Primary, + slotId: route.id, + }), + }); + } else if (route.type === "replicaSlotId") { + return command_request.Routes.create({ + slotKeyRoute: command_request.SlotIdRoute.create({ + slotType: command_request.SlotTypes.Replica, + slotId: route.id, + }), + }); + } else if (route.type === "routeByAddress") { + let port = route.port; + let host = route.host; + + if (port === undefined) { + const split = host.split(":"); + + if (split.length !== 2) { + throw new RequestError( + "No port provided, expected host to be formatted as `{hostname}:{port}`. Received " + + host, + ); + } + + host = split[0]; + port = Number(split[1]); + } + + return command_request.Routes.create({ + byAddressRoute: { host, port }, + }); + } + } + processResponse(message: response.Response) { if (message.closingError != null) { this.close(message.closingError); return; } - const [resolve, reject] = + const [resolve, reject, decoder = this.defaultDecoder] = this.promiseCallbackFunctions[message.callbackIdx]; this.availableCallbackSlots.push(message.callbackIdx); @@ -932,7 +927,22 @@ export class BaseClient { ); } - resolve(pointer); + try { + resolve( + valueFromSplitPointer( + pointer.high!, + pointer.low!, + decoder === Decoder.String, + ), + ); + } catch (err: unknown) { + Logger.log("error", "Decoder", `Decoding error: '${err}'`); + reject( + err instanceof ValkeyError + ? err + : new Error(`Decoding error: '${err}'`), + ); + } } else if (message.constantResponse === response.ConstantResponse.OK) { resolve("OK"); } else { @@ -1012,87 +1022,92 @@ export class BaseClient { }); } + protected ensureClientIsOpen() { + if (this.isClosed) { + throw new ClosingError( + "Unable to execute requests; the client is closed. Please create a new client.", + ); + } + } + /** * @internal */ protected createWritePromise( - command: - | command_request.Command - | command_request.Command[] - | command_request.ScriptInvocation - | command_request.ClusterScan - | command_request.UpdateConnectionPassword, + command: command_request.Command | command_request.Command[], options: WritePromiseOptions = {}, ): Promise { - const route = toProtobufRoute(options?.route); - const stringDecoder: boolean = - (options?.decoder ?? this.defaultDecoder) === Decoder.String; - - if (this.isClosed) { - throw new ClosingError( - "Unable to execute requests; the client is closed. Please create a new client.", - ); - } + this.ensureClientIsOpen(); + const route = this.toProtobufRoute(options?.route); return new Promise((resolve, reject) => { const callbackIndex = this.getCallbackIndex(); this.promiseCallbackFunctions[callbackIndex] = [ - (resolveAns: T) => { - try { - if (resolveAns instanceof PointerResponse) { - // valueFromSplitPointer method is used to convert a pointer from a protobuf response into a TypeScript object. - // The protobuf response is received on a socket and the value in the response is a pointer to a Rust object. - // The pointer is a split pointer because JavaScript doesn't support `u64` and pointers in Rust can be `u64`, - // so we represent it with two`u32`(`high` and`low`). - if (typeof resolveAns === "number") { - resolveAns = valueFromSplitPointer( - 0, - resolveAns, - stringDecoder, - ) as T; - } else { - resolveAns = valueFromSplitPointer( - resolveAns.high!, - resolveAns.low!, - stringDecoder, - ) as T; - } - } - - if (command instanceof command_request.ClusterScan) { - const resolveAnsArray = resolveAns as [ - ClusterScanCursor, - GlideString[], - ]; - resolveAnsArray[0] = new ClusterScanCursor( - resolveAnsArray[0].toString(), - ); - } - - resolve(resolveAns); - } catch (err) { - Logger.log( - "error", - "Decoder", - `Decoding error: '${err}'`, - ); - reject(err); - } - }, + resolve, reject, + options?.decoder, ]; this.writeOrBufferCommandRequest(callbackIndex, command, route); }); } + protected createUpdateConnectionPasswordPromise( + command: command_request.UpdateConnectionPassword, + ) { + this.ensureClientIsOpen(); + + return new Promise((resolve, reject) => { + const callbackIdx = this.getCallbackIndex(); + this.promiseCallbackFunctions[callbackIdx] = [resolve, reject]; + this.writeOrBufferRequest( + new command_request.CommandRequest({ + callbackIdx, + updateConnectionPassword: command, + }), + (message: command_request.CommandRequest, writer: Writer) => { + command_request.CommandRequest.encodeDelimited( + message, + writer, + ); + }, + ); + }); + } + + protected createScriptInvocationPromise( + command: command_request.ScriptInvocation, + options: { + keys?: GlideString[]; + args?: GlideString[]; + } & DecoderOption = {}, + ) { + this.ensureClientIsOpen(); + + return new Promise((resolve, reject) => { + const callbackIdx = this.getCallbackIndex(); + this.promiseCallbackFunctions[callbackIdx] = [ + resolve, + reject, + options?.decoder, + ]; + this.writeOrBufferRequest( + new command_request.CommandRequest({ + callbackIdx, + scriptInvocation: command, + }), + (message: command_request.CommandRequest, writer: Writer) => { + command_request.CommandRequest.encodeDelimited( + message, + writer, + ); + }, + ); + }); + } + protected writeOrBufferCommandRequest( callbackIdx: number, - command: - | command_request.Command - | command_request.Command[] - | command_request.ScriptInvocation - | command_request.ClusterScan - | command_request.UpdateConnectionPassword, + command: command_request.Command | command_request.Command[], route?: command_request.Routes, ) { const message = Array.isArray(command) @@ -1101,27 +1116,13 @@ export class BaseClient { transaction: command_request.Transaction.create({ commands: command, }), + route, }) - : command instanceof command_request.Command - ? command_request.CommandRequest.create({ - callbackIdx, - singleCommand: command, - }) - : command instanceof command_request.ClusterScan - ? command_request.CommandRequest.create({ - callbackIdx, - clusterScan: command, - }) - : command instanceof command_request.UpdateConnectionPassword - ? command_request.CommandRequest.create({ - callbackIdx, - updateConnectionPassword: command, - }) - : command_request.CommandRequest.create({ - callbackIdx, - scriptInvocation: command, - }); - message.route = route; + : command_request.CommandRequest.create({ + callbackIdx, + singleCommand: command, + route, + }); this.writeOrBufferRequest( message, @@ -1131,7 +1132,7 @@ export class BaseClient { ); } - private writeOrBufferRequest( + protected writeOrBufferRequest( message: TRequest, encodeDelimited: (message: TRequest, writer: Writer) => void, ) { @@ -3848,7 +3849,7 @@ export class BaseClient { keys: options?.keys?.map(Buffer.from), args: options?.args?.map(Buffer.from), }); - return this.createWritePromise(scriptInvocation, options); + return this.createScriptInvocationPromise(scriptInvocation, options); } /** @@ -7664,7 +7665,11 @@ export class BaseClient { */ protected connectToServer(options: BaseClientConfiguration): Promise { return new Promise((resolve, reject) => { - this.promiseCallbackFunctions[0] = [resolve, reject]; + this.promiseCallbackFunctions[0] = [ + resolve, + reject, + options?.defaultDecoder, + ]; const message = connection_request.ConnectionRequest.create( this.createClientRequest(options), @@ -7791,7 +7796,7 @@ export class BaseClient { immediateAuth, }); - const response = await this.createWritePromise( + const response = await this.createUpdateConnectionPasswordPromise( updateConnectionPassword, ); diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index c12264f078..a3d3ab830b 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -4,6 +4,7 @@ import { ClusterScanCursor, Script } from "glide-rs"; import * as net from "net"; +import { Writer } from "protobufjs"; import { AdvancedBaseClientConfiguration, BaseClient, @@ -17,6 +18,7 @@ import { convertGlideRecordToRecord, } from "./BaseClient"; import { + ClusterScanOptions, FlushMode, FunctionListOptions, FunctionListResponse, @@ -24,7 +26,6 @@ import { FunctionStatsSingleResponse, InfoOptions, LolwutOptions, - ClusterScanOptions, createClientGetName, createClientId, createConfigGet, @@ -634,11 +635,41 @@ export class GlideClusterClient extends BaseClient { cursor: ClusterScanCursor, options?: ClusterScanOptions & DecoderOption, ): Promise<[ClusterScanCursor, GlideString[]]> { + this.ensureClientIsOpen(); // separate decoder option from scan options - const { decoder, ...scanOptions } = options || {}; + const { decoder = this.defaultDecoder, ...scanOptions } = options || {}; const cursorId = cursor.getCursor(); const command = this.scanOptionsToProto(cursorId, scanOptions); - return this.createWritePromise(command, { decoder }); + + return new Promise((resolve, reject) => { + const callbackIdx = this.getCallbackIndex(); + this.promiseCallbackFunctions[callbackIdx] = [ + (resolveAns: [ClusterScanCursor, GlideString[]]) => { + try { + resolve([ + new ClusterScanCursor(resolveAns[0].toString()), + resolveAns[1], + ]); + } catch (error) { + reject(error); + } + }, + reject, + decoder, + ]; + this.writeOrBufferRequest( + new command_request.CommandRequest({ + callbackIdx, + clusterScan: command, + }), + (message: command_request.CommandRequest, writer: Writer) => { + command_request.CommandRequest.encodeDelimited( + message, + writer, + ); + }, + ); + }); } /** @@ -1762,10 +1793,40 @@ export class GlideClusterClient extends BaseClient { keys: [], args: options?.args?.map(Buffer.from), }); - return this.createWritePromise>( - scriptInvocation, - options, - ).then((res) => convertClusterGlideRecord(res, true, options?.route)); + return this.createScriptInvocationWithRoutePromise< + ClusterGlideRecord + >(scriptInvocation, options).then((res) => + convertClusterGlideRecord(res, true, options?.route), + ); + } + + private async createScriptInvocationWithRoutePromise( + command: command_request.ScriptInvocation, + options?: { args?: GlideString[] } & DecoderOption & RouteOption, + ) { + this.ensureClientIsOpen(); + + return new Promise((resolve, reject) => { + const callbackIdx = this.getCallbackIndex(); + this.promiseCallbackFunctions[callbackIdx] = [ + resolve, + reject, + options?.decoder, + ]; + this.writeOrBufferRequest( + new command_request.CommandRequest({ + callbackIdx, + scriptInvocation: command, + route: this.toProtobufRoute(options?.route), + }), + (message: command_request.CommandRequest, writer: Writer) => { + command_request.CommandRequest.encodeDelimited( + message, + writer, + ); + }, + ); + }); } /**