Skip to content

Commit

Permalink
Merge pull request #191 from RDPerera/main
Browse files Browse the repository at this point in the history
Add DLQ messages receive/complete support
  • Loading branch information
NipunaRanasinghe authored Aug 27, 2023
2 parents 9b9c34b + 42c4cbb commit 52d16e7
Show file tree
Hide file tree
Showing 32 changed files with 1,420 additions and 305 deletions.
10 changes: 2 additions & 8 deletions ballerina/admin.bal
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,14 @@ import ballerina/jballerina.java as java;
@display {label: "Azure Service Bus Administrator", iconPath: "icon.png"}
public isolated client class Administrator {

final handle adminHandle;

# Initialize the Azure Service Bus Admin client.
# Create an [Azure account](https://docs.microsoft.com/en-us/learn/modules/create-an-azure-account/) and
# obtain tokens following [this guide](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quickstart-portal#get-the-connection-string).
# Configure the connection string to have the [required permission](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-sas).
#
# + connectionString - Azure Service Bus connection string
public isolated function init(@display {label: "Azure Service Bus connection string"} string connectionString) returns Error? {
handle|Error initResult = initializeAdmin(java:fromString(connectionString));
if (initResult is Error) {
return initResult;
}
self.adminHandle = initResult;
check initializeAdministrator(self, java:fromString(connectionString));
}

# Create a topic with the given name or name and options.
Expand Down Expand Up @@ -264,6 +258,6 @@ public isolated client class Administrator {
} external;
}

isolated function initializeAdmin(handle connectionString) returns handle|Error = @java:Method {
isolated function initializeAdministrator(Administrator adminClient, handle connectionString) returns Error? = @java:Method {
'class: "org.ballerinax.asb.admin.Administrator"
} external;
105 changes: 52 additions & 53 deletions ballerina/receiver.bal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2021 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org).
//
// WSO2 Inc. licenses this file to you under the Apache License,
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -20,15 +20,14 @@ import ballerina/jballerina.java as java;
# Service Bus API provides data access to highly reliable queues and publish/subscribe topics of Azure Service Bus with deep feature capabilities.
@display {label: "Azure Service Bus Message Receiver", iconPath: "icon.png"}
public isolated client class MessageReceiver {
private string connectionString;

private string connectionString;
private string queueName;
private string subscriptionName;
private string topicName;
private string receiveMode;
private int maxAutoLockRenewDuration;
private LogLevel logLevel;
final handle receiverHandle;

# Initializes the connector. During initialization you can pass the [Shared Access Signature (SAS) authentication credentials](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-sas)
# Create an [Azure account](https://docs.microsoft.com/en-us/learn/modules/create-an-azure-account/) and
Expand Down Expand Up @@ -58,63 +57,65 @@ public isolated client class MessageReceiver {
self.receiveMode = config.receiveMode;
self.maxAutoLockRenewDuration = config.maxAutoLockRenewDuration;
self.logLevel = customConfiguration.logLevel;
handle|Error initResult = initMessageReceiver(java:fromString(self.connectionString),
check initializeReceiver(self, java:fromString(self.connectionString),
java:fromString(self.queueName), java:fromString(self.topicName), java:fromString(self.subscriptionName),
java:fromString(self.receiveMode), self.maxAutoLockRenewDuration, java:fromString(self.logLevel), config.amqpRetryOptions);
if initResult is Error {
return initResult;
}

self.receiverHandle = initResult;
}

# Receive message from queue or subscription.
#
#
# + serverWaitTime - Specified server wait time in seconds to receive message (optional)
# + T - Expected type of the message. This can be either a `asb:Message` or a subtype of it.
# + deadLettered - If set to `true`, messages from dead-letter queue will be received. (optional)
# + return - A `asb:Message` record if message is received, `()` if no message is in the queue or else an `asb:Error`
# if failed to receive message
# if failed to receive message
@display {label: "Receive Message"}
isolated remote function receive(@display {label: "Server Wait Time"} int? serverWaitTime = 60,
@display {label: "Expected Type"} typedesc<Message> T = <>)
returns @display {label: "Message"} T|Error? = @java:Method {
@display {label: "Expected Type"} typedesc<Message> T = <>,
@display {label: "Dead-Lettered Messages"} boolean deadLettered = false)
returns @display {label: "Message"} T|Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

# Receive message payload from queue or subscription.
#
# + serverWaitTime - Specified server wait time in seconds to receive message (optional)
# + T - Expected type of the message. This can be any subtype of `anydata` type
# + deadLettered - If set to `true`, messages from dead-letter queue will be received. (optional)
# + return - A `asb:Message` record if message is received, `()` if no message is in the queue or else an `asb:Error`
# if failed to receive message
# if failed to receive message
@display {label: "Receive Message Payload"}
isolated remote function receivePayload(@display {label: "Server Wait Time"} int? serverWaitTime = 60,
@display {label: "Expected Type"} typedesc<anydata> T = <>)
isolated remote function receivePayload(@display {label: "Server Wait Time"} int? serverWaitTime = 60,
@display {label: "Expected Type"} typedesc<anydata> T = <>,
@display {label: "Dead-Lettered Messages"} boolean deadLettered = false)
returns @display {label: "Message Payload"} T|Error = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

# Receive batch of messages from queue or subscription.
#
#
# + maxMessageCount - Maximum message count to receive in a batch
# + serverWaitTime - Specified server wait time in seconds to receive message (optional)
# + deadLettered - If set to `true`, messages from dead-letter queue will be received. (optional)
# + return - A `asb:MessageBatch` record if batch is received, `()` if no batch is in the queue or else an `asb:Error`
# if failed to receive batch
# if failed to receive batch
@display {label: "Receive Batch Message"}
isolated remote function receiveBatch(@display {label: "Maximum Message Count"} int maxMessageCount,
@display {label: "Server Wait Time"} int? serverWaitTime = ())
returns @display {label: "Batch Message"} MessageBatch|Error? {
MessageBatch|Error? receivedMessages = receiveBatch(self, maxMessageCount, serverWaitTime);
isolated remote function receiveBatch(@display {label: "Maximum Message Count"} int maxMessageCount,
@display {label: "Server Wait Time"} int? serverWaitTime = (),
@display {label: "Dead-Lettered Messages"} boolean deadLettered = false)
returns @display {label: "Batch Message"} MessageBatch|Error? {
MessageBatch|Error? receivedMessages = receiveBatch(self, maxMessageCount, serverWaitTime, deadLettered);
return receivedMessages;
}

# Complete message from queue or subscription based on messageLockToken. Declares the message processing to be
# successfully completed, removing the message from the queue.
#
#
# + message - `asb:Message` record
# + return - An `asb:Error` if failed to complete message or else `()`
@display {label: "Complete Message"}
isolated remote function complete(@display {label: "Message"} Message message) returns Error? {
isolated remote function complete(@display {label: "Message"} Message message)
returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return complete(self, message?.lockToken.toString());
}
Expand All @@ -124,7 +125,7 @@ public isolated client class MessageReceiver {
# Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for
# the time being, returning the message immediately back to the queue to be picked up by another (or the same)
# receiver.
#
#
# + message - `asb:Message` record
# + return - An `asb:Error` if failed to abandon message or else `()`
@display {label: "Abandon Message"}
Expand All @@ -137,52 +138,51 @@ public isolated client class MessageReceiver {

# Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer
# the message from the primary queue into a special "dead-letter sub-queue".
#
#
# + message - `asb:Message` record
# + deadLetterReason - The deadletter reason (optional)
# + deadLetterErrorDescription - The deadletter error description (optional)
# + return - An `asb:Error` if failed to deadletter message or else `()`
@display {label: "Dead Letter Message"}
isolated remote function deadLetter(@display {label: "Message"} Message message,
@display {label: "Dead Letter Reason"} string? deadLetterReason = (),
@display{label: "Dead Letter Description"}
isolated remote function deadLetter(@display {label: "Message"} Message message,
@display {label: "Dead Letter Reason"} string deadLetterReason = "DEADLETTERED_BY_RECEIVER",
@display {label: "Dead Letter Description"}
string? deadLetterErrorDescription = ()) returns Error? {
if message?.lockToken.toString() != DEFAULT_MESSAGE_LOCK_TOKEN {
return deadLetter(self, message?.lockToken.toString(), deadLetterReason,
return deadLetter(self, message?.lockToken.toString(), deadLetterReason,
deadLetterErrorDescription);
}
return createError("Failed to deadletter message with ID " + message?.messageId.toString());
}

# Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being
# directly received from the queue by setting it aside such that it must be received by sequence number.
#
#
# + message - `asb:Message` record
# + return - An `asb:Error` if failed to defer message or else sequence number
@display {label: "Defer Message"}
isolated remote function defer(@display {label: "Message"} Message message)
returns @display {label: "Deferred Msg Seq Num"} int|Error {
isolated remote function defer(@display {label: "Message"} Message message)
returns @display {label: "Deferred Msg Seq Num"} int|Error {
check defer(self, message?.lockToken.toString());
return <int> message?.sequenceNumber;
return <int>message?.sequenceNumber;
}

# Receives a deferred Message. Deferred messages can only be received by using sequence number and return
# Message object.
#
#
# + sequenceNumber - Unique number assigned to a message by Service Bus. The sequence number is a unique 64-bit
# integer assigned to a message as it is accepted and stored by the broker and functions as
# its true identifier.
# integer assigned to a message as it is accepted and stored by the broker and functions as
# its true identifier.
# + return - An `asb:Error` if failed to receive deferred message, a Message record if successful or else `()`
@display {label: "Receive Deferred Message"}
isolated remote function receiveDeferred(@display {label: "Deferred Msg Seq Num"}
int sequenceNumber)
returns @display {label: "Deferred Message"} Message|Error? {
Message? message = check receiveDeferred(self, sequenceNumber);
return message;
}
isolated remote function receiveDeferred(@display {label: "Deferred Msg Seq Num"}
int sequenceNumber)
returns @display {label: "Deferred Message"} Message|Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

# The operation renews lock on a message in a queue or subscription based on messageLockToken.
#
#
# + message - `asb:Message` record
# + return - An `asb:Error` if failed to renew message or else `()`
@display {label: "Renew Lock On Message"}
Expand All @@ -203,14 +203,13 @@ public isolated client class MessageReceiver {
} external;
}

isolated function initMessageReceiver(handle connectionString, handle queueName, handle topicName,
handle subscriptionName, handle receiveMode, int maxAutoLockRenewDuration, handle isLogActive, AmqpRetryOptions retryOptions) returns handle|Error = @java:Method {
name: "initializeReceiver",
isolated function initializeReceiver(MessageReceiver receiverClient, handle connectionString, handle queueName, handle topicName,
handle subscriptionName, handle receiveMode, int maxAutoLockRenewDuration, handle isLogActive, AmqpRetryOptions retryOptions) returns Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function receiveBatch(MessageReceiver endpointClient, int? maxMessageCount, int? serverWaitTime)
returns MessageBatch|Error? = @java:Method {
isolated function receiveBatch(MessageReceiver endpointClient, int? maxMessageCount, int? serverWaitTime, boolean deadLettered)
returns MessageBatch|Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

Expand All @@ -222,8 +221,8 @@ isolated function abandon(MessageReceiver endpointClient, string lockToken) retu
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

isolated function deadLetter(MessageReceiver endpointClient, string lockToken, string? deadLetterReason, string? deadLetterErrorDescription) returns
Error? = @java:Method {
isolated function deadLetter(MessageReceiver endpointClient, string lockToken, string? deadLetterReason, string? deadLetterErrorDescription) returns
Error? = @java:Method {
'class: "org.ballerinax.asb.receiver.MessageReceiver"
} external;

Expand Down
40 changes: 13 additions & 27 deletions ballerina/sender.bal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2021 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
// Copyright (c) 2023 WSO2 LLC.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -23,7 +23,6 @@ import ballerina/time;
public isolated client class MessageSender {

private string connectionString;
final handle senderHandle;
private string topicOrQueueName;
private string entityType;
private LogLevel logLevel;
Expand All @@ -40,14 +39,9 @@ public isolated client class MessageSender {
self.topicOrQueueName = config.topicOrQueueName;
self.entityType = config.entityType;
self.logLevel = customConfiguration.logLevel;
handle|Error initResult = initMessageSender(java:fromString(self.connectionString),
check initializeSender(self, java:fromString(self.connectionString),
java:fromString(self.entityType), java:fromString(self.topicOrQueueName),
java:fromString(self.logLevel), config.amqpRetryOptions);
if (initResult is Error) {
return initResult;
}

self.senderHandle = initResult;
}

# Send message to queue or topic with a message body.
Expand Down Expand Up @@ -87,9 +81,9 @@ public isolated client class MessageSender {
#
# + sequenceNumber - The sequence number of the message to cancel
# + return - If the message could not be cancelled
isolated remote function cancel(@display {label: "Sequence Number"} int sequenceNumber) returns Error? {
return cancel(self, sequenceNumber);
}
isolated remote function cancel(@display {label: "Sequence Number"} int sequenceNumber) returns Error? = @java:Method {
'class: "org.ballerinax.asb.sender.MessageSender"
} external;

# Send batch of messages to queue or topic.
#
Expand All @@ -107,32 +101,24 @@ public isolated client class MessageSender {
#
# + return - An `asb:Error` if failed to close connection or else `()`
@display {label: "Close Sender Connection"}
isolated remote function close() returns Error? {
return closeSender(self);
}
isolated remote function close() returns Error? = @java:Method {
'class: "org.ballerinax.asb.sender.MessageSender"
} external;
}

isolated function initMessageSender(handle connectionString, handle entityType, handle topicOrQueueName, handle isLogEnabled, AmqpRetryOptions retryOptions) returns handle|Error = @java:Method {
name: "initializeSender",
isolated function initializeSender(MessageSender senderClient, handle connectionString, handle entityType, handle topicOrQueueName, handle isLogEnabled, AmqpRetryOptions retryOptions) returns Error? = @java:Method {
'class: "org.ballerinax.asb.sender.MessageSender"
} external;

isolated function send(MessageSender endpointClient, Message message) returns Error? = @java:Method {
isolated function send(MessageSender senderClient, Message message) returns Error? = @java:Method {
'class: "org.ballerinax.asb.sender.MessageSender"
} external;

isolated function sendBatch(MessageSender endpointClient, MessageBatch messages) returns Error? = @java:Method {
isolated function sendBatch(MessageSender senderClient, MessageBatch messages) returns Error? = @java:Method {
'class: "org.ballerinax.asb.sender.MessageSender"
} external;

isolated function schedule(MessageSender endpointClient, Message message, time:Civil scheduleTime) returns int|Error = @java:Method {
isolated function schedule(MessageSender senderClient, Message message, time:Civil scheduleTime) returns int|Error = @java:Method {
'class: "org.ballerinax.asb.sender.MessageSender"
} external;

isolated function cancel(MessageSender endpointClient, int sequenceNumber) returns Error? = @java:Method {
'class: "org.ballerinax.asb.sender.MessageSender"
} external;

isolated function closeSender(MessageSender endpointClient) returns Error? = @java:Method {
'class: "org.ballerinax.asb.sender.MessageSender"
} external;
Loading

0 comments on commit 52d16e7

Please sign in to comment.