diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 1932986..7e95fda 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -15,12 +15,35 @@ modules = [ {org = "ballerina", packageName = "jballerina.java", moduleName = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "lang.error" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "test" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.error"} +] +modules = [ + {org = "ballerina", packageName = "test", moduleName = "test"} +] + [[package]] org = "ballerinax" name = "ibm.ibmmq" version = "0.1.0" dependencies = [ - {org = "ballerina", name = "jballerina.java"} + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "test"} ] modules = [ {org = "ballerinax", packageName = "ibm.ibmmq", moduleName = "ibm.ibmmq"} diff --git a/ballerina/build.gradle b/ballerina/build.gradle index 24b2730..3b1cce2 100644 --- a/ballerina/build.gradle +++ b/ballerina/build.gradle @@ -102,51 +102,51 @@ task commitTomlFiles { } } -// task startActiveMQServer() { -// doLast { -// if (!Os.isFamily(Os.FAMILY_WINDOWS)) { -// def stdOut = new ByteArrayOutputStream() -// exec { -// commandLine 'sh', '-c', "docker ps --filter name=activemq-test" -// standardOutput = stdOut -// } -// if (!stdOut.toString().contains("activemq-test")) { -// println "Starting ActiveMQ server." -// exec { -// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d" -// standardOutput = stdOut -// } -// println stdOut.toString() -// sleep(5 * 1000) -// } else { -// println "ActiveMQ server is already running." -// } -// } -// } -// } - -// task stopActiveMQServer() { -// doLast { -// if (!Os.isFamily(Os.FAMILY_WINDOWS)) { -// def stdOut = new ByteArrayOutputStream() -// exec { -// commandLine 'sh', '-c', "docker ps --filter name=activemq-test" -// standardOutput = stdOut -// } -// if (stdOut.toString().contains("activemq-test")) { -// println "Stopping ActiveMQ server." -// exec { -// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf" -// standardOutput = stdOut -// } -// println stdOut.toString() -// sleep(5 * 1000) -// } else { -// println "ActiveMQ server is not started." -// } -// } -// } -// } + task startIBMMQServer() { + doLast { + if (!Os.isFamily(Os.FAMILY_WINDOWS)) { + def stdOut = new ByteArrayOutputStream() + exec { + commandLine 'sh', '-c', "docker ps --filter name=ibmmq-test" + standardOutput = stdOut + } + if (!stdOut.toString().contains("ibmmq-test")) { + println "Starting IBMMQ server." + exec { + commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d" + standardOutput = stdOut + } + println stdOut.toString() + sleep(5 * 1000) + } else { + println "IBMMQ server is already running." + } + } + } + } + + task stopIBMMQServer() { + doLast { + if (!Os.isFamily(Os.FAMILY_WINDOWS)) { + def stdOut = new ByteArrayOutputStream() + exec { + commandLine 'sh', '-c', "docker ps --filter name=ibmmq-test" + standardOutput = stdOut + } + if (stdOut.toString().contains("ibmmq-test")) { + println "Stopping IBMMQ server." + exec { + commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf" + standardOutput = stdOut + } + println stdOut.toString() + sleep(5 * 1000) + } else { + println "IBMMQ server is not started." + } + } + } + } publishing { publications { @@ -168,8 +168,8 @@ publishing { updateTomlFiles.dependsOn copyStdlibs -// test.dependsOn startActiveMQServer -// build.finalizedBy stopActiveMQServer + test.dependsOn startIBMMQServer + build.finalizedBy stopIBMMQServer build.dependsOn ":ibm.ibmmq-native:build" build.dependsOn "generatePomFileForMavenPublication" diff --git a/ballerina/destination.bal b/ballerina/destination.bal index 61119c4..12afd8c 100644 --- a/ballerina/destination.bal +++ b/ballerina/destination.bal @@ -20,7 +20,7 @@ import ballerina/jballerina.java; public type Destination distinct client object { remote function put(Message message) returns Error?; - remote function get(*GetMessageOptions options) returns Message|Error?; + remote function get(*GetMessageOptions getMessageOptions) returns Message|Error?; remote function close() returns Error?; }; @@ -40,10 +40,10 @@ public isolated client class Queue { # Retrieves a message from an IBM MQ queue. # - # + options - Options to control message retrieval + # + getMessageOptions - Options to control message retrieval # + return - An `ibmmq:Message` if there is any message in the queue, `()` if there # is no message or else `ibmmq:Error` if the operation fails - remote function get(*GetMessageOptions options) returns Message|Error? = + remote function get(*GetMessageOptions getMessageOptions) returns Message|Error? = @java:Method { 'class: "io.ballerina.lib.ibm.ibmmq.Queue" } external; @@ -69,10 +69,10 @@ public isolated client class Topic { # Retrieves a message from an IBM MQ topic. # - # + options - Options to control message retrieval + # + getMessageOptions - Options to control message retrieval # + return - An `ibmmq:Message` if there is any message in the queue, `()` if there # is no message or else `ibmmq:Error` if the operation fails - remote function get(*GetMessageOptions options) returns Message|Error? = + remote function get(*GetMessageOptions getMessageOptions) returns Message|Error? = @java:Method { 'class: "io.ballerina.lib.ibm.ibmmq.Topic" } external; diff --git a/ballerina/tests/pub_sub_tests.bal b/ballerina/tests/pub_sub_tests.bal new file mode 100644 index 0000000..297ad62 --- /dev/null +++ b/ballerina/tests/pub_sub_tests.bal @@ -0,0 +1,196 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import ballerina/test; + +@test:Config {} +function basicPublisherSubscriberTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check publisher->put({ + payload: "Hello World".toBytes() + }); + Message? message = check subscriber->get(); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), "Hello World"); + } else { + test:assertFail("Expected a value for message"); + } + check subscriber->close(); + check publisher->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function pubSubMultipleMessagesInOrderTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + foreach int i in 0 ... 4 { + check publisher->put({ + payload: i.toString().toBytes() + }); + } + foreach int i in 0 ... 4 { + Message? message = check subscriber->get(waitInterval = 2); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), i.toString()); + } else { + test:assertFail("Expected a value for message"); + } + } + check subscriber->close(); + check publisher->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeWithFiniteTimeoutTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check publisher->put({ + payload: "Hello World".toBytes() + }); + Message? message = check subscriber->get(waitInterval = 5); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), "Hello World"); + } else { + test:assertFail("Expected a value for message"); + } + check subscriber->close(); + check publisher->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeWithoutPublishTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Message|Error? result = subscriber->get(waitInterval = 5); + test:assertTrue(result is ()); + check subscriber->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function publishToNonExistingTopicTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'."); + test:assertEquals(result.detail().reasonCode, 2085); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeToNonExistingTopicTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'."); + test:assertEquals(result.detail().reasonCode, 2085); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeWithInvalidTopicNameTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2152'."); + test:assertEquals(result.detail().reasonCode, 2152); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function publishWithInvalidTopicNameTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'."); + test:assertEquals(result.detail().reasonCode, 2085); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function accessTopicAfterQMDisconnectTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + check queueManager.disconnect(); + Topic|Error result = queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: An MQException " + + "occurred: Completion Code '2', Reason '2018'\n'MQJI002: Not connected to a queue manager.'."); + test:assertEquals(result.detail().reasonCode, 2018); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } +} + +@test:Config {} +function putToTopicAfterTopicCloseTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check publisher->close(); + Error? result = publisher->put({ + payload: "Hello World".toBytes() + }); + if result is Error { + test:assertEquals(result.message(), "Error occurred while putting a message to the topic: MQJE001: " + + "An MQException occurred: Completion Code '2', Reason '2019'\n'MQJI027: The queue has been closed.'."); + test:assertEquals(result.detail().reasonCode, 2019); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function putToTopicAfterQMDisconnectTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check queueManager.disconnect(); + Error? result = publisher->put({ + payload: "Hello World".toBytes() + }); + if result is Error { + test:assertEquals(result.message(), "Error occurred while putting a message to the topic: MQJE001: An MQException " + + "occurred: Completion Code '2', Reason '2018'\n'MQJI002: Not connected to a queue manager.'."); + test:assertEquals(result.detail().reasonCode, 2018); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} diff --git a/ballerina/tests/resources/docker-compose.yaml b/ballerina/tests/resources/docker-compose.yaml new file mode 100644 index 0000000..87f9597 --- /dev/null +++ b/ballerina/tests/resources/docker-compose.yaml @@ -0,0 +1,12 @@ +version: "3.9" + +services: + mq: + image: icr.io/ibm-messaging/mq:latest + container_name: ibmmq-test + ports: + - "1414:1414" + - "9443:9443" + environment: + - LICENSE=accept + - MQ_QMGR_NAME=QM1 diff --git a/ballerina/types.bal b/ballerina/types.bal index 210bf76..d8357a3 100644 --- a/ballerina/types.bal +++ b/ballerina/types.bal @@ -36,11 +36,11 @@ public type QueueManagerConfiguration record {| # IBM MQ get message options. # -# + gmOptions - Get message option +# + options - Get message option # + waitInterval - The maximum time (in seconds) that an `get` call waits for a suitable message to # arrive. It is used in conjunction with `ibmmq.MQGMO_WAIT`. public type GetMessageOptions record {| - int gmOptions = MQGMO_NO_WAIT; + int options = MQGMO_NO_WAIT; int waitInterval = 10; |}; diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java index d441506..295a071 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java @@ -62,7 +62,7 @@ public class CommonUtils { private static final BString PROPERTY_VALUE = StringUtils.fromString("value"); private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor"); private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval"); - private static final BString OPTIONS = StringUtils.fromString("gmOptions"); + private static final BString OPTIONS = StringUtils.fromString("options"); private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor();