Skip to content

Commit

Permalink
change(jetstream): ConsumerStatus and ConsumerDebugStatus are now…
Browse files Browse the repository at this point in the history
… replaced by `ConsumerNotificationType` which is an alias for specific types for the the different notifications. All discriminated by their `type` field. This simplifies down-stream consumption and provides a richer API for accessing the data in the notification.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 20, 2024
1 parent 8fe6b2c commit 0619fdc
Show file tree
Hide file tree
Showing 21 changed files with 358 additions and 242 deletions.
2 changes: 1 addition & 1 deletion jetstream/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-29",
"version": "3.0.0-31",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down
2 changes: 1 addition & 1 deletion jetstream/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-29",
"version": "3.0.0-31",
"files": [
"lib/",
"LICENSE",
Expand Down
64 changes: 36 additions & 28 deletions jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import type {
PullOptions,
} from "./jsapi_types.ts";
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";

import type {
ConsumeMessages,
Expand All @@ -54,7 +53,7 @@ import type {
ConsumerAPI,
ConsumerCallbackFn,
ConsumerMessages,
ConsumerStatus,
ConsumerNotification,
Expires,
FetchMessages,
FetchOptions,
Expand Down Expand Up @@ -119,7 +118,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
pending: { msgs: number; bytes: number; requests: number };
isConsume: boolean;
callback: ConsumerCallbackFn | null;
listeners: QueuedIterator<ConsumerStatus>[];
listeners: QueuedIterator<ConsumerNotification>[];
statusIterator?: QueuedIteratorImpl<Status>;
abortOnMissingResource?: boolean;
bind: boolean;
Expand Down Expand Up @@ -207,8 +206,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
const status = new JetStreamStatus(msg);

if (status.isIdleHeartbeat()) {
this.notify(ConsumerDebugEvents.Heartbeat, status.parseHeartbeat());
const hb = status.parseHeartbeat();
if (hb) {
this.notify(hb);
return;
}
const code = status.code;
Expand All @@ -219,7 +219,11 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.pending.msgs -= msgsLeft;
this.pending.bytes -= bytesLeft;
this.pending.requests--;
this.notify(ConsumerDebugEvents.Discard, { msgsLeft, bytesLeft });
this.notify({
type: "discard",
messagesLeft: msgsLeft,
bytesLeft: bytesLeft,
});
} else {
// Examine the error codes
// FIXME: 408 can be a Timeout or bad request,
Expand Down Expand Up @@ -247,8 +251,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
default:
this.notify(
ConsumerDebugEvents.DebugEvent,
{ code, description },
{ type: "debug", code, description },
);
}
}
Expand Down Expand Up @@ -319,11 +322,11 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
if (idle_heartbeat) {
this.monitor = new IdleHeartbeatMonitor(
idle_heartbeat,
(data): boolean => {
(count): boolean => {
// for the pull consumer - missing heartbeats may be corrected
// on the next pull etc - the only assumption here is we should
// reset and check if the consumer was deleted from under us
this.notify(ConsumerEvents.HeartbeatsMissed, data);
this.notify({ type: "heartbeats_missed", count });
if (!this.isConsume && !this.consumer.ordered) {
// if we are not a consume, give up - this was masked by an
// external timer on fetch - the hb is a more reliable timeout
Expand Down Expand Up @@ -390,9 +393,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
handle409(status: JetStreamStatus): Error | null {
const { code, description } = status;
if (status.isConsumerDeleted()) {
this.notify(ConsumerEvents.ConsumerDeleted, { code, description });
this.notify({ type: "consumer_deleted", code, description });
} else if (status.isExceededLimit()) {
this.notify(ConsumerEvents.ExceededLimit, { code, description });
this.notify({ type: "exceeded_limits", code, description });
}
if (!this.isConsume) {
return status.toError();
Expand All @@ -410,7 +413,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
const ocs = this.consumer.orderedConsumerState!;
const { name } = this.consumer._info?.config;
if (name) {
this.notify(ConsumerDebugEvents.Reset, name);
this.notify({ type: "reset", name });
this.consumer.api.delete(this.consumer.stream, name)
.catch(() => {
// ignored
Expand All @@ -434,16 +437,17 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
).then((ci) => {
ocs.createFails = 0;
this.consumer._info = ci;
this.notify(ConsumerEvents.OrderedConsumerRecreated, ci.name);
this.notify({ type: "ordered_consumer_recreated", name: ci.name });
this.monitor?.restart();
this.pull(this.pullOptions());
}).catch((err) => {
ocs.createFails++;
if (err.message === "stream not found") {
this.notify(
ConsumerEvents.StreamNotFound,
ocs.createFails,
);
this.notify({
type: "stream_not_found",
consumerCreateFails: ocs.createFails,
name: this.consumer.stream,
});
if (this.abortOnMissingResource) {
this.stop(err);
return;
Expand Down Expand Up @@ -494,13 +498,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
}

notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) {
notify(n: ConsumerNotification) {
if (this.listeners.length > 0) {
(() => {
this.listeners.forEach((l) => {
const qi = l as QueuedIteratorImpl<ConsumerStatus>;
const qi = l as QueuedIteratorImpl<ConsumerNotification>;
if (!qi.done) {
qi.push({ type, data });
qi.push(n);
}
});
})();
Expand Down Expand Up @@ -549,14 +553,18 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
// game over
if ((err as Error).message === "stream not found") {
streamNotFound++;
this.notify(ConsumerEvents.StreamNotFound, streamNotFound);
this.notify({ type: "stream_not_found", name: this.consumer.stream });
if (!this.isConsume || this.abortOnMissingResource) {
this.stop(err as Error);
return false;
}
} else if ((err as Error).message === "consumer not found") {
notFound++;
this.notify(ConsumerEvents.ConsumerNotFound, notFound);
this.notify({
type: "consumer_not_found",
name: this.consumer.name,
count: notFound,
});
if (!this.isConsume || this.abortOnMissingResource) {
if (this.consumer.ordered) {
const ocs = this.consumer.orderedConsumerState!;
Expand Down Expand Up @@ -598,7 +606,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
JSON.stringify(opts),
{ reply: this.inbox },
);
this.notify(ConsumerDebugEvents.Next, opts);
this.notify({ type: "next", options: opts as PullOptions });
});
}

Expand Down Expand Up @@ -646,8 +654,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
//@ts-ignore: fn
this._push(() => {
super.stop(err);
this.listeners.forEach((n) => {
n.stop();
this.listeners.forEach((iter) => {
iter.stop();
});
});
}
Expand Down Expand Up @@ -728,8 +736,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
return args;
}

status(): AsyncIterable<ConsumerStatus> {
const iter = new QueuedIteratorImpl<ConsumerStatus>();
status(): AsyncIterable<ConsumerNotification> {
const iter = new QueuedIteratorImpl<ConsumerNotification>();
this.listeners.push(iter);
return iter;
}
Expand Down
4 changes: 1 addition & 3 deletions jetstream/src/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

export {
AdvisoryKind,
ConsumerDebugEvents,
ConsumerEvents,
DirectMsgHeaders,
isBoundPushConsumerOptions,
isOrderedPushConsumerOptions,
Expand Down Expand Up @@ -48,8 +46,8 @@ export type {
ConsumerCallbackFn,
ConsumerKind,
ConsumerMessages,
ConsumerNotification,
Consumers,
ConsumerStatus,
DeleteableConsumer,
Destroyable,
DirectStreamAPI,
Expand Down
9 changes: 5 additions & 4 deletions jetstream/src/jserrors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/

import type { Msg } from "@nats-io/nats-core";
import { JsHeaders } from "./types.ts";
import { type Heartbeat, JsHeaders } from "./types.ts";
import type { ApiError } from "./jsapi_types.ts";

export class JetStreamNotEnabled extends Error {
Expand Down Expand Up @@ -89,14 +89,15 @@ export class JetStreamStatus {
}

parseHeartbeat():
| { natsLastConsumer: number; natsLastStream: number }
| Heartbeat
| null {
if (this.isIdleHeartbeat()) {
return {
natsLastConsumer: parseInt(
type: "heartbeat",
lastConsumerSequence: parseInt(
this.msg.headers?.get("Nats-Last-Consumer") || "0",
),
natsLastStream: parseInt(
lastStreamSequence: parseInt(
this.msg.headers?.get("Nats-Last-Stream") || "0",
),
};
Expand Down
4 changes: 1 addition & 3 deletions jetstream/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ export { jetstream, jetstreamManager } from "./internal_mod.ts";
export {
AckPolicy,
AdvisoryKind,
ConsumerDebugEvents,
ConsumerEvents,
DeliverPolicy,
DirectMsgHeaders,
DiscardPolicy,
Expand Down Expand Up @@ -56,8 +54,8 @@ export type {
ConsumerInfo,
ConsumerKind,
ConsumerMessages,
ConsumerNotification,
Consumers,
ConsumerStatus,
ConsumerUpdateConfig,
DeleteableConsumer,
DeliveryInfo,
Expand Down
49 changes: 27 additions & 22 deletions jetstream/src/pushconsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ import { toJsMsg } from "./jsmsg.ts";
import type { JsMsg } from "./jsmsg.ts";
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
import type { ConsumerConfig, ConsumerInfo } from "./jsapi_types.ts";
import { ConsumerDebugEvents, ConsumerEvents } from "./types.ts";
import type { ConsumerNotification } from "./types.ts";

import type {
ConsumerAPI,
ConsumerCallbackFn,
ConsumerMessages,
ConsumerStatus,
PushConsumer,
PushConsumerOptions,
} from "./types.ts";
Expand Down Expand Up @@ -54,7 +53,7 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
consumer: PushConsumerImpl;
sub!: Subscription;
monitor: IdleHeartbeatMonitor | null;
listeners: QueuedIterator<ConsumerStatus>[];
listeners: QueuedIterator<ConsumerNotification>[];
abortOnMissingResource: boolean;
callback: ConsumerCallbackFn | null;
ordered: boolean;
Expand Down Expand Up @@ -123,11 +122,15 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
).then((ci) => {
this.createFails = 0;
this.consumer._info = ci;
this.notify(ConsumerEvents.OrderedConsumerRecreated, ci.name);
this.notify({ type: "ordered_consumer_recreated", name: ci.name });
}).catch((err) => {
this.createFails++;
if (err.message === "stream not found") {
this.notify(ConsumerEvents.StreamNotFound, this.createFails);
this.notify({
type: "stream_not_found",
name: this.consumer.stream,
consumerCreateFails: this.createFails,
});
if (this.abortOnMissingResource) {
this.stop(err);
return;
Expand Down Expand Up @@ -226,8 +229,8 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
}
}
status(): AsyncIterable<ConsumerStatus> {
const iter = new QueuedIteratorImpl<ConsumerStatus>();
status(): AsyncIterable<ConsumerNotification> {
const iter = new QueuedIteratorImpl<ConsumerNotification>();
this.listeners.push(iter);
return iter;
}
Expand All @@ -247,8 +250,8 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
const ms = millis(hbNanos);
this.monitor = new IdleHeartbeatMonitor(
ms,
(data): boolean => {
this.notify(ConsumerEvents.HeartbeatsMissed, data);
(count): boolean => {
this.notify({ type: "heartbeats_missed", count });
return false;
},
{ maxOut: 2 },
Expand Down Expand Up @@ -296,17 +299,22 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
if (status.isFlowControlRequest()) {
this._push(() => {
msg.respond();
this.notify(ConsumerDebugEvents.FlowControl, null);
this.notify({ type: "flow_control" });
});
return;
}

if (status.isIdleHeartbeat()) {
const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer");
const natsLastStream = msg.headers?.get("Nats-Last-Stream");
this.notify(ConsumerDebugEvents.Heartbeat, {
natsLastConsumer,
natsLastStream,
const lastConsumerSequence = parseInt(
msg.headers?.get("Nats-Last-Consumer") || "0",
);
const lastStreamSequence = parseInt(
msg.headers?.get("Nats-Last-Stream") ?? "0",
);
this.notify({
type: "heartbeat",
lastStreamSequence,
lastConsumerSequence,
});
return;
}
Expand All @@ -315,10 +323,7 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
const description = status.description;

if (status.isConsumerDeleted()) {
this.notify(
ConsumerEvents.ConsumerDeleted,
`${code} ${description}`,
);
this.notify({ type: "consumer_deleted", code, description });
}
if (this.abortOnMissingResource) {
this._push(() => {
Expand Down Expand Up @@ -354,13 +359,13 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
});
}

notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) {
notify(n: ConsumerNotification) {
if (this.listeners.length > 0) {
(() => {
this.listeners.forEach((l) => {
const qi = l as QueuedIteratorImpl<ConsumerStatus>;
const qi = l as QueuedIteratorImpl<ConsumerNotification>;
if (!qi.done) {
qi.push({ type, data });
qi.push(n);
}
});
})();
Expand Down
Loading

0 comments on commit 0619fdc

Please sign in to comment.