-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: bulk fetch metadata & prices for events batch #57
Changes from 8 commits
fd49ca0
c2a140e
189ee91
f97a4f8
50196d6
a8f1426
de2cdaf
b2202fa
9d423f9
6126ed6
deff69e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,6 +1,7 @@ | ||||||
import { isNativeError } from "util/types"; | ||||||
|
||||||
import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; | ||||||
import { TokenPrice } from "@grants-stack-indexer/pricing"; | ||||||
import { | ||||||
existsHandler, | ||||||
UnsupportedEventException, | ||||||
|
@@ -13,6 +14,7 @@ import { | |||||
AnyIndexerFetchedEvent, | ||||||
ChainId, | ||||||
ContractName, | ||||||
getToken, | ||||||
Hex, | ||||||
ILogger, | ||||||
isAlloEvent, | ||||||
|
@@ -23,6 +25,7 @@ import { | |||||
RetryStrategy, | ||||||
StrategyEvent, | ||||||
stringify, | ||||||
Token, | ||||||
} from "@grants-stack-indexer/shared"; | ||||||
|
||||||
import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js"; | ||||||
|
@@ -31,6 +34,11 @@ import { EventsProcessor } from "./eventsProcessor.js"; | |||||
import { InvalidEvent } from "./exceptions/index.js"; | ||||||
import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js"; | ||||||
|
||||||
type TokenWithTimestamps = { | ||||||
token: Token; | ||||||
timestamps: number[]; | ||||||
}; | ||||||
|
||||||
/** | ||||||
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between | ||||||
* three main components: | ||||||
|
@@ -116,7 +124,11 @@ export class Orchestrator { | |||||
while (!signal.aborted) { | ||||||
let event: ProcessorEvent<ContractName, AnyEvent> | undefined; | ||||||
try { | ||||||
if (this.eventsQueue.isEmpty()) await this.enqueueEvents(); | ||||||
if (this.eventsQueue.isEmpty()) { | ||||||
const events = await this.getNextEventsBatch(); | ||||||
await this.bulkFetchMetadataAndPricesForBatch(events); | ||||||
await this.enqueueEvents(events); | ||||||
} | ||||||
|
||||||
event = this.eventsQueue.pop(); | ||||||
|
||||||
|
@@ -197,6 +209,51 @@ export class Orchestrator { | |||||
}); | ||||||
} | ||||||
|
||||||
private async getMetadataFromEvents(events: AnyIndexerFetchedEvent[]): Promise<string[]> { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. natspec There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
const ids = new Set<string>(); | ||||||
|
||||||
for (const event of events) { | ||||||
if ("metadata" in event.params) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we add an extra validation to check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here isn't relevant as different events have different structure, I'd leave that to the event processor |
||||||
ids.add(event.params.metadata[1]); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
is metadata[1] safe ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes is safe to use it this way |
||||||
} | ||||||
} | ||||||
|
||||||
return Array.from(ids); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are u using a set to avoid repeats ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. exactly |
||||||
} | ||||||
|
||||||
private async getTokensFromEvents( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. natspec There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
events: AnyIndexerFetchedEvent[], | ||||||
): Promise<TokenWithTimestamps[]> { | ||||||
const tokenMap = new Map<string, TokenWithTimestamps>(); | ||||||
|
||||||
for (const event of events) { | ||||||
if ( | ||||||
"token" in event.params && | ||||||
"amount" in event.params && | ||||||
BigInt(event.params.amount) > 0n | ||||||
Comment on lines
+239
to
+242
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we add runtime validation here ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry what validation are u specifically referring to? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typings runtime validations , same as metadata There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will track in Linear to see where this validation fits the best in the system |
||||||
) { | ||||||
const token = getToken(this.chainId, event.params.token); | ||||||
if (!token) continue; | ||||||
|
||||||
const existing = tokenMap.get(token.address); | ||||||
if (existing) { | ||||||
existing.timestamps.push(event.blockTimestamp); | ||||||
} else { | ||||||
tokenMap.set(token.address, { | ||||||
token, | ||||||
timestamps: [event.blockTimestamp], | ||||||
}); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
// Convert timestamps to unique sorted arrays | ||||||
return Array.from(tokenMap.values()).map(({ token, timestamps }) => ({ | ||||||
token, | ||||||
timestamps: [...new Set(timestamps)].sort((a, b) => a - b), | ||||||
})); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy. | ||||||
* In this case, the event is emitted before the PoolCreated event. We can safely ignore the error | ||||||
|
@@ -224,10 +281,7 @@ export class Orchestrator { | |||||
return false; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Enqueue new events from the events fetcher using the last processed event as a starting point | ||||||
*/ | ||||||
private async enqueueEvents(): Promise<void> { | ||||||
private async getNextEventsBatch(): Promise<AnyIndexerFetchedEvent[]> { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId); | ||||||
const blockNumber = lastProcessedEvent?.blockNumber ?? 0; | ||||||
const logIndex = lastProcessedEvent?.logIndex ?? 0; | ||||||
|
@@ -240,6 +294,33 @@ export class Orchestrator { | |||||
allowPartialLastBlock: false, | ||||||
}); | ||||||
|
||||||
return events; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Clear caches and fetch metadata and prices for the batch | ||||||
*/ | ||||||
private async bulkFetchMetadataAndPricesForBatch( | ||||||
events: AnyIndexerFetchedEvent[], | ||||||
): Promise<void> { | ||||||
// Clear caches | ||||||
if (this.dependencies.metadataProvider.clearCache) { | ||||||
await this.dependencies.metadataProvider.clearCache(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. token prices will live longer with a ttl , right ? i was wondering if a memory overflow could happen in this scenario, lets talk about this offline There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed the issue as per discussed offline |
||||||
} | ||||||
|
||||||
const metadataIds = await this.getMetadataFromEvents(events); | ||||||
const tokens = await this.getTokensFromEvents(events); | ||||||
|
||||||
await Promise.allSettled([ | ||||||
this.bulkFetchMetadata(metadataIds), | ||||||
this.bulkFetchTokens(tokens), | ||||||
]); | ||||||
} | ||||||
|
||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Handle potential errors from While using Apply this diff to handle errors from - await Promise.allSettled([
+ const results = await Promise.allSettled([
this.bulkFetchMetadata(metadataIds),
this.bulkFetchTokens(tokens),
]);
+
+ results.forEach((result, index) => {
+ if (result.status === 'rejected') {
+ const operation = index === 0 ? 'bulkFetchMetadata' : 'bulkFetchTokens';
+ this.logger.error(`Error in ${operation}: ${result.reason}`, {
+ className: Orchestrator.name,
+ chainId: this.chainId,
+ });
+ }
+ });
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we're fine without handling errors here, this is a performance optimization but in case some fetch fails, we would try one more time to fetch the data on the processor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @0xnigir1 Understood, thanks for explaining the error handling strategy. I'll keep this in mind for future reviews. ✏️ Learnings added
|
||||||
/** | ||||||
* Enqueue events and updates new context for the batch | ||||||
*/ | ||||||
private async enqueueEvents(events: AnyIndexerFetchedEvent[]): Promise<void> { | ||||||
// Clear previous context | ||||||
this.eventsByBlockContext.clear(); | ||||||
for (const event of events) { | ||||||
|
@@ -252,6 +333,54 @@ export class Orchestrator { | |||||
this.eventsQueue.push(...events); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Fetch all possible metadata for the batch | ||||||
*/ | ||||||
private async bulkFetchMetadata(metadataIds: string[]): Promise<unknown[]> { | ||||||
const results = await Promise.allSettled( | ||||||
metadataIds.map((id) => | ||||||
this.retryHandler.execute(() => | ||||||
this.dependencies.metadataProvider.getMetadata<unknown>(id), | ||||||
), | ||||||
), | ||||||
); | ||||||
|
||||||
const metadata: unknown[] = []; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the future we should try to parse or validate the metadata at sone basic level to be sure we can actually use it & make sure that we aren't typing it as unknown in the whole app There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. metadata parsing (using Zod schemas) is done in the processors, is not Orchestrator responsibility. metadata has different shapes depending on the event but the Orchestrator doesn't know this |
||||||
for (const result of results) { | ||||||
if (result.status === "fulfilled" && result.value) { | ||||||
metadata.push(result.value); | ||||||
} | ||||||
} | ||||||
|
||||||
return metadata; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Fetch all possible prices for the batch | ||||||
*/ | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. complete natspecs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise<TokenPrice[]> { | ||||||
const results = await Promise.allSettled( | ||||||
tokens.map(({ token, timestamps }) => | ||||||
this.retryHandler.execute(async () => { | ||||||
const prices = await this.dependencies.pricingProvider.getTokenPrices( | ||||||
token.priceSourceCode, | ||||||
timestamps, | ||||||
); | ||||||
return prices; | ||||||
}), | ||||||
), | ||||||
); | ||||||
|
||||||
const tokenPrices: TokenPrice[] = []; | ||||||
for (const result of results) { | ||||||
if (result.status === "fulfilled" && result.value) { | ||||||
tokenPrices.push(...result.value); | ||||||
} | ||||||
} | ||||||
|
||||||
return tokenPrices; | ||||||
} | ||||||
|
||||||
private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> { | ||||||
event = await this.enhanceStrategyId(event); | ||||||
if (this.isPoolCreated(event)) { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the exact type of the timestamps? Maybe we should define a type like NumberMs to prevent conversion confusion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could be nice, i'll analyze how much impact it has on the codebase. if it's too big, i'll update the name to
timestampsMs
to be a bit clearer and leave the typing as tech debt