diff --git a/consumer/config/constants.js b/consumer/config/constants.js index e9157ee..a386a32 100644 --- a/consumer/config/constants.js +++ b/consumer/config/constants.js @@ -4,5 +4,6 @@ export const EVENT = { PROJECT_DRAFT_CREATED: 'project.draft-created', PROJECT_UPDATED: 'project.updated', PROJECT_DELETED: 'project.deleted', + CONNECT_TO_SF_FAILED: 'connect2sf.failed' }, }; diff --git a/consumer/src/services/ConfigurationService.js b/consumer/src/services/ConfigurationService.js index 55c2ee3..025dcb4 100644 --- a/consumer/src/services/ConfigurationService.js +++ b/consumer/src/services/ConfigurationService.js @@ -24,7 +24,6 @@ class ConfigurationService { }, }, }).promise(); - console.log('node env: ' + process.env.NODE_ENV); if (!result.Items.length) { throw new Error('Configuration for AppXpressConfig not found'); } diff --git a/consumer/src/worker.js b/consumer/src/worker.js index 6ea3af5..7254c35 100644 --- a/consumer/src/worker.js +++ b/consumer/src/worker.js @@ -35,8 +35,9 @@ export function initHandlers(handlers) { * @param {String} exchangeName the exchange name * @param {String} queue the queue name */ -export async function consume(channel, exchangeName, queue) { +export async function consume(channel, exchangeName, queue, publishChannel) { channel.assertExchange(exchangeName, 'topic', { durable: true }); + publishChannel.assertExchange(exchangeName, 'topic', { durable: true }); channel.assertQueue(queue, { durable: true }); const bindings = _.keys(EVENT_HANDLERS); const bindingPromises = _.map(bindings, rk => @@ -73,8 +74,22 @@ export async function consume(channel, exchangeName, queue) { if (e.shouldAck) { channel.ack(msg); } else { - // acking for debugging issue on production. this would prevent log pile up + // ack the message but copy it to other queue where no consumer is listening + // we can listen to that queue on adhoc basis when we see error case like lead not created in SF + // we can use cloudamqp console to check the messages and may be manually create SF lead + // nacking here was causing flood of messages to the worker and it keep on consuming high resources channel.ack(msg); + try { + publishChannel.publish( + exchangeName, + EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, + new Buffer(msg.content.toString()) + ); + } catch(e) { + // TODO decide if we want nack the original msg here + // for now just ignoring the error in requeue + logger.logFullError(e, `Error in publising Exchange to ${exchangeName}`); + } } } }); @@ -91,7 +106,13 @@ async function start() { debug('created connection successfully with URL: ' + config.rabbitmqURL); const channel = await connection.createConfirmChannel(); debug('Channel confirmed...'); - consume(channel, config.rabbitmq.projectsExchange, config.rabbitmq.queues.project); + const publishChannel = await connection.createConfirmChannel(); + consume( + channel, + config.rabbitmq.projectsExchange, + config.rabbitmq.queues.project, + publishChannel + ); } catch (e) { debug('Unable to connect to RabbitMQ'); } diff --git a/consumer/test/worker.spec.js b/consumer/test/worker.spec.js index 368a7f8..02b3102 100644 --- a/consumer/test/worker.spec.js +++ b/consumer/test/worker.spec.js @@ -9,7 +9,7 @@ import './setup'; describe('worker', () => { describe('consume', () => { const queueName = 'sample-queue'; - const exchangeName = EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED//'sample-exchange'; + const exchangeName = 'sample-exchange'; const validMessage = { content: JSON.stringify({ sampleData: 'foo' }), properties: { correlationId : 'unit-tests'}, @@ -24,6 +24,7 @@ describe('worker', () => { let rabbitConsume; let exchangeHandlerSpy = sinon.spy(); let fakeExchangeHandlerSpy = sinon.spy(); + let channelPublishSpy = sinon.spy(); beforeEach(() => { handler = sinon.spy(); @@ -58,7 +59,11 @@ describe('worker', () => { done(e); } }, - }, exchangeName, queueName); + }, exchangeName, queueName, + { + publish: channelPublishSpy, + assertExchange + }); } it('should consume and ack a message successfully', (done) => { @@ -91,7 +96,7 @@ describe('worker', () => { invokeConsume(done); }); - xit('should nack if error is thrown', (done) => { + it('should ack, with message being copied to temp queue, if error is thrown', (done) => { initHandlers({ [exchangeName] : () => { throw new Error('foo'); @@ -99,7 +104,8 @@ describe('worker', () => { }) rabbitConsume = async (queue, fn) => { await fn(validMessage); - nack.should.have.been.calledWith(validMessage); + ack.should.have.been.calledWith(validMessage); + channelPublishSpy.should.have.been.calledWith(exchangeName, EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, new Buffer(validMessage.content)); }; invokeConsume(done); });