diff --git a/packages/core/src/config-manager/service-discovery/index.ts b/packages/core/src/config-manager/service-discovery/index.ts index 2331140a5..eaf599cdf 100644 --- a/packages/core/src/config-manager/service-discovery/index.ts +++ b/packages/core/src/config-manager/service-discovery/index.ts @@ -11,6 +11,7 @@ import { ServerWritableStream, status } from '@grpc/grpc-js'; import { EventEmitter } from 'events'; import { ServiceRegistry } from './ServiceRegistry.js'; import { ServiceMonitor } from './ServiceMonitor.js'; +import { isEmpty } from 'lodash-es'; /* * - Multi-instance services are not handled individually (LoadBalancer) @@ -41,12 +42,12 @@ export class ServiceDiscovery { } async highAvailability() { - await this.grpcSdk - .state!.modifyState(async (existingState: Indexable) => { - const state = existingState ?? {}; + const state = await this.grpcSdk.state?.getState(); + if (state && !isEmpty(state)) { + const parsedState = JSON.parse(state) as { modules: IModuleConfig[] }; + if (parsedState.modules) { const success: IModuleConfig[] = []; - if (!state.modules) return state; - for (const module of state.modules) { + for (const module of parsedState.modules) { try { await this._recoverModule(module.name, module.url); success.push({ @@ -61,18 +62,20 @@ export class ServiceDiscovery { ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`); } } - if (state.modules.length > success.length) { - state.modules = success; - } - return state; - }) - .then(() => { - ConduitGrpcSdk.Logger.log('Recovered state'); - }) - .catch(() => { - ConduitGrpcSdk.Logger.error('Failed to recover state'); - }); - + await this.grpcSdk + .state!.modifyState(async (existingState: Indexable) => { + const state = existingState ?? {}; + state.modules = success; + return state; + }) + .then(() => { + ConduitGrpcSdk.Logger.log('Recovered state'); + }) + .catch(() => { + ConduitGrpcSdk.Logger.error('Failed to recover state'); + }); + } + } this.grpcSdk.bus!.subscribe('config', (message: string) => { const parsedMessage = JSON.parse(message); if (parsedMessage.type === 'module-health') { @@ -259,10 +262,10 @@ export class ServiceDiscovery { return state; }) .then(() => { - ConduitGrpcSdk.Logger.log('Updated state'); + ConduitGrpcSdk.Logger.log(`SD: Updated state for ${name} ${url}`); }) .catch(() => { - ConduitGrpcSdk.Logger.error('Failed to update state'); + ConduitGrpcSdk.Logger.error(`SD: Failed to update state ${name} ${url}`); }); } } diff --git a/packages/core/src/interfaces/IModuleConfig.ts b/packages/core/src/interfaces/IModuleConfig.ts index 99034f9c2..fe62aaa7d 100644 --- a/packages/core/src/interfaces/IModuleConfig.ts +++ b/packages/core/src/interfaces/IModuleConfig.ts @@ -1,6 +1,6 @@ export interface IModuleConfig { name: string; - instance: string; + instance?: string; url: string; configSchema?: string; }