Skip to content

Commit

Permalink
fix(jetstream): more resilient ordered consumers and protocol message…
Browse files Browse the repository at this point in the history
… handling (#104)

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart authored Nov 1, 2024
1 parent 539a717 commit fb44600
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 33 deletions.
2 changes: 1 addition & 1 deletion jetstream/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-21",
"version": "3.0.0-22",
"exports": {
".": "./src/mod.ts",
"./internal": "./src/internal_mod.ts"
Expand Down
2 changes: 1 addition & 1 deletion jetstream/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/jetstream",
"version": "3.0.0-21",
"version": "3.0.0-22",
"files": [
"lib/",
"LICENSE",
Expand Down
19 changes: 11 additions & 8 deletions jetstream/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
this.monitor?.work();

const isProtocol = msg.subject === this.inbox;
const isProtocol = this.consumer.ordered
? msg.subject.indexOf(this?.inboxPrefix!) === 0
: msg.subject === this.inbox;

if (isProtocol) {
if (msg.subject !== (this.sub as SubscriptionImpl).subject) {
// this is a stale message - was not sent to the current inbox
return;
}
const status = new JetStreamStatus(msg);

if (status.isIdleHeartbeat()) {
Expand All @@ -178,7 +185,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
const description = status.description;

const { msgsLeft, bytesLeft } = status.parseDiscard();
console.log("pending", msgsLeft, bytesLeft);
if ((msgsLeft && msgsLeft > 0) || (bytesLeft && bytesLeft > 0)) {
this.pending.msgs -= msgsLeft;
this.pending.bytes -= bytesLeft;
Expand Down Expand Up @@ -842,15 +848,12 @@ export class PullConsumerImpl implements Consumer {
return ci;
}

info(cached = false): Promise<ConsumerInfo> {
async info(cached = false): Promise<ConsumerInfo> {
if (cached) {
return Promise.resolve(this._info);
}
const { stream_name, name } = this._info;
return this.api.info(stream_name, name)
.then((ci) => {
this._info = ci;
return this._info;
});
this._info = await this.api.info(stream_name, name);
return this._info;
}
}
4 changes: 2 additions & 2 deletions jetstream/src/jsbaseclient_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ export class BaseApiClientImpl {
} catch (err) {
const re = err instanceof RequestError ? err as RequestError : null;
if (
err instanceof errors.TimeoutError ||
re?.isNoResponders() && i + 1 < retries
(err instanceof errors.TimeoutError || re?.isNoResponders()) &&
i + 1 < retries
) {
await delay(bo.backoff(i));
} else {
Expand Down
4 changes: 3 additions & 1 deletion jetstream/src/jserrors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ export class JetStreamStatus {

debug() {
console.log({
message: this.description,
subject: this.msg.subject,
reply: this.msg.reply,
description: this.description,
status: this.code,
headers: this.msg.headers,
});
Expand Down
28 changes: 19 additions & 9 deletions jetstream/src/pushconsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import type {
QueuedIterator,
Status,
Subscription,
SubscriptionImpl,
} from "@nats-io/nats-core/internal";
import { JetStreamStatus } from "./jserrors.ts";

Expand Down Expand Up @@ -281,9 +282,26 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
this.monitor?.work();

const isProtocol = msg.subject === subject;
// need to make sure to catch all protocol messages even
const isProtocol = this.ordered
? msg.subject.indexOf(this?.deliverPrefix!) === 0
: msg.subject === subject;

if (isProtocol) {
if (msg.subject !== (this.sub as SubscriptionImpl).subject) {
// this is a stale message - was not sent to the current inbox
return;
}

const status = new JetStreamStatus(msg);
if (status.isFlowControlRequest()) {
this._push(() => {
msg.respond();
this.notify(ConsumerDebugEvents.FlowControl, null);
});
return;
}

if (status.isIdleHeartbeat()) {
const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer");
const natsLastStream = msg.headers?.get("Nats-Last-Stream");
Expand All @@ -293,14 +311,6 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
});
return;
}
if (status.isFlowControlRequest()) {
status.debug();
this._push(() => {
msg.respond();
this.notify(ConsumerDebugEvents.FlowControl, null);
});
return;
}

const code = status.code;
const description = status.description;
Expand Down
2 changes: 1 addition & 1 deletion kv/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21"
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22"
}
}
4 changes: 2 additions & 2 deletions kv/import_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21",
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-21/internal",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22",
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-22/internal",
"test_helpers": "../test_helpers/mod.ts",
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
Expand Down
2 changes: 1 addition & 1 deletion kv/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
},
"description": "kv library - this library implements all the base functionality for NATS KV javascript clients",
"dependencies": {
"@nats-io/jetstream": "3.0.0-21",
"@nats-io/jetstream": "3.0.0-22",
"@nats-io/nats-core": "3.0.0-34"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion obj/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@
},
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21"
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22"
}
}
4 changes: 2 additions & 2 deletions obj/import_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"imports": {
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34",
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21",
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-21/internal",
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22",
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-22/internal",
"test_helpers": "../test_helpers/mod.ts",
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
Expand Down
2 changes: 1 addition & 1 deletion obj/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
},
"description": "obj library - this library implements all the base functionality for NATS objectstore for javascript clients",
"dependencies": {
"@nats-io/jetstream": "3.0.0-21",
"@nats-io/jetstream": "3.0.0-22",
"@nats-io/nats-core": "3.0.0-34"
},
"devDependencies": {
Expand Down
4 changes: 2 additions & 2 deletions transport-node/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nats-io/transport-node",
"version": "3.0.0-19",
"version": "3.0.0-20",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",
"keywords": [
"nats",
Expand Down Expand Up @@ -64,7 +64,7 @@
"nats-jwt": "^0.0.9",
"shx": "^0.3.3",
"typescript": "5.6.3",
"@nats-io/jetstream": "3.0.0-21",
"@nats-io/jetstream": "3.0.0-22",
"@nats-io/kv": "3.0.0-16",
"@nats-io/obj": "3.0.0-17"
}
Expand Down
2 changes: 1 addition & 1 deletion transport-node/src/version.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// This file is generated - do not edit
export const version = "3.0.0-19";
export const version = "3.0.0-20";

0 comments on commit fb44600

Please sign in to comment.