Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp asb:Listener/asb:Service based implementation #220

Merged
merged 39 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c7b4be7
[Automated] Update the toml files
ayeshLK May 6, 2024
4a15917
Add basic ballerina skeleton for the listener-svc
ayeshLK May 9, 2024
08c34df
Add initial native listener functionality
ayeshLK May 9, 2024
5bf9b5b
Add native-caller related functionalities
ayeshLK May 10, 2024
9d76dc7
Update doc-comments
ayeshLK May 10, 2024
7f8c4a3
Introduce functionality to invoke onError method
ayeshLK May 10, 2024
f3e55c6
Restructure onError cosumer logic
ayeshLK May 10, 2024
e74400d
Restructure the native-caller
ayeshLK May 10, 2024
c37e980
Restructure error consumer
ayeshLK May 10, 2024
cddc00d
Introduce a java level wrapper for ballerina svc object
ayeshLK May 10, 2024
d7d8c0e
Restructure the code base using the native bService wrapper
ayeshLK May 10, 2024
e4e02e9
Refactor the code-base
ayeshLK May 10, 2024
c4058a9
Restructure the code-base
ayeshLK May 10, 2024
18f567b
Introduce message consumer related functionalities
ayeshLK May 10, 2024
b11e651
Expose several APIs as public
ayeshLK May 10, 2024
d6f24cb
Remove unwanted files
ayeshLK May 10, 2024
b4b1595
Add the missing doc comments
ayeshLK May 10, 2024
6ec91c1
Resolve checkstyle error
ayeshLK May 10, 2024
4e34203
Refactor code base
ayeshLK May 10, 2024
7764ddb
Fix compilation errors
ayeshLK May 10, 2024
e7f4e21
Fix errorneous API signature
ayeshLK May 10, 2024
bc7cee8
Fix build failures
ayeshLK May 10, 2024
0a6b074
Refactor the code base
ayeshLK May 10, 2024
946ddbc
Add change log
ayeshLK May 10, 2024
35df0e6
Fix doc comments
ayeshLK May 10, 2024
73a9f7c
Properly update changelog
ayeshLK May 10, 2024
32f16a0
Remove unwanted validations
ayeshLK May 10, 2024
c26c976
Introduced manual auto-complete mode at native level
ayeshLK May 14, 2024
75e8c64
Remove unwanted validations
ayeshLK May 14, 2024
bb5c168
Add validation for multiple-service attached to the same listener
ayeshLK May 15, 2024
b62d1f4
Refactor the code base
ayeshLK May 15, 2024
9cbb8d0
Incorporate review suggestions
ayeshLK May 15, 2024
ecca40b
Incorporate review suggestions
ayeshLK May 15, 2024
c6cc6bb
Reintroduce auto-complete validation
ayeshLK May 20, 2024
8b4e6e9
Restructure the code base
ayeshLK May 20, 2024
3a041e7
Refactor test cases
ayeshLK May 20, 2024
19d9080
Update change log
ayeshLK May 20, 2024
5c8e70c
Update copyright statement
ayeshLK May 20, 2024
7c4fc42
Incorporate review suggestions
ayeshLK May 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ modules = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.2.2"
version = "1.2.3"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
Expand Down
78 changes: 21 additions & 57 deletions ballerina/caller.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,78 +16,42 @@

import ballerina/jballerina.java as java;

# Azure Service Bus Caller to perform functions on dispatched messages.
public class Caller {

private final LogLevel logLevel;

isolated function init(LogLevel logLevel) {
self.logLevel = logLevel;
}
# Represents a ASB caller, which can be used to mark messages as complete, abandon, deadLetter, or defer.
public isolated client class Caller {

# Complete message from queue or subscription based on messageLockToken. Declares the message processing to be
# successfully completed, removing the message from the queue.
#
# + message - Message record
#
# + return - An `asb:Error` if failed to complete message or else `()`
public isolated function complete(@display {label: "Message record"} Message message) returns Error? {
return completeMessage(self, message.lockToken, self.logLevel);
}
isolated remote function complete() returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeCaller"
} external;

# Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for
# the time being, returning the message immediately back to the queue to be picked up by another (or the same)
# receiver.
#
# + message - Message record
#
# + propertiesToModify - Message properties to modify
# + return - An `asb:Error` if failed to abandon message or else `()`
public isolated function abandon(@display {label: "Message record"} Message message) returns Error? {
return abandonMessage(self, message.lockToken, self.logLevel);
}
isolated remote function abandon(*record {|anydata...;|} propertiesToModify) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeCaller"
} external;

# Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer
# the message from the primary queue into a special "dead-letter sub-queue".
#
# + message - Message record
# + deadLetterReason - The deadletter reason.
# + deadLetterErrorDescription - The deadletter error description.
#
# + options - Options to specify while putting message in dead-letter queue
# + return - An `asb:Error` if failed to deadletter message or else `()`
public isolated function deadLetter(@display {label: "Message record"} Message message,
@display {label: "Dead letter reason (optional)"} string? deadLetterReason = (),
@display {label: "Dead letter description (optional)"}
string? deadLetterErrorDescription = ()) returns Error? {
return deadLetterMessage(self, message.lockToken, deadLetterReason,
deadLetterErrorDescription, self.logLevel);
}
isolated remote function deadLetter(*DeadLetterOptions options) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeCaller"
} external;

# Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being
# directly received from the queue by setting it aside such that it must be received by sequence number.
#
# + message - Message record
#
# + propertiesToModify - Message properties to modify
# + return - An `asb:Error` if failed to defer message or else sequence number
public isolated function defer(@display {label: "Message record"} Message message)
returns @display {label: "Sequence Number of the deferred message"} int|Error {
check deferMessage(self, message.lockToken, self.logLevel);
return <int>message.sequenceNumber;
}
isolated remote function defer(*record {|anydata...;|} propertiesToModify) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeCaller"
} external;
}

isolated function completeMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method {
name: "complete",
'class: "org.ballerinax.asb.listener.Caller"
} external;

isolated function abandonMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method {
name: "abandon",
'class: "org.ballerinax.asb.listener.Caller"
} external;

isolated function deadLetterMessage(Caller caller, string? lockToken, string? deadLetterReason,
string? deadLetterErrorDescription, string? logLevel) returns Error? = @java:Method {
name: "deadLetter",
'class: "org.ballerinax.asb.listener.Caller"
} external;

isolated function deferMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method {
name: "defer",
'class: "org.ballerinax.asb.listener.Caller"
} external;
3 changes: 3 additions & 0 deletions ballerina/errors.bal
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
# Defines the common error type for the module.
public type Error distinct error;

# Error type to capture the errors occurred while retrieving messages in Azure service bus listener.
public type MessageRetrievalError distinct (Error & error<ErrorContext>);
ayeshLK marked this conversation as resolved.
Show resolved Hide resolved

isolated function createError(string|error|Error errorOrMessage) returns Error {
if errorOrMessage is Error {
// input is a ASB 'error' value
Expand Down
157 changes: 70 additions & 87 deletions ballerina/listener.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,98 +16,81 @@

import ballerina/jballerina.java as java;

# Ballerina Azure Service Bus Message Listener.
public class Listener {

private final string connectionString;
private final handle listenerHandle;
private final LogLevel logLevel;
Caller caller;

# Gets invoked to initialize the `listener`.
# The listener initialization requires setting the credentials.
# Create an [Azure account](https://azure.microsoft.com) and obtain tokens following [this guide](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quickstart-portal).
#
# + listenerConfig - The configurations to be used when initializing the `listener`
# + return - An error if listener initialization failed
public isolated function init(*ListenerConfig listenerConfig) returns error? {
self.connectionString = listenerConfig.connectionString;
self.logLevel = customConfiguration.logLevel;
self.listenerHandle = initListener(java:fromString(self.connectionString), java:fromString(self.logLevel));
self.caller = new Caller(self.logLevel);
externalInit(self.listenerHandle, self.caller);
# Represents a ASB consumer listener.
public isolated class Listener {

# Creates a new `asb:Listener`.
# ```ballerina
# listener asb:Listener asbListener = check new (
# connectionString = "xxxxxxxx",
# entityConfig = {
# queueName: "test-queue"
# },
# autoComplete = false
# );
# ```
#
# + config - ASB listener configurations
# + return - An `asb:Error` if an error is encountered or else '()'
public isolated function init(*ListenerConfiguration config) returns Error? {
return self.externInit(config);
}

# Starts consuming the messages on all the attached services.
#
# + return - `()` or else an error upon failure to start
public isolated function 'start() returns Error? {
return 'start(self.listenerHandle, self);
}

# Attaches the service to the `asb:Listener` endpoint.
#
# + s - Type descriptor of the service
# + name - Name of the service
# + return - `()` or else an error upon failure to register the service
public isolated function attach(MessageService s, string[]|string? name = ()) returns Error? {
return attach(self.listenerHandle, self, s);
}
private isolated function externInit(ListenerConfiguration config) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Stops consuming messages and detaches the service from the `asb:Listener` endpoint.
# Attaches an `asb:Service` to a listener.
# ```ballerina
# check asbListener.attach(asbService);
# ```
#
# + 'service - The service instance
# + name - Name of the service
# + return - An `asb:Error` if there is an error or else `()`
public isolated function attach(Service 'service, string[]|string? name = ()) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Detaches an `asb:Service` from the the listener.
# ```ballerina
# check asbListener.detach(asbService);
# ```
#
# + s - Type descriptor of the service
# + return - `()` or else an error upon failure to detach the service
public isolated function detach(MessageService s) returns Error? {
return detach(self.listenerHandle, self, s);
}

# Stops consuming messages through all consumer services by terminating the connection and all its channels.
# + 'service - The service to be detached
# + return - An `asb:Error` if there is an error or else `()`
public isolated function detach(Service 'service) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Starts the `asb:Listener`.
# ```ballerina
# check asbListener.'start();
# ```
#
# + return - `()` or else an error upon failure to close the `ChannelListener`
public isolated function gracefulStop() returns Error? {
return stop(self.listenerHandle, self);
}

# Stops consuming messages through all the consumer services and terminates the connection
# with the server.
# + return - An `asb:Error` if there is an error or else `()`
public isolated function 'start() returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Stops the `asb:Listener` gracefully.
# ```ballerina
# check asbListener.gracefulStop();
# ```
#
# + return - `()` or else an error upon failure to close the `ChannelListener`.
public isolated function immediateStop() returns Error? {
return forceStop(self.listenerHandle, self);
}
# + return - An `asb:Error` if there is an error or else `()`
public isolated function gracefulStop() returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;

# Stops the `asb:Listener` immediately.
# ```ballerina
# check asbListener.immediateStop();
# ```
#
# + return - An `asb:Error` if there is an error or else `()`
public isolated function immediateStop() returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.NativeListener"
} external;
}

isolated function initListener(handle connectionString, handle logLevel)
returns handle = @java:Constructor {
'class: "org.ballerinax.asb.listener.MessageListener",
paramTypes: [
"java.lang.String",
"java.lang.String"
]
} external;

isolated function externalInit(handle listenerHandle, Caller caller) = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function 'start(handle listenerHandle, Listener lis) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function stop(handle listenerHandle, Listener lis) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function attach(handle listenerHandle, Listener lis, MessageService serviceType) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function detach(handle listenerHandle, Listener lis, MessageService serviceType) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function forceStop(handle listenerHandle, Listener lis) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

17 changes: 4 additions & 13 deletions ballerina/service_types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,9 @@
// specific language governing permissions and limitations
// under the License.

# Triggers when Choreo recieves a new message from Azure service bus. Available action: onMessage
public type MessageService service object {
# Triggers when a new message is received from Azure service bus
# + message - The Azure service bus message recieved
# + caller - The Azure service bus caller instance
# + return - Error on failure else nil()
isolated remote function onMessage(Message message, Caller caller) returns error?;
# The ASB service type.
public type Service distinct service object {
// isolated remote function onMessage(asb:Message message, asb:Caller caller) returns error?;

# Triggers when there is an error in message processing
#
# + context - Error message details
# + error - Ballerina error
# + return - Error on failure else nil()
isolated remote function onError(ErrorContext context, error 'error) returns error?;
// isolated remote function onError(asb:MessageRetrievalError 'error) returns error?;
ayeshLK marked this conversation as resolved.
Show resolved Hide resolved
};
Loading
Loading