From 6a8e0842f67bf5d4e33abb1740e24275e62f5aba Mon Sep 17 00:00:00 2001 From: h-kanazawa Date: Fri, 20 Mar 2020 16:07:05 +0000 Subject: [PATCH] feat(Worker): Allows subset of workers to reduce processing (LLC-9) https://github.com/LearningLocker/learninglocker/pull/1418 https://learningpool.atlassian.net/browse/LLC-9 --- .env.example | 11 ++++ .../handlers/statement/allowedWorkerQueues.js | 51 +++++++++++++++++++ worker/src/handlers/statement/index.js | 32 +++++++----- .../handlers/statement/statementHandler.js | 5 +- 4 files changed, 86 insertions(+), 13 deletions(-) create mode 100644 worker/src/handlers/statement/allowedWorkerQueues.js diff --git a/.env.example b/.env.example index b17e7096df..8332937dd0 100644 --- a/.env.example +++ b/.env.example @@ -181,6 +181,17 @@ QUEUE_NAMESPACE=DEV #PUBSUB_GOOGLE_CLOUD_PROJECT_ID= #PUBSUB_GOOGLE_CLOUD_SUBSCRIPTION_NAME=ll +# Allowed worker queue list +# Available queue names +# STATEMENT_QUERYBUILDERCACHE_QUEUE +# STATEMENT_PERSON_QUEUE +# STATEMENT_FORWARDING_QUEUE +# Separate them with comma. +# +# Not defining this variable would run all queues +# Having it empty string would run no queues +# ALLOWED_WORKER_QUEUES=STATEMENT_FORWARDING_QUEUE,STATEMENT_QUERYBUILDERCACHE_QUEUE + # Azure service bus endpoint, required if QUEUE_PROVIDER=SERVICE_BUS # SERVICE_BUS_ENDPOINT='Endpoint=sb://[namespace].servicebus.windows.net/;SharedAccessKeyName=[keyName];SharedAccessKey=[key]' diff --git a/worker/src/handlers/statement/allowedWorkerQueues.js b/worker/src/handlers/statement/allowedWorkerQueues.js new file mode 100644 index 0000000000..fd9814208c --- /dev/null +++ b/worker/src/handlers/statement/allowedWorkerQueues.js @@ -0,0 +1,51 @@ +import logger from 'lib/logger'; +import { + STATEMENT_QUERYBUILDERCACHE_QUEUE, + STATEMENT_EXTRACT_PERSONAS_QUEUE, + STATEMENT_FORWARDING_QUEUE, +} from 'lib/constants/statements'; + +/** + * allowable Worker Queues + * + * @type {string[]} + */ +const allowableWorkerQueues = [ + STATEMENT_QUERYBUILDERCACHE_QUEUE, + STATEMENT_EXTRACT_PERSONAS_QUEUE, + STATEMENT_FORWARDING_QUEUE, +]; + +/** + * @param {string | undefined} allowedWorkerQueuesString + * @return {string[]} + * @throws {Error} + */ +const getAllowedWorkerQueues = (allowedWorkerQueuesString) => { + if (allowedWorkerQueuesString === undefined) { + return allowableWorkerQueues; + } + + if (allowedWorkerQueuesString === '') { + return []; + } + + return allowedWorkerQueuesString.split(',').reduce( + (acc, queueString) => { + if (allowableWorkerQueues.includes(queueString)) { + return acc.concat([queueString]); + } + logger.warn(`"${queueString}" is ignored as an allowed worker queue. Allowable worker queues are ${allowableWorkerQueues.map(q => `"${q}"`).join(', ')}`); + return acc; + }, + [], + ); +}; + +/** + * @type {string[]} + */ +const allowedWorkerQueues = getAllowedWorkerQueues(process.env.ALLOWED_WORKER_QUEUES); + +/* eslint-disable import/prefer-default-export*/ +export const isAllowedWorkerQueue = queueName => allowedWorkerQueues.includes(queueName); diff --git a/worker/src/handlers/statement/index.js b/worker/src/handlers/statement/index.js index cf2a1e92b7..28b784825d 100644 --- a/worker/src/handlers/statement/index.js +++ b/worker/src/handlers/statement/index.js @@ -22,6 +22,8 @@ import { STATEMENT_FORWARDING_DEADLETTER_QUEUE } from 'lib/constants/statements'; +import { isAllowedWorkerQueue } from './allowedWorkerQueues'; + const defaultHandleResponse = queueName => (err) => { if (err) logger.error(`ERROR SUBSCRIBING TO QUEUE ${queueName}`, err); return err; @@ -46,20 +48,26 @@ export default ( onProcessed: statementHandlerProcessed }, handleResponse(STATEMENT_QUEUE)); - Queue.subscribe({ - queueName: STATEMENT_EXTRACT_PERSONAS_QUEUE, - handler: extractPersonasHandler(personaService) - }, handleResponse(STATEMENT_EXTRACT_PERSONAS_QUEUE)); + if (isAllowedWorkerQueue(STATEMENT_EXTRACT_PERSONAS_QUEUE)) { + Queue.subscribe({ + queueName: STATEMENT_EXTRACT_PERSONAS_QUEUE, + handler: extractPersonasHandler(personaService) + }, handleResponse(STATEMENT_EXTRACT_PERSONAS_QUEUE)); + } - Queue.subscribe({ - queueName: STATEMENT_QUERYBUILDERCACHE_QUEUE, - handler: queryBuilderCacheHandler - }, handleResponse(STATEMENT_QUERYBUILDERCACHE_QUEUE)); + if (isAllowedWorkerQueue(STATEMENT_QUERYBUILDERCACHE_QUEUE)) { + Queue.subscribe({ + queueName: STATEMENT_QUERYBUILDERCACHE_QUEUE, + handler: queryBuilderCacheHandler + }, handleResponse(STATEMENT_QUERYBUILDERCACHE_QUEUE)); + } - Queue.subscribe({ - queueName: STATEMENT_FORWARDING_QUEUE, - handler: statementForwardingHandler - }, handleResponse(STATEMENT_FORWARDING_QUEUE)); + if (isAllowedWorkerQueue(STATEMENT_FORWARDING_QUEUE)) { + Queue.subscribe({ + queueName: STATEMENT_FORWARDING_QUEUE, + handler: statementForwardingHandler + }, handleResponse(STATEMENT_FORWARDING_QUEUE)); + } Queue.subscribe({ queueName: STATEMENT_FORWARDING_REQUEST_QUEUE, diff --git a/worker/src/handlers/statement/statementHandler.js b/worker/src/handlers/statement/statementHandler.js index 50d41672df..64dfba78e5 100644 --- a/worker/src/handlers/statement/statementHandler.js +++ b/worker/src/handlers/statement/statementHandler.js @@ -17,6 +17,7 @@ import { STATEMENT_EXTRACT_PERSONAS_QUEUE, STATEMENT_FORWARDING_QUEUE } from 'lib/constants/statements'; +import { isAllowedWorkerQueue } from './allowedWorkerQueues'; const queueDependencies = { [STATEMENT_QUERYBUILDERCACHE_QUEUE]: { @@ -49,8 +50,10 @@ export const addStatementToPendingQueues = (statement, passedQueues, done) => { const queueCompleted = includes(completedQueues, queueName); // or is this queue in the queues being processed? const queueProcessing = includes(processingQueues, queueName); + // or is an allowed queue? + const isAllowed = isAllowedWorkerQueue(queueName); - return !preReqsCompleted || queueCompleted || queueProcessing; + return !preReqsCompleted || queueCompleted || queueProcessing || !isAllowed; }); return Statement.updateOne(