diff --git a/src/examples/java/io/nats/examples/jetstream/simple/ConsumeManuallyCallNext.java b/src/examples/java/io/nats/examples/jetstream/simple/ConsumeManuallyCallNext.java index 405fab7e5..e48d5188d 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/ConsumeManuallyCallNext.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/ConsumeManuallyCallNext.java @@ -1,3 +1,16 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package io.nats.examples.jetstream.simple; import io.nats.client.*; @@ -5,8 +18,12 @@ import java.time.Duration; +import static io.nats.examples.jetstream.simple.Utils.Publisher; +import static io.nats.examples.jetstream.simple.Utils.setupStream; + /** - * This example will demonstrate simplified manual consume + * This example will demonstrate simplified manual consume. + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class ConsumeManuallyCallNext { private static final String STREAM = "simple-stream"; @@ -25,7 +42,7 @@ public static void main(String[] args) { JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); - Utils.setupStream(jsm, STREAM, SUBJECT); + setupStream(jsm, STREAM, SUBJECT); String name = "simple-consumer-" + NUID.nextGlobal(); diff --git a/src/examples/java/io/nats/examples/jetstream/simple/ConsumeWithHandler.java b/src/examples/java/io/nats/examples/jetstream/simple/ConsumeWithHandler.java index 1b93c92ec..927d165f4 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/ConsumeWithHandler.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/ConsumeWithHandler.java @@ -1,3 +1,16 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package io.nats.examples.jetstream.simple; import io.nats.client.*; @@ -7,8 +20,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import static io.nats.examples.jetstream.simple.Utils.Publisher; +import static io.nats.examples.jetstream.simple.Utils.setupStream; + /** - * This example will demonstrate simplified fetch + * This example will demonstrate simplified consume with a handler + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class ConsumeWithHandler { private static final String STREAM = "simple-stream"; @@ -27,7 +44,7 @@ public static void main(String[] args) { JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); - Utils.setupStream(jsm, STREAM, SUBJECT); + setupStream(jsm, STREAM, SUBJECT); String name = "simple-consumer-" + NUID.nextGlobal(); diff --git a/src/examples/java/io/nats/examples/jetstream/simple/FetchExample.java b/src/examples/java/io/nats/examples/jetstream/simple/FetchExample.java index 3975f2173..5c6ac426e 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/FetchExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/FetchExample.java @@ -1,3 +1,16 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package io.nats.examples.jetstream.simple; import io.nats.client.*; @@ -7,6 +20,7 @@ /** * This example will demonstrate simplified fetch + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class FetchExample { private static final String STREAM = "simple-stream"; diff --git a/src/examples/java/io/nats/examples/jetstream/simple/Publisher.java b/src/examples/java/io/nats/examples/jetstream/simple/Publisher.java deleted file mode 100644 index db9a0186d..000000000 --- a/src/examples/java/io/nats/examples/jetstream/simple/Publisher.java +++ /dev/null @@ -1,36 +0,0 @@ -package io.nats.examples.jetstream.simple; - -import io.nats.client.JetStream; - -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; - -class Publisher implements Runnable { - private final JetStream js; - private final String subject; - private final int jitter; - private final AtomicBoolean keepGoing = new AtomicBoolean(true); - private int pubNo; - - public Publisher(JetStream js, String subject, int jitter) { - this.js = js; - this.subject = subject; - this.jitter = jitter; - } - - public void stop() { - keepGoing.set(false); - } - - @Override - public void run() { - try { - while (keepGoing.get()) { - Thread.sleep(ThreadLocalRandom.current().nextLong(jitter)); - js.publish(subject, ("simple-message-" + (++pubNo)).getBytes()); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/src/examples/java/io/nats/examples/jetstream/simple/ThreeDifferentConsumers.java b/src/examples/java/io/nats/examples/jetstream/simple/ThreeDifferentConsumers.java new file mode 100644 index 000000000..160a324d8 --- /dev/null +++ b/src/examples/java/io/nats/examples/jetstream/simple/ThreeDifferentConsumers.java @@ -0,0 +1,146 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.examples.jetstream.simple; + +import io.nats.client.*; +import io.nats.client.api.ConsumerConfiguration; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static io.nats.examples.jetstream.simple.Utils.Publisher; +import static io.nats.examples.jetstream.simple.Utils.setupStream; + +/** + * This example will demonstrate all 3 simplified consumes running at the same time. + * It just runs forever until you manually stop it. + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + */ +public class ThreeDifferentConsumers { + private static final String STREAM = "simple-stream"; + private static final String SUBJECT = "simple-subject"; + private static final int REPORT_EVERY = 100; + private static final int JITTER = 30; + + // change this is you need to... + public static String SERVER = "nats://localhost:4222"; + + public static void main(String[] args) { + Options options = Options.builder().server(SERVER).build(); + try (Connection nc = Nats.connect(options)) { + + JetStreamManagement jsm = nc.jetStreamManagement(); + JetStream js = nc.jetStream(); + + setupStream(jsm, STREAM, SUBJECT); + + String name1 = "next-" + NUID.nextGlobal(); + String name2 = "handle-" + NUID.nextGlobal(); + String name3 = "fetch-" + NUID.nextGlobal(); + + jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name1).build()); + jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name2).build()); + jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name3).build()); + + // Consumer[Context] + ConsumerContext ctx1 = js.getConsumerContext(STREAM, name1); + ConsumerContext ctx2 = js.getConsumerContext(STREAM, name2); + ConsumerContext ctx3 = js.getConsumerContext(STREAM, name3); + + // create the consumer then use it + ManualConsumer con1 = ctx1.consume(); + + Thread con1Thread = new Thread(() -> { + long mark = System.currentTimeMillis(); + int count = 0; + long report = randomReportInterval(); + try { + while (true) { + Message msg = con1.nextMessage(1000); + if (msg != null) { + msg.ack(); + ++count; + if (System.currentTimeMillis() - mark > report) { + System.out.println("Manual " + count + " messages."); + mark = System.currentTimeMillis(); + report = randomReportInterval(); + } + } + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + con1Thread.start(); + + Publisher publisher = new Publisher(js, SUBJECT, JITTER); + Thread pubThread = new Thread(publisher); + pubThread.start(); + + Thread.sleep(1000); // just makes the consumers be reading different messages + AtomicInteger atomicCount = new AtomicInteger(); + AtomicLong atomicMark = new AtomicLong(System.currentTimeMillis()); + AtomicLong atomicReport = new AtomicLong(randomReportInterval()); + + MessageHandler handler = msg -> { + msg.ack(); + int count = atomicCount.incrementAndGet(); + if (System.currentTimeMillis() - atomicMark.get() > atomicReport.get()) { + System.out.println("Handled " + count + " messages."); + atomicMark.set(System.currentTimeMillis()); + atomicReport.set(randomReportInterval()); + } + }; + SimpleConsumer con2 = ctx2.consume(handler); + + Thread.sleep(1000); // just makes the consumers be reading different messages + Thread con2Thread = new Thread(() -> { + int count = 0; + long mark = System.currentTimeMillis(); + long report = randomReportInterval(); + try { + while (true) { + FetchConsumer fc = ctx3.fetch(REPORT_EVERY); + Message msg = fc.nextMessage(); + while (msg != null) { + msg.ack(); + ++count; + if (System.currentTimeMillis() - mark > report) { + System.out.println("Fetched " + count + " messages."); + mark = System.currentTimeMillis(); + report = randomReportInterval(); + } + msg = fc.nextMessage(); + } + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + }); + con2Thread.start(); + + con2Thread.join(); // never ends so program runs until stopped. + } + catch (Exception e) { + e.printStackTrace(); + } + } + + private static long randomReportInterval() { + return 1000 + ThreadLocalRandom.current().nextLong(1000); + } +} diff --git a/src/examples/java/io/nats/examples/jetstream/simple/Utils.java b/src/examples/java/io/nats/examples/jetstream/simple/Utils.java index c6f852137..5a379c733 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/Utils.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/Utils.java @@ -1,3 +1,16 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package io.nats.examples.jetstream.simple; import io.nats.client.Connection; @@ -8,6 +21,8 @@ import io.nats.client.api.StorageType; import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import static io.nats.examples.jetstream.NatsJsUtils.createOrReplaceStream; @@ -63,4 +78,34 @@ public static void setupConsumer(JetStreamManagement jsm, String stream, String .build(); jsm.addOrUpdateConsumer(stream, cc); } + + public static class Publisher implements Runnable { + private final JetStream js; + private final String subject; + private final int jitter; + private final AtomicBoolean keepGoing = new AtomicBoolean(true); + private int pubNo; + + public Publisher(JetStream js, String subject, int jitter) { + this.js = js; + this.subject = subject; + this.jitter = jitter; + } + + public void stop() { + keepGoing.set(false); + } + + @Override + public void run() { + try { + while (keepGoing.get()) { + Thread.sleep(ThreadLocalRandom.current().nextLong(jitter)); + js.publish(subject, ("simple-message-" + (++pubNo)).getBytes()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } } diff --git a/src/main/java/io/nats/client/BaseConsumeOptions.java b/src/main/java/io/nats/client/BaseConsumeOptions.java index 8b1204026..12f61cc3d 100644 --- a/src/main/java/io/nats/client/BaseConsumeOptions.java +++ b/src/main/java/io/nats/client/BaseConsumeOptions.java @@ -18,6 +18,7 @@ /** * Base Consume Options are provided to customize the way the * consume and fetch operate. It is the base class for FetchConsumeOptions + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class BaseConsumeOptions { public static final int DEFAULT_MESSAGE_COUNT = 100; diff --git a/src/main/java/io/nats/client/ConsumeOptions.java b/src/main/java/io/nats/client/ConsumeOptions.java index 6bf83e711..a0311afad 100644 --- a/src/main/java/io/nats/client/ConsumeOptions.java +++ b/src/main/java/io/nats/client/ConsumeOptions.java @@ -15,6 +15,7 @@ /** * Consume Options are provided to customize the consume operation. + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class ConsumeOptions extends BaseConsumeOptions { public static ConsumeOptions DEFAULT_CONSUME_OPTIONS = ConsumeOptions.builder().build(); diff --git a/src/main/java/io/nats/client/ConsumerContext.java b/src/main/java/io/nats/client/ConsumerContext.java index 0a799842c..cc3a92b5c 100644 --- a/src/main/java/io/nats/client/ConsumerContext.java +++ b/src/main/java/io/nats/client/ConsumerContext.java @@ -18,7 +18,7 @@ import java.io.IOException; /** - * TODO + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public interface ConsumerContext { String getName(); diff --git a/src/main/java/io/nats/client/FetchConsumeOptions.java b/src/main/java/io/nats/client/FetchConsumeOptions.java index f08b735ac..39bb5c3a5 100644 --- a/src/main/java/io/nats/client/FetchConsumeOptions.java +++ b/src/main/java/io/nats/client/FetchConsumeOptions.java @@ -15,6 +15,7 @@ /** * Fetch Consume Options are provided to customize the fetch operation. + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class FetchConsumeOptions extends BaseConsumeOptions { private FetchConsumeOptions(Builder b) { diff --git a/src/main/java/io/nats/client/FetchConsumer.java b/src/main/java/io/nats/client/FetchConsumer.java index c2f19fa1a..b17a8029d 100644 --- a/src/main/java/io/nats/client/FetchConsumer.java +++ b/src/main/java/io/nats/client/FetchConsumer.java @@ -13,6 +13,11 @@ package io.nats.client; +/** + * A fetch consumer gets messages by calling nextMessage. + * nextMessage returns null when there are no more messages + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + */ public interface FetchConsumer extends SimpleConsumer { Message nextMessage() throws InterruptedException; } diff --git a/src/main/java/io/nats/client/JetStream.java b/src/main/java/io/nats/client/JetStream.java index 8f87a994c..111103e4d 100644 --- a/src/main/java/io/nats/client/JetStream.java +++ b/src/main/java/io/nats/client/JetStream.java @@ -13,7 +13,6 @@ package io.nats.client; -import io.nats.client.api.ConsumerConfiguration; import io.nats.client.api.PublishAck; import io.nats.client.impl.Headers; @@ -499,7 +498,40 @@ public interface JetStream { */ JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException; + /** + * Create a stream context for a specific named stream. Verifies that the stream exists. + * EXPERIMENTAL API SUBJECT TO CHANGE + * @param streamName the name of the stream + * @return a StreamContext object + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ StreamContext getStreamContext(String streamName) throws IOException, JetStreamApiException; + + /** + * Create a consumer context for a specific named stream and specific named consumer. + * Verifies that the stream and consumer exist. + * EXPERIMENTAL API SUBJECT TO CHANGE + * @param streamName the name of the stream + * @param consumerName the name of the consumer + * @return a ConsumerContext object + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException; - ConsumerContext getConsumerContext(String streamName, ConsumerConfiguration consumerConfiguration) throws IOException, JetStreamApiException; + +// /** +// * Create a consumer context for a specific named stream and specific named consumer. +// * Verifies that the stream and consumer exist. +// * EXPERIMENTAL API SUBJECT TO CHANGE +// * @param streamName the name of the stream +// * @param consumerConfiguration TODO +// * @return a ConsumerContext object +// * @throws IOException covers various communication issues with the NATS +// * server such as timeout or interruption +// * @throws JetStreamApiException the request had an error related to the data +// */ +// ConsumerContext getConsumerContext(String streamName, ConsumerConfiguration consumerConfiguration) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/SimpleConsumer.java b/src/main/java/io/nats/client/SimpleConsumer.java index 32f6be8bb..2de13571a 100644 --- a/src/main/java/io/nats/client/SimpleConsumer.java +++ b/src/main/java/io/nats/client/SimpleConsumer.java @@ -19,6 +19,11 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; +/** + * The Simple Consumer interface is the core interface replacing + * a subscription for a simplified consumer. + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE + */ public interface SimpleConsumer { ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException; void unsubscribe(); diff --git a/src/main/java/io/nats/client/StreamContext.java b/src/main/java/io/nats/client/StreamContext.java index a70cc5850..2c4b23318 100644 --- a/src/main/java/io/nats/client/StreamContext.java +++ b/src/main/java/io/nats/client/StreamContext.java @@ -21,7 +21,7 @@ import java.io.IOException; /** - * TODO + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public interface StreamContext { String getStream(); @@ -32,5 +32,5 @@ public interface StreamContext { boolean deleteConsumer(String consumerName) throws IOException, JetStreamApiException; ConsumerContext getConsumerContext(String consumerName) throws IOException, JetStreamApiException; - ConsumerContext getConsumerContext(ConsumerConfiguration config) throws IOException, JetStreamApiException; +// ConsumerContext getConsumerContext(ConsumerConfiguration config) throws IOException, JetStreamApiException; } diff --git a/src/main/java/io/nats/client/impl/NatsConsumerContext.java b/src/main/java/io/nats/client/impl/NatsConsumerContext.java index b6eb0f34d..f02e33d45 100644 --- a/src/main/java/io/nats/client/impl/NatsConsumerContext.java +++ b/src/main/java/io/nats/client/impl/NatsConsumerContext.java @@ -23,7 +23,7 @@ import static io.nats.client.ConsumeOptions.DEFAULT_CONSUME_OPTIONS; /** - * TODO + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class NatsConsumerContext extends NatsStreamContext implements ConsumerContext { diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index 14756acc6..8b5c93a33 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -610,6 +610,9 @@ public JetStreamSubscription subscribe(String subject, PullSubscribeOptions opti return createSubscription(subject, null, null, null, false, null, options); } + /** + * {@inheritDoc} + */ @Override public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException { validateSubject(subject, isSubjectRequired(options)); @@ -623,6 +626,9 @@ private boolean isSubjectRequired(SubscribeOptions options) { return options == null || !options.isBind(); } + /** + * {@inheritDoc} + */ @Override public StreamContext getStreamContext(String streamName) throws IOException, JetStreamApiException { return getNatsStreamContext(streamName); @@ -632,15 +638,21 @@ private NatsStreamContext getNatsStreamContext(String streamName) throws IOExcep return new NatsStreamContext(conn, jso, streamName); } + /** + * {@inheritDoc} + */ @Override public ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException { Validator.required(consumerName, "Consumer Name"); return new NatsConsumerContext(getNatsStreamContext(streamName), consumerName, null); } - @Override - public ConsumerContext getConsumerContext(String streamName, ConsumerConfiguration consumerConfiguration) throws IOException, JetStreamApiException { - Validator.required(consumerConfiguration, "Consumer Configuration"); - return new NatsConsumerContext(getNatsStreamContext(streamName), null, consumerConfiguration); - } +// /** +// * {@inheritDoc} +// */ +// @Override +// public ConsumerContext getConsumerContext(String streamName, ConsumerConfiguration consumerConfiguration) throws IOException, JetStreamApiException { +// Validator.required(consumerConfiguration, "Consumer Configuration"); +// return new NatsConsumerContext(getNatsStreamContext(streamName), null, consumerConfiguration); +// } } diff --git a/src/main/java/io/nats/client/impl/NatsStreamContext.java b/src/main/java/io/nats/client/impl/NatsStreamContext.java index 26807621c..cdc954baf 100644 --- a/src/main/java/io/nats/client/impl/NatsStreamContext.java +++ b/src/main/java/io/nats/client/impl/NatsStreamContext.java @@ -25,9 +25,9 @@ import java.io.IOException; /** - * TODO + * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ -public class NatsStreamContext implements StreamContext { +class NatsStreamContext implements StreamContext { final NatsJetStreamManagement jsm; final String stream; @@ -72,8 +72,8 @@ public ConsumerContext getConsumerContext(String consumerName) throws IOExceptio return new NatsConsumerContext(jsm.conn, jsm.jso, stream, consumerName); } - @Override - public ConsumerContext getConsumerContext(ConsumerConfiguration config) throws IOException, JetStreamApiException { - return new NatsConsumerContext(jsm.conn, jsm.jso, stream, config); - } +// @Override +// public ConsumerContext getConsumerContext(ConsumerConfiguration config) throws IOException, JetStreamApiException { +// return new NatsConsumerContext(jsm.conn, jsm.jso, stream, config); +// } }