Skip to content

Commit

Permalink
fix(grpc-sdk): health check middleware injection (#851)
Browse files Browse the repository at this point in the history
* fix(grpc-sdk): health check middleware injection
fix(grpc-sdk): middlewares not injected in module clients
refactor(core): error logging in module recovery

* chore(core): unnecessary try/catch wrapper
  • Loading branch information
kkopanidis authored Dec 18, 2023
1 parent 74ba6d5 commit cb27906
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 18 deletions.
8 changes: 6 additions & 2 deletions libraries/grpc-sdk/src/classes/ConduitModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { getGrpcSignedTokenInterceptor, getModuleNameInterceptor } from '../inte
import { CompatServiceDefinition } from 'nice-grpc/lib/service-definitions';
import { Channel, Client, createChannel, createClientFactory } from 'nice-grpc';
import { retryMiddleware } from 'nice-grpc-client-middleware-retry';
import { HealthCheckResponse, HealthDefinition } from '../protoUtils/grpc_health_check';
import { HealthCheckResponse, HealthDefinition } from '../protoUtils';
import { EventEmitter } from 'events';
import { ConduitModuleDefinition } from '../protoUtils';
import ConduitGrpcSdk from '../index';

export class ConduitModule<T extends CompatServiceDefinition> {
protected channel?: Channel;
Expand Down Expand Up @@ -63,13 +64,16 @@ export class ConduitModule<T extends CompatServiceDefinition> {
'grpc.max_receive_message_length': 1024 * 1024 * 100,
'grpc.max_send_message_length': 1024 * 1024 * 100,
});
const clientFactory = createClientFactory()
let clientFactory = createClientFactory()
.use(
this._grpcToken
? getGrpcSignedTokenInterceptor(this._grpcToken)
: getModuleNameInterceptor(this._clientName),
)
.use(retryMiddleware);
for (const interceptor of ConduitGrpcSdk.interceptors) {
clientFactory = clientFactory.use(interceptor);
}
this._client = clientFactory.create(this.type!, this.channel, {
'*': {
// https://grpc.github.io/grpc/core/md_doc_statuscodes.html
Expand Down
11 changes: 5 additions & 6 deletions libraries/grpc-sdk/src/classes/HealthCheck.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getGrpcSignedTokenInterceptor, getModuleNameInterceptor } from '../interceptors';
import { createChannel, createClientFactory } from 'nice-grpc';
import { HealthCheckResponse, HealthDefinition } from '../protoUtils/grpc_health_check';
import { HealthCheckResponse, HealthDefinition } from '../protoUtils';
import ConduitGrpcSdk from '../index';

export async function checkModuleHealth(
Expand All @@ -13,15 +13,14 @@ export async function checkModuleHealth(
'grpc.max_receive_message_length': 1024 * 1024 * 100,
'grpc.max_send_message_length': 1024 * 1024 * 100,
});
const clientFactory = createClientFactory();
for (const interceptor of ConduitGrpcSdk.interceptors) {
clientFactory.use(interceptor);
}
clientFactory.use(
let clientFactory = createClientFactory().use(
grpcToken
? getGrpcSignedTokenInterceptor(grpcToken)
: getModuleNameInterceptor(clientName),
);
for (const interceptor of ConduitGrpcSdk.interceptors) {
clientFactory = clientFactory.use(interceptor);
}
const _healthClient = clientFactory.create(HealthDefinition, channel);

let error;
Expand Down
21 changes: 11 additions & 10 deletions packages/core/src/config-manager/service-discovery/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ export class ServiceDiscovery {
url: module.url,
...(module.configSchema && { configSchema: module.configSchema }),
});
} catch {}
} catch (e) {
ConduitGrpcSdk.Logger.error(
`SD: failed to recover: ${module.name} ${module.url}`,
);
ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`);
}
}
if (state.modules.length > success.length) {
state.modules = success;
Expand Down Expand Up @@ -131,16 +136,12 @@ export class ServiceDiscovery {

async _recoverModule(moduleName: string, moduleUrl: string) {
let healthResponse;
try {
if (!this.grpcSdk.getModule(moduleName)) {
healthResponse = await this.grpcSdk.isModuleUp(moduleName, moduleUrl);
this.grpcSdk.createModuleClient(moduleName, moduleUrl);
}
} catch (e) {
ConduitGrpcSdk.Logger.error(`SD: failed to recover: ${moduleName} ${moduleUrl}`);
ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`);
throw new Error('Failed to register unresponsive module');

if (!this.grpcSdk.getModule(moduleName)) {
healthResponse = await this.grpcSdk.isModuleUp(moduleName, moduleUrl);
this.grpcSdk.createModuleClient(moduleName, moduleUrl);
}

const healthStatus = healthResponse.status as unknown as HealthCheckStatus;
ConduitGrpcSdk.Logger.log(
`SD: registering: ${moduleName} ${moduleUrl} ${healthStatus}`,
Expand Down

0 comments on commit cb27906

Please sign in to comment.