Skip to content

Commit

Permalink
chore: Adapt some messaging features to work without the DB watcher (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcosSpessatto authored Nov 23, 2023
1 parent bb65a2a commit edb4e2c
Show file tree
Hide file tree
Showing 16 changed files with 103 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
import { api } from '@rocket.chat/core-services';
import type { IRoom } from '@rocket.chat/core-typings';
import { Messages, Rooms } from '@rocket.chat/models';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { deleteRoom } from '../../../lib/server/functions/deleteRoom';

const updateAndNotifyParentRoomWithParentMessage = async (room: IRoom): Promise<void> => {
const { value: parentMessage } = await Messages.refreshDiscussionMetadata(room);
if (!parentMessage) {
return;
}
void broadcastMessageSentEvent({
id: parentMessage._id,
data: parentMessage,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
};

/**
* We need to propagate the writing of new message in a discussion to the linking
* system message
Expand All @@ -25,7 +40,7 @@ callbacks.add(
return message;
}

await Messages.refreshDiscussionMetadata(room);
await updateAndNotifyParentRoomWithParentMessage(room);

return message;
},
Expand All @@ -45,7 +60,7 @@ callbacks.add(
});

if (room) {
await Messages.refreshDiscussionMetadata(room);
await updateAndNotifyParentRoomWithParentMessage(room);
}
}
if (message.drid) {
Expand Down
8 changes: 4 additions & 4 deletions apps/meteor/app/discussion/server/methods/createDiscussion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ const create = async ({
discussionMsg = await createDiscussionMessage(prid, user, discussion._id, discussionName);
}

if (discussionMsg) {
callbacks.runAsync('afterSaveMessage', discussionMsg, parentRoom);
}

if (reply) {
await sendMessage(user, { msg: reply }, discussion);
}

if (discussionMsg) {
callbacks.runAsync('afterSaveMessage', discussionMsg, parentRoom);
}
return discussion;
};

Expand Down
13 changes: 10 additions & 3 deletions apps/meteor/app/lib/server/functions/deleteMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Meteor } from 'meteor/meteor';

import { Apps } from '../../../../ee/server/apps';
import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { canDeleteMessageAsync } from '../../../authorization/server/functions/canDeleteMessage';
import { FileUpload } from '../../../file-upload/server';
import { settings } from '../../../settings/server';
Expand Down Expand Up @@ -68,7 +69,6 @@ export async function deleteMessage(message: IMessage, user: IUser): Promise<voi
file?._id && (await FileUpload.getStore('Uploads').deleteById(file._id));
}
}

if (showDeletedStatus) {
// TODO is there a better way to tell TS "IUser[username]" is not undefined?
await Messages.setAsDeletedByIdAndUser(message._id, user as Required<Pick<IUser, '_id' | 'username' | 'name'>>);
Expand All @@ -86,11 +86,18 @@ export async function deleteMessage(message: IMessage, user: IUser): Promise<voi
}
}

await callbacks.run('afterDeleteMessage', deletedMsg, room);

// decrease message count
await Rooms.decreaseMessageCountById(message.rid, 1);

await callbacks.run('afterDeleteMessage', deletedMsg, room);

if (keepHistory || showDeletedStatus) {
void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
}

if (bridges) {
void bridges.getListenerBridge().messageEvent('IPostMessageDeleted', deletedMsg, user);
}
Expand Down
7 changes: 6 additions & 1 deletion apps/meteor/app/lib/server/functions/sendMessage.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Message } from '@rocket.chat/core-services';
import { Message, api } from '@rocket.chat/core-services';
import { Messages } from '@rocket.chat/models';
import { Match, check } from 'meteor/check';

import { Apps } from '../../../../ee/server/apps';
import { callbacks } from '../../../../lib/callbacks';
import { isRelativeURL } from '../../../../lib/utils/isRelativeURL';
import { isURL } from '../../../../lib/utils/isURL';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission';
import { FileUpload } from '../../../file-upload/server';
import notifications from '../../../notifications/server/lib/Notifications';
Expand Down Expand Up @@ -288,6 +289,10 @@ export const sendMessage = async function (user, message, room, upsert = false,

// Execute all callbacks
await callbacks.run('afterSaveMessage', message, room);
void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
return message;
}
};
8 changes: 7 additions & 1 deletion apps/meteor/app/lib/server/functions/updateMessage.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Message } from '@rocket.chat/core-services';
import { Message, api } from '@rocket.chat/core-services';
import type { IEditedMessage, IMessage, IUser, AtLeast } from '@rocket.chat/core-typings';
import { Messages, Rooms } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { Apps } from '../../../../ee/server/apps';
import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { settings } from '../../../settings/server';
import { parseUrlsInMessage } from './parseUrlsInMessage';

Expand Down Expand Up @@ -86,6 +87,11 @@ export const updateMessage = async function (
const msg = await Messages.findOneById(_id);
if (msg) {
await callbacks.run('afterSaveMessage', msg, room, user._id);
void broadcastMessageSentEvent({
id: msg._id,
data: msg,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
}
});
};
7 changes: 6 additions & 1 deletion apps/meteor/app/lib/server/lib/notifyUsersOnMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,9 @@ export async function notifyUsersOnMessage(message, room) {
return message;
}

callbacks.add('afterSaveMessage', (message, room) => notifyUsersOnMessage(message, room), callbacks.priority.LOW, 'notifyUsersOnMessage');
callbacks.add(
'afterSaveMessage',
(message, room) => notifyUsersOnMessage(message, room),
callbacks.priority.MEDIUM,
'notifyUsersOnMessage',
);
7 changes: 6 additions & 1 deletion apps/meteor/app/message-pin/server/pinMessage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Message } from '@rocket.chat/core-services';
import { Message, api } from '@rocket.chat/core-services';
import { isQuoteAttachment, isRegisterUser } from '@rocket.chat/core-typings';
import type { IMessage, MessageAttachment, MessageQuoteAttachment } from '@rocket.chat/core-typings';
import { Messages, Rooms, Subscriptions, Users, ReadReceipts } from '@rocket.chat/models';
Expand All @@ -9,6 +9,7 @@ import { Meteor } from 'meteor/meteor';
import { Apps, AppEvents } from '../../../ee/server/apps/orchestrator';
import { callbacks } from '../../../lib/callbacks';
import { isTruthy } from '../../../lib/isTruthy';
import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server';
import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
Expand Down Expand Up @@ -226,6 +227,10 @@ Meteor.methods<ServerMethods>({
if (settings.get('Message_Read_Receipt_Store_Users')) {
await ReadReceipts.setPinnedByMessageId(originalMessage._id, originalMessage.pinned);
}
void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});

return true;
},
Expand Down
7 changes: 7 additions & 0 deletions apps/meteor/app/message-star/server/starMessage.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { api } from '@rocket.chat/core-services';
import type { IMessage } from '@rocket.chat/core-typings';
import { Messages, Subscriptions, Rooms } from '@rocket.chat/models';
import type { ServerMethods } from '@rocket.chat/ui-contexts';
import { Meteor } from 'meteor/meteor';

import { Apps, AppEvents } from '../../../ee/server/apps/orchestrator';
import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
import { settings } from '../../settings/server';
Expand Down Expand Up @@ -60,6 +62,11 @@ Meteor.methods<ServerMethods>({

await Messages.updateUserStarById(message._id, uid, message.starred);

void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});

return true;
},
});
6 changes: 6 additions & 0 deletions apps/meteor/app/reactions/server/setReaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import _ from 'underscore';
import { AppEvents, Apps } from '../../../ee/server/apps/orchestrator';
import { callbacks } from '../../../lib/callbacks';
import { i18n } from '../../../server/lib/i18n';
import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync } from '../../authorization/server';
import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission';
import { emoji } from '../../emoji/server';
Expand Down Expand Up @@ -106,6 +107,11 @@ async function setReaction(room: IRoom, user: IUser, message: IMessage, reaction
}

await Apps.triggerEvent(AppEvents.IPostMessageReacted, message, user, reaction, isReacted);

void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
}

export async function executeSetReaction(userId: string, reaction: string, messageId: IMessage['_id'], shouldReact?: boolean) {
Expand Down
6 changes: 6 additions & 0 deletions apps/meteor/app/threads/server/hooks/aftersavemessage.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { api } from '@rocket.chat/core-services';
import type { IMessage, IRoom } from '@rocket.chat/core-typings';
import { isEditedMessage } from '@rocket.chat/core-typings';
import { Messages } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { updateThreadUsersSubscriptions, getMentions } from '../../../lib/server/lib/notifyUsersOnMessage';
import { sendMessageNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
import { settings } from '../../../settings/server';
Expand Down Expand Up @@ -61,6 +63,10 @@ export async function processThreads(message: IMessage, room: IRoom) {
await notifyUsersOnReply(message, replies, room);
await metaData(message, parentMessage, replies);
await notification(message, room, replies);
void broadcastMessageSentEvent({
id: message.tmid,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});

return message;
}
Expand Down
17 changes: 11 additions & 6 deletions apps/meteor/server/models/raw/Messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import type {
UpdateResult,
Document,
UpdateFilter,
ModifyResult,
} from 'mongodb';

import { otrSystemMessages } from '../../../app/otr/lib/constants';
Expand Down Expand Up @@ -1593,19 +1594,23 @@ export class MessagesRaw extends BaseRaw<IMessage> implements IMessagesModel {
* to race conditions: If multiple updates occur, the current state will be updated
* only if the new state of the discussion room is really newer.
*/
async refreshDiscussionMetadata(room: Pick<IRoom, '_id' | 'msgs' | 'lm'>): Promise<UpdateResult | Document | false> {
async refreshDiscussionMetadata(room: Pick<IRoom, '_id' | 'msgs' | 'lm'>): Promise<ModifyResult<IMessage>> {
const { _id: drid, msgs: dcount, lm: dlm } = room;

const query = {
drid,
};

return this.updateMany(query, {
$set: {
dcount,
dlm,
return this.col.findOneAndUpdate(
query,
{
$set: {
dcount,
dlm,
},
},
});
{ returnDocument: 'after' },
);
}

// //////////////////////////////////////////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/server/modules/listeners/listeners.module.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { AppStatus } from '@rocket.chat/apps-engine/definition/AppStatus';
import type { ISetting as AppsSetting } from '@rocket.chat/apps-engine/definition/settings';
import type { IServiceClass } from '@rocket.chat/core-services';
import { EnterpriseSettings } from '@rocket.chat/core-services';
import { EnterpriseSettings, listenToMessageSentEvent } from '@rocket.chat/core-services';
import { UserStatus, isSettingColor, isSettingEnterprise } from '@rocket.chat/core-typings';
import type { IUser, IRoom, VideoConference, ISetting, IOmnichannelRoom } from '@rocket.chat/core-typings';
import { Logger } from '@rocket.chat/logger';
Expand Down Expand Up @@ -167,7 +167,7 @@ export class ListenersModule {
});
});

service.onEvent('watch.messages', ({ message }) => {
listenToMessageSentEvent(service, async (message) => {
if (!message.rid) {
return;
}
Expand Down
46 changes: 6 additions & 40 deletions apps/meteor/server/modules/watchers/watchers.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import type {
IIntegration,
IEmailInbox,
IPbxEvent,
SettingValue,
ILivechatInquiryRecord,
IRole,
ILivechatPriority,
Expand All @@ -37,10 +36,10 @@ import {
Permissions,
LivechatPriority,
} from '@rocket.chat/models';
import mem from 'mem';

import { subscriptionFields, roomFields } from '../../../lib/publishFields';
import type { DatabaseWatcher } from '../../database/DatabaseWatcher';
import { broadcastMessageSentEvent } from './lib/messages';

type BroadcastCallback = <T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>) => Promise<void>;

Expand All @@ -65,49 +64,16 @@ export function isWatcherRunning(): boolean {
return watcherStarted;
}

const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });

const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);

const messageWatcher = (watcher: DatabaseWatcher, broadcast: BroadcastCallback): void => {
watcher.on<IMessage>(Messages.getCollectionName(), async ({ clientAction, id, data }) => {
switch (clientAction) {
case 'inserted':
case 'updated':
const message = data ?? (await Messages.findOneById(id));
if (!message) {
return;
}

if (message._hidden !== true && message.imported == null) {
const UseRealName = (await getSettingCached('UI_Use_Real_Name')) === true;

if (UseRealName) {
if (message.u?._id) {
const name = await getUserNameCached(message.u._id);
if (name) {
message.u.name = name;
}
}

if (message.mentions?.length) {
for await (const mention of message.mentions) {
const name = await getUserNameCached(mention._id);
if (name) {
mention.name = name;
}
}
}
}

void broadcast('watch.messages', { clientAction, message });
}
void broadcastMessageSentEvent({
id,
data,
broadcastCallback: (message) => broadcast('watch.messages', { clientAction, message }),
});
break;
}
});
Expand Down
Loading

0 comments on commit edb4e2c

Please sign in to comment.