-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: bulk fetch metadata & prices for events batch #57
Changes from all commits
fd49ca0
c2a140e
189ee91
f97a4f8
50196d6
a8f1426
de2cdaf
b2202fa
9d423f9
6126ed6
deff69e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,6 +1,7 @@ | ||||||
import { isNativeError } from "util/types"; | ||||||
|
||||||
import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; | ||||||
import { TokenPrice } from "@grants-stack-indexer/pricing"; | ||||||
import { | ||||||
existsHandler, | ||||||
UnsupportedEventException, | ||||||
|
@@ -13,6 +14,7 @@ import { | |||||
AnyIndexerFetchedEvent, | ||||||
ChainId, | ||||||
ContractName, | ||||||
getToken, | ||||||
Hex, | ||||||
ILogger, | ||||||
isAlloEvent, | ||||||
|
@@ -23,6 +25,7 @@ import { | |||||
RetryStrategy, | ||||||
StrategyEvent, | ||||||
stringify, | ||||||
Token, | ||||||
} from "@grants-stack-indexer/shared"; | ||||||
|
||||||
import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js"; | ||||||
|
@@ -31,6 +34,11 @@ import { EventsProcessor } from "./eventsProcessor.js"; | |||||
import { InvalidEvent } from "./exceptions/index.js"; | ||||||
import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js"; | ||||||
|
||||||
type TokenWithTimestamps = { | ||||||
token: Token; | ||||||
timestamps: number[]; | ||||||
}; | ||||||
|
||||||
/** | ||||||
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between | ||||||
* three main components: | ||||||
|
@@ -42,6 +50,7 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from | |||||
* The Orchestrator implements a continuous processing loop that: | ||||||
* | ||||||
* 1. Fetches batches of events from the indexer and stores them in an internal queue | ||||||
* 1.5 Bulk fetches metadata and prices for the batch, improving performance by reducing the number of requests and parallelizing them | ||||||
* 2. Processes each event from the queue: | ||||||
* - For strategy events and PoolCreated from Allo contract, enhances them with strategyId | ||||||
* - Forwards the event to the Events Processor which is in charge of delagating the processing of the event to the correct handler | ||||||
|
@@ -116,7 +125,11 @@ export class Orchestrator { | |||||
while (!signal.aborted) { | ||||||
let event: ProcessorEvent<ContractName, AnyEvent> | undefined; | ||||||
try { | ||||||
if (this.eventsQueue.isEmpty()) await this.enqueueEvents(); | ||||||
if (this.eventsQueue.isEmpty()) { | ||||||
const events = await this.getNextEventsBatch(); | ||||||
await this.bulkFetchMetadataAndPricesForBatch(events); | ||||||
await this.enqueueEvents(events); | ||||||
} | ||||||
|
||||||
event = this.eventsQueue.pop(); | ||||||
|
||||||
|
@@ -197,10 +210,66 @@ export class Orchestrator { | |||||
}); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Extracts unique metadata ids from the events batch. | ||||||
* @param events - Array of indexer fetched events to process | ||||||
* @returns Array of unique metadata ids found in the events | ||||||
*/ | ||||||
private getMetadataFromEvents(events: AnyIndexerFetchedEvent[]): string[] { | ||||||
const ids = new Set<string>(); | ||||||
|
||||||
for (const event of events) { | ||||||
if ("metadata" in event.params) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we add an extra validation to check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here isn't relevant as different events have different structure, I'd leave that to the event processor |
||||||
ids.add(event.params.metadata[1]); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
is metadata[1] safe ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes is safe to use it this way |
||||||
} | ||||||
} | ||||||
|
||||||
return Array.from(ids); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are u using a set to avoid repeats ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. exactly |
||||||
} | ||||||
|
||||||
/** | ||||||
* Extracts unique tokens from the events batch. Leaves out tokens with zero amount and sorts the timestamps. | ||||||
* @param events - Array of indexer fetched events to process | ||||||
* @returns Array of unique tokens with timestamps found in the events | ||||||
*/ | ||||||
private getTokensFromEvents(events: AnyIndexerFetchedEvent[]): TokenWithTimestamps[] { | ||||||
const tokenMap = new Map<string, TokenWithTimestamps>(); | ||||||
|
||||||
for (const event of events) { | ||||||
if ( | ||||||
"token" in event.params && | ||||||
"amount" in event.params && | ||||||
BigInt(event.params.amount) > 0n | ||||||
Comment on lines
+239
to
+242
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we add runtime validation here ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry what validation are u specifically referring to? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typings runtime validations , same as metadata There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will track in Linear to see where this validation fits the best in the system |
||||||
) { | ||||||
const token = getToken(this.chainId, event.params.token); | ||||||
if (!token) continue; | ||||||
|
||||||
const existing = tokenMap.get(token.address); | ||||||
if (existing) { | ||||||
existing.timestamps.push(event.blockTimestamp); | ||||||
} else { | ||||||
tokenMap.set(token.address, { | ||||||
token, | ||||||
timestamps: [event.blockTimestamp], | ||||||
}); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
// Convert timestamps to unique sorted arrays | ||||||
return Array.from(tokenMap.values()).map(({ token, timestamps }) => ({ | ||||||
token, | ||||||
timestamps: [...new Set(timestamps)].sort((a, b) => a - b), | ||||||
})); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy. | ||||||
* In this case, the event is emitted before the PoolCreated event. We can safely ignore the error | ||||||
* if the PoolCreated event is present in the same block. | ||||||
* @param error - The error | ||||||
* @param event - The event | ||||||
* @returns True if the error should be ignored, false otherwise | ||||||
*/ | ||||||
private shouldIgnoreTimestampsUpdatedError( | ||||||
error: Error, | ||||||
|
@@ -225,9 +294,10 @@ export class Orchestrator { | |||||
} | ||||||
|
||||||
/** | ||||||
* Enqueue new events from the events fetcher using the last processed event as a starting point | ||||||
* Fetches the next events batch from the indexer | ||||||
* @returns The next events batch | ||||||
*/ | ||||||
private async enqueueEvents(): Promise<void> { | ||||||
private async getNextEventsBatch(): Promise<AnyIndexerFetchedEvent[]> { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId); | ||||||
const blockNumber = lastProcessedEvent?.blockNumber ?? 0; | ||||||
const logIndex = lastProcessedEvent?.logIndex ?? 0; | ||||||
|
@@ -240,6 +310,34 @@ export class Orchestrator { | |||||
allowPartialLastBlock: false, | ||||||
}); | ||||||
|
||||||
return events; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Clear pricing and metadata caches and bulk fetch metadata and prices for the batch | ||||||
* @param events - The events batch | ||||||
*/ | ||||||
private async bulkFetchMetadataAndPricesForBatch( | ||||||
events: AnyIndexerFetchedEvent[], | ||||||
): Promise<void> { | ||||||
// Clear caches if the provider supports it | ||||||
await this.dependencies.metadataProvider.clearCache?.(); | ||||||
await this.dependencies.pricingProvider.clearCache?.(); | ||||||
|
||||||
const metadataIds = this.getMetadataFromEvents(events); | ||||||
const tokens = this.getTokensFromEvents(events); | ||||||
|
||||||
await Promise.allSettled([ | ||||||
this.bulkFetchMetadata(metadataIds), | ||||||
this.bulkFetchTokens(tokens), | ||||||
]); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Enqueue events and updates new context of events by block number for the batch | ||||||
* @param events - The events batch | ||||||
*/ | ||||||
private async enqueueEvents(events: AnyIndexerFetchedEvent[]): Promise<void> { | ||||||
// Clear previous context | ||||||
this.eventsByBlockContext.clear(); | ||||||
for (const event of events) { | ||||||
|
@@ -252,6 +350,58 @@ export class Orchestrator { | |||||
this.eventsQueue.push(...events); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Fetch all possible metadata for the batch. | ||||||
* @param metadataIds - The metadata ids | ||||||
* @returns The metadata | ||||||
*/ | ||||||
private async bulkFetchMetadata(metadataIds: string[]): Promise<unknown[]> { | ||||||
const results = await Promise.allSettled( | ||||||
metadataIds.map((id) => | ||||||
this.retryHandler.execute(() => | ||||||
this.dependencies.metadataProvider.getMetadata<unknown>(id), | ||||||
), | ||||||
), | ||||||
); | ||||||
|
||||||
const metadata: unknown[] = []; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the future we should try to parse or validate the metadata at sone basic level to be sure we can actually use it & make sure that we aren't typing it as unknown in the whole app There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. metadata parsing (using Zod schemas) is done in the processors, is not Orchestrator responsibility. metadata has different shapes depending on the event but the Orchestrator doesn't know this |
||||||
for (const result of results) { | ||||||
if (result.status === "fulfilled" && result.value) { | ||||||
metadata.push(result.value); | ||||||
} | ||||||
} | ||||||
|
||||||
return metadata; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Fetch all tokens prices | ||||||
* @param tokens - The tokens with timestamps | ||||||
* @returns The token prices | ||||||
*/ | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. complete natspecs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise<TokenPrice[]> { | ||||||
const results = await Promise.allSettled( | ||||||
tokens.map(({ token, timestamps }) => | ||||||
this.retryHandler.execute(async () => { | ||||||
const prices = await this.dependencies.pricingProvider.getTokenPrices( | ||||||
token.priceSourceCode, | ||||||
timestamps, | ||||||
); | ||||||
return prices; | ||||||
}), | ||||||
), | ||||||
); | ||||||
|
||||||
const tokenPrices: TokenPrice[] = []; | ||||||
for (const result of results) { | ||||||
if (result.status === "fulfilled" && result.value) { | ||||||
tokenPrices.push(...result.value); | ||||||
} | ||||||
} | ||||||
|
||||||
return tokenPrices; | ||||||
} | ||||||
|
||||||
private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> { | ||||||
event = await this.enhanceStrategyId(event); | ||||||
if (this.isPoolCreated(event)) { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the exact type of the timestamps? Maybe we should define a type like NumberMs to prevent conversion confusion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could be nice, i'll analyze how much impact it has on the codebase. if it's too big, i'll update the name to
timestampsMs
to be a bit clearer and leave the typing as tech debt