Skip to content

Commit

Permalink
fix: use native AbortController
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnigir1 committed Nov 1, 2024
1 parent 0000ae6 commit 1d4d9fe
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
6 changes: 2 additions & 4 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from
* - Error handling and logging for various failure scenarios
* - Registry tracking of supported/unsupported strategies and events
*
* TODO: Implement a circuit breaker to gracefully stop the orchestrator
* TODO: Enhance the error handling/retries, logging and observability
* TODO: Handle unhandled strategies appropriately
*/
Expand Down Expand Up @@ -91,9 +90,8 @@ export class Orchestrator {
this.eventsQueue = new Queue<ProcessorEvent<ContractName, AnyEvent>>(fetchLimit);
}

async run(): Promise<void> {
//TODO: implement a circuit breaker to gracefully stop the orchestrator
while (true) {
async run(signal: AbortSignal): Promise<void> {
while (!signal.aborted) {
let event: ProcessorEvent<ContractName, AnyEvent> | undefined;
try {
if (this.eventsQueue.isEmpty()) await this.fillQueue();
Expand Down
33 changes: 20 additions & 13 deletions packages/data-flow/test/unit/orchestrator.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ describe("Orchestrator", { sequential: true }, () => {
let mockEventsRegistry: IEventsRegistry;
let mockStrategyRegistry: IStrategyRegistry;
let mockEvmProvider: EvmProvider;
let abortController: AbortController;
let runPromise: Promise<void> | undefined;

const chainId = 1 as ChainId;
const mockFetchLimit = 10;
Expand Down Expand Up @@ -91,6 +93,8 @@ describe("Orchestrator", { sequential: true }, () => {
},
};

abortController = new AbortController();

orchestrator = new Orchestrator(
chainId,
dependencies,
Expand All @@ -107,8 +111,11 @@ describe("Orchestrator", { sequential: true }, () => {
afterAll(async () => {
vi.clearAllMocks();

// Wait for any pending promises to resolve
await new Promise((resolve) => setImmediate(resolve));
abortController.abort();

await runPromise;

runPromise = undefined;
});

describe("Event Processing Flow", () => {
Expand Down Expand Up @@ -136,7 +143,7 @@ describe("Orchestrator", { sequential: true }, () => {
return Promise.resolve();
});

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(
() => {
Expand All @@ -159,7 +166,7 @@ describe("Orchestrator", { sequential: true }, () => {
[],
);

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

// Wait for a few polling cycles
await vi.advanceTimersByTimeAsync(mockFetchDelay * 2);
Expand Down Expand Up @@ -224,7 +231,7 @@ describe("Orchestrator", { sequential: true }, () => {
numSuccessful: 3,
});

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(() => {
if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called");
Expand Down Expand Up @@ -287,7 +294,7 @@ describe("Orchestrator", { sequential: true }, () => {
numSuccessful: 1,
});

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(() => {
if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called");
Expand Down Expand Up @@ -325,7 +332,7 @@ describe("Orchestrator", { sequential: true }, () => {
vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(unhandledStrategyId);
vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(unhandledStrategyId);

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.advanceTimersByTimeAsync(mockFetchDelay * 2);

Expand Down Expand Up @@ -383,7 +390,7 @@ describe("Orchestrator", { sequential: true }, () => {
return Promise.resolve();
});

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(
() => {
Expand Down Expand Up @@ -433,7 +440,7 @@ describe("Orchestrator", { sequential: true }, () => {
numSuccessful: 1,
});

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(() => {
if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called");
Expand Down Expand Up @@ -472,7 +479,7 @@ describe("Orchestrator", { sequential: true }, () => {
return Promise.resolve();
});

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(
() => {
Expand Down Expand Up @@ -505,7 +512,7 @@ describe("Orchestrator", { sequential: true }, () => {
.mockResolvedValue([]);
eventsProcessorSpy.mockRejectedValue(error);

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(() => {
if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called");
Expand Down Expand Up @@ -535,7 +542,7 @@ describe("Orchestrator", { sequential: true }, () => {

const consoleSpy = vi.spyOn(console, "error");

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(() => {
if (consoleSpy.mock.calls.length < 1) throw new Error("Not yet called");
Expand Down Expand Up @@ -573,7 +580,7 @@ describe("Orchestrator", { sequential: true }, () => {
numSuccessful: 0,
});

void orchestrator.run();
runPromise = orchestrator.run(abortController.signal);

await vi.waitFor(() => {
if (dataLoaderSpy.mock.calls.length < 1) throw new Error("Not yet called");
Expand Down

0 comments on commit 1d4d9fe

Please sign in to comment.