Skip to content

Commit

Permalink
Allow simplification fetch to have noWait with or without expires.
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Feb 29, 2024
1 parent 39d651b commit 31178c2
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 15 deletions.
3 changes: 3 additions & 0 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class BaseConsumeOptions {
protected final long expiresIn;
protected final long idleHeartbeat;
protected final int thresholdPercent;
protected final boolean noWait;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected BaseConsumeOptions(Builder b) {
Expand All @@ -45,6 +46,7 @@ protected BaseConsumeOptions(Builder b) {
// validation handled in builder
thresholdPercent = b.thresholdPercent;
expiresIn = b.expiresIn;
noWait = b.noWait;

// calculated
idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
Expand All @@ -67,6 +69,7 @@ protected static abstract class Builder<B, CO> {
protected long bytes = 0;
protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT;
protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
protected boolean noWait = false;

protected abstract B getThis();

Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/nats/client/FetchConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public long getMaxBytes() {
return bytes;
}

public boolean isNoWait() { return noWait; }

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -92,6 +94,27 @@ public Builder max(int maxBytes, int maxMessages) {
return bytes(maxBytes);
}

/**
* Set no wait to true
* @return the builder
*/
public Builder noWait() {
this.noWait = true;
this.expiresIn = 0;
return this;
}

/**
* Set no wait to true with an expiration, special behavior.
* @param expiresInMillis the expiration time in milliseconds
* @return the builder
*/
public Builder noWaitExpiresIn(long expiresInMillis) {
this.noWait = true;
expiresIn(expiresInMillis);
return this;
}

/**
* Build the FetchConsumeOptions.
* @return a FetchConsumeOptions instance
Expand Down
23 changes: 19 additions & 4 deletions src/main/java/io/nats/client/impl/NatsFetchConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;

class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer, PullManagerObserver {
private final boolean isNoWait;
private final long maxWaitNanos;
private final String pullSubject;
private long startNanos;
Expand All @@ -29,13 +30,22 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
{
super(cachedConsumerInfo);

isNoWait = fetchConsumeOptions.isNoWait();
long expiresInMillis = fetchConsumeOptions.getExpiresInMillis();
maxWaitNanos = expiresInMillis * 1_000_000;
long inactiveThreshold = expiresInMillis * 110 / 100; // ten % longer than the wait
long inactiveThreshold;
if (expiresInMillis == 0) { // can be for noWait
maxWaitNanos = 100_000_000; // 100ms
inactiveThreshold = 1000;
}
else {
maxWaitNanos = expiresInMillis * 1_000_000;
inactiveThreshold = expiresInMillis * 110 / 100; // ten % longer than the wait
}
PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
.maxBytes(fetchConsumeOptions.getMaxBytes())
.expiresIn(fetchConsumeOptions.getExpiresInMillis())
.expiresIn(expiresInMillis)
.idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat())
.noWait(isNoWait)
.build();
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold));
pullSubject = sub._pull(pro, false, this);
Expand Down Expand Up @@ -82,7 +92,12 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
// if the timer has run out, don't allow waiting
// this might happen once, but it should already be noMorePending
if (timeLeftMillis < 1) {
return sub._nextUnmanagedNoWait(pullSubject); // null means don't wait
Message m = sub._nextUnmanagedNoWait(pullSubject); // null means don't wait
if (m == null && isNoWait) {
finished.set(true);
lenientClose();
}
return m;
}

return sub._nextUnmanaged(timeLeftMillis, pullSubject);
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/io/nats/client/impl/NatsJetStreamMetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ public class NatsJetStreamMetaData {
private final long consumerSeq;
private final ZonedDateTime timestamp;
private final long pending;

@Override
public String toString() {
return "NatsJetStreamMetaData{" +
"prefix='" + prefix + '\'' +
", domain='" + domain + '\'' +
", stream='" + stream + '\'' +
", consumer='" + consumer + '\'' +
", delivered=" + delivered +
", streamSeq=" + streamSeq +
", consumerSeq=" + consumerSeq +
", timestamp=" + timestamp +
", pending=" + pending +
'}';
"prefix='" + prefix + '\'' +
", domain='" + domain + '\'' +
", stream='" + stream + '\'' +
", consumer='" + consumer + '\'' +
", delivered=" + delivered +
", streamSeq=" + streamSeq +
", consumerSeq=" + consumerSeq +
", timestamp=" + timestamp +
", pending=" + pending +
'}';
}


/*
v0 <prefix>.ACK.<stream name>.<consumer name>.<num delivered>.<stream sequence>.<consumer sequence>.<timestamp>
v1 <prefix>.ACK.<stream name>.<consumer name>.<num delivered>.<stream sequence>.<consumer sequence>.<timestamp>.<num pending>
Expand Down

0 comments on commit 31178c2

Please sign in to comment.