diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java index d3957f9..504b8fd 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java @@ -22,7 +22,10 @@ import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPropertyDescriptor; +import com.ibm.mq.constants.MQConstants; import com.ibm.mq.headers.MQHeaderList; +import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions; +import io.ballerina.lib.ibm.ibmmq.config.MatchOptions; import io.ballerina.lib.ibm.ibmmq.headers.MQRFH2Header; import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.Runtime; @@ -66,7 +69,6 @@ import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_PROPERTY; import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_PROPERTIES; import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_TYPE_FIELD; -import static io.ballerina.lib.ibm.ibmmq.Constants.OPTIONS; import static io.ballerina.lib.ibm.ibmmq.Constants.PD_CONTEXT; import static io.ballerina.lib.ibm.ibmmq.Constants.PD_COPY_OPTIONS; import static io.ballerina.lib.ibm.ibmmq.Constants.PD_OPTIONS; @@ -79,7 +81,6 @@ import static io.ballerina.lib.ibm.ibmmq.Constants.PUT_APPLICATION_TYPE_FIELD; import static io.ballerina.lib.ibm.ibmmq.Constants.REPLY_TO_QM_NAME_FIELD; import static io.ballerina.lib.ibm.ibmmq.Constants.REPLY_TO_QUEUE_NAME_FIELD; -import static io.ballerina.lib.ibm.ibmmq.Constants.WAIT_INTERVAL; import static io.ballerina.lib.ibm.ibmmq.ModuleUtils.getModule; import static io.ballerina.lib.ibm.ibmmq.headers.MQCIHHeader.createMQCIHHeaderFromBHeader; import static io.ballerina.lib.ibm.ibmmq.headers.MQIIHHeader.createMQIIHHeaderFromBHeader; @@ -293,13 +294,38 @@ private static BMap populateDescriptorFromMQPropertyDescriptor(MQPropertyDescrip return descriptor; } - public static MQGetMessageOptions getGetMessageOptions(BMap bOptions) { - int waitInterval = bOptions.getIntValue(WAIT_INTERVAL).intValue(); - int options = bOptions.getIntValue(OPTIONS).intValue(); - MQGetMessageOptions getMessageOptions = new MQGetMessageOptions(); - getMessageOptions.waitInterval = waitInterval * 1000; - getMessageOptions.options = options; - return getMessageOptions; + public static MQMessage getMqMessage(MatchOptions matchOptions) { + MQMessage message = new MQMessage(); + if (Objects.isNull(matchOptions)) { + return message; + } + + if (Objects.nonNull(matchOptions.messageId())) { + message.messageId = matchOptions.messageId(); + } + if (Objects.nonNull(matchOptions.correlationId())) { + message.correlationId = matchOptions.correlationId(); + } + return message; + } + + public static MQGetMessageOptions getMqGetMsgOptions(GetMessageOptions getMsgOptions) { + MQGetMessageOptions mqGetMsgOptions = new MQGetMessageOptions(); + mqGetMsgOptions.waitInterval = getMsgOptions.waitInterval(); + mqGetMsgOptions.options = getMsgOptions.options(); + + MatchOptions matchOptions = getMsgOptions.matchOptions(); + if (Objects.isNull(matchOptions)) { + return mqGetMsgOptions; + } + + if (Objects.nonNull(matchOptions.messageId())) { + mqGetMsgOptions.matchOptions = MQConstants.MQMO_MATCH_MSG_ID; + } + if (Objects.nonNull(matchOptions.correlationId())) { + mqGetMsgOptions.matchOptions |= MQConstants.MQMO_MATCH_CORREL_ID; + } + return mqGetMsgOptions; } private static Object getBHeaders(Runtime runtime, MQMessage mqMessage) { diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Constants.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Constants.java index 60d9551..91d3b3b 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Constants.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Constants.java @@ -108,6 +108,7 @@ public interface Constants { BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor"); BString WAIT_INTERVAL = StringUtils.fromString("waitInterval"); BString OPTIONS = StringUtils.fromString("options"); + BString MATCH_OPTIONS = StringUtils.fromString("matchOptions"); BString FORMAT_FIELD = StringUtils.fromString("format"); BString MESSAGE_ID_FIELD = StringUtils.fromString("messageId"); BString CORRELATION_ID_FIELD = StringUtils.fromString("correlationId"); diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java index a723fdc..5b314ed 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java @@ -23,6 +23,7 @@ import com.ibm.mq.MQMessage; import com.ibm.mq.MQQueue; import com.ibm.mq.constants.CMQC; +import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions; import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.values.BError; @@ -60,15 +61,16 @@ public static Object put(Environment environment, BObject queueObject, BMap options) { + public static Object get(Environment environment, BObject queueObject, BMap bGetMsgOptions) { MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE); - MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options); + GetMessageOptions getMsgOptions = new GetMessageOptions(bGetMsgOptions); + MQMessage mqMessage = CommonUtils.getMqMessage(getMsgOptions.matchOptions()); + MQGetMessageOptions mqGetMsgOptions = CommonUtils.getMqGetMsgOptions(getMsgOptions); Future future = environment.markAsync(); QUEUE_EXECUTOR_SERVICE.execute(() -> { try { - MQMessage message = new MQMessage(); - queue.get(message, getMessageOptions); - future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), message)); + queue.get(mqMessage, mqGetMsgOptions); + future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), mqMessage)); } catch (MQException e) { if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) { future.complete(null); diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java index fc6272b..1d5611d 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java @@ -23,6 +23,7 @@ import com.ibm.mq.MQMessage; import com.ibm.mq.MQTopic; import com.ibm.mq.constants.CMQC; +import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions; import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.values.BError; @@ -60,15 +61,16 @@ public static Object put(Environment environment, BObject topicObject, BMap mess return null; } - public static Object get(Environment environment, BObject topicObject, BMap options) { + public static Object get(Environment environment, BObject topicObject, BMap bGetMsgOptions) { MQTopic topic = (MQTopic) topicObject.getNativeData(Constants.NATIVE_TOPIC); - MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options); + GetMessageOptions getMsgOptions = new GetMessageOptions(bGetMsgOptions); + MQMessage mqMessage = CommonUtils.getMqMessage(getMsgOptions.matchOptions()); + MQGetMessageOptions mqGetMsgOptions = CommonUtils.getMqGetMsgOptions(getMsgOptions); Future future = environment.markAsync(); topicExecutorService.execute(() -> { try { - MQMessage message = new MQMessage(); - topic.get(message, getMessageOptions); - future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), message)); + topic.get(mqMessage, mqGetMsgOptions); + future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), mqMessage)); } catch (MQException e) { if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) { future.complete(null); diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/config/GetMessageOptions.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/config/GetMessageOptions.java new file mode 100644 index 0000000..b8ab9a1 --- /dev/null +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/config/GetMessageOptions.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.lib.ibm.ibmmq.config; + +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BString; + +import static io.ballerina.lib.ibm.ibmmq.Constants.MATCH_OPTIONS; +import static io.ballerina.lib.ibm.ibmmq.Constants.OPTIONS; +import static io.ballerina.lib.ibm.ibmmq.Constants.WAIT_INTERVAL; + +/** + * Represents the IBM MQ GET message options. + * + * @param options Get message option + * @param waitInterval The maximum time (in seconds) that a `get` call waits for a suitable message to arrive. + * It is used in conjunction with `MQGMO_WAIT`. + * @param matchOptions Message selection criteria + */ +public record GetMessageOptions(int options, int waitInterval, MatchOptions matchOptions) { + + public GetMessageOptions(BMap getMsgOptions) { + this ( + getMsgOptions.getIntValue(OPTIONS).intValue(), + getMsgOptions.getIntValue(WAIT_INTERVAL).intValue() * 1000, + getMatchOptions(getMsgOptions) + ); + } + + @SuppressWarnings("unchecked") + private static MatchOptions getMatchOptions(BMap getMsgOptions) { + if (!getMsgOptions.containsKey(MATCH_OPTIONS)) { + return null; + } + return new MatchOptions((BMap) getMsgOptions.getMapValue(MATCH_OPTIONS)); + } +} diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/config/MatchOptions.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/config/MatchOptions.java new file mode 100644 index 0000000..e0f418a --- /dev/null +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/config/MatchOptions.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.lib.ibm.ibmmq.config; + +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BString; + +import static io.ballerina.lib.ibm.ibmmq.Constants.CORRELATION_ID_FIELD; +import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_ID_FIELD; + +/** + * Represents the selection criteria that determine which message is retrieved. + * + * @param messageId The message identifier of the message which needs to be retrieved + * @param correlationId The Correlation identifier of the message which needs to be retrieved + */ +public record MatchOptions(byte[] messageId, byte[] correlationId) { + + public MatchOptions(BMap matchOptions) { + this ( + getValueIfPresent(matchOptions, MESSAGE_ID_FIELD), + getValueIfPresent(matchOptions, CORRELATION_ID_FIELD) + ); + } + + private static byte[] getValueIfPresent(BMap matchOptions, BString key) { + if (!matchOptions.containsKey(key)) { + return null; + } + return matchOptions.getArrayValue(key).getByteArray(); + } +}