Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the logic for parallel message delivery #1067

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,9 @@ public final kafka:Consumer websubEventsConsumer = check new (config:KAFKA_URL,
# + partitions - The kafka topic-partitions
# + return - `kafka:Consumer` if succcessful or else `error`
public isolated function createMessageConsumer(string topicName, string groupName, int[]? partitions = ()) returns kafka:Consumer|error {
// Messages are distributed to subscribers in parallel.
// In this scenario, manually committing offsets is unnecessary because
// the next message polling starts as soon as the worker begins delivering messages to the subscribers.
// Therefore, auto-commit is enabled to handle offset management automatically.
// Related issue: https://github.com/ballerina-platform/ballerina-library/issues/7376
kafka:ConsumerConfiguration consumerConfiguration = {
groupId: groupName,
autoCommit: true,
autoCommit: false,
secureSocket: secureSocketConfig,
securityProtocol: kafka:PROTOCOL_SSL,
maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS
Expand Down
24 changes: 20 additions & 4 deletions examples/kafka-hub/hub/websub_subscribers.bal
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ isolated function processUnsubscription(websubhub:VerifiedUnsubscription unsubsc
isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubscription subscription) returns error? {
string consumerGroup = check value:ensureType(subscription[CONSUMER_GROUP]);
int[]? topicPartitions = check getTopicPartitions(subscription);
log:printInfo("Manually partition assignment", isRequired = topicPartitions !is ());
kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartitions);
websubhub:HubClient clientEp = check new (subscription, {
retryConfig: {
Expand All @@ -89,7 +88,7 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc
if !isValidConsumer(subscription.hubTopic, subscriberId) {
fail error(string `Subscriber with Id ${subscriberId} or topic ${subscription.hubTopic} is invalid`);
}
_ = start notifySubscribers(records, clientEp);
_ = check notifySubscribers(consumerEp, records, clientEp);
}
} on fail var e {
util:logError("Error occurred while sending notification to subscriber", e);
Expand All @@ -113,17 +112,34 @@ isolated function isValidSubscription(string subscriberId) returns boolean {
}
}

isolated function notifySubscribers(kafka:BytesConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
isolated function notifySubscribers(kafka:Consumer consumerEp, kafka:BytesConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
do {
future<websubhub:ContentDistributionSuccess|error>[] distributionResponses = [];
foreach var kafkaRecord in records {
websubhub:ContentDistributionMessage message = check deSerializeKafkaRecord(kafkaRecord);
_ = check clientEp->notifyContentDistribution(message);
future<websubhub:ContentDistributionSuccess|error> distributionResponse = start clientEp->notifyContentDistribution(message.cloneReadOnly());
distributionResponses.push(distributionResponse);
}

boolean hasErrors = distributionResponses
.'map(f => waitAndGetResult(f))
.'map(r => r is error)
.reduce(isolated function (boolean a, boolean b) returns boolean => a && b, false);

if hasErrors {
return error("Error occurred while distributing content to the subscriber");
}
return consumerEp->'commit();
} on fail error e {
log:printError("Error occurred while delivering messages to the subscriber", err = e.message());
}
}

isolated function waitAndGetResult(future<websubhub:ContentDistributionSuccess|error> response) returns websubhub:ContentDistributionSuccess|error {
websubhub:ContentDistributionSuccess|error responseValue = wait response;
return responseValue;
}

isolated function deSerializeKafkaRecord(kafka:BytesConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
byte[] content = kafkaRecord.value;
string message = check string:fromBytes(content);
Expand Down
Loading