Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/kafkahub-mtls' into kafkahub-m…
Browse files Browse the repository at this point in the history
…tls-dev2
  • Loading branch information
ayeshLK committed Dec 13, 2024
2 parents c93759e + 38598fa commit 67e3a5f
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions examples/kafka-hub/hub/modules/connections/connections.bal
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ public isolated function createMessageConsumer(string topicName, string groupNam
return new (config:KAFKA_URL, consumerConfiguration);
}

log:printInfo("Assigning kafka-topic partitions manually", details = partitions);
kafka:Consumer|kafka:Error consumerEp = check new (config:KAFKA_URL, consumerConfiguration);
if consumerEp is kafka:Error {
log:printError("Error occurred while creating the consumer", consumerEp);
Expand All @@ -124,11 +123,33 @@ public isolated function createMessageConsumer(string topicName, string groupNam
return paritionAssignmentErr;
}

kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(kafkaTopicPartitions);
if kafkaSeekErr is kafka:Error {
log:printError("Error occurred while assigning seeking partitions for the consumer", paritionAssignmentErr);
return kafkaSeekErr;
kafka:TopicPartition[] parititionsWithoutCmtdOffsets = [];
foreach kafka:TopicPartition partition in kafkaTopicPartitions {
kafka:PartitionOffset|kafka:Error? offset = consumerEp->getCommittedOffset(partition);
if offset is kafka:Error {
log:printError("Error occurred while retrieving the commited offsets for the topic-partition", offset);
return offset;
}

if offset is () {
parititionsWithoutCmtdOffsets.push(partition);
}

if offset is kafka:PartitionOffset {
kafka:Error? kafkaSeekErr = consumerEp->seek(offset);
if kafkaSeekErr is kafka:Error {
log:printError("Error occurred while assigning seeking partitions for the consumer", kafkaSeekErr);
return kafkaSeekErr;
}
}
}

if parititionsWithoutCmtdOffsets.length() > 0 {
kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(parititionsWithoutCmtdOffsets);
if kafkaSeekErr is kafka:Error {
log:printError("Error occurred while assigning seeking partitions (for paritions without committed offsets) for the consumer", kafkaSeekErr);
return kafkaSeekErr;
}
}
return consumerEp;
}

0 comments on commit 67e3a5f

Please sign in to comment.