diff --git a/jetstream/src/consumer.ts b/jetstream/src/consumer.ts index 559673ce..81aff680 100644 --- a/jetstream/src/consumer.ts +++ b/jetstream/src/consumer.ts @@ -43,6 +43,7 @@ import { toJsMsg } from "./jsmsg.ts"; import type { ConsumerConfig, ConsumerInfo, + OverflowMinPendingAndMinAck, PullOptions, } from "./jsapi_types.ts"; import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts"; @@ -54,14 +55,21 @@ import type { ConsumerCallbackFn, ConsumerMessages, ConsumerStatus, + Expires, FetchMessages, FetchOptions, + IdleHeartbeat, + MaxBytes, + MaxMessages, NextOptions, OrderedConsumerOptions, PullConsumerOptions, + ThresholdBytes, + ThresholdMessages, } from "./types.ts"; import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts"; import { JetStreamStatus } from "./jserrors.ts"; +import { minValidation } from "./jsutil.ts"; enum PullConsumerType { Unset = -1, @@ -85,10 +93,28 @@ export type PullConsumerInternalOptions = { ordered?: OrderedConsumerOptions; }; +type InternalPullOptions = + & MaxMessages + & MaxBytes + & Expires + & IdleHeartbeat + & ThresholdMessages + & OverflowMinPendingAndMinAck + & ThresholdBytes; + +export function isOverflowOptions( + opts: unknown, +): opts is OverflowMinPendingAndMinAck { + const oo = opts as OverflowMinPendingAndMinAck; + return oo && typeof oo.group === "string" || + typeof oo.min_pending === "number" || + typeof oo.min_ack_pending === "number"; +} + export class PullConsumerMessagesImpl extends QueuedIteratorImpl implements ConsumerMessages { consumer: PullConsumerImpl; - opts: Record; + opts: InternalPullOptions; sub!: Subscription; monitor: IdleHeartbeatMonitor | null; pending: { msgs: number; bytes: number; requests: number }; @@ -117,6 +143,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.inbox = `${this.inboxPrefix}.${this.consumer.serial}`; if (this.consumer.ordered) { + if (isOverflowOptions(opts)) { + throw errors.InvalidArgumentError.format([ + "group", + "min_pending", + "min_ack_pending", + ], "cannot be specified for ordered consumers"); + } if (this.consumer.orderedConsumerState === undefined) { // initialize the state for the order consumer const ocs = {} as OrderedConsumerState; @@ -564,9 +597,21 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl pullOptions(): Partial { const batch = this.opts.max_messages - this.pending.msgs; const max_bytes = this.opts.max_bytes - this.pending.bytes; - const idle_heartbeat = nanos(this.opts.idle_heartbeat); - const expires = nanos(this.opts.expires); - return { batch, max_bytes, idle_heartbeat, expires }; + const idle_heartbeat = nanos(this.opts.idle_heartbeat!); + const expires = nanos(this.opts.expires!); + + const opts = { batch, max_bytes, idle_heartbeat, expires } as PullOptions; + + if (isOverflowOptions(this.opts)) { + opts.group = this.opts.group; + if (this.opts.min_pending) { + opts.min_pending = this.opts.min_pending; + } + if (this.opts.min_ack_pending) { + opts.min_ack_pending = this.opts.min_ack_pending; + } + } + return opts; } trackTimeout(t: Timeout) { @@ -608,14 +653,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl parseOptions( opts: PullConsumerOptions, refilling = false, - ): Record { - const args = (opts || {}) as Record; + ): InternalPullOptions { + const args = (opts || {}) as InternalPullOptions; args.max_messages = args.max_messages || 0; args.max_bytes = args.max_bytes || 0; if (args.max_messages !== 0 && args.max_bytes !== 0) { - throw new Error( - `only specify one of max_messages or max_bytes`, + throw errors.InvalidArgumentError.format( + ["max_messages", "max_bytes"], + "are mutually exclusive", ); } @@ -654,6 +700,19 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl args.threshold_bytes = args.threshold_bytes || minBytes; } + if (isOverflowOptions(opts)) { + validateOverflowPullOptions(opts); + if (opts.group) { + args.group = opts.group; + } + if (opts.min_ack_pending) { + args.min_ack_pending = opts.min_ack_pending; + } + if (opts.min_pending) { + args.min_pending = opts.min_pending; + } + } + return args; } @@ -784,7 +843,7 @@ export class PullConsumerImpl implements Consumer { this.messages = m; } // FIXME: need some way to pad this correctly - const to = Math.round(m.opts.expires * 1.05); + const to = Math.round(m.opts.expires! * 1.05); const timer = timeout(to); m.closed().catch((err) => { console.log(err); @@ -857,3 +916,37 @@ export class PullConsumerImpl implements Consumer { return this._info; } } + +export function validateOverflowPullOptions(opts: unknown) { + if (isOverflowOptions(opts)) { + minValidation("group", opts.group); + if (opts.group.length > 16) { + throw errors.InvalidArgumentError.format( + "group", + "must be 16 characters or less", + ); + } + + const { min_pending, min_ack_pending } = opts; + if (!min_pending && !min_ack_pending) { + throw errors.InvalidArgumentError.format( + ["min_pending", "min_ack_pending"], + "at least one must be specified", + ); + } + + if (min_pending && typeof min_pending !== "number") { + throw errors.InvalidArgumentError.format( + ["min_pending"], + "must be a number", + ); + } + + if (min_ack_pending && typeof min_ack_pending !== "number") { + throw errors.InvalidArgumentError.format( + ["min_ack_pending"], + "must be a number", + ); + } + } +} diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 0f8097be..85ceb80f 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -895,7 +895,12 @@ export interface JetStreamApiStats { export interface AccountInfoResponse extends ApiResponse, JetStreamAccountStats {} -export interface ConsumerConfig extends ConsumerUpdateConfig { +export type PriorityGroups = { + priority_groups?: string[]; + priority_policy?: PriorityPolicy; +}; + +export type ConsumerConfig = ConsumerUpdateConfig & { /** * The type of acknowledgment required by the Consumer */ @@ -952,9 +957,9 @@ export interface ConsumerConfig extends ConsumerUpdateConfig { * Specified as an ISO date time string (Date#toISOString()). */ "pause_until"?: string; -} +}; -export interface ConsumerUpdateConfig { +export type ConsumerUpdateConfig = PriorityGroups & { /** * A short description of the purpose of this consume */ @@ -1037,6 +1042,11 @@ export interface ConsumerUpdateConfig { * 2.10.x and better. */ metadata?: Record; +}; + +export enum PriorityPolicy { + None = "none", + Overflow = "overflow", } export function defaultConsumer( @@ -1052,12 +1062,54 @@ export function defaultConsumer( }, opts); } +export type OverflowMinPending = { + /** + * The name of the priority_group + */ + group: string; + /** + * Only deliver messages when num_pending for the consumer is greater than this value + */ + min_pending: number; +}; + +export type OverflowMinAckPending = { + /** + * The name of the priority_group + */ + group: string; + /** + * Only deliver messages when num_ack_pending for the consumer is greater than this value + */ + min_ack_pending: number; +}; + +export type OverflowMinPendingAndMinAck = { + /** + * The name of the priority_group + */ + group: string; + /** + * Only deliver messages when num_pending for the consumer is greater than this value + */ + min_pending: number; + /** + * Only deliver messages when num_ack_pending for the consumer is greater than this value + */ + min_ack_pending: number; +}; + +export type OverflowOptions = + | OverflowMinPending + | OverflowMinAckPending + | OverflowMinPendingAndMinAck; + /** * Options for a JetStream pull subscription which define how long * the pull request will remain open and limits the amount of data * that the server could return. */ -export interface PullOptions { +export type PullOptions = Partial & { /** * Max number of messages to retrieve in a pull. */ @@ -1076,8 +1128,12 @@ export interface PullOptions { * number of messages in the batch to fit within this setting. */ "max_bytes": number; + + /** + * Number of nanos between messages for the server to emit an idle_heartbeat + */ "idle_heartbeat": number; -} +}; export interface DeliveryInfo { /** diff --git a/jetstream/src/jsclient.ts b/jetstream/src/jsclient.ts index af1a2345..4f6c0ab4 100644 --- a/jetstream/src/jsclient.ts +++ b/jetstream/src/jsclient.ts @@ -15,7 +15,12 @@ import { BaseApiClientImpl } from "./jsbaseclient_api.ts"; import { ConsumerAPIImpl } from "./jsmconsumer_api.ts"; -import { delay, Empty, QueuedIteratorImpl } from "@nats-io/nats-core/internal"; +import { + backoff, + delay, + Empty, + QueuedIteratorImpl, +} from "@nats-io/nats-core/internal"; import { ConsumersImpl, StreamAPIImpl, StreamsImpl } from "./jsmstream_api.ts"; @@ -34,7 +39,7 @@ import type { StreamAPI, Streams, } from "./types.ts"; -import { errors, headers } from "@nats-io/nats-core/internal"; +import { errors, headers, RequestError } from "@nats-io/nats-core/internal"; import type { Msg, @@ -49,7 +54,7 @@ import type { JetStreamAccountStats, } from "./jsapi_types.ts"; import { DirectStreamAPIImpl } from "./jsm.ts"; -import { JetStreamError } from "./jserrors.ts"; +import { JetStreamError, JetStreamNotEnabled } from "./jserrors.ts"; export function toJetStreamClient( nc: NatsConnection | JetStreamClient, @@ -205,29 +210,35 @@ export class JetStreamClientImpl extends BaseApiClientImpl ro.headers = mh; } - let { retries, retry_delay } = opts as { + let { retries } = opts as { retries: number; - retry_delay: number; }; retries = retries || 1; - retry_delay = retry_delay || 250; + const bo = backoff(); - let r: Msg; + let r: Msg | null = null; for (let i = 0; i < retries; i++) { try { r = await this.nc.request(subj, data, ro); // if here we succeeded break; } catch (err) { + const re = err instanceof RequestError ? err as RequestError : null; if ( - err instanceof errors.RequestError && err.isNoResponders() + err instanceof errors.TimeoutError || + re?.isNoResponders() && i + 1 < retries ) { - await delay(retry_delay); + await delay(bo.backoff(i)); } else { - throw err; + throw re?.isNoResponders() + ? new JetStreamNotEnabled(`jetstream is not enabled`, { + cause: err, + }) + : err; } } } + const pa = this.parseJsResponse(r!) as PubAck; if (pa.stream === "") { throw new JetStreamError("invalid ack response"); diff --git a/jetstream/src/jsmconsumer_api.ts b/jetstream/src/jsmconsumer_api.ts index e507ca24..3f9e5b05 100644 --- a/jetstream/src/jsmconsumer_api.ts +++ b/jetstream/src/jsmconsumer_api.ts @@ -24,17 +24,21 @@ import type { NatsConnection, NatsConnectionImpl, } from "@nats-io/nats-core/internal"; -import { Feature, InvalidArgumentError } from "@nats-io/nats-core/internal"; -import { ConsumerApiAction } from "./jsapi_types.ts"; - +import { + errors, + Feature, + InvalidArgumentError, +} from "@nats-io/nats-core/internal"; import type { ConsumerConfig, ConsumerInfo, ConsumerListResponse, ConsumerUpdateConfig, CreateConsumerRequest, + PriorityGroups, SuccessResponse, } from "./jsapi_types.ts"; +import { ConsumerApiAction, PriorityPolicy } from "./jsapi_types.ts"; import type { ConsumerAPI, @@ -68,6 +72,16 @@ export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI { ); } + if (isPriorityGroup(cfg)) { + if (cfg.deliver_subject) { + throw InvalidArgumentError.format( + "deliver_subject", + "cannot be set when using priority groups", + ); + } + validatePriorityGroups(cfg); + } + const cr = {} as CreateConsumerRequest; cr.config = cfg; cr.stream_name = stream; @@ -214,3 +228,44 @@ export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI { >; } } + +function isPriorityGroup(config: unknown): config is PriorityGroups { + const pg = config as PriorityGroups; + return pg && pg.priority_groups !== undefined || + pg.priority_policy !== undefined; +} + +function validatePriorityGroups(pg: unknown): void { + if (isPriorityGroup(pg)) { + if (!Array.isArray(pg.priority_groups)) { + throw InvalidArgumentError.format( + ["priority_groups"], + "must be an array", + ); + } + if (pg.priority_groups.length === 0) { + throw InvalidArgumentError.format( + ["priority_groups"], + "must have at least one group", + ); + } + pg.priority_groups.forEach((g) => { + minValidation("priority_group", g); + if (g.length > 16) { + throw errors.InvalidArgumentError.format( + "group", + "must be 16 characters or less", + ); + } + }); + if ( + pg.priority_policy !== PriorityPolicy.None && + pg.priority_policy !== PriorityPolicy.Overflow + ) { + throw InvalidArgumentError.format( + ["priority_policy"], + "must be 'none' or 'overflow'", + ); + } + } +} diff --git a/jetstream/src/types.ts b/jetstream/src/types.ts index 957cf574..37022700 100644 --- a/jetstream/src/types.ts +++ b/jetstream/src/types.ts @@ -31,6 +31,7 @@ import type { DirectMsgRequest, JetStreamAccountStats, MsgRequest, + OverflowOptions, PurgeOpts, PurgeResponse, StreamAlternate, @@ -350,7 +351,8 @@ export type ConsumeBytes = & IdleHeartbeat & ConsumeCallback & AbortOnMissingResource - & Bind; + & Bind + & Partial; export type ConsumeMessages = & Partial & Partial @@ -358,7 +360,8 @@ export type ConsumeMessages = & IdleHeartbeat & ConsumeCallback & AbortOnMissingResource - & Bind; + & Bind + & Partial; export type ConsumeOptions = | ConsumeBytes | ConsumeMessages; @@ -370,7 +373,8 @@ export type FetchBytes = & Partial & Expires & IdleHeartbeat - & Bind; + & Bind + & Partial; /** * Options for fetching messages */ @@ -378,7 +382,9 @@ export type FetchMessages = & Partial & Expires & IdleHeartbeat - & Bind; + & Bind + & Partial; + export type FetchOptions = FetchBytes | FetchMessages; export type PullConsumerOptions = FetchOptions | ConsumeOptions; export type MaxMessages = { diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index f1029bda..d3a4401c 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -250,7 +250,7 @@ Deno.test("consumers - bad options", async () => { await c.consume({ max_messages: 100, max_bytes: 100 }); }, Error, - "only specify one of max_messages or max_bytes", + "'max_messages','max_bytes' are mutually exclusive", ); await assertRejects( diff --git a/jetstream/tests/jetstream_pullconsumer_test.ts b/jetstream/tests/jetstream_pullconsumer_test.ts index af28e70c..0c310752 100644 --- a/jetstream/tests/jetstream_pullconsumer_test.ts +++ b/jetstream/tests/jetstream_pullconsumer_test.ts @@ -21,11 +21,20 @@ import { setup, } from "test_helpers"; import { initStream } from "./jstest_util.ts"; -import type { ConsumerConfig } from "../src/jsapi_types.ts"; -import { AckPolicy, DeliverPolicy } from "../src/jsapi_types.ts"; +import { + AckPolicy, + type ConsumerConfig, + DeliverPolicy, + type OverflowMinPendingAndMinAck, + PriorityPolicy, +} from "../src/jsapi_types.ts"; import { assertEquals, assertExists } from "jsr:@std/assert"; -import { Empty, nanos, nuid } from "@nats-io/nats-core"; -import { jetstream, jetstreamManager } from "../src/mod.ts"; +import { deferred, Empty, type Msg, nanos, nuid } from "@nats-io/nats-core"; +import { + type ConsumeOptions, + jetstream, + jetstreamManager, +} from "../src/mod.ts"; Deno.test("jetstream - pull consumer options", async () => { const { ns, nc } = await setup(jetstreamServerConf({})); @@ -120,3 +129,168 @@ Deno.test("jetstream - last of", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - priority group", async (t) => { + const { ns, nc } = await setup(jetstreamServerConf({})); + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ + name: "A", + subjects: [`a`], + }); + + const js = jetstream(nc); + + const buf = []; + for (let i = 0; i < 100; i++) { + buf.push(js.publish("a", Empty)); + } + + await Promise.all(buf); + + const opts = { + durable_name: "a", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + priority_groups: ["overflow"], + priority_policy: PriorityPolicy.Overflow, + }; + + await jsm.consumers.add("A", opts); + + function spyPull(): Promise { + const d = deferred(); + nc.subscribe(`$JS.API.CONSUMER.MSG.NEXT.A.a`, { + callback: (err, msg) => { + if (err) { + d.reject(err); + } + d.resolve(msg); + }, + }); + + return d; + } + + await t.step("consume", async () => { + async function check(opts: ConsumeOptions): Promise { + const c = await js.consumers.get("A", "a"); + + const d = spyPull(); + const c1 = await c.consume(opts); + const done = (async () => { + for await (const m of c1) { + m.ack(); + } + })(); + + const m = await d; + c1.stop(); + await done; + + const po = m.json(); + const oopts = opts as OverflowMinPendingAndMinAck; + assertEquals(po.group, opts.group); + assertEquals(po.min_ack_pending, oopts.min_ack_pending); + assertEquals(po.min_pending, oopts.min_pending); + } + + await check({ + max_messages: 2, + group: "overflow", + min_ack_pending: 2, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + min_ack_pending: 100, + }); + }); + + await t.step("fetch", async () => { + async function check(opts: ConsumeOptions): Promise { + const c = await js.consumers.get("A", "a"); + + const d = spyPull(); + const iter = await c.fetch(opts); + for await (const m of iter) { + m.ack(); + } + + const m = await d; + const po = m.json(); + const oopts = opts as OverflowMinPendingAndMinAck; + assertEquals(po.group, opts.group); + assertEquals(po.min_ack_pending, oopts.min_ack_pending); + assertEquals(po.min_pending, oopts.min_pending); + } + + await check({ + max_messages: 2, + group: "overflow", + min_ack_pending: 2, + expires: 1000, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + expires: 1000, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + min_ack_pending: 100, + expires: 1000, + }); + }); + + await t.step("next", async () => { + async function check(opts: ConsumeOptions): Promise { + const c = await js.consumers.get("A", "a"); + const d = spyPull(); + await c.next(opts); + + const m = await d; + const po = m.json(); + const oopts = opts as OverflowMinPendingAndMinAck; + assertEquals(po.group, opts.group); + assertEquals(po.min_ack_pending, oopts.min_ack_pending); + assertEquals(po.min_pending, oopts.min_pending); + } + + await check({ + max_messages: 2, + group: "overflow", + min_ack_pending: 2, + expires: 1000, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + expires: 1000, + }); + + await check({ + max_messages: 2, + group: "overflow", + min_pending: 10, + min_ack_pending: 100, + expires: 1000, + }); + }); + + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/jetstream_pushconsumer_test.ts b/jetstream/tests/jetstream_pushconsumer_test.ts index fcf69781..cdfe5baf 100644 --- a/jetstream/tests/jetstream_pushconsumer_test.ts +++ b/jetstream/tests/jetstream_pushconsumer_test.ts @@ -35,15 +35,20 @@ import { nuid, syncIterator, } from "@nats-io/nats-core"; -import { ConsumerDebugEvents, ConsumerEvents } from "../src/types.ts"; import type { BoundPushConsumerOptions, PubAck } from "../src/types.ts"; +import { ConsumerDebugEvents, ConsumerEvents } from "../src/types.ts"; import { assert, assertEquals, assertExists, assertRejects, } from "jsr:@std/assert"; -import { AckPolicy, DeliverPolicy, StorageType } from "../src/jsapi_types.ts"; +import { + AckPolicy, + DeliverPolicy, + PriorityPolicy, + StorageType, +} from "../src/jsapi_types.ts"; import type { JsMsg } from "../src/jsmsg.ts"; import { jetstream, jetstreamManager } from "../src/mod.ts"; import type { @@ -989,3 +994,23 @@ Deno.test("jetstream - ordered push consumer honors inbox prefix", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - push consumer doesn't support priority groups", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "A", subjects: ["test"] }); + + await assertRejects( + () => { + return jsm.consumers.add("A", { + ack_policy: AckPolicy.None, + deliver_subject: "foo", + priority_groups: ["hello"], + priority_policy: PriorityPolicy.Overflow, + }); + }, + Error, + "'deliver_subject' cannot be set when using priority groups", + ); + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 90f0dc00..aa3ecf59 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -32,12 +32,15 @@ import { Empty, headers, nanos, + NoRespondersError, nuid, + RequestError, } from "@nats-io/nats-core"; import { assert, assertEquals, assertExists, + assertInstanceOf, assertRejects, assertThrows, } from "jsr:@std/assert"; @@ -52,7 +55,7 @@ import { setup, } from "test_helpers"; import { PubHeaders } from "../src/jsapi_types.ts"; -import { JetStreamApiError } from "../src/jserrors.ts"; +import { JetStreamApiError, JetStreamNotEnabled } from "../src/jserrors.ts"; Deno.test("jetstream - default options", () => { const opts = defaultJsOptions(); @@ -1076,3 +1079,39 @@ Deno.test("jetstream - term reason", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - publish no responder", async (t) => { + await t.step("not a jetstream server", async () => { + const { ns, nc } = await setup(); + const js = jetstream(nc); + const err = await assertRejects( + () => { + return js.publish("hello"); + }, + JetStreamNotEnabled, + ); + + assertInstanceOf(err.cause, RequestError); + assertInstanceOf(err.cause?.cause, NoRespondersError); + + await cleanup(ns, nc); + }); + + await t.step("jetstream not listening for subject", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "s", subjects: ["a", "b"] }); + const js = jetstream(nc); + const err = await assertRejects( + () => { + return js.publish("c"); + }, + JetStreamNotEnabled, + ); + + assertInstanceOf(err.cause, RequestError); + assertInstanceOf(err.cause?.cause, NoRespondersError); + + await cleanup(ns, nc); + }); +}); diff --git a/jetstream/tests/jsm_test.ts b/jetstream/tests/jsm_test.ts index 923178d6..1cd69c24 100644 --- a/jetstream/tests/jsm_test.ts +++ b/jetstream/tests/jsm_test.ts @@ -72,7 +72,11 @@ import { } from "jsr:@nats-io/jwt@0.0.9-3"; import { convertStreamSourceDomain } from "../src/jsmstream_api.ts"; import type { ConsumerAPIImpl } from "../src/jsmconsumer_api.ts"; -import { ConsumerApiAction, StoreCompression } from "../src/jsapi_types.ts"; +import { + ConsumerApiAction, + PriorityPolicy, + StoreCompression, +} from "../src/jsapi_types.ts"; import type { JetStreamManagerImpl } from "../src/jsclient.ts"; import { stripNatsMetadata } from "./util.ts"; import { jserrors } from "../src/jserrors.ts"; @@ -2754,3 +2758,85 @@ Deno.test("jsm - storage", async () => { await cleanup(ns, nc); }); + +Deno.test("jsm - pull consumer priority groups", async (t) => { + const { ns, nc } = await setup(jetstreamServerConf({})); + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ + name: "A", + subjects: [`a`], + }); + + await t.step("priority group is not an array", async () => { + await assertRejects( + () => { + return jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + //@ts-ignore: testing + priority_groups: "hello", + }); + }, + Error, + "'priority_groups' must be an array", + ); + }); + + await t.step("priority_group empty array", async () => { + await assertRejects( + () => { + return jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + //@ts-ignore: testing + priority_groups: [], + }); + }, + Error, + "'priority_groups' must have at least one group", + ); + }); + + await t.step("missing priority_policy ", async () => { + await assertRejects( + () => { + return jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + priority_groups: ["hello"], + }); + }, + Error, + "'priority_policy' must be 'none' or 'overflow'", + ); + }); + + await t.step("bad priority_policy ", async () => { + await assertRejects( + () => { + return jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + priority_groups: ["hello"], + //@ts-ignore: test + priority_policy: "hello", + }); + }, + Error, + "'priority_policy' must be 'none' or 'overflow'", + ); + }); + + await t.step("check config", async () => { + const ci = await jsm.consumers.add("A", { + name: "a", + ack_policy: AckPolicy.None, + priority_groups: ["hello"], + priority_policy: PriorityPolicy.Overflow, + }); + assertEquals(ci.config.priority_policy, PriorityPolicy.Overflow); + assertEquals(ci.config.priority_groups, ["hello"]); + }); + + await cleanup(ns, nc); +});