diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 1932986..7e95fda 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -15,12 +15,35 @@ modules = [ {org = "ballerina", packageName = "jballerina.java", moduleName = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "lang.error" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "test" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.error"} +] +modules = [ + {org = "ballerina", packageName = "test", moduleName = "test"} +] + [[package]] org = "ballerinax" name = "ibm.ibmmq" version = "0.1.0" dependencies = [ - {org = "ballerina", name = "jballerina.java"} + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "test"} ] modules = [ {org = "ballerinax", packageName = "ibm.ibmmq", moduleName = "ibm.ibmmq"} diff --git a/ballerina/build.gradle b/ballerina/build.gradle index 24b2730..3b1cce2 100644 --- a/ballerina/build.gradle +++ b/ballerina/build.gradle @@ -102,51 +102,51 @@ task commitTomlFiles { } } -// task startActiveMQServer() { -// doLast { -// if (!Os.isFamily(Os.FAMILY_WINDOWS)) { -// def stdOut = new ByteArrayOutputStream() -// exec { -// commandLine 'sh', '-c', "docker ps --filter name=activemq-test" -// standardOutput = stdOut -// } -// if (!stdOut.toString().contains("activemq-test")) { -// println "Starting ActiveMQ server." -// exec { -// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d" -// standardOutput = stdOut -// } -// println stdOut.toString() -// sleep(5 * 1000) -// } else { -// println "ActiveMQ server is already running." -// } -// } -// } -// } - -// task stopActiveMQServer() { -// doLast { -// if (!Os.isFamily(Os.FAMILY_WINDOWS)) { -// def stdOut = new ByteArrayOutputStream() -// exec { -// commandLine 'sh', '-c', "docker ps --filter name=activemq-test" -// standardOutput = stdOut -// } -// if (stdOut.toString().contains("activemq-test")) { -// println "Stopping ActiveMQ server." -// exec { -// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf" -// standardOutput = stdOut -// } -// println stdOut.toString() -// sleep(5 * 1000) -// } else { -// println "ActiveMQ server is not started." -// } -// } -// } -// } + task startIBMMQServer() { + doLast { + if (!Os.isFamily(Os.FAMILY_WINDOWS)) { + def stdOut = new ByteArrayOutputStream() + exec { + commandLine 'sh', '-c', "docker ps --filter name=ibmmq-test" + standardOutput = stdOut + } + if (!stdOut.toString().contains("ibmmq-test")) { + println "Starting IBMMQ server." + exec { + commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d" + standardOutput = stdOut + } + println stdOut.toString() + sleep(5 * 1000) + } else { + println "IBMMQ server is already running." + } + } + } + } + + task stopIBMMQServer() { + doLast { + if (!Os.isFamily(Os.FAMILY_WINDOWS)) { + def stdOut = new ByteArrayOutputStream() + exec { + commandLine 'sh', '-c', "docker ps --filter name=ibmmq-test" + standardOutput = stdOut + } + if (stdOut.toString().contains("ibmmq-test")) { + println "Stopping IBMMQ server." + exec { + commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf" + standardOutput = stdOut + } + println stdOut.toString() + sleep(5 * 1000) + } else { + println "IBMMQ server is not started." + } + } + } + } publishing { publications { @@ -168,8 +168,8 @@ publishing { updateTomlFiles.dependsOn copyStdlibs -// test.dependsOn startActiveMQServer -// build.finalizedBy stopActiveMQServer + test.dependsOn startIBMMQServer + build.finalizedBy stopIBMMQServer build.dependsOn ":ibm.ibmmq-native:build" build.dependsOn "generatePomFileForMavenPublication" diff --git a/ballerina/constants.bal b/ballerina/constants.bal new file mode 100644 index 0000000..15729b7 --- /dev/null +++ b/ballerina/constants.bal @@ -0,0 +1,51 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Option to indicate whether the topic is being opened for either publication or subscription. +public const OPEN_AS_SUBSCRIPTION = 1; +public const OPEN_AS_PUBLICATION = 2; + +// Options that control the opening of the queue for a consumer. +public const MQOO_BROWSE = 8; +public const MQOO_INPUT_AS_Q_DEF = 1; +public const MQOO_INPUT_EXCLUSIVE = 4; +public const MQOO_INPUT_SHARED = 2; + +// Options that control the opening of the topic for either publication or subscription. +public const MQOO_ALTERNATE_USER_AUTHORITY = 4096; +public const MQOO_BIND_AS_Q_DEF = 0; +public const MQOO_FAIL_IF_QUIESCING = 8192; +public const MQOO_OUTPUT = 16; +public const MQOO_PASS_ALL_CONTEXT = 512; +public const MQOO_PASS_IDENTITY_CONTEXT = 256; +public const MQOO_SET_ALL_CONTEXT = 2048; +public const MQOO_SET_IDENTITY_CONTEXT = 1024; +public const MQSO_CREATE = 2; + +// Options related to the the get message in a topic. +public const MQGMO_WAIT = 1; +public const MQGMO_NO_WAIT = 0; +public const MQGMO_SYNCPOINT = 2; +public const MQGMO_NO_SYNCPOINT = 4; +public const MQGMO_BROWSE_FIRST = 16; +public const MQGMO_BROWSE_NEXT = 32; +public const MQGMO_BROWSE_MSG_UNDER_CURSOR = 2048; +public const MQGMO_MSG_UNDER_CURSOR = 256; +public const MQGMO_LOCK = 512; +public const MQGMO_UNLOCK = 1024; +public const MQGMO_ACCEPT_TRUNCATED_MSG = 64; +public const MQGMO_FAIL_IF_QUIESCING = 8192; +public const MQGMO_CONVERT = 16384; diff --git a/ballerina/destination.bal b/ballerina/destination.bal index b214c94..2e48a4e 100644 --- a/ballerina/destination.bal +++ b/ballerina/destination.bal @@ -14,16 +14,50 @@ // specific language governing permissions and limitations // under the License. +import ballerina/jballerina.java; + public type Destination distinct client object { remote function put(Message message) returns Error?; - remote function get() returns Message|Error?; + remote function get(*GetMessageOptions getMessageOptions) returns Message|Error?; + + remote function close() returns Error?; }; -public type Queue distinct client object { +public isolated client class Queue { *Destination; -}; -public type Topic distinct client object { + remote function put(Message message) returns Error? = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.Queue" + } external; + + remote function get(*GetMessageOptions getMessageOptions) returns Message|Error? = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.Queue" + } external; + + remote function close() returns Error? = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.Queue" + } external; +} + +public isolated client class Topic { *Destination; + + remote function put(Message message) returns Error? = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.Topic" + } external; + + remote function get(*GetMessageOptions getMessageOptions) returns Message|Error? = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.Topic" + } external; + + remote function close() returns Error? = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.Topic" + } external; }; diff --git a/ballerina/errors.bal b/ballerina/errors.bal index 70643ec..2438af3 100644 --- a/ballerina/errors.bal +++ b/ballerina/errors.bal @@ -14,4 +14,15 @@ // specific language governing permissions and limitations // under the License. -public type Error distinct error; +public type Error distinct error; + +# The error details type for the module. +# +# + reasonCode - The reason code for the error +# + errorCode - The error code for the error +# + completionCode - The completion code for the error +public type ErrorDetails record {| + int reasonCode?; + string errorCode?; + int completionCode?; +|}; diff --git a/ballerina/queue_manager.bal b/ballerina/queue_manager.bal index 2d63fca..2e6635f 100644 --- a/ballerina/queue_manager.bal +++ b/ballerina/queue_manager.bal @@ -27,11 +27,19 @@ public isolated class QueueManager { 'class: "io.ballerina.lib.ibm.ibmmq.QueueManager" } external; - public isolated function accessQueue(string queueName, ConnectionOpenOptions options) returns Queue|Error { - return error Error("Not implemented"); - } + public isolated function accessQueue(string queueName, int options) returns Queue|Error = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.QueueManager" + } external; - public isolated function accessTopic(string topicName, string topicString, ConnectionOpenOptions options) returns Topic|Error { - return error Error("Not implemented"); - } + public isolated function accessTopic(string topicName, string topicString, OPEN_TOPIC_OPTION openTopicOption, + int options) returns Topic|Error = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.QueueManager" + } external; + + public isolated function disconnect() returns Error? = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.QueueManager" + } external; } diff --git a/ballerina/tests/pub_sub_tests.bal b/ballerina/tests/pub_sub_tests.bal new file mode 100644 index 0000000..297ad62 --- /dev/null +++ b/ballerina/tests/pub_sub_tests.bal @@ -0,0 +1,196 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import ballerina/test; + +@test:Config {} +function basicPublisherSubscriberTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check publisher->put({ + payload: "Hello World".toBytes() + }); + Message? message = check subscriber->get(); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), "Hello World"); + } else { + test:assertFail("Expected a value for message"); + } + check subscriber->close(); + check publisher->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function pubSubMultipleMessagesInOrderTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + foreach int i in 0 ... 4 { + check publisher->put({ + payload: i.toString().toBytes() + }); + } + foreach int i in 0 ... 4 { + Message? message = check subscriber->get(waitInterval = 2); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), i.toString()); + } else { + test:assertFail("Expected a value for message"); + } + } + check subscriber->close(); + check publisher->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeWithFiniteTimeoutTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check publisher->put({ + payload: "Hello World".toBytes() + }); + Message? message = check subscriber->get(waitInterval = 5); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), "Hello World"); + } else { + test:assertFail("Expected a value for message"); + } + check subscriber->close(); + check publisher->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeWithoutPublishTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Message|Error? result = subscriber->get(waitInterval = 5); + test:assertTrue(result is ()); + check subscriber->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function publishToNonExistingTopicTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'."); + test:assertEquals(result.detail().reasonCode, 2085); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeToNonExistingTopicTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'."); + test:assertEquals(result.detail().reasonCode, 2085); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeWithInvalidTopicNameTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2152'."); + test:assertEquals(result.detail().reasonCode, 2152); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function publishWithInvalidTopicNameTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'."); + test:assertEquals(result.detail().reasonCode, 2085); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function accessTopicAfterQMDisconnectTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + check queueManager.disconnect(); + Topic|Error result = queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: An MQException " + + "occurred: Completion Code '2', Reason '2018'\n'MQJI002: Not connected to a queue manager.'."); + test:assertEquals(result.detail().reasonCode, 2018); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } +} + +@test:Config {} +function putToTopicAfterTopicCloseTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check publisher->close(); + Error? result = publisher->put({ + payload: "Hello World".toBytes() + }); + if result is Error { + test:assertEquals(result.message(), "Error occurred while putting a message to the topic: MQJE001: " + + "An MQException occurred: Completion Code '2', Reason '2019'\n'MQJI027: The queue has been closed.'."); + test:assertEquals(result.detail().reasonCode, 2019); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function putToTopicAfterQMDisconnectTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check queueManager.disconnect(); + Error? result = publisher->put({ + payload: "Hello World".toBytes() + }); + if result is Error { + test:assertEquals(result.message(), "Error occurred while putting a message to the topic: MQJE001: An MQException " + + "occurred: Completion Code '2', Reason '2018'\n'MQJI002: Not connected to a queue manager.'."); + test:assertEquals(result.detail().reasonCode, 2018); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} diff --git a/ballerina/tests/resources/docker-compose.yaml b/ballerina/tests/resources/docker-compose.yaml new file mode 100644 index 0000000..87f9597 --- /dev/null +++ b/ballerina/tests/resources/docker-compose.yaml @@ -0,0 +1,12 @@ +version: "3.9" + +services: + mq: + image: icr.io/ibm-messaging/mq:latest + container_name: ibmmq-test + ports: + - "1414:1414" + - "9443:9443" + environment: + - LICENSE=accept + - MQ_QMGR_NAME=QM1 diff --git a/ballerina/types.bal b/ballerina/types.bal index 47e7f4d..ecb6bba 100644 --- a/ballerina/types.bal +++ b/ballerina/types.bal @@ -14,6 +14,8 @@ // specific language governing permissions and limitations // under the License. +public type OPEN_TOPIC_OPTION OPEN_AS_SUBSCRIPTION|OPEN_AS_PUBLICATION; + public type QueueManagerConfiguration record {| string name; string host; @@ -23,19 +25,17 @@ public type QueueManagerConfiguration record {| string password?; |}; -public enum ConnectionOpenOptions { - MQOO_OUTPUT = "MQOO_OUTPUT", - MQOO_INPUT_AS_Q_DEF = "MQOO_INPUT_AS_Q_DEF", - MQOO_INPUT_EXCLUSIVE = "MQOO_INPUT_EXCLUSIVE", - MQOO_INPUT_SHARED = "MQOO_INPUT_SHARED" -} +public type GetMessageOptions record {| + int options = MQGMO_NO_WAIT; + int waitInterval = 10; +|}; public type Property record {| - map descriptor; - boolean|byte|byte[]|decimal|float|int|string property; + map descriptor?; + boolean|byte|byte[]|decimal|float|int|string value; |}; public type Message record {| - map properties; + map properties?; byte[] payload; |}; diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java index 6d354df..295a071 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java @@ -18,23 +18,192 @@ package io.ballerina.lib.ibm.ibmmq; +import com.ibm.mq.MQException; +import com.ibm.mq.MQGetMessageOptions; +import com.ibm.mq.MQMessage; +import com.ibm.mq.MQPropertyDescriptor; +import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.creators.TypeCreator; +import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BString; +import java.io.IOException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Objects; import java.util.Optional; +import static io.ballerina.lib.ibm.ibmmq.Constants.IBMMQ_ERROR; +import static io.ballerina.lib.ibm.ibmmq.ModuleUtils.getModule; + /** * {@code CommonUtils} contains the common utility functions for the Ballerina IBM MQ connector. */ public class CommonUtils { + private static final String ERROR_DETAILS = "ErrorDetails"; + private static final BString ERROR_REASON_CODE = StringUtils.fromString("reasonCode"); + private static final BString ERROR_ERROR_CODE = StringUtils.fromString("errorCode"); + private static final BString ERROR_COMPLETION_CODE = StringUtils.fromString("completionCode"); + private static final BString MESSAGE_PAYLOAD = StringUtils.fromString("payload"); + private static final BString MESSAGE_PROPERTIES = StringUtils.fromString("properties"); + private static final BString MESSAGE_PROPERTY = StringUtils.fromString("property"); + private static final String BPROPERTY = "Property"; + private static final String BMESSAGE_NAME = "Message"; + private static final BString PD_VERSION = StringUtils.fromString("version"); + private static final BString PD_COPY_OPTIONS = StringUtils.fromString("copyOptions"); + private static final BString PD_OPTIONS = StringUtils.fromString("options"); + private static final BString PD_SUPPORT = StringUtils.fromString("support"); + private static final BString PD_CONTEXT = StringUtils.fromString("context"); + private static final BString PROPERTY_VALUE = StringUtils.fromString("value"); + private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor"); + private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval"); + private static final BString OPTIONS = StringUtils.fromString("options"); + + private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor(); + + public static MQMessage getMqMessageFromBMessage(BMap bMessage) { + byte[] payload = bMessage.getArrayValue(MESSAGE_PAYLOAD).getBytes(); + MQMessage mqMessage = new MQMessage(); + try { + mqMessage.write(payload); + } catch (IOException e) { + throw createError(IBMMQ_ERROR, + String.format("Error occurred while populating payload: %s", e.getMessage()), e); + } + BMap properties = (BMap) bMessage.getMapValue(MESSAGE_PROPERTIES); + if (Objects.nonNull(properties)) { + populateMQProperties(properties, mqMessage); + } + return mqMessage; + } + + public static BMap getBMessageFromMQMessage(MQMessage mqMessage) { + BMap bMessage = ValueCreator.createRecordValue(getModule(), BMESSAGE_NAME); + try { + byte[] payload = new byte[mqMessage.getDataLength()]; + mqMessage.readFully(payload); + bMessage.put(MESSAGE_PAYLOAD, ValueCreator.createArrayValue(payload)); + bMessage.put(MESSAGE_PROPERTY, getBProperties(mqMessage)); + return bMessage; + } catch (MQException | IOException e) { + throw createError(IBMMQ_ERROR, + String.format("Error occurred while reading the message: %s", e.getMessage()), e); + } + } + + private static BMap getBProperties(MQMessage mqMessage) throws MQException { + BMap properties = ValueCreator.createMapValue(TypeCreator + .createMapType(TypeCreator.createRecordType(BPROPERTY, getModule(), 0, false, 0))); + Enumeration propertyNames = mqMessage.getPropertyNames("%"); + for (String propertyName : Collections.list(propertyNames)) { + BMap property = ValueCreator.createRecordValue(getModule(), BPROPERTY); + MQPropertyDescriptor propertyDescriptor = new MQPropertyDescriptor(); + Object propertyObject = mqMessage.getObjectProperty(propertyName, propertyDescriptor); + if (propertyObject instanceof Integer intProperty) { + property.put(PROPERTY_VALUE, intProperty.longValue()); + } else if (propertyObject instanceof String stringProperty) { + property.put(PROPERTY_VALUE, StringUtils.fromString(stringProperty)); + } else { + property.put(PROPERTY_VALUE, propertyObject); + } + property.put(PROPERTY_DESCRIPTOR, + populateDescriptorFromMQPropertyDescriptor(propertyDescriptor)); + properties.put(StringUtils.fromString(propertyName), property); + } + return properties; + } + + private static void populateMQProperties(BMap properties, MQMessage mqMessage) { + for (BString key : properties.getKeys()) { + try { + handlePropertyValue(properties, mqMessage, key); + } catch (MQException e) { + throw createError(IBMMQ_ERROR, + String.format("Error occurred while setting message properties: %s", e.getMessage()), e); + } + } + } + + private static void handlePropertyValue(BMap properties, MQMessage mqMessage, BString key) + throws MQException { + BMap property = (BMap) properties.getMapValue(key); + MQPropertyDescriptor propertyDescriptor = defaultPropertyDescriptor; + if (property.containsKey(PROPERTY_DESCRIPTOR)) { + propertyDescriptor = getMQPropertyDescriptor(properties.getMapValue(PROPERTY_DESCRIPTOR)); + } + Object value = property.get(PROPERTY_VALUE); + if (value instanceof Long longValue) { + mqMessage.setIntProperty(key.getValue(), propertyDescriptor, longValue.intValue()); + } else if (value instanceof Boolean booleanValue) { + mqMessage.setBooleanProperty(key.getValue(), propertyDescriptor, booleanValue); + } else if (value instanceof Byte byteValue) { + mqMessage.setByteProperty(key.getValue(), propertyDescriptor, byteValue); + } else if (value instanceof byte[] bytesValue) { + mqMessage.setBytesProperty(key.getValue(), propertyDescriptor, bytesValue); + } else if (value instanceof Float floatValue) { + mqMessage.setFloatProperty(key.getValue(), propertyDescriptor, floatValue); + } else if (value instanceof Double doubleValue) { + mqMessage.setDoubleProperty(key.getValue(), propertyDescriptor, doubleValue); + } else if (value instanceof BString stringValue) { + mqMessage.setStringProperty(key.getValue(), propertyDescriptor, stringValue.getValue()); + } + } + + private static MQPropertyDescriptor getMQPropertyDescriptor(BMap descriptor) { + MQPropertyDescriptor propertyDescriptor = new MQPropertyDescriptor(); + if (descriptor.containsKey(PD_VERSION)) { + propertyDescriptor.version = ((Long) descriptor.get(PD_VERSION)).intValue(); + } + if (descriptor.containsKey(PD_COPY_OPTIONS)) { + propertyDescriptor.copyOptions = ((Long) descriptor.get(PD_COPY_OPTIONS)).intValue(); + } + if (descriptor.containsKey(PD_OPTIONS)) { + propertyDescriptor.options = ((Long) descriptor.get(PD_OPTIONS)).intValue(); + } + if (descriptor.containsKey(PD_SUPPORT)) { + propertyDescriptor.support = ((Long) descriptor.get(PD_SUPPORT)).intValue(); + } + if (descriptor.containsKey(PD_CONTEXT)) { + propertyDescriptor.context = ((Long) descriptor.get(PD_CONTEXT)).intValue(); + } + return propertyDescriptor; + } + + private static BMap populateDescriptorFromMQPropertyDescriptor(MQPropertyDescriptor propertyDescriptor) { + BMap descriptor = ValueCreator.createMapValue(TypeCreator + .createMapType(PredefinedTypes.TYPE_INT)); + descriptor.put(PD_VERSION, propertyDescriptor.version); + descriptor.put(PD_COPY_OPTIONS, propertyDescriptor.copyOptions); + descriptor.put(PD_OPTIONS, propertyDescriptor.options); + descriptor.put(PD_SUPPORT, propertyDescriptor.support); + descriptor.put(PD_CONTEXT, propertyDescriptor.context); + return descriptor; + } + + public static MQGetMessageOptions getGetMessageOptions(BMap bOptions) { + int waitInterval = bOptions.getIntValue(WAIT_INTERVAL).intValue(); + int options = bOptions.getIntValue(OPTIONS).intValue(); + MQGetMessageOptions getMessageOptions = new MQGetMessageOptions(); + getMessageOptions.waitInterval = waitInterval * 1000; + getMessageOptions.options = options; + return getMessageOptions; + } + public static BError createError(String errorType, String message, Throwable throwable) { BError cause = ErrorCreator.createError(throwable); + BMap errorDetails = ValueCreator.createRecordValue(getModule(), ERROR_DETAILS); + if (throwable instanceof MQException exception) { + errorDetails.put(ERROR_REASON_CODE, exception.getReason()); + errorDetails.put(ERROR_ERROR_CODE, StringUtils.fromString(exception.getErrorCode())); + errorDetails.put(ERROR_COMPLETION_CODE, exception.getCompCode()); + } return ErrorCreator.createError( - ModuleUtils.getModule(), errorType, StringUtils.fromString(message), cause, null); + ModuleUtils.getModule(), errorType, StringUtils.fromString(message), cause, errorDetails); } public static Optional getOptionalStringProperty(BMap config, BString fieldName) { diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Constants.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Constants.java index d9ad77b..699bac9 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Constants.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Constants.java @@ -23,5 +23,7 @@ public interface Constants { public static final String IBMMQ_ERROR = "Error"; // Native properties in respective ballerina objects - public static final String NATIVE_QUEUE_MANAGER = "queueManager"; + String NATIVE_QUEUE_MANAGER = "queueManager"; + String NATIVE_TOPIC = "topic"; + String NATIVE_QUEUE = "queue"; } diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/MQThreadFactory.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/MQThreadFactory.java new file mode 100644 index 0000000..462f1c9 --- /dev/null +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/MQThreadFactory.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.lib.ibm.ibmmq; + +import java.util.concurrent.ThreadFactory; + + +/** + * A {@link ThreadFactory} object that creates new threads on demand for IBM MQ queue or topic actions. + */ +public class MQThreadFactory implements ThreadFactory { + + private final String threadGroupName; + + public MQThreadFactory(String threadGroupName) { + this.threadGroupName = threadGroupName; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread ibmMqClientThread = new Thread(runnable); + ibmMqClientThread.setName(threadGroupName); + return ibmMqClientThread; + } +} diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java new file mode 100644 index 0000000..ecb72d5 --- /dev/null +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.lib.ibm.ibmmq; + +import com.ibm.mq.MQException; +import com.ibm.mq.MQGetMessageOptions; +import com.ibm.mq.MQMessage; +import com.ibm.mq.MQQueue; +import com.ibm.mq.constants.CMQC; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Future; +import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static io.ballerina.lib.ibm.ibmmq.CommonUtils.createError; +import static io.ballerina.lib.ibm.ibmmq.Constants.IBMMQ_ERROR; + +/** + * Representation of {@link com.ibm.mq.MQQueue} with utility methods to invoke as inter-op functions. + */ +public class Queue { + private static final ExecutorService QUEUE_EXECUTOR_SERVICE = Executors.newCachedThreadPool( + new MQThreadFactory("balx-ibmmq-queue-client-network-thread")); + + public static Object put(Environment environment, BObject queueObject, BMap message) { + MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE); + MQMessage mqMessage = CommonUtils.getMqMessageFromBMessage(message); + Future future = environment.markAsync(); + QUEUE_EXECUTOR_SERVICE.execute(() -> { + try { + queue.put(mqMessage); + future.complete(null); + } catch (MQException e) { + BError bError = createError(IBMMQ_ERROR, + String.format("Error occurred while putting a message to the queue: %s", e.getMessage()), e); + future.complete(bError); + } + }); + return null; + } + + public static Object get(Environment environment, BObject queueObject, BMap options) { + MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE); + MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options); + Future future = environment.markAsync(); + QUEUE_EXECUTOR_SERVICE.execute(() -> { + try { + MQMessage message = new MQMessage(); + queue.get(message, getMessageOptions); + future.complete(CommonUtils.getBMessageFromMQMessage(message)); + } catch (MQException e) { + if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) { + future.complete(null); + } else { + BError bError = createError(IBMMQ_ERROR, + String.format("Error occurred while getting a message from the queue: %s", + e.getMessage()), e); + future.complete(bError); + } + } + }); + return null; + } + + public static Object close(Environment env, BObject queueObject) { + MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE); + Future future = env.markAsync(); + QUEUE_EXECUTOR_SERVICE.execute(() -> { + try { + queue.close(); + future.complete(null); + } catch (MQException e) { + BError bError = createError(IBMMQ_ERROR, + String.format("Error occurred while closing the queue: %s", e.getMessage()), e); + future.complete(bError); + } + }); + return null; + } +} diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java index c067909..846b102 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java @@ -19,8 +19,11 @@ package io.ballerina.lib.ibm.ibmmq; import com.ibm.mq.MQException; +import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; +import com.ibm.mq.MQTopic; import com.ibm.mq.constants.MQConstants; +import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; @@ -43,9 +46,11 @@ public class QueueManager { private static final BString CHANNEL = StringUtils.fromString("channel"); private static final BString USER_ID = StringUtils.fromString("userID"); private static final BString PASSWORD = StringUtils.fromString("password"); + private static final String BTOPIC = "Topic"; + private static final String BQUEUE = "Queue"; /** - * Creates a JMS connection with the provided configurations. + * Creates a IBM MQ queue manager with the provided configurations. * * @param queueManager Ballerina queue-manager object * @param configurations IBM MQ connection configurations @@ -69,7 +74,7 @@ private static Hashtable getConnectionProperties(BMap getConnectionProperties(BMap properties.put(MQConstants.PASSWORD_PROPERTY, password)); return properties; } + + public static Object accessQueue(BObject queueManagerObject, BString queueName, Long options) { + MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER); + try { + MQQueue mqQueue = queueManager.accessQueue(queueName.getValue(), options.intValue()); + BObject bQueue = ValueCreator.createObjectValue(ModuleUtils.getModule(), BQUEUE); + bQueue.addNativeData(Constants.NATIVE_QUEUE, mqQueue); + return bQueue; + } catch (MQException e) { + return createError(IBMMQ_ERROR, + String.format("Error occurred while accessing queue: %s", e.getMessage()), e); + } + } + + public static Object accessTopic(BObject queueManagerObject, BString topicName, + BString topicString, Long openTopicOption, Long options) { + MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER); + try { + MQTopic mqTopic = queueManager.accessTopic(topicName.getValue(), topicString.getValue(), + openTopicOption.intValue(), options.intValue()); + BObject bTopic = ValueCreator.createObjectValue(ModuleUtils.getModule(), BTOPIC); + bTopic.addNativeData(Constants.NATIVE_TOPIC, mqTopic); + return bTopic; + } catch (MQException e) { + return createError(IBMMQ_ERROR, + String.format("Error occurred while accessing topic: %s", e.getMessage()), e); + } + } + + public static Object disconnect(BObject queueManagerObject) { + MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER); + try { + queueManager.disconnect(); + } catch (MQException e) { + return createError(IBMMQ_ERROR, + String.format("Error occurred while disconnecting queue manager: %s", e.getMessage()), e); + } + return null; + } } diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java new file mode 100644 index 0000000..b608ffe --- /dev/null +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.lib.ibm.ibmmq; + +import com.ibm.mq.MQException; +import com.ibm.mq.MQGetMessageOptions; +import com.ibm.mq.MQMessage; +import com.ibm.mq.MQTopic; +import com.ibm.mq.constants.CMQC; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Future; +import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static io.ballerina.lib.ibm.ibmmq.CommonUtils.createError; +import static io.ballerina.lib.ibm.ibmmq.Constants.IBMMQ_ERROR; + +/** + * Representation of {@link com.ibm.mq.MQTopic} with utility methods to invoke as inter-op functions. + */ +public class Topic { + private static final ExecutorService topicExecutorService = Executors.newCachedThreadPool( + new MQThreadFactory("balx-ibmmq-topic-client-network-thread")); + + public static Object put(Environment environment, BObject topicObject, BMap message) { + MQTopic topic = (MQTopic) topicObject.getNativeData(Constants.NATIVE_TOPIC); + MQMessage mqMessage = CommonUtils.getMqMessageFromBMessage(message); + Future future = environment.markAsync(); + topicExecutorService.execute(() -> { + try { + topic.put(mqMessage); + future.complete(null); + } catch (Exception e) { + BError bError = createError(IBMMQ_ERROR, + String.format("Error occurred while putting a message to the topic: %s", e.getMessage()), e); + future.complete(bError); + } + }); + return null; + } + + public static Object get(Environment environment, BObject topicObject, BMap options) { + MQTopic topic = (MQTopic) topicObject.getNativeData(Constants.NATIVE_TOPIC); + MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options); + Future future = environment.markAsync(); + topicExecutorService.execute(() -> { + try { + MQMessage message = new MQMessage(); + topic.get(message, getMessageOptions); + future.complete(CommonUtils.getBMessageFromMQMessage(message)); + } catch (MQException e) { + if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) { + future.complete(null); + } else { + BError bError = createError(IBMMQ_ERROR, + String.format("Error occurred while getting a message from the topic: %s", e.getMessage()), + e); + future.complete(bError); + } + } + }); + return null; + } + + public static Object close(Environment env, BObject topicObject) { + MQTopic topic = (MQTopic) topicObject.getNativeData(Constants.NATIVE_TOPIC); + Future future = env.markAsync(); + topicExecutorService.execute(() -> { + try { + topic.close(); + future.complete(null); + } catch (MQException e) { + BError bError = createError(IBMMQ_ERROR, + String.format("Error occurred while closing the topic: %s", e.getMessage()), e); + future.complete(bError); + } + }); + return null; + } +}