diff --git a/jetstream/src/jsclient.ts b/jetstream/src/jsclient.ts index af1a2345..4f6c0ab4 100644 --- a/jetstream/src/jsclient.ts +++ b/jetstream/src/jsclient.ts @@ -15,7 +15,12 @@ import { BaseApiClientImpl } from "./jsbaseclient_api.ts"; import { ConsumerAPIImpl } from "./jsmconsumer_api.ts"; -import { delay, Empty, QueuedIteratorImpl } from "@nats-io/nats-core/internal"; +import { + backoff, + delay, + Empty, + QueuedIteratorImpl, +} from "@nats-io/nats-core/internal"; import { ConsumersImpl, StreamAPIImpl, StreamsImpl } from "./jsmstream_api.ts"; @@ -34,7 +39,7 @@ import type { StreamAPI, Streams, } from "./types.ts"; -import { errors, headers } from "@nats-io/nats-core/internal"; +import { errors, headers, RequestError } from "@nats-io/nats-core/internal"; import type { Msg, @@ -49,7 +54,7 @@ import type { JetStreamAccountStats, } from "./jsapi_types.ts"; import { DirectStreamAPIImpl } from "./jsm.ts"; -import { JetStreamError } from "./jserrors.ts"; +import { JetStreamError, JetStreamNotEnabled } from "./jserrors.ts"; export function toJetStreamClient( nc: NatsConnection | JetStreamClient, @@ -205,29 +210,35 @@ export class JetStreamClientImpl extends BaseApiClientImpl ro.headers = mh; } - let { retries, retry_delay } = opts as { + let { retries } = opts as { retries: number; - retry_delay: number; }; retries = retries || 1; - retry_delay = retry_delay || 250; + const bo = backoff(); - let r: Msg; + let r: Msg | null = null; for (let i = 0; i < retries; i++) { try { r = await this.nc.request(subj, data, ro); // if here we succeeded break; } catch (err) { + const re = err instanceof RequestError ? err as RequestError : null; if ( - err instanceof errors.RequestError && err.isNoResponders() + err instanceof errors.TimeoutError || + re?.isNoResponders() && i + 1 < retries ) { - await delay(retry_delay); + await delay(bo.backoff(i)); } else { - throw err; + throw re?.isNoResponders() + ? new JetStreamNotEnabled(`jetstream is not enabled`, { + cause: err, + }) + : err; } } } + const pa = this.parseJsResponse(r!) as PubAck; if (pa.stream === "") { throw new JetStreamError("invalid ack response"); diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 90f0dc00..aa3ecf59 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -32,12 +32,15 @@ import { Empty, headers, nanos, + NoRespondersError, nuid, + RequestError, } from "@nats-io/nats-core"; import { assert, assertEquals, assertExists, + assertInstanceOf, assertRejects, assertThrows, } from "jsr:@std/assert"; @@ -52,7 +55,7 @@ import { setup, } from "test_helpers"; import { PubHeaders } from "../src/jsapi_types.ts"; -import { JetStreamApiError } from "../src/jserrors.ts"; +import { JetStreamApiError, JetStreamNotEnabled } from "../src/jserrors.ts"; Deno.test("jetstream - default options", () => { const opts = defaultJsOptions(); @@ -1076,3 +1079,39 @@ Deno.test("jetstream - term reason", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - publish no responder", async (t) => { + await t.step("not a jetstream server", async () => { + const { ns, nc } = await setup(); + const js = jetstream(nc); + const err = await assertRejects( + () => { + return js.publish("hello"); + }, + JetStreamNotEnabled, + ); + + assertInstanceOf(err.cause, RequestError); + assertInstanceOf(err.cause?.cause, NoRespondersError); + + await cleanup(ns, nc); + }); + + await t.step("jetstream not listening for subject", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "s", subjects: ["a", "b"] }); + const js = jetstream(nc); + const err = await assertRejects( + () => { + return js.publish("c"); + }, + JetStreamNotEnabled, + ); + + assertInstanceOf(err.cause, RequestError); + assertInstanceOf(err.cause?.cause, NoRespondersError); + + await cleanup(ns, nc); + }); +});