Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core,grpc-sdk,module-tools): rework service-discover, health checking and module communication #1268

Merged
merged 5 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions libraries/grpc-sdk/src/modules/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,13 @@ export class Config extends ConduitModule<typeof ConfigDefinition> {
name: string,
url: string,
healthStatus: Omit<HealthCheckStatus, HealthCheckStatus.SERVICE_UNKNOWN>,
instanceId: string,
) {
const request: RegisterModuleRequest = {
moduleName: name.toString(),
url: url.toString(),
healthStatus: healthStatus as number,
instanceId,
};
const self = this;
return this.client!.registerModule(request).then(res => {
Expand All @@ -118,13 +120,13 @@ export class Config extends ConduitModule<typeof ConfigDefinition> {
});
}

moduleHealthProbe(name: string, url: string) {
moduleHealthProbe(name: string, url: string, instanceId: string) {
const request: ModuleHealthRequest = {
moduleName: name.toString(),
url,
status: this._serviceHealthStatusGetter
? this._serviceHealthStatusGetter()
: HealthCheckStatus.SERVICE_UNKNOWN,
instanceId,
};
const self = this;
this.client!.moduleHealthProbe(request)
Expand Down
2 changes: 2 additions & 0 deletions libraries/grpc-sdk/src/utilities/EventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export class EventBus {
this._clientSubscriber = redisManager.getClient({ keyPrefix: 'bus_' });
this._clientPublisher = redisManager.getClient({ keyPrefix: 'bus_' });
this._signature = crypto.randomBytes(20).toString('hex');
//@ts-expect-error
this._clientSubscriber.on('ready', () => {
ConduitGrpcSdk.Logger.log('The Bus is in the station...hehe');
});
Expand Down Expand Up @@ -59,6 +60,7 @@ export class EventBus {
this._subscribedChannels[channelName] = [callback];
this._clientSubscriber.subscribe(channelName, () => {});
const self = this;
//@ts-expect-error
this._clientSubscriber.on('message', (channel: string, message: string) => {
if (channel !== channelName) return;
// if the message supports the signature
Expand Down
25 changes: 18 additions & 7 deletions libraries/module-tools/src/ManagedModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { initializeSdk, merge } from './utilities/index.js';
import { convictConfigParser } from './utilities/convictConfigParser.js';
import { RoutingManager } from './routing/index.js';
import { RoutingController } from './routing/RoutingController.js';
import crypto from 'crypto';

export abstract class ManagedModule<T> extends ConduitServiceModule {
readonly config?: convict.Config<T>;
Expand All @@ -25,8 +26,11 @@ export abstract class ManagedModule<T> extends ConduitServiceModule {
private readonly servicePort: string;
private _moduleState: ModuleLifecycleStage;

protected constructor(moduleName: string) {
super(moduleName);
protected constructor(moduleName: string, instanceId?: string) {
super(
moduleName,
`${moduleName}-${instanceId ?? crypto.randomBytes(16).toString('hex')}`,
);
this._moduleState = ModuleLifecycleStage.CREATE_GRPC;
if (!process.env.CONDUIT_SERVER) {
throw new Error('CONDUIT_SERVER is undefined, specify Conduit server URL');
Expand All @@ -48,6 +52,10 @@ export abstract class ManagedModule<T> extends ConduitServiceModule {
return this._moduleName;
}

get instanceId() {
return this._instanceId;
}

get address() {
return this._address;
}
Expand All @@ -57,7 +65,12 @@ export abstract class ManagedModule<T> extends ConduitServiceModule {
this.initialize(this.serviceAddress, this.servicePort);
try {
await this.preRegisterLifecycle();
await this.grpcSdk.config.registerModule(this.name, this.address, this.healthState);
await this.grpcSdk.config.registerModule(
this.name,
this.address,
this.healthState,
this.instanceId,
);
} catch (err) {
ConduitGrpcSdk.Logger.error('Failed to initialize server');
ConduitGrpcSdk.Logger.error(err as Error);
Expand Down Expand Up @@ -257,11 +270,9 @@ export abstract class ManagedModule<T> extends ConduitServiceModule {
convictConfigParser(configSchema),
this.configOverride,
);
ConfigController.getInstance().config = config;

ConfigController.getInstance();
if (config) ConfigController.getInstance().config = config;
if (!config || config.active || !config.hasOwnProperty('active'))
await this.onConfig();
await this.onConfig();
}
}
}
11 changes: 9 additions & 2 deletions libraries/module-tools/src/classes/ConduitServiceModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ import { fileURLToPath } from 'node:url';

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

export abstract class ConduitServiceModule {
protected readonly _moduleName: string;
protected readonly _instanceId: string;
protected _serviceName?: string;
protected _address!: string; // external address:port of service (LoadBalancer)
protected grpcServer!: GrpcServer;
protected readonly events: EventEmitter = new EventEmitter();
private _serviceHealthState: HealthCheckStatus = HealthCheckStatus.SERVING; // default for health-agnostic modules

protected constructor(moduleName: string) {
protected constructor(moduleName: string, instanceId: string) {
this._moduleName = camelCase(moduleName);
this._instanceId = instanceId;
}

private _registered = false;
Expand Down Expand Up @@ -83,7 +86,11 @@ export abstract class ConduitServiceModule {
// do not emit health updates until registered
if (this._registered) {
this.events.emit(`grpc-health-change:${this._serviceName}`, state);
this._grpcSdk?.config.moduleHealthProbe(this._moduleName, this._address);
this._grpcSdk?.config.moduleHealthProbe(
this._moduleName,
this._address,
this._instanceId,
);
}
}
}
Expand Down
21 changes: 8 additions & 13 deletions modules/forms/src/Forms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,13 @@ export default class Forms extends ManagedModule<Config> {
this.database = this.grpcSdk.database!;
await runMigrations(this.grpcSdk);
await this.registerSchemas();
this.grpcSdk.monitorModule('email', serving => {
if (serving && ConfigController.getInstance().config.active) {
this.onConfig()
.then()
.catch(() => {
ConduitGrpcSdk.Logger.error('Failed to update Forms configuration');
});
} else {
this.updateHealth(HealthCheckStatus.NOT_SERVING);
}
});
}

async onConfig() {
if (!ConfigController.getInstance().config.active) {
this.updateHealth(HealthCheckStatus.NOT_SERVING);
} else {
if (!this.isRunning) {
if (!this.grpcSdk.isAvailable('email')) return;
await this.grpcSdk.emailProvider!.registerTemplate(FormSubmissionTemplate);
this.formController = new FormsController(this.grpcSdk);
this.adminRouter = new AdminHandlers(
this.grpcServer,
Expand All @@ -68,6 +55,14 @@ export default class Forms extends ManagedModule<Config> {
.catch(e => {
ConduitGrpcSdk.Logger.error(e.message);
});
this.grpcSdk
.waitForExistence('email')
.then(() => {
this.grpcSdk.emailProvider!.registerTemplate(FormSubmissionTemplate);
})
.catch(e => {
ConduitGrpcSdk.Logger.error(e.message);
});
}
this.updateHealth(HealthCheckStatus.SERVING);
}
Expand Down
26 changes: 14 additions & 12 deletions modules/forms/src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,20 @@ export class FormsRoutes {
Object.keys(data).forEach(r => {
text += `</br>${r}: ${data[r]}`;
});
await this.grpcSdk
.emailProvider!.sendEmail('FormSubmission', {
email: form.forwardTo,
replyTo: form.emailField ? data[form.emailField] : null,
variables: {
data: text,
},
attachments: Object.values(fileData),
})
.catch((e: Error) => {
throw new GrpcError(status.INTERNAL, e.message);
});
if (this.grpcSdk.isAvailable('email')) {
await this.grpcSdk
.emailProvider!.sendEmail('FormSubmission', {
email: form.forwardTo,
replyTo: form.emailField ? data[form.emailField] : null,
variables: {
data: text,
},
attachments: Object.values(fileData),
})
.catch((e: Error) => {
throw new GrpcError(status.INTERNAL, e.message);
});
}

return 'Ok';
}
Expand Down
4 changes: 0 additions & 4 deletions packages/commons/src/interfaces/RegisteredModule.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/commons/src/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './IConduitModule.js';
export * from './RegisteredModule.js';
27 changes: 19 additions & 8 deletions packages/core/src/config-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import {
} from '@conduitplatform/grpc-sdk';
import {
ConduitCommons,
GetConfigRequest,
GetConfigResponse,
GetRedisDetailsResponse,
IConfigManager,
ModuleByNameRequest,
ModuleByNameResponse,
UpdateConfigRequest,
UpdateConfigResponse,
} from '@conduitplatform/commons';
Expand Down Expand Up @@ -44,22 +47,27 @@ export default class ConfigManager implements IConfigManager {
}

getModuleUrlByName(moduleName: string): string | undefined {
return this.serviceDiscovery.getModuleUrlByName(moduleName);
const result = this.serviceDiscovery.getModule(moduleName);
return result!.servingAddress ?? result!.allAddresses!;
}

getModuleUrlByNameGrpc(
call: GrpcRequest<{ name: string }>,
callback: GrpcResponse<{ moduleUrl: string }>,
call: GrpcRequest<ModuleByNameRequest>,
callback: GrpcResponse<ModuleByNameResponse>,
) {
const name = call.request.name;
const result = this.getModuleUrlByName(name);
const result = this.serviceDiscovery.getModule(name);
if (!result) {
return callback({
code: status.NOT_FOUND,
message: 'Module not found',
});
}
callback(null, { moduleUrl: result });
callback(null, {
moduleUrl: result.servingAddress ?? result.allAddresses!,
//@ts-expect-error
instances: result.instances,
});
}

async initialize(server: GrpcServer) {
Expand All @@ -81,7 +89,7 @@ export default class ConfigManager implements IConfigManager {
moduleHealthProbe: this.serviceDiscovery.moduleHealthProbe.bind(
this.serviceDiscovery,
),
getModuleUrlByName: this.getModuleUrlByNameGrpc.bind(this.serviceDiscovery),
getModuleUrlByName: this.getModuleUrlByNameGrpc.bind(this),
},
);
this.serviceDiscovery.beginMonitors();
Expand Down Expand Up @@ -156,7 +164,10 @@ export default class ConfigManager implements IConfigManager {
this._configStorage.onDatabaseAvailable();
}

getGrpc(call: GrpcRequest<{ key: string }>, callback: GrpcResponse<{ data: string }>) {
getGrpc(
call: GrpcRequest<GetConfigRequest>,
callback: GrpcResponse<GetConfigResponse>,
) {
this.get(call.request.key).then(r => {
if (!r) {
return callback({
Expand Down Expand Up @@ -231,7 +242,7 @@ export default class ConfigManager implements IConfigManager {
const module = ServiceRegistry.getInstance().getModule(moduleName);
if (!module) return false;
try {
await this.grpcSdk.isModuleUp(moduleName, module.address);
await this.grpcSdk.isModuleUp(moduleName, module.allAddresses!);
} catch (e) {
return false;
}
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/config-manager/models/Basic.message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export type BasicMessage = {
type: 'instance-health' | 'serving-modules-update';
};
Loading