Skip to content

Commit

Permalink
fix(jetstream): fixed an issue with jetstream publish no handling the…
Browse files Browse the repository at this point in the history
… republish properly.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 6, 2024
1 parent 69eb332 commit 6d5f468
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
31 changes: 21 additions & 10 deletions jetstream/src/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@

import { BaseApiClientImpl } from "./jsbaseclient_api.ts";
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
import { delay, Empty, QueuedIteratorImpl } from "@nats-io/nats-core/internal";
import {
backoff,
delay,
Empty,
QueuedIteratorImpl,
} from "@nats-io/nats-core/internal";

import { ConsumersImpl, StreamAPIImpl, StreamsImpl } from "./jsmstream_api.ts";

Expand All @@ -34,7 +39,7 @@ import type {
StreamAPI,
Streams,
} from "./types.ts";
import { errors, headers } from "@nats-io/nats-core/internal";
import { errors, headers, RequestError } from "@nats-io/nats-core/internal";

import type {
Msg,
Expand All @@ -49,7 +54,7 @@ import type {
JetStreamAccountStats,
} from "./jsapi_types.ts";
import { DirectStreamAPIImpl } from "./jsm.ts";
import { JetStreamError } from "./jserrors.ts";
import { JetStreamError, JetStreamNotEnabled } from "./jserrors.ts";

export function toJetStreamClient(
nc: NatsConnection | JetStreamClient,
Expand Down Expand Up @@ -205,29 +210,35 @@ export class JetStreamClientImpl extends BaseApiClientImpl
ro.headers = mh;
}

let { retries, retry_delay } = opts as {
let { retries } = opts as {
retries: number;
retry_delay: number;
};
retries = retries || 1;
retry_delay = retry_delay || 250;
const bo = backoff();

let r: Msg;
let r: Msg | null = null;
for (let i = 0; i < retries; i++) {
try {
r = await this.nc.request(subj, data, ro);
// if here we succeeded
break;
} catch (err) {
const re = err instanceof RequestError ? err as RequestError : null;
if (
err instanceof errors.RequestError && err.isNoResponders()
err instanceof errors.TimeoutError ||
re?.isNoResponders() && i + 1 < retries
) {
await delay(retry_delay);
await delay(bo.backoff(i));
} else {
throw err;
throw re?.isNoResponders()
? new JetStreamNotEnabled(`jetstream is not enabled`, {
cause: err,
})
: err;
}
}
}

const pa = this.parseJsResponse(r!) as PubAck;
if (pa.stream === "") {
throw new JetStreamError("invalid ack response");
Expand Down
41 changes: 40 additions & 1 deletion jetstream/tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ import {
Empty,
headers,
nanos,
NoRespondersError,
nuid,
RequestError,
} from "@nats-io/nats-core";
import {
assert,
assertEquals,
assertExists,
assertInstanceOf,
assertRejects,
assertThrows,
} from "jsr:@std/assert";
Expand All @@ -52,7 +55,7 @@ import {
setup,
} from "test_helpers";
import { PubHeaders } from "../src/jsapi_types.ts";
import { JetStreamApiError } from "../src/jserrors.ts";
import { JetStreamApiError, JetStreamNotEnabled } from "../src/jserrors.ts";

Deno.test("jetstream - default options", () => {
const opts = defaultJsOptions();
Expand Down Expand Up @@ -1076,3 +1079,39 @@ Deno.test("jetstream - term reason", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - publish no responder", async (t) => {
await t.step("not a jetstream server", async () => {
const { ns, nc } = await setup();
const js = jetstream(nc);
const err = await assertRejects(
() => {
return js.publish("hello");
},
JetStreamNotEnabled,
);

assertInstanceOf(err.cause, RequestError);
assertInstanceOf(err.cause?.cause, NoRespondersError);

await cleanup(ns, nc);
});

await t.step("jetstream not listening for subject", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "s", subjects: ["a", "b"] });
const js = jetstream(nc);
const err = await assertRejects(
() => {
return js.publish("c");
},
JetStreamNotEnabled,
);

assertInstanceOf(err.cause, RequestError);
assertInstanceOf(err.cause?.cause, NoRespondersError);

await cleanup(ns, nc);
});
});

0 comments on commit 6d5f468

Please sign in to comment.