diff --git a/packages/data-flow/package.json b/packages/data-flow/package.json index 4502619..fc13c15 100644 --- a/packages/data-flow/package.json +++ b/packages/data-flow/package.json @@ -28,7 +28,11 @@ "test:cov": "vitest run --config vitest.config.ts --coverage" }, "dependencies": { + "@grants-stack-indexer/chain-providers": "workspace:*", "@grants-stack-indexer/indexer-client": "workspace:*", + "@grants-stack-indexer/metadata": "workspace:*", + "@grants-stack-indexer/pricing": "workspace:*", + "@grants-stack-indexer/processors": "workspace:*", "@grants-stack-indexer/repository": "workspace:*", "@grants-stack-indexer/shared": "workspace:*", "viem": "2.21.19" diff --git a/packages/data-flow/src/abis/index.ts b/packages/data-flow/src/abis/index.ts new file mode 100644 index 0000000..8b6c147 --- /dev/null +++ b/packages/data-flow/src/abis/index.ts @@ -0,0 +1 @@ +export * from "./strategy.abi.js"; diff --git a/packages/data-flow/src/abis/strategy.abi.ts b/packages/data-flow/src/abis/strategy.abi.ts new file mode 100644 index 0000000..e1e182d --- /dev/null +++ b/packages/data-flow/src/abis/strategy.abi.ts @@ -0,0 +1,225 @@ +export const iStrategyAbi = [ + { + type: "function", + name: "allocate", + inputs: [ + { name: "_data", type: "bytes", internalType: "bytes" }, + { name: "_sender", type: "address", internalType: "address" }, + ], + outputs: [], + stateMutability: "payable", + }, + { + type: "function", + name: "distribute", + inputs: [ + { name: "_recipientIds", type: "address[]", internalType: "address[]" }, + { name: "_data", type: "bytes", internalType: "bytes" }, + { name: "_sender", type: "address", internalType: "address" }, + ], + outputs: [], + stateMutability: "nonpayable", + }, + { + type: "function", + name: "getAllo", + inputs: [], + outputs: [{ name: "", type: "address", internalType: "contract IAllo" }], + stateMutability: "view", + }, + { + type: "function", + name: "getPayouts", + inputs: [ + { name: "_recipientIds", type: "address[]", internalType: "address[]" }, + { name: "_data", type: "bytes[]", internalType: "bytes[]" }, + ], + outputs: [ + { + name: "", + type: "tuple[]", + internalType: "struct IStrategy.PayoutSummary[]", + components: [ + { + name: "recipientAddress", + type: "address", + internalType: "address", + }, + { name: "amount", type: "uint256", internalType: "uint256" }, + ], + }, + ], + stateMutability: "view", + }, + { + type: "function", + name: "getPoolAmount", + inputs: [], + outputs: [{ name: "", type: "uint256", internalType: "uint256" }], + stateMutability: "view", + }, + { + type: "function", + name: "getPoolId", + inputs: [], + outputs: [{ name: "", type: "uint256", internalType: "uint256" }], + stateMutability: "view", + }, + { + type: "function", + name: "getRecipientStatus", + inputs: [{ name: "_recipientId", type: "address", internalType: "address" }], + outputs: [{ name: "", type: "uint8", internalType: "enum IStrategy.Status" }], + stateMutability: "view", + }, + { + type: "function", + name: "getStrategyId", + inputs: [], + outputs: [{ name: "", type: "bytes32", internalType: "bytes32" }], + stateMutability: "view", + }, + { + type: "function", + name: "increasePoolAmount", + inputs: [{ name: "_amount", type: "uint256", internalType: "uint256" }], + outputs: [], + stateMutability: "nonpayable", + }, + { + type: "function", + name: "initialize", + inputs: [ + { name: "_poolId", type: "uint256", internalType: "uint256" }, + { name: "_data", type: "bytes", internalType: "bytes" }, + ], + outputs: [], + stateMutability: "nonpayable", + }, + { + type: "function", + name: "isPoolActive", + inputs: [], + outputs: [{ name: "", type: "bool", internalType: "bool" }], + stateMutability: "nonpayable", + }, + { + type: "function", + name: "isValidAllocator", + inputs: [{ name: "_allocator", type: "address", internalType: "address" }], + outputs: [{ name: "", type: "bool", internalType: "bool" }], + stateMutability: "view", + }, + { + type: "function", + name: "registerRecipient", + inputs: [ + { name: "_data", type: "bytes", internalType: "bytes" }, + { name: "_sender", type: "address", internalType: "address" }, + ], + outputs: [{ name: "", type: "address", internalType: "address" }], + stateMutability: "payable", + }, + { + type: "event", + name: "Allocated", + inputs: [ + { + name: "recipientId", + type: "address", + indexed: true, + internalType: "address", + }, + { + name: "amount", + type: "uint256", + indexed: false, + internalType: "uint256", + }, + { + name: "token", + type: "address", + indexed: false, + internalType: "address", + }, + { + name: "sender", + type: "address", + indexed: false, + internalType: "address", + }, + ], + anonymous: false, + }, + { + type: "event", + name: "Distributed", + inputs: [ + { + name: "recipientId", + type: "address", + indexed: true, + internalType: "address", + }, + { + name: "recipientAddress", + type: "address", + indexed: false, + internalType: "address", + }, + { + name: "amount", + type: "uint256", + indexed: false, + internalType: "uint256", + }, + { + name: "sender", + type: "address", + indexed: false, + internalType: "address", + }, + ], + anonymous: false, + }, + { + type: "event", + name: "Initialized", + inputs: [ + { + name: "poolId", + type: "uint256", + indexed: false, + internalType: "uint256", + }, + { name: "data", type: "bytes", indexed: false, internalType: "bytes" }, + ], + anonymous: false, + }, + { + type: "event", + name: "PoolActive", + inputs: [{ name: "active", type: "bool", indexed: false, internalType: "bool" }], + anonymous: false, + }, + { + type: "event", + name: "Registered", + inputs: [ + { + name: "recipientId", + type: "address", + indexed: true, + internalType: "address", + }, + { name: "data", type: "bytes", indexed: false, internalType: "bytes" }, + { + name: "sender", + type: "address", + indexed: false, + internalType: "address", + }, + ], + anonymous: false, + }, +] as const; diff --git a/packages/data-flow/src/eventsFetcher.ts b/packages/data-flow/src/eventsFetcher.ts index cf83383..3d87a16 100644 --- a/packages/data-flow/src/eventsFetcher.ts +++ b/packages/data-flow/src/eventsFetcher.ts @@ -1,5 +1,5 @@ import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; -import { AnyProtocolEvent } from "@grants-stack-indexer/shared"; +import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared"; import { IEventsFetcher } from "./interfaces/index.js"; @@ -7,11 +7,11 @@ export class EventsFetcher implements IEventsFetcher { constructor(private indexerClient: IIndexerClient) {} /* @inheritdoc */ async fetchEventsByBlockNumberAndLogIndex( - chainId: bigint, - blockNumber: bigint, + chainId: ChainId, + blockNumber: number, logIndex: number, limit: number = 100, - ): Promise { + ): Promise { return await this.indexerClient.getEventsAfterBlockNumberAndLogIndex( chainId, blockNumber, diff --git a/packages/data-flow/src/eventsProcessor.ts b/packages/data-flow/src/eventsProcessor.ts new file mode 100644 index 0000000..b27948e --- /dev/null +++ b/packages/data-flow/src/eventsProcessor.ts @@ -0,0 +1,55 @@ +import type { Changeset } from "@grants-stack-indexer/repository"; +import { + AlloProcessor, + ProcessorDependencies, + RegistryProcessor, + StrategyProcessor, +} from "@grants-stack-indexer/processors"; +import { + AnyEvent, + ChainId, + ContractName, + isAlloEvent, + isRegistryEvent, + isStrategyEvent, + ProcessorEvent, +} from "@grants-stack-indexer/shared"; + +import { InvalidEvent } from "./exceptions/index.js"; + +/** + * EventsProcessor handles the processing of Allo V2 events by delegating them to the appropriate processor + * (Allo, Registry, or Strategy) based on the event type. Each processor generates changesets that represent + * the required database updates. + */ +export class EventsProcessor { + alloProcessor: AlloProcessor; + registryProcessor: RegistryProcessor; + strategyProcessor: StrategyProcessor; + + constructor(chainId: ChainId, dependencies: Readonly) { + this.alloProcessor = new AlloProcessor(chainId, dependencies); + this.registryProcessor = new RegistryProcessor(chainId, dependencies); + this.strategyProcessor = new StrategyProcessor(chainId, dependencies); + } + + /** + * Process an Allo V2 event and return the changesets + * @param event - The event to process + * @returns The changesets + * @throws InvalidEvent if the event is not a valid Allo V2 event (Allo, Registry or Strategy) + */ + public async processEvent(event: ProcessorEvent): Promise { + if (isAlloEvent(event)) { + return await this.alloProcessor.process(event); + } + if (isRegistryEvent(event)) { + return await this.registryProcessor.process(event); + } + if (isStrategyEvent(event)) { + return await this.strategyProcessor.process(event); + } + + throw new InvalidEvent(event); + } +} diff --git a/packages/data-flow/src/eventsRegistry.ts b/packages/data-flow/src/eventsRegistry.ts new file mode 100644 index 0000000..ba361a4 --- /dev/null +++ b/packages/data-flow/src/eventsRegistry.ts @@ -0,0 +1,25 @@ +import type { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; + +import type { IEventsRegistry } from "./internal.js"; + +/** + * Class to store the last processed event in memory + */ +//TODO: Implement storage version to persist the last processed event. we need to store it by chainId +export class InMemoryEventsRegistry implements IEventsRegistry { + private lastProcessedEvent: ProcessorEvent | undefined; + + /** + * @inheritdoc + */ + async getLastProcessedEvent(): Promise | undefined> { + return this.lastProcessedEvent; + } + + /** + * @inheritdoc + */ + async saveLastProcessedEvent(event: ProcessorEvent): Promise { + this.lastProcessedEvent = event; + } +} diff --git a/packages/data-flow/src/exceptions/index.ts b/packages/data-flow/src/exceptions/index.ts index 6054bb7..ce2d72e 100644 --- a/packages/data-flow/src/exceptions/index.ts +++ b/packages/data-flow/src/exceptions/index.ts @@ -1 +1,2 @@ +export * from "./invalidEvent.js"; export * from "./invalidChangeset.exception.js"; diff --git a/packages/data-flow/src/exceptions/invalidEvent.ts b/packages/data-flow/src/exceptions/invalidEvent.ts new file mode 100644 index 0000000..849c145 --- /dev/null +++ b/packages/data-flow/src/exceptions/invalidEvent.ts @@ -0,0 +1,9 @@ +import { AnyEvent, ContractName, ProcessorEvent, stringify } from "@grants-stack-indexer/shared"; + +export class InvalidEvent extends Error { + constructor(event: ProcessorEvent) { + super(`Event couldn't be processed: ${stringify(event)}`); + + this.name = "InvalidEvent"; + } +} diff --git a/packages/data-flow/src/interfaces/eventsFetcher.interface.ts b/packages/data-flow/src/interfaces/eventsFetcher.interface.ts index a02c04e..e16ab86 100644 --- a/packages/data-flow/src/interfaces/eventsFetcher.interface.ts +++ b/packages/data-flow/src/interfaces/eventsFetcher.interface.ts @@ -1,4 +1,4 @@ -import { AnyProtocolEvent } from "@grants-stack-indexer/shared"; +import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared"; /** * Interface for the events fetcher @@ -12,9 +12,9 @@ export interface IEventsFetcher { * @param limit limit of events to fetch */ fetchEventsByBlockNumberAndLogIndex( - chainId: bigint, - blockNumber: bigint, + chainId: ChainId, + blockNumber: number, logIndex: number, limit?: number, - ): Promise; + ): Promise; } diff --git a/packages/data-flow/src/interfaces/eventsRegistry.interface.ts b/packages/data-flow/src/interfaces/eventsRegistry.interface.ts new file mode 100644 index 0000000..8f061f1 --- /dev/null +++ b/packages/data-flow/src/interfaces/eventsRegistry.interface.ts @@ -0,0 +1,18 @@ +import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; + +/** + * The events registry saves as a checkpoint to the last processed event by the system. + * This is used to resume the indexing from the last processed event in case of an error or temporary interruption. + */ +export interface IEventsRegistry { + /** + * Get the last processed event by the system + * @returns The last processed event or undefined if no event has been processed yet. + */ + getLastProcessedEvent(): Promise | undefined>; + /** + * Save the last processed event by the system + * @param event - The event to save. + */ + saveLastProcessedEvent(event: ProcessorEvent): Promise; +} diff --git a/packages/data-flow/src/interfaces/index.ts b/packages/data-flow/src/interfaces/index.ts index 883b4bb..b46333d 100644 --- a/packages/data-flow/src/interfaces/index.ts +++ b/packages/data-flow/src/interfaces/index.ts @@ -1,2 +1,4 @@ export * from "./eventsFetcher.interface.js"; export * from "./dataLoader.interface.js"; +export * from "./eventsRegistry.interface.js"; +export * from "./strategyRegistry.interface.js"; diff --git a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts new file mode 100644 index 0000000..8d3aa44 --- /dev/null +++ b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts @@ -0,0 +1,22 @@ +import { Address, Hex } from "viem"; + +/** + * The strategy registry saves the mapping between the strategy address and the strategy id. Serves as a Cache + * to avoid having to read from the chain to get the strategy id every time. + */ +//TODO: implement a mechanism to record Strategy that we still don't have a corresponding handler +// we need to store and mark that this strategy is not handled yet, so when it's supported we can process all of the pending events for it +export interface IStrategyRegistry { + /** + * Get the strategy id by the strategy address + * @param strategyAddress - The strategy address + * @returns The strategy id or undefined if the strategy address is not registered + */ + getStrategyId(strategyAddress: Address): Promise; + /** + * Save the strategy id by the strategy address + * @param strategyAddress - The strategy address + * @param strategyId - The strategy id + */ + saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise; +} diff --git a/packages/data-flow/src/internal.ts b/packages/data-flow/src/internal.ts index b6a22e0..e7261a5 100644 --- a/packages/data-flow/src/internal.ts +++ b/packages/data-flow/src/internal.ts @@ -1,5 +1,7 @@ export * from "./types/index.js"; export * from "./interfaces/index.js"; export * from "./exceptions/index.js"; +export * from "./abis/index.js"; +export * from "./utils/index.js"; export * from "./data-loader/index.js"; export * from "./eventsFetcher.js"; diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts new file mode 100644 index 0000000..d1bb8e6 --- /dev/null +++ b/packages/data-flow/src/orchestrator.ts @@ -0,0 +1,233 @@ +import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; +import { + existsHandler, + UnsupportedEventException, + UnsupportedStrategy, +} from "@grants-stack-indexer/processors"; +import { + Address, + AnyEvent, + ChainId, + ContractName, + Hex, + isAlloEvent, + isStrategyEvent, + ProcessorEvent, + StrategyEvent, + stringify, +} from "@grants-stack-indexer/shared"; + +import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js"; +import { EventsFetcher } from "./eventsFetcher.js"; +import { EventsProcessor } from "./eventsProcessor.js"; +import { InvalidEvent } from "./exceptions/index.js"; +import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js"; + +/** + * The Orchestrator is the central coordinator of the data flow system, managing the interaction between + * three main components: + * + * 1. Events Fetcher: Retrieves blockchain events from the indexer service + * 2. Events Processor: Processes and transforms raw events into domain events + * 3. Data Loader: Persists the processed events into the database + * + * The Orchestrator implements a continuous processing loop that: + * + * 1. Fetches batches of events from the indexer and stores them in an internal queue + * 2. Processes each event from the queue: + * - For strategy events and PoolCreated from Allo contract, enhances them with strategyId + * - Forwards the event to the Events Processor which is in charge of delagating the processing of the event to the correct handler + * - Discards events for unsupported strategies or events + * 3. Loads the processed events into the database via the Data Loader + * + * The Orchestrator provides fault tolerance and performance optimization through: + * - Configurable batch sizes for event fetching + * - Delayed processing to prevent overwhelming the system + * - Error handling and logging for various failure scenarios + * - Registry tracking of supported/unsupported strategies and events + * + * TODO: Enhance the error handling/retries, logging and observability + * TODO: Handle unhandled strategies appropriately + */ +export class Orchestrator { + private readonly eventsQueue: IQueue>; + private readonly eventsFetcher: IEventsFetcher; + private readonly eventsProcessor: EventsProcessor; + private readonly eventsRegistry: IEventsRegistry; + private readonly strategyRegistry: IStrategyRegistry; + private readonly dataLoader: DataLoader; + + /** + * @param chainId - The chain id + * @param dependencies - The core dependencies + * @param indexerClient - The indexer client + * @param registries - The registries + * @param fetchLimit - The fetch limit + * @param fetchDelayInMs - The fetch delay in milliseconds + */ + constructor( + private chainId: ChainId, + private dependencies: Readonly, + private indexerClient: IIndexerClient, + private registries: { + eventsRegistry: IEventsRegistry; + strategyRegistry: IStrategyRegistry; + }, + private fetchLimit: number = 1000, + private fetchDelayInMs: number = 10000, + ) { + this.eventsFetcher = new EventsFetcher(this.indexerClient); + this.eventsProcessor = new EventsProcessor(this.chainId, this.dependencies); + this.eventsRegistry = registries.eventsRegistry; + this.strategyRegistry = registries.strategyRegistry; + this.dataLoader = new DataLoader({ + project: this.dependencies.projectRepository, + round: this.dependencies.roundRepository, + application: this.dependencies.applicationRepository, + }); + this.eventsQueue = new Queue>(fetchLimit); + } + + async run(signal: AbortSignal): Promise { + while (!signal.aborted) { + let event: ProcessorEvent | undefined; + try { + if (this.eventsQueue.isEmpty()) await this.enqueueEvents(); + + event = this.eventsQueue.pop(); + + if (!event) { + await delay(this.fetchDelayInMs); + continue; + } + + event = await this.enhanceStrategyId(event); + if (event.contractName === "Strategy" && "strategyId" in event) { + if (!existsHandler(event.strategyId)) { + //TODO: save to registry as unsupported strategy, so when the strategy is handled it will be backwards compatible and process all of the events + console.log( + `No handler found for strategyId: ${event.strategyId}. Event: ${stringify( + event, + )}`, + ); + continue; + } + } + + const changesets = await this.eventsProcessor.processEvent(event); + const executionResult = await this.dataLoader.applyChanges(changesets); + + if (executionResult.numFailed > 0) { + //TODO: should we retry the failed changesets? + console.error( + `Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify( + event, + )}`, + ); + } else { + await this.eventsRegistry.saveLastProcessedEvent(event); + } + } catch (error: unknown) { + // TODO: improve error handling, retries and notify + if ( + error instanceof UnsupportedStrategy || + error instanceof InvalidEvent || + error instanceof UnsupportedEventException + ) { + console.error( + `Current event cannot be handled. ${error.name}: ${error.message}. Event: ${stringify(event)}`, + ); + } else { + console.error(`Error processing event: ${stringify(event)}`, error); + } + } + } + } + + /** + * Enqueue new events from the events fetcher using the last processed event as a starting point + */ + private async enqueueEvents(): Promise { + const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(); + const blockNumber = lastProcessedEvent?.blockNumber ?? 0; + const logIndex = lastProcessedEvent?.logIndex ?? 0; + + const events = await this.eventsFetcher.fetchEventsByBlockNumberAndLogIndex( + this.chainId, + blockNumber, + logIndex, + this.fetchLimit, + ); + + this.eventsQueue.push(...events); + } + + /** + * Enhance the event with the strategy id when required + * @param event - The event + * @returns The event with the strategy id or the same event if strategyId is not required + * + * StrategyId is required for the following events: + * - PoolCreated from Allo contract + * - Any event from Strategy contract or its implementations + */ + private async enhanceStrategyId( + event: ProcessorEvent, + ): Promise> { + if (!this.requiresStrategyId(event)) { + return event; + } + + const strategyAddress = this.getStrategyAddress(event); + const strategyId = await this.getOrFetchStrategyId(strategyAddress); + event.strategyId = strategyId; + + return event; + } + + /** + * Get the strategy address from the event + * @param event - The event + * @returns The strategy address + */ + private getStrategyAddress( + event: ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent>, + ): Address { + return isAlloEvent(event) && event.eventName === "PoolCreated" + ? event.params.strategy + : event.srcAddress; + } + + /** + * Get the strategy id from the strategy registry or fetch it from the chain + * @param strategyAddress - The strategy address + * @returns The strategy id + */ + private async getOrFetchStrategyId(strategyAddress: Address): Promise { + const existingId = await this.strategyRegistry.getStrategyId(strategyAddress); + if (existingId) { + return existingId; + } + + const strategyId = await this.dependencies.evmProvider.readContract( + strategyAddress, + iStrategyAbi, + "getStrategyId", + ); + + await this.strategyRegistry.saveStrategyId(strategyAddress, strategyId); + + return strategyId; + } + + /** + * Check if the event requires a strategy id + * @param event - The event + * @returns True if the event requires a strategy id, false otherwise + */ + private requiresStrategyId( + event: ProcessorEvent, + ): event is ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent> { + return (isAlloEvent(event) && event.eventName === "PoolCreated") || isStrategyEvent(event); + } +} diff --git a/packages/data-flow/src/strategyRegistry.ts b/packages/data-flow/src/strategyRegistry.ts new file mode 100644 index 0000000..c370c58 --- /dev/null +++ b/packages/data-flow/src/strategyRegistry.ts @@ -0,0 +1,21 @@ +import type { Address, Hex } from "viem"; + +import type { IStrategyRegistry } from "./internal.js"; + +/** + * Class to store strategy ids in memory + */ +//TODO: Implement storage to persist strategies. since we're using address, do we need ChainId? +export class InMemoryStrategyRegistry implements IStrategyRegistry { + private strategiesMap: Map = new Map(); + + /** @inheritdoc */ + async getStrategyId(strategyAddress: Address): Promise { + return this.strategiesMap.get(strategyAddress); + } + + /** @inheritdoc */ + async saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise { + this.strategiesMap.set(strategyAddress, strategyId); + } +} diff --git a/packages/data-flow/src/types/index.ts b/packages/data-flow/src/types/index.ts index 2bd755b..af4aa25 100644 --- a/packages/data-flow/src/types/index.ts +++ b/packages/data-flow/src/types/index.ts @@ -1,5 +1,14 @@ -import { Changeset } from "@grants-stack-indexer/repository"; +import { ProcessorDependencies } from "@grants-stack-indexer/processors"; +import { + Changeset, + IApplicationRepository, + IProjectRepository, + IRoundRepository, +} from "@grants-stack-indexer/repository"; +/** + * The result of the execution of the changesets. + */ export type ExecutionResult = { changesets: Changeset["type"][]; numExecuted: number; @@ -7,3 +16,19 @@ export type ExecutionResult = { numFailed: number; errors: string[]; }; + +/** + * The core dependencies for the data flow + * + * Note: for Repositories, we type the Read & Write interfaces + * while the ProcessorDependencies type uses the ReadOnly interfaces + * so that's why we need this type + */ +export type CoreDependencies = Pick< + ProcessorDependencies, + "evmProvider" | "pricingProvider" | "metadataProvider" +> & { + roundRepository: IRoundRepository; + projectRepository: IProjectRepository; + applicationRepository: IApplicationRepository; +}; diff --git a/packages/data-flow/src/utils/delay.ts b/packages/data-flow/src/utils/delay.ts new file mode 100644 index 0000000..b7a032b --- /dev/null +++ b/packages/data-flow/src/utils/delay.ts @@ -0,0 +1,3 @@ +export function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/packages/data-flow/src/utils/index.ts b/packages/data-flow/src/utils/index.ts new file mode 100644 index 0000000..0fa9eb7 --- /dev/null +++ b/packages/data-flow/src/utils/index.ts @@ -0,0 +1,2 @@ +export * from "./queue.js"; +export * from "./delay.js"; diff --git a/packages/data-flow/src/utils/queue.ts b/packages/data-flow/src/utils/queue.ts new file mode 100644 index 0000000..8a9a815 --- /dev/null +++ b/packages/data-flow/src/utils/queue.ts @@ -0,0 +1,88 @@ +export interface IQueue { + push(...items: T[]): void; + pop(): T | undefined; + peek(): T | undefined; + get length(): number; + isEmpty(): boolean; +} +/** + * A circular buffer-based queue implementation that provides efficient O(1) operations + * with automatic resizing capabilities. + * + * Key benefits: + * - Constant O(1) time complexity for push/pop/peek operations + * - Memory efficient circular buffer that reuses space + * - Automatic buffer resizing to handle growth and shrinkage + * - Amortized O(1) push operations even when resizing is needed + * - Memory optimization by shrinking when queue becomes very empty + * - Initial capacity can be tuned based on expected usage + * + * The circular buffer approach avoids the need to shift elements, making it more + * efficient than array-based queues for high-throughput scenarios. The automatic + * resizing ensures memory usage adapts to actual needs while maintaining performance. + */ + +export class Queue implements IQueue { + private buffer: (T | undefined)[]; + private head: number = 0; + private tail: number = 0; + private size: number = 0; + + constructor(initialCapacity: number = 5000) { + this.buffer = new Array(initialCapacity).fill(undefined) as (T | undefined)[]; + } + + push(...items: T[]): void { + const requiredCapacity = this.size + items.length; + if (requiredCapacity > this.buffer.length) { + const newCapacity = Math.max(this.buffer.length * 2, requiredCapacity * 1.5); + this.resize(Math.ceil(newCapacity)); + } + + for (const item of items) { + this.buffer[this.tail] = item; + this.tail = (this.tail + 1) % this.buffer.length; + this.size++; + } + } + + pop(): T | undefined { + if (this.size === 0) { + return undefined; + } + + const item = this.buffer[this.head]; + this.buffer[this.head] = undefined; + this.head = (this.head + 1) % this.buffer.length; + this.size--; + + // Only shrink when very empty and buffer is large + if (this.size < this.buffer.length / 8 && this.buffer.length > 16384) { + this.resize(this.buffer.length / 2); + } + + return item; + } + + peek(): T | undefined { + return this.buffer[this.head]; + } + + get length(): number { + return this.size; + } + + isEmpty(): boolean { + return this.size === 0; + } + + private resize(newCapacity: number): void { + const newBuffer = Array(Math.ceil(newCapacity)).fill(undefined) as (T | undefined)[]; + for (let i = 0; i < this.size; i++) { + newBuffer[i] = this.buffer[(this.head + i) % this.buffer.length]; + } + this.buffer = newBuffer; + this.head = 0; + this.tail = this.size; + } +} diff --git a/packages/data-flow/test/unit/eventsFetcher.spec.ts b/packages/data-flow/test/unit/eventsFetcher.spec.ts index ee65a4e..89694f2 100644 --- a/packages/data-flow/test/unit/eventsFetcher.spec.ts +++ b/packages/data-flow/test/unit/eventsFetcher.spec.ts @@ -1,7 +1,7 @@ import { beforeEach, describe, expect, it, Mocked, vi } from "vitest"; import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; -import { AnyProtocolEvent } from "@grants-stack-indexer/shared"; +import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared"; import { EventsFetcher } from "../../src/eventsFetcher.js"; @@ -18,7 +18,7 @@ describe("EventsFetcher", () => { }); it("fetches events by block number and log index", async () => { - const mockEvents: AnyProtocolEvent[] = [ + const mockEvents: AnyIndexerFetchedEvent[] = [ { chainId: 1, blockNumber: 12345, @@ -46,8 +46,8 @@ describe("EventsFetcher", () => { transactionFields: { hash: "0x1234", transactionIndex: 1 }, }, ]; - const chainId = 1n; - const blockNumber = 1000n; + const chainId = 1 as ChainId; + const blockNumber = 1000; const logIndex = 0; const limit = 100; @@ -69,8 +69,8 @@ describe("EventsFetcher", () => { }); it("handles errors thrown by indexer client", async () => { - const chainId = 1n; - const blockNumber = 1000n; + const chainId = 1 as ChainId; + const blockNumber = 1000; const logIndex = 0; indexerClientMock.getEventsAfterBlockNumberAndLogIndex.mockRejectedValue( diff --git a/packages/data-flow/test/unit/eventsProcessor.spec.ts b/packages/data-flow/test/unit/eventsProcessor.spec.ts new file mode 100644 index 0000000..6df3753 --- /dev/null +++ b/packages/data-flow/test/unit/eventsProcessor.spec.ts @@ -0,0 +1,118 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { + AlloProcessor, + ProcessorDependencies, + RegistryProcessor, + StrategyProcessor, +} from "@grants-stack-indexer/processors"; +import { Changeset } from "@grants-stack-indexer/repository"; +import { AnyEvent, ChainId, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; + +import { EventsProcessor } from "../../src/eventsProcessor.js"; +import { InvalidEvent } from "../../src/exceptions/index.js"; + +vi.mock("@grants-stack-indexer/processors", () => ({ + AlloProcessor: vi.fn(), + RegistryProcessor: vi.fn(), + StrategyProcessor: vi.fn(), +})); + +describe("EventsProcessor", () => { + let eventsProcessor: EventsProcessor; + let mockAlloProcessor: AlloProcessor; + let mockRegistryProcessor: RegistryProcessor; + let mockStrategyProcessor: StrategyProcessor; + const chainId = 1 as ChainId; + const mockDependencies = {} as ProcessorDependencies; + + beforeEach(() => { + mockAlloProcessor = { process: vi.fn() } as unknown as AlloProcessor; + mockRegistryProcessor = { process: vi.fn() } as unknown as RegistryProcessor; + mockStrategyProcessor = { process: vi.fn() } as unknown as StrategyProcessor; + + vi.mocked(AlloProcessor).mockImplementation(() => mockAlloProcessor); + vi.mocked(RegistryProcessor).mockImplementation(() => mockRegistryProcessor); + vi.mocked(StrategyProcessor).mockImplementation(() => mockStrategyProcessor); + + eventsProcessor = new EventsProcessor(chainId, mockDependencies); + }); + + it("process Allo events using AlloProcessor", async () => { + const mockChangeset: Changeset[] = [ + { type: "UpdateProject", args: { chainId, projectId: "1", project: {} } }, + ]; + const mockEvent = { + contractName: "Allo", + eventName: "PoolCreated", + args: {}, + } as unknown as ProcessorEvent<"Allo", "PoolCreated">; + + vi.spyOn(mockAlloProcessor, "process").mockResolvedValue(mockChangeset); + + const result = await eventsProcessor.processEvent(mockEvent); + + expect(mockAlloProcessor.process).toHaveBeenCalledWith(mockEvent); + expect(result).toBe(mockChangeset); + }); + + it("process Registry events using RegistryProcessor", async () => { + const mockChangeset: Changeset[] = [ + { type: "UpdateProject", args: { chainId, projectId: "1", project: {} } }, + ]; + const mockEvent = { + contractName: "Registry", + eventName: "ProfileCreated", + args: {}, + } as unknown as ProcessorEvent<"Registry", "ProfileCreated">; + + vi.spyOn(mockRegistryProcessor, "process").mockResolvedValue(mockChangeset); + + const result = await eventsProcessor.processEvent(mockEvent); + + expect(mockRegistryProcessor.process).toHaveBeenCalledWith(mockEvent); + expect(result).toBe(mockChangeset); + }); + + it("process Strategy events using StrategyProcessor", async () => { + const mockChangeset: Changeset[] = [ + { type: "UpdateRound", args: { chainId, roundId: "1", round: {} } }, + ]; + const mockEvent = { + contractName: "Strategy", + eventName: "Distributed", + args: {}, + } as unknown as ProcessorEvent<"Strategy", "Distributed">; + + vi.spyOn(mockStrategyProcessor, "process").mockResolvedValue(mockChangeset); + + const result = await eventsProcessor.processEvent(mockEvent); + + expect(mockStrategyProcessor.process).toHaveBeenCalledWith(mockEvent); + expect(result).toBe(mockChangeset); + }); + + it("throw InvalidEvent for unknown event types", async () => { + const mockEvent: ProcessorEvent = { + contractName: "Unknown" as unknown as ContractName, + eventName: "PoolCreated", + blockNumber: 1, + blockTimestamp: 1, + chainId, + logIndex: 1, + srcAddress: "0x0", + params: { + sender: "0x0", + recipientAddress: "0x0", + recipientId: "0x0", + amount: 1, + }, + transactionFields: { + hash: "0x0", + transactionIndex: 1, + }, + }; + + await expect(eventsProcessor.processEvent(mockEvent)).rejects.toThrow(InvalidEvent); + }); +}); diff --git a/packages/data-flow/test/unit/eventsRegistry.spec.ts b/packages/data-flow/test/unit/eventsRegistry.spec.ts new file mode 100644 index 0000000..077f4d3 --- /dev/null +++ b/packages/data-flow/test/unit/eventsRegistry.spec.ts @@ -0,0 +1,98 @@ +import { describe, expect, it } from "vitest"; + +import { ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; + +import { InMemoryEventsRegistry } from "../../src/eventsRegistry.js"; + +describe("InMemoryEventsRegistry", () => { + it("return null when no event has been saved", async () => { + const registry = new InMemoryEventsRegistry(); + const lastEvent = await registry.getLastProcessedEvent(); + expect(lastEvent).toBeUndefined(); + }); + + it("save and retrieve the last processed event", async () => { + const registry = new InMemoryEventsRegistry(); + const mockEvent: ProcessorEvent<"Allo", "PoolCreated"> = { + contractName: "Allo", + eventName: "PoolCreated", + blockNumber: 1, + blockTimestamp: 1234567890, + chainId: 1 as ChainId, + logIndex: 0, + srcAddress: "0x123", + strategyId: "0xstrategy", + params: { + poolId: 1n, + profileId: "0x456", + strategy: "0x789", + token: "0xtoken", + amount: 0n, + metadata: [1n, "0xmetadata"], + }, + transactionFields: { + hash: "0xabc", + transactionIndex: 0, + }, + }; + + await registry.saveLastProcessedEvent(mockEvent); + const retrievedEvent = await registry.getLastProcessedEvent(); + + expect(retrievedEvent).toEqual(mockEvent); + }); + + it("should update the last processed event when saving multiple times", async () => { + const registry = new InMemoryEventsRegistry(); + + const firstEvent: ProcessorEvent<"Allo", "PoolCreated"> = { + contractName: "Allo", + eventName: "PoolCreated", + blockNumber: 1, + blockTimestamp: 1234567890, + chainId: 1 as ChainId, + logIndex: 0, + srcAddress: "0x123", + strategyId: "0xstrategy", + params: { + poolId: 1n, + profileId: "0x456", + strategy: "0x789", + token: "0xtoken", + amount: 0n, + metadata: [1n, "0xmetadata"], + }, + transactionFields: { + hash: "0xabc", + transactionIndex: 0, + }, + }; + + const secondEvent: ProcessorEvent<"Strategy", "Registered"> = { + contractName: "Strategy", + eventName: "Registered", + blockNumber: 1, + blockTimestamp: 1234567890, + chainId: 1 as ChainId, + logIndex: 0, + srcAddress: "0x123", + strategyId: "0xstrategy", + params: { + recipientId: "0xrecipient", + data: "0xdata", + sender: "0xsender", + }, + transactionFields: { + hash: "0xabc", + transactionIndex: 0, + }, + }; + + await registry.saveLastProcessedEvent(firstEvent); + await registry.saveLastProcessedEvent(secondEvent); + + const lastEvent = await registry.getLastProcessedEvent(); + expect(lastEvent).toEqual(secondEvent); + expect(lastEvent).not.toEqual(firstEvent); + }); +}); diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts new file mode 100644 index 0000000..293bd8c --- /dev/null +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -0,0 +1,628 @@ +import { Address } from "viem"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { EvmProvider } from "@grants-stack-indexer/chain-providers"; +import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; +import { UnsupportedStrategy } from "@grants-stack-indexer/processors"; +import { + Changeset, + IApplicationRepository, + IProjectRepository, + IRoundRepository, +} from "@grants-stack-indexer/repository"; +import { + AlloEvent, + ChainId, + ContractName, + ContractToEventName, + EventParams, + Hex, + ProcessorEvent, + StrategyEvent, + stringify, +} from "@grants-stack-indexer/shared"; + +import { + CoreDependencies, + IEventsRegistry, + InvalidEvent, + IStrategyRegistry, +} from "../../src/internal.js"; +import { Orchestrator } from "../../src/orchestrator.js"; + +vi.mock("../../src/eventsProcessor.js", () => { + const EventsProcessor = vi.fn(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + EventsProcessor.prototype.processEvent = vi.fn(); + return { + EventsProcessor, + }; +}); +vi.mock("../../src/data-loader/dataLoader.js", () => { + const DataLoader = vi.fn(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + DataLoader.prototype.applyChanges = vi.fn(); + return { + DataLoader, + }; +}); + +describe("Orchestrator", { sequential: true }, () => { + let orchestrator: Orchestrator; + let mockIndexerClient: IIndexerClient; + let mockEventsRegistry: IEventsRegistry; + let mockStrategyRegistry: IStrategyRegistry; + let mockEvmProvider: EvmProvider; + let abortController: AbortController; + let runPromise: Promise | undefined; + + const chainId = 1 as ChainId; + const mockFetchLimit = 10; + const mockFetchDelay = 100; + + beforeEach(() => { + // Setup mock implementations + mockIndexerClient = { + getEventsAfterBlockNumberAndLogIndex: vi.fn(), + } as unknown as IIndexerClient; + + mockEventsRegistry = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + + mockStrategyRegistry = { + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + }; + + mockEvmProvider = { + readContract: vi.fn(), + } as unknown as EvmProvider; + + const dependencies: CoreDependencies = { + evmProvider: mockEvmProvider, + projectRepository: {} as unknown as IProjectRepository, + roundRepository: {} as unknown as IRoundRepository, + applicationRepository: {} as unknown as IApplicationRepository, + pricingProvider: { + getTokenPrice: vi.fn(), + }, + metadataProvider: { + getMetadata: vi.fn(), + }, + }; + + abortController = new AbortController(); + + orchestrator = new Orchestrator( + chainId, + dependencies, + mockIndexerClient, + { + eventsRegistry: mockEventsRegistry, + strategyRegistry: mockStrategyRegistry, + }, + mockFetchLimit, + mockFetchDelay, + ); + }); + + afterEach(async () => { + vi.clearAllMocks(); + + abortController.abort(); + + await runPromise; + + runPromise = undefined; + }); + + describe("Event Processing Flow", () => { + it("process events in the correct order", async () => { + const mockEvents = [ + createMockEvent("Allo", "PoolCreated", 1), + createMockEvent("Registry", "ProfileCreated", 2), + ]; + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce(mockEvents) + .mockResolvedValue([]); + eventsProcessorSpy.mockResolvedValue([]); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor( + () => { + if (eventsProcessorSpy.mock.calls.length < 2) throw new Error("Not yet called"); + }, + { + timeout: 1000, + }, + ); + + expect(eventsProcessorSpy).toHaveBeenCalledWith(mockEvents[0]); + expect(eventsProcessorSpy).toHaveBeenCalledWith(mockEvents[1]); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(mockEvents[0]); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(mockEvents[1]); + }); + + it("wait and keep polling on empty queue", async () => { + const getEventsAfterBlockNumberAndLogIndexSpy = vi.spyOn( + mockIndexerClient, + "getEventsAfterBlockNumberAndLogIndex", + ); + getEventsAfterBlockNumberAndLogIndexSpy.mockResolvedValue([]); + + runPromise = orchestrator.run(abortController.signal); + + // Wait for a few polling cycles + await new Promise((resolve) => setTimeout(resolve, mockFetchDelay * 3)); + + expect( + vi.spyOn(orchestrator["eventsProcessor"], "processEvent"), + ).not.toHaveBeenCalled(); + expect( + getEventsAfterBlockNumberAndLogIndexSpy.mock.calls.length, + ).toBeGreaterThanOrEqual(3); + }); + }); + + describe("Strategy ID Enhancement", () => { + it("adds strategy ID to Allo PoolCreated events", async () => { + const strategyAddress = "0x123" as Address; + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + const mockEvent = createMockEvent("Allo", "PoolCreated", 1, { + strategy: strategyAddress, + poolId: 1n, + profileId: "0x123", + token: "0x123", + amount: 100n, + metadata: [1n, "1"], + }); + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(undefined); + vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(strategyId); + const changesets = [ + { + type: "InsertProject", + args: { chainId, projectId: "1", project: {} }, + } as unknown as Changeset, + { + type: "InsertRoundRole", + args: { chainId, roundId: "1", roundRole: {} }, + } as unknown as Changeset, + { + type: "DeletePendingRoundRoles", + args: { chainId, roundId: "1" }, + } as unknown as Changeset, + ]; + + eventsProcessorSpy.mockResolvedValue(changesets); + + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: ["InsertProject", "InsertRoundRole", "DeletePendingRoundRoles"], + numExecuted: 3, + numSuccessful: 3, + }); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledTimes(1); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledWith( + strategyAddress, + strategyId, + ); + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledWith({ + ...mockEvent, + strategyId, + }); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledWith(changesets); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(mockEvent); + }); + + const strategyEvents: Record = { + Registered: "", + Distributed: "", + TimestampsUpdated: "", + AllocatedWithToken: "", + }; + + for (const event of Object.keys(strategyEvents) as StrategyEvent[]) { + const strategyAddress = "0x123" as Address; + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + it(`adds strategy ID to Strategy ${event} events`, async () => { + const mockEvent = createMockEvent("Strategy", event, 1, undefined, strategyAddress); + const changesets = [ + { + type: "InsertApplication", + args: { chainId, applicationId: "1", application: {} }, + } as unknown as Changeset, + ]; + + const eventsProcessorSpy = vi.spyOn( + orchestrator["eventsProcessor"], + "processEvent", + ); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(strategyId); + eventsProcessorSpy.mockResolvedValue(changesets); + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: ["InsertApplication"], + numExecuted: 1, + numSuccessful: 1, + }); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledTimes(1); + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledWith({ + ...mockEvent, + strategyId, + }); + expect(mockStrategyRegistry.getStrategyId).toHaveBeenCalledWith(strategyAddress); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledWith(changesets); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(mockEvent); + }); + } + + it("discards events from unhandled strategies", async () => { + const unhandledStrategyId = "0x6f9aaaaf02b266413f" as Hex; + const strategyAddress = "0x123" as Address; + const mockEvent = createMockEvent( + "Strategy", + "Registered", + 1, + undefined, + strategyAddress, + ); + + const getEventsAfterBlockNumberAndLogIndexSpy = vi.spyOn( + mockIndexerClient, + "getEventsAfterBlockNumberAndLogIndex", + ); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + getEventsAfterBlockNumberAndLogIndexSpy + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(unhandledStrategyId); + vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(unhandledStrategyId); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (getEventsAfterBlockNumberAndLogIndexSpy.mock.calls.length >= 2) + throw new Error("Not yet called"); + }); + + expect(orchestrator["eventsProcessor"].processEvent).not.toHaveBeenCalled(); + expect(orchestrator["dataLoader"].applyChanges).not.toHaveBeenCalled(); + expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + }); + + it("uses cached strategy ID from registry", async () => { + const strategyAddress = "0x123" as Address; + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + const poolCreatedEvent = createMockEvent("Allo", "PoolCreated", 1, { + strategy: strategyAddress, + poolId: 1n, + profileId: "0x123", + token: "0x123", + amount: 100n, + metadata: [1n, "1"], + }); + const registeredEvent = createMockEvent( + "Strategy", + "Registered", + 2, + { + recipientId: "0x123", + data: "0x123", + sender: "0x123", + }, + strategyAddress, + ); + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockStrategyRegistry, "getStrategyId") + .mockResolvedValueOnce(undefined) + .mockResolvedValue(strategyId); + + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([poolCreatedEvent]) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([registeredEvent]) + .mockResolvedValue([]); + + eventsProcessorSpy.mockResolvedValue([]); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor( + () => { + if (eventsProcessorSpy.mock.calls.length < 2) throw new Error("Not yet called"); + }, + { + timeout: 1000, + interval: mockFetchDelay, + }, + ); + + expect(mockEvmProvider.readContract).toHaveBeenCalledTimes(1); + expect(mockStrategyRegistry.getStrategyId).toHaveBeenLastCalledWith(strategyAddress); + expect(eventsProcessorSpy).toHaveBeenLastCalledWith({ + ...registeredEvent, + strategyId, + }); + }); + + it("does not add strategy ID to events from other strategies", async () => { + const mockEvent = createMockEvent("Registry", "ProfileCreated", 1, undefined); + const changesets = [ + { + type: "InsertPendingRoundRole", + args: { chainId, roundId: "1", roundRole: {} }, + } as unknown as Changeset, + ]; + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + eventsProcessorSpy.mockResolvedValue(changesets); + + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: ["InsertPendingRoundRole"], + numExecuted: 1, + numSuccessful: 1, + }); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledTimes(1); + expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledWith(mockEvent); + expect(mockStrategyRegistry.getStrategyId).not.toHaveBeenCalled(); + expect(mockStrategyRegistry.saveStrategyId).not.toHaveBeenCalled(); + }); + }); + + describe("Error Handling", () => { + it.skip("retries error"); + + it("keeps running when there is an error", async () => { + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + const consoleSpy = vi.spyOn(console, "error"); + const errorEvent = createMockEvent("Allo", "Unknown" as unknown as AlloEvent, 1); + const error = new Error("test"); + + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([errorEvent]) + .mockResolvedValueOnce([createMockEvent("Registry", "ProfileCreated", 2)]) + .mockResolvedValue([]); + + eventsProcessorSpy.mockRejectedValueOnce(error).mockResolvedValueOnce([]); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); + }); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor( + () => { + if (eventsProcessorSpy.mock.calls.length < 2) throw new Error("Not yet called"); + }, + { + timeout: 1000, + }, + ); + + expect(eventsProcessorSpy).toHaveBeenCalledTimes(2); + expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledTimes(1); + expect(consoleSpy).toHaveBeenCalledTimes(1); + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining(`Error processing event: ${stringify(errorEvent)}`), + error, + ); + }); + + it("logs error for InvalidEvent", async () => { + const mockEvent = createMockEvent("Allo", "Unknown" as unknown as AlloEvent, 1); + const error = new InvalidEvent(mockEvent); + + const consoleSpy = vi.spyOn(console, "error"); + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + eventsProcessorSpy.mockRejectedValue(error); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining("InvalidEvent")); + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); + expect(orchestrator["dataLoader"].applyChanges).not.toHaveBeenCalled(); + expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + }); + + it("logs error for UnsupportedEvent", async () => { + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + const mockEvent = createMockEvent( + "Strategy", + "NotHandled" as unknown as StrategyEvent, + 1, + ); + const error = new UnsupportedStrategy(strategyId); + + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(strategyId); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + vi.spyOn(orchestrator["eventsProcessor"], "processEvent").mockRejectedValue(error); + + const consoleSpy = vi.spyOn(console, "error"); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (consoleSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(consoleSpy).toHaveBeenCalledTimes(1); + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining(`Strategy ${strategyId} unsupported`), + ); + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); + expect(orchestrator["dataLoader"].applyChanges).not.toHaveBeenCalled(); + expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + }); + + it("logs DataLoader errors", async () => { + const mockEvent = createMockEvent("Allo", "PoolCreated", 1); + const mockChangesets: Changeset[] = [ + { type: "UpdateProject", args: { chainId, projectId: "1", project: {} } }, + ]; + + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + const dataLoaderSpy = vi.spyOn(orchestrator["dataLoader"], "applyChanges"); + + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + vi.spyOn(orchestrator["eventsProcessor"], "processEvent").mockResolvedValue( + mockChangesets, + ); + dataLoaderSpy.mockResolvedValue({ + numFailed: 1, + errors: ["Failed to update project"], + changesets: ["UpdateProject"], + numExecuted: 1, + numSuccessful: 0, + }); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (dataLoaderSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining("Failed to apply changesets"), + ); + expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); + expect(dataLoaderSpy).toHaveBeenCalledTimes(1); + expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + }); + }); +}); + +// Helper function to create mock events +function createMockEvent>( + contractName: T, + eventName: E, + blockNumber: number, + params: EventParams = {} as EventParams, + srcAddress: Address = "0x123" as Address, +): ProcessorEvent { + return { + contractName, + eventName, + blockNumber, + blockTimestamp: 1234567890, + chainId: 1 as ChainId, + logIndex: 0, + srcAddress, + params, + transactionFields: { + hash: "0xabc", + transactionIndex: 0, + }, + } as ProcessorEvent; +} diff --git a/packages/data-flow/test/unit/strategyRegistry.spec.ts b/packages/data-flow/test/unit/strategyRegistry.spec.ts new file mode 100644 index 0000000..4ace8ee --- /dev/null +++ b/packages/data-flow/test/unit/strategyRegistry.spec.ts @@ -0,0 +1,42 @@ +import { Address, Hex } from "viem"; +import { describe, expect, it } from "vitest"; + +import { InMemoryStrategyRegistry } from "../../src/strategyRegistry.js"; + +describe("InMemoryStrategyRegistry", () => { + it("return undefined for non-existent strategy address", async () => { + const registry = new InMemoryStrategyRegistry(); + const strategyAddress = "0x123" as Address; + + const strategyId = await registry.getStrategyId(strategyAddress); + expect(strategyId).toBeUndefined(); + }); + + it("save and retrieve strategy id", async () => { + const registry = new InMemoryStrategyRegistry(); + const strategyAddress = "0x123" as Address; + const strategyId = "0xabc" as Hex; + + await registry.saveStrategyId(strategyAddress, strategyId); + const retrievedId = await registry.getStrategyId(strategyAddress); + + expect(retrievedId).toBe(strategyId); + }); + + it("handle multiple strategy addresses independently", async () => { + const registry = new InMemoryStrategyRegistry(); + const firstAddress = "0x123" as Address; + const secondAddress = "0x456" as Address; + const firstStrategyId = "0xabc" as Hex; + const secondStrategyId = "0xdef" as Hex; + + await registry.saveStrategyId(firstAddress, firstStrategyId); + await registry.saveStrategyId(secondAddress, secondStrategyId); + + const retrievedFirstId = await registry.getStrategyId(firstAddress); + const retrievedSecondId = await registry.getStrategyId(secondAddress); + + expect(retrievedFirstId).toBe(firstStrategyId); + expect(retrievedSecondId).toBe(secondStrategyId); + }); +}); diff --git a/packages/data-flow/test/unit/utils/queue.spec.ts b/packages/data-flow/test/unit/utils/queue.spec.ts new file mode 100644 index 0000000..6842109 --- /dev/null +++ b/packages/data-flow/test/unit/utils/queue.spec.ts @@ -0,0 +1,79 @@ +import { describe, expect, it } from "vitest"; + +import { Queue } from "../../../src/utils/queue.js"; + +describe("Queue", () => { + it("create an empty queue", () => { + const queue = new Queue(); + expect(queue.length).toBe(0); + expect(queue.isEmpty()).toBe(true); + }); + + it("push and pop items correctly", () => { + const queue = new Queue(); + queue.push(1, 2, 3); + + expect(queue.length).toBe(3); + expect(queue.pop()).toBe(1); + expect(queue.pop()).toBe(2); + expect(queue.pop()).toBe(3); + expect(queue.isEmpty()).toBe(true); + }); + + it("peek at the first item without removing it", () => { + const queue = new Queue(); + queue.push(1, 2); + + expect(queue.peek()).toBe(1); + expect(queue.length).toBe(2); // Length shouldn't change after peek + }); + + it("handles pushing more items than initial capacity", () => { + const queue = new Queue(2); + queue.push(1, 2, 3, 4, 5); + + expect(queue.length).toBe(5); + expect(queue.pop()).toBe(1); + expect(queue.pop()).toBe(2); + expect(queue.pop()).toBe(3); + expect(queue.pop()).toBe(4); + expect(queue.pop()).toBe(5); + }); + + it("handles circular buffer wrapping", () => { + const queue = new Queue(3); + queue.push(1, 2); + queue.pop(); // Remove 1 + queue.pop(); // Remove 2 + queue.push(3, 4); // Should wrap around + + expect(queue.length).toBe(2); + expect(queue.pop()).toBe(3); + expect(queue.pop()).toBe(4); + }); + + it("returns undefined when popping from empty queue", () => { + const queue = new Queue(); + expect(queue.pop()).toBeUndefined(); + expect(queue.peek()).toBeUndefined(); + }); + + it("shrinks buffer when queue becomes very empty", () => { + const queue = new Queue(20000); + const manyItems = Array.from({ length: 18000 }, (_, i) => i); + + queue.push(...manyItems); + expect(queue.length).toBe(18000); + + // Pop most items to trigger shrinking + for (let i = 0; i < 17000; i++) { + queue.pop(); + } + + expect(queue.length).toBe(1000); + // Verify remaining items are correct + for (let i = 17000; i < 18000; i++) { + expect(queue.pop()).toBe(i); + } + }); +}); diff --git a/packages/data-flow/vitest.config.ts b/packages/data-flow/vitest.config.ts index 8e1bbf4..22ef210 100644 --- a/packages/data-flow/vitest.config.ts +++ b/packages/data-flow/vitest.config.ts @@ -10,7 +10,14 @@ export default defineConfig({ coverage: { provider: "v8", reporter: ["text", "json", "html"], // Coverage reporters - exclude: ["node_modules", "dist", "src/index.ts", ...configDefaults.exclude], // Files to exclude from coverage + exclude: [ + "node_modules", + "dist", + "src/index.ts", + "src/external.ts", + "test/**", + ...configDefaults.exclude, + ], // Files to exclude from coverage }, }, resolve: { diff --git a/packages/indexer-client/src/interfaces/indexerClient.ts b/packages/indexer-client/src/interfaces/indexerClient.ts index f1fca99..77e1c97 100644 --- a/packages/indexer-client/src/interfaces/indexerClient.ts +++ b/packages/indexer-client/src/interfaces/indexerClient.ts @@ -1,4 +1,4 @@ -import { AnyProtocolEvent } from "@grants-stack-indexer/shared"; +import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared"; /** * Interface for the indexer client @@ -12,9 +12,9 @@ export interface IIndexerClient { * @param limit Limit of events to fetch */ getEventsAfterBlockNumberAndLogIndex( - chainId: bigint, - fromBlock: bigint, + chainId: ChainId, + fromBlock: number, logIndex: number, limit?: number, - ): Promise; + ): Promise; } diff --git a/packages/indexer-client/src/providers/envioIndexerClient.ts b/packages/indexer-client/src/providers/envioIndexerClient.ts index 8c210f4..9e8411f 100644 --- a/packages/indexer-client/src/providers/envioIndexerClient.ts +++ b/packages/indexer-client/src/providers/envioIndexerClient.ts @@ -1,6 +1,6 @@ import { gql, GraphQLClient } from "graphql-request"; -import { AnyProtocolEvent } from "@grants-stack-indexer/shared"; +import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared"; import { IndexerClientError, InvalidIndexerResponse } from "../exceptions/index.js"; import { IIndexerClient } from "../internal.js"; @@ -17,11 +17,11 @@ export class EnvioIndexerClient implements IIndexerClient { } /* @inheritdoc */ public async getEventsAfterBlockNumberAndLogIndex( - chainId: bigint, - blockNumber: bigint, + chainId: ChainId, + blockNumber: number, logIndex: number, limit: number = 100, - ): Promise { + ): Promise { try { const response = (await this.client.request( gql` @@ -51,7 +51,7 @@ export class EnvioIndexerClient implements IIndexerClient { } `, { chainId, blockNumber, logIndex, limit }, - )) as { data: { raw_events: AnyProtocolEvent[] } }; + )) as { data: { raw_events: AnyIndexerFetchedEvent[] } }; if (response?.data?.raw_events) { return response.data.raw_events; } else { diff --git a/packages/indexer-client/test/unit/envioIndexerClient.spec.ts b/packages/indexer-client/test/unit/envioIndexerClient.spec.ts index cffa08f..c084007 100644 --- a/packages/indexer-client/test/unit/envioIndexerClient.spec.ts +++ b/packages/indexer-client/test/unit/envioIndexerClient.spec.ts @@ -1,7 +1,7 @@ import { GraphQLClient } from "graphql-request"; import { afterEach, beforeEach, describe, expect, it, Mocked, vi } from "vitest"; -import { AnyProtocolEvent } from "@grants-stack-indexer/shared"; +import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared"; import { IndexerClientError, InvalidIndexerResponse } from "../../src/exceptions/index.js"; import { EnvioIndexerClient } from "../../src/providers/envioIndexerClient.js"; @@ -42,7 +42,7 @@ describe("EnvioIndexerClient", () => { }); describe("getEventsAfterBlockNumberAndLogIndex", () => { - const mockEvents: AnyProtocolEvent[] = [ + const mockEvents: AnyIndexerFetchedEvent[] = [ { chainId: 1, blockNumber: 12345, @@ -70,8 +70,8 @@ describe("EnvioIndexerClient", () => { graphqlClient.request.mockResolvedValue(mockedResponse); const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex( - 1n, - 12345n, + 1 as ChainId, + 12345, 0, 100, ); @@ -89,7 +89,7 @@ describe("EnvioIndexerClient", () => { graphqlClient.request.mockResolvedValue(mockedResponse); await expect( - envioIndexerClient.getEventsAfterBlockNumberAndLogIndex(1n, 12345n, 0), + envioIndexerClient.getEventsAfterBlockNumberAndLogIndex(1 as ChainId, 12345, 0), ).rejects.toThrow(InvalidIndexerResponse); }); @@ -98,7 +98,7 @@ describe("EnvioIndexerClient", () => { graphqlClient.request.mockRejectedValue(error); await expect( - envioIndexerClient.getEventsAfterBlockNumberAndLogIndex(1n, 12345n, 0), + envioIndexerClient.getEventsAfterBlockNumberAndLogIndex(1 as ChainId, 12345, 0), ).rejects.toThrow(IndexerClientError); }); @@ -114,8 +114,8 @@ describe("EnvioIndexerClient", () => { // Call the method without the limit argument const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex( - 1n, - 12345n, + 1 as ChainId, + 12345, 0, ); @@ -123,8 +123,8 @@ describe("EnvioIndexerClient", () => { expect(graphqlClient.request).toHaveBeenCalledWith( expect.any(String), // We can check the query string later if necessary { - chainId: 1n, - blockNumber: 12345n, + chainId: 1, + blockNumber: 12345, logIndex: 0, limit: 100, // Ensure the default limit is used }, @@ -142,8 +142,8 @@ describe("EnvioIndexerClient", () => { graphqlClient.request.mockResolvedValue(mockedResponse); const result = await envioIndexerClient.getEventsAfterBlockNumberAndLogIndex( - 1n, - 12345n, + 1 as ChainId, + 12345, 0, ); expect(result).toEqual([]); diff --git a/packages/processors/src/allo/allo.processor.ts b/packages/processors/src/allo/allo.processor.ts index 2a5ffed..c93afc4 100644 --- a/packages/processors/src/allo/allo.processor.ts +++ b/packages/processors/src/allo/allo.processor.ts @@ -1,5 +1,5 @@ import { Changeset } from "@grants-stack-indexer/repository"; -import { AlloEvent, ChainId, ProtocolEvent } from "@grants-stack-indexer/shared"; +import { AlloEvent, ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; import type { IProcessor, ProcessorDependencies } from "../internal.js"; import { UnsupportedEventException } from "../internal.js"; @@ -11,7 +11,7 @@ export class AlloProcessor implements IProcessor<"Allo", AlloEvent> { private readonly dependencies: ProcessorDependencies, ) {} - async process(event: ProtocolEvent<"Allo", AlloEvent>): Promise { + async process(event: ProcessorEvent<"Allo", AlloEvent>): Promise { switch (event.eventName) { case "PoolCreated": return new PoolCreatedHandler(event, this.chainId, this.dependencies).handle(); diff --git a/packages/processors/src/allo/handlers/poolCreated.handler.ts b/packages/processors/src/allo/handlers/poolCreated.handler.ts index 1a38ac4..e1dc9ae 100644 --- a/packages/processors/src/allo/handlers/poolCreated.handler.ts +++ b/packages/processors/src/allo/handlers/poolCreated.handler.ts @@ -1,7 +1,7 @@ import { getAddress, zeroAddress } from "viem"; import type { Changeset, NewRound, PendingRoundRole } from "@grants-stack-indexer/repository"; -import type { ChainId, ProtocolEvent, Token } from "@grants-stack-indexer/shared"; +import type { ChainId, ProcessorEvent, Token } from "@grants-stack-indexer/shared"; import { getToken, isAlloNativeToken } from "@grants-stack-indexer/shared"; import type { IEventHandler, ProcessorDependencies, StrategyTimings } from "../../internal.js"; @@ -30,7 +30,7 @@ export const TIMESTAMP_DELTA_RANGE = 2 * 60 * 60 * 1000; */ export class PoolCreatedHandler implements IEventHandler<"Allo", "PoolCreated"> { constructor( - readonly event: ProtocolEvent<"Allo", "PoolCreated">, + readonly event: ProcessorEvent<"Allo", "PoolCreated">, private readonly chainId: ChainId, private readonly dependencies: Dependencies, ) {} diff --git a/packages/processors/src/external.ts b/packages/processors/src/external.ts index 02e1abc..3cedb0e 100644 --- a/packages/processors/src/external.ts +++ b/packages/processors/src/external.ts @@ -1,5 +1,8 @@ // Add your external exports here -export { StrategyProcessor, AlloProcessor } from "./internal.js"; -export type { IProcessor } from "./internal.js"; +export { StrategyProcessor, AlloProcessor, RegistryProcessor } from "./internal.js"; + +export { UnsupportedStrategy, UnsupportedEventException } from "./internal.js"; + +export type { IProcessor, ProcessorDependencies } from "./internal.js"; export { existsHandler } from "./internal.js"; diff --git a/packages/processors/src/interfaces/eventHandler.interface.ts b/packages/processors/src/interfaces/eventHandler.interface.ts index 03ee946..56f3a0e 100644 --- a/packages/processors/src/interfaces/eventHandler.interface.ts +++ b/packages/processors/src/interfaces/eventHandler.interface.ts @@ -2,7 +2,7 @@ import type { Changeset } from "@grants-stack-indexer/repository"; import type { ContractName, ContractToEventName, - ProtocolEvent, + ProcessorEvent, } from "@grants-stack-indexer/shared"; /** @@ -14,7 +14,7 @@ export interface IEventHandler; + readonly event: ProcessorEvent; /** * Handles the event. diff --git a/packages/processors/src/interfaces/factory.interface.ts b/packages/processors/src/interfaces/factory.interface.ts index 3c82fbc..343eca9 100644 --- a/packages/processors/src/interfaces/factory.interface.ts +++ b/packages/processors/src/interfaces/factory.interface.ts @@ -2,7 +2,7 @@ import { ChainId, ContractName, ContractToEventName, - ProtocolEvent, + ProcessorEvent, } from "@grants-stack-indexer/shared"; import { ProcessorDependencies } from "../types/processor.types.js"; @@ -10,7 +10,7 @@ import { IEventHandler } from "./index.js"; export interface IEventHandlerFactory> { createHandler( - event: ProtocolEvent, + event: ProcessorEvent, chainId: ChainId, dependencies: ProcessorDependencies, ): IEventHandler; diff --git a/packages/processors/src/interfaces/processor.interface.ts b/packages/processors/src/interfaces/processor.interface.ts index 4f8c943..8062a90 100644 --- a/packages/processors/src/interfaces/processor.interface.ts +++ b/packages/processors/src/interfaces/processor.interface.ts @@ -1,5 +1,5 @@ import type { Changeset } from "@grants-stack-indexer/repository"; -import { ContractName, ContractToEventName, ProtocolEvent } from "@grants-stack-indexer/shared"; +import { ContractName, ContractToEventName, ProcessorEvent } from "@grants-stack-indexer/shared"; export interface IProcessor> { /** @@ -7,5 +7,5 @@ export interface IProcessor): Promise; + process(event: ProcessorEvent): Promise; } diff --git a/packages/processors/src/interfaces/strategyHandler.interface.ts b/packages/processors/src/interfaces/strategyHandler.interface.ts index 5e0c047..bb60f69 100644 --- a/packages/processors/src/interfaces/strategyHandler.interface.ts +++ b/packages/processors/src/interfaces/strategyHandler.interface.ts @@ -2,7 +2,7 @@ import type { Changeset } from "@grants-stack-indexer/repository"; import type { Address, ContractToEventName, - ProtocolEvent, + ProcessorEvent, Token, } from "@grants-stack-indexer/shared"; @@ -23,7 +23,7 @@ export interface IStrategyHandler> { * Handles the event. * @returns A promise that resolves to an array of changesets. */ - handle(event: ProtocolEvent<"Strategy", E>): Promise; + handle(event: ProcessorEvent<"Strategy", E>): Promise; /** * Fetch the strategy timings data from the strategy contract diff --git a/packages/processors/src/internal.ts b/packages/processors/src/internal.ts index b71289f..ec599c0 100644 --- a/packages/processors/src/internal.ts +++ b/packages/processors/src/internal.ts @@ -10,3 +10,4 @@ export * from "./allo/index.js"; // Strategy export * from "./strategy/index.js"; +export * from "./registry/index.js"; diff --git a/packages/processors/src/registry/handlers/profileCreated.handler.ts b/packages/processors/src/registry/handlers/profileCreated.handler.ts index 9d89c83..1417b4a 100644 --- a/packages/processors/src/registry/handlers/profileCreated.handler.ts +++ b/packages/processors/src/registry/handlers/profileCreated.handler.ts @@ -1,7 +1,7 @@ import { getAddress } from "viem"; import { Changeset, ProjectType } from "@grants-stack-indexer/repository"; -import { ChainId, ProtocolEvent } from "@grants-stack-indexer/shared"; +import { ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; import { IEventHandler, ProcessorDependencies } from "../../internal.js"; import { ProjectMetadata, ProjectMetadataSchema } from "../../schemas/projectMetadata.js"; @@ -15,7 +15,7 @@ type Dependencies = Pick< */ export class ProfileCreatedHandler implements IEventHandler<"Registry", "ProfileCreated"> { constructor( - readonly event: ProtocolEvent<"Registry", "ProfileCreated">, + readonly event: ProcessorEvent<"Registry", "ProfileCreated">, readonly chainId: ChainId, private dependencies: Dependencies, ) {} diff --git a/packages/processors/src/registry/handlers/roleGranted.handler.ts b/packages/processors/src/registry/handlers/roleGranted.handler.ts index 1e8c22e..461043e 100644 --- a/packages/processors/src/registry/handlers/roleGranted.handler.ts +++ b/packages/processors/src/registry/handlers/roleGranted.handler.ts @@ -1,7 +1,7 @@ import { getAddress } from "viem"; import { Changeset } from "@grants-stack-indexer/repository"; -import { ALLO_OWNER_ROLE, ChainId, ProtocolEvent } from "@grants-stack-indexer/shared"; +import { ALLO_OWNER_ROLE, ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; import { IEventHandler } from "../../internal.js"; import { ProcessorDependencies } from "../../types/processor.types.js"; @@ -11,7 +11,7 @@ import { ProcessorDependencies } from "../../types/processor.types.js"; */ export class RoleGrantedHandler implements IEventHandler<"Registry", "RoleGranted"> { constructor( - readonly event: ProtocolEvent<"Registry", "RoleGranted">, + readonly event: ProcessorEvent<"Registry", "RoleGranted">, readonly chainId: ChainId, private readonly dependencies: ProcessorDependencies, ) {} diff --git a/packages/processors/src/registry/registry.processor.ts b/packages/processors/src/registry/registry.processor.ts index d96e363..b23ab6c 100644 --- a/packages/processors/src/registry/registry.processor.ts +++ b/packages/processors/src/registry/registry.processor.ts @@ -1,5 +1,5 @@ import { Changeset } from "@grants-stack-indexer/repository"; -import { ChainId, ProtocolEvent, RegistryEvent } from "@grants-stack-indexer/shared"; +import { ChainId, ProcessorEvent, RegistryEvent } from "@grants-stack-indexer/shared"; import type { IProcessor } from "../internal.js"; import { UnsupportedEventException } from "../internal.js"; @@ -12,18 +12,18 @@ export class RegistryProcessor implements IProcessor<"Registry", RegistryEvent> private readonly dependencies: ProcessorDependencies, ) {} - async process(event: ProtocolEvent<"Registry", RegistryEvent>): Promise { + async process(event: ProcessorEvent<"Registry", RegistryEvent>): Promise { //TODO: Implement robust error handling and retry logic switch (event.eventName) { case "RoleGranted": return new RoleGrantedHandler( - event as ProtocolEvent<"Registry", "RoleGranted">, + event as ProcessorEvent<"Registry", "RoleGranted">, this.chainId, this.dependencies, ).handle(); case "ProfileCreated": return new ProfileCreatedHandler( - event as ProtocolEvent<"Registry", "ProfileCreated">, + event as ProcessorEvent<"Registry", "ProfileCreated">, this.chainId, this.dependencies, ).handle(); diff --git a/packages/processors/src/strategy/common/base.strategy.ts b/packages/processors/src/strategy/common/base.strategy.ts index eacefbf..83fcc7b 100644 --- a/packages/processors/src/strategy/common/base.strategy.ts +++ b/packages/processors/src/strategy/common/base.strategy.ts @@ -1,5 +1,5 @@ import { Changeset } from "@grants-stack-indexer/repository"; -import { Address, ProtocolEvent, StrategyEvent, Token } from "@grants-stack-indexer/shared"; +import { Address, ProcessorEvent, StrategyEvent, Token } from "@grants-stack-indexer/shared"; import { IStrategyHandler, StrategyTimings } from "../../internal.js"; @@ -39,5 +39,5 @@ export abstract class BaseStrategyHandler implements IStrategyHandler): Promise; + abstract handle(event: ProcessorEvent<"Strategy", StrategyEvent>): Promise; } diff --git a/packages/processors/src/strategy/common/baseDistributed.handler.ts b/packages/processors/src/strategy/common/baseDistributed.handler.ts index 138e918..816428c 100644 --- a/packages/processors/src/strategy/common/baseDistributed.handler.ts +++ b/packages/processors/src/strategy/common/baseDistributed.handler.ts @@ -1,7 +1,7 @@ import { getAddress } from "viem"; import { Changeset } from "@grants-stack-indexer/repository"; -import { ChainId, ProtocolEvent } from "@grants-stack-indexer/shared"; +import { ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; import { IEventHandler, ProcessorDependencies } from "../../internal.js"; @@ -21,7 +21,7 @@ type Dependencies = Pick; export class BaseDistributedHandler implements IEventHandler<"Strategy", "Distributed"> { constructor( - readonly event: ProtocolEvent<"Strategy", "Distributed">, + readonly event: ProcessorEvent<"Strategy", "Distributed">, private readonly chainId: ChainId, private readonly dependencies: Dependencies, ) {} diff --git a/packages/processors/src/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.ts b/packages/processors/src/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.ts index fb1ee1e..d6ded27 100644 --- a/packages/processors/src/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.ts +++ b/packages/processors/src/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.ts @@ -4,7 +4,7 @@ import { Changeset } from "@grants-stack-indexer/repository"; import { Address, ChainId, - ProtocolEvent, + ProcessorEvent, StrategyEvent, Token, } from "@grants-stack-indexer/shared"; @@ -44,17 +44,17 @@ export class DVMDDirectTransferStrategyHandler extends BaseStrategyHandler { } /** @inheritdoc */ - async handle(event: ProtocolEvent<"Strategy", StrategyEvent>): Promise { + async handle(event: ProcessorEvent<"Strategy", StrategyEvent>): Promise { switch (event.eventName) { case "Registered": return new DVMDRegisteredHandler( - event as ProtocolEvent<"Strategy", "Registered">, + event as ProcessorEvent<"Strategy", "Registered">, this.chainId, this.dependencies, ).handle(); case "Distributed": return new BaseDistributedHandler( - event as ProtocolEvent<"Strategy", "Distributed">, + event as ProcessorEvent<"Strategy", "Distributed">, this.chainId, this.dependencies, ).handle(); diff --git a/packages/processors/src/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/registered.handler.ts b/packages/processors/src/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/registered.handler.ts index 8fd10c8..c377b7a 100644 --- a/packages/processors/src/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/registered.handler.ts +++ b/packages/processors/src/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/registered.handler.ts @@ -1,7 +1,7 @@ import { getAddress } from "viem"; import { Changeset, NewApplication } from "@grants-stack-indexer/repository"; -import { ChainId, ProtocolEvent } from "@grants-stack-indexer/shared"; +import { ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; import { IEventHandler, @@ -29,7 +29,7 @@ type Dependencies = Pick< export class DVMDRegisteredHandler implements IEventHandler<"Strategy", "Registered"> { constructor( - readonly event: ProtocolEvent<"Strategy", "Registered">, + readonly event: ProcessorEvent<"Strategy", "Registered">, private readonly chainId: ChainId, private readonly dependencies: Dependencies, ) {} diff --git a/packages/processors/src/strategy/strategy.processor.ts b/packages/processors/src/strategy/strategy.processor.ts index 3b8ea94..c41557e 100644 --- a/packages/processors/src/strategy/strategy.processor.ts +++ b/packages/processors/src/strategy/strategy.processor.ts @@ -1,5 +1,5 @@ import { Changeset } from "@grants-stack-indexer/repository"; -import { ChainId, ProtocolEvent, StrategyEvent } from "@grants-stack-indexer/shared"; +import { ChainId, ProcessorEvent, StrategyEvent } from "@grants-stack-indexer/shared"; import type { IProcessor, ProcessorDependencies } from "../internal.js"; import { UnsupportedStrategy } from "../internal.js"; @@ -11,7 +11,7 @@ export class StrategyProcessor implements IProcessor<"Strategy", StrategyEvent> private readonly dependencies: ProcessorDependencies, ) {} - async process(event: ProtocolEvent<"Strategy", StrategyEvent>): Promise { + async process(event: ProcessorEvent<"Strategy", StrategyEvent>): Promise { const strategyId = event.strategyId; const strategyHandler = StrategyHandlerFactory.createHandler( diff --git a/packages/processors/test/allo/allo.processor.spec.ts b/packages/processors/test/allo/allo.processor.spec.ts index 173a2b3..bc47ef0 100644 --- a/packages/processors/test/allo/allo.processor.spec.ts +++ b/packages/processors/test/allo/allo.processor.spec.ts @@ -7,7 +7,7 @@ import type { IProjectReadRepository, IRoundReadRepository, } from "@grants-stack-indexer/repository"; -import type { AlloEvent, ChainId, ProtocolEvent } from "@grants-stack-indexer/shared"; +import type { AlloEvent, ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; import { AlloProcessor } from "../../src/allo/allo.processor.js"; import { PoolCreatedHandler } from "../../src/allo/handlers/poolCreated.handler.js"; @@ -50,10 +50,10 @@ describe("AlloProcessor", () => { }); it("call PoolCreatedHandler for PoolCreated event", async () => { - const mockEvent: ProtocolEvent<"Allo", "PoolCreated"> = { + const mockEvent: ProcessorEvent<"Allo", "PoolCreated"> = { eventName: "PoolCreated", // Add other necessary event properties here - } as ProtocolEvent<"Allo", "PoolCreated">; + } as ProcessorEvent<"Allo", "PoolCreated">; vi.spyOn(PoolCreatedHandler.prototype, "handle").mockResolvedValue([]); @@ -70,7 +70,7 @@ describe("AlloProcessor", () => { it("throw an error for unknown event names", async () => { const mockEvent = { eventName: "UnknownEvent", - } as unknown as ProtocolEvent<"Allo", AlloEvent>; + } as unknown as ProcessorEvent<"Allo", AlloEvent>; await expect(() => processor.process(mockEvent)).rejects.toThrow(UnsupportedEventException); }); diff --git a/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts b/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts index 89a8061..b508ca5 100644 --- a/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts +++ b/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts @@ -5,7 +5,7 @@ import type { EvmProvider } from "@grants-stack-indexer/chain-providers"; import type { IMetadataProvider } from "@grants-stack-indexer/metadata"; import type { IPricingProvider } from "@grants-stack-indexer/pricing"; import type { IRoundReadRepository, Round } from "@grants-stack-indexer/repository"; -import type { ChainId, DeepPartial, ProtocolEvent, TokenCode } from "@grants-stack-indexer/shared"; +import type { ChainId, DeepPartial, ProcessorEvent, TokenCode } from "@grants-stack-indexer/shared"; import { mergeDeep } from "@grants-stack-indexer/shared"; import { @@ -15,9 +15,9 @@ import { // Function to create a mock event with optional overrides function createMockEvent( - overrides: DeepPartial> = {}, -): ProtocolEvent<"Allo", "PoolCreated"> { - const defaultEvent: ProtocolEvent<"Allo", "PoolCreated"> = { + overrides: DeepPartial> = {}, +): ProcessorEvent<"Allo", "PoolCreated"> { + const defaultEvent: ProcessorEvent<"Allo", "PoolCreated"> = { blockNumber: 116385567, blockTimestamp: 1708369911, chainId: 10 as ChainId, @@ -41,7 +41,7 @@ function createMockEvent( strategyId: "0x9fa6890423649187b1f0e8bf4265f0305ce99523c3d11aa36b35a54617bb0ec0", }; - return mergeDeep(defaultEvent, overrides) as ProtocolEvent<"Allo", "PoolCreated">; + return mergeDeep(defaultEvent, overrides) as ProcessorEvent<"Allo", "PoolCreated">; } describe("PoolCreatedHandler", () => { diff --git a/packages/processors/test/registry/handlers/profileCreated.handler.spec.ts b/packages/processors/test/registry/handlers/profileCreated.handler.spec.ts index d2b1828..331bd42 100644 --- a/packages/processors/test/registry/handlers/profileCreated.handler.spec.ts +++ b/packages/processors/test/registry/handlers/profileCreated.handler.spec.ts @@ -5,13 +5,13 @@ import { EvmProvider } from "@grants-stack-indexer/chain-providers"; import { IMetadataProvider } from "@grants-stack-indexer/metadata"; import { IPricingProvider } from "@grants-stack-indexer/pricing"; import { IProjectReadRepository, IRoundReadRepository } from "@grants-stack-indexer/repository"; -import { Bytes32String, ChainId, ProtocolEvent } from "@grants-stack-indexer/shared"; +import { Bytes32String, ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; import { ProcessorDependencies } from "../../../src/internal.js"; import { ProfileCreatedHandler } from "../../../src/registry/handlers/index.js"; describe("ProfileCreatedHandler", () => { - let mockEvent: ProtocolEvent<"Registry", "ProfileCreated">; + let mockEvent: ProcessorEvent<"Registry", "ProfileCreated">; let mockChainId: ChainId; let mockDependencies: ProcessorDependencies; const mockedTxHash = "0x6e5a7115323ac1712f7c27adff46df2216324a4ad615a8c9ce488c32a1f3a035"; @@ -38,7 +38,7 @@ describe("ProfileCreatedHandler", () => { anchor: mockedAddress, owner: mockedAddress, }, - } as ProtocolEvent<"Registry", "ProfileCreated">; + } as ProcessorEvent<"Registry", "ProfileCreated">; mockChainId = 10 as ChainId; diff --git a/packages/processors/test/registry/handlers/roleGranted.handler.spec.ts b/packages/processors/test/registry/handlers/roleGranted.handler.spec.ts index 030ac33..72f23c4 100644 --- a/packages/processors/test/registry/handlers/roleGranted.handler.spec.ts +++ b/packages/processors/test/registry/handlers/roleGranted.handler.spec.ts @@ -5,7 +5,7 @@ import { ALLO_OWNER_ROLE, Bytes32String, ChainId, - ProtocolEvent, + ProcessorEvent, } from "@grants-stack-indexer/shared"; import { ProcessorDependencies } from "../../../src/internal.js"; @@ -41,7 +41,7 @@ describe("RoleGrantedHandler", () => { hash: "0x123", transactionIndex: 1, }, - } as ProtocolEvent<"Registry", "RoleGranted">; + } as ProcessorEvent<"Registry", "RoleGranted">; it("returns an empty array if role is ALLO_OWNER_ROLE", async () => { const event = mockedEvent; diff --git a/packages/processors/test/registry/registry.processor.spec.ts b/packages/processors/test/registry/registry.processor.spec.ts index 7a8c5cf..c26647c 100644 --- a/packages/processors/test/registry/registry.processor.spec.ts +++ b/packages/processors/test/registry/registry.processor.spec.ts @@ -1,6 +1,6 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import type { ChainId, ProtocolEvent, RegistryEvent } from "@grants-stack-indexer/shared"; +import type { ChainId, ProcessorEvent, RegistryEvent } from "@grants-stack-indexer/shared"; import { ProcessorDependencies, UnsupportedEventException } from "../../src/internal.js"; import { ProfileCreatedHandler } from "../../src/registry/handlers/profileCreated.handler.js"; @@ -36,9 +36,9 @@ describe("RegistryProcessor", () => { }); it("throws UnsupportedEventException for unsupported events", async () => { - const event: ProtocolEvent<"Registry", RegistryEvent> = { + const event: ProcessorEvent<"Registry", RegistryEvent> = { eventName: "UnsupportedEvent", - } as unknown as ProtocolEvent<"Registry", RegistryEvent>; + } as unknown as ProcessorEvent<"Registry", RegistryEvent>; const processor = new RegistryProcessor(chainId, dependencies); @@ -46,9 +46,9 @@ describe("RegistryProcessor", () => { }); it("should call ProfileCreatedHandler", async () => { - const event: ProtocolEvent<"Registry", "ProfileCreated"> = { + const event: ProcessorEvent<"Registry", "ProfileCreated"> = { eventName: "ProfileCreated", - } as ProtocolEvent<"Registry", "ProfileCreated">; + } as ProcessorEvent<"Registry", "ProfileCreated">; vi.spyOn(ProfileCreatedHandler.prototype, "handle").mockResolvedValue([]); @@ -60,9 +60,9 @@ describe("RegistryProcessor", () => { }); it("should call RoleGrantedHandler", async () => { - const event: ProtocolEvent<"Registry", "RoleGranted"> = { + const event: ProcessorEvent<"Registry", "RoleGranted"> = { eventName: "RoleGranted", - } as ProtocolEvent<"Registry", "RoleGranted">; + } as ProcessorEvent<"Registry", "RoleGranted">; vi.spyOn(RoleGrantedHandler.prototype, "handle").mockResolvedValue([]); diff --git a/packages/processors/test/strategy/common/baseDistributed.handler.spec.ts b/packages/processors/test/strategy/common/baseDistributed.handler.spec.ts index c05d38e..a20abdc 100644 --- a/packages/processors/test/strategy/common/baseDistributed.handler.spec.ts +++ b/packages/processors/test/strategy/common/baseDistributed.handler.spec.ts @@ -1,14 +1,14 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { IRoundReadRepository, Round } from "@grants-stack-indexer/repository"; -import { ChainId, ProtocolEvent } from "@grants-stack-indexer/shared"; +import { ChainId, ProcessorEvent } from "@grants-stack-indexer/shared"; import { BaseDistributedHandler } from "../../../src/strategy/common/baseDistributed.handler.js"; function createMockEvent( - overrides: Partial> = {}, -): ProtocolEvent<"Strategy", "Distributed"> { - const defaultEvent: ProtocolEvent<"Strategy", "Distributed"> = { + overrides: Partial> = {}, +): ProcessorEvent<"Strategy", "Distributed"> { + const defaultEvent: ProcessorEvent<"Strategy", "Distributed"> = { params: { amount: 1000, recipientAddress: "0x1234567890123456789012345678901234567890", @@ -36,7 +36,7 @@ function createMockEvent( describe("BaseDistributedHandler", () => { let handler: BaseDistributedHandler; let mockRoundRepository: IRoundReadRepository; - let mockEvent: ProtocolEvent<"Strategy", "Distributed">; + let mockEvent: ProcessorEvent<"Strategy", "Distributed">; const chainId = 10 as ChainId; beforeEach(() => { diff --git a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts index 486da9e..87a7825 100644 --- a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts +++ b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts @@ -10,7 +10,7 @@ import { EvmProvider } from "@grants-stack-indexer/chain-providers"; import { IPricingProvider } from "@grants-stack-indexer/pricing"; import { ChainId, - ProtocolEvent, + ProcessorEvent, StrategyEvent, Token, TokenCode, @@ -83,7 +83,7 @@ describe("DVMDDirectTransferHandler", () => { it("calls RegisteredHandler for Registered event", async () => { const mockEvent = { eventName: "Registered", - } as ProtocolEvent<"Strategy", "Registered">; + } as ProcessorEvent<"Strategy", "Registered">; vi.spyOn(DVMDRegisteredHandler.prototype, "handle").mockResolvedValue([]); @@ -102,7 +102,7 @@ describe("DVMDDirectTransferHandler", () => { it("calls DistributedHandler for Distributed event", async () => { const mockEvent = { eventName: "Distributed", - } as ProtocolEvent<"Strategy", "Distributed">; + } as ProcessorEvent<"Strategy", "Distributed">; vi.spyOn(BaseDistributedHandler.prototype, "handle").mockResolvedValue([]); @@ -227,7 +227,7 @@ describe("DVMDDirectTransferHandler", () => { it.skip("calls FundsDistributedHandler for FundsDistributed event"); it("throws UnsupportedEventException for unknown event names", async () => { - const mockEvent = { eventName: "UnknownEvent" } as unknown as ProtocolEvent< + const mockEvent = { eventName: "UnknownEvent" } as unknown as ProcessorEvent< "Strategy", StrategyEvent >; diff --git a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/registered.handler.spec.ts b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/registered.handler.spec.ts index ff494ee..1340273 100644 --- a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/registered.handler.spec.ts +++ b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/registered.handler.spec.ts @@ -8,15 +8,15 @@ import { Project, Round, } from "@grants-stack-indexer/repository"; -import { ChainId, DeepPartial, mergeDeep, ProtocolEvent } from "@grants-stack-indexer/shared"; +import { ChainId, DeepPartial, mergeDeep, ProcessorEvent } from "@grants-stack-indexer/shared"; import { ProjectNotFound, RoundNotFound } from "../../../../src/exceptions/index.js"; import { DVMDRegisteredHandler } from "../../../../src/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/index.js"; function createMockEvent( - overrides: DeepPartial> = {}, -): ProtocolEvent<"Strategy", "Registered"> { - const defaultEvent: ProtocolEvent<"Strategy", "Registered"> = { + overrides: DeepPartial> = {}, +): ProcessorEvent<"Strategy", "Registered"> { + const defaultEvent: ProcessorEvent<"Strategy", "Registered"> = { params: { recipientId: "0x1234567890123456789012345678901234567890", sender: "0x0987654321098765432109876543210987654321", @@ -37,7 +37,7 @@ function createMockEvent( strategyId: "0x9fa6890423649187b1f0e8bf4265f0305ce99523c3d11aa36b35a54617bb0ec0", }; - return mergeDeep(defaultEvent, overrides) as ProtocolEvent<"Strategy", "Registered">; + return mergeDeep(defaultEvent, overrides) as ProcessorEvent<"Strategy", "Registered">; } describe("DVMDRegisteredHandler", () => { @@ -45,7 +45,7 @@ describe("DVMDRegisteredHandler", () => { let mockRoundRepository: IRoundReadRepository; let mockProjectRepository: IProjectReadRepository; let mockMetadataProvider: IMetadataProvider; - let mockEvent: ProtocolEvent<"Strategy", "Registered">; + let mockEvent: ProcessorEvent<"Strategy", "Registered">; const chainId = 10 as ChainId; beforeEach(() => { diff --git a/packages/processors/test/strategy/strategy.processor.spec.ts b/packages/processors/test/strategy/strategy.processor.spec.ts index d860b8c..b195c90 100644 --- a/packages/processors/test/strategy/strategy.processor.spec.ts +++ b/packages/processors/test/strategy/strategy.processor.spec.ts @@ -7,7 +7,7 @@ import type { IProjectReadRepository, IRoundReadRepository, } from "@grants-stack-indexer/repository"; -import type { ChainId, ProtocolEvent, StrategyEvent } from "@grants-stack-indexer/shared"; +import type { ChainId, ProcessorEvent, StrategyEvent } from "@grants-stack-indexer/shared"; import { StrategyProcessor, UnsupportedStrategy } from "../../src/internal.js"; @@ -41,7 +41,7 @@ describe("StrategyProcessor", () => { const mockEvent = { eventName: "UnknownEvent", strategyId: "0xunknown", - } as unknown as ProtocolEvent<"Strategy", StrategyEvent>; + } as unknown as ProcessorEvent<"Strategy", StrategyEvent>; await expect(() => processor.process(mockEvent)).rejects.toThrow(UnsupportedStrategy); }); diff --git a/packages/shared/src/external.ts b/packages/shared/src/external.ts index 0c449d0..c3ed0c1 100644 --- a/packages/shared/src/external.ts +++ b/packages/shared/src/external.ts @@ -1,5 +1,5 @@ export type * from "./types/index.js"; -export type { Address } from "./internal.js"; +export type { Address, Hex } from "./internal.js"; export { NATIVE_TOKEN_ADDRESS, isNativeToken, @@ -18,4 +18,5 @@ export type { BigNumberType } from "./internal.js"; export type { TokenCode, Token } from "./internal.js"; export { TOKENS, getToken } from "./tokens/tokens.js"; +export { isAlloEvent, isRegistryEvent, isStrategyEvent } from "./internal.js"; export { stringify } from "./internal.js"; diff --git a/packages/shared/src/internal.ts b/packages/shared/src/internal.ts index fae9241..1130d35 100644 --- a/packages/shared/src/internal.ts +++ b/packages/shared/src/internal.ts @@ -1,4 +1,4 @@ -export type { Address } from "viem"; +export type { Address, Hex } from "viem"; export { stringify } from "viem/utils"; export * from "./math/bignumber.js"; export * from "./types/index.js"; diff --git a/packages/shared/src/types/events/allo.ts b/packages/shared/src/types/events/allo.ts index d4b8e57..3b8e325 100644 --- a/packages/shared/src/types/events/allo.ts +++ b/packages/shared/src/types/events/allo.ts @@ -1,9 +1,14 @@ -import { Address } from "../../internal.js"; +import { Address, AnyEvent, ContractName, ProcessorEvent } from "../../internal.js"; + +/** + * This array is used to represent all Allo events. + */ +const AlloEventArray = ["PoolCreated"] as const; /** * This type is used to represent a Allo events. */ -export type AlloEvent = "PoolCreated"; +export type AlloEvent = (typeof AlloEventArray)[number]; /** * This type maps Allo events to their respective parameters. @@ -23,3 +28,17 @@ export type PoolCreatedParams = { amount: bigint; metadata: [protocol: bigint, pointer: string]; }; + +/** + * Type guard for Allo events. + * @param event The event to check. + * @returns True if the event is a Allo event, false otherwise. + */ +export function isAlloEvent( + event: ProcessorEvent, +): event is ProcessorEvent<"Allo", AlloEvent> { + return ( + event.contractName === "Allo" && + (AlloEventArray as readonly string[]).includes(event.eventName) + ); +} diff --git a/packages/shared/src/types/events/common.ts b/packages/shared/src/types/events/common.ts index 005cd5b..be5db3e 100644 --- a/packages/shared/src/types/events/common.ts +++ b/packages/shared/src/types/events/common.ts @@ -11,7 +11,7 @@ import { } from "./index.js"; export type ContractName = "Strategy" | "Allo" | "Registry"; -export type AnyEvent = StrategyEvent | AlloEvent; +export type AnyEvent = StrategyEvent | AlloEvent | RegistryEvent; type TransactionFields = { hash: Hex; @@ -48,9 +48,9 @@ export type EventParams : never; /** - * This type is used to represent a protocol event. + * This type represents events fetched from the indexer. */ -export type ProtocolEvent> = { +export type IndexerFetchedEvent> = { //TODO: make blocknumber and chainId bigints, implies implementing adapter patterns in the EventsFetcher or IndexerClient blockNumber: number; blockTimestamp: number; @@ -61,13 +61,7 @@ export type ProtocolEvent; srcAddress: Address; transactionFields: TransactionFields; -} & (T extends "Strategy" // strategyId should be defined for Strategy events or PoolCreated events in Allo - ? { strategyId: Address } - : T extends "Allo" - ? E extends "PoolCreated" - ? { strategyId: Address } - : object - : object); +}; /** * TODO: This type is currently only used in the EventsFetcher and IndexerClient. @@ -75,7 +69,22 @@ export type ProtocolEvent>, - "strategyId" +export type AnyIndexerFetchedEvent = IndexerFetchedEvent< + ContractName, + ContractToEventName >; + +/** + * This type represents events processed by the processor after being enriched with strategy ids. + */ +export type ProcessorEvent< + T extends ContractName, + E extends ContractToEventName, +> = IndexerFetchedEvent & + (T extends "Strategy" // strategyId should be defined for Strategy events or PoolCreated events in Allo + ? { strategyId: Address } + : T extends "Allo" + ? E extends "PoolCreated" + ? { strategyId: Address } + : object + : object); diff --git a/packages/shared/src/types/events/registry.ts b/packages/shared/src/types/events/registry.ts index d47c2f6..5871ec9 100644 --- a/packages/shared/src/types/events/registry.ts +++ b/packages/shared/src/types/events/registry.ts @@ -1,16 +1,16 @@ -//TODO: remove comment once we support registry events -// export type RegistryEvent = -// | "ProfileCreated" -// | "ProfileMetadataUpdated" -// | "ProfileNameUpdated" -// | "ProfileOwnerUpdated"; +// TODO: Should we validate event params in runtime ? How should we approach that ? -import { Address, Bytes32String } from "../../internal.js"; +import { Address, AnyEvent, Bytes32String, ContractName, ProcessorEvent } from "../../internal.js"; + +/** + * This array is used to represent all Registry events. + */ +const RegistryEventArray = ["ProfileCreated", "RoleGranted"] as const; /** * This type is used to represent a Registry events. */ -export type RegistryEvent = "ProfileCreated" | "RoleGranted"; +export type RegistryEvent = (typeof RegistryEventArray)[number]; /** * This type maps Registry events to their respective parameters. @@ -37,3 +37,17 @@ export type RoleGrantedParams = { account: Address; sender: Address; }; + +/** + * Type guard for Registry events. + * @param event The event to check. + * @returns True if the event is a Registry event, false otherwise. + */ +export function isRegistryEvent( + event: ProcessorEvent, +): event is ProcessorEvent<"Registry", RegistryEvent> { + return ( + event.contractName === "Registry" && + (RegistryEventArray as readonly string[]).includes(event.eventName) + ); +} diff --git a/packages/shared/src/types/events/strategy.ts b/packages/shared/src/types/events/strategy.ts index e4b3d19..fbff8db 100644 --- a/packages/shared/src/types/events/strategy.ts +++ b/packages/shared/src/types/events/strategy.ts @@ -1,15 +1,22 @@ import { Hex } from "viem"; -import { Address } from "../../internal.js"; +import { Address, AnyEvent, ContractName, ProcessorEvent } from "../../internal.js"; + +/** + * This array is used to represent all Strategy events. + */ +const StrategyEventArray = [ + "Registered", + "Distributed", + "TimestampsUpdated", + "AllocatedWithToken", +] as const; /** * This type is used to represent a Strategy events. */ -export type StrategyEvent = - | "Registered" - | "Distributed" - | "TimestampsUpdated" - | "AllocatedWithToken"; +export type StrategyEvent = (typeof StrategyEventArray)[number]; + /** * This type maps Strategy events to their respective parameters. */ @@ -49,3 +56,17 @@ export type AllocatedWithTokenParams = { tokenAddress: Address; amount: number; }; + +/** + * Type guard for Strategy events. + * @param event The event to check. + * @returns True if the event is a Strategy event, false otherwise. + */ +export function isStrategyEvent( + event: ProcessorEvent, +): event is ProcessorEvent<"Strategy", StrategyEvent> { + return ( + event.contractName === "Strategy" && + (StrategyEventArray as readonly string[]).includes(event.eventName) + ); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 903cdfa..8619cb0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -110,9 +110,21 @@ importers: packages/data-flow: dependencies: + "@grants-stack-indexer/chain-providers": + specifier: workspace:* + version: link:../chain-providers "@grants-stack-indexer/indexer-client": specifier: workspace:* version: link:../indexer-client + "@grants-stack-indexer/metadata": + specifier: workspace:* + version: link:../metadata + "@grants-stack-indexer/pricing": + specifier: workspace:* + version: link:../pricing + "@grants-stack-indexer/processors": + specifier: workspace:* + version: link:../processors "@grants-stack-indexer/repository": specifier: workspace:* version: link:../repository