diff --git a/ballerina/caller.bal b/ballerina/caller.bal new file mode 100644 index 00000000..67166706 --- /dev/null +++ b/ballerina/caller.bal @@ -0,0 +1,93 @@ +// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/jballerina.java as java; + +# Azure Service Bus Caller to perform functions on dispatched messages. +public class Caller { + + private final LogLevel logLevel; + + isolated function init(LogLevel logLevel) { + self.logLevel = logLevel; + } + + # Complete message from queue or subscription based on messageLockToken. Declares the message processing to be + # successfully completed, removing the message from the queue. + # + # + message - Message record + # + return - An `asb:Error` if failed to complete message or else `()` + public isolated function complete(@display {label: "Message record"} Message message) returns Error? { + return completeMessage(self, message.lockToken, self.logLevel); + } + + # Abandon message from queue or subscription based on messageLockToken. Abandon processing of the message for + # the time being, returning the message immediately back to the queue to be picked up by another (or the same) + # receiver. + # + # + message - Message record + # + return - An `asb:Error` if failed to abandon message or else `()` + public isolated function abandon(@display {label: "Message record"} Message message) returns Error? { + return abandonMessage(self, message.lockToken, self.logLevel); + } + + # Dead-Letter the message & moves the message to the Dead-Letter Queue based on messageLockToken. Transfer + # the message from the primary queue into a special "dead-letter sub-queue". + # + # + message - Message record + # + deadLetterReason - The deadletter reason. + # + deadLetterErrorDescription - The deadletter error description. + # + return - An `asb:Error` if failed to deadletter message or else `()` + public isolated function deadLetter(@display {label: "Message record"} Message message, + @display {label: "Dead letter reason (optional)"} string? deadLetterReason = (), + @display {label: "Dead letter description (optional)"} + string? deadLetterErrorDescription = ()) returns Error? { + return deadLetterMessage(self, message.lockToken, deadLetterReason, + deadLetterErrorDescription, self.logLevel); + } + + # Defer the message in a Queue or Subscription based on messageLockToken. It prevents the message from being + # directly received from the queue by setting it aside such that it must be received by sequence number. + # + # + message - Message record + # + return - An `asb:Error` if failed to defer message or else sequence number + public isolated function defer(@display {label: "Message record"} Message message) + returns @display {label: "Sequence Number of the deferred message"} int|Error { + check deferMessage(self, message.lockToken, self.logLevel); + return message.sequenceNumber; + } +} + +isolated function completeMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method { + name: "complete", + 'class: "org.ballerinax.asb.listener.Caller" +} external; + +isolated function abandonMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method { + name: "abandon", + 'class: "org.ballerinax.asb.listener.Caller" +} external; + +isolated function deadLetterMessage(Caller caller, string? lockToken, string? deadLetterReason, + string? deadLetterErrorDescription, string? logLevel) returns Error? = @java:Method { + name: "deadLetter", + 'class: "org.ballerinax.asb.listener.Caller" +} external; + +isolated function deferMessage(Caller caller, string? lockToken, string? logLevel) returns Error? = @java:Method { + name: "defer", + 'class: "org.ballerinax.asb.listener.Caller" +} external; diff --git a/ballerina/listener.bal b/ballerina/listener.bal new file mode 100644 index 00000000..f7e9a787 --- /dev/null +++ b/ballerina/listener.bal @@ -0,0 +1,113 @@ +// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/jballerina.java as java; + +# Ballerina Azure Service Bus Message Listener. +public class Listener { + + private final string connectionString; + private final handle listenerHandle; + private final LogLevel logLevel; + Caller caller; + + # Gets invoked to initialize the `listener`. + # The listener initialization requires setting the credentials. + # Create an [Azure account](https://azure.microsoft.com) and obtain tokens following [this guide](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quickstart-portal). + # + # + listenerConfig - The configurations to be used when initializing the `listener` + # + return - An error if listener initialization failed + public isolated function init(*ListenerConfig listenerConfig) returns error? { + self.connectionString = listenerConfig.connectionString; + self.logLevel = customConfiguration.logLevel; + self.listenerHandle = initListener(java:fromString(self.connectionString), java:fromString(self.logLevel)); + self.caller = new Caller(self.logLevel); + externalInit(self.listenerHandle, self.caller); + } + + # Starts consuming the messages on all the attached services. + # + # + return - `()` or else an error upon failure to start + public isolated function 'start() returns Error? { + return 'start(self.listenerHandle, self); + } + + # Attaches the service to the `asb:Listener` endpoint. + # + # + s - Type descriptor of the service + # + name - Name of the service + # + return - `()` or else an error upon failure to register the service + public isolated function attach(MessageService s, string[]|string? name = ()) returns Error? { + return attach(self.listenerHandle, self, s); + } + + # Stops consuming messages and detaches the service from the `asb:Listener` endpoint. + # + # + s - Type descriptor of the service + # + return - `()` or else an error upon failure to detach the service + public isolated function detach(MessageService s) returns Error? { + return detach(self.listenerHandle, self, s); + } + + # Stops consuming messages through all consumer services by terminating the connection and all its channels. + # + # + return - `()` or else an error upon failure to close the `ChannelListener` + public isolated function gracefulStop() returns Error? { + return stop(self.listenerHandle, self); + } + + # Stops consuming messages through all the consumer services and terminates the connection + # with the server. + # + # + return - `()` or else an error upon failure to close the `ChannelListener`. + public isolated function immediateStop() returns Error? { + return forceStop(self.listenerHandle, self); + } +} + +isolated function initListener(handle connectionString, handle logLevel) +returns handle = @java:Constructor { + 'class: "org.ballerinax.asb.listener.MessageListener", + paramTypes: [ + "java.lang.String", + "java.lang.String" + ] +} external; + +isolated function externalInit(handle listenerHandle, Caller caller) = @java:Method { + 'class: "org.ballerinax.asb.listener.MessageListener" +} external; + +isolated function 'start(handle listenerHandle, Listener lis) returns Error? = @java:Method { + 'class: "org.ballerinax.asb.listener.MessageListener" +} external; + +isolated function stop(handle listenerHandle, Listener lis) returns Error? = @java:Method { + 'class: "org.ballerinax.asb.listener.MessageListener" +} external; + +isolated function attach(handle listenerHandle, Listener lis, MessageService serviceType) returns Error? = @java:Method { + 'class: "org.ballerinax.asb.listener.MessageListener" +} external; + +isolated function detach(handle listenerHandle, Listener lis, MessageService serviceType) returns Error? = @java:Method { + 'class: "org.ballerinax.asb.listener.MessageListener" +} external; + +isolated function forceStop(handle listenerHandle, Listener lis) returns Error? = @java:Method { + 'class: "org.ballerinax.asb.listener.MessageListener" +} external; + diff --git a/ballerina/service_types.bal b/ballerina/service_types.bal new file mode 100644 index 00000000..a3b98664 --- /dev/null +++ b/ballerina/service_types.bal @@ -0,0 +1,31 @@ +// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +# Triggers when Choreo recieves a new message from Azure service bus. Available action: onMessage +public type MessageService service object { + # Triggers when a new message is received from Azure service bus + # + message - The Azure service bus message recieved + # + caller - The Azure service bus caller instance + # + return - Error on failure else nil() + isolated remote function onMessage(Message message, Caller caller) returns error?; + + # Triggers when there is an error in message processing + # + # + context - Error message details + # + error - Ballerina error + # + return - Error on failure else nil() + isolated remote function onError(ErrorContext context, error 'error) returns error?; +}; diff --git a/ballerina/tests/asb_admin_tests.bal b/ballerina/tests/asb_admin_tests.bal index ed1ca779..b4bde1ea 100644 --- a/ballerina/tests/asb_admin_tests.bal +++ b/ballerina/tests/asb_admin_tests.bal @@ -962,7 +962,7 @@ function createRuleWithInclusionParameters() returns error? { log:printInfo("[[createRuleWithInclusionParameters]]"); log:printInfo("Initializing Asb admin client."); Administrator adminClient = check new (connectionString); - RuleProperties? ruleProp = check adminClient->createRule(testTopic4, testSubscription4, testRule4, rule=rule); + RuleProperties? ruleProp = check adminClient->createRule(testTopic4, testSubscription4, testRule4, rule = rule); if ruleProp is RuleProperties { log:printInfo("Rule created successfully."); test:assertEquals(ruleProp.name, testRule4, msg = "Rule creation failed."); @@ -981,7 +981,7 @@ function updateRuleWithInclusionParameters() returns error? { log:printInfo("[[updateRuleWithInclusionParameters]]"); log:printInfo("Initializing Asb admin client."); Administrator adminClient = check new (connectionString); - RuleProperties? ruleProp = check adminClient->updateRule(testTopic4, testSubscription4, testRule4, rule=rule); + RuleProperties? ruleProp = check adminClient->updateRule(testTopic4, testSubscription4, testRule4, rule = rule); if ruleProp is RuleProperties { log:printInfo("Rule created successfully."); test:assertEquals(ruleProp.name, testRule4, msg = "Rule creation failed."); diff --git a/ballerina/tests/asb_listner_test.bal b/ballerina/tests/asb_listner_test.bal new file mode 100644 index 00000000..281f3cf6 --- /dev/null +++ b/ballerina/tests/asb_listner_test.bal @@ -0,0 +1,205 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/lang.runtime; +import ballerina/log; +import ballerina/test; + +// Connection Configurations +string lisnterTestQueueName = "pre-created-test-queue"; + +ListenerConfig configuration = { + connectionString: connectionString +}; + +ASBServiceSenderConfig listnerTestSenderConfig = { + connectionString: connectionString, + entityType: QUEUE, + topicOrQueueName: lisnterTestQueueName +}; + +listener Listener asbListener = new (configuration); + +enum State { + NONE, + RECIEVED, + RECIEVED_AND_COMPLETED, + RECIEVED_AND_ABANDON, + RECIEVED_AND_DLQ, + RECIEVED_AND_DEFER +} + +isolated State listnerState = NONE; + +isolated function getListnerState() returns State +{ + lock { + return listnerState; + } +} + +isolated function setListnerState(State state) +{ + lock { + listnerState = state; + } +} + +isolated int testCaseCounter = 0; + +isolated function getTestCaseCount() returns int +{ + lock { + return testCaseCounter; + } +} + +isolated function increamentTestCaseCount() +{ + lock { + testCaseCounter += 1; + } +} + +@ServiceConfig { + queueName: "pre-created-test-queue", + peekLockModeEnabled: true, + maxConcurrency: 1, + prefetchCount: 20, + maxAutoLockRenewDuration: 300 +} +service MessageService on asbListener { + isolated remote function onMessage(Message message, Caller caller) returns error? { + log:printInfo("Message received:" + message.body.toString()); + if message.body == "This is ASB connector test-Message Body".toBytes() { + if getTestCaseCount() == 0 { + setListnerState(RECIEVED); + } + else if getTestCaseCount() == 1 && caller.complete(message) == () { + setListnerState(RECIEVED_AND_COMPLETED); + } + else if getTestCaseCount() == 2 && caller.defer(message) is int { + setListnerState(RECIEVED_AND_DEFER); + } + else if getTestCaseCount() == 3 && caller.deadLetter(message, "Testing Purpose", "Manual DLQ : Testing Purpose") == () { + setListnerState(RECIEVED_AND_DLQ); + } + else if getTestCaseCount() == 4 && caller.abandon(message) == () { + setListnerState(RECIEVED_AND_ABANDON); + } + increamentTestCaseCount(); + } + return; + } + isolated remote function onError(ErrorContext context, error 'error) returns error? { + return; + } +}; + +function sendMessage() returns error? { + MessageSender queueSender = check new (listnerTestSenderConfig); + check queueSender->send(message); + check queueSender->close(); +} + +@test:Config { + enable: true, + groups: ["asb_listner"] +} +function testListnerReceive() returns error? { + log:printInfo("[[testListnerReceive]]"); + log:printInfo("Sending via Asb sender client."); + check sendMessage(); + int counter = 10; + while (getListnerState() != RECIEVED && counter >= 0) { + runtime:sleep(1); + log:printInfo("Waiting for the message to be received"); + counter -= 1; + } + test:assertTrue(getListnerState() == RECIEVED, msg = "ASB listener did not receive the message"); +} + +@test:Config { + enable: true, + groups: ["asb_listner"], + dependsOn: [testListnerReceive] +} +function testListnerReceiveAndCompleted() returns error? { + log:printInfo("[[testListnerReceiveAndCompleted]]"); + log:printInfo("Sending via Asb sender client."); + check sendMessage(); + int counter = 10; + while (getListnerState() != RECIEVED_AND_COMPLETED && counter >= 0) { + runtime:sleep(1); + log:printInfo("Waiting for the message to be received"); + counter -= 1; + } + test:assertTrue(getListnerState() == RECIEVED_AND_COMPLETED, msg = "ASB listener did not receive and completed the message"); +} + +@test:Config { + enable: true, + groups: ["asb_listner"], + dependsOn: [testListnerReceiveAndCompleted] +} +function testListnerReceiveAndDefer() returns error? { + log:printInfo("[[testListnerReceiveAndDefer]]"); + log:printInfo("Sending via Asb sender client."); + check sendMessage(); + int counter = 10; + while (getListnerState() != RECIEVED_AND_DEFER && counter >= 0) { + runtime:sleep(1); + log:printInfo("Waiting for the message to be received"); + counter -= 1; + } + test:assertTrue(getListnerState() == RECIEVED_AND_DEFER, msg = "ASB listener did not receive and defer the message"); +} + +@test:Config { + enable: true, + groups: ["asb_listner"], + dependsOn: [testListnerReceiveAndDefer] +} +function testListnerReceiveAndDLQ() returns error? { + log:printInfo("[[testListnerReceiveAndDLQ]]"); + log:printInfo("Sending via Asb sender client."); + check sendMessage(); + int counter = 10; + while (getListnerState() != RECIEVED_AND_DLQ && counter >= 0) { + runtime:sleep(1); + log:printInfo("Waiting for the message to be received"); + counter -= 1; + } + test:assertTrue(getListnerState() == RECIEVED_AND_DLQ, msg = "ASB listener did not receive and DLQ the message"); +} + +@test:Config { + enable: true, + groups: ["asb_listner"], + dependsOn: [testListnerReceiveAndDLQ] +} +function testListnerReceiveAndAbandon() returns error? { + log:printInfo("[[testListnerReceiveAndAbandon]]"); + log:printInfo("Sending via Asb sender client."); + check sendMessage(); + int counter = 10; + while (getListnerState() != RECIEVED_AND_ABANDON && counter >= 0) { + runtime:sleep(1); + log:printInfo("Waiting for the message to be received"); + counter -= 1; + } + test:assertTrue(getListnerState() == RECIEVED_AND_ABANDON, msg = "ASB listener did not receive and abandon the message"); +} diff --git a/ballerina/tests/asb_sender_receiver_negative_tests.bal b/ballerina/tests/asb_sender_receiver_negative_tests.bal index 59b34282..385cb5b1 100644 --- a/ballerina/tests/asb_sender_receiver_negative_tests.bal +++ b/ballerina/tests/asb_sender_receiver_negative_tests.bal @@ -22,6 +22,7 @@ string invalidCompleteError = "^Failed to complete message with ID:.*$"; string invalidAbandonError = "^Failed to abandon message with ID:.*$"; @test:Config { groups: ["asb_sender_receiver_negative"], + enable: true, dependsOn: [testCreateQueue, testCreateTopicOperation, testCreateSubscription] } function testReceivePayloadWithIncorrectExpectedType() returns error? { @@ -53,6 +54,7 @@ function testReceivePayloadWithIncorrectExpectedType() returns error? { @test:Config { groups: ["asb_sender_receiver_negative"], + enable: true, dependsOn: [testReceivePayloadWithIncorrectExpectedType] } function testInvalidComplete() returns error? { @@ -163,6 +165,7 @@ function testReceivePayloadWithUnsupportedUnionExpectedType() returns error? { @test:Config { groups: ["asb_sender_receiver_negative"], + enable: true, dependsOn: [testReceivePayloadWithUnsupportedUnionExpectedType] } function testSendToInvalidTopic() returns error? { @@ -182,6 +185,7 @@ function testSendToInvalidTopic() returns error? { @test:Config { groups: ["asb_sender_receiver_negative"], + enable: true, dependsOn: [testSendToInvalidTopic] } function testReceiveFromInvalidQueue() returns error? { @@ -201,6 +205,7 @@ function testReceiveFromInvalidQueue() returns error? { @test:Config { groups: ["asb_sender_receiver_negative"], + enable: true, dependsOn: [testReceiveFromInvalidQueue] } function testInvalidConnectionString() returns error? { diff --git a/ballerina/tests/asb_sender_receiver_tests.bal b/ballerina/tests/asb_sender_receiver_tests.bal index 88ec796e..26a10a9a 100644 --- a/ballerina/tests/asb_sender_receiver_tests.bal +++ b/ballerina/tests/asb_sender_receiver_tests.bal @@ -45,6 +45,13 @@ ApplicationProperties applicationProperties = { properties: properties }; +Message message = { + body: byteContent, + contentType: TEXT, + timeToLive: timeToLive, + applicationProperties: applicationProperties +}; + Message message1 = { body: byteContent, contentType: TEXT, @@ -924,7 +931,7 @@ function testDeferMessageFromSubscriptionOperation() returns error? { function testMessageScheduling() returns error? { log:printInfo("[[testMessageScheduling]]"); MessageSender? topicSender = (); - MessageReceiver? subscriptionReceiver= (); + MessageReceiver? subscriptionReceiver = (); do { diff --git a/ballerina/types.bal b/ballerina/types.bal index abc10d6d..db8cedfb 100644 --- a/ballerina/types.bal +++ b/ballerina/types.bal @@ -844,3 +844,70 @@ public enum EntityStatus { @display {label: "UNKNOWN"} UNKNOWN = "Unknown" }; + +# Azure service bus listener configuration. +# +public type ListenerConfig record { + # The connection string of Azure service bus + @display {label: "ASB Connection String"} + string connectionString; +}; + +# Represents Custom configurations for the ASB connector +# +# + logLevel - Enables the connector debug log prints (log4j log levels), default: OFF +public type Options record { + @display {label: "Log Level"} + LogLevel logLevel = OFF; +}; + +# ErrorContext is a record type that represents error context information +# +# + entityPath - The entity path of the error source +# + className - The name of the class that threw the error +# + namespace - The namespace of the error source +# + errorSource - The error source, such as a function or action name +# + reason - The error reason +public type ErrorContext record { + @display {label: "Entity Path"} + string entityPath; + @display {label: "Class Name"} + string className; + @display {label: "Namespace"} + string namespace; + @display {label: "Error Source"} + string errorSource; + @display {label: "Reason"} + string reason; +}; + +# Configurations for the ASB Service +# +# + queueName - The name of the queue to listen to +# + peekLockModeEnabled - Whether to use peekLock mode or not +# + topicName - The name of the topic to listen to +# + subscriptionName - The name of the subscription to listen to +# + maxConcurrency - The maximum number of concurrent messages to process +# + prefetchCount - The number of messages to prefetch +# + maxAutoLockRenewDuration - The maximum duration to renew the lock automatically +# + logLevel - The log level to use +public type ASBServiceConfig record {| + @display {label: "Queue Name"} + string queueName?; + @display {label: "Peek Lock Mode Enabled"} + boolean peekLockModeEnabled = false; + @display {label: "Topic Name"} + string topicName?; + @display {label: "Subscription Name"} + string subscriptionName?; + @display {label: "Max Concurrency"} + int maxConcurrency = 1; + @display {label: "Prefetch Count"} + int prefetchCount = 0; + @display {label: "Max Auto Lock Renew Duration"} + int maxAutoLockRenewDuration = 300; + @display {label: "Log Level"} + string logLevel = ERROR; +|}; + +public annotation ASBServiceConfig ServiceConfig on service, class; diff --git a/build-config/spotbugs-exclude.xml b/build-config/spotbugs-exclude.xml index cf3be97a..a5ff3711 100644 --- a/build-config/spotbugs-exclude.xml +++ b/build-config/spotbugs-exclude.xml @@ -16,4 +16,8 @@ ~ under the License. --> + + + + diff --git a/examples/listner/create_listner.bal b/examples/listner/create_listner.bal new file mode 100644 index 00000000..2021bee9 --- /dev/null +++ b/examples/listner/create_listner.bal @@ -0,0 +1,45 @@ +// Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/log; +import ballerinax/asb; + +configurable string connectionString = ?; + +// Listner Configurations +asb:ListenerConfig configuration = { + connectionString: connectionString +}; + +listener asb:Listener asbListener = new (configuration); + +@asb:ServiceConfig { + queueName: "test-queue", + peekLockModeEnabled: true, + maxConcurrency: 1, + prefetchCount: 10, + maxAutoLockRenewDuration: 300 +} +service asb:MessageService on asbListener { + isolated remote function onMessage(asb:Message message, asb:Caller caller) returns asb:Error? { + log:printInfo("Message received from queue: " + message.toBalString()); + _ = check caller.complete(message); + } + + isolated remote function onError(asb:ErrorContext context, error 'error) returns asb:Error? { + log:printInfo("Error received from queue: " + context.toBalString()); + } +}; diff --git a/native/src/main/java/org/ballerinax/asb/listener/ASBResourceCallback.java b/native/src/main/java/org/ballerinax/asb/listener/ASBResourceCallback.java new file mode 100644 index 00000000..318d2452 --- /dev/null +++ b/native/src/main/java/org/ballerinax/asb/listener/ASBResourceCallback.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinax.asb.listener; + +import io.ballerina.runtime.api.async.Callback; +import io.ballerina.runtime.api.values.BError; + +import java.util.concurrent.CountDownLatch; + +/** + * Handles the Azure service bus resource callback. + */ +public class ASBResourceCallback implements Callback { + + private final CountDownLatch countDownLatch; + + ASBResourceCallback(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void notifySuccess(Object obj) { + if (obj instanceof BError) { + ((BError) obj).printStackTrace(); + } + countDownLatch.countDown(); + } + + @Override + public void notifyFailure(BError error) { + error.printStackTrace(); + countDownLatch.countDown(); + } +} diff --git a/native/src/main/java/org/ballerinax/asb/listener/Caller.java b/native/src/main/java/org/ballerinax/asb/listener/Caller.java new file mode 100644 index 00000000..029ca002 --- /dev/null +++ b/native/src/main/java/org/ballerinax/asb/listener/Caller.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinax.asb.listener; + +import com.azure.messaging.servicebus.ServiceBusException; +import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; +import com.azure.messaging.servicebus.models.DeadLetterOptions; +import io.ballerina.runtime.api.values.BObject; +import org.ballerinax.asb.util.ASBUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + + +/** + * Perform operations on dispatched messages. + */ +public class Caller { + + private static final Logger LOGGER = LoggerFactory.getLogger(MessageListener.class); + + /** + * Complete Messages from Queue or Subscription based on messageLockToken. + * + * @param caller Ballerina Caller object. + * @param lockToken Message lock token. + * @return InterruptedException or ServiceBusException on failure to complete + * the message. + */ + public static Object complete(BObject caller, Object lockToken, Object logLevel) { + try { + ServiceBusReceivedMessageContext context = (ServiceBusReceivedMessageContext) caller + .getNativeData(lockToken.toString()); + context.complete(); + LOGGER.debug("Completing the message(Message Id: " + context.getMessage().getMessageId() + + ") using its lock token from " + lockToken.toString()); + return null; + } catch (ServiceBusException e) { + return ASBUtils.createErrorValue("Exception while completing message" + e.getMessage()); + } + } + + /** + * Abandon message & make available again for processing from Queue or + * Subscription based on messageLockToken. + * + * @param caller Ballerina Caller object. + * @param lockToken Message lock token. + * @return InterruptedException or ServiceBusException on failure to abandon the + * message. + */ + public static Object abandon(BObject caller, Object lockToken, Object logLevel) { + try { + ServiceBusReceivedMessageContext context = (ServiceBusReceivedMessageContext) caller + .getNativeData(lockToken.toString()); + context.abandon(); + LOGGER.debug("Abandoned the message(Message Id: " + context.getMessage().getMessageId() + + ") using its lock token from " + lockToken.toString()); + return null; + } catch (ServiceBusException e) { + return ASBUtils.createErrorValue("Exception while abandon message" + e.getMessage()); + } + } + + /** + * Dead-Letter the message & moves the message to the Dead-Letter Queue based on + * messageLockToken. + * + * @param caller Ballerina Caller object. + * @param lockToken Message lock token. + * @param deadLetterReason The dead letter reason. + * @param deadLetterErrorDescription The dead letter error description. + * @return InterruptedException or ServiceBusException on failure to dead letter + * the message. + */ + public static Object deadLetter(BObject caller, Object lockToken, Object deadLetterReason, + Object deadLetterErrorDescription, Object logLevel) { + try { + ServiceBusReceivedMessageContext context = (ServiceBusReceivedMessageContext) caller + .getNativeData(lockToken.toString()); + DeadLetterOptions options = new DeadLetterOptions() + .setDeadLetterErrorDescription(deadLetterErrorDescription.toString()); + options.setDeadLetterReason(deadLetterReason.toString()); + context.deadLetter(options); + LOGGER.debug("Done deadLetter the message(Message Id: " + context.getMessage().getMessageId() + + ") using its lock token from " + lockToken.toString()); + return null; + } catch (ServiceBusException e) { + return ASBUtils.createErrorValue("Exception while dead lettering message" + e.getMessage()); + } + } + + /** + * Defer the message in a Queue or Subscription based on messageLockToken. + * + * @param caller Ballerina Caller object. + * @param lockToken Message lock token. + * @return InterruptedException or ServiceBusException on failure to defer the + * message. + */ + public static Object defer(BObject caller, Object lockToken, Object logLevel) { + try { + ServiceBusReceivedMessageContext context = (ServiceBusReceivedMessageContext) caller + .getNativeData(lockToken.toString()); + context.defer(); + LOGGER.debug("Deferred the message(Message Id: " + context.getMessage().getMessageId() + + ") using its lock token from " + lockToken.toString()); + return null; + } catch (ServiceBusException e) { + return ASBUtils.createErrorValue("Exception while deferring message" + e.getMessage()); + } + } +} diff --git a/native/src/main/java/org/ballerinax/asb/listener/MessageDispatcher.java b/native/src/main/java/org/ballerinax/asb/listener/MessageDispatcher.java new file mode 100644 index 00000000..a7028c9a --- /dev/null +++ b/native/src/main/java/org/ballerinax/asb/listener/MessageDispatcher.java @@ -0,0 +1,375 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinax.asb.listener; + +import com.azure.core.amqp.models.AmqpAnnotatedMessage; +import com.azure.core.amqp.models.AmqpMessageBodyType; +import com.azure.messaging.servicebus.ServiceBusClientBuilder; +import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusProcessorClientBuilder; +import com.azure.messaging.servicebus.ServiceBusErrorContext; +import com.azure.messaging.servicebus.ServiceBusException; +import com.azure.messaging.servicebus.ServiceBusFailureReason; +import com.azure.messaging.servicebus.ServiceBusProcessorClient; +import com.azure.messaging.servicebus.ServiceBusReceivedMessage; +import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; +import io.ballerina.runtime.api.PredefinedTypes; +import io.ballerina.runtime.api.Runtime; +import io.ballerina.runtime.api.async.Callback; +import io.ballerina.runtime.api.async.StrandMetadata; +import io.ballerina.runtime.api.creators.ValueCreator; +import io.ballerina.runtime.api.types.MethodType; +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; +import org.ballerinax.asb.util.ASBConstants; +import org.ballerinax.asb.util.ASBUtils; +import org.ballerinax.asb.util.ModuleUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +/** + * Creates underlying listener and dispatches messages with data binding. + */ +public class MessageDispatcher { + + private static final Logger LOGGER = LoggerFactory.getLogger(MessageDispatcher.class); + + private Runtime runtime; + private BObject service; + private BObject caller; + private ServiceBusProcessorClient messageProcessor; + private boolean isStarted = false; + + /** + * Initializes the Message Dispatcher. + * + * @param service Ballerina service instance + * @param runtime Ballerina runtime instance + * @param sharedClientBuilder ASB message builder instance common to the + * listener + * @throws IllegalStateException If input values are wrong + * @throws IllegalArgumentException If queueName/topicname not set + * @throws NullPointerException If callbacks are not set + */ + MessageDispatcher(Runtime runtime, BObject service, + BObject caller, ServiceBusClientBuilder sharedClientBuilder) { + + this.runtime = runtime; + this.service = service; + this.caller = caller; + this.messageProcessor = createMessageProcessor(sharedClientBuilder); + LOGGER.debug("ServiceBusMessageDispatcher initialized"); + } + + private ServiceBusProcessorClient createMessageProcessor(ServiceBusClientBuilder sharedClientBuilder) { + String queueName = ASBUtils.getServiceConfigStringValue(service, ASBConstants.QUEUE_NAME_CONFIG_KEY); + String topicName = ASBUtils.getServiceConfigStringValue(service, ASBConstants.TOPIC_NAME_CONFIG_KEY); + String subscriptionName = ASBUtils.getServiceConfigStringValue(service, + ASBConstants.SUBSCRIPTION_NAME_CONFIG_KEY); + boolean isPeekLockModeEnabled = ASBUtils.isPeekLockModeEnabled(service); + int maxConcurrentCalls = ASBUtils.getServiceConfigSNumericValue(service, + ASBConstants.MAX_CONCURRENCY_CONFIG_KEY, ASBConstants.MAX_CONCURRENCY_DEFAULT); + int prefetchCount = ASBUtils.getServiceConfigSNumericValue(service, ASBConstants.MSG_PREFETCH_COUNT_CONFIG_KEY, + ASBConstants.MSG_PREFETCH_COUNT_DEFAULT); + int maxAutoLockRenewDuration = ASBUtils.getServiceConfigSNumericValue(service, + ASBConstants.LOCK_RENEW_DURATION_CONFIG_KEY, ASBConstants.LOCK_RENEW_DURATION_DEFAULT); + + LOGGER.debug( + "Initializing message listener with PeekLockModeEnabled = " + isPeekLockModeEnabled + + ", maxConcurrentCalls- " + + maxConcurrentCalls + ", prefetchCount - " + prefetchCount + + ", maxAutoLockRenewDuration(seconds) - " + maxAutoLockRenewDuration); + // create processor client using sharedClientBuilder attahed to the listener + ServiceBusProcessorClientBuilder clientBuilder = sharedClientBuilder.processor() + .maxConcurrentCalls(maxConcurrentCalls) + .disableAutoComplete() + .prefetchCount(prefetchCount); + if (!queueName.isEmpty()) { + if (isPeekLockModeEnabled) { + clientBuilder + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) + .queueName(queueName) + .maxAutoLockRenewDuration(Duration.ofSeconds(maxAutoLockRenewDuration)); + } else { + clientBuilder + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) + .queueName(queueName); + } + } else if (!subscriptionName.isEmpty() && !topicName.isEmpty()) { + if (isPeekLockModeEnabled) { + clientBuilder + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) + .topicName(topicName) + .subscriptionName(subscriptionName) + .maxAutoLockRenewDuration(Duration.ofSeconds(maxAutoLockRenewDuration)); + } else { + clientBuilder + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) + .topicName(topicName) + .subscriptionName(subscriptionName); + } + } + ServiceBusProcessorClient processorClient = clientBuilder.processMessage(t -> { + try { + this.processMessage(t); + } catch (Exception e) { + LOGGER.error("Exception occurred when processing the message", e); + } + }).processError(context -> { + try { + processError(context); + } catch (Exception e) { + LOGGER.error("Exception while processing the error found when processing the message", e); + } + }).buildProcessorClient(); + + return processorClient; + + } + + /** + * Starts receiving messages asynchronously and dispatch the messages to the + * attached service. + */ + public void startListeningAndDispatching() { + + this.messageProcessor.start(); + isStarted = true; + LOGGER.debug("[Message Dispatcher]Receiving started, identifier: " + messageProcessor.getIdentifier()); + + } + + /** + * Stops receiving messages and close the undlying ASB listener. + */ + public void stopListeningAndDispatching() { + this.messageProcessor.stop(); + LOGGER.debug("[Message Dispatcher]Receiving stopped, identifier: " + messageProcessor.getIdentifier()); + } + + /** + * Gets undeling ASB message listener instance. + * + * @return ServiceBusProcessorClient instance + */ + public ServiceBusProcessorClient getProcessorClient() { + return this.messageProcessor; + } + + /** + * Checks if dispatcher is running. + * + * @return true if dispatcher is listenering for messages + */ + public boolean isRunning() { + return isStarted; + } + + /** + * Handles the dispatching of message to the service. + * + * @param context ServiceBusReceivedMessageContext containing the ASB message + */ + private void processMessage(ServiceBusReceivedMessageContext context) throws InterruptedException { + MethodType method = this.getFunction(0, ASBConstants.FUNC_ON_MESSAGE); + if (method == null) { + return; + } + this.caller.addNativeData(context.getMessage().getLockToken(), context); + dispatchMessage(context.getMessage()); + } + + private MethodType getFunction(int index, String functionName) { + MethodType[] attachedFunctions = service.getType().getMethods(); + MethodType onMessageFunction = null; + if (functionName.equals(attachedFunctions[index].getName())) { + onMessageFunction = attachedFunctions[0]; + } + return onMessageFunction; + } + + /** + * Handles and dispatches errors occured when receiving messages to the attahed + * service. + * + * @param context ServiceBusErrorContext related to the ASB error + * @throws IOException + */ + private void processError(ServiceBusErrorContext context) throws InterruptedException { + MethodType method = this.getFunction(1, ASBConstants.FUNC_ON_ERROR); + if (method == null) { + return; + } + Throwable throwable = context.getException(); + ServiceBusException exception = (throwable instanceof ServiceBusException) ? (ServiceBusException) throwable : + new ServiceBusException(throwable, context.getErrorSource()); + ServiceBusFailureReason reason = exception.getReason(); + if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED + || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND + || reason == ServiceBusFailureReason.UNAUTHORIZED) { + LOGGER.error("An error occurred when processing with reason: " + reason + " message: " + + exception.getMessage()); + } + + Exception e = (throwable instanceof Exception) ? (Exception) throwable : new Exception(throwable); + BError error = ASBUtils.createErrorValue(e.getClass().getTypeName(), e); + BMap errorDetailBMap = getErrorMessage(context); + CountDownLatch countDownLatch = new CountDownLatch(1); + Callback callback = new ASBResourceCallback(countDownLatch); + executeResourceOnError(callback, errorDetailBMap, true, error, true); + countDownLatch.await(); + } + + /** + * Dispatches message to the service. + * + * @param message Received azure service bus message instance. + * @throws InterruptedException + */ + private void dispatchMessage(ServiceBusReceivedMessage message) throws InterruptedException { + BMap messageBObject = null; + CountDownLatch countDownLatch = new CountDownLatch(1); + Callback callback = new ASBResourceCallback(countDownLatch); + messageBObject = getReceivedMessage(message); + executeResourceOnMessage(callback, messageBObject, true, this.caller, true); + countDownLatch.await(); + } + + /** + * Prepares the message body content. + * + * @param receivedMessage ASB received message + * @return Object containing message data + */ + private Object getMessageContent(ServiceBusReceivedMessage receivedMessage) { + AmqpAnnotatedMessage rawAmqpMessage = receivedMessage.getRawAmqpMessage(); + AmqpMessageBodyType bodyType = rawAmqpMessage.getBody().getBodyType(); + switch (bodyType) { + case DATA: + return rawAmqpMessage.getBody().getFirstData(); + case VALUE: + Object amqpValue = rawAmqpMessage.getBody().getValue(); + LOGGER.debug("Received a message with messageId: " + receivedMessage.getMessageId() + + " AMQPMessageBodyType: {}" + + bodyType); + amqpValue = ASBUtils.convertAMQPToJava(receivedMessage.getMessageId(), amqpValue); + return amqpValue; + default: + throw new RuntimeException("Invalid message body type: " + receivedMessage.getMessageId()); + } + } + + /** + * Constructs Ballerina representaion of ASB message. + * + * @param receivedMessage Received ASB message + * @return BMap representing Ballerina record + */ + private BMap getReceivedMessage(ServiceBusReceivedMessage receivedMessage) { + Map map = new HashMap<>(); + Object body = getMessageContent(receivedMessage); + if (body instanceof byte[]) { + byte[] bodyA = (byte[]) body; + map.put("body", ValueCreator.createArrayValue(bodyA)); + } else { + map.put("body", body); + } + if (receivedMessage.getContentType() != null) { + map.put("contentType", StringUtils.fromString(receivedMessage.getContentType())); + } + map.put("messageId", StringUtils.fromString(receivedMessage.getMessageId())); + map.put("to", StringUtils.fromString(receivedMessage.getTo())); + map.put("replyTo", StringUtils.fromString(receivedMessage.getReplyTo())); + map.put("replyToSessionId", StringUtils.fromString(receivedMessage.getReplyToSessionId())); + map.put("label", StringUtils.fromString(receivedMessage.getSubject())); + map.put("sessionId", StringUtils.fromString(receivedMessage.getSessionId())); + map.put("correlationId", StringUtils.fromString(receivedMessage.getCorrelationId())); + map.put("partitionKey", StringUtils.fromString(receivedMessage.getPartitionKey())); + map.put("timeToLive", (int) receivedMessage.getTimeToLive().getSeconds()); + map.put("sequenceNumber", (int) receivedMessage.getSequenceNumber()); + map.put("lockToken", StringUtils.fromString(receivedMessage.getLockToken())); + map.put("deliveryCount", (int) receivedMessage.getDeliveryCount()); + map.put("enqueuedTime", StringUtils.fromString(receivedMessage.getEnqueuedTime().toString())); + map.put("enqueuedSequenceNumber", (int) receivedMessage.getEnqueuedSequenceNumber()); + map.put("deadLetterErrorDescription", StringUtils.fromString(receivedMessage.getDeadLetterErrorDescription())); + map.put("deadLetterReason", StringUtils.fromString(receivedMessage.getDeadLetterReason())); + map.put("deadLetterSource", StringUtils.fromString(receivedMessage.getDeadLetterSource())); + map.put("state", StringUtils.fromString(receivedMessage.getState().toString())); + BMap applicationProperties = ValueCreator.createRecordValue(ModuleUtils.getModule(), + ASBConstants.APPLICATION_PROPERTIES); + Object appProperties = ASBUtils.toBMap(receivedMessage.getApplicationProperties()); + map.put("applicationProperties", ValueCreator.createRecordValue(applicationProperties, appProperties)); + BMap createRecordValue = ValueCreator.createRecordValue(ModuleUtils.getModule(), + ASBConstants.MESSAGE_RECORD, map); + return createRecordValue; + } + + /** + * Constructs Ballerina representation of ASB error. + * + * @param context ServiceBusErrorContext containing error detail + * @return BMap representing Ballerina record + */ + private BMap getErrorMessage(ServiceBusErrorContext context) { + Map map = new HashMap<>(); + map.put("entityPath", StringUtils.fromString(context.getEntityPath())); + map.put("className", StringUtils.fromString(context.getClass().getSimpleName())); + map.put("namespace", StringUtils.fromString(context.getFullyQualifiedNamespace())); + map.put("errorSource", StringUtils.fromString(context.getErrorSource().toString())); + Throwable throwable = context.getException(); + ServiceBusException exception = (throwable instanceof ServiceBusException) ? (ServiceBusException) throwable : + new ServiceBusException(throwable, context.getErrorSource()); + ServiceBusFailureReason reason = exception.getReason(); + map.put("reason", StringUtils.fromString(reason.toString())); + BMap createRecordValue = ValueCreator.createRecordValue(ModuleUtils.getModule(), + "ErrorContext", map); + return createRecordValue; + } + + private void executeResourceOnMessage(Callback callback, Object... args) { + StrandMetadata metaData = new StrandMetadata(ModuleUtils.getModule().getOrg(), + ModuleUtils.getModule().getName(), ModuleUtils.getModule().getMajorVersion(), + ASBConstants.FUNC_ON_MESSAGE); + executeResource(ASBConstants.FUNC_ON_MESSAGE, callback, metaData, args); + } + + private void executeResourceOnError(Callback callback, Object... args) { + StrandMetadata metaData = new StrandMetadata(ModuleUtils.getModule().getOrg(), + ModuleUtils.getModule().getName(), ModuleUtils.getModule().getMajorVersion(), + ASBConstants.FUNC_ON_ERROR); + executeResource(ASBConstants.FUNC_ON_ERROR, callback, metaData, args); + } + + private void executeResource(String function, Callback callback, StrandMetadata metaData, + Object... args) { + runtime.invokeMethodAsyncSequentially(service, function, null, metaData, callback, null, + PredefinedTypes.TYPE_NULL, args); + } +} + + diff --git a/native/src/main/java/org/ballerinax/asb/listener/MessageListener.java b/native/src/main/java/org/ballerinax/asb/listener/MessageListener.java new file mode 100644 index 00000000..8946a58a --- /dev/null +++ b/native/src/main/java/org/ballerinax/asb/listener/MessageListener.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinax.asb.listener; + +import com.azure.messaging.servicebus.ServiceBusClientBuilder; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Runtime; +import io.ballerina.runtime.api.values.BObject; +import org.ballerinax.asb.util.ASBUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * ASB message listener representation binding it to a Ballerina service. + */ +public class MessageListener { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageListener.class); + + private Runtime runtime; + private ArrayList services = new ArrayList<>(); + private Map dispatcherSet = new HashMap(); + private BObject caller; + private ServiceBusClientBuilder sharedConnectionBuilder; + private boolean started = false; + + /** + * Initializes Azure Service Bus listener. This creates a connection to the pointed + * Azure Service Bus. Actual listeners will get created with the information + * of attached services. + * + * @param connectionString Azure service bus connection string. + */ + public MessageListener(String connectionString, String logLevel) { + this.sharedConnectionBuilder = new ServiceBusClientBuilder().connectionString(connectionString); + LOGGER.debug("ServiceBusListnerClient initialized"); + } + + /** + * Attaches Caller object to the listener. + * + * @param caller object represeting Ballerina Caller + */ + public void externalInit(BObject caller) { + this.caller = caller; + } + + /** + * Attaches the service to the ASB listener endpoint. Here, a new ASB message processor client + * is created internally with the message dispatcher, but not started. + * + * @param environment Ballerina runtime + * @param listenerBObject Ballerina listener object + * @param service Ballerina service instance + * @return An error if failed to create IMessageReceiver connection instance + */ + public Object attach(Environment environment, BObject listenerBObject, BObject service) { + try { + runtime = environment.getRuntime(); + if (service == null) { + throw new IllegalArgumentException("Service object is null. Cannot attach to the listener"); + } + if (!services.contains(service)) { + // We only create the dispatcher object here, but not start + MessageDispatcher msgDispatcher = new MessageDispatcher(runtime, service, caller, + sharedConnectionBuilder); + services.add(service); + dispatcherSet.put(service, msgDispatcher); + } else { + throw new IllegalStateException("Service already attached."); + } + return null; + } catch (IllegalStateException | IllegalArgumentException ex) { + return ASBUtils.createErrorValue("Error when attaching service to the listener and stating processor.", ex); + } catch (Exception ex) { + return ASBUtils.createErrorValue("An unexpected error occurred.", ex); + } + } + + /** + * Starts consuming the messages on all the attached + * services if not already started. + * + * @param listenerBObject Ballerina listener object + * @return An error if failed to start the listener + */ + public Object start(BObject listenerBObject) { + if (services.isEmpty()) { + return ASBUtils.createErrorValue("No attached services found"); + } + for (BObject service : services) { + try { + startMessageDispatch(service); + } catch (Exception e) { + return ASBUtils.createErrorValue("Error while starting message listening for service = ", e); + } + } + started = true; + return null; + } + + /** + * Stops consuming messages and detaches the service from the ASB Listener + * endpoint. + * + * @param listenerBObject Ballerina listener object. + * @param service Ballerina service instance. + * @return An error if failed detaching the service. + */ + public Object detach(BObject listenerBObject, BObject service) { + try { + stopMessageDispatch(service); + } catch (Exception e) { + return ASBUtils.createErrorValue("Error while closing the processor client id = " + dispatcherSet + .get(service).getProcessorClient().getIdentifier() + " upon detaching service"); + } + services.remove(service); + dispatcherSet.remove(service); + return null; + } + + /** + * Stops consuming messages through all consumer services by terminating the + * listeners and connection. + * + * @param listenerBObject Ballerina listener object. + * @return An error if listener fails to stop. + */ + public Object stop(BObject listenerBObject) { + if (!started) { + return ASBUtils.createErrorValue("Listener has not started."); + } else { + for (BObject service : services) { + stopMessageDispatch(service); + } + services.clear(); + dispatcherSet.clear(); + } + return null; + } + + /** + * Stops consuming messages through all the consumer services and terminates the + * listeners and the connection with server. + * + * @param listenerBObject Ballerina listener object. + * @return An error if listener fails to abort the connection. + */ + public Object forceStop(BObject listenerBObject) { + stop(listenerBObject); + return null; + } + + private void stopMessageDispatch(BObject service) { + MessageDispatcher msgDispatcher = dispatcherSet.get(service); + msgDispatcher.stopListeningAndDispatching(); + } + + private void startMessageDispatch(BObject service) { + MessageDispatcher msgDispatcher = dispatcherSet.get(service); + if (!msgDispatcher.isRunning()) { + msgDispatcher.startListeningAndDispatching(); + } + } +} diff --git a/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java b/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java index ff794e0b..a404315f 100644 --- a/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java +++ b/native/src/main/java/org/ballerinax/asb/sender/MessageSender.java @@ -296,7 +296,7 @@ private static ServiceBusMessage constructMessage(BMap message) if (message.containsKey(StringUtils.fromString(ASBConstants.APPLICATION_PROPERTY_KEY))) { BMap propertyBMap = (BMap) message.get(StringUtils.fromString( ASBConstants.APPLICATION_PROPERTY_KEY)); - Object propertyMap = propertyBMap.get(StringUtils.fromString(ASBConstants.APPLICATION_PROPERTIES)); + Object propertyMap = propertyBMap.get(StringUtils.fromString(ASBConstants.PROPERTIES)); Map map = ASBUtils.toMap((BMap) propertyMap); asbMessage.getApplicationProperties().putAll(map); } diff --git a/native/src/main/java/org/ballerinax/asb/util/ASBConstants.java b/native/src/main/java/org/ballerinax/asb/util/ASBConstants.java index 8e57d536..9ee4c2f7 100644 --- a/native/src/main/java/org/ballerinax/asb/util/ASBConstants.java +++ b/native/src/main/java/org/ballerinax/asb/util/ASBConstants.java @@ -86,7 +86,7 @@ public class ASBConstants { public static final String DEAD_LETTER_ERROR_DESCRIPTION = "deadLetterErrorDescription"; public static final String STATE = "state"; public static final String APPLICATION_PROPERTY_KEY = "applicationProperties"; - public static final String APPLICATION_PROPERTIES = "properties"; + public static final String PROPERTIES = "properties"; public static final int DEFAULT_TIME_TO_LIVE = 60; // In seconds public static final String DEFAULT_MESSAGE_LOCK_TOKEN = "00000000-0000-0000-0000-000000000000"; @@ -352,4 +352,19 @@ public class ASBConstants { // Rule List public static final String LIST_OF_RULES = "list"; public static final String LIST_OF_RULES_RECORD = "RuleList"; + + // Asynchronous Listener + public static final String APPLICATION_PROPERTIES = "ApplicationProperties"; + public static final BString PEEK_LOCK_ENABLE_CONFIG_KEY = StringUtils.fromString("peekLockModeEnabled"); + public static final String QUEUE_NAME_CONFIG_KEY = "queueName"; + public static final String TOPIC_NAME_CONFIG_KEY = "topicName"; + public static final String SUBSCRIPTION_NAME_CONFIG_KEY = "subscriptionName"; + public static final String MAX_CONCURRENCY_CONFIG_KEY = "maxConcurrency"; + public static final String MSG_PREFETCH_COUNT_CONFIG_KEY = "prefetchCount"; + public static final String LOCK_RENEW_DURATION_CONFIG_KEY = "maxAutoLockRenewDuration"; + public static final String LOG_LEVEL_CONGIG_KEY = "logLevel"; + public static final String EMPTY_STRING = ""; + public static final int MAX_CONCURRENCY_DEFAULT = 1; + public static final int LOCK_RENEW_DURATION_DEFAULT = 300; + public static final int MSG_PREFETCH_COUNT_DEFAULT = 0; } diff --git a/native/src/main/java/org/ballerinax/asb/util/ASBUtils.java b/native/src/main/java/org/ballerinax/asb/util/ASBUtils.java index 3e51bbe8..da159eda 100644 --- a/native/src/main/java/org/ballerinax/asb/util/ASBUtils.java +++ b/native/src/main/java/org/ballerinax/asb/util/ASBUtils.java @@ -41,17 +41,21 @@ import io.ballerina.runtime.api.creators.TypeCreator; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.ArrayType; +import io.ballerina.runtime.api.types.ErrorType; import io.ballerina.runtime.api.types.IntersectionType; import io.ballerina.runtime.api.types.MapType; +import io.ballerina.runtime.api.types.ObjectType; import io.ballerina.runtime.api.types.RecordType; import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.types.UnionType; import io.ballerina.runtime.api.utils.JsonUtils; import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.utils.XmlUtils; import io.ballerina.runtime.api.values.BDecimal; import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.api.values.BTypedesc; import org.apache.qpid.proton.amqp.Binary; @@ -64,6 +68,7 @@ import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -77,6 +82,8 @@ import static io.ballerina.runtime.api.TypeTags.STRING_TAG; import static io.ballerina.runtime.api.TypeTags.UNION_TAG; import static io.ballerina.runtime.api.TypeTags.XML_TAG; +import static io.ballerina.runtime.api.constants.RuntimeConstants.ORG_NAME_SEPARATOR; +import static io.ballerina.runtime.api.constants.RuntimeConstants.VERSION_SEPARATOR; import static io.ballerina.runtime.api.utils.TypeUtils.getReferredType; import static org.ballerinax.asb.util.ASBConstants.DELAY; import static org.ballerinax.asb.util.ASBConstants.MAX_DELAY; @@ -958,4 +965,107 @@ public static void addFieldIfPresent(Map map, String key, Object map.put(key, receivedProperty); } } + + /** + * Get the value as string or as empty based on the object value. + * + * @param value Input value. + * @return value as a string or empty. + */ + public static String valueToEmptyOrToString(Object value) { + return (value == null || Objects.equals(value.toString(), "")) ? null : value.toString(); + } + + /** + * Returns a Ballerina Error with the given String message. + * + * @param errorMessage The error message + * @return Resulting Ballerina Error + */ + public static BError createErrorValue(String errorMessage) { + return ErrorCreator.createError(StringUtils.fromString(errorMessage)); + } + + /** + * Returns a Ballerina Error with the given String message and exception. + * + * @param message The error message + * @param error The exception + * @return Resulting Ballerina Error + */ + public static BError createErrorValue(String message, Exception error) { + ErrorType errorType = TypeCreator.createErrorType(error.getClass().getTypeName(), ModuleUtils.getModule()); + String errorFromClass = error.getStackTrace()[0].getClassName(); + String errorMessage = "An error occurred while processing your request. "; + errorMessage += "Cause: " + error.getCause() + " "; + errorMessage += "Class: " + error.getClass() + " "; + BError er = ErrorCreator.createError(StringUtils.fromString(errorMessage)); + + BMap map = ValueCreator.createMapValue(); + map.put(StringUtils.fromString("Type"), StringUtils.fromString(error.getClass().getSimpleName())); + map.put(StringUtils.fromString("errorCause"), StringUtils.fromString(error.getCause().getClass().getName())); + map.put(StringUtils.fromString("message"), StringUtils.fromString(error.getMessage())); + map.put(StringUtils.fromString("stackTrace"), StringUtils.fromString(Arrays.toString(error.getStackTrace()))); + return ErrorCreator.createError(errorType, StringUtils.fromString(message + " error from " + errorFromClass), + er, map); + } + + /** + * Checks if PEEK LOCK mode is enabled for listening for messages. + * + * @param service Service instance having configuration + * @return true if enabled + */ + public static boolean isPeekLockModeEnabled(BObject service) { + BMap serviceConfig = getServiceConfig(service); + boolean peekLockEnabled = false; + if (serviceConfig != null && serviceConfig.containsKey(ASBConstants.PEEK_LOCK_ENABLE_CONFIG_KEY)) { + peekLockEnabled = serviceConfig.getBooleanValue(ASBConstants.PEEK_LOCK_ENABLE_CONFIG_KEY); + } + return peekLockEnabled; + } + + /** + * Obtain string value of a service level configuration. + * + * @param service Service instance + * @param key Key of the configuration + * @return String value of the given config key, or empty string if not found + */ + public static String getServiceConfigStringValue(BObject service, String key) { + BMap serviceConfig = getServiceConfig(service); + if (serviceConfig != null && serviceConfig.containsKey(StringUtils.fromString(key))) { + return serviceConfig.getStringValue(StringUtils.fromString(key)).getValue(); + } else { + return ASBConstants.EMPTY_STRING; + } + } + + /** + * Obtain numeric value of a service level configuration. + * + * @param service Service instance + * @param key Key of the configuration + * @return Integer value of the given config key, or null if not found + */ + public static Integer getServiceConfigSNumericValue(BObject service, String key, int defaultValue) { + BMap serviceConfig = getServiceConfig(service); + if (serviceConfig != null && serviceConfig.containsKey(StringUtils.fromString(key))) { + return serviceConfig.getIntValue(StringUtils.fromString(key)).intValue(); + } else { + return defaultValue; + } + } + + private static BMap getServiceConfig(BObject service) { + ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(service.getType()); + @SuppressWarnings("unchecked") + BMap serviceConfig = (BMap) serviceType + .getAnnotation(StringUtils.fromString(ModuleUtils.getModule().getOrg() + ORG_NAME_SEPARATOR + + ModuleUtils.getModule().getName() + VERSION_SEPARATOR + + ModuleUtils.getModule().getMajorVersion() + ":" + + ASBConstants.SERVICE_CONFIG)); + return serviceConfig; + } + }