From 8709eaff9874874af604380acb1961d6c2fedea1 Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Fri, 25 Oct 2024 17:17:24 +0300 Subject: [PATCH 1/5] refactor(core): rework proto files to reflect changes for instanceId refactor(grpc-sdk,module-tools): implement proto changes refactor(core): rework the service registry --- .../grpc-sdk/src/modules/config/index.ts | 5 +- libraries/grpc-sdk/src/utilities/EventBus.ts | 2 + libraries/module-tools/src/ManagedModule.ts | 16 ++- .../src/classes/ConduitServiceModule.ts | 11 +- .../src/interfaces/RegisteredModule.ts | 4 - packages/core/src/config-manager/index.ts | 12 +- .../config-manager/models/Basic.message.ts | 3 + .../config-manager/models/RegisteredModule.ts | 130 ++++++++++++++++++ .../models/ServiceDiscover.message.ts | 10 ++ .../service-discovery/ServiceRegistry.ts | 24 ++-- .../config-manager/service-discovery/index.ts | 47 ++++--- .../core/src/config-manager/utils/index.ts | 25 ++++ packages/core/src/core.proto | 5 + 13 files changed, 249 insertions(+), 45 deletions(-) delete mode 100644 packages/commons/src/interfaces/RegisteredModule.ts create mode 100644 packages/core/src/config-manager/models/Basic.message.ts create mode 100644 packages/core/src/config-manager/models/RegisteredModule.ts create mode 100644 packages/core/src/config-manager/models/ServiceDiscover.message.ts create mode 100644 packages/core/src/config-manager/utils/index.ts diff --git a/libraries/grpc-sdk/src/modules/config/index.ts b/libraries/grpc-sdk/src/modules/config/index.ts index 8cfce82c9..c385a0b2d 100644 --- a/libraries/grpc-sdk/src/modules/config/index.ts +++ b/libraries/grpc-sdk/src/modules/config/index.ts @@ -105,11 +105,13 @@ export class Config extends ConduitModule { name: string, url: string, healthStatus: Omit, + instanceId: string, ) { const request: RegisterModuleRequest = { moduleName: name.toString(), url: url.toString(), healthStatus: healthStatus as number, + instanceId, }; const self = this; return this.client!.registerModule(request).then(res => { @@ -118,13 +120,14 @@ export class Config extends ConduitModule { }); } - moduleHealthProbe(name: string, url: string) { + moduleHealthProbe(name: string, url: string, instanceId: string) { const request: ModuleHealthRequest = { moduleName: name.toString(), url, status: this._serviceHealthStatusGetter ? this._serviceHealthStatusGetter() : HealthCheckStatus.SERVICE_UNKNOWN, + instanceId, }; const self = this; this.client!.moduleHealthProbe(request) diff --git a/libraries/grpc-sdk/src/utilities/EventBus.ts b/libraries/grpc-sdk/src/utilities/EventBus.ts index 9d30ae907..bf6a06e99 100644 --- a/libraries/grpc-sdk/src/utilities/EventBus.ts +++ b/libraries/grpc-sdk/src/utilities/EventBus.ts @@ -16,6 +16,7 @@ export class EventBus { this._clientSubscriber = redisManager.getClient({ keyPrefix: 'bus_' }); this._clientPublisher = redisManager.getClient({ keyPrefix: 'bus_' }); this._signature = crypto.randomBytes(20).toString('hex'); + //@ts-expect-error this._clientSubscriber.on('ready', () => { ConduitGrpcSdk.Logger.log('The Bus is in the station...hehe'); }); @@ -59,6 +60,7 @@ export class EventBus { this._subscribedChannels[channelName] = [callback]; this._clientSubscriber.subscribe(channelName, () => {}); const self = this; + //@ts-expect-error this._clientSubscriber.on('message', (channel: string, message: string) => { if (channel !== channelName) return; // if the message supports the signature diff --git a/libraries/module-tools/src/ManagedModule.ts b/libraries/module-tools/src/ManagedModule.ts index 3536d1a27..1284bbb15 100644 --- a/libraries/module-tools/src/ManagedModule.ts +++ b/libraries/module-tools/src/ManagedModule.ts @@ -14,6 +14,7 @@ import { initializeSdk, merge } from './utilities/index.js'; import { convictConfigParser } from './utilities/convictConfigParser.js'; import { RoutingManager } from './routing/index.js'; import { RoutingController } from './routing/RoutingController.js'; +import crypto from 'crypto'; export abstract class ManagedModule extends ConduitServiceModule { readonly config?: convict.Config; @@ -25,8 +26,8 @@ export abstract class ManagedModule extends ConduitServiceModule { private readonly servicePort: string; private _moduleState: ModuleLifecycleStage; - protected constructor(moduleName: string) { - super(moduleName); + protected constructor(moduleName: string, instanceId?: string) { + super(moduleName, instanceId ?? crypto.randomBytes(16).toString('hex')); this._moduleState = ModuleLifecycleStage.CREATE_GRPC; if (!process.env.CONDUIT_SERVER) { throw new Error('CONDUIT_SERVER is undefined, specify Conduit server URL'); @@ -48,6 +49,10 @@ export abstract class ManagedModule extends ConduitServiceModule { return this._moduleName; } + get instanceId() { + return this._instanceId; + } + get address() { return this._address; } @@ -57,7 +62,12 @@ export abstract class ManagedModule extends ConduitServiceModule { this.initialize(this.serviceAddress, this.servicePort); try { await this.preRegisterLifecycle(); - await this.grpcSdk.config.registerModule(this.name, this.address, this.healthState); + await this.grpcSdk.config.registerModule( + this.name, + this.address, + this.healthState, + this.instanceId, + ); } catch (err) { ConduitGrpcSdk.Logger.error('Failed to initialize server'); ConduitGrpcSdk.Logger.error(err as Error); diff --git a/libraries/module-tools/src/classes/ConduitServiceModule.ts b/libraries/module-tools/src/classes/ConduitServiceModule.ts index d00c0cd45..5276731ff 100644 --- a/libraries/module-tools/src/classes/ConduitServiceModule.ts +++ b/libraries/module-tools/src/classes/ConduitServiceModule.ts @@ -20,16 +20,19 @@ import { fileURLToPath } from 'node:url'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); + export abstract class ConduitServiceModule { protected readonly _moduleName: string; + protected readonly _instanceId: string; protected _serviceName?: string; protected _address!: string; // external address:port of service (LoadBalancer) protected grpcServer!: GrpcServer; protected readonly events: EventEmitter = new EventEmitter(); private _serviceHealthState: HealthCheckStatus = HealthCheckStatus.SERVING; // default for health-agnostic modules - protected constructor(moduleName: string) { + protected constructor(moduleName: string, instanceId: string) { this._moduleName = camelCase(moduleName); + this._instanceId = instanceId; } private _registered = false; @@ -83,7 +86,11 @@ export abstract class ConduitServiceModule { // do not emit health updates until registered if (this._registered) { this.events.emit(`grpc-health-change:${this._serviceName}`, state); - this._grpcSdk?.config.moduleHealthProbe(this._moduleName, this._address); + this._grpcSdk?.config.moduleHealthProbe( + this._moduleName, + this._address, + this._instanceId, + ); } } } diff --git a/packages/commons/src/interfaces/RegisteredModule.ts b/packages/commons/src/interfaces/RegisteredModule.ts deleted file mode 100644 index 7401c23d8..000000000 --- a/packages/commons/src/interfaces/RegisteredModule.ts +++ /dev/null @@ -1,4 +0,0 @@ -export interface RegisteredModule { - address: string; - serving: boolean; -} diff --git a/packages/core/src/config-manager/index.ts b/packages/core/src/config-manager/index.ts index be1ad472c..a1285282e 100644 --- a/packages/core/src/config-manager/index.ts +++ b/packages/core/src/config-manager/index.ts @@ -8,9 +8,12 @@ import { } from '@conduitplatform/grpc-sdk'; import { ConduitCommons, + GetConfigRequest, GetConfigResponse, GetRedisDetailsResponse, IConfigManager, + ModuleByNameRequest, + ModuleByNameResponse, UpdateConfigRequest, UpdateConfigResponse, } from '@conduitplatform/commons'; @@ -48,8 +51,8 @@ export default class ConfigManager implements IConfigManager { } getModuleUrlByNameGrpc( - call: GrpcRequest<{ name: string }>, - callback: GrpcResponse<{ moduleUrl: string }>, + call: GrpcRequest, + callback: GrpcResponse, ) { const name = call.request.name; const result = this.getModuleUrlByName(name); @@ -156,7 +159,10 @@ export default class ConfigManager implements IConfigManager { this._configStorage.onDatabaseAvailable(); } - getGrpc(call: GrpcRequest<{ key: string }>, callback: GrpcResponse<{ data: string }>) { + getGrpc( + call: GrpcRequest, + callback: GrpcResponse, + ) { this.get(call.request.key).then(r => { if (!r) { return callback({ diff --git a/packages/core/src/config-manager/models/Basic.message.ts b/packages/core/src/config-manager/models/Basic.message.ts new file mode 100644 index 000000000..973a2f059 --- /dev/null +++ b/packages/core/src/config-manager/models/Basic.message.ts @@ -0,0 +1,3 @@ +export type BasicMessage = { + type: 'module-health' | 'serving-modules-update'; +}; diff --git a/packages/core/src/config-manager/models/RegisteredModule.ts b/packages/core/src/config-manager/models/RegisteredModule.ts new file mode 100644 index 000000000..cc5c1a0c7 --- /dev/null +++ b/packages/core/src/config-manager/models/RegisteredModule.ts @@ -0,0 +1,130 @@ +import { formatAddress, getAddressType } from '../utils/index.js'; +import dns from 'node:dns'; +import { ConduitGrpcSdk } from '@conduitplatform/grpc-sdk'; + +export type ModuleInstance = { + instanceId: string; + address: string; + serving: boolean; + addressType?: 'ipv4' | 'ipv6' | 'dns'; +}; + +export class RegisteredModule { + private readonly _name: string; + private _instances: ModuleInstance[] = []; + // Should always be array of IP addresses of the same type, + // but may contain either ipv4 or ipv6 addresses. + private _resolvedAddresses: string[] = []; + // Should always be array of IP addresses of the same type + private _servingAddresses: string[] = []; + + constructor(name: string); + constructor(name: string, instances: ModuleInstance[]); + constructor(name: string, instances?: ModuleInstance[]) { + this._name = name; + if (instances) { + for (const instance of instances) { + this.addOrUpdateInstance(instance); + } + } + } + + get name() { + return this._name; + } + + public get isServing() { + return this._servingAddresses.length > 0; + } + + public get servingAddress() { + if (this._servingAddresses.length === 0) { + return undefined; + } + const addressType = getAddressType(this._servingAddresses[0]); + return `${addressType}:${this._servingAddresses.join(',')}`; + } + + public get allAddresses() { + const addressType = getAddressType(this._resolvedAddresses[0]); + return `${addressType}:${this._resolvedAddresses.join(',')}`; + } + + public addOrUpdateInstance(instance: ModuleInstance) { + const index = this._instances.findIndex(i => i.instanceId === instance.instanceId); + if (!instance.addressType) { + instance.addressType = getAddressType(instance.address); + } + if (index === -1) { + this._instances.push(instance); + } else { + this._instances[index] = instance; + } + this.resolveAddresses(instance.instanceId); + } + + private resolveAddresses(instanceId: string) { + const instance = this._instances.find(i => i.instanceId === instanceId); + if (!instance) return; + const addressType = instance.addressType; + if (addressType! === 'dns') { + // We shouldn't need to extend this to resolve ipv6 addresses, + // since all communications inside the mesh should be ipv4 + dns.resolve(instance.address, (err, addresses) => { + if (err) { + throw new Error('DNS resolution failed'); + } + for (const address of addresses) { + const formattedAddress = formatAddress(address, addressType!); + this._resolvedAddresses.push(formattedAddress); + if (instance.serving) { + this._servingAddresses.push(formattedAddress); + } + } + this.updateServingList(); + }); + } else { + this._resolvedAddresses = this._instances.map(i => + formatAddress(i.address, i.addressType!), + ); + this.updateServingList(); + } + // check if all instances have the same address type + const allSameType = this._instances.every(i => i.addressType === addressType); + if (!allSameType) { + ConduitGrpcSdk.Logger.warn( + `Module ${this._name} has instances with different address types. Some instances will be unavailable for communication.`, + ); + } + } + + private updateServingList() { + // count the number of instances that are serving per address type + const servingCount: Record = {}; + for (const instance of this._instances) { + if (instance.serving) { + servingCount[instance.addressType!] = + (servingCount[instance.addressType!] || 0) + 1; + } + } + // get the address type with the most serving instances + const maxServing = Math.max(...Object.values(servingCount)); + const mostServingType = Object.keys(servingCount).find( + type => servingCount[type] === maxServing, + ); + // update the serving addresses + this._servingAddresses = this._resolvedAddresses.filter(address => + this._instances.some( + i => i.address === address && i.addressType === mostServingType, + ), + ); + } + + public removeInstance(instanceId: string) { + this._instances = this._instances.filter(i => i.instanceId !== instanceId); + } + + public getInstance(instanceId: string) { + return this._instances.find(i => i.instanceId === instanceId); + } +} diff --git a/packages/core/src/config-manager/models/ServiceDiscover.message.ts b/packages/core/src/config-manager/models/ServiceDiscover.message.ts new file mode 100644 index 000000000..7a823116f --- /dev/null +++ b/packages/core/src/config-manager/models/ServiceDiscover.message.ts @@ -0,0 +1,10 @@ +import { HealthCheckStatus } from '@conduitplatform/grpc-sdk'; +import { BasicMessage } from './Basic.message.js'; + +export type ServiceDiscoverMessage = BasicMessage & { + name: string; + url: string; + status: HealthCheckStatus; + addressType: 'ipv4' | 'ipv6' | 'dns'; + instanceId: string; +}; diff --git a/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts b/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts index a55ff16ee..553e74949 100644 --- a/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts +++ b/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts @@ -1,5 +1,5 @@ import { ConduitGrpcSdk } from '@conduitplatform/grpc-sdk'; -import { RegisteredModule } from '@conduitplatform/commons'; +import { ModuleInstance, RegisteredModule } from '../models/RegisteredModule.js'; export class ServiceRegistry { private static _instance: ServiceRegistry; @@ -24,29 +24,27 @@ export class ServiceRegistry { return this.registeredModules.get(moduleName); } - updateModule(moduleName: string, module: RegisteredModule) { - this.registeredModules.set(moduleName, module); + updateModule(moduleName: string, module: ModuleInstance) { + const existingModule = this.registeredModules.get(moduleName); + if (existingModule) { + existingModule.addOrUpdateInstance(module); + } else { + const newModule = new RegisteredModule(moduleName, [module]); + this.registeredModules.set(moduleName, newModule); + } } removeModule(moduleName: string) { this.registeredModules.delete(moduleName); } - getModuleDetails( - moduleName: string, - ): { moduleName: string; url: string; serving: boolean } | undefined { - const module = this.registeredModules.get(moduleName); - if (!module) return undefined; - return { moduleName: moduleName, url: module.address, serving: module.serving }; - } - getModuleDetailsList(): { moduleName: string; url: string; serving: boolean }[] { const modules: { moduleName: string; url: string; serving: boolean }[] = []; this.registeredModules.forEach((value: RegisteredModule, key: string) => { modules.push({ moduleName: key, - url: value.address, - serving: value.serving, + url: value.servingAddress ?? value.allAddresses, + serving: value.isServing, }); }); return modules; diff --git a/packages/core/src/config-manager/service-discovery/index.ts b/packages/core/src/config-manager/service-discovery/index.ts index 9ed5a2226..cef227da4 100644 --- a/packages/core/src/config-manager/service-discovery/index.ts +++ b/packages/core/src/config-manager/service-discovery/index.ts @@ -1,4 +1,10 @@ -import { ModuleListResponse } from '@conduitplatform/commons'; +import { + ModuleExistsRequest, + ModuleExistsResponse, + ModuleListResponse, + RegisterModuleRequest, + RegisterModuleResponse, +} from '@conduitplatform/commons'; import { ConduitGrpcSdk, GrpcCallback, @@ -13,6 +19,7 @@ import { EventEmitter } from 'events'; import { ServiceRegistry } from './ServiceRegistry.js'; import { ServiceMonitor } from './ServiceMonitor.js'; import { isEmpty } from 'lodash-es'; +import { ServiceDiscoverMessage } from '../models/ServiceDiscover.message.js'; /* * - Multi-instance services are not handled individually (LoadBalancer) @@ -78,7 +85,7 @@ export class ServiceDiscovery { } } this.grpcSdk.bus!.subscribe('service-discover', (message: string) => { - const parsedMessage = JSON.parse(message); + const parsedMessage: ServiceDiscoverMessage = JSON.parse(message); if (parsedMessage.type === 'module-health') { this._serviceMonitor.updateModuleHealth( parsedMessage.name, @@ -89,6 +96,7 @@ export class ServiceDiscovery { this._serviceRegistry.updateModule(parsedMessage.name, { address: parsedMessage.url, serving: true, + instanceId: parsedMessage.instanceId, }); } }); @@ -185,10 +193,13 @@ export class ServiceDiscovery { this._serviceMonitor.updateModuleHealth(moduleName, moduleUrl, healthStatus!); } - async registerModule(call: any, callback: GrpcResponse<{ result: boolean }>) { + async registerModule( + call: GrpcRequest, + callback: GrpcResponse, + ) { if ( - call.request.status < HealthCheckStatus.UNKNOWN || - call.request.status > HealthCheckStatus.NOT_SERVING + call.request.healthStatus < HealthCheckStatus.UNKNOWN || + call.request.healthStatus > HealthCheckStatus.NOT_SERVING ) { callback({ code: status.INVALID_ARGUMENT, @@ -212,8 +223,8 @@ export class ServiceDiscovery { } moduleExists( - call: GrpcRequest<{ moduleName: string }>, - callback: GrpcResponse<{ url: string }>, + call: GrpcRequest, + callback: GrpcResponse, ) { const module = this._serviceRegistry.getModule(call.request.moduleName); if (module) { @@ -227,20 +238,18 @@ export class ServiceDiscovery { } private publishModuleData( - type: string, + type: 'module-health' | 'serving-modules-update', name: string, - url?: string, - status?: HealthCheckStatus, + url: string, + status: HealthCheckStatus, ) { - this.grpcSdk.bus!.publish( - 'service-discover', - JSON.stringify({ - type, - name, - url, - status, - }), - ); + const serviceDiscoverMessage: ServiceDiscoverMessage = { + type, + name, + url, + status, + }; + this.grpcSdk.bus!.publish('service-discover', JSON.stringify(serviceDiscoverMessage)); } private updateState(name: string, url: string) { diff --git a/packages/core/src/config-manager/utils/index.ts b/packages/core/src/config-manager/utils/index.ts new file mode 100644 index 000000000..ecd86527c --- /dev/null +++ b/packages/core/src/config-manager/utils/index.ts @@ -0,0 +1,25 @@ +export const getAddressType = (address: string) => { + const ipv4Regex = new RegExp( + '^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$', + ); + const ipv6Regex = new RegExp('^([0-9a-fA-F]{1,4}:){7}([0-9a-fA-F]{1,4}|:)$'); + if (ipv4Regex.test(address)) { + return 'ipv4'; + } else if (ipv6Regex.test(address)) { + return 'ipv6'; + } else { + return 'dns'; + } +}; + +export const formatAddress = (address: string, addressType: 'ipv4' | 'ipv6' | 'dns') => { + if (addressType === 'dns') { + return address; + } else if (addressType === 'ipv4') { + return `${address}`; + } else { + const [ipPart, port] = address.includes(']:') ? address.split(']:') : [address, '']; + const ip = ipPart.replace(/^\[|\]$/g, ''); + return port ? `[${ip}]:${port}` : `${ip}`; + } +}; diff --git a/packages/core/src/core.proto b/packages/core/src/core.proto index 78129e42c..d9f6db2ad 100644 --- a/packages/core/src/core.proto +++ b/packages/core/src/core.proto @@ -80,6 +80,7 @@ message ModuleHealthRequest { string moduleName = 1; string url = 2; int32 status = 3; // [0, 1, 2] + string instanceId = 4; } message ModuleByNameRequest { @@ -88,12 +89,14 @@ message ModuleByNameRequest { message ModuleByNameResponse { string moduleUrl = 1; + repeated string instances = 2; } message RegisterModuleRequest { string moduleName = 1; string url = 2; int32 healthStatus = 3; // [0, 1, 2] + string instanceId = 4; } message RegisterModuleResponse { @@ -106,6 +109,7 @@ message ModuleListResponse { string moduleName = 1; string url = 2; bool serving = 3; + repeated string instances = 4; } } @@ -115,6 +119,7 @@ message ModuleExistsRequest { message ModuleExistsResponse { string url = 1; + repeated string instances = 2; } message GetConfigRequest { From 50fedec98c8af519a113581d4e92dcca48da2e52 Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Fri, 22 Nov 2024 18:34:02 +0200 Subject: [PATCH 2/5] refactor(grpc-sdk): remove url from health probe fix(grpc-sdk): onConfig not invoked on module start --- libraries/grpc-sdk/src/modules/config/index.ts | 1 - libraries/module-tools/src/ManagedModule.ts | 11 ++++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/libraries/grpc-sdk/src/modules/config/index.ts b/libraries/grpc-sdk/src/modules/config/index.ts index c385a0b2d..e3864d93d 100644 --- a/libraries/grpc-sdk/src/modules/config/index.ts +++ b/libraries/grpc-sdk/src/modules/config/index.ts @@ -123,7 +123,6 @@ export class Config extends ConduitModule { moduleHealthProbe(name: string, url: string, instanceId: string) { const request: ModuleHealthRequest = { moduleName: name.toString(), - url, status: this._serviceHealthStatusGetter ? this._serviceHealthStatusGetter() : HealthCheckStatus.SERVICE_UNKNOWN, diff --git a/libraries/module-tools/src/ManagedModule.ts b/libraries/module-tools/src/ManagedModule.ts index 1284bbb15..623cf0d40 100644 --- a/libraries/module-tools/src/ManagedModule.ts +++ b/libraries/module-tools/src/ManagedModule.ts @@ -27,7 +27,10 @@ export abstract class ManagedModule extends ConduitServiceModule { private _moduleState: ModuleLifecycleStage; protected constructor(moduleName: string, instanceId?: string) { - super(moduleName, instanceId ?? crypto.randomBytes(16).toString('hex')); + super( + moduleName, + `${moduleName}-${instanceId ?? crypto.randomBytes(16).toString('hex')}`, + ); this._moduleState = ModuleLifecycleStage.CREATE_GRPC; if (!process.env.CONDUIT_SERVER) { throw new Error('CONDUIT_SERVER is undefined, specify Conduit server URL'); @@ -267,11 +270,9 @@ export abstract class ManagedModule extends ConduitServiceModule { convictConfigParser(configSchema), this.configOverride, ); + ConfigController.getInstance().config = config; - ConfigController.getInstance(); - if (config) ConfigController.getInstance().config = config; - if (!config || config.active || !config.hasOwnProperty('active')) - await this.onConfig(); + await this.onConfig(); } } } From da8d9c6267f7ab9524371b1ed856faad067e2f3c Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Fri, 22 Nov 2024 18:34:26 +0200 Subject: [PATCH 3/5] fix(forms): initialization steps --- modules/forms/src/Forms.ts | 21 ++++++++------------- modules/forms/src/routes/index.ts | 26 ++++++++++++++------------ 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/modules/forms/src/Forms.ts b/modules/forms/src/Forms.ts index 589e62ef6..755bc64b0 100644 --- a/modules/forms/src/Forms.ts +++ b/modules/forms/src/Forms.ts @@ -32,17 +32,6 @@ export default class Forms extends ManagedModule { this.database = this.grpcSdk.database!; await runMigrations(this.grpcSdk); await this.registerSchemas(); - this.grpcSdk.monitorModule('email', serving => { - if (serving && ConfigController.getInstance().config.active) { - this.onConfig() - .then() - .catch(() => { - ConduitGrpcSdk.Logger.error('Failed to update Forms configuration'); - }); - } else { - this.updateHealth(HealthCheckStatus.NOT_SERVING); - } - }); } async onConfig() { @@ -50,8 +39,6 @@ export default class Forms extends ManagedModule { this.updateHealth(HealthCheckStatus.NOT_SERVING); } else { if (!this.isRunning) { - if (!this.grpcSdk.isAvailable('email')) return; - await this.grpcSdk.emailProvider!.registerTemplate(FormSubmissionTemplate); this.formController = new FormsController(this.grpcSdk); this.adminRouter = new AdminHandlers( this.grpcServer, @@ -68,6 +55,14 @@ export default class Forms extends ManagedModule { .catch(e => { ConduitGrpcSdk.Logger.error(e.message); }); + this.grpcSdk + .waitForExistence('email') + .then(() => { + this.grpcSdk.emailProvider!.registerTemplate(FormSubmissionTemplate); + }) + .catch(e => { + ConduitGrpcSdk.Logger.error(e.message); + }); } this.updateHealth(HealthCheckStatus.SERVING); } diff --git a/modules/forms/src/routes/index.ts b/modules/forms/src/routes/index.ts index 9b06f8df3..7d7ab16dd 100644 --- a/modules/forms/src/routes/index.ts +++ b/modules/forms/src/routes/index.ts @@ -94,18 +94,20 @@ export class FormsRoutes { Object.keys(data).forEach(r => { text += `
${r}: ${data[r]}`; }); - await this.grpcSdk - .emailProvider!.sendEmail('FormSubmission', { - email: form.forwardTo, - replyTo: form.emailField ? data[form.emailField] : null, - variables: { - data: text, - }, - attachments: Object.values(fileData), - }) - .catch((e: Error) => { - throw new GrpcError(status.INTERNAL, e.message); - }); + if (this.grpcSdk.isAvailable('email')) { + await this.grpcSdk + .emailProvider!.sendEmail('FormSubmission', { + email: form.forwardTo, + replyTo: form.emailField ? data[form.emailField] : null, + variables: { + data: text, + }, + attachments: Object.values(fileData), + }) + .catch((e: Error) => { + throw new GrpcError(status.INTERNAL, e.message); + }); + } return 'Ok'; } From 86f8f996e49776369aaccf64a879e41390294460 Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Fri, 22 Nov 2024 18:34:56 +0200 Subject: [PATCH 4/5] refactor(core)!: service monitoring and discovery rework --- packages/commons/src/interfaces/index.ts | 1 - packages/core/src/config-manager/index.ts | 15 +- .../config-manager/models/Basic.message.ts | 2 +- .../config-manager/models/RegisteredModule.ts | 125 +++++++-------- .../models/ServiceDiscover.message.ts | 5 +- .../service-discovery/ServiceMonitor.ts | 147 ++++++++--------- .../service-discovery/ServiceRegistry.ts | 19 ++- .../config-manager/service-discovery/index.ts | 150 +++++++++++------- .../core/src/config-manager/utils/index.ts | 2 +- packages/core/src/core.proto | 20 ++- packages/core/src/interfaces/IModuleConfig.ts | 12 +- 11 files changed, 271 insertions(+), 227 deletions(-) diff --git a/packages/commons/src/interfaces/index.ts b/packages/commons/src/interfaces/index.ts index 0162536b8..a8d0c20ae 100644 --- a/packages/commons/src/interfaces/index.ts +++ b/packages/commons/src/interfaces/index.ts @@ -1,2 +1 @@ export * from './IConduitModule.js'; -export * from './RegisteredModule.js'; diff --git a/packages/core/src/config-manager/index.ts b/packages/core/src/config-manager/index.ts index a1285282e..f29fefd0e 100644 --- a/packages/core/src/config-manager/index.ts +++ b/packages/core/src/config-manager/index.ts @@ -47,7 +47,8 @@ export default class ConfigManager implements IConfigManager { } getModuleUrlByName(moduleName: string): string | undefined { - return this.serviceDiscovery.getModuleUrlByName(moduleName); + const result = this.serviceDiscovery.getModule(moduleName); + return result!.servingAddress ?? result!.allAddresses!; } getModuleUrlByNameGrpc( @@ -55,14 +56,18 @@ export default class ConfigManager implements IConfigManager { callback: GrpcResponse, ) { const name = call.request.name; - const result = this.getModuleUrlByName(name); + const result = this.serviceDiscovery.getModule(name); if (!result) { return callback({ code: status.NOT_FOUND, message: 'Module not found', }); } - callback(null, { moduleUrl: result }); + callback(null, { + moduleUrl: result.servingAddress ?? result.allAddresses!, + //@ts-expect-error + instances: result.instances, + }); } async initialize(server: GrpcServer) { @@ -84,7 +89,7 @@ export default class ConfigManager implements IConfigManager { moduleHealthProbe: this.serviceDiscovery.moduleHealthProbe.bind( this.serviceDiscovery, ), - getModuleUrlByName: this.getModuleUrlByNameGrpc.bind(this.serviceDiscovery), + getModuleUrlByName: this.getModuleUrlByNameGrpc.bind(this), }, ); this.serviceDiscovery.beginMonitors(); @@ -237,7 +242,7 @@ export default class ConfigManager implements IConfigManager { const module = ServiceRegistry.getInstance().getModule(moduleName); if (!module) return false; try { - await this.grpcSdk.isModuleUp(moduleName, module.address); + await this.grpcSdk.isModuleUp(moduleName, module.allAddresses!); } catch (e) { return false; } diff --git a/packages/core/src/config-manager/models/Basic.message.ts b/packages/core/src/config-manager/models/Basic.message.ts index 973a2f059..73d1126ee 100644 --- a/packages/core/src/config-manager/models/Basic.message.ts +++ b/packages/core/src/config-manager/models/Basic.message.ts @@ -1,3 +1,3 @@ export type BasicMessage = { - type: 'module-health' | 'serving-modules-update'; + type: 'instance-health' | 'serving-modules-update'; }; diff --git a/packages/core/src/config-manager/models/RegisteredModule.ts b/packages/core/src/config-manager/models/RegisteredModule.ts index cc5c1a0c7..e4f9dd48a 100644 --- a/packages/core/src/config-manager/models/RegisteredModule.ts +++ b/packages/core/src/config-manager/models/RegisteredModule.ts @@ -1,26 +1,26 @@ -import { formatAddress, getAddressType } from '../utils/index.js'; -import dns from 'node:dns'; -import { ConduitGrpcSdk } from '@conduitplatform/grpc-sdk'; +import { getAddressType } from '../utils/index.js'; +import { ConduitGrpcSdk, HealthCheckStatus } from '@conduitplatform/grpc-sdk'; export type ModuleInstance = { instanceId: string; address: string; - serving: boolean; + url: string; + status: HealthCheckStatus; + serving?: boolean; addressType?: 'ipv4' | 'ipv6' | 'dns'; }; export class RegisteredModule { private readonly _name: string; private _instances: ModuleInstance[] = []; - // Should always be array of IP addresses of the same type, - // but may contain either ipv4 or ipv6 addresses. - private _resolvedAddresses: string[] = []; - // Should always be array of IP addresses of the same type - private _servingAddresses: string[] = []; - constructor(name: string); - constructor(name: string, instances: ModuleInstance[]); - constructor(name: string, instances?: ModuleInstance[]) { + constructor(name: string, grpcSdk: ConduitGrpcSdk); + constructor(name: string, grpcSdk: ConduitGrpcSdk, instances: ModuleInstance[]); + constructor( + name: string, + private readonly grpcSdk: ConduitGrpcSdk, + instances?: ModuleInstance[], + ) { this._name = name; if (instances) { for (const instance of instances) { @@ -34,67 +34,53 @@ export class RegisteredModule { } public get isServing() { - return this._servingAddresses.length > 0; + return this._instances.some(i => i.serving); } public get servingAddress() { - if (this._servingAddresses.length === 0) { + if (!this._instances.some(i => i.serving)) { return undefined; } - const addressType = getAddressType(this._servingAddresses[0]); - return `${addressType}:${this._servingAddresses.join(',')}`; + const servingInstances = this._instances.filter(i => i.serving); + let addressType = getAddressType(servingInstances[0].url); + if (addressType === 'dns') { + return `dns:///${servingInstances[0].url}`; + } else { + return `${addressType}:${servingInstances.map(i => i.url).join(',')}`; + } } public get allAddresses() { - const addressType = getAddressType(this._resolvedAddresses[0]); - return `${addressType}:${this._resolvedAddresses.join(',')}`; + if (this._instances.length === 0) { + return undefined; + } + let addressType = getAddressType(this._instances[0].url); + if (addressType === 'dns') { + return `dns:///${this._instances[0].url}`; + } else { + return `${addressType}:${this._instances.map(i => i.url).join(',')}`; + } + } + + public updateInstanceHealth(instanceId: string, status: HealthCheckStatus) { + const instance = this._instances.find(i => i.instanceId === instanceId); + if (instance) { + instance.status = status; + instance.serving = instance.status === HealthCheckStatus.SERVING; + this.updateServingList(); + } } public addOrUpdateInstance(instance: ModuleInstance) { const index = this._instances.findIndex(i => i.instanceId === instance.instanceId); if (!instance.addressType) { - instance.addressType = getAddressType(instance.address); + instance.addressType = getAddressType(instance.url); } + this._instances.findIndex(i => i.address === instance.instanceId); if (index === -1) { this._instances.push(instance); } else { - this._instances[index] = instance; - } - this.resolveAddresses(instance.instanceId); - } - - private resolveAddresses(instanceId: string) { - const instance = this._instances.find(i => i.instanceId === instanceId); - if (!instance) return; - const addressType = instance.addressType; - if (addressType! === 'dns') { - // We shouldn't need to extend this to resolve ipv6 addresses, - // since all communications inside the mesh should be ipv4 - dns.resolve(instance.address, (err, addresses) => { - if (err) { - throw new Error('DNS resolution failed'); - } - for (const address of addresses) { - const formattedAddress = formatAddress(address, addressType!); - this._resolvedAddresses.push(formattedAddress); - if (instance.serving) { - this._servingAddresses.push(formattedAddress); - } - } - this.updateServingList(); - }); - } else { - this._resolvedAddresses = this._instances.map(i => - formatAddress(i.address, i.addressType!), - ); - this.updateServingList(); - } - // check if all instances have the same address type - const allSameType = this._instances.every(i => i.addressType === addressType); - if (!allSameType) { - ConduitGrpcSdk.Logger.warn( - `Module ${this._name} has instances with different address types. Some instances will be unavailable for communication.`, - ); + this._instances[index] = { ...this._instances[index], ...instance }; } } @@ -107,17 +93,18 @@ export class RegisteredModule { (servingCount[instance.addressType!] || 0) + 1; } } - // get the address type with the most serving instances - const maxServing = Math.max(...Object.values(servingCount)); - const mostServingType = Object.keys(servingCount).find( - type => servingCount[type] === maxServing, - ); - // update the serving addresses - this._servingAddresses = this._resolvedAddresses.filter(address => - this._instances.some( - i => i.address === address && i.addressType === mostServingType, - ), - ); + if (!this._instances.some(i => i.status === HealthCheckStatus.SERVING)) { + ConduitGrpcSdk.Logger.warn( + `Module ${this._name} has no serving instances. Communication with this module will fail.`, + ); + } else { + ConduitGrpcSdk.Logger.log( + `Module ${this._name} has ${ + this._instances.filter(i => i.serving).length + } serving instances.`, + ); + this.grpcSdk.updateModuleHealth(this._name, true); + } } public removeInstance(instanceId: string) { @@ -127,4 +114,8 @@ export class RegisteredModule { public getInstance(instanceId: string) { return this._instances.find(i => i.instanceId === instanceId); } + + public get instances() { + return this._instances; + } } diff --git a/packages/core/src/config-manager/models/ServiceDiscover.message.ts b/packages/core/src/config-manager/models/ServiceDiscover.message.ts index 7a823116f..95345c811 100644 --- a/packages/core/src/config-manager/models/ServiceDiscover.message.ts +++ b/packages/core/src/config-manager/models/ServiceDiscover.message.ts @@ -3,8 +3,9 @@ import { BasicMessage } from './Basic.message.js'; export type ServiceDiscoverMessage = BasicMessage & { name: string; - url: string; + address: string; + url?: string; status: HealthCheckStatus; - addressType: 'ipv4' | 'ipv6' | 'dns'; + addressType?: 'ipv4' | 'ipv6' | 'dns'; instanceId: string; }; diff --git a/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts b/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts index 43861ff58..e02c5a6bb 100644 --- a/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts +++ b/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts @@ -1,17 +1,10 @@ -import { - ConduitGrpcSdk, - HealthCheckResponse, - HealthCheckStatus, -} from '@conduitplatform/grpc-sdk'; +import { ConduitGrpcSdk, HealthCheckStatus } from '@conduitplatform/grpc-sdk'; import { ServiceRegistry } from './ServiceRegistry.js'; import { linearBackoffTimeout } from '@conduitplatform/module-tools'; import { EventEmitter } from 'events'; export class ServiceMonitor { private static _instance: ServiceMonitor; - private readonly moduleHealth: { - [module: string]: { address: string; timestamp: number; status: HealthCheckStatus }; - } = {}; private readonly _serviceRegistry = ServiceRegistry.getInstance(); private monitorIntervalMs = 5000; private serviceReconnectionInitMs = 500; @@ -58,106 +51,94 @@ export class ServiceMonitor { * Any services that do not provide a gRPC health check service are assumed to be healthy. * Used by healthCheckRegisteredModules(), reviveService() */ - private async healthCheckService(module: string, address: string) { - const healthClient = this.grpcSdk.getHealthClient(module); - let status = HealthCheckStatus.SERVING; - if (healthClient) { - status = await healthClient - .check({}) - .then((res: HealthCheckResponse) => res.status as unknown as HealthCheckStatus); - } - const isRegistered = Object.keys(this.moduleHealth).includes(module); - if (!isRegistered && status === HealthCheckStatus.SERVICE_UNKNOWN) return; - this.updateModuleHealth(module, address, status); + + // OK + private async healthCheckService(module: string, instanceId: string) { + const instance = this._serviceRegistry.getModule(module)!.getInstance(instanceId)!; + const status = await this.grpcSdk.isModuleUp(module, instance.address); + this.updateInstanceHealth(module, instanceId, status); return status; } - handleUnresponsiveModule( + handleUnresponsiveInstance(moduleName: string, instanceId: string) { + ConduitGrpcSdk.Logger.log(`SD/health: update ${moduleName}/${instanceId} unknown`); + this.reviveService(moduleName, instanceId); + } + + updateInstanceHealth( moduleName: string, - moduleUrl: string, - status: HealthCheckStatus, + instanceId: string, + instanceStatus: HealthCheckStatus, ) { - const isRegistered = Object.keys(this.moduleHealth).includes(moduleName); - if (!isRegistered && status === HealthCheckStatus.SERVICE_UNKNOWN) { - ConduitGrpcSdk.Logger.info( - `SD/health: unresponsive ${moduleName} ${moduleUrl} with no health history`, - ); - return; + let module = this._serviceRegistry.getModule(moduleName); + // if module doesn't exist in the registry, do nothing. + if (!module) { + return null; } - - ConduitGrpcSdk.Logger.log(`SD/health: update ${moduleName} ${moduleUrl} unknown`); - this.grpcSdk.updateModuleHealth(moduleName, false); - // Deregister Unresponsive Module - delete this.moduleHealth[moduleName]; - this._serviceRegistry.removeModule(moduleName); - this.reviveService(moduleName, moduleUrl); + module.updateInstanceHealth(instanceId, instanceStatus); } + // OK updateModuleHealth( moduleName: string, - moduleUrl: string, - moduleStatus: HealthCheckStatus, + instanceId: string, + instanceStatus: HealthCheckStatus, ) { - if (moduleStatus === HealthCheckStatus.SERVICE_UNKNOWN) { - return this.handleUnresponsiveModule(moduleName, moduleUrl, moduleStatus); + if (instanceStatus === HealthCheckStatus.SERVICE_UNKNOWN) { + return this.handleUnresponsiveInstance(moduleName, instanceId); } let module = this._serviceRegistry.getModule(moduleName); + // if module doesn't exist in the registry, do nothing. if (!module) { - module = { - address: moduleUrl, - serving: moduleStatus === HealthCheckStatus.SERVING, - }; - // ConduitGrpcSdk.Logger.log( - // `SD/health: update unregistered module ${moduleName} ${moduleUrl} ${moduleStatus}`, - // ); - this._serviceRegistry.updateModule(moduleName, module); - } else { - const prevStatus = module.serving; - // ConduitGrpcSdk.Logger.log( - // `SD/health: update registered module ${moduleName} ${moduleUrl} to ${moduleStatus} from serving: ${prevStatus}`, - // ); - module.serving = moduleStatus === HealthCheckStatus.SERVING; + return null; } - this.grpcSdk.updateModuleHealth( - moduleName, - moduleStatus === HealthCheckStatus.SERVING, + const instance = module.getInstance(instanceId); + // if instance doesn't exist in the registry, do nothing. + // instances should be registered before their first probe + if (!instance) { + return null; + } + const prevStatus = instance.serving; + ConduitGrpcSdk.Logger.log( + `SD/health: update instance ${moduleName}/${instanceId} to ${instanceStatus} from serving: ${prevStatus}`, ); - this._serviceRegistry.updateModule(moduleName, module); - this.moduleHealth[moduleName] = { - address: moduleUrl, - timestamp: Date.now(), - status: moduleStatus, - }; + module.addOrUpdateInstance({ ...instance, status: instanceStatus }); } /* * Attempt to reconnect to a recently removed module service. * Retries using linear backoff. */ - private reviveService(name: string, address: string) { + + // OK + private reviveService(name: string, instanceId: string) { + const instance = this._serviceRegistry.getModule(name)!.getInstance(instanceId)!; + const onTry = (stop: () => void) => { - if (Object.keys(this.moduleHealth).includes(name)) { + if (instance.serving) { stop(); - ConduitGrpcSdk.Logger.log(`SD/health/revive: found healthy ${name} ${address}`); + ConduitGrpcSdk.Logger.log( + `SD/health/revive: found healthy ${name}/${instanceId}`, + ); } else { - this.healthCheckService(name, address) + this.healthCheckService(name, instanceId) .then(() => { ConduitGrpcSdk.Logger.log( - `SD/health/revive: check completed ${name} ${address}`, + `SD/health/revive: check completed ${name}/${instanceId}`, ); }) .catch(() => { ConduitGrpcSdk.Logger.log( - `SD/health/revive: check failed ${name} ${address}`, + `SD/health/revive: check failed ${name}/${instanceId}`, ); }); } }; const onFailure = () => { - this.grpcSdk.getModule(name)?.closeConnection(); + this._serviceRegistry.getModule(name)?.removeInstance(instanceId); ConduitGrpcSdk.Logger.log( - `SD/health/revive: check connection closed ${name} ${address}`, + `SD/health/revive: failed to recover ${name}/${instanceId}`, ); }; linearBackoffTimeout( @@ -168,19 +149,23 @@ export class ServiceMonitor { ); } + // OK private async monitorModules() { for (const module of this._serviceRegistry.getRegisteredModules()) { - const registeredModule = this._serviceRegistry.getModule(module); - if (!registeredModule) continue; - try { - await this.healthCheckService(module, registeredModule.address); - } catch (e) { - if (this._serviceRegistry.getModule(module)) { - this.handleUnresponsiveModule( - module, - registeredModule.address, - HealthCheckStatus.SERVICE_UNKNOWN, - ); + const registeredModule = this._serviceRegistry.getModule(module)!; + for (const instance of registeredModule.instances) { + // skip instances with unknown status + if (instance.status === HealthCheckStatus.UNKNOWN) continue; + try { + await this.healthCheckService(module, instance.instanceId); + } catch (e) { + if (this._serviceRegistry.getModule(module)) { + registeredModule.updateInstanceHealth( + instance.instanceId, + HealthCheckStatus.UNKNOWN, + ); + this.handleUnresponsiveInstance(module, instance.instanceId); + } } } } diff --git a/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts b/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts index 553e74949..db8f76e7f 100644 --- a/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts +++ b/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts @@ -29,7 +29,7 @@ export class ServiceRegistry { if (existingModule) { existingModule.addOrUpdateInstance(module); } else { - const newModule = new RegisteredModule(moduleName, [module]); + const newModule = new RegisteredModule(moduleName, this.grpcSdk, [module]); this.registeredModules.set(moduleName, newModule); } } @@ -38,13 +38,24 @@ export class ServiceRegistry { this.registeredModules.delete(moduleName); } - getModuleDetailsList(): { moduleName: string; url: string; serving: boolean }[] { - const modules: { moduleName: string; url: string; serving: boolean }[] = []; + getModuleDetailsList(): { + moduleName: string; + url: string; + serving: boolean; + instances: ModuleInstance[]; + }[] { + const modules: { + moduleName: string; + url: string; + serving: boolean; + instances: ModuleInstance[]; + }[] = []; this.registeredModules.forEach((value: RegisteredModule, key: string) => { modules.push({ moduleName: key, - url: value.servingAddress ?? value.allAddresses, + url: value.servingAddress ?? value.allAddresses!, serving: value.isServing, + instances: value.instances, }); }); return modules; diff --git a/packages/core/src/config-manager/service-discovery/index.ts b/packages/core/src/config-manager/service-discovery/index.ts index cef227da4..67056a803 100644 --- a/packages/core/src/config-manager/service-discovery/index.ts +++ b/packages/core/src/config-manager/service-discovery/index.ts @@ -1,6 +1,7 @@ import { ModuleExistsRequest, ModuleExistsResponse, + ModuleHealthRequest, ModuleListResponse, RegisterModuleRequest, RegisterModuleResponse, @@ -12,6 +13,7 @@ import { GrpcResponse, HealthCheckStatus, Indexable, + ServerUnaryCall, } from '@conduitplatform/grpc-sdk'; import { IModuleConfig } from '../../interfaces/IModuleConfig.js'; import { ServerWritableStream, status } from '@grpc/grpc-js'; @@ -20,6 +22,7 @@ import { ServiceRegistry } from './ServiceRegistry.js'; import { ServiceMonitor } from './ServiceMonitor.js'; import { isEmpty } from 'lodash-es'; import { ServiceDiscoverMessage } from '../models/ServiceDiscover.message.js'; +import { ModuleInstance, RegisteredModule } from '../models/RegisteredModule.js'; /* * - Multi-instance services are not handled individually (LoadBalancer) @@ -41,8 +44,8 @@ export class ServiceDiscovery { this._serviceMonitor = ServiceMonitor.getInstance(grpcSdk, this.moduleRegister); } - getModuleUrlByName(name: string): string | undefined { - return this._serviceRegistry.getModule(name)?.address; + getModule(name: string): RegisteredModule | undefined { + return this._serviceRegistry.getModule(name); } beginMonitors() { @@ -54,49 +57,35 @@ export class ServiceDiscovery { if (state && !isEmpty(state)) { const parsedState = JSON.parse(state) as { modules: IModuleConfig[] }; if (parsedState.modules) { - const success: IModuleConfig[] = []; for (const module of parsedState.modules) { - try { - await this._recoverModule(module.name, module.url); - success.push({ - name: module.name, - url: module.url, - ...(module.configSchema && { configSchema: module.configSchema }), - }); - } catch (e) { - ConduitGrpcSdk.Logger.error( - `SD: failed to recover: ${module.name} ${module.url}`, - ); - ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`); + for (const instance of module.instances) { + try { + await this._recoverModule(module.name, instance); + } catch (e) { + ConduitGrpcSdk.Logger.error( + `SD: failed to recover: ${module.name}@${instance.address}`, + ); + ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`); + } } } - await this.grpcSdk - .state!.modifyState(async (existingState: Indexable) => { - const state = existingState ?? {}; - state.modules = success; - return state; - }) - .then(() => { - ConduitGrpcSdk.Logger.log('Recovered state'); - }) - .catch(() => { - ConduitGrpcSdk.Logger.error('Failed to recover state'); - }); } } this.grpcSdk.bus!.subscribe('service-discover', (message: string) => { const parsedMessage: ServiceDiscoverMessage = JSON.parse(message); - if (parsedMessage.type === 'module-health') { + if (parsedMessage.type === 'instance-health') { this._serviceMonitor.updateModuleHealth( parsedMessage.name, - parsedMessage.url, + parsedMessage.instanceId, parsedMessage.status, ); } else if (parsedMessage.type === 'serving-modules-update') { this._serviceRegistry.updateModule(parsedMessage.name, { - address: parsedMessage.url, + address: parsedMessage.address, + url: parsedMessage.url!, serving: true, instanceId: parsedMessage.instanceId, + status: parsedMessage.status, }); } }); @@ -106,7 +95,10 @@ export class ServiceDiscovery { * Used by modules to notify Core regarding changes in their health state. * Called on module health change via grpc-sdk. */ - moduleHealthProbe(call: any, callback: GrpcResponse) { + moduleHealthProbe( + call: GrpcRequest, + callback: GrpcResponse, + ) { if ( call.request.status < HealthCheckStatus.UNKNOWN || call.request.status > HealthCheckStatus.NOT_SERVING @@ -117,18 +109,20 @@ export class ServiceDiscovery { }); return; } + const callingIp = (call as unknown as ServerUnaryCall).getPeer(); ConduitGrpcSdk.Logger.log( - `SD: received: ${call.request.moduleName} ${call.request.url} ${call.request.status}`, + `SD: received: ${call.request.moduleName}/${call.request.instanceId}, source:${callingIp}, status${call.request.status}`, ); this._serviceMonitor.updateModuleHealth( call.request.moduleName, - call.request.url, + call.request.instanceId, call.request.status as HealthCheckStatus, ); - this.publishModuleData( - 'module-health', + this.publishInstanceHealth( + 'instance-health', call.request.moduleName, - call.request.url, + callingIp, + call.request.instanceId, call.request.status, ); callback(null, null); @@ -136,43 +130,49 @@ export class ServiceDiscovery { moduleList(call: GrpcRequest, callback: GrpcCallback) { const modules = this._serviceRegistry.getModuleDetailsList(); + //@ts-expect-error callback(null, { modules }); } watchModules(call: ServerWritableStream) { this.moduleRegister.on('serving-modules-update', () => { const modules = this._serviceRegistry.getModuleDetailsList(); + //@ts-expect-error call.write({ modules }); }); } - async _recoverModule(moduleName: string, moduleUrl: string) { + async _recoverModule(moduleName: string, instance: Omit) { let healthResponse; if (!this.grpcSdk.getModule(moduleName)) { - healthResponse = await this.grpcSdk.isModuleUp(moduleName, moduleUrl); - this.grpcSdk.createModuleClient(moduleName, moduleUrl); + healthResponse = await this.grpcSdk.isModuleUp(moduleName, instance.address); + this.grpcSdk.createModuleClient(moduleName, instance.address); } const healthStatus = healthResponse.status as unknown as HealthCheckStatus; ConduitGrpcSdk.Logger.log( - `SD: registering: ${moduleName} ${moduleUrl} ${healthStatus}`, + `SD: registering: ${moduleName}/${instance.instanceId}@${instance.address} ${healthStatus}`, ); - this._serviceRegistry.updateModule(moduleName, { - address: moduleUrl, - serving: healthStatus === HealthCheckStatus.SERVING, - }); + //@ts-expect-error + this._serviceRegistry.updateModule(moduleName, instance); if (!this.grpcSdk.isAvailable(moduleName)) { - this.grpcSdk.createModuleClient(moduleName, moduleUrl); + this.grpcSdk.createModuleClient(moduleName, instance.address); } - this._serviceMonitor.updateModuleHealth(moduleName, moduleUrl, healthStatus); + this._serviceMonitor.updateModuleHealth( + moduleName, + instance.instanceId, + healthStatus, + ); } async _registerModule( moduleName: string, moduleUrl: string, + instanceUrl: string, + instanceId: string, healthStatus?: HealthCheckStatus, ) { if (healthStatus === undefined) { @@ -182,15 +182,21 @@ export class ServiceDiscovery { `SD: registering: ${moduleName} ${moduleUrl} ${healthStatus}`, ); this._serviceRegistry.updateModule(moduleName, { - address: moduleUrl, + address: instanceUrl, + url: moduleUrl, + instanceId: instanceId, serving: healthStatus === HealthCheckStatus.SERVING, + status: healthStatus, }); if (!this.grpcSdk.isAvailable(moduleName)) { - this.grpcSdk.createModuleClient(moduleName, moduleUrl); + this.grpcSdk.createModuleClient( + moduleName, + this._serviceRegistry.getModule(moduleName)!.servingAddress!, + ); } - this._serviceMonitor.updateModuleHealth(moduleName, moduleUrl, healthStatus!); + this._serviceMonitor.updateModuleHealth(moduleName, instanceId, healthStatus!); } async registerModule( @@ -207,16 +213,21 @@ export class ServiceDiscovery { }); return; } + const callingIp = (call as unknown as ServerUnaryCall).getPeer(); await this._registerModule( call.request.moduleName, call.request.url, + callingIp.split(':')[0] + ':' + call.request.url.split(':')[1], + call.request.instanceId, call.request.healthStatus as HealthCheckStatus, ); this.updateState(call.request.moduleName, call.request.url); this.publishModuleData( 'serving-modules-update', call.request.moduleName, + callingIp, call.request.url, + call.request.instanceId, call.request.healthStatus, ); callback(null, { result: true }); @@ -227,39 +238,66 @@ export class ServiceDiscovery { callback: GrpcResponse, ) { const module = this._serviceRegistry.getModule(call.request.moduleName); - if (module) { - callback(null, { url: module.address }); + const hasAddress = module?.servingAddress; + if (module && hasAddress) { + callback(null, { + url: module.servingAddress ?? module.allAddresses!, + //@ts-expect-error + instances: module.instances, + }); } else { callback({ code: status.NOT_FOUND, - message: 'Module is missing', + message: "Module is missing or doesn't have a registered address", }); } } + private publishInstanceHealth( + type: 'instance-health', + name: string, + address: string, + instanceId: string, + status: HealthCheckStatus, + ) { + const serviceDiscoverMessage: ServiceDiscoverMessage = { + type, + name, + address, + instanceId, + status, + }; + this.grpcSdk.bus!.publish('service-discover', JSON.stringify(serviceDiscoverMessage)); + } + private publishModuleData( - type: 'module-health' | 'serving-modules-update', + type: 'serving-modules-update', name: string, + address: string, url: string, + instanceId: string, status: HealthCheckStatus, ) { const serviceDiscoverMessage: ServiceDiscoverMessage = { type, name, + address, url, + instanceId, status, }; this.grpcSdk.bus!.publish('service-discover', JSON.stringify(serviceDiscoverMessage)); } + // todo private updateState(name: string, url: string) { this.grpcSdk .state!.modifyState(async (existingState: Indexable) => { const state = existingState ?? {}; if (!state.modules) state.modules = []; - const module = state.modules.find((module: IModuleConfig) => { - return module.url === url; - }); + // const module = state.modules.find((module: IModuleConfig) => { + // return module.url === url; + // }); state.modules = [ ...state.modules.filter((module: IModuleConfig) => module.name !== name), diff --git a/packages/core/src/config-manager/utils/index.ts b/packages/core/src/config-manager/utils/index.ts index ecd86527c..f0e1b84f9 100644 --- a/packages/core/src/config-manager/utils/index.ts +++ b/packages/core/src/config-manager/utils/index.ts @@ -3,7 +3,7 @@ export const getAddressType = (address: string) => { '^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$', ); const ipv6Regex = new RegExp('^([0-9a-fA-F]{1,4}:){7}([0-9a-fA-F]{1,4}|:)$'); - if (ipv4Regex.test(address)) { + if (ipv4Regex.test(address.split(':')[0])) { return 'ipv4'; } else if (ipv6Regex.test(address)) { return 'ipv6'; diff --git a/packages/core/src/core.proto b/packages/core/src/core.proto index d9f6db2ad..82b66beaa 100644 --- a/packages/core/src/core.proto +++ b/packages/core/src/core.proto @@ -78,9 +78,8 @@ message PatchRouteMiddlewaresRequest { message ModuleHealthRequest { string moduleName = 1; - string url = 2; - int32 status = 3; // [0, 1, 2] - string instanceId = 4; + int32 status = 2; // [0, 1, 2] + string instanceId = 3; } message ModuleByNameRequest { @@ -89,7 +88,7 @@ message ModuleByNameRequest { message ModuleByNameResponse { string moduleUrl = 1; - repeated string instances = 2; + repeated ModuleInstance instances = 2; } message RegisterModuleRequest { @@ -102,15 +101,22 @@ message RegisterModuleRequest { message RegisterModuleResponse { bool result = 1; } - +message ModuleInstance { + string instanceId = 1; + string address = 2; + int32 status = 3; + bool serving = 4; + string addressType = 5; +} message ModuleListResponse { repeated ModuleResponse modules = 1; message ModuleResponse { string moduleName = 1; string url = 2; bool serving = 3; - repeated string instances = 4; + repeated ModuleInstance instances = 4; } + } message ModuleExistsRequest { @@ -119,7 +125,7 @@ message ModuleExistsRequest { message ModuleExistsResponse { string url = 1; - repeated string instances = 2; + repeated ModuleInstance instances = 2; } message GetConfigRequest { diff --git a/packages/core/src/interfaces/IModuleConfig.ts b/packages/core/src/interfaces/IModuleConfig.ts index fe62aaa7d..8a53c16bd 100644 --- a/packages/core/src/interfaces/IModuleConfig.ts +++ b/packages/core/src/interfaces/IModuleConfig.ts @@ -1,6 +1,14 @@ +import { HealthCheckStatus } from '@conduitplatform/grpc-sdk'; + export interface IModuleConfig { name: string; - instance?: string; - url: string; + addresses: string; + instances: { + instanceId: string; + address: string; + url: string; + status?: HealthCheckStatus; + }[]; + status?: HealthCheckStatus; configSchema?: string; } From 30978be431f547f5d352bb856d6d472f45985e32 Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Thu, 26 Dec 2024 18:39:33 +0200 Subject: [PATCH 5/5] refactor(core): re-work "updateState" function in SD to properly handle new structure --- .../config-manager/service-discovery/index.ts | 60 +++++++++++++++---- 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/packages/core/src/config-manager/service-discovery/index.ts b/packages/core/src/config-manager/service-discovery/index.ts index 67056a803..5faf285df 100644 --- a/packages/core/src/config-manager/service-discovery/index.ts +++ b/packages/core/src/config-manager/service-discovery/index.ts @@ -221,7 +221,13 @@ export class ServiceDiscovery { call.request.instanceId, call.request.healthStatus as HealthCheckStatus, ); - this.updateState(call.request.moduleName, call.request.url); + this.updateState(call.request.moduleName, { + address: callingIp.split(':')[0] + ':' + call.request.url.split(':')[1], + url: call.request.url, + instanceId: call.request.instanceId, + serving: call.request.healthStatus === HealthCheckStatus.SERVING, + status: call.request.healthStatus, + }); this.publishModuleData( 'serving-modules-update', call.request.moduleName, @@ -289,31 +295,63 @@ export class ServiceDiscovery { this.grpcSdk.bus!.publish('service-discover', JSON.stringify(serviceDiscoverMessage)); } - // todo - private updateState(name: string, url: string) { + private updateState(name: string, instance: ModuleInstance) { this.grpcSdk .state!.modifyState(async (existingState: Indexable) => { const state = existingState ?? {}; if (!state.modules) state.modules = []; - // const module = state.modules.find((module: IModuleConfig) => { - // return module.url === url; - // }); + let module = state.modules.find((module: IModuleConfig) => { + return module.name === name; + }); + if (!module) { + module = { + name, + addresses: '', + instances: [ + { + instanceId: instance.instanceId, + address: instance.address, + url: instance.url, + status: instance.status, + }, + ], + status: instance.status, + }; + } else { + let existingInstance = module.instances.find((instance: ModuleInstance) => { + return instance.instanceId === instance.instanceId; + }); + if (!existingInstance) { + module.instances.push({ + instanceId: instance.instanceId, + address: instance.address, + url: instance.url, + status: instance.status, + }); + } else { + existingInstance.address = instance.address; + existingInstance.url = instance.url; + existingInstance.status = instance.status; + } + } state.modules = [ ...state.modules.filter((module: IModuleConfig) => module.name !== name), { - ...module, //persist the module config schema - name, - url, + ...module, }, ]; return state; }) .then(() => { - ConduitGrpcSdk.Logger.log(`SD: Updated state for ${name} ${url}`); + ConduitGrpcSdk.Logger.log( + `SD: Updated state for ${name}/${instance.instanceId}:${instance.address}`, + ); }) .catch(() => { - ConduitGrpcSdk.Logger.error(`SD: Failed to update state ${name} ${url}`); + ConduitGrpcSdk.Logger.error( + `SD: Failed to update state ${name}/${instance.instanceId}:${instance.address}`, + ); }); } }