diff --git a/jetstream/src/consumer.ts b/jetstream/src/consumer.ts index 39d74839..ede37292 100644 --- a/jetstream/src/consumer.ts +++ b/jetstream/src/consumer.ts @@ -21,7 +21,6 @@ import type { Status, Subscription, SubscriptionImpl, - Timeout, } from "@nats-io/nats-core/internal"; import { backoff, @@ -33,7 +32,6 @@ import { nanos, nuid, QueuedIteratorImpl, - timeout, } from "@nats-io/nats-core/internal"; import type { ConsumerAPIImpl } from "./jsmconsumer_api.ts"; @@ -47,6 +45,8 @@ import type { PullOptions, } from "./jsapi_types.ts"; import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts"; +import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts"; + import type { ConsumeMessages, ConsumeOptions, @@ -67,8 +67,7 @@ import type { ThresholdBytes, ThresholdMessages, } from "./types.ts"; -import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts"; -import { JetStreamStatus } from "./jserrors.ts"; +import { JetStreamError, JetStreamStatus } from "./jserrors.ts"; import { minValidation } from "./jsutil.ts"; enum PullConsumerType { @@ -120,7 +119,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl pending: { msgs: number; bytes: number; requests: number }; isConsume: boolean; callback: ConsumerCallbackFn | null; - timeout: Timeout | null; listeners: QueuedIterator[]; statusIterator?: QueuedIteratorImpl; abortOnMissingResource?: boolean; @@ -170,7 +168,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.noIterator = typeof this.callback === "function"; this.monitor = null; this.pending = { msgs: 0, bytes: 0, requests: 0 }; - this.timeout = null; this.listeners = []; this.abortOnMissingResource = copts.abort_on_missing_resource === true; this.bind = copts.bind === true; @@ -327,6 +324,17 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl // on the next pull etc - the only assumption here is we should // reset and check if the consumer was deleted from under us this.notify(ConsumerEvents.HeartbeatsMissed, data); + if (!this.isConsume && !this.consumer.ordered) { + // if we are not a consume, give up - this was masked by an + // external timer on fetch - the hb is a more reliable timeout + // since it requires 2 to be missed - this creates an edge case + // that would timeout the client longer than they would expect: we + // could be waiting for one more message, nothing happens, and + // now we have to wait for 2 missed hbs, which would be 1m (max), so + // there wouldn't be a fail fast. + this.stop(new JetStreamError("heartbeats missed")); + return true; + } this.resetPending() .then(() => { }) @@ -614,10 +622,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl return opts; } - trackTimeout(t: Timeout) { - this.timeout = t; - } - close(): Promise { this.stop(); return this.iterClosed; @@ -630,8 +634,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl clearTimers() { this.monitor?.cancel(); this.monitor = null; - this.timeout?.cancel(); - this.timeout = null; } override stop(err?: Error) { @@ -686,12 +688,16 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl ); } - // require idle_heartbeat + // require idle_heartbeat - clamp between 500-30_000 args.idle_heartbeat = args.idle_heartbeat || args.expires / 2; args.idle_heartbeat = args.idle_heartbeat > 30_000 ? 30_000 : args.idle_heartbeat; + if (args.idle_heartbeat < 500) { + args.idle_heartbeat = 500; + } + if (refilling) { const minMsgs = Math.round(args.max_messages * .75) || 1; args.threshold_messages = args.threshold_messages || minMsgs; @@ -848,18 +854,6 @@ export class PullConsumerImpl implements Consumer { if (this.ordered) { this.messages = m; } - // FIXME: need some way to pad this correctly - const to = Math.round(m.opts.expires! * 1.05); - const timer = timeout(to); - m.closed().catch((err) => { - console.log(err); - }).finally(() => { - timer.cancel(); - }); - timer.catch(() => { - m.close().catch(); - }); - m.trackTimeout(timer); return Promise.resolve(m); } @@ -871,8 +865,12 @@ export class PullConsumerImpl implements Consumer { fopts.max_messages = 1; const iter = await this.fetch(fopts); - for await (const m of iter) { - return m; + try { + for await (const m of iter) { + return m; + } + } catch (err) { + return Promise.reject(err); } return null; } diff --git a/jetstream/tests/consume_test.ts b/jetstream/tests/consume_test.ts index 79ca4413..77e28c39 100644 --- a/jetstream/tests/consume_test.ts +++ b/jetstream/tests/consume_test.ts @@ -13,13 +13,7 @@ * limitations under the License. */ -import { - cleanup, - connect, - jetstreamServerConf, - NatsServer, - setup, -} from "test_helpers"; +import { cleanup, jetstreamServerConf, setup } from "test_helpers"; import { setupStreamAndConsumer } from "../examples/util.ts"; import { assert, @@ -34,18 +28,19 @@ import { delay, errors, nanos, + type NatsConnectionImpl, syncIterator, -} from "@nats-io/nats-core"; +} from "@nats-io/nats-core/internal"; import type { PullConsumerMessagesImpl } from "../src/consumer.ts"; import { AckPolicy, ConsumerEvents, + type ConsumerStatus, DeliverPolicy, jetstream, jetstreamManager, } from "../src/mod.ts"; - -import type { ConsumerStatus } from "../src/mod.ts"; +import type { PushConsumerMessagesImpl } from "../src/pushconsumer.ts"; Deno.test("consumers - consume", async () => { const { ns, nc } = await setup(jetstreamServerConf()); @@ -103,8 +98,7 @@ Deno.test("consumers - consume callback rejects iter", async () => { }); Deno.test("consume - heartbeats", async () => { - const servers = await NatsServer.setupDataConnCluster(4); - const nc = await connect({ port: servers[0].port }); + const { ns, nc } = await setup(jetstreamServerConf()); const { stream } = await initStream(nc); const jsm = await jetstreamManager(nc); await jsm.consumers.add(stream, { @@ -118,20 +112,12 @@ Deno.test("consume - heartbeats", async () => { max_messages: 100, idle_heartbeat: 1000, expires: 30000, - }); + }) as PushConsumerMessagesImpl; - const buf: Promise[] = []; - // stop the data serverss - setTimeout(() => { - buf.push(servers[1].stop()); - buf.push(servers[2].stop()); - buf.push(servers[3].stop()); - }, 1000); - - await Promise.all(buf); + // make heartbeats trigger + (nc as NatsConnectionImpl)._resub(iter.sub, "foo"); const d = deferred(); - await (async () => { const status = iter.status(); for await (const s of status) { @@ -151,8 +137,7 @@ Deno.test("consume - heartbeats", async () => { assertEquals(cs.type, ConsumerEvents.HeartbeatsMissed); assertEquals(cs.data, 2); - await nc.close(); - await NatsServer.stopAll(servers, true); + await cleanup(ns, nc); }); Deno.test("consume - deleted consumer", async () => { @@ -448,3 +433,54 @@ Deno.test("consume - consumer bind", async () => { await cleanup(ns, nc); }); + +Deno.test("consume - timer is based on idle_hb", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "A", subjects: ["a"] }); + + await jsm.consumers.add("A", { + durable_name: "a", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + }); + + const js = jetstream(nc); + await js.publish("a"); + const c = await js.consumers.get("A", "a"); + + const iter = await c.fetch({ + expires: 2000, + max_messages: 10, + }) as PullConsumerMessagesImpl; + + let hbm = false; + (async () => { + for await (const s of iter.status()) { + if (s.type === ConsumerEvents.HeartbeatsMissed) { + hbm = true; + } + } + })().then(); + + const buf = []; + await assertRejects( + async () => { + for await (const m of iter) { + buf.push(m); + m.ack(); + // make the subscription now fail + const nci = nc as NatsConnectionImpl; + nci._resub(iter.sub, "foo"); + } + }, + Error, + "heartbeats missed", + ); + + assertEquals(buf.length, 1); + assertEquals(hbm, true); + + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/consumers_ordered_test.ts b/jetstream/tests/consumers_ordered_test.ts index 07822eb2..ef2fb122 100644 --- a/jetstream/tests/consumers_ordered_test.ts +++ b/jetstream/tests/consumers_ordered_test.ts @@ -39,7 +39,6 @@ import type { PullConsumerImpl, PullConsumerMessagesImpl, } from "../src/consumer.ts"; -import { StreamImpl } from "../src/jsmstream_api.ts"; import { delayUntilAssetNotFound } from "./util.ts"; import { flakyTest } from "../../test_helpers/mod.ts"; import { ConsumerNotFoundError } from "../src/jserrors.ts"; @@ -747,62 +746,6 @@ Deno.test("ordered consumers - next deleted consumer", async () => { await cleanup(ns, nc); }); -Deno.test( - "ordered consumers - next stream not found", - flakyTest(async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - - const jsm = await jetstreamManager(nc); - await jsm.streams.add({ name: "A", subjects: ["hello"] }); - - const js = jetstream(nc); - const c = await js.consumers.get("A"); - await jsm.streams.delete("A"); - - await assertRejects( - () => { - return c.next({ expires: 1000 }); - }, - Error, - "stream not found", - ); - - await cleanup(ns, nc); - }), -); - -Deno.test( - "ordered consumers - fetch stream not found", - flakyTest(async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - const jsm = await jetstreamManager(nc); - const si = await jsm.streams.add({ name: "A", subjects: ["a"] }); - - const js = jetstream(nc); - const c = await js.consumers.get("A"); - - const s = new StreamImpl(jsm.streams, si); - await jsm.streams.delete("A"); - await delayUntilAssetNotFound(s); - - const iter = await c.fetch({ - expires: 3000, - }); - - await assertRejects( - async () => { - for await (const _ of iter) { - // ignore - } - }, - Error, - "stream not found", - ); - - await cleanup(ns, nc); - }), -); - Deno.test("ordered consumers - consume stream not found request abort", async () => { const { ns, nc } = await setup(jetstreamServerConf()); diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index d3a4401c..c6580f3a 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -25,15 +25,10 @@ import { ConsumerEvents, DeliverPolicy, jetstream, + JetStreamError, jetstreamManager, } from "../src/mod.ts"; -import type { - Consumer, - ConsumerMessages, - ConsumerStatus, - PullOptions, -} from "../src/mod.ts"; -import { connect, NatsServer } from "test_helpers"; +import type { Consumer, ConsumerStatus, PullOptions } from "../src/mod.ts"; import { deferred, nanos } from "@nats-io/nats-core"; import type { NatsConnectionImpl } from "@nats-io/nats-core/internal"; import { cleanup, jetstreamServerConf, setup } from "test_helpers"; @@ -181,9 +176,7 @@ Deno.test("consumers - push consumer on get", async () => { }); Deno.test("consumers - fetch heartbeats", async () => { - const servers = await NatsServer.setupDataConnCluster(4); - - const nc = await connect({ port: servers[0].port }); + const { ns, nc } = await setup(jetstreamServerConf()); const { stream } = await initStream(nc); const jsm = await jetstreamManager(nc); await jsm.consumers.add(stream, { @@ -193,21 +186,11 @@ Deno.test("consumers - fetch heartbeats", async () => { const js = jetstream(nc); const c = await js.consumers.get(stream, "a"); - const iter: ConsumerMessages = await c.fetch({ + const iter = await c.fetch({ max_messages: 100, idle_heartbeat: 1000, expires: 30000, - }); - - const buf: Promise[] = []; - // stop the data serverss - setTimeout(() => { - buf.push(servers[1].stop()); - buf.push(servers[2].stop()); - buf.push(servers[3].stop()); - }, 1000); - - await Promise.all(buf); + }) as PullConsumerMessagesImpl; const d = deferred(); @@ -215,23 +198,29 @@ Deno.test("consumers - fetch heartbeats", async () => { const status = iter.status(); for await (const s of status) { d.resolve(s); - iter.stop(); break; } })(); - await (async () => { - for await (const _r of iter) { - // nothing - } - })(); + await assertRejects( + async () => { + for await (const _ of iter) { + // ignore + } + }, + JetStreamError, + "heartbeats missed", + ); + + const nci = nc as NatsConnectionImpl; + // make it not get anything from the server + nci._resub(iter.sub, "foo"); const cs = await d; assertEquals(cs.type, ConsumerEvents.HeartbeatsMissed); assertEquals(cs.data, 2); - await nc.close(); - await NatsServer.stopAll(servers, true); + await cleanup(ns, nc); }); Deno.test("consumers - bad options", async () => { diff --git a/jetstream/tests/fetch_test.ts b/jetstream/tests/fetch_test.ts index c7d65ad4..b96e52d6 100644 --- a/jetstream/tests/fetch_test.ts +++ b/jetstream/tests/fetch_test.ts @@ -13,7 +13,7 @@ * limitations under the License. */ -import { cleanup, flakyTest, jetstreamServerConf, setup } from "test_helpers"; +import { cleanup, jetstreamServerConf, setup } from "test_helpers"; import { initStream } from "./jstest_util.ts"; import { assertEquals, assertExists, assertRejects } from "jsr:@std/assert"; import { delay, Empty, nanos, syncIterator } from "@nats-io/nats-core"; @@ -106,44 +106,6 @@ Deno.test("fetch - exactly messages", async () => { await cleanup(ns, nc); }); -Deno.test( - "fetch - consumer not found", - flakyTest(async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - const jsm = await jetstreamManager(nc); - await jsm.streams.add({ name: "A", subjects: ["hello"] }); - - await jsm.consumers.add("A", { - durable_name: "a", - deliver_policy: DeliverPolicy.All, - ack_policy: AckPolicy.Explicit, - }); - - const js = jetstream(nc); - const c = await js.consumers.get("A", "a"); - - await c.delete(); - await delay(500); - - const iter = await c.fetch({ - expires: 3000, - }); - - const exited = assertRejects( - async () => { - for await (const _ of iter) { - // nothing - } - }, - Error, - "consumer not found", - ); - - await exited; - await cleanup(ns, nc); - }), -); - Deno.test("fetch - deleted consumer", async () => { const { ns, nc } = await setup(jetstreamServerConf()); const jsm = await jetstreamManager(nc); @@ -179,39 +141,6 @@ Deno.test("fetch - deleted consumer", async () => { await cleanup(ns, nc); }); -Deno.test("fetch - stream not found", async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - - const jsm = await jetstreamManager(nc); - await jsm.streams.add({ name: "A", subjects: ["hello"] }); - - await jsm.consumers.add("A", { - durable_name: "a", - deliver_policy: DeliverPolicy.All, - ack_policy: AckPolicy.Explicit, - }); - - const js = jetstream(nc); - const c = await js.consumers.get("A", "a"); - const iter = await c.fetch({ - idle_heartbeat: 1_000, - }); - await jsm.streams.delete("A"); - await delay(500); - - await assertRejects( - async () => { - for await (const _ of iter) { - // nothing - } - }, - Error, - "stream not found", - ); - - await cleanup(ns, nc); -}); - Deno.test("fetch - listener leaks", async () => { const { ns, nc } = await setup(jetstreamServerConf()); const jsm = await jetstreamManager(nc); @@ -280,24 +209,29 @@ Deno.test("fetch - consumer bind", async () => { await js.publish("a"); const c = await js.consumers.get("A", "a"); - await c.delete(); const cisub = nc.subscribe("$JS.API.CONSUMER.INFO.A.a", { callback: () => {}, }); - const iter = await c.fetch({ + let iter = await c.fetch({ + expires: 1000, + bind: true, + }); + + for await (const _ of iter) { + // nothing + } + + iter = await c.fetch({ expires: 1000, bind: true, }); - const done = (async () => { - for await (const _ of iter) { - // nothing - } - })(); + for await (const _ of iter) { + // nothing + } - await done; assertEquals(cisub.getProcessed(), 0); await cleanup(ns, nc); }); diff --git a/jetstream/tests/jetstream_pullconsumer_test.ts b/jetstream/tests/jetstream_pullconsumer_test.ts index 2cf70a32..772cb4ed 100644 --- a/jetstream/tests/jetstream_pullconsumer_test.ts +++ b/jetstream/tests/jetstream_pullconsumer_test.ts @@ -70,7 +70,7 @@ Deno.test("jetstream - cross account pull", async () => { // create a durable config await admjsm.consumers.add(stream, { - ack_policy: AckPolicy.Explicit, + ack_policy: AckPolicy.None, durable_name: "me", }); @@ -90,7 +90,8 @@ Deno.test("jetstream - cross account pull", async () => { msg = await c.next(); assertExists(msg); assertEquals(msg.seq, 2); - msg = await c.next({ expires: 1000 }); + + msg = await c.next({ expires: 5000 }); assertEquals(msg, null); await cleanup(ns, admin, nc); diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 5eccc06f..4b219204 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -744,7 +744,7 @@ Deno.test("jetstream - backoff", async () => { }); offset.slice(1).forEach((n, idx) => { - assertAlmostEquals(n, ms[idx], 20); + assertAlmostEquals(n, ms[idx], 50); }); await cleanup(ns, nc); diff --git a/jetstream/tests/next_test.ts b/jetstream/tests/next_test.ts index da27d5e1..d12313f0 100644 --- a/jetstream/tests/next_test.ts +++ b/jetstream/tests/next_test.ts @@ -13,19 +13,19 @@ * limitations under the License. */ -import { cleanup, flakyTest, jetstreamServerConf, setup } from "test_helpers"; +import { cleanup, jetstreamServerConf, setup } from "test_helpers"; import { initStream } from "./jstest_util.ts"; import { AckPolicy, DeliverPolicy } from "../src/jsapi_types.ts"; -import { assertEquals, assertRejects, fail } from "jsr:@std/assert"; +import { + assertEquals, + assertExists, + assertRejects, + fail, +} from "jsr:@std/assert"; import { delay, nanos } from "@nats-io/nats-core"; import type { NatsConnectionImpl } from "@nats-io/nats-core/internal"; -import { jetstream, jetstreamManager } from "../src/mod.ts"; -import { delayUntilAssetNotFound } from "./util.ts"; -import { - ConsumerNotFoundError, - JetStreamStatusError, - StreamNotFoundError, -} from "../src/jserrors.ts"; +import { jetstream, JetStreamError, jetstreamManager } from "../src/mod.ts"; +import { JetStreamStatusError } from "../src/jserrors.ts"; Deno.test("next - basics", async () => { const { ns, nc } = await setup(jetstreamServerConf()); @@ -117,35 +117,6 @@ Deno.test("next - listener leaks", async () => { await cleanup(ns, nc); }); -Deno.test( - "next - consumer not found", - flakyTest(async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - const jsm = await jetstreamManager(nc); - await jsm.streams.add({ name: "A", subjects: ["hello"] }); - - await jsm.consumers.add("A", { - durable_name: "a", - deliver_policy: DeliverPolicy.All, - ack_policy: AckPolicy.Explicit, - }); - - const js = jetstream(nc); - const c = await js.consumers.get("A", "a"); - await c.delete(); - await delay(1000); - - await assertRejects( - () => { - return c.next({ expires: 1000 }); - }, - ConsumerNotFoundError, - ); - - await cleanup(ns, nc); - }), -); - Deno.test("next - deleted consumer", async () => { const { ns, nc } = await setup(jetstreamServerConf()); @@ -176,39 +147,6 @@ Deno.test("next - deleted consumer", async () => { await cleanup(ns, nc); }); -Deno.test( - "next - stream not found", - async () => { - const { ns, nc } = await setup(jetstreamServerConf()); - - const jsm = await jetstreamManager(nc); - await jsm.streams.add({ name: "A", subjects: ["hello"] }); - const s = await jsm.streams.get("A"); - - await jsm.consumers.add("A", { - durable_name: "a", - deliver_policy: DeliverPolicy.All, - ack_policy: AckPolicy.Explicit, - }); - - const js = jetstream(nc); - - const c = await js.consumers.get("A", "a"); - - await jsm.streams.delete("A"); - await delayUntilAssetNotFound(s); - - await assertRejects( - () => { - return c.next({ expires: 1000 }); - }, - StreamNotFoundError, - ); - - await cleanup(ns, nc); - }, -); - Deno.test("next - consumer bind", async () => { const { ns, nc } = await setup(jetstreamServerConf()); @@ -218,31 +156,50 @@ Deno.test("next - consumer bind", async () => { await jsm.consumers.add("A", { durable_name: "a", deliver_policy: DeliverPolicy.All, - ack_policy: AckPolicy.Explicit, + ack_policy: AckPolicy.None, }); const js = jetstream(nc); await js.publish("a"); const c = await js.consumers.get("A", "a"); - await c.delete(); // listen to see if the client does a consumer info - const cisub = nc.subscribe("$JS.API.CONSUMER.INFO.A.a", { + const sub = nc.subscribe("$JS.API.CONSUMER.INFO.A.a", { callback: () => { fail("saw a consumer info"); }, }); - const msg = await c.next({ + let msg = await c.next({ + expires: 1000, + bind: true, + }); + assertExists(msg); + + msg = await c.next({ expires: 1000, bind: true, }); + assertEquals(msg, null); + + await c.delete(); + + await assertRejects( + () => { + return c.next({ + expires: 1000, + bind: true, + }); + }, + JetStreamError, + "heartbeats missed", + ); await nc.flush(); assertEquals(msg, null); - assertEquals(cisub.getProcessed(), 0); + assertEquals(sub.getProcessed(), 0); await cleanup(ns, nc); }); diff --git a/migration.md b/migration.md index 47faf4e0..c05bb30c 100644 --- a/migration.md +++ b/migration.md @@ -133,6 +133,8 @@ To use JetStream, you must install and import `@nats/jetstream`. found error raises, this simplifies client usage and aligns with other APIs in the client. - MsgRequest for `Stream#getMessage()` removed deprecated number argument. +- For non-ordered consumers next/fetch() can will now throw/reject when + heartbeats are missed. ## Changes to KV diff --git a/test_helpers/mod.ts b/test_helpers/mod.ts index 1d934634..80c87978 100644 --- a/test_helpers/mod.ts +++ b/test_helpers/mod.ts @@ -66,8 +66,8 @@ export function jetstreamExportServerConf( jetstream: "enabled", users: [{ user: "js", password: "js" }], exports: [ - { service: "$JS.API.>" }, - { service: "$JS.ACK.>" }, + { service: "$JS.API.>", response_type: "stream" }, + { service: "$JS.ACK.>", response_type: "stream" }, { stream: "A.>", accounts: ["A"] }, ], },