Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplification More Review #948

Merged
merged 13 commits into from
Aug 11, 2023
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ You can however set the deliver policy which will be used to start the subscript
| JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer. |
| JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer. |
| JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None. |
| JsSoOrderedRequiresMaxDeliver | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. |
| JsSoOrderedRequiresMaxDeliverOfOne | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. |
| JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. |
| JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied. |
| JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,31 @@ public static void main(String[] args) {
createOrReplaceStream(nc.jetStreamManagement(), STREAM, SUBJECT);

// get a stream context from the connection
StreamContext streamContext = nc.streamContext(STREAM);
StreamContext streamContext = nc.getStreamContext(STREAM);
System.out.println("S1. " + streamContext.getStreamInfo());

// get a stream context from the connection, supplying custom JetStreamOptions
streamContext = nc.streamContext(STREAM, JetStreamOptions.builder().build());
streamContext = nc.getStreamContext(STREAM, JetStreamOptions.builder().build());
System.out.println("S2. " + streamContext.getStreamInfo());

// get a stream context from the JetStream context
streamContext = js.streamContext(STREAM);
streamContext = js.getStreamContext(STREAM);
System.out.println("S3. " + streamContext.getStreamInfo());

// when you create a consumer from the stream context you get a ConsumerContext in return
ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
System.out.println("C1. " + consumerContext.getCachedConsumerInfo());

// get a ConsumerContext from the connection for a pre-existing consumer
consumerContext = nc.consumerContext(STREAM, CONSUMER_NAME);
consumerContext = nc.getConsumerContext(STREAM, CONSUMER_NAME);
System.out.println("C2. " + consumerContext.getCachedConsumerInfo());

// get a ConsumerContext from the connection for a pre-existing consumer, supplying custom JetStreamOptions
consumerContext = nc.consumerContext(STREAM, CONSUMER_NAME, JetStreamOptions.builder().build());
consumerContext = nc.getConsumerContext(STREAM, CONSUMER_NAME, JetStreamOptions.builder().build());
System.out.println("C3. " + consumerContext.getCachedConsumerInfo());

// get a ConsumerContext from the stream context for a pre-existing consumer
consumerContext = streamContext.consumerContext(CONSUMER_NAME);
consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);
System.out.println("C4. " + consumerContext.getCachedConsumerInfo());
}
catch (JetStreamApiException | IOException | InterruptedException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build());
}
catch (JetStreamApiException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build());
}
catch (JetStreamApiException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class IterableConsumerExample {
private static final String STREAM = "manually-stream";
private static final String SUBJECT = "manually-subject";
private static final String CONSUMER_NAME = "manually-consumer";
private static final String MESSAGE_TEXT = "manually";
private static final String STREAM = "iterable-stream";
private static final String SUBJECT = "iterable-subject";
private static final String CONSUMER_NAME = "iterable-consumer";
private static final String MESSAGE_TEXT = "iterable";
private static final int STOP_COUNT = 500;
private static final int REPORT_EVERY = 50;
private static final int JITTER = 20;

private static final String SERVER = "nats://localhost:4222";

Expand All @@ -46,7 +45,7 @@ public static void main(String[] args) {
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
}
catch (JetStreamApiException | IOException e) {
Expand All @@ -58,33 +57,44 @@ public static void main(String[] args) {
return;
}

System.out.println("Starting publish...");
Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10);
Thread pubThread = new Thread(publisher);
pubThread.start();

// set up the iterable consumer
Thread consumeThread = new Thread(() -> {
int count = 0;
long start = System.currentTimeMillis();
try (IterableConsumer consumer = consumerContext.consume()) {
try (IterableConsumer consumer = consumerContext.iterate()) {
System.out.println("Starting main loop.");
while (count < STOP_COUNT) {
Message msg = consumer.nextMessage(1000);
if (msg != null) {
msg.ack();
if (++count % REPORT_EVERY == 0) {
report("Main Loop Running", System.currentTimeMillis() - start, count);
report("Main loop running", System.currentTimeMillis() - start, count);
}
}
}
report("Main Loop Stopped", System.currentTimeMillis() - start, count);
report("Main loop stopped", System.currentTimeMillis() - start, count);

System.out.println("Pausing for effect...allow more messages come across.");
Thread.sleep(JITTER * 2); // allows more messages to come across
consumer.stop(1000);
// The consumer has at least 1 pull request active. When stop is called,
// no more pull requests will be made, but messages already requested
// will still come across the wire to the client.
consumer.stop();

System.out.println("Starting post-stop loop.");
Message msg = consumer.nextMessage(1000);
while (msg != null) {
msg.ack();
report("Post-stop loop running", System.currentTimeMillis() - start, ++count);
msg = consumer.nextMessage(1000);
while (!consumer.isFinished()) {
Message msg = consumer.nextMessage(1000);
if (msg != null) {
msg.ack();
if (++count % REPORT_EVERY == 0) {
report("Post-stop loop running", System.currentTimeMillis() - start, ++count);
}
}
}
report("Post-stop loop stopped", System.currentTimeMillis() - start, count);
}
catch (JetStreamStatusCheckedException | InterruptedException | IOException | JetStreamApiException e) {
// JetStreamStatusCheckedException:
Expand All @@ -105,13 +115,9 @@ public static void main(String[] args) {
report("Done", System.currentTimeMillis() - start, count);
});
consumeThread.start();

Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, JITTER);
Thread pubThread = new Thread(publisher);
pubThread.start();

consumeThread.join();
publisher.stopPublishing();

publisher.stopPublishing(); // otherwise it will complain when the connection goes away
pubThread.join();
}
catch (IOException | InterruptedException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static io.nats.examples.jetstream.simple.Utils.Publisher;
import static io.nats.examples.jetstream.simple.Utils.createOrReplaceStream;
import static io.nats.examples.jetstream.simple.Utils.publish;

/**
* This example will demonstrate simplified consume with a handler
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class MessageConsumerExample {
private static final String STREAM = "consume-handler-stream";
private static final String SUBJECT = "consume-handler-subject";
private static final String CONSUMER_NAME = "consume-handler-consumer";
private static final String MESSAGE_TEXT = "consume-handler";
private static final String STREAM = "consume-stream";
private static final String SUBJECT = "consume-subject";
private static final String CONSUMER_NAME = "consume-consumer";
private static final String MESSAGE_TEXT = "consume";
private static final int STOP_COUNT = 500;
private static final int REPORT_EVERY = 100;

Expand All @@ -43,17 +43,18 @@ public static void main(String[] args) {
JetStream js = nc.jetStream();
createOrReplaceStream(nc.jetStreamManagement(), STREAM, SUBJECT);

// publishing so there are lots of messages
System.out.println("Publishing...");
publish(js, SUBJECT, MESSAGE_TEXT, 2500);
System.out.println("Starting publish...");
Publisher publisher = new Publisher(js, SUBJECT, MESSAGE_TEXT, 10);
Thread pubThread = new Thread(publisher);
pubThread.start();

// get stream context, create consumer and get the consumer context
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.consumerContext(CONSUMER_NAME);
consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -78,13 +79,19 @@ public static void main(String[] args) {
};

// create the consumer then use it
try {
MessageConsumer consumer = consumerContext.consume(handler);
try (MessageConsumer consumer = consumerContext.consume(handler)) {
latch.await();
// once the consumer is stopped, the client will drain messages

// The consumer has at least 1 pull request active. When stop is called,
// no more pull requests will be made, but messages already requested
// will still come across the wire to the client.
System.out.println("Stop the consumer...");
consumer.stop(1000);
Thread.sleep(1000); // enough for messages to drain after stop
consumer.stop();

// wait until the consumer is finished
while (!consumer.isFinished()) {
Thread.sleep(10);
}
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -103,6 +110,9 @@ public static void main(String[] args) {
}

report("Final", start, atomicCount.get());

publisher.stopPublishing(); // otherwise it will complain when the connection goes away
pubThread.join();
}
catch (IOException | InterruptedException ioe) {
// IOException:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static void main(String[] args) {
StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
}
catch (JetStreamApiException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void main(String[] args) {
// to allow ConsumerContexts to be created from the name.
StreamContext streamContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext = nc.getStreamContext(STREAM);
streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
}
catch (JetStreamApiException | IOException e) {
Expand Down Expand Up @@ -139,7 +139,7 @@ public HandlerConsumerHolder(int id, StreamContext sc, CountDownLatch latch) thr

@Override
public void stop() throws InterruptedException {
messageConsumer.stop(1000);
messageConsumer.stop();
}
}

Expand All @@ -150,7 +150,7 @@ static class IterableConsumerHolder extends ConsumerHolder {

public IterableConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException {
super(id, sc, latch);
iterableConsumer = consumerContext.consume();
iterableConsumer = consumerContext.iterate();
t = new Thread(() -> {
while (latch.getCount() > 0) {
try {
Expand Down Expand Up @@ -187,7 +187,7 @@ public ConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws Jet
this.id = id;
thisReceived = new AtomicInteger();
this.latch = latch;
consumerContext = sc.consumerContext(CONSUMER_NAME);
consumerContext = sc.getConsumerContext(CONSUMER_NAME);
}

public abstract void stop() throws InterruptedException;
Expand Down
13 changes: 10 additions & 3 deletions src/examples/java/io/nats/examples/jetstream/simple/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class Utils {

Expand Down Expand Up @@ -58,28 +59,34 @@ public static class Publisher implements Runnable {
private final String messageText;
private final int jitter;
private final AtomicBoolean keepGoing = new AtomicBoolean(true);
private int pubNo;
private final AtomicInteger pubCount;

public Publisher(JetStream js, String subject, String messageText, int jitter) {
this.js = js;
this.subject = subject;
this.messageText = messageText;
this.jitter = jitter;
pubCount = new AtomicInteger();
}

public void stopPublishing() {
keepGoing.set(false);
}

public int getPubCount() {
return pubCount.get();
}

@Override
public void run() {
try {
while (keepGoing.get()) {
//noinspection BusyWait
Thread.sleep(ThreadLocalRandom.current().nextLong(jitter));
js.publish(subject, (messageText + "-" + (++pubNo)).getBytes());
js.publish(subject, (messageText + "-" + (pubCount.incrementAndGet())).getBytes());
}
} catch (Exception e) {
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected BaseConsumeOptions(Builder b) {
idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
}

public long getExpiresIn() {
public long getExpiresInMillis() {
return expiresIn;
}

Expand Down
Loading