Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: events orchestrator #21

Merged
merged 9 commits into from
Nov 1, 2024
Merged

feat: events orchestrator #21

merged 9 commits into from
Nov 1, 2024

Conversation

0xnigir1
Copy link
Collaborator

@0xnigir1 0xnigir1 commented Oct 31, 2024

🤖 Linear

Closes GIT-52 GIT-54

Description

  • Remaining core components of data-flow pipelines: Orchestrator, EventsProcessor, Registries (DataLoader and EventsFetcher were already implemented)
  • Orchestrator is the core component which orchestrates the flow of the events from "Fetching", "Processing" and "Loading" in DB. It's a long lived process that polls events from the EventsFetcher, pushed them to a Queue and process them sequentially
  • EventsProcessor is in charge of delegating the event to the corresponding processor from processors package
  • InMemory implementations of EventsRegistry and StrategyRegistry
  • Refactor of Events typing to decouple the Indexer typing from the Processor typing for events
  • Simple error handling that consists of logging error and discarding those events

Checklist before requesting a review

  • I have conducted a self-review of my code.
  • I have conducted a QA.
  • If it is a core feature, I have included comprehensive tests.

Copy link

linear bot commented Oct 31, 2024

Copy link
Collaborator

@jahabeebs jahabeebs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huge PR, nice work!

import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared";

/**
* The events registry saves as a checkpoint to the last processed event by the system.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it saves the last processed event as a checkpoint? or the registry itself is a checkpoint for the last processed event?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the registry itself is the checkpoint, you query the registry for getting the last processed event (however at current MVP stage, since it's in memory on shutdown you lose tracking of course)

@@ -0,0 +1,231 @@
// class should contain the logic to orchestrate the data flow Events Fetcher -> Events Processor -> Data Loader
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete?

import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js";

/**
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice explanation

eventsRegistry: IEventsRegistry;
strategyRegistry: IStrategyRegistry;
},
private fetchLimit: number = 1000,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this limit the # of events? or the # of attempted fetches?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limit the # of retrieved events on one call (pagination purposes)

strategyRegistry: IStrategyRegistry;
},
private fetchLimit: number = 1000,
private fetchDelay: number = 10000,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in ms? might be good to specify somewhere for clarity

if (event.contractName === "Strategy" && "strategyId" in event) {
if (!existsHandler(event.strategyId)) {
//TODO: save to registry as unsupported strategy, so when the strategy is handled it will be backwards compatible and process all of the events
console.log(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be warn or error probably unless this is expected and then it could be debug

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is expected, on a more advance stage when we implement the processors for a new type of strategy, the idea is that on application startup before long-running the orchestrator, is to process all those pending events so we don't need to fetch all events from the beginning)

private getStrategyAddress(
event: ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent>,
): Address {
return isAlloEvent(event) && event.eventName === "PoolCreated"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems this function is only for PoolCreated? I thought it should pull any type?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are two cases we need to add the "strategyId" to the event, on every "Strategy" event or on "PoolCreated" event for the Allo protocol (yeah really specific xd)
think this as a pre-processing where we enhance specific cases of events

private requiresStrategyId(
event: ProcessorEvent<ContractName, AnyEvent>,
): event is ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent> {
return (isAlloEvent(event) && event.eventName === "PoolCreated") || isStrategyEvent(event);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only for PoolCreated too? or will you be adding more event types?

import { EventsFetcher } from "./eventsFetcher.js";
import { EventsProcessor } from "./eventsProcessor.js";
import { InvalidEvent } from "./exceptions/index.js";
import { IEventsRegistry } from "./interfaces/eventsRegistry.interface.js";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import { IEventsRegistry } from "./interfaces/eventsRegistry.interface.js";
import { IEventsRegistry } from "./interfaces/index.js";

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one day IDE will correctly import them on first try and i will be really happy

import { InvalidEvent } from "./exceptions/index.js";
import { IEventsRegistry } from "./interfaces/eventsRegistry.interface.js";
import { IEventsFetcher } from "./interfaces/index.js";
import { IStrategyRegistry } from "./interfaces/strategyRegistry.interface.js";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import { IStrategyRegistry } from "./interfaces/strategyRegistry.interface.js";
import { IStrategyRegistry } from "./interfaces/index.js";

Comment on lines +29 to +50
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between
* three main components:
*
* 1. Events Fetcher: Retrieves blockchain events from the indexer service
* 2. Events Processor: Processes and transforms raw events into domain events
* 3. Data Loader: Persists the processed events into the database
*
* The Orchestrator implements a continuous processing loop that:
*
* 1. Fetches batches of events from the indexer and stores them in an internal queue
* 2. Processes each event from the queue:
* - For strategy events and PoolCreated from Allo contract, enhances them with strategyId
* - Forwards the event to the Events Processor which is in charge of delagating the processing of the event to the correct handler
* - Discards events for unsupported strategies or events
* 3. Loads the processed events into the database via the Data Loader
*
* The Orchestrator provides fault tolerance and performance optimization through:
* - Configurable batch sizes for event fetching
* - Delayed processing to prevent overwhelming the system
* - Error handling and logging for various failure scenarios
* - Registry tracking of supported/unsupported strategies and events
*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💎

/**
* Fill the events queue with the events from the events fetcher
*/
private async fillQueue(): Promise<void> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enqueue ? 🤣

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense xd

* TODO: Handle unhandled strategies appropriately
*/
export class Orchestrator {
private readonly eventsQueue: IQueue<ProcessorEvent<ContractName, AnyEvent>>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a tiny detail here. Do we need IQueue ? is something that implementation could be replaced in the future ? Just that, i am thinking out loud

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we won't probably but since we follow best practices, seemed nice here too. maybe we implement a super mega optimized new version of the Queue class idk

Comment on lines +20 to +27
export type CoreDependencies = Pick<
ProcessorDependencies,
"evmProvider" | "pricingProvider" | "metadataProvider"
> & {
roundRepository: IRoundRepository;
projectRepository: IProjectRepository;
applicationRepository: IApplicationRepository;
};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Natspec and .... why do we need this type? i think it adds noise to the code since we can simply use ProcessorDependencies. If there is no reason to have it lets get rid of it. :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's subtle but the difference is that Processor use the Read interface, ie. they don't know about write operations on the Repository, but on the Orchestrator we need Read and Write

(maybe was over-optimization at this point but when we implement read-only PG connections this will be useful)

private tail: number = 0;
private size: number = 0;

constructor(initialCapacity: number = 5000) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iam concerned about this capacity, should be the same as the max batch size of what we fetch on each iteration, dont know if it make sense to limit the queue capacity here, could lead in other issues

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline but main reason of this implementation is that shift method on Arrays is O(n)

@0xnigir1 0xnigir1 requested review from 0xkenj1 and jahabeebs November 1, 2024 17:18
Copy link
Collaborator

@jahabeebs jahabeebs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😁

@0xnigir1 0xnigir1 merged commit eb14792 into dev Nov 1, 2024
6 checks passed
@0xnigir1 0xnigir1 deleted the feat/events-orchestrator branch November 1, 2024 18:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants