diff --git a/libraries/grpc-sdk/package.json b/libraries/grpc-sdk/package.json index 2342d5238..268f9fea4 100644 --- a/libraries/grpc-sdk/package.json +++ b/libraries/grpc-sdk/package.json @@ -30,6 +30,7 @@ "nice-grpc-client-middleware-retry": "^3", "prom-client": "^15.0.0", "protobufjs": "^7.2.4", + "redlock": "^5.0.0-beta.2", "winston": "^3.8.2", "winston-loki": "^6.0.7" }, diff --git a/libraries/grpc-sdk/src/utilities/StateManager.ts b/libraries/grpc-sdk/src/utilities/StateManager.ts index 6b3347ca4..a4c9197c7 100644 --- a/libraries/grpc-sdk/src/utilities/StateManager.ts +++ b/libraries/grpc-sdk/src/utilities/StateManager.ts @@ -1,11 +1,56 @@ import { RedisManager } from './RedisManager'; import { Cluster, Redis } from 'ioredis'; +import Redlock, { Lock } from 'redlock'; +import { Indexable } from '../interfaces'; + +export enum KNOWN_LOCKS { + STATE_MODIFICATION = 'state_modification', +} export class StateManager { private readonly redisClient: Redis | Cluster; + private readonly redLock: Redlock; constructor(redisManager: RedisManager, name: string) { this.redisClient = redisManager.getClient({ keyPrefix: name + '_' }); + this.redLock = new Redlock([this.redisClient], { + // The expected clock drift; for more details see: + // http://redis.io/topics/distlock + driftFactor: 0.01, // multiplied by lock ttl to determine drift time + // The max number of times Redlock will attempt to lock a resource + // before error. + retryCount: 10, + + // the time in ms between attempts + retryDelay: 100, // time in ms + // the max time in ms randomly added to retries + // to improve performance under high contention + // see https://www.awsarchitectureblog.com/2015/03/backoff.html + retryJitter: 200, // time in ms + // The minimum remaining time on a lock before an extension is automatically + // attempted with the `using` API. + automaticExtensionThreshold: 500, // time in ms + }); + } + + async acquireLock(resource: string, ttl: number = 5000): Promise { + return await this.redLock.acquire([resource], ttl); + } + + async releaseLock(lock: Lock) { + await lock.release(); + } + + async modifyState(modifier: (state: Indexable) => Promise) { + const lock = await this.acquireLock(KNOWN_LOCKS.STATE_MODIFICATION); + try { + const retrievedState = (await this.getState()) ?? '{}'; + const currentState = JSON.parse(retrievedState); + const newState = await modifier(currentState); + await this.setState(JSON.stringify(newState)); + } finally { + await this.releaseLock(lock); + } } setState(stateObj: any) { diff --git a/libraries/module-tools/src/utilities/linearBackoffTimeout.ts b/libraries/module-tools/src/utilities/linearBackoffTimeout.ts index bb96c096d..e059fb485 100644 --- a/libraries/module-tools/src/utilities/linearBackoffTimeout.ts +++ b/libraries/module-tools/src/utilities/linearBackoffTimeout.ts @@ -6,18 +6,25 @@ import { clearTimeout } from 'timers'; export function linearBackoffTimeout( - onTry: (timeout: NodeJS.Timeout) => void, + onTry: (stop: () => void) => void, delay: number, reps?: number, onFailure?: () => void, ) { const nextRep = () => reps === undefined || --reps > 0; + let stopSignal = false; + const stop = () => { + stopSignal = true; + }; let timeout: NodeJS.Timeout | null; const invoker = async () => { + if (stopSignal) { + return timeout ? clearTimeout(timeout) : null; + } delay = Math.floor(delay * 2); if (delay > 0 && nextRep()) { timeout = setTimeout(invoker, delay); - onTry(timeout); + onTry(stop); } else { timeout = null; onFailure && onFailure(); diff --git a/packages/admin/src/index.ts b/packages/admin/src/index.ts index 2cbbcc6e0..f5effa6a9 100644 --- a/packages/admin/src/index.ts +++ b/packages/admin/src/index.ts @@ -24,7 +24,7 @@ import * as middleware from './middleware'; import * as adminRoutes from './routes'; import * as models from './models'; import { AdminMiddleware } from './models'; -import { protoTemplate, getSwaggerMetadata } from './hermes'; +import { getSwaggerMetadata, protoTemplate } from './hermes'; import path from 'path'; import { ConduitMiddleware, @@ -78,7 +78,10 @@ export default class AdminModule extends IConduitAdmin { private databaseHandled = false; private hasAppliedMiddleware: string[] = []; - constructor(readonly commons: ConduitCommons, grpcSdk: ConduitGrpcSdk) { + constructor( + readonly commons: ConduitCommons, + grpcSdk: ConduitGrpcSdk, + ) { super(commons); this.grpcSdk = grpcSdk; ProtoGenerator.getInstance(protoTemplate); @@ -358,7 +361,7 @@ export default class AdminModule extends IConduitAdmin { } private async highAvailability() { - const r = await this.grpcSdk.state!.getKey('admin'); + const r = await this.grpcSdk.state!.getState(); const proxyRoutes = await models.AdminProxyRoute.getInstance().findMany({}); if ((!r || r.length === 0) && (!proxyRoutes || proxyRoutes.length === 0)) { this.cleanupRoutes(); @@ -430,10 +433,9 @@ export default class AdminModule extends IConduitAdmin { url: string, moduleName?: string, ) { - this.grpcSdk - .state!.getKey('admin') - .then(r => { - const state = !r || r.length === 0 ? {} : JSON.parse(r); + this.grpcSdk.state + ?.modifyState(async (existingState: Indexable) => { + const state = existingState ?? {}; if (!state.routes) state.routes = []; let index; (state.routes as Indexable[]).forEach((val, i) => { @@ -450,7 +452,7 @@ export default class AdminModule extends IConduitAdmin { moduleName, }); } - return this.grpcSdk.state!.setKey('admin', JSON.stringify(state)); + return state; }) .then(() => { this.publishAdminRouteData(routes, url, moduleName); diff --git a/packages/core/src/config-manager/admin/routes/GetModules.route.ts b/packages/core/src/config-manager/admin/routes/GetModules.route.ts index 47349d80e..a0d644995 100644 --- a/packages/core/src/config-manager/admin/routes/GetModules.route.ts +++ b/packages/core/src/config-manager/admin/routes/GetModules.route.ts @@ -1,16 +1,15 @@ -import { RegisteredModule } from '@conduitplatform/commons'; import { ConduitError, ConduitRouteActions, ConduitRouteParameters, ConduitRouteReturnDefinition, - UntypedArray, } from '@conduitplatform/grpc-sdk'; import { ConduitRoute } from '@conduitplatform/hermes'; import { isNil } from 'lodash'; import { ConduitBoolean, ConduitString } from '@conduitplatform/module-tools'; +import { ServiceRegistry } from '../../service-discovery/ServiceRegistry'; -export function getModulesRoute(registeredModules: Map) { +export function getModulesRoute() { return new ConduitRoute( { path: '/config/modules', @@ -31,24 +30,15 @@ export function getModulesRoute(registeredModules: Map }), async (call: ConduitRouteParameters) => { const sortByName = call.params!.sortByName; - if (registeredModules.size !== 0) { - const modules: UntypedArray = []; - registeredModules.forEach((value: RegisteredModule, key: string) => { - modules.push({ - moduleName: key, - url: value.address, - serving: value.serving, - }); - }); - if (!isNil(sortByName)) { - if (sortByName) - modules!.sort((a, b) => a.moduleName.localeCompare(b.moduleName)); - else modules!.sort((a, b) => b.moduleName.localeCompare(a.moduleName)); - } - return { modules }; - } else { + const modules = ServiceRegistry.getInstance().getModuleDetailsList(); + if (modules.length === 0) { throw new ConduitError('INTERNAL', 500, 'Modules not available yet'); } + if (!isNil(sortByName)) { + if (sortByName) modules!.sort((a, b) => a.moduleName.localeCompare(b.moduleName)); + else modules!.sort((a, b) => b.moduleName.localeCompare(a.moduleName)); + } + return { modules }; }, ); } diff --git a/packages/core/src/config-manager/admin/routes/GetMonoConfig.route.ts b/packages/core/src/config-manager/admin/routes/GetMonoConfig.route.ts index e9c38a606..bcce50700 100644 --- a/packages/core/src/config-manager/admin/routes/GetMonoConfig.route.ts +++ b/packages/core/src/config-manager/admin/routes/GetMonoConfig.route.ts @@ -3,14 +3,10 @@ import ConduitGrpcSdk, { ConduitRouteReturnDefinition, } from '@conduitplatform/grpc-sdk'; import { ConduitJson } from '@conduitplatform/module-tools'; - -import { RegisteredModule } from '@conduitplatform/commons'; import { ConduitRoute } from '@conduitplatform/hermes'; +import { ServiceRegistry } from '../../service-discovery/ServiceRegistry'; -export function getMonoConfigRoute( - grpcSdk: ConduitGrpcSdk, - registeredModules: Map, -) { +export function getMonoConfigRoute(grpcSdk: ConduitGrpcSdk) { return new ConduitRoute( { path: '/config', @@ -26,7 +22,7 @@ export function getMonoConfigRoute( const sortedModules = [ 'core', 'admin', - ...Array.from(registeredModules.keys()), + ...ServiceRegistry.getInstance().getRegisteredModules(), ].sort(); for (const moduleName of sortedModules) { const moduleConfig = await grpcSdk.state!.getKey(`moduleConfigs.${moduleName}`); diff --git a/packages/core/src/config-manager/config-storage/index.ts b/packages/core/src/config-manager/config-storage/index.ts index 405c525ad..92b6f3fc2 100644 --- a/packages/core/src/config-manager/config-storage/index.ts +++ b/packages/core/src/config-manager/config-storage/index.ts @@ -4,6 +4,7 @@ import * as models from '../models'; import { ServiceDiscovery } from '../service-discovery'; import { clearInterval } from 'timers'; import { merge } from 'lodash'; +import { ServiceRegistry } from '../service-discovery/ServiceRegistry'; export class ConfigStorage { toBeReconciled: string[] = []; @@ -51,7 +52,7 @@ export class ConfigStorage { if (configDocs.length === 0) { // flush redis stored configuration to the database let moduleConfig; - for (const key of this.serviceDiscovery.registeredModules.keys()) { + for (const key of ServiceRegistry.getInstance().getRegisteredModules()) { try { moduleConfig = await this.getConfig(key, false); await models.Config.getInstance().create({ name: key, config: moduleConfig }); @@ -80,7 +81,7 @@ export class ConfigStorage { } } // Update Admin and all active modules - const registeredModules = Array.from(this.serviceDiscovery.registeredModules.keys()); + const registeredModules = ServiceRegistry.getInstance().getRegisteredModules(); const moduleConfigs = await models.Config.getInstance().findMany({}); for (const config of moduleConfigs) { if (config.name === 'core') continue; @@ -98,13 +99,16 @@ export class ConfigStorage { } reconcileMonitor() { - const reconciliationInterval = setInterval(() => { - if (this.grpcSdk.isAvailable('database') && this.toBeReconciled.length > 0) { - this.reconcile(); - } - // add a random extra amount to mitigate race-conditions, - // between core instances - }, 1500 + Math.floor(Math.random() * 300)); + const reconciliationInterval = setInterval( + () => { + if (this.grpcSdk.isAvailable('database') && this.toBeReconciled.length > 0) { + this.reconcile(); + } + // add a random extra amount to mitigate race-conditions, + // between core instances + }, + 1500 + Math.floor(Math.random() * 300), + ); process.on('exit', () => { clearInterval(reconciliationInterval); diff --git a/packages/core/src/config-manager/index.ts b/packages/core/src/config-manager/index.ts index 6d23ac3f5..b376ed78e 100644 --- a/packages/core/src/config-manager/index.ts +++ b/packages/core/src/config-manager/index.ts @@ -3,6 +3,7 @@ import ConduitGrpcSdk, { GrpcCallback, GrpcRequest, GrpcResponse, + Indexable, } from '@conduitplatform/grpc-sdk'; import { ConduitCommons, @@ -24,13 +25,17 @@ import convict from 'convict'; import { merge } from 'lodash'; import { GrpcServer } from '@conduitplatform/module-tools'; import { RedisOptions } from 'ioredis'; +import { ServiceRegistry } from './service-discovery/ServiceRegistry'; export default class ConfigManager implements IConfigManager { grpcSdk: ConduitGrpcSdk; private readonly serviceDiscovery: ServiceDiscovery; private _configStorage: ConfigStorage; - constructor(grpcSdk: ConduitGrpcSdk, private readonly sdk: ConduitCommons) { + constructor( + grpcSdk: ConduitGrpcSdk, + private readonly sdk: ConduitCommons, + ) { this.grpcSdk = grpcSdk; this.serviceDiscovery = new ServiceDiscovery(grpcSdk); this._configStorage = new ConfigStorage(sdk, grpcSdk, this.serviceDiscovery); @@ -40,6 +45,21 @@ export default class ConfigManager implements IConfigManager { return this.serviceDiscovery.getModuleUrlByName(moduleName); } + getModuleUrlByNameGrpc( + call: GrpcRequest<{ name: string }>, + callback: GrpcResponse<{ moduleUrl: string }>, + ) { + const name = call.request.name; + const result = this.getModuleUrlByName(name); + if (!result) { + return callback({ + code: status.NOT_FOUND, + message: 'Module not found', + }); + } + callback(null, { moduleUrl: result }); + } + async initialize(server: GrpcServer) { await server.addService( path.resolve(__dirname, '../../src/core.proto'), @@ -56,75 +76,28 @@ export default class ConfigManager implements IConfigManager { moduleHealthProbe: this.serviceDiscovery.moduleHealthProbe.bind( this.serviceDiscovery, ), - getModuleUrlByName: this.serviceDiscovery.getModuleUrlByNameGrpc.bind( - this.serviceDiscovery, - ), + getModuleUrlByName: this.getModuleUrlByNameGrpc.bind(this.serviceDiscovery), }, ); - await this.highAvailability(); this.serviceDiscovery.beginMonitors(); } - async highAvailability() { - const loadedState = await this.grpcSdk.state!.getKey('config'); - try { - if (!loadedState || loadedState.length === 0) return; - const state = JSON.parse(loadedState); - const success: IModuleConfig[] = []; - if (state.modules) { - for (const module of state.modules) { - try { - await this.serviceDiscovery._registerModule(module.name, module.url); - success.push({ - name: module.name, - url: module.url, - instance: module.instance, - ...(module.configSchema && { configSchema: module.configSchema }), - }); - } catch {} - } - if (state.modules.length > success.length) { - state.modules = success; - this.setState(state); - } - } else { - return Promise.resolve(); - } - } catch { - ConduitGrpcSdk.Logger.error('Failed to recover state'); - } - } - async recoverConfigRoutes() { const loadedState = await this.grpcSdk.state!.getKey('config'); try { if (!loadedState || loadedState.length === 0) return; const state = JSON.parse(loadedState); - if (state.modules) { - for (const module of state.modules) { - if (module.configSchema) { - this.registerConfigRoutes(module.name, module.configSchema); - } + if (!state.modules) return Promise.resolve(); + for (const module of state.modules) { + if (module.configSchema) { + this.registerConfigRoutes(module.name, module.configSchema); } - } else { - return Promise.resolve(); } } catch { ConduitGrpcSdk.Logger.error('Failed to recover state'); } } - setState(state: any) { - this.grpcSdk - .state!.setKey('config', JSON.stringify(state)) - .then(() => { - ConduitGrpcSdk.Logger.log('Updated state'); - }) - .catch(() => { - ConduitGrpcSdk.Logger.error('Failed to recover state'); - }); - } - getServerConfig(call: GrpcRequest, callback: GrpcCallback) { this._configStorage .getConfig('core') @@ -250,12 +223,10 @@ export default class ConfigManager implements IConfigManager { } async isModuleUp(moduleName: string) { - if (!this.serviceDiscovery.registeredModules.has(moduleName)) return false; + const module = ServiceRegistry.getInstance().getModule(moduleName); + if (!module) return false; try { - await this.grpcSdk.isModuleUp( - moduleName, - this.serviceDiscovery.registeredModules.get(moduleName)!.address, - ); + await this.grpcSdk.isModuleUp(moduleName, module.address); } catch (e) { return false; } @@ -263,25 +234,14 @@ export default class ConfigManager implements IConfigManager { } private registerAdminRoutes() { - this.sdk - .getAdmin() - .registerRoute( - adminRoutes.getModulesRoute(this.serviceDiscovery.registeredModules), - ); + this.sdk.getAdmin().registerRoute(adminRoutes.getModulesRoute()); } private registerConfigRoutes( moduleName: string, configSchema: convict.Config, ) { - this.sdk - .getAdmin() - .registerRoute( - adminRoutes.getMonoConfigRoute( - this.grpcSdk, - this.serviceDiscovery.registeredModules, - ), - ); + this.sdk.getAdmin().registerRoute(adminRoutes.getMonoConfigRoute(this.grpcSdk)); this.sdk .getAdmin() .registerRoute( @@ -301,17 +261,13 @@ export default class ConfigManager implements IConfigManager { private updateState(name: string, configSchema: convict.Config) { this.grpcSdk - .state!.getKey('config') - .then(r => { - if (!r || r.length === 0) { - throw new Error('No config state found'); - } - const state = JSON.parse(r); + .state!.modifyState(async (existingState: Indexable) => { + const state = existingState ?? {}; const module = state.modules.find((module: IModuleConfig) => { return module.name === name; }); if (!module) { - throw new Error('Cannot update module state'); + throw new Error('Config-manager: Cannot update module state'); } state.modules = [ ...state.modules.filter((module: IModuleConfig) => module.name !== name), @@ -320,13 +276,14 @@ export default class ConfigManager implements IConfigManager { configSchema, }, ]; - return this.grpcSdk.state!.setKey('config', JSON.stringify(state)); + return state; }) .then(() => { - ConduitGrpcSdk.Logger.log('Updated state'); + ConduitGrpcSdk.Logger.log('Config-manager: Updated state'); }) - .catch(() => { - ConduitGrpcSdk.Logger.error('Failed to recover state'); + .catch(e => { + console.error(e); + ConduitGrpcSdk.Logger.error('Config-manager: Failed to recover state'); }); } } diff --git a/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts b/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts new file mode 100644 index 000000000..6af99229f --- /dev/null +++ b/packages/core/src/config-manager/service-discovery/ServiceMonitor.ts @@ -0,0 +1,185 @@ +import ConduitGrpcSdk, { + HealthCheckResponse, + HealthCheckStatus, +} from '@conduitplatform/grpc-sdk'; +import { ServiceRegistry } from './ServiceRegistry'; +import { linearBackoffTimeout } from '@conduitplatform/module-tools'; +import { EventEmitter } from 'events'; + +export class ServiceMonitor { + private static _instance: ServiceMonitor; + private readonly moduleHealth: { + [module: string]: { address: string; timestamp: number; status: HealthCheckStatus }; + } = {}; + private readonly _serviceRegistry = ServiceRegistry.getInstance(); + private monitorIntervalMs = 5000; + private serviceReconnectionInitMs = 500; + private serviceReconnectionRetries = 10; + + private constructor( + private readonly grpcSdk: ConduitGrpcSdk, + private readonly moduleRegister: EventEmitter, + ) {} + + public static getInstance(grpcSdk?: ConduitGrpcSdk, moduleRegister?: EventEmitter) { + if ((!grpcSdk || !moduleRegister) && !this._instance) { + throw new Error('ServiceMonitor not initialized'); + } + if (!this._instance) { + this._instance = new ServiceMonitor(grpcSdk!, moduleRegister!); + } + return this._instance; + } + + beginMonitors() { + if (process.env.DEBUG__DISABLE_SERVICE_REMOVAL === 'true') { + ConduitGrpcSdk.Logger.warn( + 'Service discovery module removal disabled for Debugging!', + ); + } else { + const specifiedInterval = parseInt(process.env.SERVICE_MONITOR_INTERVAL_MS ?? ''); + const specifiedRetries = parseInt(process.env.SERVICE_RECONN_RETRIES ?? ''); + const specifiedRetryInit = parseInt(process.env.SERVICE_RECONN_INIT_MS ?? ''); + if (!isNaN(specifiedRetryInit)) this.serviceReconnectionInitMs = specifiedRetryInit; + if (!isNaN(specifiedInterval)) this.monitorIntervalMs = specifiedInterval; + if (!isNaN(specifiedRetries)) this.serviceReconnectionRetries = specifiedRetries; + ConduitGrpcSdk.Logger.log( + `Service discovery monitoring interval set to ${this.monitorIntervalMs}ms`, + ); + setInterval(() => { + this.monitorModules().then(); + }, this.monitorIntervalMs); + } + } + + /* + * Health checks target module service, updating its health state. + * Any services that do not provide a gRPC health check service are assumed to be healthy. + * Used by healthCheckRegisteredModules(), reviveService() + */ + private async healthCheckService(module: string, address: string) { + const healthClient = this.grpcSdk.getHealthClient(module); + let status = HealthCheckStatus.SERVING; + if (healthClient) { + status = await healthClient + .check({}) + .then((res: HealthCheckResponse) => res.status as unknown as HealthCheckStatus); + } + const isRegistered = Object.keys(this.moduleHealth).includes(module); + if (!isRegistered && status === HealthCheckStatus.SERVICE_UNKNOWN) return; + this.updateModuleHealth(module, address, status); + return status; + } + + handleUnresponsiveModule( + moduleName: string, + moduleUrl: string, + status: HealthCheckStatus, + ) { + const isRegistered = Object.keys(this.moduleHealth).includes(moduleName); + if (!isRegistered && status === HealthCheckStatus.SERVICE_UNKNOWN) { + ConduitGrpcSdk.Logger.info( + `SD/health: unresponsive ${moduleName} ${moduleUrl} with no health history`, + ); + return; + } + + ConduitGrpcSdk.Logger.log(`SD/health: update ${moduleName} ${moduleUrl} unknown`); + this.grpcSdk.updateModuleHealth(moduleName, false); + // Deregister Unresponsive Module + delete this.moduleHealth[moduleName]; + this._serviceRegistry.removeModule(moduleName); + this.reviveService(moduleName, moduleUrl); + } + + updateModuleHealth( + moduleName: string, + moduleUrl: string, + moduleStatus: HealthCheckStatus, + ) { + if (moduleStatus === HealthCheckStatus.SERVICE_UNKNOWN) { + return this.handleUnresponsiveModule(moduleName, moduleUrl, moduleStatus); + } + + let module = this._serviceRegistry.getModule(moduleName); + if (!module) { + module = { + address: moduleUrl, + serving: moduleStatus === HealthCheckStatus.SERVING, + }; + ConduitGrpcSdk.Logger.log( + `SD/health: update unregistered module ${moduleName} ${moduleUrl} ${moduleStatus}`, + ); + this._serviceRegistry.updateModule(moduleName, module); + } else { + const prevStatus = module.serving; + ConduitGrpcSdk.Logger.log( + `SD/health: update registered module ${moduleName} ${moduleUrl} to ${moduleStatus} from serving: ${prevStatus}`, + ); + module.serving = moduleStatus === HealthCheckStatus.SERVING; + } + this.grpcSdk.updateModuleHealth( + moduleName, + moduleStatus === HealthCheckStatus.SERVING, + ); + this._serviceRegistry.updateModule(moduleName, module); + this.moduleHealth[moduleName] = { + address: moduleUrl, + timestamp: Date.now(), + status: moduleStatus, + }; + } + + /* + * Attempt to reconnect to a recently removed module service. + * Retries using linear backoff. + */ + private reviveService(name: string, address: string) { + const onTry = (stop: () => void) => { + if (Object.keys(this.moduleHealth).includes(name)) { + stop(); + ConduitGrpcSdk.Logger.log(`SD/health/revive: found healthy ${name} ${address}`); + } else { + this.healthCheckService(name, address) + .then(() => { + ConduitGrpcSdk.Logger.log( + `SD/health/revive: check completed ${name} ${address}`, + ); + }) + .catch(() => { + ConduitGrpcSdk.Logger.log( + `SD/health/revive: check failed ${name} ${address}`, + ); + }); + } + }; + const onFailure = () => { + this.grpcSdk.getModule(name)?.closeConnection(); + ConduitGrpcSdk.Logger.log( + `SD/health/revive: check connection closed ${name} ${address}`, + ); + }; + linearBackoffTimeout( + onTry, + this.serviceReconnectionInitMs, + this.serviceReconnectionRetries, + onFailure, + ); + } + + private async monitorModules() { + for (const module of this._serviceRegistry.getRegisteredModules()) { + const registeredModule = this._serviceRegistry.getModule(module)!; + try { + await this.healthCheckService(module, registeredModule.address); + } catch (e) { + this.handleUnresponsiveModule( + module, + registeredModule.address, + HealthCheckStatus.SERVICE_UNKNOWN, + ); + } + } + this.moduleRegister.emit('serving-modules-update'); + } +} diff --git a/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts b/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts new file mode 100644 index 000000000..aaf8de265 --- /dev/null +++ b/packages/core/src/config-manager/service-discovery/ServiceRegistry.ts @@ -0,0 +1,58 @@ +import ConduitGrpcSdk from '@conduitplatform/grpc-sdk'; +import { RegisteredModule } from '@conduitplatform/commons'; + +export class ServiceRegistry { + private static _instance: ServiceRegistry; + private readonly registeredModules: Map = new Map< + string, + RegisteredModule + >(); + + private constructor(private readonly grpcSdk: ConduitGrpcSdk) {} + + public static getInstance(grpcSdk?: ConduitGrpcSdk) { + if (!grpcSdk && !this._instance) { + throw new Error('ServiceRegistry not initialized'); + } + if (!this._instance) { + this._instance = new ServiceRegistry(grpcSdk!); + } + return this._instance; + } + + getModule(moduleName: string) { + return this.registeredModules.get(moduleName); + } + + updateModule(moduleName: string, module: RegisteredModule) { + this.registeredModules.set(moduleName, module); + } + + removeModule(moduleName: string) { + this.registeredModules.delete(moduleName); + } + + getModuleDetails( + moduleName: string, + ): { moduleName: string; url: string; serving: boolean } | undefined { + const module = this.registeredModules.get(moduleName); + if (!module) return undefined; + return { moduleName: moduleName, url: module.address, serving: module.serving }; + } + + getModuleDetailsList(): { moduleName: string; url: string; serving: boolean }[] { + const modules: { moduleName: string; url: string; serving: boolean }[] = []; + this.registeredModules.forEach((value: RegisteredModule, key: string) => { + modules.push({ + moduleName: key, + url: value.address, + serving: value.serving, + }); + }); + return modules; + } + + getRegisteredModules() { + return Array.from(this.registeredModules.keys()); + } +} diff --git a/packages/core/src/config-manager/service-discovery/index.ts b/packages/core/src/config-manager/service-discovery/index.ts index d63085ffd..bf80bb102 100644 --- a/packages/core/src/config-manager/service-discovery/index.ts +++ b/packages/core/src/config-manager/service-discovery/index.ts @@ -1,66 +1,88 @@ -import { ModuleListResponse, RegisteredModule } from '@conduitplatform/commons'; +import { ModuleListResponse } from '@conduitplatform/commons'; import ConduitGrpcSdk, { GrpcCallback, GrpcRequest, GrpcResponse, HealthCheckStatus, - UntypedArray, + Indexable, } from '@conduitplatform/grpc-sdk'; -import { linearBackoffTimeout } from '@conduitplatform/module-tools'; import { IModuleConfig } from '../../interfaces/IModuleConfig'; import { ServerWritableStream, status } from '@grpc/grpc-js'; import { EventEmitter } from 'events'; -import { clearTimeout } from 'timers'; +import { ServiceRegistry } from './ServiceRegistry'; +import { ServiceMonitor } from './ServiceMonitor'; /* /* * - Multi-instance services are not handled individually (LoadBalancer) * - Online Services are recovered on startup * - Unresponsive services are instantly removed from the list of exposed services * - Reconnection to recently removed services is attempted using linear backoff - * - Services that do not provide a gRPC health check service are assumed to be healthy + * - Services that do not provide a gRPC health check service are considered as healthy */ export class ServiceDiscovery { - readonly registeredModules: Map = new Map< - string, - RegisteredModule - >(); - private readonly moduleHealth: { - [module: string]: { address: string; timestamp: number; status: HealthCheckStatus }; - } = {}; private readonly moduleRegister: EventEmitter; - private servingStatusUpdate: boolean = false; - private monitorIntervalMs = 30000; - private serviceReconnRetries = 5; - private serviceReconnInitMs = 250; + + private readonly _serviceRegistry: ServiceRegistry; + private readonly _serviceMonitor: ServiceMonitor; constructor(private readonly grpcSdk: ConduitGrpcSdk) { this.moduleRegister = new EventEmitter(); this.moduleRegister.setMaxListeners(150); + this._serviceRegistry = ServiceRegistry.getInstance(grpcSdk); + this._serviceMonitor = ServiceMonitor.getInstance(grpcSdk, this.moduleRegister); } getModuleUrlByName(name: string): string | undefined { - return this.registeredModules.get(name)?.address; + return this._serviceRegistry.getModule(name)?.address; } beginMonitors() { - if (process.env.DEBUG__DISABLE_SERVICE_REMOVAL === 'true') { - ConduitGrpcSdk.Logger.warn( - 'Service discovery module removal disabled for Debugging!', - ); - } else { - const specifiedInterval = parseInt(process.env.SERVICE_MONITOR_INTERVAL_MS ?? ''); - const specifiedRetries = parseInt(process.env.SERVICE_RECONN_RETRIES ?? ''); - const specifiedRetryInit = parseInt(process.env.SERVICE_RECONN_INIT_MS ?? ''); - if (!isNaN(specifiedRetryInit)) this.serviceReconnInitMs = specifiedRetryInit; - if (!isNaN(specifiedInterval)) this.monitorIntervalMs = specifiedInterval; - if (!isNaN(specifiedRetries)) this.serviceReconnRetries = specifiedRetries; - ConduitGrpcSdk.Logger.log( - `Service discovery monitoring interval set to ${this.monitorIntervalMs}ms`, - ); - setInterval(() => { - this.monitorModules().then(); - }, this.monitorIntervalMs); - } + this.highAvailability().then(() => this._serviceMonitor.beginMonitors()); + } + + async highAvailability() { + await this.grpcSdk + .state!.modifyState(async (existingState: Indexable) => { + const state = existingState ?? {}; + const success: IModuleConfig[] = []; + if (!state.modules) return state; + for (const module of state.modules) { + try { + await this._recoverModule(module.name, module.url); + success.push({ + name: module.name, + url: module.url, + ...(module.configSchema && { configSchema: module.configSchema }), + }); + } catch {} + } + if (state.modules.length > success.length) { + state.modules = success; + } + return state; + }) + .then(() => { + ConduitGrpcSdk.Logger.log('Recovered state'); + }) + .catch(() => { + ConduitGrpcSdk.Logger.error('Failed to recover state'); + }); + + this.grpcSdk.bus!.subscribe('config', (message: string) => { + const parsedMessage = JSON.parse(message); + if (parsedMessage.type === 'module-health') { + this._serviceMonitor.updateModuleHealth( + parsedMessage.name, + parsedMessage.url, + parsedMessage.status, + ); + } else if (parsedMessage.type === 'serving-modules-update') { + this._serviceRegistry.updateModule(parsedMessage.name, { + address: parsedMessage.url, + serving: true, + }); + } + }); } /* @@ -78,85 +100,73 @@ export class ServiceDiscovery { }); return; } - this.updateModuleHealth( + ConduitGrpcSdk.Logger.log( + `SD: received: ${call.request.moduleName} ${call.request.url} ${call.request.status}`, + ); + this._serviceMonitor.updateModuleHealth( call.request.moduleName, call.request.url, call.request.status as HealthCheckStatus, ); - this.publishModuleData('module-health', call.request.moduleName, call.getPeer()); + this.publishModuleData( + 'module-health', + call.request.moduleName, + call.request.url, + call.request.status, + ); callback(null, null); } moduleList(call: GrpcRequest, callback: GrpcCallback) { - const modules: { - moduleName: string; - url: string; - serving: boolean; - }[] = []; - this.registeredModules.forEach((value: RegisteredModule, key: string) => { - modules.push({ - moduleName: key, - url: value.address, - serving: value.serving, - }); - }); + const modules = this._serviceRegistry.getModuleDetailsList(); callback(null, { modules }); } watchModules(call: ServerWritableStream) { - const self = this; this.moduleRegister.on('serving-modules-update', () => { - const modules: UntypedArray = []; - self.registeredModules.forEach((value: RegisteredModule, key: string) => { - modules.push({ - moduleName: key, - url: value.address, - serving: value.serving, - }); - }); + const modules = this._serviceRegistry.getModuleDetailsList(); call.write({ modules }); }); - // todo this should close gracefully I guess. } - getModuleUrlByNameGrpc( - call: GrpcRequest<{ name: string }>, - callback: GrpcResponse<{ moduleUrl: string }>, - ) { - const name = call.request.name; - const result = this.getModuleUrlByName(name); - if (result) { - callback(null, { moduleUrl: result }); - } else { - callback({ - code: status.NOT_FOUND, - message: 'Module not found', - }); + async _recoverModule(moduleName: string, moduleUrl: string) { + let healthResponse; + try { + if (!this.grpcSdk.getModule(moduleName)) { + healthResponse = await this.grpcSdk.isModuleUp(moduleName, moduleUrl); + this.grpcSdk.createModuleClient(moduleName, moduleUrl); + } + } catch (e) { + throw new Error('Failed to register unresponsive module'); } + const healthStatus = healthResponse.status as unknown as HealthCheckStatus; + ConduitGrpcSdk.Logger.log( + `SD: registering: ${moduleName} ${moduleUrl} ${healthStatus}`, + ); + this._serviceRegistry.updateModule(moduleName, { + address: moduleUrl, + serving: healthStatus === HealthCheckStatus.SERVING, + }); + + if (!this.grpcSdk.isAvailable(moduleName)) { + this.grpcSdk.createModuleClient(moduleName, moduleUrl); + } + + this._serviceMonitor.updateModuleHealth(moduleName, moduleUrl, healthStatus); } async _registerModule( moduleName: string, moduleUrl: string, healthStatus?: HealthCheckStatus, - fromGrpc = false, ) { - if (fromGrpc && healthStatus === undefined) { + if (healthStatus === undefined) { throw new Error('No module health status provided'); } - if (!fromGrpc) { - let healthResponse; - try { - if (!this.grpcSdk.getModule(moduleName)) { - healthResponse = await this.grpcSdk.isModuleUp(moduleName, moduleUrl); - this.grpcSdk.createModuleClient(moduleName, moduleUrl); - } - } catch (e) { - throw new Error('Failed to register unresponsive module'); - } - healthStatus = healthResponse.status as unknown as HealthCheckStatus; - } - this.registeredModules.set(moduleName, { + ConduitGrpcSdk.Logger.log( + `SD: registering: ${moduleName} ${moduleUrl} ${healthStatus}`, + ); + this._serviceRegistry.updateModule(moduleName, { address: moduleUrl, serving: healthStatus === HealthCheckStatus.SERVING, }); @@ -165,13 +175,7 @@ export class ServiceDiscovery { this.grpcSdk.createModuleClient(moduleName, moduleUrl); } - this.updateModuleHealth( - moduleName, - moduleUrl, - fromGrpc ? healthStatus! : HealthCheckStatus.SERVING, - false, - ); - this.moduleRegister.emit('serving-modules-update'); + this._serviceMonitor.updateModuleHealth(moduleName, moduleUrl, healthStatus!); } async registerModule(call: any, callback: GrpcResponse<{ result: boolean }>) { @@ -189,14 +193,13 @@ export class ServiceDiscovery { call.request.moduleName, call.request.url, call.request.healthStatus as HealthCheckStatus, - true, ); - this.updateState(call.request.moduleName, call.request.url, call.getPeer()); + this.updateState(call.request.moduleName, call.request.url); this.publishModuleData( 'serving-modules-update', call.request.moduleName, - call.getPeer(), call.request.url, + call.request.healthStatus, ); callback(null, { result: true }); } @@ -205,9 +208,9 @@ export class ServiceDiscovery { call: GrpcRequest<{ moduleName: string }>, callback: GrpcResponse<{ url: string }>, ) { - if (this.registeredModules.has(call.request.moduleName)) { - const address = this.registeredModules.get(call.request.moduleName)!.address; - callback(null, { url: address }); + const module = this._serviceRegistry.getModule(call.request.moduleName); + if (module) { + callback(null, { url: module.address }); } else { callback({ code: status.NOT_FOUND, @@ -216,120 +219,27 @@ export class ServiceDiscovery { } } - private publishModuleData(type: string, name: string, instance: string, url?: string) { + private publishModuleData( + type: string, + name: string, + url?: string, + status?: HealthCheckStatus, + ) { this.grpcSdk.bus!.publish( 'config', JSON.stringify({ type, name, url, - instance, + status, }), ); } - /* - * Health checks target module service, updating its health state. - * Any services that do not provide a gRPC health check service are assumed to be healthy. - * Used by healthCheckRegisteredModules(), reviveService() - */ - private async healthCheckService(module: string, address: string) { - const healthClient = this.grpcSdk.getHealthClient(module); - let status = HealthCheckStatus.SERVING; - if (healthClient) { - status = await healthClient - .check({}) - .then(res => res.status as unknown as HealthCheckStatus) - .catch(() => { - return HealthCheckStatus.SERVICE_UNKNOWN; - }); - } - const isRegistered = Object.keys(this.moduleHealth).includes(module); - if (!isRegistered && status === HealthCheckStatus.SERVICE_UNKNOWN) return; - this.updateModuleHealth(module, address, status); - } - - private async monitorModules() { - for (const module of Object.keys(this.moduleHealth)) { - const registeredModule = this.registeredModules.get(module)!; - await this.healthCheckService(module, registeredModule.address); - } - if (this.servingStatusUpdate) { - this.moduleRegister.emit('serving-modules-update'); - this.servingStatusUpdate = false; - } - } - - /* - * Attempt to reconnect to a recently removed module service. - * Retries using linear backoff. - */ - private reviveService(name: string, address: string) { - const onTry = (timeout: NodeJS.Timeout) => { - if (Object.keys(this.moduleHealth).includes(name)) { - clearTimeout(timeout); - } else { - this.healthCheckService(name, address).then(); - } - }; - const onFailure = () => { - this.grpcSdk.getModule(name)?.closeConnection(); - }; - linearBackoffTimeout( - onTry, - this.serviceReconnInitMs, - this.serviceReconnRetries, - onFailure, - ); - } - - private updateModuleHealth( - moduleName: string, - moduleUrl: string, - moduleStatus: HealthCheckStatus, - broadcast = true, - ) { - if (moduleStatus === HealthCheckStatus.SERVICE_UNKNOWN) { - this.grpcSdk.updateModuleHealth(moduleName, false); - // Deregister Unresponsive Module - delete this.moduleHealth[moduleName]; - this.registeredModules.delete(moduleName); - this.servingStatusUpdate = true; - this.reviveService(moduleName, moduleUrl); - return; - } - let module = this.registeredModules.get(moduleName); - if (!module) { - module = { - address: moduleUrl, - serving: moduleStatus === HealthCheckStatus.SERVING, - }; - this.registeredModules.set(moduleName, module); - this.servingStatusUpdate = true; - } else { - const prevStatus = module.serving; - module.serving = moduleStatus === HealthCheckStatus.SERVING; - if (!this.servingStatusUpdate && prevStatus !== module.serving && broadcast) { - this.servingStatusUpdate = true; - } - } - this.grpcSdk.updateModuleHealth( - moduleName, - moduleStatus === HealthCheckStatus.SERVING, - ); - this.registeredModules.set(moduleName, module); - this.moduleHealth[moduleName] = { - address: moduleUrl, - timestamp: Date.now(), - status: moduleStatus, - }; - } - - private updateState(name: string, url: string, instance: string) { + private updateState(name: string, url: string) { this.grpcSdk - .state!.getKey('config') - .then(r => { - const state = !r || r.length === 0 ? {} : JSON.parse(r); + .state!.modifyState(async (existingState: Indexable) => { + const state = existingState ?? {}; if (!state.modules) state.modules = []; const module = state.modules.find((module: IModuleConfig) => { return module.url === url; @@ -340,18 +250,16 @@ export class ServiceDiscovery { { ...module, //persist the module config schema name, - instance, url, }, ]; - - return this.grpcSdk.state!.setKey('config', JSON.stringify(state)); + return state; }) .then(() => { ConduitGrpcSdk.Logger.log('Updated state'); }) .catch(() => { - ConduitGrpcSdk.Logger.error('Failed to recover state'); + ConduitGrpcSdk.Logger.error('Failed to update state'); }); } } diff --git a/yarn.lock b/yarn.lock index 4eaa5e4b9..16c2b7080 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9853,7 +9853,7 @@ nice-grpc@^2.1.7: abort-controller-x "^0.4.0" nice-grpc-common "^2.0.2" -node-abort-controller@^3.1.1: +node-abort-controller@^3.0.1, node-abort-controller@^3.1.1: version "3.1.1" resolved "https://registry.yarnpkg.com/node-abort-controller/-/node-abort-controller-3.1.1.tgz#a94377e964a9a37ac3976d848cb5c765833b8548" integrity sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ== @@ -11257,6 +11257,13 @@ redis-parser@^3.0.0: dependencies: redis-errors "^1.0.0" +redlock@^5.0.0-beta.2: + version "5.0.0-beta.2" + resolved "https://registry.yarnpkg.com/redlock/-/redlock-5.0.0-beta.2.tgz#a629c07e07d001c0fdd9f2efa614144c4416fe44" + integrity sha512-2RDWXg5jgRptDrB1w9O/JgSZC0j7y4SlaXnor93H/UJm/QyDiFgBKNtrh0TI6oCXqYSaSoXxFh6Sd3VtYfhRXw== + dependencies: + node-abort-controller "^3.0.1" + request@^2.55.0, request@^2.88.0: version "2.88.2" resolved "https://registry.yarnpkg.com/request/-/request-2.88.2.tgz#d73c918731cb5a87da047e207234146f664d12b3"