Skip to content

Commit

Permalink
chore: move db calls to model definition
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardogarim committed Dec 10, 2024
1 parent d71f267 commit 9661ea3
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 84 deletions.
51 changes: 22 additions & 29 deletions apps/meteor/ee/server/local-services/instance/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,42 +33,13 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe

private transporter: Transporters.TCP | Transporters.NATS;

private isTransporterTCP = true;

private broker: ServiceBroker;

private troubleshootDisableInstanceBroadcast = false;

constructor() {
super();

const transporter = getTransporter({
transporter: process.env.TRANSPORTER,
port: process.env.TCP_PORT,
extra: process.env.TRANSPORTER_EXTRA,
});
this.isTransporterTCP = typeof transporter !== 'string';

const activeInstances = InstanceStatusRaw.getActiveInstancesAddress()
.then((instances) => {
console.info(`Found ${instances.length} active instances`);
return instances;
})
.catch(() => []);

this.transporter = this.isTransporterTCP
? new Transporters.TCP({ ...transporter, urls: activeInstances })
: new Transporters.NATS({ url: transporter });

this.broker = new ServiceBroker({
nodeID: InstanceStatus.id(),
transporter: this.transporter,
serializer: new EJSONSerializer(),
heartbeatInterval: defaultPingInterval,
heartbeatTimeout: indexExpire,
...getLogger(process.env),
});

this.onEvent('license.module', async ({ module, valid }) => {
if (module === 'scalability' && valid) {
await this.startBroadcast();
Expand Down Expand Up @@ -98,6 +69,28 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
}

async created() {
const transporter = getTransporter({
transporter: process.env.TRANSPORTER,
port: process.env.TCP_PORT,
extra: process.env.TRANSPORTER_EXTRA,
});

const activeInstances = InstanceStatusRaw.getActiveInstancesAddress();

this.transporter =
typeof transporter !== 'string'
? new Transporters.TCP({ ...transporter, urls: activeInstances })
: new Transporters.NATS({ url: transporter });

this.broker = new ServiceBroker({
nodeID: InstanceStatus.id(),
transporter: this.transporter,
serializer: new EJSONSerializer(),
heartbeatInterval: defaultPingInterval,
heartbeatTimeout: indexExpire,
...getLogger(process.env),
});

this.broker.createService({
name: 'matrix',
events: {
Expand Down
6 changes: 4 additions & 2 deletions apps/meteor/ee/server/startup/presence.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { Presence } from '@rocket.chat/core-services';
import { InstanceStatus } from '@rocket.chat/instance-status';
import { id as instanceId } from '@rocket.chat/instance-status';
import { InstanceStatus } from '@rocket.chat/models';
import { Accounts } from 'meteor/accounts-base';
import { Meteor } from 'meteor/meteor';
import { throttle } from 'underscore';

// update connections count every 30 seconds
const updateConns = throttle(function _updateConns() {
void InstanceStatus.updateConnections(Meteor.server.sessions.size);
const nodeId = instanceId();
void InstanceStatus.updateConnections(nodeId, Meteor.server.sessions.size);
}, 30000);

Meteor.startup(() => {
Expand Down
38 changes: 37 additions & 1 deletion apps/meteor/server/models/raw/InstanceStatus.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { IInstanceStatus } from '@rocket.chat/core-typings';
import type { IInstanceStatusModel } from '@rocket.chat/model-typings';
import type { Db, UpdateResult } from 'mongodb';
import type { Db, ModifyResult, UpdateResult, DeleteResult } from 'mongodb';

import { BaseRaw } from './BaseRaw';

Expand All @@ -23,7 +23,43 @@ export class InstanceStatusRaw extends BaseRaw<IInstanceStatus> implements IInst
return instances.map((instance) => `${instance.extraInformation.host}:${instance.extraInformation.tcpPort}/${instance._id}`);
}

async removeInstanceById(_id: IInstanceStatus['_id']): Promise<DeleteResult> {
return this.deleteOne({ _id });
}

async setDocumentHeartbeat(documentId: string): Promise<UpdateResult> {
return this.updateOne({ _id: documentId }, { $currentDate: { _updatedAt: true } });
}

async upsertInstance(instance: Partial<IInstanceStatus>): Promise<ModifyResult<IInstanceStatus>> {
return this.findOneAndUpdate(
{
_id: instance._id,
},
{
$set: instance,
$currentDate: {
_createdAt: true,
_updatedAt: true,
},
},
{
upsert: true,
returnDocument: 'after',
},
);
}

async updateConnections(_id: IInstanceStatus['_id'], conns: number) {
return this.updateOne(
{
_id,
},
{
$set: {
'extraInformation.conns': conns,
},
},
);
}
}
4 changes: 2 additions & 2 deletions ee/apps/ddp-streamer/src/DDPStreamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import crypto from 'crypto';

import { MeteorService, Presence, ServiceClass } from '@rocket.chat/core-services';
import { InstanceStatus } from '@rocket.chat/instance-status';
import { Users } from '@rocket.chat/models';
import { Users, InstanceStatus as InstanceStatusModel } from '@rocket.chat/models';
import polka from 'polka';
import { throttle } from 'underscore';
import WebSocket from 'ws';
Expand Down Expand Up @@ -66,7 +66,7 @@ export class DDPStreamer extends ServiceClass {

// update connections count every 30 seconds
updateConnections = throttle(() => {
InstanceStatus.updateConnections(this.wss?.clients.size ?? 0);
InstanceStatusModel.updateConnections(InstanceStatus.id(), this.wss?.clients.size ?? 0);
}, 30000);

async created(): Promise<void> {
Expand Down
61 changes: 12 additions & 49 deletions packages/instance-status/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { IInstanceStatus } from '@rocket.chat/core-typings';
import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models';
import { v4 as uuidv4 } from 'uuid';

export const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; // default to 10s
export const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60;
const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10;
const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60;

const ID = uuidv4();
const id = (): IInstanceStatus['_id'] => ID;

const currentInstance = {
name: '',
Expand All @@ -13,10 +15,6 @@ const currentInstance = {

let pingInterval: NodeJS.Timeout | null;

export function id() {
return ID;
}

function start() {
stop();
pingInterval = setInterval(async () => ping(), defaultPingInterval * 1000);
Expand Down Expand Up @@ -52,56 +50,22 @@ let createIndexes = async () => {
}
});

// eslint-disable-next-line @typescript-eslint/no-empty-function
createIndexes = async () => {};
createIndexes = async () => {
// noop
};
};

async function updateInstanceOnDB(instance: any) {
try {
return InstanceStatusModel.findOneAndUpdate(
{ _id: ID },
{
$set: instance,
$currentDate: { _createdAt: true, _updatedAt: true },
},
{ upsert: true, returnDocument: 'after' },
);
} catch (e) {
return e;
}
}

async function updateConnections(conns: number) {
await InstanceStatusModel.updateOne(
{
_id: ID,
},
{
$set: {
'extraInformation.conns': conns,
},
},
);
}

async function deleteInstanceOnDB() {
try {
await InstanceStatusModel.deleteOne({ _id: ID });
} catch (e) {
return e;
}
}

export async function registerInstance(name: string, extraInformation: Record<string, unknown>): Promise<unknown> {
async function registerInstance(name: string, extraInformation: Partial<IInstanceStatus['extraInformation']>): Promise<unknown> {
createIndexes();

currentInstance.name = name;
currentInstance.extraInformation = extraInformation;

const result = await updateInstanceOnDB({
const result = await InstanceStatusModel.upsertInstance({
_id: id(),
pid: process.pid,
name,
...(extraInformation && { extraInformation }),
extraInformation: extraInformation as IInstanceStatus['extraInformation'],
});

start();
Expand All @@ -112,7 +76,7 @@ export async function registerInstance(name: string, extraInformation: Record<st

async function unregisterInstance() {
try {
const result = await deleteInstanceOnDB();
const result = await InstanceStatusModel.removeInstanceById(id());
stop();
process.removeListener('exit', onExit);
return result;
Expand All @@ -136,7 +100,6 @@ async function onExit() {
export const InstanceStatus = {
id,
registerInstance,
updateConnections,
defaultPingInterval,
indexExpire,
};
5 changes: 4 additions & 1 deletion packages/model-typings/src/models/IInstanceStatusModel.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import type { IInstanceStatus } from '@rocket.chat/core-typings';
import type { UpdateResult } from 'mongodb';
import type { DeleteResult, ModifyResult, UpdateResult } from 'mongodb';

import type { IBaseModel } from './IBaseModel';

export interface IInstanceStatusModel extends IBaseModel<IInstanceStatus> {
getActiveInstanceCount(): Promise<number>;
getActiveInstancesAddress(): Promise<string[]>;
removeInstanceById(_id: IInstanceStatus['_id']): Promise<DeleteResult>;
setDocumentHeartbeat(documentId: string): Promise<UpdateResult>;
upsertInstance(instance: Partial<IInstanceStatus>): Promise<ModifyResult<IInstanceStatus>>;
updateConnections(_id: IInstanceStatus['_id'], conns: number): Promise<UpdateResult>;
}

0 comments on commit 9661ea3

Please sign in to comment.