Skip to content

Commit

Permalink
Node: refactor writeOrBufferCommandRequest (#2675)
Browse files Browse the repository at this point in the history
- seperate createWritePromise by comand_request category
- get value from split pointer at processResponse

Signed-off-by: jhpung <[email protected]>
  • Loading branch information
jhpung committed Jan 5, 2025
1 parent ffc679a commit 40bff6c
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 106 deletions.
205 changes: 107 additions & 98 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
*/
import {
ClusterScanCursor,
DEFAULT_CONNECTION_TIMEOUT_IN_MILLISECONDS,
DEFAULT_INFLIGHT_REQUESTS_LIMIT,
DEFAULT_REQUEST_TIMEOUT_IN_MILLISECONDS,
Expand Down Expand Up @@ -804,12 +803,17 @@ export interface PubSubMsg {
*/
export type WritePromiseOptions = RouteOption & DecoderOption;

export type InvokeScriptOptions = {
keys?: GlideString[];
args?: GlideString[];
} & DecoderOption &
RouteOption;

export class BaseClient {
private socket: net.Socket;
protected readonly promiseCallbackFunctions: [
PromiseFunction,
ErrorFunction,
][] = [];
protected readonly promiseCallbackFunctions:
| [PromiseFunction, ErrorFunction, Decoder | undefined][]
| [PromiseFunction, ErrorFunction][] = [];
private readonly availableCallbackSlots: number[] = [];
private requestWriter = new BufferWriter();
private writeInProgress = false;
Expand Down Expand Up @@ -910,7 +914,7 @@ export class BaseClient {
return;
}

const [resolve, reject] =
const [resolve, reject, decoder = this.defaultDecoder] =
this.promiseCallbackFunctions[message.callbackIdx];
this.availableCallbackSlots.push(message.callbackIdx);

Expand All @@ -932,7 +936,22 @@ export class BaseClient {
);
}

resolve(pointer);
try {
resolve(
valueFromSplitPointer(
pointer.high!,
pointer.low!,
decoder === Decoder.String,
),
);
} catch (err: unknown) {
Logger.log("error", "Decoder", `Decoding error: '${err}'`);
reject(
err instanceof ValkeyError
? err
: new Error(`Decoding error: '${err}'`),
);
}
} else if (message.constantResponse === response.ConstantResponse.OK) {
resolve("OK");
} else {
Expand Down Expand Up @@ -1012,87 +1031,90 @@ export class BaseClient {
});
}

protected ensureClientIsOpen() {
if (this.isClosed) {
throw new ClosingError(
"Unable to execute requests; the client is closed. Please create a new client.",
);
}
}

/**
* @internal
*/
protected createWritePromise<T>(
command:
| command_request.Command
| command_request.Command[]
| command_request.ScriptInvocation
| command_request.ClusterScan
| command_request.UpdateConnectionPassword,
command: command_request.Command | command_request.Command[],
options: WritePromiseOptions = {},
): Promise<T> {
const route = toProtobufRoute(options?.route);
const stringDecoder: boolean =
(options?.decoder ?? this.defaultDecoder) === Decoder.String;

if (this.isClosed) {
throw new ClosingError(
"Unable to execute requests; the client is closed. Please create a new client.",
);
}
this.ensureClientIsOpen();

const route = toProtobufRoute(options?.route);
return new Promise((resolve, reject) => {
const callbackIndex = this.getCallbackIndex();
this.promiseCallbackFunctions[callbackIndex] = [
(resolveAns: T) => {
try {
if (resolveAns instanceof PointerResponse) {
// valueFromSplitPointer method is used to convert a pointer from a protobuf response into a TypeScript object.
// The protobuf response is received on a socket and the value in the response is a pointer to a Rust object.
// The pointer is a split pointer because JavaScript doesn't support `u64` and pointers in Rust can be `u64`,
// so we represent it with two`u32`(`high` and`low`).
if (typeof resolveAns === "number") {
resolveAns = valueFromSplitPointer(
0,
resolveAns,
stringDecoder,
) as T;
} else {
resolveAns = valueFromSplitPointer(
resolveAns.high!,
resolveAns.low!,
stringDecoder,
) as T;
}
}

if (command instanceof command_request.ClusterScan) {
const resolveAnsArray = resolveAns as [
ClusterScanCursor,
GlideString[],
];
resolveAnsArray[0] = new ClusterScanCursor(
resolveAnsArray[0].toString(),
);
}

resolve(resolveAns);
} catch (err) {
Logger.log(
"error",
"Decoder",
`Decoding error: '${err}'`,
);
reject(err);
}
},
resolve,
reject,
options?.decoder,
];
this.writeOrBufferCommandRequest(callbackIndex, command, route);
});
}

protected createUpdateConnectionPasswordPromise(
command: command_request.UpdateConnectionPassword,
) {
this.ensureClientIsOpen();

return new Promise<GlideString>((resolve, reject) => {
const callbackIdx = this.getCallbackIndex();
this.promiseCallbackFunctions[callbackIdx] = [resolve, reject];
this.writeOrBufferRequest(
new command_request.CommandRequest({
callbackIdx,
updateConnectionPassword: command,
}),
(message: command_request.CommandRequest, writer: Writer) => {
command_request.CommandRequest.encodeDelimited(
message,
writer,
);
},
);
});
}

protected createScriptInvocationPromise<T = GlideString>(
command: command_request.ScriptInvocation,
options: InvokeScriptOptions = {},
) {
this.ensureClientIsOpen();

return new Promise<T>((resolve, reject) => {
const callbackIdx = this.getCallbackIndex();
this.promiseCallbackFunctions[callbackIdx] = [
resolve,
reject,
options?.decoder,
];
this.writeOrBufferRequest(
new command_request.CommandRequest({
callbackIdx,
scriptInvocation: command,
route: toProtobufRoute(options?.route),
}),
(message: command_request.CommandRequest, writer: Writer) => {
command_request.CommandRequest.encodeDelimited(
message,
writer,
);
},
);
});
}

protected writeOrBufferCommandRequest(
callbackIdx: number,
command:
| command_request.Command
| command_request.Command[]
| command_request.ScriptInvocation
| command_request.ClusterScan
| command_request.UpdateConnectionPassword,
command: command_request.Command | command_request.Command[],
route?: command_request.Routes,
) {
const message = Array.isArray(command)
Expand All @@ -1101,27 +1123,13 @@ export class BaseClient {
transaction: command_request.Transaction.create({
commands: command,
}),
route,
})
: command instanceof command_request.Command
? command_request.CommandRequest.create({
callbackIdx,
singleCommand: command,
})
: command instanceof command_request.ClusterScan
? command_request.CommandRequest.create({
callbackIdx,
clusterScan: command,
})
: command instanceof command_request.UpdateConnectionPassword
? command_request.CommandRequest.create({
callbackIdx,
updateConnectionPassword: command,
})
: command_request.CommandRequest.create({
callbackIdx,
scriptInvocation: command,
});
message.route = route;
: command_request.CommandRequest.create({
callbackIdx,
singleCommand: command,
route,
});

this.writeOrBufferRequest(
message,
Expand All @@ -1131,7 +1139,7 @@ export class BaseClient {
);
}

private writeOrBufferRequest<TRequest>(
protected writeOrBufferRequest<TRequest>(
message: TRequest,
encodeDelimited: (message: TRequest, writer: Writer) => void,
) {
Expand Down Expand Up @@ -3838,17 +3846,14 @@ export class BaseClient {
*/
public async invokeScript(
script: Script,
options?: {
keys?: GlideString[];
args?: GlideString[];
} & DecoderOption,
options?: InvokeScriptOptions,
): Promise<GlideReturnType> {
const scriptInvocation = command_request.ScriptInvocation.create({
hash: script.getHash(),
keys: options?.keys?.map(Buffer.from),
args: options?.args?.map(Buffer.from),
});
return this.createWritePromise(scriptInvocation, options);
return this.createScriptInvocationPromise(scriptInvocation, options);
}

/**
Expand Down Expand Up @@ -7664,7 +7669,11 @@ export class BaseClient {
*/
protected connectToServer(options: BaseClientConfiguration): Promise<void> {
return new Promise((resolve, reject) => {
this.promiseCallbackFunctions[0] = [resolve, reject];
this.promiseCallbackFunctions[0] = [
resolve,
reject,
options?.defaultDecoder,
];

const message = connection_request.ConnectionRequest.create(
this.createClientRequest(options),
Expand Down Expand Up @@ -7791,7 +7800,7 @@ export class BaseClient {
immediateAuth,
});

const response = await this.createWritePromise<GlideString>(
const response = await this.createUpdateConnectionPasswordPromise(
updateConnectionPassword,
);

Expand Down
49 changes: 41 additions & 8 deletions node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import { ClusterScanCursor, Script } from "glide-rs";
import * as net from "net";
import { Writer } from "protobufjs";
import {
AdvancedBaseClientConfiguration,
BaseClient,
Expand All @@ -13,18 +14,19 @@ import {
GlideRecord,
GlideReturnType,
GlideString,
InvokeScriptOptions,
PubSubMsg,
convertGlideRecordToRecord,
} from "./BaseClient";
import {
ClusterScanOptions,
FlushMode,
FunctionListOptions,
FunctionListResponse,
FunctionRestorePolicy,
FunctionStatsSingleResponse,
InfoOptions,
LolwutOptions,
ClusterScanOptions,
createClientGetName,
createClientId,
createConfigGet,
Expand Down Expand Up @@ -634,11 +636,41 @@ export class GlideClusterClient extends BaseClient {
cursor: ClusterScanCursor,
options?: ClusterScanOptions & DecoderOption,
): Promise<[ClusterScanCursor, GlideString[]]> {
this.ensureClientIsOpen();
// separate decoder option from scan options
const { decoder, ...scanOptions } = options || {};
const { decoder = this.defaultDecoder, ...scanOptions } = options || {};
const cursorId = cursor.getCursor();
const command = this.scanOptionsToProto(cursorId, scanOptions);
return this.createWritePromise(command, { decoder });

return new Promise((resolve, reject) => {
const callbackIdx = this.getCallbackIndex();
this.promiseCallbackFunctions[callbackIdx] = [
(resolveAns: [ClusterScanCursor, GlideString[]]) => {
try {
resolve([
new ClusterScanCursor(resolveAns[0].toString()),
resolveAns[1],
]);
} catch (error) {
reject(error);
}
},
reject,
decoder,
];
this.writeOrBufferRequest(
new command_request.CommandRequest({
callbackIdx,
clusterScan: command,
}),
(message: command_request.CommandRequest, writer: Writer) => {
command_request.CommandRequest.encodeDelimited(
message,
writer,
);
},
);
});
}

/**
Expand Down Expand Up @@ -1755,17 +1787,18 @@ export class GlideClusterClient extends BaseClient {
*/
public async invokeScriptWithRoute(
script: Script,
options?: { args?: GlideString[] } & DecoderOption & RouteOption,
options?: InvokeScriptOptions,
): Promise<ClusterResponse<GlideReturnType>> {
const scriptInvocation = command_request.ScriptInvocation.create({
hash: script.getHash(),
keys: [],
args: options?.args?.map(Buffer.from),
});
return this.createWritePromise<ClusterGlideRecord<GlideReturnType>>(
scriptInvocation,
options,
).then((res) => convertClusterGlideRecord(res, true, options?.route));
return this.createScriptInvocationPromise<
ClusterGlideRecord<GlideReturnType>
>(scriptInvocation, options).then((res) =>
convertClusterGlideRecord(res, true, options?.route),
);
}

/**
Expand Down

0 comments on commit 40bff6c

Please sign in to comment.