feat: bulk fetch metadata & prices for events batch #57

Merged
merged 11 commits into from
Jan 23, 2025
156 changes: 153 additions & 3 deletions packages/data-flow/src/orchestrator.ts
@@ -1,6 +1,7 @@
import { isNativeError } from "util/types";

import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
import { TokenPrice } from "@grants-stack-indexer/pricing";
import {
existsHandler,
UnsupportedEventException,
@@ -13,6 +14,7 @@ import {
AnyIndexerFetchedEvent,
ChainId,
ContractName,
getToken,
Hex,
ILogger,
isAlloEvent,
@@ -23,6 +25,7 @@ import {
RetryStrategy,
StrategyEvent,
stringify,
Token,
} from "@grants-stack-indexer/shared";

import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js";
@@ -31,6 +34,11 @@ import { EventsProcessor } from "./eventsProcessor.js";
import { InvalidEvent } from "./exceptions/index.js";
import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js";

type TokenWithTimestamps = {
token: Token;
timestamps: number[];
Collaborator
What is the exact type of the timestamps? Maybe we should define a type like NumberMs to prevent conversion confusion

Collaborator Author
could be nice, i'll analyze how much impact it has on the codebase. if it's too big, i'll update the name to timestampsMs to be a bit clearer and leave the typing as tech debt
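
For reference, one way the `NumberMs` idea could look is a branded number type. This is only a sketch under assumptions: `TimestampMs`, `toTimestampMs`, `secondsToMs`, and `TokenWithTimestampsMs` are hypothetical names that do not exist in the codebase, and the thread does not say whether `blockTimestamp` is in seconds or milliseconds.

```typescript
// Hypothetical branded type: a number at runtime, but a plain number
// cannot be assigned to it without going through a helper.
type TimestampMs = number & { readonly __brand: "TimestampMs" };

// Assumed helper: tags a value the caller already knows is in milliseconds.
function toTimestampMs(value: number): TimestampMs {
    if (!isFinite(value) || value < 0) {
        throw new Error("Invalid timestamp: " + value);
    }
    return value as TimestampMs;
}

// Assumed helper: the single place where seconds are converted to milliseconds.
function secondsToMs(seconds: number): TimestampMs {
    return toTimestampMs(seconds * 1000);
}

// Illustrative variant of the PR's type, with the unit visible in the signature.
type TokenWithTimestampsMs = {
    tokenAddress: string;
    timestamps: TimestampMs[];
};

const example: TokenWithTimestampsMs = {
    tokenAddress: "0x0000000000000000000000000000000000000001",
    timestamps: [secondsToMs(1700000000)],
};
```

With a type like this, passing a raw `number` where a `TimestampMs` is expected fails at compile time, which is the conversion confusion the comment is worried about. The cost is that every producer of timestamps has to go through one of the helpers.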

};

/**
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between
* three main components:
@@ -42,6 +50,7 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from
* The Orchestrator implements a continuous processing loop that:
*
* 1. Fetches batches of events from the indexer and stores them in an internal queue
* 1.5 Bulk fetches metadata and prices for the batch, improving performance by reducing the number of requests and parallelizing them
* 2. Processes each event from the queue:
* - For strategy events and PoolCreated from Allo contract, enhances them with strategyId
* - Forwards the event to the Events Processor, which is in charge of delegating the processing of the event to the correct handler
@@ -116,7 +125,11 @@ export class Orchestrator {
while (!signal.aborted) {
let event: ProcessorEvent<ContractName, AnyEvent> | undefined;
try {
if (this.eventsQueue.isEmpty()) await this.enqueueEvents();
if (this.eventsQueue.isEmpty()) {
const events = await this.getNextEventsBatch();
await this.bulkFetchMetadataAndPricesForBatch(events);
await this.enqueueEvents(events);
}

event = this.eventsQueue.pop();

@@ -197,10 +210,66 @@ export class Orchestrator {
});
}

/**
* Extracts unique metadata ids from the events batch.
* @param events - Array of indexer fetched events to process
* @returns Array of unique metadata ids found in the events
*/
private getMetadataFromEvents(events: AnyIndexerFetchedEvent[]): string[] {
const ids = new Set<string>();

for (const event of events) {
if ("metadata" in event.params) {
Collaborator
should we add an extra validation to check metadata structure

Collaborator Author
here isn't relevant as different events have different structure, I'd leave that to the event processor

ids.add(event.params.metadata[1]);
Collaborator
is metadata[1] safe ?

Collaborator Author
yes, it's safe to use it this way

}
}

return Array.from(ids);
Collaborator
are u using a set to avoid repeats ?

Collaborator Author
exactly
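
A minimal illustration of the deduplication discussed here, together with the per-token variant the PR applies to timestamps. `SimpleEvent`, `uniqueMetadataIds`, and `timestampsByToken` are stand-ins invented for this sketch; the real event types come from `@grants-stack-indexer/shared` and have a different shape.

```typescript
// Simplified, hypothetical event shape used only for this sketch.
type SimpleEvent = {
    blockTimestamp: number;
    params: { metadataId?: string; tokenAddress?: string };
};

// A Set keeps each id once, no matter how many events reference it.
function uniqueMetadataIds(events: SimpleEvent[]): string[] {
    const ids = new Set<string>();
    events.forEach((event) => {
        if (event.params.metadataId !== undefined) ids.add(event.params.metadataId);
    });
    return Array.from(ids);
}

// Groups timestamps per token, then dedupes and sorts them in ascending order.
function timestampsByToken(events: SimpleEvent[]): Map<string, number[]> {
    const grouped = new Map<string, number[]>();
    events.forEach((event) => {
        const address = event.params.tokenAddress;
        if (address === undefined) return;
        const existing = grouped.get(address) || [];
        existing.push(event.blockTimestamp);
        grouped.set(address, existing);
    });

    const result = new Map<string, number[]>();
    grouped.forEach((timestamps, address) => {
        result.set(address, Array.from(new Set(timestamps)).sort((a, b) => a - b));
    });
    return result;
}

const sample: SimpleEvent[] = [
    { blockTimestamp: 30, params: { metadataId: "cid-a", tokenAddress: "0xabc" } },
    { blockTimestamp: 10, params: { metadataId: "cid-a", tokenAddress: "0xabc" } },
    { blockTimestamp: 30, params: { metadataId: "cid-b", tokenAddress: "0xabc" } },
    { blockTimestamp: 20, params: {} },
];
```

For `sample`, the metadata helper yields two ids and the token helper yields one entry with two timestamps, so each distinct id or (token, timestamp) pair is requested at most once per batch.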

}

/**
* Extracts unique tokens from the events batch. Leaves out tokens with zero amount and sorts the timestamps.
* @param events - Array of indexer fetched events to process
* @returns Array of unique tokens with timestamps found in the events
*/
private getTokensFromEvents(events: AnyIndexerFetchedEvent[]): TokenWithTimestamps[] {
const tokenMap = new Map<string, TokenWithTimestamps>();

for (const event of events) {
if (
"token" in event.params &&
"amount" in event.params &&
BigInt(event.params.amount) > 0n
Comment on lines +239 to +242
Collaborator
should we add runtime validation here ?

Collaborator Author
sorry what validation are u specifically referring to?

Collaborator
typings runtime validations , same as metadata

Collaborator Author
will track in Linear to see where this validation fits the best in the system
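
To make the request concrete, a runtime check of this kind could be written as a type guard. This is a sketch, not a proposal for where the validation should live. It assumes `token` is a 20-byte hex address and `amount` is a decimal string, neither of which is confirmed in this thread, and `TokenAmountParams`, `hasTokenAndAmount`, and `hasPositiveAmount` are hypothetical names.

```typescript
// Hypothetical shape: only the two fields the condition relies on.
type TokenAmountParams = { token: string; amount: string };

// Type guard: verifies at runtime what the `in` checks only assume.
function hasTokenAndAmount(params: unknown): params is TokenAmountParams {
    if (typeof params !== "object" || params === null) return false;
    const candidate = params as Record<string, unknown>;
    return (
        typeof candidate.token === "string" &&
        /^0x[0-9a-fA-F]{40}$/.test(candidate.token) &&
        typeof candidate.amount === "string" &&
        /^\d+$/.test(candidate.amount)
    );
}

// True only for well-formed params whose amount is not zero.
function hasPositiveAmount(params: unknown): boolean {
    return hasTokenAndAmount(params) && !/^0+$/.test(params.amount);
}
```

Because the guard only accepts digit strings, a later `BigInt(amount)` conversion cannot throw on malformed input.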

) {
const token = getToken(this.chainId, event.params.token);
if (!token) continue;

const existing = tokenMap.get(token.address);
if (existing) {
existing.timestamps.push(event.blockTimestamp);
} else {
tokenMap.set(token.address, {
token,
timestamps: [event.blockTimestamp],
});
}
}
}

// Convert timestamps to unique sorted arrays
return Array.from(tokenMap.values()).map(({ token, timestamps }) => ({
token,
timestamps: [...new Set(timestamps)].sort((a, b) => a - b),
}));
}

/**
* Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy.
* In this case, the event is emitted before the PoolCreated event. We can safely ignore the error
* if the PoolCreated event is present in the same block.
* @param error - The error
* @param event - The event
* @returns True if the error should be ignored, false otherwise
*/
private shouldIgnoreTimestampsUpdatedError(
error: Error,
@@ -225,9 +294,10 @@ export class Orchestrator {
}

/**
* Enqueue new events from the events fetcher using the last processed event as a starting point
* Fetches the next events batch from the indexer
* @returns The next events batch
*/
private async enqueueEvents(): Promise<void> {
private async getNextEventsBatch(): Promise<AnyIndexerFetchedEvent[]> {
Collaborator
Docs

const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId);
const blockNumber = lastProcessedEvent?.blockNumber ?? 0;
const logIndex = lastProcessedEvent?.logIndex ?? 0;
@@ -240,6 +310,34 @@ export class Orchestrator {
allowPartialLastBlock: false,
});

return events;
}

/**
* Clear pricing and metadata caches and bulk fetch metadata and prices for the batch
* @param events - The events batch
*/
private async bulkFetchMetadataAndPricesForBatch(
events: AnyIndexerFetchedEvent[],
): Promise<void> {
// Clear caches if the provider supports it
await this.dependencies.metadataProvider.clearCache?.();
await this.dependencies.pricingProvider.clearCache?.();

const metadataIds = this.getMetadataFromEvents(events);
const tokens = this.getTokensFromEvents(events);

await Promise.allSettled([
this.bulkFetchMetadata(metadataIds),
this.bulkFetchTokens(tokens),
]);
}

/**
* Enqueues the events and rebuilds the events-by-block-number context for the batch
* @param events - The events batch
*/
private async enqueueEvents(events: AnyIndexerFetchedEvent[]): Promise<void> {
// Clear previous context
this.eventsByBlockContext.clear();
for (const event of events) {
@@ -252,6 +350,58 @@ export class Orchestrator {
this.eventsQueue.push(...events);
}

/**
* Fetch all possible metadata for the batch.
* @param metadataIds - The metadata ids
* @returns The metadata
*/
private async bulkFetchMetadata(metadataIds: string[]): Promise<unknown[]> {
const results = await Promise.allSettled(
metadataIds.map((id) =>
this.retryHandler.execute(() =>
this.dependencies.metadataProvider.getMetadata<unknown>(id),
),
),
);

const metadata: unknown[] = [];
Collaborator
In the future we should try to parse or validate the metadata at some basic level to be sure we can actually use it & make sure that we aren't typing it as unknown in the whole app

Collaborator Author
metadata parsing (using Zod schemas) is done in the processors, it's not the Orchestrator's responsibility. metadata has different shapes depending on the event, but the Orchestrator doesn't know this
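
The split described here can be sketched roughly as follows: the Orchestrator side only stores values as `unknown`, and each processor narrows the value to the shape it expects. Everything named in the block (`MetadataCache`, `ProfileMetadata`, `parseProfileMetadata`) is hypothetical, and the hand-written parser merely stands in for the Zod schemas mentioned above so the sketch has no dependencies.

```typescript
// Orchestrator side (assumed): values are kept as `unknown`, keyed by metadata id.
class MetadataCache {
    private readonly entries = new Map<string, unknown>();

    set(id: string, value: unknown): void {
        this.entries.set(id, value);
    }

    get(id: string): unknown {
        return this.entries.get(id);
    }
}

// Processor side (assumed): a handler knows the shape it needs and narrows to it.
type ProfileMetadata = { name: string };

// Stand-in for a schema parse: returns undefined instead of throwing on a mismatch.
function parseProfileMetadata(raw: unknown): ProfileMetadata | undefined {
    if (typeof raw !== "object" || raw === null) return undefined;
    const name = (raw as Record<string, unknown>).name;
    return typeof name === "string" ? { name } : undefined;
}

const cache = new MetadataCache();
cache.set("cid-1", { name: "Example profile" });
cache.set("cid-2", "not an object");

const parsed = parseProfileMetadata(cache.get("cid-1"));
const rejected = parseProfileMetadata(cache.get("cid-2"));
```

Under this arrangement `unknown` stays confined to the fetch and cache layer, and typed values only appear after a processor has validated them.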

for (const result of results) {
if (result.status === "fulfilled" && result.value) {
metadata.push(result.value);
}
}

return metadata;
}

/**
* Fetches the prices of all tokens at the given timestamps
* @param tokens - The tokens with timestamps
* @returns The token prices
*/
Collaborator
complete natspecs

private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise<TokenPrice[]> {
const results = await Promise.allSettled(
tokens.map(({ token, timestamps }) =>
this.retryHandler.execute(async () => {
const prices = await this.dependencies.pricingProvider.getTokenPrices(
token.priceSourceCode,
timestamps,
);
return prices;
}),
),
);

const tokenPrices: TokenPrice[] = [];
for (const result of results) {
if (result.status === "fulfilled" && result.value) {
tokenPrices.push(...result.value);
}
}

return tokenPrices;
}

private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
event = await this.enhanceStrategyId(event);
if (this.isPoolCreated(event)) {