From ac5a34239c397be674cd99a6fbe2346fcc5e210f Mon Sep 17 00:00:00 2001 From: Nipuna Ranasinghe Date: Fri, 26 May 2023 11:11:29 +0530 Subject: [PATCH 1/3] Fix array size when receiving message batches --- .../asb/receiver/MessageReceiver.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index e5fb91dc..428c6178 100644 --- a/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -53,10 +53,12 @@ import java.time.Duration; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; import java.util.Objects; import java.util.Optional; +import static io.ballerina.runtime.api.creators.ValueCreator.createRecordValue; import static org.ballerinax.asb.util.ASBConstants.APPLICATION_PROPERTY_KEY; import static org.ballerinax.asb.util.ASBConstants.BODY; import static org.ballerinax.asb.util.ASBConstants.CONTENT_TYPE; @@ -501,9 +503,9 @@ private static Map populateOptionalFieldsMap(ServiceBusReceivedM private static BMap createBRecordValue(Map map, RecordType recordType) { if (recordType == null) { - return ValueCreator.createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, map); + return createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, map); } else { - return ValueCreator.createRecordValue(recordType.getPackage(), recordType.getName(), map); + return createRecordValue(recordType.getPackage(), recordType.getName(), map); } } @@ -534,7 +536,6 @@ private static BMap getReceivedMessageBatch(BObject endpointCli throws InterruptedException, ServiceBusException { ServiceBusReceiverClient receiver = getReceiverFromBObject(endpointClient); int maxCount = Long.valueOf(maxMessageCount.toString()).intValue(); - Object[] messages = new Object[maxCount]; IterableStream receivedMessageStream; if (serverWaitTime != null) { receivedMessageStream = receiver.receiveMessages(maxCount, Duration.ofSeconds((long) serverWaitTime)); @@ -542,11 +543,10 @@ private static BMap getReceivedMessageBatch(BObject endpointCli receivedMessageStream = receiver.receiveMessages(maxCount); } - int messageCount = 0; + LinkedList receivedMessages = new LinkedList<>(); for (ServiceBusReceivedMessage receivedMessage : receivedMessageStream) { BMap recordMap = constructExpectedMessageRecord(endpointClient, receivedMessage, null); - messages[messageCount++] = ValueCreator.createRecordValue(ModuleUtils.getModule(), - ASBConstants.MESSAGE_RECORD, recordMap); + receivedMessages.add(createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, recordMap)); } BMap messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(), @@ -554,20 +554,20 @@ private static BMap getReceivedMessageBatch(BObject endpointCli ArrayType sourceArrayType = TypeCreator.createArrayType(TypeUtils.getType(messageRecord)); Map map = new HashMap<>(); - map.put("messageCount", messageCount); - map.put("messages", ValueCreator.createArrayValue(messages, sourceArrayType)); - return ValueCreator.createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_BATCH_RECORD, map); + map.put("messageCount", receivedMessages.size()); + map.put("messages", ValueCreator.createArrayValue(receivedMessages.toArray(new Object[0]), sourceArrayType)); + return createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_BATCH_RECORD, map); } private static BMap getApplicationProperties(ServiceBusReceivedMessage message) { - BMap applicationPropertiesRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(), + BMap applicationPropertiesRecord = createRecordValue(ModuleUtils.getModule(), ASBConstants.APPLICATION_PROPERTY_TYPE); MapType mapType = TypeCreator.createMapType(PredefinedTypes.TYPE_ANYDATA); BMap applicationProperties = ValueCreator.createMapValue(mapType); for (Map.Entry property : message.getApplicationProperties().entrySet()) { populateApplicationProperty(applicationProperties, property.getKey(), property.getValue()); } - return ValueCreator.createRecordValue(applicationPropertiesRecord, applicationProperties); + return createRecordValue(applicationPropertiesRecord, applicationProperties); } private static void populateApplicationProperty(BMap applicationProperties, From b72caf0d61de73d1868ab6359f3e97ff40ce480f Mon Sep 17 00:00:00 2001 From: Nipuna Ranasinghe Date: Fri, 26 May 2023 13:13:12 +0530 Subject: [PATCH 2/3] Update tests to capture the failing scenario --- asb-ballerina/tests/asb_test.bal | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/asb-ballerina/tests/asb_test.bal b/asb-ballerina/tests/asb_test.bal index 46e28fa4..7eea7ef0 100644 --- a/asb-ballerina/tests/asb_test.bal +++ b/asb-ballerina/tests/asb_test.bal @@ -206,11 +206,15 @@ function testSendAndReceiveBatchFromQueueOperation() returns error? { log:printInfo("Sending via Asb sender."); check messageSender->sendBatch(messages); + // Here we set the batch size to be more than the number of messages sent. + // This is to validate whether the received message count is always same as the sent count, + // even when the expected count is larger than the sent count log:printInfo("Receiving from Asb receiver."); - MessageBatch|error? messageReceived = messageReceiver->receiveBatch(maxMessageCount); + MessageBatch|error? messageReceived = messageReceiver->receiveBatch(messages.length() + 5); if (messageReceived is MessageBatch) { log:printInfo(messageReceived.toString()); + test:assertEquals(messageReceived.messages.length(), messages.length(), msg = "Sent & received message counts are not equal."); foreach Message message in messageReceived.messages { if (message.toString() != "") { string msg = check string:fromBytes(message.body); From 8519f1b7d8bfcb82f54479047e365d5c21eaf977 Mon Sep 17 00:00:00 2001 From: Nipuna Ranasinghe Date: Fri, 26 May 2023 13:13:27 +0530 Subject: [PATCH 3/3] Resolve merge conflicts --- asb-ballerina/Ballerina.toml | 6 +++--- gradle.properties | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/asb-ballerina/Ballerina.toml b/asb-ballerina/Ballerina.toml index 044dcbed..81bada21 100644 --- a/asb-ballerina/Ballerina.toml +++ b/asb-ballerina/Ballerina.toml @@ -2,7 +2,7 @@ distribution = "2201.4.1" org = "ballerinax" name = "asb" -version = "3.3.0" +version = "3.3.1" license= ["Apache-2.0"] authors = ["Ballerina"] keywords = ["IT Operations/Message Brokers", "Cost/Paid", "Vendor/Microsoft"] @@ -13,9 +13,9 @@ repository = "https://github.com/ballerina-platform/module-ballerinax-azure-serv observabilityIncluded = true [[platform.java11.dependency]] -path = "../asb-native/build/libs/asb-native-3.3.0.jar" +path = "../asb-native/build/libs/asb-native-3.3.1.jar" groupId = "org.ballerinax" artifactId = "asb-native" module = "asb-native" -version = "3.3.0" +version = "3.3.1" diff --git a/gradle.properties b/gradle.properties index 00b8c27c..5826da90 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ org.gradle.caching=true group=org.ballerinax.azure.servicebus -version=3.3.0 +version=3.3.1 ballerinaLangVersion=2201.4.1 azureServiceBusVersion=7.13.1