Skip to content

Commit

Permalink
Add logic for topic-partition retrieval from subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeshLK committed Nov 22, 2024
1 parent 63d1011 commit edf60fc
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion examples/kafka-hub/hub/websub_subscribers.bal
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ isolated function processUnsubscription(websubhub:VerifiedUnsubscription unsubsc

isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubscription subscription) returns error? {
string consumerGroup = check value:ensureType(subscription[CONSUMER_GROUP]);
kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup);
int? topicPartition = ();
if subscription.hasKey(CONSUMER_TOPIC_PARTITION) {
string partitionDetails = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITION]);
topicPartition = check int:fromString(partitionDetails);
}
kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartition);
websubhub:HubClient clientEp = check new (subscription, {
retryConfig: {
interval: config:MESSAGE_DELIVERY_RETRY_INTERVAL,
Expand Down

0 comments on commit edf60fc

Please sign in to comment.