Skip to content

Commit

Permalink
Merge pull request #160 from NipunaRanasinghe/2201.4.x
Browse files Browse the repository at this point in the history
[2201.4.x] Fix message count inreceiveBatch() API when there are less messages than the requested count
  • Loading branch information
LakshanSS authored May 26, 2023
2 parents 1ec8f51 + 8519f1b commit 778571b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 16 deletions.
6 changes: 3 additions & 3 deletions asb-ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
distribution = "2201.4.1"
org = "ballerinax"
name = "asb"
version = "3.3.0"
version = "3.3.1"
license= ["Apache-2.0"]
authors = ["Ballerina"]
keywords = ["IT Operations/Message Brokers", "Cost/Paid", "Vendor/Microsoft"]
Expand All @@ -13,9 +13,9 @@ repository = "https://github.com/ballerina-platform/module-ballerinax-azure-serv
observabilityIncluded = true

[[platform.java11.dependency]]
path = "../asb-native/build/libs/asb-native-3.3.0.jar"
path = "../asb-native/build/libs/asb-native-3.3.1.jar"
groupId = "org.ballerinax"
artifactId = "asb-native"
module = "asb-native"
version = "3.3.0"
version = "3.3.1"

6 changes: 5 additions & 1 deletion asb-ballerina/tests/asb_test.bal
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,15 @@ function testSendAndReceiveBatchFromQueueOperation() returns error? {
log:printInfo("Sending via Asb sender.");
check messageSender->sendBatch(messages);

// Here we set the batch size to be more than the number of messages sent.
// This is to validate whether the received message count is always same as the sent count,
// even when the expected count is larger than the sent count
log:printInfo("Receiving from Asb receiver.");
MessageBatch|error? messageReceived = messageReceiver->receiveBatch(maxMessageCount);
MessageBatch|error? messageReceived = messageReceiver->receiveBatch(messages.length() + 5);

if (messageReceived is MessageBatch) {
log:printInfo(messageReceived.toString());
test:assertEquals(messageReceived.messages.length(), messages.length(), msg = "Sent & received message counts are not equal.");
foreach Message message in messageReceived.messages {
if (message.toString() != "") {
string msg = check string:fromBytes(<byte[]>message.body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@

import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static io.ballerina.runtime.api.creators.ValueCreator.createRecordValue;
import static org.ballerinax.asb.util.ASBConstants.APPLICATION_PROPERTY_KEY;
import static org.ballerinax.asb.util.ASBConstants.BODY;
import static org.ballerinax.asb.util.ASBConstants.CONTENT_TYPE;
Expand Down Expand Up @@ -501,9 +503,9 @@ private static Map<String, Object> populateOptionalFieldsMap(ServiceBusReceivedM

private static BMap<BString, Object> createBRecordValue(Map<String, Object> map, RecordType recordType) {
if (recordType == null) {
return ValueCreator.createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, map);
return createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, map);
} else {
return ValueCreator.createRecordValue(recordType.getPackage(), recordType.getName(), map);
return createRecordValue(recordType.getPackage(), recordType.getName(), map);
}
}

Expand Down Expand Up @@ -534,40 +536,38 @@ private static BMap<BString, Object> getReceivedMessageBatch(BObject endpointCli
throws InterruptedException, ServiceBusException {
ServiceBusReceiverClient receiver = getReceiverFromBObject(endpointClient);
int maxCount = Long.valueOf(maxMessageCount.toString()).intValue();
Object[] messages = new Object[maxCount];
IterableStream<ServiceBusReceivedMessage> receivedMessageStream;
if (serverWaitTime != null) {
receivedMessageStream = receiver.receiveMessages(maxCount, Duration.ofSeconds((long) serverWaitTime));
} else {
receivedMessageStream = receiver.receiveMessages(maxCount);
}

int messageCount = 0;
LinkedList<Object> receivedMessages = new LinkedList<>();
for (ServiceBusReceivedMessage receivedMessage : receivedMessageStream) {
BMap<BString, Object> recordMap = constructExpectedMessageRecord(endpointClient, receivedMessage, null);
messages[messageCount++] = ValueCreator.createRecordValue(ModuleUtils.getModule(),
ASBConstants.MESSAGE_RECORD, recordMap);
receivedMessages.add(createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, recordMap));
}

BMap<BString, Object> messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(),
ASBConstants.MESSAGE_RECORD);
ArrayType sourceArrayType = TypeCreator.createArrayType(TypeUtils.getType(messageRecord));

Map<String, Object> map = new HashMap<>();
map.put("messageCount", messageCount);
map.put("messages", ValueCreator.createArrayValue(messages, sourceArrayType));
return ValueCreator.createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_BATCH_RECORD, map);
map.put("messageCount", receivedMessages.size());
map.put("messages", ValueCreator.createArrayValue(receivedMessages.toArray(new Object[0]), sourceArrayType));
return createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_BATCH_RECORD, map);
}

private static BMap<BString, Object> getApplicationProperties(ServiceBusReceivedMessage message) {
BMap<BString, Object> applicationPropertiesRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(),
BMap<BString, Object> applicationPropertiesRecord = createRecordValue(ModuleUtils.getModule(),
ASBConstants.APPLICATION_PROPERTY_TYPE);
MapType mapType = TypeCreator.createMapType(PredefinedTypes.TYPE_ANYDATA);
BMap<BString, Object> applicationProperties = ValueCreator.createMapValue(mapType);
for (Map.Entry<String, Object> property : message.getApplicationProperties().entrySet()) {
populateApplicationProperty(applicationProperties, property.getKey(), property.getValue());
}
return ValueCreator.createRecordValue(applicationPropertiesRecord, applicationProperties);
return createRecordValue(applicationPropertiesRecord, applicationProperties);
}

private static void populateApplicationProperty(BMap<BString, Object> applicationProperties,
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
org.gradle.caching=true
group=org.ballerinax.azure.servicebus
version=3.3.0
version=3.3.1
ballerinaLangVersion=2201.4.1

azureServiceBusVersion=7.13.1
Expand Down

0 comments on commit 778571b

Please sign in to comment.