diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index ca672bb..d99f370 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -1,5 +1,3 @@ -// class should contain the logic to orchestrate the data flow Events Fetcher -> Events Processor -> Data Loader - import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; import { existsHandler, @@ -62,6 +60,14 @@ export class Orchestrator { private readonly strategyRegistry: IStrategyRegistry; private readonly dataLoader: DataLoader; + /** + * @param chainId - The chain id + * @param dependencies - The core dependencies + * @param indexerClient - The indexer client + * @param registries - The registries + * @param fetchLimit - The fetch limit + * @param fetchDelayInMs - The fetch delay in milliseconds + */ constructor( private chainId: ChainId, private dependencies: Readonly, @@ -71,7 +77,7 @@ export class Orchestrator { strategyRegistry: IStrategyRegistry; }, private fetchLimit: number = 1000, - private fetchDelay: number = 10000, + private fetchDelayInMs: number = 10000, ) { this.eventsFetcher = new EventsFetcher(this.indexerClient); this.eventsProcessor = new EventsProcessor(this.chainId, this.dependencies); @@ -95,7 +101,7 @@ export class Orchestrator { event = this.eventsQueue.pop(); if (!event) { - await delay(this.fetchDelay); + await delay(this.fetchDelayInMs); continue; } diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index c273451..a3b94bc 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -1,5 +1,5 @@ import { Address } from "viem"; -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterAll, beforeEach, describe, expect, it, vi } from "vitest"; import { EvmProvider } from "@grants-stack-indexer/chain-providers"; import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; @@ -91,9 +91,6 @@ describe("Orchestrator", { sequential: true }, () => { }, }; - // vi.mocked(EventsProcessor).mockImplementation(() => mockEventsProcessor); - // vi.mocked(DataLoader).mockImplementation(() => mockDataLoader); - orchestrator = new Orchestrator( chainId, dependencies, @@ -107,6 +104,12 @@ describe("Orchestrator", { sequential: true }, () => { ); }); + afterAll(() => { + vi.clearAllMocks(); + + return new Promise((resolve) => setImmediate(resolve)); + }); + describe("Event Processing Flow", () => { it("process events in the correct order", async () => { const mockEvents = [