Skip to content

Commit

Permalink
jetstream: PushConsumer (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart authored Sep 6, 2024
1 parent 8fbdcf1 commit 2d93935
Show file tree
Hide file tree
Showing 21 changed files with 1,642 additions and 1,024 deletions.
2 changes: 1 addition & 1 deletion jetstream/examples/06_heartbeats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ while (true) {

// watch the to see if the consume operation misses heartbeats
(async () => {
for await (const s of await messages.status()) {
for await (const s of messages.status()) {
if (s.type === ConsumerEvents.HeartbeatsMissed) {
// you can decide how many heartbeats you are willing to miss
const n = s.data as number;
Expand Down
50 changes: 37 additions & 13 deletions jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
timeout,
} from "@nats-io/nats-core/internal";
import type {
CallbackFn,
MsgHdrs,
QueuedIterator,
Status,
Expand Down Expand Up @@ -163,6 +164,12 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>

if (isProtocol) {
if (isHeartbeatMsg(msg)) {
const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer");
const natsLastStream = msg.headers?.get("Nats-Last-Stream");
this.notify(ConsumerDebugEvents.Heartbeat, {
natsLastConsumer,
natsLastStream,
});
return;
}
const code = msg.headers?.code;
Expand Down Expand Up @@ -317,14 +324,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.pull(this.pullOptions());
}

_push(r: JsMsg) {
_push(r: JsMsg | CallbackFn) {
if (!this.callback) {
super.push(r);
} else {
const fn = typeof r === "function" ? r as () => void : null;
const fn = typeof r === "function" ? r as CallbackFn : null;
try {
if (!fn) {
this.callback(r);
const m = r as JsMsg;
this.callback(m);
} else {
fn();
}
Expand Down Expand Up @@ -558,10 +566,10 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
return args;
}

status(): Promise<AsyncIterable<ConsumerStatus>> {
status(): AsyncIterable<ConsumerStatus> {
const iter = new QueuedIteratorImpl<ConsumerStatus>();
this.listeners.push(iter);
return Promise.resolve(iter);
return iter;
}
}

Expand All @@ -586,7 +594,7 @@ export class OrderedConsumerMessages extends QueuedIteratorImpl<JsMsg>
this.stop(err || undefined);
});
(async () => {
const status = await this.src.status();
const status = this.src.status();
for await (const s of status) {
this.notify(s.type, s.data);
}
Expand Down Expand Up @@ -626,10 +634,10 @@ export class OrderedConsumerMessages extends QueuedIteratorImpl<JsMsg>
return this.iterClosed;
}

status(): Promise<AsyncIterable<ConsumerStatus>> {
status(): AsyncIterable<ConsumerStatus> {
const iter = new QueuedIteratorImpl<ConsumerStatus>();
this.listeners.push(iter);
return Promise.resolve(iter);
return iter;
}
}

Expand All @@ -646,6 +654,14 @@ export class PullConsumerImpl implements Consumer {
this.name = info.name;
}

isPullConsumer(): boolean {
return true;
}

isPushConsumer(): boolean {
return false;
}

consume(
opts: ConsumeOptions = {
max_messages: 100,
Expand Down Expand Up @@ -694,7 +710,7 @@ export class PullConsumerImpl implements Consumer {
// watch the messages for heartbeats missed
if (to >= 60_000) {
(async () => {
for await (const s of await iter.status()) {
for await (const s of iter.status()) {
if (
s.type === ConsumerEvents.HeartbeatsMissed &&
(s.data as number) >= 2
Expand Down Expand Up @@ -793,6 +809,14 @@ export class OrderedPullConsumerImpl implements Consumer {
this.cursor.stream_seq = this.startSeq > 0 ? this.startSeq - 1 : 0;
}

isPullConsumer(): boolean {
return true;
}

isPushConsumer(): boolean {
return false;
}

getConsumerOpts(seq: number): ConsumerConfig {
// change the serial - invalidating any callback not
// matching the serial
Expand All @@ -811,11 +835,11 @@ export class OrderedPullConsumerImpl implements Consumer {
if (this.consumerOpts.headers_only === true) {
config.headers_only = true;
}
if (Array.isArray(this.consumerOpts.filterSubjects)) {
config.filter_subjects = this.consumerOpts.filterSubjects;
if (Array.isArray(this.consumerOpts.filter_subjects)) {
config.filter_subjects = this.consumerOpts.filter_subjects;
}
if (typeof this.consumerOpts.filterSubjects === "string") {
config.filter_subject = this.consumerOpts.filterSubjects;
if (typeof this.consumerOpts.filter_subjects === "string") {
config.filter_subject = this.consumerOpts.filter_subjects;
}
if (this.consumerOpts.replay_policy) {
config.replay_policy = this.consumerOpts.replay_policy;
Expand Down
11 changes: 11 additions & 0 deletions jetstream/src/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ export {
ConsumerEvents,
consumerOpts,
DirectMsgHeaders,
isBoundPushConsumerOptions,
isConsumerOptsBuilder,
isOrderedPushConsumerOptions,
isPullConsumer,
isPushConsumer,
JsHeaders,
RepublishHeaders,
} from "./types.ts";
Expand All @@ -36,6 +40,7 @@ export type {
AbortOnMissingResource,
Advisory,
Bind,
BoundPushConsumerOptions,
Closed,
ConsumeBytes,
ConsumeCallback,
Expand All @@ -45,18 +50,21 @@ export type {
ConsumerAPI,
ConsumerCallbackFn,
ConsumerInfoable,
ConsumerKind,
ConsumerMessages,
ConsumerOpts,
ConsumerOptsBuilder,
Consumers,
ConsumerStatus,
DeleteableConsumer,
Destroyable,
DirectStreamAPI,
Expires,
FetchBytes,
FetchMessages,
FetchOptions,
IdleHeartbeat,
InfoableConsumer,
JetStreamClient,
JetStreamManager,
JetStreamManagerOptions,
Expand All @@ -73,8 +81,11 @@ export type {
MaxMessages,
NextOptions,
OrderedConsumerOptions,
OrderedPushConsumerOptions,
PubAck,
Pullable,
PushConsumer,
PushConsumerOptions,
StoredMsg,
Stream,
StreamAPI,
Expand Down
2 changes: 0 additions & 2 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,6 @@ export interface ConsumerConfig extends ConsumerUpdateConfig {
"deliver_policy": DeliverPolicy;
/**
* Allows push consumers to form a queue group
* @deprecated
*/
"deliver_group"?: string;
/**
Expand Down Expand Up @@ -974,7 +973,6 @@ export interface ConsumerUpdateConfig {
"headers_only"?: boolean;
/**
* The subject where the push consumer should be sent the messages
* @deprecated
*/
"deliver_subject"?: string;
/**
Expand Down
Loading

0 comments on commit 2d93935

Please sign in to comment.