Skip to content

Commit

Permalink
internal: removed Cancelable
Browse files Browse the repository at this point in the history
test: flappers
  • Loading branch information
aricart committed Sep 9, 2024
1 parent e8c08a1 commit 0b46c7f
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 23 deletions.
1 change: 0 additions & 1 deletion core/src/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export type { Transport, TransportFactory } from "./transport.ts";
export { Connect, INFO, ProtocolHandler } from "./protocol.ts";
export type {
Backoff,
Cancelable,
Deferred,
Delay,
ErrorResult,
Expand Down
1 change: 0 additions & 1 deletion core/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ export type {
Authenticator,
Backoff,
BenchOpts,
Cancelable,
Codec,
ConnectionOptions,
Deferred,
Expand Down
14 changes: 5 additions & 9 deletions core/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,7 @@ export function render(frame: Uint8Array): string {
.replace(/\r/g, cr);
}

export interface Cancelable {
cancel: () => void;
}

export interface Timeout<T> extends Promise<T>, Cancelable {
export interface Timeout<T> extends Promise<T> {
cancel: () => void;
}

Expand Down Expand Up @@ -94,20 +90,20 @@ export function timeout<T>(ms: number, asyncTraces = true): Timeout<T> {
return Object.assign(p, methods) as Timeout<T>;
}

export interface Delay extends Promise<boolean>, Cancelable {
export interface Delay extends Promise<void> {
cancel: () => void;
}

export function delay(ms = 0): Delay {
let methods;
const p = new Promise<boolean>((resolve) => {
const p = new Promise<void>((resolve) => {
const timer = setTimeout(() => {
resolve(true);
resolve();
}, ms);
const cancel = (): void => {
if (timer) {
clearTimeout(timer);
resolve(false);
resolve();
}
};
methods = { cancel };
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"test_helpers": "./test_helpers/mod.ts"
},
"tasks": {
"clean": "rm -Rf ./coverage core/lib jetstream/lib services/lib kv/lib obj/lib transport-node/lib transport-ws/lib",
"clean": "rm -Rf ./coverage core/lib core/build jetstream/lib jetstream/build services/lib services/build kv/lib kv/build obj/lib obj/build transport-node/lib transport-ws/lib",
"test": "deno task clean && deno task lint && deno task test-all",
"test-all": "deno task test-core && deno task test-jetstream && deno task test-kv && deno task test-obj && deno task test-services && deno task test-unsafe",
"test-unsafe": "deno test -A --parallel --reload --quiet --unsafely-ignore-certificate-errors --coverage=coverage core/unsafe_tests",
Expand Down
4 changes: 2 additions & 2 deletions jetstream/examples/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
*/

import { createConsumer, fill, initStream } from "../tests/jstest_util.ts";
import type { NatsConnection } from "jsr:@nats-io/[email protected]26";
import { nuid } from "jsr:@nats-io/[email protected]17";
import type { NatsConnection } from "jsr:@nats-io/[email protected]27";
import { nuid } from "jsr:@nats-io/[email protected]27";

export async function setupStreamAndConsumer(
nc: NatsConnection,
Expand Down
26 changes: 21 additions & 5 deletions jetstream/src/pushconsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from "@nats-io/nats-core/internal";
import type {
CallbackFn,
Delay,
QueuedIterator,
Status,
Subscription,
Expand All @@ -49,6 +50,7 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
serial: number;
createFails!: number;
statusIterator!: QueuedIteratorImpl<Status>;
cancelables: Delay[];

constructor(
c: PushConsumerImpl,
Expand All @@ -59,6 +61,7 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.consumer = c;
this.monitor = null;
this.listeners = [];
this.cancelables = [];
this.abortOnMissingResource =
userOptions.abort_on_missing_resource === true;
this.callback = userOptions.callback || null;
Expand Down Expand Up @@ -119,12 +122,20 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.stop(err);
}
const bo = backoff();
delay(bo.backoff(this.createFails))
.then(() => {
if (!this.done) {
this.reset();
}
const c = delay(bo.backoff(this.createFails));
c.then(() => {
const idx = this.cancelables.indexOf(c);
if (idx !== -1) {
this.cancelables = this.cancelables.splice(idx, idx);
}
if (!this.done) {
this.reset();
}
})
.catch((_) => {
// canceled
});
this.cancelables.push(c);
});
}

Expand Down Expand Up @@ -161,6 +172,11 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.statusIterator?.stop();
this.monitor?.cancel();
this.monitor = null;
// if we have delays, stop them
this.cancelables.forEach((c) => {
c.cancel();
});
this.cancelables = [];
this._push(() => {
super.stop(err);
this.listeners.forEach((n) => {
Expand Down
6 changes: 2 additions & 4 deletions jetstream/tests/pushconsumers_ordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import {
jetstreamServerConf,
notCompatible,
} from "test_helpers";
import { deferred } from "@nats-io/nats-core";
import type {
PushConsumerImpl,
PushConsumerMessagesImpl,
Expand Down Expand Up @@ -74,7 +73,6 @@ Deno.test("ordered push consumers - consume reset", async () => {
assertExists(oc);

const seen: number[] = new Array(3).fill(0);
const done = deferred();

const iter = await oc.consume({
callback: (m: JsMsg) => {
Expand All @@ -87,11 +85,10 @@ Deno.test("ordered push consumers - consume reset", async () => {
}
if (m.info.pending === 0) {
iter.stop();
done.resolve();
}
},
}) as PushConsumerMessagesImpl;
await done;
await iter.closed();

assertEquals(seen, [2, 2, 1]);
assertEquals(oc.serial, 3);
Expand Down Expand Up @@ -151,6 +148,7 @@ Deno.test("ordered push consumers - filters consume", async () => {
}
}

await iter.closed();
assertEquals(iter.getProcessed(), 1);

await cleanup(ns, nc);
Expand Down

0 comments on commit 0b46c7f

Please sign in to comment.