Skip to content

Commit

Permalink
refactor(core): modify HA startup function to reduce redis lock times (
Browse files Browse the repository at this point in the history
…#942)

* refactor(core): modify HA startup function to reduce redis lock times

* chore(core): code-factor

---------

Co-authored-by: Konstantinos Feretos <[email protected]>
  • Loading branch information
kkopanidis and kon14 authored Feb 2, 2024
1 parent 5a39c7b commit 375b2d7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
41 changes: 22 additions & 19 deletions packages/core/src/config-manager/service-discovery/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { ServerWritableStream, status } from '@grpc/grpc-js';
import { EventEmitter } from 'events';
import { ServiceRegistry } from './ServiceRegistry.js';
import { ServiceMonitor } from './ServiceMonitor.js';
import { isEmpty } from 'lodash-es';

/*
* - Multi-instance services are not handled individually (LoadBalancer)
Expand Down Expand Up @@ -41,12 +42,12 @@ export class ServiceDiscovery {
}

async highAvailability() {
await this.grpcSdk
.state!.modifyState(async (existingState: Indexable) => {
const state = existingState ?? {};
const state = await this.grpcSdk.state?.getState();
if (state && !isEmpty(state)) {
const parsedState = JSON.parse(state) as { modules: IModuleConfig[] };
if (parsedState.modules) {
const success: IModuleConfig[] = [];
if (!state.modules) return state;
for (const module of state.modules) {
for (const module of parsedState.modules) {
try {
await this._recoverModule(module.name, module.url);
success.push({
Expand All @@ -61,18 +62,20 @@ export class ServiceDiscovery {
ConduitGrpcSdk.Logger.error(`SD: recovery error: ${e}`);
}
}
if (state.modules.length > success.length) {
state.modules = success;
}
return state;
})
.then(() => {
ConduitGrpcSdk.Logger.log('Recovered state');
})
.catch(() => {
ConduitGrpcSdk.Logger.error('Failed to recover state');
});

await this.grpcSdk
.state!.modifyState(async (existingState: Indexable) => {
const state = existingState ?? {};
state.modules = success;
return state;
})
.then(() => {
ConduitGrpcSdk.Logger.log('Recovered state');
})
.catch(() => {
ConduitGrpcSdk.Logger.error('Failed to recover state');
});
}
}
this.grpcSdk.bus!.subscribe('config', (message: string) => {
const parsedMessage = JSON.parse(message);
if (parsedMessage.type === 'module-health') {
Expand Down Expand Up @@ -259,10 +262,10 @@ export class ServiceDiscovery {
return state;
})
.then(() => {
ConduitGrpcSdk.Logger.log('Updated state');
ConduitGrpcSdk.Logger.log(`SD: Updated state for ${name} ${url}`);
})
.catch(() => {
ConduitGrpcSdk.Logger.error('Failed to update state');
ConduitGrpcSdk.Logger.error(`SD: Failed to update state ${name} ${url}`);
});
}
}
2 changes: 1 addition & 1 deletion packages/core/src/interfaces/IModuleConfig.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export interface IModuleConfig {
name: string;
instance: string;
instance?: string;
url: string;
configSchema?: string;
}

0 comments on commit 375b2d7

Please sign in to comment.