diff --git a/examples/kafka-hub/build-docker-images.sh b/examples/kafka-hub/build-docker-images.sh new file mode 100644 index 00000000..c40a93ee --- /dev/null +++ b/examples/kafka-hub/build-docker-images.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Copyright 2024 WSO2 LLC. (http://wso2.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +( cd hub ; bal clean ; bal build) +( cd consolidator ; bal clean ; bal build) diff --git a/examples/kafka-hub/hub/modules/persistence/persistence.bal b/examples/kafka-hub/hub/modules/persistence/persistence.bal index 6c098573..a799ecff 100644 --- a/examples/kafka-hub/hub/modules/persistence/persistence.bal +++ b/examples/kafka-hub/hub/modules/persistence/persistence.bal @@ -15,6 +15,7 @@ // under the License. import ballerina/websubhub; +import ballerinax/kafka; import kafkaHub.config; import kafkaHub.connections as conn; @@ -49,8 +50,15 @@ public isolated function addUpdateMessage(string topicName, websubhub:UpdateMess check produceKafkaMessage(topicName, payload); } -isolated function produceKafkaMessage(string topicName, json payload) returns error? { - byte[] serializedContent = payload.toJsonString().toBytes(); - check conn:statePersistProducer->send({ topic: topicName, value: serializedContent }); +isolated function produceKafkaMessage(string topicName, json payload, + map headers = {}) returns error? { + kafka:AnydataProducerRecord message = getProducerMsg(topicName, payload, headers); + check conn:statePersistProducer->send(message); check conn:statePersistProducer->'flush(); } + +isolated function getProducerMsg(string topic, json payload, + map headers) returns kafka:AnydataProducerRecord { + byte[] value = payload.toJsonString().toBytes(); + return headers.length() == 0 ? { topic, value } : { topic, value, headers }; +} diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal index 761b8b46..05042e73 100644 --- a/examples/kafka-hub/hub/websub_subscribers.bal +++ b/examples/kafka-hub/hub/websub_subscribers.bal @@ -131,7 +131,20 @@ isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) retur json payload = check value:fromJsonString(message); websubhub:ContentDistributionMessage distributionMsg = { content: payload, - contentType: mime:APPLICATION_JSON + contentType: mime:APPLICATION_JSON, + headers: check getHeaders(kafkaRecord) }; return distributionMsg; } + +isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map|error { + map headers = {}; + foreach var ['key, value] in kafkaRecord.headers.entries().toArray() { + if value is string || value is string[] { + headers['key] = value; + } else if value is byte[] { + headers['key] = check string:fromBytes(value); + } + } + return headers; +}