Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Adapt some messaging features to work without the DB watcher #30408

Merged
merged 38 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
957fda2
chore: add a flag to disable db watchers on the message collection
MarcosSpessatto Sep 13, 2023
5e61b10
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Sep 13, 2023
fef4fb0
chore: enable message event listeners depending on the event
MarcosSpessatto Sep 15, 2023
da806dd
chore: dispatch new event on send message
MarcosSpessatto Sep 15, 2023
66435f5
chore: dispatch new event on update message
MarcosSpessatto Sep 15, 2023
c85ab7e
chore: dispatch new event on delete message
MarcosSpessatto Sep 15, 2023
67b075b
chore: dispatch new event on send system message through svc
MarcosSpessatto Sep 15, 2023
28419b8
chore: adapt discussion feat to work without message events watcher
MarcosSpessatto Sep 15, 2023
be538b1
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Sep 15, 2023
b6327ff
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Sep 18, 2023
2d77cfb
Merge branch 'chore/remove-message-db-watcher-1' into chore/adapt-mes…
MarcosSpessatto Sep 18, 2023
ef450c8
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Oct 26, 2023
95c99ef
Merge branch 'chore/remove-message-db-watcher-1' into chore/adapt-mes…
MarcosSpessatto Oct 26, 2023
1126a8a
Merge branch 'develop' into chore/remove-message-db-watcher-1
sampaiodiego Nov 3, 2023
de50690
Merge branch 'develop' into chore/remove-message-db-watcher-1
sampaiodiego Nov 7, 2023
c28e22f
Merge branch 'chore/remove-message-db-watcher-1' into chore/adapt-mes…
MarcosSpessatto Nov 7, 2023
23b67e3
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Nov 8, 2023
9f59186
Merge branch 'chore/remove-message-db-watcher-1' into chore/adapt-mes…
MarcosSpessatto Nov 9, 2023
ce94c2a
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Nov 10, 2023
13e4a99
chore: remove unnecessary fn
MarcosSpessatto Nov 10, 2023
4504415
chore: publish events directly using broadcast
MarcosSpessatto Nov 10, 2023
717ca5e
Merge branch 'chore/remove-message-db-watcher-1' into chore/adapt-mes…
MarcosSpessatto Nov 10, 2023
9ac4f19
chore: removing old references
MarcosSpessatto Nov 10, 2023
4a92a42
chore: useless param
MarcosSpessatto Nov 10, 2023
62965d3
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Nov 10, 2023
d54a3db
Merge branch 'chore/remove-message-db-watcher-1' into chore/adapt-mes…
MarcosSpessatto Nov 10, 2023
62a7f29
chore: undo param change
MarcosSpessatto Nov 10, 2023
c4e25ba
Merge branch 'chore/remove-message-db-watcher-1' into chore/adapt-mes…
MarcosSpessatto Nov 10, 2023
5312f05
chore: reverting param
MarcosSpessatto Nov 10, 2023
040c1af
chore: delete user working without relying on msg db watcher (#30435)
MarcosSpessatto Nov 10, 2023
076c56c
Merge branch 'chore/remove-message-db-watcher-1' into chore/adapt-mes…
MarcosSpessatto Nov 10, 2023
95c58b6
chore: send updates to client about the thread message as well
MarcosSpessatto Nov 17, 2023
a7a264e
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Nov 20, 2023
289cb22
Merge branch 'chore/remove-message-db-watcher-1' into chore/adapt-mes…
MarcosSpessatto Nov 20, 2023
3aac05d
Merge branch 'develop' into chore/adapt-message-events-without-db-wat…
sampaiodiego Nov 23, 2023
01fb41c
chore: Pin, star and react to message without oplog (#30906)
MarcosSpessatto Nov 23, 2023
5def71b
Update apps/meteor/app/lib/server/functions/deleteMessage.ts
MarcosSpessatto Nov 23, 2023
697e871
Merge branch 'develop' into chore/adapt-message-events-without-db-wat…
MarcosSpessatto Nov 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
import { api } from '@rocket.chat/core-services';
import type { IRoom } from '@rocket.chat/core-typings';
import { Messages, Rooms } from '@rocket.chat/models';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { deleteRoom } from '../../../lib/server/functions/deleteRoom';

const updateAndNotifyParentRoomWithParentMessage = async (room: IRoom): Promise<void> => {
const { value: parentMessage } = await Messages.refreshDiscussionMetadata(room);
if (!parentMessage) {
return;
}
void broadcastMessageSentEvent({
id: parentMessage._id,
data: parentMessage,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
};

/**
* We need to propagate the writing of new message in a discussion to the linking
* system message
Expand All @@ -25,7 +40,7 @@ callbacks.add(
return message;
}

await Messages.refreshDiscussionMetadata(room);
await updateAndNotifyParentRoomWithParentMessage(room);

return message;
},
Expand All @@ -45,7 +60,7 @@ callbacks.add(
});

if (room) {
await Messages.refreshDiscussionMetadata(room);
await updateAndNotifyParentRoomWithParentMessage(room);
}
}
if (message.drid) {
Expand Down
8 changes: 4 additions & 4 deletions apps/meteor/app/discussion/server/methods/createDiscussion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ const create = async ({
discussionMsg = await createDiscussionMessage(prid, user, discussion._id, discussionName);
}

if (discussionMsg) {
callbacks.runAsync('afterSaveMessage', discussionMsg, parentRoom);
}

if (reply) {
await sendMessage(user, { msg: reply }, discussion);
}

if (discussionMsg) {
callbacks.runAsync('afterSaveMessage', discussionMsg, parentRoom);
}
return discussion;
};

Expand Down
13 changes: 10 additions & 3 deletions apps/meteor/app/lib/server/functions/deleteMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Meteor } from 'meteor/meteor';

import { Apps } from '../../../../ee/server/apps';
import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { canDeleteMessageAsync } from '../../../authorization/server/functions/canDeleteMessage';
import { FileUpload } from '../../../file-upload/server';
import { settings } from '../../../settings/server';
Expand Down Expand Up @@ -68,7 +69,6 @@ export async function deleteMessage(message: IMessage, user: IUser): Promise<voi
file?._id && (await FileUpload.getStore('Uploads').deleteById(file._id));
}
}

if (showDeletedStatus) {
// TODO is there a better way to tell TS "IUser[username]" is not undefined?
await Messages.setAsDeletedByIdAndUser(message._id, user as Required<Pick<IUser, '_id' | 'username' | 'name'>>);
Expand All @@ -86,11 +86,18 @@ export async function deleteMessage(message: IMessage, user: IUser): Promise<voi
}
}

await callbacks.run('afterDeleteMessage', deletedMsg, room);

// decrease message count
await Rooms.decreaseMessageCountById(message.rid, 1);

await callbacks.run('afterDeleteMessage', deletedMsg, room);

if (keepHistory || showDeletedStatus) {
void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
}

if (bridges) {
void bridges.getListenerBridge().messageEvent('IPostMessageDeleted', deletedMsg, user);
}
Expand Down
7 changes: 6 additions & 1 deletion apps/meteor/app/lib/server/functions/sendMessage.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Message } from '@rocket.chat/core-services';
import { Message, api } from '@rocket.chat/core-services';
import { Messages } from '@rocket.chat/models';
import { Match, check } from 'meteor/check';

import { Apps } from '../../../../ee/server/apps';
import { callbacks } from '../../../../lib/callbacks';
import { isRelativeURL } from '../../../../lib/utils/isRelativeURL';
import { isURL } from '../../../../lib/utils/isURL';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission';
import { FileUpload } from '../../../file-upload/server';
import notifications from '../../../notifications/server/lib/Notifications';
Expand Down Expand Up @@ -288,6 +289,10 @@ export const sendMessage = async function (user, message, room, upsert = false,

// Execute all callbacks
await callbacks.run('afterSaveMessage', message, room);
void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
return message;
}
};
8 changes: 7 additions & 1 deletion apps/meteor/app/lib/server/functions/updateMessage.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Message } from '@rocket.chat/core-services';
import { Message, api } from '@rocket.chat/core-services';
import type { IEditedMessage, IMessage, IUser, AtLeast } from '@rocket.chat/core-typings';
import { Messages, Rooms } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { Apps } from '../../../../ee/server/apps';
import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { settings } from '../../../settings/server';
import { parseUrlsInMessage } from './parseUrlsInMessage';

Expand Down Expand Up @@ -86,6 +87,11 @@ export const updateMessage = async function (
const msg = await Messages.findOneById(_id);
if (msg) {
await callbacks.run('afterSaveMessage', msg, room, user._id);
void broadcastMessageSentEvent({
id: msg._id,
data: msg,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
}
});
};
7 changes: 6 additions & 1 deletion apps/meteor/app/lib/server/lib/notifyUsersOnMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,9 @@ export async function notifyUsersOnMessage(message, room) {
return message;
}

callbacks.add('afterSaveMessage', (message, room) => notifyUsersOnMessage(message, room), callbacks.priority.LOW, 'notifyUsersOnMessage');
callbacks.add(
'afterSaveMessage',
(message, room) => notifyUsersOnMessage(message, room),
callbacks.priority.MEDIUM,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason to change this to MEDIUM? just curious

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comes from the discussion feature. I changed that because I need that to run before the discussion callback, otherwise, we would end up without the correct number of messages within that room.

await Rooms.incMsgCountAndSetLastMessageById(message.rid, 1, message.ts, settings.get('Store_Last_Message') && message);

'notifyUsersOnMessage',
);
7 changes: 6 additions & 1 deletion apps/meteor/app/message-pin/server/pinMessage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Message } from '@rocket.chat/core-services';
import { Message, api } from '@rocket.chat/core-services';
import { isQuoteAttachment, isRegisterUser } from '@rocket.chat/core-typings';
import type { IMessage, MessageAttachment, MessageQuoteAttachment } from '@rocket.chat/core-typings';
import { Messages, Rooms, Subscriptions, Users, ReadReceipts } from '@rocket.chat/models';
Expand All @@ -9,6 +9,7 @@ import { Meteor } from 'meteor/meteor';
import { Apps, AppEvents } from '../../../ee/server/apps/orchestrator';
import { callbacks } from '../../../lib/callbacks';
import { isTruthy } from '../../../lib/isTruthy';
import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server';
import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
Expand Down Expand Up @@ -226,6 +227,10 @@ Meteor.methods<ServerMethods>({
if (settings.get('Message_Read_Receipt_Store_Users')) {
await ReadReceipts.setPinnedByMessageId(originalMessage._id, originalMessage.pinned);
}
void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});

return true;
},
Expand Down
7 changes: 7 additions & 0 deletions apps/meteor/app/message-star/server/starMessage.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { api } from '@rocket.chat/core-services';
import type { IMessage } from '@rocket.chat/core-typings';
import { Messages, Subscriptions, Rooms } from '@rocket.chat/models';
import type { ServerMethods } from '@rocket.chat/ui-contexts';
import { Meteor } from 'meteor/meteor';

import { Apps, AppEvents } from '../../../ee/server/apps/orchestrator';
import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
import { settings } from '../../settings/server';
Expand Down Expand Up @@ -60,6 +62,11 @@ Meteor.methods<ServerMethods>({

await Messages.updateUserStarById(message._id, uid, message.starred);

void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});

return true;
},
});
6 changes: 6 additions & 0 deletions apps/meteor/app/reactions/server/setReaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import _ from 'underscore';
import { AppEvents, Apps } from '../../../ee/server/apps/orchestrator';
import { callbacks } from '../../../lib/callbacks';
import { i18n } from '../../../server/lib/i18n';
import { broadcastMessageSentEvent } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync } from '../../authorization/server';
import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission';
import { emoji } from '../../emoji/server';
Expand Down Expand Up @@ -106,6 +107,11 @@ async function setReaction(room: IRoom, user: IUser, message: IMessage, reaction
}

await Apps.triggerEvent(AppEvents.IPostMessageReacted, message, user, reaction, isReacted);

void broadcastMessageSentEvent({
id: message._id,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});
}

export async function executeSetReaction(userId: string, reaction: string, messageId: IMessage['_id'], shouldReact?: boolean) {
Expand Down
6 changes: 6 additions & 0 deletions apps/meteor/app/threads/server/hooks/aftersavemessage.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { api } from '@rocket.chat/core-services';
import type { IMessage, IRoom } from '@rocket.chat/core-typings';
import { isEditedMessage } from '@rocket.chat/core-typings';
import { Messages } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageSentEvent } from '../../../../server/modules/watchers/lib/messages';
import { updateThreadUsersSubscriptions, getMentions } from '../../../lib/server/lib/notifyUsersOnMessage';
import { sendMessageNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
import { settings } from '../../../settings/server';
Expand Down Expand Up @@ -61,6 +63,10 @@ export async function processThreads(message: IMessage, room: IRoom) {
await notifyUsersOnReply(message, replies, room);
await metaData(message, parentMessage, replies);
await notification(message, room, replies);
void broadcastMessageSentEvent({
id: message.tmid,
broadcastCallback: (message) => api.broadcast('message.sent', message),
});

return message;
}
Expand Down
17 changes: 11 additions & 6 deletions apps/meteor/server/models/raw/Messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import type {
UpdateResult,
Document,
UpdateFilter,
ModifyResult,
} from 'mongodb';

import { otrSystemMessages } from '../../../app/otr/lib/constants';
Expand Down Expand Up @@ -1593,19 +1594,23 @@ export class MessagesRaw extends BaseRaw<IMessage> implements IMessagesModel {
* to race conditions: If multiple updates occur, the current state will be updated
* only if the new state of the discussion room is really newer.
*/
async refreshDiscussionMetadata(room: Pick<IRoom, '_id' | 'msgs' | 'lm'>): Promise<UpdateResult | Document | false> {
async refreshDiscussionMetadata(room: Pick<IRoom, '_id' | 'msgs' | 'lm'>): Promise<ModifyResult<IMessage>> {
const { _id: drid, msgs: dcount, lm: dlm } = room;

const query = {
drid,
};

return this.updateMany(query, {
$set: {
dcount,
dlm,
return this.col.findOneAndUpdate(
query,
{
$set: {
dcount,
dlm,
},
},
});
{ returnDocument: 'after' },
);
}

// //////////////////////////////////////////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/server/modules/listeners/listeners.module.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { AppStatus } from '@rocket.chat/apps-engine/definition/AppStatus';
import type { ISetting as AppsSetting } from '@rocket.chat/apps-engine/definition/settings';
import type { IServiceClass } from '@rocket.chat/core-services';
import { EnterpriseSettings } from '@rocket.chat/core-services';
import { EnterpriseSettings, listenToMessageSentEvent } from '@rocket.chat/core-services';
import { UserStatus, isSettingColor, isSettingEnterprise } from '@rocket.chat/core-typings';
import type { IUser, IRoom, VideoConference, ISetting, IOmnichannelRoom } from '@rocket.chat/core-typings';
import { Logger } from '@rocket.chat/logger';
Expand Down Expand Up @@ -167,7 +167,7 @@ export class ListenersModule {
});
});

service.onEvent('watch.messages', ({ message }) => {
listenToMessageSentEvent(service, async (message) => {
if (!message.rid) {
return;
}
Expand Down
46 changes: 6 additions & 40 deletions apps/meteor/server/modules/watchers/watchers.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import type {
IIntegration,
IEmailInbox,
IPbxEvent,
SettingValue,
ILivechatInquiryRecord,
IRole,
ILivechatPriority,
Expand All @@ -37,10 +36,10 @@ import {
Permissions,
LivechatPriority,
} from '@rocket.chat/models';
import mem from 'mem';

import { subscriptionFields, roomFields } from '../../../lib/publishFields';
import type { DatabaseWatcher } from '../../database/DatabaseWatcher';
import { broadcastMessageSentEvent } from './lib/messages';

type BroadcastCallback = <T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>) => Promise<void>;

Expand All @@ -65,49 +64,16 @@ export function isWatcherRunning(): boolean {
return watcherStarted;
}

const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });

const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);

const messageWatcher = (watcher: DatabaseWatcher, broadcast: BroadcastCallback): void => {
watcher.on<IMessage>(Messages.getCollectionName(), async ({ clientAction, id, data }) => {
switch (clientAction) {
case 'inserted':
case 'updated':
const message = data ?? (await Messages.findOneById(id));
if (!message) {
return;
}

if (message._hidden !== true && message.imported == null) {
const UseRealName = (await getSettingCached('UI_Use_Real_Name')) === true;

if (UseRealName) {
if (message.u?._id) {
const name = await getUserNameCached(message.u._id);
if (name) {
message.u.name = name;
}
}

if (message.mentions?.length) {
for await (const mention of message.mentions) {
const name = await getUserNameCached(mention._id);
if (name) {
mention.name = name;
}
}
}
}

void broadcast('watch.messages', { clientAction, message });
}
void broadcastMessageSentEvent({
id,
data,
broadcastCallback: (message) => broadcast('watch.messages', { clientAction, message }),
});
break;
}
});
Expand Down
Loading
Loading