From 759205bab3fb8d6745bce14ccf094a9b143d07d2 Mon Sep 17 00:00:00 2001
From: Ayesh Almeida
Date: Sat, 23 Nov 2024 20:13:20 +0530
Subject: [PATCH 1/9] Add support to convert http-headers to kafka-headers

---
 examples/kafka-hub/hub/hub_service.bal        | 20 ++++++++++++++++---
 .../hub/modules/persistence/persistence.bal   |  3 ++-
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/examples/kafka-hub/hub/hub_service.bal b/examples/kafka-hub/hub/hub_service.bal
index f5e5d303..f3655155 100644
--- a/examples/kafka-hub/hub/hub_service.bal
+++ b/examples/kafka-hub/hub/hub_service.bal
@@ -204,17 +204,18 @@ websubhub:Service hubService = service object {
         if config:SECURITY_ON {
             check security:authorize(headers, ["update_content"]);
         }
-        check self.updateMessage(message);
+        check self.updateMessage(message, headers);
         return websubhub:ACKNOWLEDGEMENT;
     }
 
-    isolated function updateMessage(websubhub:UpdateMessage msg) returns websubhub:UpdateMessageError? {
+    isolated function updateMessage(websubhub:UpdateMessage msg, http:Headers headers) returns websubhub:UpdateMessageError? {
         boolean topicAvailable = false;
         lock {
            topicAvailable = registeredTopicsCache.hasKey(msg.hubTopic);
         }
         if topicAvailable {
-            error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg);
+            map<string|string[]> messageHeaders = getHeadersMap(headers);
+            error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg, messageHeaders);
             if errorResponse is websubhub:UpdateMessageError {
                 return errorResponse;
             } else if errorResponse is error {
@@ -228,3 +229,16 @@ websubhub:Service hubService = service object {
         }
     }
 };
+
+isolated function getHeadersMap(http:Headers httpHeaders) returns map<string|string[]> {
+    map<string|string[]> headers = {};
+    foreach string headerName in httpHeaders.getHeaderNames() {
+        var headerValues = httpHeaders.getHeaders(headerName);
+        // safe to ignore the error as here we are retrieving only the available headers
+        if headerValues is error {
+            continue;
+        }
+        headers[headerName] = headerValues;
+    }
+    return headers;
+}

diff --git a/examples/kafka-hub/hub/modules/persistence/persistence.bal b/examples/kafka-hub/hub/modules/persistence/persistence.bal
index a799ecff..d8ee8797 100644
--- a/examples/kafka-hub/hub/modules/persistence/persistence.bal
+++ b/examples/kafka-hub/hub/modules/persistence/persistence.bal
@@ -45,7 +45,8 @@ isolated function updateHubState(websubhub:TopicRegistration|websubhub:TopicDere
     }
 }
 
-public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message) returns error? {
+public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message,
+        map<string|string[]> headers = {}) returns error? {
     json payload = message.content;
     check produceKafkaMessage(topicName, payload);
 }
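A note on where the converted headers eventually go: this first patch only builds the headers map and threads it through to the persistence layer; `produceKafkaMessage` is not yet updated. Attaching the map to the outgoing Kafka record would look roughly like the sketch below. The `headers` field on the producer record is an assumption about the `ballerinax/kafka` version in use, and `publishWithHeaders`/`producer` are placeholder names, not part of the patch.

```ballerina
import ballerinax/kafka;

// Sketch only: assumes the installed ballerinax/kafka version exposes a
// `headers` field on producer records.
isolated function publishWithHeaders(kafka:Producer producer, string topicName,
        json payload, map<string|string[]> headers) returns error? {
    check producer->send({
        topic: topicName,
        value: payload.toJsonString().toBytes(),
        headers: headers
    });
}
```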
From 22b167055ec94448ec7dbd871d582f574f0035e7 Mon Sep 17 00:00:00 2001
From: Ayesh Almeida
Date: Sun, 24 Nov 2024 13:40:50 +0530
Subject: [PATCH 2/9] Restructure kafka topic-partition assignment logic

---
 .../hub/modules/connections/connections.bal   | 18 +++++-------------
 examples/kafka-hub/hub/websub_subscribers.bal | 17 +++++++++++------
 2 files changed, 16 insertions(+), 19 deletions(-)

diff --git a/examples/kafka-hub/hub/modules/connections/connections.bal b/examples/kafka-hub/hub/modules/connections/connections.bal
index 59b0e6b0..51a79f2c 100644
--- a/examples/kafka-hub/hub/modules/connections/connections.bal
+++ b/examples/kafka-hub/hub/modules/connections/connections.bal
@@ -94,9 +94,9 @@ public final kafka:Consumer websubEventsConsumer = check new (config:KAFKA_URL,
 #
 # + topicName - The kafka-topic to which the consumer should subscribe
 # + groupName - The consumer group name
-# + partition - The kafka topic-partitions
+# + partitions - The kafka topic-partitions
 # + return - `kafka:Consumer` if successful or else `error`
-public isolated function createMessageConsumer(string topicName, string groupName, int|int[]? partition = ()) returns kafka:Consumer|error {
+public isolated function createMessageConsumer(string topicName, string groupName, int[]? partitions = ()) returns kafka:Consumer|error {
     // Messages are distributed to subscribers in parallel.
     // In this scenario, manually committing offsets is unnecessary because
     // the next message polling starts as soon as the worker begins delivering messages to the subscribers.
@@ -110,20 +110,12 @@
         securityProtocol: kafka:PROTOCOL_SSL,
         maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS
     };
-    if partition is () {
+    if partitions is () {
         return new (config:KAFKA_URL, consumerConfiguration);
     }
 
     kafka:Consumer consumerEp = check new (config:KAFKA_URL, consumerConfiguration);
-    check consumerEp->assign(getTopicPartitions(topicName, partition));
+    kafka:TopicPartition[] kafkaTopicPartitions = partitions.'map(p => {topic: topicName, partition: p});
+    check consumerEp->assign(kafkaTopicPartitions);
     return consumerEp;
 }
-
-isolated function getTopicPartitions(string topic, int|int[] partition) returns kafka:TopicPartition[] {
-    if partition is int {
-        return [
-            {topic, partition}
-        ];
-    }
-    return partition.'map(p => {topic, partition: p});
-}

diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal
index 5532ff73..31cd6826 100644
--- a/examples/kafka-hub/hub/websub_subscribers.bal
+++ b/examples/kafka-hub/hub/websub_subscribers.bal
@@ -70,12 +70,8 @@ isolated function processUnsubscription(websubhub:VerifiedUnsubscription unsubsc
 
 isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubscription subscription) returns error? {
     string consumerGroup = check value:ensureType(subscription[CONSUMER_GROUP]);
-    int? topicPartition = ();
-    if subscription.hasKey(CONSUMER_TOPIC_PARTITION) {
-        string partitionDetails = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITION]);
-        topicPartition = check int:fromString(partitionDetails);
-    }
-    kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartition);
+    int[]? topicPartitions = check getTopicPartitions(subscription);
+    kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartitions);
     websubhub:HubClient clientEp = check new (subscription, {
         retryConfig: {
             interval: config:MESSAGE_DELIVERY_RETRY_INTERVAL,
@@ -150,3 +146,12 @@ isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map<string|string[]>|error {
     }
     return headers;
 }
+
+isolated function getTopicPartitions(websubhub:VerifiedSubscription subscription) returns int[]?|error {
+    if !subscription.hasKey(CONSUMER_TOPIC_PARTITION) {
+        return;
+    }
+    string partitionDetails = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITION]);
+    return re `,`.split(partitionDetails)
+            .'map(p => p.trim()).'map(p => check int:fromString(p));
+}
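The restructuring above replaces the single-partition parameter with a plain `int[]`, mapped one-to-one onto `kafka:TopicPartition` values. A standalone sketch of that same assignment step, with a placeholder broker URL, topic, and group name:

```ballerina
import ballerinax/kafka;

public function main() returns error? {
    // The partition list stands in for whatever a subscription requested.
    int[] partitions = [0, 1, 2];
    kafka:TopicPartition[] assignment = partitions.'map(p => {topic: "websub-events", partition: p});
    kafka:Consumer consumer = check new ("localhost:9094", {groupId: "demo-group"});
    // With assign(), the consumer bypasses consumer-group rebalancing and
    // reads exactly the listed partitions.
    check consumer->assign(assignment);
}
```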
From 55323f52a3bed67364125eb85b84f3eca4f1b0ba Mon Sep 17 00:00:00 2001
From: Ayesh Almeida
Date: Sun, 24 Nov 2024 13:45:01 +0530
Subject: [PATCH 3/9] Rename the topic-partition key in the subscription message

---
 examples/kafka-hub/hub/websub_subscribers.bal | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal
index 31cd6826..4b1a56e0 100644
--- a/examples/kafka-hub/hub/websub_subscribers.bal
+++ b/examples/kafka-hub/hub/websub_subscribers.bal
@@ -26,7 +26,7 @@ import ballerina/mime;
 isolated map<websubhub:VerifiedSubscription> subscribersCache = {};
 
 const string CONSUMER_GROUP = "consumerGroup";
-const string CONSUMER_TOPIC_PARTITION = "topicPartition";
+const string CONSUMER_TOPIC_PARTITIONS = "topicPartitions";
 const string SERVER_ID = "SERVER_ID";
 const string STATUS = "status";
 const string STALE_STATE = "stale";
@@ -148,10 +148,10 @@ isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map<string|string[]>|error {
 }
 
 isolated function getTopicPartitions(websubhub:VerifiedSubscription subscription) returns int[]?|error {
-    if !subscription.hasKey(CONSUMER_TOPIC_PARTITION) {
+    if !subscription.hasKey(CONSUMER_TOPIC_PARTITIONS) {
         return;
     }
-    string partitionDetails = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITION]);
+    string partitionDetails = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITIONS]);
     return re `,`.split(partitionDetails)
             .'map(p => p.trim()).'map(p => check int:fromString(p));
 }

From ebf069e6ed0a6ea0d4764ba6322a189df8caf89c Mon Sep 17 00:00:00 2001
From: Ayesh Almeida
Date: Sun, 24 Nov 2024 13:45:33 +0530
Subject: [PATCH 4/9] Add support to provide topic-partitions as a custom
 parameter in the subscription request

---
 examples/kafka-hub/subscriber/subscriber_service.bal | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/examples/kafka-hub/subscriber/subscriber_service.bal b/examples/kafka-hub/subscriber/subscriber_service.bal
index 00b36d7a..e91f583e 100644
--- a/examples/kafka-hub/subscriber/subscriber_service.bal
+++ b/examples/kafka-hub/subscriber/subscriber_service.bal
@@ -110,12 +110,16 @@ service /JuApTOXq19 on securedSubscriber {
 }
 
 isolated function getCustomParams() returns map<string> {
-    if os:getEnv("CONSUMER_GROUP") == "" {
-        return {};
+    map<string> params = {};
+    if os:getEnv("CONSUMER_GROUP") !is "" {
+        params["consumerGroup"] = os:getEnv("CONSUMER_GROUP");
     }
-    return {
-        consumerGroup: os:getEnv("CONSUMER_GROUP")
-    };
+
+    if os:getEnv("TOPIC_PARTITIONS") !is "" {
+        params["topicPartitions"] = os:getEnv("TOPIC_PARTITIONS");
+    }
+
+    return params;
 }
 
 isolated function getListener() returns websub:Listener|error {
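Taken together, these two patches let a subscriber pin itself to specific partitions: the subscriber sends `topicPartitions` (a comma-separated list) as a custom parameter of the subscription request, and the hub parses it back into an `int[]`. A sketch of the subscriber side, assuming the service wires these values through the `customParams` field of the subscriber configuration (the hub URL, topic, and parameter values are placeholders):

```ballerina
import ballerina/websub;

@websub:SubscriberServiceConfig {
    target: ["https://hub.example.com/hub", "priceUpdate"],
    // Appended to the subscription request, e.g.
    // ...&consumerGroup=group-1&topicPartitions=0,1,2
    customParams: {
        consumerGroup: "group-1",
        topicPartitions: "0, 1, 2"
    }
}
service /subscriber on new websub:Listener(9100) {
    remote function onEventNotification(websub:ContentDistributionMessage event) {
        // handle content-distribution messages
    }
}
```

On the hub side, these parameters surface as extra fields of the `websubhub:VerifiedSubscription` record, which is exactly what `getTopicPartitions` reads.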
From 5b08cb82a5701e494a032466d7233b7629618789 Mon Sep 17 00:00:00 2001
From: Ayesh Almeida
Date: Mon, 25 Nov 2024 08:26:31 +0530
Subject: [PATCH 5/9] Refactor the code

---
 .../hub/modules/connections/connections.bal          |  3 ++-
 examples/kafka-hub/subscriber/subscriber_service.bal | 11 +++++++----
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/examples/kafka-hub/hub/modules/connections/connections.bal b/examples/kafka-hub/hub/modules/connections/connections.bal
index 51a79f2c..3080389e 100644
--- a/examples/kafka-hub/hub/modules/connections/connections.bal
+++ b/examples/kafka-hub/hub/modules/connections/connections.bal
@@ -103,7 +103,6 @@ public isolated function createMessageConsumer(string topicName, string groupNam
     // Messages are distributed to subscribers in parallel.
     // In this scenario, manually committing offsets is unnecessary because
     // the next message polling starts as soon as the worker begins delivering messages to the subscribers.
     // Therefore, auto-commit is enabled to handle offset management automatically.
     // Related issue: https://github.com/ballerina-platform/ballerina-library/issues/7376
     kafka:ConsumerConfiguration consumerConfiguration = {
-        groupId: groupName,
         topics: [topicName],
         autoCommit: true,
         secureSocket: secureSocketConfig,
         securityProtocol: kafka:PROTOCOL_SSL,
         maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS
     };
     if partitions is () {
+        // Kafka will require a consumer group only if the consumer does not assign partitions manually
+        consumerConfiguration.groupId = groupName;
         return new (config:KAFKA_URL, consumerConfiguration);
     }

diff --git a/examples/kafka-hub/subscriber/subscriber_service.bal b/examples/kafka-hub/subscriber/subscriber_service.bal
index e91f583e..fe87ca22 100644
--- a/examples/kafka-hub/subscriber/subscriber_service.bal
+++ b/examples/kafka-hub/subscriber/subscriber_service.bal
@@ -110,16 +110,19 @@
 }
 
 isolated function getCustomParams() returns map<string> {
-    map<string> params = {};
     if os:getEnv("CONSUMER_GROUP") !is "" {
-        params["consumerGroup"] = os:getEnv("CONSUMER_GROUP");
+        return {
+            consumerGroup: os:getEnv("CONSUMER_GROUP")
+        };
     }
 
     if os:getEnv("TOPIC_PARTITIONS") !is "" {
-        params["topicPartitions"] = os:getEnv("TOPIC_PARTITIONS");
+        return {
+            topicPartitions: os:getEnv("TOPIC_PARTITIONS")
+        };
     }
 
-    return params;
+    return {};
 }

From 1ba3c18615f04485affe65458dc31978c73c3c10 Mon Sep 17 00:00:00 2001
From: Ayesh Almeida
Date: Mon, 25 Nov 2024 08:27:55 +0530
Subject: [PATCH 6/9] Update docker image versions

---
 examples/kafka-hub/docker-compose.yaml   | 2 +-
 examples/kafka-hub/hub/Cloud.toml        | 2 +-
 examples/kafka-hub/subscriber/Cloud.toml | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/examples/kafka-hub/docker-compose.yaml b/examples/kafka-hub/docker-compose.yaml
index 6bbf5a4a..fd6a2011 100644
--- a/examples/kafka-hub/docker-compose.yaml
+++ b/examples/kafka-hub/docker-compose.yaml
@@ -3,7 +3,7 @@ name: 'kafkahub'
 
 services:
   hub-1:
-    image: 'ayeshalmeida/kafkahub:10.0.0'
+    image: 'ayeshalmeida/kafkahub:11.0.0'
     hostname: hub1
     container_name: hub-1
     ports:

diff --git a/examples/kafka-hub/hub/Cloud.toml b/examples/kafka-hub/hub/Cloud.toml
index 3e39c08e..e09e3503 100644
--- a/examples/kafka-hub/hub/Cloud.toml
+++ b/examples/kafka-hub/hub/Cloud.toml
@@ -1,7 +1,7 @@
 [container.image]
 repository="ballerina"
 name="kafkahub"
-tag="10.0.0"
+tag="11.0.0"
 
 [[container.copy.files]]
 sourceFile="./resources"

diff --git a/examples/kafka-hub/subscriber/Cloud.toml b/examples/kafka-hub/subscriber/Cloud.toml
index e3076be4..3189b754 100644
--- a/examples/kafka-hub/subscriber/Cloud.toml
+++ b/examples/kafka-hub/subscriber/Cloud.toml
@@ -1,7 +1,7 @@
 [container.image]
 repository="ballerina"
 name="wbsbsubscriber"
-tag="5.0.0"
+tag="6.0.0"
 
 [[container.copy.files]]
 sourceFile="./resources"
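One caveat worth keeping in mind while reading the refactor in PATCH 5: the underlying Kafka client generally refuses to commit offsets (including auto-commit) without a `group.id`, so an assign-mode consumer created with `autoCommit: true` and no consumer group is likely to be rejected at runtime — which appears to be what the consumer-initialization fix in the next patch addresses by setting `groupId` unconditionally again. A sketch of the two configurations, with placeholder values:

```ballerina
import ballerinax/kafka;

kafka:ConsumerConfiguration subscribeModeConfig = {
    // Subscribe mode: Kafka balances partitions across the group members.
    groupId: "group-1",
    topics: ["websub-events"],
    autoCommit: true
};

kafka:ConsumerConfiguration assignModeConfig = {
    // Assign mode: no topic subscription here; partitions are attached
    // later via assign(). Auto-commit still needs a group.id to commit to.
    autoCommit: true
};
```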
From 04aa59c6ede5356cda20468f4a08ee2cb5f64e2f Mon Sep 17 00:00:00 2001
From: Ayesh Almeida
Date: Wed, 27 Nov 2024 11:18:26 +0530
Subject: [PATCH 7/9] Fix kafka-consumer initializing logic

---
 .../hub/modules/connections/connections.bal | 27 +++++++++++++++++++------
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/examples/kafka-hub/hub/modules/connections/connections.bal b/examples/kafka-hub/hub/modules/connections/connections.bal
index 3080389e..64bbec4b 100644
--- a/examples/kafka-hub/hub/modules/connections/connections.bal
+++ b/examples/kafka-hub/hub/modules/connections/connections.bal
@@ -103,20 +103,37 @@ public isolated function createMessageConsumer(string topicName, string groupNam
     // Therefore, auto-commit is enabled to handle offset management automatically.
     // Related issue: https://github.com/ballerina-platform/ballerina-library/issues/7376
     kafka:ConsumerConfiguration consumerConfiguration = {
-        topics: [topicName],
+        groupId: groupName,
         autoCommit: true,
         secureSocket: secureSocketConfig,
         securityProtocol: kafka:PROTOCOL_SSL,
         maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS
     };
     if partitions is () {
-        // Kafka will require a consumer group only if the consumer does not assign partitions manually
-        consumerConfiguration.groupId = groupName;
+        // Kafka consumer topic subscription should only be used when manual partition assignment is not used
+        consumerConfiguration.topics = [topicName];
         return new (config:KAFKA_URL, consumerConfiguration);
     }
 
-    kafka:Consumer consumerEp = check new (config:KAFKA_URL, consumerConfiguration);
+    log:printInfo("Assigning kafka-topic partitions manually", details = partitions);
+    kafka:Consumer|kafka:Error consumerEp = new (config:KAFKA_URL, consumerConfiguration);
+    if consumerEp is kafka:Error {
+        log:printError("Error occurred while creating the consumer", consumerEp);
+        return consumerEp;
+    }
+
     kafka:TopicPartition[] kafkaTopicPartitions = partitions.'map(p => {topic: topicName, partition: p});
-    check consumerEp->assign(kafkaTopicPartitions);
+    kafka:Error? partitionAssignmentErr = consumerEp->assign(kafkaTopicPartitions);
+    if partitionAssignmentErr is kafka:Error {
+        log:printError("Error occurred while assigning partitions to the consumer", partitionAssignmentErr);
+        return partitionAssignmentErr;
+    }
+
+    kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(kafkaTopicPartitions);
+    if kafkaSeekErr is kafka:Error {
+        log:printError("Error occurred while seeking partitions for the consumer", kafkaSeekErr);
+        return kafkaSeekErr;
+    }
+
     return consumerEp;
 }
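A condensed sketch of the initialization sequence this fix settles on (broker URL and group name are placeholders): assign the partitions, then seek before the first poll so delivery starts from the earliest retained record rather than a committed offset or the `auto.offset.reset` default.

```ballerina
import ballerinax/kafka;

isolated function startFromBeginning(string topic, int[] partitions) returns kafka:Consumer|error {
    kafka:Consumer consumer = check new ("localhost:9094", {groupId: "demo-group"});
    kafka:TopicPartition[] assignment = partitions.'map(p => {topic, partition: p});
    check consumer->assign(assignment);
    // Rewind to the earliest retained offset so polling starts from the
    // oldest available record in each assigned partition.
    check consumer->seekToBeginning(assignment);
    return consumer;
}
```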
From fe3cff7247fe4c22d16d728ed4337a9375a532ef Mon Sep 17 00:00:00 2001
From: Ayesh Almeida
Date: Wed, 27 Nov 2024 11:18:54 +0530
Subject: [PATCH 8/9] Restructure codebase to use the latest Kafka changes

---
 examples/kafka-hub/hub/hub_state_update.bal   |  4 ++--
 examples/kafka-hub/hub/websub_subscribers.bal | 16 +++++++++-------
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/examples/kafka-hub/hub/hub_state_update.bal b/examples/kafka-hub/hub/hub_state_update.bal
index 0d9c0785..ba0efef1 100644
--- a/examples/kafka-hub/hub/hub_state_update.bal
+++ b/examples/kafka-hub/hub/hub_state_update.bal
@@ -39,11 +39,11 @@ function initializeHubState() returns error? {
 
 function updateHubState() returns error? {
     while true {
-        kafka:ConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
+        kafka:BytesConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
         if records.length() <= 0 {
             continue;
         }
-        foreach kafka:ConsumerRecord currentRecord in records {
+        foreach kafka:BytesConsumerRecord currentRecord in records {
             string lastPersistedData = check string:fromBytes(currentRecord.value);
             error? result = processStateUpdateEvent(lastPersistedData);
             if result is error {

diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal
index 4b1a56e0..411b5f99 100644
--- a/examples/kafka-hub/hub/websub_subscribers.bal
+++ b/examples/kafka-hub/hub/websub_subscribers.bal
@@ -71,6 +71,7 @@ isolated function processUnsubscription(websubhub:VerifiedUnsubscription unsubsc
 isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubscription subscription) returns error? {
     string consumerGroup = check value:ensureType(subscription[CONSUMER_GROUP]);
     int[]? topicPartitions = check getTopicPartitions(subscription);
+    log:printInfo("Manual partition assignment", isRequired = topicPartitions !is ());
     kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartitions);
     websubhub:HubClient clientEp = check new (subscription, {
         retryConfig: {
             interval: config:MESSAGE_DELIVERY_RETRY_INTERVAL,
@@ -84,7 +85,7 @@
     });
     do {
         while true {
-            readonly & kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
+            readonly & kafka:BytesConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
             if !isValidConsumer(subscription.hubTopic, subscriberId) {
                 fail error(string `Subscriber with Id ${subscriberId} or topic ${subscription.hubTopic} is invalid`);
             }
@@ -112,7 +113,7 @@ isolated function isValidSubscription(string subscriberId) returns boolean {
     }
 }
 
-isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
+isolated function notifySubscribers(kafka:BytesConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
     do {
         foreach var kafkaRecord in records {
             websubhub:ContentDistributionMessage message = check deSerializeKafkaRecord(kafkaRecord);
@@ -123,7 +124,7 @@
-isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
+isolated function deSerializeKafkaRecord(kafka:BytesConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
     byte[] content = kafkaRecord.value;
     string message = check string:fromBytes(content);
     json payload = check value:fromJsonString(message);
@@ -135,13 +136,14 @@
     return distributionMsg;
 }
 
-isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map<string|string[]>|error {
+isolated function getHeaders(kafka:BytesConsumerRecord kafkaRecord) returns map<string|string[]>|error {
     map<string|string[]> headers = {};
     foreach var ['key, value] in kafkaRecord.headers.entries().toArray() {
-        if value is string || value is string[] {
-            headers['key] = value;
-        } else if value is byte[] {
+        if value is byte[] {
             headers['key] = check string:fromBytes(value);
+        } else if value is byte[][] {
+            string[] headerValue = value.'map(v => check string:fromBytes(v));
+            headers['key] = headerValue;
         }
     }
     return headers;
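The header handling above narrows each value of `kafka:BytesConsumerRecord.headers`, which is a union-typed map. A focused sketch of that narrowing for a single, hypothetical header name:

```ballerina
import ballerinax/kafka;

isolated function readHeader(kafka:BytesConsumerRecord kafkaRecord, string name) returns string[]|error {
    var value = kafkaRecord.headers[name];
    if value is byte[] {
        // A header set once arrives as a single byte array.
        return [check string:fromBytes(value)];
    }
    if value is byte[][] {
        // A header set multiple times arrives as an array of byte arrays.
        string[] decoded = [];
        foreach byte[] item in value {
            decoded.push(check string:fromBytes(item));
        }
        return decoded;
    }
    if value is string {
        return [value];
    }
    if value is string[] {
        return value;
    }
    return [];
}
```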
From 570406008f89f52bf3810f99d492ef9fc591ecca Mon Sep 17 00:00:00 2001
From: Ayesh Almeida <77491511+ayeshLK@users.noreply.github.com>
Date: Sat, 23 Nov 2024 19:42:46 +0530
Subject: [PATCH 9/9] Update docker commands for subscriber/publisher
 containers

---
 ...ting Websub Hub backed by Kafka Message Broker.md | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/examples/kafka-hub/A Guide on implementing Websub Hub backed by Kafka Message Broker.md b/examples/kafka-hub/A Guide on implementing Websub Hub backed by Kafka Message Broker.md
index 9e9f919d..440ce4c8 100644
--- a/examples/kafka-hub/A Guide on implementing Websub Hub backed by Kafka Message Broker.md
+++ b/examples/kafka-hub/A Guide on implementing Websub Hub backed by Kafka Message Broker.md
@@ -184,6 +184,12 @@ Then execute the below command to run the program.
 BAL_CONFIG_FILES=/path/to/Config.toml bal run target/bin/wbsbsubscriber.jar
 ```
 
+Running the subscriber on Docker:
+
+```sh
+docker run --rm --name <container-name> -e TOPIC_NAME="<topic-name>" -e CONSUMER_GROUP="<consumer-group>" -e HUB_URL="<hub-url>" -e UNSUB_ON_SHUTDOWN="true" -e SVC_PORT="<service-port>" --network="<docker-network>" --hostname=<hostname> ayeshalmeida/wbsbsubscriber:5.0.0
+```
+
 ## Publishing to the Hub
 
 Go into the `publisher` directory and execute the following command.
@@ -198,6 +204,12 @@ Then execute the below command to run the program.
 BAL_CONFIG_FILES=/path/to/Config.toml bal run target/bin/wbsbpublisher.jar
 ```
 
+Running the publisher on Docker:
+
+```sh
+docker run --rm --network host --name websub-publisher -e TOPIC_NAME="<topic-name>" ayeshalmeida/wbsbpublisher:4.0.0
+```
+
 # Scaling the Hub
 
 The scaling of the hub can be done vertically or horizontally. As the hub itself does not maintain any state, instances can be scaled up and down as needed. The entire cluster of hubs can be treated as a single unit: you can publish to a particular hub and consume the update message from another hub.