Skip to content

Commit

Permalink
consumer based on direct
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 22, 2024
1 parent 27ca929 commit 763c853
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 0 deletions.
71 changes: 71 additions & 0 deletions jetstream/src/jsm_direct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@

import { BaseApiClientImpl } from "./jsbaseclient_api.ts";
import type {
ConsumeOptions,
Consumer,
ConsumerMessages,
DirectMsg,
DirectStreamAPI,
FetchOptions,
JetStreamOptions,
NextOptions,
StoredMsg,
} from "./types.ts";
import { DirectMsgHeaders } from "./types.ts";
Expand Down Expand Up @@ -257,6 +262,11 @@ export class DirectMsgImpl implements DirectMsg {
return typeof v === "string" ? parseInt(v) : 0;
}

get pending(): number {
const v = this.header.last(DirectMsgHeaders.NumPending);
return typeof v === "string" ? parseInt(v) : 0;
}

json<T = unknown>(reviver?: ReviverFn): T {
return JSON.parse(new TextDecoder().decode(this.data), reviver);
}
Expand All @@ -265,3 +275,64 @@ export class DirectMsgImpl implements DirectMsg {
return TD.decode(this.data);
}
}

export class DirectConsumer {
stream: string;
api: DirectStreamAPIImpl;
cursor: number;
ordered: boolean;

constructor(stream: string, api: DirectStreamAPIImpl) {
this.stream = stream;
this.api = api;
this.cursor = 0;
this.ordered = false;
}

consume(opts?: ConsumeOptions): Promise<QueuedIterator<StoredMsg>> {
throw new Error("Method not implemented.");
}

async fetch(opts?: FetchOptions): Promise<QueuedIterator<StoredMsg>> {
const qi = new QueuedIteratorImpl<StoredMsg>();
const fo = opts || {};
let abort = false;
const src = await this.api.get(this.stream, {
seq: this.cursor + 1,
batch: fo.max_messages ?? 100,
callback: (done, err, sm) => {
if (!abort) {
if (done) {
qi.push(() => {
err ? qi.stop(err) : qi.stop();
});
} else {
if (this.ordered && sm) {
if (sm.lastSequence !== this.cursor) {
qi.push(() => {
qi.stop();
});
abort = true;
qi.stop();
src.stop();
}
}
qi.push(sm);
this.cursor = sm.seq;
}
}
},
});

return qi;
}

async next(): Promise<StoredMsg | null> {
const sm = await this.api.getMessage(this.stream, { seq: this.cursor + 1 });
const seq = sm?.seq;
if (seq) {
this.cursor = seq;
}
return sm;
}
}
7 changes: 7 additions & 0 deletions jetstream/src/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,13 @@ export class StoredMsgImpl implements StoredMsg {
constructor(smr: StreamMsgResponse) {
this.smr = smr;
}
get pending(): number {
return 0;
}

get lastSequence(): number {
return 0;
}

get subject(): string {
return this.smr.message.subject;
Expand Down
13 changes: 13 additions & 0 deletions jetstream/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,18 @@ export interface StoredMsg {
*/
timestamp: string;

/**
* The previous sequence delivered to the client in the current batch.
* This value will be 0 if it was not from a batch request
*/
lastSequence: number;

/**
* The number of messages in the stream that are pending for a similar
* batch request. 0 if it was not a batch request
*/
pending: number;

/**
* Convenience method to parse the message payload as JSON. This method
* will throw an exception if there's a parsing error;
Expand Down Expand Up @@ -1120,6 +1132,7 @@ export enum DirectMsgHeaders {
TimeStamp = "Nats-Time-Stamp",
Subject = "Nats-Subject",
LastSequence = "Nats-Last-Sequence",
NumPending = "Nats-Num-Pending",
}

export enum RepublishHeaders {
Expand Down
105 changes: 105 additions & 0 deletions jetstream/tests/direct_consumer_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2024 Synadia Communications, Inc
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
assertArrayIncludes,
assertEquals,
assertExists,
assertIsError,
assertRejects,
fail,
} from "jsr:@std/assert";

import { deferred, delay } from "@nats-io/nats-core";

import {
jetstream,
JetStreamError,
jetstreamManager,
StorageType,
type StoredMsg,
} from "../src/mod.ts";
import {
cleanup,
jetstreamServerConf,
notCompatible,
notSupported,
setup,
} from "test_helpers";
import { DirectConsumer, DirectStreamAPIImpl } from "../src/jsm_direct.ts";

Deno.test("direct consumer - next", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.11.0")) {
return;
}
const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "A", subjects: ["a"], allow_direct: true });

const js = jsm.jetstream();
await Promise.all([
js.publish("a"),
js.publish("a"),
js.publish("a"),
]);

const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc));

const m = await dc.next();
console.log(m);
assertEquals(m?.seq, 1);
assertEquals((await dc.next())?.seq, 2);
assertEquals((await dc.next())?.seq, 3);
assertEquals(await dc.next(), null);

await cleanup(ns, nc);
});

Deno.test("direct consumer - batch", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.11.0")) {
return;
}
const jsm = await jetstreamManager(nc);
await jsm.streams.add({ name: "A", subjects: ["a"], allow_direct: true });

const js = jsm.jetstream();
const buf = [];
for (let i = 0; i < 100; i++) {
buf.push(js.publish("a", `${i}`));
}

await Promise.all(buf);

const dc = new DirectConsumer("A", new DirectStreamAPIImpl(nc));

let iter = await dc.fetch({ max_messages: 5 });
let s = 0;
for await (const sm of iter) {
console.log(sm);
assertEquals(sm.seq, ++s);
}
assertEquals(s, 5);
const m = await dc.next();
assertEquals(m?.seq, 6);
s = 6;

iter = await dc.fetch();
for await (const sm of iter) {
assertEquals(sm.seq, ++s);
}
assertEquals(s, 100);

await cleanup(ns, nc);
});

0 comments on commit 763c853

Please sign in to comment.