Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Native Data Retention Issue #196

Merged
merged 6 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions ballerina/receiver.bal
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ public isolated client class MessageReceiver {
isolated remote function complete(@display {label: "Message"} Message message)
returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return complete(self, message?.lockToken.toString());
return complete(self, message);
}
return createError("Failed to complete message with ID " + message?.messageId.toString());
return createError(string `Failed to complete message with ID ${message?.messageId.toString()}`);
ayeshLK marked this conversation as resolved.
Show resolved Hide resolved
}

# Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for
Expand All @@ -131,9 +131,9 @@ public isolated client class MessageReceiver {
@display {label: "Abandon Message"}
isolated remote function abandon(@display {label: "Message"} Message message) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return abandon(self, message?.lockToken.toString());
return abandon(self, message);
}
return createError("Failed to abandon message with ID " + message?.messageId.toString());
return createError(string `Failed to abandon message with ID ${message?.messageId.toString()}`);
}

# Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer
Expand All @@ -149,10 +149,9 @@ public isolated client class MessageReceiver {
@display {label: "Dead Letter Description"}
string? deadLetterErrorDescription = ()) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return deadLetter(self, message?.lockToken.toString(), deadLetterReason,
deadLetterErrorDescription);
return deadLetter(self, message, deadLetterReason, deadLetterErrorDescription);
}
return createError("Failed to deadletter message with ID " + message?.messageId.toString());
return createError(string `Failed to deadletter message with ID ${message?.messageId.toString()}`);
}

# Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being
Expand All @@ -163,7 +162,7 @@ public isolated client class MessageReceiver {
@display {label: "Defer Message"}
isolated remote function defer(@display {label: "Message"} Message message)
returns @display {label: "Deferred Msg Seq Num"} int|Error {
check defer(self, message?.lockToken.toString());
check defer(self, message);
return <int>message?.sequenceNumber;
}

Expand All @@ -188,9 +187,9 @@ public isolated client class MessageReceiver {
@display {label: "Renew Lock On Message"}
isolated remote function renewLock(@display {label: "Message"} Message message) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return renewLock(self, message?.lockToken.toString());
return renewLock(self, message);
}
return createError("Failed to renew lock on message with ID " + message?.messageId.toString());
return createError(string `Failed to renew lock on message with ID ${message?.messageId.toString()}`);
}

# Closes the ASB sender connection.
Expand All @@ -213,27 +212,27 @@ isolated function receiveBatch(MessageReceiver endpointClient, int? maxMessageCo
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function complete(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function complete(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function abandon(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function abandon(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function deadLetter(MessageReceiver endpointClient, string lockToken, string? deadLetterReason, string? deadLetterErrorDescription) returns
isolated function deadLetter(MessageReceiver endpointClient, Message message, string? deadLetterReason, string? deadLetterErrorDescription) returns
Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function defer(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function defer(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function receiveDeferred(MessageReceiver endpointClient, int sequenceNumber) returns Message|Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function renewLock(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method {
isolated function renewLock(MessageReceiver endpointClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, WSO2 LLC. (http://www.wso2.org).
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
Expand Down Expand Up @@ -68,6 +68,7 @@
import static org.ballerinax.asb.util.ASBConstants.LABEL;
import static org.ballerinax.asb.util.ASBConstants.LOCK_TOKEN;
import static org.ballerinax.asb.util.ASBConstants.MESSAGE_ID;
import static org.ballerinax.asb.util.ASBConstants.NATIVE_MESSAGE;
import static org.ballerinax.asb.util.ASBConstants.PARTITION_KEY;
import static org.ballerinax.asb.util.ASBConstants.RECEIVER_CLIENT;
import static org.ballerinax.asb.util.ASBConstants.REPLY_TO;
Expand Down Expand Up @@ -265,22 +266,21 @@ public static Object receiveBatch(BObject receiverClient, Object maxMessageCount
* Completes Messages from Queue or Subscription based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to complete the message.
*/
public static Object complete(BObject receiverClient, BString lockToken) {
public static Object complete(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
ServiceBusReceiverClient receiver;
if (message.getDeadLetterReason() != null) {
if (nativeMessage.getDeadLetterReason() != null) {
receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient);
} else {
receiver = getReceiverFromBObject(receiverClient);
}
receiver.complete(message);
receiverClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug("Completed the message(Id: " + message.getMessageId() + ") with lockToken " + lockToken);
receiver.complete(nativeMessage);
LOGGER.debug("Completed the message(Id: " + nativeMessage.getMessageId() + ") with lockToken " +
nativeMessage.getLockToken());
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand All @@ -295,18 +295,17 @@ public static Object complete(BObject receiverClient, BString lockToken) {
* Abandons message & make available again for processing from Queue or Subscription, based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to abandon the message.
*/
public static Object abandon(BObject receiverClient, BString lockToken) {
public static Object abandon(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
receiver.abandon(message);
receiverClient.getNativeData().remove(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
;
ayeshLK marked this conversation as resolved.
Show resolved Hide resolved
receiver.abandon(nativeMessage);
LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from %n%s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand All @@ -321,24 +320,22 @@ public static Object abandon(BObject receiverClient, BString lockToken) {
* Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @param deadLetterReason The dead letter reason.
* @param deadLetterErrorDescription The dead letter error description.
* @return An error if failed to dead letter the message.
*/
public static Object deadLetter(BObject receiverClient, BString lockToken, Object deadLetterReason,
public static Object deadLetter(BObject receiverClient, BMap<BString, Object> message, Object deadLetterReason,
Object deadLetterErrorDescription) {
try {
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
DeadLetterOptions options = new DeadLetterOptions()
.setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription));
options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason));
receiver.deadLetter(message, options);
receiverClient.getNativeData().remove(lockToken.getValue());
receiver.deadLetter(nativeMessage, options);
LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand All @@ -353,18 +350,16 @@ public static Object deadLetter(BObject receiverClient, BString lockToken, Objec
* Defer the message in a Queue or Subscription based on messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to defer the message.
*/
public static Object defer(BObject receiverClient, BString lockToken) {
public static Object defer(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
receiver.defer(message);
receiverClient.getNativeData().remove(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
receiver.defer(nativeMessage);
LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand Down Expand Up @@ -410,18 +405,16 @@ public static Object receiveDeferred(BObject receiverClient, Object sequenceNumb
* messageLockToken.
*
* @param receiverClient Ballerina ASB client object
* @param lockToken Message lock token.
* @param message Message object.
* @return An error if failed to renewLock of the message.
*/
public static Object renewLock(BObject receiverClient, BString lockToken) {
public static Object renewLock(BObject receiverClient, BMap<BString, Object> message) {
try {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient
.getNativeData(lockToken.getValue());
ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient);
receiver.renewMessageLock(message);
receiverClient.getNativeData().remove(lockToken.getValue());
ServiceBusReceivedMessage nativeMessage = getNativeMessage(message);
receiver.renewMessageLock(nativeMessage);
LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
nativeMessage.getMessageId(), receiver.getEntityPath()));
return null;
} catch (BError e) {
return ASBErrorCreator.fromBError(e);
Expand Down Expand Up @@ -474,12 +467,12 @@ private static BMap<BString, Object> constructExpectedMessageRecord(BObject rece
} else {
map.put(BODY, messageBody);
}

// This is to avoid adding messages to the native data map, if the receive-mode is 'RECEIVE_AND_DELETE'.
BMap<BString, Object> constructedMessage = createBRecordValue(map, expectedType);
// Only add the native message if the message received in peek lock mode.
if (!message.getLockToken().equals(DEFAULT_MESSAGE_LOCK_TOKEN)) {
receiverClient.addNativeData(message.getLockToken(), message);
constructedMessage.addNativeData(NATIVE_MESSAGE, message);
}
return createBRecordValue(map, expectedType);
return constructedMessage;
}

private static Map<String, Object> populateOptionalFieldsMap(ServiceBusReceivedMessage message) {
Expand Down Expand Up @@ -558,7 +551,10 @@ private static BMap<BString, Object> getReceivedMessageBatch(BObject receiverCli
LinkedList<Object> receivedMessages = new LinkedList<>();
for (ServiceBusReceivedMessage receivedMessage : receivedMessageStream) {
BMap<BString, Object> recordMap = constructExpectedMessageRecord(receiverClient, receivedMessage, null);
receivedMessages.add(createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, recordMap));
BMap<BString, Object> messageRecord = createRecordValue(ModuleUtils.getModule(),
ASBConstants.MESSAGE_RECORD, recordMap);
messageRecord.addNativeData(NATIVE_MESSAGE, receivedMessage);
receivedMessages.add(messageRecord);
}

BMap<BString, Object> messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(),
Expand Down Expand Up @@ -667,4 +663,8 @@ private static void setClient(BObject receiverObject, ServiceBusReceiverClient c
receiverObject.addNativeData(ASBConstants.RECEIVER_CLIENT, client);
}
}

private static ServiceBusReceivedMessage getNativeMessage(BMap<BString, Object> message) {
return (ServiceBusReceivedMessage) message.getNativeData(NATIVE_MESSAGE);
}
}
11 changes: 9 additions & 2 deletions native/src/main/java/org/ballerinax/asb/util/ASBConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
*/
public class ASBConstants {

//Clients
//Native Object Identifiers
public static final String RECEIVER_CLIENT = "RECEIVER_CLIENT";
public static final String ADMINISTRATOR_CLIENT = "ADMINISTRATOR_CLIENT";
public static final String SENDER_CLIENT = "SENDER_CLIENT";
public static final String DEAD_LETTER_RECEIVER_CLIENT = "DEAD_LETTER_RECEIVER_CLIENT";
public static final String NATIVE_MESSAGE = "NATIVE_MESSAGE";

//Client Init Data
//Receiver Client Init Data
public static final String RECEIVER_CLIENT_CONNECTION_STRING = "CONNECTION_STRING";
public static final String RECEIVER_CLIENT_RECEIVE_MODE = "RECEIVE_MODE";
public static final String RECEIVER_CLIENT_TOPIC_NAME = "TOPIC_NAME";
Expand All @@ -45,17 +46,20 @@ public class ASBConstants {
// Message constant fields
public static final String MESSAGE_RECORD = "Message";
public static final String APPLICATION_PROPERTY_TYPE = "ApplicationProperties";

// Message content data binding errors
public static final String XML_CONTENT_ERROR = "Error while retrieving the xml content of the message. ";
public static final String JSON_CONTENT_ERROR = "Error while retrieving the json content of the message. ";
public static final String TEXT_CONTENT_ERROR = "Error while retrieving the string content of the message. ";
public static final String INT_CONTENT_ERROR = "Error while retrieving the int content of the message. ";
public static final String FLOAT_CONTENT_ERROR = "Error while retrieving the float content of the message. ";

// Batch Message constant fields
public static final String MESSAGE_BATCH_RECORD = "MessageBatch";
public static final String MESSAGES_OBJECT = "Messages";
public static final BString MESSAGES_CONTENT = StringUtils.fromString("messages");
public static final BString MESSAGE_COUNT = StringUtils.fromString("messageCount");

// Message receive modes
public static final String PEEK_LOCK = "PEEKLOCK";
public static final String RECEIVE_AND_DELETE = "RECEIVEANDDELETE";
Expand Down Expand Up @@ -115,6 +119,7 @@ public class ASBConstants {
// Error constant fields
static final String ASB_ERROR = "Error";

// Subscription/Topic/Queue Properties record fields
public static final String SUBSCRIPTION_CREATED_RECORD = "SubscriptionProperties";
public static final String TOPIC_CREATED_RECORD = "TopicProperties";
public static final String QUEUE_CREATED_RECORD = "QueueProperties";
Expand Down Expand Up @@ -326,6 +331,7 @@ public class ASBConstants {
"deadLetteringOnFilterEvaluationExceptions";
public static final String CREATED_SUBSCRIPTION_RECORD_FIELD_SESSION_REQUIRED =
"requiresSession";

// Subscription List
public static final String LIST_OF_SUBSCRIPTIONS = "list";
public static final String LIST_OF_SUBSCRIPTIONS_RECORD = "SubscriptionList";
Expand All @@ -342,6 +348,7 @@ public class ASBConstants {
public static final String CREATED_RULE_RECORD_FIELD_TYPE_NAME = "rule";
public static final String CREATED_RULE_RECORD_FIELD_ACTION = "action";
public static final String CREATED_RULE_RECORD_FIELD_FILTER = "filter";

// Rule List
public static final String LIST_OF_RULES = "list";
public static final String LIST_OF_RULES_RECORD = "RuleList";
Expand Down