Skip to content

Commit

Permalink
Merge pull request #177 from NipunaRanasinghe/2201.4.x
Browse files Browse the repository at this point in the history
[2201.4.x] Fix memory leak in receiver client
  • Loading branch information
NipunaRanasinghe authored Aug 2, 2023
2 parents 778571b + 062cd95 commit 5af5314
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 6 deletions.
6 changes: 3 additions & 3 deletions asb-ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
distribution = "2201.4.1"
org = "ballerinax"
name = "asb"
version = "3.3.1"
version = "3.3.2"
license= ["Apache-2.0"]
authors = ["Ballerina"]
keywords = ["IT Operations/Message Brokers", "Cost/Paid", "Vendor/Microsoft"]
Expand All @@ -13,9 +13,9 @@ repository = "https://github.com/ballerina-platform/module-ballerinax-azure-serv
observabilityIncluded = true

[[platform.java11.dependency]]
path = "../asb-native/build/libs/asb-native-3.3.1.jar"
path = "../asb-native/build/libs/asb-native-3.3.2.jar"
groupId = "org.ballerinax"
artifactId = "asb-native"
module = "asb-native"
version = "3.3.1"
version = "3.3.2"

Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_ERROR_DESCRIPTION;
import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_REASON;
import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_SOURCE;
import static org.ballerinax.asb.util.ASBConstants.DEFAULT_MESSAGE_LOCK_TOKEN;
import static org.ballerinax.asb.util.ASBConstants.DELIVERY_COUNT;
import static org.ballerinax.asb.util.ASBConstants.ENQUEUED_SEQUENCE_NUMBER;
import static org.ballerinax.asb.util.ASBConstants.ENQUEUED_TIME;
Expand Down Expand Up @@ -281,6 +282,7 @@ public static Object complete(BObject endpointClient, BString lockToken) {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient
.getNativeData(lockToken.getValue());
receiver.complete(message);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug("Completed the message(Id: " + message.getMessageId() + ") with lockToken " + lockToken);
return null;
} catch (BError e) {
Expand All @@ -305,6 +307,7 @@ public static Object abandon(BObject endpointClient, BString lockToken) {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient
.getNativeData(lockToken.getValue());
receiver.abandon(message);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from \n%s",
message.getMessageId(), receiver.getEntityPath()));
return null;
Expand Down Expand Up @@ -336,6 +339,7 @@ public static Object deadLetter(BObject endpointClient, BString lockToken, Objec
.setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription));
options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason));
receiver.deadLetter(message, options);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
return null;
Expand All @@ -361,6 +365,7 @@ public static Object defer(BObject endpointClient, BString lockToken) {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient
.getNativeData(lockToken.getValue());
receiver.defer(message);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
return null;
Expand Down Expand Up @@ -417,6 +422,7 @@ public static Object renewLock(BObject endpointClient, BString lockToken) {
.getNativeData(lockToken.getValue());
ServiceBusReceiverClient receiver = getReceiverFromBObject(endpointClient);
receiver.renewMessageLock(message);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
return null;
Expand Down Expand Up @@ -471,7 +477,11 @@ private static BMap<BString, Object> constructExpectedMessageRecord(BObject endp
} else {
map.put(BODY, messageBody);
}
endpointClient.addNativeData(message.getLockToken(), message);

// This is to avoid adding messages to the native data map, if the receive mode is 'RECEIVE_AND_DELETE'.
if (!message.getLockToken().equals(DEFAULT_MESSAGE_LOCK_TOKEN)) {
endpointClient.addNativeData(message.getLockToken(), message);
}
return createBRecordValue(map, expectedType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class ASBConstants {
public static final String APPLICATION_PROPERTIES = "properties";

public static final int DEFAULT_TIME_TO_LIVE = 60; // In seconds
public static final String DEFAULT_MESSAGE_LOCK_TOKEN = "00000000-0000-0000-0000-000000000000";

// listener constant fields
public static final String CONSUMER_SERVICES = "consumer_services";
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
org.gradle.caching=true
group=org.ballerinax.azure.servicebus
version=3.3.1
version=3.3.2
ballerinaLangVersion=2201.4.1

azureServiceBusVersion=7.13.1
azureServiceBusVersion=7.14.1
githubJohnrengelmanShadowVersion=5.2.0
underCouchDownloadVersion=4.0.4
checkstyleToolVersion=7.8.2
Expand Down

0 comments on commit 5af5314

Please sign in to comment.