Skip to content

Commit

Permalink
added start options
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 23, 2024
1 parent fd08558 commit 94875c2
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 45 deletions.
11 changes: 9 additions & 2 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import type { Nanos } from "@nats-io/nats-core";
import { nanos } from "@nats-io/nats-core";
import type { StoredMsg } from "./types.ts";
import type { MaxBytes, StoredMsg } from "./types.ts";

export interface ApiPaged {
total: number;
Expand Down Expand Up @@ -514,14 +514,21 @@ export type CompletionResult = { err?: Error };
export type BatchCallback<T> = (done: CompletionResult | null, d: T) => void;
export type StartSeq = { seq?: number };
export type StartTime = { start_time?: Date | string };

export type DirectBatch = {
batch: number;
};
export type DirectMaxBytes = MaxBytes;

export type DirectBatchLimits = {
batch?: number;
max_bytes?: number;
callback?: BatchCallback<StoredMsg>;
};
export type DirectBatchStartSeq = StartSeq & DirectBatchLimits;
export type DirectBatchStartTime = StartTime & DirectBatchLimits;
export type DirectBatchOptions = DirectBatchStartSeq | DirectBatchStartTime;
export type DirectBatchOptions = DirectBatchStartSeq & DirectBatchStartTime;
export type DirectFetchOptions = DirectBatch & DirectMaxBytes;

export type DirectLastFor = {
multi_last: string[];
Expand Down
110 changes: 72 additions & 38 deletions jetstream/src/jsm_direct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import type {
ConsumerNotification,
DirectMsg,
DirectStreamAPI,
FetchOptions,
JetStreamOptions,
StoredMsg,
} from "./types.ts";
Expand Down Expand Up @@ -47,10 +46,13 @@ import {
import type {
CompletionResult,
DirectBatchOptions,
DirectFetchOptions,
DirectLastFor,
DirectMsgRequest,
LastForMsgRequest,
PullOptions,
StartSeq,
StartTime,
} from "./jsapi_types.ts";
import { validateStreamName } from "./jsutil.ts";
import { JetStreamStatus, JetStreamStatusError } from "./jserrors.ts";
Expand Down Expand Up @@ -287,12 +289,47 @@ export class DirectConsumer {
api: DirectStreamAPIImpl;
cursor: { last: number; pending?: number };
listeners: QueuedIteratorImpl<ConsumerNotification>[];
start: StartSeq & StartTime;

constructor(stream: string, api: DirectStreamAPIImpl) {
constructor(
stream: string,
api: DirectStreamAPIImpl,
start: StartSeq & StartTime,
) {
this.stream = stream;
this.api = api;
this.cursor = { last: 0 };
this.listeners = [];
this.start = start;
}

getOptions(
opts: Partial<DirectFetchOptions> = {},
): Partial<DirectBatchOptions> {
const dbo: Partial<DirectBatchOptions> = {};

if (this.cursor.last === 0) {
// we have never pulled, honor initial request options
if (this.start.seq) {
dbo.seq = this.start.seq;
} else if (this.start.start_time) {
dbo.start_time = this.start.start_time;
} else {
dbo.seq = 1;
}
} else {
dbo.seq = this.cursor.last + 1;
}

if (opts.batch) {
dbo.batch = opts.batch;
} else if (opts.max_bytes) {
dbo.max_bytes = opts.max_bytes;
} else {
dbo.batch = 100;
}

return dbo;
}

status(): AsyncIterable<ConsumerNotification> {
Expand Down Expand Up @@ -322,10 +359,9 @@ export class DirectConsumer {
console.log(this.cursor);
}

consume(opts?: ConsumeOptions): Promise<QueuedIterator<StoredMsg>> {
consume(opts?: DirectBatchOptions): Promise<QueuedIterator<StoredMsg>> {
let pending: Delay;
let requestDone: Deferred<void>;
const fo = opts || {};
const qi = new QueuedIteratorImpl<StoredMsg>();

(async () => {
Expand All @@ -347,16 +383,13 @@ export class DirectConsumer {
break;
}
requestDone = deferred<void>();
const opts = {
batch: fo.max_messages ?? 100,
seq: this.cursor.last + 1,
} as DirectBatchOptions;
const dbo = this.getOptions(opts);
this.notify({
type: "next",
options: Object.assign({}, opts) as PullOptions,
});

opts.callback = (r: CompletionResult | null, sm: StoredMsg): void => {
dbo.callback = (r: CompletionResult | null, sm: StoredMsg): void => {
if (r) {
// if the current fetch is done, ready to schedule the next
if (r.err) {
Expand Down Expand Up @@ -395,7 +428,7 @@ export class DirectConsumer {

const src = await this.api.getBatch(
this.stream,
opts,
dbo,
) as QueuedIteratorImpl<StoredMsg>;

qi.iterClosed.then(() => {
Expand All @@ -413,35 +446,36 @@ export class DirectConsumer {
return Promise.resolve(qi);
}

async fetch(opts?: FetchOptions): Promise<QueuedIterator<StoredMsg>> {
async fetch(opts?: DirectBatchOptions): Promise<QueuedIterator<StoredMsg>> {
const dbo = this.getOptions(opts);
const qi = new QueuedIteratorImpl<StoredMsg>();
const fo = opts || {};
const src = await this.api.get(this.stream, {
seq: this.cursor.last + 1,
batch: fo.max_messages ?? 100,
callback: (done, sm) => {
if (done) {
// the server sent error or is done, we are done
qi.push(() => {
done.err ? qi.stop(done.err) : qi.stop();
});
} else if (
sm.lastSequence > 0 && sm.lastSequence !== this.cursor.last
) {
// we are done early because the sequence jumped unexpectedly
qi.push(() => {
qi.stop();
});
src.stop();
} else {
// pass-through to client, and record
qi.push(sm);
qi.received++;
this.cursor.last = sm.seq;
this.cursor.pending = sm.pending;
}
},
});
const src = await this.api.get(
this.stream,
Object.assign({
callback: (done: CompletionResult | null, sm: StoredMsg) => {
if (done) {
// the server sent error or is done, we are done
qi.push(() => {
done.err ? qi.stop(done.err) : qi.stop();
});
} else if (
sm.lastSequence > 0 && sm.lastSequence !== this.cursor.last
) {
// we are done early because the sequence jumped unexpectedly
qi.push(() => {
qi.stop();
});
src.stop();
} else {
// pass-through to client, and record
qi.push(sm);
qi.received++;
this.cursor.last = sm.seq;
this.cursor.pending = sm.pending;
}
},
}, dbo),
);
qi.iterClosed.then(() => {
src.stop();
});
Expand Down
1 change: 1 addition & 0 deletions jetstream/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ export type IdleHeartbeat = {
*/
idle_heartbeat?: number;
};

export type ConsumerCallbackFn = (r: JsMsg) => void;
export type ConsumeCallback = {
/**
Expand Down
18 changes: 13 additions & 5 deletions jetstream/tests/direct_consumer_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Deno.test("direct consumer - next", async () => {
js.publish("a"),
]);

const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc));
const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc), { seq: 0 });

const m = await dc.next();
assertEquals(m?.seq, 1);
Expand All @@ -65,9 +65,13 @@ Deno.test("direct consumer - batch", async () => {

await Promise.all(buf);

const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc));
const dc = new DirectConsumer(
"A",
new DirectStreamAPIImpl(nc),
{ seq: 0 },
);

let iter = await dc.fetch({ max_messages: 5 });
let iter = await dc.fetch({ batch: 5 });
let s = 0;
let last: StoredMsg | undefined;
for await (const sm of iter) {
Expand Down Expand Up @@ -116,8 +120,12 @@ Deno.test("direct consumer - consume", async () => {

await Promise.all(buf);

const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc));
const iter = await dc.consume({ max_messages: 7 });
const dc = new DirectConsumer(
"A",
new DirectStreamAPIImpl(nc),
{ seq: 0 },
);
const iter = await dc.consume({ batch: 7 });
for await (const m of iter) {
if (m.pending === 0) {
break;
Expand Down

0 comments on commit 94875c2

Please sign in to comment.