Skip to content

Commit

Permalink
fix(jetstream): ordered pull consumer would never fail to reset on in…
Browse files Browse the repository at this point in the history
…itial creation if there was an error (#47)
  • Loading branch information
aricart authored Aug 23, 2024
1 parent d2019e9 commit d9ada37
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
5 changes: 4 additions & 1 deletion jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ export class OrderedPullConsumerImpl implements Consumer {
iter: OrderedConsumerMessages | null;
type: PullConsumerType;
startSeq: number;
maxInitialReset: number;

constructor(
api: ConsumerAPI,
Expand All @@ -780,6 +781,7 @@ export class OrderedPullConsumerImpl implements Consumer {
this.namePrefix = opts.name_prefix + this.namePrefix;
}
this.serial = 0;
this.maxInitialReset = 30;
this.currentConsumer = null;
this.userCallback = null;
this.iter = null;
Expand Down Expand Up @@ -854,6 +856,7 @@ export class OrderedPullConsumerImpl implements Consumer {
}

async resetConsumer(seq = 0): Promise<ConsumerInfo> {
const isNew = this.serial === 0;
// try to delete the consumer
this.consumer?.delete().catch(() => {});
seq = seq === 0 ? 1 : seq;
Expand Down Expand Up @@ -883,7 +886,7 @@ export class OrderedPullConsumerImpl implements Consumer {
}
}

if (seq === 0 && i >= 30) {
if (isNew && i >= this.maxInitialReset) {
// consumer was never created, so we can fail this
throw err;
} else {
Expand Down
21 changes: 21 additions & 0 deletions jetstream/tests/consumers_ordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1055,3 +1055,24 @@ Deno.test("ordered consumers - next reset", async () => {

await cleanup(ns, nc);
});

Deno.test("ordered consumers - initial creation fails, consumer fails", async () => {
const { ns, nc } = await _setup(connect, jetstreamServerConf());
const jsm = await jetstreamManager(nc);

await jsm.streams.add({ name: "A", subjects: ["a"] });
const js = jsm.jetstream();

const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
await jsm.streams.delete("A");
c.maxInitialReset = 3;
await assertRejects(
() => {
return c.consume();
},
Error,
"stream not found",
);

await cleanup(ns, nc);
});

0 comments on commit d9ada37

Please sign in to comment.