Skip to content

Commit

Permalink
refactor(core): service discovery mechanism (#811)
Browse files Browse the repository at this point in the history
* refactor(core): service discovery mechanism
fix(module-tools): linear backoff not stopping on clear

* chore: codefactor complaints

* fix: random f letter

* feat(grpc-sdk): add modifyState with redis distributed lock to protect states
fix(core): config state recovery issues
refactor(core): publish module updates frequently

* feat(grpc-sdk): add known locks enum
refactor(admin): use modify state and merge config/admin states

* chore: remove unused arg

---------

Co-authored-by: chris <[email protected]>
  • Loading branch information
kkopanidis and ChrisPdgn authored Nov 24, 2023
1 parent 23a425c commit 1d38eac
Show file tree
Hide file tree
Showing 12 changed files with 495 additions and 335 deletions.
1 change: 1 addition & 0 deletions libraries/grpc-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"nice-grpc-client-middleware-retry": "^3",
"prom-client": "^15.0.0",
"protobufjs": "^7.2.4",
"redlock": "^5.0.0-beta.2",
"winston": "^3.8.2",
"winston-loki": "^6.0.7"
},
Expand Down
45 changes: 45 additions & 0 deletions libraries/grpc-sdk/src/utilities/StateManager.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,56 @@
import { RedisManager } from './RedisManager';
import { Cluster, Redis } from 'ioredis';
import Redlock, { Lock } from 'redlock';
import { Indexable } from '../interfaces';

export enum KNOWN_LOCKS {
STATE_MODIFICATION = 'state_modification',
}

export class StateManager {
private readonly redisClient: Redis | Cluster;
private readonly redLock: Redlock;

constructor(redisManager: RedisManager, name: string) {
this.redisClient = redisManager.getClient({ keyPrefix: name + '_' });
this.redLock = new Redlock([this.redisClient], {
// The expected clock drift; for more details see:
// http://redis.io/topics/distlock
driftFactor: 0.01, // multiplied by lock ttl to determine drift time
// The max number of times Redlock will attempt to lock a resource
// before error.
retryCount: 10,

// the time in ms between attempts
retryDelay: 100, // time in ms
// the max time in ms randomly added to retries
// to improve performance under high contention
// see https://www.awsarchitectureblog.com/2015/03/backoff.html
retryJitter: 200, // time in ms
// The minimum remaining time on a lock before an extension is automatically
// attempted with the `using` API.
automaticExtensionThreshold: 500, // time in ms
});
}

async acquireLock(resource: string, ttl: number = 5000): Promise<Lock> {
return await this.redLock.acquire([resource], ttl);
}

async releaseLock(lock: Lock) {
await lock.release();
}

async modifyState(modifier: (state: Indexable) => Promise<Indexable>) {
const lock = await this.acquireLock(KNOWN_LOCKS.STATE_MODIFICATION);
try {
const retrievedState = (await this.getState()) ?? '{}';
const currentState = JSON.parse(retrievedState);
const newState = await modifier(currentState);
await this.setState(JSON.stringify(newState));
} finally {
await this.releaseLock(lock);
}
}

setState(stateObj: any) {
Expand Down
11 changes: 9 additions & 2 deletions libraries/module-tools/src/utilities/linearBackoffTimeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@
import { clearTimeout } from 'timers';

export function linearBackoffTimeout(
onTry: (timeout: NodeJS.Timeout) => void,
onTry: (stop: () => void) => void,
delay: number,
reps?: number,
onFailure?: () => void,
) {
const nextRep = () => reps === undefined || --reps > 0;
let stopSignal = false;
const stop = () => {
stopSignal = true;
};
let timeout: NodeJS.Timeout | null;
const invoker = async () => {
if (stopSignal) {
return timeout ? clearTimeout(timeout) : null;
}
delay = Math.floor(delay * 2);
if (delay > 0 && nextRep()) {
timeout = setTimeout(invoker, delay);
onTry(timeout);
onTry(stop);
} else {
timeout = null;
onFailure && onFailure();
Expand Down
18 changes: 10 additions & 8 deletions packages/admin/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import * as middleware from './middleware';
import * as adminRoutes from './routes';
import * as models from './models';
import { AdminMiddleware } from './models';
import { protoTemplate, getSwaggerMetadata } from './hermes';
import { getSwaggerMetadata, protoTemplate } from './hermes';
import path from 'path';
import {
ConduitMiddleware,
Expand Down Expand Up @@ -78,7 +78,10 @@ export default class AdminModule extends IConduitAdmin {
private databaseHandled = false;
private hasAppliedMiddleware: string[] = [];

constructor(readonly commons: ConduitCommons, grpcSdk: ConduitGrpcSdk) {
constructor(
readonly commons: ConduitCommons,
grpcSdk: ConduitGrpcSdk,
) {
super(commons);
this.grpcSdk = grpcSdk;
ProtoGenerator.getInstance(protoTemplate);
Expand Down Expand Up @@ -358,7 +361,7 @@ export default class AdminModule extends IConduitAdmin {
}

private async highAvailability() {
const r = await this.grpcSdk.state!.getKey('admin');
const r = await this.grpcSdk.state!.getState();
const proxyRoutes = await models.AdminProxyRoute.getInstance().findMany({});
if ((!r || r.length === 0) && (!proxyRoutes || proxyRoutes.length === 0)) {
this.cleanupRoutes();
Expand Down Expand Up @@ -430,10 +433,9 @@ export default class AdminModule extends IConduitAdmin {
url: string,
moduleName?: string,
) {
this.grpcSdk
.state!.getKey('admin')
.then(r => {
const state = !r || r.length === 0 ? {} : JSON.parse(r);
this.grpcSdk.state
?.modifyState(async (existingState: Indexable) => {
const state = existingState ?? {};
if (!state.routes) state.routes = [];
let index;
(state.routes as Indexable[]).forEach((val, i) => {
Expand All @@ -450,7 +452,7 @@ export default class AdminModule extends IConduitAdmin {
moduleName,
});
}
return this.grpcSdk.state!.setKey('admin', JSON.stringify(state));
return state;
})
.then(() => {
this.publishAdminRouteData(routes, url, moduleName);
Expand Down
28 changes: 9 additions & 19 deletions packages/core/src/config-manager/admin/routes/GetModules.route.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import { RegisteredModule } from '@conduitplatform/commons';
import {
ConduitError,
ConduitRouteActions,
ConduitRouteParameters,
ConduitRouteReturnDefinition,
UntypedArray,
} from '@conduitplatform/grpc-sdk';
import { ConduitRoute } from '@conduitplatform/hermes';
import { isNil } from 'lodash';
import { ConduitBoolean, ConduitString } from '@conduitplatform/module-tools';
import { ServiceRegistry } from '../../service-discovery/ServiceRegistry';

export function getModulesRoute(registeredModules: Map<string, RegisteredModule>) {
export function getModulesRoute() {
return new ConduitRoute(
{
path: '/config/modules',
Expand All @@ -31,24 +30,15 @@ export function getModulesRoute(registeredModules: Map<string, RegisteredModule>
}),
async (call: ConduitRouteParameters) => {
const sortByName = call.params!.sortByName;
if (registeredModules.size !== 0) {
const modules: UntypedArray = [];
registeredModules.forEach((value: RegisteredModule, key: string) => {
modules.push({
moduleName: key,
url: value.address,
serving: value.serving,
});
});
if (!isNil(sortByName)) {
if (sortByName)
modules!.sort((a, b) => a.moduleName.localeCompare(b.moduleName));
else modules!.sort((a, b) => b.moduleName.localeCompare(a.moduleName));
}
return { modules };
} else {
const modules = ServiceRegistry.getInstance().getModuleDetailsList();
if (modules.length === 0) {
throw new ConduitError('INTERNAL', 500, 'Modules not available yet');
}
if (!isNil(sortByName)) {
if (sortByName) modules!.sort((a, b) => a.moduleName.localeCompare(b.moduleName));
else modules!.sort((a, b) => b.moduleName.localeCompare(a.moduleName));
}
return { modules };
},
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ import ConduitGrpcSdk, {
ConduitRouteReturnDefinition,
} from '@conduitplatform/grpc-sdk';
import { ConduitJson } from '@conduitplatform/module-tools';

import { RegisteredModule } from '@conduitplatform/commons';
import { ConduitRoute } from '@conduitplatform/hermes';
import { ServiceRegistry } from '../../service-discovery/ServiceRegistry';

export function getMonoConfigRoute(
grpcSdk: ConduitGrpcSdk,
registeredModules: Map<string, RegisteredModule>,
) {
export function getMonoConfigRoute(grpcSdk: ConduitGrpcSdk) {
return new ConduitRoute(
{
path: '/config',
Expand All @@ -26,7 +22,7 @@ export function getMonoConfigRoute(
const sortedModules = [
'core',
'admin',
...Array.from(registeredModules.keys()),
...ServiceRegistry.getInstance().getRegisteredModules(),
].sort();
for (const moduleName of sortedModules) {
const moduleConfig = await grpcSdk.state!.getKey(`moduleConfigs.${moduleName}`);
Expand Down
22 changes: 13 additions & 9 deletions packages/core/src/config-manager/config-storage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as models from '../models';
import { ServiceDiscovery } from '../service-discovery';
import { clearInterval } from 'timers';
import { merge } from 'lodash';
import { ServiceRegistry } from '../service-discovery/ServiceRegistry';

export class ConfigStorage {
toBeReconciled: string[] = [];
Expand Down Expand Up @@ -51,7 +52,7 @@ export class ConfigStorage {
if (configDocs.length === 0) {
// flush redis stored configuration to the database
let moduleConfig;
for (const key of this.serviceDiscovery.registeredModules.keys()) {
for (const key of ServiceRegistry.getInstance().getRegisteredModules()) {
try {
moduleConfig = await this.getConfig(key, false);
await models.Config.getInstance().create({ name: key, config: moduleConfig });
Expand Down Expand Up @@ -80,7 +81,7 @@ export class ConfigStorage {
}
}
// Update Admin and all active modules
const registeredModules = Array.from(this.serviceDiscovery.registeredModules.keys());
const registeredModules = ServiceRegistry.getInstance().getRegisteredModules();
const moduleConfigs = await models.Config.getInstance().findMany({});
for (const config of moduleConfigs) {
if (config.name === 'core') continue;
Expand All @@ -98,13 +99,16 @@ export class ConfigStorage {
}

reconcileMonitor() {
const reconciliationInterval = setInterval(() => {
if (this.grpcSdk.isAvailable('database') && this.toBeReconciled.length > 0) {
this.reconcile();
}
// add a random extra amount to mitigate race-conditions,
// between core instances
}, 1500 + Math.floor(Math.random() * 300));
const reconciliationInterval = setInterval(
() => {
if (this.grpcSdk.isAvailable('database') && this.toBeReconciled.length > 0) {
this.reconcile();
}
// add a random extra amount to mitigate race-conditions,
// between core instances
},
1500 + Math.floor(Math.random() * 300),
);

process.on('exit', () => {
clearInterval(reconciliationInterval);
Expand Down
Loading

0 comments on commit 1d38eac

Please sign in to comment.