diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 16b6c21..407c20a 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -1,6 +1,7 @@ import { isNativeError } from "util/types"; import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; +import { TokenPrice } from "@grants-stack-indexer/pricing"; import { existsHandler, UnsupportedEventException, @@ -13,6 +14,7 @@ import { AnyIndexerFetchedEvent, ChainId, ContractName, + getToken, Hex, ILogger, isAlloEvent, @@ -23,6 +25,7 @@ import { RetryStrategy, StrategyEvent, stringify, + Token, } from "@grants-stack-indexer/shared"; import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js"; @@ -31,6 +34,11 @@ import { EventsProcessor } from "./eventsProcessor.js"; import { InvalidEvent } from "./exceptions/index.js"; import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js"; +type TokenWithTimestamps = { + token: Token; + timestamps: number[]; +}; + /** * The Orchestrator is the central coordinator of the data flow system, managing the interaction between * three main components: @@ -42,6 +50,7 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from * The Orchestrator implements a continuous processing loop that: * * 1. Fetches batches of events from the indexer and stores them in an internal queue + * 1.5 Bulk fetches metadata and prices for the batch, improving performance by reducing the number of requests and parallelizing them * 2. Processes each event from the queue: * - For strategy events and PoolCreated from Allo contract, enhances them with strategyId * - Forwards the event to the Events Processor which is in charge of delagating the processing of the event to the correct handler @@ -116,7 +125,11 @@ export class Orchestrator { while (!signal.aborted) { let event: ProcessorEvent | undefined; try { - if (this.eventsQueue.isEmpty()) await this.enqueueEvents(); + if (this.eventsQueue.isEmpty()) { + const events = await this.getNextEventsBatch(); + await this.bulkFetchMetadataAndPricesForBatch(events); + await this.enqueueEvents(events); + } event = this.eventsQueue.pop(); @@ -197,10 +210,66 @@ export class Orchestrator { }); } + /** + * Extracts unique metadata ids from the events batch. + * @param events - Array of indexer fetched events to process + * @returns Array of unique metadata ids found in the events + */ + private getMetadataFromEvents(events: AnyIndexerFetchedEvent[]): string[] { + const ids = new Set(); + + for (const event of events) { + if ("metadata" in event.params) { + ids.add(event.params.metadata[1]); + } + } + + return Array.from(ids); + } + + /** + * Extracts unique tokens from the events batch. Leaves out tokens with zero amount and sorts the timestamps. + * @param events - Array of indexer fetched events to process + * @returns Array of unique tokens with timestamps found in the events + */ + private getTokensFromEvents(events: AnyIndexerFetchedEvent[]): TokenWithTimestamps[] { + const tokenMap = new Map(); + + for (const event of events) { + if ( + "token" in event.params && + "amount" in event.params && + BigInt(event.params.amount) > 0n + ) { + const token = getToken(this.chainId, event.params.token); + if (!token) continue; + + const existing = tokenMap.get(token.address); + if (existing) { + existing.timestamps.push(event.blockTimestamp); + } else { + tokenMap.set(token.address, { + token, + timestamps: [event.blockTimestamp], + }); + } + } + } + + // Convert timestamps to unique sorted arrays + return Array.from(tokenMap.values()).map(({ token, timestamps }) => ({ + token, + timestamps: [...new Set(timestamps)].sort((a, b) => a - b), + })); + } + /** * Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy. * In this case, the event is emitted before the PoolCreated event. We can safely ignore the error * if the PoolCreated event is present in the same block. + * @param error - The error + * @param event - The event + * @returns True if the error should be ignored, false otherwise */ private shouldIgnoreTimestampsUpdatedError( error: Error, @@ -225,9 +294,10 @@ export class Orchestrator { } /** - * Enqueue new events from the events fetcher using the last processed event as a starting point + * Fetches the next events batch from the indexer + * @returns The next events batch */ - private async enqueueEvents(): Promise { + private async getNextEventsBatch(): Promise { const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId); const blockNumber = lastProcessedEvent?.blockNumber ?? 0; const logIndex = lastProcessedEvent?.logIndex ?? 0; @@ -240,6 +310,34 @@ export class Orchestrator { allowPartialLastBlock: false, }); + return events; + } + + /** + * Clear pricing and metadata caches and bulk fetch metadata and prices for the batch + * @param events - The events batch + */ + private async bulkFetchMetadataAndPricesForBatch( + events: AnyIndexerFetchedEvent[], + ): Promise { + // Clear caches if the provider supports it + await this.dependencies.metadataProvider.clearCache?.(); + await this.dependencies.pricingProvider.clearCache?.(); + + const metadataIds = this.getMetadataFromEvents(events); + const tokens = this.getTokensFromEvents(events); + + await Promise.allSettled([ + this.bulkFetchMetadata(metadataIds), + this.bulkFetchTokens(tokens), + ]); + } + + /** + * Enqueue events and updates new context of events by block number for the batch + * @param events - The events batch + */ + private async enqueueEvents(events: AnyIndexerFetchedEvent[]): Promise { // Clear previous context this.eventsByBlockContext.clear(); for (const event of events) { @@ -252,6 +350,58 @@ export class Orchestrator { this.eventsQueue.push(...events); } + /** + * Fetch all possible metadata for the batch. + * @param metadataIds - The metadata ids + * @returns The metadata + */ + private async bulkFetchMetadata(metadataIds: string[]): Promise { + const results = await Promise.allSettled( + metadataIds.map((id) => + this.retryHandler.execute(() => + this.dependencies.metadataProvider.getMetadata(id), + ), + ), + ); + + const metadata: unknown[] = []; + for (const result of results) { + if (result.status === "fulfilled" && result.value) { + metadata.push(result.value); + } + } + + return metadata; + } + + /** + * Fetch all tokens prices + * @param tokens - The tokens with timestamps + * @returns The token prices + */ + private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise { + const results = await Promise.allSettled( + tokens.map(({ token, timestamps }) => + this.retryHandler.execute(async () => { + const prices = await this.dependencies.pricingProvider.getTokenPrices( + token.priceSourceCode, + timestamps, + ); + return prices; + }), + ), + ); + + const tokenPrices: TokenPrice[] = []; + for (const result of results) { + if (result.status === "fulfilled" && result.value) { + tokenPrices.push(...result.value); + } + } + + return tokenPrices; + } + private async handleEvent(event: ProcessorEvent): Promise { event = await this.enhanceStrategyId(event); if (this.isPoolCreated(event)) { diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index 5989b59..1c48f22 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -1,8 +1,10 @@ -import { Address } from "viem"; +import { Address, zeroAddress } from "viem"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { EvmProvider } from "@grants-stack-indexer/chain-providers"; import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; +import { IMetadataProvider } from "@grants-stack-indexer/metadata"; +import { IPricingProvider } from "@grants-stack-indexer/pricing"; import { Changeset, IApplicationPayoutRepository, @@ -16,16 +18,20 @@ import { import { RoundNotFoundForId } from "@grants-stack-indexer/repository/dist/src/internal.js"; import { AlloEvent, + AnyIndexerFetchedEvent, ChainId, ContractName, ContractToEventName, EventParams, ExponentialBackoff, Hex, + ICacheable, ILogger, ProcessorEvent, RateLimitError, StrategyEvent, + Token, + TokenCode, } from "@grants-stack-indexer/shared"; import { @@ -58,6 +64,8 @@ describe("Orchestrator", { sequential: true }, () => { let mockIndexerClient: IIndexerClient; let mockEventsRegistry: IEventsRegistry; let mockStrategyRegistry: IStrategyRegistry; + let mockPricingProvider: IPricingProvider & ICacheable; + let mockMetadataProvider: IMetadataProvider & ICacheable; let mockEvmProvider: EvmProvider; let abortController: AbortController; let runPromise: Promise | undefined; @@ -93,6 +101,17 @@ describe("Orchestrator", { sequential: true }, () => { readContract: vi.fn(), } as unknown as EvmProvider; + mockPricingProvider = { + getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), + clearCache: vi.fn(), + }; + + mockMetadataProvider = { + getMetadata: vi.fn(), + clearCache: vi.fn(), + }; + const dependencies: CoreDependencies = { evmProvider: mockEvmProvider, transactionManager: {} as unknown as ITransactionManager, @@ -101,12 +120,8 @@ describe("Orchestrator", { sequential: true }, () => { applicationRepository: {} as unknown as IApplicationRepository, donationRepository: {} as unknown as IDonationRepository, applicationPayoutRepository: {} as unknown as IApplicationPayoutRepository, - pricingProvider: { - getTokenPrice: vi.fn(), - }, - metadataProvider: { - getMetadata: vi.fn(), - }, + pricingProvider: mockPricingProvider, + metadataProvider: mockMetadataProvider, }; abortController = new AbortController(); @@ -755,6 +770,152 @@ describe("Orchestrator", { sequential: true }, () => { expect(logger.error).not.toHaveBeenCalled(); }); }); + + describe("getMetadataFromEvents", () => { + it("extracts unique metadata IDs from events", async () => { + const events = [ + { + params: { metadata: [1n, "id1"] }, + }, + { + params: { metadata: [1n, "id1"] }, // Duplicate + }, + { + params: { metadata: [1n, "id2"] }, + }, + { + params: { recipientAddress: "0x123" }, // No metadata + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + const result = await orchestrator["getMetadataFromEvents"](events); + expect(result).toEqual(["id1", "id2"]); + }); + }); + + describe("getTokensFromEvents", () => { + const mockToken: Token = { + address: zeroAddress, + decimals: 18, + code: "ETH" as TokenCode, + priceSourceCode: "ETH" as TokenCode, + }; + + it("collects unique timestamps for each token", async () => { + const events = [ + { + params: { + token: zeroAddress, + amount: "1000000000000000000", // 1 ETH + }, + blockTimestamp: 1000, + }, + { + params: { + token: zeroAddress, + amount: "1000000000000000000", + }, + blockTimestamp: 1000, // Duplicate timestamp + }, + { + params: { + token: zeroAddress, + amount: "1000000000000000000", + }, + blockTimestamp: 2000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + const result = await orchestrator["getTokensFromEvents"](events); + + expect(result).toEqual([ + { + token: mockToken, + timestamps: [1000, 2000], + }, + ]); + }); + + it("ignores events with zero amounts", async () => { + const events = [ + { + params: { + token: "0x123", + amount: "0", + }, + blockTimestamp: 1000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + const result = await orchestrator["getTokensFromEvents"](events); + expect(result).toEqual([]); + }); + + it("ignores events with invalid tokens", async () => { + const events = [ + { + params: { + token: "0xInvalid", + amount: "1000000000000000000", + }, + blockTimestamp: 1000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + const result = await orchestrator["getTokensFromEvents"](events); + expect(result).toEqual([]); + }); + }); + + describe("bulkFetchMetadataAndPricesForBatch", () => { + it("clears cache and fetches metadata and prices in parallel", async () => { + const events = [ + { + params: { + metadata: ["type", "id1"], + token: zeroAddress, + amount: "1000000000000000000", + }, + blockTimestamp: 1000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + vi.spyOn(mockPricingProvider, "getTokenPrices").mockResolvedValue([ + { timestampMs: 1000, priceUsd: 1500 }, + ]); + + vi.spyOn(mockMetadataProvider, "getMetadata").mockResolvedValue({ name: "Test" }); + + await orchestrator["bulkFetchMetadataAndPricesForBatch"](events); + + expect(mockMetadataProvider.clearCache).toHaveBeenCalled(); + expect(mockMetadataProvider.getMetadata).toHaveBeenCalledWith("id1"); + expect(mockPricingProvider.getTokenPrices).toHaveBeenCalledWith("ETH", [1000]); + }); + + it("continues processing even if one fetch fails", async () => { + const events = [ + { + params: { + metadata: ["type", "id1"], + token: "0x123", + amount: "1000000000000000000", + }, + blockTimestamp: 1000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + vi.spyOn(mockMetadataProvider, "getMetadata").mockRejectedValue( + new Error("Fetch failed"), + ); + vi.spyOn(mockPricingProvider, "getTokenPrices").mockResolvedValue([]); + + // Should not throw + await expect( + orchestrator["bulkFetchMetadataAndPricesForBatch"](events), + ).resolves.not.toThrow(); + }); + }); }); // Helper function to create mock events diff --git a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts index e9353ef..97ae098 100644 --- a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts +++ b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts @@ -144,6 +144,7 @@ describe("RetroactiveProcessor", () => { transactionManager: {} as ITransactionManager, pricingProvider: { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), }, metadataProvider: { getMetadata: vi.fn(), diff --git a/packages/metadata/src/providers/ipfs.provider.ts b/packages/metadata/src/providers/ipfs.provider.ts index 3399faa..ae4bdf0 100644 --- a/packages/metadata/src/providers/ipfs.provider.ts +++ b/packages/metadata/src/providers/ipfs.provider.ts @@ -8,6 +8,7 @@ import { EmptyGatewaysUrlsException, InvalidContentException, isValidCid } from export class IpfsProvider implements IMetadataProvider { private readonly axiosInstance: AxiosInstance; + private currentGatewayIndex: number = 0; constructor( private readonly gateways: string[], @@ -30,7 +31,9 @@ export class IpfsProvider implements IMetadataProvider { return undefined; } - for (const gateway of this.gateways) { + const startIndex = this.currentGatewayIndex; + for (let i = 0; i < this.gateways.length; i++) { + const gateway = this.gateways[(startIndex + i) % this.gateways.length]!; const url = `${gateway}/ipfs/${ipfsCid}`; try { //TODO: retry policy for each gateway diff --git a/packages/pricing/src/constants/index.ts b/packages/pricing/src/constants/index.ts new file mode 100644 index 0000000..188876d --- /dev/null +++ b/packages/pricing/src/constants/index.ts @@ -0,0 +1,3 @@ +// This is the minimum granularity we can get data with on Enterprise plans of Coingecko +// Refer to https://support.coingecko.com/hc/en-us/articles/4538747001881-What-granularity-do-you-support-for-historical-data +export const MIN_GRANULARITY_MS = 300_000; // 5 minutes diff --git a/packages/pricing/src/interfaces/pricing.interface.ts b/packages/pricing/src/interfaces/pricing.interface.ts index dca4dad..cfe738a 100644 --- a/packages/pricing/src/interfaces/pricing.interface.ts +++ b/packages/pricing/src/interfaces/pricing.interface.ts @@ -25,4 +25,13 @@ export interface IPricingProvider { startTimestampMs: number, endTimestampMs?: number, ): Promise; + + /** + * Retrieves all the prices of a token for specific timestamps. + * @param tokenCode - The code of the token. + * @param timestamps - Array of timestamps for which to retrieve prices. + * @returns A promise that resolves to the prices of the token for each requested timestamp. + * The returned prices may not have the exact timestamps requested. Depends on the implementation. + */ + getTokenPrices(tokenCode: TokenCode, timestamps: number[]): Promise; } diff --git a/packages/pricing/src/internal.ts b/packages/pricing/src/internal.ts index e1d4daa..7f6985a 100644 --- a/packages/pricing/src/internal.ts +++ b/packages/pricing/src/internal.ts @@ -1,4 +1,5 @@ export * from "./types/index.js"; +export * from "./constants/index.js"; export * from "./interfaces/index.js"; export * from "./providers/index.js"; export * from "./exceptions/index.js"; diff --git a/packages/pricing/src/providers/cachingProxy.provider.ts b/packages/pricing/src/providers/cachingProxy.provider.ts index 7cf4700..c1adba0 100644 --- a/packages/pricing/src/providers/cachingProxy.provider.ts +++ b/packages/pricing/src/providers/cachingProxy.provider.ts @@ -3,6 +3,11 @@ import { ICacheable, ILogger, TokenCode } from "@grants-stack-indexer/shared"; import { IPricingProvider, TokenPrice } from "../internal.js"; +type CacheResult = { + timestampMs: number; + price: TokenPrice | undefined; +}; + /** * A pricing provider that caches token price lookups from the underlying provider. * When a price is requested, it first checks the cache. If found, returns the cached price. @@ -46,20 +51,21 @@ export class CachingPricingProvider implements IPricingProvider, ICacheable { ); if (price) { - try { - await this.cache.set( + // we don't await this, because it's not critical + this.cache + .set( { tokenCode, timestampMs: startTimestampMs, }, price, - ); - } catch (error) { - this.logger.debug( - `Failed to cache price for token ${tokenCode} at ${startTimestampMs}`, - { error }, - ); - } + ) + .catch((error) => { + this.logger.debug( + `Failed to cache price for token ${tokenCode} at ${startTimestampMs}`, + { error }, + ); + }); } return price; @@ -75,4 +81,190 @@ export class CachingPricingProvider implements IPricingProvider, ICacheable { }); } } + + /* @inheritdoc */ + /** + * Note: it caches the closest prices to the requested timestamps. + * Uses binary search to find the closest price for each requested timestamp. + */ + async getTokenPrices(tokenCode: TokenCode, timestamps: number[]): Promise { + if (timestamps.length === 0) return []; + + const cachedPrices = await this.getCachedPrices(tokenCode, timestamps); + const timestampsToFetch = this.getTimestampsToFetch(timestamps, cachedPrices); + + if (timestampsToFetch.length === 0) { + return cachedPrices + .filter( + (result): result is PromiseFulfilledResult => + result.status === "fulfilled" && !!result.value.price, + ) + .map((result) => result.value.price) + .filter((price): price is TokenPrice => !!price); + } + + const fetchedPrices = await this.provider.getTokenPrices(tokenCode, timestampsToFetch); + const sortedFetchedPrices = [...fetchedPrices].sort( + (a, b) => a.timestampMs - b.timestampMs, + ); + + const closestPrices = this.getClosestPricesWithCache( + tokenCode, + timestampsToFetch, + sortedFetchedPrices, + ); + + const priceMap = this.buildPriceMap(cachedPrices, closestPrices); + + return timestamps + .map((timestampMs) => priceMap.get(timestampMs)) + .filter((price): price is TokenPrice => !!price); + } + + /** + * Fetches cached prices for the given token and timestamps. + * @param tokenCode - The token code + * @param timestamps - The timestamps + * @returns {PromiseSettledResult[]} - The cached prices + */ + private async getCachedPrices( + tokenCode: TokenCode, + timestamps: number[], + ): Promise[]> { + return Promise.allSettled( + timestamps.map(async (timestampMs) => { + try { + return { + timestampMs, + price: await this.cache.get({ tokenCode, timestampMs }), + }; + } catch (error) { + this.logger.debug( + `Failed to get cached price for token ${tokenCode} at ${timestampMs}`, + { error }, + ); + return { timestampMs, price: undefined }; + } + }), + ); + } + + /** + * Gets the timestamps that need to be fetched from the provider. + * @param timestamps - The timestamps + * @param cachedPrices - The cached prices PromiseSettledResult + * @returns The timestamps that need to be fetched + */ + private getTimestampsToFetch( + timestamps: number[], + cachedPrices: PromiseSettledResult[], + ): number[] { + return timestamps.filter((_, index) => { + const result = cachedPrices[index]; + if (!result || result.status === "rejected") return true; + return !result.value.price; + }); + } + + /** + * Gets the closest price from the fetched prices. Updates the cache accordingly. + * @param tokenCode - The token code + * @param timestampsToFetch - The timestamps that need to be fetched + * @param sortedFetchedPrices - The sorted fetched prices + * @returns The closest prices + */ + private getClosestPricesWithCache( + tokenCode: TokenCode, + timestampsToFetch: number[], + sortedFetchedPrices: TokenPrice[], + ): TokenPrice[] { + return timestampsToFetch + .map((timestampMs) => { + const closestPrice = this.findClosestPrice(sortedFetchedPrices, timestampMs); + if (!closestPrice) return null; + + const price = { + timestampMs, + priceUsd: closestPrice.priceUsd, + }; + + // Fire and forget cache operation + this.cache.set({ tokenCode, timestampMs }, price).catch((error) => { + this.logger.debug( + `Failed to cache price for token ${tokenCode} at ${timestampMs}`, + { + error, + }, + ); + }); + + return price; + }) + .filter((price): price is TokenPrice => price !== null); + } + + /** + * Builds a price map from cached and fetched prices. + * @param cachedPrices - The cached prices + * @param closestPrices - The fetched prices + * @returns The price map with all prices + */ + private buildPriceMap( + cachedPrices: PromiseSettledResult[], + closestPrices: TokenPrice[], + ): Map { + const priceMap = new Map(); + + // Add cached prices + cachedPrices.forEach((result) => { + if (result.status === "fulfilled" && result.value.price) { + priceMap.set(result.value.timestampMs, result.value.price); + } + }); + + // Add closest prices + closestPrices.forEach((price) => { + priceMap.set(price.timestampMs, price); + }); + + return priceMap; + } + + /** + * Find the closest price using binary search + * @param prices - Array of prices sorted by timestamp + * @param targetTimestamp - The timestamp to find closest match for + * @returns The closest matching price or null if no prices available + */ + private findClosestPrice(prices: TokenPrice[], targetTimestamp: number): TokenPrice | null { + if (prices.length === 0) { + return null; + } + + // Handle edge cases + if (targetTimestamp <= prices[0]!.timestampMs) return prices[0]!; + if (targetTimestamp >= prices[prices.length - 1]!.timestampMs) + return prices[prices.length - 1]!; + + // Binary search + let left = 0; + let right = prices.length - 1; + + while (left + 1 < right) { + const mid = Math.floor((left + right) / 2); + + if (prices[mid]!.timestampMs === targetTimestamp) { + return prices[mid]!; + } + + if (prices[mid]!.timestampMs < targetTimestamp) { + left = mid; + } else { + right = mid; + } + } + + // Return the floor value (largest timestamp <= target) + return prices[left]!; + } } diff --git a/packages/pricing/src/providers/coingecko.provider.ts b/packages/pricing/src/providers/coingecko.provider.ts index e1d5296..7a3efcb 100644 --- a/packages/pricing/src/providers/coingecko.provider.ts +++ b/packages/pricing/src/providers/coingecko.provider.ts @@ -13,6 +13,7 @@ import { IPricingProvider } from "../interfaces/index.js"; import { CoingeckoPriceChartData, CoingeckoTokenId, + MIN_GRANULARITY_MS, TokenPrice, UnknownPricingException, UnsupportedToken, @@ -126,7 +127,7 @@ export class CoingeckoProvider implements IPricingProvider { }; } catch (error: unknown) { if (isAxiosError(error)) { - this.handleAxiosError(error, path); + this.handleAxiosError(error, path, "getTokenPrice"); } const errorMessage = @@ -143,10 +144,58 @@ export class CoingeckoProvider implements IPricingProvider { } } - private handleAxiosError(error: AxiosError, path: string): void { + /* @inheritdoc */ + async getTokenPrices(tokenCode: TokenCode, timestamps: number[]): Promise { + const tokenId = TokenMapping[tokenCode]; + if (!tokenId) { + throw new UnsupportedToken(tokenCode, { + className: CoingeckoProvider.name, + methodName: "getTokenPrices", + }); + } + + if (timestamps.length === 0) { + return []; + } + + const effectiveMin = Math.min(...timestamps); + let effectiveMax = Math.max(...timestamps); + + if (effectiveMax - effectiveMin < MIN_GRANULARITY_MS) { + effectiveMax = effectiveMin + MIN_GRANULARITY_MS; + } + + const path = `/coins/${tokenId}/market_chart/range?vs_currency=usd&from=${effectiveMin}&to=${effectiveMax}&precision=full&interval=5m`; + + try { + const { data } = await this.axios.get(path); + return data.prices.map(([timestampMs, priceUsd]) => ({ + timestampMs, + priceUsd, + })); + } catch (error: unknown) { + if (isAxiosError(error)) { + this.handleAxiosError(error, path, "getTokenPrices"); + } + + const errorMessage = + `Unknown Coingecko API error: failed to fetch token price ` + + stringify(error, Object.getOwnPropertyNames(error)); + + throw new UnknownPricingException(errorMessage, { + className: CoingeckoProvider.name, + methodName: "getTokenPrices", + additionalData: { + path, + }, + }); + } + } + + private handleAxiosError(error: AxiosError, path: string, methodName: string): void { const errorContext = { className: CoingeckoProvider.name, - methodName: "getTokenPrice", + methodName, additionalData: { path, }, diff --git a/packages/pricing/src/providers/dummy.provider.ts b/packages/pricing/src/providers/dummy.provider.ts index df6b045..94ee6d3 100644 --- a/packages/pricing/src/providers/dummy.provider.ts +++ b/packages/pricing/src/providers/dummy.provider.ts @@ -21,4 +21,12 @@ export class DummyPricingProvider implements IPricingProvider { timestampMs: startTimestampMs, }); } + + /* @inheritdoc */ + async getTokenPrices(_tokenCode: TokenCode, timestamps: number[]): Promise { + return timestamps.map((timestampMs) => ({ + priceUsd: this.dummyPrice, + timestampMs, + })); + } } diff --git a/packages/pricing/test/providers/cachingProxy.provider.spec.ts b/packages/pricing/test/providers/cachingProxy.provider.spec.ts index 1e80581..313bf8f 100644 --- a/packages/pricing/test/providers/cachingProxy.provider.spec.ts +++ b/packages/pricing/test/providers/cachingProxy.provider.spec.ts @@ -1,7 +1,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { ICache, PriceCacheKey } from "@grants-stack-indexer/repository"; -import { ILogger, TokenCode } from "@grants-stack-indexer/shared"; +import { ICacheable, ILogger, TokenCode } from "@grants-stack-indexer/shared"; import { IPricingProvider, TokenPrice } from "../../src/internal.js"; import { CachingPricingProvider } from "../../src/providers/cachingProxy.provider.js"; @@ -9,11 +9,14 @@ import { CachingPricingProvider } from "../../src/providers/cachingProxy.provide describe("CachingPricingProvider", () => { const mockProvider = { getTokenPrice: vi.fn(), - } as unknown as IPricingProvider; + getTokenPrices: vi.fn(), + clearCache: vi.fn(), + } as unknown as IPricingProvider & ICacheable; const mockCache = { get: vi.fn(), - set: vi.fn(), + set: vi.fn().mockImplementation(() => Promise.resolve()), + clearCache: vi.fn(), } as unknown as ICache; const mockLogger = { diff --git a/packages/pricing/test/providers/coingecko.provider.spec.ts b/packages/pricing/test/providers/coingecko.provider.spec.ts index f72e595..38bb757 100644 --- a/packages/pricing/test/providers/coingecko.provider.spec.ts +++ b/packages/pricing/test/providers/coingecko.provider.spec.ts @@ -179,4 +179,45 @@ describe("CoingeckoProvider", () => { ).rejects.toThrow(NetworkError); }); }); + + describe("getTokenPrices", () => { + it("handles empty timestamps array", async () => { + const result = await provider.getTokenPrices("ETH" as TokenCode, []); + expect(result).toEqual([]); + }); + + it("fetches prices within minimum granularity", async () => { + const timestamps = [1000, 1100]; // Less than MIN_GRANULARITY_MS apart + mock.get.mockResolvedValue({ + data: { + prices: [ + [1000, 1500], + [1100, 1600], + ], + }, + }); + + await provider.getTokenPrices("ETH" as TokenCode, timestamps); + expect(mock.get).toHaveBeenCalledWith(expect.stringContaining(`&interval=5m`)); + }); + + it("throws UnsupportedToken for unknown token", async () => { + await expect(provider.getTokenPrices("UNKNOWN" as TokenCode, [1000])).rejects.toThrow( + UnsupportedToken, + ); + }); + + it("handles rate limiting errors", async () => { + mock.get.mockRejectedValueOnce({ + status: 429, + data: "Rate limit exceeded", + isAxiosError: true, + response: { headers: { "retry-after": "60" } }, + }); + + await expect(provider.getTokenPrices("ETH" as TokenCode, [1000])).rejects.toThrow( + RateLimitError, + ); + }); + }); }); diff --git a/packages/pricing/test/providers/dummy.provider.spec.ts b/packages/pricing/test/providers/dummy.provider.spec.ts index 160361b..7c77dfd 100644 --- a/packages/pricing/test/providers/dummy.provider.spec.ts +++ b/packages/pricing/test/providers/dummy.provider.spec.ts @@ -15,4 +15,25 @@ describe("DummyPricingProvider", () => { timestampMs: 11111111, }); }); + + describe("getTokenPrices", () => { + it("returns dummy prices for all timestamps", async () => { + const provider = new DummyPricingProvider(1); + const timestamps = [1000, 2000, 3000]; + + const result = await provider.getTokenPrices("ETH" as TokenCode, timestamps); + expect(result).toEqual( + timestamps.map((ts) => ({ + timestampMs: ts, + priceUsd: 1, + })), + ); + }); + + it("handles empty timestamps array", async () => { + const provider = new DummyPricingProvider(); + const result = await provider.getTokenPrices("ETH" as TokenCode, []); + expect(result).toEqual([]); + }); + }); }); diff --git a/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts b/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts index 67c23e0..8d53490 100644 --- a/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts +++ b/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts @@ -56,6 +56,7 @@ describe("PoolCreatedHandler", () => { } as unknown as EvmProvider; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), }; mockMetadataProvider = { getMetadata: vi.fn(), diff --git a/packages/processors/test/allo/handlers/poolFunded.handler.spec.ts b/packages/processors/test/allo/handlers/poolFunded.handler.spec.ts index 7588472..98d5153 100644 --- a/packages/processors/test/allo/handlers/poolFunded.handler.spec.ts +++ b/packages/processors/test/allo/handlers/poolFunded.handler.spec.ts @@ -56,6 +56,7 @@ describe("PoolFundedHandler", () => { } as unknown as IRoundReadRepository; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), }; mockLogger = { error: vi.fn(), diff --git a/packages/processors/test/allo/handlers/poolMetadataUpdated.handler.spec.ts b/packages/processors/test/allo/handlers/poolMetadataUpdated.handler.spec.ts index b33441e..cb1aee2 100644 --- a/packages/processors/test/allo/handlers/poolMetadataUpdated.handler.spec.ts +++ b/packages/processors/test/allo/handlers/poolMetadataUpdated.handler.spec.ts @@ -68,6 +68,7 @@ describe("PoolMetadataUpdatedHandler", () => { } as unknown as ILogger; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), }; }); diff --git a/packages/processors/test/strategy/directAllocation/handlers/directAllocated.handler.spec.ts b/packages/processors/test/strategy/directAllocation/handlers/directAllocated.handler.spec.ts index a177e51..60354e7 100644 --- a/packages/processors/test/strategy/directAllocation/handlers/directAllocated.handler.spec.ts +++ b/packages/processors/test/strategy/directAllocation/handlers/directAllocated.handler.spec.ts @@ -41,6 +41,7 @@ describe("DirectAllocatedHandler", () => { } as unknown as IRoundReadRepository; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; mockProjectRepository = { getProjectByIdOrThrow: vi.fn(), diff --git a/packages/processors/test/strategy/directGrantsLite/directGrantsLite.handler.spec.ts b/packages/processors/test/strategy/directGrantsLite/directGrantsLite.handler.spec.ts index 2af0acd..c26807e 100644 --- a/packages/processors/test/strategy/directGrantsLite/directGrantsLite.handler.spec.ts +++ b/packages/processors/test/strategy/directGrantsLite/directGrantsLite.handler.spec.ts @@ -84,6 +84,7 @@ describe("DirectGrantsLiteStrategyHandler", () => { } as unknown as EvmProvider; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; mockApplicationRepository = {} as IApplicationReadRepository; dependencies = { diff --git a/packages/processors/test/strategy/directGrantsLite/handlers/allocated.handler.spec.ts b/packages/processors/test/strategy/directGrantsLite/handlers/allocated.handler.spec.ts index b174bc4..1039f66 100644 --- a/packages/processors/test/strategy/directGrantsLite/handlers/allocated.handler.spec.ts +++ b/packages/processors/test/strategy/directGrantsLite/handlers/allocated.handler.spec.ts @@ -41,6 +41,7 @@ describe("DGLiteAllocatedHandler", () => { } as unknown as IApplicationRepository; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; }); diff --git a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts index 171de57..520c734 100644 --- a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts +++ b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts @@ -112,6 +112,7 @@ describe("DVMDDirectTransferHandler", () => { } as unknown as EvmProvider; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; mockApplicationRepository = {} as IApplicationReadRepository; handler = new DVMDDirectTransferStrategyHandler(mockChainId, { diff --git a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.spec.ts b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.spec.ts index f7b0017..cd7a004 100644 --- a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.spec.ts +++ b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.spec.ts @@ -46,6 +46,7 @@ describe("DVMDAllocatedHandler", () => { } as unknown as IApplicationRepository; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; }); diff --git a/packages/shared/src/retry/retry.ts b/packages/shared/src/retry/retry.ts index 7134e2b..e627c06 100644 --- a/packages/shared/src/retry/retry.ts +++ b/packages/shared/src/retry/retry.ts @@ -28,11 +28,12 @@ export class RetryHandler { async execute( operation: () => Promise, params: { abortSignal?: AbortSignal } = {}, - ): Promise { + ): Promise { + let result: T | undefined; let attemptCount = 0; while (true && !params.abortSignal?.aborted) { try { - await operation(); + result = await operation(); break; } catch (error) { if (!(error instanceof RetriableError)) { @@ -61,5 +62,7 @@ export class RetryHandler { if (params.abortSignal?.aborted) { throw new Error("Operation aborted"); } + + return result; } } diff --git a/packages/shared/test/retry/retry.spec.ts b/packages/shared/test/retry/retry.spec.ts index 7e4ae6f..b68a306 100644 --- a/packages/shared/test/retry/retry.spec.ts +++ b/packages/shared/test/retry/retry.spec.ts @@ -29,10 +29,21 @@ describe("RetryHandler", () => { const handler = new RetryHandler(new ExponentialBackoff(), mockLogger); const operation = vi.fn().mockResolvedValue("success"); - await handler.execute(operation); + const result = await handler.execute(operation); expect(operation).toHaveBeenCalledTimes(1); expect(mockLogger.debug).not.toHaveBeenCalled(); + expect(result).toBe("success"); + }); + + it("handles undefined result from operation", async () => { + const handler = new RetryHandler(new ExponentialBackoff(), mockLogger); + const operation = vi.fn().mockResolvedValue(undefined); + + const result = await handler.execute(operation); + + expect(result).toBeUndefined(); + expect(operation).toHaveBeenCalledTimes(1); }); it("retries on RetriableError and succeeds", async () => {