diff --git a/consumer/.ebextensions/01-environment-variables.config b/consumer/.ebextensions/01-environment-variables.config index aa74b44..610ec7c 100644 --- a/consumer/.ebextensions/01-environment-variables.config +++ b/consumer/.ebextensions/01-environment-variables.config @@ -26,9 +26,15 @@ option_settings: - namespace: aws:elasticbeanstalk:application:environment option_name: RABBITMQ_PROJECTS_EXCHANGE value: dev.projects + - namespace: aws:elasticbeanstalk:application:environment + option_name: RABBITMQ_CONNECT2SF_EXCHANGE + value: dev.tc.connect2sf - namespace: aws:elasticbeanstalk:application:environment option_name: QUEUE_PROJECTS value: dev.project.service + - namespace: aws:elasticbeanstalk:application:environment + option_name: QUEUE_CONNECT2SF + value: dev.tc.connect2sf.exclusive - namespace: aws:elasticbeanstalk:application:environment option_name: IDENTITY_SERVICE_URL value: TBD diff --git a/consumer/Dockerfile b/consumer/Dockerfile index 0823b47..a2c1d26 100644 --- a/consumer/Dockerfile +++ b/consumer/Dockerfile @@ -5,6 +5,8 @@ LABEL description="Topcoder Salesforce Integration" RUN apt-get update && \ apt-get upgrade -y +#RUN apt-get install cron -y + # Create app directory RUN mkdir -p /usr/src/app @@ -15,10 +17,12 @@ COPY . /usr/src/app # Install app dependencies RUN npm install -RUN npm install -g forever +RUN npm install -g forever babel-cli -EXPOSE 80 +#RUN crontab config/scheduler-cron -CMD forever -c "npm start" --uid "consumer" . +#RUN service cron start + +EXPOSE 80 -#CMD npm start \ No newline at end of file +CMD forever -c "npm start" --uid "consumer" . \ No newline at end of file diff --git a/consumer/README.md b/consumer/README.md index 0a93f78..7bacafe 100644 --- a/consumer/README.md +++ b/consumer/README.md @@ -20,7 +20,7 @@ Env variable: `LOG_LEVEL` - **rabbitmqURL** The rabbitmq URL. Create a free account here https://www.cloudamqp.com/ and create a new instance in any region. -You can get URL by clicking on queue details button. +You can get URL by clicking on queue details button. For deployment in AWS, please make sure that this instance is launched in the VPC which target AWS server can communicate with. Env variable: `RABBITMQ_URL` - **ownerId** @@ -86,7 +86,7 @@ You can use the existing cert.pem from `config` directory. Or generate a new certificate and key using a command: `openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem` -Private key of your certificate is read from environment variable, instead of reading from the config directory. So please make sure you replace all new line characters with `\n` before setting it in the environment variable. Application would add newline characters back to the key when using it to sign the requests. +**Private key of your certificate is read from environment variable, instead of reading from the config directory. So please make sure you replace all new line characters with `\n` before setting it in the environment variable. Application would add newline characters back to the key when using it to sign the requests.** ![Alt text](https://monosnap.com/file/tT9ZZXUH1aa1j7cFzYxaV9RjmHWCum.png) Click Save @@ -224,11 +224,19 @@ Check the Lead details in Saleforce ![Alt text](https://monosnap.com/file/PdMF97k18cBGeZjR9qOkkBe1AjYw2n.png) Lead is removed from the campaign +## Deployment Checklist +1. AppXpressConfig table exists in dynamodb with dripcampaignId +2. Make sure configured rabbitmq exchange and queue are created appropriately in cloumamqp +3. There should be proper mapping between exchange and queue specified in the conifguration +4. Grant permission, with user conifgured, for the app once using url https://login.salesforce.com/services/oauth2/authorize?client_id=[clientId]&redirect_uri=https://login.salesforce.com&response_type=code -Notes on Error Handling. -UnprocessableError is thrown if operation cannot be completed. +## CI +* All changes into dev will be built and deployed to AWS beanstalk environment `tc-connect2sf-dev` +* All changes into master will be built and deployed to AWS beanstalk environment `tc-connect2sf-prod` + +## Notes on Error Handling. +`UnprocessableError` is thrown if operation cannot be completed. For example: duplicated project id added to the queue, Lead cannot be found etc. In such situation, the message from rabbitmq will be marked as ACK (removed). -If we won't remove it from queue, the message will be stuck forever. - - +If we won't remove it from queue, the message will be stuck forever. +For any other type of error the message from the rabbitmq will me marked as ACK as well, however, it would requeued into another queue for later inspection. It right now publishes the message content to the same rabbitmq exchange (configured as mentioned in Configuration section) with routing key being `connect2sf.failed`. So, we have to map the exchange and routing key comibation to a queue to which no consumer is listeting e.g. `tc-connect2sf.failed` is used in dev environment. Now we can see messages, via rabbitmq manager UI, in this queue to check if any of the messages failed and what was id of the project which failed. We can either remove those messages from the queue, if we are going to add those leads manually in saleforce or move them again to the original queue after fixing the deployed environment. diff --git a/consumer/config/constants.js b/consumer/config/constants.js index a386a32..1d4c5af 100644 --- a/consumer/config/constants.js +++ b/consumer/config/constants.js @@ -4,6 +4,6 @@ export const EVENT = { PROJECT_DRAFT_CREATED: 'project.draft-created', PROJECT_UPDATED: 'project.updated', PROJECT_DELETED: 'project.deleted', - CONNECT_TO_SF_FAILED: 'connect2sf.failed' + FAILED_SUFFIX: '.failed' }, }; diff --git a/consumer/config/custom-environment-variables.json b/consumer/config/custom-environment-variables.json index c378910..ce0752e 100644 --- a/consumer/config/custom-environment-variables.json +++ b/consumer/config/custom-environment-variables.json @@ -21,8 +21,10 @@ }, "rabbitmq" : { "projectsExchange" : "RABBITMQ_PROJECTS_EXCHANGE", + "connect2sfExchange" : "RABBITMQ_CONNECT2SF_EXCHANGE", "queues": { - "project": "QUEUE_PROJECTS" + "project": "QUEUE_PROJECTS", + "connect2sf": "QUEUE_CONNECT2SF" } } } diff --git a/consumer/config/scheduler-cron b/consumer/config/scheduler-cron new file mode 100644 index 0000000..d6a8f8f --- /dev/null +++ b/consumer/config/scheduler-cron @@ -0,0 +1 @@ +*/5 * * * * babel-node /usr/src/app/src/scheduled-worker.js diff --git a/consumer/config/test.json b/consumer/config/test.json index 1ed3699..b406cf9 100644 --- a/consumer/config/test.json +++ b/consumer/config/test.json @@ -20,8 +20,10 @@ }, "rabbitmq" : { "projectsExchange" : "dev.projects", + "connect2sfExchange": "dev.tc.connect2sf", "queues": { - "project": "dev.project.service" + "project": "dev.project.service", + "connect2sf": "dev.tc.connect2sf.exclusive" } } } \ No newline at end of file diff --git a/consumer/package.json b/consumer/package.json index f3fa793..64c3dc3 100644 --- a/consumer/package.json +++ b/consumer/package.json @@ -46,6 +46,7 @@ "joi": "^9.0.4", "jsonwebtoken": "^7.1.7", "lodash": "^4.14.2", + "node-cron": "^1.1.3", "superagent": "^2.1.0", "superagent-promise": "^1.1.0", "winston": "^2.2.0" diff --git a/consumer/src/scheduled-worker.js b/consumer/src/scheduled-worker.js new file mode 100644 index 0000000..b82d6eb --- /dev/null +++ b/consumer/src/scheduled-worker.js @@ -0,0 +1,170 @@ +/** + * The main app entry + */ + +import config from 'config'; +import amqp from 'amqplib'; +import _ from 'lodash'; +import logger from './common/logger'; +import ConsumerService from './services/ConsumerService'; +import { EVENT } from '../config/constants'; + +const debug = require('debug')('app:worker'); + +const FETCH_LIMIT = 10; + +let connection; +process.once('SIGINT', () => { + debug('Received SIGINT...closing connection...') + try { + connection.close(); + } catch (ignore) { // eslint-ignore-line + logger.logFullError(ignore) + } + process.exit(); +}); + +let EVENT_HANDLERS = { + [EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: ConsumerService.processProjectCreated + // [EVENT.ROUTING_KEY.PROJECT_UPDATED]: ConsumerService.processProjectUpdated +} + +function close() { + console.log('closing self after processing messages...') + try { + setTimeout(connection.close.bind(connection), 30000); + } catch (ignore) { // eslint-ignore-line + logger.logFullError(ignore) + } +} + +export function initHandlers(handlers) { + EVENT_HANDLERS = handlers; +} + +/** + * Processes the given message and acks/nacks the channel + * @param {Object} channel the target channel + * @param {Object} msg the message to be processed + */ +export function processMessage(channel, msg) { + return new Promise((resolve, reject) => { + if (!msg) { + reject(new Error('Empty message. Ignoring')); + return; + } + debug(`Consuming message in \n${msg.content}`); + const key = _.get(msg, 'fields.routingKey'); + debug('Received Message', key, msg.fields); + + let handler; + let data; + try { + handler = EVENT_HANDLERS[key]; + if (!_.isFunction(handler)) { + logger.error(`Unknown message type: ${key}, NACKing... `); + reject(new Error(`Unknown message type: ${key}`)); + return; + } + data = JSON.parse(msg.content.toString()); + } catch (ignore) { + logger.info(ignore); + logger.error('Invalid message. Ignoring'); + resolve('Invalid message. Ignoring'); + return; + } + return handler(logger, data).then(() => { + resolve(msg); + return; + }) + .catch((e) => { + // logger.logFullError(e, `Error processing message`); + if (e.shouldAck) { + debug("Resolving for Unprocessable Error in handler..."); + resolve(msg); + } else { + debug("Rejecting promise for error in msg processing...") + reject(new Error('Error processing message')); + } + }); + }) +} + +function assertExchangeQueues(channel, exchangeName, queue) { + channel.assertExchange(exchangeName, 'topic', { durable: true }); + channel.assertQueue(queue, { durable: true }); + const bindings = _.keys(EVENT_HANDLERS); + const bindingPromises = _.map(bindings, rk => + channel.bindQueue(queue, exchangeName, rk)); + debug('binding queue ' + queue + ' to exchange: ' + exchangeName); + return Promise.all(bindingPromises); +} + +/** + * Start the worker + */ +export async function start() { + try { + console.log("Scheduled Worker Connecting to RabbitMQ: " + config.rabbitmqURL.substr(-5)); + connection = await amqp.connect(config.rabbitmqURL); + connection.on('error', (e) => { + logger.logFullError(e, `ERROR IN CONNECTION`); + }) + connection.on('close', () => { + debug('Before closing connection...') + }) + debug('created connection successfully with URL: ' + config.rabbitmqURL); + const connect2sfChannel = await connection.createConfirmChannel(); + debug('Channel created for consuming failed messages ...'); + connect2sfChannel.prefetch(FETCH_LIMIT); + assertExchangeQueues( + connect2sfChannel, + config.rabbitmq.connect2sfExchange, + config.rabbitmq.queues.connect2sf + ).then(() => { + debug('Asserted all required exchanges and queues'); + let counter = 0; + _.range(1, 11).forEach(() => { + return connect2sfChannel.get(config.rabbitmq.queues.connect2sf). + then((msg) => { + if (msg) { + return processMessage( + connect2sfChannel, + msg + ).then((responses) => { + counter++; + debug('Processed message'); + connect2sfChannel.ack(msg); + if (counter >= FETCH_LIMIT) { + close(); + } + }).catch((e) => { + counter++; + debug('Processed message with Error'); + connect2sfChannel.nack(msg); + logger.logFullError(e, `Unable to process one of the messages`); + if (counter >= FETCH_LIMIT) { + close(); + } + }) + } else { + counter++; + debug('Processed Empty message'); + if (counter >= FETCH_LIMIT) { + close(); + } + } + }).catch(() => { + console.log('get failed to consume') + }) + }) + }) + + } catch (e) { + logger.logFullError(e, `Unable to connect to RabbitMQ`); + } +} + +if (!module.parent) { + start(); +} diff --git a/consumer/src/services/ConsumerService.js b/consumer/src/services/ConsumerService.js index bf79216..1b76d67 100644 --- a/consumer/src/services/ConsumerService.js +++ b/consumer/src/services/ConsumerService.js @@ -41,46 +41,44 @@ class ConsumerService { * @param {Object} projectEvent the project event */ @logAndValidate(['logger','project'], projectCreatedSchema) - //@logAndValidate(['projectEvent'], {projectEvent: projectEventSchema}) - //async processProjectCreated(projectEvent) { - async processProjectCreated(logger, project) { + processProjectCreated(logger, project) { const member = _.find(project.members, {role: memberRole, isPrimary: true}); if (!member) { throw new UnprocessableError('Cannot find primary customer'); } - const [ - campaignId, - user, - {accessToken, instanceUrl}, - ] = await Promise.all([ + return Promise.all([ ConfigurationService.getSalesforceCampaignId(), IdentityService.getUser(member.userId), SalesforceService.authenticate(), - ]); - - const lead = { - FirstName: user.firstName, - LastName: user.lastName, - Email: user.email, - LeadSource: leadSource, - Company: company, - OwnerId: config.ownerId, - TC_Connect_Project_Id__c: project.id, - }; - let leadId; - try { - leadId = await SalesforceService.createObject('Lead', lead, accessToken, instanceUrl); - } catch (e) { - if (e.response && e.response.text && duplicateRecordRegex.test(e.response.text)) { - throw new UnprocessableError(`Lead already existing for project ${project.id}`); - } - throw e; - } - const campaignMember = { - LeadId: leadId, - CampaignId: campaignId, - }; - await SalesforceService.createObject('CampaignMember', campaignMember, accessToken, instanceUrl); + ]).then((responses) => { + const campaignId = responses[0]; + const user = responses[1]; + const { accessToken, instanceUrl } = responses[2]; + const lead = { + FirstName: user.firstName, + LastName: user.lastName, + Email: user.email, + LeadSource: leadSource, + Company: company, + OwnerId: config.ownerId, + TC_Connect_Project_Id__c: project.id, + }; + return SalesforceService.createObject('Lead', lead, accessToken, instanceUrl) + .then((leadId) => { + const campaignMember = { + LeadId: leadId, + CampaignId: campaignId, + }; + return SalesforceService.createObject('CampaignMember', campaignMember, accessToken, instanceUrl); + }).catch( (e) => { + if (e.response && e.response.text && duplicateRecordRegex.test(e.response.text)) { + throw new UnprocessableError(`Lead already existing for project ${project.id}`); + } + throw e; + }) + }).catch((error) => { + throw error; + }); } /** @@ -88,27 +86,44 @@ class ConsumerService { * @param {Object} projectEvent the project */ @logAndValidate(['logger', 'projectEvent'], projectUpdatedSchema) - async processProjectUpdated(logger, projectEvent) { + processProjectUpdated(logger, projectEvent) { logger.debug(projectEvent) var project = projectEvent.original; - const [ - campaignId, - {accessToken, instanceUrl}, - ] = await Promise.all([ + return Promise.all([ ConfigurationService.getSalesforceCampaignId(), SalesforceService.authenticate(), - ]); - let sql = `SELECT id FROM Lead WHERE TC_Connect_Project_Id__c = '${project.id}'`; - const {records: [lead]} = await SalesforceService.query(sql, accessToken, instanceUrl); - if (!lead) { - throw new UnprocessableError(`Cannot find Lead with TC_Connect_Project_Id__c = '${project.id}'`); - } - sql = `SELECT id FROM CampaignMember WHERE LeadId = '${lead.Id}' AND CampaignId ='${campaignId}'`; - const {records: [member]} = await SalesforceService.query(sql, accessToken, instanceUrl); - if (!member) { - throw new UnprocessableError(`Cannot find CampaignMember for Lead.TC_Connect_Project_Id__c = '${project.id}'`); - } - await SalesforceService.deleteObject('CampaignMember', member.Id, accessToken, instanceUrl); + ]).then((responses) => { + const campaignId = responses[0]; + const { accessToken, instanceUrl } = responses[1]; + // queries existing lead for the project + let sql = `SELECT id FROM Lead WHERE TC_Connect_Project_Id__c = '${project.id}'`; + return SalesforceService.query(sql, accessToken, instanceUrl) + .then((response) => { + const {records: [lead]} = response; + if (!lead) { + throw new UnprocessableError(`Cannot find Lead with TC_Connect_Project_Id__c = '${project.id}'`); + } + sql = `SELECT id FROM CampaignMember WHERE LeadId = '${lead.Id}' AND CampaignId ='${campaignId}'`; + return SalesforceService.query(sql, accessToken, instanceUrl) + .then((response) => { + const {records: [member]} = response; + if (!member) { + throw new UnprocessableError(`Cannot find CampaignMember for Lead.TC_Connect_Project_Id__c = '${project.id}'`); + } + return SalesforceService.deleteObject('CampaignMember', member.Id, accessToken, instanceUrl); + }) + }) + // const {records: [lead]} = await SalesforceService.query(sql, accessToken, instanceUrl); + // if (!lead) { + // throw new UnprocessableError(`Cannot find Lead with TC_Connect_Project_Id__c = '${project.id}'`); + // } + // sql = `SELECT id FROM CampaignMember WHERE LeadId = '${lead.Id}' AND CampaignId ='${campaignId}'`; + // const {records: [member]} = await SalesforceService.query(sql, accessToken, instanceUrl); + // if (!member) { + // throw new UnprocessableError(`Cannot find CampaignMember for Lead.TC_Connect_Project_Id__c = '${project.id}'`); + // } + // await SalesforceService.deleteObject('CampaignMember', member.Id, accessToken, instanceUrl); + }); } } diff --git a/consumer/src/worker.js b/consumer/src/worker.js index 7254c35..959465c 100644 --- a/consumer/src/worker.js +++ b/consumer/src/worker.js @@ -8,6 +8,8 @@ import _ from 'lodash'; import logger from './common/logger'; import ConsumerService from './services/ConsumerService'; import { EVENT } from '../config/constants'; +import cron from 'node-cron'; +import { start as scheduleStart } from './scheduled-worker' const debug = require('debug')('app:worker'); @@ -81,8 +83,9 @@ export async function consume(channel, exchangeName, queue, publishChannel) { channel.ack(msg); try { publishChannel.publish( - exchangeName, - EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, + config.rabbitmq.connect2sfExchange, + // key + EVENT.ROUTING_KEY.FAILED_SUFFIX, + key, new Buffer(msg.content.toString()) ); } catch(e) { @@ -101,12 +104,13 @@ export async function consume(channel, exchangeName, queue, publishChannel) { */ async function start() { try { - console.log(config.rabbitmqURL); + console.log("Worker Connecting to RabbitMQ: " + config.rabbitmqURL.substr(-5)); connection = await amqp.connect(config.rabbitmqURL); debug('created connection successfully with URL: ' + config.rabbitmqURL); const channel = await connection.createConfirmChannel(); - debug('Channel confirmed...'); + debug('Channel created for projects exchange ...'); const publishChannel = await connection.createConfirmChannel(); + debug('Channel created for publishing failed messages ...'); consume( channel, config.rabbitmq.projectsExchange, @@ -120,4 +124,8 @@ async function start() { if (!module.parent) { start(); + + cron.schedule('*/1 * * * *', function(){ + scheduleStart(); + }); } diff --git a/consumer/test/ConsumerService.spec.js b/consumer/test/ConsumerService.spec.js index 6be7f89..db51476 100644 --- a/consumer/test/ConsumerService.spec.js +++ b/consumer/test/ConsumerService.spec.js @@ -95,8 +95,14 @@ describe('ConsumerService', () => { id: 1, members: [], }; - await expect(ConsumerService.processProjectCreated(logger, projectWihoutMembers)) - .to.be.rejectedWith(UnprocessableError, /Cannot find primary customer/); + try { + ConsumerService.processProjectCreated(logger, projectWihoutMembers); + sinon.fail('Should be rejected'); + } catch(err) { + expect(err).to.exist + .and.be.instanceof(UnprocessableError) + .and.have.property('message').and.match(/Cannot find primary customer/); + } }); it('should throw UnprocessableError if Lead already exists', async() => { @@ -108,7 +114,7 @@ describe('ConsumerService', () => { }; throw err; }); - await expect(ConsumerService.processProjectCreated(logger,project)) + return expect(ConsumerService.processProjectCreated(logger,project)) .to.be.rejectedWith(UnprocessableError, /Lead already existing for project 1/); createObjectStub.should.have.been.called; }); diff --git a/consumer/test/scheduled-worker.spec.js b/consumer/test/scheduled-worker.spec.js new file mode 100644 index 0000000..4882c35 --- /dev/null +++ b/consumer/test/scheduled-worker.spec.js @@ -0,0 +1,134 @@ +/** + * Unit tests for worker + */ +import {processMessage, initHandlers} from '../src/scheduled-worker'; +import {UnprocessableError} from '../src/common/errors'; +import { EVENT } from '../config/constants'; +import config from 'config'; +import './setup'; +import _ from 'lodash'; + +describe('scheduled-worker', () => { + describe('consume', () => { + const queueName = 'sample-queue'; + const exchangeName = 'sample-exchange'; + const validMessage = { + content: JSON.stringify({ sampleData: 'foo' }), + properties: { correlationId : 'unit-tests'}, + fields: { routingKey: exchangeName } + }; + let handler; + let ack; + let nack; + let assertQueue; + let assertExchange; + let bindQueue; + let rabbitGet; + let exchangeHandlerSpy = sinon.spy(); + let fakeExchangeHandlerSpy = sinon.spy(); + let channelPublishSpy = sinon.spy(); + + beforeEach(() => { + handler = sinon.spy(); + ack = sinon.spy(); + nack = sinon.spy(); + assertQueue = sinon.spy(); + assertExchange = sinon.spy(); + bindQueue = sinon.spy(); + + initHandlers({ + [exchangeName] : (logger, data) => new Promise((resolve, reject) => { + if (data.rejectWithError) { + reject(new Error()); + } else if (data.rejectWithUnprocessableError) { + reject(new UnprocessableError()); + } else { + resolve(data); + } + }), + 'fakeExchange' : fakeExchangeHandlerSpy + }) + }); + + /** + * Invoke the worker consume method using current parameters + * @param done the mocha done function + */ + function invokeProcessMessages(message, done) { + return processMessage({ + ack, + nack, + assertExchange, + bindQueue, + assertQueue, + }, message); + } + + it('should process and ack a message successfully', () => { + invokeProcessMessages(validMessage).then(() => { + ack.should.have.been.calledWith(validMessage); + nack.should.not.have.been.called; + }).catch(() => { + sinon.fail('should not fail'); + }); + }); + + it('should ignore an empty msg', () => { + invokeProcessMessages(null).then(()=> { + sinon.fail('should not scucced'); + }).catch(() => { + ack.should.not.have.been.called; + nack.should.not.have.been.called; + }); + }); + + it('should ignore an false msg', () => { + invokeProcessMessages(false).then(()=> { + sinon.fail('should not scucced'); + }).catch(() => { + ack.should.not.have.been.called; + nack.should.not.have.been.called; + }); + }); + + it('should ack a message with invalid JSON', () => { + const msg = { content: 'foo', fields: { routingKey: exchangeName } }; + invokeProcessMessages(msg).then(()=> { + sinon.fail('should not scucced'); + }).catch(() => { + ack.should.have.been.calledWith(msg); + nack.should.not.have.been.called; + }); + }); + + it('should nack, if error is thrown', () => { + const msg = { + content: JSON.stringify({ sampleData: 'foo', rejectWithError: true }), + properties: { correlationId : 'unit-tests'}, + fields: { routingKey: exchangeName } + }; + invokeProcessMessages(msg).then(() => { + sinon.fail('should not scucced'); + }) + .catch(() => { + ack.should.not.have.been.calledWith; + nack.should.have.been.calledWith(msg); + }); + }); + + it('should ack if error is UnprocessableError', () => { + const msg = { + content: JSON.stringify({ sampleData: 'foo', rejectWithUnprocessableError : true }), + properties: { correlationId : 'unit-tests'}, + fields: { routingKey: exchangeName } + }; + invokeProcessMessages(msg).then(() => { + sinon.fail('should not scucced'); + }) + .catch(() => { + ack.should.have.been.calledWith(msg); + nack.should.not.have.been.calledWith; + }); + }); + }); +}); diff --git a/consumer/test/worker.spec.js b/consumer/test/worker.spec.js index 02b3102..2552e6c 100644 --- a/consumer/test/worker.spec.js +++ b/consumer/test/worker.spec.js @@ -4,6 +4,7 @@ import {consume, initHandlers} from '../src/worker'; import {UnprocessableError} from '../src/common/errors'; import { EVENT } from '../config/constants'; +import config from 'config'; import './setup'; describe('worker', () => { @@ -89,9 +90,10 @@ describe('worker', () => { it('should ack a message with invalid JSON', (done) => { rabbitConsume = async (queue, fn) => { - const msg = { content: 'foo' }; + const msg = { content: 'foo', fields: { routingKey : exchangeName } }; await fn(msg); ack.should.have.been.calledWith(msg); + nack.should.not.have.been.called; }; invokeConsume(done); }); @@ -105,7 +107,9 @@ describe('worker', () => { rabbitConsume = async (queue, fn) => { await fn(validMessage); ack.should.have.been.calledWith(validMessage); - channelPublishSpy.should.have.been.calledWith(exchangeName, EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, new Buffer(validMessage.content)); + const connect2sfExchange = config.rabbitmq.connect2sfExchange; + const failedRoutingKey = validMessage.fields.routingKey;// + EVENT.ROUTING_KEY.FAILED_SUFFIX; + channelPublishSpy.should.have.been.calledWith(connect2sfExchange, failedRoutingKey, new Buffer(validMessage.content)); }; invokeConsume(done); });