diff --git a/examples/kafka-hub/docker-compose.yaml b/examples/kafka-hub/docker-compose.yaml
index 6bbf5a4a..fd6a2011 100644
--- a/examples/kafka-hub/docker-compose.yaml
+++ b/examples/kafka-hub/docker-compose.yaml
@@ -3,7 +3,7 @@ name: 'kafkahub'

 services:
   hub-1:
-    image: 'ayeshalmeida/kafkahub:10.0.0'
+    image: 'ayeshalmeida/kafkahub:11.0.0'
     hostname: hub1
     container_name: hub-1
     ports:
diff --git a/examples/kafka-hub/hub/Cloud.toml b/examples/kafka-hub/hub/Cloud.toml
index 3e39c08e..e09e3503 100644
--- a/examples/kafka-hub/hub/Cloud.toml
+++ b/examples/kafka-hub/hub/Cloud.toml
@@ -1,7 +1,7 @@
 [container.image]
 repository="ballerina"
 name="kafkahub"
-tag="10.0.0"
+tag="11.0.0"

 [[container.copy.files]]
 sourceFile="./resources"
diff --git a/examples/kafka-hub/hub/hub_service.bal b/examples/kafka-hub/hub/hub_service.bal
index f5e5d303..f3655155 100644
--- a/examples/kafka-hub/hub/hub_service.bal
+++ b/examples/kafka-hub/hub/hub_service.bal
@@ -204,17 +204,18 @@ websubhub:Service hubService = service object {
         if config:SECURITY_ON {
             check security:authorize(headers, ["update_content"]);
         }
-        check self.updateMessage(message);
+        check self.updateMessage(message, headers);
         return websubhub:ACKNOWLEDGEMENT;
     }

-    isolated function updateMessage(websubhub:UpdateMessage msg) returns websubhub:UpdateMessageError? {
+    isolated function updateMessage(websubhub:UpdateMessage msg, http:Headers headers) returns websubhub:UpdateMessageError? {
         boolean topicAvailable = false;
         lock {
             topicAvailable = registeredTopicsCache.hasKey(msg.hubTopic);
         }
         if topicAvailable {
-            error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg);
+            map<string|string[]> messageHeaders = getHeadersMap(headers);
+            error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg, messageHeaders);
             if errorResponse is websubhub:UpdateMessageError {
                 return errorResponse;
             } else if errorResponse is error {
@@ -228,3 +229,16 @@ websubhub:Service hubService = service object {
         }
     }
 };
+
+isolated function getHeadersMap(http:Headers httpHeaders) returns map<string|string[]> {
+    map<string|string[]> headers = {};
+    foreach string headerName in httpHeaders.getHeaderNames() {
+        var headerValues = httpHeaders.getHeaders(headerName);
+        // safe to ignore the error as we retrieve only the headers that are available
+        if headerValues is error {
+            continue;
+        }
+        headers[headerName] = headerValues;
+    }
+    return headers;
+}
diff --git a/examples/kafka-hub/hub/hub_state_update.bal b/examples/kafka-hub/hub/hub_state_update.bal
index 0d9c0785..ba0efef1 100644
--- a/examples/kafka-hub/hub/hub_state_update.bal
+++ b/examples/kafka-hub/hub/hub_state_update.bal
@@ -39,11 +39,11 @@ function initializeHubState() returns error? {

 function updateHubState() returns error? {
     while true {
-        kafka:ConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
+        kafka:BytesConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
         if records.length() <= 0 {
             continue;
         }
-        foreach kafka:ConsumerRecord currentRecord in records {
+        foreach kafka:BytesConsumerRecord currentRecord in records {
             string lastPersistedData = check string:fromBytes(currentRecord.value);
             error? result = processStateUpdateEvent(lastPersistedData);
             if result is error {
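Not part of the change itself: `getHeadersMap` above folds every readable request header into a `map<string|string[]>` that is then handed to `persist:addUpdateMessage`. A minimal standalone sketch of the same pattern, assuming an arbitrary `http:Listener` port and service path:

```ballerina
import ballerina/http;
import ballerina/io;

service /hub on new http:Listener(9095) {
    resource function post publish(http:Headers httpHeaders) returns http:Accepted {
        map<string|string[]> headers = {};
        foreach string name in httpHeaders.getHeaderNames() {
            string[]|http:HeaderNotFoundError values = httpHeaders.getHeaders(name);
            // A missing header is simply skipped, as in getHeadersMap above.
            if values is string[] {
                headers[name] = values;
            }
        }
        io:println(headers); // e.g. {"content-type": ["application/json"], ...}
        return http:ACCEPTED;
    }
}
```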
diff --git a/examples/kafka-hub/hub/modules/connections/connections.bal b/examples/kafka-hub/hub/modules/connections/connections.bal
index 59b0e6b0..64bbec4b 100644
--- a/examples/kafka-hub/hub/modules/connections/connections.bal
+++ b/examples/kafka-hub/hub/modules/connections/connections.bal
@@ -94,9 +94,9 @@ public final kafka:Consumer websubEventsConsumer = check new (config:KAFKA_URL,
 #
 # + topicName - The kafka-topic to which the consumer should subscribe
 # + groupName - The consumer group name
-# + partition - The kafka topic-partitions
+# + partitions - The kafka topic-partitions
 # + return - `kafka:Consumer` if successful or else `error`
-public isolated function createMessageConsumer(string topicName, string groupName, int|int[]? partition = ()) returns kafka:Consumer|error {
+public isolated function createMessageConsumer(string topicName, string groupName, int[]? partitions = ()) returns kafka:Consumer|error {
     // Messages are distributed to subscribers in parallel.
     // In this scenario, manually committing offsets is unnecessary because
     // the next message polling starts as soon as the worker begins delivering messages to the subscribers.
@@ -104,26 +104,36 @@ public isolated function createMessageConsumer(string topicName, string groupNam
     // Related issue: https://github.com/ballerina-platform/ballerina-library/issues/7376
     kafka:ConsumerConfiguration consumerConfiguration = {
         groupId: groupName,
-        topics: [topicName],
         autoCommit: true,
         secureSocket: secureSocketConfig,
         securityProtocol: kafka:PROTOCOL_SSL,
         maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS
     };
-    if partition is () {
+    if partitions is () {
+        // Kafka consumer topic subscription should only be used when manual partition assignment is not used
+        consumerConfiguration.topics = [topicName];
         return new (config:KAFKA_URL, consumerConfiguration);
     }
-    kafka:Consumer consumerEp = check new (config:KAFKA_URL, consumerConfiguration);
-    check consumerEp->assign(getTopicPartitions(topicName, partition));
-    return consumerEp;
-}

-isolated function getTopicPartitions(string topic, int|int[] partition) returns kafka:TopicPartition[] {
-    if partition is int {
-        return [
-            {topic, partition}
-        ];
-    }
-    return partition.'map(p => {topic, partition: p});
+    log:printInfo("Assigning kafka-topic partitions manually", details = partitions);
+    kafka:Consumer|kafka:Error consumerEp = new (config:KAFKA_URL, consumerConfiguration);
+    if consumerEp is kafka:Error {
+        log:printError("Error occurred while creating the consumer", consumerEp);
+        return consumerEp;
+    }
+
+    kafka:TopicPartition[] kafkaTopicPartitions = partitions.'map(p => {topic: topicName, partition: p});
+    kafka:Error? partitionAssignmentErr = consumerEp->assign(kafkaTopicPartitions);
+    if partitionAssignmentErr is kafka:Error {
+        log:printError("Error occurred while assigning partitions to the consumer", partitionAssignmentErr);
+        return partitionAssignmentErr;
+    }
+
+    kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(kafkaTopicPartitions);
+    if kafkaSeekErr is kafka:Error {
+        log:printError("Error occurred while seeking partitions for the consumer", kafkaSeekErr);
+        return kafkaSeekErr;
+    }
+
+    return consumerEp;
 }
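Not part of the change itself: the rewritten `createMessageConsumer` switches from topic subscription to explicit partition assignment followed by a seek to the earliest offset. A self-contained sketch of that flow with the `ballerinax/kafka` client; the broker URL, topic, and group name are placeholders:

```ballerina
import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    kafka:ConsumerConfiguration consumerConfig = {
        groupId: "example-group",
        autoCommit: true
        // Deliberately no `topics` field: topic subscription and manual
        // partition assignment are mutually exclusive.
    };
    kafka:Consumer consumer = check new ("localhost:9092", consumerConfig);
    kafka:TopicPartition[] partitions = [{topic: "example-topic", partition: 0}];
    check consumer->assign(partitions);          // take ownership of the partitions
    check consumer->seekToBeginning(partitions); // replay from the earliest offset
    kafka:BytesConsumerRecord[] records = check consumer->poll(10);
    io:println(string `Fetched ${records.length()} record(s)`);
}
```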
diff --git a/examples/kafka-hub/hub/modules/persistence/persistence.bal b/examples/kafka-hub/hub/modules/persistence/persistence.bal
index a799ecff..d8ee8797 100644
--- a/examples/kafka-hub/hub/modules/persistence/persistence.bal
+++ b/examples/kafka-hub/hub/modules/persistence/persistence.bal
@@ -45,7 +45,8 @@ isolated function updateHubState(websubhub:TopicRegistration|websubhub:TopicDere
     }
 }

-public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message) returns error? {
+public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message,
+        map<string|string[]> headers = {}) returns error? {
     json payload = message.content;
     check produceKafkaMessage(topicName, payload);
 }
diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal
index 5532ff73..411b5f99 100644
--- a/examples/kafka-hub/hub/websub_subscribers.bal
+++ b/examples/kafka-hub/hub/websub_subscribers.bal
@@ -26,7 +26,7 @@ import ballerina/mime;
 isolated map<websubhub:VerifiedSubscription> subscribersCache = {};

 const string CONSUMER_GROUP = "consumerGroup";
-const string CONSUMER_TOPIC_PARTITION = "topicPartition";
+const string CONSUMER_TOPIC_PARTITIONS = "topicPartitions";
 const string SERVER_ID = "SERVER_ID";
 const string STATUS = "status";
 const string STALE_STATE = "stale";
@@ -70,12 +70,9 @@ isolated function processUnsubscription(websubhub:VerifiedUnsubscription unsubsc

 isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubscription subscription) returns error? {
     string consumerGroup = check value:ensureType(subscription[CONSUMER_GROUP]);
-    int? topicPartition = ();
-    if subscription.hasKey(CONSUMER_TOPIC_PARTITION) {
-        string partitionDetails = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITION]);
-        topicPartition = check int:fromString(partitionDetails);
-    }
-    kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartition);
+    int[]? topicPartitions = check getTopicPartitions(subscription);
+    log:printInfo("Manual partition assignment", isRequired = topicPartitions !is ());
+    kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartitions);
     websubhub:HubClient clientEp = check new (subscription, {
         retryConfig: {
             interval: config:MESSAGE_DELIVERY_RETRY_INTERVAL,
@@ -88,7 +85,7 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc
     });
     do {
         while true {
-            readonly & kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
+            readonly & kafka:BytesConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
             if !isValidConsumer(subscription.hubTopic, subscriberId) {
                 fail error(string `Subscriber with Id ${subscriberId} or topic ${subscription.hubTopic} is invalid`);
             }
@@ -116,7 +113,7 @@ isolated function isValidSubscription(string subscriberId) returns boolean {
     }
 }

-isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
+isolated function notifySubscribers(kafka:BytesConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
     do {
         foreach var kafkaRecord in records {
             websubhub:ContentDistributionMessage message = check deSerializeKafkaRecord(kafkaRecord);
@@ -127,7 +124,7 @@ isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:Hu
     }
 }

-isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
+isolated function deSerializeKafkaRecord(kafka:BytesConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
     byte[] content = kafkaRecord.value;
     string message = check string:fromBytes(content);
     json payload = check value:fromJsonString(message);
@@ -139,14 +136,24 @@ isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) retur
     return distributionMsg;
 }

-isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map<string|string[]>|error {
+isolated function getHeaders(kafka:BytesConsumerRecord kafkaRecord) returns map<string|string[]>|error {
     map<string|string[]> headers = {};
     foreach var ['key, value] in kafkaRecord.headers.entries().toArray() {
-        if value is string || value is string[] {
-            headers['key] = value;
-        } else if value is byte[] {
+        if value is byte[] {
             headers['key] = check string:fromBytes(value);
+        } else if value is byte[][] {
+            string[] headerValue = value.'map(v => check string:fromBytes(v));
+            headers['key] = headerValue;
         }
     }
     return headers;
 }
+
+isolated function getTopicPartitions(websubhub:VerifiedSubscription subscription) returns int[]|error? {
+    if !subscription.hasKey(CONSUMER_TOPIC_PARTITIONS) {
+        return;
+    }
+    // Kafka topic partitions will be a string of comma-separated integers, e.g. "1,2,3,4"
+    string partitionInfo = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITIONS]);
+    return re `,`.split(partitionInfo).'map(p => p.trim()).'map(p => check int:fromString(p));
+}
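Not part of the change itself: `getTopicPartitions` above expects the `topicPartitions` custom parameter as comma-separated integers. A runnable sketch of the same parsing, written with an explicit loop to keep the error handling obvious:

```ballerina
import ballerina/io;

public function main() returns error? {
    // Same wire format the hub expects in the `topicPartitions` parameter.
    string partitionInfo = "1, 2, 3";
    int[] topicPartitions = [];
    foreach string p in re `,`.split(partitionInfo) {
        topicPartitions.push(check int:fromString(p.trim()));
    }
    io:println(topicPartitions); // [1,2,3]
}
```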
diff --git a/examples/kafka-hub/subscriber/Cloud.toml b/examples/kafka-hub/subscriber/Cloud.toml
index e3076be4..3189b754 100644
--- a/examples/kafka-hub/subscriber/Cloud.toml
+++ b/examples/kafka-hub/subscriber/Cloud.toml
@@ -1,7 +1,7 @@
 [container.image]
 repository="ballerina"
 name="wbsbsubscriber"
-tag="5.0.0"
+tag="6.0.0"

 [[container.copy.files]]
 sourceFile="./resources"
diff --git a/examples/kafka-hub/subscriber/subscriber_service.bal b/examples/kafka-hub/subscriber/subscriber_service.bal
index 00b36d7a..fe87ca22 100644
--- a/examples/kafka-hub/subscriber/subscriber_service.bal
+++ b/examples/kafka-hub/subscriber/subscriber_service.bal
@@ -110,12 +110,19 @@ service /JuApTOXq19 on securedSubscriber {
 }

 isolated function getCustomParams() returns map<string> {
-    if os:getEnv("CONSUMER_GROUP") == "" {
-        return {};
+    if os:getEnv("CONSUMER_GROUP") !is "" {
+        return {
+            consumerGroup: os:getEnv("CONSUMER_GROUP")
+        };
     }
-    return {
-        consumerGroup: os:getEnv("CONSUMER_GROUP")
-    };
+
+    if os:getEnv("TOPIC_PARTITIONS") !is "" {
+        return {
+            topicPartitions: os:getEnv("TOPIC_PARTITIONS")
+        };
+    }
+
+    return {};
 }

 isolated function getListener() returns websub:Listener|error {
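Not part of the change itself: the values built by `getCustomParams` reach the hub as custom subscription parameters. A minimal sketch of how a subscriber could attach them, assuming the `customParams` field of `websub:SubscriberServiceConfig`; the hub URL, topic, and port are placeholders, and unlike `getCustomParams` (which sends one or the other) both parameters are shown here for illustration:

```ballerina
import ballerina/os;
import ballerina/websub;

@websub:SubscriberServiceConfig {
    target: ["https://hub.example.com/hub", "orgNews"], // placeholder hub URL and topic
    customParams: {
        consumerGroup: os:getEnv("CONSUMER_GROUP"),
        topicPartitions: os:getEnv("TOPIC_PARTITIONS")
    }
}
service /subscriber on new websub:Listener(9100) {
    remote function onEventNotification(websub:ContentDistributionMessage event) returns websub:Acknowledgement {
        return websub:ACKNOWLEDGEMENT;
    }
}
```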