From 1d4d9fe9ddc9df64ca80e088c96fee0c053a6707 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Fri, 1 Nov 2024 09:30:29 -0300 Subject: [PATCH] fix: use native AbortController --- packages/data-flow/src/orchestrator.ts | 6 ++-- .../data-flow/test/unit/orchestrator.spec.ts | 33 +++++++++++-------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index d99f370..5c26dc9 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -48,7 +48,6 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from * - Error handling and logging for various failure scenarios * - Registry tracking of supported/unsupported strategies and events * - * TODO: Implement a circuit breaker to gracefully stop the orchestrator * TODO: Enhance the error handling/retries, logging and observability * TODO: Handle unhandled strategies appropriately */ @@ -91,9 +90,8 @@ export class Orchestrator { this.eventsQueue = new Queue>(fetchLimit); } - async run(): Promise { - //TODO: implement a circuit breaker to gracefully stop the orchestrator - while (true) { + async run(signal: AbortSignal): Promise { + while (!signal.aborted) { let event: ProcessorEvent | undefined; try { if (this.eventsQueue.isEmpty()) await this.fillQueue(); diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index d78276a..38caaaf 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -53,6 +53,8 @@ describe("Orchestrator", { sequential: true }, () => { let mockEventsRegistry: IEventsRegistry; let mockStrategyRegistry: IStrategyRegistry; let mockEvmProvider: EvmProvider; + let abortController: AbortController; + let runPromise: Promise | undefined; const chainId = 1 as ChainId; const mockFetchLimit = 10; @@ -91,6 +93,8 @@ describe("Orchestrator", { sequential: true }, () => { }, }; + abortController = new AbortController(); + orchestrator = new Orchestrator( chainId, dependencies, @@ -107,8 +111,11 @@ describe("Orchestrator", { sequential: true }, () => { afterAll(async () => { vi.clearAllMocks(); - // Wait for any pending promises to resolve - await new Promise((resolve) => setImmediate(resolve)); + abortController.abort(); + + await runPromise; + + runPromise = undefined; }); describe("Event Processing Flow", () => { @@ -136,7 +143,7 @@ describe("Orchestrator", { sequential: true }, () => { return Promise.resolve(); }); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor( () => { @@ -159,7 +166,7 @@ describe("Orchestrator", { sequential: true }, () => { [], ); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); // Wait for a few polling cycles await vi.advanceTimersByTimeAsync(mockFetchDelay * 2); @@ -224,7 +231,7 @@ describe("Orchestrator", { sequential: true }, () => { numSuccessful: 3, }); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor(() => { if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); @@ -287,7 +294,7 @@ describe("Orchestrator", { sequential: true }, () => { numSuccessful: 1, }); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor(() => { if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); @@ -325,7 +332,7 @@ describe("Orchestrator", { sequential: true }, () => { vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(unhandledStrategyId); vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(unhandledStrategyId); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.advanceTimersByTimeAsync(mockFetchDelay * 2); @@ -383,7 +390,7 @@ describe("Orchestrator", { sequential: true }, () => { return Promise.resolve(); }); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor( () => { @@ -433,7 +440,7 @@ describe("Orchestrator", { sequential: true }, () => { numSuccessful: 1, }); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor(() => { if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); @@ -472,7 +479,7 @@ describe("Orchestrator", { sequential: true }, () => { return Promise.resolve(); }); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor( () => { @@ -505,7 +512,7 @@ describe("Orchestrator", { sequential: true }, () => { .mockResolvedValue([]); eventsProcessorSpy.mockRejectedValue(error); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor(() => { if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); @@ -535,7 +542,7 @@ describe("Orchestrator", { sequential: true }, () => { const consoleSpy = vi.spyOn(console, "error"); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor(() => { if (consoleSpy.mock.calls.length < 1) throw new Error("Not yet called"); @@ -573,7 +580,7 @@ describe("Orchestrator", { sequential: true }, () => { numSuccessful: 0, }); - void orchestrator.run(); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor(() => { if (dataLoaderSpy.mock.calls.length < 1) throw new Error("Not yet called");