diff --git a/lib/librdkafka/NConsumer.js b/lib/librdkafka/NConsumer.js index 41fa522..58bebf4 100644 --- a/lib/librdkafka/NConsumer.js +++ b/lib/librdkafka/NConsumer.js @@ -325,6 +325,8 @@ class NConsumer extends EventEmitter { this._lastReceived = Date.now(); message.value = this._convertMessageValue(message.value, asString, asJSON); + this._convertHeadersValue(message.headers); + if (!this._firstMessageConsumed) { this._firstMessageConsumed = true; super.emit("first-drain-message", message); @@ -455,6 +457,24 @@ class NConsumer extends EventEmitter { return value; } + /** + * @private + * converts the header values to string + * @param {Array} headers + * @returns {void} + */ + _convertHeadersValue(headers) { + if (!headers || headers.length === 0) { + return; + } + + headers.forEach(item => { + Object.keys(item).forEach(key => { + item[key] = item[key] && item[key].toString(MESSAGE_CHARSET); + }); + }); + } + /** * subscribe and start to consume, should be called only once after connection is successfull * options object supports the following fields: @@ -521,6 +541,8 @@ class NConsumer extends EventEmitter { this._lastReceived = Date.now(); message.value = this._convertMessageValue(message.value, asString, asJSON); + this._convertHeadersValue(message.headers); + if (!this._firstMessageConsumed) { this._firstMessageConsumed = true; super.emit("first-drain-message", message); @@ -559,6 +581,8 @@ class NConsumer extends EventEmitter { this.config.logger.debug(message); message.value = this._convertMessageValue(message.value, asString, asJSON); + this._convertHeadersValue(message.headers); + if (!this._firstMessageConsumed) { this._firstMessageConsumed = true; super.emit("first-drain-message", message); @@ -652,6 +676,8 @@ class NConsumer extends EventEmitter { message.value = this._convertMessageValue(message.value, asString, asJSON); + this._convertHeadersValue(message.headers); + if (!this._firstMessageConsumed) { this._firstMessageConsumed = true; super.emit("first-drain-message", message); diff --git a/lib/librdkafka/NProducer.js b/lib/librdkafka/NProducer.js index 318aca1..1eebacb 100644 --- a/lib/librdkafka/NProducer.js +++ b/lib/librdkafka/NProducer.js @@ -282,15 +282,10 @@ class NProducer extends EventEmitter { * @param {number} _partition - optional partition to produce to * @param {string} _key - optional message key * @param {string} _partitionKey - optional key to evaluate partition for this message + * @param {array} _headers - optional array containing custom key value pairs that provide message metadata * @returns {Promise.} */ - async send(topicName, message, _partition = null, _key = null, _partitionKey = null) { - - /* - these are not supported in the HighLevelProducer of node-rdkafka - _opaqueKey = null, - _headers = null, - */ + async send(topicName, message, _partition = null, _key = null, _partitionKey = null, _headers = null) { if (!this.producer) { throw new Error("You must call and await .connect() before trying to produce messages."); @@ -342,7 +337,7 @@ class NProducer extends EventEmitter { this._totalSentMessages++; return new Promise((resolve, reject) => { - this.producer.produce(topicName, partition, message, key, producedAt, (error, offset) => { + this.producer.produce(topicName, partition, message, key, producedAt, _headers, (error, offset) => { if (error) { return reject(error); @@ -366,9 +361,10 @@ class NProducer extends EventEmitter { * @param {number} partition - optional partition to produce to * @param {number} version - optional version of the message value * @param {string} partitionKey - optional key to evaluate partition for this message + * @param {array} headers - optional array containing custom key value pairs that provide message metadata * @returns {Promise.} */ - async buffer(topic, identifier, payload, partition = null, version = null, partitionKey = null) { + async buffer(topic, identifier, payload, partition = null, version = null, partitionKey = null, headers = null) { if (typeof identifier === "undefined") { identifier = uuid.v4(); @@ -390,7 +386,7 @@ class NProducer extends EventEmitter { payload.version = version; } - return await this.send(topic, JSON.stringify(payload), partition, identifier, partitionKey); + return await this.send(topic, JSON.stringify(payload), partition, identifier, partitionKey, headers); } /** @@ -405,9 +401,10 @@ class NProducer extends EventEmitter { * @param {string} partitionKey - optional key to deterministcally detect partition * @param {number} partition - optional partition (overwrites partitionKey) * @param {string} messageType - optional messageType (for the formatted message value) + * @param {array} headers - optional object containing custom key value pairs that provide message metadata * @returns {Promise.} */ - async _sendBufferFormat(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, messageType = "") { + async _sendBufferFormat(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, messageType = "", headers = null) { if (typeof identifier === "undefined") { identifier = uuid.v4(); @@ -437,15 +434,15 @@ class NProducer extends EventEmitter { type: topic + messageType }; - return await this.send(topic, JSON.stringify(payload), partition, identifier, partitionKey); + return await this.send(topic, JSON.stringify(payload), partition, identifier, partitionKey, headers); } /** * an alias for bufferFormatPublish() * @alias bufferFormatPublish */ - bufferFormat(topic, identifier, payload, version = 1, compressionType = 0, partitionKey = null) { - return this.bufferFormatPublish(topic, identifier, payload, version, compressionType, partitionKey); + bufferFormat(topic, identifier, payload, version = 1, compressionType = 0, partitionKey = null, headers = null) { + return this.bufferFormatPublish(topic, identifier, payload, version, compressionType, partitionKey, headers); } /** @@ -457,10 +454,11 @@ class NProducer extends EventEmitter { * @param {*} _ -ignoreable, here for api compatibility * @param {string} partitionKey - optional key to deterministcally detect partition * @param {number} partition - optional partition (overwrites partitionKey) + * @param {array} headers - optional object containing custom key value pairs that provide message metadata * @returns {Promise.} */ - bufferFormatPublish(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null) { - return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.PUBLISH); + bufferFormatPublish(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, headers = null) { + return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.PUBLISH, headers); } /** @@ -472,10 +470,11 @@ class NProducer extends EventEmitter { * @param {*} _ -ignoreable, here for api compatibility * @param {string} partitionKey - optional key to deterministcally detect partition * @param {number} partition - optional partition (overwrites partitionKey) + * @param {array} headers - optional object containing custom key value pairs that provide message metadata * @returns {Promise.} */ - bufferFormatUpdate(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null) { - return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.UPDATE); + bufferFormatUpdate(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, headers = null) { + return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.UPDATE, headers); } /** @@ -487,10 +486,11 @@ class NProducer extends EventEmitter { * @param {*} _ -ignoreable, here for api compatibility * @param {string} partitionKey - optional key to deterministcally detect partition * @param {number} partition - optional partition (overwrites partitionKey) + * @param {array} headers - optional object containing custom key value pairs that provide message metadata * @returns {Promise.} */ - bufferFormatUnpublish(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null) { - return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.UNPUBLISH); + bufferFormatUnpublish(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, headers = null) { + return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.UNPUBLISH, headers); } /** diff --git a/test/int/NSinek.test.js b/test/int/NSinek.test.js index 15ffba5..46ec95d 100644 --- a/test/int/NSinek.test.js +++ b/test/int/NSinek.test.js @@ -58,6 +58,11 @@ describe("Native Client INT", () => { producer.bufferFormatUpdate(topic, "2", {content: "a message 2"}, 1, null, 0), producer.bufferFormatUnpublish(topic, "3", {content: "a message 3"}, 1, null, 0), producer.send(topic, Buffer.from("a message buffer")), + producer.send(topic, "a message with headers", null, null, null, [{ myCustomKey: "myCustomValue" }]), + producer.bufferFormatPublish(topic, "1", {content: "a message with headers 1"}, 1, null, 0, null, [{ myCustomKey: "myCustomValue 1" }]), + producer.bufferFormatUpdate(topic, "2", {content: "a message with headers 2"}, 1, null, 0, null, [{ myCustomKey: "myCustomValue 2" }]), + producer.bufferFormatUnpublish(topic, "3", {content: "a message with headers 3"}, 1, null, 0, null, [{ myCustomKey: "myCustomValue 3" }]), + producer.send(topic, Buffer.from("a message buffer with headers"), null, null, null, [{ myCustomKey: "myCustomValue buffer" }]), ]; return Promise.all(promises).then((produceResults) => { @@ -89,7 +94,7 @@ describe("Native Client INT", () => { it("should be able to consume messages", done => { // console.log(consumedMessages); - assert.ok(consumedMessages.length >= 5); + assert.ok(consumedMessages.length >= 10); assert.ok(!Buffer.isBuffer(consumedMessages[0].value)); assert.ok(consumedMessages[1].key, "1"); assert.ok(consumedMessages[2].key, "2"); @@ -99,6 +104,12 @@ describe("Native Client INT", () => { assert.equal(JSON.parse(consumedMessages[2].value).payload.content, "a message 2"); assert.equal(JSON.parse(consumedMessages[3].value).payload.content, "a message 3"); assert.equal(consumedMessages[4].value, "a message buffer"); + assert.equal(consumedMessages[5].headers[0].myCustomKey, "myCustomValue"); + assert.equal(consumedMessages[6].headers[0].myCustomKey, "myCustomValue 1"); + assert.equal(consumedMessages[7].headers[0].myCustomKey, "myCustomValue 2"); + assert.equal(consumedMessages[8].headers[0].myCustomKey, "myCustomValue 3"); + assert.equal(consumedMessages[9].headers[0].myCustomKey, "myCustomValue buffer"); + done(); });