Skip to content

Commit

Permalink
Merge pull request ballerina-platform#3 from dilanSachi/add-pub-sub
Browse files Browse the repository at this point in the history
Add basic pub sub support
  • Loading branch information
ayeshLK authored Oct 27, 2023
2 parents b511c2d + 30067cb commit 06f7660
Show file tree
Hide file tree
Showing 10 changed files with 380 additions and 10 deletions.
44 changes: 44 additions & 0 deletions ballerina/constants.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Option to indicate whether the topic is being opened for either publication or subscription.
public const OPEN_AS_SUBSCRIPTION = 1;
public const OPEN_AS_PUBLICATION = 2;

// Options that control the opening of the topic for either publication or subscription.
public const MQOO_ALTERNATE_USER_AUTHORITY = 4096;
public const MQOO_BIND_AS_Q_DEF = 0;
public const MQOO_FAIL_IF_QUIESCING = 8192;
public const MQOO_OUTPUT = 16;
public const MQOO_PASS_ALL_CONTEXT = 512;
public const MQOO_PASS_IDENTITY_CONTEXT = 256;
public const MQOO_SET_ALL_CONTEXT = 2048;
public const MQOO_SET_IDENTITY_CONTEXT = 1024;

// Options related to the the get message in a topic.
public const MQGMO_WAIT = 1;
public const MQGMO_NO_WAIT = 0;
public const MQGMO_SYNCPOINT = 2;
public const MQGMO_NO_SYNCPOINT = 4;
public const MQGMO_BROWSE_FIRST = 16;
public const MQGMO_BROWSE_NEXT = 32;
public const MQGMO_BROWSE_MSG_UNDER_CURSOR = 2048;
public const MQGMO_MSG_UNDER_CURSOR = 256;
public const MQGMO_LOCK = 512;
public const MQGMO_UNLOCK = 1024;
public const MQGMO_ACCEPT_TRUNCATED_MSG = 64;
public const MQGMO_FAIL_IF_QUIESCING = 8192;
public const MQGMO_CONVERT = 16384;
16 changes: 14 additions & 2 deletions ballerina/destination.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,29 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import ballerina/jballerina.java;

public type Destination distinct client object {
remote function put(Message message) returns Error?;

remote function get() returns Message|Error?;
remote function get(*GetMessageOptions options) returns Message|Error?;
};

public type Queue distinct client object {
*Destination;
};

public type Topic distinct client object {
public client class Topic {
*Destination;

remote function put(Message message) returns Error? =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.Topic"
} external;

remote function get(*GetMessageOptions options) returns Message|Error =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.Topic"
} external;
};

13 changes: 12 additions & 1 deletion ballerina/errors.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,15 @@
// specific language governing permissions and limitations
// under the License.

public type Error distinct error;
public type Error distinct error<ErrorDetails>;

# The error details type for the module.
#
# + reasonCode - The reason code for the error
# + errorCode - The error code for the error
# + completionCode - The completion code for the error
public type ErrorDetails record {|
int reasonCode?;
string errorCode?;
int completionCode?;
|};
7 changes: 4 additions & 3 deletions ballerina/queue_manager.bal
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public isolated class QueueManager {
return error Error("Not implemented");
}

public isolated function accessTopic(string topicName, string topicString, ConnectionOpenOptions options) returns Topic|Error {
return error Error("Not implemented");
}
public isolated function accessTopic(string topicName, string topicString, OPEN_TOPIC_OPTION openTopicOption, AccessTopicOptions options) returns Topic|Error =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.QueueManager"
} external;
}
17 changes: 14 additions & 3 deletions ballerina/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
// specific language governing permissions and limitations
// under the License.

public type GM_OPTIONS MQGMO_WAIT|MQGMO_NO_WAIT|MQGMO_SYNCPOINT|MQGMO_NO_SYNCPOINT|MQGMO_BROWSE_FIRST|MQGMO_BROWSE_MSG_UNDER_CURSOR|MQGMO_MSG_UNDER_CURSOR|MQGMO_LOCK|MQGMO_UNLOCK|MQGMO_ACCEPT_TRUNCATED_MSG|MQGMO_BROWSE_NEXT|MQGMO_ACCEPT_TRUNCATED_MSG|MQGMO_FAIL_IF_QUIESCING|MQGMO_CONVERT;

public type OPEN_TOPIC_OPTION OPEN_AS_SUBSCRIPTION|OPEN_AS_PUBLICATION;

public type QueueManagerConfiguration record {|
string name;
string host;
Expand All @@ -24,15 +28,22 @@ public type QueueManagerConfiguration record {|
|};

public enum ConnectionOpenOptions {
MQOO_OUTPUT = "MQOO_OUTPUT",
// MQOO_OUTPUT = "MQOO_OUTPUT",
MQOO_INPUT_AS_Q_DEF = "MQOO_INPUT_AS_Q_DEF",
MQOO_INPUT_EXCLUSIVE = "MQOO_INPUT_EXCLUSIVE",
MQOO_INPUT_SHARED = "MQOO_INPUT_SHARED"
}

public type AccessTopicOptions MQOO_ALTERNATE_USER_AUTHORITY|MQOO_BIND_AS_Q_DEF|MQOO_FAIL_IF_QUIESCING|MQOO_OUTPUT|MQOO_PASS_ALL_CONTEXT|MQOO_PASS_IDENTITY_CONTEXT|MQOO_SET_ALL_CONTEXT|MQOO_SET_IDENTITY_CONTEXT;

public type GetMessageOptions record {|
GM_OPTIONS gmOptions = MQGMO_NO_SYNCPOINT;
int waitInterval = 0;
|};

public type Property record {|
map<anydata> descriptor;
boolean|byte|byte[]|decimal|float|int|string property;
map<int> descriptor?;
boolean|byte|byte[]|decimal|float|int|string value;
|};

public type Message record {|
Expand Down
156 changes: 155 additions & 1 deletion native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,177 @@

package io.ballerina.lib.ibm.ibmmq;

import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPropertyDescriptor;
import io.ballerina.runtime.api.PredefinedTypes;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.creators.TypeCreator;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BString;

import java.io.IOException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;

import static io.ballerina.lib.ibm.ibmmq.Constants.IBMMQ_ERROR;
import static io.ballerina.lib.ibm.ibmmq.ModuleUtils.getModule;

/**
* {@code CommonUtils} contains the common utility functions for the Ballerina IBM MQ connector.
*/
public class CommonUtils {

private static final String ERROR_DETAILS = "ErrorDetails";
private static final BString ERROR_REASON_CODE = StringUtils.fromString("reasonCode");
private static final BString ERROR_ERROR_CODE = StringUtils.fromString("errorCode");
private static final BString ERROR_COMPLETION_CODE = StringUtils.fromString("completionCode");
private static final BString MESSAGE_PAYLOAD = StringUtils.fromString("payload");
private static final BString MESSAGE_PROPERTIES = StringUtils.fromString("properties");
private static final BString MESSAGE_PROPERTY = StringUtils.fromString("property");
private static final String BPROPERTY = "Property";
private static final String BMESSAGE_NAME = "Message";
private static final BString PD_VERSION = StringUtils.fromString("version");
private static final BString PD_COPY_OPTIONS = StringUtils.fromString("copyOptions");
private static final BString PD_OPTIONS = StringUtils.fromString("options");
private static final BString PD_SUPPORT = StringUtils.fromString("support");
private static final BString PD_CONTEXT = StringUtils.fromString("context");
private static final BString PROPERTY_VALUE = StringUtils.fromString("value");
private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor");

private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor();

public static MQMessage getMqMessageFromBMessage(BMap<BString, Object> bMessage) {
byte[] payload = bMessage.getArrayValue(MESSAGE_PAYLOAD).getBytes();
MQMessage mqMessage = new MQMessage();
try {
mqMessage.write(payload);
} catch (IOException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while populating payload: %s", e.getMessage()), e);
}
BMap<BString, Object> properties = (BMap<BString, Object>) bMessage.getMapValue(MESSAGE_PROPERTIES);
populateMQProperties(properties, mqMessage);
return mqMessage;
}

public static BMap<BString, Object> getBMessageFromMQMessage(MQMessage mqMessage) {
BMap<BString, Object> bMessage = ValueCreator.createRecordValue(getModule(), BMESSAGE_NAME);
try {
byte[] payload = new byte[mqMessage.getDataLength()];
mqMessage.readFully(payload);
bMessage.put(MESSAGE_PAYLOAD, payload);
bMessage.put(MESSAGE_PROPERTY, getBProperties(mqMessage));
return bMessage;
} catch (MQException | IOException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while reading the message: %s", e.getMessage()), e);
}
}

private static BMap<BString, Object> getBProperties(MQMessage mqMessage) throws MQException {
BMap<BString, Object> properties = ValueCreator.createMapValue(TypeCreator
.createMapType(TypeCreator.createRecordType(BPROPERTY, getModule(), 0, false, 0)));
Enumeration<String> propertyNames = mqMessage.getPropertyNames("%");
for (String propertyName : Collections.list(propertyNames)) {
BMap<BString, Object> property = ValueCreator.createRecordValue(getModule(), BPROPERTY);
MQPropertyDescriptor propertyDescriptor = new MQPropertyDescriptor();
Object propertyObject = mqMessage.getObjectProperty(propertyName, propertyDescriptor);
if (propertyObject instanceof Integer intProperty) {
property.put(PROPERTY_VALUE, intProperty.longValue());
} else if (propertyObject instanceof String stringProperty) {
property.put(PROPERTY_VALUE, StringUtils.fromString(stringProperty));
} else {
property.put(PROPERTY_VALUE, propertyObject);
}
property.put(PROPERTY_DESCRIPTOR,
populateDescriptorFromMQPropertyDescriptor(propertyDescriptor));
properties.put(StringUtils.fromString(propertyName), property);
}
return properties;
}

private static void populateMQProperties(BMap<BString, Object> properties, MQMessage mqMessage) {
for (BString key : properties.getKeys()) {
try {
handlePropertyValue(properties, mqMessage, key);
} catch (MQException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while setting message properties: %s", e.getMessage()), e);
}
}
}

private static void handlePropertyValue(BMap<BString, Object> properties, MQMessage mqMessage, BString key)
throws MQException {
BMap<BString, Object> property = (BMap<BString, Object>) properties.getMapValue(key);
MQPropertyDescriptor propertyDescriptor = defaultPropertyDescriptor;
if (property.containsKey(PROPERTY_DESCRIPTOR)) {
propertyDescriptor = getMQPropertyDescriptor(properties.getMapValue(PROPERTY_DESCRIPTOR));
}
Object value = property.get(PROPERTY_VALUE);
if (value instanceof Long longValue) {
mqMessage.setIntProperty(key.getValue(), propertyDescriptor, longValue.intValue());
} else if (value instanceof Boolean booleanValue) {
mqMessage.setBooleanProperty(key.getValue(), propertyDescriptor, booleanValue);
} else if (value instanceof Byte byteValue) {
mqMessage.setByteProperty(key.getValue(), propertyDescriptor, byteValue);
} else if (value instanceof byte[] bytesValue) {
mqMessage.setBytesProperty(key.getValue(), propertyDescriptor, bytesValue);
} else if (value instanceof Float floatValue) {
mqMessage.setFloatProperty(key.getValue(), propertyDescriptor, floatValue);
} else if (value instanceof Double doubleValue) {
mqMessage.setDoubleProperty(key.getValue(), propertyDescriptor, doubleValue);
} else if (value instanceof BString stringValue) {
mqMessage.setStringProperty(key.getValue(), propertyDescriptor, stringValue.getValue());
}
}

private static MQPropertyDescriptor getMQPropertyDescriptor(BMap descriptor) {
MQPropertyDescriptor propertyDescriptor = new MQPropertyDescriptor();
if (descriptor.containsKey(PD_VERSION)) {
propertyDescriptor.version = ((Long) descriptor.get(PD_VERSION)).intValue();
}
if (descriptor.containsKey(PD_COPY_OPTIONS)) {
propertyDescriptor.copyOptions = ((Long) descriptor.get(PD_COPY_OPTIONS)).intValue();
}
if (descriptor.containsKey(PD_OPTIONS)) {
propertyDescriptor.options = ((Long) descriptor.get(PD_OPTIONS)).intValue();
}
if (descriptor.containsKey(PD_SUPPORT)) {
propertyDescriptor.support = ((Long) descriptor.get(PD_SUPPORT)).intValue();
}
if (descriptor.containsKey(PD_CONTEXT)) {
propertyDescriptor.context = ((Long) descriptor.get(PD_CONTEXT)).intValue();
}
return propertyDescriptor;
}

private static BMap populateDescriptorFromMQPropertyDescriptor(MQPropertyDescriptor propertyDescriptor) {
BMap<BString, Object> descriptor = ValueCreator.createMapValue(TypeCreator
.createMapType(PredefinedTypes.TYPE_INT));
descriptor.put(PD_VERSION, propertyDescriptor.version);
descriptor.put(PD_COPY_OPTIONS, propertyDescriptor.copyOptions);
descriptor.put(PD_OPTIONS, propertyDescriptor.options);
descriptor.put(PD_SUPPORT, propertyDescriptor.support);
descriptor.put(PD_CONTEXT, propertyDescriptor.context);
return descriptor;
}

public static BError createError(String errorType, String message, Throwable throwable) {
BError cause = ErrorCreator.createError(throwable);
BMap<BString, Object> errorDetails = ValueCreator.createRecordValue(getModule(), ERROR_DETAILS);
if (throwable instanceof MQException exception) {
errorDetails.put(ERROR_REASON_CODE, exception.getReason());
errorDetails.put(ERROR_ERROR_CODE, exception.getErrorCode());
errorDetails.put(ERROR_COMPLETION_CODE, exception.getCompCode());
}
return ErrorCreator.createError(
ModuleUtils.getModule(), errorType, StringUtils.fromString(message), cause, null);
ModuleUtils.getModule(), errorType, StringUtils.fromString(message), cause, errorDetails);
}

public static Optional<String> getOptionalStringProperty(BMap<BString, Object> config, BString fieldName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ public interface Constants {

// Native properties in respective ballerina objects
public static final String NATIVE_QUEUE_MANAGER = "queueManager";

public static final String NATIVE_TOPIC = "topic";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.lib.ibm.ibmmq;

import java.util.concurrent.ThreadFactory;

public class MQThreadFactory implements ThreadFactory {

private final String threadGroupName;

public MQThreadFactory(String threadGroupName) {
this.threadGroupName = threadGroupName;
}

@Override
public Thread newThread(Runnable runnable) {
Thread ibmMqClientThread = new Thread(runnable);
ibmMqClientThread.setName(threadGroupName);
return ibmMqClientThread;
}
}
Loading

0 comments on commit 06f7660

Please sign in to comment.