diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index abab5893..c2d95a58 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -2,7 +2,7 @@ distribution = "2201.8.0" org = "ballerinax" name = "asb" -version = "3.8.0" +version = "3.8.1-SNAPSHOT" license= ["Apache-2.0"] authors = ["Ballerina"] keywords = ["IT Operations/Message Brokers", "Cost/Paid", "Vendor/Microsoft"] @@ -19,5 +19,5 @@ graalvmCompatible = true groupId = "org.ballerinax" artifactId = "asb-native" module = "asb-native" -version = "3.8.0" -path = "../native/build/libs/asb-native-3.8.0.jar" +version = "3.8.1-SNAPSHOT" +path = "../native/build/libs/asb-native-3.8.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index ea02f1a4..a3c73353 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -127,7 +127,7 @@ modules = [ [[package]] org = "ballerinax" name = "asb" -version = "3.8.0" +version = "3.8.1-SNAPSHOT" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "lang.runtime"}, diff --git a/ballerina/receiver.bal b/ballerina/receiver.bal index 47ef7b88..ab748ac2 100644 --- a/ballerina/receiver.bal +++ b/ballerina/receiver.bal @@ -207,7 +207,7 @@ isolated function initializeReceiver(MessageReceiver receiverClient, handle conn 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; -isolated function receiveBatch(MessageReceiver endpointClient, int? maxMessageCount, int? serverWaitTime, boolean deadLettered) +isolated function receiveBatch(MessageReceiver endpointClient, int maxMessageCount, int? serverWaitTime, boolean deadLettered) returns MessageBatch|Error? = @java:Method { 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; diff --git a/ballerina/tests/asb_sender_receiver_negative_tests.bal b/ballerina/tests/asb_sender_receiver_negative_tests.bal index 4c05b850..029a840e 100644 --- a/ballerina/tests/asb_sender_receiver_negative_tests.bal +++ b/ballerina/tests/asb_sender_receiver_negative_tests.bal @@ -31,7 +31,13 @@ function testReceivePayloadWithIncorrectExpectedType() returns error? { check messageSender->sendPayload(mapContent); log:printInfo("Creating Asb message receiver."); - receiverConfig.receiveMode = RECEIVE_AND_DELETE; + ASBServiceReceiverConfig receiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: testQueue1 + }, + receiveMode: RECEIVE_AND_DELETE + }; MessageReceiver messageReceiver = check new (receiverConfig); log:printInfo("Receiving from Asb receiver client."); @@ -61,7 +67,13 @@ function testInvalidComplete() returns error? { MessageSender messageSender = check new (senderConfig); log:printInfo("Initializing Asb receiver client."); - receiverConfig.receiveMode = RECEIVE_AND_DELETE; + ASBServiceReceiverConfig receiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: testQueue1 + }, + receiveMode: RECEIVE_AND_DELETE + }; MessageReceiver messageReceiver = check new (receiverConfig); @@ -102,7 +114,13 @@ function testInvalidAbandon() returns error? { MessageSender messageSender = check new (senderConfig); log:printInfo("Initializing Asb receiver client."); - receiverConfig.receiveMode = RECEIVE_AND_DELETE; + ASBServiceReceiverConfig receiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: testQueue1 + }, + receiveMode: RECEIVE_AND_DELETE + }; MessageReceiver messageReceiver = check new (receiverConfig); @@ -145,7 +163,13 @@ function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? { check messageSender->sendPayload(mapContent); log:printInfo("Creating Asb message receiver."); - receiverConfig.receiveMode = RECEIVE_AND_DELETE; + ASBServiceReceiverConfig receiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: testQueue1 + }, + receiveMode: RECEIVE_AND_DELETE + }; MessageReceiver messageReceiver = check new (receiverConfig); log:printInfo("Receiving from Asb receiver client."); @@ -170,8 +194,12 @@ function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? { function testSendToInvalidTopic() returns error? { log:printInfo("[[testSendToInvalidTopic]]"); log:printInfo("Creating Asb message sender."); - senderConfig.topicOrQueueName = "non-existing-topic"; - MessageSender messageSender = check new (senderConfig); + ASBServiceSenderConfig invalidSenderConfig = { + connectionString: connectionString, + entityType: QUEUE, + topicOrQueueName: "non-existing-topic" + }; + MessageSender messageSender = check new (invalidSenderConfig); log:printInfo("Sending payloads via ASB sender"); Error? e = messageSender->sendPayload("message"); @@ -190,8 +218,14 @@ function testSendToInvalidTopic() returns error? { function testReceiveFromInvalidQueue() returns error? { log:printInfo("[[testReceiveFromInvalidQueue]]"); log:printInfo("Creating Asb message receiver."); - receiverConfig.entityConfig = {queueName: "non-existing-queue"}; - MessageReceiver messageReceiver = check new (receiverConfig); + ASBServiceReceiverConfig invalidReceiverConfig = { + connectionString: connectionString, + entityConfig: { + queueName: "non-existing-queue" + }, + receiveMode: PEEK_LOCK + }; + MessageReceiver messageReceiver = check new (invalidReceiverConfig); log:printInfo("Sending payloads via ASB sender"); Message|error? e = messageReceiver->receive(5); @@ -210,8 +244,12 @@ function testReceiveFromInvalidQueue() returns error? { function testInvalidConnectionString() returns error? { log:printInfo("[[testInvalidConnectionString]]"); log:printInfo("Creating Asb message sender."); - senderConfig.connectionString = "invalid-connection-string"; - MessageSender|Error messageSender = new (senderConfig); + ASBServiceSenderConfig invalidSenderConfig = { + connectionString: "invalid-connection-string", + entityType: QUEUE, + topicOrQueueName: "testQueue1" + }; + MessageSender|Error messageSender = new (invalidSenderConfig); test:assertTrue(messageSender is error, msg = "Client creation should have failed."); test:assertEquals((messageSender).message(), "Error occurred while processing request: " + diff --git a/changelog.md b/changelog.md index e9abcd02..95c1aa00 100644 --- a/changelog.md +++ b/changelog.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- [Implement ASB sender/receiver client actions in a non-blocking way](https://github.com/ballerina-platform/ballerina-library/issues/4982) + +## [3.8.0] - 2024-05-31 + ### Added - [Add the listener-service implementation of the Azure service-bus connector](https://github.com/ballerina-platform/ballerina-library/issues/6495) diff --git a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index 432e78d9..d707fa92 100644 --- a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -26,6 +26,8 @@ import com.azure.messaging.servicebus.ServiceBusReceivedMessage; import com.azure.messaging.servicebus.ServiceBusReceiverClient; import com.azure.messaging.servicebus.models.DeadLetterOptions; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.TypeCreator; @@ -49,9 +51,12 @@ import java.time.Duration; import java.util.HashMap; -import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static io.ballerina.runtime.api.creators.ValueCreator.createRecordValue; import static org.ballerinax.asb.util.ASBConstants.APPLICATION_PROPERTY_KEY; @@ -90,6 +95,8 @@ * Ballerina. */ public class MessageReceiver { + private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool( + new ReceiverNetworkThreadFactory()); private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiver.class); @@ -139,40 +146,35 @@ public static Object initializeReceiver(BObject receiverClient, String connectio * message. * @return Message Object of the received message. */ - public static Object receive(BObject receiverClient, Object serverWaitTime, - BTypedesc expectedType, Object deadLettered) { - try { - ServiceBusReceiverClient receiver; - if ((boolean) deadLettered) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); - } - IterableStream receivedMessages; - if (serverWaitTime != null) { - receivedMessages = receiver.receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)); - } else { - receivedMessages = receiver.receiveMessages(1); - } - - ServiceBusReceivedMessage receivedMessage = null; - for (ServiceBusReceivedMessage message : receivedMessages) { - receivedMessage = message; - } - if (receivedMessage == null) { - return null; + public static Object receive(Environment env, BObject receiverClient, Object serverWaitTime, BTypedesc expectedType, + boolean deadLettered) { + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, deadLettered); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + List messages = receiver + .receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)) + .stream().toList(); + if (messages.isEmpty()) { + future.complete(null); + return; + } + ServiceBusReceivedMessage message = messages.get(0); + RecordType expectedRecordType = ASBUtils.getRecordType(expectedType); + BMap bMsg = constructExpectedMessageRecord(message, expectedRecordType); + future.complete(bMsg); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - - LOGGER.debug("Received message with messageId: " + receivedMessage.getMessageId()); - RecordType expectedRecordType = ASBUtils.getRecordType(expectedType); - return constructExpectedMessageRecord(receiverClient, receivedMessage, expectedRecordType); - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -182,48 +184,46 @@ public static Object receive(BObject receiverClient, Object serverWaitTime, * @param serverWaitTime Specified server wait time in seconds to receive message * @return message payload */ - - public static Object receivePayload(BObject receiverClient, Object serverWaitTime, - BTypedesc expectedType, Object deadLettered) { - try { - ServiceBusReceiverClient receiver; - if ((boolean) deadLettered) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); - } - IterableStream receivedMessages; - if (serverWaitTime != null) { - receivedMessages = receiver.receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)); - } else { - receivedMessages = receiver.receiveMessages(1); - } - - ServiceBusReceivedMessage receivedMessage = null; - for (ServiceBusReceivedMessage message : receivedMessages) { - receivedMessage = message; - } - if (receivedMessage == null) { - return null; - } - - LOGGER.debug("Received message with messageId: " + receivedMessage.getMessageId()); - Object messageBody = getMessagePayload(receivedMessage); - if (messageBody instanceof byte[]) { - return getValueWithIntendedType((byte[]) messageBody, expectedType.getDescribingType()); - } else { - Optional bValue = convertJavaToBValue(receivedMessage.getMessageId(), messageBody); - return bValue.orElseGet(() -> - ErrorCreator.createError(StringUtils.fromString("Failed to bind the received ASB message " + - "value to the expected Ballerina type: '" + expectedType.toString() + "'"))); + public static Object receivePayload(Environment env, BObject receiverClient, Object serverWaitTime, + BTypedesc expectedType, boolean deadLettered) { + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, deadLettered); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + List messages = receiver + .receiveMessages(1, Duration.ofSeconds((long) serverWaitTime)) + .stream().toList(); + if (messages.isEmpty()) { + future.complete(null); + return; + } + ServiceBusReceivedMessage message = messages.get(0); + + Object messageBody = getMessagePayload(message); + if (messageBody instanceof byte[] binaryPayload) { + Object messagePayload = getValueWithIntendedType(binaryPayload, expectedType.getDescribingType()); + future.complete(messagePayload); + } else { + Optional bValue = convertJavaToBValue(message.getMessageId(), messageBody); + String payloadBindingErr = String.format( + "Failed to bind the received ASB message value to the expected Ballerina type: '%s'", + expectedType.toString()); + Object messagePayload = bValue.orElseGet(() -> ErrorCreator.createError( + StringUtils.fromString(payloadBindingErr))); + future.complete(messagePayload); + } + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -237,29 +237,46 @@ public static Object receivePayload(BObject receiverClient, Object serverWaitTim * @param serverWaitTime Server wait time. * @return Batch Message Object of the received batch of messages. */ - public static Object receiveBatch(BObject receiverClient, Object maxMessageCount, Object serverWaitTime - , Object deadLettered) { - try { - ServiceBusReceiverClient receiver; - if ((boolean) deadLettered) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Waiting up to 'serverWaitTime' seconds for messages from " + receiver.getEntityPath()); + public static Object receiveBatch(Environment env, BObject receiverClient, long maxMessageCount, + Object serverWaitTime, boolean deadLettered) { + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, deadLettered); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + IterableStream receivedMessageStream; + if (Objects.isNull(serverWaitTime)) { + receivedMessageStream = receiver.receiveMessages((int) maxMessageCount); + } else { + receivedMessageStream = receiver.receiveMessages( + (int) maxMessageCount, Duration.ofSeconds((long) serverWaitTime)); + } + List> bMessages = receivedMessageStream.stream().map(msg -> { + BMap bMsg = constructExpectedMessageRecord(msg, null); + bMsg.addNativeData(NATIVE_MESSAGE, msg); + return bMsg; + }).toList(); + BMap messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(), + ASBConstants.MESSAGE_RECORD); + ArrayType sourceArrayType = TypeCreator.createArrayType(TypeUtils.getType(messageRecord)); + + Map value = new HashMap<>(); + value.put("messageCount", bMessages.size()); + value.put("messages", ValueCreator.createArrayValue(bMessages.toArray(new Object[0]), sourceArrayType)); + BMap bMsgBatch = createRecordValue( + ModuleUtils.getModule(), ASBConstants.MESSAGE_BATCH_RECORD, value); + future.complete(bMsgBatch); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - return getReceivedMessageBatch(receiverClient, maxMessageCount, serverWaitTime, deadLettered); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -269,26 +286,27 @@ public static Object receiveBatch(BObject receiverClient, Object maxMessageCount * @param message Message object. * @return An error if failed to complete the message. */ - public static Object complete(BObject receiverClient, BMap message) { - try { - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - ServiceBusReceiverClient receiver; - if (nativeMessage.getDeadLetterReason() != null) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); + public static Object complete(Environment env, BObject receiverClient, BMap message) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getNativeReceiver( + receiverClient, Objects.nonNull(nativeMessage.getDeadLetterReason())); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + receiver.complete(nativeMessage); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - receiver.complete(nativeMessage); - LOGGER.debug("Completed the message(Id: " + nativeMessage.getMessageId() + ") with lockToken " + - nativeMessage.getLockToken()); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -298,21 +316,26 @@ public static Object complete(BObject receiverClient, BMap mess * @param message Message object. * @return An error if failed to abandon the message. */ - public static Object abandon(BObject receiverClient, BMap message) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - receiver.abandon(nativeMessage); - LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from %n%s", - nativeMessage.getMessageId(), receiver.getEntityPath())); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object abandon(Environment env, BObject receiverClient, BMap message) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + receiver.abandon(nativeMessage); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** @@ -324,25 +347,30 @@ public static Object abandon(BObject receiverClient, BMap messa * @param deadLetterErrorDescription The dead letter error description. * @return An error if failed to dead letter the message. */ - public static Object deadLetter(BObject receiverClient, BMap message, Object deadLetterReason, - Object deadLetterErrorDescription) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - DeadLetterOptions options = new DeadLetterOptions() - .setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription)); - options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason)); - receiver.deadLetter(nativeMessage, options); - LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s", - nativeMessage.getMessageId(), receiver.getEntityPath())); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object deadLetter(Environment env, BObject receiverClient, BMap message, + Object deadLetterReason, Object deadLetterErrorDescription) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + DeadLetterOptions options = new DeadLetterOptions() + .setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription)); + options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason)); + receiver.deadLetter(nativeMessage, options); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** @@ -352,21 +380,26 @@ public static Object deadLetter(BObject receiverClient, BMap me * @param message Message object. * @return An error if failed to defer the message. */ - public static Object defer(BObject receiverClient, BMap message) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - receiver.defer(nativeMessage); - LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s", - nativeMessage.getMessageId(), receiver.getEntityPath())); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object defer(Environment env, BObject receiverClient, BMap message) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + receiver.defer(nativeMessage); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** @@ -381,22 +414,30 @@ public static Object defer(BObject receiverClient, BMap message * its true identifier. * @return The received Message or null if there is no message for given sequence number. */ - public static Object receiveDeferred(BObject receiverClient, Object sequenceNumber) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage receivedMessage = receiver.receiveDeferredMessage((long) sequenceNumber); - if (receivedMessage == null) { - return null; + public static Object receiveDeferred(Environment env, BObject receiverClient, long sequenceNumber) { + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + ServiceBusReceivedMessage message = receiver.receiveDeferredMessage(sequenceNumber); + if (Objects.isNull(message)) { + future.complete(null); + return; + } + BMap bMsg = constructExpectedMessageRecord(message, null); + future.complete(bMsg); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - LOGGER.debug("Received deferred message using its sequenceNumber from " + receiver.getEntityPath()); - return constructExpectedMessageRecord(receiverClient, receivedMessage, null); - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -407,21 +448,26 @@ public static Object receiveDeferred(BObject receiverClient, Object sequenceNumb * @param message Message object. * @return An error if failed to renewLock of the message. */ - public static Object renewLock(BObject receiverClient, BMap message) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - receiver.renewMessageLock(nativeMessage); - LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s", - nativeMessage.getMessageId(), receiver.getEntityPath())); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object renewLock(Environment env, BObject receiverClient, BMap message) { + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + receiver.renewMessageLock(nativeMessage); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** @@ -429,31 +475,35 @@ public static Object renewLock(BObject receiverClient, BMap mes * * @return An error if failed to close the receiver. */ - public static Object closeReceiver(BObject receiverClient) { - try { - ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - receiver.close(); - LOGGER.debug("Closed the receiver"); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object closeReceiver(Environment env, BObject receiverClient) { + ServiceBusReceiverClient receiver = getNativeReceiver(receiverClient, false); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + receiver.close(); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } /** * Converts the received message to the contextually expected Ballerina record type (or to anydata, if not * specified). * - * @param receiverClient Ballerina client object * @param message Received Message */ - private static BMap constructExpectedMessageRecord(BObject receiverClient, - ServiceBusReceivedMessage message, - RecordType expectedType) { + private static BMap constructExpectedMessageRecord(ServiceBusReceivedMessage message, + RecordType expectedType) { Map map = populateOptionalFieldsMap(message); Object messageBody = getMessagePayload(message); if (messageBody instanceof byte[]) { @@ -530,42 +580,6 @@ public static Object getMessagePayload(ServiceBusReceivedMessage receivedMessage } } - private static BMap getReceivedMessageBatch(BObject receiverClient, Object maxMessageCount, - Object serverWaitTime, Object deadLettered) - throws InterruptedException, ServiceBusException { - ServiceBusReceiverClient receiver; - if ((boolean) deadLettered) { - receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); - } else { - receiver = getReceiverFromBObject(receiverClient); - } - int maxCount = Long.valueOf(maxMessageCount.toString()).intValue(); - IterableStream receivedMessageStream; - if (serverWaitTime != null) { - receivedMessageStream = receiver.receiveMessages(maxCount, Duration.ofSeconds((long) serverWaitTime)); - } else { - receivedMessageStream = receiver.receiveMessages(maxCount); - } - - LinkedList receivedMessages = new LinkedList<>(); - for (ServiceBusReceivedMessage receivedMessage : receivedMessageStream) { - BMap recordMap = constructExpectedMessageRecord(receiverClient, receivedMessage, null); - BMap messageRecord = createRecordValue(ModuleUtils.getModule(), - ASBConstants.MESSAGE_RECORD, recordMap); - messageRecord.addNativeData(NATIVE_MESSAGE, receivedMessage); - receivedMessages.add(messageRecord); - } - - BMap messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(), - ASBConstants.MESSAGE_RECORD); - ArrayType sourceArrayType = TypeCreator.createArrayType(TypeUtils.getType(messageRecord)); - - Map map = new HashMap<>(); - map.put("messageCount", receivedMessages.size()); - map.put("messages", ValueCreator.createArrayValue(receivedMessages.toArray(new Object[0]), sourceArrayType)); - return createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_BATCH_RECORD, map); - } - private static BMap getApplicationProperties(ServiceBusReceivedMessage message) { BMap applicationPropertiesRecord = createRecordValue(ModuleUtils.getModule(), ASBConstants.APPLICATION_PROPERTY_TYPE); @@ -603,8 +617,11 @@ private static void populateApplicationProperty(BMap applicatio } } - private static ServiceBusReceiverClient getReceiverFromBObject(BObject receiverObject) { - return (ServiceBusReceiverClient) receiverObject.getNativeData(RECEIVER_CLIENT); + private static ServiceBusReceiverClient getNativeReceiver(BObject bReceiver, boolean isDeadLetter) { + if (isDeadLetter) { + return (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(bReceiver); + } + return (ServiceBusReceiverClient) bReceiver.getNativeData(RECEIVER_CLIENT); } private static Object getDeadLetterMessageReceiverFromBObject(BObject receiverObject) { diff --git a/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java b/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java new file mode 100644 index 00000000..e70e28e4 --- /dev/null +++ b/native/src/main/java/org/ballerinax/asb/receiver/ReceiverNetworkThreadFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KINDither express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinax.asb.receiver; + +import java.util.concurrent.ThreadFactory; + +/** + * A {@link ThreadFactory} object that creates new threads on demand for ASB message-receiver network actions. + */ +public class ReceiverNetworkThreadFactory implements ThreadFactory { + private final String threadGroupName = "asb-receiver-network-thread"; + + @Override + public Thread newThread(Runnable runnable) { + Thread receiverThread = new Thread(runnable); + receiverThread.setName(threadGroupName); + return receiverThread; + } +} diff --git a/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java b/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java index a404315f..2dbe4163 100644 --- a/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java +++ b/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java @@ -26,6 +26,8 @@ import com.azure.messaging.servicebus.ServiceBusMessageBatch; import com.azure.messaging.servicebus.ServiceBusSenderClient; import com.azure.messaging.servicebus.models.CreateMessageBatchOptions; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.TypeTags; import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.utils.StringUtils; @@ -49,6 +51,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.ballerinax.asb.util.ASBUtils.getRetryOptions; @@ -56,6 +60,8 @@ * This facilitates the client operations of MessageSender client in Ballerina. */ public class MessageSender { + private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool( + new SenderNetworkThreadFactory()); private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class); @@ -98,22 +104,26 @@ public static Object initializeSender(BObject senderClient, String connectionStr * @param message Input message record as a BMap * @return An error if failed to send the message */ - public static Object send(BObject senderClient, BMap message) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - ServiceBusMessage messageToSend = constructMessage(message); - sender.sendMessage(messageToSend); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Sent the message successfully. Message Id = " + messageToSend.getMessageId()); + public static Object send(Environment env, BObject senderClient, BMap message) { + ServiceBusSenderClient sender = getNativeSender(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + ServiceBusMessage messageToSend = constructMessage(message); + sender.sendMessage(messageToSend); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -124,23 +134,27 @@ public static Object send(BObject senderClient, BMap message) { * @param scheduleTime Input schedule time record as a BMap * @return An error if failed to send the message */ - public static Object schedule(BObject senderClient, BMap message, + public static Object schedule(Environment env, BObject senderClient, BMap message, BMap scheduleTime) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - ServiceBusMessage messageToSend = constructMessage(message); - Long sequenceNumber = sender.scheduleMessage(messageToSend, constructOffset(scheduleTime)); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Scheduled the message successfully. Message Id = " + messageToSend.getMessageId()); + ServiceBusSenderClient sender = getNativeSender(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + ServiceBusMessage messageToSend = constructMessage(message); + Long sequenceNumber = sender.scheduleMessage(messageToSend, constructOffset(scheduleTime)); + future.complete(sequenceNumber); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - return sequenceNumber; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -149,21 +163,25 @@ public static Object schedule(BObject senderClient, BMap messag * @param sequenceNumber The sequence number of the message to cance * @return An error if failed to send the message */ - public static Object cancel(BObject senderClient, long sequenceNumber) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - sender.cancelScheduledMessage(sequenceNumber); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Successfully cancelled scheduled message with sequenceNumber = " + sequenceNumber); + public static Object cancel(Environment env, BObject senderClient, long sequenceNumber) { + ServiceBusSenderClient sender = getNativeSender(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + sender.cancelScheduledMessage(sequenceNumber); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -174,47 +192,51 @@ public static Object cancel(BObject senderClient, long sequenceNumber) { * @param messages Input batch message record as a BMap * @return An error if failed send the message. */ - public static Object sendBatch(BObject senderClient, BMap messages) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - Map messagesMap = ASBUtils.toObjectMap(messages); - BArray messageArray = (BArray) messagesMap.get("messages"); - Collection messageBatch = new ArrayList<>(); - for (int i = 0; i < messageArray.getLength(); i++) { - BMap messageBMap = (BMap) messageArray.get(i); - ServiceBusMessage asbMessage = constructMessage(messageBMap); - messageBatch.add(asbMessage); - } - ServiceBusMessageBatch currentBatch = sender.createMessageBatch(new CreateMessageBatchOptions()); - for (ServiceBusMessage message : messageBatch) { - if (currentBatch.tryAddMessage(message)) { - continue; + public static Object sendBatch(Environment env, BObject senderClient, BMap messages) { + ServiceBusSenderClient sender = getNativeSender(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + Map messagesMap = ASBUtils.toObjectMap(messages); + BArray messageArray = (BArray) messagesMap.get("messages"); + Collection messageBatch = new ArrayList<>(); + for (int i = 0; i < messageArray.getLength(); i++) { + BMap messageBMap = (BMap) messageArray.get(i); + ServiceBusMessage asbMessage = constructMessage(messageBMap); + messageBatch.add(asbMessage); } - // The batch is full, so we create a new batch and send the batch. - sender.sendMessages(currentBatch); - currentBatch = sender.createMessageBatch(); + ServiceBusMessageBatch currentBatch = sender.createMessageBatch(new CreateMessageBatchOptions()); + for (ServiceBusMessage message : messageBatch) { + if (currentBatch.tryAddMessage(message)) { + continue; + } + // The batch is full, so we create a new batch and send the batch. + sender.sendMessages(currentBatch); + currentBatch = sender.createMessageBatch(); - // Add that message that we couldn't before. - if (!currentBatch.tryAddMessage(message)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Message is too large for an empty batch. Skipping. Max size: " - + currentBatch.getMaxSizeInBytes() + ". Message: " + - message.getBody().toString()); + // Add that message that we couldn't before. + if (!currentBatch.tryAddMessage(message)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Message is too large for an empty batch. Skipping. Max size: " + + currentBatch.getMaxSizeInBytes() + ". Message: " + + message.getBody().toString()); + } } } + sender.sendMessages(currentBatch); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); } - sender.sendMessages(currentBatch); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Sent the batch message successfully"); - } - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + }); + return null; } /** @@ -222,19 +244,25 @@ public static Object sendBatch(BObject senderClient, BMap messa * * @return @return An error if failed close the sender. */ - public static Object close(BObject senderClient) { - try { - ServiceBusSenderClient sender = getSenderFromBObject(senderClient); - sender.close(); - LOGGER.debug("Closed the sender. Identifier=" + sender.getIdentifier()); - return null; - } catch (BError e) { - return ASBErrorCreator.fromBError(e); - } catch (ServiceBusException e) { - return ASBErrorCreator.fromASBException(e); - } catch (Exception e) { - return ASBErrorCreator.fromUnhandledException(e); - } + public static Object close(Environment env, BObject senderClient) { + ServiceBusSenderClient sender = getNativeSender(senderClient); + Future future = env.markAsync(); + EXECUTOR_SERVICE.execute(() -> { + try { + sender.close(); + future.complete(null); + } catch (BError e) { + BError bError = ASBErrorCreator.fromBError(e); + future.complete(bError); + } catch (ServiceBusException e) { + BError bError = ASBErrorCreator.fromASBException(e); + future.complete(bError); + } catch (Exception e) { + BError bError = ASBErrorCreator.fromUnhandledException(e); + future.complete(bError); + } + }); + return null; } private static ServiceBusMessage constructMessage(BMap message) { @@ -335,7 +363,7 @@ private static void setClient(BObject senderObject, ServiceBusSenderClient clien senderObject.addNativeData(ASBConstants.SENDER_CLIENT, client); } - private static ServiceBusSenderClient getSenderFromBObject(BObject senderObject) { + private static ServiceBusSenderClient getNativeSender(BObject senderObject) { return (ServiceBusSenderClient) senderObject.getNativeData(ASBConstants.SENDER_CLIENT); } } diff --git a/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java b/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java new file mode 100644 index 00000000..5a3a1b7f --- /dev/null +++ b/native/src/main/java/org/ballerinax/asb/sender/SenderNetworkThreadFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KINDither express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinax.asb.sender; + +import java.util.concurrent.ThreadFactory; + +/** + * A {@link ThreadFactory} object that creates new threads on demand for ASB message-sender network actions. + */ +public class SenderNetworkThreadFactory implements ThreadFactory { + private final String threadGroupName = "asb-sender-network-thread"; + + @Override + public Thread newThread(Runnable runnable) { + Thread senderThread = new Thread(runnable); + senderThread.setName(threadGroupName); + return senderThread; + } +}