From 323a9188cbcc510c36b338d58057a24d667e63d4 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Thu, 17 Oct 2024 12:48:10 -0500 Subject: [PATCH] fix(jetstream): added rejections for all the 409 limit notifications maxrequestexpires, maxwaiting - previously, only maxrequestbatch was getting rejected for next/fetch() . Removed old `jetstream409_test.ts` due to outdated 409 error handling test cases. Added new tests to reflect the current Consumer and ensured proper handling of 409 status code scenarios in `consume_test.ts`. Updated `types.ts` and `consumer.ts` to enhance error notification and handling. Signed-off-by: Alberto Ricart --- jetstream/src/consumer.ts | 67 ++++--- jetstream/src/types.ts | 6 + jetstream/tests/consume_test.ts | 43 ----- jetstream/tests/jetream409_test.ts | 276 --------------------------- jetstream/tests/jetstream409_test.ts | 247 ++++++++++++++++++++++++ migration.md | 13 +- 6 files changed, 298 insertions(+), 354 deletions(-) delete mode 100644 jetstream/tests/jetream409_test.ts create mode 100644 jetstream/tests/jetstream409_test.ts diff --git a/jetstream/src/consumer.ts b/jetstream/src/consumer.ts index 9f1b2c5b..f11304dd 100644 --- a/jetstream/src/consumer.ts +++ b/jetstream/src/consumer.ts @@ -198,36 +198,25 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl // these are real bad values - so this is bad request // fail on this // we got a bad request - no progress here - if (code === 400) { - this.stop(new NatsError(description, `${code}`)); - return; - } else if (code === 409 && description === "consumer deleted") { - this.notify( - ConsumerEvents.ConsumerDeleted, - `${code} ${description}`, - ); - if (!this.isConsume || this.abortOnMissingResource) { - const error = new NatsError(description, `${code}`); - this.stop(error); - return; - } - } else if ( - code === 409 && description.includes("exceeded maxrequestbatch") - ) { - this.notify( - ConsumerDebugEvents.DebugEvent, - `${code} ${description}`, - ); - if (!this.isConsume) { - const error = new NatsError(description, `${code}`); - this.stop(error); + switch (code) { + case 400: + this.stop(new NatsError(description, `${code}`)); return; + case 409: { + const err = this.handle409(code, description); + if (err) { + this.stop(err); + return; + } + // stall, missed heartbeats will resuscitate + // proportionally to 2 missed heartbeats + break; } - } else { - this.notify( - ConsumerDebugEvents.DebugEvent, - `${code} ${description}`, - ); + default: + this.notify( + ConsumerDebugEvents.DebugEvent, + { code, description }, + ); } } } else { @@ -349,6 +338,28 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.pull(this.pullOptions()); } + /** + * handle the notification of the error, if + * @param code + * @param description + */ + handle409(code: number, description: string): Error | null { + const e = description === "consumer deleted" + ? ConsumerEvents.ConsumerDeleted + : ConsumerEvents.ExceededLimit; + this.notify(e, { code, description }); + if (!this.isConsume) { + // terminate the fetch/next + return new NatsError(description, `${code}`); + } else if ( + e === ConsumerEvents.ConsumerDeleted && this.abortOnMissingResource + ) { + // terminate the consume if abortOnMissingResource + return new NatsError(description, `${code}`); + } + return null; + } + reset() { // stop the monitoring if running this.monitor?.cancel(); diff --git a/jetstream/src/types.ts b/jetstream/src/types.ts index 18dd95fb..1f89595b 100644 --- a/jetstream/src/types.ts +++ b/jetstream/src/types.ts @@ -571,6 +571,12 @@ export enum ConsumerEvents { * the consumer is recreated. The argument is the name of the newly created consumer. */ OrderedConsumerRecreated = "ordered_consumer_recreated", + + /** + * This notification is specific to pull consumers and will be notified whenever + * the pull request exceeds some limit such as maxwaiting, maxrequestbatch, etc. + */ + ExceededLimit = "limit_exceeded", } /** diff --git a/jetstream/tests/consume_test.ts b/jetstream/tests/consume_test.ts index d186c653..3104108f 100644 --- a/jetstream/tests/consume_test.ts +++ b/jetstream/tests/consume_test.ts @@ -18,7 +18,6 @@ import { cleanup, connect, jetstreamServerConf, - Lock, NatsServer, } from "test_helpers"; import { setupStreamAndConsumer } from "../examples/util.ts"; @@ -27,7 +26,6 @@ import { assertEquals, assertExists, assertRejects, - fail, } from "jsr:@std/assert"; import { initStream } from "./jstest_util.ts"; import { @@ -47,7 +45,6 @@ import { } from "../src/mod.ts"; import type { ConsumerStatus } from "../src/mod.ts"; -import { ConsumerDebugEvents } from "../src/mod.ts"; Deno.test("consumers - consume", async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf()); @@ -450,43 +447,3 @@ Deno.test("consume - consumer bind", async () => { await cleanup(ns, nc); }); - -Deno.test("consume - exceeding max_messages will continue", async () => { - const { ns, nc } = await _setup(connect, jetstreamServerConf()); - const jsm = await jetstreamManager(nc); - await jsm.streams.add({ name: "A", subjects: ["a"] }); - await jsm.consumers.add("A", { - durable_name: "a", - max_batch: 100, - }); - - const js = jetstream(nc); - const c = await js.consumers.get("A", "a"); - const lock = Lock(2, 0); - const iter = await c.consume({ max_messages: 1000, expires: 1000 }); - (async () => { - const status = iter.status(); - for await (const s of status) { - if (s.type === ConsumerDebugEvents.DebugEvent) { - const d = s.data as string; - if (d.includes("exceeded maxrequestbatch of 100")) { - lock.unlock(); - } - } - } - })().catch((err) => { - fail(err.message); - }); - - (async () => { - for await (const m of iter) { - console.log(m); - } - })().then(); - - await lock; - iter.stop(); - await iter.closed(); - - await cleanup(ns, nc); -}); diff --git a/jetstream/tests/jetream409_test.ts b/jetstream/tests/jetream409_test.ts deleted file mode 100644 index 3107cb0b..00000000 --- a/jetstream/tests/jetream409_test.ts +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright 2022-2024 The NATS Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Js409Errors, setMaxWaitingToFail } from "../src/jsutil.ts"; -import { deferred, nanos, StringCodec } from "@nats-io/nats-core"; - -import type { NatsError } from "@nats-io/nats-core"; -import { - AckPolicy, - consumerOpts, - jetstream, - jetstreamManager, -} from "../src/mod.ts"; -import type { JetStreamClient, PullOptions } from "../src/mod.ts"; -import { assertRejects, assertStringIncludes } from "jsr:@std/assert"; -import { initStream } from "./jstest_util.ts"; -import { - _setup, - cleanup, - connect, - jetstreamServerConf, - notCompatible, -} from "test_helpers"; - -type testArgs = { - js: JetStreamClient; - stream: string; - durable: string; - opts: PullOptions; - expected: Js409Errors; -}; - -async function expectFetchError(args: testArgs) { - const { js, stream, durable, opts, expected } = args; - const i = js.fetch(stream, durable, opts); - await assertRejects( - async () => { - for await (const _m of i) { - //nothing - } - }, - Error, - expected, - ); -} - -async function expectPullSubscribeIteratorError(args: testArgs) { - const { js, stream, durable, opts, expected } = args; - const co = consumerOpts(); - co.bind(stream, durable); - const sub = await js.pullSubscribe(">", co); - sub.pull(opts); - - await assertRejects( - async () => { - for await (const _m of sub) { - // nothing - } - }, - Error, - expected, - ); -} - -async function expectPullSubscribeCallbackError( - args: testArgs, -) { - const { js, stream, durable, opts, expected } = args; - - const d = deferred(); - const co = consumerOpts(); - co.bind(stream, durable); - co.callback((err) => { - d.resolve(err); - }); - const sub = await js.pullSubscribe(">", co); - sub.pull(opts); - const ne = await d; - assertStringIncludes(ne?.message || "", expected); -} - -Deno.test("409 - max_batch", async () => { - const { ns, nc } = await _setup(connect, jetstreamServerConf({})); - const { stream, subj } = await initStream(nc); - - const jsm = await jetstreamManager(nc); - - const sc = StringCodec(); - const js = jetstream(nc); - for (let i = 0; i < 10; i++) { - await js.publish(subj, sc.encode("hello")); - } - - await jsm.consumers.add(stream, { - durable_name: "a", - ack_policy: AckPolicy.Explicit, - max_batch: 1, - }); - - const opts = { batch: 10, expires: 1000 } as PullOptions; - const to = { - js, - stream, - durable: "a", - opts, - expected: Js409Errors.MaxBatchExceeded, - }; - - await expectFetchError(to); - await expectPullSubscribeIteratorError(to); - await expectPullSubscribeCallbackError(to); - - await cleanup(ns, nc); -}); - -Deno.test("409 - max_expires", async () => { - const { ns, nc } = await _setup(connect, jetstreamServerConf({})); - const { stream, subj } = await initStream(nc); - - const jsm = await jetstreamManager(nc); - - const sc = StringCodec(); - const js = jetstream(nc); - for (let i = 0; i < 10; i++) { - await js.publish(subj, sc.encode("hello")); - } - - await jsm.consumers.add(stream, { - durable_name: "a", - ack_policy: AckPolicy.Explicit, - max_expires: nanos(1000), - }); - - const opts = { batch: 1, expires: 5000 } as PullOptions; - const to = { - js, - stream, - durable: "a", - opts, - expected: Js409Errors.MaxExpiresExceeded, - }; - - await expectFetchError(to); - await expectPullSubscribeIteratorError(to); - await expectPullSubscribeCallbackError(to); - - await cleanup(ns, nc); -}); - -Deno.test("409 - max_bytes", async () => { - const { ns, nc } = await _setup(connect, jetstreamServerConf({})); - if (await notCompatible(ns, nc, "2.8.3")) { - return; - } - const { stream, subj } = await initStream(nc); - - const jsm = await jetstreamManager(nc); - - const sc = StringCodec(); - const js = jetstream(nc); - for (let i = 0; i < 10; i++) { - await js.publish(subj, sc.encode("hello")); - } - - await jsm.consumers.add(stream, { - durable_name: "a", - ack_policy: AckPolicy.Explicit, - max_bytes: 10, - }); - - const opts = { max_bytes: 1024, expires: 5000 } as PullOptions; - const to = { - js, - stream, - durable: "a", - opts, - expected: Js409Errors.MaxBytesExceeded, - }; - - await expectFetchError(to); - await expectPullSubscribeIteratorError(to); - await expectPullSubscribeCallbackError(to); - - await cleanup(ns, nc); -}); - -Deno.test("409 - max msg size", async () => { - const { ns, nc } = await _setup(connect, jetstreamServerConf({})); - if (await notCompatible(ns, nc, "2.9.0")) { - return; - } - const { stream, subj } = await initStream(nc); - - const jsm = await jetstreamManager(nc); - - const sc = StringCodec(); - const js = jetstream(nc); - for (let i = 0; i < 10; i++) { - await js.publish(subj, sc.encode("hello")); - } - - await jsm.consumers.add(stream, { - durable_name: "a", - ack_policy: AckPolicy.Explicit, - }); - - const opts = { max_bytes: 2, expires: 5000 } as PullOptions; - const to = { - js, - stream, - durable: "a", - opts, - expected: Js409Errors.MaxMessageSizeExceeded, - }; - - await expectFetchError(to); - await expectPullSubscribeIteratorError(to); - await expectPullSubscribeCallbackError(to); - - await cleanup(ns, nc); -}); - -Deno.test("409 - max waiting", async () => { - const { ns, nc } = await _setup(connect, jetstreamServerConf({})); - const { stream, subj } = await initStream(nc); - - const jsm = await jetstreamManager(nc); - - const sc = StringCodec(); - const js = jetstream(nc); - for (let i = 0; i < 10; i++) { - await js.publish(subj, sc.encode("hello")); - } - - await jsm.consumers.add(stream, { - durable_name: "a", - ack_policy: AckPolicy.Explicit, - max_waiting: 1, - }); - - const opts = { expires: 1000 } as PullOptions; - const to = { - js, - stream, - durable: "a", - opts, - expected: Js409Errors.MaxWaitingExceeded, - }; - - const iter = js.fetch(stream, "a", { batch: 1000, expires: 5000 }); - (async () => { - for await (const _m of iter) { - // nothing - } - })().then(); - - setMaxWaitingToFail(true); - - await expectFetchError(to); - await expectPullSubscribeIteratorError(to); - await expectPullSubscribeCallbackError(to); - - await cleanup(ns, nc); -}); diff --git a/jetstream/tests/jetstream409_test.ts b/jetstream/tests/jetstream409_test.ts new file mode 100644 index 00000000..f36ce313 --- /dev/null +++ b/jetstream/tests/jetstream409_test.ts @@ -0,0 +1,247 @@ +/* + * Copyright 2022-2024 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { nanos } from "@nats-io/nats-core"; +import { + AckPolicy, + ConsumerDebugEvents, + ConsumerEvents, + jetstream, + jetstreamManager, +} from "../src/mod.ts"; + +import { assert, assertRejects, fail } from "jsr:@std/assert"; +import { initStream } from "./jstest_util.ts"; +import { _setup, cleanup, connect, jetstreamServerConf } from "test_helpers"; + +Deno.test("409 - max expires", async () => { + const { ns, nc } = await _setup(connect, jetstreamServerConf({})); + const { stream } = await initStream(nc); + + const jsm = await jetstreamManager(nc); + await jsm.consumers.add(stream, { + durable_name: "a", + ack_policy: AckPolicy.Explicit, + max_expires: nanos(1_000), + }); + + const js = jetstream(nc); + const c = await js.consumers.get(stream, "a"); + await assertRejects( + () => { + return c.next({ expires: 30_000 }); + }, + Error, + "exceeded maxrequestexpires", + ); + + await assertRejects( + async () => { + const iter = await c.fetch({ expires: 30_000 }); + for await (const _ of iter) { + fail("shouldn't have gotten a message"); + } + }, + Error, + "exceeded maxrequestexpires", + ); + + const iter = await c.consume({ + expires: 2_000, + callback: () => { + fail("shouldn't have gotten a message"); + }, + }); + let count = 0; + (async () => { + for await (const s of iter.status()) { + if (s.type === ConsumerEvents.ExceededLimit) { + const data = s.data as { code: number; description: string }; + if (data.description.includes("exceeded maxrequestexpires")) { + count++; + if (count === 2) { + iter.close(); + } + } + } + } + })().then(); + await iter.closed(); + assert(count >= 2); + await cleanup(ns, nc); +}); + +Deno.test("409 - max message size", async () => { + const { ns, nc } = await _setup(connect, jetstreamServerConf({})); + const { stream, subj } = await initStream(nc); + + const js = jetstream(nc); + await js.publish(subj, new Uint8Array(1024)); + + const jsm = await jetstreamManager(nc); + await jsm.consumers.add(stream, { + durable_name: "a", + ack_policy: AckPolicy.Explicit, + }); + + const c = await js.consumers.get(stream, "a"); + const msgs = await c.fetch({ max_bytes: 10 }); + (async () => { + for await (const s of msgs.status()) { + if (s.type === ConsumerDebugEvents.Discard) { + const data = s.data as { bytesLeft: number }; + if (data.bytesLeft === 10) { + msgs.stop(); + } + } + } + })().then(); + for await (const _ of msgs) { + fail("shoudn't have gotten any messages"); + } + + const iter = await c.consume({ + max_bytes: 10, + callback: () => { + fail("this shouldn't have been called"); + }, + }); + let count = 0; + for await (const s of iter.status()) { + if (s.type === ConsumerDebugEvents.Discard) { + const data = s.data as { bytesLeft: number }; + count++; + if (data.bytesLeft === 10) { + if (count >= 2) { + iter.close(); + } + } + } + } + await iter.closed(); + assert(count >= 2); + await cleanup(ns, nc); +}); + +Deno.test("409 - max batch", async () => { + const { ns, nc } = await _setup(connect, jetstreamServerConf({})); + const { stream } = await initStream(nc); + + const jsm = await jetstreamManager(nc); + + // only one pull request + await jsm.consumers.add(stream, { + durable_name: "a", + ack_policy: AckPolicy.Explicit, + max_batch: 10, + }); + + const js = jetstream(nc); + const c = await js.consumers.get(stream, "a"); + + await assertRejects( + async () => { + const msgs = await c.fetch({ max_messages: 100, expires: 1_000 }); + for await (const _ of msgs) { + // nothing + } + }, + Error, + "exceeded maxrequestbatch", + ); + + let count = 0; + const iter = await c.consume({ + max_messages: 100, + expires: 2_000, + callback: () => {}, + }); + for await (const s of iter.status()) { + if (s.type === ConsumerEvents.ExceededLimit) { + const data = s.data as { code: number; description: string }; + if (data.description.includes("exceeded maxrequestbatch")) { + count++; + if (count >= 2) { + iter.stop(); + } + } + } + } + await iter.closed(); + assert(count >= 2); + + await cleanup(ns, nc); +}); + +Deno.test("409 - max waiting", async () => { + const { ns, nc } = await _setup(connect, jetstreamServerConf({})); + const { stream } = await initStream(nc); + + const jsm = await jetstreamManager(nc); + + // only one pull request + await jsm.consumers.add(stream, { + durable_name: "a", + ack_policy: AckPolicy.Explicit, + max_waiting: 1, + }); + + const js = jetstream(nc); + const c = await js.consumers.get(stream, "a"); + + // consume with an open pull + const blocking = await c.consume({ callback: () => {} }); + + await assertRejects( + () => { + return c.next({ expires: 1_000 }); + }, + Error, + "exceeded maxwaiting", + ); + + await assertRejects( + async () => { + const msgs = await c.fetch({ expires: 1_000 }); + for await (const _ of msgs) { + // nothing + } + }, + Error, + "exceeded maxwaiting", + ); + + let count = 0; + + const iter = await c.consume({ expires: 1_000, callback: () => {} }); + for await (const s of iter.status()) { + if (s.type === ConsumerEvents.ExceededLimit) { + const data = s.data as { code: number; description: string }; + if (data.description.includes("exceeded maxwaiting")) { + count++; + if (count >= 2) { + iter.stop(); + } + } + } + } + await iter.closed(); + assert(count >= 2); + + // stop the consumer blocking + blocking.stop(); + await blocking.closed(); + await cleanup(ns, nc); +}); diff --git a/migration.md b/migration.md index 9c3dcb74..64e4b369 100644 --- a/migration.md +++ b/migration.md @@ -171,10 +171,9 @@ const service = await svc.add({ ### Watch -Object.watch() now returns an `ObjectWatchInfo` which is an `ObjectInfo` but adding the property -`isUpdate` this property is now true when the watch is notifying of a new entry. Note that previously -the iterator would yield `ObjectInfo | null`, the `null` signal has been removed. This means that -when doing a watch on an empty ObjectStore you won't get an update notification until an actual value -arrives. - - +Object.watch() now returns an `ObjectWatchInfo` which is an `ObjectInfo` but +adding the property `isUpdate` this property is now true when the watch is +notifying of a new entry. Note that previously the iterator would yield +`ObjectInfo | null`, the `null` signal has been removed. This means that when +doing a watch on an empty ObjectStore you won't get an update notification until +an actual value arrives.