Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic pub sub support #3

Merged
44 changes: 44 additions & 0 deletions ballerina/constants.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Option to indicate whether the topic is being opened for either publication or subscription.
public const OPEN_AS_SUBSCRIPTION = 1;
public const OPEN_AS_PUBLICATION = 2;

// Options that control the opening of the topic for either publication or subscription.
public const MQOO_ALTERNATE_USER_AUTHORITY = 4096;
public const MQOO_BIND_AS_Q_DEF = 0;
public const MQOO_FAIL_IF_QUIESCING = 8192;
public const MQOO_OUTPUT = 16;
public const MQOO_PASS_ALL_CONTEXT = 512;
public const MQOO_PASS_IDENTITY_CONTEXT = 256;
public const MQOO_SET_ALL_CONTEXT = 2048;
public const MQOO_SET_IDENTITY_CONTEXT = 1024;

// Options related to the the get message in a topic.
public const MQGMO_WAIT = 1;
public const MQGMO_NO_WAIT = 0;
public const MQGMO_SYNCPOINT = 2;
public const MQGMO_NO_SYNCPOINT = 4;
public const MQGMO_BROWSE_FIRST = 16;
public const MQGMO_BROWSE_NEXT = 32;
public const MQGMO_BROWSE_MSG_UNDER_CURSOR = 2048;
public const MQGMO_MSG_UNDER_CURSOR = 256;
public const MQGMO_LOCK = 512;
public const MQGMO_UNLOCK = 1024;
public const MQGMO_ACCEPT_TRUNCATED_MSG = 64;
public const MQGMO_FAIL_IF_QUIESCING = 8192;
public const MQGMO_CONVERT = 16384;
18 changes: 16 additions & 2 deletions ballerina/destination.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,31 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import ballerina/jballerina.java;

public type Destination distinct client object {
remote function put(Message message) returns Error?;

remote function get() returns Message|Error?;
remote function get(GetMessageOptions options = {}) returns Message|Error?;
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
};

public type Queue distinct client object {
*Destination;
};

public type Topic distinct client object {
public client class Topic {
*Destination;

remote function put(Message message) returns Error? =
@java:Method {
name: "externPut",
'class: "io.ballerina.lib.ibm.ibmmq.Topic"
} external;

remote function get(GetMessageOptions options = {}) returns Message|Error =
@java:Method {
name: "externGet",
'class: "io.ballerina.lib.ibm.ibmmq.Topic"
} external;
};

13 changes: 12 additions & 1 deletion ballerina/errors.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,15 @@
// specific language governing permissions and limitations
// under the License.

public type Error distinct error;
public type Error distinct error<ErrorDetails>;

# The error details type for the module.
#
# + reasonCode - The reason code for the error
# + errorCode - The error code for the error
# + completionCode - The completion code for the error
public type ErrorDetails record {|
int reasonCode?;
string errorCode?;
int completionCode?;
|};
9 changes: 7 additions & 2 deletions ballerina/queue_manager.bal
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ public isolated class QueueManager {
return error Error("Not implemented");
}

public isolated function accessTopic(string topicName, string topicString, ConnectionOpenOptions options) returns Topic|Error {
return error Error("Not implemented");
public isolated function accessTopic(string topicName, string topicString, OPEN_AS_OPTION openAs, AccessTopicOptions options) returns Topic|Error {
return self.externAccessTopic(topicName, topicString, openAs, options);
}

private isolated function externAccessTopic(string topicName, string topicString, OPEN_AS_OPTION openAs, AccessTopicOptions options) returns Topic|Error =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.QueueManager"
} external;
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
}
13 changes: 12 additions & 1 deletion ballerina/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
// specific language governing permissions and limitations
// under the License.

public type GM_OPTIONS MQGMO_WAIT|MQGMO_NO_WAIT|MQGMO_SYNCPOINT|MQGMO_NO_SYNCPOINT|MQGMO_BROWSE_FIRST|MQGMO_BROWSE_MSG_UNDER_CURSOR|MQGMO_MSG_UNDER_CURSOR|MQGMO_LOCK|MQGMO_UNLOCK|MQGMO_ACCEPT_TRUNCATED_MSG|MQGMO_BROWSE_NEXT|MQGMO_ACCEPT_TRUNCATED_MSG|MQGMO_FAIL_IF_QUIESCING|MQGMO_CONVERT;

public type OPEN_AS_OPTION OPEN_AS_SUBSCRIPTION|OPEN_AS_PUBLICATION;
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved

public type QueueManagerConfiguration record {|
string name;
string host;
Expand All @@ -24,12 +28,19 @@ public type QueueManagerConfiguration record {|
|};

public enum ConnectionOpenOptions {
MQOO_OUTPUT = "MQOO_OUTPUT",
// MQOO_OUTPUT = "MQOO_OUTPUT",
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
MQOO_INPUT_AS_Q_DEF = "MQOO_INPUT_AS_Q_DEF",
MQOO_INPUT_EXCLUSIVE = "MQOO_INPUT_EXCLUSIVE",
MQOO_INPUT_SHARED = "MQOO_INPUT_SHARED"
}

public type AccessTopicOptions MQOO_ALTERNATE_USER_AUTHORITY|MQOO_BIND_AS_Q_DEF|MQOO_FAIL_IF_QUIESCING|MQOO_OUTPUT|MQOO_PASS_ALL_CONTEXT|MQOO_PASS_IDENTITY_CONTEXT|MQOO_SET_ALL_CONTEXT|MQOO_SET_IDENTITY_CONTEXT;

public type GetMessageOptions record {|
GM_OPTIONS options = MQGMO_NO_SYNCPOINT;
int waitInterval = 0;
|};

public type Property record {|
map<anydata> descriptor;
boolean|byte|byte[]|decimal|float|int|string property;
Expand Down
70 changes: 69 additions & 1 deletion native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,91 @@

package io.ballerina.lib.ibm.ibmmq;

import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BString;

import java.io.IOException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;

import static io.ballerina.lib.ibm.ibmmq.Constants.IBMMQ_ERROR;
import static io.ballerina.lib.ibm.ibmmq.ModuleUtils.getModule;

/**
* {@code CommonUtils} contains the common utility functions for the Ballerina IBM MQ connector.
*/
public class CommonUtils {

public static final String BTOPIC = "TOPIC";
public static final String TOPIC_OBJECT = "TOPIC_OBJECT";
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved

private static final String ERROR_DETAILS = "ErrorDetails";
private static final BString ERROR_REASON_CODE = StringUtils.fromString("ErrorDetails");
private static final BString ERROR_ERROR_CODE = StringUtils.fromString("ErrorDetails");
private static final BString ERROR_COMPLETION_CODE = StringUtils.fromString("ErrorDetails");
private static final BString MESSAGE_PAYLOAD = StringUtils.fromString("payload");
private static final BString MESSAGE_PROPERTIES = StringUtils.fromString("properties");
private static final String BMESSAGE_NAME = "Message";

public static MQMessage getMqMessageFromMessage(BMap<BString, Object> bMessage) {
byte[] payload = bMessage.getArrayValue(MESSAGE_PAYLOAD).getBytes();
BMap<BString, Object> properties = (BMap<BString, Object>) bMessage.getMapValue(MESSAGE_PROPERTIES);

MQMessage mqMessage = new MQMessage();
try {
mqMessage.write(payload);
} catch (IOException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while populating payload: %s", e.getMessage()), e);
}
for (BString key : properties.getKeys()) {
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
try {
mqMessage.setObjectProperty(key.getValue(), properties.get(key));
} catch (MQException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while setting message properties: %s", e.getMessage()), e);
}
}
return mqMessage;
}

public static BMap<BString, Object> getBMessageFromMQMessage(MQMessage mqMessage) {
BMap<BString, Object> bMessage = ValueCreator.createRecordValue(getModule(), BMESSAGE_NAME);
try {
byte[] payload = new byte[mqMessage.getDataLength()];
mqMessage.readFully(payload);
bMessage.put(MESSAGE_PAYLOAD, payload);
BMap<BString, Object> properties = ValueCreator.createRecordValue(getModule(),
MESSAGE_PROPERTIES.getValue());
Enumeration<String> propertyNames = mqMessage.getPropertyNames("%");
for (String propertyName : Collections.list(propertyNames)) {
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
properties.put(StringUtils.fromString(propertyName), mqMessage.getObjectProperty(propertyName));
}
bMessage.put(MESSAGE_PROPERTIES, properties);
return bMessage;
} catch (MQException | IOException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while reading the message: %s", e.getMessage()), e);
}
}

public static BError createError(String errorType, String message, Throwable throwable) {
BError cause = ErrorCreator.createError(throwable);
BMap<BString, Object> errorDetails = ValueCreator.createRecordValue(getModule(), ERROR_DETAILS);
if (throwable instanceof MQException) {
errorDetails.put(ERROR_REASON_CODE, ((MQException) throwable).getReason());
errorDetails.put(ERROR_ERROR_CODE, ((MQException) throwable).getErrorCode());
errorDetails.put(ERROR_COMPLETION_CODE, ((MQException) throwable).getCompCode());
}
return ErrorCreator.createError(
ModuleUtils.getModule(), errorType, StringUtils.fromString(message), cause, null);
ModuleUtils.getModule(), errorType, StringUtils.fromString(message), cause, errorDetails);
}

public static Optional<String> getOptionalStringProperty(BMap<BString, Object> config, BString fieldName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.lib.ibm.ibmmq;

import java.util.concurrent.ThreadFactory;

public class MQThreadFactory implements ThreadFactory {

private final String threadGroupName;

public MQThreadFactory(String threadGroupName) {
this.threadGroupName = threadGroupName;
}

@Override
public Thread newThread(Runnable runnable) {
Thread ibmMqClientThread = new Thread(runnable);
ibmMqClientThread.setName(threadGroupName);
return ibmMqClientThread;
}

dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
}
18 changes: 18 additions & 0 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import com.ibm.mq.MQException;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.MQTopic;
import com.ibm.mq.constants.MQConstants;
import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
Expand Down Expand Up @@ -64,6 +67,21 @@ public static Object init(BObject queueManager, BMap<BString, Object> configurat
return null;
}

public static Object externAccessTopic(Environment env, BObject queueManagerObject, BString topicName,
BString topicString, Long openAs, Long options) {
MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER);
try {
MQTopic mqTopic = queueManager.accessTopic(topicName.getValue(), topicString.getValue(),
openAs.intValue(), options.intValue());
BObject bTopic = ValueCreator.createObjectValue(ModuleUtils.getModule(), CommonUtils.BTOPIC);
bTopic.addNativeData(CommonUtils.TOPIC_OBJECT, mqTopic);
return bTopic;
} catch (MQException e) {
return createError(IBMMQ_ERROR,
String.format("Error occurred while accessing topic: %s", e.getMessage()), e);
}
}

private static Hashtable<String, Object> getConnectionProperties(BMap<BString, Object> configurations) {
Hashtable<String, Object> properties = new Hashtable<>();
String host = configurations.getStringValue(HOST).getValue();
Expand Down
80 changes: 80 additions & 0 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.lib.ibm.ibmmq;

import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQTopic;
import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Topic {
private static final ExecutorService topicExecutorService =
Executors.newCachedThreadPool(new MQThreadFactory("balx-ibm-mq-client-network-thread"));
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved

private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval");
private static final BString OPTIONS = StringUtils.fromString("options");

public static Object externPut(Environment environment, BObject topicObject, BMap message) {
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
MQTopic topic = (MQTopic) topicObject.getNativeData(CommonUtils.TOPIC_OBJECT);
MQMessage mqMessage = CommonUtils.getMqMessageFromMessage(message);
Future future = environment.markAsync();
topicExecutorService.execute(() -> {
try {
topic.put(mqMessage);
future.complete(null);
} catch (Exception e) {
future.complete(e);
}
});
return null;
}

public static Object externGet(Environment environment, BObject topicObject, BMap<BString, Object> options) {
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
MQTopic topic = (MQTopic) topicObject.getNativeData(CommonUtils.TOPIC_OBJECT);
MQGetMessageOptions getMessageOptions = getGetMessageOptions(options);
Future future = environment.markAsync();
topicExecutorService.execute(() -> {
try {
MQMessage message = new MQMessage();
topic.get(message, getMessageOptions);
future.complete(CommonUtils.getBMessageFromMQMessage(message));
} catch (Exception e) {
future.complete(e);
}
});
return null;
}

private static MQGetMessageOptions getGetMessageOptions(BMap<BString, Object> bOptions) {
int waitInterval = bOptions.getIntValue(WAIT_INTERVAL).intValue();
int options = bOptions.getIntValue(OPTIONS).intValue();
MQGetMessageOptions getMessageOptions = new MQGetMessageOptions();
getMessageOptions.waitInterval = waitInterval;
getMessageOptions.options = options;
return getMessageOptions;
}
}