diff --git a/examples/package-lock.json b/examples/package-lock.json index bc9f2ba6e..5e77ac64a 100644 --- a/examples/package-lock.json +++ b/examples/package-lock.json @@ -30,6 +30,9 @@ "typescript": "4.4.3" } }, + "../dist/src": { + "extraneous": true + }, "node_modules/@assemblyscript/loader": { "version": "0.19.23", "resolved": "https://registry.npmjs.org/@assemblyscript/loader/-/loader-0.19.23.tgz", diff --git a/package-lock.json b/package-lock.json index 42d195ae5..19166977b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "version": "0.0.1", "license": "Apache-2.0", "dependencies": { - "@gomomento/generated-types": "0.50.0", + "@gomomento/generated-types": "0.51.1", "@grpc/grpc-js": "1.7.3", "google-protobuf": "^3.21.2", "jwt-decode": "3.1.2" @@ -737,9 +737,9 @@ } }, "node_modules/@gomomento/generated-types": { - "version": "0.50.0", - "resolved": "https://registry.npmjs.org/@gomomento/generated-types/-/generated-types-0.50.0.tgz", - "integrity": "sha512-QiEYOG5hoJHSIndCkIiN6r4NwQZBNjy1zmUk83kkxlDk+9sQQmVTADH8VAnv73qizVqTZXfrOsUjnPwsLhrXZQ==", + "version": "0.51.1", + "resolved": "https://registry.npmjs.org/@gomomento/generated-types/-/generated-types-0.51.1.tgz", + "integrity": "sha512-1uiuZG1wBo49xshjnaekvJiB34+MiDfAnQNU7zzHxKNbi5BfBN9UenuOY/GIJrMmXdNzWxumylAz2xTerMB+Rw==", "dependencies": { "@grpc/grpc-js": "1.7.3", "@types/google-protobuf": "3.15.5", @@ -7166,9 +7166,9 @@ } }, "@gomomento/generated-types": { - "version": "0.50.0", - "resolved": "https://registry.npmjs.org/@gomomento/generated-types/-/generated-types-0.50.0.tgz", - "integrity": "sha512-QiEYOG5hoJHSIndCkIiN6r4NwQZBNjy1zmUk83kkxlDk+9sQQmVTADH8VAnv73qizVqTZXfrOsUjnPwsLhrXZQ==", + "version": "0.51.1", + "resolved": "https://registry.npmjs.org/@gomomento/generated-types/-/generated-types-0.51.1.tgz", + "integrity": "sha512-1uiuZG1wBo49xshjnaekvJiB34+MiDfAnQNU7zzHxKNbi5BfBN9UenuOY/GIJrMmXdNzWxumylAz2xTerMB+Rw==", "requires": { "@grpc/grpc-js": "1.7.3", "@types/google-protobuf": "3.15.5", diff --git a/package.json b/package.json index 8b26d5112..f730ec83f 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,7 @@ "uuid": "8.3.2" }, "dependencies": { - "@gomomento/generated-types": "0.50.0", + "@gomomento/generated-types": "0.51.1", "@grpc/grpc-js": "1.7.3", "google-protobuf": "^3.21.2", "jwt-decode": "3.1.2" diff --git a/src/index.ts b/src/index.ts index d0fa46bbb..4ed41c9bc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ import {CacheClient, SimpleCacheClient} from './cache-client'; +import {TopicClient} from './topic-client'; import * as Configurations from './config/configurations'; import * as CacheGet from './messages/responses/cache-get'; import * as CacheListConcatenateBack from './messages/responses/cache-list-concatenate-back'; @@ -44,6 +45,8 @@ import * as CacheSortedSetGetScores from './messages/responses/cache-sorted-set- import * as CacheSortedSetIncrementScore from './messages/responses/cache-sorted-set-increment-score'; import * as CacheSortedSetRemoveElement from './messages/responses/cache-sorted-set-remove-element'; import * as CacheSortedSetRemoveElements from './messages/responses/cache-sorted-set-remove-elements'; +import * as TopicPublish from './messages/responses/topic-publish'; +import * as TopicSubscribe from './messages/responses/topic-subscribe'; import {CacheInfo} from './messages/cache-info'; import {CollectionTtl} from './utils/collection-ttl'; @@ -143,6 +146,9 @@ export { CacheSortedSetIncrementScore, CacheSortedSetRemoveElement, CacheSortedSetRemoveElements, + TopicClient, + TopicPublish, + TopicSubscribe, MomentoErrorCode, AlreadyExistsError, AuthenticationError, diff --git a/src/internal/pubsub-client.ts b/src/internal/pubsub-client.ts new file mode 100644 index 000000000..625bfcce0 --- /dev/null +++ b/src/internal/pubsub-client.ts @@ -0,0 +1,298 @@ +import {pubsub} from '@gomomento/generated-types'; +import grpcPubsub = pubsub.cache_client.pubsub; +// older versions of node don't have the global util variables https://github.com/nodejs/node/issues/20365 +import {Header, HeaderInterceptorProvider} from './grpc/headers-interceptor'; +import {ClientTimeoutInterceptor} from './grpc/client-timeout-interceptor'; +import {createRetryInterceptorIfEnabled} from './grpc/retry-interceptor'; +import {cacheServiceErrorMapper} from '../errors/cache-service-error-mapper'; +import {ChannelCredentials, Interceptor, ServiceError} from '@grpc/grpc-js'; +import {Status} from '@grpc/grpc-js/build/src/constants'; +import { + TopicPublish, + TopicSubscribe, + Configuration, + CredentialProvider, + InvalidArgumentError, + MomentoLogger, +} from '..'; +import {version} from '../../package.json'; +import {IdleGrpcClientWrapper} from './grpc/idle-grpc-client-wrapper'; +import {GrpcClientWrapper} from './grpc/grpc-client-wrapper'; +import {normalizeSdkError} from '../errors/error-utils'; +import {validateCacheName} from './utils/validators'; +import {TopicClientProps} from '../topic-client-props'; +import {middlewaresInterceptor} from './grpc/middlewares-interceptor'; +import {truncateString} from './utils/display'; +import {SubscribeCallOptions} from '../utils/topic-call-options'; + +export class PubsubClient { + private readonly clientWrapper: GrpcClientWrapper; + private readonly configuration: Configuration; + private readonly credentialProvider: CredentialProvider; + private readonly unaryRequestTimeoutMs: number; + private static readonly DEFAULT_REQUEST_TIMEOUT_MS: number = 5 * 1000; + private readonly logger: MomentoLogger; + private readonly unaryInterceptors: Interceptor[]; + private readonly streamingInterceptors: Interceptor[]; + + /** + * @param {TopicClientProps} props + */ + constructor(props: TopicClientProps) { + this.configuration = props.configuration; + this.credentialProvider = props.credentialProvider; + this.logger = this.configuration.getLoggerFactory().getLogger(this); + const grpcConfig = this.configuration + .getTransportStrategy() + .getGrpcConfig(); + + this.unaryRequestTimeoutMs = + grpcConfig.getDeadlineMillis() || PubsubClient.DEFAULT_REQUEST_TIMEOUT_MS; + this.validateRequestTimeout(this.unaryRequestTimeoutMs); + this.logger.debug( + `Creating topic client using endpoint: '${this.credentialProvider.getCacheEndpoint()}'` + ); + + this.clientWrapper = new IdleGrpcClientWrapper({ + clientFactoryFn: () => + new grpcPubsub.PubsubClient( + this.credentialProvider.getCacheEndpoint(), + ChannelCredentials.createSsl(), + { + // default value for max session memory is 10mb. Under high load, it is easy to exceed this, + // after which point all requests will fail with a client-side RESOURCE_EXHAUSTED exception. + 'grpc-node.max_session_memory': grpcConfig.getMaxSessionMemoryMb(), + // This flag controls whether channels use a shared global pool of subchannels, or whether + // each channel gets its own subchannel pool. The default value is 0, meaning a single global + // pool. Setting it to 1 provides significant performance improvements when we instantiate more + // than one grpc client. + 'grpc.use_local_subchannel_pool': 1, + } + ), + configuration: this.configuration, + }); + + const headers: Header[] = [ + new Header('Authorization', this.credentialProvider.getAuthToken()), + new Header('Agent', `nodejs:${version}`), + ]; + this.unaryInterceptors = PubsubClient.initializeUnaryInterceptors( + headers, + props.configuration, + this.unaryRequestTimeoutMs + ); + this.streamingInterceptors = + PubsubClient.initializeStreamingInterceptors(headers); + } + + public getEndpoint(): string { + const endpoint = this.credentialProvider.getCacheEndpoint(); + this.logger.debug(`Using cache endpoint: ${endpoint}`); + return endpoint; + } + + private validateRequestTimeout(timeout?: number) { + this.logger.debug(`Request timeout ms: ${String(timeout)}`); + if (timeout && timeout <= 0) { + throw new InvalidArgumentError( + 'request timeout must be greater than zero.' + ); + } + } + + public async publish( + cacheName: string, + topicName: string, + value: string | Uint8Array + ): Promise { + try { + validateCacheName(cacheName); + // todo: validate topic name + } catch (err) { + throw normalizeSdkError(err as Error); + } + this.logger.trace( + 'Issuing publish request; topic: %s, message length: %s', + truncateString(topicName), + value.length + ); + + return await this.sendPublish(cacheName, topicName, value); + } + + private async sendPublish( + cacheName: string, + topicName: string, + value: string | Uint8Array + ): Promise { + const topicValue = new grpcPubsub._TopicValue(); + if (typeof value === 'string') { + topicValue.text = value; + } else { + topicValue.binary = value; + } + + const request = new grpcPubsub._PublishRequest({ + cache_name: cacheName, + topic: topicName, + value: topicValue, + }); + + return await new Promise(resolve => { + this.clientWrapper.getClient().Publish( + request, + { + interceptors: this.unaryInterceptors, + }, + (err, resp) => { + if (resp) { + resolve(new TopicPublish.Success()); + } else { + resolve(new TopicPublish.Error(cacheServiceErrorMapper(err))); + } + } + ); + }); + } + + public async subscribe( + cacheName: string, + topicName: string, + options: SubscribeCallOptions + ): Promise { + try { + validateCacheName(cacheName); + // TODO: validate topic name + } catch (err) { + throw normalizeSdkError(err as Error); + } + this.logger.trace( + 'Issuing subscribe request; topic: %s', + truncateString(topicName) + ); + + return await new Promise(resolve => { + this.sendSubscribe(cacheName, topicName, options, 0); + resolve(); + }); + } + + private sendSubscribe( + cacheName: string, + topicName: string, + options: SubscribeCallOptions, + resumeAtTopicSequenceNumber = 0 + ): void { + const request = new grpcPubsub._SubscriptionRequest({ + cache_name: cacheName, + topic: topicName, + resume_at_topic_sequence_number: resumeAtTopicSequenceNumber, + }); + + const call = this.clientWrapper.getClient().Subscribe(request, { + interceptors: this.streamingInterceptors, + }); + + // The following are the outer handlers for the stream. + // They are responsible for reconnecting the stream if it ends unexpectedly, and for + // building the API facing response objects. + + // The last topic sequence number we received. This is used to resume the stream. + // If resumeAtTopicSequenceNumber is 0, then we reconnect from the beginning again. + // Otherwise we resume starting from the next sequence number. + let lastTopicSequenceNumber = + resumeAtTopicSequenceNumber === 0 ? -1 : resumeAtTopicSequenceNumber; + let restartedDueToError = false; + call + .on('data', (resp: grpcPubsub._SubscriptionItem) => { + if (resp?.item) { + lastTopicSequenceNumber = resp.item.topic_sequence_number; + if (resp.item.value.text) { + options.onItem(new TopicSubscribe.Item(resp.item.value.text)); + } else if (resp.item.value.binary) { + options.onItem(new TopicSubscribe.Item(resp.item.value.binary)); + } + } else if (resp?.heartbeat) { + this.logger.trace( + 'Received heartbeat from subscription stream; topic: %s', + truncateString(topicName) + ); + } else if (resp?.discontinuity) { + this.logger.trace( + 'Received discontinuity from subscription stream; topic: %s', + truncateString(topicName) + ); + } + }) + .on('error', (err: Error) => { + const serviceError = err as unknown as ServiceError; + // The service cuts the the stream after ~1 minute. Hence we reconnect. + if ( + serviceError.code === Status.INTERNAL && + serviceError.details === 'Received RST_STREAM with code 0' + ) { + this.logger.trace( + 'Server closed stream due to idle activity. Restarting.' + ); + this.sendSubscribe( + cacheName, + topicName, + options, + lastTopicSequenceNumber + 1 + ); + restartedDueToError = true; + return; + } + + // Otherwise we propagate the error to the caller. + options.onError( + new TopicSubscribe.Error(cacheServiceErrorMapper(serviceError)) + ); + }) + .on('end', () => { + // The stream could have already been restarted due to an error. + if (restartedDueToError) { + this.logger.trace( + 'Stream ended after error but was restarted on topic: %s', + topicName + ); + return; + } + + this.logger.trace('Stream ended on topic: %s; restarting.', topicName); + this.sendSubscribe( + cacheName, + topicName, + options, + lastTopicSequenceNumber + 1 + ); + }); + } + + private static initializeUnaryInterceptors( + headers: Header[], + configuration: Configuration, + requestTimeoutMs: number + ): Interceptor[] { + return [ + middlewaresInterceptor( + configuration.getLoggerFactory(), + configuration.getMiddlewares() + ), + new HeaderInterceptorProvider(headers).createHeadersInterceptor(), + ClientTimeoutInterceptor(requestTimeoutMs), + ...createRetryInterceptorIfEnabled( + configuration.getLoggerFactory(), + configuration.getRetryStrategy() + ), + ]; + } + + // TODO https://github.com/momentohq/client-sdk-nodejs/issues/349 + // decide on streaming interceptors and middlewares + private static initializeStreamingInterceptors( + headers: Header[] + ): Interceptor[] { + return [new HeaderInterceptorProvider(headers).createHeadersInterceptor()]; + } +} diff --git a/src/messages/responses/topic-publish.ts b/src/messages/responses/topic-publish.ts new file mode 100644 index 000000000..39b8b3a4c --- /dev/null +++ b/src/messages/responses/topic-publish.ts @@ -0,0 +1,48 @@ +import {SdkError} from '../../errors/errors'; +import {ResponseBase, ResponseError, ResponseSuccess} from './response-base'; + +/** + * Parent response type for a topic publish request. The + * response object is resolved to a type-safe object of one of + * the following subtypes: + * + * - {Success} + * - {Error} + * + * `instanceof` type guards can be used to operate on the appropriate subtype. + * @example + * For example: + * ``` + * if (response instanceof TopicPublish.Error) { + * // Handle error as appropriate. The compiler will smart-cast `response` to type + * // `TopicPublish.Error` in this block, so you will have access to the properties + * // of the Error class; e.g. `response.errorCode()`. + * } + * ``` + */ +export abstract class Response extends ResponseBase {} + +class _Success extends Response {} + +/** + * Indicates a Successful cache set request. + */ +export class Success extends ResponseSuccess(_Success) {} + +class _Error extends Response { + constructor(protected _innerException: SdkError) { + super(); + } +} + +/** + * Indicates that an error occurred during the cache set request. + * + * This response object includes the following fields that you can use to determine + * how you would like to handle the error: + * + * - `errorCode()` - a unique Momento error code indicating the type of error that occurred. + * - `message()` - a human-readable description of the error + * - `innerException()` - the original error that caused the failure; can be re-thrown. + */ +export class Error extends ResponseError(_Error) {} diff --git a/src/messages/responses/topic-subscribe.ts b/src/messages/responses/topic-subscribe.ts new file mode 100644 index 000000000..475e036f3 --- /dev/null +++ b/src/messages/responses/topic-subscribe.ts @@ -0,0 +1,64 @@ +// older versions of node don't have the global util variables https://github.com/nodejs/node/issues/20365 +import {SdkError} from '../../errors/errors'; +import {ResponseBase, ResponseError} from './response-base'; +import {truncateString} from '../../internal/utils/display'; + +/** + * Parent response type for a cache get request. The + * response object is resolved to a type-safe object of one of + * the following subtypes: + * + * - {Hit} + * - {Miss} + * - {Error} + * + * `instanceof` type guards can be used to operate on the appropriate subtype. + * @example + * For example: + * ``` + * if (response instanceof CacheGet.Error) { + * // Handle error as appropriate. The compiler will smart-cast `response` to type + * // `CacheGet.Error` in this block, so you will have access to the properties + * // of the Error class; e.g. `response.errorCode()`. + * } + * ``` + */ +export abstract class Response extends ResponseBase {} + +export class Item extends Response { + private readonly _value: string | Uint8Array; + constructor(_value: string | Uint8Array) { + super(); + this._value = _value; + } + /** + * Returns the data as a utf-8 string, decoded from the underlying byte array. + * @returns string + */ + public value(): string | Uint8Array { + return this._value; + } + + public override toString(): string { + const display = truncateString(this.value().toString()); + return `${super.toString()}: ${display}`; + } +} + +class _Error extends Response { + constructor(protected _innerException: SdkError) { + super(); + } +} + +/** + * Indicates that an error occurred during the cache get request. + * + * This response object includes the following fields that you can use to determine + * how you would like to handle the error: + * + * - `errorCode()` - a unique Momento error code indicating the type of error that occurred. + * - `message()` - a human-readable description of the error + * - `innerException()` - the original error that caused the failure; can be re-thrown. + */ +export class Error extends ResponseError(_Error) {} diff --git a/src/topic-client-props.ts b/src/topic-client-props.ts new file mode 100644 index 000000000..cb1d800b2 --- /dev/null +++ b/src/topic-client-props.ts @@ -0,0 +1,13 @@ +import {CredentialProvider} from './auth/credential-provider'; +import {Configuration} from './config/configuration'; + +export interface TopicClientProps { + /** + * Configuration settings for the topic client + */ + configuration: Configuration; + /** + * controls how the client will get authentication information for connecting to the Momento service + */ + credentialProvider: CredentialProvider; +} diff --git a/src/topic-client.ts b/src/topic-client.ts new file mode 100644 index 000000000..1f8d2104c --- /dev/null +++ b/src/topic-client.ts @@ -0,0 +1,60 @@ +import {PubsubClient} from './internal/pubsub-client'; +import {TopicPublish, MomentoLogger} from '.'; +import {TopicClientProps} from './topic-client-props'; +import {SubscribeCallOptions} from './utils/topic-call-options'; + +/** + * Momento Topic Client. + * + * Publish and subscribe to topics. + */ +export class TopicClient { + private readonly logger: MomentoLogger; + private readonly client: PubsubClient; + + /** + * Creates an instance of TopicClient. + */ + constructor(props: TopicClientProps) { + this.logger = props.configuration.getLoggerFactory().getLogger(this); + this.logger.info('Creating Momento CacheClient'); + + this.client = new PubsubClient(props); + } + + /** + * Publishes a value to a topic. + * + * @param {string} cacheName - The name of the cache to containing the topic to publish to. + * @param {string} topicName - The name of the topic to publish to. + * @param {string | Uint8Array} value - The value to publish. + * @returns {Promise} - + * {@link TopicPublish.Success} on success. + * {@link TopicPublish.Error} on failure. + */ + public async publish( + cacheName: string, + topicName: string, + value: string | Uint8Array + ): Promise { + return await this.client.publish(cacheName, topicName, value); + } + + /** + * Subscribes to a topic. + * + * @param {string} cacheName - The name of the cache to containing the topic to subscribe to. + * @param {string} topicName - The name of the topic to subscribe to. + * @param {SubscribeCallOptions} options - The options for the subscription. + * @param {function} options.onItem - The callback to invoke when data is received. + * @param {function} options.onError - The callback to invoke when an error is received. + * @returns + */ + public async subscribe( + cacheName: string, + topicName: string, + options: SubscribeCallOptions + ): Promise { + return await this.client.subscribe(cacheName, topicName, options); + } +} diff --git a/src/utils/topic-call-options.ts b/src/utils/topic-call-options.ts new file mode 100644 index 000000000..c8f7849f7 --- /dev/null +++ b/src/utils/topic-call-options.ts @@ -0,0 +1,20 @@ +import {TopicSubscribe} from '..'; + +/** + * Options for the subscribe call. + */ +export interface SubscribeCallOptions { + /** + * The callback to invoke when data is received from the topic subscription. + * + * @param data The data received from the topic subscription. + */ + onItem(data: TopicSubscribe.Item): void; + + /** + * The callback to invoke when an error is received from the topic subscription. + * + * @param error The error received from the topic subscription. + */ + onError(error: TopicSubscribe.Error): void; +}