From e8a306a7fa7d10a34ee187775125699dfe87acaf Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Wed, 20 Nov 2024 14:16:44 +0530 Subject: [PATCH 1/5] Add functionality to map kafka-headers into http headers --- examples/kafka-hub/hub/websub_subscribers.bal | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal index 761b8b46..05042e73 100644 --- a/examples/kafka-hub/hub/websub_subscribers.bal +++ b/examples/kafka-hub/hub/websub_subscribers.bal @@ -131,7 +131,20 @@ isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) retur json payload = check value:fromJsonString(message); websubhub:ContentDistributionMessage distributionMsg = { content: payload, - contentType: mime:APPLICATION_JSON + contentType: mime:APPLICATION_JSON, + headers: check getHeaders(kafkaRecord) }; return distributionMsg; } + +isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map|error { + map headers = {}; + foreach var ['key, value] in kafkaRecord.headers.entries().toArray() { + if value is string || value is string[] { + headers['key] = value; + } else if value is byte[] { + headers['key] = check string:fromBytes(value); + } + } + return headers; +} From 35503856f9b009869ddbeb0470f867fa46001cc7 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Wed, 20 Nov 2024 15:14:54 +0530 Subject: [PATCH 2/5] Add support to map HTTP headers to kafka-headers --- .../hub/modules/persistence/persistence.bal | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/examples/kafka-hub/hub/modules/persistence/persistence.bal b/examples/kafka-hub/hub/modules/persistence/persistence.bal index 6c098573..41e48279 100644 --- a/examples/kafka-hub/hub/modules/persistence/persistence.bal +++ b/examples/kafka-hub/hub/modules/persistence/persistence.bal @@ -17,6 +17,7 @@ import ballerina/websubhub; import kafkaHub.config; import kafkaHub.connections as conn; +import ballerinax/kafka; public isolated function addRegsiteredTopic(websubhub:TopicRegistration message) returns error? { check updateHubState(message); @@ -49,8 +50,18 @@ public isolated function addUpdateMessage(string topicName, websubhub:UpdateMess check produceKafkaMessage(topicName, payload); } -isolated function produceKafkaMessage(string topicName, json payload) returns error? { - byte[] serializedContent = payload.toJsonString().toBytes(); - check conn:statePersistProducer->send({ topic: topicName, value: serializedContent }); +isolated function produceKafkaMessage(string topicName, json payload, + map headers = {}) returns error? { + kafka:AnydataProducerRecord message = getProducerMsg(topicName, payload, headers); + check conn:statePersistProducer->send(message); check conn:statePersistProducer->'flush(); } + +isolated function getProducerMsg(string topic, json payload, + map headers) returns kafka:AnydataProducerRecord { + byte[] value = payload.toJsonString().toBytes(); + if headers.length() == 0 { + return { topic, value }; + } + return { topic, value, headers }; +} From c2dfdddda9451b89442ba37c8c1847682b9b56a0 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida <77491511+ayeshLK@users.noreply.github.com> Date: Wed, 20 Nov 2024 16:02:37 +0530 Subject: [PATCH 3/5] Refactor the kafka producer-message constructing logic Co-authored-by: Nipuna Ransinghe --- examples/kafka-hub/hub/modules/persistence/persistence.bal | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/examples/kafka-hub/hub/modules/persistence/persistence.bal b/examples/kafka-hub/hub/modules/persistence/persistence.bal index 41e48279..7ac8c6e2 100644 --- a/examples/kafka-hub/hub/modules/persistence/persistence.bal +++ b/examples/kafka-hub/hub/modules/persistence/persistence.bal @@ -60,8 +60,5 @@ isolated function produceKafkaMessage(string topicName, json payload, isolated function getProducerMsg(string topic, json payload, map headers) returns kafka:AnydataProducerRecord { byte[] value = payload.toJsonString().toBytes(); - if headers.length() == 0 { - return { topic, value }; - } - return { topic, value, headers }; + return headers.length() == 0 ? { topic, value } : { topic, value, headers }; } From d955225ad1eefa00113176b48d07e9e0a71c2c5d Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Wed, 20 Nov 2024 15:31:51 +0530 Subject: [PATCH 4/5] Add script to build the docker-images --- examples/kafka-hub/build-docker-images.sh | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 examples/kafka-hub/build-docker-images.sh diff --git a/examples/kafka-hub/build-docker-images.sh b/examples/kafka-hub/build-docker-images.sh new file mode 100644 index 00000000..c40a93ee --- /dev/null +++ b/examples/kafka-hub/build-docker-images.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Copyright 2024 WSO2 LLC. (http://wso2.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +( cd hub ; bal clean ; bal build) +( cd consolidator ; bal clean ; bal build) From 358540c018e4393d49b3e38de7a53e6f0884660e Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Wed, 20 Nov 2024 16:03:32 +0530 Subject: [PATCH 5/5] Restructure import order --- examples/kafka-hub/hub/modules/persistence/persistence.bal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/kafka-hub/hub/modules/persistence/persistence.bal b/examples/kafka-hub/hub/modules/persistence/persistence.bal index 7ac8c6e2..a799ecff 100644 --- a/examples/kafka-hub/hub/modules/persistence/persistence.bal +++ b/examples/kafka-hub/hub/modules/persistence/persistence.bal @@ -15,9 +15,9 @@ // under the License. import ballerina/websubhub; +import ballerinax/kafka; import kafkaHub.config; import kafkaHub.connections as conn; -import ballerinax/kafka; public isolated function addRegsiteredTopic(websubhub:TopicRegistration message) returns error? { check updateHubState(message);