diff --git a/.changeset/chilly-pants-hunt.md b/.changeset/chilly-pants-hunt.md new file mode 100644 index 000000000000..0127ae7e174f --- /dev/null +++ b/.changeset/chilly-pants-hunt.md @@ -0,0 +1,5 @@ +--- +"@rocket.chat/meteor": patch +--- + +Removes a validation that allowed only the room creator to propagate E2EE room keys. This was causing issues when the rooms were created via apps or some other integration, as the creator may not be online or able to create E2EE keys diff --git a/.changeset/green-shirts-fold.md b/.changeset/green-shirts-fold.md new file mode 100644 index 000000000000..c4dc5cfcf3ad --- /dev/null +++ b/.changeset/green-shirts-fold.md @@ -0,0 +1,5 @@ +--- +"@rocket.chat/meteor": patch +--- + +Fixes condition causing Omnichannel queue to start more than once. diff --git a/.changeset/proud-cups-share.md b/.changeset/proud-cups-share.md new file mode 100644 index 000000000000..eb51d15a9382 --- /dev/null +++ b/.changeset/proud-cups-share.md @@ -0,0 +1,5 @@ +--- +"@rocket.chat/meteor": patch +--- + +Fixes `im.counters` endpoint returning `null` on `unread` messages property for users that have never opened the queried DM diff --git a/apps/meteor/app/api/server/v1/im.ts b/apps/meteor/app/api/server/v1/im.ts index fa274ef69467..d74d3decfbab 100644 --- a/apps/meteor/app/api/server/v1/im.ts +++ b/apps/meteor/app/api/server/v1/im.ts @@ -195,9 +195,9 @@ API.v1.addRoute( lm = room?.lm ? new Date(room.lm).toISOString() : new Date(room._updatedAt).toISOString(); // lm is the last message timestamp - if (subscription?.open) { + if (subscription) { + unreads = subscription.unread ?? null; if (subscription.ls && room.msgs) { - unreads = subscription.unread; unreadsFrom = new Date(subscription.ls).toISOString(); // last read timestamp } userMentions = subscription.userMentions; diff --git a/apps/meteor/app/e2e/client/rocketchat.e2e.room.ts b/apps/meteor/app/e2e/client/rocketchat.e2e.room.ts index dc7efb60dc14..f9913831533b 100644 --- a/apps/meteor/app/e2e/client/rocketchat.e2e.room.ts +++ b/apps/meteor/app/e2e/client/rocketchat.e2e.room.ts @@ -326,8 +326,7 @@ export class E2ERoom extends Emitter { try { const room = Rooms.findOne({ _id: this.roomId })!; - // Only room creator can set keys for room - if (!room.e2eKeyId && this.userShouldCreateKeys(room)) { + if (!room.e2eKeyId) { this.setState(E2ERoomState.CREATING_KEYS); await this.createGroupKey(); this.setState(E2ERoomState.READY); @@ -343,15 +342,6 @@ export class E2ERoom extends Emitter { } } - userShouldCreateKeys(room: any) { - // On DMs, we'll allow any user to set the keys - if (room.t === 'd') { - return true; - } - - return room.u._id === this.userId; - } - isSupportedRoomType(type: any) { return roomCoordinator.getRoomDirectives(type).allowRoomSettingChange({}, RoomSettingsEnum.E2E); } diff --git a/apps/meteor/server/services/omnichannel/queue.ts b/apps/meteor/server/services/omnichannel/queue.ts index 8db6eedd386b..29d48b9f1f6b 100644 --- a/apps/meteor/server/services/omnichannel/queue.ts +++ b/apps/meteor/server/services/omnichannel/queue.ts @@ -1,3 +1,4 @@ +import { ServiceStarter } from '@rocket.chat/core-services'; import { type InquiryWithAgentInfo, type IOmnichannelQueue } from '@rocket.chat/core-typings'; import { License } from '@rocket.chat/license'; import { LivechatInquiry, LivechatRooms } from '@rocket.chat/models'; @@ -11,6 +12,17 @@ import { settings } from '../../../app/settings/server'; const DEFAULT_RACE_TIMEOUT = 5000; export class OmnichannelQueue implements IOmnichannelQueue { + private serviceStarter: ServiceStarter; + + private timeoutHandler: ReturnType | null = null; + + constructor() { + this.serviceStarter = new ServiceStarter( + () => this._start(), + () => this._stop(), + ); + } + private running = false; private queues: (string | undefined)[] = []; @@ -24,7 +36,7 @@ export class OmnichannelQueue implements IOmnichannelQueue { return this.running; } - async start() { + private async _start() { if (this.running) { return; } @@ -37,7 +49,7 @@ export class OmnichannelQueue implements IOmnichannelQueue { return this.execute(); } - async stop() { + private async _stop() { if (!this.running) { return; } @@ -45,9 +57,23 @@ export class OmnichannelQueue implements IOmnichannelQueue { await LivechatInquiry.unlockAll(); this.running = false; + + if (this.timeoutHandler !== null) { + clearTimeout(this.timeoutHandler); + this.timeoutHandler = null; + } + queueLogger.info('Service stopped'); } + async start() { + return this.serviceStarter.start(); + } + + async stop() { + return this.serviceStarter.stop(); + } + private async getActiveQueues() { // undefined = public queue(without department) return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({})); @@ -118,10 +144,21 @@ export class OmnichannelQueue implements IOmnichannelQueue { err: e, }); } finally { - setTimeout(this.execute.bind(this), this.delay()); + this.scheduleExecution(); } } + private scheduleExecution(): void { + if (this.timeoutHandler !== null) { + return; + } + + this.timeoutHandler = setTimeout(() => { + this.timeoutHandler = null; + return this.execute(); + }, this.delay()); + } + async shouldStart() { if (!settings.get('Livechat_enabled')) { void this.stop(); diff --git a/apps/meteor/server/services/omnichannel/service.ts b/apps/meteor/server/services/omnichannel/service.ts index ccfe2026b2ba..e5b21f4aae97 100644 --- a/apps/meteor/server/services/omnichannel/service.ts +++ b/apps/meteor/server/services/omnichannel/service.ts @@ -33,11 +33,7 @@ export class OmnichannelService extends ServiceClassInternal implements IOmnicha } async started() { - settings.watch('Livechat_enabled', (enabled) => { - void (enabled && RoutingManager.isMethodSet() ? this.queueWorker.shouldStart() : this.queueWorker.stop()); - }); - - settings.watch('Livechat_Routing_Method', async () => { + settings.watchMultiple(['Livechat_enabled', 'Livechat_Routing_Method'], () => { this.queueWorker.shouldStart(); }); diff --git a/apps/meteor/tests/end-to-end/api/direct-message.ts b/apps/meteor/tests/end-to-end/api/direct-message.ts index 3146d351798b..9a6155fd40fe 100644 --- a/apps/meteor/tests/end-to-end/api/direct-message.ts +++ b/apps/meteor/tests/end-to-end/api/direct-message.ts @@ -343,26 +343,117 @@ describe('[Direct Messages]', () => { .end(done); }); - it('/im.counters', (done) => { - void request - .get(api('im.counters')) - .set(credentials) - .query({ - roomId: directMessage._id, - }) - .expect('Content-Type', 'application/json') - .expect(200) - .expect((res) => { - expect(res.body).to.have.property('success', true); - expect(res.body).to.have.property('joined', true); - expect(res.body).to.have.property('members'); - expect(res.body).to.have.property('unreads'); - expect(res.body).to.have.property('unreadsFrom'); - expect(res.body).to.have.property('msgs'); - expect(res.body).to.have.property('latest'); - expect(res.body).to.have.property('userMentions'); - }) - .end(done); + describe('/im.counters', () => { + it('should require auth', async () => { + await request + .get(api('im.counters')) + .expect('Content-Type', 'application/json') + .expect(401) + .expect((res) => { + expect(res.body).to.have.property('status', 'error'); + }); + }); + it('should require a roomId', async () => { + await request + .get(api('im.counters')) + .set(credentials) + .expect('Content-Type', 'application/json') + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + }); + }); + it('should work with all params right', (done) => { + void request + .get(api('im.counters')) + .set(credentials) + .query({ + roomId: directMessage._id, + }) + .expect('Content-Type', 'application/json') + .expect(200) + .expect((res) => { + expect(res.body).to.have.property('success', true); + expect(res.body).to.have.property('joined', true); + expect(res.body).to.have.property('members'); + expect(res.body).to.have.property('unreads'); + expect(res.body).to.have.property('unreadsFrom'); + expect(res.body).to.have.property('msgs'); + expect(res.body).to.have.property('latest'); + expect(res.body).to.have.property('userMentions'); + }) + .end(done); + }); + + describe('with valid room id', () => { + let testDM: IRoom & { rid: IRoom['_id'] }; + let user2: TestUser; + let userCreds: Credentials; + + before(async () => { + user2 = await createUser(); + userCreds = await login(user2.username, password); + await request + .post(api('im.create')) + .set(credentials) + .send({ + username: user2.username, + }) + .expect('Content-Type', 'application/json') + .expect(200) + .expect((res) => { + testDM = res.body.room; + }); + + await request + .post(api('chat.sendMessage')) + .set(credentials) + .send({ + message: { + text: 'Sample message', + rid: testDM._id, + }, + }) + .expect('Content-Type', 'application/json') + .expect(200) + .expect((res) => { + expect(res.body).to.have.property('success', true); + }); + }); + + after(async () => { + await request + .post(api('im.delete')) + .set(credentials) + .send({ + roomId: testDM._id, + }) + .expect(200); + + await deleteUser(user2); + }); + + it('should properly return counters before opening the dm', async () => { + await request + .get(api('im.counters')) + .set(userCreds) + .query({ + roomId: testDM._id, + }) + .expect('Content-Type', 'application/json') + .expect(200) + .expect((res) => { + expect(res.body).to.have.property('success', true); + expect(res.body).to.have.property('joined', true); + expect(res.body).to.have.property('members').and.to.be.a('number').and.to.be.eq(2); + expect(res.body).to.have.property('unreads').and.to.be.a('number').and.to.be.eq(1); + expect(res.body).to.have.property('unreadsFrom'); + expect(res.body).to.have.property('msgs').and.to.be.a('number').and.to.be.eq(1); + expect(res.body).to.have.property('latest'); + expect(res.body).to.have.property('userMentions').and.to.be.a('number').and.to.be.eq(0); + }); + }); + }); }); describe('[/im.files]', async () => { diff --git a/packages/core-services/src/index.ts b/packages/core-services/src/index.ts index a0b3f65ded0c..cae8d7c77d64 100644 --- a/packages/core-services/src/index.ts +++ b/packages/core-services/src/index.ts @@ -78,6 +78,7 @@ export { } from './types/IOmnichannelAnalyticsService'; export { getConnection, getTrashCollection } from './lib/mongo'; +export { ServiceStarter } from './lib/ServiceStarter'; export { AutoUpdateRecord, diff --git a/packages/core-services/src/lib/ServiceStarter.ts b/packages/core-services/src/lib/ServiceStarter.ts new file mode 100644 index 000000000000..9c38ea6b07ec --- /dev/null +++ b/packages/core-services/src/lib/ServiceStarter.ts @@ -0,0 +1,68 @@ +// This class is used to manage calls to a service's .start and .stop functions +// Specifically for cases where the start function has different conditions that may cause the service to actually start or not, +// or when the start process can take a while to complete +// Using this class, you ensure that calls to .start and .stop will be chained, so you avoid race conditions +// At the same time, it prevents those functions from running more times than necessary if there are several calls to them (for example when loading setting values) +export class ServiceStarter { + private lock = Promise.resolve(); + + private currentCall?: 'start' | 'stop'; + + private nextCall?: 'start' | 'stop'; + + private starterFn: () => Promise; + + private stopperFn?: () => Promise; + + constructor(starterFn: () => Promise, stopperFn?: () => Promise) { + this.starterFn = starterFn; + this.stopperFn = stopperFn; + } + + private async checkStatus(): Promise { + if (this.nextCall === 'start') { + return this.doCall('start'); + } + + if (this.nextCall === 'stop') { + return this.doCall('stop'); + } + } + + private async doCall(call: 'start' | 'stop'): Promise { + this.nextCall = undefined; + this.currentCall = call; + try { + if (call === 'start') { + await this.starterFn(); + } else if (this.stopperFn) { + await this.stopperFn(); + } + } finally { + this.currentCall = undefined; + await this.checkStatus(); + } + } + + private async call(call: 'start' | 'stop'): Promise { + // If something is already chained to run after the current call, it's okay to replace it with the new call + this.nextCall = call; + if (this.currentCall) { + return this.lock; + } + this.lock = this.checkStatus(); + return this.lock; + } + + async start(): Promise { + return this.call('start'); + } + + async stop(): Promise { + return this.call('stop'); + } + + async wait(): Promise { + return this.lock; + } +} diff --git a/packages/core-services/tests/ServiceStarter.test.ts b/packages/core-services/tests/ServiceStarter.test.ts new file mode 100644 index 000000000000..2c1a20da6115 --- /dev/null +++ b/packages/core-services/tests/ServiceStarter.test.ts @@ -0,0 +1,91 @@ +import { ServiceStarter } from '../src/lib/ServiceStarter'; + +const wait = (time: number) => { + return new Promise((resolve) => { + setTimeout(() => resolve(undefined), time); + }); +}; + +describe('ServiceStarter', () => { + it('should call the starterFn and stopperFn when calling .start and .stop', async () => { + const start = jest.fn(); + const stop = jest.fn(); + + const instance = new ServiceStarter(start, stop); + + expect(start).not.toHaveBeenCalled(); + expect(stop).not.toHaveBeenCalled(); + + await instance.start(); + + expect(start).toHaveBeenCalled(); + expect(stop).not.toHaveBeenCalled(); + + start.mockReset(); + + await instance.stop(); + + expect(start).not.toHaveBeenCalled(); + expect(stop).toHaveBeenCalled(); + }); + + it('should only call .start for the second time after the initial call has finished running', async () => { + let running = false; + const start = jest.fn(async () => { + expect(running).toBe(false); + + running = true; + await wait(100); + running = false; + }); + const stop = jest.fn(); + + const instance = new ServiceStarter(start, stop); + + void instance.start(); + void instance.start(); + + await instance.wait(); + + expect(start).toHaveBeenCalledTimes(2); + expect(stop).not.toHaveBeenCalled(); + }); + + it('should chain up to two calls to .start', async () => { + const start = jest.fn(async () => { + await wait(100); + }); + const stop = jest.fn(); + + const instance = new ServiceStarter(start, stop); + + void instance.start(); + void instance.start(); + void instance.start(); + void instance.start(); + + await instance.wait(); + + expect(start).toHaveBeenCalledTimes(2); + expect(stop).not.toHaveBeenCalled(); + }); + + it('should skip the chained calls to .start if .stop is called', async () => { + const start = jest.fn(async () => { + await wait(100); + }); + const stop = jest.fn(); + + const instance = new ServiceStarter(start, stop); + + void instance.start(); + void instance.start(); + void instance.start(); + void instance.stop(); + + await instance.wait(); + + expect(start).toHaveBeenCalledTimes(1); + expect(stop).toHaveBeenCalledTimes(1); + }); +});