Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic pub sub support #3

Merged
44 changes: 44 additions & 0 deletions ballerina/constants.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Option to indicate whether the topic is being opened for either publication or subscription.
public const OPEN_AS_SUBSCRIPTION = 1;
public const OPEN_AS_PUBLICATION = 2;

// Options that control the opening of the topic for either publication or subscription.
public const MQOO_ALTERNATE_USER_AUTHORITY = 4096;
public const MQOO_BIND_AS_Q_DEF = 0;
public const MQOO_FAIL_IF_QUIESCING = 8192;
public const MQOO_OUTPUT = 16;
public const MQOO_PASS_ALL_CONTEXT = 512;
public const MQOO_PASS_IDENTITY_CONTEXT = 256;
public const MQOO_SET_ALL_CONTEXT = 2048;
public const MQOO_SET_IDENTITY_CONTEXT = 1024;

// Options related to the the get message in a topic.
public const MQGMO_WAIT = 1;
public const MQGMO_NO_WAIT = 0;
public const MQGMO_SYNCPOINT = 2;
public const MQGMO_NO_SYNCPOINT = 4;
public const MQGMO_BROWSE_FIRST = 16;
public const MQGMO_BROWSE_NEXT = 32;
public const MQGMO_BROWSE_MSG_UNDER_CURSOR = 2048;
public const MQGMO_MSG_UNDER_CURSOR = 256;
public const MQGMO_LOCK = 512;
public const MQGMO_UNLOCK = 1024;
public const MQGMO_ACCEPT_TRUNCATED_MSG = 64;
public const MQGMO_FAIL_IF_QUIESCING = 8192;
public const MQGMO_CONVERT = 16384;
16 changes: 14 additions & 2 deletions ballerina/destination.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,29 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import ballerina/jballerina.java;

public type Destination distinct client object {
remote function put(Message message) returns Error?;

remote function get() returns Message|Error?;
remote function get(GetMessageOptions options = {}) returns Message|Error?;
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
};

public type Queue distinct client object {
*Destination;
};

public type Topic distinct client object {
public client class Topic {
*Destination;

remote function put(Message message) returns Error? =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.Topic"
} external;

remote function get(GetMessageOptions options = {}) returns Message|Error =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.Topic"
} external;
};

13 changes: 12 additions & 1 deletion ballerina/errors.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,15 @@
// specific language governing permissions and limitations
// under the License.

public type Error distinct error;
public type Error distinct error<ErrorDetails>;

# The error details type for the module.
#
# + reasonCode - The reason code for the error
# + errorCode - The error code for the error
# + completionCode - The completion code for the error
public type ErrorDetails record {|
int reasonCode?;
string errorCode?;
int completionCode?;
|};
7 changes: 4 additions & 3 deletions ballerina/queue_manager.bal
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public isolated class QueueManager {
return error Error("Not implemented");
}

public isolated function accessTopic(string topicName, string topicString, ConnectionOpenOptions options) returns Topic|Error {
return error Error("Not implemented");
}
public isolated function accessTopic(string topicName, string topicString, OPEN_AS_OPTION openAs, AccessTopicOptions options) returns Topic|Error =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.QueueManager"
} external;
}
17 changes: 14 additions & 3 deletions ballerina/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
// specific language governing permissions and limitations
// under the License.

public type GM_OPTIONS MQGMO_WAIT|MQGMO_NO_WAIT|MQGMO_SYNCPOINT|MQGMO_NO_SYNCPOINT|MQGMO_BROWSE_FIRST|MQGMO_BROWSE_MSG_UNDER_CURSOR|MQGMO_MSG_UNDER_CURSOR|MQGMO_LOCK|MQGMO_UNLOCK|MQGMO_ACCEPT_TRUNCATED_MSG|MQGMO_BROWSE_NEXT|MQGMO_ACCEPT_TRUNCATED_MSG|MQGMO_FAIL_IF_QUIESCING|MQGMO_CONVERT;

public type OPEN_AS_OPTION OPEN_AS_SUBSCRIPTION|OPEN_AS_PUBLICATION;
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved

public type QueueManagerConfiguration record {|
string name;
string host;
Expand All @@ -24,15 +28,22 @@ public type QueueManagerConfiguration record {|
|};

public enum ConnectionOpenOptions {
MQOO_OUTPUT = "MQOO_OUTPUT",
// MQOO_OUTPUT = "MQOO_OUTPUT",
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
MQOO_INPUT_AS_Q_DEF = "MQOO_INPUT_AS_Q_DEF",
MQOO_INPUT_EXCLUSIVE = "MQOO_INPUT_EXCLUSIVE",
MQOO_INPUT_SHARED = "MQOO_INPUT_SHARED"
}

public type AccessTopicOptions MQOO_ALTERNATE_USER_AUTHORITY|MQOO_BIND_AS_Q_DEF|MQOO_FAIL_IF_QUIESCING|MQOO_OUTPUT|MQOO_PASS_ALL_CONTEXT|MQOO_PASS_IDENTITY_CONTEXT|MQOO_SET_ALL_CONTEXT|MQOO_SET_IDENTITY_CONTEXT;

public type GetMessageOptions record {|
GM_OPTIONS options = MQGMO_NO_SYNCPOINT;
int waitInterval = 0;
|};

public type Property record {|
map<anydata> descriptor;
boolean|byte|byte[]|decimal|float|int|string property;
map<anydata> descriptor?;
boolean|byte|byte[]|decimal|float|int|string value;
|};

public type Message record {|
Expand Down
158 changes: 157 additions & 1 deletion native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,179 @@

package io.ballerina.lib.ibm.ibmmq;

import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPropertyDescriptor;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.creators.TypeCreator;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BString;

import java.io.IOException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;

import static io.ballerina.lib.ibm.ibmmq.Constants.IBMMQ_ERROR;
import static io.ballerina.lib.ibm.ibmmq.ModuleUtils.getModule;

/**
* {@code CommonUtils} contains the common utility functions for the Ballerina IBM MQ connector.
*/
public class CommonUtils {

public static final String BTOPIC = "TOPIC";
public static final String TOPIC_OBJECT = "TOPIC_OBJECT";
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved

private static final String ERROR_DETAILS = "ErrorDetails";
private static final BString ERROR_REASON_CODE = StringUtils.fromString("reasonCode");
private static final BString ERROR_ERROR_CODE = StringUtils.fromString("errorCode");
private static final BString ERROR_COMPLETION_CODE = StringUtils.fromString("completionCode");
private static final BString MESSAGE_PAYLOAD = StringUtils.fromString("payload");
private static final BString MESSAGE_PROPERTIES = StringUtils.fromString("properties");
private static final BString MESSAGE_PROPERTY = StringUtils.fromString("property");
private static final String BPROPERTY = "Property";
private static final String BMESSAGE_NAME = "Message";
private static final BString PD_VERSION = StringUtils.fromString("version");
private static final BString PD_COPY_OPTIONS = StringUtils.fromString("copyOptions");
private static final BString PD_OPTIONS = StringUtils.fromString("options");
private static final BString PD_SUPPORT = StringUtils.fromString("support");
private static final BString PD_CONTEXT = StringUtils.fromString("context");
private static final BString PROPERTY_VALUE = StringUtils.fromString("value");
private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor");


private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor();

public static MQMessage getMqMessageFromBMessage(BMap<BString, Object> bMessage) {
byte[] payload = bMessage.getArrayValue(MESSAGE_PAYLOAD).getBytes();
MQMessage mqMessage = new MQMessage();
try {
mqMessage.write(payload);
} catch (IOException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while populating payload: %s", e.getMessage()), e);
}
BMap<BString, Object> properties = (BMap<BString, Object>) bMessage.getMapValue(MESSAGE_PROPERTIES);
populateMQProperties(properties, mqMessage);
return mqMessage;
}

public static BMap<BString, Object> getBMessageFromMQMessage(MQMessage mqMessage) {
BMap<BString, Object> bMessage = ValueCreator.createRecordValue(getModule(), BMESSAGE_NAME);
try {
byte[] payload = new byte[mqMessage.getDataLength()];
mqMessage.readFully(payload);
bMessage.put(MESSAGE_PAYLOAD, payload);
bMessage.put(MESSAGE_PROPERTY, getBProperties(mqMessage));
return bMessage;
} catch (MQException | IOException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while reading the message: %s", e.getMessage()), e);
}
}

private static BMap<BString, Object> getBProperties(MQMessage mqMessage) throws MQException {
BMap<BString, Object> properties = ValueCreator.createMapValue(TypeCreator
.createMapType(TypeCreator.createRecordType(BPROPERTY, getModule(), 0, false, 0)));
Enumeration<String> propertyNames = mqMessage.getPropertyNames("%");
for (String propertyName : Collections.list(propertyNames)) {
BMap<BString, Object> property = ValueCreator.createRecordValue(getModule(), BPROPERTY);
MQPropertyDescriptor propertyDescriptor = new MQPropertyDescriptor();
Object propertyObject = mqMessage.getObjectProperty(propertyName, propertyDescriptor);
if (propertyObject instanceof Integer) {
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
property.put(PROPERTY_VALUE, ((Integer) propertyObject).longValue());
} else if (propertyObject instanceof String) {
property.put(PROPERTY_VALUE, StringUtils.fromString((String) propertyObject));
} else {
property.put(PROPERTY_VALUE, propertyObject);
}
property.put(PROPERTY_DESCRIPTOR,
populateDescriptorFromMQPropertyDescriptor(propertyDescriptor));
properties.put(StringUtils.fromString(propertyName), property);
}
return properties;
}

private static void populateMQProperties(BMap<BString, Object> properties, MQMessage mqMessage) {
for (BString key : properties.getKeys()) {
try {
handlePropertyValue(properties, mqMessage, key);
} catch (MQException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while setting message properties: %s", e.getMessage()), e);
}
}
}

private static void handlePropertyValue(BMap<BString, Object> properties, MQMessage mqMessage, BString key)
throws MQException {
BMap<BString, Object> property = (BMap<BString, Object>) properties.getMapValue(key);
MQPropertyDescriptor propertyDescriptor = defaultPropertyDescriptor;
if (property.containsKey(PROPERTY_DESCRIPTOR)) {
propertyDescriptor = getMQPropertyDescriptor(properties.getMapValue(PROPERTY_DESCRIPTOR));
}
Object value = property.get(PROPERTY_VALUE);
if (value instanceof Long) {
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
mqMessage.setIntProperty(key.getValue(), propertyDescriptor, ((Long) properties.get(key)).intValue());
} else if (value instanceof Boolean) {
mqMessage.setBooleanProperty(key.getValue(), propertyDescriptor, ((Boolean) properties.get(key)));
} else if (value instanceof Byte) {
mqMessage.setByteProperty(key.getValue(), propertyDescriptor, (Byte) properties.get(key));
} else if (value instanceof byte[]) {
mqMessage.setBytesProperty(key.getValue(), propertyDescriptor, ((byte[]) properties.get(key)));
} else if (value instanceof Float) {
mqMessage.setFloatProperty(key.getValue(), propertyDescriptor, (Float) properties.get(key));
} else if (value instanceof Double) {
mqMessage.setDoubleProperty(key.getValue(), propertyDescriptor, (Double) properties.get(key));
} else if (value instanceof BString) {
mqMessage.setStringProperty(key.getValue(), propertyDescriptor, ((BString) properties.get(key)).getValue());
}
}

private static MQPropertyDescriptor getMQPropertyDescriptor(BMap descriptor) {
MQPropertyDescriptor propertyDescriptor = new MQPropertyDescriptor();
if (descriptor.containsKey(PD_VERSION)) {
propertyDescriptor.version = ((Long) descriptor.get(PD_VERSION)).intValue();
}
if (descriptor.containsKey(PD_COPY_OPTIONS)) {
propertyDescriptor.copyOptions = ((Long) descriptor.get(PD_COPY_OPTIONS)).intValue();
}
if (descriptor.containsKey(PD_OPTIONS)) {
propertyDescriptor.options = ((Long) descriptor.get(PD_OPTIONS)).intValue();
}
if (descriptor.containsKey(PD_SUPPORT)) {
propertyDescriptor.support = ((Long) descriptor.get(PD_SUPPORT)).intValue();
}
if (descriptor.containsKey(PD_CONTEXT)) {
propertyDescriptor.context = ((Long) descriptor.get(PD_CONTEXT)).intValue();
}
return propertyDescriptor;
}

private static BMap populateDescriptorFromMQPropertyDescriptor(MQPropertyDescriptor propertyDescriptor) {
BMap<BString, Object> descriptor = ValueCreator.createMapValue();
descriptor.put(PD_VERSION, propertyDescriptor.version);
descriptor.put(PD_COPY_OPTIONS, propertyDescriptor.copyOptions);
descriptor.put(PD_OPTIONS, propertyDescriptor.options);
descriptor.put(PD_SUPPORT, propertyDescriptor.support);
descriptor.put(PD_CONTEXT, propertyDescriptor.context);
return descriptor;
}

public static BError createError(String errorType, String message, Throwable throwable) {
BError cause = ErrorCreator.createError(throwable);
BMap<BString, Object> errorDetails = ValueCreator.createRecordValue(getModule(), ERROR_DETAILS);
if (throwable instanceof MQException exception) {
errorDetails.put(ERROR_REASON_CODE, exception.getReason());
errorDetails.put(ERROR_ERROR_CODE, exception.getErrorCode());
errorDetails.put(ERROR_COMPLETION_CODE, exception.getCompCode());
}
return ErrorCreator.createError(
ModuleUtils.getModule(), errorType, StringUtils.fromString(message), cause, null);
ModuleUtils.getModule(), errorType, StringUtils.fromString(message), cause, errorDetails);
}

public static Optional<String> getOptionalStringProperty(BMap<BString, Object> config, BString fieldName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.lib.ibm.ibmmq;

import java.util.concurrent.ThreadFactory;

public class MQThreadFactory implements ThreadFactory {

private final String threadGroupName;

public MQThreadFactory(String threadGroupName) {
this.threadGroupName = threadGroupName;
}

@Override
public Thread newThread(Runnable runnable) {
Thread ibmMqClientThread = new Thread(runnable);
ibmMqClientThread.setName(threadGroupName);
return ibmMqClientThread;
}

dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
}
18 changes: 18 additions & 0 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/QueueManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import com.ibm.mq.MQException;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.MQTopic;
import com.ibm.mq.constants.MQConstants;
import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
Expand Down Expand Up @@ -64,6 +67,21 @@ public static Object init(BObject queueManager, BMap<BString, Object> configurat
return null;
}

public static Object accessTopic(Environment env, BObject queueManagerObject, BString topicName,
dilanSachi marked this conversation as resolved.
Show resolved Hide resolved
BString topicString, Long openAs, Long options) {
MQQueueManager queueManager = (MQQueueManager) queueManagerObject.getNativeData(NATIVE_QUEUE_MANAGER);
try {
MQTopic mqTopic = queueManager.accessTopic(topicName.getValue(), topicString.getValue(),
openAs.intValue(), options.intValue());
BObject bTopic = ValueCreator.createObjectValue(ModuleUtils.getModule(), CommonUtils.BTOPIC);
bTopic.addNativeData(CommonUtils.TOPIC_OBJECT, mqTopic);
return bTopic;
} catch (MQException e) {
return createError(IBMMQ_ERROR,
String.format("Error occurred while accessing topic: %s", e.getMessage()), e);
}
}

private static Hashtable<String, Object> getConnectionProperties(BMap<BString, Object> configurations) {
Hashtable<String, Object> properties = new Hashtable<>();
String host = configurations.getStringValue(HOST).getValue();
Expand Down
Loading