Skip to content

Commit

Permalink
Merge pull request #196 from RDPerera/memory-leak-fix
Browse files Browse the repository at this point in the history
Fix Native Data Retention Issue
  • Loading branch information
ayeshLK authored Oct 12, 2023
2 parents c8a1f8f + cae2e53 commit d0b8b8d
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 59 deletions.
29 changes: 14 additions & 15 deletions ballerina/receiver.bal
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ public isolated client class MessageReceiver {
isolated remote function complete(@display {label: "Message"} Message message)
returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return complete(self, message?.lockToken.toString());
return complete(self, message);
}
return createError("Failed to complete message with ID " + message?.messageId.toString());
return createError(string `Failed to complete message with ID: ${message?.messageId.toString()}`);
}

# Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for
Expand All @@ -131,9 +131,9 @@ public isolated client class MessageReceiver {
@display {label: "Abandon Message"}
isolated remote function abandon(@display {label: "Message"} Message message) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return abandon(self, message?.lockToken.toString());
return abandon(self, message);
}
return createError("Failed to abandon message with ID " + message?.messageId.toString());
return createError(string `Failed to abandon message with ID: ${message?.messageId.toString()}`);
}

# Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer
Expand All @@ -149,10 +149,9 @@ public isolated client class MessageReceiver {
@display {label: "Dead Letter Description"}
string? deadLetterErrorDescription = ()) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return deadLetter(self, message?.lockToken.toString(), deadLetterReason,
deadLetterErrorDescription);
return deadLetter(self, message, deadLetterReason, deadLetterErrorDescription);
}
return createError("Failed to deadletter message with ID " + message?.messageId.toString());
return createError(string `Failed to deadletter message with ID: ${message?.messageId.toString()}`);
}

# Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being
Expand All @@ -163,7 +162,7 @@ public isolated client class MessageReceiver {
@display {label: "Defer Message"}
isolated remote function defer(@display {label: "Message"} Message message)
returns @display {label: "Deferred Msg Seq Num"} int|Error {
check defer(self, message?.lockToken.toString());
check defer(self, message);
return <int>message?.sequenceNumber;
}

Expand All @@ -188,9 +187,9 @@ public isolated client class MessageReceiver {
@display {label: "Renew Lock On Message"}
isolated remote function renewLock(@display {label: "Message"} Message message) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return renewLock(self, message?.lockToken.toString());
return renewLock(self, message);
}
return createError("Failed to renew lock on message with ID " + message?.messageId.toString());
return createError(string `Failed to renew lock on message with ID: ${message?.messageId.toString()}`);
}

# Closes the ASB sender connection.
Expand All @@ -213,27 +212,27 @@ isolated function receiveBatch(MessageReceiver endpointClient, int? maxMessageCo
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function complete(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function complete(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function abandon(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function abandon(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function deadLetter(MessageReceiver endpointClient, string lockToken, string? deadLetterReason, string? deadLetterErrorDescription) returns
isolated function deadLetter(MessageReceiver endpointClient, Message message, string? deadLetterReason, string? deadLetterErrorDescription) returns
Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function defer(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function defer(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function receiveDeferred(MessageReceiver endpointClient, int sequenceNumber) returns Message|Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function renewLock(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function renewLock(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;
83 changes: 83 additions & 0 deletions ballerina/tests/asb_sender_receiver_negative_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

import ballerina/log;
import ballerina/test;
import ballerina/regex;

string invalidCompleteError = "^Failed to complete message with ID:.*$";
string invalidAbandonError = "^Failed to abandon message with ID:.*$";
@test:Config {
groups: ["asb_sender_receiver_negative"],
dependsOn: [testCreateQueue, testCreateTopicOperation, testCreateSubscription]
Expand Down Expand Up @@ -52,6 +55,86 @@ function testReceivePayloadWithIncorrectExpectedType() returns error? {
groups: ["asb_sender_receiver_negative"],
dependsOn: [testReceivePayloadWithIncorrectExpectedType]
}
function testInvalidComplete() returns error? {
log:printInfo("[[testInvalidComplete]");

log:printInfo("Initializing Asb sender client.");
MessageSender messageSender = check new (senderConfig);

log:printInfo("Initializing Asb receiver client.");
receiverConfig.receiveMode = RECEIVE_AND_DELETE;

MessageReceiver messageReceiver = check new (receiverConfig);

log:printInfo("Sending via Asb sender.");
check messageSender->send(message1);

log:printInfo("Receiving from Asb receiver client.");
Message|error? receivedMessage = messageReceiver->receive(serverWaitTime);

if receivedMessage is Message {
log:printInfo("messgae" + receivedMessage.toString());
Error? result = messageReceiver->complete(receivedMessage);
test:assertTrue(result is error, msg = "Unexpected Complete for Messages in Receive and Delete Mode");
test:assertTrue(regex:matches((<Error>result).message(),invalidCompleteError), msg = "Invalid Complete for " +
" Messages in Receive and Delete Mode");
} else if receivedMessage is () {
test:assertFail("No message in the queue.");
} else {
test:assertFail("Receiving message via Asb receiver connection failed.");
}

log:printInfo("Closing Asb sender client.");
check messageSender->close();

log:printInfo("Closing Asb receiver client.");
check messageReceiver->close();
}

@test:Config {
groups: ["asb_sender_receiver_negative"],
dependsOn: [testInvalidComplete]
}
function testInvalidAbandon() returns error? {
log:printInfo("[[testInvalidAbandon]");

log:printInfo("Initializing Asb sender client.");
MessageSender messageSender = check new (senderConfig);

log:printInfo("Initializing Asb receiver client.");
receiverConfig.receiveMode = RECEIVE_AND_DELETE;

MessageReceiver messageReceiver = check new (receiverConfig);

log:printInfo("Sending via Asb sender.");
check messageSender->send(message1);

log:printInfo("Receiving from Asb receiver client.");
Message|error? receivedMessage = messageReceiver->receive(serverWaitTime);

if receivedMessage is Message {
log:printInfo("messgae" + receivedMessage.toString());
Error? result = messageReceiver->abandon(receivedMessage);
test:assertTrue(result is error, msg = "Unexpected Abandon for Messages in Receive and Delete Mode");
test:assertTrue(regex:matches((<Error>result).message(),invalidAbandonError), msg = "Invalid Abandon for " +
" Messages in Receive and Delete Mode");
} else if receivedMessage is () {
test:assertFail("No message in the queue.");
} else {
test:assertFail("Receiving message via Asb receiver connection failed.");
}

log:printInfo("Closing Asb sender client.");
check messageSender->close();

log:printInfo("Closing Asb receiver client.");
check messageReceiver->close();
}

@test:Config {
groups: ["asb_sender_receiver_negative"],
dependsOn: [testInvalidAbandon]
}
function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? {
log:printInfo("[[testReceivePayloadWithUnsupportedUnionExpectedType]]");
log:printInfo("Creating Asb message sender.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, WSO2 LLC. (http://www.wso2.org).
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
Expand Down Expand Up @@ -68,6 +68,7 @@
import static org.ballerinax.asb.util.ASBConstants.LABEL;
import static org.ballerinax.asb.util.ASBConstants.LOCK_TOKEN;
import static org.ballerinax.asb.util.ASBConstants.MESSAGE_ID;
import static org.ballerinax.asb.util.ASBConstants.NATIVE_MESSAGE;
import static org.ballerinax.asb.util.ASBConstants.PARTITION_KEY;
import static org.ballerinax.asb.util.ASBConstants.RECEIVER_CLIENT;
import static org.ballerinax.asb.util.ASBConstants.REPLY_TO;
Expand Down Expand Up @@ -265,22 +266,21 @@ public static Object receiveBatch(BObject receiverClient, Object maxMessageCount
* Completes Messages from Queue or Subscription based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to complete the message.
*/
public static Object complete(BObject receiverClient, BString lockToken) {
public static Object complete(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
ServiceBusReceiverClient receiver;
if (message.getDeadLetterReason() != null) {
if (nativeMessage.getDeadLetterReason() != null) {
receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient);
} else {
receiver = getReceiverFromBObject(receiverClient);
}
receiver.complete(message);
receiverClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug("Completed the message(Id: " + message.getMessageId() + ") with lockToken " + lockToken);
receiver.complete(nativeMessage);
LOGGER.debug("Completed the message(Id: " + nativeMessage.getMessageId() + ") with lockToken " +
nativeMessage.getLockToken());
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand All @@ -295,18 +295,16 @@ public static Object complete(BObject receiverClient, BString lockToken) {
* Abandons message & make available again for processing from Queue or Subscription, based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to abandon the message.
*/
public static Object abandon(BObject receiverClient, BString lockToken) {
public static Object abandon(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
receiver.abandon(message);
receiverClient.getNativeData().remove(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
receiver.abandon(nativeMessage);
LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from %n%s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand All @@ -321,24 +319,22 @@ public static Object abandon(BObject receiverClient, BString lockToken) {
* Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @param deadLetterReason The dead letter reason.
* @param deadLetterErrorDescription The dead letter error description.
* @return An error if failed to dead letter the message.
*/
public static Object deadLetter(BObject receiverClient, BString lockToken, Object deadLetterReason,
public static Object deadLetter(BObject receiverClient, BMap<BString, Object> message, Object deadLetterReason,
Object deadLetterErrorDescription) {
try {
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
DeadLetterOptions options = new DeadLetterOptions()
.setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription));
options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason));
receiver.deadLetter(message, options);
receiverClient.getNativeData().remove(lockToken.getValue());
receiver.deadLetter(nativeMessage, options);
LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand All @@ -353,18 +349,16 @@ public static Object deadLetter(BObject receiverClient, BString lockToken, Objec
* Defer the message in a Queue or Subscription based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to defer the message.
*/
public static Object defer(BObject receiverClient, BString lockToken) {
public static Object defer(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
receiver.defer(message);
receiverClient.getNativeData().remove(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
receiver.defer(nativeMessage);
LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand Down Expand Up @@ -410,18 +404,16 @@ public static Object receiveDeferred(BObject receiverClient, Object sequenceNumb
* messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to renewLock of the message.
*/
public static Object renewLock(BObject receiverClient, BString lockToken) {
public static Object renewLock(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
receiver.renewMessageLock(message);
receiverClient.getNativeData().remove(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
receiver.renewMessageLock(nativeMessage);
LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand Down Expand Up @@ -474,12 +466,12 @@ private static BMap<BString, Object> constructExpectedMessageRecord(BObject rece
} else {
map.put(BODY, messageBody);
}

// This is to avoid adding messages to the native data map, if the receive-mode is 'RECEIVE_AND_DELETE'.
BMap<BString, Object> constructedMessage = createBRecordValue(map, expectedType);
// Only add the native message if the message received in peek lock mode.
if (!message.getLockToken().equals(DEFAULT_MESSAGE_LOCK_TOKEN)) {
receiverClient.addNativeData(message.getLockToken(), message);
constructedMessage.addNativeData(NATIVE_MESSAGE, message);
}
return createBRecordValue(map, expectedType);
return constructedMessage;
}

private static Map<String, Object> populateOptionalFieldsMap(ServiceBusReceivedMessage message) {
Expand Down Expand Up @@ -558,7 +550,10 @@ private static BMap<BString, Object> getReceivedMessageBatch(BObject receiverCli
LinkedList<Object> receivedMessages = new LinkedList<>();
for (ServiceBusReceivedMessage receivedMessage : receivedMessageStream) {
BMap<BString, Object> recordMap = constructExpectedMessageRecord(receiverClient, receivedMessage, null);
receivedMessages.add(createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, recordMap));
BMap<BString, Object> messageRecord = createRecordValue(ModuleUtils.getModule(),
ASBConstants.MESSAGE_RECORD, recordMap);
messageRecord.addNativeData(NATIVE_MESSAGE, receivedMessage);
receivedMessages.add(messageRecord);
}

BMap<BString, Object> messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(),
Expand Down Expand Up @@ -667,4 +662,8 @@ private static void setClient(BObject receiverObject, ServiceBusReceiverClient c
receiverObject.addNativeData(ASBConstants.RECEIVER_CLIENT, client);
}
}

private static ServiceBusReceivedMessage getNativeMessage(BMap<BString, Object> message) {
return (ServiceBusReceivedMessage) message.getNativeData(NATIVE_MESSAGE);
}
}
Loading

0 comments on commit d0b8b8d

Please sign in to comment.