Skip to content

Commit

Permalink
fixed types
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Nov 23, 2024
1 parent 21295a7 commit 6a0f43b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
1 change: 0 additions & 1 deletion jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,6 @@ export type DirectBatchLimits = {
export type DirectBatchStartSeq = StartSeq & DirectBatchLimits;
export type DirectBatchStartTime = StartTime & DirectBatchLimits;
export type DirectBatchOptions = DirectBatchStartSeq & DirectBatchStartTime;
export type DirectFetchOptions = DirectBatch & DirectMaxBytes;

export type DirectLastFor = {
multi_last: string[];
Expand Down
62 changes: 43 additions & 19 deletions jetstream/src/jsm_direct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import type {
DirectMsg,
DirectStreamAPI,
JetStreamOptions,
MaxBytes,
StoredMsg,
} from "./types.ts";
import { DirectMsgHeaders } from "./types.ts";
import type {
CallbackFn,
Codec,
Deferred,
Delay,
Msg,
Expand All @@ -44,14 +44,15 @@ import {
} from "@nats-io/nats-core/internal";
import type {
CompletionResult,
DirectBatch,
DirectBatchOptions,
DirectFetchOptions,
DirectBatchStartSeq,
DirectBatchStartTime,
DirectLastFor,
DirectMaxBytes,
DirectMsgRequest,
LastForMsgRequest,
PullOptions,
StartSeq,
StartTime,
} from "./jsapi_types.ts";
import { validateStreamName } from "./jsutil.ts";
import { JetStreamStatus, JetStreamStatusError } from "./jserrors.ts";
Expand Down Expand Up @@ -229,7 +230,6 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl
export class DirectMsgImpl implements DirectMsg {
data: Uint8Array;
header: MsgHdrs;
static jc?: Codec<unknown>;

constructor(m: Msg) {
if (!m.headers) {
Expand Down Expand Up @@ -283,17 +283,44 @@ export class DirectMsgImpl implements DirectMsg {
}
}

/**
* Options for directly starting a direct consumer. The options can either specify
* a sequence start or a time start.
* @property {DirectBatchStartSeq} DirectBatchStartSeq - Specifies a sequence start for the consumer.
* @property {DirectBatchStartTime} DirectBatchStartTime - Specifies a time start for the consumer.
*/
export type DirectStartOptions = DirectBatchStartSeq | DirectBatchStartTime;

/**
* Represents the limits for the operation. For fetch requests it represents the maximum to be retrieved.
* For consume operations it represents the buffering for the consumer.
*
* This type is used to define constraints or configurations for batching processes that
* operate under specific limits, either in terms of quantity (DirectBatch) or size in bytes (DirectMaxBytes).
*/
export type DirectBatchLimits = DirectBatch | DirectMaxBytes;

function isDirectBatchStartTime(
t: DirectStartOptions,
): t is DirectBatchStartTime {
return typeof t === "object" && "start_time" in t;
}

function isMaxBytes(t: DirectBatchLimits): t is MaxBytes {
return typeof t === "object" && "max_bytes" in t;
}

export class DirectConsumer {
stream: string;
api: DirectStreamAPIImpl;
cursor: { last: number; pending?: number };
listeners: QueuedIteratorImpl<ConsumerNotification>[];
start: StartSeq & StartTime;
start: DirectStartOptions;

constructor(
stream: string,
api: DirectStreamAPIImpl,
start: StartSeq & StartTime,
start: DirectStartOptions,
) {
this.stream = stream;
this.api = api;
Expand All @@ -303,29 +330,26 @@ export class DirectConsumer {
}

getOptions(
opts: Partial<DirectFetchOptions> = {},
): Partial<DirectBatchOptions> {
opts?: DirectBatchLimits,
): DirectBatchOptions {
opts = opts || {} as DirectBatchLimits;
const dbo: Partial<DirectBatchOptions> = {};

if (this.cursor.last === 0) {
// we have never pulled, honor initial request options
if (this.start.seq) {
dbo.seq = this.start.seq;
} else if (this.start.start_time) {
if (isDirectBatchStartTime(this.start)) {
dbo.start_time = this.start.start_time;
} else {
dbo.seq = 1;
dbo.seq = this.start.seq || 1;
}
} else {
dbo.seq = this.cursor.last + 1;
}

if (opts.batch) {
dbo.batch = opts.batch;
} else if (opts.max_bytes) {
if (isMaxBytes(opts)) {
dbo.max_bytes = opts.max_bytes;
} else {
dbo.batch = 100;
dbo.batch = opts.batch ?? 100;
}

return dbo;
Expand Down Expand Up @@ -358,7 +382,7 @@ export class DirectConsumer {
console.log(this.cursor);
}

consume(opts?: DirectBatchOptions): Promise<QueuedIterator<StoredMsg>> {
consume(opts: DirectBatchLimits): Promise<QueuedIterator<StoredMsg>> {
let pending: Delay;
let requestDone: Deferred<void>;
const qi = new QueuedIteratorImpl<StoredMsg>();
Expand Down Expand Up @@ -445,7 +469,7 @@ export class DirectConsumer {
return Promise.resolve(qi);
}

async fetch(opts?: DirectBatchOptions): Promise<QueuedIterator<StoredMsg>> {
async fetch(opts?: DirectBatchLimits): Promise<QueuedIterator<StoredMsg>> {
const dbo = this.getOptions(opts);
const qi = new QueuedIteratorImpl<StoredMsg>();
const src = await this.api.get(
Expand Down

0 comments on commit 6a0f43b

Please sign in to comment.