Skip to content

Commit

Permalink
fix(jetstream): added rejections for all the 409 limit notifications …
Browse files Browse the repository at this point in the history
…maxrequestexpires, maxwaiting - previously, only maxrequestbatch was getting rejected for next/fetch() .

Removed old `jetstream409_test.ts` due to outdated 409 error handling test cases. Added new tests to reflect the current Consumer and ensured proper handling of 409 status code scenarios in `consume_test.ts`. Updated `types.ts` and `consumer.ts` to enhance error notification and handling.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Oct 17, 2024
1 parent 1cf1725 commit 323a918
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 354 deletions.
67 changes: 39 additions & 28 deletions jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,36 +198,25 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
// these are real bad values - so this is bad request
// fail on this
// we got a bad request - no progress here
if (code === 400) {
this.stop(new NatsError(description, `${code}`));
return;
} else if (code === 409 && description === "consumer deleted") {
this.notify(
ConsumerEvents.ConsumerDeleted,
`${code} ${description}`,
);
if (!this.isConsume || this.abortOnMissingResource) {
const error = new NatsError(description, `${code}`);
this.stop(error);
return;
}
} else if (
code === 409 && description.includes("exceeded maxrequestbatch")
) {
this.notify(
ConsumerDebugEvents.DebugEvent,
`${code} ${description}`,
);
if (!this.isConsume) {
const error = new NatsError(description, `${code}`);
this.stop(error);
switch (code) {
case 400:
this.stop(new NatsError(description, `${code}`));
return;
case 409: {
const err = this.handle409(code, description);
if (err) {
this.stop(err);
return;
}
// stall, missed heartbeats will resuscitate
// proportionally to 2 missed heartbeats
break;
}
} else {
this.notify(
ConsumerDebugEvents.DebugEvent,
`${code} ${description}`,
);
default:
this.notify(
ConsumerDebugEvents.DebugEvent,
{ code, description },
);
}
}
} else {
Expand Down Expand Up @@ -349,6 +338,28 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.pull(this.pullOptions());
}

/**
* handle the notification of the error, if
* @param code
* @param description
*/
handle409(code: number, description: string): Error | null {
const e = description === "consumer deleted"
? ConsumerEvents.ConsumerDeleted
: ConsumerEvents.ExceededLimit;
this.notify(e, { code, description });
if (!this.isConsume) {
// terminate the fetch/next
return new NatsError(description, `${code}`);
} else if (
e === ConsumerEvents.ConsumerDeleted && this.abortOnMissingResource
) {
// terminate the consume if abortOnMissingResource
return new NatsError(description, `${code}`);
}
return null;
}

reset() {
// stop the monitoring if running
this.monitor?.cancel();
Expand Down
6 changes: 6 additions & 0 deletions jetstream/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,12 @@ export enum ConsumerEvents {
* the consumer is recreated. The argument is the name of the newly created consumer.
*/
OrderedConsumerRecreated = "ordered_consumer_recreated",

/**
* This notification is specific to pull consumers and will be notified whenever
* the pull request exceeds some limit such as maxwaiting, maxrequestbatch, etc.
*/
ExceededLimit = "limit_exceeded",
}

/**
Expand Down
43 changes: 0 additions & 43 deletions jetstream/tests/consume_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
cleanup,
connect,
jetstreamServerConf,
Lock,
NatsServer,
} from "test_helpers";
import { setupStreamAndConsumer } from "../examples/util.ts";
Expand All @@ -27,7 +26,6 @@ import {
assertEquals,
assertExists,
assertRejects,
fail,
} from "jsr:@std/assert";
import { initStream } from "./jstest_util.ts";
import {
Expand All @@ -47,7 +45,6 @@ import {
} from "../src/mod.ts";

import type { ConsumerStatus } from "../src/mod.ts";
import { ConsumerDebugEvents } from "../src/mod.ts";

Deno.test("consumers - consume", async () => {
const { ns, nc } = await _setup(connect, jetstreamServerConf());
Expand Down Expand Up @@ -450,43 +447,3 @@ Deno.test("consume - consumer bind", async () => {

await cleanup(ns, nc);
});

Deno.test("consume - exceeding max_messages will continue", async () => {
const { ns, nc } = await _setup(connect, jetstreamServerConf());
const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "A", subjects: ["a"] });
await jsm.consumers.add("A", {
durable_name: "a",
max_batch: 100,
});

const js = jetstream(nc);
const c = await js.consumers.get("A", "a");
const lock = Lock(2, 0);
const iter = await c.consume({ max_messages: 1000, expires: 1000 });
(async () => {
const status = iter.status();
for await (const s of status) {
if (s.type === ConsumerDebugEvents.DebugEvent) {
const d = s.data as string;
if (d.includes("exceeded maxrequestbatch of 100")) {
lock.unlock();
}
}
}
})().catch((err) => {
fail(err.message);
});

(async () => {
for await (const m of iter) {
console.log(m);
}
})().then();

await lock;
iter.stop();
await iter.closed();

await cleanup(ns, nc);
});
Loading

0 comments on commit 323a918

Please sign in to comment.