From 0619fdcecd0686ead113573afc398867b8203639 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 20 Nov 2024 16:10:41 -0600 Subject: [PATCH] change(jetstream): `ConsumerStatus` and `ConsumerDebugStatus` are now replaced by `ConsumerNotificationType` which is an alias for specific types for the the different notifications. All discriminated by their `type` field. This simplifies down-stream consumption and provides a richer API for accessing the data in the notification. Signed-off-by: Alberto Ricart --- jetstream/deno.json | 2 +- jetstream/package.json | 2 +- jetstream/src/consumer.ts | 64 ++-- jetstream/src/internal_mod.ts | 4 +- jetstream/src/jserrors.ts | 9 +- jetstream/src/mod.ts | 4 +- jetstream/src/pushconsumer.ts | 49 +-- jetstream/src/types.ts | 312 ++++++++++++------ jetstream/tests/consume_test.ts | 19 +- jetstream/tests/consumers_ordered_test.ts | 13 +- jetstream/tests/consumers_test.ts | 36 +- jetstream/tests/jetstream409_test.ts | 33 +- .../tests/jetstream_pushconsumer_test.ts | 20 +- kv/deno.json | 4 +- kv/import_map.json | 4 +- kv/package.json | 4 +- migration.md | 3 + obj/deno.json | 4 +- obj/import_map.json | 4 +- obj/package.json | 4 +- transport-node/package.json | 6 +- 21 files changed, 358 insertions(+), 242 deletions(-) diff --git a/jetstream/deno.json b/jetstream/deno.json index 99d26b90..d10f4859 100644 --- a/jetstream/deno.json +++ b/jetstream/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-29", + "version": "3.0.0-31", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" diff --git a/jetstream/package.json b/jetstream/package.json index 14802392..f7023c6a 100644 --- a/jetstream/package.json +++ b/jetstream/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-29", + "version": "3.0.0-31", "files": [ "lib/", "LICENSE", diff --git a/jetstream/src/consumer.ts b/jetstream/src/consumer.ts index ede37292..d7d974db 100644 --- a/jetstream/src/consumer.ts +++ b/jetstream/src/consumer.ts @@ -45,7 +45,6 @@ import type { PullOptions, } from "./jsapi_types.ts"; import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts"; -import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts"; import type { ConsumeMessages, @@ -54,7 +53,7 @@ import type { ConsumerAPI, ConsumerCallbackFn, ConsumerMessages, - ConsumerStatus, + ConsumerNotification, Expires, FetchMessages, FetchOptions, @@ -119,7 +118,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl pending: { msgs: number; bytes: number; requests: number }; isConsume: boolean; callback: ConsumerCallbackFn | null; - listeners: QueuedIterator[]; + listeners: QueuedIterator[]; statusIterator?: QueuedIteratorImpl; abortOnMissingResource?: boolean; bind: boolean; @@ -207,8 +206,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } const status = new JetStreamStatus(msg); - if (status.isIdleHeartbeat()) { - this.notify(ConsumerDebugEvents.Heartbeat, status.parseHeartbeat()); + const hb = status.parseHeartbeat(); + if (hb) { + this.notify(hb); return; } const code = status.code; @@ -219,7 +219,11 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl this.pending.msgs -= msgsLeft; this.pending.bytes -= bytesLeft; this.pending.requests--; - this.notify(ConsumerDebugEvents.Discard, { msgsLeft, bytesLeft }); + this.notify({ + type: "discard", + messagesLeft: msgsLeft, + bytesLeft: bytesLeft, + }); } else { // Examine the error codes // FIXME: 408 can be a Timeout or bad request, @@ -247,8 +251,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } default: this.notify( - ConsumerDebugEvents.DebugEvent, - { code, description }, + { type: "debug", code, description }, ); } } @@ -319,11 +322,11 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl if (idle_heartbeat) { this.monitor = new IdleHeartbeatMonitor( idle_heartbeat, - (data): boolean => { + (count): boolean => { // for the pull consumer - missing heartbeats may be corrected // on the next pull etc - the only assumption here is we should // reset and check if the consumer was deleted from under us - this.notify(ConsumerEvents.HeartbeatsMissed, data); + this.notify({ type: "heartbeats_missed", count }); if (!this.isConsume && !this.consumer.ordered) { // if we are not a consume, give up - this was masked by an // external timer on fetch - the hb is a more reliable timeout @@ -390,9 +393,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl handle409(status: JetStreamStatus): Error | null { const { code, description } = status; if (status.isConsumerDeleted()) { - this.notify(ConsumerEvents.ConsumerDeleted, { code, description }); + this.notify({ type: "consumer_deleted", code, description }); } else if (status.isExceededLimit()) { - this.notify(ConsumerEvents.ExceededLimit, { code, description }); + this.notify({ type: "exceeded_limits", code, description }); } if (!this.isConsume) { return status.toError(); @@ -410,7 +413,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl const ocs = this.consumer.orderedConsumerState!; const { name } = this.consumer._info?.config; if (name) { - this.notify(ConsumerDebugEvents.Reset, name); + this.notify({ type: "reset", name }); this.consumer.api.delete(this.consumer.stream, name) .catch(() => { // ignored @@ -434,16 +437,17 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl ).then((ci) => { ocs.createFails = 0; this.consumer._info = ci; - this.notify(ConsumerEvents.OrderedConsumerRecreated, ci.name); + this.notify({ type: "ordered_consumer_recreated", name: ci.name }); this.monitor?.restart(); this.pull(this.pullOptions()); }).catch((err) => { ocs.createFails++; if (err.message === "stream not found") { - this.notify( - ConsumerEvents.StreamNotFound, - ocs.createFails, - ); + this.notify({ + type: "stream_not_found", + consumerCreateFails: ocs.createFails, + name: this.consumer.stream, + }); if (this.abortOnMissingResource) { this.stop(err); return; @@ -494,13 +498,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } } - notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) { + notify(n: ConsumerNotification) { if (this.listeners.length > 0) { (() => { this.listeners.forEach((l) => { - const qi = l as QueuedIteratorImpl; + const qi = l as QueuedIteratorImpl; if (!qi.done) { - qi.push({ type, data }); + qi.push(n); } }); })(); @@ -549,14 +553,18 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl // game over if ((err as Error).message === "stream not found") { streamNotFound++; - this.notify(ConsumerEvents.StreamNotFound, streamNotFound); + this.notify({ type: "stream_not_found", name: this.consumer.stream }); if (!this.isConsume || this.abortOnMissingResource) { this.stop(err as Error); return false; } } else if ((err as Error).message === "consumer not found") { notFound++; - this.notify(ConsumerEvents.ConsumerNotFound, notFound); + this.notify({ + type: "consumer_not_found", + name: this.consumer.name, + count: notFound, + }); if (!this.isConsume || this.abortOnMissingResource) { if (this.consumer.ordered) { const ocs = this.consumer.orderedConsumerState!; @@ -598,7 +606,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl JSON.stringify(opts), { reply: this.inbox }, ); - this.notify(ConsumerDebugEvents.Next, opts); + this.notify({ type: "next", options: opts as PullOptions }); }); } @@ -646,8 +654,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl //@ts-ignore: fn this._push(() => { super.stop(err); - this.listeners.forEach((n) => { - n.stop(); + this.listeners.forEach((iter) => { + iter.stop(); }); }); } @@ -728,8 +736,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl return args; } - status(): AsyncIterable { - const iter = new QueuedIteratorImpl(); + status(): AsyncIterable { + const iter = new QueuedIteratorImpl(); this.listeners.push(iter); return iter; } diff --git a/jetstream/src/internal_mod.ts b/jetstream/src/internal_mod.ts index 2240e378..baeeab73 100644 --- a/jetstream/src/internal_mod.ts +++ b/jetstream/src/internal_mod.ts @@ -15,8 +15,6 @@ export { AdvisoryKind, - ConsumerDebugEvents, - ConsumerEvents, DirectMsgHeaders, isBoundPushConsumerOptions, isOrderedPushConsumerOptions, @@ -48,8 +46,8 @@ export type { ConsumerCallbackFn, ConsumerKind, ConsumerMessages, + ConsumerNotification, Consumers, - ConsumerStatus, DeleteableConsumer, Destroyable, DirectStreamAPI, diff --git a/jetstream/src/jserrors.ts b/jetstream/src/jserrors.ts index 343376a2..c4f4ba69 100644 --- a/jetstream/src/jserrors.ts +++ b/jetstream/src/jserrors.ts @@ -14,7 +14,7 @@ */ import type { Msg } from "@nats-io/nats-core"; -import { JsHeaders } from "./types.ts"; +import { type Heartbeat, JsHeaders } from "./types.ts"; import type { ApiError } from "./jsapi_types.ts"; export class JetStreamNotEnabled extends Error { @@ -89,14 +89,15 @@ export class JetStreamStatus { } parseHeartbeat(): - | { natsLastConsumer: number; natsLastStream: number } + | Heartbeat | null { if (this.isIdleHeartbeat()) { return { - natsLastConsumer: parseInt( + type: "heartbeat", + lastConsumerSequence: parseInt( this.msg.headers?.get("Nats-Last-Consumer") || "0", ), - natsLastStream: parseInt( + lastStreamSequence: parseInt( this.msg.headers?.get("Nats-Last-Stream") || "0", ), }; diff --git a/jetstream/src/mod.ts b/jetstream/src/mod.ts index ce45e594..685d5958 100644 --- a/jetstream/src/mod.ts +++ b/jetstream/src/mod.ts @@ -17,8 +17,6 @@ export { jetstream, jetstreamManager } from "./internal_mod.ts"; export { AckPolicy, AdvisoryKind, - ConsumerDebugEvents, - ConsumerEvents, DeliverPolicy, DirectMsgHeaders, DiscardPolicy, @@ -56,8 +54,8 @@ export type { ConsumerInfo, ConsumerKind, ConsumerMessages, + ConsumerNotification, Consumers, - ConsumerStatus, ConsumerUpdateConfig, DeleteableConsumer, DeliveryInfo, diff --git a/jetstream/src/pushconsumer.ts b/jetstream/src/pushconsumer.ts index 61c9d480..71cdb179 100644 --- a/jetstream/src/pushconsumer.ts +++ b/jetstream/src/pushconsumer.ts @@ -17,13 +17,12 @@ import { toJsMsg } from "./jsmsg.ts"; import type { JsMsg } from "./jsmsg.ts"; import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts"; import type { ConsumerConfig, ConsumerInfo } from "./jsapi_types.ts"; -import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts"; +import type { ConsumerNotification } from "./types.ts"; import type { ConsumerAPI, ConsumerCallbackFn, ConsumerMessages, - ConsumerStatus, PushConsumer, PushConsumerOptions, } from "./types.ts"; @@ -54,7 +53,7 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl consumer: PushConsumerImpl; sub!: Subscription; monitor: IdleHeartbeatMonitor | null; - listeners: QueuedIterator[]; + listeners: QueuedIterator[]; abortOnMissingResource: boolean; callback: ConsumerCallbackFn | null; ordered: boolean; @@ -123,11 +122,15 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl ).then((ci) => { this.createFails = 0; this.consumer._info = ci; - this.notify(ConsumerEvents.OrderedConsumerRecreated, ci.name); + this.notify({ type: "ordered_consumer_recreated", name: ci.name }); }).catch((err) => { this.createFails++; if (err.message === "stream not found") { - this.notify(ConsumerEvents.StreamNotFound, this.createFails); + this.notify({ + type: "stream_not_found", + name: this.consumer.stream, + consumerCreateFails: this.createFails, + }); if (this.abortOnMissingResource) { this.stop(err); return; @@ -226,8 +229,8 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl } } } - status(): AsyncIterable { - const iter = new QueuedIteratorImpl(); + status(): AsyncIterable { + const iter = new QueuedIteratorImpl(); this.listeners.push(iter); return iter; } @@ -247,8 +250,8 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl const ms = millis(hbNanos); this.monitor = new IdleHeartbeatMonitor( ms, - (data): boolean => { - this.notify(ConsumerEvents.HeartbeatsMissed, data); + (count): boolean => { + this.notify({ type: "heartbeats_missed", count }); return false; }, { maxOut: 2 }, @@ -296,17 +299,22 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl if (status.isFlowControlRequest()) { this._push(() => { msg.respond(); - this.notify(ConsumerDebugEvents.FlowControl, null); + this.notify({ type: "flow_control" }); }); return; } if (status.isIdleHeartbeat()) { - const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer"); - const natsLastStream = msg.headers?.get("Nats-Last-Stream"); - this.notify(ConsumerDebugEvents.Heartbeat, { - natsLastConsumer, - natsLastStream, + const lastConsumerSequence = parseInt( + msg.headers?.get("Nats-Last-Consumer") || "0", + ); + const lastStreamSequence = parseInt( + msg.headers?.get("Nats-Last-Stream") ?? "0", + ); + this.notify({ + type: "heartbeat", + lastStreamSequence, + lastConsumerSequence, }); return; } @@ -315,10 +323,7 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl const description = status.description; if (status.isConsumerDeleted()) { - this.notify( - ConsumerEvents.ConsumerDeleted, - `${code} ${description}`, - ); + this.notify({ type: "consumer_deleted", code, description }); } if (this.abortOnMissingResource) { this._push(() => { @@ -354,13 +359,13 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl }); } - notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) { + notify(n: ConsumerNotification) { if (this.listeners.length > 0) { (() => { this.listeners.forEach((l) => { - const qi = l as QueuedIteratorImpl; + const qi = l as QueuedIteratorImpl; if (!qi.done) { - qi.push({ type, data }); + qi.push(n); } }); })(); diff --git a/jetstream/src/types.ts b/jetstream/src/types.ts index b9d115de..36c2da83 100644 --- a/jetstream/src/types.ts +++ b/jetstream/src/types.ts @@ -24,6 +24,7 @@ import type { import type { DeliverPolicy, DirectLastFor, + PullOptions, ReplayPolicy, } from "./jsapi_types.ts"; @@ -471,104 +472,227 @@ export type ConsumeCallback = { }; /** - * ConsumerEvents are informational notifications emitted by ConsumerMessages + * ConsumerNotifications are informational notifications emitted by ConsumerMessages * that may be of interest to a client. */ -export enum ConsumerEvents { - /** - * Notification that heartbeats were missed. This notification is informational. - * The `data` portion of the status, is a number indicating the number of missed heartbeats. - * Note that when a client disconnects, heartbeat tracking is paused while - * the client is disconnected. - */ - HeartbeatsMissed = "heartbeats_missed", - /** - * Notification that the consumer was not found. Consumers that were accessible at - * least once, will be retried for more messages regardless of the not being found - * or timeouts etc. This notification includes a count of consecutive attempts to - * find the consumer. Note that if you get this notification possibly your code should - * attempt to recreate the consumer. Note that this notification is only informational - * for ordered consumers, as the consumer will be created in those cases automatically. - */ - ConsumerNotFound = "consumer_not_found", - - /** - * Notification that the stream was not found. Consumers were accessible at least once, - * will be retried for more messages regardless of the not being found - * or timeouts etc. This notification includes a count of consecutive attempts to - * find the consumer. Note that if you get this notification possibly your code should - * attempt to recreate the consumer. Note that this notification is only informational - * for ordered consumers, as the consumer will be created in those cases automatically. - */ - StreamNotFound = "stream_not_found", - - /** - * Notification that the consumer was deleted. This notification - * means the consumer will not get messages unless it is recreated. The client - * will continue to attempt to pull messages. Ordered consumer will recreate it. - */ - ConsumerDeleted = "consumer_deleted", - - /** - * This notification is specific of ordered consumers and will be notified whenever - * the consumer is recreated. The argument is the name of the newly created consumer. - */ - OrderedConsumerRecreated = "ordered_consumer_recreated", - - /** - * This notification is specific to pull consumers and will be notified whenever - * the pull request exceeds some limit such as maxwaiting, maxrequestbatch, etc. - * The data component has the code (409) and the message from the server. - */ - ExceededLimit = "limit_exceeded", -} +export type ConsumerNotification = + | HeartbeatsMissed + | ConsumerNotFound + | StreamNotFound + | ConsumerDeleted + | OrderedConsumerRecreated + | ExceededLimits + | Debug + | Discard + | Reset + | Next + | Heartbeat + | FlowControl; /** - * These events represent informational notifications emitted by ConsumerMessages - * that can be safely ignored by clients. + * Notification that heartbeats were missed. This notification is informational. + * The `data` portion of the status, is a number indicating the number of missed heartbeats. + * Note that when a client disconnects, heartbeat tracking is paused while + * the client is disconnected. */ -export enum ConsumerDebugEvents { - /** - * DebugEvents are effectively statuses returned by the server that were ignored - * by the client. The `data` portion of the - * status is just a string indicating the code/message of the status. - */ - DebugEvent = "debug", - /** - * Requests for messages can be terminated by the server, these notifications - * provide information on the number of messages and/or bytes that couldn't - * be satisfied by the consumer request. The `data` portion of the status will - * have the format of `{msgsLeft: number, bytesLeft: number}`. - */ - Discard = "discard", - /** - * Notifies that the current consumer will be reset - */ - Reset = "reset", - /** - * Notifies whenever there's a request for additional messages from the server. - * This notification telegraphs the request options, which should be treated as - * read-only. This notification is only useful for debugging. Data is PullOptions. - */ - Next = "next", - - /** - * Notifies that the client received a server-side heartbeat. The payload the data - * portion has the format `{natsLastConsumer: number, natsLastStream: number}`; - */ - Heartbeat = "heartbeat", - - /** - * Notifies that the client received a server-side flow control message. - * The data is null. - */ - FlowControl = "flow_control", -} - -export interface ConsumerStatus { - type: ConsumerEvents | ConsumerDebugEvents; - data: unknown; -} +export type HeartbeatsMissed = { + type: "heartbeats_missed"; + count: number; +}; +/** + * Notification that the consumer was not found. Consumers that were accessible at + * least once, will be retried for more messages regardless of the not being found + * or timeouts etc. This notification includes a count of consecutive attempts to + * find the consumer. Note that if you get this notification possibly your code should + * attempt to recreate the consumer. Note that this notification is only informational + * for ordered consumers, as the consumer will be created in those cases automatically. + */ +export type ConsumerNotFound = { + type: "consumer_not_found"; + name: string; + count: number; +}; +/** + * Notification that the stream was not found. Consumers were accessible at least once, + * will be retried for more messages regardless of the not being found + * or timeouts etc. This notification includes a count of consecutive attempts to + * find the consumer. Note that if you get this notification possibly your code should + * attempt to recreate the consumer. Note that this notification is only informational + * for ordered consumers, as the consumer will be created in those cases automatically. + */ +export type StreamNotFound = { + type: "stream_not_found"; + name: string; + consumerCreateFails?: number; +}; +/** + * Notification that the consumer was deleted. This notification + * means the consumer will not get messages unless it is recreated. The client + * will continue to attempt to pull messages. Ordered consumer will recreate it. + */ +export type ConsumerDeleted = { + type: "consumer_deleted"; + code: number; + description: string; +}; +/** + * This notification is specific of ordered consumers and will be notified whenever + * the consumer is recreated. The argument is the name of the newly created consumer. + */ +export type OrderedConsumerRecreated = { + type: "ordered_consumer_recreated"; + name: string; +}; +/** + * This notification is specific to pull consumers and will be notified whenever + * the pull request exceeds some limit such as maxwaiting, maxrequestbatch, etc. + * The data component has the code (409) and the message from the server. + */ +export type ExceededLimits = { + type: "exceeded_limits"; + code: number; + description: string; +}; +/** + * DebugEvents are effectively statuses returned by the server that were ignored + * by the client. The `code` and `description` indicate the server specified code and description. + */ +export type Debug = { + type: "debug"; + code: number; + description: string; +}; +/** + * Requests for messages can be terminated by the server, these notifications + * provide information on the number of messages and/or bytes that couldn't + * be satisfied by the consumer request. + */ +export type Discard = { + type: "discard"; + messagesLeft: number; + bytesLeft: number; +}; +/** + * Notifies that the current consumer will be reset + */ +export type Reset = { + type: "reset"; + name: string; +}; +/** + * Notifies whenever there's a request for additional messages from the server. + * This notification telegraphs the request options, which should be treated as + * read-only. This notification is only useful for debugging. Data is PullOptions. + */ +export type Next = { + type: "next"; + options: PullOptions; +}; +/** + * Notifies that the client received a server-side heartbeat. The payload the data + * portion has the format `{natsLastConsumer: number, natsLastStream: number}`; + */ +export type Heartbeat = { + type: "heartbeat"; + lastConsumerSequence: number; + lastStreamSequence: number; +}; +/** + * Notifies that the client received a server-side flow control message. + * The data is null. + */ +export type FlowControl = { + type: "flow_control"; +}; +// +// /** +// * ConsumerEvents are informational notifications emitted by ConsumerMessages +// * that may be of interest to a client. +// */ +// export enum ConsumerEvents { +// /** +// * Notification that heartbeats were missed. This notification is informational. +// * The `data` portion of the status, is a number indicating the number of missed heartbeats. +// * Note that when a client disconnects, heartbeat tracking is paused while +// * the client is disconnected. +// */ +// HeartbeatsMissed = "heartbeats_missed", +// /** +// * Notification that the consumer was not found. Consumers that were accessible at +// * least once, will be retried for more messages regardless of the not being found +// * or timeouts etc. This notification includes a count of consecutive attempts to +// * find the consumer. Note that if you get this notification possibly your code should +// * attempt to recreate the consumer. Note that this notification is only informational +// * for ordered consumers, as the consumer will be created in those cases automatically. +// */ +// ConsumerNotFound = "consumer_not_found", +// +// /** +// * Notification that the stream was not found. Consumers were accessible at least once, +// * will be retried for more messages regardless of the not being found +// * or timeouts etc. This notification includes a count of consecutive attempts to +// * find the consumer. Note that if you get this notification possibly your code should +// * attempt to recreate the consumer. Note that this notification is only informational +// * for ordered consumers, as the consumer will be created in those cases automatically. +// */ +// StreamNotFound = "stream_not_found", +// +// /** +// * Notification that the consumer was deleted. This notification +// * means the consumer will not get messages unless it is recreated. The client +// * will continue to attempt to pull messages. Ordered consumer will recreate it. +// */ +// ConsumerDeleted = "consumer_deleted", +// +// /** +// * This notification is specific of ordered consumers and will be notified whenever +// * the consumer is recreated. The argument is the name of the newly created consumer. +// */ +// OrderedConsumerRecreated = "ordered_consumer_recreated", +// +// /** +// * This notification is specific to pull consumers and will be notified whenever +// * the pull request exceeds some limit such as maxwaiting, maxrequestbatch, etc. +// * The data component has the code (409) and the message from the server. +// */ +// ExceededLimit = "limit_exceeded", +// } +// +// /** +// * These events represent informational notifications emitted by ConsumerMessages +// * that can be safely ignored by clients. +// */ +// export enum ConsumerDebugEvents { +// /** +// * Requests for messages can be terminated by the server, these notifications +// * provide information on the number of messages and/or bytes that couldn't +// * be satisfied by the consumer request. The `data` portion of the status will +// * have the format of `{msgsLeft: number, bytesLeft: number}`. +// */ +// Discard = "discard", +// /** +// * Notifies that the current consumer will be reset +// */ +// Reset = "reset", +// /** +// * Notifies whenever there's a request for additional messages from the server. +// * This notification telegraphs the request options, which should be treated as +// * read-only. This notification is only useful for debugging. Data is PullOptions. +// */ +// Next = "next", +// +// /** +// * Notifies that the client received a server-side heartbeat. The payload the data +// * portion has the format `{natsLastConsumer: number, natsLastStream: number}`; +// */ +// Heartbeat = "heartbeat", +// +// /** +// * Notifies that the client received a server-side flow control message. +// * The data is null. +// */ +// FlowControl = "flow_control", +// } export interface PushConsumer extends InfoableConsumer, DeleteableConsumer, ConsumerKind { @@ -613,7 +737,7 @@ export interface Close { } export interface ConsumerMessages extends QueuedIterator, Close { - status(): AsyncIterable; + status(): AsyncIterable; } /** diff --git a/jetstream/tests/consume_test.ts b/jetstream/tests/consume_test.ts index 77e28c39..5043879b 100644 --- a/jetstream/tests/consume_test.ts +++ b/jetstream/tests/consume_test.ts @@ -34,13 +34,12 @@ import { import type { PullConsumerMessagesImpl } from "../src/consumer.ts"; import { AckPolicy, - ConsumerEvents, - type ConsumerStatus, DeliverPolicy, jetstream, jetstreamManager, } from "../src/mod.ts"; import type { PushConsumerMessagesImpl } from "../src/pushconsumer.ts"; +import type { ConsumerNotification, HeartbeatsMissed } from "../src/types.ts"; Deno.test("consumers - consume", async () => { const { ns, nc } = await setup(jetstreamServerConf()); @@ -117,7 +116,7 @@ Deno.test("consume - heartbeats", async () => { // make heartbeats trigger (nc as NatsConnectionImpl)._resub(iter.sub, "foo"); - const d = deferred(); + const d = deferred(); await (async () => { const status = iter.status(); for await (const s of status) { @@ -134,8 +133,8 @@ Deno.test("consume - heartbeats", async () => { })(); const cs = await d; - assertEquals(cs.type, ConsumerEvents.HeartbeatsMissed); - assertEquals(cs.data, 2); + assertEquals(cs.type, "heartbeats_missed"); + assertEquals((cs as HeartbeatsMissed).count, 2); await cleanup(ns, nc); }); @@ -161,10 +160,10 @@ Deno.test("consume - deleted consumer", async () => { (async () => { const status = iter.status(); for await (const s of status) { - if (s.type === ConsumerEvents.ConsumerDeleted) { + if (s.type === "consumer_deleted") { deleted.resolve(); } - if (s.type === ConsumerEvents.ConsumerNotFound) { + if (s.type === "consumer_not_found") { notFound++; if (notFound > 1) { done.resolve(); @@ -407,13 +406,13 @@ Deno.test("consume - consumer bind", async () => { (async () => { for await (const s of iter.status()) { switch (s.type) { - case ConsumerEvents.HeartbeatsMissed: + case "heartbeats_missed": hbm++; if (hbm > 5) { iter.stop(); } break; - case ConsumerEvents.ConsumerNotFound: + case "consumer_not_found": cnf++; break; } @@ -458,7 +457,7 @@ Deno.test("consume - timer is based on idle_hb", async () => { let hbm = false; (async () => { for await (const s of iter.status()) { - if (s.type === ConsumerEvents.HeartbeatsMissed) { + if (s.type === "heartbeats_missed") { hbm = true; } } diff --git a/jetstream/tests/consumers_ordered_test.ts b/jetstream/tests/consumers_ordered_test.ts index ef2fb122..5fd3fcca 100644 --- a/jetstream/tests/consumers_ordered_test.ts +++ b/jetstream/tests/consumers_ordered_test.ts @@ -21,12 +21,7 @@ import { assertRejects, assertStringIncludes, } from "jsr:@std/assert"; -import { - ConsumerEvents, - DeliverPolicy, - jetstream, - jetstreamManager, -} from "../src/mod.ts"; +import { DeliverPolicy, jetstream, jetstreamManager } from "../src/mod.ts"; import type { ConsumerMessages, JsMsg } from "../src/mod.ts"; import { cleanup, @@ -884,7 +879,7 @@ Deno.test("ordered consumers - fetch reset", async () => { function countResets(iter: ConsumerMessages): Promise { return (async () => { for await (const s of iter.status()) { - if (s.type === ConsumerEvents.OrderedConsumerRecreated) { + if (s.type === "ordered_consumer_recreated") { recreates++; } } @@ -933,7 +928,7 @@ Deno.test("ordered consumers - consume reset", async () => { function countRecreates(iter: ConsumerMessages): Promise { return (async () => { for await (const s of iter.status()) { - if (s.type === ConsumerEvents.OrderedConsumerRecreated) { + if (s.type === "ordered_consumer_recreated") { recreates++; } } @@ -1106,7 +1101,7 @@ Deno.test( function countRecreates(iter: ConsumerMessages): Promise { return (async () => { for await (const s of iter.status()) { - if (s.type === ConsumerEvents.OrderedConsumerRecreated) { + if (s.type === "ordered_consumer_recreated") { recreates++; } } diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index c6580f3a..207a6d79 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -21,18 +21,21 @@ import { } from "jsr:@std/assert"; import { AckPolicy, - ConsumerDebugEvents, - ConsumerEvents, DeliverPolicy, jetstream, JetStreamError, jetstreamManager, } from "../src/mod.ts"; -import type { Consumer, ConsumerStatus, PullOptions } from "../src/mod.ts"; +import type { Consumer, PullOptions } from "../src/mod.ts"; import { deferred, nanos } from "@nats-io/nats-core"; import type { NatsConnectionImpl } from "@nats-io/nats-core/internal"; import { cleanup, jetstreamServerConf, setup } from "test_helpers"; import type { PullConsumerMessagesImpl } from "../src/consumer.ts"; +import type { + ConsumerNotification, + Discard, + HeartbeatsMissed, +} from "../src/types.ts"; Deno.test("consumers - min supported server", async () => { const { ns, nc } = await setup(jetstreamServerConf({})); @@ -192,7 +195,7 @@ Deno.test("consumers - fetch heartbeats", async () => { expires: 30000, }) as PullConsumerMessagesImpl; - const d = deferred(); + const d = deferred(); await (async () => { const status = iter.status(); @@ -217,8 +220,8 @@ Deno.test("consumers - fetch heartbeats", async () => { nci._resub(iter.sub, "foo"); const cs = await d; - assertEquals(cs.type, ConsumerEvents.HeartbeatsMissed); - assertEquals(cs.data, 2); + assertEquals(cs.type, "heartbeats_missed"); + assertEquals((cs as HeartbeatsMissed).count, 2); await cleanup(ns, nc); }); @@ -309,9 +312,8 @@ Deno.test("consumers - discard notifications", async () => { } })().then(); for await (const s of iter.status()) { - if (s.type === ConsumerDebugEvents.Discard) { - const r = s.data as Record; - assertEquals(r.msgsLeft, 100); + if (s.type === "discard") { + assertEquals(s.messagesLeft, 100); break; } } @@ -345,8 +347,8 @@ Deno.test("consumers - threshold_messages", async () => { let next: PullOptions[] = []; const done = (async () => { for await (const s of iter.status()) { - if (s.type === ConsumerDebugEvents.Next) { - next.push(s.data as PullOptions); + if (s.type === "next") { + next.push(s.options); } } })().then(); @@ -399,14 +401,14 @@ Deno.test("consumers - threshold_messages bytes", async () => { }) as PullConsumerMessagesImpl; const next: PullOptions[] = []; - const discards: { msgsLeft: number; bytesLeft: number }[] = []; + const discards: Discard[] = []; const done = (async () => { for await (const s of iter.status()) { - if (s.type === ConsumerDebugEvents.Next) { - next.push(s.data as PullOptions); + if (s.type === "next") { + next.push(s.options); } - if (s.type === ConsumerDebugEvents.Discard) { - discards.push(s.data as { msgsLeft: number; bytesLeft: number }); + if (s.type === "discard") { + discards.push(s); } } })().then(); @@ -433,7 +435,7 @@ Deno.test("consumers - threshold_messages bytes", async () => { continue; } // pull batch - close responses - received += next[i].batch - discards[i].msgsLeft; + received += next[i].batch - discards[i].messagesLeft; } // FIXME: this is wrong, making so test passes assertEquals(received, 996); diff --git a/jetstream/tests/jetstream409_test.ts b/jetstream/tests/jetstream409_test.ts index 361bd5a2..941067c4 100644 --- a/jetstream/tests/jetstream409_test.ts +++ b/jetstream/tests/jetstream409_test.ts @@ -14,13 +14,7 @@ */ import { nanos } from "@nats-io/nats-core"; -import { - AckPolicy, - ConsumerDebugEvents, - ConsumerEvents, - jetstream, - jetstreamManager, -} from "../src/mod.ts"; +import { AckPolicy, jetstream, jetstreamManager } from "../src/mod.ts"; import { assert, assertRejects, fail } from "jsr:@std/assert"; import { initStream } from "./jstest_util.ts"; @@ -67,9 +61,8 @@ Deno.test("409 - max expires", async () => { let count = 0; (async () => { for await (const s of iter.status()) { - if (s.type === ConsumerEvents.ExceededLimit) { - const data = s.data as { code: number; description: string }; - if (data.description.includes("exceeded maxrequestexpires")) { + if (s.type === "exceeded_limits") { + if (s.description.includes("exceeded maxrequestexpires")) { count++; if (count === 2) { iter.close(); @@ -100,9 +93,8 @@ Deno.test("409 - max message size", async () => { const msgs = await c.fetch({ max_bytes: 10 }); (async () => { for await (const s of msgs.status()) { - if (s.type === ConsumerDebugEvents.Discard) { - const data = s.data as { bytesLeft: number }; - if (data.bytesLeft === 10) { + if (s.type === "discard") { + if (s.bytesLeft === 10) { msgs.stop(); } } @@ -120,10 +112,9 @@ Deno.test("409 - max message size", async () => { }); let count = 0; for await (const s of iter.status()) { - if (s.type === ConsumerDebugEvents.Discard) { - const data = s.data as { bytesLeft: number }; + if (s.type === "discard") { count++; - if (data.bytesLeft === 10) { + if (s.bytesLeft === 10) { if (count >= 2) { iter.close(); } @@ -169,9 +160,8 @@ Deno.test("409 - max batch", async () => { callback: () => {}, }); for await (const s of iter.status()) { - if (s.type === ConsumerEvents.ExceededLimit) { - const data = s.data as { code: number; description: string }; - if (data.description.includes("exceeded maxrequestbatch")) { + if (s.type === "exceeded_limits") { + if (s.description.includes("exceeded maxrequestbatch")) { count++; if (count >= 2) { iter.stop(); @@ -227,9 +217,8 @@ Deno.test("409 - max waiting", async () => { const iter = await c.consume({ expires: 1_000, callback: () => {} }); for await (const s of iter.status()) { - if (s.type === ConsumerEvents.ExceededLimit) { - const data = s.data as { code: number; description: string }; - if (data.description.includes("exceeded maxwaiting")) { + if (s.type === "exceeded_limits") { + if (s.description.includes("exceeded maxwaiting")) { count++; if (count >= 2) { iter.stop(); diff --git a/jetstream/tests/jetstream_pushconsumer_test.ts b/jetstream/tests/jetstream_pushconsumer_test.ts index aab46dad..d2e7d43b 100644 --- a/jetstream/tests/jetstream_pushconsumer_test.ts +++ b/jetstream/tests/jetstream_pushconsumer_test.ts @@ -34,7 +34,6 @@ import { syncIterator, } from "@nats-io/nats-core"; import type { BoundPushConsumerOptions, PubAck } from "../src/types.ts"; -import { ConsumerDebugEvents, ConsumerEvents } from "../src/types.ts"; import { assert, assertEquals, @@ -476,14 +475,9 @@ Deno.test("jetstream - idle heartbeats", async () => { const status = messages.status(); for await (const s of status) { - if (s.type === ConsumerDebugEvents.Heartbeat) { - const d = s.data as { - natsLastConsumer: string; - natsLastStream: string; - }; - - assertEquals(d.natsLastConsumer, "1"); - assertEquals(d.natsLastStream, "1"); + if (s.type === "heartbeat") { + assertEquals(s.lastConsumerSequence, 1); + assertEquals(s.lastStreamSequence, 1); messages.stop(); } } @@ -530,7 +524,7 @@ Deno.test("jetstream - flow control", async () => { const iter = await c.consume({ callback: () => {} }); const status = iter.status(); for await (const s of status) { - if (s.type === ConsumerDebugEvents.FlowControl) { + if (s.type === "flow_control") { iter.stop(); } } @@ -700,9 +694,9 @@ Deno.test("jetstream - bind", async () => { let hb = 0; for await (const s of messages.status()) { - if (s.type === ConsumerDebugEvents.FlowControl) { + if (s.type === "flow_control") { fc++; - } else if (s.type === ConsumerDebugEvents.Heartbeat) { + } else if (s.type === "heartbeat") { hb++; } } @@ -834,7 +828,7 @@ Deno.test("jetstream - idleheartbeats notifications don't cancel", async () => { const msgs = await c.consume({ callback: () => {} }); let missed = 0; for await (const s of msgs.status()) { - if (s.type === ConsumerEvents.HeartbeatsMissed) { + if (s.type === "heartbeats_missed") { missed++; if (missed > 3) { await msgs.close(); diff --git a/kv/deno.json b/kv/deno.json index bf5dc963..291959d8 100644 --- a/kv/deno.json +++ b/kv/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/kv", - "version": "3.0.0-23", + "version": "3.0.0-25", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -34,6 +34,6 @@ }, "imports": { "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-42", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-29" + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-31" } } diff --git a/kv/import_map.json b/kv/import_map.json index aa1ab048..f6c68809 100644 --- a/kv/import_map.json +++ b/kv/import_map.json @@ -2,8 +2,8 @@ "imports": { "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-42", "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-42/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-29", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-29/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-31", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-31/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@~2.0.0-2", "@nats-io/nuid": "jsr:@nats-io/nuid@~2.0.1-2", diff --git a/kv/package.json b/kv/package.json index d34228df..ab80d9fa 100644 --- a/kv/package.json +++ b/kv/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/kv", - "version": "3.0.0-23", + "version": "3.0.0-25", "files": [ "lib/", "LICENSE", @@ -34,7 +34,7 @@ }, "description": "kv library - this library implements all the base functionality for NATS KV javascript clients", "dependencies": { - "@nats-io/jetstream": "3.0.0-29", + "@nats-io/jetstream": "3.0.0-31", "@nats-io/nats-core": "3.0.0-42" }, "devDependencies": { diff --git a/migration.md b/migration.md index c05bb30c..d498f293 100644 --- a/migration.md +++ b/migration.md @@ -135,6 +135,9 @@ To use JetStream, you must install and import `@nats/jetstream`. - MsgRequest for `Stream#getMessage()` removed deprecated number argument. - For non-ordered consumers next/fetch() can will now throw/reject when heartbeats are missed. +- The `ConsumerEvents` and `ConsumerDebugEvents` enum has been removed and + replaced with `ConsumerNotification` which have a discriminating field `type`. + The status objects provide a more specific API for querying those events. ## Changes to KV diff --git a/obj/deno.json b/obj/deno.json index 3b8f52de..31833490 100644 --- a/obj/deno.json +++ b/obj/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/obj", - "version": "3.0.0-24", + "version": "3.0.0-26", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -34,6 +34,6 @@ }, "imports": { "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-42", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-29" + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-31" } } diff --git a/obj/import_map.json b/obj/import_map.json index aa1ab048..f6c68809 100644 --- a/obj/import_map.json +++ b/obj/import_map.json @@ -2,8 +2,8 @@ "imports": { "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-42", "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-42/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-29", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-29/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-31", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-31/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@~2.0.0-2", "@nats-io/nuid": "jsr:@nats-io/nuid@~2.0.1-2", diff --git a/obj/package.json b/obj/package.json index fe3a7917..3d09bc91 100644 --- a/obj/package.json +++ b/obj/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/obj", - "version": "3.0.0-24", + "version": "3.0.0-26", "files": [ "lib/", "LICENSE", @@ -34,7 +34,7 @@ }, "description": "obj library - this library implements all the base functionality for NATS objectstore for javascript clients", "dependencies": { - "@nats-io/jetstream": "3.0.0-29", + "@nats-io/jetstream": "3.0.0-31", "@nats-io/nats-core": "3.0.0-42" }, "devDependencies": { diff --git a/transport-node/package.json b/transport-node/package.json index f6705b9f..3844cbbe 100644 --- a/transport-node/package.json +++ b/transport-node/package.json @@ -64,8 +64,8 @@ "nats-jwt": "^0.0.9", "shx": "^0.3.3", "typescript": "5.6.3", - "@nats-io/jetstream": "3.0.0-29", - "@nats-io/kv": "3.0.0-23", - "@nats-io/obj": "3.0.0-24" + "@nats-io/jetstream": "3.0.0-31", + "@nats-io/kv": "3.0.0-25", + "@nats-io/obj": "3.0.0-26" } }