From a4e5c11db31b0f322981682fd8809da8afee6ae8 Mon Sep 17 00:00:00 2001 From: Nipuna Ranasinghe Date: Tue, 1 Aug 2023 13:58:45 +0530 Subject: [PATCH 1/2] FIx memory leak in receiver client --- .../org/ballerinax/asb/receiver/MessageReceiver.java | 12 +++++++++++- .../java/org/ballerinax/asb/util/ASBConstants.java | 1 + 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index 428c6178..698a2b1f 100644 --- a/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -66,6 +66,7 @@ import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_ERROR_DESCRIPTION; import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_REASON; import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_SOURCE; +import static org.ballerinax.asb.util.ASBConstants.DEFAULT_MESSAGE_LOCK_TOKEN; import static org.ballerinax.asb.util.ASBConstants.DELIVERY_COUNT; import static org.ballerinax.asb.util.ASBConstants.ENQUEUED_SEQUENCE_NUMBER; import static org.ballerinax.asb.util.ASBConstants.ENQUEUED_TIME; @@ -281,6 +282,7 @@ public static Object complete(BObject endpointClient, BString lockToken) { ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient .getNativeData(lockToken.getValue()); receiver.complete(message); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug("Completed the message(Id: " + message.getMessageId() + ") with lockToken " + lockToken); return null; } catch (BError e) { @@ -305,6 +307,7 @@ public static Object abandon(BObject endpointClient, BString lockToken) { ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient .getNativeData(lockToken.getValue()); receiver.abandon(message); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from \n%s", message.getMessageId(), receiver.getEntityPath())); return null; @@ -336,6 +339,7 @@ public static Object deadLetter(BObject endpointClient, BString lockToken, Objec .setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription)); options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason)); receiver.deadLetter(message, options); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s", message.getMessageId(), receiver.getEntityPath())); return null; @@ -361,6 +365,7 @@ public static Object defer(BObject endpointClient, BString lockToken) { ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient .getNativeData(lockToken.getValue()); receiver.defer(message); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s", message.getMessageId(), receiver.getEntityPath())); return null; @@ -417,6 +422,7 @@ public static Object renewLock(BObject endpointClient, BString lockToken) { .getNativeData(lockToken.getValue()); ServiceBusReceiverClient receiver = getReceiverFromBObject(endpointClient); receiver.renewMessageLock(message); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s", message.getMessageId(), receiver.getEntityPath())); return null; @@ -471,7 +477,11 @@ private static BMap constructExpectedMessageRecord(BObject endp } else { map.put(BODY, messageBody); } - endpointClient.addNativeData(message.getLockToken(), message); + + // This is to avoid adding messages to the native data map, if the receive mode is 'RECEIVE_AND_DELETE'. + if (!message.getLockToken().equals(DEFAULT_MESSAGE_LOCK_TOKEN)) { + endpointClient.addNativeData(message.getLockToken(), message); + } return createBRecordValue(map, expectedType); } diff --git a/asb-native/src/main/java/org/ballerinax/asb/util/ASBConstants.java b/asb-native/src/main/java/org/ballerinax/asb/util/ASBConstants.java index 3a560ca7..1ab9fc84 100644 --- a/asb-native/src/main/java/org/ballerinax/asb/util/ASBConstants.java +++ b/asb-native/src/main/java/org/ballerinax/asb/util/ASBConstants.java @@ -69,6 +69,7 @@ public class ASBConstants { public static final String APPLICATION_PROPERTIES = "properties"; public static final int DEFAULT_TIME_TO_LIVE = 60; // In seconds + public static final String DEFAULT_MESSAGE_LOCK_TOKEN = "00000000-0000-0000-0000-000000000000"; // listener constant fields public static final String CONSUMER_SERVICES = "consumer_services"; From 062cd9509df5fdea790acca6a227744360071dc9 Mon Sep 17 00:00:00 2001 From: Nipuna Ranasinghe Date: Tue, 1 Aug 2023 13:59:04 +0530 Subject: [PATCH 2/2] Update versions for the next release --- asb-ballerina/Ballerina.toml | 6 +++--- gradle.properties | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/asb-ballerina/Ballerina.toml b/asb-ballerina/Ballerina.toml index 81bada21..f76b850b 100644 --- a/asb-ballerina/Ballerina.toml +++ b/asb-ballerina/Ballerina.toml @@ -2,7 +2,7 @@ distribution = "2201.4.1" org = "ballerinax" name = "asb" -version = "3.3.1" +version = "3.3.2" license= ["Apache-2.0"] authors = ["Ballerina"] keywords = ["IT Operations/Message Brokers", "Cost/Paid", "Vendor/Microsoft"] @@ -13,9 +13,9 @@ repository = "https://github.com/ballerina-platform/module-ballerinax-azure-serv observabilityIncluded = true [[platform.java11.dependency]] -path = "../asb-native/build/libs/asb-native-3.3.1.jar" +path = "../asb-native/build/libs/asb-native-3.3.2.jar" groupId = "org.ballerinax" artifactId = "asb-native" module = "asb-native" -version = "3.3.1" +version = "3.3.2" diff --git a/gradle.properties b/gradle.properties index 5826da90..0218e0bb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,9 +1,9 @@ org.gradle.caching=true group=org.ballerinax.azure.servicebus -version=3.3.1 +version=3.3.2 ballerinaLangVersion=2201.4.1 -azureServiceBusVersion=7.13.1 +azureServiceBusVersion=7.14.1 githubJohnrengelmanShadowVersion=5.2.0 underCouchDownloadVersion=4.0.4 checkstyleToolVersion=7.8.2