Skip to content

Commit

Permalink
doc(jestream): clarified consumer backoff - also added a test (#117)
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart authored Nov 8, 2024
1 parent c7d578b commit 68f2ea5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
4 changes: 2 additions & 2 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,8 @@ export type ConsumerUpdateConfig = PriorityGroups & {
*/
"inactive_threshold"?: Nanos;
/**
* List of durations in nanoseconds format that represents a retry timescale for
* NaK'd messages or those being normally retried
* List of durations in nanoseconds that represents a retry timescale for
* the redelivery of messages
*/
"backoff"?: Nanos[];
/**
Expand Down
59 changes: 57 additions & 2 deletions jetstream/tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
} from "@nats-io/nats-core";
import {
assert,
assertAlmostEquals,
assertEquals,
assertExists,
assertInstanceOf,
Expand Down Expand Up @@ -707,7 +708,8 @@ Deno.test("jetstream - backoff", async () => {
const { stream, subj } = await initStream(nc);
const jsm = await jetstreamManager(nc);

const backoff = [nanos(250), nanos(1000), nanos(3000)];
const ms = [250, 1000, 3000];
const backoff = ms.map((n) => nanos(n));
const ci = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
Expand All @@ -723,10 +725,11 @@ Deno.test("jetstream - backoff", async () => {
const js = jetstream(nc);
await js.publish(subj);

const when: number[] = [];
const c = await js.consumers.get(stream, "me");
const iter = await c.consume({
callback: (m) => {
console.log(m.info.redeliveryCount);
when.push(Date.now());
if (m.info.redeliveryCount === 4) {
iter.stop();
}
Expand All @@ -735,6 +738,58 @@ Deno.test("jetstream - backoff", async () => {

await iter.closed();

const offset = when.map((n, idx) => {
const p = idx > 0 ? idx - 1 : 0;
return n - when[p];
});

offset.slice(1).forEach((n, idx) => {
assertAlmostEquals(n, ms[idx], 20);
});

await cleanup(ns, nc);
});

Deno.test("jetstream - redelivery", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.7.2")) {
return;
}

const { stream, subj } = await initStream(nc);
const jsm = await jetstreamManager(nc);

const ci = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
max_deliver: 4,
ack_wait: nanos(1000),
});

assertEquals(ci.config.max_deliver, 4);

const js = jetstream(nc);
await js.publish(subj);

const c = await js.consumers.get(stream, "me");

let redeliveries = 0;
const iter = await c.consume({
callback: (m) => {
if (m.redelivered) {
redeliveries++;
}
if (m.info.redeliveryCount === 4) {
setTimeout(() => {
iter.stop();
}, 2000);
}
},
});

await iter.closed();
assertEquals(redeliveries, 3);

await cleanup(ns, nc);
});

Expand Down

0 comments on commit 68f2ea5

Please sign in to comment.