diff --git a/jetstream/src/jsm_direct.ts b/jetstream/src/jsm_direct.ts index 7d349a9c..9bb2e949 100644 --- a/jetstream/src/jsm_direct.ts +++ b/jetstream/src/jsm_direct.ts @@ -15,9 +15,14 @@ import { BaseApiClientImpl } from "./jsbaseclient_api.ts"; import type { + ConsumeOptions, + Consumer, + ConsumerMessages, DirectMsg, DirectStreamAPI, + FetchOptions, JetStreamOptions, + NextOptions, StoredMsg, } from "./types.ts"; import { DirectMsgHeaders } from "./types.ts"; @@ -257,6 +262,11 @@ export class DirectMsgImpl implements DirectMsg { return typeof v === "string" ? parseInt(v) : 0; } + get pending(): number { + const v = this.header.last(DirectMsgHeaders.NumPending); + return typeof v === "string" ? parseInt(v) : 0; + } + json(reviver?: ReviverFn): T { return JSON.parse(new TextDecoder().decode(this.data), reviver); } @@ -265,3 +275,64 @@ export class DirectMsgImpl implements DirectMsg { return TD.decode(this.data); } } + +export class DirectConsumer { + stream: string; + api: DirectStreamAPIImpl; + cursor: number; + ordered: boolean; + + constructor(stream: string, api: DirectStreamAPIImpl) { + this.stream = stream; + this.api = api; + this.cursor = 0; + this.ordered = false; + } + + consume(opts?: ConsumeOptions): Promise> { + throw new Error("Method not implemented."); + } + + async fetch(opts?: FetchOptions): Promise> { + const qi = new QueuedIteratorImpl(); + const fo = opts || {}; + let abort = false; + const src = await this.api.get(this.stream, { + seq: this.cursor + 1, + batch: fo.max_messages ?? 100, + callback: (done, err, sm) => { + if (!abort) { + if (done) { + qi.push(() => { + err ? qi.stop(err) : qi.stop(); + }); + } else { + if (this.ordered && sm) { + if (sm.lastSequence !== this.cursor) { + qi.push(() => { + qi.stop(); + }); + abort = true; + qi.stop(); + src.stop(); + } + } + qi.push(sm); + this.cursor = sm.seq; + } + } + }, + }); + + return qi; + } + + async next(): Promise { + const sm = await this.api.getMessage(this.stream, { seq: this.cursor + 1 }); + const seq = sm?.seq; + if (seq) { + this.cursor = seq; + } + return sm; + } +} diff --git a/jetstream/src/jsmstream_api.ts b/jetstream/src/jsmstream_api.ts index 36e3ed57..6cf330f5 100644 --- a/jetstream/src/jsmstream_api.ts +++ b/jetstream/src/jsmstream_api.ts @@ -676,6 +676,13 @@ export class StoredMsgImpl implements StoredMsg { constructor(smr: StreamMsgResponse) { this.smr = smr; } + get pending(): number { + return 0; + } + + get lastSequence(): number { + return 0; + } get subject(): string { return this.smr.message.subject; diff --git a/jetstream/src/types.ts b/jetstream/src/types.ts index 36c2da83..b19e8a61 100644 --- a/jetstream/src/types.ts +++ b/jetstream/src/types.ts @@ -980,6 +980,18 @@ export interface StoredMsg { */ timestamp: string; + /** + * The previous sequence delivered to the client in the current batch. + * This value will be 0 if it was not from a batch request + */ + lastSequence: number; + + /** + * The number of messages in the stream that are pending for a similar + * batch request. 0 if it was not a batch request + */ + pending: number; + /** * Convenience method to parse the message payload as JSON. This method * will throw an exception if there's a parsing error; @@ -1120,6 +1132,7 @@ export enum DirectMsgHeaders { TimeStamp = "Nats-Time-Stamp", Subject = "Nats-Subject", LastSequence = "Nats-Last-Sequence", + NumPending = "Nats-Num-Pending", } export enum RepublishHeaders { diff --git a/jetstream/tests/direct_consumer_test.ts b/jetstream/tests/direct_consumer_test.ts new file mode 100644 index 00000000..eaaaf958 --- /dev/null +++ b/jetstream/tests/direct_consumer_test.ts @@ -0,0 +1,105 @@ +/* + * Copyright 2024 Synadia Communications, Inc + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { + assertArrayIncludes, + assertEquals, + assertExists, + assertIsError, + assertRejects, + fail, +} from "jsr:@std/assert"; + +import { deferred, delay } from "@nats-io/nats-core"; + +import { + jetstream, + JetStreamError, + jetstreamManager, + StorageType, + type StoredMsg, +} from "../src/mod.ts"; +import { + cleanup, + jetstreamServerConf, + notCompatible, + notSupported, + setup, +} from "test_helpers"; +import { DirectConsumer, DirectStreamAPIImpl } from "../src/jsm_direct.ts"; + +Deno.test("direct consumer - next", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "A", subjects: ["a"], allow_direct: true }); + + const js = jsm.jetstream(); + await Promise.all([ + js.publish("a"), + js.publish("a"), + js.publish("a"), + ]); + + const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc)); + + const m = await dc.next(); + console.log(m); + assertEquals(m?.seq, 1); + assertEquals((await dc.next())?.seq, 2); + assertEquals((await dc.next())?.seq, 3); + assertEquals(await dc.next(), null); + + await cleanup(ns, nc); +}); + +Deno.test("direct consumer - batch", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "A", subjects: ["a"], allow_direct: true }); + + const js = jsm.jetstream(); + const buf = []; + for (let i = 0; i < 100; i++) { + buf.push(js.publish("a", `${i}`)); + } + + await Promise.all(buf); + + const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc)); + + let iter = await dc.fetch({ max_messages: 5 }); + let s = 0; + for await (const sm of iter) { + console.log(sm); + assertEquals(sm.seq, ++s); + } + assertEquals(s, 5); + const m = await dc.next(); + assertEquals(m?.seq, 6); + s = 6; + + iter = await dc.fetch(); + for await (const sm of iter) { + assertEquals(sm.seq, ++s); + } + assertEquals(s, 100); + + await cleanup(ns, nc); +});