Skip to content

Commit

Permalink
Merge pull request #1066 from ayeshLK/kafkahub-consumer-partition-ass…
Browse files Browse the repository at this point in the history
…ignment

Restructure kafka-consumer paritition-offset assignment logic
  • Loading branch information
ayeshLK authored Dec 13, 2024
2 parents de0a7a6 + b794c43 commit 38598fa
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions examples/kafka-hub/hub/modules/connections/connections.bal
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public isolated function createMessageConsumer(string topicName, string groupNam
return new (config:KAFKA_URL, consumerConfiguration);
}

log:printInfo("Assigning kafka-topic partitions manually", details = partitions);
kafka:Consumer|kafka:Error consumerEp = check new (config:KAFKA_URL, consumerConfiguration);
if consumerEp is kafka:Error {
log:printError("Error occurred while creating the consumer", consumerEp);
Expand All @@ -129,11 +128,33 @@ public isolated function createMessageConsumer(string topicName, string groupNam
return paritionAssignmentErr;
}

kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(kafkaTopicPartitions);
if kafkaSeekErr is kafka:Error {
log:printError("Error occurred while assigning seeking partitions for the consumer", paritionAssignmentErr);
return kafkaSeekErr;
kafka:TopicPartition[] parititionsWithoutCmtdOffsets = [];
foreach kafka:TopicPartition partition in kafkaTopicPartitions {
kafka:PartitionOffset|kafka:Error? offset = consumerEp->getCommittedOffset(partition);
if offset is kafka:Error {
log:printError("Error occurred while retrieving the commited offsets for the topic-partition", offset);
return offset;
}

if offset is () {
parititionsWithoutCmtdOffsets.push(partition);
}

if offset is kafka:PartitionOffset {
kafka:Error? kafkaSeekErr = consumerEp->seek(offset);
if kafkaSeekErr is kafka:Error {
log:printError("Error occurred while assigning seeking partitions for the consumer", kafkaSeekErr);
return kafkaSeekErr;
}
}
}

if parititionsWithoutCmtdOffsets.length() > 0 {
kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(parititionsWithoutCmtdOffsets);
if kafkaSeekErr is kafka:Error {
log:printError("Error occurred while assigning seeking partitions (for paritions without committed offsets) for the consumer", kafkaSeekErr);
return kafkaSeekErr;
}
}
return consumerEp;
}

0 comments on commit 38598fa

Please sign in to comment.