Skip to content

Commit

Permalink
Merge pull request #151 from alkiskal/custom-headers
Browse files Browse the repository at this point in the history
add headers support
  • Loading branch information
krystianity authored Apr 19, 2020
2 parents df0e872 + 91db408 commit a90ba6c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 21 deletions.
26 changes: 26 additions & 0 deletions lib/librdkafka/NConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ class NConsumer extends EventEmitter {
this._lastReceived = Date.now();
message.value = this._convertMessageValue(message.value, asString, asJSON);

this._convertHeadersValue(message.headers);

if (!this._firstMessageConsumed) {
this._firstMessageConsumed = true;
super.emit("first-drain-message", message);
Expand Down Expand Up @@ -455,6 +457,24 @@ class NConsumer extends EventEmitter {
return value;
}

/**
* @private
* converts the header values to string
* @param {Array} headers
* @returns {void}
*/
_convertHeadersValue(headers) {
if (!headers || headers.length === 0) {
return;
}

headers.forEach(item => {
Object.keys(item).forEach(key => {
item[key] = item[key] && item[key].toString(MESSAGE_CHARSET);
});
});
}

/**
* subscribe and start to consume, should be called only once after connection is successfull
* options object supports the following fields:
Expand Down Expand Up @@ -521,6 +541,8 @@ class NConsumer extends EventEmitter {
this._lastReceived = Date.now();
message.value = this._convertMessageValue(message.value, asString, asJSON);

this._convertHeadersValue(message.headers);

if (!this._firstMessageConsumed) {
this._firstMessageConsumed = true;
super.emit("first-drain-message", message);
Expand Down Expand Up @@ -559,6 +581,8 @@ class NConsumer extends EventEmitter {
this.config.logger.debug(message);
message.value = this._convertMessageValue(message.value, asString, asJSON);

this._convertHeadersValue(message.headers);

if (!this._firstMessageConsumed) {
this._firstMessageConsumed = true;
super.emit("first-drain-message", message);
Expand Down Expand Up @@ -652,6 +676,8 @@ class NConsumer extends EventEmitter {

message.value = this._convertMessageValue(message.value, asString, asJSON);

this._convertHeadersValue(message.headers);

if (!this._firstMessageConsumed) {
this._firstMessageConsumed = true;
super.emit("first-drain-message", message);
Expand Down
40 changes: 20 additions & 20 deletions lib/librdkafka/NProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,10 @@ class NProducer extends EventEmitter {
* @param {number} _partition - optional partition to produce to
* @param {string} _key - optional message key
* @param {string} _partitionKey - optional key to evaluate partition for this message
* @param {array} _headers - optional array containing custom key value pairs that provide message metadata
* @returns {Promise.<object>}
*/
async send(topicName, message, _partition = null, _key = null, _partitionKey = null) {

/*
these are not supported in the HighLevelProducer of node-rdkafka
_opaqueKey = null,
_headers = null,
*/
async send(topicName, message, _partition = null, _key = null, _partitionKey = null, _headers = null) {

if (!this.producer) {
throw new Error("You must call and await .connect() before trying to produce messages.");
Expand Down Expand Up @@ -342,7 +337,7 @@ class NProducer extends EventEmitter {
this._totalSentMessages++;

return new Promise((resolve, reject) => {
this.producer.produce(topicName, partition, message, key, producedAt, (error, offset) => {
this.producer.produce(topicName, partition, message, key, producedAt, _headers, (error, offset) => {

if (error) {
return reject(error);
Expand All @@ -366,9 +361,10 @@ class NProducer extends EventEmitter {
* @param {number} partition - optional partition to produce to
* @param {number} version - optional version of the message value
* @param {string} partitionKey - optional key to evaluate partition for this message
* @param {array} headers - optional array containing custom key value pairs that provide message metadata
* @returns {Promise.<object>}
*/
async buffer(topic, identifier, payload, partition = null, version = null, partitionKey = null) {
async buffer(topic, identifier, payload, partition = null, version = null, partitionKey = null, headers = null) {

if (typeof identifier === "undefined") {
identifier = uuid.v4();
Expand All @@ -390,7 +386,7 @@ class NProducer extends EventEmitter {
payload.version = version;
}

return await this.send(topic, JSON.stringify(payload), partition, identifier, partitionKey);
return await this.send(topic, JSON.stringify(payload), partition, identifier, partitionKey, headers);
}

/**
Expand All @@ -405,9 +401,10 @@ class NProducer extends EventEmitter {
* @param {string} partitionKey - optional key to deterministcally detect partition
* @param {number} partition - optional partition (overwrites partitionKey)
* @param {string} messageType - optional messageType (for the formatted message value)
* @param {array} headers - optional object containing custom key value pairs that provide message metadata
* @returns {Promise.<object>}
*/
async _sendBufferFormat(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, messageType = "") {
async _sendBufferFormat(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, messageType = "", headers = null) {

if (typeof identifier === "undefined") {
identifier = uuid.v4();
Expand Down Expand Up @@ -437,15 +434,15 @@ class NProducer extends EventEmitter {
type: topic + messageType
};

return await this.send(topic, JSON.stringify(payload), partition, identifier, partitionKey);
return await this.send(topic, JSON.stringify(payload), partition, identifier, partitionKey, headers);
}

/**
* an alias for bufferFormatPublish()
* @alias bufferFormatPublish
*/
bufferFormat(topic, identifier, payload, version = 1, compressionType = 0, partitionKey = null) {
return this.bufferFormatPublish(topic, identifier, payload, version, compressionType, partitionKey);
bufferFormat(topic, identifier, payload, version = 1, compressionType = 0, partitionKey = null, headers = null) {
return this.bufferFormatPublish(topic, identifier, payload, version, compressionType, partitionKey, headers);
}

/**
Expand All @@ -457,10 +454,11 @@ class NProducer extends EventEmitter {
* @param {*} _ -ignoreable, here for api compatibility
* @param {string} partitionKey - optional key to deterministcally detect partition
* @param {number} partition - optional partition (overwrites partitionKey)
* @param {array} headers - optional object containing custom key value pairs that provide message metadata
* @returns {Promise.<object>}
*/
bufferFormatPublish(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null) {
return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.PUBLISH);
bufferFormatPublish(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, headers = null) {
return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.PUBLISH, headers);
}

/**
Expand All @@ -472,10 +470,11 @@ class NProducer extends EventEmitter {
* @param {*} _ -ignoreable, here for api compatibility
* @param {string} partitionKey - optional key to deterministcally detect partition
* @param {number} partition - optional partition (overwrites partitionKey)
* @param {array} headers - optional object containing custom key value pairs that provide message metadata
* @returns {Promise.<object>}
*/
bufferFormatUpdate(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null) {
return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.UPDATE);
bufferFormatUpdate(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, headers = null) {
return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.UPDATE, headers);
}

/**
Expand All @@ -487,10 +486,11 @@ class NProducer extends EventEmitter {
* @param {*} _ -ignoreable, here for api compatibility
* @param {string} partitionKey - optional key to deterministcally detect partition
* @param {number} partition - optional partition (overwrites partitionKey)
* @param {array} headers - optional object containing custom key value pairs that provide message metadata
* @returns {Promise.<object>}
*/
bufferFormatUnpublish(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null) {
return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.UNPUBLISH);
bufferFormatUnpublish(topic, identifier, _payload, version = 1, _, partitionKey = null, partition = null, headers = null) {
return this._sendBufferFormat(topic, identifier, _payload, version, _, partitionKey, partition, MESSAGE_TYPES.UNPUBLISH, headers);
}

/**
Expand Down
13 changes: 12 additions & 1 deletion test/int/NSinek.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ describe("Native Client INT", () => {
producer.bufferFormatUpdate(topic, "2", {content: "a message 2"}, 1, null, 0),
producer.bufferFormatUnpublish(topic, "3", {content: "a message 3"}, 1, null, 0),
producer.send(topic, Buffer.from("a message buffer")),
producer.send(topic, "a message with headers", null, null, null, [{ myCustomKey: "myCustomValue" }]),
producer.bufferFormatPublish(topic, "1", {content: "a message with headers 1"}, 1, null, 0, null, [{ myCustomKey: "myCustomValue 1" }]),
producer.bufferFormatUpdate(topic, "2", {content: "a message with headers 2"}, 1, null, 0, null, [{ myCustomKey: "myCustomValue 2" }]),
producer.bufferFormatUnpublish(topic, "3", {content: "a message with headers 3"}, 1, null, 0, null, [{ myCustomKey: "myCustomValue 3" }]),
producer.send(topic, Buffer.from("a message buffer with headers"), null, null, null, [{ myCustomKey: "myCustomValue buffer" }]),
];

return Promise.all(promises).then((produceResults) => {
Expand Down Expand Up @@ -89,7 +94,7 @@ describe("Native Client INT", () => {

it("should be able to consume messages", done => {
// console.log(consumedMessages);
assert.ok(consumedMessages.length >= 5);
assert.ok(consumedMessages.length >= 10);
assert.ok(!Buffer.isBuffer(consumedMessages[0].value));
assert.ok(consumedMessages[1].key, "1");
assert.ok(consumedMessages[2].key, "2");
Expand All @@ -99,6 +104,12 @@ describe("Native Client INT", () => {
assert.equal(JSON.parse(consumedMessages[2].value).payload.content, "a message 2");
assert.equal(JSON.parse(consumedMessages[3].value).payload.content, "a message 3");
assert.equal(consumedMessages[4].value, "a message buffer");
assert.equal(consumedMessages[5].headers[0].myCustomKey, "myCustomValue");
assert.equal(consumedMessages[6].headers[0].myCustomKey, "myCustomValue 1");
assert.equal(consumedMessages[7].headers[0].myCustomKey, "myCustomValue 2");
assert.equal(consumedMessages[8].headers[0].myCustomKey, "myCustomValue 3");
assert.equal(consumedMessages[9].headers[0].myCustomKey, "myCustomValue buffer");

done();
});

Expand Down

0 comments on commit a90ba6c

Please sign in to comment.