From 9661ea38a62c5b4af4fd55dbe4f09fa2ba6ac522 Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Tue, 10 Dec 2024 16:06:33 -0300 Subject: [PATCH] chore: move db calls to model definition --- .../server/local-services/instance/service.ts | 51 +++++++--------- apps/meteor/ee/server/startup/presence.ts | 6 +- .../server/models/raw/InstanceStatus.ts | 38 +++++++++++- ee/apps/ddp-streamer/src/DDPStreamer.ts | 4 +- packages/instance-status/src/index.ts | 61 ++++--------------- .../src/models/IInstanceStatusModel.ts | 5 +- 6 files changed, 81 insertions(+), 84 deletions(-) diff --git a/apps/meteor/ee/server/local-services/instance/service.ts b/apps/meteor/ee/server/local-services/instance/service.ts index cd4fcab21b99..93d6d0c45e98 100644 --- a/apps/meteor/ee/server/local-services/instance/service.ts +++ b/apps/meteor/ee/server/local-services/instance/service.ts @@ -33,8 +33,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe private transporter: Transporters.TCP | Transporters.NATS; - private isTransporterTCP = true; - private broker: ServiceBroker; private troubleshootDisableInstanceBroadcast = false; @@ -42,33 +40,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe constructor() { super(); - const transporter = getTransporter({ - transporter: process.env.TRANSPORTER, - port: process.env.TCP_PORT, - extra: process.env.TRANSPORTER_EXTRA, - }); - this.isTransporterTCP = typeof transporter !== 'string'; - - const activeInstances = InstanceStatusRaw.getActiveInstancesAddress() - .then((instances) => { - console.info(`Found ${instances.length} active instances`); - return instances; - }) - .catch(() => []); - - this.transporter = this.isTransporterTCP - ? new Transporters.TCP({ ...transporter, urls: activeInstances }) - : new Transporters.NATS({ url: transporter }); - - this.broker = new ServiceBroker({ - nodeID: InstanceStatus.id(), - transporter: this.transporter, - serializer: new EJSONSerializer(), - heartbeatInterval: defaultPingInterval, - heartbeatTimeout: indexExpire, - ...getLogger(process.env), - }); - this.onEvent('license.module', async ({ module, valid }) => { if (module === 'scalability' && valid) { await this.startBroadcast(); @@ -98,6 +69,28 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe } async created() { + const transporter = getTransporter({ + transporter: process.env.TRANSPORTER, + port: process.env.TCP_PORT, + extra: process.env.TRANSPORTER_EXTRA, + }); + + const activeInstances = InstanceStatusRaw.getActiveInstancesAddress(); + + this.transporter = + typeof transporter !== 'string' + ? new Transporters.TCP({ ...transporter, urls: activeInstances }) + : new Transporters.NATS({ url: transporter }); + + this.broker = new ServiceBroker({ + nodeID: InstanceStatus.id(), + transporter: this.transporter, + serializer: new EJSONSerializer(), + heartbeatInterval: defaultPingInterval, + heartbeatTimeout: indexExpire, + ...getLogger(process.env), + }); + this.broker.createService({ name: 'matrix', events: { diff --git a/apps/meteor/ee/server/startup/presence.ts b/apps/meteor/ee/server/startup/presence.ts index e0756e4b4c59..933399fc6cb6 100644 --- a/apps/meteor/ee/server/startup/presence.ts +++ b/apps/meteor/ee/server/startup/presence.ts @@ -1,12 +1,14 @@ import { Presence } from '@rocket.chat/core-services'; -import { InstanceStatus } from '@rocket.chat/instance-status'; +import { id as instanceId } from '@rocket.chat/instance-status'; +import { InstanceStatus } from '@rocket.chat/models'; import { Accounts } from 'meteor/accounts-base'; import { Meteor } from 'meteor/meteor'; import { throttle } from 'underscore'; // update connections count every 30 seconds const updateConns = throttle(function _updateConns() { - void InstanceStatus.updateConnections(Meteor.server.sessions.size); + const nodeId = instanceId(); + void InstanceStatus.updateConnections(nodeId, Meteor.server.sessions.size); }, 30000); Meteor.startup(() => { diff --git a/apps/meteor/server/models/raw/InstanceStatus.ts b/apps/meteor/server/models/raw/InstanceStatus.ts index c8763c431781..86aed7fbc128 100644 --- a/apps/meteor/server/models/raw/InstanceStatus.ts +++ b/apps/meteor/server/models/raw/InstanceStatus.ts @@ -1,6 +1,6 @@ import type { IInstanceStatus } from '@rocket.chat/core-typings'; import type { IInstanceStatusModel } from '@rocket.chat/model-typings'; -import type { Db, UpdateResult } from 'mongodb'; +import type { Db, ModifyResult, UpdateResult, DeleteResult } from 'mongodb'; import { BaseRaw } from './BaseRaw'; @@ -23,7 +23,43 @@ export class InstanceStatusRaw extends BaseRaw implements IInst return instances.map((instance) => `${instance.extraInformation.host}:${instance.extraInformation.tcpPort}/${instance._id}`); } + async removeInstanceById(_id: IInstanceStatus['_id']): Promise { + return this.deleteOne({ _id }); + } + async setDocumentHeartbeat(documentId: string): Promise { return this.updateOne({ _id: documentId }, { $currentDate: { _updatedAt: true } }); } + + async upsertInstance(instance: Partial): Promise> { + return this.findOneAndUpdate( + { + _id: instance._id, + }, + { + $set: instance, + $currentDate: { + _createdAt: true, + _updatedAt: true, + }, + }, + { + upsert: true, + returnDocument: 'after', + }, + ); + } + + async updateConnections(_id: IInstanceStatus['_id'], conns: number) { + return this.updateOne( + { + _id, + }, + { + $set: { + 'extraInformation.conns': conns, + }, + }, + ); + } } diff --git a/ee/apps/ddp-streamer/src/DDPStreamer.ts b/ee/apps/ddp-streamer/src/DDPStreamer.ts index 1c880929650d..416afa5f5cc2 100644 --- a/ee/apps/ddp-streamer/src/DDPStreamer.ts +++ b/ee/apps/ddp-streamer/src/DDPStreamer.ts @@ -2,7 +2,7 @@ import crypto from 'crypto'; import { MeteorService, Presence, ServiceClass } from '@rocket.chat/core-services'; import { InstanceStatus } from '@rocket.chat/instance-status'; -import { Users } from '@rocket.chat/models'; +import { Users, InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import polka from 'polka'; import { throttle } from 'underscore'; import WebSocket from 'ws'; @@ -66,7 +66,7 @@ export class DDPStreamer extends ServiceClass { // update connections count every 30 seconds updateConnections = throttle(() => { - InstanceStatus.updateConnections(this.wss?.clients.size ?? 0); + InstanceStatusModel.updateConnections(InstanceStatus.id(), this.wss?.clients.size ?? 0); }, 30000); async created(): Promise { diff --git a/packages/instance-status/src/index.ts b/packages/instance-status/src/index.ts index b2375cb267e4..10128c302b62 100644 --- a/packages/instance-status/src/index.ts +++ b/packages/instance-status/src/index.ts @@ -1,10 +1,12 @@ +import type { IInstanceStatus } from '@rocket.chat/core-typings'; import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import { v4 as uuidv4 } from 'uuid'; -export const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; // default to 10s -export const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; +const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; +const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; const ID = uuidv4(); +const id = (): IInstanceStatus['_id'] => ID; const currentInstance = { name: '', @@ -13,10 +15,6 @@ const currentInstance = { let pingInterval: NodeJS.Timeout | null; -export function id() { - return ID; -} - function start() { stop(); pingInterval = setInterval(async () => ping(), defaultPingInterval * 1000); @@ -52,56 +50,22 @@ let createIndexes = async () => { } }); - // eslint-disable-next-line @typescript-eslint/no-empty-function - createIndexes = async () => {}; + createIndexes = async () => { + // noop + }; }; -async function updateInstanceOnDB(instance: any) { - try { - return InstanceStatusModel.findOneAndUpdate( - { _id: ID }, - { - $set: instance, - $currentDate: { _createdAt: true, _updatedAt: true }, - }, - { upsert: true, returnDocument: 'after' }, - ); - } catch (e) { - return e; - } -} - -async function updateConnections(conns: number) { - await InstanceStatusModel.updateOne( - { - _id: ID, - }, - { - $set: { - 'extraInformation.conns': conns, - }, - }, - ); -} - -async function deleteInstanceOnDB() { - try { - await InstanceStatusModel.deleteOne({ _id: ID }); - } catch (e) { - return e; - } -} - -export async function registerInstance(name: string, extraInformation: Record): Promise { +async function registerInstance(name: string, extraInformation: Partial): Promise { createIndexes(); currentInstance.name = name; currentInstance.extraInformation = extraInformation; - const result = await updateInstanceOnDB({ + const result = await InstanceStatusModel.upsertInstance({ + _id: id(), pid: process.pid, name, - ...(extraInformation && { extraInformation }), + extraInformation: extraInformation as IInstanceStatus['extraInformation'], }); start(); @@ -112,7 +76,7 @@ export async function registerInstance(name: string, extraInformation: Record { getActiveInstanceCount(): Promise; getActiveInstancesAddress(): Promise; + removeInstanceById(_id: IInstanceStatus['_id']): Promise; setDocumentHeartbeat(documentId: string): Promise; + upsertInstance(instance: Partial): Promise>; + updateConnections(_id: IInstanceStatus['_id'], conns: number): Promise; }