Skip to content

Commit

Permalink
Merge pull request #200 from RDPerera/listner
Browse files Browse the repository at this point in the history
Integrate Asynchronous Listener
  • Loading branch information
ayeshLK authored Oct 23, 2023
2 parents d0b8b8d + c3eb1e2 commit 83e80d6
Show file tree
Hide file tree
Showing 17 changed files with 1,439 additions and 5 deletions.
93 changes: 93 additions & 0 deletions ballerina/caller.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/jballerina.java as java;

# Azure Service Bus Caller to perform functions on dispatched messages.
public class Caller {

private final LogLevel logLevel;

isolated function init(LogLevel logLevel) {
self.logLevel = logLevel;
}

# Complete message from queue or subscription based on messageLockToken. Declares the message processing to be
# successfully completed, removing the message from the queue.
#
# + message - Message record
# + return - An `asb:Error` if failed to complete message or else `()`
public isolated function complete(@display {label: "Message record"} Message message) returns Error? {
return completeMessage(self, message.lockToken, self.logLevel);
}

# Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for
# the time being, returning the message immediately back to the queue to be picked up by another (or the same)
# receiver.
#
# + message - Message record
# + return - An `asb:Error` if failed to abandon message or else `()`
public isolated function abandon(@display {label: "Message record"} Message message) returns Error? {
return abandonMessage(self, message.lockToken, self.logLevel);
}

# Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer
# the message from the primary queue into a special "dead-letter sub-queue".
#
# + message - Message record
# + deadLetterReason - The deadletter reason.
# + deadLetterErrorDescription - The deadletter error description.
# + return - An `asb:Error` if failed to deadletter message or else `()`
public isolated function deadLetter(@display {label: "Message record"} Message message,
@display {label: "Dead letter reason (optional)"} string? deadLetterReason = (),
@display {label: "Dead letter description (optional)"}
string? deadLetterErrorDescription = ()) returns Error? {
return deadLetterMessage(self, message.lockToken, deadLetterReason,
deadLetterErrorDescription, self.logLevel);
}

# Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being
# directly received from the queue by setting it aside such that it must be received by sequence number.
#
# + message - Message record
# + return - An `asb:Error` if failed to defer message or else sequence number
public isolated function defer(@display {label: "Message record"} Message message)
returns @display {label: "Sequence Number of the deferred message"} int|Error {
check deferMessage(self, message.lockToken, self.logLevel);
return <int>message.sequenceNumber;
}
}

isolated function completeMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method {
name: "complete",
'class: "org.ballerinax.asb.listener.Caller"
} external;

isolated function abandonMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method {
name: "abandon",
'class: "org.ballerinax.asb.listener.Caller"
} external;

isolated function deadLetterMessage(Caller caller, string? lockToken, string? deadLetterReason,
string? deadLetterErrorDescription, string? logLevel) returns Error? = @java:Method {
name: "deadLetter",
'class: "org.ballerinax.asb.listener.Caller"
} external;

isolated function deferMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method {
name: "defer",
'class: "org.ballerinax.asb.listener.Caller"
} external;
113 changes: 113 additions & 0 deletions ballerina/listener.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/jballerina.java as java;

# Ballerina Azure Service Bus Message Listener.
public class Listener {

private final string connectionString;
private final handle listenerHandle;
private final LogLevel logLevel;
Caller caller;

# Gets invoked to initialize the `listener`.
# The listener initialization requires setting the credentials.
# Create an [Azure account](https://azure.microsoft.com) and obtain tokens following [this guide](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quickstart-portal).
#
# + listenerConfig - The configurations to be used when initializing the `listener`
# + return - An error if listener initialization failed
public isolated function init(*ListenerConfig listenerConfig) returns error? {
self.connectionString = listenerConfig.connectionString;
self.logLevel = customConfiguration.logLevel;
self.listenerHandle = initListener(java:fromString(self.connectionString), java:fromString(self.logLevel));
self.caller = new Caller(self.logLevel);
externalInit(self.listenerHandle, self.caller);
}

# Starts consuming the messages on all the attached services.
#
# + return - `()` or else an error upon failure to start
public isolated function 'start() returns Error? {
return 'start(self.listenerHandle, self);
}

# Attaches the service to the `asb:Listener` endpoint.
#
# + s - Type descriptor of the service
# + name - Name of the service
# + return - `()` or else an error upon failure to register the service
public isolated function attach(MessageService s, string[]|string? name = ()) returns Error? {
return attach(self.listenerHandle, self, s);
}

# Stops consuming messages and detaches the service from the `asb:Listener` endpoint.
#
# + s - Type descriptor of the service
# + return - `()` or else an error upon failure to detach the service
public isolated function detach(MessageService s) returns Error? {
return detach(self.listenerHandle, self, s);
}

# Stops consuming messages through all consumer services by terminating the connection and all its channels.
#
# + return - `()` or else an error upon failure to close the `ChannelListener`
public isolated function gracefulStop() returns Error? {
return stop(self.listenerHandle, self);
}

# Stops consuming messages through all the consumer services and terminates the connection
# with the server.
#
# + return - `()` or else an error upon failure to close the `ChannelListener`.
public isolated function immediateStop() returns Error? {
return forceStop(self.listenerHandle, self);
}
}

isolated function initListener(handle connectionString, handle logLevel)
returns handle = @java:Constructor {
'class: "org.ballerinax.asb.listener.MessageListener",
paramTypes: [
"java.lang.String",
"java.lang.String"
]
} external;

isolated function externalInit(handle listenerHandle, Caller caller) = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function 'start(handle listenerHandle, Listener lis) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function stop(handle listenerHandle, Listener lis) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function attach(handle listenerHandle, Listener lis, MessageService serviceType) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function detach(handle listenerHandle, Listener lis, MessageService serviceType) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

isolated function forceStop(handle listenerHandle, Listener lis) returns Error? = @java:Method {
'class: "org.ballerinax.asb.listener.MessageListener"
} external;

31 changes: 31 additions & 0 deletions ballerina/service_types.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

# Triggers when Choreo recieves a new message from Azure service bus. Available action: onMessage
public type MessageService service object {
# Triggers when a new message is received from Azure service bus
# + message - The Azure service bus message recieved
# + caller - The Azure service bus caller instance
# + return - Error on failure else nil()
isolated remote function onMessage(Message message, Caller caller) returns error?;

# Triggers when there is an error in message processing
#
# + context - Error message details
# + error - Ballerina error
# + return - Error on failure else nil()
isolated remote function onError(ErrorContext context, error 'error) returns error?;
};
4 changes: 2 additions & 2 deletions ballerina/tests/asb_admin_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ function createRuleWithInclusionParameters() returns error? {
log:printInfo("[[createRuleWithInclusionParameters]]");
log:printInfo("Initializing Asb admin client.");
Administrator adminClient = check new (connectionString);
RuleProperties? ruleProp = check adminClient->createRule(testTopic4, testSubscription4, testRule4, rule=rule);
RuleProperties? ruleProp = check adminClient->createRule(testTopic4, testSubscription4, testRule4, rule = rule);
if ruleProp is RuleProperties {
log:printInfo("Rule created successfully.");
test:assertEquals(ruleProp.name, testRule4, msg = "Rule creation failed.");
Expand All @@ -981,7 +981,7 @@ function updateRuleWithInclusionParameters() returns error? {
log:printInfo("[[updateRuleWithInclusionParameters]]");
log:printInfo("Initializing Asb admin client.");
Administrator adminClient = check new (connectionString);
RuleProperties? ruleProp = check adminClient->updateRule(testTopic4, testSubscription4, testRule4, rule=rule);
RuleProperties? ruleProp = check adminClient->updateRule(testTopic4, testSubscription4, testRule4, rule = rule);
if ruleProp is RuleProperties {
log:printInfo("Rule created successfully.");
test:assertEquals(ruleProp.name, testRule4, msg = "Rule creation failed.");
Expand Down
Loading

0 comments on commit 83e80d6

Please sign in to comment.