Skip to content

Commit

Permalink
fixed kv tests, checks for subscription leaks were no counting before…
Browse files Browse the repository at this point in the history
… underlying consumer was cancelling.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Oct 3, 2024
1 parent 870d34b commit 3671901
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 39 deletions.
55 changes: 21 additions & 34 deletions kv/src/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,6 @@ export class Bucket implements KV, KvRemove {
const ignoreDeletes = opts.ignoreDeletes === true;

let fn = opts.initializedFn;
let count = 0;

const cc = this._buildCC(k, content, co);
cc.name = `KV_WATCHER_${nuid.next()}`;
Expand All @@ -862,8 +861,19 @@ export class Bucket implements KV, KvRemove {
}

const oc = await this.js.consumers.getPushConsumer(this.stream, cc);
qi._data = oc;
const info = await oc.info(true);
const count = info.num_pending;
if (count === 0 && fn) {
try {
fn();
} catch (_err) {
// ignoring
} finally {
fn = undefined;
}
}

qi._data = oc;
const iter = await oc.consume({
callback: (m) => {
const e = this.jmToEntry(m);
Expand All @@ -884,38 +894,14 @@ export class Bucket implements KV, KvRemove {
},
});

// by the time we are here, likely the subscription got messages
if (fn) {
// get the info used on consumer create
const last = await oc.info(true);
// this doesn't sound correct - we should be looking for a seq number instead
// then if we see a greater one, we are done.
const expect = last.num_pending + last.delivered.consumer_seq;
// if the iterator already queued - the only issue is other modifications
// did happen like stream was pruned, and the ordered consumer reset, etc
// we won't get what we are expecting - so the notification will never fire
// the sentinel ought to be coming from the server
if (expect === 0 || qi.received >= expect) {
try {
fn();
} catch (err) {
// fail it - there's something wrong in the user callback
qi.stop(err);
} finally {
fn = undefined;
}
} else {
count = expect;
}
}
qi.iterClosed.then(() => {
iter.stop();
});
iter.closed().then(() => {
qi.push(() => {
qi.stop();
});
});
qi.iterClosed.then(() => {
iter.stop();
});

return qi;
}
Expand All @@ -927,6 +913,12 @@ export class Bucket implements KV, KvRemove {
});

const oc = await this.js.consumers.getPushConsumer(this.stream, cc);
const info = await oc.info();
if (info.num_pending === 0) {
keys.stop();
return keys;
}

keys._data = oc;

const iter = await oc.consume({
Expand All @@ -942,11 +934,6 @@ export class Bucket implements KV, KvRemove {
},
});

// get the info used on consumer create
const last = await oc.info(true);
if (last.num_pending === 0) {
keys.stop();
}
iter.closed().then(() => {
keys.push(() => {
keys.stop();
Expand Down
11 changes: 6 additions & 5 deletions kv/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ Deno.test("kv - cleanups/empty", async () => {
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const nci = nc as NatsConnectionImpl;

const n = nuid.next();
const js = jetstream(nc);
const bucket = await new Kvm(js).create(n);
Expand All @@ -356,9 +358,9 @@ Deno.test("kv - cleanups/empty", async () => {
const keys = await collect(await bucket.keys());
assertEquals(keys.length, 0);

const nci = nc as NatsConnectionImpl;
// mux should be created
const min = nci.protocol.subscriptions.getMux() ? 1 : 0;

assertEquals(nci.protocol.subscriptions.subs.size, min);

await cleanup(ns, nc);
Expand Down Expand Up @@ -443,6 +445,7 @@ Deno.test("kv - bucket watch", async () => {
await b.put("x", Empty);

await done;
await delay(0);

assertEquals(iter.getProcessed(), 7);
assertEquals(m.get("a"), "2");
Expand Down Expand Up @@ -497,6 +500,7 @@ Deno.test("kv - key watch", async () => {
const js = jetstream(nc);
const bucket = await new Kvm(js).create(nuid.next()) as Bucket;
await keyWatch(bucket);
await delay(0);

const nci = nc as NatsConnectionImpl;
const min = nci.protocol.subscriptions.getMux() ? 1 : 0;
Expand All @@ -519,6 +523,7 @@ Deno.test("kv - codec key watch", async () => {
},
}) as Bucket;
await keyWatch(bucket);
await delay(0);

const nci = nc as NatsConnectionImpl;
const min = nci.protocol.subscriptions.getMux() ? 1 : 0;
Expand Down Expand Up @@ -660,10 +665,6 @@ Deno.test("kv - complex key", async () => {
const vvv = await dd;
assertEquals(vvv.string(), "hello");

const nci = nc as NatsConnectionImpl;
const min = nci.protocol.subscriptions.getMux() ? 1 : 0;
assertEquals(nci.protocol.subscriptions.subs.size, min);

await cleanup(ns, nc);
});

Expand Down

0 comments on commit 3671901

Please sign in to comment.