Skip to content

Commit

Permalink
fix(grpc-sdk): module connection not re-established after core shutdo…
Browse files Browse the repository at this point in the history
…wn (#858)

* fix(grpc-sdk): module connection not re-established after core shutdown

* chore(grpc-sdk): rm unused vars

---------

Co-authored-by: Konstantinos Feretos <[email protected]>
  • Loading branch information
kkopanidis and kon14 authored Dec 19, 2023
1 parent 75a6b7a commit b7ce4b5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 14 deletions.
13 changes: 10 additions & 3 deletions libraries/grpc-sdk/src/classes/ConduitModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ import { getGrpcSignedTokenInterceptor, getModuleNameInterceptor } from '../inte
import { CompatServiceDefinition } from 'nice-grpc/lib/service-definitions';
import { Channel, Client, createChannel, createClientFactory } from 'nice-grpc';
import { retryMiddleware } from 'nice-grpc-client-middleware-retry';
import { HealthCheckResponse, HealthDefinition } from '../protoUtils';
import {
ConduitModuleDefinition,
HealthCheckResponse,
HealthDefinition,
} from '../protoUtils';
import { EventEmitter } from 'events';
import { ConduitModuleDefinition } from '../protoUtils';
import ConduitGrpcSdk from '../index';

export class ConduitModule<T extends CompatServiceDefinition> {
Expand Down Expand Up @@ -58,7 +61,11 @@ export class ConduitModule<T extends CompatServiceDefinition> {
}

openConnection() {
if (this.channel) return;
if (this.channel) {
// used to make sure a connection attempt is made
this.channel.getConnectivityState(true);
return;
}
// ConduitGrpcSdk.Logger.log(`Opening connection for ${this._serviceName}`);
this.channel = createChannel(this._serviceUrl, undefined, {
'grpc.max_receive_message_length': 1024 * 1024 * 100,
Expand Down
28 changes: 23 additions & 5 deletions libraries/grpc-sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,19 +274,24 @@ export default class ConduitGrpcSdk {
return this._initialize();
}
(this._core as unknown) = new Core(this.name, this.serverUrl, this._grpcToken);
await this.connectToCore().catch(() => process.exit(-1));
this._initialize();
}

async connectToCore() {
ConduitGrpcSdk.Logger.log('Waiting for Core...');
while (true) {
try {
this.core.openConnection();
const state = await this.core.check();
if ((state as unknown as HealthCheckStatus) === HealthCheckStatus.SERVING) {
ConduitGrpcSdk.Logger.log('Core connection established');
this._initialize();
break;
return;
}
} catch (err) {
if ((err as GrpcError).code === status.PERMISSION_DENIED) {
ConduitGrpcSdk.Logger.error(err as Error);
process.exit(-1);
throw err;
}
await sleep(1000);
}
Expand Down Expand Up @@ -319,6 +324,15 @@ export default class ConduitGrpcSdk {
emitter.emit(`module-connection-update:${m.moduleName}`, true);
});
});
emitter.on('core-status-update', () => {
this.connectToCore()
.then(() => {
this._initialize();
})
.catch(() => {
process.exit(-1);
});
});
}

monitorModule(
Expand Down Expand Up @@ -528,8 +542,12 @@ export default class ConduitGrpcSdk {
}

private _initialize() {
if (this._initialized)
throw new Error("Module's grpc-sdk has already been initialized");
if (this._initialized) {
this._config?.openConnection();
this._admin?.openConnection();
this.config.watchModules().then();
return;
}
(this._config as unknown) = new Config(
this.name,
this.serverUrl,
Expand Down
15 changes: 9 additions & 6 deletions libraries/grpc-sdk/src/modules/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
ConfigDefinition,
ModuleHealthRequest,
RegisterModuleRequest,
} from '../../protoUtils/core';
} from '../../protoUtils';
import { Indexable } from '../../interfaces';
import ConduitGrpcSdk from '../../index';
import { ClusterOptions, RedisOptions } from 'ioredis';
Expand All @@ -24,6 +24,7 @@ export class Config extends ConduitModule<typeof ConfigDefinition> {
super(moduleName, 'config', url, grpcToken);
this.initializeClient(ConfigDefinition);
this._serviceHealthStatusGetter = serviceHealthStatusGetter;
this.emitter.setMaxListeners(150);
}

getServerConfig() {
Expand Down Expand Up @@ -150,17 +151,19 @@ export class Config extends ConduitModule<typeof ConfigDefinition> {
}

async watchModules() {
const self = this;
this.emitter.setMaxListeners(150);
self.emitter.emit('serving-modules-update', await self.moduleList().catch());
if (!this.coreLive) {
this.coreLive = true;
}
this.emitter.emit('serving-modules-update', await this.moduleList().catch());
try {
const call = this.client!.watchModules({});
for await (const data of call) {
self.emitter.emit('serving-modules-update', data.modules);
this.emitter.emit('serving-modules-update', data.modules);
}
} catch (error) {
self.coreLive = false;
this.coreLive = false;
ConduitGrpcSdk.Logger.warn('Core unhealthy');
this.emitter.emit('core-status-update', HealthCheckStatus.UNKNOWN);
}
}
}

0 comments on commit b7ce4b5

Please sign in to comment.