diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 42c17094..be41d099 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -15,7 +15,7 @@ import type { Nanos } from "@nats-io/nats-core"; import { nanos } from "@nats-io/nats-core"; -import type { StoredMsg } from "./types.ts"; +import type { MaxBytes, StoredMsg } from "./types.ts"; export interface ApiPaged { total: number; @@ -514,6 +514,12 @@ export type CompletionResult = { err?: Error }; export type BatchCallback = (done: CompletionResult | null, d: T) => void; export type StartSeq = { seq?: number }; export type StartTime = { start_time?: Date | string }; + +export type DirectBatch = { + batch: number; +}; +export type DirectMaxBytes = MaxBytes; + export type DirectBatchLimits = { batch?: number; max_bytes?: number; @@ -521,7 +527,8 @@ export type DirectBatchLimits = { }; export type DirectBatchStartSeq = StartSeq & DirectBatchLimits; export type DirectBatchStartTime = StartTime & DirectBatchLimits; -export type DirectBatchOptions = DirectBatchStartSeq | DirectBatchStartTime; +export type DirectBatchOptions = DirectBatchStartSeq & DirectBatchStartTime; +export type DirectFetchOptions = DirectBatch & DirectMaxBytes; export type DirectLastFor = { multi_last: string[]; diff --git a/jetstream/src/jsm_direct.ts b/jetstream/src/jsm_direct.ts index e5512217..9b530ede 100644 --- a/jetstream/src/jsm_direct.ts +++ b/jetstream/src/jsm_direct.ts @@ -19,7 +19,6 @@ import type { ConsumerNotification, DirectMsg, DirectStreamAPI, - FetchOptions, JetStreamOptions, StoredMsg, } from "./types.ts"; @@ -47,10 +46,13 @@ import { import type { CompletionResult, DirectBatchOptions, + DirectFetchOptions, DirectLastFor, DirectMsgRequest, LastForMsgRequest, PullOptions, + StartSeq, + StartTime, } from "./jsapi_types.ts"; import { validateStreamName } from "./jsutil.ts"; import { JetStreamStatus, JetStreamStatusError } from "./jserrors.ts"; @@ -287,12 +289,47 @@ export class DirectConsumer { api: DirectStreamAPIImpl; cursor: { last: number; pending?: number }; listeners: QueuedIteratorImpl[]; + start: StartSeq & StartTime; - constructor(stream: string, api: DirectStreamAPIImpl) { + constructor( + stream: string, + api: DirectStreamAPIImpl, + start: StartSeq & StartTime, + ) { this.stream = stream; this.api = api; this.cursor = { last: 0 }; this.listeners = []; + this.start = start; + } + + getOptions( + opts: Partial = {}, + ): Partial { + const dbo: Partial = {}; + + if (this.cursor.last === 0) { + // we have never pulled, honor initial request options + if (this.start.seq) { + dbo.seq = this.start.seq; + } else if (this.start.start_time) { + dbo.start_time = this.start.start_time; + } else { + dbo.seq = 1; + } + } else { + dbo.seq = this.cursor.last + 1; + } + + if (opts.batch) { + dbo.batch = opts.batch; + } else if (opts.max_bytes) { + dbo.max_bytes = opts.max_bytes; + } else { + dbo.batch = 100; + } + + return dbo; } status(): AsyncIterable { @@ -322,10 +359,9 @@ export class DirectConsumer { console.log(this.cursor); } - consume(opts?: ConsumeOptions): Promise> { + consume(opts?: DirectBatchOptions): Promise> { let pending: Delay; let requestDone: Deferred; - const fo = opts || {}; const qi = new QueuedIteratorImpl(); (async () => { @@ -347,16 +383,13 @@ export class DirectConsumer { break; } requestDone = deferred(); - const opts = { - batch: fo.max_messages ?? 100, - seq: this.cursor.last + 1, - } as DirectBatchOptions; + const dbo = this.getOptions(opts); this.notify({ type: "next", options: Object.assign({}, opts) as PullOptions, }); - opts.callback = (r: CompletionResult | null, sm: StoredMsg): void => { + dbo.callback = (r: CompletionResult | null, sm: StoredMsg): void => { if (r) { // if the current fetch is done, ready to schedule the next if (r.err) { @@ -395,7 +428,7 @@ export class DirectConsumer { const src = await this.api.getBatch( this.stream, - opts, + dbo, ) as QueuedIteratorImpl; qi.iterClosed.then(() => { @@ -413,35 +446,36 @@ export class DirectConsumer { return Promise.resolve(qi); } - async fetch(opts?: FetchOptions): Promise> { + async fetch(opts?: DirectBatchOptions): Promise> { + const dbo = this.getOptions(opts); const qi = new QueuedIteratorImpl(); - const fo = opts || {}; - const src = await this.api.get(this.stream, { - seq: this.cursor.last + 1, - batch: fo.max_messages ?? 100, - callback: (done, sm) => { - if (done) { - // the server sent error or is done, we are done - qi.push(() => { - done.err ? qi.stop(done.err) : qi.stop(); - }); - } else if ( - sm.lastSequence > 0 && sm.lastSequence !== this.cursor.last - ) { - // we are done early because the sequence jumped unexpectedly - qi.push(() => { - qi.stop(); - }); - src.stop(); - } else { - // pass-through to client, and record - qi.push(sm); - qi.received++; - this.cursor.last = sm.seq; - this.cursor.pending = sm.pending; - } - }, - }); + const src = await this.api.get( + this.stream, + Object.assign({ + callback: (done: CompletionResult | null, sm: StoredMsg) => { + if (done) { + // the server sent error or is done, we are done + qi.push(() => { + done.err ? qi.stop(done.err) : qi.stop(); + }); + } else if ( + sm.lastSequence > 0 && sm.lastSequence !== this.cursor.last + ) { + // we are done early because the sequence jumped unexpectedly + qi.push(() => { + qi.stop(); + }); + src.stop(); + } else { + // pass-through to client, and record + qi.push(sm); + qi.received++; + this.cursor.last = sm.seq; + this.cursor.pending = sm.pending; + } + }, + }, dbo), + ); qi.iterClosed.then(() => { src.stop(); }); diff --git a/jetstream/src/types.ts b/jetstream/src/types.ts index 5a4f487c..36cef4d3 100644 --- a/jetstream/src/types.ts +++ b/jetstream/src/types.ts @@ -461,6 +461,7 @@ export type IdleHeartbeat = { */ idle_heartbeat?: number; }; + export type ConsumerCallbackFn = (r: JsMsg) => void; export type ConsumeCallback = { /** diff --git a/jetstream/tests/direct_consumer_test.ts b/jetstream/tests/direct_consumer_test.ts index bff89eb3..eb0b5da4 100644 --- a/jetstream/tests/direct_consumer_test.ts +++ b/jetstream/tests/direct_consumer_test.ts @@ -38,7 +38,7 @@ Deno.test("direct consumer - next", async () => { js.publish("a"), ]); - const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc)); + const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc), { seq: 0 }); const m = await dc.next(); assertEquals(m?.seq, 1); @@ -65,9 +65,13 @@ Deno.test("direct consumer - batch", async () => { await Promise.all(buf); - const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc)); + const dc = new DirectConsumer( + "A", + new DirectStreamAPIImpl(nc), + { seq: 0 }, + ); - let iter = await dc.fetch({ max_messages: 5 }); + let iter = await dc.fetch({ batch: 5 }); let s = 0; let last: StoredMsg | undefined; for await (const sm of iter) { @@ -116,8 +120,12 @@ Deno.test("direct consumer - consume", async () => { await Promise.all(buf); - const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc)); - const iter = await dc.consume({ max_messages: 7 }); + const dc = new DirectConsumer( + "A", + new DirectStreamAPIImpl(nc), + { seq: 0 }, + ); + const iter = await dc.consume({ batch: 7 }); for await (const m of iter) { if (m.pending === 0) { break;