diff --git a/packages/data-flow/src/abis/index.ts b/packages/data-flow/src/abis/index.ts new file mode 100644 index 0000000..8b6c147 --- /dev/null +++ b/packages/data-flow/src/abis/index.ts @@ -0,0 +1 @@ +export * from "./strategy.abi.js"; diff --git a/packages/data-flow/src/abis/strategy.abi.ts b/packages/data-flow/src/abis/strategy.abi.ts new file mode 100644 index 0000000..e1e182d --- /dev/null +++ b/packages/data-flow/src/abis/strategy.abi.ts @@ -0,0 +1,225 @@ +export const iStrategyAbi = [ + { + type: "function", + name: "allocate", + inputs: [ + { name: "_data", type: "bytes", internalType: "bytes" }, + { name: "_sender", type: "address", internalType: "address" }, + ], + outputs: [], + stateMutability: "payable", + }, + { + type: "function", + name: "distribute", + inputs: [ + { name: "_recipientIds", type: "address[]", internalType: "address[]" }, + { name: "_data", type: "bytes", internalType: "bytes" }, + { name: "_sender", type: "address", internalType: "address" }, + ], + outputs: [], + stateMutability: "nonpayable", + }, + { + type: "function", + name: "getAllo", + inputs: [], + outputs: [{ name: "", type: "address", internalType: "contract IAllo" }], + stateMutability: "view", + }, + { + type: "function", + name: "getPayouts", + inputs: [ + { name: "_recipientIds", type: "address[]", internalType: "address[]" }, + { name: "_data", type: "bytes[]", internalType: "bytes[]" }, + ], + outputs: [ + { + name: "", + type: "tuple[]", + internalType: "struct IStrategy.PayoutSummary[]", + components: [ + { + name: "recipientAddress", + type: "address", + internalType: "address", + }, + { name: "amount", type: "uint256", internalType: "uint256" }, + ], + }, + ], + stateMutability: "view", + }, + { + type: "function", + name: "getPoolAmount", + inputs: [], + outputs: [{ name: "", type: "uint256", internalType: "uint256" }], + stateMutability: "view", + }, + { + type: "function", + name: "getPoolId", + inputs: [], + outputs: [{ name: "", type: "uint256", internalType: "uint256" }], + stateMutability: "view", + }, + { + type: "function", + name: "getRecipientStatus", + inputs: [{ name: "_recipientId", type: "address", internalType: "address" }], + outputs: [{ name: "", type: "uint8", internalType: "enum IStrategy.Status" }], + stateMutability: "view", + }, + { + type: "function", + name: "getStrategyId", + inputs: [], + outputs: [{ name: "", type: "bytes32", internalType: "bytes32" }], + stateMutability: "view", + }, + { + type: "function", + name: "increasePoolAmount", + inputs: [{ name: "_amount", type: "uint256", internalType: "uint256" }], + outputs: [], + stateMutability: "nonpayable", + }, + { + type: "function", + name: "initialize", + inputs: [ + { name: "_poolId", type: "uint256", internalType: "uint256" }, + { name: "_data", type: "bytes", internalType: "bytes" }, + ], + outputs: [], + stateMutability: "nonpayable", + }, + { + type: "function", + name: "isPoolActive", + inputs: [], + outputs: [{ name: "", type: "bool", internalType: "bool" }], + stateMutability: "nonpayable", + }, + { + type: "function", + name: "isValidAllocator", + inputs: [{ name: "_allocator", type: "address", internalType: "address" }], + outputs: [{ name: "", type: "bool", internalType: "bool" }], + stateMutability: "view", + }, + { + type: "function", + name: "registerRecipient", + inputs: [ + { name: "_data", type: "bytes", internalType: "bytes" }, + { name: "_sender", type: "address", internalType: "address" }, + ], + outputs: [{ name: "", type: "address", internalType: "address" }], + stateMutability: "payable", + }, + { + type: "event", + name: "Allocated", + inputs: [ + { + name: "recipientId", + type: "address", + indexed: true, + internalType: "address", + }, + { + name: "amount", + type: "uint256", + indexed: false, + internalType: "uint256", + }, + { + name: "token", + type: "address", + indexed: false, + internalType: "address", + }, + { + name: "sender", + type: "address", + indexed: false, + internalType: "address", + }, + ], + anonymous: false, + }, + { + type: "event", + name: "Distributed", + inputs: [ + { + name: "recipientId", + type: "address", + indexed: true, + internalType: "address", + }, + { + name: "recipientAddress", + type: "address", + indexed: false, + internalType: "address", + }, + { + name: "amount", + type: "uint256", + indexed: false, + internalType: "uint256", + }, + { + name: "sender", + type: "address", + indexed: false, + internalType: "address", + }, + ], + anonymous: false, + }, + { + type: "event", + name: "Initialized", + inputs: [ + { + name: "poolId", + type: "uint256", + indexed: false, + internalType: "uint256", + }, + { name: "data", type: "bytes", indexed: false, internalType: "bytes" }, + ], + anonymous: false, + }, + { + type: "event", + name: "PoolActive", + inputs: [{ name: "active", type: "bool", indexed: false, internalType: "bool" }], + anonymous: false, + }, + { + type: "event", + name: "Registered", + inputs: [ + { + name: "recipientId", + type: "address", + indexed: true, + internalType: "address", + }, + { name: "data", type: "bytes", indexed: false, internalType: "bytes" }, + { + name: "sender", + type: "address", + indexed: false, + internalType: "address", + }, + ], + anonymous: false, + }, +] as const; diff --git a/packages/data-flow/src/eventsProcessor.ts b/packages/data-flow/src/eventsProcessor.ts index a7bd917..b27948e 100644 --- a/packages/data-flow/src/eventsProcessor.ts +++ b/packages/data-flow/src/eventsProcessor.ts @@ -1,13 +1,10 @@ +import type { Changeset } from "@grants-stack-indexer/repository"; import { AlloProcessor, ProcessorDependencies, RegistryProcessor, StrategyProcessor, } from "@grants-stack-indexer/processors"; -import { Changeset } from "@grants-stack-indexer/repository"; - -import "@grants-stack-indexer/processors/dist/src/internal.js"; - import { AnyEvent, ChainId, @@ -20,6 +17,11 @@ import { import { InvalidEvent } from "./exceptions/index.js"; +/** + * EventsProcessor handles the processing of Allo V2 events by delegating them to the appropriate processor + * (Allo, Registry, or Strategy) based on the event type. Each processor generates changesets that represent + * the required database updates. + */ export class EventsProcessor { alloProcessor: AlloProcessor; registryProcessor: RegistryProcessor; @@ -31,6 +33,12 @@ export class EventsProcessor { this.strategyProcessor = new StrategyProcessor(chainId, dependencies); } + /** + * Process an Allo V2 event and return the changesets + * @param event - The event to process + * @returns The changesets + * @throws InvalidEvent if the event is not a valid Allo V2 event (Allo, Registry or Strategy) + */ public async processEvent(event: ProcessorEvent): Promise { if (isAlloEvent(event)) { return await this.alloProcessor.process(event); diff --git a/packages/data-flow/src/eventsRegistry.ts b/packages/data-flow/src/eventsRegistry.ts index b458923..5e5852e 100644 --- a/packages/data-flow/src/eventsRegistry.ts +++ b/packages/data-flow/src/eventsRegistry.ts @@ -1,17 +1,18 @@ -import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; +import type { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; + +import type { IEventsRegistry } from "./internal.js"; /** * Class to store the last processed event */ -export class EventsRegistry { - //TODO: Implement storage to persist the last processed event - private lastProcessedEvent: ProcessorEvent | null = null; - - constructor() {} +export class InMemoryEventsRegistry implements IEventsRegistry { + //TODO: Implement storage to persist the last processed event. we need to store it by chainId + private lastProcessedEvent: ProcessorEvent | undefined; - async getLastProcessedEvent(): Promise | null> { + async getLastProcessedEvent(): Promise | undefined> { return this.lastProcessedEvent; } + async saveLastProcessedEvent(event: ProcessorEvent): Promise { this.lastProcessedEvent = event; } diff --git a/packages/data-flow/src/interfaces/eventsRegistry.interface.ts b/packages/data-flow/src/interfaces/eventsRegistry.interface.ts new file mode 100644 index 0000000..37bfd18 --- /dev/null +++ b/packages/data-flow/src/interfaces/eventsRegistry.interface.ts @@ -0,0 +1,6 @@ +import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; + +export interface IEventsRegistry { + getLastProcessedEvent(): Promise | undefined>; + saveLastProcessedEvent(event: ProcessorEvent): Promise; +} diff --git a/packages/data-flow/src/interfaces/index.ts b/packages/data-flow/src/interfaces/index.ts index 883b4bb..b46333d 100644 --- a/packages/data-flow/src/interfaces/index.ts +++ b/packages/data-flow/src/interfaces/index.ts @@ -1,2 +1,4 @@ export * from "./eventsFetcher.interface.js"; export * from "./dataLoader.interface.js"; +export * from "./eventsRegistry.interface.js"; +export * from "./strategyRegistry.interface.js"; diff --git a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts new file mode 100644 index 0000000..4cf0e89 --- /dev/null +++ b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts @@ -0,0 +1,6 @@ +import { Address, Hex } from "viem"; + +export interface IStrategyRegistry { + getStrategyId(strategyAddress: Address): Promise; + saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise; +} diff --git a/packages/data-flow/src/internal.ts b/packages/data-flow/src/internal.ts index b6a22e0..e7261a5 100644 --- a/packages/data-flow/src/internal.ts +++ b/packages/data-flow/src/internal.ts @@ -1,5 +1,7 @@ export * from "./types/index.js"; export * from "./interfaces/index.js"; export * from "./exceptions/index.js"; +export * from "./abis/index.js"; +export * from "./utils/index.js"; export * from "./data-loader/index.js"; export * from "./eventsFetcher.js"; diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index fbf4306..cd110f4 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -1,69 +1,158 @@ // class should contain the logic to orchestrate the data flow Events Fetcher -> Events Processor -> Data Loader import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; -import { ProcessorDependencies } from "@grants-stack-indexer/processors"; -import { AnyEvent, ChainId, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; +import { UnsupportedStrategy } from "@grants-stack-indexer/processors/dist/src/internal.js"; +import { + Address, + AnyEvent, + ChainId, + ContractName, + Hex, + isAlloEvent, + isStrategyEvent, + ProcessorEvent, + StrategyEvent, + stringify, +} from "@grants-stack-indexer/shared"; import { EventsFetcher } from "./eventsFetcher.js"; import { EventsProcessor } from "./eventsProcessor.js"; -import { EventsRegistry } from "./eventsRegistry.js"; -import { StrategyRegistry } from "./strategyRegistry.js"; +import { InvalidEvent } from "./exceptions/index.js"; +import { IEventsRegistry } from "./interfaces/eventsRegistry.interface.js"; +import { IEventsFetcher } from "./interfaces/index.js"; +import { IStrategyRegistry } from "./interfaces/strategyRegistry.interface.js"; +import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js"; export class Orchestrator { - private eventsQueue: ProcessorEvent[] = []; - private readonly eventsFetcher: EventsFetcher; + private readonly eventsQueue: IQueue>; + private readonly eventsFetcher: IEventsFetcher; private readonly eventsProcessor: EventsProcessor; - private readonly eventsRegistry: EventsRegistry; - private readonly strategyRegistry: StrategyRegistry; + private readonly eventsRegistry: IEventsRegistry; + private readonly strategyRegistry: IStrategyRegistry; + private readonly dataLoader: DataLoader; constructor( private chainId: ChainId, - private dependencies: Readonly, + private dependencies: Readonly, private indexerClient: IIndexerClient, + private registries: { + eventsRegistry: IEventsRegistry; + strategyRegistry: IStrategyRegistry; + }, private fetchLimit: number = 1000, private fetchDelay: number = 10000, ) { this.eventsFetcher = new EventsFetcher(this.indexerClient); this.eventsProcessor = new EventsProcessor(this.chainId, this.dependencies); - this.eventsRegistry = new EventsRegistry(); - this.strategyRegistry = new StrategyRegistry(); + this.eventsRegistry = registries.eventsRegistry; + this.strategyRegistry = registries.strategyRegistry; + this.dataLoader = new DataLoader({ + project: this.dependencies.projectRepository, + round: this.dependencies.roundRepository, + application: this.dependencies.applicationRepository, + }); + this.eventsQueue = new Queue>(fetchLimit); } async run(): Promise { while (true) { + let event: ProcessorEvent | undefined; try { - if (this.eventsQueue.length === 0) { - const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(); - const blockNumber = lastProcessedEvent ? lastProcessedEvent.blockNumber : 0; - const logIndex = lastProcessedEvent ? lastProcessedEvent.logIndex : 0; - - const events = await this.eventsFetcher.fetchEventsByBlockNumberAndLogIndex( - this.chainId, - blockNumber, - logIndex, - this.fetchLimit, - ); - this.eventsQueue.push(...events); + if (this.eventsQueue.isEmpty()) await this.fillQueue(); + + event = this.eventsQueue.pop(); - //there are no events left to process - if (events.length === 0) { - await delay(this.fetchDelay); - break; - } + if (!event) { + await delay(this.fetchDelay); + continue; } - // if poolCreated event, get strategyId and save in the map - // if strategy event populate event with strategyId if exists in the map - // get strategyId and populate event with it + event = await this.enhanceStrategyId(event); + const changesets = await this.eventsProcessor.processEvent(event); - const event = this.eventsQueue[0]; - } catch (error) { - console.log("asd"); + const executionResult = await this.dataLoader.applyChanges(changesets); + + if (executionResult.numFailed > 0) { + //TODO: should we retry the failed changesets? + console.error( + `Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify( + event, + )}`, + ); + } + + await this.eventsRegistry.saveLastProcessedEvent(event); + } catch (error: unknown) { + // TODO: improve error handling and notify + if (error instanceof UnsupportedStrategy || error instanceof InvalidEvent) { + console.error(`${error.name}: ${error.message}. Event: ${stringify(event)}`); + } else { + console.error(`Error processing event: ${stringify(event)}`, error); + } } } } -} -function delay(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); + private async fillQueue(): Promise { + const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(); + const blockNumber = lastProcessedEvent?.blockNumber ?? 0; + const logIndex = lastProcessedEvent?.logIndex ?? 0; + + const events = await this.eventsFetcher.fetchEventsByBlockNumberAndLogIndex( + this.chainId, + blockNumber, + logIndex, + this.fetchLimit, + ); + + this.eventsQueue.push(...events); + } + + // if poolCreated event, get strategyId and save in the map + // if strategy event populate event with strategyId if exists in the map + // get strategyId and populate event with it + private async enhanceStrategyId( + event: ProcessorEvent, + ): Promise> { + if (!this.requiresStrategyId(event)) { + return event; + } + + const strategyAddress = this.getStrategyAddress(event); + const strategyId = await this.getOrFetchStrategyId(strategyAddress); + event.strategyId = strategyId; + + return event; + } + + private getStrategyAddress( + event: ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent>, + ): Address { + return isAlloEvent(event) && event.eventName === "PoolCreated" + ? event.params.strategy + : event.srcAddress; + } + + private async getOrFetchStrategyId(strategyAddress: Address): Promise { + const existingId = await this.strategyRegistry.getStrategyId(strategyAddress); + if (existingId) { + return existingId; + } + + const strategyId = await this.dependencies.evmProvider.readContract( + strategyAddress, + iStrategyAbi, + "getStrategyId", + ); + + await this.strategyRegistry.saveStrategyId(strategyAddress, strategyId); + + return strategyId; + } + + private requiresStrategyId( + event: ProcessorEvent, + ): event is ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent> { + return (isAlloEvent(event) && event.eventName === "PoolCreated") || isStrategyEvent(event); + } } diff --git a/packages/data-flow/src/strategyRegistry.ts b/packages/data-flow/src/strategyRegistry.ts index e4a7b7b..ae6649f 100644 --- a/packages/data-flow/src/strategyRegistry.ts +++ b/packages/data-flow/src/strategyRegistry.ts @@ -1,15 +1,18 @@ -import { Address, Hex } from "viem"; +import type { Address, Hex } from "viem"; + +import type { IStrategyRegistry } from "./internal.js"; /** * Class to store strategy ids */ -export class StrategyRegistry { - //TODO: Implement storage to persist strategies +export class InMemoryStrategyRegistry implements IStrategyRegistry { + //TODO: Implement storage to persist strategies. since we're using address, do we need ChainId? private strategiesMap: Map = new Map(); async getStrategyId(strategyAddress: Address): Promise { return this.strategiesMap.get(strategyAddress); } + async saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise { this.strategiesMap.set(strategyAddress, strategyId); } diff --git a/packages/data-flow/src/types/index.ts b/packages/data-flow/src/types/index.ts index 2bd755b..a67db51 100644 --- a/packages/data-flow/src/types/index.ts +++ b/packages/data-flow/src/types/index.ts @@ -1,4 +1,10 @@ -import { Changeset } from "@grants-stack-indexer/repository"; +import { ProcessorDependencies } from "@grants-stack-indexer/processors"; +import { + Changeset, + IApplicationRepository, + IProjectRepository, + IRoundRepository, +} from "@grants-stack-indexer/repository"; export type ExecutionResult = { changesets: Changeset["type"][]; @@ -7,3 +13,12 @@ export type ExecutionResult = { numFailed: number; errors: string[]; }; + +export type CoreDependencies = Pick< + ProcessorDependencies, + "evmProvider" | "pricingProvider" | "metadataProvider" +> & { + roundRepository: IRoundRepository; + projectRepository: IProjectRepository; + applicationRepository: IApplicationRepository; +}; diff --git a/packages/data-flow/src/utils/delay.ts b/packages/data-flow/src/utils/delay.ts new file mode 100644 index 0000000..b7a032b --- /dev/null +++ b/packages/data-flow/src/utils/delay.ts @@ -0,0 +1,3 @@ +export function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/packages/data-flow/src/utils/index.ts b/packages/data-flow/src/utils/index.ts new file mode 100644 index 0000000..0fa9eb7 --- /dev/null +++ b/packages/data-flow/src/utils/index.ts @@ -0,0 +1,2 @@ +export * from "./queue.js"; +export * from "./delay.js"; diff --git a/packages/data-flow/src/utils/queue.ts b/packages/data-flow/src/utils/queue.ts new file mode 100644 index 0000000..703b125 --- /dev/null +++ b/packages/data-flow/src/utils/queue.ts @@ -0,0 +1,72 @@ +export interface IQueue { + push(...items: T[]): void; + pop(): T | undefined; + peek(): T | undefined; + get length(): number; + isEmpty(): boolean; +} + +export class Queue implements IQueue { + private buffer: (T | undefined)[]; + private head: number = 0; + private tail: number = 0; + private size: number = 0; + + constructor(initialCapacity: number = 5000) { + this.buffer = new Array(initialCapacity).fill(undefined) as (T | undefined)[]; + } + + push(...items: T[]): void { + const requiredCapacity = this.size + items.length; + if (requiredCapacity > this.buffer.length) { + const newCapacity = Math.max(this.buffer.length * 2, requiredCapacity * 1.5); + this.resize(Math.ceil(newCapacity)); + } + + for (const item of items) { + this.buffer[this.tail] = item; + this.tail = (this.tail + 1) % this.buffer.length; + this.size++; + } + } + + pop(): T | undefined { + if (this.size === 0) { + return undefined; + } + + const item = this.buffer[this.head]; + this.buffer[this.head] = undefined; + this.head = (this.head + 1) % this.buffer.length; + this.size--; + + // Only shrink when very empty and buffer is large + if (this.size < this.buffer.length / 8 && this.buffer.length > 16384) { + this.resize(this.buffer.length / 2); + } + + return item; + } + + peek(): T | undefined { + return this.buffer[this.head]; + } + + get length(): number { + return this.size; + } + + isEmpty(): boolean { + return this.size === 0; + } + + private resize(newCapacity: number): void { + const newBuffer = Array(Math.ceil(newCapacity)).fill(undefined) as (T | undefined)[]; + for (let i = 0; i < this.size; i++) { + newBuffer[i] = this.buffer[(this.head + i) % this.buffer.length]; + } + this.buffer = newBuffer; + this.head = 0; + this.tail = this.size; + } +} diff --git a/packages/indexer-client/test/unit/envioIndexerClient.spec.ts b/packages/indexer-client/test/unit/envioIndexerClient.spec.ts index 4e3aa9a..c084007 100644 --- a/packages/indexer-client/test/unit/envioIndexerClient.spec.ts +++ b/packages/indexer-client/test/unit/envioIndexerClient.spec.ts @@ -123,8 +123,8 @@ describe("EnvioIndexerClient", () => { expect(graphqlClient.request).toHaveBeenCalledWith( expect.any(String), // We can check the query string later if necessary { - chainId: 1n, - blockNumber: 12345n, + chainId: 1, + blockNumber: 12345, logIndex: 0, limit: 100, // Ensure the default limit is used }, diff --git a/packages/shared/src/external.ts b/packages/shared/src/external.ts index 31f2522..c3ed0c1 100644 --- a/packages/shared/src/external.ts +++ b/packages/shared/src/external.ts @@ -1,5 +1,5 @@ export type * from "./types/index.js"; -export type { Address } from "./internal.js"; +export type { Address, Hex } from "./internal.js"; export { NATIVE_TOKEN_ADDRESS, isNativeToken, diff --git a/packages/shared/src/internal.ts b/packages/shared/src/internal.ts index fae9241..1130d35 100644 --- a/packages/shared/src/internal.ts +++ b/packages/shared/src/internal.ts @@ -1,4 +1,4 @@ -export type { Address } from "viem"; +export type { Address, Hex } from "viem"; export { stringify } from "viem/utils"; export * from "./math/bignumber.js"; export * from "./types/index.js";