From edb4e2c91f4e8f90b0420be61270a75d49709732 Mon Sep 17 00:00:00 2001 From: Marcos Spessatto Defendi Date: Thu, 23 Nov 2023 14:52:41 -0300 Subject: [PATCH] chore: Adapt some messaging features to work without the DB watcher (#30408) --- .../hooks/propagateDiscussionMetadata.ts | 19 +++++++- .../server/methods/createDiscussion.ts | 8 ++-- .../app/lib/server/functions/deleteMessage.ts | 13 ++++-- .../app/lib/server/functions/sendMessage.js | 7 ++- .../app/lib/server/functions/updateMessage.ts | 8 +++- .../lib/server/lib/notifyUsersOnMessage.js | 7 ++- .../app/message-pin/server/pinMessage.ts | 7 ++- .../app/message-star/server/starMessage.ts | 7 +++ .../app/reactions/server/setReaction.ts | 6 +++ .../threads/server/hooks/aftersavemessage.ts | 6 +++ apps/meteor/server/models/raw/Messages.ts | 17 ++++--- .../modules/listeners/listeners.module.ts | 4 +- .../modules/watchers/watchers.module.ts | 46 +++---------------- .../server/services/messages/service.ts | 6 ++- apps/meteor/server/services/meteor/service.ts | 4 +- .../src/models/IMessagesModel.ts | 3 +- 16 files changed, 103 insertions(+), 65 deletions(-) diff --git a/apps/meteor/app/discussion/server/hooks/propagateDiscussionMetadata.ts b/apps/meteor/app/discussion/server/hooks/propagateDiscussionMetadata.ts index b6054b6dcccf..e366216ed7f9 100644 --- a/apps/meteor/app/discussion/server/hooks/propagateDiscussionMetadata.ts +++ b/apps/meteor/app/discussion/server/hooks/propagateDiscussionMetadata.ts @@ -1,8 +1,23 @@ +import { api } from '@rocket.chat/core-services'; +import type { IRoom } from '@rocket.chat/core-typings'; import { Messages, Rooms } from '@rocket.chat/models'; import { callbacks } from '../../../../lib/callbacks'; +import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; import { deleteRoom } from '../../../lib/server/functions/deleteRoom'; +const updateAndNotifyParentRoomWithParentMessage = async (room: IRoom): Promise => { + const { value: parentMessage } = await Messages.refreshDiscussionMetadata(room); + if (!parentMessage) { + return; + } + void broadcastMessageSentEvent({ + id: parentMessage._id, + data: parentMessage, + broadcastCallback: (message) => api.broadcast('message.sent', message), + }); +}; + /** * We need to propagate the writing of new message in a discussion to the linking * system message @@ -25,7 +40,7 @@ callbacks.add( return message; } - await Messages.refreshDiscussionMetadata(room); + await updateAndNotifyParentRoomWithParentMessage(room); return message; }, @@ -45,7 +60,7 @@ callbacks.add( }); if (room) { - await Messages.refreshDiscussionMetadata(room); + await updateAndNotifyParentRoomWithParentMessage(room); } } if (message.drid) { diff --git a/apps/meteor/app/discussion/server/methods/createDiscussion.ts b/apps/meteor/app/discussion/server/methods/createDiscussion.ts index c3869f8ff963..b39ef64255e8 100644 --- a/apps/meteor/app/discussion/server/methods/createDiscussion.ts +++ b/apps/meteor/app/discussion/server/methods/createDiscussion.ts @@ -186,13 +186,13 @@ const create = async ({ discussionMsg = await createDiscussionMessage(prid, user, discussion._id, discussionName); } - if (discussionMsg) { - callbacks.runAsync('afterSaveMessage', discussionMsg, parentRoom); - } - if (reply) { await sendMessage(user, { msg: reply }, discussion); } + + if (discussionMsg) { + callbacks.runAsync('afterSaveMessage', discussionMsg, parentRoom); + } return discussion; }; diff --git a/apps/meteor/app/lib/server/functions/deleteMessage.ts b/apps/meteor/app/lib/server/functions/deleteMessage.ts index 6fddbd719fa9..c197f93518ce 100644 --- a/apps/meteor/app/lib/server/functions/deleteMessage.ts +++ b/apps/meteor/app/lib/server/functions/deleteMessage.ts @@ -5,6 +5,7 @@ import { Meteor } from 'meteor/meteor'; import { Apps } from '../../../../ee/server/apps'; import { callbacks } from '../../../../lib/callbacks'; +import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; import { canDeleteMessageAsync } from '../../../authorization/server/functions/canDeleteMessage'; import { FileUpload } from '../../../file-upload/server'; import { settings } from '../../../settings/server'; @@ -68,7 +69,6 @@ export async function deleteMessage(message: IMessage, user: IUser): Promise>); @@ -86,11 +86,18 @@ export async function deleteMessage(message: IMessage, user: IUser): Promise api.broadcast('message.sent', message), + }); + } + if (bridges) { void bridges.getListenerBridge().messageEvent('IPostMessageDeleted', deletedMsg, user); } diff --git a/apps/meteor/app/lib/server/functions/sendMessage.js b/apps/meteor/app/lib/server/functions/sendMessage.js index f4d84c64d59e..e2f45d38fcbc 100644 --- a/apps/meteor/app/lib/server/functions/sendMessage.js +++ b/apps/meteor/app/lib/server/functions/sendMessage.js @@ -1,4 +1,4 @@ -import { Message } from '@rocket.chat/core-services'; +import { Message, api } from '@rocket.chat/core-services'; import { Messages } from '@rocket.chat/models'; import { Match, check } from 'meteor/check'; @@ -6,6 +6,7 @@ import { Apps } from '../../../../ee/server/apps'; import { callbacks } from '../../../../lib/callbacks'; import { isRelativeURL } from '../../../../lib/utils/isRelativeURL'; import { isURL } from '../../../../lib/utils/isURL'; +import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission'; import { FileUpload } from '../../../file-upload/server'; import notifications from '../../../notifications/server/lib/Notifications'; @@ -288,6 +289,10 @@ export const sendMessage = async function (user, message, room, upsert = false, // Execute all callbacks await callbacks.run('afterSaveMessage', message, room); + void broadcastMessageSentEvent({ + id: message._id, + broadcastCallback: (message) => api.broadcast('message.sent', message), + }); return message; } }; diff --git a/apps/meteor/app/lib/server/functions/updateMessage.ts b/apps/meteor/app/lib/server/functions/updateMessage.ts index baca3f1b80ca..88c0b829e77d 100644 --- a/apps/meteor/app/lib/server/functions/updateMessage.ts +++ b/apps/meteor/app/lib/server/functions/updateMessage.ts @@ -1,10 +1,11 @@ -import { Message } from '@rocket.chat/core-services'; +import { Message, api } from '@rocket.chat/core-services'; import type { IEditedMessage, IMessage, IUser, AtLeast } from '@rocket.chat/core-typings'; import { Messages, Rooms } from '@rocket.chat/models'; import { Meteor } from 'meteor/meteor'; import { Apps } from '../../../../ee/server/apps'; import { callbacks } from '../../../../lib/callbacks'; +import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; import { settings } from '../../../settings/server'; import { parseUrlsInMessage } from './parseUrlsInMessage'; @@ -86,6 +87,11 @@ export const updateMessage = async function ( const msg = await Messages.findOneById(_id); if (msg) { await callbacks.run('afterSaveMessage', msg, room, user._id); + void broadcastMessageSentEvent({ + id: msg._id, + data: msg, + broadcastCallback: (message) => api.broadcast('message.sent', message), + }); } }); }; diff --git a/apps/meteor/app/lib/server/lib/notifyUsersOnMessage.js b/apps/meteor/app/lib/server/lib/notifyUsersOnMessage.js index 8c70ee3f9a4f..b55a272de558 100644 --- a/apps/meteor/app/lib/server/lib/notifyUsersOnMessage.js +++ b/apps/meteor/app/lib/server/lib/notifyUsersOnMessage.js @@ -191,4 +191,9 @@ export async function notifyUsersOnMessage(message, room) { return message; } -callbacks.add('afterSaveMessage', (message, room) => notifyUsersOnMessage(message, room), callbacks.priority.LOW, 'notifyUsersOnMessage'); +callbacks.add( + 'afterSaveMessage', + (message, room) => notifyUsersOnMessage(message, room), + callbacks.priority.MEDIUM, + 'notifyUsersOnMessage', +); diff --git a/apps/meteor/app/message-pin/server/pinMessage.ts b/apps/meteor/app/message-pin/server/pinMessage.ts index 24f910f01f71..652f465188c1 100644 --- a/apps/meteor/app/message-pin/server/pinMessage.ts +++ b/apps/meteor/app/message-pin/server/pinMessage.ts @@ -1,4 +1,4 @@ -import { Message } from '@rocket.chat/core-services'; +import { Message, api } from '@rocket.chat/core-services'; import { isQuoteAttachment, isRegisterUser } from '@rocket.chat/core-typings'; import type { IMessage, MessageAttachment, MessageQuoteAttachment } from '@rocket.chat/core-typings'; import { Messages, Rooms, Subscriptions, Users, ReadReceipts } from '@rocket.chat/models'; @@ -9,6 +9,7 @@ import { Meteor } from 'meteor/meteor'; import { Apps, AppEvents } from '../../../ee/server/apps/orchestrator'; import { callbacks } from '../../../lib/callbacks'; import { isTruthy } from '../../../lib/isTruthy'; +import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages'; import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server'; import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission'; import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage'; @@ -226,6 +227,10 @@ Meteor.methods({ if (settings.get('Message_Read_Receipt_Store_Users')) { await ReadReceipts.setPinnedByMessageId(originalMessage._id, originalMessage.pinned); } + void broadcastMessageSentEvent({ + id: message._id, + broadcastCallback: (message) => api.broadcast('message.sent', message), + }); return true; }, diff --git a/apps/meteor/app/message-star/server/starMessage.ts b/apps/meteor/app/message-star/server/starMessage.ts index 006951ad0b72..aaa5657c5b35 100644 --- a/apps/meteor/app/message-star/server/starMessage.ts +++ b/apps/meteor/app/message-star/server/starMessage.ts @@ -1,9 +1,11 @@ +import { api } from '@rocket.chat/core-services'; import type { IMessage } from '@rocket.chat/core-typings'; import { Messages, Subscriptions, Rooms } from '@rocket.chat/models'; import type { ServerMethods } from '@rocket.chat/ui-contexts'; import { Meteor } from 'meteor/meteor'; import { Apps, AppEvents } from '../../../ee/server/apps/orchestrator'; +import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages'; import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server'; import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage'; import { settings } from '../../settings/server'; @@ -60,6 +62,11 @@ Meteor.methods({ await Messages.updateUserStarById(message._id, uid, message.starred); + void broadcastMessageSentEvent({ + id: message._id, + broadcastCallback: (message) => api.broadcast('message.sent', message), + }); + return true; }, }); diff --git a/apps/meteor/app/reactions/server/setReaction.ts b/apps/meteor/app/reactions/server/setReaction.ts index 50ffe76810dc..fab1100fc615 100644 --- a/apps/meteor/app/reactions/server/setReaction.ts +++ b/apps/meteor/app/reactions/server/setReaction.ts @@ -8,6 +8,7 @@ import _ from 'underscore'; import { AppEvents, Apps } from '../../../ee/server/apps/orchestrator'; import { callbacks } from '../../../lib/callbacks'; import { i18n } from '../../../server/lib/i18n'; +import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages'; import { canAccessRoomAsync } from '../../authorization/server'; import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission'; import { emoji } from '../../emoji/server'; @@ -106,6 +107,11 @@ async function setReaction(room: IRoom, user: IUser, message: IMessage, reaction } await Apps.triggerEvent(AppEvents.IPostMessageReacted, message, user, reaction, isReacted); + + void broadcastMessageSentEvent({ + id: message._id, + broadcastCallback: (message) => api.broadcast('message.sent', message), + }); } export async function executeSetReaction(userId: string, reaction: string, messageId: IMessage['_id'], shouldReact?: boolean) { diff --git a/apps/meteor/app/threads/server/hooks/aftersavemessage.ts b/apps/meteor/app/threads/server/hooks/aftersavemessage.ts index 6af3b2eedb7e..6fa780e12f8d 100644 --- a/apps/meteor/app/threads/server/hooks/aftersavemessage.ts +++ b/apps/meteor/app/threads/server/hooks/aftersavemessage.ts @@ -1,9 +1,11 @@ +import { api } from '@rocket.chat/core-services'; import type { IMessage, IRoom } from '@rocket.chat/core-typings'; import { isEditedMessage } from '@rocket.chat/core-typings'; import { Messages } from '@rocket.chat/models'; import { Meteor } from 'meteor/meteor'; import { callbacks } from '../../../../lib/callbacks'; +import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages'; import { updateThreadUsersSubscriptions, getMentions } from '../../../lib/server/lib/notifyUsersOnMessage'; import { sendMessageNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage'; import { settings } from '../../../settings/server'; @@ -61,6 +63,10 @@ export async function processThreads(message: IMessage, room: IRoom) { await notifyUsersOnReply(message, replies, room); await metaData(message, parentMessage, replies); await notification(message, room, replies); + void broadcastMessageSentEvent({ + id: message.tmid, + broadcastCallback: (message) => api.broadcast('message.sent', message), + }); return message; } diff --git a/apps/meteor/server/models/raw/Messages.ts b/apps/meteor/server/models/raw/Messages.ts index 49930a8c92ea..93ef8982a604 100644 --- a/apps/meteor/server/models/raw/Messages.ts +++ b/apps/meteor/server/models/raw/Messages.ts @@ -27,6 +27,7 @@ import type { UpdateResult, Document, UpdateFilter, + ModifyResult, } from 'mongodb'; import { otrSystemMessages } from '../../../app/otr/lib/constants'; @@ -1593,19 +1594,23 @@ export class MessagesRaw extends BaseRaw implements IMessagesModel { * to race conditions: If multiple updates occur, the current state will be updated * only if the new state of the discussion room is really newer. */ - async refreshDiscussionMetadata(room: Pick): Promise { + async refreshDiscussionMetadata(room: Pick): Promise> { const { _id: drid, msgs: dcount, lm: dlm } = room; const query = { drid, }; - return this.updateMany(query, { - $set: { - dcount, - dlm, + return this.col.findOneAndUpdate( + query, + { + $set: { + dcount, + dlm, + }, }, - }); + { returnDocument: 'after' }, + ); } // ////////////////////////////////////////////////////////////////// diff --git a/apps/meteor/server/modules/listeners/listeners.module.ts b/apps/meteor/server/modules/listeners/listeners.module.ts index df940006d26e..aba351dbc958 100644 --- a/apps/meteor/server/modules/listeners/listeners.module.ts +++ b/apps/meteor/server/modules/listeners/listeners.module.ts @@ -1,7 +1,7 @@ import type { AppStatus } from '@rocket.chat/apps-engine/definition/AppStatus'; import type { ISetting as AppsSetting } from '@rocket.chat/apps-engine/definition/settings'; import type { IServiceClass } from '@rocket.chat/core-services'; -import { EnterpriseSettings } from '@rocket.chat/core-services'; +import { EnterpriseSettings, listenToMessageSentEvent } from '@rocket.chat/core-services'; import { UserStatus, isSettingColor, isSettingEnterprise } from '@rocket.chat/core-typings'; import type { IUser, IRoom, VideoConference, ISetting, IOmnichannelRoom } from '@rocket.chat/core-typings'; import { Logger } from '@rocket.chat/logger'; @@ -167,7 +167,7 @@ export class ListenersModule { }); }); - service.onEvent('watch.messages', ({ message }) => { + listenToMessageSentEvent(service, async (message) => { if (!message.rid) { return; } diff --git a/apps/meteor/server/modules/watchers/watchers.module.ts b/apps/meteor/server/modules/watchers/watchers.module.ts index 5ad081348abb..88e465edc018 100644 --- a/apps/meteor/server/modules/watchers/watchers.module.ts +++ b/apps/meteor/server/modules/watchers/watchers.module.ts @@ -14,7 +14,6 @@ import type { IIntegration, IEmailInbox, IPbxEvent, - SettingValue, ILivechatInquiryRecord, IRole, ILivechatPriority, @@ -37,10 +36,10 @@ import { Permissions, LivechatPriority, } from '@rocket.chat/models'; -import mem from 'mem'; import { subscriptionFields, roomFields } from '../../../lib/publishFields'; import type { DatabaseWatcher } from '../../database/DatabaseWatcher'; +import { broadcastMessageSentEvent } from './lib/messages'; type BroadcastCallback = (event: T, ...args: Parameters) => Promise; @@ -65,49 +64,16 @@ export function isWatcherRunning(): boolean { return watcherStarted; } -const getSettingCached = mem(async (setting: string): Promise => Settings.getValueById(setting), { maxAge: 10000 }); - -const getUserNameCached = mem( - async (userId: string): Promise => { - const user = await Users.findOne>(userId, { projection: { name: 1 } }); - return user?.name; - }, - { maxAge: 10000 }, -); - const messageWatcher = (watcher: DatabaseWatcher, broadcast: BroadcastCallback): void => { watcher.on(Messages.getCollectionName(), async ({ clientAction, id, data }) => { switch (clientAction) { case 'inserted': case 'updated': - const message = data ?? (await Messages.findOneById(id)); - if (!message) { - return; - } - - if (message._hidden !== true && message.imported == null) { - const UseRealName = (await getSettingCached('UI_Use_Real_Name')) === true; - - if (UseRealName) { - if (message.u?._id) { - const name = await getUserNameCached(message.u._id); - if (name) { - message.u.name = name; - } - } - - if (message.mentions?.length) { - for await (const mention of message.mentions) { - const name = await getUserNameCached(mention._id); - if (name) { - mention.name = name; - } - } - } - } - - void broadcast('watch.messages', { clientAction, message }); - } + void broadcastMessageSentEvent({ + id, + data, + broadcastCallback: (message) => broadcast('watch.messages', { clientAction, message }), + }); break; } }); diff --git a/apps/meteor/server/services/messages/service.ts b/apps/meteor/server/services/messages/service.ts index f28a9d71b2f5..48509f5d073d 100644 --- a/apps/meteor/server/services/messages/service.ts +++ b/apps/meteor/server/services/messages/service.ts @@ -10,6 +10,7 @@ import { updateMessage } from '../../../app/lib/server/functions/updateMessage'; import { executeSendMessage } from '../../../app/lib/server/methods/sendMessage'; import { executeSetReaction } from '../../../app/reactions/server/setReaction'; import { settings } from '../../../app/settings/server'; +import { broadcastMessageSentEvent } from '../../modules/watchers/lib/messages'; import { BeforeSavePreventMention } from './hooks/BeforeSavePreventMention'; import { configureBadWords } from './hooks/badwords'; @@ -78,7 +79,10 @@ export class MessageService extends ServiceClassInternal implements IMessageServ settings.get('Message_Read_Receipt_Enabled'), extraData, ); - + void broadcastMessageSentEvent({ + id: result.insertedId, + broadcastCallback: async (message) => this.api?.broadcast('message.sent', message), + }); return result.insertedId; } diff --git a/apps/meteor/server/services/meteor/service.ts b/apps/meteor/server/services/meteor/service.ts index 2420a70d7ada..396ad4dcc20a 100644 --- a/apps/meteor/server/services/meteor/service.ts +++ b/apps/meteor/server/services/meteor/service.ts @@ -1,4 +1,4 @@ -import { api, ServiceClassInternal } from '@rocket.chat/core-services'; +import { api, ServiceClassInternal, listenToMessageSentEvent } from '@rocket.chat/core-services'; import type { AutoUpdateRecord, IMeteor } from '@rocket.chat/core-services'; import type { ILivechatAgent } from '@rocket.chat/core-typings'; import { Users } from '@rocket.chat/models'; @@ -221,7 +221,7 @@ export class MeteorService extends ServiceClassInternal implements IMeteor { }); if (!disableMsgRoundtripTracking) { - this.onEvent('watch.messages', ({ message }) => { + listenToMessageSentEvent(this, async (message) => { if (message?._updatedAt) { metrics.messageRoundtripTime.set(Date.now() - message._updatedAt.getDate()); } diff --git a/packages/model-typings/src/models/IMessagesModel.ts b/packages/model-typings/src/models/IMessagesModel.ts index d681405850fa..9b7680b56120 100644 --- a/packages/model-typings/src/models/IMessagesModel.ts +++ b/packages/model-typings/src/models/IMessagesModel.ts @@ -18,6 +18,7 @@ import type { UpdateResult, Document, Filter, + ModifyResult, } from 'mongodb'; import type { FindPaginated, IBaseModel } from './IBaseModel'; @@ -252,7 +253,7 @@ export interface IMessagesModel extends IBaseModel { getMessageByFileId(fileID: string): Promise; setThreadMessagesAsRead(tmid: string, until: Date): Promise; updateRepliesByThreadId(tmid: string, replies: string[], ts: Date): Promise; - refreshDiscussionMetadata(room: Pick): Promise; + refreshDiscussionMetadata(room: Pick): Promise>; findUnreadThreadMessagesByDate( tmid: string, userId: string,