From ec48f57580c90dc6567649d0a6e182d14994d8bc Mon Sep 17 00:00:00 2001 From: RDPerera Date: Fri, 15 Sep 2023 04:50:39 +0530 Subject: [PATCH 1/6] Fix Native Data Retention Issue --- ballerina/receiver.bal | 21 +++-- .../asb/receiver/MessageReceiver.java | 84 +++++++++---------- .../org/ballerinax/asb/util/ASBConstants.java | 11 ++- 3 files changed, 61 insertions(+), 55 deletions(-) diff --git a/ballerina/receiver.bal b/ballerina/receiver.bal index c6ee1f5f..05ed3315 100644 --- a/ballerina/receiver.bal +++ b/ballerina/receiver.bal @@ -117,7 +117,7 @@ public isolated client class MessageReceiver { isolated remote function complete(@display {label: "Message"} Message message) returns Error? { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { - return complete(self, message?.lockToken.toString()); + return complete(self, message); } return createError("Failed to complete message with ID " + message?.messageId.toString()); } @@ -131,7 +131,7 @@ public isolated client class MessageReceiver { @display {label: "Abandon Message"} isolated remote function abandon(@display {label: "Message"} Message message) returns Error? { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { - return abandon(self, message?.lockToken.toString()); + return abandon(self, message); } return createError("Failed to abandon message with ID " + message?.messageId.toString()); } @@ -149,8 +149,7 @@ public isolated client class MessageReceiver { @display {label: "Dead Letter Description"} string? deadLetterErrorDescription = ()) returns Error? { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { - return deadLetter(self, message?.lockToken.toString(), deadLetterReason, - deadLetterErrorDescription); + return deadLetter(self, message, deadLetterReason, deadLetterErrorDescription); } return createError("Failed to deadletter message with ID " + message?.messageId.toString()); } @@ -163,7 +162,7 @@ public isolated client class MessageReceiver { @display {label: "Defer Message"} isolated remote function defer(@display {label: "Message"} Message message) returns @display {label: "Deferred Msg Seq Num"} int|Error { - check defer(self, message?.lockToken.toString()); + check defer(self, message); return message?.sequenceNumber; } @@ -188,7 +187,7 @@ public isolated client class MessageReceiver { @display {label: "Renew Lock On Message"} isolated remote function renewLock(@display {label: "Message"} Message message) returns Error? { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { - return renewLock(self, message?.lockToken.toString()); + return renewLock(self, message); } return createError("Failed to renew lock on message with ID " + message?.messageId.toString()); } @@ -213,20 +212,20 @@ isolated function receiveBatch(MessageReceiver endpointClient, int? maxMessageCo 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; -isolated function complete(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method { +isolated function complete(MessageReceiver endpointClient, Message message) returns Error? = @java:Method { 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; -isolated function abandon(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method { +isolated function abandon(MessageReceiver endpointClient, Message message) returns Error? = @java:Method { 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; -isolated function deadLetter(MessageReceiver endpointClient, string lockToken, string? deadLetterReason, string? deadLetterErrorDescription) returns +isolated function deadLetter(MessageReceiver endpointClient, Message message, string? deadLetterReason, string? deadLetterErrorDescription) returns Error? = @java:Method { 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; -isolated function defer(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method { +isolated function defer(MessageReceiver endpointClient, Message message) returns Error? = @java:Method { 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; @@ -234,6 +233,6 @@ isolated function receiveDeferred(MessageReceiver endpointClient, int sequenceNu 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; -isolated function renewLock(MessageReceiver endpointClient, string lockToken) returns Error? = @java:Method { +isolated function renewLock(MessageReceiver endpointClient, Message message) returns Error? = @java:Method { 'class: "org.ballerinax.asb.receiver.MessageReceiver" } external; diff --git a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index e79d8fa6..01835648 100644 --- a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, WSO2 LLC. (http://www.wso2.org). + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except @@ -68,6 +68,7 @@ import static org.ballerinax.asb.util.ASBConstants.LABEL; import static org.ballerinax.asb.util.ASBConstants.LOCK_TOKEN; import static org.ballerinax.asb.util.ASBConstants.MESSAGE_ID; +import static org.ballerinax.asb.util.ASBConstants.NATIVE_MESSAGE; import static org.ballerinax.asb.util.ASBConstants.PARTITION_KEY; import static org.ballerinax.asb.util.ASBConstants.RECEIVER_CLIENT; import static org.ballerinax.asb.util.ASBConstants.REPLY_TO; @@ -265,22 +266,21 @@ public static Object receiveBatch(BObject receiverClient, Object maxMessageCount * Completes Messages from Queue or Subscription based on messageLockToken. * * @param receiverClient Ballerina ASB client object - * @param lockToken Message lock token. + * @param message Message object. * @return An error if failed to complete the message. */ - public static Object complete(BObject receiverClient, BString lockToken) { + public static Object complete(BObject receiverClient, BMap message) { try { - ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient - .getNativeData(lockToken.getValue()); + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); ServiceBusReceiverClient receiver; - if (message.getDeadLetterReason() != null) { + if (nativeMessage.getDeadLetterReason() != null) { receiver = (ServiceBusReceiverClient) getDeadLetterMessageReceiverFromBObject(receiverClient); } else { receiver = getReceiverFromBObject(receiverClient); } - receiver.complete(message); - receiverClient.getNativeData().remove(lockToken.getValue()); - LOGGER.debug("Completed the message(Id: " + message.getMessageId() + ") with lockToken " + lockToken); + receiver.complete(nativeMessage); + LOGGER.debug("Completed the message(Id: " + nativeMessage.getMessageId() + ") with lockToken " + + nativeMessage.getLockToken()); return null; } catch (BError e) { return ASBErrorCreator.fromBError(e); @@ -295,18 +295,17 @@ public static Object complete(BObject receiverClient, BString lockToken) { * Abandons message & make available again for processing from Queue or Subscription, based on messageLockToken. * * @param receiverClient Ballerina ASB client object - * @param lockToken Message lock token. + * @param message Message object. * @return An error if failed to abandon the message. */ - public static Object abandon(BObject receiverClient, BString lockToken) { + public static Object abandon(BObject receiverClient, BMap message) { try { ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient - .getNativeData(lockToken.getValue()); - receiver.abandon(message); - receiverClient.getNativeData().remove(lockToken.getValue()); + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + ; + receiver.abandon(nativeMessage); LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from %n%s", - message.getMessageId(), receiver.getEntityPath())); + nativeMessage.getMessageId(), receiver.getEntityPath())); return null; } catch (BError e) { return ASBErrorCreator.fromBError(e); @@ -321,24 +320,22 @@ public static Object abandon(BObject receiverClient, BString lockToken) { * Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. * * @param receiverClient Ballerina ASB client object - * @param lockToken Message lock token. + * @param message Message object. * @param deadLetterReason The dead letter reason. * @param deadLetterErrorDescription The dead letter error description. * @return An error if failed to dead letter the message. */ - public static Object deadLetter(BObject receiverClient, BString lockToken, Object deadLetterReason, + public static Object deadLetter(BObject receiverClient, BMap message, Object deadLetterReason, Object deadLetterErrorDescription) { try { ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient - .getNativeData(lockToken.getValue()); + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); DeadLetterOptions options = new DeadLetterOptions() .setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription)); options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason)); - receiver.deadLetter(message, options); - receiverClient.getNativeData().remove(lockToken.getValue()); + receiver.deadLetter(nativeMessage, options); LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s", - message.getMessageId(), receiver.getEntityPath())); + nativeMessage.getMessageId(), receiver.getEntityPath())); return null; } catch (BError e) { return ASBErrorCreator.fromBError(e); @@ -353,18 +350,16 @@ public static Object deadLetter(BObject receiverClient, BString lockToken, Objec * Defer the message in a Queue or Subscription based on messageLockToken. * * @param receiverClient Ballerina ASB client object - * @param lockToken Message lock token. + * @param message Message object. * @return An error if failed to defer the message. */ - public static Object defer(BObject receiverClient, BString lockToken) { + public static Object defer(BObject receiverClient, BMap message) { try { ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient - .getNativeData(lockToken.getValue()); - receiver.defer(message); - receiverClient.getNativeData().remove(lockToken.getValue()); + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + receiver.defer(nativeMessage); LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s", - message.getMessageId(), receiver.getEntityPath())); + nativeMessage.getMessageId(), receiver.getEntityPath())); return null; } catch (BError e) { return ASBErrorCreator.fromBError(e); @@ -410,18 +405,16 @@ public static Object receiveDeferred(BObject receiverClient, Object sequenceNumb * messageLockToken. * * @param receiverClient Ballerina ASB client object - * @param lockToken Message lock token. + * @param message Message object. * @return An error if failed to renewLock of the message. */ - public static Object renewLock(BObject receiverClient, BString lockToken) { + public static Object renewLock(BObject receiverClient, BMap message) { try { - ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) receiverClient - .getNativeData(lockToken.getValue()); ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); - receiver.renewMessageLock(message); - receiverClient.getNativeData().remove(lockToken.getValue()); + ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); + receiver.renewMessageLock(nativeMessage); LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s", - message.getMessageId(), receiver.getEntityPath())); + nativeMessage.getMessageId(), receiver.getEntityPath())); return null; } catch (BError e) { return ASBErrorCreator.fromBError(e); @@ -474,12 +467,12 @@ private static BMap constructExpectedMessageRecord(BObject rece } else { map.put(BODY, messageBody); } - - // This is to avoid adding messages to the native data map, if the receive-mode is 'RECEIVE_AND_DELETE'. + BMap constructedMessage = createBRecordValue(map, expectedType); + // Only add the native message if the message received in peek lock mode. if (!message.getLockToken().equals(DEFAULT_MESSAGE_LOCK_TOKEN)) { - receiverClient.addNativeData(message.getLockToken(), message); + constructedMessage.addNativeData(NATIVE_MESSAGE, message); } - return createBRecordValue(map, expectedType); + return constructedMessage; } private static Map populateOptionalFieldsMap(ServiceBusReceivedMessage message) { @@ -558,7 +551,10 @@ private static BMap getReceivedMessageBatch(BObject receiverCli LinkedList receivedMessages = new LinkedList<>(); for (ServiceBusReceivedMessage receivedMessage : receivedMessageStream) { BMap recordMap = constructExpectedMessageRecord(receiverClient, receivedMessage, null); - receivedMessages.add(createRecordValue(ModuleUtils.getModule(), ASBConstants.MESSAGE_RECORD, recordMap)); + BMap messageRecord = createRecordValue(ModuleUtils.getModule(), + ASBConstants.MESSAGE_RECORD, recordMap); + messageRecord.addNativeData(NATIVE_MESSAGE, receivedMessage); + receivedMessages.add(messageRecord); } BMap messageRecord = ValueCreator.createRecordValue(ModuleUtils.getModule(), @@ -667,4 +663,8 @@ private static void setClient(BObject receiverObject, ServiceBusReceiverClient c receiverObject.addNativeData(ASBConstants.RECEIVER_CLIENT, client); } } + + private static ServiceBusReceivedMessage getNativeMessage(BMap message) { + return (ServiceBusReceivedMessage) message.getNativeData(NATIVE_MESSAGE); + } } diff --git a/native/src/main/java/org/ballerinax/asb/util/ASBConstants.java b/native/src/main/java/org/ballerinax/asb/util/ASBConstants.java index 77e11c99..8e57d536 100644 --- a/native/src/main/java/org/ballerinax/asb/util/ASBConstants.java +++ b/native/src/main/java/org/ballerinax/asb/util/ASBConstants.java @@ -26,13 +26,14 @@ */ public class ASBConstants { - //Clients + //Native Object Identifiers public static final String RECEIVER_CLIENT = "RECEIVER_CLIENT"; public static final String ADMINISTRATOR_CLIENT = "ADMINISTRATOR_CLIENT"; public static final String SENDER_CLIENT = "SENDER_CLIENT"; public static final String DEAD_LETTER_RECEIVER_CLIENT = "DEAD_LETTER_RECEIVER_CLIENT"; + public static final String NATIVE_MESSAGE = "NATIVE_MESSAGE"; - //Client Init Data + //Receiver Client Init Data public static final String RECEIVER_CLIENT_CONNECTION_STRING = "CONNECTION_STRING"; public static final String RECEIVER_CLIENT_RECEIVE_MODE = "RECEIVE_MODE"; public static final String RECEIVER_CLIENT_TOPIC_NAME = "TOPIC_NAME"; @@ -45,17 +46,20 @@ public class ASBConstants { // Message constant fields public static final String MESSAGE_RECORD = "Message"; public static final String APPLICATION_PROPERTY_TYPE = "ApplicationProperties"; + // Message content data binding errors public static final String XML_CONTENT_ERROR = "Error while retrieving the xml content of the message. "; public static final String JSON_CONTENT_ERROR = "Error while retrieving the json content of the message. "; public static final String TEXT_CONTENT_ERROR = "Error while retrieving the string content of the message. "; public static final String INT_CONTENT_ERROR = "Error while retrieving the int content of the message. "; public static final String FLOAT_CONTENT_ERROR = "Error while retrieving the float content of the message. "; + // Batch Message constant fields public static final String MESSAGE_BATCH_RECORD = "MessageBatch"; public static final String MESSAGES_OBJECT = "Messages"; public static final BString MESSAGES_CONTENT = StringUtils.fromString("messages"); public static final BString MESSAGE_COUNT = StringUtils.fromString("messageCount"); + // Message receive modes public static final String PEEK_LOCK = "PEEKLOCK"; public static final String RECEIVE_AND_DELETE = "RECEIVEANDDELETE"; @@ -115,6 +119,7 @@ public class ASBConstants { // Error constant fields static final String ASB_ERROR = "Error"; + // Subscription/Topic/Queue Properties record fields public static final String SUBSCRIPTION_CREATED_RECORD = "SubscriptionProperties"; public static final String TOPIC_CREATED_RECORD = "TopicProperties"; public static final String QUEUE_CREATED_RECORD = "QueueProperties"; @@ -326,6 +331,7 @@ public class ASBConstants { "deadLetteringOnFilterEvaluationExceptions"; public static final String CREATED_SUBSCRIPTION_RECORD_FIELD_SESSION_REQUIRED = "requiresSession"; + // Subscription List public static final String LIST_OF_SUBSCRIPTIONS = "list"; public static final String LIST_OF_SUBSCRIPTIONS_RECORD = "SubscriptionList"; @@ -342,6 +348,7 @@ public class ASBConstants { public static final String CREATED_RULE_RECORD_FIELD_TYPE_NAME = "rule"; public static final String CREATED_RULE_RECORD_FIELD_ACTION = "action"; public static final String CREATED_RULE_RECORD_FIELD_FILTER = "filter"; + // Rule List public static final String LIST_OF_RULES = "list"; public static final String LIST_OF_RULES_RECORD = "RuleList"; From d68d5f20b64a6b0bb9f12a693a9b53510cb2ef25 Mon Sep 17 00:00:00 2001 From: RDPerera Date: Mon, 18 Sep 2023 07:22:01 +0530 Subject: [PATCH 2/6] Address Review Changes --- ballerina/receiver.bal | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ballerina/receiver.bal b/ballerina/receiver.bal index 05ed3315..26145806 100644 --- a/ballerina/receiver.bal +++ b/ballerina/receiver.bal @@ -119,7 +119,7 @@ public isolated client class MessageReceiver { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { return complete(self, message); } - return createError("Failed to complete message with ID " + message?.messageId.toString()); + return createError(string `Failed to complete message with ID ${message?.messageId.toString()}`); } # Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for @@ -133,7 +133,7 @@ public isolated client class MessageReceiver { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { return abandon(self, message); } - return createError("Failed to abandon message with ID " + message?.messageId.toString()); + return createError(string `Failed to abandon message with ID ${message?.messageId.toString()}`); } # Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer @@ -151,7 +151,7 @@ public isolated client class MessageReceiver { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { return deadLetter(self, message, deadLetterReason, deadLetterErrorDescription); } - return createError("Failed to deadletter message with ID " + message?.messageId.toString()); + return createError(string `Failed to deadletter message with ID ${message?.messageId.toString()}`); } # Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being @@ -189,7 +189,7 @@ public isolated client class MessageReceiver { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { return renewLock(self, message); } - return createError("Failed to renew lock on message with ID " + message?.messageId.toString()); + return createError(string `Failed to renew lock on message with ID ${message?.messageId.toString()}`); } # Closes the ASB sender connection. From 99aa1775c6f483964ef25fc150705d42f6ee0bdb Mon Sep 17 00:00:00 2001 From: RDPerera Date: Mon, 18 Sep 2023 09:21:54 +0530 Subject: [PATCH 3/6] Address Review Changes --- ballerina/receiver.bal | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ballerina/receiver.bal b/ballerina/receiver.bal index 26145806..47ef7b88 100644 --- a/ballerina/receiver.bal +++ b/ballerina/receiver.bal @@ -119,7 +119,7 @@ public isolated client class MessageReceiver { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { return complete(self, message); } - return createError(string `Failed to complete message with ID ${message?.messageId.toString()}`); + return createError(string `Failed to complete message with ID: ${message?.messageId.toString()}`); } # Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for @@ -133,7 +133,7 @@ public isolated client class MessageReceiver { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { return abandon(self, message); } - return createError(string `Failed to abandon message with ID ${message?.messageId.toString()}`); + return createError(string `Failed to abandon message with ID: ${message?.messageId.toString()}`); } # Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer @@ -151,7 +151,7 @@ public isolated client class MessageReceiver { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { return deadLetter(self, message, deadLetterReason, deadLetterErrorDescription); } - return createError(string `Failed to deadletter message with ID ${message?.messageId.toString()}`); + return createError(string `Failed to deadletter message with ID: ${message?.messageId.toString()}`); } # Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being @@ -189,7 +189,7 @@ public isolated client class MessageReceiver { if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN { return renewLock(self, message); } - return createError(string `Failed to renew lock on message with ID ${message?.messageId.toString()}`); + return createError(string `Failed to renew lock on message with ID: ${message?.messageId.toString()}`); } # Closes the ASB sender connection. From f421f488dc38f28b2d64f54f6162afc267b93148 Mon Sep 17 00:00:00 2001 From: RDPerera Date: Mon, 18 Sep 2023 13:29:19 +0530 Subject: [PATCH 4/6] Addressed review comments --- .../main/java/org/ballerinax/asb/receiver/MessageReceiver.java | 1 - 1 file changed, 1 deletion(-) diff --git a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index 01835648..90fa7e6b 100644 --- a/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -302,7 +302,6 @@ public static Object abandon(BObject receiverClient, BMap messa try { ServiceBusReceiverClient receiver = getReceiverFromBObject(receiverClient); ServiceBusReceivedMessage nativeMessage = getNativeMessage(message); - ; receiver.abandon(nativeMessage); LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from %n%s", nativeMessage.getMessageId(), receiver.getEntityPath())); From 54fdc3b17d198f8ff30a60fb12beed6b572abed6 Mon Sep 17 00:00:00 2001 From: RDPerera Date: Tue, 19 Sep 2023 14:05:55 +0530 Subject: [PATCH 5/6] Add test case to check invalid message completes --- .../asb_sender_receiver_negative_tests.bal | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/ballerina/tests/asb_sender_receiver_negative_tests.bal b/ballerina/tests/asb_sender_receiver_negative_tests.bal index d4ba4267..9e942925 100644 --- a/ballerina/tests/asb_sender_receiver_negative_tests.bal +++ b/ballerina/tests/asb_sender_receiver_negative_tests.bal @@ -16,6 +16,9 @@ import ballerina/log; import ballerina/test; +import ballerina/regex; + +string invalidCompleteError = "^Failed to complete message with ID:.*$"; @test:Config { groups: ["asb_sender_receiver_negative"], @@ -52,6 +55,46 @@ function testReceivePayloadWithIncorrectExpectedType() returns error? { groups: ["asb_sender_receiver_negative"], dependsOn: [testReceivePayloadWithIncorrectExpectedType] } +function testInvalidComplete() returns error? { + log:printInfo("[[testInvalidComplete]"); + + log:printInfo("Initializing Asb sender client."); + MessageSender messageSender = check new (senderConfig); + + log:printInfo("Initializing Asb receiver client."); + receiverConfig.receiveMode = RECEIVE_AND_DELETE; + + MessageReceiver messageReceiver = check new (receiverConfig); + + log:printInfo("Sending via Asb sender."); + check messageSender->send(message1); + + log:printInfo("Receiving from Asb receiver client."); + Message|error? receivedMessage = messageReceiver->receive(serverWaitTime); + + if receivedMessage is Message { + log:printInfo("messgae" + receivedMessage.toString()); + Error? result = messageReceiver->complete(receivedMessage); + test:assertTrue(result is error, msg = "Unexpected Complete for Messages in Receive and Delete Mode"); + test:assertTrue(regex:matches((result).message(),invalidCompleteError), msg = "Invalid Complete for " + + " Messages in Receive and Delete Mode"); + } else if receivedMessage is () { + test:assertFail("No message in the queue."); + } else { + test:assertFail("Receiving message via Asb receiver connection failed."); + } + + log:printInfo("Closing Asb sender client."); + check messageSender->close(); + + log:printInfo("Closing Asb receiver client."); + check messageSender->close(); +} + +@test:Config { + groups: ["asb_sender_receiver_negative"], + dependsOn: [testInvalidComplete] +} function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? { log:printInfo("[[testReceivePayloadWithUnsupportedUnionExpectedType]]"); log:printInfo("Creating Asb message sender."); From cae2e534ab7ec039301f791cdbade2f00a2f609d Mon Sep 17 00:00:00 2001 From: RDPerera Date: Tue, 26 Sep 2023 11:36:34 +0530 Subject: [PATCH 6/6] Add negative test case for invalid abandon --- .../asb_sender_receiver_negative_tests.bal | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/ballerina/tests/asb_sender_receiver_negative_tests.bal b/ballerina/tests/asb_sender_receiver_negative_tests.bal index 9e942925..59b34282 100644 --- a/ballerina/tests/asb_sender_receiver_negative_tests.bal +++ b/ballerina/tests/asb_sender_receiver_negative_tests.bal @@ -19,7 +19,7 @@ import ballerina/test; import ballerina/regex; string invalidCompleteError = "^Failed to complete message with ID:.*$"; - +string invalidAbandonError = "^Failed to abandon message with ID:.*$"; @test:Config { groups: ["asb_sender_receiver_negative"], dependsOn: [testCreateQueue, testCreateTopicOperation, testCreateSubscription] @@ -88,13 +88,53 @@ function testInvalidComplete() returns error? { check messageSender->close(); log:printInfo("Closing Asb receiver client."); - check messageSender->close(); + check messageReceiver->close(); } @test:Config { groups: ["asb_sender_receiver_negative"], dependsOn: [testInvalidComplete] } +function testInvalidAbandon() returns error? { + log:printInfo("[[testInvalidAbandon]"); + + log:printInfo("Initializing Asb sender client."); + MessageSender messageSender = check new (senderConfig); + + log:printInfo("Initializing Asb receiver client."); + receiverConfig.receiveMode = RECEIVE_AND_DELETE; + + MessageReceiver messageReceiver = check new (receiverConfig); + + log:printInfo("Sending via Asb sender."); + check messageSender->send(message1); + + log:printInfo("Receiving from Asb receiver client."); + Message|error? receivedMessage = messageReceiver->receive(serverWaitTime); + + if receivedMessage is Message { + log:printInfo("messgae" + receivedMessage.toString()); + Error? result = messageReceiver->abandon(receivedMessage); + test:assertTrue(result is error, msg = "Unexpected Abandon for Messages in Receive and Delete Mode"); + test:assertTrue(regex:matches((result).message(),invalidAbandonError), msg = "Invalid Abandon for " + + " Messages in Receive and Delete Mode"); + } else if receivedMessage is () { + test:assertFail("No message in the queue."); + } else { + test:assertFail("Receiving message via Asb receiver connection failed."); + } + + log:printInfo("Closing Asb sender client."); + check messageSender->close(); + + log:printInfo("Closing Asb receiver client."); + check messageReceiver->close(); +} + +@test:Config { + groups: ["asb_sender_receiver_negative"], + dependsOn: [testInvalidAbandon] +} function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? { log:printInfo("[[testReceivePayloadWithUnsupportedUnionExpectedType]]"); log:printInfo("Creating Asb message sender.");