diff --git a/README.md b/README.md
index 192b2782b..907d8ab1c 100644
--- a/README.md
+++ b/README.md
@@ -980,6 +980,8 @@ You can however set the deliver policy which will be used to start the subscript
| JsConsumerCreate290NotAvailable | CON-90301 | Name field not valid when v2.9.0 consumer create api is not available. |
| JsConsumerNameDurableMismatch | CON-90302 | Name must match durable if both are supplied. |
| JsMultipleFilterSubjects210NotAvailable | CON-90303 | Multiple filter subjects not available until server version 2.10.0. |
+| JsAllowDirectRequired | CON-90304 | Stream must have allow direct set. |
+| JsDirectBatchGet211NotAvailable | CON-90305 | Batch direct get not available until server version 2.11.0. |
| OsObjectNotFound | OS-90201 | The object was not found. |
| OsObjectIsDeleted | OS-90202 | The object is deleted. |
| OsObjectAlreadyExists | OS-90203 | An object with that name already exists. |
diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java
index 499c8ef4d..6755e49d0 100644
--- a/src/main/java/io/nats/client/JetStreamManagement.java
+++ b/src/main/java/io/nats/client/JetStreamManagement.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* JetStream Management context for creation and access to streams and consumers in NATS.
@@ -324,6 +325,48 @@ public interface JetStreamManagement {
*/
MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException;
+ /**
+ * Request a batch of messages using a {@link MessageBatchGetRequest}.
+ *
+ * This API is currently EXPERIMENTAL and is subject to change.
+ *
+ * @param streamName the name of the stream
+ * @param messageBatchGetRequest the request details
+ * @return a list containing {@link MessageInfo}
+ * @throws IOException covers various communication issues with the NATS
+ * server such as timeout or interruption
+ * @throws JetStreamApiException the request had an error related to the data
+ */
+ List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;
+
+ /**
+ * Request a batch of messages using a {@link MessageBatchGetRequest}.
+ *
+ * This API is currently EXPERIMENTAL and is subject to change.
+ *
+ * @param streamName the name of the stream
+ * @param messageBatchGetRequest the request details
+ * @return a queue used to asynchronously receive {@link MessageInfo}
+ * @throws IOException covers various communication issues with the NATS
+ * server such as timeout or interruption
+ * @throws JetStreamApiException the request had an error related to the data
+ */
+ LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;
+
+ /**
+ * Request a batch of messages using a {@link MessageBatchGetRequest}.
+ *
+ * This API is currently EXPERIMENTAL and is subject to change.
+ *
+ * @param streamName the name of the stream
+ * @param messageBatchGetRequest the request details
+ * @param handler the handler used for receiving {@link MessageInfo}
+ * @throws IOException covers various communication issues with the NATS
+ * server such as timeout or interruption
+ * @throws JetStreamApiException the request had an error related to the data
+ */
+ void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException;
+
/**
* Deletes a message, overwriting the message data with garbage
* This can be considered an expensive (time-consuming) operation, but is more secure.
diff --git a/src/main/java/io/nats/client/MessageInfoHandler.java b/src/main/java/io/nats/client/MessageInfoHandler.java
new file mode 100644
index 000000000..6a9697a31
--- /dev/null
+++ b/src/main/java/io/nats/client/MessageInfoHandler.java
@@ -0,0 +1,29 @@
+// Copyright 2024 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package io.nats.client;
+
+import io.nats.client.api.MessageInfo;
+
+/**
+ * Handler for {@link MessageInfo}.
+ */
+public interface MessageInfoHandler {
+ /**
+ * Called to deliver a {@link MessageInfo} to the handler.
+ *
+ * @param messageInfo the received {@link MessageInfo}
+ * @throws InterruptedException if the thread for this handler is interrupted
+ */
+ void onMessageInfo(MessageInfo messageInfo) throws InterruptedException;
+}
diff --git a/src/main/java/io/nats/client/api/ApiResponse.java b/src/main/java/io/nats/client/api/ApiResponse.java
index 2dbe3a348..e97a42a41 100644
--- a/src/main/java/io/nats/client/api/ApiResponse.java
+++ b/src/main/java/io/nats/client/api/ApiResponse.java
@@ -76,6 +76,12 @@ public ApiResponse() {
type = NO_TYPE;
}
+ public ApiResponse(Error error) {
+ jv = null;
+ this.error = error;
+ type = NO_TYPE;
+ }
+
@SuppressWarnings("unchecked")
public T throwOnHasError() throws JetStreamApiException {
if (hasError()) {
diff --git a/src/main/java/io/nats/client/api/MessageBatchGetRequest.java b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java
new file mode 100644
index 000000000..3be679f43
--- /dev/null
+++ b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java
@@ -0,0 +1,347 @@
+// Copyright 2024 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package io.nats.client.api;
+
+import io.nats.client.support.JsonSerializable;
+
+import java.time.Duration;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static io.nats.client.support.ApiConstants.*;
+import static io.nats.client.support.JsonUtils.*;
+import static io.nats.client.support.Validator.*;
+
+/**
+ * Object used to make a request for message batch get requests.
+ */
+public class MessageBatchGetRequest implements JsonSerializable {
+
+ private final Duration timeout;
+ private final int batch;
+ private final int maxBytes;
+ private final long sequence;
+ private final ZonedDateTime startTime;
+ private final String nextBySubject;
+ private final List multiLastFor;
+ private final long upToSequence;
+ private final ZonedDateTime upToTime;
+
+ MessageBatchGetRequest(Builder b) {
+ this.timeout = b.timeout;
+ this.batch = b.batch;
+ this.maxBytes = b.maxBytes;
+ this.sequence = b.sequence;
+ this.startTime = b.startTime;
+ this.nextBySubject = b.nextBySubject;
+ this.multiLastFor = b.multiLastFor;
+ this.upToSequence = b.upToSequence;
+ this.upToTime = b.upToTime;
+ }
+
+ /**
+ * Timeout used for the request.
+ *
+ * @return Duration
+ */
+ public Duration getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Maximum amount of messages to be returned for this request.
+ *
+ * @return batch size
+ */
+ public int getBatch() {
+ return batch;
+ }
+
+ /**
+ * Maximum amount of returned bytes for this request.
+ * Limits the amount of returned messages to not exceed this.
+ *
+ * @return maximum bytes
+ */
+ public int getMaxBytes() {
+ return maxBytes;
+ }
+
+ /**
+ * Minimum sequence for returned messages.
+ * All returned messages will have a sequence equal to or higher than this.
+ *
+ * @return minimum message sequence
+ */
+ public long getSequence() {
+ return sequence;
+ }
+
+ /**
+ * Minimum start time for returned messages.
+ * All returned messages will have a start time equal to or higher than this.
+ *
+ * @return minimum message start time
+ */
+ public ZonedDateTime getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Subject used to filter messages that should be returned.
+ *
+ * @return the subject to filter
+ */
+ public String getSubject() {
+ return nextBySubject;
+ }
+
+ /**
+ * Subjects filter used, these can include wildcards.
+ * Will get the last messages matching the subjects.
+ *
+ * @return the subjects to get the last messages for
+ */
+ public List getMultiLastForSubjects() {
+ return multiLastFor;
+ }
+
+ /**
+ * Only return messages up to this sequence.
+ *
+ * @return the maximum message sequence to return results for
+ */
+ public long getUpToSequence() {
+ return upToSequence;
+ }
+
+ /**
+ * Only return messages up to this time.
+ *
+ * @return the maximum message time to return results for
+ */
+ public ZonedDateTime getUpToTime() {
+ return upToTime;
+ }
+
+ @Override
+ public String toJson() {
+ StringBuilder sb = beginJson();
+ addField(sb, BATCH, batch);
+ addField(sb, MAX_BYTES, maxBytes);
+ addField(sb, SEQ, sequence);
+ addField(sb, START_TIME, startTime);
+ addField(sb, NEXT_BY_SUBJECT, nextBySubject);
+ addStrings(sb, MULTI_LAST, multiLastFor);
+ addField(sb, UP_TO_SEQ, upToSequence);
+ addField(sb, UP_TO_TIME, upToTime);
+ return endJson(sb).toString();
+ }
+
+ /**
+ * Creates a builder for the request.
+ *
+ * @return Builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Creates a builder for the request.
+ *
+ * @param req the {@link MessageBatchGetRequest}
+ * @return Builder
+ */
+ public static Builder builder(MessageBatchGetRequest req) {
+ return req == null ? new Builder() : new Builder(req);
+ }
+
+ /**
+ * {@link MessageBatchGetRequest} is created using a Builder. The builder supports chaining and will
+ * create a default set of options if no methods are calls.
+ *
+ * {@code MessageBatchGetRequest.builder().build()} will create a default {@link MessageBatchGetRequest}.
+ */
+ public static class Builder {
+ private Duration timeout = Duration.ofSeconds(5);
+ private int batch = -1;
+ private int maxBytes = -1;
+ private long sequence = -1;
+ private ZonedDateTime startTime = null;
+ private String nextBySubject = null;
+ private List multiLastFor = new ArrayList<>();
+ private long upToSequence = -1;
+ private ZonedDateTime upToTime = null;
+
+ /**
+ * Construct the builder
+ */
+ public Builder() {
+ }
+
+ /**
+ * Construct the builder and initialize values with the existing {@link MessageBatchGetRequest}
+ *
+ * @param req the {@link MessageBatchGetRequest} to clone
+ */
+ public Builder(MessageBatchGetRequest req) {
+ if (req != null) {
+ this.timeout = req.timeout;
+ this.batch = req.batch;
+ this.maxBytes = req.maxBytes;
+ this.sequence = req.sequence;
+ this.startTime = req.startTime;
+ this.nextBySubject = req.nextBySubject;
+ this.multiLastFor = req.multiLastFor;
+ this.upToSequence = req.upToSequence;
+ this.upToTime = req.upToTime;
+ }
+ }
+
+ /**
+ * Set the timeout used for the request.
+ *
+ * @param timeout the timeout
+ * @return Builder
+ */
+ public Builder timeout(Duration timeout) {
+ validateDurationRequired(timeout);
+ this.timeout = timeout;
+ return this;
+ }
+
+ /**
+ * Set the maximum amount of messages to be returned for this request.
+ *
+ * @param batch the batch size
+ * @return Builder
+ */
+ public Builder batch(int batch) {
+ validateGtZero(batch, "Request batch size");
+ this.batch = batch;
+ return this;
+ }
+
+ /**
+ * Maximum amount of returned bytes for this request.
+ * Limits the amount of returned messages to not exceed this.
+ *
+ * @param maxBytes the maximum bytes
+ * @return Builder
+ */
+ public Builder maxBytes(int maxBytes) {
+ this.maxBytes = maxBytes;
+ return this;
+ }
+
+ /**
+ * Minimum sequence for returned messages.
+ * All returned messages will have a sequence equal to or higher than this.
+ *
+ * @param sequence the minimum message sequence
+ * @return Builder
+ */
+ public Builder sequence(long sequence) {
+ validateGtEqZero(sequence, "Sequence");
+ this.sequence = sequence;
+ return this;
+ }
+
+ /**
+ * Minimum start time for returned messages.
+ * All returned messages will have a start time equal to or higher than this.
+ *
+ * @param startTime the minimum message start time
+ * @return Builder
+ */
+ public Builder startTime(ZonedDateTime startTime) {
+ this.startTime = startTime;
+ return this;
+ }
+
+ /**
+ * Subject used to filter messages that should be returned.
+ *
+ * @param subject the subject to filter
+ * @return Builder
+ */
+ public Builder subject(String subject) {
+ this.nextBySubject = subject;
+ return this;
+ }
+
+ /**
+ * Subjects filter used, these can include wildcards.
+ * Will get the last messages matching the subjects.
+ *
+ * @param subjects the subjects to get the last messages for
+ * @return Builder
+ */
+ public Builder multiLastForSubjects(String... subjects) {
+ this.multiLastFor.clear();
+ this.multiLastFor.addAll(Arrays.asList(subjects));
+ return this;
+ }
+
+ /**
+ * Subjects filter used, these can include wildcards.
+ * Will get the last messages matching the subjects.
+ *
+ * @param subjects the subjects to get the last messages for
+ * @return Builder
+ */
+ public Builder multiLastForSubjects(Collection subjects) {
+ this.multiLastFor.clear();
+ this.multiLastFor.addAll(subjects);
+ return this;
+ }
+
+ /**
+ * Only return messages up to this sequence.
+ * If not set, will be last sequence for the stream.
+ *
+ * @param upToSequence the maximum message sequence to return results for
+ * @return Builder
+ */
+ public Builder upToSequence(long upToSequence) {
+ validateGtZero(upToSequence, "Up to sequence");
+ this.upToSequence = upToSequence;
+ return this;
+ }
+
+ /**
+ * Only return messages up to this time.
+ *
+ * @param upToTime the maximum message time to return results for
+ * @return Builder
+ */
+ public Builder upToTime(ZonedDateTime upToTime) {
+ this.upToTime = upToTime;
+ return this;
+ }
+
+ /**
+ * Build the {@link MessageBatchGetRequest}.
+ *
+ * @return MessageBatchGetRequest
+ */
+ public MessageBatchGetRequest build() {
+ return new MessageBatchGetRequest(this);
+ }
+ }
+}
diff --git a/src/main/java/io/nats/client/api/MessageInfo.java b/src/main/java/io/nats/client/api/MessageInfo.java
index 206cd8b92..5ded6746f 100644
--- a/src/main/java/io/nats/client/api/MessageInfo.java
+++ b/src/main/java/io/nats/client/api/MessageInfo.java
@@ -32,6 +32,11 @@
*/
public class MessageInfo extends ApiResponse {
+ /**
+ * Message returned as a response in {@link MessageBatchGetRequest} to signal end of data.
+ */
+ public static final MessageInfo EOD = new MessageInfo(null, false);
+
private final boolean direct;
private final String subject;
private final long seq;
@@ -40,6 +45,7 @@ public class MessageInfo extends ApiResponse {
private final Headers headers;
private final String stream;
private final long lastSeq;
+ private final long numPending;
/**
* Create a Message Info
@@ -51,6 +57,25 @@ public MessageInfo(Message msg) {
this(msg, null, false);
}
+ /**
+ * Create a Message Info
+ * This signature is public for testing purposes and is not intended to be used externally.
+ * @param error the error
+ * @param direct true if the object is being created from a get direct api call instead of the standard get message
+ */
+ public MessageInfo(Error error, boolean direct) {
+ super(error);
+ this.direct = direct;
+ subject = null;
+ data = null;
+ seq = -1;
+ time = null;
+ headers = null;
+ stream = null;
+ lastSeq = -1;
+ numPending = -1;
+ }
+
/**
* Create a Message Info
* This signature is public for testing purposes and is not intended to be used externally.
@@ -70,12 +95,20 @@ public MessageInfo(Message msg, String streamName, boolean direct) {
seq = Long.parseLong(msgHeaders.getLast(NATS_SEQUENCE));
time = DateTimeUtils.parseDateTime(msgHeaders.getLast(NATS_TIMESTAMP));
stream = msgHeaders.getLast(NATS_STREAM);
- String temp = msgHeaders.getLast(NATS_LAST_SEQUENCE);
- if (temp == null) {
+ String tempLastSeq = msgHeaders.getLast(NATS_LAST_SEQUENCE);
+ if (tempLastSeq == null) {
lastSeq = -1;
}
else {
- lastSeq = JsonUtils.safeParseLong(temp, -1);
+ lastSeq = JsonUtils.safeParseLong(tempLastSeq, -1);
+ }
+ String tempNumPending = msgHeaders.getLast(NATS_NUM_PENDING);
+ if (tempNumPending == null) {
+ numPending = -1;
+ }
+ else {
+ // Num pending is +1 since it includes EOB message, correct that here.
+ numPending = Long.parseLong(tempNumPending) - 1;
}
// these are control headers, not real headers so don't give them to the user.
headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS);
@@ -88,6 +121,7 @@ else if (hasError()) {
headers = null;
stream = null;
lastSeq = -1;
+ numPending = -1;
}
else {
JsonValue mjv = readValue(jv, MESSAGE);
@@ -99,6 +133,7 @@ else if (hasError()) {
headers = hdrBytes == null ? null : new IncomingHeadersProcessor(hdrBytes).getHeaders();
stream = streamName;
lastSeq = -1;
+ numPending = -1;
}
}
@@ -158,11 +193,19 @@ public long getLastSeq() {
return lastSeq;
}
+ /**
+ * Amount of pending messages that can be requested with a subsequent batch request.
+ * @return number of pending messages
+ */
+ public long getNumPending() {
+ return numPending;
+ }
+
@Override
public String toString() {
StringBuilder sb = JsonUtils.beginJsonPrefixed("\"MessageInfo\":");
JsonUtils.addField(sb, "direct", direct);
- JsonUtils.addField(sb, "error", getError());
+ JsonUtils.addField(sb, ERROR, getError());
JsonUtils.addField(sb, SUBJECT, subject);
JsonUtils.addField(sb, SEQ, seq);
if (data == null) {
@@ -173,7 +216,8 @@ public String toString() {
}
JsonUtils.addField(sb, TIME, time);
JsonUtils.addField(sb, STREAM, stream);
- JsonUtils.addField(sb, "last_seq", lastSeq);
+ JsonUtils.addField(sb, LAST_SEQ, lastSeq);
+ JsonUtils.addField(sb, NUM_PENDING, numPending);
JsonUtils.addField(sb, SUBJECT, subject);
JsonUtils.addField(sb, HDRS, headers);
return JsonUtils.endJson(sb).toString();
diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
index 853bfc3ab..b712cff27 100644
--- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
+++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
@@ -47,6 +47,7 @@ public CachedStreamInfo(StreamInfo si) {
final JetStreamOptions jso;
final boolean consumerCreate290Available;
final boolean multipleSubjectFilter210Available;
+ final boolean directBatchGet211Available;
// ----------------------------------------------------------------------------------------------------
// Create / Init
@@ -63,6 +64,7 @@ public CachedStreamInfo(StreamInfo si) {
consumerCreate290Available = conn.getInfo().isSameOrNewerThanVersion("2.9.0") && !jso.isOptOut290ConsumerCreate();
multipleSubjectFilter210Available = conn.getInfo().isNewerVersionThan("2.9.99");
+ directBatchGet211Available = conn.getInfo().isNewerVersionThan("2.10.99");
}
NatsJetStreamImpl(NatsJetStreamImpl impl) {
@@ -70,6 +72,7 @@ public CachedStreamInfo(StreamInfo si) {
jso = impl.jso;
consumerCreate290Available = impl.consumerCreate290Available;
multipleSubjectFilter210Available = impl.multipleSubjectFilter210Available;
+ directBatchGet211Available = impl.directBatchGet211Available;
}
// ----------------------------------------------------------------------------------------------------
diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
index b6fe2c7cd..7ac235f55 100644
--- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
+++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
@@ -16,12 +16,17 @@
import io.nats.client.*;
import io.nats.client.api.Error;
import io.nats.client.api.*;
+import io.nats.client.support.Status;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired;
+import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable;
import static io.nats.client.support.Validator.*;
public class NatsJetStreamManagement extends NatsJetStreamImpl implements JetStreamManagement {
@@ -340,6 +345,109 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
+ validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
+ List results = new ArrayList<>();
+ _requestMessageBatch(streamName, messageBatchGetRequest, msg -> {
+ if (msg != MessageInfo.EOD) {
+ results.add(msg);
+ }
+ });
+ return results;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
+ validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
+ final LinkedBlockingQueue q = new LinkedBlockingQueue<>();
+ conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add));
+ return q;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException {
+ validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
+ _requestMessageBatch(streamName, messageBatchGetRequest, handler);
+ }
+
+ public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) {
+ Subscription sub = null;
+ try {
+ String replyTo = conn.createInbox();
+ sub = conn.subscribe(replyTo);
+
+ String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName));
+ conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize());
+
+ long start = System.currentTimeMillis();
+ long maxTimeMillis = messageBatchGetRequest.getTimeout().toMillis();
+ long timeLeft = maxTimeMillis;
+ while (true) {
+ Message msg = sub.nextMessage(timeLeft);
+ if (msg == null) {
+ break;
+ }
+ if (msg.isStatusMessage()) {
+ Status status = msg.getStatus();
+ // Report error, otherwise successful status.
+ if (status.getCode() < 200 || status.getCode() > 299) {
+ MessageInfo messageInfo = new MessageInfo(Error.convert(status), true);
+ handler.onMessageInfo(messageInfo);
+ }
+ break;
+ }
+
+ Headers headers = msg.getHeaders();
+ if (headers == null || headers.getLast(NATS_NUM_PENDING) == null) {
+ throw JsDirectBatchGet211NotAvailable.instance();
+ }
+
+ MessageInfo messageInfo = new MessageInfo(msg, streamName, true);
+ handler.onMessageInfo(messageInfo);
+ timeLeft = maxTimeMillis - (System.currentTimeMillis() - start);
+ }
+ } catch (InterruptedException e) {
+ // sub.nextMessage was fetching one message
+ // and data is not completely read
+ // so it seems like this is an error condition
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ handler.onMessageInfo(MessageInfo.EOD);
+ } catch (Exception ignore) {
+ }
+ try {
+ //noinspection DataFlowIssue
+ sub.unsubscribe();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+
+ private void validateMessageBatchGetRequest(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
+ validateNotNull(messageBatchGetRequest, "Message Batch Get Request");
+
+ if (!directBatchGet211Available) {
+ throw JsDirectBatchGet211NotAvailable.instance();
+ }
+
+ CachedStreamInfo csi = getCachedStreamInfo(streamName);
+ if (!csi.allowDirect) {
+ throw JsAllowDirectRequired.instance();
+ }
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java
index 16ff26f17..e3479b4b8 100644
--- a/src/main/java/io/nats/client/support/ApiConstants.java
+++ b/src/main/java/io/nats/client/support/ApiConstants.java
@@ -126,6 +126,7 @@ public interface ApiConstants {
String MTIME = "mtime";
String MIRROR = "mirror";
String MSGS = "msgs";
+ String MULTI_LAST = "multi_last";
String NAME = "name";
String NEXT_BY_SUBJECT = "next_by_subj";
String NO_ACK = "no_ack";
@@ -202,5 +203,7 @@ public interface ApiConstants {
String TLS_AVAILABLE = "tls_available";
String TOTAL = "total";
String TYPE = "type";
+ String UP_TO_SEQ = "up_to_seq";
+ String UP_TO_TIME = "up_to_time";
String VERSION = "version";
}
diff --git a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java
index 6bda9e345..5da56cdff 100644
--- a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java
+++ b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java
@@ -70,6 +70,8 @@ public class NatsJetStreamClientError {
public static final NatsJetStreamClientError JsConsumerCreate290NotAvailable = new NatsJetStreamClientError(CON, 90301, "Name field not valid when v2.9.0 consumer create api is not available.");
public static final NatsJetStreamClientError JsConsumerNameDurableMismatch = new NatsJetStreamClientError(CON, 90302, "Name must match durable if both are supplied.");
public static final NatsJetStreamClientError JsMultipleFilterSubjects210NotAvailable = new NatsJetStreamClientError(CON, 90303, "Multiple filter subjects not available until server version 2.10.0.");
+ public static final NatsJetStreamClientError JsAllowDirectRequired = new NatsJetStreamClientError(CON, 90304, "Stream must have allow direct set.");
+ public static final NatsJetStreamClientError JsDirectBatchGet211NotAvailable = new NatsJetStreamClientError(CON, 90305, "Batch direct get not available until server version 2.11.0.");
@Deprecated // Fixed spelling error
public static final NatsJetStreamClientError JsSubFcHbHbNotValidQueue = new NatsJetStreamClientError(SUB, 90006, "Flow Control and/or heartbeat is not valid in queue mode.");
diff --git a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java
index 45aed12b2..c7e4d4a3b 100644
--- a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java
+++ b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java
@@ -100,7 +100,8 @@ public interface NatsJetStreamConstants {
String NATS_TIMESTAMP = "Nats-Time-Stamp";
String NATS_SUBJECT = "Nats-Subject";
String NATS_LAST_SEQUENCE = "Nats-Last-Sequence";
- String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE};
+ String NATS_NUM_PENDING = "Nats-Num-Pending";
+ String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE, NATS_NUM_PENDING};
String NATS_PENDING_MESSAGES = "Nats-Pending-Messages";
String NATS_PENDING_BYTES = "Nats-Pending-Bytes";
diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java
index 517832637..e8dcbf109 100644
--- a/src/main/java/io/nats/client/support/Validator.java
+++ b/src/main/java/io/nats/client/support/Validator.java
@@ -383,6 +383,13 @@ public static int validateGtZero(int i, String label) {
return i;
}
+ public static long validateGtZero(long l, String label) {
+ if (l < 1) {
+ throw new IllegalArgumentException(label + " must be greater than zero");
+ }
+ return l;
+ }
+
public static long validateGtZeroOrMinus1(long l, String label) {
if (zeroOrLtMinus1(l)) {
throw new IllegalArgumentException(label + " must be greater than zero or -1 for unlimited");
diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java
index 47f25d989..e05f21434 100644
--- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java
+++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java
@@ -23,11 +23,13 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
import java.time.ZonedDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME;
import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT;
@@ -1548,4 +1550,227 @@ public void testCreateConsumerUpdateConsumer() throws Exception {
assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject());
});
}
+
+ @Test
+ public void testBatchDirectGet() throws Exception {
+ jsServer.run(TestBase::atLeast2_11, nc -> {
+ JetStream js = nc.jetStream();
+ JetStreamManagement jsm = nc.jetStreamManagement();
+
+ TestingStreamContainer tsc = new TestingStreamContainer(nc);
+ assertFalse(tsc.si.getConfiguration().getAllowDirect());
+
+ List expected = Arrays.asList("foo", "bar", "baz");
+ for (String data : expected) {
+ js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8));
+ }
+
+ List batch = new ArrayList<>();
+ MessageInfoHandler handler = msg -> {
+ if (!msg.hasError() && msg != MessageInfo.EOD) {
+ batch.add(msg);
+ }
+ };
+
+ // Stream doesn't have AllowDirect enabled, will error.
+ assertThrows(IllegalArgumentException.class, () -> {
+ MessageBatchGetRequest request = MessageBatchGetRequest.builder().build();
+ jsm.requestMessageBatch(tsc.stream, request, handler);
+ });
+
+ // Enable AllowDirect.
+ StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build();
+ StreamInfo si = jsm.updateStream(sc);
+ assertTrue(si.getConfiguration().getAllowDirect());
+
+ // Empty request errors.
+ AtomicBoolean hasError = new AtomicBoolean();
+ MessageInfoHandler errorHandler = msg -> {
+ hasError.compareAndSet(false, msg.hasError());
+ };
+ MessageBatchGetRequest request = MessageBatchGetRequest.builder().build();
+ jsm.requestMessageBatch(tsc.stream, request, errorHandler);
+ assertTrue(hasError.get());
+ List list = jsm.fetchMessageBatch(tsc.stream, request);
+ assertEquals(1, list.size());
+ assertTrue(list.get(0).hasError());
+ LinkedBlockingQueue queue = jsm.queueMessageBatch(tsc.stream, request);
+ assertTrue(queue.take().hasError());
+ assertEquals(MessageInfo.EOD, queue.take());
+
+ // First batch gets first two messages.
+ request = MessageBatchGetRequest.builder()
+ .batch(2)
+ .subject(tsc.subject())
+ .build();
+ jsm.requestMessageBatch(tsc.stream, request, handler);
+ MessageInfo last = batch.get(batch.size() - 1);
+ assertEquals(1, last.getNumPending());
+ assertEquals(2, last.getSeq());
+ assertEquals(1, last.getLastSeq());
+
+ // Second batch gets last message.
+ request = MessageBatchGetRequest.builder(request)
+ .sequence(last.getSeq() + 1)
+ .build();
+ jsm.requestMessageBatch(tsc.stream, request, handler);
+
+ List actual = batch.stream().map(m -> new String(m.getData())).collect(Collectors.toList());
+ assertEquals(expected, actual);
+
+ last = batch.get(batch.size() - 1);
+ assertEquals(0, last.getNumPending());
+ assertEquals(3, last.getSeq());
+ assertEquals(0, last.getLastSeq());
+ });
+ }
+
+ @Test
+ public void testBatchDirectGetAlternatives() throws Exception {
+ jsServer.run(TestBase::atLeast2_11, nc -> {
+ JetStream js = nc.jetStream();
+ JetStreamManagement jsm = nc.jetStreamManagement();
+
+ TestingStreamContainer tsc = new TestingStreamContainer(nc);
+ assertFalse(tsc.si.getConfiguration().getAllowDirect());
+
+ // Enable AllowDirect.
+ StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build();
+ StreamInfo si = jsm.updateStream(sc);
+ assertTrue(si.getConfiguration().getAllowDirect());
+
+ List expected = Arrays.asList("foo", "bar", "baz");
+ for (String data : expected) {
+ js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8));
+ }
+
+ // Request stays the same for all options.
+ MessageBatchGetRequest request = MessageBatchGetRequest.builder()
+ .batch(3)
+ .subject(tsc.subject())
+ .build();
+
+ // Get using handler.
+ List batch = new ArrayList<>();
+ MessageInfoHandler handler = msg -> {
+ if (!msg.hasError() && msg != MessageInfo.EOD) {
+ batch.add(msg);
+ }
+ };
+ jsm.requestMessageBatch(tsc.stream, request, handler);
+ assertEquals(3, batch.size());
+ MessageInfo last = batch.get(batch.size() - 1);
+ assertEquals(0, last.getNumPending());
+ assertEquals(3, last.getSeq());
+ assertEquals(2, last.getLastSeq());
+
+ // Get using queue.
+ batch.clear();
+ LinkedBlockingQueue queue = jsm.queueMessageBatch(tsc.stream, request);
+ MessageInfo msg;
+ while ((msg = queue.take()) != MessageInfo.EOD) {
+ if (!msg.hasError()) {
+ batch.add(msg);
+ }
+ }
+ assertEquals(3, batch.size());
+ last = batch.get(batch.size() - 1);
+ assertEquals(0, last.getNumPending());
+ assertEquals(3, last.getSeq());
+ assertEquals(2, last.getLastSeq());
+
+ // Get using fetch.
+ batch.clear();
+ batch.addAll(jsm.fetchMessageBatch(tsc.stream, request));
+ assertEquals(3, batch.size());
+ last = batch.get(batch.size() - 1);
+ assertEquals(0, last.getNumPending());
+ assertEquals(3, last.getSeq());
+ assertEquals(2, last.getLastSeq());
+ });
+ }
+
+ @Test
+ public void testBatchDirectGetMultiLast() throws Exception {
+ jsServer.run(TestBase::atLeast2_11, nc -> {
+ JetStream js = nc.jetStream();
+ JetStreamManagement jsm = nc.jetStreamManagement();
+
+ String stream = stream();
+ jsm.addStream(StreamConfiguration.builder()
+ .name(stream)
+ .subjects(stream + ".a.>")
+ .allowDirect(true)
+ .build());
+
+ String subjectAFoo = stream + ".a.foo";
+ String subjectABar = stream + ".a.bar";
+ String subjectABaz = stream + ".a.baz";
+ js.publish(subjectAFoo, "foo".getBytes(StandardCharsets.UTF_8));
+ js.publish(subjectABar, "bar".getBytes(StandardCharsets.UTF_8));
+ js.publish(subjectABaz, "baz".getBytes(StandardCharsets.UTF_8));
+
+ MessageBatchGetRequest request = MessageBatchGetRequest.builder()
+ .multiLastForSubjects(subjectAFoo, subjectABaz)
+ .build();
+
+ List keys = new ArrayList<>();
+ MessageInfoHandler handler = msg -> {
+ if (!msg.hasError() && msg != MessageInfo.EOD) {
+ keys.add(msg.getSubject());
+ }
+ };
+ jsm.requestMessageBatch(stream, request, handler);
+ assertEquals(2, keys.size());
+ assertEquals(subjectAFoo, keys.get(0));
+ assertEquals(subjectABaz, keys.get(1));
+ });
+ }
+
+ @Test
+ public void testBatchDirectGetBuilder() {
+ // Default timeout
+ assertEquals(Duration.ofSeconds(5), MessageBatchGetRequest.builder().build().getTimeout());
+
+ // Request options.
+ MessageBatchGetRequest requestOptions = MessageBatchGetRequest.builder()
+ .timeout(Duration.ofSeconds(1))
+ .maxBytes(1234)
+ .batch(2)
+ .build();
+ assertEquals(Duration.ofSeconds(1), requestOptions.getTimeout());
+ assertEquals(1234, requestOptions.getMaxBytes());
+ assertEquals(2, requestOptions.getBatch());
+ assertEquals("{\"batch\":2,\"max_bytes\":1234}", requestOptions.toJson());
+
+ // Batch direct get - simple
+ ZonedDateTime time = Instant.EPOCH.atZone(ZoneOffset.UTC);
+ MessageBatchGetRequest simple = MessageBatchGetRequest.builder()
+ .sequence(1)
+ .startTime(time)
+ .subject("subject")
+ .build();
+ assertEquals(1, simple.getSequence());
+ assertEquals(time, simple.getStartTime());
+ assertEquals("subject", simple.getSubject());
+ assertEquals("{\"seq\":1,\"start_time\":\"1970-01-01T00:00:00.000000000Z\",\"next_by_subj\":\"subject\"}", simple.toJson());
+
+ // Batch direct get - multi last
+ List multiLastFor = Collections.singletonList("multi.last");
+ MessageBatchGetRequest multiLast = MessageBatchGetRequest.builder()
+ .multiLastForSubjects("multi.last")
+ .upToSequence(1)
+ .upToTime(time)
+ .build();
+ assertEquals(Collections.singletonList("multi.last"), multiLast.getMultiLastForSubjects());
+ assertEquals(1, multiLast.getUpToSequence());
+ assertEquals(time, multiLast.getUpToTime());
+ assertEquals("{\"multi_last\":[\"multi.last\"],\"up_to_seq\":1,\"up_to_time\":\"1970-01-01T00:00:00.000000000Z\"}", multiLast.toJson());
+
+ MessageBatchGetRequest multiLastAlternative = MessageBatchGetRequest.builder()
+ .multiLastForSubjects(multiLastFor)
+ .build();
+ assertEquals(multiLastFor, multiLastAlternative.getMultiLastForSubjects());
+ assertEquals("{\"multi_last\":[\"multi.last\"]}", multiLastAlternative.toJson());
+ }
}