diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 0f8097be..00903109 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -1005,8 +1005,8 @@ export interface ConsumerUpdateConfig { */ "inactive_threshold"?: Nanos; /** - * List of durations in nanoseconds format that represents a retry timescale for - * NaK'd messages or those being normally retried + * List of durations in nanoseconds that represents a retry timescale for + * the redelivery of messages */ "backoff"?: Nanos[]; /** diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index aa3ecf59..661e1f0d 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -38,6 +38,7 @@ import { } from "@nats-io/nats-core"; import { assert, + assertAlmostEquals, assertEquals, assertExists, assertInstanceOf, @@ -707,7 +708,8 @@ Deno.test("jetstream - backoff", async () => { const { stream, subj } = await initStream(nc); const jsm = await jetstreamManager(nc); - const backoff = [nanos(250), nanos(1000), nanos(3000)]; + const ms = [250, 1000, 3000]; + const backoff = ms.map((n) => nanos(n)); const ci = await jsm.consumers.add(stream, { durable_name: "me", ack_policy: AckPolicy.Explicit, @@ -723,10 +725,11 @@ Deno.test("jetstream - backoff", async () => { const js = jetstream(nc); await js.publish(subj); + const when: number[] = []; const c = await js.consumers.get(stream, "me"); const iter = await c.consume({ callback: (m) => { - console.log(m.info.redeliveryCount); + when.push(Date.now()); if (m.info.redeliveryCount === 4) { iter.stop(); } @@ -735,6 +738,58 @@ Deno.test("jetstream - backoff", async () => { await iter.closed(); + const offset = when.map((n, idx) => { + const p = idx > 0 ? idx - 1 : 0; + return n - when[p]; + }); + + offset.slice(1).forEach((n, idx) => { + assertAlmostEquals(n, ms[idx], 20); + }); + + await cleanup(ns, nc); +}); + +Deno.test("jetstream - redelivery", async () => { + const { ns, nc } = await setup(jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.7.2")) { + return; + } + + const { stream, subj } = await initStream(nc); + const jsm = await jetstreamManager(nc); + + const ci = await jsm.consumers.add(stream, { + durable_name: "me", + ack_policy: AckPolicy.Explicit, + max_deliver: 4, + ack_wait: nanos(1000), + }); + + assertEquals(ci.config.max_deliver, 4); + + const js = jetstream(nc); + await js.publish(subj); + + const c = await js.consumers.get(stream, "me"); + + let redeliveries = 0; + const iter = await c.consume({ + callback: (m) => { + if (m.redelivered) { + redeliveries++; + } + if (m.info.redeliveryCount === 4) { + setTimeout(() => { + iter.stop(); + }, 2000); + } + }, + }); + + await iter.closed(); + assertEquals(redeliveries, 3); + await cleanup(ns, nc); });