Skip to content

Commit

Permalink
Merge pull request #511 from momentohq/dry-out-clients
Browse files Browse the repository at this point in the history
chore: DRY out clients
  • Loading branch information
cprice404 authored May 17, 2023
2 parents c62ac80 + 45e26fb commit c508bc6
Show file tree
Hide file tree
Showing 8 changed files with 426 additions and 619 deletions.
214 changes: 14 additions & 200 deletions packages/client-sdk-nodejs/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,26 @@ import {Configuration} from '../config/configuration';
import {
CredentialProvider,
InvalidArgumentError,
MomentoErrorCode,
MomentoLogger,
TopicItem,
TopicPublish,
TopicSubscribe,
UnknownError,
} from '../';
import {SubscriptionState} from '@gomomento/sdk-core/dist/src/internal/subscription-state';
import {truncateString} from '@gomomento/sdk-core/dist/src/internal/utils';
import {
truncateString,
validateCacheName,
validateTopicName,
} from '@gomomento/sdk-core/dist/src/internal/utils';
import {normalizeSdkError} from '@gomomento/sdk-core/dist/src/errors';
import {SubscribeCallOptions} from '@gomomento/sdk-core/dist/src/utils';
import {IPubsubClient} from '@gomomento/sdk-core/dist/src/internal/clients';
AbstractPubsubClient,
SendSubscribeOptions,
PrepareSubscribeCallbackOptions,
} from '@gomomento/sdk-core/dist/src/internal/clients/pubsub/AbstractPubsubClient';

/**
* Encapsulates parameters for the `sendSubscribe` method.
*/
interface SendSubscribeOptions {
cacheName: string;
topicName: string;
onItem: (item: TopicItem) => void;
onError: (
error: TopicSubscribe.Error,
subscription: TopicSubscribe.Subscription
) => void;
subscriptionState: SubscriptionState;
subscription: TopicSubscribe.Subscription;
}

/**
* Encapsulates parameters for the subscribe callback prepare methods.
*/
interface PrepareSubscribeCallbackOptions extends SendSubscribeOptions {
/**
* The promise resolve function.
*/
resolve: (
value: TopicSubscribe.Response | PromiseLike<TopicSubscribe.Subscription>
) => void;
/**
* Whether the stream was restarted due to an error. If so, we skip the end stream handler
* logic as the error handler will have restarted the stream.
*/
restartedDueToError: boolean;
/**
* If the first message is an error, we return an error immediately and do not subscribe.
*/
firstMessage: boolean;
}

export class PubsubClient implements IPubsubClient {
export class PubsubClient extends AbstractPubsubClient {
private readonly clientWrapper: GrpcClientWrapper<grpcPubsub.PubsubClient>;
private readonly configuration: Configuration;
private readonly credentialProvider: CredentialProvider;
protected readonly credentialProvider: CredentialProvider;
private readonly unaryRequestTimeoutMs: number;
private static readonly DEFAULT_REQUEST_TIMEOUT_MS: number = 5 * 1000;
private readonly logger: MomentoLogger;
protected readonly logger: MomentoLogger;
private readonly unaryInterceptors: Interceptor[];
private readonly streamingInterceptors: Interceptor[];

Expand All @@ -86,6 +46,7 @@ export class PubsubClient implements IPubsubClient {
* @param {TopicClientProps} props
*/
constructor(props: TopicClientProps) {
super();
this.configuration = props.configuration;
this.credentialProvider = props.credentialProvider;
this.logger = this.configuration.getLoggerFactory().getLogger(this);
Expand Down Expand Up @@ -147,27 +108,7 @@ export class PubsubClient implements IPubsubClient {
}
}

public async publish(
cacheName: string,
topicName: string,
value: string | Uint8Array
): Promise<TopicPublish.Response> {
try {
validateCacheName(cacheName);
validateTopicName(topicName);
} catch (err) {
return new TopicPublish.Error(normalizeSdkError(err as Error));
}
this.logger.trace(
'Issuing publish request; topic: %s, message length: %s',
truncateString(topicName),
value.length
);

return await this.sendPublish(cacheName, topicName, value);
}

private async sendPublish(
protected async sendPublish(
cacheName: string,
topicName: string,
value: string | Uint8Array
Expand Down Expand Up @@ -202,45 +143,6 @@ export class PubsubClient implements IPubsubClient {
});
}

public async subscribe(
cacheName: string,
topicName: string,
options: SubscribeCallOptions
): Promise<TopicSubscribe.Response> {
try {
validateCacheName(cacheName);
validateTopicName(topicName);
} catch (err) {
return new TopicSubscribe.Error(normalizeSdkError(err as Error));
}
this.logger.trace(
'Issuing subscribe request; topic: %s',
truncateString(topicName)
);

const onItem =
options.onItem ??
(() => {
return;
});
const onError =
options.onError ??
(() => {
return;
});

const subscriptionState = new SubscriptionState();
const subscription = new TopicSubscribe.Subscription(subscriptionState);
return await this.sendSubscribe({
cacheName,
topicName,
onItem,
onError,
subscriptionState,
subscription,
});
}

/**
* @remark This method is responsible for restarting the stream if it ends unexpectedly.
* Since we return a single subscription object to the user, we need to update it with the
Expand All @@ -254,7 +156,7 @@ export class PubsubClient implements IPubsubClient {
* case we already returned a subscription object to the user, so we instead cancel the stream and
* propagate an error to the user via the error handler.
*/
private sendSubscribe(
protected sendSubscribe(
options: SendSubscribeOptions
): Promise<TopicSubscribe.Response> {
const request = new grpcPubsub._SubscriptionRequest({
Expand Down Expand Up @@ -351,101 +253,13 @@ export class PubsubClient implements IPubsubClient {
}

const serviceError = err as unknown as ServiceError;

// When the first message is an error, an irrecoverable error has happened,
// eg the cache does not exist. The user should not receive a subscription
// object but an error.
if (options.firstMessage) {
this.logger.trace(
'Received subscription stream error; topic: %s',
truncateString(options.topicName)
);

options.resolve(
new TopicSubscribe.Error(cacheServiceErrorMapper(serviceError))
);
options.subscription.unsubscribe();
return;
}

// The service cuts the the stream after a period of time.
// Transparently restart the stream instead of propagating an error.
if (
const isRstStreamNoError =
serviceError.code === Status.INTERNAL &&
serviceError.details === PubsubClient.RST_STREAM_NO_ERROR_MESSAGE
) {
this.logger.trace(
'Server closed stream due to idle activity. Restarting.'
);
// When restarting the stream we do not do anything with the promises,
// because we should have already returned the subscription object to the user.
this.sendSubscribe(options)
.then(() => {
return;
})
.catch(() => {
return;
});
options.restartedDueToError = true;
return;
}

serviceError.details === PubsubClient.RST_STREAM_NO_ERROR_MESSAGE;
const momentoError = new TopicSubscribe.Error(
cacheServiceErrorMapper(serviceError)
);

// Another special case is when the cache is not found.
// This happens here if the user deletes the cache in the middle of
// a subscription.
if (momentoError.errorCode() === MomentoErrorCode.NOT_FOUND_ERROR) {
this.logger.trace(
'Stream ended due to cache not found error on topic: %s',
options.topicName
);
options.subscription.unsubscribe();
options.onError(momentoError, options.subscription);
return;
} else {
options.onError(momentoError, options.subscription);
}
};
}

private prepareEndCallback(
options: PrepareSubscribeCallbackOptions
): () => void {
return () => {
// We want to restart on stream end, except if:
// 1. The stream was cancelled by the caller.
// 2. The stream was restarted following an error.
if (options.restartedDueToError) {
this.logger.trace(
'Stream ended after error but was restarted on topic: %s',
options.topicName
);
return;
} else if (!options.subscriptionState.isSubscribed) {
this.logger.trace(
'Stream ended after unsubscribe on topic: %s',
options.topicName
);
return;
}

this.logger.trace(
'Stream ended on topic: %s; restarting.',
options.topicName
);

// When restarting the stream we do not do anything with the promises,
// because we should have already returned the subscription object to the user.
this.sendSubscribe(options)
.then(() => {
return;
})
.catch(() => {
return;
});
this.handleSubscribeError(options, momentoError, isRstStreamNoError);
};
}

Expand Down
54 changes: 6 additions & 48 deletions packages/client-sdk-nodejs/src/topic-client.ts
Original file line number Diff line number Diff line change
@@ -1,68 +1,26 @@
import {
MomentoLogger,
TopicPublish,
TopicSubscribe,
SubscribeCallOptions,
} from '.';
import {AbstractTopicClient} from '@gomomento/sdk-core/dist/src/internal/clients/pubsub/AbstractTopicClient';
import {MomentoLogger} from '.';
import {PubsubClient} from './internal/pubsub-client';
import {TopicClientProps} from './topic-client-props';
import {ITopicClient} from '@gomomento/sdk-core/dist/src/internal/clients/pubsub/ITopicClient';
import {IPubsubClient} from '@gomomento/sdk-core/dist/src/internal/clients';

/**
* Momento Topic Client.
*
* Publish and subscribe to topics.
*/
export class TopicClient implements ITopicClient {
private readonly logger: MomentoLogger;
private readonly client: IPubsubClient;
export class TopicClient extends AbstractTopicClient {
protected readonly logger: MomentoLogger;
protected readonly client: IPubsubClient;

/**
* Creates an instance of TopicClient.
*/
constructor(props: TopicClientProps) {
super();
this.logger = props.configuration.getLoggerFactory().getLogger(this);
this.logger.info('Creating Momento CacheClient');

this.client = new PubsubClient(props);
}

/**
* Publishes a value to a topic.
*
* @param {string} cacheName - The name of the cache to containing the topic to publish to.
* @param {string} topicName - The name of the topic to publish to.
* @param {string | Uint8Array} value - The value to publish.
* @returns {Promise<TopicPublish.Response>} -
* {@link TopicPublish.Success} on success.
* {@link TopicPublish.Error} on failure.
*/
public async publish(
cacheName: string,
topicName: string,
value: string | Uint8Array
): Promise<TopicPublish.Response> {
return await this.client.publish(cacheName, topicName, value);
}

/**
* Subscribes to a topic.
*
* @param {string} cacheName - The name of the cache to containing the topic to subscribe to.
* @param {string} topicName - The name of the topic to subscribe to.
* @param {SubscribeCallOptions} options - The options for the subscription. Defaults to no-op handlers.
* @param {function} options.onItem - The callback to invoke when data is received. Defaults to no-op.
* @param {function} options.onError - The callback to invoke when an error is received. Defaults to no-op.
* @returns {Promise<TopicSubscribe.Response>} -
* {@link TopicSubscribe.Subscription} on success.
* {@link TopicSubscribe.Error} on failure.
*/
public async subscribe(
cacheName: string,
topicName: string,
options: SubscribeCallOptions
): Promise<TopicSubscribe.Response> {
return await this.client.subscribe(cacheName, topicName, options);
}
}
Loading

0 comments on commit c508bc6

Please sign in to comment.