diff --git a/examples/kafka-hub/hub/modules/connections/connections.bal b/examples/kafka-hub/hub/modules/connections/connections.bal index 5f69578a..1347e90c 100644 --- a/examples/kafka-hub/hub/modules/connections/connections.bal +++ b/examples/kafka-hub/hub/modules/connections/connections.bal @@ -110,7 +110,6 @@ public isolated function createMessageConsumer(string topicName, string groupNam return new (config:KAFKA_URL, consumerConfiguration); } - log:printInfo("Assigning kafka-topic partitions manually", details = partitions); kafka:Consumer|kafka:Error consumerEp = check new (config:KAFKA_URL, consumerConfiguration); if consumerEp is kafka:Error { log:printError("Error occurred while creating the consumer", consumerEp); @@ -124,11 +123,33 @@ public isolated function createMessageConsumer(string topicName, string groupNam return paritionAssignmentErr; } - kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(kafkaTopicPartitions); - if kafkaSeekErr is kafka:Error { - log:printError("Error occurred while assigning seeking partitions for the consumer", paritionAssignmentErr); - return kafkaSeekErr; + kafka:TopicPartition[] parititionsWithoutCmtdOffsets = []; + foreach kafka:TopicPartition partition in kafkaTopicPartitions { + kafka:PartitionOffset|kafka:Error? offset = consumerEp->getCommittedOffset(partition); + if offset is kafka:Error { + log:printError("Error occurred while retrieving the commited offsets for the topic-partition", offset); + return offset; + } + + if offset is () { + parititionsWithoutCmtdOffsets.push(partition); + } + + if offset is kafka:PartitionOffset { + kafka:Error? kafkaSeekErr = consumerEp->seek(offset); + if kafkaSeekErr is kafka:Error { + log:printError("Error occurred while assigning seeking partitions for the consumer", kafkaSeekErr); + return kafkaSeekErr; + } + } } + if parititionsWithoutCmtdOffsets.length() > 0 { + kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(parititionsWithoutCmtdOffsets); + if kafkaSeekErr is kafka:Error { + log:printError("Error occurred while assigning seeking partitions (for paritions without committed offsets) for the consumer", kafkaSeekErr); + return kafkaSeekErr; + } + } return consumerEp; }