diff --git a/libraries/grpc-sdk/src/modules/config/index.ts b/libraries/grpc-sdk/src/modules/config/index.ts index 8cfce82c9..e3864d93d 100644 --- a/libraries/grpc-sdk/src/modules/config/index.ts +++ b/libraries/grpc-sdk/src/modules/config/index.ts @@ -105,11 +105,13 @@ export class Config extends ConduitModule { name: string, url: string, healthStatus: Omit, + instanceId: string, ) { const request: RegisterModuleRequest = { moduleName: name.toString(), url: url.toString(), healthStatus: healthStatus as number, + instanceId, }; const self = this; return this.client!.registerModule(request).then(res => { @@ -118,13 +120,13 @@ export class Config extends ConduitModule { }); } - moduleHealthProbe(name: string, url: string) { + moduleHealthProbe(name: string, url: string, instanceId: string) { const request: ModuleHealthRequest = { moduleName: name.toString(), - url, status: this._serviceHealthStatusGetter ? this._serviceHealthStatusGetter() : HealthCheckStatus.SERVICE_UNKNOWN, + instanceId, }; const self = this; this.client!.moduleHealthProbe(request) diff --git a/libraries/grpc-sdk/src/utilities/EventBus.ts b/libraries/grpc-sdk/src/utilities/EventBus.ts index 9d30ae907..bf6a06e99 100644 --- a/libraries/grpc-sdk/src/utilities/EventBus.ts +++ b/libraries/grpc-sdk/src/utilities/EventBus.ts @@ -16,6 +16,7 @@ export class EventBus { this._clientSubscriber = redisManager.getClient({ keyPrefix: 'bus_' }); this._clientPublisher = redisManager.getClient({ keyPrefix: 'bus_' }); this._signature = crypto.randomBytes(20).toString('hex'); + //@ts-expect-error this._clientSubscriber.on('ready', () => { ConduitGrpcSdk.Logger.log('The Bus is in the station...hehe'); }); @@ -59,6 +60,7 @@ export class EventBus { this._subscribedChannels[channelName] = [callback]; this._clientSubscriber.subscribe(channelName, () => {}); const self = this; + //@ts-expect-error this._clientSubscriber.on('message', (channel: string, message: string) => { if (channel !== channelName) return; // if the message supports the signature diff --git a/libraries/module-tools/src/ManagedModule.ts b/libraries/module-tools/src/ManagedModule.ts index 3536d1a27..623cf0d40 100644 --- a/libraries/module-tools/src/ManagedModule.ts +++ b/libraries/module-tools/src/ManagedModule.ts @@ -14,6 +14,7 @@ import { initializeSdk, merge } from './utilities/index.js'; import { convictConfigParser } from './utilities/convictConfigParser.js'; import { RoutingManager } from './routing/index.js'; import { RoutingController } from './routing/RoutingController.js'; +import crypto from 'crypto'; export abstract class ManagedModule extends ConduitServiceModule { readonly config?: convict.Config; @@ -25,8 +26,11 @@ export abstract class ManagedModule extends ConduitServiceModule { private readonly servicePort: string; private _moduleState: ModuleLifecycleStage; - protected constructor(moduleName: string) { - super(moduleName); + protected constructor(moduleName: string, instanceId?: string) { + super( + moduleName, + `${moduleName}-${instanceId ?? crypto.randomBytes(16).toString('hex')}`, + ); this._moduleState = ModuleLifecycleStage.CREATE_GRPC; if (!process.env.CONDUIT_SERVER) { throw new Error('CONDUIT_SERVER is undefined, specify Conduit server URL'); @@ -48,6 +52,10 @@ export abstract class ManagedModule extends ConduitServiceModule { return this._moduleName; } + get instanceId() { + return this._instanceId; + } + get address() { return this._address; } @@ -57,7 +65,12 @@ export abstract class ManagedModule extends ConduitServiceModule { this.initialize(this.serviceAddress, this.servicePort); try { await this.preRegisterLifecycle(); - await this.grpcSdk.config.registerModule(this.name, this.address, this.healthState); + await this.grpcSdk.config.registerModule( + this.name, + this.address, + this.healthState, + this.instanceId, + ); } catch (err) { ConduitGrpcSdk.Logger.error('Failed to initialize server'); ConduitGrpcSdk.Logger.error(err as Error); @@ -257,11 +270,9 @@ export abstract class ManagedModule extends ConduitServiceModule { convictConfigParser(configSchema), this.configOverride, ); + ConfigController.getInstance().config = config; - ConfigController.getInstance(); - if (config) ConfigController.getInstance().config = config; - if (!config || config.active || !config.hasOwnProperty('active')) - await this.onConfig(); + await this.onConfig(); } } } diff --git a/libraries/module-tools/src/classes/ConduitServiceModule.ts b/libraries/module-tools/src/classes/ConduitServiceModule.ts index d00c0cd45..5276731ff 100644 --- a/libraries/module-tools/src/classes/ConduitServiceModule.ts +++ b/libraries/module-tools/src/classes/ConduitServiceModule.ts @@ -20,16 +20,19 @@ import { fileURLToPath } from 'node:url'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); + export abstract class ConduitServiceModule { protected readonly _moduleName: string; + protected readonly _instanceId: string; protected _serviceName?: string; protected _address!: string; // external address:port of service (LoadBalancer) protected grpcServer!: GrpcServer; protected readonly events: EventEmitter = new EventEmitter(); private _serviceHealthState: HealthCheckStatus = HealthCheckStatus.SERVING; // default for health-agnostic modules - protected constructor(moduleName: string) { + protected constructor(moduleName: string, instanceId: string) { this._moduleName = camelCase(moduleName); + this._instanceId = instanceId; } private _registered = false; @@ -83,7 +86,11 @@ export abstract class ConduitServiceModule { // do not emit health updates until registered if (this._registered) { this.events.emit(`grpc-health-change:${this._serviceName}`, state); - this._grpcSdk?.config.moduleHealthProbe(this._moduleName, this._address); + this._grpcSdk?.config.moduleHealthProbe( + this._moduleName, + this._address, + this._instanceId, + ); } } } diff --git a/modules/forms/src/Forms.ts b/modules/forms/src/Forms.ts index 589e62ef6..755bc64b0 100644 --- a/modules/forms/src/Forms.ts +++ b/modules/forms/src/Forms.ts @@ -32,17 +32,6 @@ export default class Forms extends ManagedModule { this.database = this.grpcSdk.database!; await runMigrations(this.grpcSdk); await this.registerSchemas(); - this.grpcSdk.monitorModule('email', serving => { - if (serving && ConfigController.getInstance().config.active) { - this.onConfig() - .then() - .catch(() => { - ConduitGrpcSdk.Logger.error('Failed to update Forms configuration'); - }); - } else { - this.updateHealth(HealthCheckStatus.NOT_SERVING); - } - }); } async onConfig() { @@ -50,8 +39,6 @@ export default class Forms extends ManagedModule { this.updateHealth(HealthCheckStatus.NOT_SERVING); } else { if (!this.isRunning) { - if (!this.grpcSdk.isAvailable('email')) return; - await this.grpcSdk.emailProvider!.registerTemplate(FormSubmissionTemplate); this.formController = new FormsController(this.grpcSdk); this.adminRouter = new AdminHandlers( this.grpcServer, @@ -68,6 +55,14 @@ export default class Forms extends ManagedModule { .catch(e => { ConduitGrpcSdk.Logger.error(e.message); }); + this.grpcSdk + .waitForExistence('email') + .then(() => { + this.grpcSdk.emailProvider!.registerTemplate(FormSubmissionTemplate); + }) + .catch(e => { + ConduitGrpcSdk.Logger.error(e.message); + }); } this.updateHealth(HealthCheckStatus.SERVING); } diff --git a/modules/forms/src/routes/index.ts b/modules/forms/src/routes/index.ts index 9b06f8df3..7d7ab16dd 100644 --- a/modules/forms/src/routes/index.ts +++ b/modules/forms/src/routes/index.ts @@ -94,18 +94,20 @@ export class FormsRoutes { Object.keys(data).forEach(r => { text += `
${r}: ${data[r]}`; }); - await this.grpcSdk - .emailProvider!.sendEmail('FormSubmission', { - email: form.forwardTo, - replyTo: form.emailField ? data[form.emailField] : null, - variables: { - data: text, - }, - attachments: Object.values(fileData), - }) - .catch((e: Error) => { - throw new GrpcError(status.INTERNAL, e.message); - }); + if (this.grpcSdk.isAvailable('email')) { + await this.grpcSdk + .emailProvider!.sendEmail('FormSubmission', { + email: form.forwardTo, + replyTo: form.emailField ? data[form.emailField] : null, + variables: { + data: text, + }, + attachments: Object.values(fileData), + }) + .catch((e: Error) => { + throw new GrpcError(status.INTERNAL, e.message); + }); + } return 'Ok'; } diff --git a/packages/commons/src/interfaces/RegisteredModule.ts b/packages/commons/src/interfaces/RegisteredModule.ts deleted file mode 100644 index 7401c23d8..000000000 --- a/packages/commons/src/interfaces/RegisteredModule.ts +++ /dev/null @@ -1,4 +0,0 @@ -export interface RegisteredModule { - address: string; - serving: boolean; -} diff --git a/packages/commons/src/interfaces/index.ts b/packages/commons/src/interfaces/index.ts index 0162536b8..a8d0c20ae 100644 --- a/packages/commons/src/interfaces/index.ts +++ b/packages/commons/src/interfaces/index.ts @@ -1,2 +1 @@ export * from './IConduitModule.js'; -export * from './RegisteredModule.js'; diff --git a/packages/core/src/config-manager/index.ts b/packages/core/src/config-manager/index.ts index be1ad472c..f29fefd0e 100644 --- a/packages/core/src/config-manager/index.ts +++ b/packages/core/src/config-manager/index.ts @@ -8,9 +8,12 @@ import { } from '@conduitplatform/grpc-sdk'; import { ConduitCommons, + GetConfigRequest, GetConfigResponse, GetRedisDetailsResponse, IConfigManager, + ModuleByNameRequest, + ModuleByNameResponse, UpdateConfigRequest, UpdateConfigResponse, } from '@conduitplatform/commons'; @@ -44,22 +47,27 @@ export default class ConfigManager implements IConfigManager { } getModuleUrlByName(moduleName: string): string | undefined { - return this.serviceDiscovery.getModuleUrlByName(moduleName); + const result = this.serviceDiscovery.getModule(moduleName); + return result!.servingAddress ?? result!.allAddresses!; } getModuleUrlByNameGrpc( - call: GrpcRequest<{ name: string }>, - callback: GrpcResponse<{ moduleUrl: string }>, + call: GrpcRequest, + callback: GrpcResponse, ) { const name = call.request.name; - const result = this.getModuleUrlByName(name); + const result = this.serviceDiscovery.getModule(name); if (!result) { return callback({ code: status.NOT_FOUND, message: 'Module not found', }); } - callback(null, { moduleUrl: result }); + callback(null, { + moduleUrl: result.servingAddress ?? result.allAddresses!, + //@ts-expect-error + instances: result.instances, + }); } async initialize(server: GrpcServer) { @@ -81,7 +89,7 @@ export default class ConfigManager implements IConfigManager { moduleHealthProbe: this.serviceDiscovery.moduleHealthProbe.bind( this.serviceDiscovery, ), - getModuleUrlByName: this.getModuleUrlByNameGrpc.bind(this.serviceDiscovery), + getModuleUrlByName: this.getModuleUrlByNameGrpc.bind(this), }, ); this.serviceDiscovery.beginMonitors(); @@ -156,7 +164,10 @@ export default class ConfigManager implements IConfigManager { this._configStorage.onDatabaseAvailable(); } - getGrpc(call: GrpcRequest<{ key: string }>, callback: GrpcResponse<{ data: string }>) { + getGrpc( + call: GrpcRequest, + callback: GrpcResponse, + ) { this.get(call.request.key).then(r => { if (!r) { return callback({ @@ -231,7 +242,7 @@ export default class ConfigManager implements IConfigManager { const module = ServiceRegistry.getInstance().getModule(moduleName); if (!module) return false; try { - await this.grpcSdk.isModuleUp(moduleName, module.address); + await this.grpcSdk.isModuleUp(moduleName, module.allAddresses!); } catch (e) { return false; } diff --git a/packages/core/src/config-manager/models/Basic.message.ts b/packages/core/src/config-manager/models/Basic.message.ts new file mode 100644 index 000000000..73d1126ee --- /dev/null +++ b/packages/core/src/config-manager/models/Basic.message.ts @@ -0,0 +1,3 @@ +export type BasicMessage = { + type: 'instance-health' | 'serving-modules-update'; +}; diff --git a/packages/core/src/config-manager/models/RegisteredModule.ts b/packages/core/src/config-manager/models/RegisteredModule.ts new file mode 100644 index 000000000..e4f9dd48a --- /dev/null +++ b/packages/core/src/config-manager/models/RegisteredModule.ts @@ -0,0 +1,121 @@ +import { getAddressType } from '../utils/index.js'; +import { ConduitGrpcSdk, HealthCheckStatus } from '@conduitplatform/grpc-sdk'; + +export type ModuleInstance = { + instanceId: string; + address: string; + url: string; + status: HealthCheckStatus; + serving?: boolean; + addressType?: 'ipv4' | 'ipv6' | 'dns'; +}; + +export class RegisteredModule { + private readonly _name: string; + private _instances: ModuleInstance[] = []; + + constructor(name: string, grpcSdk: ConduitGrpcSdk); + constructor(name: string, grpcSdk: ConduitGrpcSdk, instances: ModuleInstance[]); + constructor( + name: string, + private readonly grpcSdk: ConduitGrpcSdk, + instances?: ModuleInstance[], + ) { + this._name = name; + if (instances) { + for (const instance of instances) { + this.addOrUpdateInstance(instance); + } + } + } + + get name() { + return this._name; + } + + public get isServing() { + return this._instances.some(i => i.serving); + } + + public get servingAddress() { + if (!this._instances.some(i => i.serving)) { + return undefined; + } + const servingInstances = this._instances.filter(i => i.serving); + let addressType = getAddressType(servingInstances[0].url); + if (addressType === 'dns') { + return `dns:///${servingInstances[0].url}`; + } else { + return `${addressType}:${servingInstances.map(i => i.url).join(',')}`; + } + } + + public get allAddresses() { + if (this._instances.length === 0) { + return undefined; + } + let addressType = getAddressType(this._instances[0].url); + if (addressType === 'dns') { + return `dns:///${this._instances[0].url}`; + } else { + return `${addressType}:${this._instances.map(i => i.url).join(',')}`; + } + } + + public updateInstanceHealth(instanceId: string, status: HealthCheckStatus) { + const instance = this._instances.find(i => i.instanceId === instanceId); + if (instance) { + instance.status = status; + instance.serving = instance.status === HealthCheckStatus.SERVING; + this.updateServingList(); + } + } + + public addOrUpdateInstance(instance: ModuleInstance) { + const index = this._instances.findIndex(i => i.instanceId === instance.instanceId); + if (!instance.addressType) { + instance.addressType = getAddressType(instance.url); + } + this._instances.findIndex(i => i.address === instance.instanceId); + if (index === -1) { + this._instances.push(instance); + } else { + this._instances[index] = { ...this._instances[index], ...instance }; + } + } + + private updateServingList() { + // count the number of instances that are serving per address type + const servingCount: Record = {}; + for (const instance of this._instances) { + if (instance.serving) { + servingCount[instance.addressType!] = + (servingCount[instance.addressType!] || 0) + 1; + } + } + if (!this._instances.some(i => i.status === HealthCheckStatus.SERVING)) { + ConduitGrpcSdk.Logger.warn( + `Module ${this._name} has no serving instances. Communication with this module will fail.`, + ); + } else { + ConduitGrpcSdk.Logger.log( + `Module ${this._name} has ${ + this._instances.filter(i => i.serving).length + } serving instances.`, + ); + this.grpcSdk.updateModuleHealth(this._name, true); + } + } + + public removeInstance(instanceId: string) { + this._instances = this._instances.filter(i => i.instanceId !== instanceId); + } + + public getInstance(instanceId: string) { + return this._instances.find(i => i.instanceId === instanceId); + } + + public get instances() { + return this._instances; + } +} diff --git a/packages/core/src/config-manager/models/ServiceDiscover.message.ts b/packages/core/src/config-manager/models/ServiceDiscover.message.ts new file mode 100644 index 000000000..95345c811 --- /dev/null +++ b/packages/core/src/config-manager/models/ServiceDiscover.message.ts @@ -0,0 +1,11 @@ +import { HealthCheckStatus } from '@conduitplatform/grpc-sdk'; +import { BasicMessage } from './Basic.message.js'; + +export type ServiceDiscoverMessage = BasicMessage & { + name: string; + address: string; + url?: string; + status: HealthCheckStatus; + addressType?: 'ipv4' | 'ipv6' | 'dns'; + instanceId: string; +}; diff --git a/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts b/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts index 43861ff58..e02c5a6bb 100644 --- a/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts +++ b/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts @@ -1,17 +1,10 @@ -import { - ConduitGrpcSdk, - HealthCheckResponse, - HealthCheckStatus, -} from '@conduitplatform/grpc-sdk'; +import { ConduitGrpcSdk, HealthCheckStatus } from '@conduitplatform/grpc-sdk'; import { ServiceRegistry } from './ServiceRegistry.js'; import { linearBackoffTimeout } from '@conduitplatform/module-tools'; import { EventEmitter } from 'events'; export class ServiceMonitor { private static _instance: ServiceMonitor; - private readonly moduleHealth: { - [module: string]: { address: string; timestamp: number; status: HealthCheckStatus }; - } = {}; private readonly _serviceRegistry = ServiceRegistry.getInstance(); private monitorIntervalMs = 5000; private serviceReconnectionInitMs = 500; @@ -58,106 +51,94 @@ export class ServiceMonitor { * Any services that do not provide a gRPC health check service are assumed to be healthy. * Used by healthCheckRegisteredModules(), reviveService() */ - private async healthCheckService(module: string, address: string) { - const healthClient = this.grpcSdk.getHealthClient(module); - let status = HealthCheckStatus.SERVING; - if (healthClient) { - status = await healthClient - .check({}) - .then((res: HealthCheckResponse) => res.status as unknown as HealthCheckStatus); - } - const isRegistered = Object.keys(this.moduleHealth).includes(module); - if (!isRegistered && status === HealthCheckStatus.SERVICE_UNKNOWN) return; - this.updateModuleHealth(module, address, status); + + // OK + private async healthCheckService(module: string, instanceId: string) { + const instance = this._serviceRegistry.getModule(module)!.getInstance(instanceId)!; + const status = await this.grpcSdk.isModuleUp(module, instance.address); + this.updateInstanceHealth(module, instanceId, status); return status; } - handleUnresponsiveModule( + handleUnresponsiveInstance(moduleName: string, instanceId: string) { + ConduitGrpcSdk.Logger.log(`SD/health: update ${moduleName}/${instanceId} unknown`); + this.reviveService(moduleName, instanceId); + } + + updateInstanceHealth( moduleName: string, - moduleUrl: string, - status: HealthCheckStatus, + instanceId: string, + instanceStatus: HealthCheckStatus, ) { - const isRegistered = Object.keys(this.moduleHealth).includes(moduleName); - if (!isRegistered && status === HealthCheckStatus.SERVICE_UNKNOWN) { - ConduitGrpcSdk.Logger.info( - `SD/health: unresponsive ${moduleName} ${moduleUrl} with no health history`, - ); - return; + let module = this._serviceRegistry.getModule(moduleName); + // if module doesn't exist in the registry, do nothing. + if (!module) { + return null; } - - ConduitGrpcSdk.Logger.log(`SD/health: update ${moduleName} ${moduleUrl} unknown`); - this.grpcSdk.updateModuleHealth(moduleName, false); - // Deregister Unresponsive Module - delete this.moduleHealth[moduleName]; - this._serviceRegistry.removeModule(moduleName); - this.reviveService(moduleName, moduleUrl); + module.updateInstanceHealth(instanceId, instanceStatus); } + // OK updateModuleHealth( moduleName: string, - moduleUrl: string, - moduleStatus: HealthCheckStatus, + instanceId: string, + instanceStatus: HealthCheckStatus, ) { - if (moduleStatus === HealthCheckStatus.SERVICE_UNKNOWN) { - return this.handleUnresponsiveModule(moduleName, moduleUrl, moduleStatus); + if (instanceStatus === HealthCheckStatus.SERVICE_UNKNOWN) { + return this.handleUnresponsiveInstance(moduleName, instanceId); } let module = this._serviceRegistry.getModule(moduleName); + // if module doesn't exist in the registry, do nothing. if (!module) { - module = { - address: moduleUrl, - serving: moduleStatus === HealthCheckStatus.SERVING, - }; - // ConduitGrpcSdk.Logger.log( - // `SD/health: update unregistered module ${moduleName} ${moduleUrl} ${moduleStatus}`, - // ); - this._serviceRegistry.updateModule(moduleName, module); - } else { - const prevStatus = module.serving; - // ConduitGrpcSdk.Logger.log( - // `SD/health: update registered module ${moduleName} ${moduleUrl} to ${moduleStatus} from serving: ${prevStatus}`, - // ); - module.serving = moduleStatus === HealthCheckStatus.SERVING; + return null; } - this.grpcSdk.updateModuleHealth( - moduleName, - moduleStatus === HealthCheckStatus.SERVING, + const instance = module.getInstance(instanceId); + // if instance doesn't exist in the registry, do nothing. + // instances should be registered before their first probe + if (!instance) { + return null; + } + const prevStatus = instance.serving; + ConduitGrpcSdk.Logger.log( + `SD/health: update instance ${moduleName}/${instanceId} to ${instanceStatus} from serving: ${prevStatus}`, ); - this._serviceRegistry.updateModule(moduleName, module); - this.moduleHealth[moduleName] = { - address: moduleUrl, - timestamp: Date.now(), - status: moduleStatus, - }; + module.addOrUpdateInstance({ ...instance, status: instanceStatus }); } /* * Attempt to reconnect to a recently removed module service. * Retries using linear backoff. */ - private reviveService(name: string, address: string) { + + // OK + private reviveService(name: string, instanceId: string) { + const instance = this._serviceRegistry.getModule(name)!.getInstance(instanceId)!; + const onTry = (stop: () => void) => { - if (Object.keys(this.moduleHealth).includes(name)) { + if (instance.serving) { stop(); - ConduitGrpcSdk.Logger.log(`SD/health/revive: found healthy ${name} ${address}`); + ConduitGrpcSdk.Logger.log( + `SD/health/revive: found healthy ${name}/${instanceId}`, + ); } else { - this.healthCheckService(name, address) + this.healthCheckService(name, instanceId) .then(() => { ConduitGrpcSdk.Logger.log( - `SD/health/revive: check completed ${name} ${address}`, + `SD/health/revive: check completed ${name}/${instanceId}`, ); }) .catch(() => { ConduitGrpcSdk.Logger.log( - `SD/health/revive: check failed ${name} ${address}`, + `SD/health/revive: check failed ${name}/${instanceId}`, ); }); } }; const onFailure = () => { - this.grpcSdk.getModule(name)?.closeConnection(); + this._serviceRegistry.getModule(name)?.removeInstance(instanceId); ConduitGrpcSdk.Logger.log( - `SD/health/revive: check connection closed ${name} ${address}`, + `SD/health/revive: failed to recover ${name}/${instanceId}`, ); }; linearBackoffTimeout( @@ -168,19 +149,23 @@ export class ServiceMonitor { ); } + // OK private async monitorModules() { for (const module of this._serviceRegistry.getRegisteredModules()) { - const registeredModule = this._serviceRegistry.getModule(module); - if (!registeredModule) continue; - try { - await this.healthCheckService(module, registeredModule.address); - } catch (e) { - if (this._serviceRegistry.getModule(module)) { - this.handleUnresponsiveModule( - module, - registeredModule.address, - HealthCheckStatus.SERVICE_UNKNOWN, - ); + const registeredModule = this._serviceRegistry.getModule(module)!; + for (const instance of registeredModule.instances) { + // skip instances with unknown status + if (instance.status === HealthCheckStatus.UNKNOWN) continue; + try { + await this.healthCheckService(module, instance.instanceId); + } catch (e) { + if (this._serviceRegistry.getModule(module)) { + registeredModule.updateInstanceHealth( + instance.instanceId, + HealthCheckStatus.UNKNOWN, + ); + this.handleUnresponsiveInstance(module, instance.instanceId); + } } } } diff --git a/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts b/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts index a55ff16ee..db8f76e7f 100644 --- a/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts +++ b/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts @@ -1,5 +1,5 @@ import { ConduitGrpcSdk } from '@conduitplatform/grpc-sdk'; -import { RegisteredModule } from '@conduitplatform/commons'; +import { ModuleInstance, RegisteredModule } from '../models/RegisteredModule.js'; export class ServiceRegistry { private static _instance: ServiceRegistry; @@ -24,29 +24,38 @@ export class ServiceRegistry { return this.registeredModules.get(moduleName); } - updateModule(moduleName: string, module: RegisteredModule) { - this.registeredModules.set(moduleName, module); + updateModule(moduleName: string, module: ModuleInstance) { + const existingModule = this.registeredModules.get(moduleName); + if (existingModule) { + existingModule.addOrUpdateInstance(module); + } else { + const newModule = new RegisteredModule(moduleName, this.grpcSdk, [module]); + this.registeredModules.set(moduleName, newModule); + } } removeModule(moduleName: string) { this.registeredModules.delete(moduleName); } - getModuleDetails( - moduleName: string, - ): { moduleName: string; url: string; serving: boolean } | undefined { - const module = this.registeredModules.get(moduleName); - if (!module) return undefined; - return { moduleName: moduleName, url: module.address, serving: module.serving }; - } - - getModuleDetailsList(): { moduleName: string; url: string; serving: boolean }[] { - const modules: { moduleName: string; url: string; serving: boolean }[] = []; + getModuleDetailsList(): { + moduleName: string; + url: string; + serving: boolean; + instances: ModuleInstance[]; + }[] { + const modules: { + moduleName: string; + url: string; + serving: boolean; + instances: ModuleInstance[]; + }[] = []; this.registeredModules.forEach((value: RegisteredModule, key: string) => { modules.push({ moduleName: key, - url: value.address, - serving: value.serving, + url: value.servingAddress ?? value.allAddresses!, + serving: value.isServing, + instances: value.instances, }); }); return modules; diff --git a/packages/core/src/config-manager/service-discovery/index.ts b/packages/core/src/config-manager/service-discovery/index.ts index 9ed5a2226..5faf285df 100644 --- a/packages/core/src/config-manager/service-discovery/index.ts +++ b/packages/core/src/config-manager/service-discovery/index.ts @@ -1,4 +1,11 @@ -import { ModuleListResponse } from '@conduitplatform/commons'; +import { + ModuleExistsRequest, + ModuleExistsResponse, + ModuleHealthRequest, + ModuleListResponse, + RegisterModuleRequest, + RegisterModuleResponse, +} from '@conduitplatform/commons'; import { ConduitGrpcSdk, GrpcCallback, @@ -6,6 +13,7 @@ import { GrpcResponse, HealthCheckStatus, Indexable, + ServerUnaryCall, } from '@conduitplatform/grpc-sdk'; import { IModuleConfig } from '../../interfaces/IModuleConfig.js'; import { ServerWritableStream, status } from '@grpc/grpc-js'; @@ -13,6 +21,8 @@ import { EventEmitter } from 'events'; import { ServiceRegistry } from './ServiceRegistry.js'; import { ServiceMonitor } from './ServiceMonitor.js'; import { isEmpty } from 'lodash-es'; +import { ServiceDiscoverMessage } from '../models/ServiceDiscover.message.js'; +import { ModuleInstance, RegisteredModule } from '../models/RegisteredModule.js'; /* * - Multi-instance services are not handled individually (LoadBalancer) @@ -34,8 +44,8 @@ export class ServiceDiscovery { this._serviceMonitor = ServiceMonitor.getInstance(grpcSdk, this.moduleRegister); } - getModuleUrlByName(name: string): string | undefined { - return this._serviceRegistry.getModule(name)?.address; + getModule(name: string): RegisteredModule | undefined { + return this._serviceRegistry.getModule(name); } beginMonitors() { @@ -47,48 +57,35 @@ export class ServiceDiscovery { if (state && !isEmpty(state)) { const parsedState = JSON.parse(state) as { modules: IModuleConfig[] }; if (parsedState.modules) { - const success: IModuleConfig[] = []; for (const module of parsedState.modules) { - try { - await this._recoverModule(module.name, module.url); - success.push({ - name: module.name, - url: module.url, - ...(module.configSchema && { configSchema: module.configSchema }), - }); - } catch (e) { - ConduitGrpcSdk.Logger.error( - `SD: failed to recover: ${module.name} ${module.url}`, - ); - ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`); + for (const instance of module.instances) { + try { + await this._recoverModule(module.name, instance); + } catch (e) { + ConduitGrpcSdk.Logger.error( + `SD: failed to recover: ${module.name}@${instance.address}`, + ); + ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`); + } } } - await this.grpcSdk - .state!.modifyState(async (existingState: Indexable) => { - const state = existingState ?? {}; - state.modules = success; - return state; - }) - .then(() => { - ConduitGrpcSdk.Logger.log('Recovered state'); - }) - .catch(() => { - ConduitGrpcSdk.Logger.error('Failed to recover state'); - }); } } this.grpcSdk.bus!.subscribe('service-discover', (message: string) => { - const parsedMessage = JSON.parse(message); - if (parsedMessage.type === 'module-health') { + const parsedMessage: ServiceDiscoverMessage = JSON.parse(message); + if (parsedMessage.type === 'instance-health') { this._serviceMonitor.updateModuleHealth( parsedMessage.name, - parsedMessage.url, + parsedMessage.instanceId, parsedMessage.status, ); } else if (parsedMessage.type === 'serving-modules-update') { this._serviceRegistry.updateModule(parsedMessage.name, { - address: parsedMessage.url, + address: parsedMessage.address, + url: parsedMessage.url!, serving: true, + instanceId: parsedMessage.instanceId, + status: parsedMessage.status, }); } }); @@ -98,7 +95,10 @@ export class ServiceDiscovery { * Used by modules to notify Core regarding changes in their health state. * Called on module health change via grpc-sdk. */ - moduleHealthProbe(call: any, callback: GrpcResponse) { + moduleHealthProbe( + call: GrpcRequest, + callback: GrpcResponse, + ) { if ( call.request.status < HealthCheckStatus.UNKNOWN || call.request.status > HealthCheckStatus.NOT_SERVING @@ -109,18 +109,20 @@ export class ServiceDiscovery { }); return; } + const callingIp = (call as unknown as ServerUnaryCall).getPeer(); ConduitGrpcSdk.Logger.log( - `SD: received: ${call.request.moduleName} ${call.request.url} ${call.request.status}`, + `SD: received: ${call.request.moduleName}/${call.request.instanceId}, source:${callingIp}, status${call.request.status}`, ); this._serviceMonitor.updateModuleHealth( call.request.moduleName, - call.request.url, + call.request.instanceId, call.request.status as HealthCheckStatus, ); - this.publishModuleData( - 'module-health', + this.publishInstanceHealth( + 'instance-health', call.request.moduleName, - call.request.url, + callingIp, + call.request.instanceId, call.request.status, ); callback(null, null); @@ -128,43 +130,49 @@ export class ServiceDiscovery { moduleList(call: GrpcRequest, callback: GrpcCallback) { const modules = this._serviceRegistry.getModuleDetailsList(); + //@ts-expect-error callback(null, { modules }); } watchModules(call: ServerWritableStream) { this.moduleRegister.on('serving-modules-update', () => { const modules = this._serviceRegistry.getModuleDetailsList(); + //@ts-expect-error call.write({ modules }); }); } - async _recoverModule(moduleName: string, moduleUrl: string) { + async _recoverModule(moduleName: string, instance: Omit) { let healthResponse; if (!this.grpcSdk.getModule(moduleName)) { - healthResponse = await this.grpcSdk.isModuleUp(moduleName, moduleUrl); - this.grpcSdk.createModuleClient(moduleName, moduleUrl); + healthResponse = await this.grpcSdk.isModuleUp(moduleName, instance.address); + this.grpcSdk.createModuleClient(moduleName, instance.address); } const healthStatus = healthResponse.status as unknown as HealthCheckStatus; ConduitGrpcSdk.Logger.log( - `SD: registering: ${moduleName} ${moduleUrl} ${healthStatus}`, + `SD: registering: ${moduleName}/${instance.instanceId}@${instance.address} ${healthStatus}`, ); - this._serviceRegistry.updateModule(moduleName, { - address: moduleUrl, - serving: healthStatus === HealthCheckStatus.SERVING, - }); + //@ts-expect-error + this._serviceRegistry.updateModule(moduleName, instance); if (!this.grpcSdk.isAvailable(moduleName)) { - this.grpcSdk.createModuleClient(moduleName, moduleUrl); + this.grpcSdk.createModuleClient(moduleName, instance.address); } - this._serviceMonitor.updateModuleHealth(moduleName, moduleUrl, healthStatus); + this._serviceMonitor.updateModuleHealth( + moduleName, + instance.instanceId, + healthStatus, + ); } async _registerModule( moduleName: string, moduleUrl: string, + instanceUrl: string, + instanceId: string, healthStatus?: HealthCheckStatus, ) { if (healthStatus === undefined) { @@ -174,21 +182,30 @@ export class ServiceDiscovery { `SD: registering: ${moduleName} ${moduleUrl} ${healthStatus}`, ); this._serviceRegistry.updateModule(moduleName, { - address: moduleUrl, + address: instanceUrl, + url: moduleUrl, + instanceId: instanceId, serving: healthStatus === HealthCheckStatus.SERVING, + status: healthStatus, }); if (!this.grpcSdk.isAvailable(moduleName)) { - this.grpcSdk.createModuleClient(moduleName, moduleUrl); + this.grpcSdk.createModuleClient( + moduleName, + this._serviceRegistry.getModule(moduleName)!.servingAddress!, + ); } - this._serviceMonitor.updateModuleHealth(moduleName, moduleUrl, healthStatus!); + this._serviceMonitor.updateModuleHealth(moduleName, instanceId, healthStatus!); } - async registerModule(call: any, callback: GrpcResponse<{ result: boolean }>) { + async registerModule( + call: GrpcRequest, + callback: GrpcResponse, + ) { if ( - call.request.status < HealthCheckStatus.UNKNOWN || - call.request.status > HealthCheckStatus.NOT_SERVING + call.request.healthStatus < HealthCheckStatus.UNKNOWN || + call.request.healthStatus > HealthCheckStatus.NOT_SERVING ) { callback({ code: status.INVALID_ARGUMENT, @@ -196,77 +213,145 @@ export class ServiceDiscovery { }); return; } + const callingIp = (call as unknown as ServerUnaryCall).getPeer(); await this._registerModule( call.request.moduleName, call.request.url, + callingIp.split(':')[0] + ':' + call.request.url.split(':')[1], + call.request.instanceId, call.request.healthStatus as HealthCheckStatus, ); - this.updateState(call.request.moduleName, call.request.url); + this.updateState(call.request.moduleName, { + address: callingIp.split(':')[0] + ':' + call.request.url.split(':')[1], + url: call.request.url, + instanceId: call.request.instanceId, + serving: call.request.healthStatus === HealthCheckStatus.SERVING, + status: call.request.healthStatus, + }); this.publishModuleData( 'serving-modules-update', call.request.moduleName, + callingIp, call.request.url, + call.request.instanceId, call.request.healthStatus, ); callback(null, { result: true }); } moduleExists( - call: GrpcRequest<{ moduleName: string }>, - callback: GrpcResponse<{ url: string }>, + call: GrpcRequest, + callback: GrpcResponse, ) { const module = this._serviceRegistry.getModule(call.request.moduleName); - if (module) { - callback(null, { url: module.address }); + const hasAddress = module?.servingAddress; + if (module && hasAddress) { + callback(null, { + url: module.servingAddress ?? module.allAddresses!, + //@ts-expect-error + instances: module.instances, + }); } else { callback({ code: status.NOT_FOUND, - message: 'Module is missing', + message: "Module is missing or doesn't have a registered address", }); } } + private publishInstanceHealth( + type: 'instance-health', + name: string, + address: string, + instanceId: string, + status: HealthCheckStatus, + ) { + const serviceDiscoverMessage: ServiceDiscoverMessage = { + type, + name, + address, + instanceId, + status, + }; + this.grpcSdk.bus!.publish('service-discover', JSON.stringify(serviceDiscoverMessage)); + } + private publishModuleData( - type: string, + type: 'serving-modules-update', name: string, - url?: string, - status?: HealthCheckStatus, + address: string, + url: string, + instanceId: string, + status: HealthCheckStatus, ) { - this.grpcSdk.bus!.publish( - 'service-discover', - JSON.stringify({ - type, - name, - url, - status, - }), - ); + const serviceDiscoverMessage: ServiceDiscoverMessage = { + type, + name, + address, + url, + instanceId, + status, + }; + this.grpcSdk.bus!.publish('service-discover', JSON.stringify(serviceDiscoverMessage)); } - private updateState(name: string, url: string) { + private updateState(name: string, instance: ModuleInstance) { this.grpcSdk .state!.modifyState(async (existingState: Indexable) => { const state = existingState ?? {}; if (!state.modules) state.modules = []; - const module = state.modules.find((module: IModuleConfig) => { - return module.url === url; + let module = state.modules.find((module: IModuleConfig) => { + return module.name === name; }); + if (!module) { + module = { + name, + addresses: '', + instances: [ + { + instanceId: instance.instanceId, + address: instance.address, + url: instance.url, + status: instance.status, + }, + ], + status: instance.status, + }; + } else { + let existingInstance = module.instances.find((instance: ModuleInstance) => { + return instance.instanceId === instance.instanceId; + }); + if (!existingInstance) { + module.instances.push({ + instanceId: instance.instanceId, + address: instance.address, + url: instance.url, + status: instance.status, + }); + } else { + existingInstance.address = instance.address; + existingInstance.url = instance.url; + existingInstance.status = instance.status; + } + } state.modules = [ ...state.modules.filter((module: IModuleConfig) => module.name !== name), { - ...module, //persist the module config schema - name, - url, + ...module, }, ]; return state; }) .then(() => { - ConduitGrpcSdk.Logger.log(`SD: Updated state for ${name} ${url}`); + ConduitGrpcSdk.Logger.log( + `SD: Updated state for ${name}/${instance.instanceId}:${instance.address}`, + ); }) .catch(() => { - ConduitGrpcSdk.Logger.error(`SD: Failed to update state ${name} ${url}`); + ConduitGrpcSdk.Logger.error( + `SD: Failed to update state ${name}/${instance.instanceId}:${instance.address}`, + ); }); } } diff --git a/packages/core/src/config-manager/utils/index.ts b/packages/core/src/config-manager/utils/index.ts new file mode 100644 index 000000000..f0e1b84f9 --- /dev/null +++ b/packages/core/src/config-manager/utils/index.ts @@ -0,0 +1,25 @@ +export const getAddressType = (address: string) => { + const ipv4Regex = new RegExp( + '^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$', + ); + const ipv6Regex = new RegExp('^([0-9a-fA-F]{1,4}:){7}([0-9a-fA-F]{1,4}|:)$'); + if (ipv4Regex.test(address.split(':')[0])) { + return 'ipv4'; + } else if (ipv6Regex.test(address)) { + return 'ipv6'; + } else { + return 'dns'; + } +}; + +export const formatAddress = (address: string, addressType: 'ipv4' | 'ipv6' | 'dns') => { + if (addressType === 'dns') { + return address; + } else if (addressType === 'ipv4') { + return `${address}`; + } else { + const [ipPart, port] = address.includes(']:') ? address.split(']:') : [address, '']; + const ip = ipPart.replace(/^\[|\]$/g, ''); + return port ? `[${ip}]:${port}` : `${ip}`; + } +}; diff --git a/packages/core/src/core.proto b/packages/core/src/core.proto index 78129e42c..82b66beaa 100644 --- a/packages/core/src/core.proto +++ b/packages/core/src/core.proto @@ -78,8 +78,8 @@ message PatchRouteMiddlewaresRequest { message ModuleHealthRequest { string moduleName = 1; - string url = 2; - int32 status = 3; // [0, 1, 2] + int32 status = 2; // [0, 1, 2] + string instanceId = 3; } message ModuleByNameRequest { @@ -88,25 +88,35 @@ message ModuleByNameRequest { message ModuleByNameResponse { string moduleUrl = 1; + repeated ModuleInstance instances = 2; } message RegisterModuleRequest { string moduleName = 1; string url = 2; int32 healthStatus = 3; // [0, 1, 2] + string instanceId = 4; } message RegisterModuleResponse { bool result = 1; } - +message ModuleInstance { + string instanceId = 1; + string address = 2; + int32 status = 3; + bool serving = 4; + string addressType = 5; +} message ModuleListResponse { repeated ModuleResponse modules = 1; message ModuleResponse { string moduleName = 1; string url = 2; bool serving = 3; + repeated ModuleInstance instances = 4; } + } message ModuleExistsRequest { @@ -115,6 +125,7 @@ message ModuleExistsRequest { message ModuleExistsResponse { string url = 1; + repeated ModuleInstance instances = 2; } message GetConfigRequest { diff --git a/packages/core/src/interfaces/IModuleConfig.ts b/packages/core/src/interfaces/IModuleConfig.ts index fe62aaa7d..8a53c16bd 100644 --- a/packages/core/src/interfaces/IModuleConfig.ts +++ b/packages/core/src/interfaces/IModuleConfig.ts @@ -1,6 +1,14 @@ +import { HealthCheckStatus } from '@conduitplatform/grpc-sdk'; + export interface IModuleConfig { name: string; - instance?: string; - url: string; + addresses: string; + instances: { + instanceId: string; + address: string; + url: string; + status?: HealthCheckStatus; + }[]; + status?: HealthCheckStatus; configSchema?: string; }