Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fetch()/next() rely on missed heartbeats to detect abnormally ending next requests #150

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import type {
Status,
Subscription,
SubscriptionImpl,
Timeout,
} from "@nats-io/nats-core/internal";
import {
backoff,
Expand All @@ -33,7 +32,6 @@ import {
nanos,
nuid,
QueuedIteratorImpl,
timeout,
} from "@nats-io/nats-core/internal";
import type { ConsumerAPIImpl } from "./jsmconsumer_api.ts";

Expand All @@ -47,6 +45,8 @@ import type {
PullOptions,
} from "./jsapi_types.ts";
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";

import type {
ConsumeMessages,
ConsumeOptions,
Expand All @@ -67,8 +67,7 @@ import type {
ThresholdBytes,
ThresholdMessages,
} from "./types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";
import { JetStreamStatus } from "./jserrors.ts";
import { JetStreamError, JetStreamStatus } from "./jserrors.ts";
import { minValidation } from "./jsutil.ts";

enum PullConsumerType {
Expand Down Expand Up @@ -120,7 +119,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
pending: { msgs: number; bytes: number; requests: number };
isConsume: boolean;
callback: ConsumerCallbackFn | null;
timeout: Timeout<unknown> | null;
listeners: QueuedIterator<ConsumerStatus>[];
statusIterator?: QueuedIteratorImpl<Status>;
abortOnMissingResource?: boolean;
Expand Down Expand Up @@ -170,7 +168,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.noIterator = typeof this.callback === "function";
this.monitor = null;
this.pending = { msgs: 0, bytes: 0, requests: 0 };
this.timeout = null;
this.listeners = [];
this.abortOnMissingResource = copts.abort_on_missing_resource === true;
this.bind = copts.bind === true;
Expand Down Expand Up @@ -327,6 +324,17 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
// on the next pull etc - the only assumption here is we should
// reset and check if the consumer was deleted from under us
this.notify(ConsumerEvents.HeartbeatsMissed, data);
if (!this.isConsume && !this.consumer.ordered) {
// if we are not a consume, give up - this was masked by an
// external timer on fetch - the hb is a more reliable timeout
// since it requires 2 to be missed - this creates an edge case
// that would timeout the client longer than they would expect: we
// could be waiting for one more message, nothing happens, and
// now we have to wait for 2 missed hbs, which would be 1m (max), so
// there wouldn't be a fail fast.
this.stop(new JetStreamError("heartbeats missed"));
return true;
}
this.resetPending()
.then(() => {
})
Expand Down Expand Up @@ -614,10 +622,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
return opts;
}

trackTimeout(t: Timeout<unknown>) {
this.timeout = t;
}

close(): Promise<void | Error> {
this.stop();
return this.iterClosed;
Expand All @@ -630,8 +634,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
clearTimers() {
this.monitor?.cancel();
this.monitor = null;
this.timeout?.cancel();
this.timeout = null;
}

override stop(err?: Error) {
Expand Down Expand Up @@ -686,12 +688,16 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
);
}

// require idle_heartbeat
// require idle_heartbeat - clamp between 500-30_000
args.idle_heartbeat = args.idle_heartbeat || args.expires / 2;
args.idle_heartbeat = args.idle_heartbeat > 30_000
? 30_000
: args.idle_heartbeat;

if (args.idle_heartbeat < 500) {
args.idle_heartbeat = 500;
}

if (refilling) {
const minMsgs = Math.round(args.max_messages * .75) || 1;
args.threshold_messages = args.threshold_messages || minMsgs;
Expand Down Expand Up @@ -848,18 +854,6 @@ export class PullConsumerImpl implements Consumer {
if (this.ordered) {
this.messages = m;
}
// FIXME: need some way to pad this correctly
const to = Math.round(m.opts.expires! * 1.05);
const timer = timeout(to);
m.closed().catch((err) => {
console.log(err);
}).finally(() => {
timer.cancel();
});
timer.catch(() => {
m.close().catch();
});
m.trackTimeout(timer);

return Promise.resolve(m);
}
Expand All @@ -871,8 +865,12 @@ export class PullConsumerImpl implements Consumer {
fopts.max_messages = 1;

const iter = await this.fetch(fopts);
for await (const m of iter) {
return m;
try {
for await (const m of iter) {
return m;
}
} catch (err) {
return Promise.reject(err);
}
return null;
}
Expand Down
86 changes: 61 additions & 25 deletions jetstream/tests/consume_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,7 @@
* limitations under the License.
*/

import {
cleanup,
connect,
jetstreamServerConf,
NatsServer,
setup,
} from "test_helpers";
import { cleanup, jetstreamServerConf, setup } from "test_helpers";
import { setupStreamAndConsumer } from "../examples/util.ts";
import {
assert,
Expand All @@ -34,18 +28,19 @@ import {
delay,
errors,
nanos,
type NatsConnectionImpl,
syncIterator,
} from "@nats-io/nats-core";
} from "@nats-io/nats-core/internal";
import type { PullConsumerMessagesImpl } from "../src/consumer.ts";
import {
AckPolicy,
ConsumerEvents,
type ConsumerStatus,
DeliverPolicy,
jetstream,
jetstreamManager,
} from "../src/mod.ts";

import type { ConsumerStatus } from "../src/mod.ts";
import type { PushConsumerMessagesImpl } from "../src/pushconsumer.ts";

Deno.test("consumers - consume", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
Expand Down Expand Up @@ -103,8 +98,7 @@ Deno.test("consumers - consume callback rejects iter", async () => {
});

Deno.test("consume - heartbeats", async () => {
const servers = await NatsServer.setupDataConnCluster(4);
const nc = await connect({ port: servers[0].port });
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);
const jsm = await jetstreamManager(nc);
await jsm.consumers.add(stream, {
Expand All @@ -118,20 +112,12 @@ Deno.test("consume - heartbeats", async () => {
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
});
}) as PushConsumerMessagesImpl;

const buf: Promise<void>[] = [];
// stop the data serverss
setTimeout(() => {
buf.push(servers[1].stop());
buf.push(servers[2].stop());
buf.push(servers[3].stop());
}, 1000);

await Promise.all(buf);
// make heartbeats trigger
(nc as NatsConnectionImpl)._resub(iter.sub, "foo");

const d = deferred<ConsumerStatus>();

await (async () => {
const status = iter.status();
for await (const s of status) {
Expand All @@ -151,8 +137,7 @@ Deno.test("consume - heartbeats", async () => {
assertEquals(cs.type, ConsumerEvents.HeartbeatsMissed);
assertEquals(cs.data, 2);

await nc.close();
await NatsServer.stopAll(servers, true);
await cleanup(ns, nc);
});

Deno.test("consume - deleted consumer", async () => {
Expand Down Expand Up @@ -448,3 +433,54 @@ Deno.test("consume - consumer bind", async () => {

await cleanup(ns, nc);
});

Deno.test("consume - timer is based on idle_hb", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "A", subjects: ["a"] });

await jsm.consumers.add("A", {
durable_name: "a",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
});

const js = jetstream(nc);
await js.publish("a");
const c = await js.consumers.get("A", "a");

const iter = await c.fetch({
expires: 2000,
max_messages: 10,
}) as PullConsumerMessagesImpl;

let hbm = false;
(async () => {
for await (const s of iter.status()) {
if (s.type === ConsumerEvents.HeartbeatsMissed) {
hbm = true;
}
}
})().then();

const buf = [];
await assertRejects(
async () => {
for await (const m of iter) {
buf.push(m);
m.ack();
// make the subscription now fail
const nci = nc as NatsConnectionImpl;
nci._resub(iter.sub, "foo");
}
},
Error,
"heartbeats missed",
);

assertEquals(buf.length, 1);
assertEquals(hbm, true);

await cleanup(ns, nc);
});
57 changes: 0 additions & 57 deletions jetstream/tests/consumers_ordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import type {
PullConsumerImpl,
PullConsumerMessagesImpl,
} from "../src/consumer.ts";
import { StreamImpl } from "../src/jsmstream_api.ts";
import { delayUntilAssetNotFound } from "./util.ts";
import { flakyTest } from "../../test_helpers/mod.ts";
import { ConsumerNotFoundError } from "../src/jserrors.ts";
Expand Down Expand Up @@ -747,62 +746,6 @@ Deno.test("ordered consumers - next deleted consumer", async () => {
await cleanup(ns, nc);
});

Deno.test(
"ordered consumers - next stream not found",
flakyTest(async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "A", subjects: ["hello"] });

const js = jetstream(nc);
const c = await js.consumers.get("A");
await jsm.streams.delete("A");

await assertRejects(
() => {
return c.next({ expires: 1000 });
},
Error,
"stream not found",
);

await cleanup(ns, nc);
}),
);

Deno.test(
"ordered consumers - fetch stream not found",
flakyTest(async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await jetstreamManager(nc);
const si = await jsm.streams.add({ name: "A", subjects: ["a"] });

const js = jetstream(nc);
const c = await js.consumers.get("A");

const s = new StreamImpl(jsm.streams, si);
await jsm.streams.delete("A");
await delayUntilAssetNotFound(s);

const iter = await c.fetch({
expires: 3000,
});

await assertRejects(
async () => {
for await (const _ of iter) {
// ignore
}
},
Error,
"stream not found",
);

await cleanup(ns, nc);
}),
);

Deno.test("ordered consumers - consume stream not found request abort", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

Expand Down
Loading