diff --git a/.changeset/green-shirts-fold.md b/.changeset/green-shirts-fold.md new file mode 100644 index 000000000000..c4dc5cfcf3ad --- /dev/null +++ b/.changeset/green-shirts-fold.md @@ -0,0 +1,5 @@ +--- +"@rocket.chat/meteor": patch +--- + +Fixes condition causing Omnichannel queue to start more than once. diff --git a/apps/meteor/server/services/omnichannel/queue.ts b/apps/meteor/server/services/omnichannel/queue.ts index 8db6eedd386b..29d48b9f1f6b 100644 --- a/apps/meteor/server/services/omnichannel/queue.ts +++ b/apps/meteor/server/services/omnichannel/queue.ts @@ -1,3 +1,4 @@ +import { ServiceStarter } from '@rocket.chat/core-services'; import { type InquiryWithAgentInfo, type IOmnichannelQueue } from '@rocket.chat/core-typings'; import { License } from '@rocket.chat/license'; import { LivechatInquiry, LivechatRooms } from '@rocket.chat/models'; @@ -11,6 +12,17 @@ import { settings } from '../../../app/settings/server'; const DEFAULT_RACE_TIMEOUT = 5000; export class OmnichannelQueue implements IOmnichannelQueue { + private serviceStarter: ServiceStarter; + + private timeoutHandler: ReturnType | null = null; + + constructor() { + this.serviceStarter = new ServiceStarter( + () => this._start(), + () => this._stop(), + ); + } + private running = false; private queues: (string | undefined)[] = []; @@ -24,7 +36,7 @@ export class OmnichannelQueue implements IOmnichannelQueue { return this.running; } - async start() { + private async _start() { if (this.running) { return; } @@ -37,7 +49,7 @@ export class OmnichannelQueue implements IOmnichannelQueue { return this.execute(); } - async stop() { + private async _stop() { if (!this.running) { return; } @@ -45,9 +57,23 @@ export class OmnichannelQueue implements IOmnichannelQueue { await LivechatInquiry.unlockAll(); this.running = false; + + if (this.timeoutHandler !== null) { + clearTimeout(this.timeoutHandler); + this.timeoutHandler = null; + } + queueLogger.info('Service stopped'); } + async start() { + return this.serviceStarter.start(); + } + + async stop() { + return this.serviceStarter.stop(); + } + private async getActiveQueues() { // undefined = public queue(without department) return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({})); @@ -118,10 +144,21 @@ export class OmnichannelQueue implements IOmnichannelQueue { err: e, }); } finally { - setTimeout(this.execute.bind(this), this.delay()); + this.scheduleExecution(); } } + private scheduleExecution(): void { + if (this.timeoutHandler !== null) { + return; + } + + this.timeoutHandler = setTimeout(() => { + this.timeoutHandler = null; + return this.execute(); + }, this.delay()); + } + async shouldStart() { if (!settings.get('Livechat_enabled')) { void this.stop(); diff --git a/apps/meteor/server/services/omnichannel/service.ts b/apps/meteor/server/services/omnichannel/service.ts index ccfe2026b2ba..e5b21f4aae97 100644 --- a/apps/meteor/server/services/omnichannel/service.ts +++ b/apps/meteor/server/services/omnichannel/service.ts @@ -33,11 +33,7 @@ export class OmnichannelService extends ServiceClassInternal implements IOmnicha } async started() { - settings.watch('Livechat_enabled', (enabled) => { - void (enabled && RoutingManager.isMethodSet() ? this.queueWorker.shouldStart() : this.queueWorker.stop()); - }); - - settings.watch('Livechat_Routing_Method', async () => { + settings.watchMultiple(['Livechat_enabled', 'Livechat_Routing_Method'], () => { this.queueWorker.shouldStart(); }); diff --git a/packages/core-services/src/index.ts b/packages/core-services/src/index.ts index a0b3f65ded0c..cae8d7c77d64 100644 --- a/packages/core-services/src/index.ts +++ b/packages/core-services/src/index.ts @@ -78,6 +78,7 @@ export { } from './types/IOmnichannelAnalyticsService'; export { getConnection, getTrashCollection } from './lib/mongo'; +export { ServiceStarter } from './lib/ServiceStarter'; export { AutoUpdateRecord, diff --git a/packages/core-services/src/lib/ServiceStarter.ts b/packages/core-services/src/lib/ServiceStarter.ts new file mode 100644 index 000000000000..9c38ea6b07ec --- /dev/null +++ b/packages/core-services/src/lib/ServiceStarter.ts @@ -0,0 +1,68 @@ +// This class is used to manage calls to a service's .start and .stop functions +// Specifically for cases where the start function has different conditions that may cause the service to actually start or not, +// or when the start process can take a while to complete +// Using this class, you ensure that calls to .start and .stop will be chained, so you avoid race conditions +// At the same time, it prevents those functions from running more times than necessary if there are several calls to them (for example when loading setting values) +export class ServiceStarter { + private lock = Promise.resolve(); + + private currentCall?: 'start' | 'stop'; + + private nextCall?: 'start' | 'stop'; + + private starterFn: () => Promise; + + private stopperFn?: () => Promise; + + constructor(starterFn: () => Promise, stopperFn?: () => Promise) { + this.starterFn = starterFn; + this.stopperFn = stopperFn; + } + + private async checkStatus(): Promise { + if (this.nextCall === 'start') { + return this.doCall('start'); + } + + if (this.nextCall === 'stop') { + return this.doCall('stop'); + } + } + + private async doCall(call: 'start' | 'stop'): Promise { + this.nextCall = undefined; + this.currentCall = call; + try { + if (call === 'start') { + await this.starterFn(); + } else if (this.stopperFn) { + await this.stopperFn(); + } + } finally { + this.currentCall = undefined; + await this.checkStatus(); + } + } + + private async call(call: 'start' | 'stop'): Promise { + // If something is already chained to run after the current call, it's okay to replace it with the new call + this.nextCall = call; + if (this.currentCall) { + return this.lock; + } + this.lock = this.checkStatus(); + return this.lock; + } + + async start(): Promise { + return this.call('start'); + } + + async stop(): Promise { + return this.call('stop'); + } + + async wait(): Promise { + return this.lock; + } +} diff --git a/packages/core-services/tests/ServiceStarter.test.ts b/packages/core-services/tests/ServiceStarter.test.ts new file mode 100644 index 000000000000..2c1a20da6115 --- /dev/null +++ b/packages/core-services/tests/ServiceStarter.test.ts @@ -0,0 +1,91 @@ +import { ServiceStarter } from '../src/lib/ServiceStarter'; + +const wait = (time: number) => { + return new Promise((resolve) => { + setTimeout(() => resolve(undefined), time); + }); +}; + +describe('ServiceStarter', () => { + it('should call the starterFn and stopperFn when calling .start and .stop', async () => { + const start = jest.fn(); + const stop = jest.fn(); + + const instance = new ServiceStarter(start, stop); + + expect(start).not.toHaveBeenCalled(); + expect(stop).not.toHaveBeenCalled(); + + await instance.start(); + + expect(start).toHaveBeenCalled(); + expect(stop).not.toHaveBeenCalled(); + + start.mockReset(); + + await instance.stop(); + + expect(start).not.toHaveBeenCalled(); + expect(stop).toHaveBeenCalled(); + }); + + it('should only call .start for the second time after the initial call has finished running', async () => { + let running = false; + const start = jest.fn(async () => { + expect(running).toBe(false); + + running = true; + await wait(100); + running = false; + }); + const stop = jest.fn(); + + const instance = new ServiceStarter(start, stop); + + void instance.start(); + void instance.start(); + + await instance.wait(); + + expect(start).toHaveBeenCalledTimes(2); + expect(stop).not.toHaveBeenCalled(); + }); + + it('should chain up to two calls to .start', async () => { + const start = jest.fn(async () => { + await wait(100); + }); + const stop = jest.fn(); + + const instance = new ServiceStarter(start, stop); + + void instance.start(); + void instance.start(); + void instance.start(); + void instance.start(); + + await instance.wait(); + + expect(start).toHaveBeenCalledTimes(2); + expect(stop).not.toHaveBeenCalled(); + }); + + it('should skip the chained calls to .start if .stop is called', async () => { + const start = jest.fn(async () => { + await wait(100); + }); + const stop = jest.fn(); + + const instance = new ServiceStarter(start, stop); + + void instance.start(); + void instance.start(); + void instance.start(); + void instance.stop(); + + await instance.wait(); + + expect(start).toHaveBeenCalledTimes(1); + expect(stop).toHaveBeenCalledTimes(1); + }); +});