Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Native Data Retention Issue #196

Merged
merged 6 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions ballerina/receiver.bal
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ public isolated client class MessageReceiver {
isolated remote function complete(@display {label: "Message"} Message message)
returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return complete(self, message?.lockToken.toString());
return complete(self, message);
}
return createError("Failed to complete message with ID " + message?.messageId.toString());
return createError(string `Failed to complete message with ID: ${message?.messageId.toString()}`);
}

# Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for
Expand All @@ -131,9 +131,9 @@ public isolated client class MessageReceiver {
@display {label: "Abandon Message"}
isolated remote function abandon(@display {label: "Message"} Message message) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return abandon(self, message?.lockToken.toString());
return abandon(self, message);
}
return createError("Failed to abandon message with ID " + message?.messageId.toString());
return createError(string `Failed to abandon message with ID: ${message?.messageId.toString()}`);
}

# Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer
Expand All @@ -149,10 +149,9 @@ public isolated client class MessageReceiver {
@display {label: "Dead Letter Description"}
string? deadLetterErrorDescription = ()) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return deadLetter(self, message?.lockToken.toString(), deadLetterReason,
deadLetterErrorDescription);
return deadLetter(self, message, deadLetterReason, deadLetterErrorDescription);
}
return createError("Failed to deadletter message with ID " + message?.messageId.toString());
return createError(string `Failed to deadletter message with ID: ${message?.messageId.toString()}`);
}

# Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being
Expand All @@ -163,7 +162,7 @@ public isolated client class MessageReceiver {
@display {label: "Defer Message"}
isolated remote function defer(@display {label: "Message"} Message message)
returns @display {label: "Deferred Msg Seq Num"} int|Error {
check defer(self, message?.lockToken.toString());
check defer(self, message);
return <int>message?.sequenceNumber;
}

Expand All @@ -188,9 +187,9 @@ public isolated client class MessageReceiver {
@display {label: "Renew Lock On Message"}
isolated remote function renewLock(@display {label: "Message"} Message message) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return renewLock(self, message?.lockToken.toString());
return renewLock(self, message);
}
return createError("Failed to renew lock on message with ID " + message?.messageId.toString());
return createError(string `Failed to renew lock on message with ID: ${message?.messageId.toString()}`);
}

# Closes the ASB sender connection.
Expand All @@ -213,27 +212,27 @@ isolated function receiveBatch(MessageReceiver endpointClient, int? maxMessageCo
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function complete(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function complete(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function abandon(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function abandon(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function deadLetter(MessageReceiver endpointClient, string lockToken, string? deadLetterReason, string? deadLetterErrorDescription) returns
isolated function deadLetter(MessageReceiver endpointClient, Message message, string? deadLetterReason, string? deadLetterErrorDescription) returns
Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function defer(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function defer(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function receiveDeferred(MessageReceiver endpointClient, int sequenceNumber) returns Message|Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function renewLock(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function renewLock(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;
83 changes: 83 additions & 0 deletions ballerina/tests/asb_sender_receiver_negative_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

import ballerina/log;
import ballerina/test;
import ballerina/regex;

string invalidCompleteError = "^Failed to complete message with ID:.*$";
string invalidAbandonError = "^Failed to abandon message with ID:.*$";
@test:Config {
groups: ["asb_sender_receiver_negative"],
dependsOn: [testCreateQueue, testCreateTopicOperation, testCreateSubscription]
Expand Down Expand Up @@ -52,6 +55,86 @@ function testReceivePayloadWithIncorrectExpectedType() returns error? {
groups: ["asb_sender_receiver_negative"],
dependsOn: [testReceivePayloadWithIncorrectExpectedType]
}
function testInvalidComplete() returns error? {
RDPerera marked this conversation as resolved.
Show resolved Hide resolved
log:printInfo("[[testInvalidComplete]");
RDPerera marked this conversation as resolved.
Show resolved Hide resolved

log:printInfo("Initializing Asb sender client.");
MessageSender messageSender = check new (senderConfig);

log:printInfo("Initializing Asb receiver client.");
receiverConfig.receiveMode = RECEIVE_AND_DELETE;

MessageReceiver messageReceiver = check new (receiverConfig);

log:printInfo("Sending via Asb sender.");
check messageSender->send(message1);

log:printInfo("Receiving from Asb receiver client.");
Message|error? receivedMessage = messageReceiver->receive(serverWaitTime);

if receivedMessage is Message {
log:printInfo("messgae" + receivedMessage.toString());
Error? result = messageReceiver->complete(receivedMessage);
test:assertTrue(result is error, msg = "Unexpected Complete for Messages in Receive and Delete Mode");
test:assertTrue(regex:matches((<Error>result).message(),invalidCompleteError), msg = "Invalid Complete for " +
" Messages in Receive and Delete Mode");
} else if receivedMessage is () {
test:assertFail("No message in the queue.");
} else {
test:assertFail("Receiving message via Asb receiver connection failed.");
}

log:printInfo("Closing Asb sender client.");
check messageSender->close();

log:printInfo("Closing Asb receiver client.");
check messageReceiver->close();
}

@test:Config {
groups: ["asb_sender_receiver_negative"],
dependsOn: [testInvalidComplete]
}
function testInvalidAbandon() returns error? {
log:printInfo("[[testInvalidAbandon]");

log:printInfo("Initializing Asb sender client.");
MessageSender messageSender = check new (senderConfig);

log:printInfo("Initializing Asb receiver client.");
receiverConfig.receiveMode = RECEIVE_AND_DELETE;

MessageReceiver messageReceiver = check new (receiverConfig);

log:printInfo("Sending via Asb sender.");
check messageSender->send(message1);

log:printInfo("Receiving from Asb receiver client.");
Message|error? receivedMessage = messageReceiver->receive(serverWaitTime);

if receivedMessage is Message {
log:printInfo("messgae" + receivedMessage.toString());
Error? result = messageReceiver->abandon(receivedMessage);
test:assertTrue(result is error, msg = "Unexpected Abandon for Messages in Receive and Delete Mode");
test:assertTrue(regex:matches((<Error>result).message(),invalidAbandonError), msg = "Invalid Abandon for " +
" Messages in Receive and Delete Mode");
} else if receivedMessage is () {
test:assertFail("No message in the queue.");
} else {
test:assertFail("Receiving message via Asb receiver connection failed.");
}

log:printInfo("Closing Asb sender client.");
check messageSender->close();

log:printInfo("Closing Asb receiver client.");
check messageReceiver->close();
}

@test:Config {
groups: ["asb_sender_receiver_negative"],
dependsOn: [testInvalidAbandon]
}
function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? {
log:printInfo("[[testReceivePayloadWithUnsupportedUnionExpectedType]]");
log:printInfo("Creating Asb message sender.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, WSO2 LLC. (http://www.wso2.org).
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
Expand Down Expand Up @@ -68,6 +68,7 @@
import static org.ballerinax.asb.util.ASBConstants.LABEL;
import static org.ballerinax.asb.util.ASBConstants.LOCK_TOKEN;
import static org.ballerinax.asb.util.ASBConstants.MESSAGE_ID;
import static org.ballerinax.asb.util.ASBConstants.NATIVE_MESSAGE;
import static org.ballerinax.asb.util.ASBConstants.PARTITION_KEY;
import static org.ballerinax.asb.util.ASBConstants.RECEIVER_CLIENT;
import static org.ballerinax.asb.util.ASBConstants.REPLY_TO;
Expand Down Expand Up @@ -265,22 +266,21 @@ public static Object receiveBatch(BObject receiverClient, Object maxMessageCount
* Completes Messages from Queue or Subscription based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to complete the message.
*/
public static Object complete(BObject receiverClient, BString lockToken) {
public static Object complete(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
ServiceBusReceiverClient receiver;
if (message.getDeadLetterReason() != null) {
if (nativeMessage.getDeadLetterReason() != null) {
receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient);
} else {
receiver = getReceiverFromBObject(receiverClient);
}
receiver.complete(message);
receiverClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug("Completed the message(Id: " + message.getMessageId() + ") with lockToken " + lockToken);
receiver.complete(nativeMessage);
LOGGER.debug("Completed the message(Id: " + nativeMessage.getMessageId() + ") with lockToken " +
nativeMessage.getLockToken());
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand All @@ -295,18 +295,16 @@ public static Object complete(BObject receiverClient, BString lockToken) {
* Abandons message & make available again for processing from Queue or Subscription, based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to abandon the message.
*/
public static Object abandon(BObject receiverClient, BString lockToken) {
public static Object abandon(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
receiver.abandon(message);
receiverClient.getNativeData().remove(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
receiver.abandon(nativeMessage);
LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from %n%s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand All @@ -321,24 +319,22 @@ public static Object abandon(BObject receiverClient, BString lockToken) {
* Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @param deadLetterReason The dead letter reason.
* @param deadLetterErrorDescription The dead letter error description.
* @return An error if failed to dead letter the message.
*/
public static Object deadLetter(BObject receiverClient, BString lockToken, Object deadLetterReason,
public static Object deadLetter(BObject receiverClient, BMap<BString, Object> message, Object deadLetterReason,
Object deadLetterErrorDescription) {
try {
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
DeadLetterOptions options = new DeadLetterOptions()
.setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription));
options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason));
receiver.deadLetter(message, options);
receiverClient.getNativeData().remove(lockToken.getValue());
receiver.deadLetter(nativeMessage, options);
LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand All @@ -353,18 +349,16 @@ public static Object deadLetter(BObject receiverClient, BString lockToken, Objec
* Defer the message in a Queue or Subscription based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to defer the message.
*/
public static Object defer(BObject receiverClient, BString lockToken) {
public static Object defer(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
receiver.defer(message);
receiverClient.getNativeData().remove(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
receiver.defer(nativeMessage);
LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand Down Expand Up @@ -410,18 +404,16 @@ public static Object receiveDeferred(BObject receiverClient, Object sequenceNumb
* messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to renewLock of the message.
*/
public static Object renewLock(BObject receiverClient, BString lockToken) {
public static Object renewLock(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
receiver.renewMessageLock(message);
receiverClient.getNativeData().remove(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
receiver.renewMessageLock(nativeMessage);
LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand Down Expand Up @@ -474,12 +466,12 @@ private static BMap<BString, Object> constructExpectedMessageRecord(BObject rece
} else {
map.put(BODY, messageBody);
}

// This is to avoid adding messages to the native data map, if the receive-mode is 'RECEIVE_AND_DELETE'.
BMap<BString, Object> constructedMessage = createBRecordValue(map, expectedType);
// Only add the native message if the message received in peek lock mode.
if (!message.getLockToken().equals(DEFAULT_MESSAGE_LOCK_TOKEN)) {
receiverClient.addNativeData(message.getLockToken(), message);
constructedMessage.addNativeData(NATIVE_MESSAGE, message);
}
return createBRecordValue(map, expectedType);
return constructedMessage;
}

private static Map<String, Object> populateOptionalFieldsMap(ServiceBusReceivedMessage message) {
Expand Down Expand Up @@ -558,7 +550,10 @@ private static BMap<BString, Object> getReceivedMessageBatch(BObject receiverCli
LinkedList<Object> receivedMessages = new LinkedList<>();
for (ServiceBusReceivedMessage receivedMessage : receivedMessageStream) {
BMap<BString, Object> recordMap = constructExpectedMessageRecord(receiverClient, receivedMessage, null);
receivedMessages.add(createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, recordMap));
BMap<BString, Object> messageRecord = createRecordValue(ModuleUtils.getModule(),
ASBConstants.MESSAGE_RECORD, recordMap);
messageRecord.addNativeData(NATIVE_MESSAGE, receivedMessage);
receivedMessages.add(messageRecord);
}

BMap<BString, Object> messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(),
Expand Down Expand Up @@ -667,4 +662,8 @@ private static void setClient(BObject receiverObject, ServiceBusReceiverClient c
receiverObject.addNativeData(ASBConstants.RECEIVER_CLIENT, client);
}
}

private static ServiceBusReceivedMessage getNativeMessage(BMap<BString, Object> message) {
return (ServiceBusReceivedMessage) message.getNativeData(NATIVE_MESSAGE);
}
}
Loading