Skip to content

Commit

Permalink
fix(jetstream) fetch/next() now rely on missing heartbeats detection …
Browse files Browse the repository at this point in the history
…to fail requests that were not properly "finished" by the server. This has the benefit that messages arriving "late" are now likely to complete successfully, since the client won't be proactively ending the request.

change(jetstream) non-ordered fetch/next will now throw/reject if heartbeats are missed - previously you could find out if you had statuses enabled on the fetch. On next() it was silently ignored and the client received a null message.

tests(jetstream): removed flapper tests that attempted to yield ConsumerNotFound/StreamNotFound when a resource was removed prior to a consume/fetch/next.

tests(jetstream): fixed cross-account configurations to have a response_type of "stream"

There's also a small implication for the edge case where a client is partitioned from jetstream towards the end of the fetch/next - in those cases it is possible for the request to extend for up to a minute (max heartbeat interval is 30s, and two misses are required). This is an edge condition when compared to the normal fetch/next that gets a new message towards the end of the expires. In this more common case, there won't be a redelivery since the message won't be dropped because of an overly eager timeout.

Fix #149

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 20, 2024
1 parent 95ee79a commit 3c11b28
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 300 deletions.
52 changes: 25 additions & 27 deletions jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import type {
Status,
Subscription,
SubscriptionImpl,
Timeout,
} from "@nats-io/nats-core/internal";
import {
backoff,
Expand All @@ -33,7 +32,6 @@ import {
nanos,
nuid,
QueuedIteratorImpl,
timeout,
} from "@nats-io/nats-core/internal";
import type { ConsumerAPIImpl } from "./jsmconsumer_api.ts";

Expand All @@ -47,6 +45,8 @@ import type {
PullOptions,
} from "./jsapi_types.ts";
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";

import type {
ConsumeMessages,
ConsumeOptions,
Expand All @@ -67,8 +67,7 @@ import type {
ThresholdBytes,
ThresholdMessages,
} from "./types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";
import { JetStreamStatus } from "./jserrors.ts";
import { JetStreamError, JetStreamStatus } from "./jserrors.ts";
import { minValidation } from "./jsutil.ts";

enum PullConsumerType {
Expand Down Expand Up @@ -120,7 +119,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
pending: { msgs: number; bytes: number; requests: number };
isConsume: boolean;
callback: ConsumerCallbackFn | null;
timeout: Timeout<unknown> | null;
listeners: QueuedIterator<ConsumerStatus>[];
statusIterator?: QueuedIteratorImpl<Status>;
abortOnMissingResource?: boolean;
Expand Down Expand Up @@ -170,7 +168,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.noIterator = typeof this.callback === "function";
this.monitor = null;
this.pending = { msgs: 0, bytes: 0, requests: 0 };
this.timeout = null;
this.listeners = [];
this.abortOnMissingResource = copts.abort_on_missing_resource === true;
this.bind = copts.bind === true;
Expand Down Expand Up @@ -327,6 +324,17 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
// on the next pull etc - the only assumption here is we should
// reset and check if the consumer was deleted from under us
this.notify(ConsumerEvents.HeartbeatsMissed, data);
if (!this.isConsume && !this.consumer.ordered) {
// fetch/next (not a consume, not ordered): give up on missed heartbeats.
// Previously this condition was masked by an external timer on fetch.
// Heartbeat detection is a more reliable timeout because two heartbeats
// must be missed before it fires. The trade-off is an edge case where the
// client waits longer than it would expect: if we are waiting for one
// more message and nothing happens, we only fail after 2 missed
// heartbeats, which can be up to 1m (max), so there is no fail-fast.
this.stop(new JetStreamError("heartbeats missed"));
return true;
}
this.resetPending()
.then(() => {
})
Expand Down Expand Up @@ -614,10 +622,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
return opts;
}

/**
 * Stores the given timer in `this.timeout`, overwriting any value already
 * held there; `clearTimers()` later cancels whatever is stored.
 *
 * @param t timer to remember
 */
trackTimeout(t: Timeout<unknown>) {
this.timeout = t;
}

close(): Promise<void | Error> {
this.stop();
return this.iterClosed;
Expand All @@ -630,8 +634,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
/**
 * Cancels `monitor` and `timeout` when they are set, then resets both
 * fields to null.
 */
clearTimers() {
// optional chaining: nothing to cancel when the field is already null
this.monitor?.cancel();
this.monitor = null;
this.timeout?.cancel();
this.timeout = null;
}

override stop(err?: Error) {
Expand Down Expand Up @@ -686,12 +688,16 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
);
}

// require idle_heartbeat
// require idle_heartbeat - clamp between 500-30_000
args.idle_heartbeat = args.idle_heartbeat || args.expires / 2;
args.idle_heartbeat = args.idle_heartbeat > 30_000
? 30_000
: args.idle_heartbeat;

if (args.idle_heartbeat < 500) {
args.idle_heartbeat = 500;
}

if (refilling) {
const minMsgs = Math.round(args.max_messages * .75) || 1;
args.threshold_messages = args.threshold_messages || minMsgs;
Expand Down Expand Up @@ -848,18 +854,6 @@ export class PullConsumerImpl implements Consumer {
if (this.ordered) {
this.messages = m;
}
// FIXME: need some way to pad this correctly
const to = Math.round(m.opts.expires! * 1.05);
const timer = timeout(to);
m.closed().catch((err) => {
console.log(err);
}).finally(() => {
timer.cancel();
});
timer.catch(() => {
m.close().catch();
});
m.trackTimeout(timer);

return Promise.resolve(m);
}
Expand All @@ -871,8 +865,12 @@ export class PullConsumerImpl implements Consumer {
fopts.max_messages = 1;

const iter = await this.fetch(fopts);
for await (const m of iter) {
return m;
try {
for await (const m of iter) {
return m;
}
} catch (err) {
return Promise.reject(err);
}
return null;
}
Expand Down
86 changes: 61 additions & 25 deletions jetstream/tests/consume_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,7 @@
* limitations under the License.
*/

import {
cleanup,
connect,
jetstreamServerConf,
NatsServer,
setup,
} from "test_helpers";
import { cleanup, jetstreamServerConf, setup } from "test_helpers";
import { setupStreamAndConsumer } from "../examples/util.ts";
import {
assert,
Expand All @@ -34,18 +28,19 @@ import {
delay,
errors,
nanos,
type NatsConnectionImpl,
syncIterator,
} from "@nats-io/nats-core";
} from "@nats-io/nats-core/internal";
import type { PullConsumerMessagesImpl } from "../src/consumer.ts";
import {
AckPolicy,
ConsumerEvents,
type ConsumerStatus,
DeliverPolicy,
jetstream,
jetstreamManager,
} from "../src/mod.ts";

import type { ConsumerStatus } from "../src/mod.ts";
import type { PushConsumerMessagesImpl } from "../src/pushconsumer.ts";

Deno.test("consumers - consume", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
Expand Down Expand Up @@ -103,8 +98,7 @@ Deno.test("consumers - consume callback rejects iter", async () => {
});

Deno.test("consume - heartbeats", async () => {
const servers = await NatsServer.setupDataConnCluster(4);
const nc = await connect({ port: servers[0].port });
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);
const jsm = await jetstreamManager(nc);
await jsm.consumers.add(stream, {
Expand All @@ -118,20 +112,12 @@ Deno.test("consume - heartbeats", async () => {
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
});
}) as PushConsumerMessagesImpl;

const buf: Promise<void>[] = [];
// stop the data servers
setTimeout(() => {
buf.push(servers[1].stop());
buf.push(servers[2].stop());
buf.push(servers[3].stop());
}, 1000);

await Promise.all(buf);
// make heartbeats trigger
(nc as NatsConnectionImpl)._resub(iter.sub, "foo");

const d = deferred<ConsumerStatus>();

await (async () => {
const status = iter.status();
for await (const s of status) {
Expand All @@ -151,8 +137,7 @@ Deno.test("consume - heartbeats", async () => {
assertEquals(cs.type, ConsumerEvents.HeartbeatsMissed);
assertEquals(cs.data, 2);

await nc.close();
await NatsServer.stopAll(servers, true);
await cleanup(ns, nc);
});

Deno.test("consume - deleted consumer", async () => {
Expand Down Expand Up @@ -448,3 +433,54 @@ Deno.test("consume - consumer bind", async () => {

await cleanup(ns, nc);
});

// A non-ordered fetch must reject with "heartbeats missed" once its
// subscription stops working, after delivering the one message that was
// already published, and must also emit a HeartbeatsMissed status.
Deno.test("consume - timer is based on idle_hb", async () => {
  const { ns, nc } = await setup(jetstreamServerConf());

  // stream "A" on subject "a" with a durable, explicit-ack consumer "a"
  const manager = await jetstreamManager(nc);
  await manager.streams.add({ name: "A", subjects: ["a"] });

  await manager.consumers.add("A", {
    durable_name: "a",
    deliver_policy: DeliverPolicy.All,
    ack_policy: AckPolicy.Explicit,
  });

  const client = jetstream(nc);
  await client.publish("a");
  const consumer = await client.consumers.get("A", "a");

  const messages = await consumer.fetch({
    expires: 2000,
    max_messages: 10,
  }) as PullConsumerMessagesImpl;

  // watch status notifications in the background and remember whether a
  // HeartbeatsMissed event was reported
  let sawMissedHeartbeats = false;
  const watchStatus = async () => {
    for await (const status of messages.status()) {
      if (status.type === ConsumerEvents.HeartbeatsMissed) {
        sawMissedHeartbeats = true;
      }
    }
  };
  watchStatus().then();

  const received = [];
  const drain = async () => {
    for await (const msg of messages) {
      received.push(msg);
      msg.ack();
      // make the subscription now fail
      const conn = nc as NatsConnectionImpl;
      conn._resub(messages.sub, "foo");
    }
  };
  await assertRejects(drain, Error, "heartbeats missed");

  assertEquals(received.length, 1);
  assertEquals(sawMissedHeartbeats, true);

  await cleanup(ns, nc);
});
57 changes: 0 additions & 57 deletions jetstream/tests/consumers_ordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import type {
PullConsumerImpl,
PullConsumerMessagesImpl,
} from "../src/consumer.ts";
import { StreamImpl } from "../src/jsmstream_api.ts";
import { delayUntilAssetNotFound } from "./util.ts";
import { flakyTest } from "../../test_helpers/mod.ts";
import { ConsumerNotFoundError } from "../src/jserrors.ts";
Expand Down Expand Up @@ -747,62 +746,6 @@ Deno.test("ordered consumers - next deleted consumer", async () => {
await cleanup(ns, nc);
});

// Deletes stream "A" after a consumer handle for it has been obtained, then
// expects next() on that handle to reject with "stream not found".
// Wrapped in flakyTest (imported from test_helpers); its retry/skip
// semantics are not visible here.
Deno.test(
"ordered consumers - next stream not found",
flakyTest(async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "A", subjects: ["hello"] });

const js = jetstream(nc);
// no consumer name is passed - only the stream name
const c = await js.consumers.get("A");
// remove the stream out from under the consumer handle
await jsm.streams.delete("A");

await assertRejects(
() => {
return c.next({ expires: 1000 });
},
Error,
"stream not found",
);

await cleanup(ns, nc);
}),
);

// Deletes stream "A" after a consumer handle for it has been obtained, then
// expects iterating a fetch() on that handle to reject with
// "stream not found".
// Wrapped in flakyTest (imported from test_helpers); its retry/skip
// semantics are not visible here.
Deno.test(
"ordered consumers - fetch stream not found",
flakyTest(async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await jetstreamManager(nc);
const si = await jsm.streams.add({ name: "A", subjects: ["a"] });

const js = jetstream(nc);
// no consumer name is passed - only the stream name
const c = await js.consumers.get("A");

// stream handle built from the add() result, used only for the wait below
const s = new StreamImpl(jsm.streams, si);
await jsm.streams.delete("A");
// presumably waits until the stream is reported as not found - confirm
// against ./util.ts
await delayUntilAssetNotFound(s);

const iter = await c.fetch({
expires: 3000,
});

await assertRejects(
async () => {
for await (const _ of iter) {
// ignore
}
},
Error,
"stream not found",
);

await cleanup(ns, nc);
}),
);

Deno.test("ordered consumers - consume stream not found request abort", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

Expand Down
Loading

0 comments on commit 3c11b28

Please sign in to comment.