diff --git a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java index e01cf8959..6276c60cd 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java @@ -26,13 +26,12 @@ * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class IterableConsumerExample { - private static final String STREAM = "manually-stream"; - private static final String SUBJECT = "manually-subject"; - private static final String CONSUMER_NAME = "manually-consumer"; - private static final String MESSAGE_TEXT = "manually"; + private static final String STREAM = "iterable-stream"; + private static final String SUBJECT = "iterable-subject"; + private static final String CONSUMER_NAME = "iterable-consumer"; + private static final String MESSAGE_TEXT = "iterable"; private static final int STOP_COUNT = 500; private static final int REPORT_EVERY = 50; - private static final int JITTER = 20; private static final String SERVER = "nats://localhost:4222"; @@ -58,6 +57,12 @@ public static void main(String[] args) { return; } + System.out.println("Starting publish..."); + Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10); + Thread pubThread = new Thread(publisher); + pubThread.start(); + + // set up the iterable consumer Thread consumeThread = new Thread(() -> { int count = 0; long start = System.currentTimeMillis(); @@ -68,23 +73,28 @@ public static void main(String[] args) { if (msg != null) { msg.ack(); if (++count % REPORT_EVERY == 0) { - report("Main Loop Running", System.currentTimeMillis() - start, count); + report("Main loop running", System.currentTimeMillis() - start, count); } } } - report("Main Loop Stopped", System.currentTimeMillis() - start, count); + report("Main loop stopped", System.currentTimeMillis() - start, count); - System.out.println("Pausing for effect...allow more messages come across."); - Thread.sleep(JITTER * 2); // allows more messages to come across + // The consumer has at least 1 pull request active. When stop is called, + // no more pull requests will be made, but messages already requested + // will still come across the wire to the client. consumer.stop(); System.out.println("Starting post-stop loop."); - Message msg = consumer.nextMessage(1000); - while (msg != null) { - msg.ack(); - report("Post-stop loop running", System.currentTimeMillis() - start, ++count); - msg = consumer.nextMessage(1000); + while (!consumer.isFinished()) { + Message msg = consumer.nextMessage(1000); + if (msg != null) { + msg.ack(); + if (++count % REPORT_EVERY == 0) { + report("Post-stop loop running", System.currentTimeMillis() - start, ++count); + } + } } + report("Post-stop loop stopped", System.currentTimeMillis() - start, count); } catch (JetStreamStatusCheckedException | InterruptedException | IOException | JetStreamApiException e) { // JetStreamStatusCheckedException: @@ -105,13 +115,9 @@ public static void main(String[] args) { report("Done", System.currentTimeMillis() - start, count); }); consumeThread.start(); - - Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, JITTER); - Thread pubThread = new Thread(publisher); - pubThread.start(); - consumeThread.join(); - publisher.stopPublishing(); + + publisher.stopPublishing(); // otherwise it will complain when the connection goes away pubThread.join(); } catch (IOException | InterruptedException ioe) { diff --git a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java index e9aa2f397..196135f10 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java @@ -20,18 +20,18 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import static io.nats.examples.jetstream.simple.Utils.Publisher; import static io.nats.examples.jetstream.simple.Utils.createOrReplaceStream; -import static io.nats.examples.jetstream.simple.Utils.publish; /** * This example will demonstrate simplified consume with a handler * SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE */ public class MessageConsumerExample { - private static final String STREAM = "consume-handler-stream"; - private static final String SUBJECT = "consume-handler-subject"; - private static final String CONSUMER_NAME = "consume-handler-consumer"; - private static final String MESSAGE_TEXT = "consume-handler"; + private static final String STREAM = "consume-stream"; + private static final String SUBJECT = "consume-subject"; + private static final String CONSUMER_NAME = "consume-consumer"; + private static final String MESSAGE_TEXT = "consume"; private static final int STOP_COUNT = 500; private static final int REPORT_EVERY = 100; @@ -43,9 +43,11 @@ public static void main(String[] args) { JetStream js = nc.jetStream(); createOrReplaceStream(nc.jetStreamManagement(), STREAM, SUBJECT); - // publishing so there are lots of messages - System.out.println("Publishing..."); - publish(js, SUBJECT, MESSAGE_TEXT, 2500); + System.out.println("Starting publish..."); +// publish(js, SUBJECT, MESSAGE_TEXT, 2500); + Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10); + Thread pubThread = new Thread(publisher); + pubThread.start(); // get stream context, create consumer and get the consumer context StreamContext streamContext; @@ -81,10 +83,17 @@ public static void main(String[] args) { try { MessageConsumer consumer = consumerContext.consume(handler); latch.await(); - // once the consumer is stopped, the client will drain messages + + // The consumer has at least 1 pull request active. When stop is called, + // no more pull requests will be made, but messages already requested + // will still come across the wire to the client. System.out.println("Stop the consumer..."); consumer.stop(); - Thread.sleep(1000); // enough for messages to drain after stop + + // wait until the consumer is finished + while (!consumer.isFinished()) { + Thread.sleep(10); + } } catch (JetStreamApiException | IOException e) { // JetStreamApiException: @@ -103,6 +112,9 @@ public static void main(String[] args) { } report("Final", start, atomicCount.get()); + + publisher.stopPublishing(); // otherwise it will complain when the connection goes away + pubThread.join(); } catch (IOException | InterruptedException ioe) { // IOException: diff --git a/src/examples/java/io/nats/examples/jetstream/simple/Utils.java b/src/examples/java/io/nats/examples/jetstream/simple/Utils.java index a5e53b9fb..bd3fa4743 100644 --- a/src/examples/java/io/nats/examples/jetstream/simple/Utils.java +++ b/src/examples/java/io/nats/examples/jetstream/simple/Utils.java @@ -20,6 +20,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class Utils { @@ -58,28 +59,34 @@ public static class Publisher implements Runnable { private final String messageText; private final int jitter; private final AtomicBoolean keepGoing = new AtomicBoolean(true); - private int pubNo; + private final AtomicInteger pubCount; public Publisher(JetStream js, String subject, String messageText, int jitter) { this.js = js; this.subject = subject; this.messageText = messageText; this.jitter = jitter; + pubCount = new AtomicInteger(); } public void stopPublishing() { keepGoing.set(false); } + public int getPubCount() { + return pubCount.get(); + } + @Override public void run() { try { while (keepGoing.get()) { //noinspection BusyWait Thread.sleep(ThreadLocalRandom.current().nextLong(jitter)); - js.publish(subject, (messageText + "-" + (++pubNo)).getBytes()); + js.publish(subject, (messageText + "-" + (pubCount.incrementAndGet())).getBytes()); } - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException(e); } } diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java index 613b6ce1e..13ee4a2c4 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java @@ -72,10 +72,10 @@ public SequenceTracker(MessageHandler userHandler) { @Override public void onMessage(Message msg) throws InterruptedException { + userHandler.onMessage(msg); if (stopped && pmm.noMorePending()) { finished = true; } - userHandler.onMessage(msg); } } }