Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic pub sub tests #7

Merged
merged 14 commits into from
Oct 30, 2023
25 changes: 24 additions & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,35 @@ modules = [
{org = "ballerina", packageName = "jballerina.java", moduleName = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "lang.error"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "test"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.error"}
]
modules = [
{org = "ballerina", packageName = "test", moduleName = "test"}
]

[[package]]
org = "ballerinax"
name = "ibm.ibmmq"
version = "0.1.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "test"}
]
modules = [
{org = "ballerinax", packageName = "ibm.ibmmq", moduleName = "ibm.ibmmq"}
Expand Down
94 changes: 47 additions & 47 deletions ballerina/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,51 +102,51 @@ task commitTomlFiles {
}
}

// task startActiveMQServer() {
// doLast {
// if (!Os.isFamily(Os.FAMILY_WINDOWS)) {
// def stdOut = new ByteArrayOutputStream()
// exec {
// commandLine 'sh', '-c', "docker ps --filter name=activemq-test"
// standardOutput = stdOut
// }
// if (!stdOut.toString().contains("activemq-test")) {
// println "Starting ActiveMQ server."
// exec {
// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d"
// standardOutput = stdOut
// }
// println stdOut.toString()
// sleep(5 * 1000)
// } else {
// println "ActiveMQ server is already running."
// }
// }
// }
// }

// task stopActiveMQServer() {
// doLast {
// if (!Os.isFamily(Os.FAMILY_WINDOWS)) {
// def stdOut = new ByteArrayOutputStream()
// exec {
// commandLine 'sh', '-c', "docker ps --filter name=activemq-test"
// standardOutput = stdOut
// }
// if (stdOut.toString().contains("activemq-test")) {
// println "Stopping ActiveMQ server."
// exec {
// commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf"
// standardOutput = stdOut
// }
// println stdOut.toString()
// sleep(5 * 1000)
// } else {
// println "ActiveMQ server is not started."
// }
// }
// }
// }
task startIBMMQServer() {
doLast {
if (!Os.isFamily(Os.FAMILY_WINDOWS)) {
def stdOut = new ByteArrayOutputStream()
exec {
commandLine 'sh', '-c', "docker ps --filter name=ibmmq-test"
standardOutput = stdOut
}
if (!stdOut.toString().contains("ibmmq-test")) {
println "Starting IBMMQ server."
exec {
commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml up -d"
standardOutput = stdOut
}
println stdOut.toString()
sleep(5 * 1000)
} else {
println "IBMMQ server is already running."
}
}
}
}

task stopIBMMQServer() {
doLast {
if (!Os.isFamily(Os.FAMILY_WINDOWS)) {
def stdOut = new ByteArrayOutputStream()
exec {
commandLine 'sh', '-c', "docker ps --filter name=ibmmq-test"
standardOutput = stdOut
}
if (stdOut.toString().contains("ibmmq-test")) {
println "Stopping IBMMQ server."
exec {
commandLine 'sh', '-c', "docker-compose -f tests/resources/docker-compose.yaml rm -svf"
standardOutput = stdOut
}
println stdOut.toString()
sleep(5 * 1000)
} else {
println "IBMMQ server is not started."
}
}
}
}

publishing {
publications {
Expand All @@ -168,8 +168,8 @@ publishing {

updateTomlFiles.dependsOn copyStdlibs

// test.dependsOn startActiveMQServer
// build.finalizedBy stopActiveMQServer
test.dependsOn startIBMMQServer
build.finalizedBy stopIBMMQServer

build.dependsOn ":ibm.ibmmq-native:build"
build.dependsOn "generatePomFileForMavenPublication"
Expand Down
196 changes: 196 additions & 0 deletions ballerina/tests/pub_sub_tests.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import ballerina/test;

@test:Config {}
function basicPublisherSubscriberTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE);
Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT);
check publisher->put({
payload: "Hello World".toBytes()
});
Message? message = check subscriber->get();
if message !is () {
test:assertEquals(string:fromBytes(message.payload), "Hello World");
} else {
test:assertFail("Expected a value for message");
}
check subscriber->close();
check publisher->close();
check queueManager.disconnect();
}

@test:Config {}
function pubSubMultipleMessagesInOrderTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE);
Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT);
foreach int i in 0 ... 4 {
check publisher->put({
payload: i.toString().toBytes()
});
}
foreach int i in 0 ... 4 {
Message? message = check subscriber->get(waitInterval = 2);
if message !is () {
test:assertEquals(string:fromBytes(message.payload), i.toString());
} else {
test:assertFail("Expected a value for message");
}
}
check subscriber->close();
check publisher->close();
check queueManager.disconnect();
}

@test:Config {}
function subscribeWithFiniteTimeoutTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE);
Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT);
check publisher->put({
payload: "Hello World".toBytes()
});
Message? message = check subscriber->get(waitInterval = 5);
if message !is () {
test:assertEquals(string:fromBytes(message.payload), "Hello World");
} else {
test:assertFail("Expected a value for message");
}
check subscriber->close();
check publisher->close();
check queueManager.disconnect();
}

@test:Config {}
function subscribeWithoutPublishTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic subscriber = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE);
Message|Error? result = subscriber->get(waitInterval = 5);
test:assertTrue(result is ());
check subscriber->close();
check queueManager.disconnect();
}

@test:Config {}
function publishToNonExistingTopicTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT);
if result is Error {
test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'.");
test:assertEquals(result.detail().reasonCode, 2085);
test:assertEquals(result.detail().completionCode, 2);
} else {
test:assertFail("Expected an error");
}
check queueManager.disconnect();
}

@test:Config {}
function subscribeToNonExistingTopicTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic|Error result = queueManager.accessTopic("dev", "NON.EXISTING.TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE);
if result is Error {
test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'.");
test:assertEquals(result.detail().reasonCode, 2085);
test:assertEquals(result.detail().completionCode, 2);
} else {
test:assertFail("Expected an error");
}
check queueManager.disconnect();
}

@test:Config {}
function subscribeWithInvalidTopicNameTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_SUBSCRIPTION, MQSO_CREATE);
if result is Error {
test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2152'.");
test:assertEquals(result.detail().reasonCode, 2152);
test:assertEquals(result.detail().completionCode, 2);
} else {
test:assertFail("Expected an error");
}
check queueManager.disconnect();
}

@test:Config {}
function publishWithInvalidTopicNameTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic|Error result = queueManager.accessTopic("dev", "INVALID TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT);
if result is Error {
test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: Completion Code '2', Reason '2085'.");
test:assertEquals(result.detail().reasonCode, 2085);
test:assertEquals(result.detail().completionCode, 2);
} else {
test:assertFail("Expected an error");
}
check queueManager.disconnect();
}

@test:Config {}
function accessTopicAfterQMDisconnectTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
check queueManager.disconnect();
Topic|Error result = queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT);
if result is Error {
test:assertEquals(result.message(), "Error occurred while accessing topic: MQJE001: An MQException " +
"occurred: Completion Code '2', Reason '2018'\n'MQJI002: Not connected to a queue manager.'.");
test:assertEquals(result.detail().reasonCode, 2018);
test:assertEquals(result.detail().completionCode, 2);
} else {
test:assertFail("Expected an error");
}
}

@test:Config {}
function putToTopicAfterTopicCloseTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT);
check publisher->close();
Error? result = publisher->put({
payload: "Hello World".toBytes()
});
if result is Error {
test:assertEquals(result.message(), "Error occurred while putting a message to the topic: MQJE001: " +
"An MQException occurred: Completion Code '2', Reason '2019'\n'MQJI027: The queue has been closed.'.");
test:assertEquals(result.detail().reasonCode, 2019);
test:assertEquals(result.detail().completionCode, 2);
} else {
test:assertFail("Expected an error");
}
check queueManager.disconnect();
}

@test:Config {}
function putToTopicAfterQMDisconnectTest() returns error? {
QueueManager queueManager = check new QueueManager(name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Topic publisher = check queueManager.accessTopic("dev", "DEV.BASE.TOPIC", OPEN_AS_PUBLICATION, MQOO_OUTPUT);
check queueManager.disconnect();
Error? result = publisher->put({
payload: "Hello World".toBytes()
});
if result is Error {
test:assertEquals(result.message(), "Error occurred while putting a message to the topic: MQJE001: An MQException " +
"occurred: Completion Code '2', Reason '2018'\n'MQJI002: Not connected to a queue manager.'.");
test:assertEquals(result.detail().reasonCode, 2018);
test:assertEquals(result.detail().completionCode, 2);
} else {
test:assertFail("Expected an error");
}
check queueManager.disconnect();
}
12 changes: 12 additions & 0 deletions ballerina/tests/resources/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: "3.9"

services:
mq:
image: icr.io/ibm-messaging/mq:latest
container_name: ibmmq-test
ports:
- "1414:1414"
- "9443:9443"
environment:
- LICENSE=accept
- MQ_QMGR_NAME=QM1