diff --git a/core/src/nats.ts b/core/src/nats.ts index d6b6c075..2e54f291 100644 --- a/core/src/nats.ts +++ b/core/src/nats.ts @@ -28,7 +28,6 @@ import { RequestMany, RequestOne } from "./request.ts"; import type { ConnectionOptions, Context, - Dispatcher, Msg, NatsConnection, Payload, @@ -49,12 +48,10 @@ export class NatsConnectionImpl implements NatsConnection { options: ConnectionOptions; protocol!: ProtocolHandler; draining: boolean; - listeners: Dispatcher[]; private constructor(opts: ConnectionOptions) { this.draining = false; this.options = parseOptions(opts); - this.listeners = []; } public static connect(opts: ConnectionOptions = {}): Promise { @@ -63,13 +60,6 @@ export class NatsConnectionImpl implements NatsConnection { ProtocolHandler.connect(nc.options, nc) .then((ph: ProtocolHandler) => { nc.protocol = ph; - (async function () { - for await (const s of ph.status()) { - nc.listeners.forEach((l) => { - l.push(s); - }); - } - })(); resolve(nc); }) .catch((err: Error) => { @@ -482,10 +472,12 @@ export class NatsConnectionImpl implements NatsConnection { status(): AsyncIterable { const iter = new QueuedIteratorImpl(); iter.iterClosed.then(() => { - const idx = this.listeners.indexOf(iter); - this.listeners.splice(idx, 1); + const idx = this.protocol.listeners.indexOf(iter); + if (idx > -1) { + this.protocol.listeners.splice(idx, 1); + } }); - this.listeners.push(iter); + this.protocol.listeners.push(iter); return iter; } diff --git a/core/src/protocol.ts b/core/src/protocol.ts index 78965cc3..9553cc30 100644 --- a/core/src/protocol.ts +++ b/core/src/protocol.ts @@ -15,7 +15,7 @@ import { decode, Empty, encode, TE } from "./encoders.ts"; import type { Transport } from "./transport.ts"; import { CR_LF, CRLF, getResolveFn, newTransport } from "./transport.ts"; -import type { Deferred, Timeout } from "./util.ts"; +import type { Deferred, Delay, Timeout } from "./util.ts"; import { deferred, delay, extend, timeout } from "./util.ts"; import { DataBuffer } from "./databuffer.ts"; import type { ServerImpl } from "./servers.ts"; @@ -397,6 +397,7 @@ export class ProtocolHandler implements Dispatcher { server!: ServerImpl; features: Features; connectPromise: Promise | null; + dialDelay: Delay | null; raceTimer?: Timeout; constructor(options: ConnectionOptions, publisher: Publisher) { @@ -423,6 +424,7 @@ export class ProtocolHandler implements Dispatcher { this.pendingLimit = options.pendingLimit || this.pendingLimit; this.features = new Features({ major: 0, minor: 0, micro: 0 }); this.connectPromise = null; + this.dialDelay = null; const servers = typeof options.servers === "string" ? [options.servers] @@ -462,12 +464,6 @@ export class ProtocolHandler implements Dispatcher { }); } - status(): AsyncIterable { - const iter = new QueuedIteratorImpl(); - this.listeners.push(iter); - return iter; - } - private prepare(): Deferred { if (this.transport) { this.transport.discard(); @@ -541,10 +537,10 @@ export class ProtocolHandler implements Dispatcher { } }) .catch((err) => { - this._close(err); + this.close(err).catch(); }); } else { - await this._close(err); + await this.close(err).catch(); } } @@ -671,7 +667,8 @@ export class ProtocolHandler implements Dispatcher { } } else { maxWait = Math.min(maxWait, srv.lastConnect + wait - now); - await delay(maxWait); + this.dialDelay = delay(maxWait); + await this.dialDelay; } } } @@ -819,7 +816,7 @@ export class ProtocolHandler implements Dispatcher { this.transport.send(PING_CMD); } catch (err) { // if we are dying here, this is likely some an authenticator blowing up - this._close(err as Error); + this.close(err as Error).catch(); } } if (updates) { @@ -1025,7 +1022,7 @@ export class ProtocolHandler implements Dispatcher { } } - private async _close(err?: Error): Promise { + async close(err?: Error): Promise { if (this._closed) { return; } @@ -1037,19 +1034,24 @@ export class ProtocolHandler implements Dispatcher { } this.muxSubscriptions.close(); this.subscriptions.close(); - this.listeners.forEach((l) => { - l.stop(); - }); + const proms = []; + for (let i = 0; i < this.listeners.length; i++) { + const qi = this.listeners[i]; + if (qi) { + qi.stop(); + proms.push(qi.iterClosed); + } + } + if (proms.length) { + await Promise.all(proms); + } this._closed = true; await this.transport.close(err); this.raceTimer?.cancel(); + this.dialDelay?.cancel(); this.closed.resolve(err); } - close(): Promise { - return this._close(); - } - isClosed(): boolean { return this._closed; } diff --git a/core/tests/events_test.ts b/core/tests/events_test.ts index 935188e2..273fec65 100644 --- a/core/tests/events_test.ts +++ b/core/tests/events_test.ts @@ -18,6 +18,8 @@ import { assertEquals } from "jsr:@std/assert"; import { delay } from "../src/internal_mod.ts"; import type { NatsConnectionImpl } from "../src/internal_mod.ts"; import { setup } from "test_helpers"; +import { cleanup } from "../../test_helpers/mod.ts"; +import { deferred } from "https://deno.land/x/nats@v1.10.2/nats-base-client/mod.ts"; Deno.test("events - close on close", async () => { const { ns, nc } = await setup(); @@ -148,3 +150,24 @@ Deno.test("events - ignore server updates", async () => { await nc.close(); await NatsServer.stopAll(cluster, true); }); + +Deno.test("events - clean up", async () => { + const { ns, nc } = await setup(); + const finished = deferred(); + + const done = (async () => { + for await (const _ of nc.status()) { + // nothing + } + // let's make sure the iter broke... + finished.resolve(); + })(); + await nc.reconnect(); + await nc.close(); + + await finished; + await done; + await nc.closed(); + + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/fetch_test.ts b/jetstream/tests/fetch_test.ts index 0608c2e4..c7d65ad4 100644 --- a/jetstream/tests/fetch_test.ts +++ b/jetstream/tests/fetch_test.ts @@ -24,7 +24,6 @@ import { jetstream, jetstreamManager, } from "../src/mod.ts"; -import type { PullConsumerMessagesImpl } from "../src/consumer.ts"; Deno.test("fetch - no messages", async () => { const { ns, nc } = await setup(jetstreamServerConf()); @@ -219,37 +218,23 @@ Deno.test("fetch - listener leaks", async () => { await jsm.streams.add({ name: "messages", subjects: ["hello"] }); const js = jetstream(nc); - await js.publish("hello"); await jsm.consumers.add("messages", { durable_name: "myconsumer", deliver_policy: DeliverPolicy.All, ack_policy: AckPolicy.Explicit, - ack_wait: nanos(3000), - max_waiting: 500, }); const nci = nc as NatsConnectionImpl; const base = nci.protocol.listeners.length; const consumer = await js.consumers.get("messages", "myconsumer"); - - let done = false; - while (!done) { - const iter = await consumer.fetch({ - max_messages: 1, - }) as PullConsumerMessagesImpl; - for await (const m of iter) { - assertEquals(nci.protocol.listeners.length, base); - m?.nak(); - if (m.info.redeliveryCount > 100) { - done = true; - } - } + const iter = await consumer.fetch({ max_messages: 1, expires: 2000 }); + for await (const _ of iter) { + // nothing } assertEquals(nci.protocol.listeners.length, base); - await cleanup(ns, nc); });