Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bulk fetch metadata & prices for events batch #57

Merged
merged 11 commits into from
Jan 23, 2025
29 changes: 21 additions & 8 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { CachingPricingProvider, PricingProviderFactory } from "@grants-stack-in
import {
createKyselyDatabase,
IEventRegistryRepository,
InMemoryMetadataCache,
InMemoryPricingCache,
IStrategyProcessingCheckpointRepository,
IStrategyRegistryRepository,
KyselyApplicationPayoutRepository,
Expand Down Expand Up @@ -71,22 +73,33 @@ export class SharedDependenciesService {
env.DATABASE_SCHEMA,
);
const pricingRepository = new KyselyPricingCache(kyselyDatabase, env.DATABASE_SCHEMA);
const pricingProvider = PricingProviderFactory.create(env, {
const internalPricingProvider = PricingProviderFactory.create(env, {
logger,
});
const cachedPricingProvider = new CachingPricingProvider(
pricingProvider,
const dbCachedPricingProvider = new CachingPricingProvider(
internalPricingProvider,
pricingRepository,
logger,
);

const pricingProvider = new CachingPricingProvider(
dbCachedPricingProvider,
new InMemoryPricingCache(),
logger,
);

const metadataRepository = new KyselyMetadataCache(kyselyDatabase, env.DATABASE_SCHEMA);
const metadataProvider = new IpfsProvider(env.IPFS_GATEWAYS_URL, logger);
const cachedMetadataProvider = new CachingMetadataProvider(
metadataProvider,
const internalMetadataProvider = new IpfsProvider(env.IPFS_GATEWAYS_URL, logger);
const dbCachedMetadataProvider = new CachingMetadataProvider(
internalMetadataProvider,
metadataRepository,
logger,
);
const metadataProvider = new CachingMetadataProvider(
dbCachedMetadataProvider,
new InMemoryMetadataCache(),
logger,
);

const eventRegistryRepository = new KyselyEventRegistryRepository(
kyselyDatabase,
Expand Down Expand Up @@ -118,9 +131,9 @@ export class SharedDependenciesService {
projectRepository,
roundRepository,
applicationRepository,
pricingProvider: cachedPricingProvider,
pricingProvider,
donationRepository,
metadataProvider: cachedMetadataProvider,
metadataProvider,
applicationPayoutRepository,
transactionManager,
},
Expand Down
2 changes: 2 additions & 0 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ vi.mock("@grants-stack-indexer/repository", () => ({
KyselyTransactionManager: vi.fn(),
KyselyPricingCache: vi.fn(),
KyselyMetadataCache: vi.fn(),
InMemoryPricingCache: vi.fn(),
InMemoryMetadataCache: vi.fn(),
}));

vi.mock("@grants-stack-indexer/pricing", () => ({
Expand Down
139 changes: 134 additions & 5 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { isNativeError } from "util/types";

import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
import { TokenPrice } from "@grants-stack-indexer/pricing";
import {
existsHandler,
UnsupportedEventException,
Expand All @@ -13,6 +14,7 @@ import {
AnyIndexerFetchedEvent,
ChainId,
ContractName,
getToken,
Hex,
ILogger,
isAlloEvent,
Expand All @@ -23,6 +25,7 @@ import {
RetryStrategy,
StrategyEvent,
stringify,
Token,
} from "@grants-stack-indexer/shared";

import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js";
Expand All @@ -31,6 +34,11 @@ import { EventsProcessor } from "./eventsProcessor.js";
import { InvalidEvent } from "./exceptions/index.js";
import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js";

type TokenWithTimestamps = {
token: Token;
timestamps: number[];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the exact type of the timestamps? Maybe we should define a type like NumberMs to prevent conversion confusion

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be nice, i'll analyze how much impact it has on the codebase. if it's too big, i'll update the name to timestampsMs to be a bit clearer and leave the typing as tech debt

};

/**
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between
* three main components:
Expand Down Expand Up @@ -116,7 +124,11 @@ export class Orchestrator {
while (!signal.aborted) {
let event: ProcessorEvent<ContractName, AnyEvent> | undefined;
try {
if (this.eventsQueue.isEmpty()) await this.enqueueEvents();
if (this.eventsQueue.isEmpty()) {
const events = await this.getNextEventsBatch();
await this.bulkFetchMetadataAndPricesForBatch(events);
await this.enqueueEvents(events);
}

event = this.eventsQueue.pop();

Expand Down Expand Up @@ -197,6 +209,51 @@ export class Orchestrator {
});
}

private async getMetadataFromEvents(events: AnyIndexerFetchedEvent[]): Promise<string[]> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

natspec

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const ids = new Set<string>();

for (const event of events) {
if ("metadata" in event.params) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add an extra validation to check metadata structure

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That isn't relevant here, as different events have different structures; I'd leave that to the event processor.

ids.add(event.params.metadata[1]);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ids.add(event.params.metadata[1]);
ids.add(event.params.metadata[1]);

is metadata[1] safe ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is safe to use it this way.

}
}

return Array.from(ids);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you using a Set to avoid duplicates?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly

}

private async getTokensFromEvents(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

natspec

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

events: AnyIndexerFetchedEvent[],
): Promise<TokenWithTimestamps[]> {
const tokenMap = new Map<string, TokenWithTimestamps>();

for (const event of events) {
if (
"token" in event.params &&
"amount" in event.params &&
BigInt(event.params.amount) > 0n
Comment on lines +239 to +242
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add runtime validation here ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry what validation are u specifically referring to?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typings runtime validations , same as metadata

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will track in Linear to see where this validation fits the best in the system

) {
const token = getToken(this.chainId, event.params.token);
if (!token) continue;

const existing = tokenMap.get(token.address);
if (existing) {
existing.timestamps.push(event.blockTimestamp);
} else {
tokenMap.set(token.address, {
token,
timestamps: [event.blockTimestamp],
});
}
}
}

// Convert timestamps to unique sorted arrays
return Array.from(tokenMap.values()).map(({ token, timestamps }) => ({
token,
timestamps: [...new Set(timestamps)].sort((a, b) => a - b),
}));
}

/**
* Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy.
* In this case, the event is emitted before the PoolCreated event. We can safely ignore the error
Expand Down Expand Up @@ -224,10 +281,7 @@ export class Orchestrator {
return false;
}

/**
* Enqueue new events from the events fetcher using the last processed event as a starting point
*/
private async enqueueEvents(): Promise<void> {
private async getNextEventsBatch(): Promise<AnyIndexerFetchedEvent[]> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId);
const blockNumber = lastProcessedEvent?.blockNumber ?? 0;
const logIndex = lastProcessedEvent?.logIndex ?? 0;
Expand All @@ -240,6 +294,33 @@ export class Orchestrator {
allowPartialLastBlock: false,
});

return events;
}

/**
* Clear caches and fetch metadata and prices for the batch
*/
private async bulkFetchMetadataAndPricesForBatch(
events: AnyIndexerFetchedEvent[],
): Promise<void> {
// Clear caches
if (this.dependencies.metadataProvider.clearCache) {
await this.dependencies.metadataProvider.clearCache();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Token prices will live longer with a TTL, right? I was wondering whether a memory overflow could happen in this scenario; let's talk about this offline.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the issue as discussed offline.

}

const metadataIds = await this.getMetadataFromEvents(events);
const tokens = await this.getTokensFromEvents(events);

await Promise.allSettled([
this.bulkFetchMetadata(metadataIds),
this.bulkFetchTokens(tokens),
]);
}

Copy link

@coderabbitai coderabbitai bot Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle potential errors from bulkFetchMetadata and bulkFetchTokens.

While using Promise.allSettled prevents unhandled promise rejections, you should inspect the results to handle any failures appropriately, such as logging errors or retrying failed operations.

Apply this diff to handle errors from bulkFetchMetadata and bulkFetchTokens:

-        await Promise.allSettled([
+        const results = await Promise.allSettled([
             this.bulkFetchMetadata(metadataIds),
             this.bulkFetchTokens(tokens),
         ]);
+
+        results.forEach((result, index) => {
+            if (result.status === 'rejected') {
+                const operation = index === 0 ? 'bulkFetchMetadata' : 'bulkFetchTokens';
+                this.logger.error(`Error in ${operation}: ${result.reason}`, {
+                    className: Orchestrator.name,
+                    chainId: this.chainId,
+                });
+            }
+        });

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're fine without handling errors here, this is a performance optimization but in case some fetch fails, we would try one more time to fetch the data on the processor

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@0xnigir1 Understood, thanks for explaining the error handling strategy. I'll keep this in mind for future reviews.


✏️ Learnings added
Learnt from: 0xnigir1
PR: defi-wonderland/grants-stack-indexer-v2#57
File: packages/data-flow/src/orchestrator.ts:300-319
Timestamp: 2025-01-22T01:29:58.582Z
Learning: In the Orchestrator's bulkFetchMetadataAndPricesForBatch method, error handling is not required as it's a performance optimization. If the bulk fetch fails, the data will be fetched again during processing in the processor.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

/**
* Enqueue events and update the context for the batch
*/
private async enqueueEvents(events: AnyIndexerFetchedEvent[]): Promise<void> {
// Clear previous context
this.eventsByBlockContext.clear();
for (const event of events) {
Expand All @@ -252,6 +333,54 @@ export class Orchestrator {
this.eventsQueue.push(...events);
}

/**
* Fetch all possible metadata for the batch
*/
private async bulkFetchMetadata(metadataIds: string[]): Promise<unknown[]> {
const results = await Promise.allSettled(
metadataIds.map((id) =>
this.retryHandler.execute(() =>
this.dependencies.metadataProvider.getMetadata<unknown>(id),
),
),
);

const metadata: unknown[] = [];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future we should try to parse or validate the metadata at some basic level, to be sure we can actually use it and to make sure that we aren't typing it as unknown across the whole app.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metadata parsing (using Zod schemas) is done in the processors; it is not the Orchestrator's responsibility. Metadata has different shapes depending on the event, but the Orchestrator doesn't know this.

for (const result of results) {
if (result.status === "fulfilled" && result.value) {
metadata.push(result.value);
}
}

return metadata;
}

/**
* Fetch all possible prices for the batch
*/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complete natspecs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/**
 * Requests prices for every token of the batch, issuing one lookup per token that
 * covers all of its timestamps. Each lookup goes through the retry handler; lookups
 * that end up rejected, or that resolve to a falsy value, are left out of the result.
 *
 * @param tokens - tokens paired with the timestamps to price them at
 * @returns the prices from all successful lookups, flattened into a single array
 */
private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise<TokenPrice[]> {
    // Start every lookup up front so they run concurrently.
    const lookups = tokens.map((entry) =>
        this.retryHandler.execute(async () =>
            this.dependencies.pricingProvider.getTokenPrices(
                entry.token.priceSourceCode,
                entry.timestamps,
            ),
        ),
    );
    const outcomes = await Promise.allSettled(lookups);

    const collected: TokenPrice[] = [];
    for (const outcome of outcomes) {
        // Skip failed lookups and lookups that produced nothing.
        if (outcome.status !== "fulfilled" || !outcome.value) continue;
        collected.push(...outcome.value);
    }

    return collected;
}

private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
event = await this.enhanceStrategyId(event);
if (this.isPoolCreated(event)) {
Expand Down
5 changes: 5 additions & 0 deletions packages/data-flow/src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { IMetadataProvider } from "@grants-stack-indexer/metadata";
import { IPricingProvider } from "@grants-stack-indexer/pricing";
import { ProcessorDependencies } from "@grants-stack-indexer/processors";
import {
IApplicationPayoutRepository,
Expand All @@ -7,6 +9,7 @@ import {
IRoundRepository,
ITransactionManager,
} from "@grants-stack-indexer/repository";
import { ICacheable } from "@grants-stack-indexer/shared";

/**
* The core dependencies for the data flow
Expand All @@ -19,6 +22,8 @@ export type CoreDependencies = Pick<
ProcessorDependencies,
"evmProvider" | "pricingProvider" | "metadataProvider"
> & {
pricingProvider: IPricingProvider & Partial<ICacheable>;
metadataProvider: IMetadataProvider & Partial<ICacheable>;
roundRepository: IRoundRepository;
projectRepository: IProjectRepository;
applicationRepository: IApplicationRepository;
Expand Down
Loading