From 4f7759b5376c64187708101991fbbc592e5cc564 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sat, 28 Oct 2023 10:08:18 +0530 Subject: [PATCH 01/17] [Automated] Update the native jar versions --- ballerina/Dependencies.toml | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 1932986..7e95fda 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -15,12 +15,35 @@ modules = [ {org = "ballerina", packageName = "jballerina.java", moduleName = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "lang.error" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "test" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.error"} +] +modules = [ + {org = "ballerina", packageName = "test", moduleName = "test"} +] + [[package]] org = "ballerinax" name = "ibm.ibmmq" version = "0.1.0" dependencies = [ - {org = "ballerina", name = "jballerina.java"} + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "test"} ] modules = [ {org = "ballerinax", packageName = "ibm.ibmmq", moduleName = "ibm.ibmmq"} From f667d46a5c7927e52b54a2b286640a55ad72b925 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sat, 28 Oct 2023 10:35:43 +0530 Subject: [PATCH 02/17] [Automated] Update the native jar versions --- ballerina/Dependencies.toml | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 7e95fda..748703b 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -7,6 +7,19 @@ dependencies-toml-version = "2" distribution-version = "2201.8.0" +[[package]] +org = "ballerina" +name = "io" +version = "1.6.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"} +] +modules = [ + {org = "ballerina", packageName = "io", moduleName = "io"} +] + [[package]] org = "ballerina" name = "jballerina.java" @@ -24,6 +37,15 @@ dependencies = [ {org = "ballerina", name = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "lang.value" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + [[package]] org = "ballerina" name = "test" @@ -42,6 +64,7 @@ org = "ballerinax" name = "ibm.ibmmq" version = "0.1.0" dependencies = [ + {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "test"} ] From 8a20cb00eb45ba5ce84a6d862a01f273ccb0cd54 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sat, 28 Oct 2023 10:51:45 +0530 Subject: [PATCH 03/17] [Automated] Update the native jar versions --- ballerina/Dependencies.toml | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 748703b..7e95fda 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -7,19 +7,6 @@ dependencies-toml-version = "2" distribution-version = "2201.8.0" -[[package]] -org = "ballerina" -name = "io" -version = "1.6.0" -scope = "testOnly" -dependencies = [ - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "lang.value"} -] -modules = [ - {org = "ballerina", packageName = "io", moduleName = "io"} -] - [[package]] org = "ballerina" name = "jballerina.java" @@ -37,15 +24,6 @@ dependencies = [ {org = "ballerina", name = "jballerina.java"} ] -[[package]] -org = "ballerina" -name = "lang.value" -version = "0.0.0" -scope = "testOnly" -dependencies = [ - {org = "ballerina", name = "jballerina.java"} -] - [[package]] org = "ballerina" name = "test" @@ -64,7 +42,6 @@ org = "ballerinax" name = "ibm.ibmmq" version = "0.1.0" dependencies = [ - {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "test"} ] From 139c10b2e6e8eddd93b983ceaec50af70fc4eaea Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sat, 28 Oct 2023 11:07:54 +0530 Subject: [PATCH 04/17] Add basic pub sub test case --- ballerina/build.gradle | 94 +++++++++---------- ballerina/constants.bal | 1 + ballerina/queue_manager.bal | 2 +- ballerina/tests/pub_sub_tests.bal | 19 ++++ ballerina/tests/resources/docker-compose.yaml | 11 +++ ballerina/types.bal | 8 +- .../ballerina/lib/ibm.ibmmq/CommonUtils.java | 11 ++- .../ballerina/lib/ibm.ibmmq/QueueManager.java | 2 +- 8 files changed, 91 insertions(+), 57 deletions(-) create mode 100644 ballerina/tests/pub_sub_tests.bal create mode 100644 ballerina/tests/resources/docker-compose.yaml diff --git a/ballerina/build.gradle b/ballerina/build.gradle index 24b2730..3b1cce2 100644 --- a/ballerina/build.gradle +++ b/ballerina/build.gradle @@ -102,51 +102,51 @@ task commitTomlFiles { } } -// task startActiveMQServer() { -// doLast { -// if (!Os.isFamily(Os.FAMILY_WINDOWS)) { -// def stdOut = new ByteArrayOutputStream() -// exec { -// commandLine 'sh', '-c', "docker ps --filter name=activemq-test" -// standardOutput = stdOut -// } -// if (!stdOut.toString().contains("activemq-test")) { -// println "Starting ActiveMQ server." -// exec { -// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d" -// standardOutput = stdOut -// } -// println stdOut.toString() -// sleep(5 * 1000) -// } else { -// println "ActiveMQ server is already running." -// } -// } -// } -// } - -// task stopActiveMQServer() { -// doLast { -// if (!Os.isFamily(Os.FAMILY_WINDOWS)) { -// def stdOut = new ByteArrayOutputStream() -// exec { -// commandLine 'sh', '-c', "docker ps --filter name=activemq-test" -// standardOutput = stdOut -// } -// if (stdOut.toString().contains("activemq-test")) { -// println "Stopping ActiveMQ server." -// exec { -// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf" -// standardOutput = stdOut -// } -// println stdOut.toString() -// sleep(5 * 1000) -// } else { -// println "ActiveMQ server is not started." -// } -// } -// } -// } + task startIBMMQServer() { + doLast { + if (!Os.isFamily(Os.FAMILY_WINDOWS)) { + def stdOut = new ByteArrayOutputStream() + exec { + commandLine 'sh', '-c', "docker ps --filter name=ibmmq-test" + standardOutput = stdOut + } + if (!stdOut.toString().contains("ibmmq-test")) { + println "Starting IBMMQ server." + exec { + commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d" + standardOutput = stdOut + } + println stdOut.toString() + sleep(5 * 1000) + } else { + println "IBMMQ server is already running." + } + } + } + } + + task stopIBMMQServer() { + doLast { + if (!Os.isFamily(Os.FAMILY_WINDOWS)) { + def stdOut = new ByteArrayOutputStream() + exec { + commandLine 'sh', '-c', "docker ps --filter name=ibmmq-test" + standardOutput = stdOut + } + if (stdOut.toString().contains("ibmmq-test")) { + println "Stopping IBMMQ server." + exec { + commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf" + standardOutput = stdOut + } + println stdOut.toString() + sleep(5 * 1000) + } else { + println "IBMMQ server is not started." + } + } + } + } publishing { publications { @@ -168,8 +168,8 @@ publishing { updateTomlFiles.dependsOn copyStdlibs -// test.dependsOn startActiveMQServer -// build.finalizedBy stopActiveMQServer + test.dependsOn startIBMMQServer + build.finalizedBy stopIBMMQServer build.dependsOn ":ibm.ibmmq-native:build" build.dependsOn "generatePomFileForMavenPublication" diff --git a/ballerina/constants.bal b/ballerina/constants.bal index f5e4b0e..15729b7 100644 --- a/ballerina/constants.bal +++ b/ballerina/constants.bal @@ -33,6 +33,7 @@ public const MQOO_PASS_ALL_CONTEXT = 512; public const MQOO_PASS_IDENTITY_CONTEXT = 256; public const MQOO_SET_ALL_CONTEXT = 2048; public const MQOO_SET_IDENTITY_CONTEXT = 1024; +public const MQSO_CREATE = 2; // Options related to the the get message in a topic. public const MQGMO_WAIT = 1; diff --git a/ballerina/queue_manager.bal b/ballerina/queue_manager.bal index 19261df..8b5e752 100644 --- a/ballerina/queue_manager.bal +++ b/ballerina/queue_manager.bal @@ -22,7 +22,7 @@ public isolated class QueueManager { check self.externInit(configurations); } - isolated function externInit(QueueManagerConfiguration configurations) returns Error? = @java:Method { + private isolated function externInit(QueueManagerConfiguration configurations) returns Error? = @java:Method { name: "init", 'class: "io.ballerina.lib.ibm.ibmmq.QueueManager" } external; diff --git a/ballerina/tests/pub_sub_tests.bal b/ballerina/tests/pub_sub_tests.bal new file mode 100644 index 0000000..0a7b43e --- /dev/null +++ b/ballerina/tests/pub_sub_tests.bal @@ -0,0 +1,19 @@ +import ballerina/test; + +@test:Config {} +function basicPublisherSubscriberTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subTopic = check queueManager.accessTopic("", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + future messageFuture = start getMessageFromTopic(subTopic); + Topic pubTopic = check queueManager.accessTopic("", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check pubTopic->put({ + payload: "Hello World".toBytes() + }); + Message message = check wait messageFuture; + test:assertEquals(string:fromBytes(message.payload), "Hello World"); +} + + +function getMessageFromTopic(Topic topic) returns Message|Error { + return topic->get(); +} \ No newline at end of file diff --git a/ballerina/tests/resources/docker-compose.yaml b/ballerina/tests/resources/docker-compose.yaml new file mode 100644 index 0000000..cba3df0 --- /dev/null +++ b/ballerina/tests/resources/docker-compose.yaml @@ -0,0 +1,11 @@ +version: "3.9" + +services: + mq: + image: icr.io/ibm-messaging/mq:latest + ports: + - "1414:1414" + - "9443:9443" + environment: + - LICENSE=accept + - MQ_QMGR_NAME=QM1 diff --git a/ballerina/types.bal b/ballerina/types.bal index f8e4547..63194db 100644 --- a/ballerina/types.bal +++ b/ballerina/types.bal @@ -28,11 +28,11 @@ public type QueueManagerConfiguration record {| |}; public type AccessQueueOptions MQOO_OUTPUT|MQOO_BROWSE|MQOO_INPUT_AS_Q_DEF|MQOO_INPUT_EXCLUSIVE|MQOO_INPUT_SHARED; -public type AccessTopicOptions MQOO_ALTERNATE_USER_AUTHORITY|MQOO_BIND_AS_Q_DEF|MQOO_FAIL_IF_QUIESCING|MQOO_OUTPUT|MQOO_PASS_ALL_CONTEXT|MQOO_PASS_IDENTITY_CONTEXT|MQOO_SET_ALL_CONTEXT|MQOO_SET_IDENTITY_CONTEXT; +public type AccessTopicOptions MQOO_ALTERNATE_USER_AUTHORITY|MQOO_BIND_AS_Q_DEF|MQOO_FAIL_IF_QUIESCING|MQOO_OUTPUT|MQOO_PASS_ALL_CONTEXT|MQOO_PASS_IDENTITY_CONTEXT|MQOO_SET_ALL_CONTEXT|MQOO_SET_IDENTITY_CONTEXT|MQSO_CREATE; public type GetMessageOptions record {| - GM_OPTIONS gmOptions = MQGMO_NO_SYNCPOINT; - int waitInterval = 0; + GM_OPTIONS gmOptions = MQGMO_WAIT; + int waitInterval = -1; |}; public type Property record {| @@ -41,6 +41,6 @@ public type Property record {| |}; public type Message record {| - map properties; + map properties?; byte[] payload; |}; diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java index b4688cd..29de4f7 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Enumeration; +import java.util.Objects; import java.util.Optional; import static io.ballerina.lib.ibm.ibmmq.Constants.IBMMQ_ERROR; @@ -61,7 +62,7 @@ public class CommonUtils { private static final BString PROPERTY_VALUE = StringUtils.fromString("value"); private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor"); private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval"); - private static final BString OPTIONS = StringUtils.fromString("options"); + private static final BString OPTIONS = StringUtils.fromString("gmOptions"); private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor(); @@ -75,7 +76,9 @@ public static MQMessage getMqMessageFromBMessage(BMap bMessage) String.format("Error occurred while populating payload: %s", e.getMessage()), e); } BMap properties = (BMap) bMessage.getMapValue(MESSAGE_PROPERTIES); - populateMQProperties(properties, mqMessage); + if (Objects.nonNull(properties)) { + populateMQProperties(properties, mqMessage); + } return mqMessage; } @@ -84,7 +87,7 @@ public static BMap getBMessageFromMQMessage(MQMessage mqMessage try { byte[] payload = new byte[mqMessage.getDataLength()]; mqMessage.readFully(payload); - bMessage.put(MESSAGE_PAYLOAD, payload); + bMessage.put(MESSAGE_PAYLOAD, ValueCreator.createArrayValue(payload)); bMessage.put(MESSAGE_PROPERTY, getBProperties(mqMessage)); return bMessage; } catch (MQException | IOException e) { @@ -196,7 +199,7 @@ public static BError createError(String errorType, String message, Throwable thr BMap errorDetails = ValueCreator.createRecordValue(getModule(), ERROR_DETAILS); if (throwable instanceof MQException exception) { errorDetails.put(ERROR_REASON_CODE, exception.getReason()); - errorDetails.put(ERROR_ERROR_CODE, exception.getErrorCode()); + errorDetails.put(ERROR_ERROR_CODE, StringUtils.fromString(exception.getErrorCode())); errorDetails.put(ERROR_COMPLETION_CODE, exception.getCompCode()); } return ErrorCreator.createError( diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java index 52e5ab8..1c034c5 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java @@ -74,7 +74,7 @@ private static Hashtable getConnectionProperties(BMap Date: Sat, 28 Oct 2023 11:10:04 +0530 Subject: [PATCH 05/17] Add newline --- ballerina/tests/pub_sub_tests.bal | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ballerina/tests/pub_sub_tests.bal b/ballerina/tests/pub_sub_tests.bal index 0a7b43e..a18972e 100644 --- a/ballerina/tests/pub_sub_tests.bal +++ b/ballerina/tests/pub_sub_tests.bal @@ -13,7 +13,6 @@ function basicPublisherSubscriberTest() returns error? { test:assertEquals(string:fromBytes(message.payload), "Hello World"); } - function getMessageFromTopic(Topic topic) returns Message|Error { return topic->get(); -} \ No newline at end of file +} From fa63c93a8d6866224a3c536b674731c31ddaa824 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sat, 28 Oct 2023 23:28:48 +0530 Subject: [PATCH 06/17] Add close disconnect apis --- ballerina/destination.bal | 16 ++++++++++++++-- ballerina/queue_manager.bal | 5 +++++ .../java/io/ballerina/lib/ibm.ibmmq/Queue.java | 17 +++++++++++++++++ .../ballerina/lib/ibm.ibmmq/QueueManager.java | 11 +++++++++++ .../java/io/ballerina/lib/ibm.ibmmq/Topic.java | 17 +++++++++++++++++ 5 files changed, 64 insertions(+), 2 deletions(-) diff --git a/ballerina/destination.bal b/ballerina/destination.bal index 25e864d..53aa24b 100644 --- a/ballerina/destination.bal +++ b/ballerina/destination.bal @@ -18,7 +18,9 @@ import ballerina/jballerina.java; public type Destination distinct client object { remote function put(Message message) returns Error?; - remote function get(*GetMessageOptions options) returns Message|Error?; + remote function get(*GetMessageOptions options) returns Message|Error; + + remote function close() returns Error?; }; public isolated client class Queue { @@ -33,6 +35,11 @@ public isolated client class Queue { @java:Method { 'class: "io.ballerina.lib.ibm.ibmmq.Queue" } external; + + remote function close() returns Error? = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.Topic" + } external; } public isolated client class Topic { @@ -43,7 +50,12 @@ public isolated client class Topic { 'class: "io.ballerina.lib.ibm.ibmmq.Topic" } external; - remote function get(*GetMessageOptions options) returns Message|Error = + remote function get(*GetMessageOptions options) returns Message|Error = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.Topic" + } external; + + remote function close() returns Error? = @java:Method { 'class: "io.ballerina.lib.ibm.ibmmq.Topic" } external; diff --git a/ballerina/queue_manager.bal b/ballerina/queue_manager.bal index 8b5e752..acce8b6 100644 --- a/ballerina/queue_manager.bal +++ b/ballerina/queue_manager.bal @@ -37,4 +37,9 @@ public isolated class QueueManager { @java:Method { 'class: "io.ballerina.lib.ibm.ibmmq.QueueManager" } external; + + public isolated function disconnect() returns Error? = + @java:Method { + 'class: "io.ballerina.lib.ibm.ibmmq.QueueManager" + } external; } diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java index 27e9fe2..f1c5997 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java @@ -18,6 +18,7 @@ package io.ballerina.lib.ibm.ibmmq; +import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQQueue; @@ -75,4 +76,20 @@ public static Object get(Environment environment, BObject queueObject, BMap { + try { + queue.close(); + future.complete(null); + } catch (MQException e) { + BError bError = createError(IBMMQ_ERROR, + String.format("Error occurred while closing the queue: %s", e.getMessage()), e); + future.complete(bError); + } + }); + return null; + } } diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java index 1c034c5..8517841 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java @@ -111,4 +111,15 @@ public static Object accessTopic(BObject queueManagerObject, BString topicName, String.format("Error occurred while accessing topic: %s", e.getMessage()), e); } } + + public static Object disconnect(BObject queueManagerObject) { + MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER); + try { + queueManager.disconnect(); + return null; + } catch (MQException e) { + return createError(IBMMQ_ERROR, + String.format("Error occurred while disconnecting queue manager: %s", e.getMessage()), e); + } + } } diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java index d1ecabe..a1f279e 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java @@ -18,6 +18,7 @@ package io.ballerina.lib.ibm.ibmmq; +import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQTopic; @@ -75,4 +76,20 @@ public static Object get(Environment environment, BObject topicObject, BMap { + try { + topic.close(); + future.complete(null); + } catch (MQException e) { + BError bError = createError(IBMMQ_ERROR, + String.format("Error occurred while closing the topic: %s", e.getMessage()), e); + future.complete(bError); + } + }); + return null; + } } From a091bdff04f5208bc49fe70e8ab2ff02561d3810 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sat, 28 Oct 2023 23:29:04 +0530 Subject: [PATCH 07/17] Add negative tests cases --- ballerina/tests/pub_sub_tests.bal | 166 +++++++++++++++++- ballerina/tests/resources/docker-compose.yaml | 1 + 2 files changed, 161 insertions(+), 6 deletions(-) diff --git a/ballerina/tests/pub_sub_tests.bal b/ballerina/tests/pub_sub_tests.bal index a18972e..1aee56c 100644 --- a/ballerina/tests/pub_sub_tests.bal +++ b/ballerina/tests/pub_sub_tests.bal @@ -3,16 +3,170 @@ import ballerina/test; @test:Config {} function basicPublisherSubscriberTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); - Topic subTopic = check queueManager.accessTopic("", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); - future messageFuture = start getMessageFromTopic(subTopic); - Topic pubTopic = check queueManager.accessTopic("", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); check pubTopic->put({ payload: "Hello World".toBytes() }); - Message message = check wait messageFuture; + Message message = check subTopic->get(); test:assertEquals(string:fromBytes(message.payload), "Hello World"); + check subTopic->close(); + check queueManager.disconnect(); } -function getMessageFromTopic(Topic topic) returns Message|Error { - return topic->get(); +@test:Config {} +function pubSubMultipleMessagesInOrderTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + foreach int i in 0 ... 4 { + check pubTopic->put({ + payload: i.toString().toBytes() + }); + } + foreach int i in 0 ... 4 { + Message message = check subTopic->get(waitInterval = 2); + test:assertEquals(string:fromBytes(message.payload), i.toString()); + } + check subTopic->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeWithFiniteTimeout() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check pubTopic->put({ + payload: "Hello World".toBytes() + }); + Message message = check subTopic->get(waitInterval = 5); + test:assertEquals(string:fromBytes(message.payload), "Hello World"); + check subTopic->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeWithoutPublish() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Message|Error result = subTopic->get(waitInterval = 5, gmOptions = MQGMO_NO_WAIT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while getting a message from the topic: MQJE001: Completion Code '2', Reason '2033'."); + test:assertEquals(result.detail().reasonCode, 2033); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check subTopic->close(); + check queueManager.disconnect(); +} + +@test:Config {} +function publishToNonExistingTopic() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'."); + test:assertEquals(result.detail().reasonCode, 2085); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeToNonExistingTopic() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'."); + test:assertEquals(result.detail().reasonCode, 2085); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function subscribeWithInvalidTopicName() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2152'."); + test:assertEquals(result.detail().reasonCode, 2152); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function publishWithInvalidTopicName() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'."); + test:assertEquals(result.detail().reasonCode, 2085); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function accessTopicAfterQMDisconnect() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + check queueManager.disconnect(); + Topic|Error result = queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + if result is Error { + test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: An MQException " + + "occurred: Completion Code '2', Reason '2018'\n'MQJI002: Not connected to a queue manager.'."); + test:assertEquals(result.detail().reasonCode, 2018); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } +} + +@test:Config {} +function putToTopicAfterTopicClose() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check pubTopic->close(); + Error? result = pubTopic->put({ + payload: "Hello World".toBytes() + }); + if result is Error { + test:assertEquals(result.message(), "Error occurred while putting a message to the topic: MQJE001: " + + "An MQException occurred: Completion Code '2', Reason '2019'\n'MQJI027: The queue has been closed.'."); + test:assertEquals(result.detail().reasonCode, 2019); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); +} + +@test:Config {} +function putToTopicAfterQMDisconnect() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check queueManager.disconnect(); + Error? result = pubTopic->put({ + payload: "Hello World".toBytes() + }); + if result is Error { + test:assertEquals(result.message(), "Error occurred while putting a message to the topic: MQJE001: An MQException " + + "occurred: Completion Code '2', Reason '2018'\n'MQJI002: Not connected to a queue manager.'."); + test:assertEquals(result.detail().reasonCode, 2018); + test:assertEquals(result.detail().completionCode, 2); + } else { + test:assertFail("Expected an error"); + } + check queueManager.disconnect(); } diff --git a/ballerina/tests/resources/docker-compose.yaml b/ballerina/tests/resources/docker-compose.yaml index cba3df0..87f9597 100644 --- a/ballerina/tests/resources/docker-compose.yaml +++ b/ballerina/tests/resources/docker-compose.yaml @@ -3,6 +3,7 @@ version: "3.9" services: mq: image: icr.io/ibm-messaging/mq:latest + container_name: ibmmq-test ports: - "1414:1414" - "9443:9443" From 65f26570125614aeae3d0998aa6b81602f315be0 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sat, 28 Oct 2023 23:31:28 +0530 Subject: [PATCH 08/17] Update test names --- ballerina/tests/pub_sub_tests.bal | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ballerina/tests/pub_sub_tests.bal b/ballerina/tests/pub_sub_tests.bal index 1aee56c..23f0015 100644 --- a/ballerina/tests/pub_sub_tests.bal +++ b/ballerina/tests/pub_sub_tests.bal @@ -33,7 +33,7 @@ function pubSubMultipleMessagesInOrderTest() returns error? { } @test:Config {} -function subscribeWithFiniteTimeout() returns error? { +function subscribeWithFiniteTimeoutTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); @@ -47,7 +47,7 @@ function subscribeWithFiniteTimeout() returns error? { } @test:Config {} -function subscribeWithoutPublish() returns error? { +function subscribeWithoutPublishTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); Message|Error result = subTopic->get(waitInterval = 5, gmOptions = MQGMO_NO_WAIT); @@ -63,7 +63,7 @@ function subscribeWithoutPublish() returns error? { } @test:Config {} -function publishToNonExistingTopic() returns error? { +function publishToNonExistingTopicTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); if result is Error { @@ -77,7 +77,7 @@ function publishToNonExistingTopic() returns error? { } @test:Config {} -function subscribeToNonExistingTopic() returns error? { +function subscribeToNonExistingTopicTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); if result is Error { @@ -91,7 +91,7 @@ function subscribeToNonExistingTopic() returns error? { } @test:Config {} -function subscribeWithInvalidTopicName() returns error? { +function subscribeWithInvalidTopicNameTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); if result is Error { @@ -105,7 +105,7 @@ function subscribeWithInvalidTopicName() returns error? { } @test:Config {} -function publishWithInvalidTopicName() returns error? { +function publishWithInvalidTopicNameTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); if result is Error { @@ -119,7 +119,7 @@ function publishWithInvalidTopicName() returns error? { } @test:Config {} -function accessTopicAfterQMDisconnect() returns error? { +function accessTopicAfterQMDisconnectTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); check queueManager.disconnect(); Topic|Error result = queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); @@ -134,7 +134,7 @@ function accessTopicAfterQMDisconnect() returns error? { } @test:Config {} -function putToTopicAfterTopicClose() returns error? { +function putToTopicAfterTopicCloseTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); check pubTopic->close(); @@ -153,7 +153,7 @@ function putToTopicAfterTopicClose() returns error? { } @test:Config {} -function putToTopicAfterQMDisconnect() returns error? { +function putToTopicAfterQMDisconnectTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); check queueManager.disconnect(); From ed334b95ff703147ec399b1b9ad765033d4d64e4 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sun, 29 Oct 2023 11:34:27 +0530 Subject: [PATCH 09/17] Update java package name --- ballerina/destination.bal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballerina/destination.bal b/ballerina/destination.bal index 53aa24b..620044f 100644 --- a/ballerina/destination.bal +++ b/ballerina/destination.bal @@ -38,7 +38,7 @@ public isolated client class Queue { remote function close() returns Error? = @java:Method { - 'class: "io.ballerina.lib.ibm.ibmmq.Topic" + 'class: "io.ballerina.lib.ibm.ibmmq.Queue" } external; } From afe99196bf32231b5a88624971416b3217cefe14 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sun, 29 Oct 2023 11:36:07 +0530 Subject: [PATCH 10/17] Add license header --- ballerina/destination.bal | 1 + ballerina/tests/pub_sub_tests.bal | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/ballerina/destination.bal b/ballerina/destination.bal index 620044f..081dd05 100644 --- a/ballerina/destination.bal +++ b/ballerina/destination.bal @@ -13,6 +13,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + import ballerina/jballerina.java; public type Destination distinct client object { diff --git a/ballerina/tests/pub_sub_tests.bal b/ballerina/tests/pub_sub_tests.bal index 23f0015..05c821f 100644 --- a/ballerina/tests/pub_sub_tests.bal +++ b/ballerina/tests/pub_sub_tests.bal @@ -1,3 +1,19 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + import ballerina/test; @test:Config {} From 5f7de1d8e067ce3d4def7bb5c5b327d47f866457 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Sun, 29 Oct 2023 21:22:56 +0530 Subject: [PATCH 11/17] Fix review comments --- ballerina/tests/pub_sub_tests.bal | 49 ++++++++++--------- .../ballerina/lib/ibm.ibmmq/QueueManager.java | 2 +- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/ballerina/tests/pub_sub_tests.bal b/ballerina/tests/pub_sub_tests.bal index 05c821f..81a8579 100644 --- a/ballerina/tests/pub_sub_tests.bal +++ b/ballerina/tests/pub_sub_tests.bal @@ -19,54 +19,57 @@ import ballerina/test; @test:Config {} function basicPublisherSubscriberTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); - Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); - Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); - check pubTopic->put({ + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check publisher->put({ payload: "Hello World".toBytes() }); - Message message = check subTopic->get(); + Message message = check subscriber->get(); test:assertEquals(string:fromBytes(message.payload), "Hello World"); - check subTopic->close(); + check subscriber->close(); + check publisher->close(); check queueManager.disconnect(); } @test:Config {} function pubSubMultipleMessagesInOrderTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); - Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); - Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); foreach int i in 0 ... 4 { - check pubTopic->put({ + check publisher->put({ payload: i.toString().toBytes() }); } foreach int i in 0 ... 4 { - Message message = check subTopic->get(waitInterval = 2); + Message message = check subscriber->get(waitInterval = 2); test:assertEquals(string:fromBytes(message.payload), i.toString()); } - check subTopic->close(); + check subscriber->close(); + check publisher->close(); check queueManager.disconnect(); } @test:Config {} function subscribeWithFiniteTimeoutTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); - Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); - Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); - check pubTopic->put({ + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check publisher->put({ payload: "Hello World".toBytes() }); - Message message = check subTopic->get(waitInterval = 5); + Message message = check subscriber->get(waitInterval = 5); test:assertEquals(string:fromBytes(message.payload), "Hello World"); - check subTopic->close(); + check subscriber->close(); + check publisher->close(); check queueManager.disconnect(); } @test:Config {} function subscribeWithoutPublishTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); - Topic subTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); - Message|Error result = subTopic->get(waitInterval = 5, gmOptions = MQGMO_NO_WAIT); + Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); + Message|Error result = subscriber->get(waitInterval = 5, gmOptions = MQGMO_NO_WAIT); if result is Error { test:assertEquals(result.message(), "Error occurred while getting a message from the topic: MQJE001: Completion Code '2', Reason '2033'."); test:assertEquals(result.detail().reasonCode, 2033); @@ -74,7 +77,7 @@ function subscribeWithoutPublishTest() returns error? { } else { test:assertFail("Expected an error"); } - check subTopic->close(); + check subscriber->close(); check queueManager.disconnect(); } @@ -152,9 +155,9 @@ function accessTopicAfterQMDisconnectTest() returns error? { @test:Config {} function putToTopicAfterTopicCloseTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); - Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); - check pubTopic->close(); - Error? result = pubTopic->put({ + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + check publisher->close(); + Error? result = publisher->put({ payload: "Hello World".toBytes() }); if result is Error { @@ -171,9 +174,9 @@ function putToTopicAfterTopicCloseTest() returns error? { @test:Config {} function putToTopicAfterQMDisconnectTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); - Topic pubTopic = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); + Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT); check queueManager.disconnect(); - Error? result = pubTopic->put({ + Error? result = publisher->put({ payload: "Hello World".toBytes() }); if result is Error { diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java index 8517841..85a4780 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java @@ -116,10 +116,10 @@ public static Object disconnect(BObject queueManagerObject) { MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER); try { queueManager.disconnect(); - return null; } catch (MQException e) { return createError(IBMMQ_ERROR, String.format("Error occurred while disconnecting queue manager: %s", e.getMessage()), e); } + return null; } } From 77883a4087e494120263426c4fa0649aa6921481 Mon Sep 17 00:00:00 2001 From: dilanSachi Date: Mon, 30 Oct 2023 09:16:59 +0530 Subject: [PATCH 12/17] Update get api to be non blocking --- ballerina/destination.bal | 6 ++-- ballerina/tests/pub_sub_tests.bal | 35 +++++++++++-------- ballerina/types.bal | 4 +-- .../io/ballerina/lib/ibm.ibmmq/Topic.java | 14 +++++--- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/ballerina/destination.bal b/ballerina/destination.bal index 081dd05..aa76ad4 100644 --- a/ballerina/destination.bal +++ b/ballerina/destination.bal @@ -19,7 +19,7 @@ import ballerina/jballerina.java; public type Destination distinct client object { remote function put(Message message) returns Error?; - remote function get(*GetMessageOptions options) returns Message|Error; + remote function get(*GetMessageOptions options) returns Message|Error?; remote function close() returns Error?; }; @@ -32,7 +32,7 @@ public isolated client class Queue { 'class: "io.ballerina.lib.ibm.ibmmq.Queue" } external; - remote function get(*GetMessageOptions options) returns Message|Error = + remote function get(*GetMessageOptions options) returns Message|Error? = @java:Method { 'class: "io.ballerina.lib.ibm.ibmmq.Queue" } external; @@ -51,7 +51,7 @@ public isolated client class Topic { 'class: "io.ballerina.lib.ibm.ibmmq.Topic" } external; - remote function get(*GetMessageOptions options) returns Message|Error = + remote function get(*GetMessageOptions options) returns Message|Error? = @java:Method { 'class: "io.ballerina.lib.ibm.ibmmq.Topic" } external; diff --git a/ballerina/tests/pub_sub_tests.bal b/ballerina/tests/pub_sub_tests.bal index 81a8579..297ad62 100644 --- a/ballerina/tests/pub_sub_tests.bal +++ b/ballerina/tests/pub_sub_tests.bal @@ -13,7 +13,6 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - import ballerina/test; @test:Config {} @@ -24,8 +23,12 @@ function basicPublisherSubscriberTest() returns error? { check publisher->put({ payload: "Hello World".toBytes() }); - Message message = check subscriber->get(); - test:assertEquals(string:fromBytes(message.payload), "Hello World"); + Message? message = check subscriber->get(); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), "Hello World"); + } else { + test:assertFail("Expected a value for message"); + } check subscriber->close(); check publisher->close(); check queueManager.disconnect(); @@ -42,8 +45,12 @@ function pubSubMultipleMessagesInOrderTest() returns error? { }); } foreach int i in 0 ... 4 { - Message message = check subscriber->get(waitInterval = 2); - test:assertEquals(string:fromBytes(message.payload), i.toString()); + Message? message = check subscriber->get(waitInterval = 2); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), i.toString()); + } else { + test:assertFail("Expected a value for message"); + } } check subscriber->close(); check publisher->close(); @@ -58,8 +65,12 @@ function subscribeWithFiniteTimeoutTest() returns error? { check publisher->put({ payload: "Hello World".toBytes() }); - Message message = check subscriber->get(waitInterval = 5); - test:assertEquals(string:fromBytes(message.payload), "Hello World"); + Message? message = check subscriber->get(waitInterval = 5); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), "Hello World"); + } else { + test:assertFail("Expected a value for message"); + } check subscriber->close(); check publisher->close(); check queueManager.disconnect(); @@ -69,14 +80,8 @@ function subscribeWithFiniteTimeoutTest() returns error? { function subscribeWithoutPublishTest() returns error? { QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE); - Message|Error result = subscriber->get(waitInterval = 5, gmOptions = MQGMO_NO_WAIT); - if result is Error { - test:assertEquals(result.message(), "Error occurred while getting a message from the topic: MQJE001: Completion Code '2', Reason '2033'."); - test:assertEquals(result.detail().reasonCode, 2033); - test:assertEquals(result.detail().completionCode, 2); - } else { - test:assertFail("Expected an error"); - } + Message|Error? result = subscriber->get(waitInterval = 5); + test:assertTrue(result is ()); check subscriber->close(); check queueManager.disconnect(); } diff --git a/ballerina/types.bal b/ballerina/types.bal index 63194db..d5cdb6a 100644 --- a/ballerina/types.bal +++ b/ballerina/types.bal @@ -31,8 +31,8 @@ public type AccessQueueOptions MQOO_OUTPUT|MQOO_BROWSE|MQOO_INPUT_AS_Q_DEF|MQOO_ public type AccessTopicOptions MQOO_ALTERNATE_USER_AUTHORITY|MQOO_BIND_AS_Q_DEF|MQOO_FAIL_IF_QUIESCING|MQOO_OUTPUT|MQOO_PASS_ALL_CONTEXT|MQOO_PASS_IDENTITY_CONTEXT|MQOO_SET_ALL_CONTEXT|MQOO_SET_IDENTITY_CONTEXT|MQSO_CREATE; public type GetMessageOptions record {| - GM_OPTIONS gmOptions = MQGMO_WAIT; - int waitInterval = -1; + GM_OPTIONS gmOptions = MQGMO_NO_WAIT; + int waitInterval = 0; |}; public type Property record {| diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java index a1f279e..b608ffe 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java @@ -22,6 +22,7 @@ import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQTopic; +import com.ibm.mq.constants.CMQC; import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.values.BError; @@ -68,10 +69,15 @@ public static Object get(Environment environment, BObject topicObject, BMap Date: Mon, 30 Oct 2023 09:17:44 +0530 Subject: [PATCH 13/17] Remove public from extern init --- ballerina/queue_manager.bal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballerina/queue_manager.bal b/ballerina/queue_manager.bal index acce8b6..f65bedc 100644 --- a/ballerina/queue_manager.bal +++ b/ballerina/queue_manager.bal @@ -22,7 +22,7 @@ public isolated class QueueManager { check self.externInit(configurations); } - private isolated function externInit(QueueManagerConfiguration configurations) returns Error? = @java:Method { + isolated function externInit(QueueManagerConfiguration configurations) returns Error? = @java:Method { name: "init", 'class: "io.ballerina.lib.ibm.ibmmq.QueueManager" } external; From 62017d1d38477581ab9386ae0190fd222f65e451 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Mon, 30 Oct 2023 16:20:47 +0530 Subject: [PATCH 14/17] Add basic test case for queue producer and consumer --- .../tests/queue_producer_consumer_tests.bal | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 ballerina/tests/queue_producer_consumer_tests.bal diff --git a/ballerina/tests/queue_producer_consumer_tests.bal b/ballerina/tests/queue_producer_consumer_tests.bal new file mode 100644 index 0000000..69651c6 --- /dev/null +++ b/ballerina/tests/queue_producer_consumer_tests.bal @@ -0,0 +1,38 @@ +// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +@test:Config { + groups: ["ibmq-queue"] +} +function basicQueueProducerConsumerTest() returns error? { + QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Queue producer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_OUTPUT); + Queue consumer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_INPUT_AS_Q_DEF); + check producer->put({ + payload: "Hello World".toBytes() + }); + Message? message = check consumer->get(); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), "Hello World"); + } else { + test:assertFail("Expected a value for message"); + } + check producer->close(); + check consumer->close(); + check queueManager.disconnect(); +} From 8db5a92457d6e6bfcba42f1ec48c66bc4833b804 Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Mon, 30 Oct 2023 16:21:34 +0530 Subject: [PATCH 15/17] Revert "Add basic test case for queue producer and consumer" This reverts commit 62017d1d38477581ab9386ae0190fd222f65e451. --- .../tests/queue_producer_consumer_tests.bal | 38 ------------------- 1 file changed, 38 deletions(-) delete mode 100644 ballerina/tests/queue_producer_consumer_tests.bal diff --git a/ballerina/tests/queue_producer_consumer_tests.bal b/ballerina/tests/queue_producer_consumer_tests.bal deleted file mode 100644 index 69651c6..0000000 --- a/ballerina/tests/queue_producer_consumer_tests.bal +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. -// -// WSO2 LLC. licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file except -// in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import ballerina/test; - -@test:Config { - groups: ["ibmq-queue"] -} -function basicQueueProducerConsumerTest() returns error? { - QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); - Queue producer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_OUTPUT); - Queue consumer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_INPUT_AS_Q_DEF); - check producer->put({ - payload: "Hello World".toBytes() - }); - Message? message = check consumer->get(); - if message !is () { - test:assertEquals(string:fromBytes(message.payload), "Hello World"); - } else { - test:assertFail("Expected a value for message"); - } - check producer->close(); - check consumer->close(); - check queueManager.disconnect(); -} From 03e6498fec5acdb9acaca78e8cdf43b1034f7b8b Mon Sep 17 00:00:00 2001 From: ayeshLK Date: Mon, 30 Oct 2023 16:20:23 +0530 Subject: [PATCH 16/17] Fix logic issue in saving native-queue in the bobject --- .../src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java index 85a4780..846b102 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java @@ -89,7 +89,7 @@ public static Object accessQueue(BObject queueManagerObject, BString queueName, try { MQQueue mqQueue = queueManager.accessQueue(queueName.getValue(), options.intValue()); BObject bQueue = ValueCreator.createObjectValue(ModuleUtils.getModule(), BQUEUE); - bQueue.addNativeData(Constants.NATIVE_TOPIC, mqQueue); + bQueue.addNativeData(Constants.NATIVE_QUEUE, mqQueue); return bQueue; } catch (MQException e) { return createError(IBMMQ_ERROR, From 84aad4393a034fe91df7f524bf5056ebca8f7aee Mon Sep 17 00:00:00 2001 From: Dilan Sachintha Nayanajith Date: Tue, 31 Oct 2023 09:42:50 +0530 Subject: [PATCH 17/17] Update get message variable name --- ballerina/destination.bal | 6 +++--- ballerina/types.bal | 2 +- .../main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ballerina/destination.bal b/ballerina/destination.bal index 33d0fbf..2e48a4e 100644 --- a/ballerina/destination.bal +++ b/ballerina/destination.bal @@ -19,7 +19,7 @@ import ballerina/jballerina.java; public type Destination distinct client object { remote function put(Message message) returns Error?; - remote function get(*GetMessageOptions options) returns Message|Error?; + remote function get(*GetMessageOptions getMessageOptions) returns Message|Error?; remote function close() returns Error?; }; @@ -32,7 +32,7 @@ public isolated client class Queue { 'class: "io.ballerina.lib.ibm.ibmmq.Queue" } external; - remote function get(*GetMessageOptions options) returns Message|Error? = + remote function get(*GetMessageOptions getMessageOptions) returns Message|Error? = @java:Method { 'class: "io.ballerina.lib.ibm.ibmmq.Queue" } external; @@ -51,7 +51,7 @@ public isolated client class Topic { 'class: "io.ballerina.lib.ibm.ibmmq.Topic" } external; - remote function get(*GetMessageOptions options) returns Message|Error? = + remote function get(*GetMessageOptions getMessageOptions) returns Message|Error? = @java:Method { 'class: "io.ballerina.lib.ibm.ibmmq.Topic" } external; diff --git a/ballerina/types.bal b/ballerina/types.bal index dd0d5a3..ecb6bba 100644 --- a/ballerina/types.bal +++ b/ballerina/types.bal @@ -26,7 +26,7 @@ public type QueueManagerConfiguration record {| |}; public type GetMessageOptions record {| - int gmOptions = MQGMO_NO_WAIT; + int options = MQGMO_NO_WAIT; int waitInterval = 10; |}; diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java index d441506..295a071 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java @@ -62,7 +62,7 @@ public class CommonUtils { private static final BString PROPERTY_VALUE = StringUtils.fromString("value"); private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor"); private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval"); - private static final BString OPTIONS = StringUtils.fromString("gmOptions"); + private static final BString OPTIONS = StringUtils.fromString("options"); private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor();