diff --git a/apps/meteor/app/livechat/lib/inquiries.ts b/apps/meteor/app/livechat/lib/inquiries.ts index dddf32ee5467..488151aa4166 100644 --- a/apps/meteor/app/livechat/lib/inquiries.ts +++ b/apps/meteor/app/livechat/lib/inquiries.ts @@ -6,13 +6,16 @@ type ReturnType = | { priorityWeight: SortOrder; ts: SortOrder; + _updatedAt: SortOrder; } | { estimatedWaitingTimeQueue: SortOrder; ts: SortOrder; + _updatedAt: SortOrder; } | { ts: SortOrder; + _updatedAt: SortOrder; }; export const getOmniChatSortQuery = ( @@ -20,11 +23,11 @@ export const getOmniChatSortQuery = ( ): ReturnType => { switch (sortByMechanism) { case OmnichannelSortingMechanismSettingType.Priority: - return { priorityWeight: 1, ts: 1 }; + return { priorityWeight: 1, ts: 1, _updatedAt: -1 }; case OmnichannelSortingMechanismSettingType.SLAs: - return { estimatedWaitingTimeQueue: 1, ts: 1 }; + return { estimatedWaitingTimeQueue: 1, ts: 1, _updatedAt: -1 }; case OmnichannelSortingMechanismSettingType.Timestamp: default: - return { ts: 1 }; + return { ts: 1, _updatedAt: -1 }; } }; diff --git a/apps/meteor/app/livechat/server/lib/RoutingManager.ts b/apps/meteor/app/livechat/server/lib/RoutingManager.ts index ebbd931c1b2b..0e975ca06763 100644 --- a/apps/meteor/app/livechat/server/lib/RoutingManager.ts +++ b/apps/meteor/app/livechat/server/lib/RoutingManager.ts @@ -1,4 +1,4 @@ -import { Message } from '@rocket.chat/core-services'; +import { Message, Omnichannel } from '@rocket.chat/core-services'; import type { ILivechatInquiryRecord, ILivechatVisitor, @@ -35,7 +35,7 @@ type Routing = { methods: Record; startQueue(): void; isMethodSet(): boolean; - setMethodNameAndStartQueue(name: string): void; + setMethodNameAndStartQueue(name: string): Promise; registerMethod(name: string, Method: IRoutingMethodConstructor): void; getMethod(): IRoutingMethod; getConfig(): RoutingMethodConfig | undefined; @@ -73,7 +73,7 @@ export const RoutingManager: Routing = { return !!this.methodName; }, - setMethodNameAndStartQueue(name) { + async setMethodNameAndStartQueue(name) { logger.debug(`Changing default routing method from ${this.methodName} to ${name}`); if (!this.methods[name]) { logger.warn(`Cannot change routing method to ${name}. Selected Routing method does not exists. Defaulting to Manual_Selection`); @@ -82,7 +82,7 @@ export const RoutingManager: Routing = { this.methodName = name; } - this.startQueue(); + void (await Omnichannel.getQueueWorker()).shouldStart(); }, // eslint-disable-next-line @typescript-eslint/naming-convention diff --git a/apps/meteor/app/livechat/server/startup.ts b/apps/meteor/app/livechat/server/startup.ts index 84b2adfa0755..f24f88975b22 100644 --- a/apps/meteor/app/livechat/server/startup.ts +++ b/apps/meteor/app/livechat/server/startup.ts @@ -73,7 +73,7 @@ Meteor.startup(async () => { }); settings.watch('Livechat_Routing_Method', (value) => { - RoutingManager.setMethodNameAndStartQueue(value); + void RoutingManager.setMethodNameAndStartQueue(value); }); // Remove when accounts.onLogout is async diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts index 7944ca88320c..0df5d95c86c9 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts @@ -11,8 +11,6 @@ import { import moment from 'moment'; import type { Document } from 'mongodb'; -import { dispatchAgentDelegated } from '../../../../../app/livechat/server/lib/Helper'; -import { RoutingManager } from '../../../../../app/livechat/server/lib/RoutingManager'; import { getInquirySortMechanismSetting } from '../../../../../app/livechat/server/lib/settings'; import { settings } from '../../../../../app/settings/server'; import { callbacks } from '../../../../../lib/callbacks'; @@ -146,35 +144,6 @@ const dispatchWaitingQueueStatus = async (department?: string) => { // but we don't need to notify _each_ change that takes place, just their final position export const debouncedDispatchWaitingQueueStatus = memoizeDebounce(dispatchWaitingQueueStatus, 1200); -export const processWaitingQueue = async (department: string | undefined, inquiry: InquiryWithAgentInfo) => { - const queue = department || 'Public'; - helperLogger.debug(`Processing items on queue ${queue}`); - - helperLogger.debug(`Processing inquiry ${inquiry._id} from queue ${queue}`); - const { defaultAgent } = inquiry; - // TODO: remove this typecast when routing manager becomes TS - const room = (await RoutingManager.delegateInquiry(inquiry, defaultAgent)) as IOmnichannelRoom; - - const propagateAgentDelegated = async (rid: string, agentId: string) => { - await dispatchAgentDelegated(rid, agentId); - }; - - if (room?.servedBy) { - const { - _id: rid, - servedBy: { _id: agentId }, - } = room; - helperLogger.debug(`Inquiry ${inquiry._id} taken successfully by agent ${agentId}. Notifying`); - setTimeout(() => { - void propagateAgentDelegated(rid, agentId); - }, 1000); - - return true; - } - - return false; -}; - export const setPredictedVisitorAbandonmentTime = async (room: IOmnichannelRoom) => { if ( !room.v?.lastMessageTs || diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.ts b/apps/meteor/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.ts index d4d71ac14493..83a2963a54d8 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.ts @@ -1,11 +1,5 @@ -import type { - IOmnichannelBusinessUnit, - IOmnichannelServiceLevelAgreements, - LivechatDepartmentDTO, - InquiryWithAgentInfo, -} from '@rocket.chat/core-typings'; +import type { IOmnichannelBusinessUnit, IOmnichannelServiceLevelAgreements, LivechatDepartmentDTO } from '@rocket.chat/core-typings'; import { - LivechatInquiry, Users, LivechatDepartment as LivechatDepartmentRaw, OmnichannelServiceLevelAgreements, @@ -17,16 +11,12 @@ import { Match, check } from 'meteor/check'; import { Meteor } from 'meteor/meteor'; import { updateDepartmentAgents } from '../../../../../app/livechat/server/lib/Helper'; -import { RoutingManager } from '../../../../../app/livechat/server/lib/RoutingManager'; -import { getInquirySortMechanismSetting } from '../../../../../app/livechat/server/lib/settings'; -import { settings } from '../../../../../app/settings/server'; import { callbacks } from '../../../../../lib/callbacks'; import { addUserRolesAsync } from '../../../../../server/lib/roles/addUserRoles'; import { removeUserFromRolesAsync } from '../../../../../server/lib/roles/removeUserFromRoles'; import { hasLicense } from '../../../license/server/license'; -import { processWaitingQueue, updateSLAInquiries } from './Helper'; +import { updateSLAInquiries } from './Helper'; import { removeSLAFromRooms } from './SlaHelper'; -import { queueLogger } from './logger'; export const LivechatEnterprise = { async addMonitor(username: string) { @@ -292,116 +282,3 @@ export const LivechatEnterprise = { return hasLicense('livechat-enterprise') || (await LivechatDepartmentRaw.countTotal()) === 0; }, }; - -const DEFAULT_RACE_TIMEOUT = 5000; -let queueDelayTimeout = DEFAULT_RACE_TIMEOUT; - -type QueueWorker = { - running: boolean; - queues: (string | undefined)[]; - start(): Promise; - stop(): Promise; - getActiveQueues(): Promise<(string | undefined)[]>; - nextQueue(): Promise; - execute(): Promise; - checkQueue(queue: string | undefined): Promise; -}; - -const queueWorker: QueueWorker = { - running: false, - queues: [], - async start() { - queueLogger.debug('Starting queue'); - if (this.running) { - queueLogger.debug('Queue already running'); - return; - } - - const activeQueues = await this.getActiveQueues(); - queueLogger.debug(`Active queues: ${activeQueues.length}`); - - this.running = true; - return this.execute(); - }, - async stop() { - queueLogger.debug('Stopping queue'); - await LivechatInquiry.unlockAll(); - - this.running = false; - }, - async getActiveQueues() { - // undefined = public queue(without department) - return ([undefined] as (undefined | string)[]).concat(await LivechatInquiry.getDistinctQueuedDepartments({})); - }, - async nextQueue() { - if (!this.queues.length) { - queueLogger.debug('No more registered queues. Refreshing'); - this.queues = await this.getActiveQueues(); - } - - return this.queues.shift(); - }, - async execute() { - if (!this.running) { - queueLogger.debug('Queue stopped. Cannot execute'); - return; - } - - const queue = await this.nextQueue(); - queueLogger.debug(`Executing queue ${queue || 'Public'} with timeout of ${queueDelayTimeout}`); - - setTimeout(this.checkQueue.bind(this, queue), queueDelayTimeout); - }, - - async checkQueue(queue) { - queueLogger.debug(`Processing items for queue ${queue || 'Public'}`); - try { - const nextInquiry = await LivechatInquiry.findNextAndLock(getInquirySortMechanismSetting(), queue); - if (!nextInquiry) { - queueLogger.debug(`No more items for queue ${queue || 'Public'}`); - return; - } - - const result = await processWaitingQueue(queue, nextInquiry as InquiryWithAgentInfo); - - if (!result) { - await LivechatInquiry.unlock(nextInquiry._id); - } - } catch (e) { - queueLogger.error({ - msg: `Error processing queue ${queue || 'public'}`, - err: e, - }); - } finally { - void this.execute(); - } - }, -}; - -let omnichannelIsEnabled = false; -function shouldQueueStart() { - if (!omnichannelIsEnabled) { - void queueWorker.stop(); - return; - } - - const routingSupportsAutoAssign = RoutingManager.getConfig()?.autoAssignAgent; - queueLogger.debug( - `Routing method ${RoutingManager.methodName} supports auto assignment: ${routingSupportsAutoAssign}. ${ - routingSupportsAutoAssign ? 'Starting' : 'Stopping' - } queue`, - ); - - void (routingSupportsAutoAssign ? queueWorker.start() : queueWorker.stop()); -} - -RoutingManager.startQueue = shouldQueueStart; - -settings.watch('Livechat_enabled', (enabled) => { - omnichannelIsEnabled = enabled; - void (omnichannelIsEnabled && RoutingManager.isMethodSet() ? shouldQueueStart() : queueWorker.stop()); -}); - -settings.watch('Omnichannel_queue_delay_timeout', (timeout) => { - queueDelayTimeout = timeout < 1 ? DEFAULT_RACE_TIMEOUT : timeout * 1000; -}); diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/logger.ts b/apps/meteor/ee/app/livechat-enterprise/server/lib/logger.ts index cef654325050..4e7a8eab5932 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/logger.ts +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/logger.ts @@ -3,7 +3,6 @@ import { Logger } from '@rocket.chat/logger'; export const logger = new Logger('LivechatEnterprise'); export const queriesLogger = logger.section('Queries'); -export const queueLogger = logger.section('Queue'); export const helperLogger = logger.section('Helper'); export const cbLogger = logger.section('Callbacks'); export const bhLogger = logger.section('Business-Hours'); diff --git a/apps/meteor/server/services/omnichannel/logger.ts b/apps/meteor/server/services/omnichannel/logger.ts new file mode 100644 index 000000000000..208d1e5e9adf --- /dev/null +++ b/apps/meteor/server/services/omnichannel/logger.ts @@ -0,0 +1,3 @@ +import { Logger } from '@rocket.chat/logger'; + +export const queueLogger = new Logger('OmnichannelQueue'); diff --git a/apps/meteor/server/services/omnichannel/queue.ts b/apps/meteor/server/services/omnichannel/queue.ts new file mode 100644 index 000000000000..8bf5453587ce --- /dev/null +++ b/apps/meteor/server/services/omnichannel/queue.ts @@ -0,0 +1,142 @@ +import type { InquiryWithAgentInfo, IOmnichannelQueue } from '@rocket.chat/core-typings'; +import { LivechatInquiry } from '@rocket.chat/models'; + +import { dispatchAgentDelegated } from '../../../app/livechat/server/lib/Helper'; +import { RoutingManager } from '../../../app/livechat/server/lib/RoutingManager'; +import { getInquirySortMechanismSetting } from '../../../app/livechat/server/lib/settings'; +import { settings } from '../../../app/settings/server'; +import { queueLogger } from './logger'; + +const DEFAULT_RACE_TIMEOUT = 5000; + +export class OmnichannelQueue implements IOmnichannelQueue { + private running = false; + + private queues: (string | undefined)[] = []; + + private delay() { + const timeout = settings.get('Omnichannel_queue_delay_timeout'); + return timeout < 1 ? DEFAULT_RACE_TIMEOUT : timeout * 1000; + } + + async start() { + queueLogger.debug('Starting queue'); + if (this.running) { + queueLogger.debug('Queue already running'); + return; + } + + const activeQueues = await this.getActiveQueues(); + queueLogger.debug(`Active queues: ${activeQueues.length}`); + + this.running = true; + return this.execute(); + } + + async stop() { + queueLogger.debug('Stopping queue'); + await LivechatInquiry.unlockAll(); + + this.running = false; + } + + private async getActiveQueues() { + // undefined = public queue(without department) + return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({})); + } + + private async nextQueue() { + if (!this.queues.length) { + queueLogger.debug('No more registered queues. Refreshing'); + this.queues = await this.getActiveQueues(); + } + + return this.queues.shift(); + } + + private async execute() { + if (!this.running) { + queueLogger.debug('Queue stopped. Cannot execute'); + return; + } + + const queue = await this.nextQueue(); + const queueDelayTimeout = this.delay(); + queueLogger.debug(`Executing queue ${queue || 'Public'} with timeout of ${queueDelayTimeout}`); + + setTimeout(this.checkQueue.bind(this, queue), queueDelayTimeout); + } + + private async checkQueue(queue: string | undefined) { + queueLogger.debug(`Processing items for queue ${queue || 'Public'}`); + try { + const nextInquiry = await LivechatInquiry.findNextAndLock(getInquirySortMechanismSetting(), queue); + if (!nextInquiry) { + queueLogger.debug(`No more items for queue ${queue || 'Public'}`); + return; + } + + const result = await this.processWaitingQueue(queue, nextInquiry as InquiryWithAgentInfo); + + if (!result) { + // Note: this removes the "one-shot" behavior of queue, allowing it to take a conversation again in the future + // And sorting them by _updatedAt: -1 will make it so that the oldest inquiries are taken first + // preventing us from playing with the same inquiry over and over again + await LivechatInquiry.unlock(nextInquiry._id); + } + } catch (e) { + queueLogger.error({ + msg: 'Error processing queue', + queue: queue || 'Public', + err: e, + }); + } finally { + void this.execute(); + } + } + + shouldStart() { + if (!settings.get('Livechat_enabled')) { + void this.stop(); + return; + } + + const routingSupportsAutoAssign = RoutingManager.getConfig()?.autoAssignAgent; + queueLogger.debug({ + msg: 'Routing method supports auto assignment', + method: RoutingManager.methodName, + status: routingSupportsAutoAssign ? 'Starting' : 'Stopping', + }); + + void (routingSupportsAutoAssign ? this.start() : this.stop()); + } + + private async processWaitingQueue(department: string | undefined, inquiry: InquiryWithAgentInfo) { + const queue = department || 'Public'; + queueLogger.debug(`Processing items on queue ${queue}`); + + queueLogger.debug(`Processing inquiry ${inquiry._id} from queue ${queue}`); + const { defaultAgent } = inquiry; + const room = await RoutingManager.delegateInquiry(inquiry, defaultAgent); + + const propagateAgentDelegated = async (rid: string, agentId: string) => { + await dispatchAgentDelegated(rid, agentId); + }; + + if (room?.servedBy) { + const { + _id: rid, + servedBy: { _id: agentId }, + } = room; + queueLogger.debug(`Inquiry ${inquiry._id} taken successfully by agent ${agentId}. Notifying`); + setTimeout(() => { + void propagateAgentDelegated(rid, agentId); + }, 1000); + + return true; + } + + queueLogger.debug(`Inquiry ${inquiry._id} not taken by any agent. Queueing again`); + return false; + } +} diff --git a/apps/meteor/server/services/omnichannel/service.ts b/apps/meteor/server/services/omnichannel/service.ts index b5e83e30836b..7f35de104e1c 100644 --- a/apps/meteor/server/services/omnichannel/service.ts +++ b/apps/meteor/server/services/omnichannel/service.ts @@ -1,11 +1,22 @@ import { ServiceClassInternal } from '@rocket.chat/core-services'; import type { IOmnichannelService } from '@rocket.chat/core-services'; +import type { IOmnichannelQueue } from '@rocket.chat/core-typings'; import { Livechat } from '../../../app/livechat/server'; +import { RoutingManager } from '../../../app/livechat/server/lib/RoutingManager'; +import { settings } from '../../../app/settings/server'; +import { OmnichannelQueue } from './queue'; export class OmnichannelService extends ServiceClassInternal implements IOmnichannelService { protected name = 'omnichannel'; + private queueWorker: IOmnichannelQueue; + + constructor() { + super(); + this.queueWorker = new OmnichannelQueue(); + } + async created() { this.onEvent('presence.status', async ({ user }): Promise => { if (!user?._id) { @@ -18,4 +29,14 @@ export class OmnichannelService extends ServiceClassInternal implements IOmnicha } }); } + + async started() { + settings.watch('Livechat_enabled', (enabled) => { + void (enabled && RoutingManager.isMethodSet() ? this.queueWorker.shouldStart() : this.queueWorker.stop()); + }); + } + + getQueueWorker(): IOmnichannelQueue { + return this.queueWorker; + } } diff --git a/packages/core-services/src/index.ts b/packages/core-services/src/index.ts index 723b3b27103c..def7622c9881 100644 --- a/packages/core-services/src/index.ts +++ b/packages/core-services/src/index.ts @@ -153,6 +153,7 @@ export const Settings = proxifyWithWait('settings'); export const OmnichannelIntegration = proxifyWithWait('omnichannel-integration'); export const Federation = proxifyWithWait('federation'); export const FederationEE = proxifyWithWait('federation-enterprise'); +export const Omnichannel = proxifyWithWait('omnichannel'); export const OmnichannelEEService = proxifyWithWait('omnichannel-ee'); export const Import = proxifyWithWait('import'); diff --git a/packages/core-services/src/types/IOmnichannelService.ts b/packages/core-services/src/types/IOmnichannelService.ts index d5aaae341dad..fb3cc60d9243 100644 --- a/packages/core-services/src/types/IOmnichannelService.ts +++ b/packages/core-services/src/types/IOmnichannelService.ts @@ -1,3 +1,7 @@ +import type { IOmnichannelQueue } from '@rocket.chat/core-typings'; + import type { IServiceClass } from './ServiceClass'; -export type IOmnichannelService = IServiceClass; +export interface IOmnichannelService extends IServiceClass { + getQueueWorker(): IOmnichannelQueue; +} diff --git a/packages/core-typings/src/omnichannel/index.ts b/packages/core-typings/src/omnichannel/index.ts index 0995968370e4..703cf3b4ca77 100644 --- a/packages/core-typings/src/omnichannel/index.ts +++ b/packages/core-typings/src/omnichannel/index.ts @@ -1,3 +1,4 @@ export * from './sms'; export * from './routing'; +export * from './queue'; export * from './reports'; diff --git a/packages/core-typings/src/omnichannel/queue.ts b/packages/core-typings/src/omnichannel/queue.ts new file mode 100644 index 000000000000..46036622713f --- /dev/null +++ b/packages/core-typings/src/omnichannel/queue.ts @@ -0,0 +1,5 @@ +export interface IOmnichannelQueue { + start(): Promise; + shouldStart(): void; + stop(): Promise; +}