diff --git a/examples/kafka-hub/A Guide on implementing Websub Hub backed by Kafka Message Broker.md b/examples/kafka-hub/A Guide on implementing Websub Hub backed by Kafka Message Broker.md
index 9e9f919d..440ce4c8 100644
--- a/examples/kafka-hub/A Guide on implementing Websub Hub backed by Kafka Message Broker.md
+++ b/examples/kafka-hub/A Guide on implementing Websub Hub backed by Kafka Message Broker.md
@@ -184,6 +184,12 @@ Then execute the below command to run the program.
 BAL_CONFIG_FILES=/path/to/Config.toml bal run target/bin/wbsbsubscriber.jar
 ```
 
+To run the subscriber on Docker, execute the following command.
+
+```sh
+docker run --rm --name websub-subscriber -e TOPIC_NAME="" -e CONSUMER_GROUP="" -e HUB_URL="" -e UNSUB_ON_SHUTDOWN="true" -e SVC_PORT="" --network="" --hostname="" ayeshalmeida/wbsbsubscriber:5.0.0
+```
+
 ## Publishing to the Hub
 
 Go into the `publisher` directory and execute the following command.
@@ -198,6 +204,12 @@ Then execute the below command to run the program.
 BAL_CONFIG_FILES=/path/to/Config.toml bal run target/bin/wbsbpublisher.jar
 ```
 
+To run the publisher on Docker, execute the following command.
+
+```sh
+docker run --rm --network host --name websub-publisher -e TOPIC_NAME="" ayeshalmeida/wbsbpublisher:4.0.0
+```
+
 # Scaling the Hub
 
 The scaling of the hub can be done vertically or horizontally. As the hub itself does not maintain any state, those can be scaled up and down as needed. The entire cluster of hubs can be considered as one unit as in you can publish to a particular hub and consume the update message from another hub.
diff --git a/examples/kafka-hub/docker-compose.yaml b/examples/kafka-hub/docker-compose.yaml
index 6bbf5a4a..fd6a2011 100644
--- a/examples/kafka-hub/docker-compose.yaml
+++ b/examples/kafka-hub/docker-compose.yaml
@@ -3,7 +3,7 @@ name: 'kafkahub'
 
 services:
   hub-1:
-    image: 'ayeshalmeida/kafkahub:10.0.0'
+    image: 'ayeshalmeida/kafkahub:11.0.0'
     hostname: hub1
    container_name: hub-1
     ports:
diff --git a/examples/kafka-hub/hub/Cloud.toml b/examples/kafka-hub/hub/Cloud.toml
index 3e39c08e..e09e3503 100644
--- a/examples/kafka-hub/hub/Cloud.toml
+++ b/examples/kafka-hub/hub/Cloud.toml
@@ -1,7 +1,7 @@
 [container.image]
 repository="ballerina"
 name="kafkahub"
-tag="10.0.0"
+tag="11.0.0"
 
 [[container.copy.files]]
 sourceFile="./resources"
diff --git a/examples/kafka-hub/hub/hub_service.bal b/examples/kafka-hub/hub/hub_service.bal
index f5e5d303..f3655155 100644
--- a/examples/kafka-hub/hub/hub_service.bal
+++ b/examples/kafka-hub/hub/hub_service.bal
@@ -204,17 +204,18 @@ websubhub:Service hubService = service object {
         if config:SECURITY_ON {
             check security:authorize(headers, ["update_content"]);
         }
-        check self.updateMessage(message);
+        check self.updateMessage(message, headers);
         return websubhub:ACKNOWLEDGEMENT;
     }
 
-    isolated function updateMessage(websubhub:UpdateMessage msg) returns websubhub:UpdateMessageError? {
+    isolated function updateMessage(websubhub:UpdateMessage msg, http:Headers headers) returns websubhub:UpdateMessageError? {
         boolean topicAvailable = false;
         lock {
             topicAvailable = registeredTopicsCache.hasKey(msg.hubTopic);
         }
         if topicAvailable {
-            error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg);
+            map<string|string[]> messageHeaders = getHeadersMap(headers);
+            error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg, messageHeaders);
             if errorResponse is websubhub:UpdateMessageError {
                 return errorResponse;
             } else if errorResponse is error {
@@ -228,3 +229,16 @@ websubhub:Service hubService = service object {
         }
     }
 };
+
+isolated function getHeadersMap(http:Headers httpHeaders) returns map<string|string[]> {
+    map<string|string[]> headers = {};
+    foreach string headerName in httpHeaders.getHeaderNames() {
+        var headerValues = httpHeaders.getHeaders(headerName);
+        // safe to ignore the error as here we are retrieving only the available headers
+        if headerValues is error {
+            continue;
+        }
+        headers[headerName] = headerValues;
+    }
+    return headers;
+}
diff --git a/examples/kafka-hub/hub/hub_state_update.bal b/examples/kafka-hub/hub/hub_state_update.bal
index 0d9c0785..ba0efef1 100644
--- a/examples/kafka-hub/hub/hub_state_update.bal
+++ b/examples/kafka-hub/hub/hub_state_update.bal
@@ -39,11 +39,11 @@ function initializeHubState() returns error? {
 
 function updateHubState() returns error? {
     while true {
-        kafka:ConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
+        kafka:BytesConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
         if records.length() <= 0 {
             continue;
         }
-        foreach kafka:ConsumerRecord currentRecord in records {
+        foreach kafka:BytesConsumerRecord currentRecord in records {
             string lastPersistedData = check string:fromBytes(currentRecord.value);
             error? result = processStateUpdateEvent(lastPersistedData);
             if result is error {
diff --git a/examples/kafka-hub/hub/modules/connections/connections.bal b/examples/kafka-hub/hub/modules/connections/connections.bal
index 59b0e6b0..64bbec4b 100644
--- a/examples/kafka-hub/hub/modules/connections/connections.bal
+++ b/examples/kafka-hub/hub/modules/connections/connections.bal
@@ -94,9 +94,9 @@ public final kafka:Consumer websubEventsConsumer = check new (config:KAFKA_URL,
 #
 # + topicName - The kafka-topic to which the consumer should subscribe
 # + groupName - The consumer group name
-# + partition - The kafka topic-partitions
+# + partitions - The kafka topic-partitions
 # + return - `kafka:Consumer` if succcessful or else `error`
-public isolated function createMessageConsumer(string topicName, string groupName, int|int[]? partition = ()) returns kafka:Consumer|error {
+public isolated function createMessageConsumer(string topicName, string groupName, int[]? partitions = ()) returns kafka:Consumer|error {
     // Messages are distributed to subscribers in parallel.
     // In this scenario, manually committing offsets is unnecessary because
     // the next message polling starts as soon as the worker begins delivering messages to the subscribers.
@@ -104,26 +104,36 @@ public isolated function createMessageConsumer(string topicName, string groupNam
     // Related issue: https://github.com/ballerina-platform/ballerina-library/issues/7376
     kafka:ConsumerConfiguration consumerConfiguration = {
         groupId: groupName,
-        topics: [topicName],
         autoCommit: true,
         secureSocket: secureSocketConfig,
         securityProtocol: kafka:PROTOCOL_SSL,
         maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS
     };
-    if partition is () {
+    if partitions is () {
+        // Kafka consumer topic subscription should only be used when manual partition assignment is not used
+        consumerConfiguration.topics = [topicName];
         return new (config:KAFKA_URL, consumerConfiguration);
     }
-    kafka:Consumer consumerEp = check new (config:KAFKA_URL, consumerConfiguration);
-    check consumerEp->assign(getTopicPartitions(topicName, partition));
-    return consumerEp;
-}
+    log:printInfo("Assigning kafka-topic partitions manually", details = partitions);
+    kafka:Consumer|kafka:Error consumerEp = new (config:KAFKA_URL, consumerConfiguration);
+    if consumerEp is kafka:Error {
+        log:printError("Error occurred while creating the consumer", consumerEp);
+        return consumerEp;
+    }
 
-isolated function getTopicPartitions(string topic, int|int[] partition) returns kafka:TopicPartition[] {
-    if partition is int {
-        return [
-            {topic, partition}
-        ];
+    kafka:TopicPartition[] kafkaTopicPartitions = partitions.'map(p => {topic: topicName, partition: p});
+    kafka:Error? partitionAssignmentErr = consumerEp->assign(kafkaTopicPartitions);
+    if partitionAssignmentErr is kafka:Error {
+        log:printError("Error occurred while assigning partitions to the consumer", partitionAssignmentErr);
+        return partitionAssignmentErr;
     }
-    return partition.'map(p => {topic, partition: p});
+
+    kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(kafkaTopicPartitions);
+    if kafkaSeekErr is kafka:Error {
+        log:printError("Error occurred while seeking partitions for the consumer", kafkaSeekErr);
+        return kafkaSeekErr;
+    }
+
+    return consumerEp;
 }
 
diff --git a/examples/kafka-hub/hub/modules/persistence/persistence.bal b/examples/kafka-hub/hub/modules/persistence/persistence.bal
index a799ecff..d8ee8797 100644
--- a/examples/kafka-hub/hub/modules/persistence/persistence.bal
+++ b/examples/kafka-hub/hub/modules/persistence/persistence.bal
@@ -45,7 +45,8 @@ isolated function updateHubState(websubhub:TopicRegistration|websubhub:TopicDere
     }
 }
 
-public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message) returns error? {
+public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message,
+        map<string|string[]> headers = {}) returns error? {
     json payload = message.content;
     check produceKafkaMessage(topicName, payload);
 }
diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal
index 5532ff73..411b5f99 100644
--- a/examples/kafka-hub/hub/websub_subscribers.bal
+++ b/examples/kafka-hub/hub/websub_subscribers.bal
@@ -26,7 +26,7 @@ import ballerina/mime;
 isolated map<websubhub:VerifiedSubscription> subscribersCache = {};
 
 const string CONSUMER_GROUP = "consumerGroup";
-const string CONSUMER_TOPIC_PARTITION = "topicPartition";
+const string CONSUMER_TOPIC_PARTITIONS = "topicPartitions";
 const string SERVER_ID = "SERVER_ID";
 const string STATUS = "status";
 const string STALE_STATE = "stale";
@@ -70,12 +70,9 @@ isolated function processUnsubscription(websubhub:VerifiedUnsubscription unsubsc
 
 isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubscription subscription) returns error? {
     string consumerGroup = check value:ensureType(subscription[CONSUMER_GROUP]);
-    int? topicPartition = ();
-    if subscription.hasKey(CONSUMER_TOPIC_PARTITION) {
-        string partitionDetails = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITION]);
-        topicPartition = check int:fromString(partitionDetails);
-    }
-    kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartition);
+    int[]? topicPartitions = check getTopicPartitions(subscription);
+    log:printInfo("Manual partition assignment", isRequired = topicPartitions !is ());
+    kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartitions);
     websubhub:HubClient clientEp = check new (subscription, {
         retryConfig: {
             interval: config:MESSAGE_DELIVERY_RETRY_INTERVAL,
@@ -88,7 +85,7 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc
     });
     do {
         while true {
-            readonly & kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
+            readonly & kafka:BytesConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
             if !isValidConsumer(subscription.hubTopic, subscriberId) {
                 fail error(string `Subscriber with Id ${subscriberId} or topic ${subscription.hubTopic} is invalid`);
             }
@@ -116,7 +113,7 @@ isolated function isValidSubscription(string subscriberId) returns boolean {
     }
 }
 
-isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
+isolated function notifySubscribers(kafka:BytesConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
     do {
         foreach var kafkaRecord in records {
             websubhub:ContentDistributionMessage message = check deSerializeKafkaRecord(kafkaRecord);
@@ -127,7 +124,7 @@ isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:Hu
     }
 }
 
-isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
+isolated function deSerializeKafkaRecord(kafka:BytesConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
     byte[] content = kafkaRecord.value;
     string message = check string:fromBytes(content);
     json payload = check value:fromJsonString(message);
@@ -139,14 +136,24 @@ isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) retur
     return distributionMsg;
 }
 
-isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map<string|string[]>|error {
+isolated function getHeaders(kafka:BytesConsumerRecord kafkaRecord) returns map<string|string[]>|error {
     map<string|string[]> headers = {};
     foreach var ['key, value] in kafkaRecord.headers.entries().toArray() {
-        if value is string || value is string[] {
-            headers['key] = value;
-        } else if value is byte[] {
+        if value is byte[] {
             headers['key] = check string:fromBytes(value);
+        } else if value is byte[][] {
+            string[] headerValue = value.'map(v => check string:fromBytes(v));
+            headers['key] = headerValue;
         }
     }
     return headers;
 }
+
+isolated function getTopicPartitions(websubhub:VerifiedSubscription subscription) returns int[]|error? {
+    if !subscription.hasKey(CONSUMER_TOPIC_PARTITIONS) {
+        return;
+    }
+    // Kafka topic partitions will be a string with comma-separated integers, e.g. "1,2,3,4"
+    string partitionInfo = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITIONS]);
+    return re `,`.split(partitionInfo).'map(p => p.trim()).'map(p => check int:fromString(p));
+}
diff --git a/examples/kafka-hub/subscriber/Cloud.toml b/examples/kafka-hub/subscriber/Cloud.toml
index e3076be4..3189b754 100644
--- a/examples/kafka-hub/subscriber/Cloud.toml
+++ b/examples/kafka-hub/subscriber/Cloud.toml
@@ -1,7 +1,7 @@
 [container.image]
 repository="ballerina"
 name="wbsbsubscriber"
-tag="5.0.0"
+tag="6.0.0"
 
 [[container.copy.files]]
 sourceFile="./resources"
diff --git a/examples/kafka-hub/subscriber/subscriber_service.bal b/examples/kafka-hub/subscriber/subscriber_service.bal
index 00b36d7a..fe87ca22 100644
--- a/examples/kafka-hub/subscriber/subscriber_service.bal
+++ b/examples/kafka-hub/subscriber/subscriber_service.bal
@@ -110,12 +110,19 @@ service /JuApTOXq19 on securedSubscriber {
 }
 
 isolated function getCustomParams() returns map<string> {
-    if os:getEnv("CONSUMER_GROUP") == "" {
-        return {};
+    if os:getEnv("CONSUMER_GROUP") !is "" {
+        return {
+            consumerGroup: os:getEnv("CONSUMER_GROUP")
+        };
     }
-    return {
-        consumerGroup: os:getEnv("CONSUMER_GROUP")
-    };
+
+    if os:getEnv("TOPIC_PARTITIONS") !is "" {
+        return {
+            topicPartitions: os:getEnv("TOPIC_PARTITIONS")
+        };
+    }
+
+    return {};
 }
 
 isolated function getListener() returns websub:Listener|error {
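
For illustration, the comma-separated `topicPartitions` value that a subscriber now sends (via the `TOPIC_PARTITIONS` environment variable) is parsed on the hub side along these lines. This is a minimal, self-contained sketch: `parseTopicPartitions` is a hypothetical stand-in for the `getTopicPartitions` function added in `websub_subscribers.bal` above, with the mapping expression unrolled into an explicit loop.

```ballerina
import ballerina/io;

// Hypothetical stand-in for `getTopicPartitions` in `websub_subscribers.bal`:
// turns a comma-separated partition list such as "1,2,3,4" into an int array.
function parseTopicPartitions(string partitionInfo) returns int[]|error {
    int[] partitions = [];
    foreach string part in re `,`.split(partitionInfo) {
        // Trim stray whitespace ("1, 2") before converting each token.
        partitions.push(check int:fromString(part.trim()));
    }
    return partitions;
}

public function main() returns error? {
    // Prints [0,3,7]
    io:println(check parseTopicPartitions("0, 3,7"));
}
```

The resulting array flows into `createMessageConsumer`, which assigns the partitions explicitly with `assign` and rewinds them with `seekToBeginning`, instead of relying on consumer-group topic subscription.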
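The header-forwarding change likewise has two halves: `getHeadersMap` in `hub_service.bal` captures the publisher's HTTP headers as a `map<string|string[]>`, and `getHeaders` in `websub_subscribers.bal` restores that shape from the raw Kafka header bytes before content distribution. Below is a sketch of the consumer-side conversion, assuming header values arrive as `byte[]` (single-valued) or `byte[][]` (multi-valued), as the diff's type tests suggest; `toStringHeaders` is a hypothetical helper, not part of the patch.

```ballerina
// Converts raw Kafka record headers back into HTTP-style header values.
// The byte[]/byte[][] input shape mirrors the type tests in `getHeaders`.
function toStringHeaders(map<byte[]|byte[][]> kafkaHeaders) returns map<string|string[]>|error {
    map<string|string[]> headers = {};
    foreach [string, byte[]|byte[][]] [name, value] in kafkaHeaders.entries() {
        if value is byte[] {
            // Single-valued header: one UTF-8 encoded value.
            headers[name] = check string:fromBytes(value);
        } else {
            // Multi-valued header: decode each repetition separately.
            string[] values = [];
            foreach byte[] v in value {
                values.push(check string:fromBytes(v));
            }
            headers[name] = values;
        }
    }
    return headers;
}
```

Keeping the decode step on the consumer side means the Kafka payload stays an opaque byte stream end to end, and only the delivery worker needs to know how headers were encoded.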