From a4cdc522f63f5492d4e6d58ed6d557cc57632c7b Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Thu, 31 Oct 2024 16:42:36 -0500 Subject: [PATCH] wip errors Signed-off-by: Alberto Ricart --- core/deno.json | 2 +- core/package.json | 2 +- core/src/core.ts | 10 +-- core/src/errors.ts | 26 ++++-- core/src/headers.ts | 25 ++---- core/src/internal_mod.ts | 17 +++- core/src/mod.ts | 13 +++ core/src/nats.ts | 48 ++++++----- core/src/options.ts | 22 ++---- core/src/protocol.ts | 18 ++--- core/src/request.ts | 17 ++-- core/src/util.ts | 13 ++- core/src/version.ts | 2 +- core/src/ws_transport.ts | 8 +- core/tests/basics_test.ts | 30 ++++++- jetstream/deno.json | 4 +- jetstream/import_map.json | 4 +- jetstream/package.json | 4 +- jetstream/src/consumer.ts | 17 ++-- jetstream/src/internal_mod.ts | 1 + jetstream/src/jsbaseclient_api.ts | 41 +++++++--- jetstream/src/jsclient.ts | 11 +-- jetstream/src/jserrors.ts | 79 +++++++++++-------- jetstream/src/jslister.ts | 5 +- jetstream/src/jsm.ts | 22 +++--- jetstream/src/jsmconsumer_api.ts | 21 +++-- jetstream/src/jsmstream_api.ts | 44 ++++++----- jetstream/src/pushconsumer.ts | 14 ++-- jetstream/tests/consumers_ordered_test.ts | 19 +++-- .../tests/jetstream_pushconsumer_test.ts | 8 +- jetstream/tests/jsm_test.ts | 48 +++++------ jetstream/tests/jsmsg_test.ts | 7 +- jetstream/tests/next_test.ts | 19 +++-- jetstream/tests/util.ts | 23 ++---- kv/deno.json | 6 +- kv/import_map.json | 8 +- kv/package.json | 6 +- migration.md | 58 +++++++------- obj/deno.json | 6 +- obj/import_map.json | 8 +- obj/package.json | 6 +- package.json | 10 +++ services/deno.json | 4 +- services/import_map.json | 4 +- services/package.json | 4 +- services/src/service.ts | 2 +- transport-deno/deno.json | 4 +- transport-deno/src/version.ts | 2 +- transport-node/package.json | 10 +-- transport-node/src/node_transport.ts | 8 +- transport-node/src/version.ts | 2 +- 51 files changed, 428 insertions(+), 364 deletions(-) create mode 100644 package.json diff --git a/core/deno.json b/core/deno.json index dd6dca6a..ddbc0ad0 100644 --- a/core/deno.json +++ b/core/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/nats-core", - "version": "3.0.0-32", + "version": "3.0.0-34", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" diff --git a/core/package.json b/core/package.json index 5d61ee60..c986b250 100644 --- a/core/package.json +++ b/core/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/nats-core", - "version": "3.0.0-32", + "version": "3.0.0-34", "files": [ "lib/", "LICENSE", diff --git a/core/src/core.ts b/core/src/core.ts index c9225e74..6c61592e 100644 --- a/core/src/core.ts +++ b/core/src/core.ts @@ -14,7 +14,7 @@ */ import { nuid } from "./nuid.ts"; -import { errors, InvalidArgumentError } from "./errors.ts"; +import { InvalidArgumentError } from "./errors.ts"; /** * Events reported by the {@link NatsConnection#status} iterator. @@ -663,11 +663,9 @@ export function createInbox(prefix = ""): string { prefix.split(".") .forEach((v) => { if (v === "*" || v === ">") { - throw new errors.InvalidArgumentError( - InvalidArgumentError.format( - "prefix", - `cannot have wildcards ('${prefix}')`, - ), + throw InvalidArgumentError.format( + "prefix", + `cannot have wildcards ('${prefix}')`, ); } }); diff --git a/core/src/errors.ts b/core/src/errors.ts index 8ace4d67..eed0c1dd 100644 --- a/core/src/errors.ts +++ b/core/src/errors.ts @@ -33,13 +33,21 @@ export class InvalidArgumentError extends Error { this.name = "InvalidArgumentError"; } - static format(name: string, message: string): string { - return `'${name}' ${message}`; - } - - static formatMultiple(names: string[], message: string): string { - names = names.map((n) => `'${n}'`); - return `${names.join(",")} ${message}`; + static format( + property: string | string[], + message: string, + options?: ErrorOptions, + ): InvalidArgumentError { + if (Array.isArray(message) && message.length > 1) { + message = message[0]; + } + if (Array.isArray(property)) { + property = property.map((n) => `'${n}'`); + property = property.join(","); + } else { + property = `'${property}'`; + } + return new InvalidArgumentError(`${property} ${message}`, options); } } @@ -186,6 +194,10 @@ export class RequestError extends Error { super(message, options); this.name = "RequestError"; } + + isNoResponders(): boolean { + return this.cause instanceof NoRespondersError; + } } /** diff --git a/core/src/headers.ts b/core/src/headers.ts index e4cde44c..5e7f73b5 100644 --- a/core/src/headers.ts +++ b/core/src/headers.ts @@ -18,7 +18,7 @@ import { TD, TE } from "./encoders.ts"; import type { MsgHdrs } from "./core.ts"; import { Match } from "./core.ts"; -import { errors } from "./errors.ts"; +import { InvalidArgumentError } from "./errors.ts"; // https://www.ietf.org/rfc/rfc822.txt // 3.1.2. STRUCTURE OF HEADER FIELDS @@ -48,11 +48,9 @@ export function canonicalMIMEHeaderKey(k: string): string { for (let i = 0; i < k.length; i++) { let c = k.charCodeAt(i); if (c === colon || c < start || c > end) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format( - "header", - `'${k[i]}' is not a valid character in a header name`, - ), + throw InvalidArgumentError.format( + "header", + `'${k[i]}' is not a valid character in a header name`, ); } if (upper && a <= c && c <= z) { @@ -68,12 +66,7 @@ export function canonicalMIMEHeaderKey(k: string): string { export function headers(code = 0, description = ""): MsgHdrs { if ((code === 0 && description !== "") || (code > 0 && description === "")) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format( - "description", - "is required", - ), - ); + throw InvalidArgumentError.format("description", "is required"); } return new MsgHdrsImpl(code, description); } @@ -178,11 +171,9 @@ export class MsgHdrsImpl implements MsgHdrs { static validHeaderValue(k: string): string { const inv = /[\r\n]/; if (inv.test(k)) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format( - "header", - "values cannot contain \\r or \\n", - ), + throw InvalidArgumentError.format( + "header", + "values cannot contain \\r or \\n", ); } return k.trim(); diff --git a/core/src/internal_mod.ts b/core/src/internal_mod.ts index 2d30ac39..fd6f48ee 100644 --- a/core/src/internal_mod.ts +++ b/core/src/internal_mod.ts @@ -139,4 +139,19 @@ export { SHA256 } from "./sha256.ts"; export { wsconnect, wsUrlParseFn } from "./ws_transport.ts"; -export { errors } from "./errors.ts"; +export { + AuthorizationError, + ClosedConnectionError, + ConnectionError, + DrainingConnectionError, + errors, + InvalidArgumentError, + InvalidOperationError, + InvalidSubjectError, + NoRespondersError, + PermissionViolationError, + ProtocolError, + RequestError, + TimeoutError, + UserAuthenticationExpiredError, +} from "./errors.ts"; diff --git a/core/src/mod.ts b/core/src/mod.ts index 016af975..db81c3aa 100644 --- a/core/src/mod.ts +++ b/core/src/mod.ts @@ -14,20 +14,27 @@ */ export { + AuthorizationError, backoff, Bench, buildAuthenticator, canonicalMIMEHeaderKey, + ClosedConnectionError, + ConnectionError, createInbox, credsAuthenticator, deadline, DebugEvents, deferred, delay, + DrainingConnectionError, Empty, errors, Events, headers, + InvalidArgumentError, + InvalidOperationError, + InvalidSubjectError, jwtAuthenticator, Match, Metric, @@ -36,11 +43,17 @@ export { nanos, nkeyAuthenticator, nkeys, + NoRespondersError, Nuid, nuid, + PermissionViolationError, + ProtocolError, + RequestError, RequestStrategy, syncIterator, + TimeoutError, tokenAuthenticator, + UserAuthenticationExpiredError, usernamePasswordAuthenticator, wsconnect, } from "./internal_mod.ts"; diff --git a/core/src/nats.ts b/core/src/nats.ts index 57bb6a3e..defa7df1 100644 --- a/core/src/nats.ts +++ b/core/src/nats.ts @@ -22,16 +22,13 @@ import { parseSemVer } from "./semver.ts"; import { parseOptions } from "./options.ts"; import { QueuedIteratorImpl } from "./queued_iterator.ts"; -import { RequestMany, RequestOne } from "./request.ts"; - import type { RequestManyOptionsInternal } from "./request.ts"; - -import { createInbox, RequestStrategy } from "./core.ts"; -import type { Dispatcher } from "./core.ts"; +import { RequestMany, RequestOne } from "./request.ts"; import type { ConnectionOptions, Context, + Dispatcher, Msg, NatsConnection, Payload, @@ -45,7 +42,8 @@ import type { Subscription, SubscriptionOptions, } from "./core.ts"; -import { errors, InvalidArgumentError } from "./errors.ts"; +import { createInbox, RequestStrategy } from "./core.ts"; +import { errors, InvalidArgumentError, TimeoutError } from "./errors.ts"; export class NatsConnectionImpl implements NatsConnection { options: ConnectionOptions; @@ -110,6 +108,9 @@ export class NatsConnectionImpl implements NatsConnection { options?: PublishOptions, ): void { this._check(subject, false, true); + if (options?.reply) { + this._check(options.reply, false, true); + } this.protocol.publish(subject, data, options); } @@ -185,10 +186,7 @@ export class NatsConnectionImpl implements NatsConnection { opts.maxWait = opts.maxWait || 1000; if (opts.maxWait < 1) { return Promise.reject( - new errors.InvalidArgumentError(InvalidArgumentError.format( - "timeout", - "must be greater than 0", - )), + InvalidArgumentError.format("timeout", "must be greater than 0"), ); } @@ -353,18 +351,14 @@ export class NatsConnectionImpl implements NatsConnection { opts.timeout = opts.timeout || 1000; if (opts.timeout < 1) { return Promise.reject( - new errors.InvalidArgumentError( - InvalidArgumentError.format("timeout", `must be greater than 0`), - ), + InvalidArgumentError.format("timeout", `must be greater than 0`), ); } if (!opts.noMux && opts.reply) { return Promise.reject( - new errors.InvalidArgumentError( - InvalidArgumentError.formatMultiple( - ["reply", "noMux"], - "are mutually exclusive", - ), + InvalidArgumentError.format( + ["reply", "noMux"], + "are mutually exclusive", ), ); } @@ -381,18 +375,20 @@ export class NatsConnectionImpl implements NatsConnection { max: 1, timeout: opts.timeout, callback: (err, msg) => { - // check for no responders + // check for no responders status if (msg && msg.data?.length === 0 && msg.headers?.code === 503) { err = new errors.NoRespondersError(subject); } if (err) { - // if we have a context, use that as the wrapper - if (errCtx) { - errCtx.message = err.message; - errCtx.cause = err; - err = errCtx; - } else { - err = new errors.RequestError(err.message, { cause: err }); + // we have a proper stack on timeout + if (!(err instanceof TimeoutError)) { + if (errCtx) { + errCtx.message = err.message; + errCtx.cause = err; + err = errCtx; + } else { + err = new errors.RequestError(err.message, { cause: err }); + } } d.reject(err); sub.unsubscribe(); diff --git a/core/src/options.ts b/core/src/options.ts index 7f38e994..a10eb009 100644 --- a/core/src/options.ts +++ b/core/src/options.ts @@ -23,7 +23,7 @@ import { tokenAuthenticator, usernamePasswordAuthenticator, } from "./authenticator.ts"; -import { errors } from "./errors.ts"; +import { errors, InvalidArgumentError } from "./errors.ts"; export const DEFAULT_MAX_RECONNECT_ATTEMPTS = 10; export const DEFAULT_JITTER = 100; @@ -84,11 +84,9 @@ export function parseOptions(opts?: ConnectionOptions): ConnectionOptions { } if (opts.servers.length > 0 && opts.port) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.formatMultiple( - ["servers", "port"], - "are mutually exclusive", - ), + throw InvalidArgumentError.format( + ["servers", "port"], + "are mutually exclusive", ); } @@ -133,11 +131,9 @@ export function parseOptions(opts?: ConnectionOptions): ConnectionOptions { if (options.resolve) { if (typeof getResolveFn() !== "function") { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format( - "resolve", - "is not supported in the current runtime", - ), + throw InvalidArgumentError.format( + "resolve", + "is not supported in the current runtime", ); } } @@ -159,8 +155,6 @@ export function checkOptions(info: ServerInfo, options: ConnectionOptions) { export function checkUnsupportedOption(prop: string, v?: unknown) { if (v) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format(prop, "is not supported"), - ); + throw InvalidArgumentError.format(prop, "is not supported"); } } diff --git a/core/src/protocol.ts b/core/src/protocol.ts index 0bcb92a4..61f18c97 100644 --- a/core/src/protocol.ts +++ b/core/src/protocol.ts @@ -50,12 +50,13 @@ import { DEFAULT_PING_INTERVAL, DEFAULT_RECONNECT_TIME_WAIT, } from "./options.ts"; +import { errors, InvalidArgumentError } from "./errors.ts"; + import type { AuthorizationError, PermissionViolationError, UserAuthenticationExpiredError, } from "./errors.ts"; -import { errors } from "./errors.ts"; const FLUSH_THRESHOLD = 1024 * 32; @@ -870,11 +871,9 @@ export class ProtocolHandler implements Dispatcher { let hlen = 0; if (options.headers) { if (this.info && !this.info.headers) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format( - "headers", - "are not available on this server", - ), + InvalidArgumentError.format( + "headers", + "are not available on this server", ); } const hdrs = options.headers as MsgHdrsImpl; @@ -884,12 +883,7 @@ export class ProtocolHandler implements Dispatcher { } if (this.info && len > this.info.max_payload) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format( - "payload", - "max_payload size exceeded", - ), - ); + throw InvalidArgumentError.format("payload", "max_payload size exceeded"); } this.outBytes += len; this.outMsgs++; diff --git a/core/src/request.ts b/core/src/request.ts index c7721dd1..2b1c4e24 100644 --- a/core/src/request.ts +++ b/core/src/request.ts @@ -23,7 +23,7 @@ import type { RequestOptions, } from "./core.ts"; import { RequestStrategy } from "./core.ts"; -import { RequestError } from "./errors.ts"; +import { errors, RequestError, TimeoutError } from "./errors.ts"; export class BaseRequest { token: string; @@ -150,12 +150,15 @@ export class RequestOne extends BaseRequest implements Request { this.timer.cancel(); } if (err) { - if (this.ctx) { - this.ctx.message = err.message; - this.ctx.cause = err; - err = this.ctx; - } else { - err = new RequestError(err.message, { cause: err }); + // we have proper stack on timeout + if (!(err instanceof TimeoutError)) { + if (this.ctx) { + this.ctx.message = err.message; + this.ctx.cause = err; + err = this.ctx; + } else { + err = new errors.RequestError(err.message, { cause: err }); + } } this.deferred.reject(err); } else { diff --git a/core/src/util.ts b/core/src/util.ts index d512e85e..f2ea0786 100644 --- a/core/src/util.ts +++ b/core/src/util.ts @@ -111,14 +111,19 @@ export function delay(ms = 0): Delay { return Object.assign(p, methods) as Delay; } -export function deadline(p: Promise, millis = 1000): Promise { - const err = new Error(`deadline exceeded`); +export async function deadline(p: Promise, millis = 1000): Promise { const d = deferred(); const timer = setTimeout( - () => d.reject(err), + () => { + d.reject(new TimeoutError()); + }, millis, ); - return Promise.race([p, d]).finally(() => clearTimeout(timer)); + try { + return await Promise.race([p, d]); + } finally { + clearTimeout(timer); + } } export interface Deferred extends Promise { diff --git a/core/src/version.ts b/core/src/version.ts index 5e489324..837c22a5 100644 --- a/core/src/version.ts +++ b/core/src/version.ts @@ -1,2 +1,2 @@ // This file is generated - do not edit -export const version = "3.0.0-32"; +export const version = "3.0.0-34"; diff --git a/core/src/ws_transport.ts b/core/src/ws_transport.ts index 82b73bd0..12aad856 100644 --- a/core/src/ws_transport.ts +++ b/core/src/ws_transport.ts @@ -332,11 +332,9 @@ export function wsconnect( urlParseFn: wsUrlParseFn, factory: (): Transport => { if (opts.tls) { - throw new errors.InvalidArgumentError( - InvalidArgumentError.format( - "tls", - "is not configurable on w3c websocket connections", - ), + throw InvalidArgumentError.format( + "tls", + "is not configurable on w3c websocket connections", ); } return new WsTransport(); diff --git a/core/tests/basics_test.ts b/core/tests/basics_test.ts index 625955f0..aac4f38c 100644 --- a/core/tests/basics_test.ts +++ b/core/tests/basics_test.ts @@ -368,6 +368,19 @@ Deno.test("basics - request no responders", async () => { errors.RequestError, "no responders: 'q'", ); + + await cleanup(ns, nc); +}); + +Deno.test("basics - request no responders noMux", async () => { + const { ns, nc } = await _setup(connect); + await assertRejects( + () => { + return nc.request("q", Empty, { timeout: 100, noMux: true }); + }, + errors.RequestError, + "no responders: 'q'", + ); await cleanup(ns, nc); }); @@ -382,6 +395,17 @@ Deno.test("basics - request timeout", async () => { await cleanup(ns, nc); }); +Deno.test("basics - request timeout noMux", async () => { + const { ns, nc } = await _setup(connect); + const s = createInbox(); + nc.subscribe(s, { callback: () => {} }); + await assertRejects(() => { + return nc.request(s, Empty, { timeout: 100, noMux: true }); + }, errors.TimeoutError); + + await cleanup(ns, nc); +}); + Deno.test("basics - request cancel rejects", async () => { const { ns, nc } = await _setup(connect); const nci = nc as NatsConnectionImpl; @@ -583,8 +607,7 @@ Deno.test("basics - no mux requests timeout", async () => { () => { return nc.request(subj, Empty, { timeout: 500, noMux: true }); }, - errors.RequestError, - "timeout", + errors.TimeoutError, ); await cleanup(ns, nc); @@ -617,8 +640,7 @@ Deno.test("basics - no mux request timeout doesn't leak subs", async () => { () => { return nc.request("q", Empty, { noMux: true, timeout: 1000 }); }, - errors.RequestError, - "timeout", + errors.TimeoutError, ); assertEquals(nci.protocol.subscriptions.size(), 1); diff --git a/jetstream/deno.json b/jetstream/deno.json index ba34307d..52076eb5 100644 --- a/jetstream/deno.json +++ b/jetstream/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-19", + "version": "3.0.0-21", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,6 +33,6 @@ "test": "deno test -A --parallel --reload --trace-leaks --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-32" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34" } } diff --git a/jetstream/import_map.json b/jetstream/import_map.json index 47161269..3d1a7d73 100644 --- a/jetstream/import_map.json +++ b/jetstream/import_map.json @@ -2,8 +2,8 @@ "imports": { "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-32", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-32/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", "test_helpers": "../test_helpers/mod.ts", "@std/io": "jsr:@std/io@0.224.0" } diff --git a/jetstream/package.json b/jetstream/package.json index 88938566..d3a5b7f7 100644 --- a/jetstream/package.json +++ b/jetstream/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/jetstream", - "version": "3.0.0-19", + "version": "3.0.0-21", "files": [ "lib/", "LICENSE", @@ -34,7 +34,7 @@ }, "description": "jetstream library - this library implements all the base functionality for NATS JetStream for javascript clients", "dependencies": { - "@nats-io/nats-core": "3.0.0-32" + "@nats-io/nats-core": "3.0.0-34" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/jetstream/src/consumer.ts b/jetstream/src/consumer.ts index f134ae4a..64a544ee 100644 --- a/jetstream/src/consumer.ts +++ b/jetstream/src/consumer.ts @@ -169,7 +169,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl const isProtocol = msg.subject === this.inbox; if (isProtocol) { const status = new JetStreamStatus(msg); - status.debug(); if (status.isIdleHeartbeat()) { this.notify(ConsumerDebugEvents.Heartbeat, status.parseHeartbeat()); @@ -629,11 +628,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl args.expires = args.expires || 30_000; if (args.expires < 1000) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format( - "expires", - "must be at least 1000ms", - ), + throw errors.InvalidArgumentError.format( + "expires", + "must be at least 1000ms", ); } @@ -712,9 +709,7 @@ export class PullConsumerImpl implements Consumer { if (this.ordered) { if (opts.bind) { return Promise.reject( - new errors.InvalidArgumentError( - errors.InvalidArgumentError.format("bind", "is not supported"), - ), + errors.InvalidArgumentError.format("bind", "is not supported"), ); } if (this.type === PullConsumerType.Fetch) { @@ -747,9 +742,7 @@ export class PullConsumerImpl implements Consumer { if (this.ordered) { if (opts.bind) { return Promise.reject( - new errors.InvalidArgumentError( - errors.InvalidArgumentError.format("bind", "is not supported"), - ), + errors.InvalidArgumentError.format("bind", "is not supported"), ); } if (this.type === PullConsumerType.Consume) { diff --git a/jetstream/src/internal_mod.ts b/jetstream/src/internal_mod.ts index e169bb57..15fb2e72 100644 --- a/jetstream/src/internal_mod.ts +++ b/jetstream/src/internal_mod.ts @@ -143,4 +143,5 @@ export { JetStreamApiCodes, JetStreamApiError, JetStreamError, + jserrors, } from "./jserrors.ts"; diff --git a/jetstream/src/jsbaseclient_api.ts b/jetstream/src/jsbaseclient_api.ts index 64005845..9f48457f 100644 --- a/jetstream/src/jsbaseclient_api.ts +++ b/jetstream/src/jsbaseclient_api.ts @@ -19,6 +19,7 @@ import { Empty, errors, extend, + RequestError, } from "@nats-io/nats-core/internal"; import type { Msg, @@ -28,7 +29,13 @@ import type { } from "@nats-io/nats-core/internal"; import type { ApiResponse } from "./jsapi_types.ts"; import type { JetStreamOptions } from "./types.ts"; -import { JetStreamApiError } from "./jserrors.ts"; +import { + ConsumerNotFoundError, + JetStreamApiCodes, + JetStreamApiError, + JetStreamNotEnabled, + StreamNotFoundError, +} from "./jserrors.ts"; const defaultPrefix = "$JS.API"; const defaultTimeout = 5000; @@ -71,9 +78,7 @@ export class BaseApiClientImpl { _parseOpts() { let prefix = this.opts.apiPrefix; if (!prefix || prefix.length === 0) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format("prefix", "cannot be empty"), - ); + throw errors.InvalidArgumentError.format("prefix", "cannot be empty"); } const c = prefix[prefix.length - 1]; if (c === ".") { @@ -112,15 +117,18 @@ export class BaseApiClientImpl { ); return this.parseJsResponse(m); } catch (err) { - const { cause } = err as Error; + const re = err instanceof RequestError ? err as RequestError : null; if ( - (cause instanceof errors.TimeoutError || - cause instanceof errors.NoRespondersError) && - i + 1 < retries + err instanceof errors.TimeoutError || + re?.isNoResponders() && i + 1 < retries ) { await delay(bo.backoff(i)); } else { - throw err; + throw re?.isNoResponders() + ? new JetStreamNotEnabled("jetstream is not enabled", { + cause: err, + }) + : err; } } } @@ -131,7 +139,7 @@ export class BaseApiClientImpl { const r = await this._request(`${this.prefix}.STREAM.NAMES`, q); const names = r as StreamNames; if (!names.streams || names.streams.length !== 1) { - throw new Error("no stream matches subject"); + throw StreamNotFoundError.fromMessage("no stream matches subject"); } return names.streams[0]; } @@ -144,7 +152,18 @@ export class BaseApiClientImpl { const v = JSON.parse(new TextDecoder().decode(m.data)); const r = v as ApiResponse; if (r.error) { - throw new JetStreamApiError(r.error); + switch (r.error.err_code) { + case JetStreamApiCodes.ConsumerNotFound: + throw new ConsumerNotFoundError(r.error); + case JetStreamApiCodes.StreamNotFound: + throw new StreamNotFoundError(r.error); + case JetStreamApiCodes.JetStreamNotEnabledForAccount: { + const jserr = new JetStreamApiError(r.error); + throw new JetStreamNotEnabled(jserr.message, { cause: jserr }); + } + default: + throw new JetStreamApiError(r.error); + } } return v; } diff --git a/jetstream/src/jsclient.ts b/jetstream/src/jsclient.ts index 47bbd630..af1a2345 100644 --- a/jetstream/src/jsclient.ts +++ b/jetstream/src/jsclient.ts @@ -87,13 +87,7 @@ export async function jetstreamManager( try { await adm.getAccountInfo(); } catch (err) { - if ( - err instanceof errors.RequestError && - err.message.includes("no responders") - ) { - throw new JetStreamError("jetstream is not enabled", err); - } - throw new JetStreamError((err as Error).message, err as Error); + throw err; } } return adm; @@ -226,8 +220,7 @@ export class JetStreamClientImpl extends BaseApiClientImpl break; } catch (err) { if ( - err instanceof errors.RequestError && - err.message.includes("no responders") + err instanceof errors.RequestError && err.isNoResponders() ) { await delay(retry_delay); } else { diff --git a/jetstream/src/jserrors.ts b/jetstream/src/jserrors.ts index a5013a61..aacbad08 100644 --- a/jetstream/src/jserrors.ts +++ b/jetstream/src/jserrors.ts @@ -17,7 +17,21 @@ import type { Msg } from "@nats-io/nats-core"; import { JsHeaders } from "./types.ts"; import type { ApiError } from "./jsapi_types.ts"; -export class JetStreamStatusError extends Error { +export class JetStreamNotEnabled extends Error { + constructor(message: string, opts?: ErrorOptions) { + super(message, opts); + this.name = "JetStreamNotEnabled"; + } +} + +export class JetStreamError extends Error { + constructor(message: string, opts?: ErrorOptions) { + super(message, opts); + this.name = "JetStreamError"; + } +} + +export class JetStreamStatusError extends JetStreamError { code: number; constructor(message: string, code: number, opts?: ErrorOptions) { super(message, opts); @@ -35,6 +49,11 @@ export class JetStreamStatus { this._description = ""; } + static maybeParseStatus(msg: Msg): JetStreamStatus | null { + const status = new JetStreamStatus(msg); + return status.code === 0 ? null : status; + } + toError(): JetStreamStatusError { return new JetStreamStatusError(this.description, this.code); } @@ -153,13 +172,6 @@ export class JetStreamStatus { } } -export class JetStreamError extends Error { - constructor(message: string, opts?: ErrorOptions) { - super(message, opts); - this.name = "JetStreamError"; - } -} - export enum JetStreamApiCodes { ConsumerNotFound = 10014, StreamNotFound = 10059, @@ -173,33 +185,6 @@ export function isMessageNotFound(err: Error): boolean { err.code === JetStreamApiCodes.NoMessageFound; } -export class MessageNotFoundError extends Error { - constructor(message: string, opts?: ErrorOptions) { - super(message, opts); - this.name = "MessageNotFoundError"; - } -} - -export class ConsumerNotFoundError extends Error { - stream: string; - consumer: string; - constructor(stream: string, consumer: string, opts?: ErrorOptions) { - super(`consumer not found`, opts); - this.stream = stream; - this.consumer = consumer; - this.name = "ConsumerNotFoundError"; - } -} - -export class StreamNotFoundError extends Error { - stream: string; - constructor(stream: string, opts?: ErrorOptions) { - super(`stream not found`, opts); - this.stream = stream; - this.name = "StreamNotFoundError"; - } -} - export class InvalidNameError extends Error { constructor(name: string, message: string = "", opts?: ErrorOptions) { super(`'${name} ${message}`, opts); @@ -229,9 +214,33 @@ export class JetStreamApiError extends Error { } } +export class ConsumerNotFoundError extends JetStreamApiError { + constructor(jsErr: ApiError, opts?: ErrorOptions) { + super(jsErr, opts); + this.name = "ConsumerNotFoundError"; + } +} + +export class StreamNotFoundError extends JetStreamApiError { + constructor(jsErr: ApiError, opts?: ErrorOptions) { + super(jsErr, opts); + this.name = "StreamNotFoundError"; + } + + static fromMessage(message: string): JetStreamApiError { + return new StreamNotFoundError({ + err_code: JetStreamApiCodes.StreamNotFound, + description: message, + code: 404, + }); + } +} + export const jserrors = { InvalidNameError, ConsumerNotFoundError, StreamNotFoundError, JetStreamError, + JetStreamApiError, + JetStreamNotEnabled, }; diff --git a/jetstream/src/jslister.ts b/jetstream/src/jslister.ts index 2f783a30..02a191f9 100644 --- a/jetstream/src/jslister.ts +++ b/jetstream/src/jslister.ts @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 The NATS Authors + * Copyright 2021-2024 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -12,6 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import { errors } from "@nats-io/nats-core/internal"; import type { BaseApiClientImpl } from "./jsbaseclient_api.ts"; import type { ApiPaged, @@ -38,7 +39,7 @@ export class ListerImpl implements Lister, AsyncIterable { payload?: unknown, ) { if (!subject) { - throw new Error("subject is required"); + throw errors.InvalidArgumentError.format("subject", "is required"); } this.subject = subject; this.jsm = jsm; diff --git a/jetstream/src/jsm.ts b/jetstream/src/jsm.ts index 2354d473..948b28dc 100644 --- a/jetstream/src/jsm.ts +++ b/jetstream/src/jsm.ts @@ -14,13 +14,13 @@ */ import { BaseApiClientImpl } from "./jsbaseclient_api.ts"; -import { DirectMsgHeaders } from "./types.ts"; import type { DirectMsg, DirectStreamAPI, JetStreamOptions, StoredMsg, } from "./types.ts"; +import { DirectMsgHeaders } from "./types.ts"; import type { Codec, Msg, @@ -31,6 +31,7 @@ import type { } from "@nats-io/nats-core"; import { Empty, + errors, QueuedIteratorImpl, RequestStrategy, TD, @@ -107,7 +108,9 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl const pre = this.opts.apiPrefix || "$JS.API"; const subj = `${pre}.DIRECT.GET.${stream}`; if (!Array.isArray(opts.multi_last) || opts.multi_last.length === 0) { - return Promise.reject("multi_last is required"); + return Promise.reject( + errors.InvalidArgumentError.format("multi_last", "is required"), + ); } const payload = JSON.stringify(opts, (key, value) => { if (key === "up_to_time" && value instanceof Date) { @@ -129,13 +132,12 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl (async () => { let gotFirst = false; let badServer = false; - let badRequest: string | undefined; + let status: JetStreamStatus | null = null; for await (const m of raw) { if (!gotFirst) { gotFirst = true; - const code = m.headers?.code || 0; - if (code !== 0 && code < 200 || code > 299) { - badRequest = m.headers?.description.toLowerCase(); + status = JetStreamStatus.maybeParseStatus(m); + if (status) { break; } // inspect the message and make sure that we have a supported server @@ -155,12 +157,14 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl if (badServer) { throw new Error("batch direct get not supported by the server"); } - if (badRequest) { - throw new Error(`bad request: ${badRequest}`); + if (status) { + throw status.toError(); } iter.stop(); }); - })(); + })().catch((err) => { + iter.stop(err); + }); return Promise.resolve(iter); } diff --git a/jetstream/src/jsmconsumer_api.ts b/jetstream/src/jsmconsumer_api.ts index 4e3db947..e507ca24 100644 --- a/jetstream/src/jsmconsumer_api.ts +++ b/jetstream/src/jsmconsumer_api.ts @@ -24,7 +24,7 @@ import type { NatsConnection, NatsConnectionImpl, } from "@nats-io/nats-core/internal"; -import { Feature } from "@nats-io/nats-core/internal"; +import { Feature, InvalidArgumentError } from "@nats-io/nats-core/internal"; import { ConsumerApiAction } from "./jsapi_types.ts"; import type { @@ -56,13 +56,15 @@ export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI { validateStreamName(stream); if (cfg.deliver_group && cfg.flow_control) { - throw new Error( - "jetstream flow control is not supported with queue groups", + throw InvalidArgumentError.format( + ["flow_control", "deliver_group"], + "are mutually exclusive", ); } if (cfg.deliver_group && cfg.idle_heartbeat) { - throw new Error( - "jetstream idle heartbeat is not supported with queue groups", + throw InvalidArgumentError.format( + ["idle_heartbeat", "deliver_group"], + "are mutually exclusive", ); } @@ -82,7 +84,7 @@ export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI { const name = cfg.name === "" ? undefined : cfg.name; if (name && !newAPI) { - throw new Error(`consumer 'name' requires server ${min}`); + throw InvalidArgumentError.format("name", `requires server ${min}`); } if (name) { try { @@ -106,14 +108,17 @@ export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI { if (Array.isArray(cfg.filter_subjects)) { const { min, ok } = nci.features.get(Feature.JS_MULTIPLE_CONSUMER_FILTER); if (!ok) { - throw new Error(`consumer 'filter_subjects' requires server ${min}`); + throw InvalidArgumentError.format( + "filter_subjects", + `requires server ${min}`, + ); } newAPI = false; } if (cfg.metadata) { const { min, ok } = nci.features.get(Feature.JS_STREAM_CONSUMER_METADATA); if (!ok) { - throw new Error(`consumer 'metadata' requires server ${min}`); + throw InvalidArgumentError.format("metadata", `requires server ${min}`); } } if (newAPI) { diff --git a/jetstream/src/jsmstream_api.ts b/jetstream/src/jsmstream_api.ts index 294016eb..cb1f4d30 100644 --- a/jetstream/src/jsmstream_api.ts +++ b/jetstream/src/jsmstream_api.ts @@ -13,26 +13,27 @@ * limitations under the License. */ +import type { + Codec, + MsgHdrs, + NatsConnection, + NatsConnectionImpl, + ReviverFn, +} from "@nats-io/nats-core/internal"; import { createInbox, Empty, errors, Feature, headers, + InvalidArgumentError, MsgHdrsImpl, nanos, nuid, TD, } from "@nats-io/nats-core/internal"; -import type { - Codec, - MsgHdrs, - NatsConnection, - NatsConnectionImpl, - ReviverFn, -} from "@nats-io/nats-core/internal"; -import { BaseApiClientImpl } from "./jsbaseclient_api.ts"; import type { StreamNames } from "./jsbaseclient_api.ts"; +import { BaseApiClientImpl } from "./jsbaseclient_api.ts"; import { ListerImpl } from "./jslister.ts"; import { minValidation, validateStreamName } from "./jsutil.ts"; import type { @@ -51,11 +52,10 @@ import type { StreamAPI, Streams, } from "./types.ts"; - -import { isOrderedPushConsumerOptions } from "./types.ts"; - -import { isBoundPushConsumerOptions } from "./types.ts"; -import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts"; +import { + isBoundPushConsumerOptions, + isOrderedPushConsumerOptions, +} from "./types.ts"; import type { ApiPagedRequest, ConsumerConfig, @@ -77,6 +77,7 @@ import type { StreamUpdateConfig, SuccessResponse, } from "./jsapi_types.ts"; +import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts"; import { PullConsumerImpl } from "./consumer.ts"; import { ConsumerAPIImpl } from "./jsmconsumer_api.ts"; import type { PushConsumerInternalOptions } from "./pushconsumer.ts"; @@ -97,7 +98,10 @@ export function convertStreamSourceDomain(s?: StreamSource) { return copy; } if (copy.external) { - throw new Error("domain and external are both set"); + throw InvalidArgumentError.format( + ["domain", "external"], + "are mutually exclusive", + ); } copy.external = { api: `$JS.${domain}.API` } as ExternalStream; return copy; @@ -213,7 +217,9 @@ export class ConsumersImpl implements Consumers { new PushConsumerImpl(this.api, ci, { bound: true }), ); } else { - return Promise.reject(new Error("deliver_subject is required")); + return Promise.reject( + errors.InvalidArgumentError.format("deliver_subject", "is required"), + ); } } @@ -585,11 +591,9 @@ export class StreamAPIImpl extends BaseApiClientImpl implements StreamAPI { if (opts) { const { keep, seq } = opts as PurgeBySeq & PurgeTrimOpts; if (typeof keep === "number" && typeof seq === "number") { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.formatMultiple( - ["keep", "seq"], - "are mutually exclusive", - ), + throw InvalidArgumentError.format( + ["keep", "seq"], + "are mutually exclusive", ); } } diff --git a/jetstream/src/pushconsumer.ts b/jetstream/src/pushconsumer.ts index 5c04e97c..4155e717 100644 --- a/jetstream/src/pushconsumer.ts +++ b/jetstream/src/pushconsumer.ts @@ -138,18 +138,18 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl this.stop(err); } const bo = backoff(); + const c = delay(bo.backoff(this.createFails)); c.then(() => { - const idx = this.cancelables.indexOf(c); - if (idx !== -1) { - this.cancelables = this.cancelables.splice(idx, idx); - } if (!this.done) { this.reset(); } - }) - .catch((_) => { - // canceled + }).catch(() => {}) + .finally(() => { + const idx = this.cancelables.indexOf(c); + if (idx !== -1) { + this.cancelables = this.cancelables.splice(idx, idx); + } }); this.cancelables.push(c); }); diff --git a/jetstream/tests/consumers_ordered_test.ts b/jetstream/tests/consumers_ordered_test.ts index c4fb48b6..d41e8e38 100644 --- a/jetstream/tests/consumers_ordered_test.ts +++ b/jetstream/tests/consumers_ordered_test.ts @@ -43,7 +43,7 @@ import type { import { StreamImpl } from "../src/jsmstream_api.ts"; import { delayUntilAssetNotFound } from "./util.ts"; import { flakyTest } from "../../test_helpers/mod.ts"; -import { JetStreamApiError } from "../src/jserrors.ts"; +import { ConsumerNotFoundError } from "../src/jserrors.ts"; Deno.test("ordered consumers - get", async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf()); @@ -1107,14 +1107,16 @@ Deno.test("ordered consumers - initial creation fails, consumer fails", async () Deno.test( "ordered consumers - stale reference recovers", - flakyTest(async () => { + async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf()); const jsm = await jetstreamManager(nc); await jsm.streams.add({ name: "A", subjects: ["a"] }); const js = jsm.jetstream(); - await js.publish("a", JSON.stringify(1)); - await js.publish("a", JSON.stringify(2)); + await Promise.all([ + js.publish("a", JSON.stringify(1)), + js.publish("a", JSON.stringify(2)), + ]); const c = await js.consumers.get("A") as PullConsumerImpl; let m = await c.next({ expires: 1000 }); @@ -1124,25 +1126,22 @@ Deno.test( // continue until the server says the consumer doesn't exist await delayUntilAssetNotFound(c); - await nc.flush(); - // so should get that error once + // so should get CnF once await assertRejects( () => { return c.next({ expires: 1000 }); }, - JetStreamApiError, - "consumer not found", + ConsumerNotFoundError, ); // but now it will be created in line - m = await c.next({ expires: 1000 }); assertExists(m); assertEquals(m.json(), 2); await cleanup(ns, nc); - }), + }, ); Deno.test( diff --git a/jetstream/tests/jetstream_pushconsumer_test.ts b/jetstream/tests/jetstream_pushconsumer_test.ts index 3101428b..442748bf 100644 --- a/jetstream/tests/jetstream_pushconsumer_test.ts +++ b/jetstream/tests/jetstream_pushconsumer_test.ts @@ -30,6 +30,7 @@ import { delay, Empty, Events, + InvalidArgumentError, nanos, nuid, syncIterator, @@ -107,7 +108,7 @@ Deno.test("jetstream - queue error checks", async () => { }); }, Error, - "jetstream idle heartbeat is not supported with queue groups", + "'idle_heartbeat','deliver_group' are mutually exclusive", undefined, ); @@ -120,9 +121,8 @@ Deno.test("jetstream - queue error checks", async () => { flow_control: true, }); }, - Error, - "jetstream flow control is not supported with queue groups", - undefined, + InvalidArgumentError, + "'flow_control','deliver_group' are mutually exclusive", ); await cleanup(ns, nc); diff --git a/jetstream/tests/jsm_test.ts b/jetstream/tests/jsm_test.ts index 4060eb4d..e542fed2 100644 --- a/jetstream/tests/jsm_test.ts +++ b/jetstream/tests/jsm_test.ts @@ -28,7 +28,9 @@ import type { NatsConnection } from "@nats-io/nats-core"; import { deferred, Empty, + errors, headers, + InvalidArgumentError, jwtAuthenticator, nanos, nkeys, @@ -73,7 +75,7 @@ import type { ConsumerAPIImpl } from "../src/jsmconsumer_api.ts"; import { ConsumerApiAction, StoreCompression } from "../src/jsapi_types.ts"; import type { JetStreamManagerImpl } from "../src/jsclient.ts"; import { stripNatsMetadata } from "./util.ts"; -import { JetStreamApiError, JetStreamError } from "../src/jserrors.ts"; +import { jserrors } from "../src/jserrors.ts"; const StreamNameRequired = "stream name required"; const ConsumerNameRequired = "durable name required"; @@ -85,8 +87,7 @@ Deno.test("jsm - jetstream not enabled", async () => { () => { return jetstreamManager(nc); }, - JetStreamError, - "jetstream is not enabled", + jserrors.JetStreamNotEnabled, ); await cleanup(ns, nc); }); @@ -117,8 +118,7 @@ Deno.test("jsm - account not enabled", async () => { () => { return jetstreamManager(nc); }, - JetStreamError, - "not enabled for account", + jserrors.JetStreamNotEnabled, ); const a = await connect( @@ -215,9 +215,7 @@ Deno.test("jsm - info msg not found stream name fails", async () => { async () => { await jsm.streams.info(name); }, - Error, - "stream not found", - undefined, + jserrors.StreamNotFoundError, ); await cleanup(ns, nc); }); @@ -244,9 +242,7 @@ Deno.test("jsm - delete msg not found stream name fails", async () => { async () => { await jsm.streams.deleteMessage(name, 1); }, - Error, - "stream not found", - undefined, + jserrors.StreamNotFoundError, ); await cleanup(ns, nc); }); @@ -317,9 +313,7 @@ Deno.test("jsm - purge not found stream name fails", async () => { async () => { await jsm.streams.purge(name); }, - Error, - "stream not found", - undefined, + jserrors.StreamNotFoundError, ); await cleanup(ns, nc); }); @@ -564,8 +558,7 @@ Deno.test("jsm - stream delete", async () => { async () => { await jsm.streams.info(stream); }, - Error, - "stream not found", + jserrors.StreamNotFoundError, ); await cleanup(ns, nc); }); @@ -645,9 +638,7 @@ Deno.test("jsm - consumer info on not found stream fails", async () => { async () => { await jsm.consumers.info("foo", "dur"); }, - Error, - "stream not found", - undefined, + jserrors.StreamNotFoundError, ); await cleanup(ns, nc); }); @@ -660,9 +651,7 @@ Deno.test("jsm - consumer info on not found consumer", async () => { async () => { await jsm.consumers.info(stream, "dur"); }, - Error, - "consumer not found", - undefined, + jserrors.ConsumerNotFoundError, ); await cleanup(ns, nc); }); @@ -1033,7 +1022,7 @@ Deno.test("jsm - jetstream error info", async () => { }, ); }, - JetStreamApiError, + jserrors.JetStreamApiError, "replicas > 1 not supported in non-clustered mode", ); await cleanup(ns, nc); @@ -1346,8 +1335,8 @@ async function testConsumerNameAPI(nc: NatsConnection) { name: "a", }, "$JS.API.CONSUMER.CREATE.A"); }, - Error, - "consumer 'name' requires server", + errors.InvalidArgumentError, + "'name' requires server", ); const ci = await addC({ @@ -2078,8 +2067,8 @@ Deno.test("jsm - stream/consumer metadata", async () => { async () => { await addConsumer(stream, consumer, { hello: "world" }); }, - Error, - "consumer 'metadata' requires server 2.10.0", + InvalidArgumentError, + "'metadata' requires server", ); // add w/o metadata await addConsumer(stream, consumer); @@ -2088,8 +2077,8 @@ Deno.test("jsm - stream/consumer metadata", async () => { async () => { await updateConsumer(stream, consumer, { hello: "world" }); }, - Error, - "consumer 'metadata' requires server 2.10.0", + InvalidArgumentError, + "'metadata' requires server", ); await cleanup(ns, nc); @@ -2140,7 +2129,6 @@ Deno.test("jsm - validate stream name in operations", async () => { const jsm = await jetstreamManager(nc); const names = ["", ".", "*", ">", "\r", "\n", "\t", " "]; - type fn = (name: string) => Promise; const tests = [ { name: "add stream", diff --git a/jetstream/tests/jsmsg_test.ts b/jetstream/tests/jsmsg_test.ts index 7ea16e66..2d7a3b43 100644 --- a/jetstream/tests/jsmsg_test.ts +++ b/jetstream/tests/jsmsg_test.ts @@ -12,7 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { assertEquals, assertRejects, fail } from "jsr:@std/assert"; +import { assert, assertEquals, assertRejects, fail } from "jsr:@std/assert"; import { AckPolicy, jetstream, @@ -159,14 +159,15 @@ Deno.test("jsmsg - no ack consumer is ackAck 503", async () => { const c = await js.consumers.get("A", "a"); const jm = await c.next(); - await assertRejects( + const err = await assertRejects( (): Promise => { return jm!.ackAck(); }, errors.RequestError, - "no responders", ); + assert(err.isNoResponders()); + await cleanup(ns, nc); }); diff --git a/jetstream/tests/next_test.ts b/jetstream/tests/next_test.ts index bce359cd..a41c298d 100644 --- a/jetstream/tests/next_test.ts +++ b/jetstream/tests/next_test.ts @@ -27,6 +27,11 @@ import { delay, nanos } from "@nats-io/nats-core"; import type { NatsConnectionImpl } from "@nats-io/nats-core/internal"; import { jetstream, jetstreamManager } from "../src/mod.ts"; import { delayUntilAssetNotFound } from "./util.ts"; +import { + ConsumerNotFoundError, + JetStreamStatusError, + StreamNotFoundError, +} from "../src/jserrors.ts"; Deno.test("next - basics", async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf()); @@ -140,8 +145,7 @@ Deno.test( () => { return c.next({ expires: 1000 }); }, - Error, - "consumer not found", + ConsumerNotFoundError, ); await cleanup(ns, nc); @@ -167,7 +171,7 @@ Deno.test("next - deleted consumer", async () => { () => { return c.next({ expires: 4000 }); }, - Error, + JetStreamStatusError, "consumer deleted", ); await delay(1000); @@ -180,7 +184,7 @@ Deno.test("next - deleted consumer", async () => { Deno.test( "next - stream not found", - flakyTest(async () => { + async () => { const { ns, nc } = await _setup(connect, jetstreamServerConf()); const jsm = await jetstreamManager(nc); @@ -202,14 +206,13 @@ Deno.test( await assertRejects( () => { - return c.next({ expires: 4000 }); + return c.next({ expires: 1000 }); }, - Error, - "stream not found", + StreamNotFoundError, ); await cleanup(ns, nc); - }), + }, ); Deno.test("next - consumer bind", async () => { diff --git a/jetstream/tests/util.ts b/jetstream/tests/util.ts index fea26e5f..28047d12 100644 --- a/jetstream/tests/util.ts +++ b/jetstream/tests/util.ts @@ -17,7 +17,7 @@ import { delay } from "@nats-io/nats-core"; import type { Consumer, Stream } from "../src/types.ts"; import { fail } from "jsr:@std/assert"; import { StreamImpl } from "../src/jsmstream_api.ts"; -import { JetStreamApiCodes, JetStreamApiError } from "../src/jserrors.ts"; +import { ConsumerNotFoundError, StreamNotFoundError } from "../src/jserrors.ts"; export function stripNatsMetadata(md?: Record) { if (md) { @@ -38,20 +38,13 @@ export async function delayUntilAssetNotFound( await a.info(); await delay(20); } catch (err) { - if (err instanceof JetStreamApiError) { - const jserr = err as JetStreamApiError; - if ( - jserr.code === JetStreamApiCodes.ConsumerNotFound && - expected === "consumer" - ) { - break; - } - if ( - jserr.code === JetStreamApiCodes.StreamNotFound && - expected === "stream" - ) { - break; - } + if (err instanceof ConsumerNotFoundError && expected === "consumer") { + await delay(1000); + break; + } + if (err instanceof StreamNotFoundError && expected === "stream") { + await delay(1000); + break; } fail((err as Error).message); } diff --git a/kv/deno.json b/kv/deno.json index 3808b7b7..ac60b43e 100644 --- a/kv/deno.json +++ b/kv/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/kv", - "version": "3.0.0-14", + "version": "3.0.0-16", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,7 +33,7 @@ "test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-32", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-19" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21" } } diff --git a/kv/import_map.json b/kv/import_map.json index a174c0af..3b2e41e4 100644 --- a/kv/import_map.json +++ b/kv/import_map.json @@ -1,9 +1,9 @@ { "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-32", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-32/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-19", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-19/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-21/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/kv/package.json b/kv/package.json index 7eb8b20f..ff1c99f5 100644 --- a/kv/package.json +++ b/kv/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/kv", - "version": "3.0.0-14", + "version": "3.0.0-16", "files": [ "lib/", "LICENSE", @@ -34,8 +34,8 @@ }, "description": "kv library - this library implements all the base functionality for NATS KV javascript clients", "dependencies": { - "@nats-io/jetstream": "3.0.0-19", - "@nats-io/nats-core": "3.0.0-32" + "@nats-io/jetstream": "3.0.0-21", + "@nats-io/nats-core": "3.0.0-34" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/migration.md b/migration.md index f678a9e8..3c7dabf9 100644 --- a/migration.md +++ b/migration.md @@ -2,7 +2,7 @@ The NATS ecosystem has grown a lot since the 2.0 release of the `nats` (nats.js) client. NATS currently runs in several JavaScript runtimes: Deno, Browser, and -Node (Bun). +Node/Bun. While the organization of the library has served developers well, there are a number of issues we would like to address going forward: @@ -13,7 +13,7 @@ number of issues we would like to address going forward: - Better presentation of NATS technologies to developers that are interested in KV, ObjectStore or JetStream. - Smaller dependencies for those that are only interested in the NATS core - functionality (no JetStream) + functionality. - More agility and independence to each of the modules, as well as their own version. - Easier understanding of the functionality in question, as each repository @@ -35,26 +35,26 @@ The transports have also been migrated: - `@nats-io/transport-node` has all the functionality of the original `nats.js` - `@nats-io/transport-deno` has all the functionality of `nats.deno` - `nats.ws` is now part of `@nats-io/nats-core` as it can be used from Deno or - latest version of Node directly. + latest version of Node directly or any runtime that has standard W3C Websocket + support. Note that when installing `@nats-io/transport-node` or `@nats-io/transport-deno`, the `@nats-io/core` APIs are also made available. Your library selection process will start by selecting your runtime, and importing any additional functionality you may be interested in. The -`@nats-io/node`, `@nats-io/deno`, `@nats-io/es-websocket` depend and re-export +`@nats-io/transport-node`, `@nats-io/transport-deno` depend on and re-export `@nats-io/core`. To use the extended functionality (JetStream, KV, ObjectStore, Services) you -will need to install and import from the other libraries and call API to create -an instance of the functionality the need. +will need to install and import from the other libraries to access those APIs. For example, developers that use JetStream can access it by using the functions `jetstream()` and `jetstreamManager()` and provide their NATS connection. Note that the `NatsConnection#jetstream/Manager()` APIs are no longer available. Developers interested in KV or ObjectStore can access the resources by calling -creating a Kvm and calling `create()` or `open()` using wither a +creating a Kvm and calling `create()` or `open()` using either a `JetStreamClient` or a plain `NatsConnection`. Note that the `JetStreamClient#views` API is also no longer available. @@ -76,19 +76,20 @@ these modules for cross-runtime consumption. - QueuedIterator type incorrectly exposed a `push()` operation - this operation is not public API and was removed from the interface. - The internal type `TypedSubscription` and associated interfaces have been - removed, these were supporting legacy JetStream APIs. If you were using these - internal types to transform the types in the subscription, take a look at + removed, these were supporting legacy JetStream APIs + (`subscribe/pullSubscribe()`. If you were using these internal types to + transform the types in the subscription, take a look at [messagepipeline](https://github.com/synadia-io/orbit.js/tree/main/messagepipeline). - The utilities `JSONCodec` and `StringCodec` have been removed, the `Msg` types - and derivatives can set string or Uint8Array payalods. To read payloads as - string or JSON use `string()` and `json()` methods. For publishing JSON - payloads, simply specify the output of `JSON.stringify()` to the publish - operation. + and derivatives can set string or Uint8Array payloads. To read payloads as + string or JSON use `string()` and `json()` methods on Msg or its derivatives. + For publishing JSON payloads, simply specify the output of `JSON.stringify()` + to the publish or request operation. - NatsError was removed in favor of more descriptive types. For example, if you - make a request, the request could fail with a RequestError. The RequestError - in turn will contain the cause `TimeoutError` or a `NoRespondersError`. This - also means that in typescript, the callback signature has been relaxed to just - `(Error, Msg)=>void`. For more information see the JsDoc for the client. + make a request, the request could fail with a RequestError or TimeoutError. + The RequestError in turn will contain the `cause` such as `NoRespondersError`. + This also means that in TypeScript, the callback signature has been relaxed to + just `(Error, Msg)=>void`. For more information see the JsDocs. ## Changes in JetStream @@ -111,13 +112,14 @@ To use JetStream, you must install and import `@nats/jetstream`. - `JetStreamClient.pull()` was deprecated and was removed. Use `Consumer.next()`. - The utility function `consumerOpts()` and associated function - `isConsumerOptsBuilder()` have been removed. Along side of it + `isConsumerOptsBuilder()` have been removed. Alongside of it `ConsumerOptsBuilder` which was used by `subscribe()` and `pullSubscribe()` - type has been removed. + type has also been removed. - JetStream errors are now expressed by the type `JetStreamError` and - `JetStreamAPIError`. For API calls where the server could return an error, - these are `JetStreamAPIError` and contain all the information returned by the - server. + `JetStreamAPIError`. Common errors such as `ConsumerNotFound`, and + `StreamNotFound`, `JetStreamNotEnabled` are subtypes of the above. For API + calls where the server could return an error, these are `JetStreamAPIError` + and contain all the information returned by the server. ## Changes to KV @@ -148,12 +150,12 @@ await kvm.open("mykv"); Previous versions of `Kv.watch()` allowed the client to specify a function that was called when the watch was done providing history values. In this version, -you can find out if a watch is yielding an update by examining into -`KvEntry.isUpdate`. Note that an empty Kv will not yield any watch information. -You can test for this initial condition, by getting the status of the KV, and -inspecting the `values` property, which will state the number of entries in the -Kv. Also note that watches with the option to do updates only, cannot notify -until there's an update. +you can find out if a watch is yielding an update by examining the `isUpdate` +property. Note that an empty Kv will not yield any watch information. You can +test for this initial condition, by getting the status of the KV, and inspecting +the `values` property, which will state the number of entries in the Kv. Also +note that watches with the option to do updates only, cannot notify until +there's an update. ## Changes to ObjectStore diff --git a/obj/deno.json b/obj/deno.json index e390b5e9..602234ba 100644 --- a/obj/deno.json +++ b/obj/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/obj", - "version": "3.0.0-14", + "version": "3.0.0-17", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,7 +33,7 @@ "test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-32", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-19" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21" } } diff --git a/obj/import_map.json b/obj/import_map.json index a174c0af..3b2e41e4 100644 --- a/obj/import_map.json +++ b/obj/import_map.json @@ -1,9 +1,9 @@ { "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-32", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-32/internal", - "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-19", - "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-19/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", + "@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21", + "@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-21/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/obj/package.json b/obj/package.json index 5550bc3f..cf034449 100644 --- a/obj/package.json +++ b/obj/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/obj", - "version": "3.0.0-14", + "version": "3.0.0-17", "files": [ "lib/", "LICENSE", @@ -34,8 +34,8 @@ }, "description": "obj library - this library implements all the base functionality for NATS objectstore for javascript clients", "dependencies": { - "@nats-io/jetstream": "3.0.0-19", - "@nats-io/nats-core": "3.0.0-32" + "@nats-io/jetstream": "3.0.0-21", + "@nats-io/nats-core": "3.0.0-34" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/package.json b/package.json new file mode 100644 index 00000000..e8951e30 --- /dev/null +++ b/package.json @@ -0,0 +1,10 @@ +{ + "workspaces": [ + "./core", + "./jetstream", + "./kv", + "./obj", + "./services", + "./transport-node" + ] +} diff --git a/services/deno.json b/services/deno.json index db6f248d..acfb300e 100644 --- a/services/deno.json +++ b/services/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/services", - "version": "3.0.0-11", + "version": "3.0.0-12", "exports": { ".": "./src/mod.ts", "./internal": "./src/internal_mod.ts" @@ -33,6 +33,6 @@ "test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json" }, "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-32" + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34" } } diff --git a/services/import_map.json b/services/import_map.json index 23558da1..6c3ae4d4 100644 --- a/services/import_map.json +++ b/services/import_map.json @@ -1,7 +1,7 @@ { "imports": { - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-32", - "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-32/internal", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", + "@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal", "test_helpers": "../test_helpers/mod.ts", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2", diff --git a/services/package.json b/services/package.json index bf22e3c7..cb37f4b8 100644 --- a/services/package.json +++ b/services/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/services", - "version": "3.0.0-11", + "version": "3.0.0-12", "files": [ "lib/", "LICENSE", @@ -34,7 +34,7 @@ }, "description": "services library - this library implements all the base functionality for NATS services for javascript clients", "dependencies": { - "@nats-io/nats-core": "3.0.0-32" + "@nats-io/nats-core": "3.0.0-34" }, "devDependencies": { "@types/node": "^22.7.6", diff --git a/services/src/service.ts b/services/src/service.ts index ea6e0fc5..c6d0e1b2 100644 --- a/services/src/service.ts +++ b/services/src/service.ts @@ -68,7 +68,7 @@ function validateName(context: string, name = "") { } function validName(name = ""): string { - if (name === "") { + if (!name) { throw Error(`name required`); } const RE = /^[-\w]+$/g; diff --git a/transport-deno/deno.json b/transport-deno/deno.json index e6d771c7..857603ab 100644 --- a/transport-deno/deno.json +++ b/transport-deno/deno.json @@ -1,6 +1,6 @@ { "name": "@nats-io/transport-deno", - "version": "3.0.0-8", + "version": "3.0.0-9", "exports": { ".": "./src/mod.ts" }, @@ -20,7 +20,7 @@ }, "imports": { "@std/io": "jsr:@std/io@0.225.0", - "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-32", + "@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34", "@nats-io/nkeys": "jsr:@nats-io/nkeys@1.2.0-4", "@nats-io/nuid": "jsr:@nats-io/nuid@2.0.1-2" } diff --git a/transport-deno/src/version.ts b/transport-deno/src/version.ts index 5d57281f..75951750 100644 --- a/transport-deno/src/version.ts +++ b/transport-deno/src/version.ts @@ -1,2 +1,2 @@ // This file is generated - do not edit -export const version = "3.0.0-8"; +export const version = "3.0.0-9"; diff --git a/transport-node/package.json b/transport-node/package.json index 23af86d2..c186a64d 100644 --- a/transport-node/package.json +++ b/transport-node/package.json @@ -1,6 +1,6 @@ { "name": "@nats-io/transport-node", - "version": "3.0.0-17", + "version": "3.0.0-19", "description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", "keywords": [ "nats", @@ -54,7 +54,7 @@ "node": ">= 18.0.0" }, "dependencies": { - "@nats-io/nats-core": "3.0.0-32", + "@nats-io/nats-core": "3.0.0-34", "@nats-io/nkeys": "1.2.0-7", "@nats-io/nuid": "2.0.1-2" }, @@ -64,8 +64,8 @@ "nats-jwt": "^0.0.9", "shx": "^0.3.3", "typescript": "5.6.3", - "@nats-io/jetstream": "3.0.0-19", - "@nats-io/kv": "3.0.0-14", - "@nats-io/obj": "3.0.0-14" + "@nats-io/jetstream": "3.0.0-21", + "@nats-io/kv": "3.0.0-16", + "@nats-io/obj": "3.0.0-17" } } diff --git a/transport-node/src/node_transport.ts b/transport-node/src/node_transport.ts index 88846259..02b28a54 100644 --- a/transport-node/src/node_transport.ts +++ b/transport-node/src/node_transport.ts @@ -87,11 +87,9 @@ export class NodeTransport implements Transport { //@ts-ignore: this is possibly a TlsSocket if (tlsRequired && this.socket.encrypted !== true) { - throw new errors.InvalidArgumentError( - errors.InvalidArgumentError.format( - "tls", - "is not available on this server", - ), + throw errors.InvalidArgumentError.format( + "tls", + "is not available on this server", ); } diff --git a/transport-node/src/version.ts b/transport-node/src/version.ts index df709f1a..327f3fe4 100644 --- a/transport-node/src/version.ts +++ b/transport-node/src/version.ts @@ -1,2 +1,2 @@ // This file is generated - do not edit -export const version = "3.0.0-17"; +export const version = "3.0.0-19";