From d5010d9fab76102c6c4eee307031563e59416539 Mon Sep 17 00:00:00 2001 From: ashwinismath Date: Wed, 12 Dec 2018 12:40:35 +0530 Subject: [PATCH 1/4] Update group_consumer.js --- lib/group_consumer.js | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/lib/group_consumer.js b/lib/group_consumer.js index 483a085..80970b6 100644 --- a/lib/group_consumer.js +++ b/lib/group_consumer.js @@ -77,8 +77,11 @@ GroupConsumer.prototype.init = function (strategies) { self.strategies[s.name] = s; self.topics = self.topics.concat(s.subscriptions); }); - - return self._fullRejoin(); + + return self._fullRejoin() + .catch(function (err) { + throw err; + }); }); }; @@ -96,7 +99,12 @@ GroupConsumer.prototype._joinGroup = function () { return Promise.delay(1000).then(function () { return _tryJoinGroup(++attempt); }); - }); + }) + .catch(function () { + return Promise.delay(1000).then(function () { + return _tryJoinGroup(++attempt); + }); + }) }()) .then(function (response) { if (self.memberId) { @@ -110,7 +118,13 @@ GroupConsumer.prototype._joinGroup = function () { self.generationId = response.generationId; self.members = response.members; self.strategyName = response.groupProtocol; - }); + }) + .catch(function (err) { + if (err.length === 1) { + throw err[0]; + } + throw err; + }); }; GroupConsumer.prototype._syncGroup = function () { @@ -189,17 +203,28 @@ GroupConsumer.prototype._rejoin = function () { GroupConsumer.prototype._fullRejoin = function () { var self = this; + var fullRejoinAttempt = fullRejoinAttempt || 1; return (function _tryFullRejoin() { self.memberId = null; return self.client.updateGroupCoordinator(self.options.groupId).then(function () { return self._joinGroup().then(function () { // join group return self._rejoin(); // rejoin and sync with received memberId - }); + }).catch(function (err) { + return Promise.reject(err); + }); }) .catch(function (err) { self.client.error('Full rejoin attempt failed:', err); - return Promise.delay(1000).then(_tryFullRejoin); + fullRejoinAttempt++; + if (err.message == "Failed to join the group: GroupCoordinatorNotAvailable" && fullRejoinAttempt > 3) { + if (err.length === 1) { + throw err[0]; + } + throw err; + } else { + return Promise.delay(1000).then(_tryFullRejoin); + } }); }()) .tap(function () { From 888c5f8940d0f87e95acb248aa590b0c2bd0c020 Mon Sep 17 00:00:00 2001 From: ashwinismath Date: Wed, 12 Dec 2018 12:44:36 +0530 Subject: [PATCH 2/4] Update client.js --- lib/client.js | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/client.js b/lib/client.js index 4d593a9..cdb612d 100644 --- a/lib/client.js +++ b/lib/client.js @@ -650,6 +650,13 @@ Client.prototype.joinConsumerGroupRequest = function (groupId, memberId, session } return result; }); + }).catch(function(err){ + if(err[0].code == 'GroupAuthorizationFailed' || err[0].code == 'GroupCoordinatorNotAvailable'){ + if (err.length === 1) { + throw err[0]; + } + throw err; + } }); }; From 9b370388795fa24e9650a4e02fd741356fc666b6 Mon Sep 17 00:00:00 2001 From: ashwinismath Date: Wed, 12 Dec 2018 15:03:44 +0530 Subject: [PATCH 3/4] Update client.js --- lib/client.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/client.js b/lib/client.js index cdb612d..bbb117e 100644 --- a/lib/client.js +++ b/lib/client.js @@ -650,8 +650,8 @@ Client.prototype.joinConsumerGroupRequest = function (groupId, memberId, session } return result; }); - }).catch(function(err){ - if(err[0].code == 'GroupAuthorizationFailed' || err[0].code == 'GroupCoordinatorNotAvailable'){ + }).catch(function (err) { + if (err[0].code === 'GroupAuthorizationFailed' || err[0].code === 'GroupCoordinatorNotAvailable') { if (err.length === 1) { throw err[0]; } From b84427b7769db15759865856436d6c3067fa53b1 Mon Sep 17 00:00:00 2001 From: ashwinismath Date: Wed, 12 Dec 2018 15:05:08 +0530 Subject: [PATCH 4/4] Update group_consumer.js --- lib/group_consumer.js | 195 +++++++++++++++++++++--------------------- 1 file changed, 97 insertions(+), 98 deletions(-) diff --git a/lib/group_consumer.js b/lib/group_consumer.js index 80970b6..9ff8260 100644 --- a/lib/group_consumer.js +++ b/lib/group_consumer.js @@ -1,11 +1,11 @@ 'use strict'; -var Promise = require('./bluebird-configured'); -var _ = require('lodash'); +var Promise = require('./bluebird-configured'); +var _ = require('lodash'); var BaseConsumer = require('./base_consumer'); -var Kafka = require('./index'); -var util = require('util'); -var errors = require('./errors'); +var Kafka = require('./index'); +var util = require('util'); +var errors = require('./errors'); function GroupConsumer(options) { this.options = _.defaultsDeep(options || {}, { @@ -77,8 +77,7 @@ GroupConsumer.prototype.init = function (strategies) { self.strategies[s.name] = s; self.topics = self.topics.concat(s.subscriptions); }); - - return self._fullRejoin() + return self._fullRejoin() .catch(function (err) { throw err; }); @@ -95,36 +94,36 @@ GroupConsumer.prototype._joinGroup = function () { } return self.client.joinConsumerGroupRequest(self.options.groupId, self.memberId, self.options.sessionTimeout, _.values(self.strategies)) - .catch({ code: 'GroupCoordinatorNotAvailable' }, function () { - return Promise.delay(1000).then(function () { - return _tryJoinGroup(++attempt); + .catch({ code: 'GroupCoordinatorNotAvailable' }, function () { + return Promise.delay(1000).then(function () { + return _tryJoinGroup(++attempt); + }); + }) + .catch(function () { + return Promise.delay(1000).then(function () { + return _tryJoinGroup(++attempt); + }); }); - }) - .catch(function () { - return Promise.delay(1000).then(function () { - return _tryJoinGroup(++attempt); - }); - }) }()) - .then(function (response) { - if (self.memberId) { - self.client.log('Joined group', self.options.groupId, 'generationId', response.generationId, 'as', response.memberId); - if (response.memberId === response.leaderId) { - self.client.log('Elected as group leader'); + .then(function (response) { + if (self.memberId) { + self.client.log('Joined group', self.options.groupId, 'generationId', response.generationId, 'as', response.memberId); + if (response.memberId === response.leaderId) { + self.client.log('Elected as group leader'); + } } - } - self.memberId = response.memberId; - self.leaderId = response.leaderId; - self.generationId = response.generationId; - self.members = response.members; - self.strategyName = response.groupProtocol; - }) - .catch(function (err) { - if (err.length === 1) { - throw err[0]; - } - throw err; - }); + self.memberId = response.memberId; + self.leaderId = response.leaderId; + self.generationId = response.generationId; + self.members = response.members; + self.strategyName = response.groupProtocol; + }) + .catch(function (err) { + if (err.length === 1) { + throw err[0]; + } + throw err; + }); }; GroupConsumer.prototype._syncGroup = function () { @@ -155,29 +154,29 @@ GroupConsumer.prototype._syncGroup = function () { } return []; }) - .then(function (result) { - var assignments = _(result).groupBy('memberId').mapValues(function (mv, mk) { - return { - memberId: mk, - memberAssignment: { - version: 0, - metadata: null, - partitionAssignment: _(mv).groupBy('topic').map(function (tv, tk) { - return { - topic: tk, - partitions: _.map(tv, 'partition') - }; - }).value() - } - }; - }).values().value(); + .then(function (result) { + var assignments = _(result).groupBy('memberId').mapValues(function (mv, mk) { + return { + memberId: mk, + memberAssignment: { + version: 0, + metadata: null, + partitionAssignment: _(mv).groupBy('topic').map(function (tv, tk) { + return { + topic: tk, + partitions: _.map(tv, 'partition') + }; + }).value() + } + }; + }).values().value(); - // console.log(require('util').inspect(assignments, true, 10, true)); - return self.client.syncConsumerGroupRequest(self.options.groupId, self.memberId, self.generationId, assignments); - }) - .then(function (response) { - return self._updateSubscriptions(_.get(response, 'memberAssignment.partitionAssignment', [])); - }); + // console.log(require('util').inspect(assignments, true, 10, true)); + return self.client.syncConsumerGroupRequest(self.options.groupId, self.memberId, self.generationId, assignments); + }) + .then(function (response) { + return self._updateSubscriptions(_.get(response, 'memberAssignment.partitionAssignment', [])); + }); }; GroupConsumer.prototype._rejoin = function () { @@ -193,17 +192,17 @@ GroupConsumer.prototype._rejoin = function () { return self._joinGroup().then(function () { return self._syncGroup(); }) - .catch({ code: 'RebalanceInProgress' }, function () { - return Promise.delay(1000).then(function () { - return _tryRebalance(++attempt); + .catch({ code: 'RebalanceInProgress' }, function () { + return Promise.delay(1000).then(function () { + return _tryRebalance(++attempt); + }); }); - }); }()); }; GroupConsumer.prototype._fullRejoin = function () { var self = this; - var fullRejoinAttempt = fullRejoinAttempt || 1; + var fullRejoinAttempt = 1; return (function _tryFullRejoin() { self.memberId = null; @@ -212,49 +211,49 @@ GroupConsumer.prototype._fullRejoin = function () { return self._rejoin(); // rejoin and sync with received memberId }).catch(function (err) { return Promise.reject(err); - }); + }); }) - .catch(function (err) { - self.client.error('Full rejoin attempt failed:', err); - fullRejoinAttempt++; - if (err.message == "Failed to join the group: GroupCoordinatorNotAvailable" && fullRejoinAttempt > 3) { - if (err.length === 1) { - throw err[0]; - } - throw err; - } else { - return Promise.delay(1000).then(_tryFullRejoin); - } - }); + .catch(function (err) { + self.client.error('Full rejoin attempt failed:', err); + fullRejoinAttempt++; + if (err.message === 'Failed to join the group: GroupCoordinatorNotAvailable' && fullRejoinAttempt > 3) { + if (err.length === 1) { + throw err[0]; + } + throw err; + } else { + return Promise.delay(1000).then(_tryFullRejoin); + } + }); }()) - .tap(function () { - self._heartbeatPromise = self._heartbeat(); // start sending heartbeats - return null; - }); + .tap(function () { + self._heartbeatPromise = self._heartbeat(); // start sending heartbeats + return null; + }); }; GroupConsumer.prototype._heartbeat = function () { var self = this; return self.client.heartbeatRequest(self.options.groupId, self.memberId, self.generationId) - .catch({ code: 'RebalanceInProgress' }, function () { - // new group member has joined or existing member has left - self.client.log('Rejoining group on RebalanceInProgress'); - return self._rejoin(); - }) - .tap(function () { - self._heartbeatTimeout = setTimeout(function () { - self._heartbeatPromise = self._heartbeat(); - }, self.options.heartbeatTimeout); - }) - .catch(function (err) { - // some severe error, such as GroupCoordinatorNotAvailable or network error - // in this case we should start trying to rejoin from scratch - self.client.error('Sending heartbeat failed: ', err); - return self._fullRejoin().catch(function (_err) { - self.client.error(_err); + .catch({ code: 'RebalanceInProgress' }, function () { + // new group member has joined or existing member has left + self.client.log('Rejoining group on RebalanceInProgress'); + return self._rejoin(); + }) + .tap(function () { + self._heartbeatTimeout = setTimeout(function () { + self._heartbeatPromise = self._heartbeat(); + }, self.options.heartbeatTimeout); + }) + .catch(function (err) { + // some severe error, such as GroupCoordinatorNotAvailable or network error + // in this case we should start trying to rejoin from scratch + self.client.error('Sending heartbeat failed: ', err); + return self._fullRejoin().catch(function (_err) { + self.client.error(_err); + }); }); - }); }; /** @@ -271,9 +270,9 @@ GroupConsumer.prototype.end = function () { clearTimeout(self._heartbeatTimeout); return self.client.leaveGroupRequest(self.options.groupId, self.memberId) - .then(function () { - return BaseConsumer.prototype.end.call(self); - }); + .then(function () { + return BaseConsumer.prototype.end.call(self); + }); }; GroupConsumer.prototype._prepareOffsetRequest = function (type, commits) {