From 6a1b345b16c471e5472b2544035890e505bf654f Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 12 Nov 2024 21:51:15 -0500 Subject: [PATCH] fix(core): fixed a resource leak that was not apparent if the processing for status was in an async function - the loop would terminate, but the actual closing didn't finish so any code following the async loop didn't execute. also simplified handling of connection status listeners (listeners were tracked by the connection and the protocol - now only by the protocol), fixed a leak on dial where a promise was not cancelled for the wait between dial interval. (#137) fix(test): jetstream test was not really testing the status listener leaks Fix #134 Signed-off-by: Alberto Ricart --- core/src/nats.ts | 18 +++++----------- core/src/protocol.ts | 40 ++++++++++++++++++----------------- core/tests/events_test.ts | 23 ++++++++++++++++++++ jetstream/tests/fetch_test.ts | 21 +++--------------- 4 files changed, 52 insertions(+), 50 deletions(-) diff --git a/core/src/nats.ts b/core/src/nats.ts index d6b6c075..2e54f291 100644 --- a/core/src/nats.ts +++ b/core/src/nats.ts @@ -28,7 +28,6 @@ import { RequestMany, RequestOne } from "./request.ts"; import type { ConnectionOptions, Context, - Dispatcher, Msg, NatsConnection, Payload, @@ -49,12 +48,10 @@ export class NatsConnectionImpl implements NatsConnection { options: ConnectionOptions; protocol!: ProtocolHandler; draining: boolean; - listeners: Dispatcher[]; private constructor(opts: ConnectionOptions) { this.draining = false; this.options = parseOptions(opts); - this.listeners = []; } public static connect(opts: ConnectionOptions = {}): Promise { @@ -63,13 +60,6 @@ export class NatsConnectionImpl implements NatsConnection { ProtocolHandler.connect(nc.options, nc) .then((ph: ProtocolHandler) => { nc.protocol = ph; - (async function () { - for await (const s of ph.status()) { - nc.listeners.forEach((l) => { - l.push(s); - }); - } - })(); resolve(nc); }) .catch((err: Error) => { @@ -482,10 +472,12 @@ export class NatsConnectionImpl implements NatsConnection { status(): AsyncIterable { const iter = new QueuedIteratorImpl(); iter.iterClosed.then(() => { - const idx = this.listeners.indexOf(iter); - this.listeners.splice(idx, 1); + const idx = this.protocol.listeners.indexOf(iter); + if (idx > -1) { + this.protocol.listeners.splice(idx, 1); + } }); - this.listeners.push(iter); + this.protocol.listeners.push(iter); return iter; } diff --git a/core/src/protocol.ts b/core/src/protocol.ts index 78965cc3..9553cc30 100644 --- a/core/src/protocol.ts +++ b/core/src/protocol.ts @@ -15,7 +15,7 @@ import { decode, Empty, encode, TE } from "./encoders.ts"; import type { Transport } from "./transport.ts"; import { CR_LF, CRLF, getResolveFn, newTransport } from "./transport.ts"; -import type { Deferred, Timeout } from "./util.ts"; +import type { Deferred, Delay, Timeout } from "./util.ts"; import { deferred, delay, extend, timeout } from "./util.ts"; import { DataBuffer } from "./databuffer.ts"; import type { ServerImpl } from "./servers.ts"; @@ -397,6 +397,7 @@ export class ProtocolHandler implements Dispatcher { server!: ServerImpl; features: Features; connectPromise: Promise | null; + dialDelay: Delay | null; raceTimer?: Timeout; constructor(options: ConnectionOptions, publisher: Publisher) { @@ -423,6 +424,7 @@ export class ProtocolHandler implements Dispatcher { this.pendingLimit = options.pendingLimit || this.pendingLimit; this.features = new Features({ major: 0, minor: 0, micro: 0 }); this.connectPromise = null; + this.dialDelay = null; const servers = typeof options.servers === "string" ? [options.servers] @@ -462,12 +464,6 @@ export class ProtocolHandler implements Dispatcher { }); } - status(): AsyncIterable { - const iter = new QueuedIteratorImpl(); - this.listeners.push(iter); - return iter; - } - private prepare(): Deferred { if (this.transport) { this.transport.discard(); @@ -541,10 +537,10 @@ export class ProtocolHandler implements Dispatcher { } }) .catch((err) => { - this._close(err); + this.close(err).catch(); }); } else { - await this._close(err); + await this.close(err).catch(); } } @@ -671,7 +667,8 @@ export class ProtocolHandler implements Dispatcher { } } else { maxWait = Math.min(maxWait, srv.lastConnect + wait - now); - await delay(maxWait); + this.dialDelay = delay(maxWait); + await this.dialDelay; } } } @@ -819,7 +816,7 @@ export class ProtocolHandler implements Dispatcher { this.transport.send(PING_CMD); } catch (err) { // if we are dying here, this is likely some an authenticator blowing up - this._close(err as Error); + this.close(err as Error).catch(); } } if (updates) { @@ -1025,7 +1022,7 @@ export class ProtocolHandler implements Dispatcher { } } - private async _close(err?: Error): Promise { + async close(err?: Error): Promise { if (this._closed) { return; } @@ -1037,19 +1034,24 @@ export class ProtocolHandler implements Dispatcher { } this.muxSubscriptions.close(); this.subscriptions.close(); - this.listeners.forEach((l) => { - l.stop(); - }); + const proms = []; + for (let i = 0; i < this.listeners.length; i++) { + const qi = this.listeners[i]; + if (qi) { + qi.stop(); + proms.push(qi.iterClosed); + } + } + if (proms.length) { + await Promise.all(proms); + } this._closed = true; await this.transport.close(err); this.raceTimer?.cancel(); + this.dialDelay?.cancel(); this.closed.resolve(err); } - close(): Promise { - return this._close(); - } - isClosed(): boolean { return this._closed; } diff --git a/core/tests/events_test.ts b/core/tests/events_test.ts index 935188e2..273fec65 100644 --- a/core/tests/events_test.ts +++ b/core/tests/events_test.ts @@ -18,6 +18,8 @@ import { assertEquals } from "jsr:@std/assert"; import { delay } from "../src/internal_mod.ts"; import type { NatsConnectionImpl } from "../src/internal_mod.ts"; import { setup } from "test_helpers"; +import { cleanup } from "../../test_helpers/mod.ts"; +import { deferred } from "https://deno.land/x/nats@v1.10.2/nats-base-client/mod.ts"; Deno.test("events - close on close", async () => { const { ns, nc } = await setup(); @@ -148,3 +150,24 @@ Deno.test("events - ignore server updates", async () => { await nc.close(); await NatsServer.stopAll(cluster, true); }); + +Deno.test("events - clean up", async () => { + const { ns, nc } = await setup(); + const finished = deferred(); + + const done = (async () => { + for await (const _ of nc.status()) { + // nothing + } + // let's make sure the iter broke... + finished.resolve(); + })(); + await nc.reconnect(); + await nc.close(); + + await finished; + await done; + await nc.closed(); + + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/fetch_test.ts b/jetstream/tests/fetch_test.ts index 0608c2e4..c7d65ad4 100644 --- a/jetstream/tests/fetch_test.ts +++ b/jetstream/tests/fetch_test.ts @@ -24,7 +24,6 @@ import { jetstream, jetstreamManager, } from "../src/mod.ts"; -import type { PullConsumerMessagesImpl } from "../src/consumer.ts"; Deno.test("fetch - no messages", async () => { const { ns, nc } = await setup(jetstreamServerConf()); @@ -219,37 +218,23 @@ Deno.test("fetch - listener leaks", async () => { await jsm.streams.add({ name: "messages", subjects: ["hello"] }); const js = jetstream(nc); - await js.publish("hello"); await jsm.consumers.add("messages", { durable_name: "myconsumer", deliver_policy: DeliverPolicy.All, ack_policy: AckPolicy.Explicit, - ack_wait: nanos(3000), - max_waiting: 500, }); const nci = nc as NatsConnectionImpl; const base = nci.protocol.listeners.length; const consumer = await js.consumers.get("messages", "myconsumer"); - - let done = false; - while (!done) { - const iter = await consumer.fetch({ - max_messages: 1, - }) as PullConsumerMessagesImpl; - for await (const m of iter) { - assertEquals(nci.protocol.listeners.length, base); - m?.nak(); - if (m.info.redeliveryCount > 100) { - done = true; - } - } + const iter = await consumer.fetch({ max_messages: 1, expires: 2000 }); + for await (const _ of iter) { + // nothing } assertEquals(nci.protocol.listeners.length, base); - await cleanup(ns, nc); });