Skip to content

Commit

Permalink
feat(jetstream): priority_groups configuration on consumers and overf…
Browse files Browse the repository at this point in the history
…low support for pull consumer

https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-42.md

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 8, 2024
1 parent 86ef437 commit efa88ca
Show file tree
Hide file tree
Showing 32 changed files with 729 additions and 92 deletions.
2 changes: 1 addition & 1 deletion core/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/nats-core",
"version": "3.0.0-34",
"version": "3.0.0-35",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down
2 changes: 1 addition & 1 deletion core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/nats-core",
"version": "3.0.0-34",
"version": "3.0.0-35",
"files": [
"lib/",
"LICENSE",
Expand Down
2 changes: 1 addition & 1 deletion core/src/version.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// This file is generated - do not edit
export const version = "3.0.0-34";
export const version = "3.0.0-35";
4 changes: 2 additions & 2 deletions jetstream/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-22",
"version": "3.0.0-23",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down Expand Up @@ -33,6 +33,6 @@
"test": "deno test -A --parallel --reload --trace-leaks --quiet tests/ --import-map=import_map.json"
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34"
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35"
}
}
4 changes: 2 additions & 2 deletions jetstream/import_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"imports": {
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal",
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-35",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-35/internal",
"test_helpers": "../test_helpers/mod.ts",
"@std/io": "jsr:@std/[email protected]"
}
Expand Down
4 changes: 2 additions & 2 deletions jetstream/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-22",
"version": "3.0.0-23",
"files": [
"lib/",
"LICENSE",
Expand Down Expand Up @@ -34,7 +34,7 @@
},
"description": "jetstream library - this library implements all the base functionality for NATS JetStream for javascript clients",
"dependencies": {
"@nats-io/nats-core": "3.0.0-34"
"@nats-io/nats-core": "3.0.0-35"
},
"devDependencies": {
"@types/node": "^22.7.6",
Expand Down
118 changes: 109 additions & 9 deletions jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
delay,
errors,
Events,
Feature,
IdleHeartbeatMonitor,
nanos,
nuid,
Expand All @@ -43,6 +44,7 @@ import { toJsMsg } from "./jsmsg.ts";
import type {
ConsumerConfig,
ConsumerInfo,
OverflowMinPendingAndMinAck,
PullOptions,
} from "./jsapi_types.ts";
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
Expand All @@ -54,14 +56,21 @@ import type {
ConsumerCallbackFn,
ConsumerMessages,
ConsumerStatus,
Expires,
FetchMessages,
FetchOptions,
IdleHeartbeat,
MaxBytes,
MaxMessages,
NextOptions,
OrderedConsumerOptions,
PullConsumerOptions,
ThresholdBytes,
ThresholdMessages,
} from "./types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";
import { JetStreamStatus } from "./jserrors.ts";
import { minValidation } from "./jsutil.ts";

enum PullConsumerType {
Unset = -1,
Expand All @@ -85,10 +94,28 @@ export type PullConsumerInternalOptions = {
ordered?: OrderedConsumerOptions;
};

type InternalPullOptions =
& MaxMessages
& MaxBytes
& Expires
& IdleHeartbeat
& ThresholdMessages
& OverflowMinPendingAndMinAck
& ThresholdBytes;

export function isOverflowOptions(
opts: unknown,
): opts is OverflowMinPendingAndMinAck {
const oo = opts as OverflowMinPendingAndMinAck;
return oo && typeof oo.group === "string" ||
typeof oo.min_pending === "number" ||
typeof oo.min_ack_pending === "number";
}

export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
implements ConsumerMessages {
consumer: PullConsumerImpl;
opts: Record<string, number>;
opts: InternalPullOptions;
sub!: Subscription;
monitor: IdleHeartbeatMonitor | null;
pending: { msgs: number; bytes: number; requests: number };
Expand Down Expand Up @@ -117,6 +144,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.inbox = `${this.inboxPrefix}.${this.consumer.serial}`;

if (this.consumer.ordered) {
if (isOverflowOptions(opts)) {
throw errors.InvalidArgumentError.format([
"group",
"min_pending",
"min_ack_pending",
], "cannot be specified for ordered consumers");
}
if (this.consumer.orderedConsumerState === undefined) {
// initialize the state for the order consumer
const ocs = {} as OrderedConsumerState;
Expand Down Expand Up @@ -564,9 +598,21 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
pullOptions(): Partial<PullOptions> {
const batch = this.opts.max_messages - this.pending.msgs;
const max_bytes = this.opts.max_bytes - this.pending.bytes;
const idle_heartbeat = nanos(this.opts.idle_heartbeat);
const expires = nanos(this.opts.expires);
return { batch, max_bytes, idle_heartbeat, expires };
const idle_heartbeat = nanos(this.opts.idle_heartbeat!);
const expires = nanos(this.opts.expires!);

const opts = { batch, max_bytes, idle_heartbeat, expires } as PullOptions;

if (isOverflowOptions(this.opts)) {
opts.group = this.opts.group;
if (this.opts.min_pending) {
opts.min_pending = this.opts.min_pending;
}
if (this.opts.min_ack_pending) {
opts.min_ack_pending = this.opts.min_ack_pending;
}
}
return opts;
}

trackTimeout(t: Timeout<unknown>) {
Expand Down Expand Up @@ -608,14 +654,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
parseOptions(
opts: PullConsumerOptions,
refilling = false,
): Record<string, number> {
const args = (opts || {}) as Record<string, number>;
): InternalPullOptions {
const args = (opts || {}) as InternalPullOptions;
args.max_messages = args.max_messages || 0;
args.max_bytes = args.max_bytes || 0;

if (args.max_messages !== 0 && args.max_bytes !== 0) {
throw new Error(
`only specify one of max_messages or max_bytes`,
throw errors.InvalidArgumentError.format(
["max_messages", "max_bytes"],
"are mutually exclusive",
);
}

Expand Down Expand Up @@ -654,6 +701,25 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
args.threshold_bytes = args.threshold_bytes || minBytes;
}

if (isOverflowOptions(opts)) {
const { min, ok } = this.consumer.api.nc.features.get(
Feature.JS_PRIORITY_GROUPS,
);
if (!ok) {
throw new Error(`priority_groups require server ${min}`);
}
validateOverflowPullOptions(opts);
if (opts.group) {
args.group = opts.group;
}
if (opts.min_ack_pending) {
args.min_ack_pending = opts.min_ack_pending;
}
if (opts.min_pending) {
args.min_pending = opts.min_pending;
}
}

return args;
}

Expand Down Expand Up @@ -784,7 +850,7 @@ export class PullConsumerImpl implements Consumer {
this.messages = m;
}
// FIXME: need some way to pad this correctly
const to = Math.round(m.opts.expires * 1.05);
const to = Math.round(m.opts.expires! * 1.05);
const timer = timeout(to);
m.closed().catch((err) => {
console.log(err);
Expand Down Expand Up @@ -857,3 +923,37 @@ export class PullConsumerImpl implements Consumer {
return this._info;
}
}

export function validateOverflowPullOptions(opts: unknown) {
if (isOverflowOptions(opts)) {
minValidation("group", opts.group);
if (opts.group.length > 16) {
throw errors.InvalidArgumentError.format(
"group",
"must be 16 characters or less",
);
}

const { min_pending, min_ack_pending } = opts;
if (!min_pending && !min_ack_pending) {
throw errors.InvalidArgumentError.format(
["min_pending", "min_ack_pending"],
"at least one must be specified",
);
}

if (min_pending && typeof min_pending !== "number") {
throw errors.InvalidArgumentError.format(
["min_pending"],
"must be a number",
);
}

if (min_ack_pending && typeof min_ack_pending !== "number") {
throw errors.InvalidArgumentError.format(
["min_ack_pending"],
"must be a number",
);
}
}
}
66 changes: 61 additions & 5 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,12 @@ export interface JetStreamApiStats {
export interface AccountInfoResponse
extends ApiResponse, JetStreamAccountStats {}

export interface ConsumerConfig extends ConsumerUpdateConfig {
export type PriorityGroups = {
priority_groups?: string[];
priority_policy?: PriorityPolicy;
};

export type ConsumerConfig = ConsumerUpdateConfig & {
/**
* The type of acknowledgment required by the Consumer
*/
Expand Down Expand Up @@ -952,9 +957,9 @@ export interface ConsumerConfig extends ConsumerUpdateConfig {
* Specified as an ISO date time string (Date#toISOString()).
*/
"pause_until"?: string;
}
};

export interface ConsumerUpdateConfig {
export type ConsumerUpdateConfig = PriorityGroups & {
/**
* A short description of the purpose of this consume
*/
Expand Down Expand Up @@ -1037,6 +1042,11 @@ export interface ConsumerUpdateConfig {
* 2.10.x and better.
*/
metadata?: Record<string, string>;
};

export enum PriorityPolicy {
None = "none",
Overflow = "overflow",
}

export function defaultConsumer(
Expand All @@ -1052,12 +1062,54 @@ export function defaultConsumer(
}, opts);
}

export type OverflowMinPending = {
/**
* The name of the priority_group
*/
group: string;
/**
* Only deliver messages when num_pending for the consumer is greater than this value
*/
min_pending: number;
};

export type OverflowMinAckPending = {
/**
* The name of the priority_group
*/
group: string;
/**
* Only deliver messages when num_ack_pending for the consumer is greater than this value
*/
min_ack_pending: number;
};

export type OverflowMinPendingAndMinAck = {
/**
* The name of the priority_group
*/
group: string;
/**
* Only deliver messages when num_pending for the consumer is greater than this value
*/
min_pending: number;
/**
* Only deliver messages when num_ack_pending for the consumer is greater than this value
*/
min_ack_pending: number;
};

export type OverflowOptions =
| OverflowMinPending
| OverflowMinAckPending
| OverflowMinPendingAndMinAck;

/**
* Options for a JetStream pull subscription which define how long
* the pull request will remain open and limits the amount of data
* that the server could return.
*/
export interface PullOptions {
export type PullOptions = Partial<OverflowMinPendingAndMinAck> & {
/**
* Max number of messages to retrieve in a pull.
*/
Expand All @@ -1076,8 +1128,12 @@ export interface PullOptions {
* number of messages in the batch to fit within this setting.
*/
"max_bytes": number;

/**
* Number of nanos between messages for the server to emit an idle_heartbeat
*/
"idle_heartbeat": number;
}
};

export interface DeliveryInfo {
/**
Expand Down
Loading

0 comments on commit efa88ca

Please sign in to comment.