Skip to content

Commit

Permalink
feat(jetstream): overflow
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 7, 2024
1 parent 69eb332 commit ab3e053
Show file tree
Hide file tree
Showing 10 changed files with 585 additions and 40 deletions.
111 changes: 102 additions & 9 deletions jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { toJsMsg } from "./jsmsg.ts";
import type {
ConsumerConfig,
ConsumerInfo,
OverflowMinPendingAndMinAck,
PullOptions,
} from "./jsapi_types.ts";
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
Expand All @@ -54,14 +55,21 @@ import type {
ConsumerCallbackFn,
ConsumerMessages,
ConsumerStatus,
Expires,
FetchMessages,
FetchOptions,
IdleHeartbeat,
MaxBytes,
MaxMessages,
NextOptions,
OrderedConsumerOptions,
PullConsumerOptions,
ThresholdBytes,
ThresholdMessages,
} from "./types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";
import { JetStreamStatus } from "./jserrors.ts";
import { minValidation } from "./jsutil.ts";

enum PullConsumerType {
Unset = -1,
Expand All @@ -85,10 +93,28 @@ export type PullConsumerInternalOptions = {
ordered?: OrderedConsumerOptions;
};

type InternalPullOptions =
& MaxMessages
& MaxBytes
& Expires
& IdleHeartbeat
& ThresholdMessages
& OverflowMinPendingAndMinAck
& ThresholdBytes;

export function isOverflowOptions(
opts: unknown,
): opts is OverflowMinPendingAndMinAck {
const oo = opts as OverflowMinPendingAndMinAck;
return oo && typeof oo.group === "string" ||
typeof oo.min_pending === "number" ||
typeof oo.min_ack_pending === "number";
}

export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
implements ConsumerMessages {
consumer: PullConsumerImpl;
opts: Record<string, number>;
opts: InternalPullOptions;
sub!: Subscription;
monitor: IdleHeartbeatMonitor | null;
pending: { msgs: number; bytes: number; requests: number };
Expand Down Expand Up @@ -117,6 +143,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.inbox = `${this.inboxPrefix}.${this.consumer.serial}`;

if (this.consumer.ordered) {
if (isOverflowOptions(opts)) {
throw errors.InvalidArgumentError.format([
"group",
"min_pending",
"min_ack_pending",
], "cannot be specified for ordered consumers");
}
if (this.consumer.orderedConsumerState === undefined) {
// initialize the state for the order consumer
const ocs = {} as OrderedConsumerState;
Expand Down Expand Up @@ -564,9 +597,21 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
pullOptions(): Partial<PullOptions> {
const batch = this.opts.max_messages - this.pending.msgs;
const max_bytes = this.opts.max_bytes - this.pending.bytes;
const idle_heartbeat = nanos(this.opts.idle_heartbeat);
const expires = nanos(this.opts.expires);
return { batch, max_bytes, idle_heartbeat, expires };
const idle_heartbeat = nanos(this.opts.idle_heartbeat!);
const expires = nanos(this.opts.expires!);

const opts = { batch, max_bytes, idle_heartbeat, expires } as PullOptions;

if (isOverflowOptions(this.opts)) {
opts.group = this.opts.group;
if (this.opts.min_pending) {
opts.min_pending = this.opts.min_pending;
}
if (this.opts.min_ack_pending) {
opts.min_ack_pending = this.opts.min_ack_pending;
}
}
return opts;
}

trackTimeout(t: Timeout<unknown>) {
Expand Down Expand Up @@ -608,14 +653,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
parseOptions(
opts: PullConsumerOptions,
refilling = false,
): Record<string, number> {
const args = (opts || {}) as Record<string, number>;
): InternalPullOptions {
const args = (opts || {}) as InternalPullOptions;
args.max_messages = args.max_messages || 0;
args.max_bytes = args.max_bytes || 0;

if (args.max_messages !== 0 && args.max_bytes !== 0) {
throw new Error(
`only specify one of max_messages or max_bytes`,
throw errors.InvalidArgumentError.format(
["max_messages", "max_bytes"],
"are mutually exclusive",
);
}

Expand Down Expand Up @@ -654,6 +700,19 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
args.threshold_bytes = args.threshold_bytes || minBytes;
}

if (isOverflowOptions(opts)) {
validateOverflowPullOptions(opts);
if (opts.group) {
args.group = opts.group;
}
if (opts.min_ack_pending) {
args.min_ack_pending = opts.min_ack_pending;
}
if (opts.min_pending) {
args.min_pending = opts.min_pending;
}
}

return args;
}

Expand Down Expand Up @@ -784,7 +843,7 @@ export class PullConsumerImpl implements Consumer {
this.messages = m;
}
// FIXME: need some way to pad this correctly
const to = Math.round(m.opts.expires * 1.05);
const to = Math.round(m.opts.expires! * 1.05);
const timer = timeout(to);
m.closed().catch((err) => {
console.log(err);
Expand Down Expand Up @@ -857,3 +916,37 @@ export class PullConsumerImpl implements Consumer {
return this._info;
}
}

export function validateOverflowPullOptions(opts: unknown) {
if (isOverflowOptions(opts)) {
minValidation("group", opts.group);
if (opts.group.length > 16) {
throw errors.InvalidArgumentError.format(
"group",
"must be 16 characters or less",
);
}

const { min_pending, min_ack_pending } = opts;
if (!min_pending && !min_ack_pending) {
throw errors.InvalidArgumentError.format(
["min_pending", "min_ack_pending"],
"at least one must be specified",
);
}

if (min_pending && typeof min_pending !== "number") {
throw errors.InvalidArgumentError.format(
["min_pending"],
"must be a number",
);
}

if (min_ack_pending && typeof min_ack_pending !== "number") {
throw errors.InvalidArgumentError.format(
["min_ack_pending"],
"must be a number",
);
}
}
}
66 changes: 61 additions & 5 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,12 @@ export interface JetStreamApiStats {
export interface AccountInfoResponse
extends ApiResponse, JetStreamAccountStats {}

export interface ConsumerConfig extends ConsumerUpdateConfig {
export type PriorityGroups = {
priority_groups?: string[];
priority_policy?: PriorityPolicy;
};

export type ConsumerConfig = ConsumerUpdateConfig & {
/**
* The type of acknowledgment required by the Consumer
*/
Expand Down Expand Up @@ -952,9 +957,9 @@ export interface ConsumerConfig extends ConsumerUpdateConfig {
* Specified as an ISO date time string (Date#toISOString()).
*/
"pause_until"?: string;
}
};

export interface ConsumerUpdateConfig {
export type ConsumerUpdateConfig = PriorityGroups & {
/**
* A short description of the purpose of this consume
*/
Expand Down Expand Up @@ -1037,6 +1042,11 @@ export interface ConsumerUpdateConfig {
* 2.10.x and better.
*/
metadata?: Record<string, string>;
};

export enum PriorityPolicy {
None = "none",
Overflow = "overflow",
}

export function defaultConsumer(
Expand All @@ -1052,12 +1062,54 @@ export function defaultConsumer(
}, opts);
}

export type OverflowMinPending = {
/**
* The name of the priority_group
*/
group: string;
/**
* Only deliver messages when num_pending for the consumer is greater than this value
*/
min_pending: number;
};

export type OverflowMinAckPending = {
/**
* The name of the priority_group
*/
group: string;
/**
* Only deliver messages when num_ack_pending for the consumer is greater than this value
*/
min_ack_pending: number;
};

export type OverflowMinPendingAndMinAck = {
/**
* The name of the priority_group
*/
group: string;
/**
* Only deliver messages when num_pending for the consumer is greater than this value
*/
min_pending: number;
/**
* Only deliver messages when num_ack_pending for the consumer is greater than this value
*/
min_ack_pending: number;
};

export type OverflowOptions =
| OverflowMinPending
| OverflowMinAckPending
| OverflowMinPendingAndMinAck;

/**
* Options for a JetStream pull subscription which define how long
* the pull request will remain open and limits the amount of data
* that the server could return.
*/
export interface PullOptions {
export type PullOptions = Partial<OverflowMinPendingAndMinAck> & {
/**
* Max number of messages to retrieve in a pull.
*/
Expand All @@ -1076,8 +1128,12 @@ export interface PullOptions {
* number of messages in the batch to fit within this setting.
*/
"max_bytes": number;

/**
* Number of nanos between messages for the server to emit an idle_heartbeat
*/
"idle_heartbeat": number;
}
};

export interface DeliveryInfo {
/**
Expand Down
31 changes: 21 additions & 10 deletions jetstream/src/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@

import { BaseApiClientImpl } from "./jsbaseclient_api.ts";
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
import { delay, Empty, QueuedIteratorImpl } from "@nats-io/nats-core/internal";
import {
backoff,
delay,
Empty,
QueuedIteratorImpl,
} from "@nats-io/nats-core/internal";

import { ConsumersImpl, StreamAPIImpl, StreamsImpl } from "./jsmstream_api.ts";

Expand All @@ -34,7 +39,7 @@ import type {
StreamAPI,
Streams,
} from "./types.ts";
import { errors, headers } from "@nats-io/nats-core/internal";
import { errors, headers, RequestError } from "@nats-io/nats-core/internal";

import type {
Msg,
Expand All @@ -49,7 +54,7 @@ import type {
JetStreamAccountStats,
} from "./jsapi_types.ts";
import { DirectStreamAPIImpl } from "./jsm.ts";
import { JetStreamError } from "./jserrors.ts";
import { JetStreamError, JetStreamNotEnabled } from "./jserrors.ts";

export function toJetStreamClient(
nc: NatsConnection | JetStreamClient,
Expand Down Expand Up @@ -205,29 +210,35 @@ export class JetStreamClientImpl extends BaseApiClientImpl
ro.headers = mh;
}

let { retries, retry_delay } = opts as {
let { retries } = opts as {
retries: number;
retry_delay: number;
};
retries = retries || 1;
retry_delay = retry_delay || 250;
const bo = backoff();

let r: Msg;
let r: Msg | null = null;
for (let i = 0; i < retries; i++) {
try {
r = await this.nc.request(subj, data, ro);
// if here we succeeded
break;
} catch (err) {
const re = err instanceof RequestError ? err as RequestError : null;
if (
err instanceof errors.RequestError && err.isNoResponders()
err instanceof errors.TimeoutError ||
re?.isNoResponders() && i + 1 < retries
) {
await delay(retry_delay);
await delay(bo.backoff(i));
} else {
throw err;
throw re?.isNoResponders()
? new JetStreamNotEnabled(`jetstream is not enabled`, {
cause: err,
})
: err;
}
}
}

const pa = this.parseJsResponse(r!) as PubAck;
if (pa.stream === "") {
throw new JetStreamError("invalid ack response");
Expand Down
Loading

0 comments on commit ab3e053

Please sign in to comment.