From 39c885aa214edb93c029a2305b7996324153c144 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Thu, 31 Oct 2024 13:36:27 -0300 Subject: [PATCH] test: add unit tests and natspec --- packages/data-flow/src/eventsRegistry.ts | 10 +- .../data-flow/src/exceptions/invalidEvent.ts | 6 +- .../interfaces/eventsRegistry.interface.ts | 12 + .../interfaces/strategyRegistry.interface.ts | 16 + packages/data-flow/src/orchestrator.ts | 93 ++- packages/data-flow/src/strategyRegistry.ts | 6 +- packages/data-flow/src/types/index.ts | 3 + packages/data-flow/src/utils/queue.ts | 16 + .../test/unit/eventsProcessor.spec.ts | 118 ++++ .../test/unit/eventsRegistry.spec.ts | 98 +++ .../data-flow/test/unit/orchestrator.spec.ts | 610 ++++++++++++++++++ .../test/unit/strategyRegistry.spec.ts | 42 ++ .../data-flow/test/unit/utils/queue.spec.ts | 79 +++ packages/data-flow/vitest.config.ts | 9 +- packages/processors/src/external.ts | 3 + 15 files changed, 1104 insertions(+), 17 deletions(-) create mode 100644 packages/data-flow/test/unit/eventsProcessor.spec.ts create mode 100644 packages/data-flow/test/unit/eventsRegistry.spec.ts create mode 100644 packages/data-flow/test/unit/orchestrator.spec.ts create mode 100644 packages/data-flow/test/unit/strategyRegistry.spec.ts create mode 100644 packages/data-flow/test/unit/utils/queue.spec.ts diff --git a/packages/data-flow/src/eventsRegistry.ts b/packages/data-flow/src/eventsRegistry.ts index 5e5852e..ba361a4 100644 --- a/packages/data-flow/src/eventsRegistry.ts +++ b/packages/data-flow/src/eventsRegistry.ts @@ -3,16 +3,22 @@ import type { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-index import type { IEventsRegistry } from "./internal.js"; /** - * Class to store the last processed event + * Class to store the last processed event in memory */ +//TODO: Implement storage version to persist the last processed event. we need to store it by chainId export class InMemoryEventsRegistry implements IEventsRegistry { - //TODO: Implement storage to persist the last processed event. we need to store it by chainId private lastProcessedEvent: ProcessorEvent | undefined; + /** + * @inheritdoc + */ async getLastProcessedEvent(): Promise | undefined> { return this.lastProcessedEvent; } + /** + * @inheritdoc + */ async saveLastProcessedEvent(event: ProcessorEvent): Promise { this.lastProcessedEvent = event; } diff --git a/packages/data-flow/src/exceptions/invalidEvent.ts b/packages/data-flow/src/exceptions/invalidEvent.ts index 918a3f6..849c145 100644 --- a/packages/data-flow/src/exceptions/invalidEvent.ts +++ b/packages/data-flow/src/exceptions/invalidEvent.ts @@ -1,7 +1,9 @@ -import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; +import { AnyEvent, ContractName, ProcessorEvent, stringify } from "@grants-stack-indexer/shared"; export class InvalidEvent extends Error { constructor(event: ProcessorEvent) { - super(`Event couldn't be processed: ${event}`); + super(`Event couldn't be processed: ${stringify(event)}`); + + this.name = "InvalidEvent"; } } diff --git a/packages/data-flow/src/interfaces/eventsRegistry.interface.ts b/packages/data-flow/src/interfaces/eventsRegistry.interface.ts index 37bfd18..8f061f1 100644 --- a/packages/data-flow/src/interfaces/eventsRegistry.interface.ts +++ b/packages/data-flow/src/interfaces/eventsRegistry.interface.ts @@ -1,6 +1,18 @@ import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; +/** + * The events registry saves as a checkpoint to the last processed event by the system. + * This is used to resume the indexing from the last processed event in case of an error or temporary interruption. + */ export interface IEventsRegistry { + /** + * Get the last processed event by the system + * @returns The last processed event or undefined if no event has been processed yet. + */ getLastProcessedEvent(): Promise | undefined>; + /** + * Save the last processed event by the system + * @param event - The event to save. + */ saveLastProcessedEvent(event: ProcessorEvent): Promise; } diff --git a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts index 4cf0e89..8d3aa44 100644 --- a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts +++ b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts @@ -1,6 +1,22 @@ import { Address, Hex } from "viem"; +/** + * The strategy registry saves the mapping between the strategy address and the strategy id. Serves as a Cache + * to avoid having to read from the chain to get the strategy id every time. + */ +//TODO: implement a mechanism to record Strategy that we still don't have a corresponding handler +// we need to store and mark that this strategy is not handled yet, so when it's supported we can process all of the pending events for it export interface IStrategyRegistry { + /** + * Get the strategy id by the strategy address + * @param strategyAddress - The strategy address + * @returns The strategy id or undefined if the strategy address is not registered + */ getStrategyId(strategyAddress: Address): Promise; + /** + * Save the strategy id by the strategy address + * @param strategyAddress - The strategy address + * @param strategyId - The strategy id + */ saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise; } diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index cd110f4..ca672bb 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -1,7 +1,11 @@ // class should contain the logic to orchestrate the data flow Events Fetcher -> Events Processor -> Data Loader import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; -import { UnsupportedStrategy } from "@grants-stack-indexer/processors/dist/src/internal.js"; +import { + existsHandler, + UnsupportedEventException, + UnsupportedStrategy, +} from "@grants-stack-indexer/processors"; import { Address, AnyEvent, @@ -23,6 +27,33 @@ import { IEventsFetcher } from "./interfaces/index.js"; import { IStrategyRegistry } from "./interfaces/strategyRegistry.interface.js"; import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js"; +/** + * The Orchestrator is the central coordinator of the data flow system, managing the interaction between + * three main components: + * + * 1. Events Fetcher: Retrieves blockchain events from the indexer service + * 2. Events Processor: Processes and transforms raw events into domain events + * 3. Data Loader: Persists the processed events into the database + * + * The Orchestrator implements a continuous processing loop that: + * + * 1. Fetches batches of events from the indexer and stores them in an internal queue + * 2. Processes each event from the queue: + * - For strategy events and PoolCreated from Allo contract, enhances them with strategyId + * - Forwards the event to the Events Processor which is in charge of delagating the processing of the event to the correct handler + * - Discards events for unsupported strategies or events + * 3. Loads the processed events into the database via the Data Loader + * + * The Orchestrator provides fault tolerance and performance optimization through: + * - Configurable batch sizes for event fetching + * - Delayed processing to prevent overwhelming the system + * - Error handling and logging for various failure scenarios + * - Registry tracking of supported/unsupported strategies and events + * + * TODO: Implement a circuit breaker to gracefully stop the orchestrator + * TODO: Enhance the error handling/retries, logging and observability + * TODO: Handle unhandled strategies appropriately + */ export class Orchestrator { private readonly eventsQueue: IQueue>; private readonly eventsFetcher: IEventsFetcher; @@ -55,6 +86,7 @@ export class Orchestrator { } async run(): Promise { + //TODO: implement a circuit breaker to gracefully stop the orchestrator while (true) { let event: ProcessorEvent | undefined; try { @@ -68,8 +100,19 @@ export class Orchestrator { } event = await this.enhanceStrategyId(event); - const changesets = await this.eventsProcessor.processEvent(event); + if (event.contractName === "Strategy" && "strategyId" in event) { + if (!existsHandler(event.strategyId)) { + //TODO: save to registry as unsupported strategy, so when the strategy is handled it will be backwards compatible and process all of the events + console.log( + `No handler found for strategyId: ${event.strategyId}. Event: ${stringify( + event, + )}`, + ); + continue; + } + } + const changesets = await this.eventsProcessor.processEvent(event); const executionResult = await this.dataLoader.applyChanges(changesets); if (executionResult.numFailed > 0) { @@ -79,13 +122,19 @@ export class Orchestrator { event, )}`, ); + } else { + await this.eventsRegistry.saveLastProcessedEvent(event); } - - await this.eventsRegistry.saveLastProcessedEvent(event); } catch (error: unknown) { - // TODO: improve error handling and notify - if (error instanceof UnsupportedStrategy || error instanceof InvalidEvent) { - console.error(`${error.name}: ${error.message}. Event: ${stringify(event)}`); + // TODO: improve error handling, retries and notify + if ( + error instanceof UnsupportedStrategy || + error instanceof InvalidEvent || + error instanceof UnsupportedEventException + ) { + console.error( + `Current event cannot be handled. ${error.name}: ${error.message}. Event: ${stringify(event)}`, + ); } else { console.error(`Error processing event: ${stringify(event)}`, error); } @@ -93,6 +142,9 @@ export class Orchestrator { } } + /** + * Fill the events queue with the events from the events fetcher + */ private async fillQueue(): Promise { const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(); const blockNumber = lastProcessedEvent?.blockNumber ?? 0; @@ -108,9 +160,15 @@ export class Orchestrator { this.eventsQueue.push(...events); } - // if poolCreated event, get strategyId and save in the map - // if strategy event populate event with strategyId if exists in the map - // get strategyId and populate event with it + /** + * Enhance the event with the strategy id when required + * @param event - The event + * @returns The event with the strategy id or the same event if strategyId is not required + * + * StrategyId is required for the following events: + * - PoolCreated from Allo contract + * - Any event from Strategy contract or its implementations + */ private async enhanceStrategyId( event: ProcessorEvent, ): Promise> { @@ -125,6 +183,11 @@ export class Orchestrator { return event; } + /** + * Get the strategy address from the event + * @param event - The event + * @returns The strategy address + */ private getStrategyAddress( event: ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent>, ): Address { @@ -133,6 +196,11 @@ export class Orchestrator { : event.srcAddress; } + /** + * Get the strategy id from the strategy registry or fetch it from the chain + * @param strategyAddress - The strategy address + * @returns The strategy id + */ private async getOrFetchStrategyId(strategyAddress: Address): Promise { const existingId = await this.strategyRegistry.getStrategyId(strategyAddress); if (existingId) { @@ -150,6 +218,11 @@ export class Orchestrator { return strategyId; } + /** + * Check if the event requires a strategy id + * @param event - The event + * @returns True if the event requires a strategy id, false otherwise + */ private requiresStrategyId( event: ProcessorEvent, ): event is ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent> { diff --git a/packages/data-flow/src/strategyRegistry.ts b/packages/data-flow/src/strategyRegistry.ts index ae6649f..c370c58 100644 --- a/packages/data-flow/src/strategyRegistry.ts +++ b/packages/data-flow/src/strategyRegistry.ts @@ -3,16 +3,18 @@ import type { Address, Hex } from "viem"; import type { IStrategyRegistry } from "./internal.js"; /** - * Class to store strategy ids + * Class to store strategy ids in memory */ +//TODO: Implement storage to persist strategies. since we're using address, do we need ChainId? export class InMemoryStrategyRegistry implements IStrategyRegistry { - //TODO: Implement storage to persist strategies. since we're using address, do we need ChainId? private strategiesMap: Map = new Map(); + /** @inheritdoc */ async getStrategyId(strategyAddress: Address): Promise { return this.strategiesMap.get(strategyAddress); } + /** @inheritdoc */ async saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise { this.strategiesMap.set(strategyAddress, strategyId); } diff --git a/packages/data-flow/src/types/index.ts b/packages/data-flow/src/types/index.ts index a67db51..6996677 100644 --- a/packages/data-flow/src/types/index.ts +++ b/packages/data-flow/src/types/index.ts @@ -6,6 +6,9 @@ import { IRoundRepository, } from "@grants-stack-indexer/repository"; +/** + * The result of the execution of the changesets. + */ export type ExecutionResult = { changesets: Changeset["type"][]; numExecuted: number; diff --git a/packages/data-flow/src/utils/queue.ts b/packages/data-flow/src/utils/queue.ts index 703b125..8a9a815 100644 --- a/packages/data-flow/src/utils/queue.ts +++ b/packages/data-flow/src/utils/queue.ts @@ -5,6 +5,22 @@ export interface IQueue { get length(): number; isEmpty(): boolean; } +/** + * A circular buffer-based queue implementation that provides efficient O(1) operations + * with automatic resizing capabilities. + * + * Key benefits: + * - Constant O(1) time complexity for push/pop/peek operations + * - Memory efficient circular buffer that reuses space + * - Automatic buffer resizing to handle growth and shrinkage + * - Amortized O(1) push operations even when resizing is needed + * - Memory optimization by shrinking when queue becomes very empty + * - Initial capacity can be tuned based on expected usage + * + * The circular buffer approach avoids the need to shift elements, making it more + * efficient than array-based queues for high-throughput scenarios. The automatic + * resizing ensures memory usage adapts to actual needs while maintaining performance. + */ export class Queue implements IQueue { private buffer: (T | undefined)[]; diff --git a/packages/data-flow/test/unit/eventsProcessor.spec.ts b/packages/data-flow/test/unit/eventsProcessor.spec.ts new file mode 100644 index 0000000..6df3753 --- /dev/null +++ b/packages/data-flow/test/unit/eventsProcessor.spec.ts @@ -0,0 +1,118 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { + AlloProcessor, + ProcessorDependencies, + RegistryProcessor, + StrategyProcessor, +} from "@grants-stack-indexer/processors"; +import { Changeset } from "@grants-stack-indexer/repository"; +import { AnyEvent, ChainId, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; + +import { EventsProcessor } from "../../src/eventsProcessor.js"; +import { InvalidEvent } from "../../src/exceptions/index.js"; + +vi.mock("@grants-stack-indexer/processors", () => ({ + AlloProcessor: vi.fn(), + RegistryProcessor: vi.fn(), + StrategyProcessor: vi.fn(), +})); + +describe("EventsProcessor", () => { + let eventsProcessor: EventsProcessor; + let mockAlloProcessor: AlloProcessor; + let mockRegistryProcessor: RegistryProcessor; + let mockStrategyProcessor: StrategyProcessor; + const chainId = 1 as ChainId; + const mockDependencies = {} as ProcessorDependencies; + + beforeEach(() => { + mockAlloProcessor = { process: vi.fn() } as unknown as AlloProcessor; + mockRegistryProcessor = { process: vi.fn() } as unknown as RegistryProcessor; + mockStrategyProcessor = { process: vi.fn() } as unknown as StrategyProcessor; + + vi.mocked(AlloProcessor).mockImplementation(() => mockAlloProcessor); + vi.mocked(RegistryProcessor).mockImplementation(() => mockRegistryProcessor); + vi.mocked(StrategyProcessor).mockImplementation(() => mockStrategyProcessor); + + eventsProcessor = new EventsProcessor(chainId, mockDependencies); + }); + + it("process Allo events using AlloProcessor", async () => { + const mockChangeset: Changeset[] = [ + { type: "UpdateProject", args: { chainId, projectId: "1", project: {} } }, + ]; + const mockEvent = { + contractName: "Allo", + eventName: "PoolCreated", + args: {}, + } as unknown as ProcessorEvent<"Allo", "PoolCreated">; + + vi.spyOn(mockAlloProcessor, "process").mockResolvedValue(mockChangeset); + + const result = await eventsProcessor.processEvent(mockEvent); + + expect(mockAlloProcessor.process).toHaveBeenCalledWith(mockEvent); + expect(result).toBe(mockChangeset); + }); + + it("process Registry events using RegistryProcessor", async () => { + const mockChangeset: Changeset[] = [ + { type: "UpdateProject", args: { chainId, projectId: "1", project: {} } }, + ]; + const mockEvent = { + contractName: "Registry", + eventName: "ProfileCreated", + args: {}, + } as unknown as ProcessorEvent<"Registry", "ProfileCreated">; + + vi.spyOn(mockRegistryProcessor, "process").mockResolvedValue(mockChangeset); + + const result = await eventsProcessor.processEvent(mockEvent); + + expect(mockRegistryProcessor.process).toHaveBeenCalledWith(mockEvent); + expect(result).toBe(mockChangeset); + }); + + it("process Strategy events using StrategyProcessor", async () => { + const mockChangeset: Changeset[] = [ + { type: "UpdateRound", args: { chainId, roundId: "1", round: {} } }, + ]; + const mockEvent = { + contractName: "Strategy", + eventName: "Distributed", + args: {}, + } as unknown as ProcessorEvent<"Strategy", "Distributed">; + + vi.spyOn(mockStrategyProcessor, "process").mockResolvedValue(mockChangeset); + + const result = await eventsProcessor.processEvent(mockEvent); + + expect(mockStrategyProcessor.process).toHaveBeenCalledWith(mockEvent); + expect(result).toBe(mockChangeset); + }); + + it("throw InvalidEvent for unknown event types", async () => { + const mockEvent: ProcessorEvent = { + contractName: "Unknown" as unknown as ContractName, + eventName: "PoolCreated", + blockNumber: 1, + blockTimestamp: 1, + chainId, + logIndex: 1, + srcAddress: "0x0", + params: { + sender: "0x0", + recipientAddress: "0x0", + recipientId: "0x0", + amount: 1, + }, + transactionFields: { + hash: "0x0", + transactionIndex: 1, + }, + }; + + await expect(eventsProcessor.processEvent(mockEvent)).rejects.toThrow(InvalidEvent); + }); +}); diff --git a/packages/data-flow/test/unit/eventsRegistry.spec.ts b/packages/data-flow/test/unit/eventsRegistry.spec.ts new file mode 100644 index 0000000..077f4d3 --- /dev/null +++ b/packages/data-flow/test/unit/eventsRegistry.spec.ts @@ -0,0 +1,98 @@ +import { describe, expect, it } from "vitest"; + +import { ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; + +import { InMemoryEventsRegistry } from "../../src/eventsRegistry.js"; + +describe("InMemoryEventsRegistry", () => { + it("return null when no event has been saved", async () => { + const registry = new InMemoryEventsRegistry(); + const lastEvent = await registry.getLastProcessedEvent(); + expect(lastEvent).toBeUndefined(); + }); + + it("save and retrieve the last processed event", async () => { + const registry = new InMemoryEventsRegistry(); + const mockEvent: ProcessorEvent<"Allo", "PoolCreated"> = { + contractName: "Allo", + eventName: "PoolCreated", + blockNumber: 1, + blockTimestamp: 1234567890, + chainId: 1 as ChainId, + logIndex: 0, + srcAddress: "0x123", + strategyId: "0xstrategy", + params: { + poolId: 1n, + profileId: "0x456", + strategy: "0x789", + token: "0xtoken", + amount: 0n, + metadata: [1n, "0xmetadata"], + }, + transactionFields: { + hash: "0xabc", + transactionIndex: 0, + }, + }; + + await registry.saveLastProcessedEvent(mockEvent); + const retrievedEvent = await registry.getLastProcessedEvent(); + + expect(retrievedEvent).toEqual(mockEvent); + }); + + it("should update the last processed event when saving multiple times", async () => { + const registry = new InMemoryEventsRegistry(); + + const firstEvent: ProcessorEvent<"Allo", "PoolCreated"> = { + contractName: "Allo", + eventName: "PoolCreated", + blockNumber: 1, + blockTimestamp: 1234567890, + chainId: 1 as ChainId, + logIndex: 0, + srcAddress: "0x123", + strategyId: "0xstrategy", + params: { + poolId: 1n, + profileId: "0x456", + strategy: "0x789", + token: "0xtoken", + amount: 0n, + metadata: [1n, "0xmetadata"], + }, + transactionFields: { + hash: "0xabc", + transactionIndex: 0, + }, + }; + + const secondEvent: ProcessorEvent<"Strategy", "Registered"> = { + contractName: "Strategy", + eventName: "Registered", + blockNumber: 1, + blockTimestamp: 1234567890, + chainId: 1 as ChainId, + logIndex: 0, + srcAddress: "0x123", + strategyId: "0xstrategy", + params: { + recipientId: "0xrecipient", + data: "0xdata", + sender: "0xsender", + }, + transactionFields: { + hash: "0xabc", + transactionIndex: 0, + }, + }; + + await registry.saveLastProcessedEvent(firstEvent); + await registry.saveLastProcessedEvent(secondEvent); + + const lastEvent = await registry.getLastProcessedEvent(); + expect(lastEvent).toEqual(secondEvent); + expect(lastEvent).not.toEqual(firstEvent); + }); +}); diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts new file mode 100644 index 0000000..c273451 --- /dev/null +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -0,0 +1,610 @@ +import { Address } from "viem"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { EvmProvider } from "@grants-stack-indexer/chain-providers"; +import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; +import { UnsupportedStrategy } from "@grants-stack-indexer/processors"; +import { + Changeset, + IApplicationRepository, + IProjectRepository, + IRoundRepository, +} from "@grants-stack-indexer/repository"; +import { + AlloEvent, + ChainId, + ContractName, + ContractToEventName, + EventParams, + Hex, + ProcessorEvent, + StrategyEvent, + stringify, +} from "@grants-stack-indexer/shared"; + +import { + CoreDependencies, + IEventsRegistry, + InvalidEvent, + IStrategyRegistry, +} from "../../src/internal.js"; +import { Orchestrator } from "../../src/orchestrator.js"; + +vi.mock("../../src/eventsProcessor.js", () => { + const EventsProcessor = vi.fn(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + EventsProcessor.prototype.processEvent = vi.fn(); + return { + EventsProcessor, + }; +}); +vi.mock("../../src/data-loader/dataLoader.js", () => { + const DataLoader = vi.fn(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + DataLoader.prototype.applyChanges = vi.fn(); + return { + DataLoader, + }; +}); + +describe("Orchestrator", { sequential: true }, () => { + let orchestrator: Orchestrator; + let mockIndexerClient: IIndexerClient; + let mockEventsRegistry: IEventsRegistry; + let mockStrategyRegistry: IStrategyRegistry; + let mockEvmProvider: EvmProvider; + + const chainId = 1 as ChainId; + const mockFetchLimit = 10; + const mockFetchDelay = 100; + + beforeEach(() => { + // Setup mock implementations + mockIndexerClient = { + getEventsAfterBlockNumberAndLogIndex: vi.fn(), + } as unknown as IIndexerClient; + + mockEventsRegistry = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + + mockStrategyRegistry = { + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + }; + + mockEvmProvider = { + readContract: vi.fn(), + } as unknown as EvmProvider; + + const dependencies: CoreDependencies = { + evmProvider: mockEvmProvider, + projectRepository: {} as unknown as IProjectRepository, + roundRepository: {} as unknown as IRoundRepository, + applicationRepository: {} as unknown as IApplicationRepository, + pricingProvider: { + getTokenPrice: vi.fn(), + }, + metadataProvider: { + getMetadata: vi.fn(), + }, + }; + + // vi.mocked(EventsProcessor).mockImplementation(() => mockEventsProcessor); + // vi.mocked(DataLoader).mockImplementation(() => mockDataLoader); + + orchestrator = new Orchestrator( + chainId, + dependencies, + mockIndexerClient, + { + eventsRegistry: mockEventsRegistry, + strategyRegistry: mockStrategyRegistry, + }, + mockFetchLimit, + mockFetchDelay, + ); + }); + + describe("Event Processing Flow", () => { + it("process events in the correct order", async () => { + const mockEvents = [ + createMockEvent("Allo", "PoolCreated", 1), + createMockEvent("Registry", "ProfileCreated", 2), + ]; + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce(mockEvents) + .mockResolvedValue([]); + eventsProcessorSpy.mockResolvedValue([]); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + void orchestrator.run(); + + await vi.waitFor( + () => { + if (eventsProcessorSpy.mock.calls.length < 2) throw new Error("Not yet called"); + }, + { + timeout: 1000, + }, + ); + + expect(eventsProcessorSpy).toHaveBeenCalledWith(mockEvents[0]); + expect(eventsProcessorSpy).toHaveBeenCalledWith(mockEvents[1]); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(mockEvents[0]); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(mockEvents[1]); + }); + + it("wait and keep polling on empty queue", { timeout: 1000 }, async () => { + vi.useFakeTimers(); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex").mockResolvedValue( + [], + ); + + void orchestrator.run(); + + // Wait for a few polling cycles + await vi.advanceTimersByTimeAsync(mockFetchDelay * 2); + + expect( + vi.spyOn(orchestrator["eventsProcessor"], "processEvent"), + ).not.toHaveBeenCalled(); + expect(mockIndexerClient.getEventsAfterBlockNumberAndLogIndex).toHaveBeenCalledTimes(3); + vi.useRealTimers(); + }); + }); + + describe("Strategy ID Enhancement", () => { + it("adds strategy ID to Allo PoolCreated events", async () => { + const strategyAddress = "0x123" as Address; + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + const mockEvent = createMockEvent("Allo", "PoolCreated", 1, { + strategy: strategyAddress, + poolId: 1n, + profileId: "0x123", + token: "0x123", + amount: 100n, + metadata: [1n, "1"], + }); + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(undefined); + vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(strategyId); + const changesets = [ + { + type: "InsertProject", + args: { chainId, projectId: "1", project: {} }, + } as unknown as Changeset, + { + type: "InsertRoundRole", + args: { chainId, roundId: "1", roundRole: {} }, + } as unknown as Changeset, + { + type: "DeletePendingRoundRoles", + args: { chainId, roundId: "1" }, + } as unknown as Changeset, + ]; + + eventsProcessorSpy.mockResolvedValue(changesets); + + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: ["InsertProject", "InsertRoundRole", "DeletePendingRoundRoles"], + numExecuted: 3, + numSuccessful: 3, + }); + + void orchestrator.run(); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledTimes(1); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledWith( + strategyAddress, + strategyId, + ); + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledWith({ + ...mockEvent, + strategyId, + }); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledWith(changesets); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(mockEvent); + }); + + const strategyEvents: Record = { + Registered: "", + Distributed: "", + TimestampsUpdated: "", + AllocatedWithToken: "", + }; + + for (const event of Object.keys(strategyEvents) as StrategyEvent[]) { + const strategyAddress = "0x123" as Address; + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + it(`adds strategy ID to Strategy ${event} events`, async () => { + const mockEvent = createMockEvent("Strategy", event, 1, undefined, strategyAddress); + const changesets = [ + { + type: "InsertApplication", + args: { chainId, applicationId: "1", application: {} }, + } as unknown as Changeset, + ]; + + const eventsProcessorSpy = vi.spyOn( + orchestrator["eventsProcessor"], + "processEvent", + ); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(strategyId); + eventsProcessorSpy.mockResolvedValue(changesets); + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: ["InsertApplication"], + numExecuted: 1, + numSuccessful: 1, + }); + + void orchestrator.run(); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledTimes(1); + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledWith({ + ...mockEvent, + strategyId, + }); + expect(mockStrategyRegistry.getStrategyId).toHaveBeenCalledWith(strategyAddress); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledWith(changesets); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(mockEvent); + }); + } + + it("discards events from unhandled strategies", async () => { + vi.useFakeTimers(); + const unhandledStrategyId = "0x6f9aaaaf02b266413f" as Hex; + const strategyAddress = "0x123" as Address; + const mockEvent = createMockEvent( + "Strategy", + "Registered", + 1, + undefined, + strategyAddress, + ); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(unhandledStrategyId); + vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(unhandledStrategyId); + + void orchestrator.run(); + + await vi.advanceTimersByTimeAsync(mockFetchDelay * 2); + + expect(orchestrator["eventsProcessor"].processEvent).not.toHaveBeenCalled(); + expect(orchestrator["dataLoader"].applyChanges).not.toHaveBeenCalled(); + expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + vi.useRealTimers(); + }); + + it("uses cached strategy ID from registry", async () => { + const strategyAddress = "0x123" as Address; + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + const poolCreatedEvent = createMockEvent("Allo", "PoolCreated", 1, { + strategy: strategyAddress, + poolId: 1n, + profileId: "0x123", + token: "0x123", + amount: 100n, + metadata: [1n, "1"], + }); + const registeredEvent = createMockEvent( + "Strategy", + "Registered", + 2, + { + recipientId: "0x123", + data: "0x123", + sender: "0x123", + }, + strategyAddress, + ); + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockStrategyRegistry, "getStrategyId") + .mockResolvedValueOnce(undefined) + .mockResolvedValue(strategyId); + + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([poolCreatedEvent]) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([registeredEvent]) + .mockResolvedValue([]); + + eventsProcessorSpy.mockResolvedValue([]); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + void orchestrator.run(); + + await vi.waitFor( + () => { + if (eventsProcessorSpy.mock.calls.length < 2) throw new Error("Not yet called"); + }, + { + timeout: 1000, + interval: mockFetchDelay, + }, + ); + + expect(mockEvmProvider.readContract).toHaveBeenCalledTimes(1); + expect(mockStrategyRegistry.getStrategyId).toHaveBeenLastCalledWith(strategyAddress); + expect(eventsProcessorSpy).toHaveBeenLastCalledWith({ + ...registeredEvent, + strategyId, + }); + }); + + it("does not add strategy ID to events from other strategies", async () => { + const mockEvent = createMockEvent("Registry", "ProfileCreated", 1, undefined); + const changesets = [ + { + type: "InsertPendingRoundRole", + args: { chainId, roundId: "1", roundRole: {} }, + } as unknown as Changeset, + ]; + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + eventsProcessorSpy.mockResolvedValue(changesets); + + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: ["InsertPendingRoundRole"], + numExecuted: 1, + numSuccessful: 1, + }); + + void orchestrator.run(); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledTimes(1); + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledWith(mockEvent); + expect(mockStrategyRegistry.getStrategyId).not.toHaveBeenCalled(); + expect(mockStrategyRegistry.saveStrategyId).not.toHaveBeenCalled(); + }); + }); + + describe("Error Handling", () => { + it.skip("retries error"); + + it("keeps running when there is an error", async () => { + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + const consoleSpy = vi.spyOn(console, "error"); + const errorEvent = createMockEvent("Allo", "Unknown" as unknown as AlloEvent, 1); + const error = new Error("test"); + + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([errorEvent]) + .mockResolvedValueOnce([createMockEvent("Registry", "ProfileCreated", 2)]) + .mockResolvedValue([]); + + eventsProcessorSpy.mockRejectedValueOnce(error).mockResolvedValueOnce([]); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + void orchestrator.run(); + + await vi.waitFor( + () => { + if (eventsProcessorSpy.mock.calls.length < 2) throw new Error("Not yet called"); + }, + { + timeout: 1000, + }, + ); + + expect(eventsProcessorSpy).toHaveBeenCalledTimes(2); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledTimes(1); + expect(consoleSpy).toHaveBeenCalledTimes(1); + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining(`Error processing event: ${stringify(errorEvent)}`), + error, + ); + }); + + it("logs error for InvalidEvent", async () => { + const mockEvent = createMockEvent("Allo", "Unknown" as unknown as AlloEvent, 1); + const error = new InvalidEvent(mockEvent); + + const consoleSpy = vi.spyOn(console, "error"); + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + eventsProcessorSpy.mockRejectedValue(error); + + void orchestrator.run(); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining("InvalidEvent")); + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); + expect(orchestrator["dataLoader"].applyChanges).not.toHaveBeenCalled(); + expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + }); + + it("logs error for UnsupportedEvent", async () => { + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + const mockEvent = createMockEvent( + "Strategy", + "NotHandled" as unknown as StrategyEvent, + 1, + ); + const error = new UnsupportedStrategy(strategyId); + + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(strategyId); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + vi.spyOn(orchestrator["eventsProcessor"], "processEvent").mockRejectedValue(error); + + const consoleSpy = vi.spyOn(console, "error"); + + void orchestrator.run(); + + await vi.waitFor(() => { + if (consoleSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(consoleSpy).toHaveBeenCalledTimes(1); + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining(`Strategy ${strategyId} unsupported`), + ); + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); + expect(orchestrator["dataLoader"].applyChanges).not.toHaveBeenCalled(); + expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + }); + + it("logs DataLoader errors", async () => { + const mockEvent = createMockEvent("Allo", "PoolCreated", 1); + const mockChangesets: Changeset[] = [ + { type: "UpdateProject", args: { chainId, projectId: "1", project: {} } }, + ]; + + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + const dataLoaderSpy = vi.spyOn(orchestrator["dataLoader"], "applyChanges"); + + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + vi.spyOn(orchestrator["eventsProcessor"], "processEvent").mockResolvedValue( + mockChangesets, + ); + dataLoaderSpy.mockResolvedValue({ + numFailed: 1, + errors: ["Failed to update project"], + changesets: ["UpdateProject"], + numExecuted: 1, + numSuccessful: 0, + }); + + void orchestrator.run(); + + await vi.waitFor(() => { + if (dataLoaderSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining("Failed to apply changesets"), + ); + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); + expect(dataLoaderSpy).toHaveBeenCalledTimes(1); + expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + }); + }); +}); + +// Helper function to create mock events +function createMockEvent>( + contractName: T, + eventName: E, + blockNumber: number, + params: EventParams = {} as EventParams, + srcAddress: Address = "0x123" as Address, +): ProcessorEvent { + return { + contractName, + eventName, + blockNumber, + blockTimestamp: 1234567890, + chainId: 1 as ChainId, + logIndex: 0, + srcAddress, + params, + transactionFields: { + hash: "0xabc", + transactionIndex: 0, + }, + } as ProcessorEvent; +} diff --git a/packages/data-flow/test/unit/strategyRegistry.spec.ts b/packages/data-flow/test/unit/strategyRegistry.spec.ts new file mode 100644 index 0000000..4ace8ee --- /dev/null +++ b/packages/data-flow/test/unit/strategyRegistry.spec.ts @@ -0,0 +1,42 @@ +import { Address, Hex } from "viem"; +import { describe, expect, it } from "vitest"; + +import { InMemoryStrategyRegistry } from "../../src/strategyRegistry.js"; + +describe("InMemoryStrategyRegistry", () => { + it("return undefined for non-existent strategy address", async () => { + const registry = new InMemoryStrategyRegistry(); + const strategyAddress = "0x123" as Address; + + const strategyId = await registry.getStrategyId(strategyAddress); + expect(strategyId).toBeUndefined(); + }); + + it("save and retrieve strategy id", async () => { + const registry = new InMemoryStrategyRegistry(); + const strategyAddress = "0x123" as Address; + const strategyId = "0xabc" as Hex; + + await registry.saveStrategyId(strategyAddress, strategyId); + const retrievedId = await registry.getStrategyId(strategyAddress); + + expect(retrievedId).toBe(strategyId); + }); + + it("handle multiple strategy addresses independently", async () => { + const registry = new InMemoryStrategyRegistry(); + const firstAddress = "0x123" as Address; + const secondAddress = "0x456" as Address; + const firstStrategyId = "0xabc" as Hex; + const secondStrategyId = "0xdef" as Hex; + + await registry.saveStrategyId(firstAddress, firstStrategyId); + await registry.saveStrategyId(secondAddress, secondStrategyId); + + const retrievedFirstId = await registry.getStrategyId(firstAddress); + const retrievedSecondId = await registry.getStrategyId(secondAddress); + + expect(retrievedFirstId).toBe(firstStrategyId); + expect(retrievedSecondId).toBe(secondStrategyId); + }); +}); diff --git a/packages/data-flow/test/unit/utils/queue.spec.ts b/packages/data-flow/test/unit/utils/queue.spec.ts new file mode 100644 index 0000000..6842109 --- /dev/null +++ b/packages/data-flow/test/unit/utils/queue.spec.ts @@ -0,0 +1,79 @@ +import { describe, expect, it } from "vitest"; + +import { Queue } from "../../../src/utils/queue.js"; + +describe("Queue", () => { + it("create an empty queue", () => { + const queue = new Queue(); + expect(queue.length).toBe(0); + expect(queue.isEmpty()).toBe(true); + }); + + it("push and pop items correctly", () => { + const queue = new Queue(); + queue.push(1, 2, 3); + + expect(queue.length).toBe(3); + expect(queue.pop()).toBe(1); + expect(queue.pop()).toBe(2); + expect(queue.pop()).toBe(3); + expect(queue.isEmpty()).toBe(true); + }); + + it("peek at the first item without removing it", () => { + const queue = new Queue(); + queue.push(1, 2); + + expect(queue.peek()).toBe(1); + expect(queue.length).toBe(2); // Length shouldn't change after peek + }); + + it("handles pushing more items than initial capacity", () => { + const queue = new Queue(2); + queue.push(1, 2, 3, 4, 5); + + expect(queue.length).toBe(5); + expect(queue.pop()).toBe(1); + expect(queue.pop()).toBe(2); + expect(queue.pop()).toBe(3); + expect(queue.pop()).toBe(4); + expect(queue.pop()).toBe(5); + }); + + it("handles circular buffer wrapping", () => { + const queue = new Queue(3); + queue.push(1, 2); + queue.pop(); // Remove 1 + queue.pop(); // Remove 2 + queue.push(3, 4); // Should wrap around + + expect(queue.length).toBe(2); + expect(queue.pop()).toBe(3); + expect(queue.pop()).toBe(4); + }); + + it("returns undefined when popping from empty queue", () => { + const queue = new Queue(); + expect(queue.pop()).toBeUndefined(); + expect(queue.peek()).toBeUndefined(); + }); + + it("shrinks buffer when queue becomes very empty", () => { + const queue = new Queue(20000); + const manyItems = Array.from({ length: 18000 }, (_, i) => i); + + queue.push(...manyItems); + expect(queue.length).toBe(18000); + + // Pop most items to trigger shrinking + for (let i = 0; i < 17000; i++) { + queue.pop(); + } + + expect(queue.length).toBe(1000); + // Verify remaining items are correct + for (let i = 17000; i < 18000; i++) { + expect(queue.pop()).toBe(i); + } + }); +}); diff --git a/packages/data-flow/vitest.config.ts b/packages/data-flow/vitest.config.ts index 8e1bbf4..22ef210 100644 --- a/packages/data-flow/vitest.config.ts +++ b/packages/data-flow/vitest.config.ts @@ -10,7 +10,14 @@ export default defineConfig({ coverage: { provider: "v8", reporter: ["text", "json", "html"], // Coverage reporters - exclude: ["node_modules", "dist", "src/index.ts", ...configDefaults.exclude], // Files to exclude from coverage + exclude: [ + "node_modules", + "dist", + "src/index.ts", + "src/external.ts", + "test/**", + ...configDefaults.exclude, + ], // Files to exclude from coverage }, }, resolve: { diff --git a/packages/processors/src/external.ts b/packages/processors/src/external.ts index 554a840..3cedb0e 100644 --- a/packages/processors/src/external.ts +++ b/packages/processors/src/external.ts @@ -1,5 +1,8 @@ // Add your external exports here export { StrategyProcessor, AlloProcessor, RegistryProcessor } from "./internal.js"; + +export { UnsupportedStrategy, UnsupportedEventException } from "./internal.js"; + export type { IProcessor, ProcessorDependencies } from "./internal.js"; export { existsHandler } from "./internal.js";