Skip to content

Commit

Permalink
updated examples
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Aug 9, 2023
1 parent 2bc6a20 commit 4c81595
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class IterableConsumerExample {
private static final String STREAM = "manually-stream";
private static final String SUBJECT = "manually-subject";
private static final String CONSUMER_NAME = "manually-consumer";
private static final String MESSAGE_TEXT = "manually";
private static final String STREAM = "iterable-stream";
private static final String SUBJECT = "iterable-subject";
private static final String CONSUMER_NAME = "iterable-consumer";
private static final String MESSAGE_TEXT = "iterable";
private static final int STOP_COUNT = 500;
private static final int REPORT_EVERY = 50;
private static final int JITTER = 20;

private static final String SERVER = "nats://localhost:4222";

Expand All @@ -58,6 +57,12 @@ public static void main(String[] args) {
return;
}

System.out.println("Starting publish...");
Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10);
Thread pubThread = new Thread(publisher);
pubThread.start();

// set up the iterable consumer
Thread consumeThread = new Thread(() -> {
int count = 0;
long start = System.currentTimeMillis();
Expand All @@ -68,23 +73,28 @@ public static void main(String[] args) {
if (msg != null) {
msg.ack();
if (++count % REPORT_EVERY == 0) {
report("Main Loop Running", System.currentTimeMillis() - start, count);
report("Main loop running", System.currentTimeMillis() - start, count);
}
}
}
report("Main Loop Stopped", System.currentTimeMillis() - start, count);
report("Main loop stopped", System.currentTimeMillis() - start, count);

System.out.println("Pausing for effect...allow more messages come across.");
Thread.sleep(JITTER * 2); // allows more messages to come across
// The consumer has at least 1 pull request active. When stop is called,
// no more pull requests will be made, but messages already requested
// will still come across the wire to the client.
consumer.stop();

System.out.println("Starting post-stop loop.");
Message msg = consumer.nextMessage(1000);
while (msg != null) {
msg.ack();
report("Post-stop loop running", System.currentTimeMillis() - start, ++count);
msg = consumer.nextMessage(1000);
while (!consumer.isFinished()) {
Message msg = consumer.nextMessage(1000);
if (msg != null) {
msg.ack();
if (++count % REPORT_EVERY == 0) {
report("Post-stop loop running", System.currentTimeMillis() - start, ++count);
}
}
}
report("Post-stop loop stopped", System.currentTimeMillis() - start, count);
}
catch (JetStreamStatusCheckedException | InterruptedException | IOException | JetStreamApiException e) {
// JetStreamStatusCheckedException:
Expand All @@ -105,13 +115,9 @@ public static void main(String[] args) {
report("Done", System.currentTimeMillis() - start, count);
});
consumeThread.start();

Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, JITTER);
Thread pubThread = new Thread(publisher);
pubThread.start();

consumeThread.join();
publisher.stopPublishing();

publisher.stopPublishing(); // otherwise it will complain when the connection goes away
pubThread.join();
}
catch (IOException | InterruptedException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static io.nats.examples.jetstream.simple.Utils.Publisher;
import static io.nats.examples.jetstream.simple.Utils.createOrReplaceStream;
import static io.nats.examples.jetstream.simple.Utils.publish;

/**
* This example will demonstrate simplified consume with a handler
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class MessageConsumerExample {
private static final String STREAM = "consume-handler-stream";
private static final String SUBJECT = "consume-handler-subject";
private static final String CONSUMER_NAME = "consume-handler-consumer";
private static final String MESSAGE_TEXT = "consume-handler";
private static final String STREAM = "consume-stream";
private static final String SUBJECT = "consume-subject";
private static final String CONSUMER_NAME = "consume-consumer";
private static final String MESSAGE_TEXT = "consume";
private static final int STOP_COUNT = 500;
private static final int REPORT_EVERY = 100;

Expand All @@ -43,9 +43,11 @@ public static void main(String[] args) {
JetStream js = nc.jetStream();
createOrReplaceStream(nc.jetStreamManagement(), STREAM, SUBJECT);

// publishing so there are lots of messages
System.out.println("Publishing...");
publish(js, SUBJECT, MESSAGE_TEXT, 2500);
System.out.println("Starting publish...");
// publish(js, SUBJECT, MESSAGE_TEXT, 2500);
Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10);
Thread pubThread = new Thread(publisher);
pubThread.start();

// get stream context, create consumer and get the consumer context
StreamContext streamContext;
Expand Down Expand Up @@ -81,10 +83,17 @@ public static void main(String[] args) {
try {
MessageConsumer consumer = consumerContext.consume(handler);
latch.await();
// once the consumer is stopped, the client will drain messages

// The consumer has at least 1 pull request active. When stop is called,
// no more pull requests will be made, but messages already requested
// will still come across the wire to the client.
System.out.println("Stop the consumer...");
consumer.stop();
Thread.sleep(1000); // enough for messages to drain after stop

// wait until the consumer is finished
while (!consumer.isFinished()) {
Thread.sleep(10);
}
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -103,6 +112,9 @@ public static void main(String[] args) {
}

report("Final", start, atomicCount.get());

publisher.stopPublishing(); // otherwise it will complain when the connection goes away
pubThread.join();
}
catch (IOException | InterruptedException ioe) {
// IOException:
Expand Down
13 changes: 10 additions & 3 deletions src/examples/java/io/nats/examples/jetstream/simple/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class Utils {

Expand Down Expand Up @@ -58,28 +59,34 @@ public static class Publisher implements Runnable {
private final String messageText;
private final int jitter;
private final AtomicBoolean keepGoing = new AtomicBoolean(true);
private int pubNo;
private final AtomicInteger pubCount;

public Publisher(JetStream js, String subject, String messageText, int jitter) {
this.js = js;
this.subject = subject;
this.messageText = messageText;
this.jitter = jitter;
pubCount = new AtomicInteger();
}

public void stopPublishing() {
keepGoing.set(false);
}

public int getPubCount() {
return pubCount.get();
}

@Override
public void run() {
try {
while (keepGoing.get()) {
//noinspection BusyWait
Thread.sleep(ThreadLocalRandom.current().nextLong(jitter));
js.publish(subject, (messageText + "-" + (++pubNo)).getBytes());
js.publish(subject, (messageText + "-" + (pubCount.incrementAndGet())).getBytes());
}
} catch (Exception e) {
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsMessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ public SequenceTracker(MessageHandler userHandler) {

@Override
public void onMessage(Message msg) throws InterruptedException {
userHandler.onMessage(msg);
if (stopped && pmm.noMorePending()) {
finished = true;
}
userHandler.onMessage(msg);
}
}
}

0 comments on commit 4c81595

Please sign in to comment.