Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: setBatch and getBatch #1153

Merged
merged 5 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/client-sdk-nodejs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ import * as CacheKeysExist from '@gomomento/sdk-core/dist/src/messages/responses
import * as CacheUpdateTtl from '@gomomento/sdk-core/dist/src/messages/responses/cache-ttl-update';
import * as CacheIncreaseTtl from '@gomomento/sdk-core/dist/src/messages/responses/cache-ttl-increase';
import * as CacheDecreaseTtl from '@gomomento/sdk-core/dist/src/messages/responses/cache-ttl-decrease';
import * as GetBatch from '@gomomento/sdk-core/dist/src/messages/responses/cache-batch-get';
import * as SetBatch from '@gomomento/sdk-core/dist/src/messages/responses/cache-batch-set';

// TopicClient Response Types
import * as TopicPublish from '@gomomento/sdk-core/dist/src/messages/responses/topic-publish';
Expand Down Expand Up @@ -326,6 +328,8 @@ export {
CacheUpdateTtl,
CacheIncreaseTtl,
CacheDecreaseTtl,
GetBatch,
SetBatch,
// TopicClient
TopicConfigurations,
TopicConfiguration,
Expand Down
210 changes: 205 additions & 5 deletions packages/client-sdk-nodejs/src/internal/cache-data-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import {Header, HeaderInterceptorProvider} from './grpc/headers-interceptor';
import {ClientTimeoutInterceptor} from './grpc/client-timeout-interceptor';
import {createRetryInterceptorIfEnabled} from './grpc/retry-interceptor';
import {CacheServiceErrorMapper} from '../errors/cache-service-error-mapper';
import {ChannelCredentials, Interceptor, Metadata} from '@grpc/grpc-js';
import {
ChannelCredentials,
Interceptor,
Metadata,
ServiceError,
} from '@grpc/grpc-js';
import {
CacheDecreaseTtl,
CacheDelete,
Expand Down Expand Up @@ -60,6 +65,8 @@ import {
MomentoLoggerFactory,
SortedSetOrder,
UnknownError,
GetBatch,
SetBatch,
} from '..';
import {version} from '../../package.json';
import {IdleGrpcClientWrapper} from './grpc/idle-grpc-client-wrapper';
Expand Down Expand Up @@ -113,6 +120,7 @@ export class CacheDataClient implements IDataClient {
private readonly logger: MomentoLogger;
private readonly cacheServiceErrorMapper: CacheServiceErrorMapper;
private readonly interceptors: Interceptor[];
private readonly streamingInterceptors: Interceptor[];

/**
* @param {CacheClientProps} props
Expand Down Expand Up @@ -165,11 +173,19 @@ export class CacheDataClient implements IDataClient {
// to be able to set it.
const context: MiddlewareRequestHandlerContext = {};
context[CONNECTION_ID_KEY] = dataClientID;

const headers = [
new Header('Authorization', this.credentialProvider.getAuthToken()),
new Header('Agent', `nodejs:${version}`),
];

this.interceptors = this.initializeInterceptors(
headers,
this.configuration.getLoggerFactory(),
this.configuration.getMiddlewares(),
context
);
this.streamingInterceptors = this.initializeStreamingInterceptors(headers);
}
public connect(timeoutSeconds = 10): Promise<void> {
this.logger.debug('Attempting to eagerly connect to channel');
Expand Down Expand Up @@ -720,6 +736,165 @@ export class CacheDataClient implements IDataClient {
});
}

public async getBatch(
cacheName: string,
keys: Array<string | Uint8Array>
): Promise<GetBatch.Response> {
try {
validateCacheName(cacheName);
} catch (err) {
return this.cacheServiceErrorMapper.returnOrThrowError(
err as Error,
err => new GetBatch.Error(err)
);
}
this.logger.trace(`Issuing 'getBatch' request; keys: ${keys.toString()}`);
const result = await this.sendGetBatch(
cacheName,
keys.map(key => this.convert(key))
);
this.logger.trace(`'getBatch' request result: ${result.toString()}`);
return result;
}

private async sendGetBatch(
cacheName: string,
keys: Uint8Array[]
): Promise<GetBatch.Response> {
const getRequests = [];
for (const k of keys) {
const getRequest = new grpcCache._GetRequest({
cache_key: k,
});
getRequests.push(getRequest);
}
const request = new grpcCache._GetBatchRequest({
items: getRequests,
});
const metadata = this.createMetadata(cacheName);

const call = this.clientWrapper.getClient().GetBatch(request, metadata, {
interceptors: this.streamingInterceptors,
});

return await new Promise((resolve, reject) => {
const results: CacheGet.Response[] = [];
call.on('data', (getResponse: grpcCache._GetResponse) => {
const result = getResponse.result;
switch (result) {
case grpcCache.ECacheResult.Hit:
results.push(new CacheGet.Hit(getResponse.cache_body));
break;
case grpcCache.ECacheResult.Miss:
results.push(new CacheGet.Miss());
break;
default:
results.push(
new CacheGet.Error(new UnknownError(getResponse.message))
);
}
});

call.on('end', () => {
resolve(new GetBatch.Success(results, keys));
});

call.on('error', (err: ServiceError | null) => {
this.cacheServiceErrorMapper.resolveOrRejectError({
err: err,
errorResponseFactoryFn: e => new GetBatch.Error(e),
resolveFn: resolve,
rejectFn: reject,
});
});
Comment on lines +782 to +809
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the backlog: (1) the three callbacks -- data, end and error -- have boilerplate we can abstract away; and (2) we should require the developer to implement all three.

Otherwise 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@malandis wdym? is your comment targeted at future streaming APIs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or just consolidating the get/set ones I guess?

});
}

public async setBatch(
cacheName: string,
items:
| Record<string, string | Uint8Array>
| Map<string | Uint8Array, string | Uint8Array>,
ttl?: number
): Promise<SetBatch.Response> {
try {
validateCacheName(cacheName);
if (ttl !== undefined) {
validateTtlSeconds(ttl);
}
} catch (err) {
return this.cacheServiceErrorMapper.returnOrThrowError(
err as Error,
err => new SetBatch.Error(err)
);
}

const itemsToUse = this.convertSetBatchElements(items);

const ttlToUse = ttl || this.defaultTtlSeconds;
this.logger.trace(
`Issuing 'setBatch' request; items length: ${
itemsToUse.length
}, ttl: ${ttlToUse.toString()}`
);

return await this.sendSetBatch(cacheName, itemsToUse, ttlToUse);
}

private async sendSetBatch(
cacheName: string,
items: Record<string, Uint8Array>[],
ttlSeconds: number
): Promise<SetBatch.Response> {
const setRequests = [];
for (const item of items) {
const setRequest = new grpcCache._SetRequest({
cache_key: item.key,
cache_body: item.value,
ttl_milliseconds: ttlSeconds * 1000,
});
setRequests.push(setRequest);
}
const request = new grpcCache._SetBatchRequest({
items: setRequests,
});

const metadata = this.createMetadata(cacheName);

const call = this.clientWrapper.getClient().SetBatch(request, metadata, {
interceptors: this.streamingInterceptors,
});

return await new Promise((resolve, reject) => {
const results: CacheSet.Response[] = [];
call.on('data', (setResponse: grpcCache._SetResponse) => {
const result = setResponse.result;
switch (result) {
case grpcCache.ECacheResult.Ok:
results.push(new CacheSet.Success());
break;
default:
results.push(
new CacheSet.Error(new UnknownError(setResponse.message))
);
}
});

call.on('end', () => {
resolve(new SetBatch.Success(results));
});

call.on('error', (err: ServiceError | null) => {
this.cacheServiceErrorMapper.resolveOrRejectError({
err: err,
errorResponseFactoryFn: e => new SetBatch.Error(e),
resolveFn: resolve,
rejectFn: reject,
});
});
});
}

public async listConcatenateBack(
cacheName: string,
listName: string,
Expand Down Expand Up @@ -3089,14 +3264,11 @@ export class CacheDataClient implements IDataClient {
}

private initializeInterceptors(
headers: Header[],
loggerFactory: MomentoLoggerFactory,
middlewares: Middleware[],
middlewareRequestContext: MiddlewareRequestHandlerContext
): Interceptor[] {
const headers = [
new Header('Authorization', this.credentialProvider.getAuthToken()),
new Header('Agent', `nodejs:${version}`),
];
return [
middlewaresInterceptor(
loggerFactory,
Expand All @@ -3113,6 +3285,12 @@ export class CacheDataClient implements IDataClient {
];
}

// TODO https://github.com/momentohq/client-sdk-nodejs/issues/349
// decide on streaming interceptors and middlewares
private initializeStreamingInterceptors(headers: Header[]): Interceptor[] {
return [new HeaderInterceptorProvider(headers).createHeadersInterceptor()];
}

private convert(v: string | Uint8Array): Uint8Array {
if (typeof v === 'string') {
return this.textEncoder.encode(v);
Expand Down Expand Up @@ -3178,6 +3356,28 @@ export class CacheDataClient implements IDataClient {
}
}

private convertSetBatchElements(
elements:
| Map<string | Uint8Array, string | Uint8Array>
| Record<string, string | Uint8Array>
): Record<string, Uint8Array>[] {
if (elements instanceof Map) {
return [...elements.entries()].map(element => {
return {
key: this.convert(element[0]),
value: this.convert(element[1]),
};
});
} else {
return Object.entries(elements).map(element => {
return {
key: this.convert(element[0]),
value: this.convert(element[1]),
};
});
}
}

public async itemGetType(
cacheName: string,
key: string | Uint8Array
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import {runBatchGetSetTests} from '@gomomento/common-integration-tests';
import {SetupIntegrationTest} from '../integration-setup';

const {cacheClient, cacheClientWithThrowOnErrors, integrationTestCacheName} =
SetupIntegrationTest();

runBatchGetSetTests(
cacheClient,
cacheClientWithThrowOnErrors,
integrationTestCacheName
);
8 changes: 4 additions & 4 deletions packages/client-sdk-web/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/client-sdk-web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"xhr2": "0.2.1"
},
"dependencies": {
"@gomomento/generated-types-webtext": "0.106.0",
"@gomomento/generated-types-webtext": "0.106.1",
"@gomomento/sdk-core": "file:../core",
"@types/google-protobuf": "3.15.6",
"google-protobuf": "3.21.2",
Expand Down
Loading
Loading