From cb279068c6408b36339b381c837e8de7cca7e83a Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Mon, 18 Dec 2023 21:48:13 +0200 Subject: [PATCH] fix(grpc-sdk): health check middleware injection (#851) * fix(grpc-sdk): health check middleware injection fix(grpc-sdk): middlewares not injected in module clients refactor(core): error logging in module recovery * chore(core): unnecessary try/catch wrapper --- .../grpc-sdk/src/classes/ConduitModule.ts | 8 +++++-- libraries/grpc-sdk/src/classes/HealthCheck.ts | 11 +++++----- .../config-manager/service-discovery/index.ts | 21 ++++++++++--------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/libraries/grpc-sdk/src/classes/ConduitModule.ts b/libraries/grpc-sdk/src/classes/ConduitModule.ts index 21e3008af..d84178eeb 100644 --- a/libraries/grpc-sdk/src/classes/ConduitModule.ts +++ b/libraries/grpc-sdk/src/classes/ConduitModule.ts @@ -2,9 +2,10 @@ import { getGrpcSignedTokenInterceptor, getModuleNameInterceptor } from '../inte import { CompatServiceDefinition } from 'nice-grpc/lib/service-definitions'; import { Channel, Client, createChannel, createClientFactory } from 'nice-grpc'; import { retryMiddleware } from 'nice-grpc-client-middleware-retry'; -import { HealthCheckResponse, HealthDefinition } from '../protoUtils/grpc_health_check'; +import { HealthCheckResponse, HealthDefinition } from '../protoUtils'; import { EventEmitter } from 'events'; import { ConduitModuleDefinition } from '../protoUtils'; +import ConduitGrpcSdk from '../index'; export class ConduitModule { protected channel?: Channel; @@ -63,13 +64,16 @@ export class ConduitModule { 'grpc.max_receive_message_length': 1024 * 1024 * 100, 'grpc.max_send_message_length': 1024 * 1024 * 100, }); - const clientFactory = createClientFactory() + let clientFactory = createClientFactory() .use( this._grpcToken ? getGrpcSignedTokenInterceptor(this._grpcToken) : getModuleNameInterceptor(this._clientName), ) .use(retryMiddleware); + for (const interceptor of ConduitGrpcSdk.interceptors) { + clientFactory = clientFactory.use(interceptor); + } this._client = clientFactory.create(this.type!, this.channel, { '*': { // https://grpc.github.io/grpc/core/md_doc_statuscodes.html diff --git a/libraries/grpc-sdk/src/classes/HealthCheck.ts b/libraries/grpc-sdk/src/classes/HealthCheck.ts index 3a19f977b..97d637a5f 100644 --- a/libraries/grpc-sdk/src/classes/HealthCheck.ts +++ b/libraries/grpc-sdk/src/classes/HealthCheck.ts @@ -1,6 +1,6 @@ import { getGrpcSignedTokenInterceptor, getModuleNameInterceptor } from '../interceptors'; import { createChannel, createClientFactory } from 'nice-grpc'; -import { HealthCheckResponse, HealthDefinition } from '../protoUtils/grpc_health_check'; +import { HealthCheckResponse, HealthDefinition } from '../protoUtils'; import ConduitGrpcSdk from '../index'; export async function checkModuleHealth( @@ -13,15 +13,14 @@ export async function checkModuleHealth( 'grpc.max_receive_message_length': 1024 * 1024 * 100, 'grpc.max_send_message_length': 1024 * 1024 * 100, }); - const clientFactory = createClientFactory(); - for (const interceptor of ConduitGrpcSdk.interceptors) { - clientFactory.use(interceptor); - } - clientFactory.use( + let clientFactory = createClientFactory().use( grpcToken ? getGrpcSignedTokenInterceptor(grpcToken) : getModuleNameInterceptor(clientName), ); + for (const interceptor of ConduitGrpcSdk.interceptors) { + clientFactory = clientFactory.use(interceptor); + } const _healthClient = clientFactory.create(HealthDefinition, channel); let error; diff --git a/packages/core/src/config-manager/service-discovery/index.ts b/packages/core/src/config-manager/service-discovery/index.ts index fa5186be6..fec70c9e5 100644 --- a/packages/core/src/config-manager/service-discovery/index.ts +++ b/packages/core/src/config-manager/service-discovery/index.ts @@ -54,7 +54,12 @@ export class ServiceDiscovery { url: module.url, ...(module.configSchema && { configSchema: module.configSchema }), }); - } catch {} + } catch (e) { + ConduitGrpcSdk.Logger.error( + `SD: failed to recover: ${module.name} ${module.url}`, + ); + ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`); + } } if (state.modules.length > success.length) { state.modules = success; @@ -131,16 +136,12 @@ export class ServiceDiscovery { async _recoverModule(moduleName: string, moduleUrl: string) { let healthResponse; - try { - if (!this.grpcSdk.getModule(moduleName)) { - healthResponse = await this.grpcSdk.isModuleUp(moduleName, moduleUrl); - this.grpcSdk.createModuleClient(moduleName, moduleUrl); - } - } catch (e) { - ConduitGrpcSdk.Logger.error(`SD: failed to recover: ${moduleName} ${moduleUrl}`); - ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`); - throw new Error('Failed to register unresponsive module'); + + if (!this.grpcSdk.getModule(moduleName)) { + healthResponse = await this.grpcSdk.isModuleUp(moduleName, moduleUrl); + this.grpcSdk.createModuleClient(moduleName, moduleUrl); } + const healthStatus = healthResponse.status as unknown as HealthCheckStatus; ConduitGrpcSdk.Logger.log( `SD: registering: ${moduleName} ${moduleUrl} ${healthStatus}`,