Skip to content

Commit

Permalink
refactor: Move queueWorker logic to CE & add it to omnichannel serv…
Browse files Browse the repository at this point in the history
…ice (#29721)
  • Loading branch information
KevLehman authored Aug 31, 2023
1 parent 0f56aac commit 7e0f76b
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 166 deletions.
9 changes: 6 additions & 3 deletions apps/meteor/app/livechat/lib/inquiries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,28 @@ type ReturnType =
| {
priorityWeight: SortOrder;
ts: SortOrder;
_updatedAt: SortOrder;
}
| {
estimatedWaitingTimeQueue: SortOrder;
ts: SortOrder;
_updatedAt: SortOrder;
}
| {
ts: SortOrder;
_updatedAt: SortOrder;
};

export const getOmniChatSortQuery = (
sortByMechanism: OmnichannelSortingMechanismSettingType = OmnichannelSortingMechanismSettingType.Timestamp,
): ReturnType => {
switch (sortByMechanism) {
case OmnichannelSortingMechanismSettingType.Priority:
return { priorityWeight: 1, ts: 1 };
return { priorityWeight: 1, ts: 1, _updatedAt: -1 };
case OmnichannelSortingMechanismSettingType.SLAs:
return { estimatedWaitingTimeQueue: 1, ts: 1 };
return { estimatedWaitingTimeQueue: 1, ts: 1, _updatedAt: -1 };
case OmnichannelSortingMechanismSettingType.Timestamp:
default:
return { ts: 1 };
return { ts: 1, _updatedAt: -1 };
}
};
8 changes: 4 additions & 4 deletions apps/meteor/app/livechat/server/lib/RoutingManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Message } from '@rocket.chat/core-services';
import { Message, Omnichannel } from '@rocket.chat/core-services';
import type {
ILivechatInquiryRecord,
ILivechatVisitor,
Expand Down Expand Up @@ -35,7 +35,7 @@ type Routing = {
methods: Record<string, IRoutingMethod>;
startQueue(): void;
isMethodSet(): boolean;
setMethodNameAndStartQueue(name: string): void;
setMethodNameAndStartQueue(name: string): Promise<void>;
registerMethod(name: string, Method: IRoutingMethodConstructor): void;
getMethod(): IRoutingMethod;
getConfig(): RoutingMethodConfig | undefined;
Expand Down Expand Up @@ -73,7 +73,7 @@ export const RoutingManager: Routing = {
return !!this.methodName;
},

setMethodNameAndStartQueue(name) {
async setMethodNameAndStartQueue(name) {
logger.debug(`Changing default routing method from ${this.methodName} to ${name}`);
if (!this.methods[name]) {
logger.warn(`Cannot change routing method to ${name}. Selected Routing method does not exists. Defaulting to Manual_Selection`);
Expand All @@ -82,7 +82,7 @@ export const RoutingManager: Routing = {
this.methodName = name;
}

this.startQueue();
void (await Omnichannel.getQueueWorker()).shouldStart();
},

// eslint-disable-next-line @typescript-eslint/naming-convention
Expand Down
2 changes: 1 addition & 1 deletion apps/meteor/app/livechat/server/startup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Meteor.startup(async () => {
});

settings.watch<string>('Livechat_Routing_Method', (value) => {
RoutingManager.setMethodNameAndStartQueue(value);
void RoutingManager.setMethodNameAndStartQueue(value);
});

// Remove when accounts.onLogout is async
Expand Down
31 changes: 0 additions & 31 deletions apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import {
import moment from 'moment';
import type { Document } from 'mongodb';

import { dispatchAgentDelegated } from '../../../../../app/livechat/server/lib/Helper';
import { RoutingManager } from '../../../../../app/livechat/server/lib/RoutingManager';
import { getInquirySortMechanismSetting } from '../../../../../app/livechat/server/lib/settings';
import { settings } from '../../../../../app/settings/server';
import { callbacks } from '../../../../../lib/callbacks';
Expand Down Expand Up @@ -146,35 +144,6 @@ const dispatchWaitingQueueStatus = async (department?: string) => {
// but we don't need to notify _each_ change that takes place, just their final position
export const debouncedDispatchWaitingQueueStatus = memoizeDebounce(dispatchWaitingQueueStatus, 1200);

export const processWaitingQueue = async (department: string | undefined, inquiry: InquiryWithAgentInfo) => {
const queue = department || 'Public';
helperLogger.debug(`Processing items on queue ${queue}`);

helperLogger.debug(`Processing inquiry ${inquiry._id} from queue ${queue}`);
const { defaultAgent } = inquiry;
// TODO: remove this typecast when routing manager becomes TS
const room = (await RoutingManager.delegateInquiry(inquiry, defaultAgent)) as IOmnichannelRoom;

const propagateAgentDelegated = async (rid: string, agentId: string) => {
await dispatchAgentDelegated(rid, agentId);
};

if (room?.servedBy) {
const {
_id: rid,
servedBy: { _id: agentId },
} = room;
helperLogger.debug(`Inquiry ${inquiry._id} taken successfully by agent ${agentId}. Notifying`);
setTimeout(() => {
void propagateAgentDelegated(rid, agentId);
}, 1000);

return true;
}

return false;
};

export const setPredictedVisitorAbandonmentTime = async (room: IOmnichannelRoom) => {
if (
!room.v?.lastMessageTs ||
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import type {
IOmnichannelBusinessUnit,
IOmnichannelServiceLevelAgreements,
LivechatDepartmentDTO,
InquiryWithAgentInfo,
} from '@rocket.chat/core-typings';
import type { IOmnichannelBusinessUnit, IOmnichannelServiceLevelAgreements, LivechatDepartmentDTO } from '@rocket.chat/core-typings';
import {
LivechatInquiry,
Users,
LivechatDepartment as LivechatDepartmentRaw,
OmnichannelServiceLevelAgreements,
Expand All @@ -17,16 +11,12 @@ import { Match, check } from 'meteor/check';
import { Meteor } from 'meteor/meteor';

import { updateDepartmentAgents } from '../../../../../app/livechat/server/lib/Helper';
import { RoutingManager } from '../../../../../app/livechat/server/lib/RoutingManager';
import { getInquirySortMechanismSetting } from '../../../../../app/livechat/server/lib/settings';
import { settings } from '../../../../../app/settings/server';
import { callbacks } from '../../../../../lib/callbacks';
import { addUserRolesAsync } from '../../../../../server/lib/roles/addUserRoles';
import { removeUserFromRolesAsync } from '../../../../../server/lib/roles/removeUserFromRoles';
import { hasLicense } from '../../../license/server/license';
import { processWaitingQueue, updateSLAInquiries } from './Helper';
import { updateSLAInquiries } from './Helper';
import { removeSLAFromRooms } from './SlaHelper';
import { queueLogger } from './logger';

export const LivechatEnterprise = {
async addMonitor(username: string) {
Expand Down Expand Up @@ -292,116 +282,3 @@ export const LivechatEnterprise = {
return hasLicense('livechat-enterprise') || (await LivechatDepartmentRaw.countTotal()) === 0;
},
};

const DEFAULT_RACE_TIMEOUT = 5000;
let queueDelayTimeout = DEFAULT_RACE_TIMEOUT;

type QueueWorker = {
running: boolean;
queues: (string | undefined)[];
start(): Promise<void>;
stop(): Promise<void>;
getActiveQueues(): Promise<(string | undefined)[]>;
nextQueue(): Promise<string | undefined>;
execute(): Promise<void>;
checkQueue(queue: string | undefined): Promise<void>;
};

const queueWorker: QueueWorker = {
running: false,
queues: [],
async start() {
queueLogger.debug('Starting queue');
if (this.running) {
queueLogger.debug('Queue already running');
return;
}

const activeQueues = await this.getActiveQueues();
queueLogger.debug(`Active queues: ${activeQueues.length}`);

this.running = true;
return this.execute();
},
async stop() {
queueLogger.debug('Stopping queue');
await LivechatInquiry.unlockAll();

this.running = false;
},
async getActiveQueues() {
// undefined = public queue(without department)
return ([undefined] as (undefined | string)[]).concat(await LivechatInquiry.getDistinctQueuedDepartments({}));
},
async nextQueue() {
if (!this.queues.length) {
queueLogger.debug('No more registered queues. Refreshing');
this.queues = await this.getActiveQueues();
}

return this.queues.shift();
},
async execute() {
if (!this.running) {
queueLogger.debug('Queue stopped. Cannot execute');
return;
}

const queue = await this.nextQueue();
queueLogger.debug(`Executing queue ${queue || 'Public'} with timeout of ${queueDelayTimeout}`);

setTimeout(this.checkQueue.bind(this, queue), queueDelayTimeout);
},

async checkQueue(queue) {
queueLogger.debug(`Processing items for queue ${queue || 'Public'}`);
try {
const nextInquiry = await LivechatInquiry.findNextAndLock(getInquirySortMechanismSetting(), queue);
if (!nextInquiry) {
queueLogger.debug(`No more items for queue ${queue || 'Public'}`);
return;
}

const result = await processWaitingQueue(queue, nextInquiry as InquiryWithAgentInfo);

if (!result) {
await LivechatInquiry.unlock(nextInquiry._id);
}
} catch (e) {
queueLogger.error({
msg: `Error processing queue ${queue || 'public'}`,
err: e,
});
} finally {
void this.execute();
}
},
};

let omnichannelIsEnabled = false;
function shouldQueueStart() {
if (!omnichannelIsEnabled) {
void queueWorker.stop();
return;
}

const routingSupportsAutoAssign = RoutingManager.getConfig()?.autoAssignAgent;
queueLogger.debug(
`Routing method ${RoutingManager.methodName} supports auto assignment: ${routingSupportsAutoAssign}. ${
routingSupportsAutoAssign ? 'Starting' : 'Stopping'
} queue`,
);

void (routingSupportsAutoAssign ? queueWorker.start() : queueWorker.stop());
}

RoutingManager.startQueue = shouldQueueStart;

settings.watch<boolean>('Livechat_enabled', (enabled) => {
omnichannelIsEnabled = enabled;
void (omnichannelIsEnabled && RoutingManager.isMethodSet() ? shouldQueueStart() : queueWorker.stop());
});

settings.watch<number>('Omnichannel_queue_delay_timeout', (timeout) => {
queueDelayTimeout = timeout < 1 ? DEFAULT_RACE_TIMEOUT : timeout * 1000;
});
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Logger } from '@rocket.chat/logger';
export const logger = new Logger('LivechatEnterprise');

export const queriesLogger = logger.section('Queries');
export const queueLogger = logger.section('Queue');
export const helperLogger = logger.section('Helper');
export const cbLogger = logger.section('Callbacks');
export const bhLogger = logger.section('Business-Hours');
Expand Down
3 changes: 3 additions & 0 deletions apps/meteor/server/services/omnichannel/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { Logger } from '@rocket.chat/logger';

export const queueLogger = new Logger('OmnichannelQueue');
Loading

0 comments on commit 7e0f76b

Please sign in to comment.