diff --git a/asb-ballerina/Ballerina.toml b/asb-ballerina/Ballerina.toml index e19ab5b2..188e7c9f 100644 --- a/asb-ballerina/Ballerina.toml +++ b/asb-ballerina/Ballerina.toml @@ -2,7 +2,7 @@ distribution = "2201.3.1" org = "ballerinax" name = "asb" -version = "3.1.0" +version = "3.2.0" license= ["Apache-2.0"] authors = ["Ballerina"] keywords = ["IT Operations/Message Brokers", "Cost/Paid", "Vendor/Microsoft"] @@ -13,9 +13,9 @@ repository = "https://github.com/ballerina-platform/module-ballerinax-azure-serv observabilityIncluded = true [[platform.java11.dependency]] -path = "../asb-native/build/libs/asb-native-3.1.0.jar" +path = "../asb-native/build/libs/asb-native-3.2.0.jar" groupId = "org.ballerinax" artifactId = "asb-native" module = "asb-native" -version = "3.1.0" +version = "3.2.0" diff --git a/asb-ballerina/Dependencies.toml b/asb-ballerina/Dependencies.toml index 08383432..d4a7c603 100644 --- a/asb-ballerina/Dependencies.toml +++ b/asb-ballerina/Dependencies.toml @@ -104,7 +104,7 @@ modules = [ [[package]] org = "ballerinax" name = "asb" -version = "3.1.0" +version = "3.2.0" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "log"}, diff --git a/asb-ballerina/commons.bal b/asb-ballerina/commons.bal index 355a69e9..0ed403f8 100644 --- a/asb-ballerina/commons.bal +++ b/asb-ballerina/commons.bal @@ -129,6 +129,7 @@ public type QueueConfig record { # The valid values are TOPIC and QUEUE # + topicOrQueueName - A string field that holds the name of the topic or queue # + connectionString - A string field that holds the Service Bus connection string with Shared Access Signatures. +# + amqpRetryOptions - Retry configurations related to underlying AMQP message sender @display {label: "Sender Connection Config"} public type ASBServiceSenderConfig record { @display {label: "EntityType"} @@ -137,6 +138,8 @@ public type ASBServiceSenderConfig record { string topicOrQueueName; @display {label: "ConnectionString"} string connectionString; + @display {label: "AMQP retry configurations"} + AmqpRetryOptions amqpRetryOptions = {}; }; # Represents Custom configurations for the ASB connector diff --git a/asb-ballerina/messageSender.bal b/asb-ballerina/messageSender.bal index 53c530fd..b3bcf267 100644 --- a/asb-ballerina/messageSender.bal +++ b/asb-ballerina/messageSender.bal @@ -40,7 +40,7 @@ public isolated client class MessageSender { self.entityType = config.entityType; self.logLevel = customConfiguration.logLevel; self.senderHandle = check initMessageSender(java:fromString(self.connectionString), java:fromString(self.entityType), - java:fromString(self.topicOrQueueName), java:fromString(self.logLevel)); + java:fromString(self.topicOrQueueName), java:fromString(self.logLevel), config.amqpRetryOptions); } # Send message to queue or topic with a message body. @@ -102,9 +102,9 @@ public isolated client class MessageSender { } } -isolated function initMessageSender(handle connectionString, handle entityType, handle topicOrQueueName, handle isLogEnabled) returns handle|error = @java:Constructor { +isolated function initMessageSender(handle connectionString, handle entityType, handle topicOrQueueName, handle isLogEnabled, AmqpRetryOptions retryOptions) returns handle|error = @java:Constructor { 'class: "org.ballerinax.asb.sender.MessageSender", - paramTypes: ["java.lang.String", "java.lang.String", "java.lang.String", "java.lang.String"] + paramTypes: ["java.lang.String", "java.lang.String", "java.lang.String", "java.lang.String", "io.ballerina.runtime.api.values.BMap"] } external; isolated function send(handle senderHandle, Message message) returns error? = @java:Method { diff --git a/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index 80327217..1d35dbae 100644 --- a/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -57,12 +57,8 @@ import org.ballerinax.asb.util.ASBUtils; import org.ballerinax.asb.util.ModuleUtils; -import static org.ballerinax.asb.util.ASBConstants.DELAY; -import static org.ballerinax.asb.util.ASBConstants.MAX_DELAY; -import static org.ballerinax.asb.util.ASBConstants.MAX_RETRIES; import static org.ballerinax.asb.util.ASBConstants.RECEIVE_AND_DELETE; -import static org.ballerinax.asb.util.ASBConstants.RETRY_MODE; -import static org.ballerinax.asb.util.ASBConstants.TRY_TIMEOUT; +import static org.ballerinax.asb.util.ASBUtils.getRetryOptions; /** * This facilitates the client operations of MessageReceiver client in @@ -131,20 +127,6 @@ public MessageReceiver(String connectionString, String queueName, String topicNa log.debug("ServiceBusReceiverClient initialized"); } - private AmqpRetryOptions getRetryOptions(BMap retryConfigs) { - Long maxRetries = retryConfigs.getIntValue(MAX_RETRIES); - BigDecimal delayConfig = ((BDecimal) retryConfigs.get(DELAY)).decimalValue(); - BigDecimal maxDelay = ((BDecimal) retryConfigs.get(MAX_DELAY)).decimalValue(); - BigDecimal tryTimeout = ((BDecimal) retryConfigs.get(TRY_TIMEOUT)).decimalValue(); - String retryMode = retryConfigs.getStringValue(RETRY_MODE).getValue(); - return new AmqpRetryOptions() - .setMaxRetries(maxRetries.intValue()) - .setDelay(Duration.ofSeconds(delayConfig.intValue())) - .setMaxDelay(Duration.ofSeconds(maxDelay.intValue())) - .setTryTimeout(Duration.ofSeconds(tryTimeout.intValue())) - .setMode(AmqpRetryMode.valueOf(retryMode)); - } - /** * Receive Message with configurable parameters as Map when Receiver Connection * is given as a parameter and diff --git a/asb-native/src/main/java/org/ballerinax/asb/sender/MessageSender.java b/asb-native/src/main/java/org/ballerinax/asb/sender/MessageSender.java index 761c0601..25b60bce 100644 --- a/asb-native/src/main/java/org/ballerinax/asb/sender/MessageSender.java +++ b/asb-native/src/main/java/org/ballerinax/asb/sender/MessageSender.java @@ -18,6 +18,7 @@ package org.ballerinax.asb.sender; +import com.azure.core.amqp.AmqpRetryOptions; import com.azure.messaging.servicebus.ServiceBusClientBuilder; import com.azure.messaging.servicebus.ServiceBusException; import com.azure.messaging.servicebus.ServiceBusMessage; @@ -45,6 +46,8 @@ import org.ballerinax.asb.util.ASBConstants; import org.ballerinax.asb.util.ASBUtils; +import static org.ballerinax.asb.util.ASBUtils.getRetryOptions; + /** * This facilitates the client operations of MessageSender client in Ballerina. */ @@ -63,10 +66,14 @@ public class MessageSender { * @throws InterruptedException on failure initiating IMessage Receiver due to * thread interruption. */ - public MessageSender(String connectionString, String entityType, String topicOrQueueName, String logLevel) + public MessageSender(String connectionString, String entityType, String topicOrQueueName, String logLevel, + BMap retryConfigs) throws ServiceBusException, InterruptedException { log.setLevel(Level.toLevel(logLevel, Level.OFF)); - ServiceBusClientBuilder clientBuilder = new ServiceBusClientBuilder().connectionString(connectionString); + AmqpRetryOptions retryOptions = getRetryOptions(retryConfigs); + ServiceBusClientBuilder clientBuilder = new ServiceBusClientBuilder() + .retryOptions(retryOptions) + .connectionString(connectionString); if (!entityType.isEmpty() && entityType.equalsIgnoreCase("queue")) { this.sender = clientBuilder .sender() diff --git a/asb-native/src/main/java/org/ballerinax/asb/util/ASBUtils.java b/asb-native/src/main/java/org/ballerinax/asb/util/ASBUtils.java index 920bd4ff..610a7b33 100644 --- a/asb-native/src/main/java/org/ballerinax/asb/util/ASBUtils.java +++ b/asb-native/src/main/java/org/ballerinax/asb/util/ASBUtils.java @@ -18,6 +18,8 @@ package org.ballerinax.asb.util; +import com.azure.core.amqp.AmqpRetryMode; +import com.azure.core.amqp.AmqpRetryOptions; import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.TypeCreator; @@ -25,12 +27,22 @@ import io.ballerina.runtime.api.types.ErrorType; import io.ballerina.runtime.api.types.MapType; import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BDecimal; import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BString; + +import java.math.BigDecimal; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import static org.ballerinax.asb.util.ASBConstants.DELAY; +import static org.ballerinax.asb.util.ASBConstants.MAX_DELAY; +import static org.ballerinax.asb.util.ASBConstants.MAX_RETRIES; +import static org.ballerinax.asb.util.ASBConstants.RETRY_MODE; +import static org.ballerinax.asb.util.ASBConstants.TRY_TIMEOUT; + public class ASBUtils { /** @@ -192,4 +204,18 @@ public static BError returnErrorValue(String message, Exception error) { return ErrorCreator.createError(errorType, StringUtils.fromString(message + "error from " + errorFromClass), er, null); } + + public static AmqpRetryOptions getRetryOptions(BMap retryConfigs) { + Long maxRetries = retryConfigs.getIntValue(MAX_RETRIES); + BigDecimal delayConfig = ((BDecimal) retryConfigs.get(DELAY)).decimalValue(); + BigDecimal maxDelay = ((BDecimal) retryConfigs.get(MAX_DELAY)).decimalValue(); + BigDecimal tryTimeout = ((BDecimal) retryConfigs.get(TRY_TIMEOUT)).decimalValue(); + String retryMode = retryConfigs.getStringValue(RETRY_MODE).getValue(); + return new AmqpRetryOptions() + .setMaxRetries(maxRetries.intValue()) + .setDelay(Duration.ofSeconds(delayConfig.intValue())) + .setMaxDelay(Duration.ofSeconds(maxDelay.intValue())) + .setTryTimeout(Duration.ofSeconds(tryTimeout.intValue())) + .setMode(AmqpRetryMode.valueOf(retryMode)); + } } diff --git a/gradle.properties b/gradle.properties index 449fc4d8..cf21c727 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ org.gradle.caching=true group=org.ballerinax.azure.servicebus -version=3.1.0 +version=3.2.0 ballerinaLangVersion=2201.3.1