-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: events orchestrator #21
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
huge PR, nice work!
import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; | ||
|
||
/** | ||
* The events registry saves as a checkpoint to the last processed event by the system. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it saves the last processed event as a checkpoint? or the registry itself is a checkpoint for the last processed event?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the registry itself is the checkpoint, you query the registry for getting the last processed event (however at current MVP stage, since it's in memory on shutdown you lose tracking of course)
@@ -0,0 +1,231 @@ | |||
// class should contain the logic to orchestrate the data flow Events Fetcher -> Events Processor -> Data Loader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete?
import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js"; | ||
|
||
/** | ||
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice explanation
eventsRegistry: IEventsRegistry; | ||
strategyRegistry: IStrategyRegistry; | ||
}, | ||
private fetchLimit: number = 1000, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this limit the # of events? or the # of attempted fetches?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
limit the # of retrieved events on one call (pagination purposes)
strategyRegistry: IStrategyRegistry; | ||
}, | ||
private fetchLimit: number = 1000, | ||
private fetchDelay: number = 10000, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in ms? might be good to specify somewhere for clarity
if (event.contractName === "Strategy" && "strategyId" in event) { | ||
if (!existsHandler(event.strategyId)) { | ||
//TODO: save to registry as unsupported strategy, so when the strategy is handled it will be backwards compatible and process all of the events | ||
console.log( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be warn or error probably unless this is expected and then it could be debug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is expected, on a more advance stage when we implement the processors for a new type of strategy, the idea is that on application startup before long-running the orchestrator, is to process all those pending events so we don't need to fetch all events from the beginning)
private getStrategyAddress( | ||
event: ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent>, | ||
): Address { | ||
return isAlloEvent(event) && event.eventName === "PoolCreated" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems this function is only for PoolCreated? I thought it should pull any type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are two cases we need to add the "strategyId" to the event, on every "Strategy" event or on "PoolCreated" event for the Allo protocol (yeah really specific xd)
think this as a pre-processing where we enhance specific cases of events
private requiresStrategyId( | ||
event: ProcessorEvent<ContractName, AnyEvent>, | ||
): event is ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent> { | ||
return (isAlloEvent(event) && event.eventName === "PoolCreated") || isStrategyEvent(event); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is only for PoolCreated too? or will you be adding more event types?
import { EventsFetcher } from "./eventsFetcher.js"; | ||
import { EventsProcessor } from "./eventsProcessor.js"; | ||
import { InvalidEvent } from "./exceptions/index.js"; | ||
import { IEventsRegistry } from "./interfaces/eventsRegistry.interface.js"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import { IEventsRegistry } from "./interfaces/eventsRegistry.interface.js"; | |
import { IEventsRegistry } from "./interfaces/index.js"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one day IDE will correctly import them on first try and i will be really happy
import { InvalidEvent } from "./exceptions/index.js"; | ||
import { IEventsRegistry } from "./interfaces/eventsRegistry.interface.js"; | ||
import { IEventsFetcher } from "./interfaces/index.js"; | ||
import { IStrategyRegistry } from "./interfaces/strategyRegistry.interface.js"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import { IStrategyRegistry } from "./interfaces/strategyRegistry.interface.js"; | |
import { IStrategyRegistry } from "./interfaces/index.js"; |
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between | ||
* three main components: | ||
* | ||
* 1. Events Fetcher: Retrieves blockchain events from the indexer service | ||
* 2. Events Processor: Processes and transforms raw events into domain events | ||
* 3. Data Loader: Persists the processed events into the database | ||
* | ||
* The Orchestrator implements a continuous processing loop that: | ||
* | ||
* 1. Fetches batches of events from the indexer and stores them in an internal queue | ||
* 2. Processes each event from the queue: | ||
* - For strategy events and PoolCreated from Allo contract, enhances them with strategyId | ||
* - Forwards the event to the Events Processor which is in charge of delagating the processing of the event to the correct handler | ||
* - Discards events for unsupported strategies or events | ||
* 3. Loads the processed events into the database via the Data Loader | ||
* | ||
* The Orchestrator provides fault tolerance and performance optimization through: | ||
* - Configurable batch sizes for event fetching | ||
* - Delayed processing to prevent overwhelming the system | ||
* - Error handling and logging for various failure scenarios | ||
* - Registry tracking of supported/unsupported strategies and events | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💎
/** | ||
* Fill the events queue with the events from the events fetcher | ||
*/ | ||
private async fillQueue(): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
enqueue ? 🤣
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense xd
* TODO: Handle unhandled strategies appropriately | ||
*/ | ||
export class Orchestrator { | ||
private readonly eventsQueue: IQueue<ProcessorEvent<ContractName, AnyEvent>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a tiny detail here. Do we need IQueue
? is something that implementation could be replaced in the future ? Just that, i am thinking out loud
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we won't probably but since we follow best practices, seemed nice here too. maybe we implement a super mega optimized new version of the Queue class idk
export type CoreDependencies = Pick< | ||
ProcessorDependencies, | ||
"evmProvider" | "pricingProvider" | "metadataProvider" | ||
> & { | ||
roundRepository: IRoundRepository; | ||
projectRepository: IProjectRepository; | ||
applicationRepository: IApplicationRepository; | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Natspec and .... why do we need this type? i think it adds noise to the code since we can simply use ProcessorDependencies
. If there is no reason to have it lets get rid of it. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's subtle but the difference is that Processor use the Read
interface, ie. they don't know about write operations on the Repository, but on the Orchestrator we need Read and Write
(maybe was over-optimization at this point but when we implement read-only PG connections this will be useful)
private tail: number = 0; | ||
private size: number = 0; | ||
|
||
constructor(initialCapacity: number = 5000) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iam concerned about this capacity, should be the same as the max batch size of what we fetch on each iteration, dont know if it make sense to limit the queue capacity here, could lead in other issues
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline but main reason of this implementation is that shift
method on Arrays is O(n)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😁
🤖 Linear
Closes GIT-52 GIT-54
Description
data-flow
pipelines: Orchestrator, EventsProcessor, Registries (DataLoader and EventsFetcher were already implemented)processors
packageIndexer
typing from theProcessor
typing for eventsChecklist before requesting a review