Skip to content

Commit

Permalink
feat(core): added ability for subscriptions to have a slow option tha…
Browse files Browse the repository at this point in the history
…t when specified will check the depth of the iterator and report when the subscription is running slow.

fix #125

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 9, 2024
1 parent c7d578b commit af0f6eb
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 5 deletions.
11 changes: 11 additions & 0 deletions core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ export enum Events {
LDM = "ldm",
/** Client received an async error from the server */
Error = "error",
/** Slow Consumer - a buffered subscription (iterator) that is accumulating messages beyond a specify threshold */
SlowConsumer = "slow_consumer",
}

/**
Expand Down Expand Up @@ -74,6 +76,15 @@ export interface SubOpts<T> {
* @param msg
*/
callback?: MsgCallback<T>;

/**
* Number of pending messages in a subscription to exceed prior to considering
* a subscription a Slow Consumer. By default, slow consumer is on a subscription
* is not accounted for.
*
* This is an experimental option.
*/
slow?: number;
}

export interface DnsResolveFn {
Expand Down
12 changes: 11 additions & 1 deletion core/src/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import type {
Subscription,
SubscriptionOptions,
} from "./core.ts";
import { createInbox, RequestStrategy } from "./core.ts";
import { createInbox, Events, RequestStrategy } from "./core.ts";
import { errors, InvalidArgumentError, TimeoutError } from "./errors.ts";

export class NatsConnectionImpl implements NatsConnection {
Expand Down Expand Up @@ -138,6 +138,16 @@ export class NatsConnectionImpl implements NatsConnection {
): Subscription {
this._check(subject, true, false);
const sub = new SubscriptionImpl(this.protocol, subject, opts);

if (typeof opts.callback !== "function" && typeof opts.slow === "number") {
const subj = sub.getSubject();
sub.setSlowNotificationFn(opts.slow, (pending: number) => {
this.protocol.dispatchStatus({
type: Events.SlowConsumer,
data: `subscription (${sub.sid}) ${subj} is slow: msgs ${pending}`,
});
});
}
this.protocol.subscribe(sub);
return sub;
}
Expand Down
39 changes: 39 additions & 0 deletions core/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,31 @@ export class Connect {
}
}

class SlowNotifier {
slow: number;
cb: (pending: number) => void;
notified: boolean;

constructor(slow: number, cb: (pending: number) => void) {
this.slow = slow;
this.cb = cb;
this.notified = false;
}

maybeNotify(pending: number): void {
// if we are below the threshold reset the ability to notify
if (pending <= this.slow) {
this.notified = false;
} else {
if (!this.notified) {
// crossed the threshold, notify and silence.
this.cb(pending);
this.notified = true;
}
}
}
}

export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
implements Subscription {
sid!: number;
Expand All @@ -119,6 +144,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
cleanupFn?: (sub: Subscription, info?: unknown) => void;
closed: Deferred<void>;
requestSubject?: string;
slow?: SlowNotifier;

constructor(
protocol: ProtocolHandler,
Expand Down Expand Up @@ -160,9 +186,22 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
}
}

setSlowNotificationFn(slow: number, fn?: (pending: number) => void): void {
this.slow = undefined;
if (fn) {
if (this.noIterator) {
throw new Error("callbacks don't support slow notifications");
}
this.slow = new SlowNotifier(slow, fn);
}
}

callback(err: Error | null, msg: Msg) {
this.cancelTimeout();
err ? this.stop(err) : this.push(msg);
if (!err && this.slow) {
this.slow.maybeNotify(this.getPending());
}
}

close(): void {
Expand Down
67 changes: 67 additions & 0 deletions core/tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
deferred,
delay,
Empty,
Events,
Feature,
headers,
isIP,
Expand Down Expand Up @@ -1539,6 +1540,72 @@ Deno.test("basics - stats", async () => {
await cleanup(ns, nc);
});

Deno.test("basics - slow", async () => {
const { ns, nc } = await setup();

let slow = 0;
(async () => {
for await (const m of nc.status()) {
//@ts-ignore: test
if (m.type === Events.SlowConsumer) {
console.log(m);
slow++;
}
}
})().catch();
const sub = nc.subscribe("test", { slow: 10 });
const s = syncIterator(sub);

// we go over, should have a notification
for (let i = 0; i < 11; i++) {
nc.publish("test", "");
}

await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 1);
slow = 0;

// send one more, no more notifications until we drop below 10
nc.publish("test", "");
await delay(100); // 12
assertEquals(sub.getPending(), 12);
assertEquals(slow, 0);

await s.next(); // 12
await s.next(); // 11
await s.next(); // 10

nc.publish("test", ""); // 11
await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 0);

// now this will notify
await s.next(); // 11
await s.next(); // 10
await delay(100);
assertEquals(sub.getPending(), 9);

await s.next(); // 9
nc.publish("test", "");
await delay(100);
assertEquals(sub.getPending(), 9);
assertEquals(slow, 0);

nc.publish("test", ""); // 10
await delay(100);
assertEquals(sub.getPending(), 10);
assertEquals(slow, 0);

nc.publish("test", ""); // 11
await delay(100);
assertEquals(sub.getPending(), 11);
assertEquals(slow, 1);

await cleanup(ns, nc);
});

class MM implements Msg {
data!: Uint8Array;
sid: number;
Expand Down
4 changes: 2 additions & 2 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,8 @@ export type ConsumerUpdateConfig = PriorityGroups & {
*/
"inactive_threshold"?: Nanos;
/**
* List of durations in nanoseconds format that represents a retry timescale for
* NaK'd messages or those being normally retried
* List of durations in nanoseconds that represents a retry timescale for
* the redelivery of messages
*/
"backoff"?: Nanos[];
/**
Expand Down
59 changes: 57 additions & 2 deletions jetstream/tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
} from "@nats-io/nats-core";
import {
assert,
assertAlmostEquals,
assertEquals,
assertExists,
assertInstanceOf,
Expand Down Expand Up @@ -707,7 +708,8 @@ Deno.test("jetstream - backoff", async () => {
const { stream, subj } = await initStream(nc);
const jsm = await jetstreamManager(nc);

const backoff = [nanos(250), nanos(1000), nanos(3000)];
const ms = [250, 1000, 3000];
const backoff = ms.map((n) => nanos(n));
const ci = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
Expand All @@ -723,10 +725,11 @@ Deno.test("jetstream - backoff", async () => {
const js = jetstream(nc);
await js.publish(subj);

const when: number[] = [];
const c = await js.consumers.get(stream, "me");
const iter = await c.consume({
callback: (m) => {
console.log(m.info.redeliveryCount);
when.push(Date.now());
if (m.info.redeliveryCount === 4) {
iter.stop();
}
Expand All @@ -735,6 +738,58 @@ Deno.test("jetstream - backoff", async () => {

await iter.closed();

const offset = when.map((n, idx) => {
const p = idx > 0 ? idx - 1 : 0;
return n - when[p];
});

offset.slice(1).forEach((n, idx) => {
assertAlmostEquals(n, ms[idx], 20);
});

await cleanup(ns, nc);
});

Deno.test("jetstream - redelivery", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.7.2")) {
return;
}

const { stream, subj } = await initStream(nc);
const jsm = await jetstreamManager(nc);

const ci = await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
max_deliver: 4,
ack_wait: nanos(1000),
});

assertEquals(ci.config.max_deliver, 4);

const js = jetstream(nc);
await js.publish(subj);

const c = await js.consumers.get(stream, "me");

let redeliveries = 0;
const iter = await c.consume({
callback: (m) => {
if (m.redelivered) {
redeliveries++;
}
if (m.info.redeliveryCount === 4) {
setTimeout(() => {
iter.stop();
}, 2000);
}
},
});

await iter.closed();
assertEquals(redeliveries, 3);

await cleanup(ns, nc);
});

Expand Down

0 comments on commit af0f6eb

Please sign in to comment.