diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal index 761b8b46..05042e73 100644 --- a/examples/kafka-hub/hub/websub_subscribers.bal +++ b/examples/kafka-hub/hub/websub_subscribers.bal @@ -131,7 +131,20 @@ isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) retur json payload = check value:fromJsonString(message); websubhub:ContentDistributionMessage distributionMsg = { content: payload, - contentType: mime:APPLICATION_JSON + contentType: mime:APPLICATION_JSON, + headers: check getHeaders(kafkaRecord) }; return distributionMsg; } + +isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map|error { + map headers = {}; + foreach var ['key, value] in kafkaRecord.headers.entries().toArray() { + if value is string || value is string[] { + headers['key] = value; + } else if value is byte[] { + headers['key] = check string:fromBytes(value); + } + } + return headers; +}