Skip to content

Commit

Permalink
Add support to retrieve messages based on msg-id or correlation-id
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeshLK committed Aug 27, 2024
1 parent 8d012f6 commit b26114c
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 19 deletions.
44 changes: 35 additions & 9 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPropertyDescriptor;
import com.ibm.mq.constants.MQConstants;
import com.ibm.mq.headers.MQHeaderList;
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
import io.ballerina.lib.ibm.ibmmq.config.MatchOptions;
import io.ballerina.lib.ibm.ibmmq.headers.MQRFH2Header;
import io.ballerina.runtime.api.PredefinedTypes;
import io.ballerina.runtime.api.Runtime;
Expand Down Expand Up @@ -66,7 +69,6 @@
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_PROPERTY;
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_PROPERTIES;
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_TYPE_FIELD;
import static io.ballerina.lib.ibm.ibmmq.Constants.OPTIONS;
import static io.ballerina.lib.ibm.ibmmq.Constants.PD_CONTEXT;
import static io.ballerina.lib.ibm.ibmmq.Constants.PD_COPY_OPTIONS;
import static io.ballerina.lib.ibm.ibmmq.Constants.PD_OPTIONS;
Expand All @@ -79,7 +81,6 @@
import static io.ballerina.lib.ibm.ibmmq.Constants.PUT_APPLICATION_TYPE_FIELD;
import static io.ballerina.lib.ibm.ibmmq.Constants.REPLY_TO_QM_NAME_FIELD;
import static io.ballerina.lib.ibm.ibmmq.Constants.REPLY_TO_QUEUE_NAME_FIELD;
import static io.ballerina.lib.ibm.ibmmq.Constants.WAIT_INTERVAL;
import static io.ballerina.lib.ibm.ibmmq.ModuleUtils.getModule;
import static io.ballerina.lib.ibm.ibmmq.headers.MQCIHHeader.createMQCIHHeaderFromBHeader;
import static io.ballerina.lib.ibm.ibmmq.headers.MQIIHHeader.createMQIIHHeaderFromBHeader;
Expand Down Expand Up @@ -293,13 +294,38 @@ private static BMap populateDescriptorFromMQPropertyDescriptor(MQPropertyDescrip
return descriptor;
}

public static MQGetMessageOptions getGetMessageOptions(BMap<BString, Object> bOptions) {
int waitInterval = bOptions.getIntValue(WAIT_INTERVAL).intValue();
int options = bOptions.getIntValue(OPTIONS).intValue();
MQGetMessageOptions getMessageOptions = new MQGetMessageOptions();
getMessageOptions.waitInterval = waitInterval * 1000;
getMessageOptions.options = options;
return getMessageOptions;
public static MQMessage getMqMessage(MatchOptions matchOptions) {
MQMessage message = new MQMessage();
if (Objects.isNull(matchOptions)) {
return message;
}

if (Objects.nonNull(matchOptions.messageId())) {
message.messageId = matchOptions.messageId();
}
if (Objects.nonNull(matchOptions.correlationId())) {
message.correlationId = matchOptions.correlationId();
}
return message;
}

public static MQGetMessageOptions getMqGetMsgOptions(GetMessageOptions getMsgOptions) {
MQGetMessageOptions mqGetMsgOptions = new MQGetMessageOptions();
mqGetMsgOptions.waitInterval = getMsgOptions.waitInterval();
mqGetMsgOptions.options = getMsgOptions.options();

MatchOptions matchOptions = getMsgOptions.matchOptions();
if (Objects.isNull(matchOptions)) {
return mqGetMsgOptions;
}

if (Objects.nonNull(matchOptions.messageId())) {
mqGetMsgOptions.matchOptions = MQConstants.MQMO_MATCH_MSG_ID;
}
if (Objects.nonNull(matchOptions.correlationId())) {
mqGetMsgOptions.matchOptions |= MQConstants.MQMO_MATCH_CORREL_ID;
}
return mqGetMsgOptions;
}

private static Object getBHeaders(Runtime runtime, MQMessage mqMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public interface Constants {
BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor");
BString WAIT_INTERVAL = StringUtils.fromString("waitInterval");
BString OPTIONS = StringUtils.fromString("options");
BString MATCH_OPTIONS = StringUtils.fromString("matchOptions");
BString FORMAT_FIELD = StringUtils.fromString("format");
BString MESSAGE_ID_FIELD = StringUtils.fromString("messageId");
BString CORRELATION_ID_FIELD = StringUtils.fromString("correlationId");
Expand Down
12 changes: 7 additions & 5 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.constants.CMQC;
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.values.BError;
Expand Down Expand Up @@ -60,15 +61,16 @@ public static Object put(Environment environment, BObject queueObject, BMap<BStr
return null;
}

public static Object get(Environment environment, BObject queueObject, BMap<BString, Object> options) {
public static Object get(Environment environment, BObject queueObject, BMap<BString, Object> bGetMsgOptions) {
MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE);
MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options);
GetMessageOptions getMsgOptions = new GetMessageOptions(bGetMsgOptions);
MQMessage mqMessage = CommonUtils.getMqMessage(getMsgOptions.matchOptions());
MQGetMessageOptions mqGetMsgOptions = CommonUtils.getMqGetMsgOptions(getMsgOptions);
Future future = environment.markAsync();
QUEUE_EXECUTOR_SERVICE.execute(() -> {
try {
MQMessage message = new MQMessage();
queue.get(message, getMessageOptions);
future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), message));
queue.get(mqMessage, mqGetMsgOptions);
future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), mqMessage));
} catch (MQException e) {
if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) {
future.complete(null);
Expand Down
12 changes: 7 additions & 5 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQTopic;
import com.ibm.mq.constants.CMQC;
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.values.BError;
Expand Down Expand Up @@ -60,15 +61,16 @@ public static Object put(Environment environment, BObject topicObject, BMap mess
return null;
}

public static Object get(Environment environment, BObject topicObject, BMap<BString, Object> options) {
public static Object get(Environment environment, BObject topicObject, BMap<BString, Object> bGetMsgOptions) {
MQTopic topic = (MQTopic) topicObject.getNativeData(Constants.NATIVE_TOPIC);
MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options);
GetMessageOptions getMsgOptions = new GetMessageOptions(bGetMsgOptions);
MQMessage mqMessage = CommonUtils.getMqMessage(getMsgOptions.matchOptions());
MQGetMessageOptions mqGetMsgOptions = CommonUtils.getMqGetMsgOptions(getMsgOptions);
Future future = environment.markAsync();
topicExecutorService.execute(() -> {
try {
MQMessage message = new MQMessage();
topic.get(message, getMessageOptions);
future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), message));
topic.get(mqMessage, mqGetMsgOptions);
future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), mqMessage));
} catch (MQException e) {
if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) {
future.complete(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.lib.ibm.ibmmq.config;

import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BString;

import static io.ballerina.lib.ibm.ibmmq.Constants.MATCH_OPTIONS;
import static io.ballerina.lib.ibm.ibmmq.Constants.OPTIONS;
import static io.ballerina.lib.ibm.ibmmq.Constants.WAIT_INTERVAL;

/**
* Represents the IBM MQ GET message options.
*
* @param options Get message option
* @param waitInterval The maximum time (in seconds) that a `get` call waits for a suitable message to arrive.
* It is used in conjunction with `MQGMO_WAIT`.
* @param matchOptions Message selection criteria
*/
public record GetMessageOptions(int options, int waitInterval, MatchOptions matchOptions) {

public GetMessageOptions(BMap<BString, Object> getMsgOptions) {
this (
getMsgOptions.getIntValue(OPTIONS).intValue(),
getMsgOptions.getIntValue(WAIT_INTERVAL).intValue() * 1000,
getMatchOptions(getMsgOptions)
);
}

@SuppressWarnings("unchecked")
private static MatchOptions getMatchOptions(BMap<BString, Object> getMsgOptions) {
if (!getMsgOptions.containsKey(MATCH_OPTIONS)) {
return null;
}
return new MatchOptions((BMap<BString, Object>) getMsgOptions.getMapValue(MATCH_OPTIONS));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.lib.ibm.ibmmq.config;

import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BString;

import static io.ballerina.lib.ibm.ibmmq.Constants.CORRELATION_ID_FIELD;
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_ID_FIELD;

/**
* Represents the selection criteria that determine which message is retrieved.
*
* @param messageId The message identifier of the message which needs to be retrieved
* @param correlationId The Correlation identifier of the message which needs to be retrieved
*/
public record MatchOptions(byte[] messageId, byte[] correlationId) {

public MatchOptions(BMap<BString, Object> matchOptions) {
this (
getValueIfPresent(matchOptions, MESSAGE_ID_FIELD),
getValueIfPresent(matchOptions, CORRELATION_ID_FIELD)
);
}

private static byte[] getValueIfPresent(BMap<BString, Object> matchOptions, BString key) {
if (!matchOptions.containsKey(key)) {
return null;
}
return matchOptions.getArrayValue(key).getByteArray();
}
}

0 comments on commit b26114c

Please sign in to comment.