Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: events orchestrator #21

Merged
merged 9 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/data-flow/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
"test:cov": "vitest run --config vitest.config.ts --coverage"
},
"dependencies": {
"@grants-stack-indexer/chain-providers": "workspace:*",
"@grants-stack-indexer/indexer-client": "workspace:*",
"@grants-stack-indexer/metadata": "workspace:*",
"@grants-stack-indexer/pricing": "workspace:*",
"@grants-stack-indexer/processors": "workspace:*",
"@grants-stack-indexer/repository": "workspace:*",
"@grants-stack-indexer/shared": "workspace:*",
"viem": "2.21.19"
Expand Down
1 change: 1 addition & 0 deletions packages/data-flow/src/abis/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./strategy.abi.js";
225 changes: 225 additions & 0 deletions packages/data-flow/src/abis/strategy.abi.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
export const iStrategyAbi = [
{
type: "function",
name: "allocate",
inputs: [
{ name: "_data", type: "bytes", internalType: "bytes" },
{ name: "_sender", type: "address", internalType: "address" },
],
outputs: [],
stateMutability: "payable",
},
{
type: "function",
name: "distribute",
inputs: [
{ name: "_recipientIds", type: "address[]", internalType: "address[]" },
{ name: "_data", type: "bytes", internalType: "bytes" },
{ name: "_sender", type: "address", internalType: "address" },
],
outputs: [],
stateMutability: "nonpayable",
},
{
type: "function",
name: "getAllo",
inputs: [],
outputs: [{ name: "", type: "address", internalType: "contract IAllo" }],
stateMutability: "view",
},
{
type: "function",
name: "getPayouts",
inputs: [
{ name: "_recipientIds", type: "address[]", internalType: "address[]" },
{ name: "_data", type: "bytes[]", internalType: "bytes[]" },
],
outputs: [
{
name: "",
type: "tuple[]",
internalType: "struct IStrategy.PayoutSummary[]",
components: [
{
name: "recipientAddress",
type: "address",
internalType: "address",
},
{ name: "amount", type: "uint256", internalType: "uint256" },
],
},
],
stateMutability: "view",
},
{
type: "function",
name: "getPoolAmount",
inputs: [],
outputs: [{ name: "", type: "uint256", internalType: "uint256" }],
stateMutability: "view",
},
{
type: "function",
name: "getPoolId",
inputs: [],
outputs: [{ name: "", type: "uint256", internalType: "uint256" }],
stateMutability: "view",
},
{
type: "function",
name: "getRecipientStatus",
inputs: [{ name: "_recipientId", type: "address", internalType: "address" }],
outputs: [{ name: "", type: "uint8", internalType: "enum IStrategy.Status" }],
stateMutability: "view",
},
{
type: "function",
name: "getStrategyId",
inputs: [],
outputs: [{ name: "", type: "bytes32", internalType: "bytes32" }],
stateMutability: "view",
},
{
type: "function",
name: "increasePoolAmount",
inputs: [{ name: "_amount", type: "uint256", internalType: "uint256" }],
outputs: [],
stateMutability: "nonpayable",
},
{
type: "function",
name: "initialize",
inputs: [
{ name: "_poolId", type: "uint256", internalType: "uint256" },
{ name: "_data", type: "bytes", internalType: "bytes" },
],
outputs: [],
stateMutability: "nonpayable",
},
{
type: "function",
name: "isPoolActive",
inputs: [],
outputs: [{ name: "", type: "bool", internalType: "bool" }],
stateMutability: "nonpayable",
},
{
type: "function",
name: "isValidAllocator",
inputs: [{ name: "_allocator", type: "address", internalType: "address" }],
outputs: [{ name: "", type: "bool", internalType: "bool" }],
stateMutability: "view",
},
{
type: "function",
name: "registerRecipient",
inputs: [
{ name: "_data", type: "bytes", internalType: "bytes" },
{ name: "_sender", type: "address", internalType: "address" },
],
outputs: [{ name: "", type: "address", internalType: "address" }],
stateMutability: "payable",
},
{
type: "event",
name: "Allocated",
inputs: [
{
name: "recipientId",
type: "address",
indexed: true,
internalType: "address",
},
{
name: "amount",
type: "uint256",
indexed: false,
internalType: "uint256",
},
{
name: "token",
type: "address",
indexed: false,
internalType: "address",
},
{
name: "sender",
type: "address",
indexed: false,
internalType: "address",
},
],
anonymous: false,
},
{
type: "event",
name: "Distributed",
inputs: [
{
name: "recipientId",
type: "address",
indexed: true,
internalType: "address",
},
{
name: "recipientAddress",
type: "address",
indexed: false,
internalType: "address",
},
{
name: "amount",
type: "uint256",
indexed: false,
internalType: "uint256",
},
{
name: "sender",
type: "address",
indexed: false,
internalType: "address",
},
],
anonymous: false,
},
{
type: "event",
name: "Initialized",
inputs: [
{
name: "poolId",
type: "uint256",
indexed: false,
internalType: "uint256",
},
{ name: "data", type: "bytes", indexed: false, internalType: "bytes" },
],
anonymous: false,
},
{
type: "event",
name: "PoolActive",
inputs: [{ name: "active", type: "bool", indexed: false, internalType: "bool" }],
anonymous: false,
},
{
type: "event",
name: "Registered",
inputs: [
{
name: "recipientId",
type: "address",
indexed: true,
internalType: "address",
},
{ name: "data", type: "bytes", indexed: false, internalType: "bytes" },
{
name: "sender",
type: "address",
indexed: false,
internalType: "address",
},
],
anonymous: false,
},
] as const;
8 changes: 4 additions & 4 deletions packages/data-flow/src/eventsFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
import { AnyProtocolEvent } from "@grants-stack-indexer/shared";
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";

import { IEventsFetcher } from "./interfaces/index.js";

export class EventsFetcher implements IEventsFetcher {
constructor(private indexerClient: IIndexerClient) {}
/* @inheritdoc */
async fetchEventsByBlockNumberAndLogIndex(
chainId: bigint,
blockNumber: bigint,
chainId: ChainId,
blockNumber: number,
logIndex: number,
limit: number = 100,
): Promise<AnyProtocolEvent[]> {
): Promise<AnyIndexerFetchedEvent[]> {
return await this.indexerClient.getEventsAfterBlockNumberAndLogIndex(
chainId,
blockNumber,
Expand Down
55 changes: 55 additions & 0 deletions packages/data-flow/src/eventsProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { Changeset } from "@grants-stack-indexer/repository";
import {
AlloProcessor,
ProcessorDependencies,
RegistryProcessor,
StrategyProcessor,
} from "@grants-stack-indexer/processors";
import {
AnyEvent,
ChainId,
ContractName,
isAlloEvent,
isRegistryEvent,
isStrategyEvent,
ProcessorEvent,
} from "@grants-stack-indexer/shared";

import { InvalidEvent } from "./exceptions/index.js";

/**
* EventsProcessor handles the processing of Allo V2 events by delegating them to the appropriate processor
* (Allo, Registry, or Strategy) based on the event type. Each processor generates changesets that represent
* the required database updates.
*/
export class EventsProcessor {
alloProcessor: AlloProcessor;
registryProcessor: RegistryProcessor;
strategyProcessor: StrategyProcessor;

constructor(chainId: ChainId, dependencies: Readonly<ProcessorDependencies>) {
this.alloProcessor = new AlloProcessor(chainId, dependencies);
this.registryProcessor = new RegistryProcessor(chainId, dependencies);
this.strategyProcessor = new StrategyProcessor(chainId, dependencies);
}

/**
* Process an Allo V2 event and return the changesets
* @param event - The event to process
* @returns The changesets
* @throws InvalidEvent if the event is not a valid Allo V2 event (Allo, Registry or Strategy)
*/
public async processEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<Changeset[]> {
if (isAlloEvent(event)) {
return await this.alloProcessor.process(event);
}
if (isRegistryEvent(event)) {
return await this.registryProcessor.process(event);
}
if (isStrategyEvent(event)) {
return await this.strategyProcessor.process(event);
}

throw new InvalidEvent(event);
}
}
25 changes: 25 additions & 0 deletions packages/data-flow/src/eventsRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared";

import type { IEventsRegistry } from "./internal.js";

/**
* Class to store the last processed event in memory
*/
//TODO: Implement storage version to persist the last processed event. we need to store it by chainId
export class InMemoryEventsRegistry implements IEventsRegistry {
private lastProcessedEvent: ProcessorEvent<ContractName, AnyEvent> | undefined;

/**
* @inheritdoc
*/
async getLastProcessedEvent(): Promise<ProcessorEvent<ContractName, AnyEvent> | undefined> {
return this.lastProcessedEvent;
}

/**
* @inheritdoc
*/
async saveLastProcessedEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
this.lastProcessedEvent = event;
}
}
1 change: 1 addition & 0 deletions packages/data-flow/src/exceptions/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from "./invalidEvent.js";
export * from "./invalidChangeset.exception.js";
9 changes: 9 additions & 0 deletions packages/data-flow/src/exceptions/invalidEvent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { AnyEvent, ContractName, ProcessorEvent, stringify } from "@grants-stack-indexer/shared";

export class InvalidEvent extends Error {
constructor(event: ProcessorEvent<ContractName, AnyEvent>) {
super(`Event couldn't be processed: ${stringify(event)}`);

this.name = "InvalidEvent";
}
}
8 changes: 4 additions & 4 deletions packages/data-flow/src/interfaces/eventsFetcher.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AnyProtocolEvent } from "@grants-stack-indexer/shared";
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";

/**
* Interface for the events fetcher
Expand All @@ -12,9 +12,9 @@ export interface IEventsFetcher {
* @param limit limit of events to fetch
*/
fetchEventsByBlockNumberAndLogIndex(
chainId: bigint,
blockNumber: bigint,
chainId: ChainId,
blockNumber: number,
logIndex: number,
limit?: number,
): Promise<AnyProtocolEvent[]>;
): Promise<AnyIndexerFetchedEvent[]>;
}
18 changes: 18 additions & 0 deletions packages/data-flow/src/interfaces/eventsRegistry.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared";

/**
* The events registry saves as a checkpoint to the last processed event by the system.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it saves the last processed event as a checkpoint? or the registry itself is a checkpoint for the last processed event?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the registry itself is the checkpoint, you query the registry for getting the last processed event (however at current MVP stage, since it's in memory on shutdown you lose tracking of course)

* This is used to resume the indexing from the last processed event in case of an error or temporary interruption.
*/
export interface IEventsRegistry {
/**
* Get the last processed event by the system
* @returns The last processed event or undefined if no event has been processed yet.
*/
getLastProcessedEvent(): Promise<ProcessorEvent<ContractName, AnyEvent> | undefined>;
/**
* Save the last processed event by the system
* @param event - The event to save.
*/
saveLastProcessedEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void>;
}
2 changes: 2 additions & 0 deletions packages/data-flow/src/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export * from "./eventsFetcher.interface.js";
export * from "./dataLoader.interface.js";
export * from "./eventsRegistry.interface.js";
export * from "./strategyRegistry.interface.js";
Loading