From 939f1bf5f0c9fc211292562f04a3d53716a39bee Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Fri, 1 Nov 2024 16:17:46 -0500 Subject: [PATCH] fix(jetstream): more resilient ordered consumers and protocol message handling Signed-off-by: Alberto Ricart --- jetstream/deno.json | 2 +- jetstream/package.json | 2 +- jetstream/src/consumer.ts | 19 +++++++++++-------- jetstream/src/jsbaseclient_api.ts | 4 ++-- jetstream/src/jserrors.ts | 4 +++- jetstream/src/pushconsumer.ts | 28 +++++++++++++++++++--------- kv/deno.json | 2 +- kv/import_map.json | 4 ++-- kv/package.json | 2 +- obj/deno.json | 2 +- obj/import_map.json | 4 ++-- obj/package.json | 2 +- transport-node/package.json | 4 ++-- transport-node/src/version.ts | 2 +- 14 files changed, 48 insertions(+), 33 deletions(-) diff --git a/jetstream/deno.json b/jetstream/deno.json index 52076eb5..63b305f2 100644 --- a/jetstream/deno.json +++ b/jetstream/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-21", + "version": "3.0.0-22", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" diff --git a/jetstream/package.json b/jetstream/package.json index d3a5b7f7..6705252d 100644 --- a/jetstream/package.json +++ b/jetstream/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-21", + "version": "3.0.0-22", "files": [ "lib/", "LICENSE", diff --git a/jetstream/src/consumer.ts b/jetstream/src/consumer.ts index 64a544ee..559673ce 100644 --- a/jetstream/src/consumer.ts +++ b/jetstream/src/consumer.ts @@ -166,8 +166,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl } this.monitor?.work(); - const isProtocol = msg.subject === this.inbox; + const isProtocol = this.consumer.ordered + ? msg.subject.indexOf(this?.inboxPrefix!) === 0 + : msg.subject === this.inbox; + if (isProtocol) { + if (msg.subject !== (this.sub as SubscriptionImpl).subject) { + // this is a stale message - was not sent to the current inbox + return; + } const status = new JetStreamStatus(msg); if (status.isIdleHeartbeat()) { @@ -178,7 +185,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl const description = status.description; const { msgsLeft, bytesLeft } = status.parseDiscard(); - console.log("pending", msgsLeft, bytesLeft); if ((msgsLeft && msgsLeft > 0) || (bytesLeft && bytesLeft > 0)) { this.pending.msgs -= msgsLeft; this.pending.bytes -= bytesLeft; @@ -842,15 +848,12 @@ export class PullConsumerImpl implements Consumer { return ci; } - info(cached = false): Promise { + async info(cached = false): Promise { if (cached) { return Promise.resolve(this._info); } const { stream_name, name } = this._info; - return this.api.info(stream_name, name) - .then((ci) => { - this._info = ci; - return this._info; - }); + this._info = await this.api.info(stream_name, name); + return this._info; } } diff --git a/jetstream/src/jsbaseclient_api.ts b/jetstream/src/jsbaseclient_api.ts index 9f48457f..8ccb5b19 100644 --- a/jetstream/src/jsbaseclient_api.ts +++ b/jetstream/src/jsbaseclient_api.ts @@ -119,8 +119,8 @@ export class BaseApiClientImpl { } catch (err) { const re = err instanceof RequestError ? err as RequestError : null; if ( - err instanceof errors.TimeoutError || - re?.isNoResponders() && i + 1 < retries + (err instanceof errors.TimeoutError || re?.isNoResponders()) && + i + 1 < retries ) { await delay(bo.backoff(i)); } else { diff --git a/jetstream/src/jserrors.ts b/jetstream/src/jserrors.ts index aacbad08..f7da88f5 100644 --- a/jetstream/src/jserrors.ts +++ b/jetstream/src/jserrors.ts @@ -60,7 +60,9 @@ export class JetStreamStatus { debug() { console.log({ - message: this.description, + subject: this.msg.subject, + reply: this.msg.reply, + description: this.description, status: this.code, headers: this.msg.headers, }); diff --git a/jetstream/src/pushconsumer.ts b/jetstream/src/pushconsumer.ts index 4155e717..d89473d6 100644 --- a/jetstream/src/pushconsumer.ts +++ b/jetstream/src/pushconsumer.ts @@ -46,6 +46,7 @@ import type { QueuedIterator, Status, Subscription, + SubscriptionImpl, } from "@nats-io/nats-core/internal"; import { JetStreamStatus } from "./jserrors.ts"; @@ -281,9 +282,26 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl } this.monitor?.work(); - const isProtocol = msg.subject === subject; + // need to make sure to catch all protocol messages even + const isProtocol = this.ordered + ? msg.subject.indexOf(this?.deliverPrefix!) === 0 + : msg.subject === subject; + if (isProtocol) { + if (msg.subject !== (this.sub as SubscriptionImpl).subject) { + // this is a stale message - was not sent to the current inbox + return; + } + const status = new JetStreamStatus(msg); + if (status.isFlowControlRequest()) { + this._push(() => { + msg.respond(); + this.notify(ConsumerDebugEvents.FlowControl, null); + }); + return; + } + if (status.isIdleHeartbeat()) { const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer"); const natsLastStream = msg.headers?.get("Nats-Last-Stream"); @@ -293,14 +311,6 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl }); return; } - if (status.isFlowControlRequest()) { - status.debug(); - this._push(() => { - msg.respond(); - this.notify(ConsumerDebugEvents.FlowControl, null); - }); - return; - } const code = status.code; const description = status.description; diff --git a/kv/deno.json b/kv/deno.json index ac60b43e..c39bce2f 100644 --- a/kv/deno.json +++ b/kv/deno.json @@ -34,6 +34,6 @@ }, "imports": { "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21" + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22" } } diff --git a/kv/import_map.json b/kv/import_map.json index 3b2e41e4..541bde98 100644 --- a/kv/import_map.json +++ b/kv/import_map.json @@ -2,8 +2,8 @@ "imports": { "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-21/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-22/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/kv/package.json b/kv/package.json index ff1c99f5..781a3edd 100644 --- a/kv/package.json +++ b/kv/package.json @@ -34,7 +34,7 @@ }, "description": "kv library - this library implements all the base functionality for NATS KV javascript clients", "dependencies": { - "@nats-io/jetstream": "3.0.0-21", + "@nats-io/jetstream": "3.0.0-22", "@nats-io/nats-core": "3.0.0-34" }, "devDependencies": { diff --git a/obj/deno.json b/obj/deno.json index 602234ba..f7555bac 100644 --- a/obj/deno.json +++ b/obj/deno.json @@ -34,6 +34,6 @@ }, "imports": { "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21" + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22" } } diff --git a/obj/import_map.json b/obj/import_map.json index 3b2e41e4..541bde98 100644 --- a/obj/import_map.json +++ b/obj/import_map.json @@ -2,8 +2,8 @@ "imports": { "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-21/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-22/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/obj/package.json b/obj/package.json index cf034449..3f4d3dd6 100644 --- a/obj/package.json +++ b/obj/package.json @@ -34,7 +34,7 @@ }, "description": "obj library - this library implements all the base functionality for NATS objectstore for javascript clients", "dependencies": { - "@nats-io/jetstream": "3.0.0-21", + "@nats-io/jetstream": "3.0.0-22", "@nats-io/nats-core": "3.0.0-34" }, "devDependencies": { diff --git a/transport-node/package.json b/transport-node/package.json index c186a64d..2e19531c 100644 --- a/transport-node/package.json +++ b/transport-node/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/transport-node", - "version": "3.0.0-19", + "version": "3.0.0-20", "description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", "keywords": [ "nats", @@ -64,7 +64,7 @@ "nats-jwt": "^0.0.9", "shx": "^0.3.3", "typescript": "5.6.3", - "@nats-io/jetstream": "3.0.0-21", + "@nats-io/jetstream": "3.0.0-22", "@nats-io/kv": "3.0.0-16", "@nats-io/obj": "3.0.0-17" } diff --git a/transport-node/src/version.ts b/transport-node/src/version.ts index 327f3fe4..b98255d5 100644 --- a/transport-node/src/version.ts +++ b/transport-node/src/version.ts @@ -1,2 +1,2 @@ // This file is generated - do not edit -export const version = "3.0.0-19"; +export const version = "3.0.0-20";