Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add the possibility for removing the watcher for the message collection #30381

Merged
merged 17 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
957fda2
chore: add a flag to disable db watchers on the message collection
MarcosSpessatto Sep 13, 2023
5e61b10
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Sep 13, 2023
be538b1
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Sep 15, 2023
b6327ff
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Sep 18, 2023
ef450c8
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Oct 26, 2023
1126a8a
Merge branch 'develop' into chore/remove-message-db-watcher-1
sampaiodiego Nov 3, 2023
de50690
Merge branch 'develop' into chore/remove-message-db-watcher-1
sampaiodiego Nov 7, 2023
23b67e3
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Nov 8, 2023
ce94c2a
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Nov 10, 2023
13e4a99
chore: remove unnecessary fn
MarcosSpessatto Nov 10, 2023
4504415
chore: publish events directly using broadcast
MarcosSpessatto Nov 10, 2023
4a92a42
chore: useless param
MarcosSpessatto Nov 10, 2023
62965d3
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Nov 10, 2023
62a7f29
chore: undo param change
MarcosSpessatto Nov 10, 2023
040c1af
chore: delete user working without relying on msg db watcher (#30435)
MarcosSpessatto Nov 10, 2023
a7a264e
Merge branch 'develop' into chore/remove-message-db-watcher-1
MarcosSpessatto Nov 20, 2023
b790dd8
Merge branch 'develop' into chore/remove-message-db-watcher-1
sampaiodiego Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/meteor/server/lib/isRunningMs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import type { EventSignatures } from '@rocket.chat/core-services';
import { api } from '@rocket.chat/core-services';

/**
* Checks if the server is running in micro services mode
* @returns {boolean}
*/
export function isRunningMs(): boolean {
return !!process.env.TRANSPORTER?.match(/^(?:nats|TCP)/);
}

export const broadcastEventToServices = <T extends keyof EventSignatures>(
event: T,
...args: Parameters<EventSignatures[T]>
): Promise<void> => (isRunningMs() ? api.broadcast(event, ...args) : api.broadcastLocal(event, ...args));
52 changes: 52 additions & 0 deletions apps/meteor/server/modules/watchers/lib/messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import type { IMessage, SettingValue, IUser } from '@rocket.chat/core-typings';
import { Messages, Settings, Users } from '@rocket.chat/models';
import mem from 'mem';

const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });

const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);

export const broadcastMessageSentEvent = async ({
id,
data,
broadcastCallback,
}: {
id: IMessage['_id'];
broadcastCallback: (message: IMessage) => Promise<void>;
data?: IMessage;
}): Promise<void> => {
const message = data ?? (await Messages.findOneById(id));
if (!message) {
return;
}

if (message._hidden !== true && message.imported == null) {
const UseRealName = (await getSettingCached('UI_Use_Real_Name')) === true;

if (UseRealName) {
if (message.u?._id) {
const name = await getUserNameCached(message.u._id);
if (name) {
message.u.name = name;
}
}

if (message.mentions?.length) {
for await (const mention of message.mentions) {
const name = await getUserNameCached(mention._id);
if (name) {
mention.name = name;
}
}
}
}

void broadcastCallback(message);
}
};
26 changes: 17 additions & 9 deletions apps/meteor/server/modules/watchers/watchers.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { EventSignatures } from '@rocket.chat/core-services';
import { dbWatchersDisabled } from '@rocket.chat/core-services';
import type {
ISubscription,
IUser,
Expand Down Expand Up @@ -64,17 +65,17 @@ export function isWatcherRunning(): boolean {
return watcherStarted;
}

export function initWatchers(watcher: DatabaseWatcher, broadcast: BroadcastCallback): void {
const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });
const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });

const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);
const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);

const messageWatcher = (watcher: DatabaseWatcher, broadcast: BroadcastCallback): void => {
watcher.on<IMessage>(Messages.getCollectionName(), async ({ clientAction, id, data }) => {
switch (clientAction) {
case 'inserted':
Expand Down Expand Up @@ -110,6 +111,13 @@ export function initWatchers(watcher: DatabaseWatcher, broadcast: BroadcastCallb
break;
}
});
};

export function initWatchers(watcher: DatabaseWatcher, broadcast: BroadcastCallback): void {
const dbWatchersEnabled = !dbWatchersDisabled;
if (dbWatchersEnabled) {
messageWatcher(watcher, broadcast);
}

watcher.on<ISubscription>(Subscriptions.getCollectionName(), async ({ clientAction, id, data, diff }) => {
switch (clientAction) {
Expand Down
2 changes: 1 addition & 1 deletion packages/core-services/src/LocalBroker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { EventEmitter } from 'events';
import { InstanceStatus } from '@rocket.chat/models';

import { asyncLocalStorage } from '.';
import type { EventSignatures } from './Events';
import type { EventSignatures } from './events/Events';
import type { IBroker, IBrokerNode } from './types/IBroker';
import type { ServiceClass, IServiceClass } from './types/ServiceClass';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import type {
IBanner,
} from '@rocket.chat/core-typings';

import type { AutoUpdateRecord } from './types/IMeteor';
import type { AutoUpdateRecord } from '../types/IMeteor';

type ClientAction = 'inserted' | 'updated' | 'removed' | 'changed';

Expand Down Expand Up @@ -267,4 +267,5 @@ export type EventSignatures = {
'command.updated'(command: string): void;
'command.removed'(command: string): void;
'actions.changed'(): void;
'message.sent'(message: IMessage): void;
};
12 changes: 12 additions & 0 deletions packages/core-services/src/events/listeners.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { IMessage } from '@rocket.chat/core-typings';

import type { IServiceClass } from '../types/ServiceClass';

export const dbWatchersDisabled = ['yes', 'true'].includes(String(process.env.DISABLE_DB_WATCHERS).toLowerCase());

export const listenToMessageSentEvent = (service: IServiceClass, action: (message: IMessage) => Promise<void>): void => {
if (dbWatchersDisabled) {
return service.onEvent('message.sent', (message: IMessage) => action(message));
}
return service.onEvent('watch.messages', ({ message }) => action(message));
};
Comment on lines +7 to +12
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ggazzo @sampaiodiego I decided to create another event to handle the "manual" process of sending events instead of using the existing watch.messages.

3 changes: 2 additions & 1 deletion packages/core-services/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ import type { IVoipService } from './types/IVoipService';
export { asyncLocalStorage } from './lib/asyncLocalStorage';
export { MeteorError, isMeteorError } from './MeteorError';
export { api } from './api';
export { EventSignatures } from './Events';
export { EventSignatures } from './events/Events';
export { listenToMessageSentEvent, dbWatchersDisabled } from './events/listeners';
export { LocalBroker } from './LocalBroker';

export { IBroker, IBrokerNode, BaseMetricOptions, IServiceMetrics } from './types/IBroker';
Expand Down
2 changes: 1 addition & 1 deletion packages/core-services/src/lib/Api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { EventSignatures } from '../Events';
import type { EventSignatures } from '../events/Events';
import type { IApiService } from '../types/IApiService';
import type { IBroker, IBrokerNode } from '../types/IBroker';
import type { IServiceClass } from '../types/ServiceClass';
Expand Down
2 changes: 1 addition & 1 deletion packages/core-services/src/types/IApiService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { EventSignatures } from '../Events';
import type { EventSignatures } from '../events/Events';
import type { IBroker, IBrokerNode } from './IBroker';
import type { IServiceClass } from './ServiceClass';

Expand Down
2 changes: 1 addition & 1 deletion packages/core-services/src/types/IBroker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { EventSignatures } from '../Events';
import type { EventSignatures } from '../events/Events';
import type { IServiceClass } from './ServiceClass';

export interface IBrokerNode {
Expand Down
2 changes: 1 addition & 1 deletion packages/core-services/src/types/ServiceClass.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';

import type { EventSignatures } from '../Events';
import type { EventSignatures } from '../events/Events';
import { asyncLocalStorage } from '../lib/asyncLocalStorage';
import type { IApiService } from './IApiService';
import type { IBroker, IBrokerNode } from './IBroker';
Expand Down