From fd49ca09b854b685e4bcf9aeb41b9b266d772a33 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Thu, 16 Jan 2025 15:48:07 -0300 Subject: [PATCH 1/9] feat: in memory cache for pricing and metadata --- .../services/sharedDependencies.service.ts | 21 ++++++++-- .../unit/sharedDependencies.service.spec.ts | 2 + packages/metadata/src/external.ts | 2 +- .../src/interfaces/metadata.interface.ts | 9 +++++ .../src/providers/cachingProxy.provider.ts | 15 ++++++- packages/pricing/src/external.ts | 2 +- .../src/interfaces/pricing.interface.ts | 9 +++++ .../src/providers/cachingProxy.provider.ts | 15 ++++++- .../processors/src/types/processor.types.ts | 8 ++-- packages/repository/src/external.ts | 7 +++- .../src/interfaces/cache.interface.ts | 6 +++ packages/repository/src/internal.ts | 1 + .../kysely/metadata.repository.ts | 5 +++ .../repositories/kysely/prices.repository.ts | 5 +++ .../src/repositories/memory/index.ts | 2 + .../memory/metadata.repository.ts | 24 +++++++++++ .../repositories/memory/prices.repository.ts | 40 +++++++++++++++++++ 17 files changed, 158 insertions(+), 15 deletions(-) create mode 100644 packages/repository/src/repositories/memory/index.ts create mode 100644 packages/repository/src/repositories/memory/metadata.repository.ts create mode 100644 packages/repository/src/repositories/memory/prices.repository.ts diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 0c493e1..768b929 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -5,6 +5,8 @@ import { CachingPricingProvider, PricingProviderFactory } from "@grants-stack-in import { createKyselyDatabase, IEventRegistryRepository, + InMemoryMetadataCache, + InMemoryPricingCache, IStrategyProcessingCheckpointRepository, IStrategyRegistryRepository, KyselyApplicationPayoutRepository, @@ -74,19 +76,30 @@ export class SharedDependenciesService { const pricingProvider = PricingProviderFactory.create(env, { logger, }); - const cachedPricingProvider = new CachingPricingProvider( + const dbCachedPricingProvider = new CachingPricingProvider( pricingProvider, pricingRepository, logger, ); + const inMemoryCachedPricingProvider = new CachingPricingProvider( + dbCachedPricingProvider, + new InMemoryPricingCache(), + logger, + ); + const metadataRepository = new KyselyMetadataCache(kyselyDatabase, env.DATABASE_SCHEMA); const metadataProvider = new IpfsProvider(env.IPFS_GATEWAYS_URL, logger); - const cachedMetadataProvider = new CachingMetadataProvider( + const dbCachedMetadataProvider = new CachingMetadataProvider( metadataProvider, metadataRepository, logger, ); + const inMemoryCachedMetadataProvider = new CachingMetadataProvider( + dbCachedMetadataProvider, + new InMemoryMetadataCache(), + logger, + ); const eventRegistryRepository = new KyselyEventRegistryRepository( kyselyDatabase, @@ -118,9 +131,9 @@ export class SharedDependenciesService { projectRepository, roundRepository, applicationRepository, - pricingProvider: cachedPricingProvider, + pricingProvider: inMemoryCachedPricingProvider, donationRepository, - metadataProvider: cachedMetadataProvider, + metadataProvider: inMemoryCachedMetadataProvider, applicationPayoutRepository, transactionManager, }, diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index c832ce9..85f1bd4 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -47,6 +47,8 @@ vi.mock("@grants-stack-indexer/repository", () => ({ KyselyTransactionManager: vi.fn(), KyselyPricingCache: vi.fn(), KyselyMetadataCache: vi.fn(), + InMemoryPricingCache: vi.fn(), + InMemoryMetadataCache: vi.fn(), })); vi.mock("@grants-stack-indexer/pricing", () => ({ diff --git a/packages/metadata/src/external.ts b/packages/metadata/src/external.ts index 6a7bccb..5cf43fa 100644 --- a/packages/metadata/src/external.ts +++ b/packages/metadata/src/external.ts @@ -6,4 +6,4 @@ export { InvalidContentException, } from "./internal.js"; -export type { IMetadataProvider } from "./internal.js"; +export type { IMetadataProvider, ICacheableMetadataProvider } from "./internal.js"; diff --git a/packages/metadata/src/interfaces/metadata.interface.ts b/packages/metadata/src/interfaces/metadata.interface.ts index b4552a9..9f8511c 100644 --- a/packages/metadata/src/interfaces/metadata.interface.ts +++ b/packages/metadata/src/interfaces/metadata.interface.ts @@ -13,3 +13,12 @@ export interface IMetadataProvider { */ getMetadata(ipfsCid: string, validateContent?: z.ZodSchema): Promise; } + +export interface ICacheableMetadataProvider extends IMetadataProvider { + /** + * Clear all cached metadata entries. + * This is only implemented by providers that support caching. + * @throws If there is an error clearing the cache + */ + clearCache?(): Promise; +} diff --git a/packages/metadata/src/providers/cachingProxy.provider.ts b/packages/metadata/src/providers/cachingProxy.provider.ts index 0824103..18be434 100644 --- a/packages/metadata/src/providers/cachingProxy.provider.ts +++ b/packages/metadata/src/providers/cachingProxy.provider.ts @@ -3,7 +3,7 @@ import { z } from "zod"; import { ICache } from "@grants-stack-indexer/repository"; import { ILogger } from "@grants-stack-indexer/shared"; -import { IMetadataProvider } from "../internal.js"; +import { ICacheableMetadataProvider, IMetadataProvider } from "../internal.js"; /** * A metadata provider that caches metadata lookups from the underlying provider. @@ -11,7 +11,7 @@ import { IMetadataProvider } from "../internal.js"; * If not found in cache, fetches from the underlying provider and caches the result before returning. * Cache failures (both reads and writes) are logged but do not prevent the provider from functioning. */ -export class CachingMetadataProvider implements IMetadataProvider { +export class CachingMetadataProvider implements ICacheableMetadataProvider { constructor( private readonly provider: IMetadataProvider, private readonly cache: ICache, @@ -50,4 +50,15 @@ export class CachingMetadataProvider implements IMetadataProvider { return metadata; } + + /** @inheritdoc */ + async clearCache(): Promise { + try { + await this.cache.clearAll(); + } catch (error) { + this.logger.debug(`Failed to clear metadata cache`, { + error, + }); + } + } } diff --git a/packages/pricing/src/external.ts b/packages/pricing/src/external.ts index f1ee939..cb078b0 100644 --- a/packages/pricing/src/external.ts +++ b/packages/pricing/src/external.ts @@ -1,4 +1,4 @@ -export type { TokenPrice, IPricingProvider } from "./internal.js"; +export type { TokenPrice, IPricingProvider, ICacheablePricingProvider } from "./internal.js"; export { CoingeckoProvider, DummyPricingProvider, CachingPricingProvider } from "./internal.js"; diff --git a/packages/pricing/src/interfaces/pricing.interface.ts b/packages/pricing/src/interfaces/pricing.interface.ts index dca4dad..0622563 100644 --- a/packages/pricing/src/interfaces/pricing.interface.ts +++ b/packages/pricing/src/interfaces/pricing.interface.ts @@ -26,3 +26,12 @@ export interface IPricingProvider { endTimestampMs?: number, ): Promise; } + +export interface ICacheablePricingProvider extends IPricingProvider { + /** + * Clear all cached prices entries. + * This is only implemented by providers that support caching. + * @throws If there is an error clearing the cache + */ + clearCache?(): Promise; +} diff --git a/packages/pricing/src/providers/cachingProxy.provider.ts b/packages/pricing/src/providers/cachingProxy.provider.ts index 6aa4234..cf900ee 100644 --- a/packages/pricing/src/providers/cachingProxy.provider.ts +++ b/packages/pricing/src/providers/cachingProxy.provider.ts @@ -1,7 +1,7 @@ import { ICache, PriceCacheKey } from "@grants-stack-indexer/repository"; import { ILogger, TokenCode } from "@grants-stack-indexer/shared"; -import { IPricingProvider, TokenPrice } from "../internal.js"; +import { ICacheablePricingProvider, IPricingProvider, TokenPrice } from "../internal.js"; /** * A pricing provider that caches token price lookups from the underlying provider. @@ -9,7 +9,7 @@ import { IPricingProvider, TokenPrice } from "../internal.js"; * If not found in cache, fetches from the underlying provider and caches the result before returning. * Cache failures (both reads and writes) are logged but do not prevent the provider from functioning. */ -export class CachingPricingProvider implements IPricingProvider { +export class CachingPricingProvider implements ICacheablePricingProvider { constructor( private readonly provider: IPricingProvider, private readonly cache: ICache, @@ -64,4 +64,15 @@ export class CachingPricingProvider implements IPricingProvider { return price; } + + /** @inheritdoc */ + async clearCache(): Promise { + try { + await this.cache.clearAll(); + } catch (error) { + this.logger.debug(`Failed to clear metadata cache`, { + error, + }); + } + } } diff --git a/packages/processors/src/types/processor.types.ts b/packages/processors/src/types/processor.types.ts index a1deeee..d0725fe 100644 --- a/packages/processors/src/types/processor.types.ts +++ b/packages/processors/src/types/processor.types.ts @@ -1,6 +1,6 @@ import type { EvmProvider } from "@grants-stack-indexer/chain-providers"; -import type { IMetadataProvider } from "@grants-stack-indexer/metadata"; -import type { IPricingProvider } from "@grants-stack-indexer/pricing"; +import type { ICacheableMetadataProvider } from "@grants-stack-indexer/metadata"; +import type { ICacheablePricingProvider } from "@grants-stack-indexer/pricing"; import type { IApplicationReadRepository, IProjectReadRepository, @@ -10,8 +10,8 @@ import { ILogger } from "@grants-stack-indexer/shared"; export type ProcessorDependencies = { evmProvider: EvmProvider; - pricingProvider: IPricingProvider; - metadataProvider: IMetadataProvider; + pricingProvider: ICacheablePricingProvider; + metadataProvider: ICacheableMetadataProvider; roundRepository: IRoundReadRepository; projectRepository: IProjectReadRepository; applicationRepository: IApplicationReadRepository; diff --git a/packages/repository/src/external.ts b/packages/repository/src/external.ts index 04a2893..6d28a41 100644 --- a/packages/repository/src/external.ts +++ b/packages/repository/src/external.ts @@ -77,6 +77,11 @@ export { KyselyTransactionManager } from "./internal.js"; export type { ICache } from "./internal.js"; export type { Metadata, NewMetadata, PartialMetadata } from "./internal.js"; export type { Price, NewPrice, PartialPrice, PriceCacheKey } from "./internal.js"; -export { KyselyMetadataCache, KyselyPricingCache } from "./internal.js"; +export { + KyselyMetadataCache, + KyselyPricingCache, + InMemoryMetadataCache, + InMemoryPricingCache, +} from "./internal.js"; export { createKyselyPostgresDb as createKyselyDatabase } from "./internal.js"; diff --git a/packages/repository/src/interfaces/cache.interface.ts b/packages/repository/src/interfaces/cache.interface.ts index a026730..33f1c67 100644 --- a/packages/repository/src/interfaces/cache.interface.ts +++ b/packages/repository/src/interfaces/cache.interface.ts @@ -19,4 +19,10 @@ export interface ICache { * @throws If there is an error setting the value. */ set(key: Key, value: Value): Promise; + + /** + * Clear all values from the cache. + * @throws If there is an error clearing the cache. + */ + clearAll(): Promise; } diff --git a/packages/repository/src/internal.ts b/packages/repository/src/internal.ts index f8c7d74..de80c52 100644 --- a/packages/repository/src/internal.ts +++ b/packages/repository/src/internal.ts @@ -4,3 +4,4 @@ export * from "./exceptions/index.js"; export * from "./utils/index.js"; export * from "./db/connection.js"; export * from "./repositories/kysely/index.js"; +export * from "./repositories/memory/index.js"; diff --git a/packages/repository/src/repositories/kysely/metadata.repository.ts b/packages/repository/src/repositories/kysely/metadata.repository.ts index 66fd5ea..cda8a9f 100644 --- a/packages/repository/src/repositories/kysely/metadata.repository.ts +++ b/packages/repository/src/repositories/kysely/metadata.repository.ts @@ -63,4 +63,9 @@ export class KyselyMetadataCache implements ICache { }); } } + + /** @inheritdoc */ + async clearAll(): Promise { + // No-op since we don't want to clear the cache + } } diff --git a/packages/repository/src/repositories/kysely/prices.repository.ts b/packages/repository/src/repositories/kysely/prices.repository.ts index 5d1ca10..5394be1 100644 --- a/packages/repository/src/repositories/kysely/prices.repository.ts +++ b/packages/repository/src/repositories/kysely/prices.repository.ts @@ -86,4 +86,9 @@ export class KyselyPricingCache implements ICache { }); } } + + /** @inheritdoc */ + async clearAll(): Promise { + // No-op since we don't want to clear the cache + } } diff --git a/packages/repository/src/repositories/memory/index.ts b/packages/repository/src/repositories/memory/index.ts new file mode 100644 index 0000000..68f6684 --- /dev/null +++ b/packages/repository/src/repositories/memory/index.ts @@ -0,0 +1,2 @@ +export * from "./metadata.repository.js"; +export * from "./prices.repository.js"; diff --git a/packages/repository/src/repositories/memory/metadata.repository.ts b/packages/repository/src/repositories/memory/metadata.repository.ts new file mode 100644 index 0000000..0c27d97 --- /dev/null +++ b/packages/repository/src/repositories/memory/metadata.repository.ts @@ -0,0 +1,24 @@ +import { ICache } from "../../internal.js"; + +/** + * A cache for metadata using a simple in-memory map. + * This cache is used to store and retrieve metadata + */ +export class InMemoryMetadataCache implements ICache { + private readonly cache: Map = new Map(); + + /** @inheritdoc */ + async get(id: string): Promise { + return this.cache.get(id) as T | undefined; + } + + /** @inheritdoc */ + async set(id: string, metadata: T): Promise { + this.cache.set(id, metadata); + } + + /** @inheritdoc */ + async clearAll(): Promise { + this.cache.clear(); + } +} diff --git a/packages/repository/src/repositories/memory/prices.repository.ts b/packages/repository/src/repositories/memory/prices.repository.ts new file mode 100644 index 0000000..6aa2edb --- /dev/null +++ b/packages/repository/src/repositories/memory/prices.repository.ts @@ -0,0 +1,40 @@ +import { TokenPrice } from "@grants-stack-indexer/shared"; + +import { ICache, PriceCacheKey } from "../../internal.js"; + +/** + * A cache for token prices using a simple in-memory map. + * This cache is used to store and retrieve token prices for a given token and timestamp. + */ +export class InMemoryPricingCache implements ICache { + private readonly cache: Map = new Map(); + + /** @inheritdoc */ + async get(key: PriceCacheKey): Promise { + const { tokenCode, timestampMs } = key; + + const keyString = `${tokenCode}-${timestampMs}`; + + const result = this.cache.get(keyString); + + if (!result) { + return undefined; + } + + return result; + } + + /** @inheritdoc */ + async set(key: PriceCacheKey, value: TokenPrice): Promise { + const { tokenCode, timestampMs } = key; + + const keyString = `${tokenCode}-${timestampMs}`; + + this.cache.set(keyString, value); + } + + /** @inheritdoc */ + async clearAll(): Promise { + this.cache.clear(); + } +} From 189ee917695f94bd9b3c73eb963f8542893de678 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 21 Jan 2025 16:42:06 -0300 Subject: [PATCH 2/9] fix: implement interface segregation for cacheable --- packages/data-flow/src/types/index.ts | 5 +++++ packages/metadata/src/external.ts | 2 +- packages/metadata/src/interfaces/metadata.interface.ts | 9 --------- .../metadata/src/providers/cachingProxy.provider.ts | 10 +++++----- packages/pricing/src/external.ts | 2 +- packages/pricing/src/interfaces/pricing.interface.ts | 9 --------- .../pricing/src/providers/cachingProxy.provider.ts | 10 +++++----- packages/processors/src/types/processor.types.ts | 8 ++++---- packages/repository/src/interfaces/cache.interface.ts | 6 ------ packages/shared/src/caching/cacheable.interface.ts | 7 +++++++ packages/shared/src/caching/index.ts | 1 + packages/shared/src/external.ts | 2 ++ packages/shared/src/internal.ts | 1 + 13 files changed, 32 insertions(+), 40 deletions(-) create mode 100644 packages/shared/src/caching/cacheable.interface.ts create mode 100644 packages/shared/src/caching/index.ts diff --git a/packages/data-flow/src/types/index.ts b/packages/data-flow/src/types/index.ts index 563071a..f336cf2 100644 --- a/packages/data-flow/src/types/index.ts +++ b/packages/data-flow/src/types/index.ts @@ -1,3 +1,5 @@ +import { IMetadataProvider } from "@grants-stack-indexer/metadata"; +import { IPricingProvider } from "@grants-stack-indexer/pricing"; import { ProcessorDependencies } from "@grants-stack-indexer/processors"; import { IApplicationPayoutRepository, @@ -7,6 +9,7 @@ import { IRoundRepository, ITransactionManager, } from "@grants-stack-indexer/repository"; +import { ICacheable } from "@grants-stack-indexer/shared"; /** * The core dependencies for the data flow @@ -19,6 +22,8 @@ export type CoreDependencies = Pick< ProcessorDependencies, "evmProvider" | "pricingProvider" | "metadataProvider" > & { + pricingProvider: IPricingProvider & Partial; + metadataProvider: IMetadataProvider & Partial; roundRepository: IRoundRepository; projectRepository: IProjectRepository; applicationRepository: IApplicationRepository; diff --git a/packages/metadata/src/external.ts b/packages/metadata/src/external.ts index 5cf43fa..6a7bccb 100644 --- a/packages/metadata/src/external.ts +++ b/packages/metadata/src/external.ts @@ -6,4 +6,4 @@ export { InvalidContentException, } from "./internal.js"; -export type { IMetadataProvider, ICacheableMetadataProvider } from "./internal.js"; +export type { IMetadataProvider } from "./internal.js"; diff --git a/packages/metadata/src/interfaces/metadata.interface.ts b/packages/metadata/src/interfaces/metadata.interface.ts index 9f8511c..b4552a9 100644 --- a/packages/metadata/src/interfaces/metadata.interface.ts +++ b/packages/metadata/src/interfaces/metadata.interface.ts @@ -13,12 +13,3 @@ export interface IMetadataProvider { */ getMetadata(ipfsCid: string, validateContent?: z.ZodSchema): Promise; } - -export interface ICacheableMetadataProvider extends IMetadataProvider { - /** - * Clear all cached metadata entries. - * This is only implemented by providers that support caching. - * @throws If there is an error clearing the cache - */ - clearCache?(): Promise; -} diff --git a/packages/metadata/src/providers/cachingProxy.provider.ts b/packages/metadata/src/providers/cachingProxy.provider.ts index 18be434..7e281aa 100644 --- a/packages/metadata/src/providers/cachingProxy.provider.ts +++ b/packages/metadata/src/providers/cachingProxy.provider.ts @@ -1,9 +1,9 @@ import { z } from "zod"; import { ICache } from "@grants-stack-indexer/repository"; -import { ILogger } from "@grants-stack-indexer/shared"; +import { ICacheable, ILogger } from "@grants-stack-indexer/shared"; -import { ICacheableMetadataProvider, IMetadataProvider } from "../internal.js"; +import { IMetadataProvider } from "../internal.js"; /** * A metadata provider that caches metadata lookups from the underlying provider. @@ -11,10 +11,10 @@ import { ICacheableMetadataProvider, IMetadataProvider } from "../internal.js"; * If not found in cache, fetches from the underlying provider and caches the result before returning. * Cache failures (both reads and writes) are logged but do not prevent the provider from functioning. */ -export class CachingMetadataProvider implements ICacheableMetadataProvider { +export class CachingMetadataProvider implements IMetadataProvider, ICacheable { constructor( private readonly provider: IMetadataProvider, - private readonly cache: ICache, + private readonly cache: ICache & Partial, private readonly logger: ILogger, ) {} @@ -54,7 +54,7 @@ export class CachingMetadataProvider implements ICacheableMetadataProvider { /** @inheritdoc */ async clearCache(): Promise { try { - await this.cache.clearAll(); + await this.cache.clearCache?.(); } catch (error) { this.logger.debug(`Failed to clear metadata cache`, { error, diff --git a/packages/pricing/src/external.ts b/packages/pricing/src/external.ts index cb078b0..f1ee939 100644 --- a/packages/pricing/src/external.ts +++ b/packages/pricing/src/external.ts @@ -1,4 +1,4 @@ -export type { TokenPrice, IPricingProvider, ICacheablePricingProvider } from "./internal.js"; +export type { TokenPrice, IPricingProvider } from "./internal.js"; export { CoingeckoProvider, DummyPricingProvider, CachingPricingProvider } from "./internal.js"; diff --git a/packages/pricing/src/interfaces/pricing.interface.ts b/packages/pricing/src/interfaces/pricing.interface.ts index 0622563..dca4dad 100644 --- a/packages/pricing/src/interfaces/pricing.interface.ts +++ b/packages/pricing/src/interfaces/pricing.interface.ts @@ -26,12 +26,3 @@ export interface IPricingProvider { endTimestampMs?: number, ): Promise; } - -export interface ICacheablePricingProvider extends IPricingProvider { - /** - * Clear all cached prices entries. - * This is only implemented by providers that support caching. - * @throws If there is an error clearing the cache - */ - clearCache?(): Promise; -} diff --git a/packages/pricing/src/providers/cachingProxy.provider.ts b/packages/pricing/src/providers/cachingProxy.provider.ts index cf900ee..7aceff8 100644 --- a/packages/pricing/src/providers/cachingProxy.provider.ts +++ b/packages/pricing/src/providers/cachingProxy.provider.ts @@ -1,7 +1,7 @@ import { ICache, PriceCacheKey } from "@grants-stack-indexer/repository"; -import { ILogger, TokenCode } from "@grants-stack-indexer/shared"; +import { ICacheable, ILogger, TokenCode } from "@grants-stack-indexer/shared"; -import { ICacheablePricingProvider, IPricingProvider, TokenPrice } from "../internal.js"; +import { IPricingProvider, TokenPrice } from "../internal.js"; /** * A pricing provider that caches token price lookups from the underlying provider. @@ -9,10 +9,10 @@ import { ICacheablePricingProvider, IPricingProvider, TokenPrice } from "../inte * If not found in cache, fetches from the underlying provider and caches the result before returning. * Cache failures (both reads and writes) are logged but do not prevent the provider from functioning. */ -export class CachingPricingProvider implements ICacheablePricingProvider { +export class CachingPricingProvider implements IPricingProvider, ICacheable { constructor( private readonly provider: IPricingProvider, - private readonly cache: ICache, + private readonly cache: ICache & Partial, private readonly logger: ILogger, ) {} @@ -68,7 +68,7 @@ export class CachingPricingProvider implements ICacheablePricingProvider { /** @inheritdoc */ async clearCache(): Promise { try { - await this.cache.clearAll(); + await this.cache.clearCache?.(); } catch (error) { this.logger.debug(`Failed to clear metadata cache`, { error, diff --git a/packages/processors/src/types/processor.types.ts b/packages/processors/src/types/processor.types.ts index d0725fe..2b92de0 100644 --- a/packages/processors/src/types/processor.types.ts +++ b/packages/processors/src/types/processor.types.ts @@ -1,17 +1,17 @@ import type { EvmProvider } from "@grants-stack-indexer/chain-providers"; -import type { ICacheableMetadataProvider } from "@grants-stack-indexer/metadata"; -import type { ICacheablePricingProvider } from "@grants-stack-indexer/pricing"; import type { IApplicationReadRepository, IProjectReadRepository, IRoundReadRepository, } from "@grants-stack-indexer/repository"; +import { IMetadataProvider } from "@grants-stack-indexer/metadata"; +import { IPricingProvider } from "@grants-stack-indexer/pricing"; import { ILogger } from "@grants-stack-indexer/shared"; export type ProcessorDependencies = { evmProvider: EvmProvider; - pricingProvider: ICacheablePricingProvider; - metadataProvider: ICacheableMetadataProvider; + pricingProvider: IPricingProvider; + metadataProvider: IMetadataProvider; roundRepository: IRoundReadRepository; projectRepository: IProjectReadRepository; applicationRepository: IApplicationReadRepository; diff --git a/packages/repository/src/interfaces/cache.interface.ts b/packages/repository/src/interfaces/cache.interface.ts index 33f1c67..a026730 100644 --- a/packages/repository/src/interfaces/cache.interface.ts +++ b/packages/repository/src/interfaces/cache.interface.ts @@ -19,10 +19,4 @@ export interface ICache { * @throws If there is an error setting the value. */ set(key: Key, value: Value): Promise; - - /** - * Clear all values from the cache. - * @throws If there is an error clearing the cache. - */ - clearAll(): Promise; } diff --git a/packages/shared/src/caching/cacheable.interface.ts b/packages/shared/src/caching/cacheable.interface.ts new file mode 100644 index 0000000..3b10321 --- /dev/null +++ b/packages/shared/src/caching/cacheable.interface.ts @@ -0,0 +1,7 @@ +export interface ICacheable { + /** + * Clear the cache. + * @throws If there is an error clearing the cache + */ + clearCache(): Promise; +} diff --git a/packages/shared/src/caching/index.ts b/packages/shared/src/caching/index.ts new file mode 100644 index 0000000..9e90587 --- /dev/null +++ b/packages/shared/src/caching/index.ts @@ -0,0 +1 @@ +export * from "./cacheable.interface.js"; diff --git a/packages/shared/src/external.ts b/packages/shared/src/external.ts index dbd0d85..0692a44 100644 --- a/packages/shared/src/external.ts +++ b/packages/shared/src/external.ts @@ -27,3 +27,5 @@ export type { RetryMetadata, ErrorContext } from "./internal.js"; export { ExponentialBackoff, RetryHandler } from "./internal.js"; export type { RetryStrategy, RetryStrategyOptions } from "./internal.js"; + +export type { ICacheable } from "./internal.js"; diff --git a/packages/shared/src/internal.ts b/packages/shared/src/internal.ts index 5229d88..3163cf7 100644 --- a/packages/shared/src/internal.ts +++ b/packages/shared/src/internal.ts @@ -8,3 +8,4 @@ export * from "./logger/index.js"; export * from "./tokens/tokens.js"; export * from "./exceptions/index.js"; export * from "./retry/index.js"; +export * from "./caching/index.js"; From f97a4f860f6a1a3282ba79f0a74c528793c62200 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 21 Jan 2025 16:51:37 -0300 Subject: [PATCH 3/9] style: rename variables --- .../src/services/sharedDependencies.service.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 768b929..2b0012a 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -73,29 +73,29 @@ export class SharedDependenciesService { env.DATABASE_SCHEMA, ); const pricingRepository = new KyselyPricingCache(kyselyDatabase, env.DATABASE_SCHEMA); - const pricingProvider = PricingProviderFactory.create(env, { + const internalPricingProvider = PricingProviderFactory.create(env, { logger, }); const dbCachedPricingProvider = new CachingPricingProvider( - pricingProvider, + internalPricingProvider, pricingRepository, logger, ); - const inMemoryCachedPricingProvider = new CachingPricingProvider( + const pricingProvider = new CachingPricingProvider( dbCachedPricingProvider, new InMemoryPricingCache(), logger, ); const metadataRepository = new KyselyMetadataCache(kyselyDatabase, env.DATABASE_SCHEMA); - const metadataProvider = new IpfsProvider(env.IPFS_GATEWAYS_URL, logger); + const internalMetadataProvider = new IpfsProvider(env.IPFS_GATEWAYS_URL, logger); const dbCachedMetadataProvider = new CachingMetadataProvider( - metadataProvider, + internalMetadataProvider, metadataRepository, logger, ); - const inMemoryCachedMetadataProvider = new CachingMetadataProvider( + const metadataProvider = new CachingMetadataProvider( dbCachedMetadataProvider, new InMemoryMetadataCache(), logger, @@ -131,9 +131,9 @@ export class SharedDependenciesService { projectRepository, roundRepository, applicationRepository, - pricingProvider: inMemoryCachedPricingProvider, + pricingProvider, donationRepository, - metadataProvider: inMemoryCachedMetadataProvider, + metadataProvider, applicationPayoutRepository, transactionManager, }, From 50196d66dfe488dcef94ea20238a8f95aa316a68 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 21 Jan 2025 17:13:06 -0300 Subject: [PATCH 4/9] style: fix log message --- packages/pricing/src/providers/cachingProxy.provider.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pricing/src/providers/cachingProxy.provider.ts b/packages/pricing/src/providers/cachingProxy.provider.ts index 7aceff8..7cf4700 100644 --- a/packages/pricing/src/providers/cachingProxy.provider.ts +++ b/packages/pricing/src/providers/cachingProxy.provider.ts @@ -70,7 +70,7 @@ export class CachingPricingProvider implements IPricingProvider, ICacheable { try { await this.cache.clearCache?.(); } catch (error) { - this.logger.debug(`Failed to clear metadata cache`, { + this.logger.debug(`Failed to clear pricing cache`, { error, }); } From a8f1426030f3d9cf0b558f2502dfd7f700706d30 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Mon, 20 Jan 2025 20:33:13 -0300 Subject: [PATCH 5/9] feat: fetch metadata & prices in bulk ahead of batch processing --- packages/data-flow/src/orchestrator.ts | 153 ++++++++++++++- .../data-flow/test/unit/orchestrator.spec.ts | 1 + .../test/unit/retroactiveProcessor.spec.ts | 1 + .../metadata/src/providers/ipfs.provider.ts | 14 +- packages/pricing/src/constants/index.ts | 1 + .../src/interfaces/pricing.interface.ts | 9 + packages/pricing/src/internal.ts | 1 + .../src/providers/cachingProxy.provider.ts | 185 +++++++++++++++++- .../src/providers/coingecko.provider.ts | 53 +++++ .../pricing/src/providers/dummy.provider.ts | 8 + .../allo/handlers/poolCreated.handler.spec.ts | 1 + .../allo/handlers/poolFunded.handler.spec.ts | 1 + .../poolMetadataUpdated.handler.spec.ts | 1 + .../handlers/directAllocated.handler.spec.ts | 1 + .../directGrantsLite.handler.spec.ts | 1 + .../handlers/allocated.handler.spec.ts | 1 + .../dvmdDirectTransfer.handler.spec.ts | 1 + .../handlers/allocated.handler.spec.ts | 1 + packages/shared/src/retry/retry.ts | 7 +- packages/shared/test/retry/retry.spec.ts | 3 +- 20 files changed, 426 insertions(+), 18 deletions(-) create mode 100644 packages/pricing/src/constants/index.ts diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 16b6c21..6ec4ea1 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -1,6 +1,7 @@ import { isNativeError } from "util/types"; import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; +import { TokenPrice } from "@grants-stack-indexer/pricing"; import { existsHandler, UnsupportedEventException, @@ -13,6 +14,7 @@ import { AnyIndexerFetchedEvent, ChainId, ContractName, + getToken, Hex, ILogger, isAlloEvent, @@ -23,6 +25,7 @@ import { RetryStrategy, StrategyEvent, stringify, + Token, } from "@grants-stack-indexer/shared"; import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js"; @@ -31,6 +34,12 @@ import { EventsProcessor } from "./eventsProcessor.js"; import { InvalidEvent } from "./exceptions/index.js"; import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js"; +type TokenWithTimestamps = { + token: Token; + minTimestamp: number; + maxTimestamp: number; +}; + /** * The Orchestrator is the central coordinator of the data flow system, managing the interaction between * three main components: @@ -116,7 +125,11 @@ export class Orchestrator { while (!signal.aborted) { let event: ProcessorEvent | undefined; try { - if (this.eventsQueue.isEmpty()) await this.enqueueEvents(); + if (this.eventsQueue.isEmpty()) { + const events = await this.getNextEventsBatch(); + await this.bulkFetchMetadataAndPricesForBatch(events); + await this.enqueueEvents(events); + } event = this.eventsQueue.pop(); @@ -197,6 +210,49 @@ export class Orchestrator { }); } + private async getMetadataFromEvents(events: AnyIndexerFetchedEvent[]): Promise { + const ids = new Set(); + + for (const event of events) { + if ("metadata" in event.params) { + ids.add(event.params.metadata[1]); + } + } + + return Array.from(ids); + } + + private async getTokensFromEvents( + events: AnyIndexerFetchedEvent[], + ): Promise { + const tokenMap = new Map(); + + for (const event of events) { + if ( + "token" in event.params && + "amount" in event.params && + BigInt(event.params.amount) > 0n + ) { + const token = getToken(this.chainId, event.params.token); + if (!token) continue; + + const existing = tokenMap.get(token.address); + if (existing) { + existing.minTimestamp = Math.min(existing.minTimestamp, event.blockTimestamp); + existing.maxTimestamp = Math.max(existing.maxTimestamp, event.blockTimestamp); + } else { + tokenMap.set(token.address, { + token, + minTimestamp: event.blockTimestamp, + maxTimestamp: event.blockTimestamp, + }); + } + } + } + + return Array.from(tokenMap.values()); + } + /** * Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy. * In this case, the event is emitted before the PoolCreated event. We can safely ignore the error @@ -224,10 +280,7 @@ export class Orchestrator { return false; } - /** - * Enqueue new events from the events fetcher using the last processed event as a starting point - */ - private async enqueueEvents(): Promise { + private async getNextEventsBatch(): Promise { const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId); const blockNumber = lastProcessedEvent?.blockNumber ?? 0; const logIndex = lastProcessedEvent?.logIndex ?? 0; @@ -240,6 +293,33 @@ export class Orchestrator { allowPartialLastBlock: false, }); + return events; + } + + /** + * Clear caches and fetch metadata and prices for the batch + */ + private async bulkFetchMetadataAndPricesForBatch( + events: AnyIndexerFetchedEvent[], + ): Promise { + // Clear caches + if (this.dependencies.metadataProvider.clearCache) { + await this.dependencies.metadataProvider.clearCache(); + } + + const metadataIds = await this.getMetadataFromEvents(events); + const tokens = await this.getTokensFromEvents(events); + + await Promise.allSettled([ + this.bulkFetchMetadata(metadataIds), + this.bulkFetchTokens(tokens), + ]); + } + + /** + * Enqueue events and updates new context for the batch + */ + private async enqueueEvents(events: AnyIndexerFetchedEvent[]): Promise { // Clear previous context this.eventsByBlockContext.clear(); for (const event of events) { @@ -252,6 +332,69 @@ export class Orchestrator { this.eventsQueue.push(...events); } + /** + * Fetch all possible metadata for the batch + */ + private async bulkFetchMetadata(metadataIds: string[]): Promise { + const results = await Promise.allSettled( + metadataIds.map((id) => + this.retryHandler.execute(() => + this.dependencies.metadataProvider.getMetadata(id), + ), + ), + ); + + const metadata: unknown[] = []; + for (const result of results) { + if (result.status === "fulfilled" && result.value) { + metadata.push(result.value); + } + } + + return metadata; + } + + /** + * Fetch all possible prices for the batch + */ + private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise { + const results = await Promise.allSettled( + tokens.map(({ token, minTimestamp, maxTimestamp }) => + this.retryHandler.execute(async () => { + // Get all unique timestamps between min and max + const events = this.eventsByBlockContext.values(); + const timestamps = Array.from(events) + .flat() + .filter( + (e) => + e.blockTimestamp >= minTimestamp && + e.blockTimestamp <= maxTimestamp, + ) + .map((e) => e.blockTimestamp); + + // Remove duplicates and sort + const uniqueTimestamps = [...new Set(timestamps)].sort(); + + // Get prices for all timestamps in the range + const prices = await this.dependencies.pricingProvider.getTokenPrices( + token.priceSourceCode, + uniqueTimestamps, + ); + return prices; + }), + ), + ); + + const tokenPrices: TokenPrice[] = []; + for (const result of results) { + if (result.status === "fulfilled" && result.value) { + tokenPrices.push(...result.value); + } + } + + return tokenPrices; + } + private async handleEvent(event: ProcessorEvent): Promise { event = await this.enhanceStrategyId(event); if (this.isPoolCreated(event)) { diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index 5989b59..eb98258 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -103,6 +103,7 @@ describe("Orchestrator", { sequential: true }, () => { applicationPayoutRepository: {} as unknown as IApplicationPayoutRepository, pricingProvider: { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), }, metadataProvider: { getMetadata: vi.fn(), diff --git a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts index e9353ef..97ae098 100644 --- a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts +++ b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts @@ -144,6 +144,7 @@ describe("RetroactiveProcessor", () => { transactionManager: {} as ITransactionManager, pricingProvider: { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), }, metadataProvider: { getMetadata: vi.fn(), diff --git a/packages/metadata/src/providers/ipfs.provider.ts b/packages/metadata/src/providers/ipfs.provider.ts index 3399faa..9a0cd08 100644 --- a/packages/metadata/src/providers/ipfs.provider.ts +++ b/packages/metadata/src/providers/ipfs.provider.ts @@ -8,6 +8,7 @@ import { EmptyGatewaysUrlsException, InvalidContentException, isValidCid } from export class IpfsProvider implements IMetadataProvider { private readonly axiosInstance: AxiosInstance; + private currentGatewayIndex: number = 0; constructor( private readonly gateways: string[], @@ -21,6 +22,12 @@ export class IpfsProvider implements IMetadataProvider { this.axiosInstance = axios.create(); } + private getNextGateway(): string { + const gateway = this.gateways[this.currentGatewayIndex]!; + this.currentGatewayIndex = (this.currentGatewayIndex + 1) % this.gateways.length; + return gateway; + } + /* @inheritdoc */ async getMetadata( ipfsCid: string, @@ -30,7 +37,12 @@ export class IpfsProvider implements IMetadataProvider { return undefined; } - for (const gateway of this.gateways) { + // Create array of gateways starting from current index + const orderedGateways = Array.from({ length: this.gateways.length }, () => + this.getNextGateway(), + ); + + for (const gateway of orderedGateways) { const url = `${gateway}/ipfs/${ipfsCid}`; try { //TODO: retry policy for each gateway diff --git a/packages/pricing/src/constants/index.ts b/packages/pricing/src/constants/index.ts new file mode 100644 index 0000000..768b4e9 --- /dev/null +++ b/packages/pricing/src/constants/index.ts @@ -0,0 +1 @@ +export const MIN_GRANULARITY_MS = 300_000; // 5 minutes diff --git a/packages/pricing/src/interfaces/pricing.interface.ts b/packages/pricing/src/interfaces/pricing.interface.ts index dca4dad..cfe738a 100644 --- a/packages/pricing/src/interfaces/pricing.interface.ts +++ b/packages/pricing/src/interfaces/pricing.interface.ts @@ -25,4 +25,13 @@ export interface IPricingProvider { startTimestampMs: number, endTimestampMs?: number, ): Promise; + + /** + * Retrieves all the prices of a token for specific timestamps. + * @param tokenCode - The code of the token. + * @param timestamps - Array of timestamps for which to retrieve prices. + * @returns A promise that resolves to the prices of the token for each requested timestamp. + * The returned prices may not have the exact timestamps requested. Depends on the implementation. + */ + getTokenPrices(tokenCode: TokenCode, timestamps: number[]): Promise; } diff --git a/packages/pricing/src/internal.ts b/packages/pricing/src/internal.ts index e1d4daa..7f6985a 100644 --- a/packages/pricing/src/internal.ts +++ b/packages/pricing/src/internal.ts @@ -1,4 +1,5 @@ export * from "./types/index.js"; +export * from "./constants/index.js"; export * from "./interfaces/index.js"; export * from "./providers/index.js"; export * from "./exceptions/index.js"; diff --git a/packages/pricing/src/providers/cachingProxy.provider.ts b/packages/pricing/src/providers/cachingProxy.provider.ts index 7cf4700..4c63785 100644 --- a/packages/pricing/src/providers/cachingProxy.provider.ts +++ b/packages/pricing/src/providers/cachingProxy.provider.ts @@ -3,6 +3,11 @@ import { ICacheable, ILogger, TokenCode } from "@grants-stack-indexer/shared"; import { IPricingProvider, TokenPrice } from "../internal.js"; +type CacheResult = { + timestampMs: number; + price: TokenPrice | undefined; +}; + /** * A pricing provider that caches token price lookups from the underlying provider. * When a price is requested, it first checks the cache. If found, returns the cached price. @@ -46,20 +51,21 @@ export class CachingPricingProvider implements IPricingProvider, ICacheable { ); if (price) { - try { - await this.cache.set( + // we don't await this, because it's not critical + this.cache + .set( { tokenCode, timestampMs: startTimestampMs, }, price, - ); - } catch (error) { - this.logger.debug( - `Failed to cache price for token ${tokenCode} at ${startTimestampMs}`, - { error }, - ); - } + ) + .catch((error) => { + this.logger.debug( + `Failed to cache price for token ${tokenCode} at ${startTimestampMs}`, + { error }, + ); + }); } return price; @@ -75,4 +81,165 @@ export class CachingPricingProvider implements IPricingProvider, ICacheable { }); } } + + /* @inheritdoc */ + /** + * Note: it caches the closest prices to the requested timestamps. + * Uses binary search to find the closest price for each requested timestamp. + */ + async getTokenPrices(tokenCode: TokenCode, timestamps: number[]): Promise { + if (timestamps.length === 0) return []; + + const cachedPrices = await this.getCachedPrices(tokenCode, timestamps); + const timestampsToFetch = this.getTimestampsToFetch(timestamps, cachedPrices); + + if (timestampsToFetch.length === 0) { + return cachedPrices + .filter( + (result): result is PromiseFulfilledResult => + result.status === "fulfilled" && !!result.value.price, + ) + .map((result) => result.value.price) + .filter((price): price is TokenPrice => !!price); + } + + const fetchedPrices = await this.provider.getTokenPrices(tokenCode, timestampsToFetch); + const sortedFetchedPrices = [...fetchedPrices].sort( + (a, b) => a.timestampMs - b.timestampMs, + ); + + const closestPrices = this.getClosestPricesWithCache( + tokenCode, + timestampsToFetch, + sortedFetchedPrices, + ); + + const priceMap = this.buildPriceMap(cachedPrices, closestPrices); + + return timestamps + .map((timestampMs) => priceMap.get(timestampMs)) + .filter((price): price is TokenPrice => !!price); + } + + private async getCachedPrices( + tokenCode: TokenCode, + timestamps: number[], + ): Promise[]> { + return Promise.allSettled( + timestamps.map(async (timestampMs) => { + try { + return { + timestampMs, + price: await this.cache.get({ tokenCode, timestampMs }), + }; + } catch (error) { + this.logger.debug( + `Failed to get cached price for token ${tokenCode} at ${timestampMs}`, + { error }, + ); + return { timestampMs, price: undefined }; + } + }), + ); + } + + private getTimestampsToFetch( + timestamps: number[], + cachedPrices: PromiseSettledResult[], + ): number[] { + return timestamps.filter((_, index) => { + const result = cachedPrices[index]; + if (!result || result.status === "rejected") return true; + return !result.value.price; + }); + } + + private getClosestPricesWithCache( + tokenCode: TokenCode, + timestampsToFetch: number[], + sortedFetchedPrices: TokenPrice[], + ): TokenPrice[] { + return timestampsToFetch + .map((timestampMs) => { + const closestPrice = this.findClosestPrice(sortedFetchedPrices, timestampMs); + if (!closestPrice) return null; + + const price = { + timestampMs, + priceUsd: closestPrice.priceUsd, + }; + + // Fire and forget cache operation + this.cache.set({ tokenCode, timestampMs }, price).catch((error) => { + this.logger.debug( + `Failed to cache price for token ${tokenCode} at ${timestampMs}`, + { + error, + }, + ); + }); + + return price; + }) + .filter((price): price is TokenPrice => price !== null); + } + + private buildPriceMap( + cachedPrices: PromiseSettledResult[], + closestPrices: TokenPrice[], + ): Map { + const priceMap = new Map(); + + // Add cached prices + cachedPrices.forEach((result) => { + if (result.status === "fulfilled" && result.value.price) { + priceMap.set(result.value.timestampMs, result.value.price); + } + }); + + // Add closest prices + closestPrices.forEach((price) => { + priceMap.set(price.timestampMs, price); + }); + + return priceMap; + } + + /** + * Find the closest price using binary search + * @param prices - Array of prices sorted by timestamp + * @param targetTimestamp - The timestamp to find closest match for + * @returns The closest matching price or null if no prices available + */ + private findClosestPrice(prices: TokenPrice[], targetTimestamp: number): TokenPrice | null { + if (prices.length === 0) { + return null; + } + + // Handle edge cases + if (targetTimestamp <= prices[0]!.timestampMs) return prices[0]!; + if (targetTimestamp >= prices[prices.length - 1]!.timestampMs) + return prices[prices.length - 1]!; + + // Binary search + let left = 0; + let right = prices.length - 1; + + while (left + 1 < right) { + const mid = Math.floor((left + right) / 2); + + if (prices[mid]!.timestampMs === targetTimestamp) { + return prices[mid]!; + } + + if (prices[mid]!.timestampMs < targetTimestamp) { + left = mid; + } else { + right = mid; + } + } + + // Return the floor value (largest timestamp <= target) + return prices[left]!; + } } diff --git a/packages/pricing/src/providers/coingecko.provider.ts b/packages/pricing/src/providers/coingecko.provider.ts index e1d5296..b08d1cf 100644 --- a/packages/pricing/src/providers/coingecko.provider.ts +++ b/packages/pricing/src/providers/coingecko.provider.ts @@ -13,6 +13,7 @@ import { IPricingProvider } from "../interfaces/index.js"; import { CoingeckoPriceChartData, CoingeckoTokenId, + MIN_GRANULARITY_MS, TokenPrice, UnknownPricingException, UnsupportedToken, @@ -143,6 +144,58 @@ export class CoingeckoProvider implements IPricingProvider { } } + /* @inheritdoc */ + async getTokenPrices(tokenCode: TokenCode, timestamps: number[]): Promise { + const tokenId = TokenMapping[tokenCode]; + if (!tokenId) { + throw new UnsupportedToken(tokenCode, { + className: CoingeckoProvider.name, + methodName: "getTokenPrice", + }); + } + + if (timestamps.length === 0) { + return []; + } + + const effectiveMin = Math.min(...timestamps); + let effectiveMax = Math.max(...timestamps); + + if (effectiveMin === effectiveMax || effectiveMin > effectiveMax) { + return []; + } + + if (effectiveMax - effectiveMin < MIN_GRANULARITY_MS) { + effectiveMax = effectiveMin + MIN_GRANULARITY_MS; + } + + const path = `/coins/${tokenId}/market_chart/range?vs_currency=usd&from=${effectiveMin}&to=${effectiveMax}&precision=full&interval=5m`; + + try { + const { data } = await this.axios.get(path); + return data.prices.map(([timestampMs, priceUsd]) => ({ + timestampMs, + priceUsd, + })); + } catch (error: unknown) { + if (isAxiosError(error)) { + this.handleAxiosError(error, path); + } + + const errorMessage = + `Unknown Coingecko API error: failed to fetch token price ` + + stringify(error, Object.getOwnPropertyNames(error)); + + throw new UnknownPricingException(errorMessage, { + className: CoingeckoProvider.name, + methodName: "getTokenPrice", + additionalData: { + path, + }, + }); + } + } + private handleAxiosError(error: AxiosError, path: string): void { const errorContext = { className: CoingeckoProvider.name, diff --git a/packages/pricing/src/providers/dummy.provider.ts b/packages/pricing/src/providers/dummy.provider.ts index df6b045..94ee6d3 100644 --- a/packages/pricing/src/providers/dummy.provider.ts +++ b/packages/pricing/src/providers/dummy.provider.ts @@ -21,4 +21,12 @@ export class DummyPricingProvider implements IPricingProvider { timestampMs: startTimestampMs, }); } + + /* @inheritdoc */ + async getTokenPrices(_tokenCode: TokenCode, timestamps: number[]): Promise { + return timestamps.map((timestampMs) => ({ + priceUsd: this.dummyPrice, + timestampMs, + })); + } } diff --git a/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts b/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts index 67c23e0..8d53490 100644 --- a/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts +++ b/packages/processors/test/allo/handlers/poolCreated.handler.spec.ts @@ -56,6 +56,7 @@ describe("PoolCreatedHandler", () => { } as unknown as EvmProvider; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), }; mockMetadataProvider = { getMetadata: vi.fn(), diff --git a/packages/processors/test/allo/handlers/poolFunded.handler.spec.ts b/packages/processors/test/allo/handlers/poolFunded.handler.spec.ts index 7588472..98d5153 100644 --- a/packages/processors/test/allo/handlers/poolFunded.handler.spec.ts +++ b/packages/processors/test/allo/handlers/poolFunded.handler.spec.ts @@ -56,6 +56,7 @@ describe("PoolFundedHandler", () => { } as unknown as IRoundReadRepository; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), }; mockLogger = { error: vi.fn(), diff --git a/packages/processors/test/allo/handlers/poolMetadataUpdated.handler.spec.ts b/packages/processors/test/allo/handlers/poolMetadataUpdated.handler.spec.ts index b33441e..cb1aee2 100644 --- a/packages/processors/test/allo/handlers/poolMetadataUpdated.handler.spec.ts +++ b/packages/processors/test/allo/handlers/poolMetadataUpdated.handler.spec.ts @@ -68,6 +68,7 @@ describe("PoolMetadataUpdatedHandler", () => { } as unknown as ILogger; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), }; }); diff --git a/packages/processors/test/strategy/directAllocation/handlers/directAllocated.handler.spec.ts b/packages/processors/test/strategy/directAllocation/handlers/directAllocated.handler.spec.ts index a177e51..60354e7 100644 --- a/packages/processors/test/strategy/directAllocation/handlers/directAllocated.handler.spec.ts +++ b/packages/processors/test/strategy/directAllocation/handlers/directAllocated.handler.spec.ts @@ -41,6 +41,7 @@ describe("DirectAllocatedHandler", () => { } as unknown as IRoundReadRepository; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; mockProjectRepository = { getProjectByIdOrThrow: vi.fn(), diff --git a/packages/processors/test/strategy/directGrantsLite/directGrantsLite.handler.spec.ts b/packages/processors/test/strategy/directGrantsLite/directGrantsLite.handler.spec.ts index 2af0acd..c26807e 100644 --- a/packages/processors/test/strategy/directGrantsLite/directGrantsLite.handler.spec.ts +++ b/packages/processors/test/strategy/directGrantsLite/directGrantsLite.handler.spec.ts @@ -84,6 +84,7 @@ describe("DirectGrantsLiteStrategyHandler", () => { } as unknown as EvmProvider; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; mockApplicationRepository = {} as IApplicationReadRepository; dependencies = { diff --git a/packages/processors/test/strategy/directGrantsLite/handlers/allocated.handler.spec.ts b/packages/processors/test/strategy/directGrantsLite/handlers/allocated.handler.spec.ts index b174bc4..1039f66 100644 --- a/packages/processors/test/strategy/directGrantsLite/handlers/allocated.handler.spec.ts +++ b/packages/processors/test/strategy/directGrantsLite/handlers/allocated.handler.spec.ts @@ -41,6 +41,7 @@ describe("DGLiteAllocatedHandler", () => { } as unknown as IApplicationRepository; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; }); diff --git a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts index 171de57..520c734 100644 --- a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts +++ b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/dvmdDirectTransfer.handler.spec.ts @@ -112,6 +112,7 @@ describe("DVMDDirectTransferHandler", () => { } as unknown as EvmProvider; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; mockApplicationRepository = {} as IApplicationReadRepository; handler = new DVMDDirectTransferStrategyHandler(mockChainId, { diff --git a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.spec.ts b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.spec.ts index f7b0017..cd7a004 100644 --- a/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.spec.ts +++ b/packages/processors/test/strategy/donationVotingMerkleDistributionDirectTransfer/handlers/allocated.handler.spec.ts @@ -46,6 +46,7 @@ describe("DVMDAllocatedHandler", () => { } as unknown as IApplicationRepository; mockPricingProvider = { getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), } as IPricingProvider; }); diff --git a/packages/shared/src/retry/retry.ts b/packages/shared/src/retry/retry.ts index 7134e2b..e627c06 100644 --- a/packages/shared/src/retry/retry.ts +++ b/packages/shared/src/retry/retry.ts @@ -28,11 +28,12 @@ export class RetryHandler { async execute( operation: () => Promise, params: { abortSignal?: AbortSignal } = {}, - ): Promise { + ): Promise { + let result: T | undefined; let attemptCount = 0; while (true && !params.abortSignal?.aborted) { try { - await operation(); + result = await operation(); break; } catch (error) { if (!(error instanceof RetriableError)) { @@ -61,5 +62,7 @@ export class RetryHandler { if (params.abortSignal?.aborted) { throw new Error("Operation aborted"); } + + return result; } } diff --git a/packages/shared/test/retry/retry.spec.ts b/packages/shared/test/retry/retry.spec.ts index 7e4ae6f..87c9add 100644 --- a/packages/shared/test/retry/retry.spec.ts +++ b/packages/shared/test/retry/retry.spec.ts @@ -29,10 +29,11 @@ describe("RetryHandler", () => { const handler = new RetryHandler(new ExponentialBackoff(), mockLogger); const operation = vi.fn().mockResolvedValue("success"); - await handler.execute(operation); + const result = await handler.execute(operation); expect(operation).toHaveBeenCalledTimes(1); expect(mockLogger.debug).not.toHaveBeenCalled(); + expect(result).toBe("success"); }); it("retries on RetriableError and succeeds", async () => { From de2cdaf89d95dedec169682bc4616314803e5e79 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 21 Jan 2025 22:20:45 -0300 Subject: [PATCH 6/9] test: fix tests --- packages/data-flow/src/orchestrator.ts | 34 +--- .../data-flow/test/unit/orchestrator.spec.ts | 176 +++++++++++++++++- .../metadata/src/providers/ipfs.provider.ts | 9 +- .../src/providers/coingecko.provider.ts | 4 - .../providers/cachingProxy.provider.spec.ts | 9 +- .../test/providers/coingecko.provider.spec.ts | 41 ++++ .../test/providers/dummy.provider.spec.ts | 21 +++ packages/shared/test/retry/retry.spec.ts | 10 + 8 files changed, 259 insertions(+), 45 deletions(-) diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 6ec4ea1..ea0aa16 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -36,8 +36,7 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from type TokenWithTimestamps = { token: Token; - minTimestamp: number; - maxTimestamp: number; + timestamps: number[]; }; /** @@ -238,19 +237,21 @@ export class Orchestrator { const existing = tokenMap.get(token.address); if (existing) { - existing.minTimestamp = Math.min(existing.minTimestamp, event.blockTimestamp); - existing.maxTimestamp = Math.max(existing.maxTimestamp, event.blockTimestamp); + existing.timestamps.push(event.blockTimestamp); } else { tokenMap.set(token.address, { token, - minTimestamp: event.blockTimestamp, - maxTimestamp: event.blockTimestamp, + timestamps: [event.blockTimestamp], }); } } } - return Array.from(tokenMap.values()); + // Convert timestamps to unique sorted arrays + return Array.from(tokenMap.values()).map(({ token, timestamps }) => ({ + token, + timestamps: [...new Set(timestamps)].sort((a, b) => a - b), + })); } /** @@ -359,26 +360,11 @@ export class Orchestrator { */ private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise { const results = await Promise.allSettled( - tokens.map(({ token, minTimestamp, maxTimestamp }) => + tokens.map(({ token, timestamps }) => this.retryHandler.execute(async () => { - // Get all unique timestamps between min and max - const events = this.eventsByBlockContext.values(); - const timestamps = Array.from(events) - .flat() - .filter( - (e) => - e.blockTimestamp >= minTimestamp && - e.blockTimestamp <= maxTimestamp, - ) - .map((e) => e.blockTimestamp); - - // Remove duplicates and sort - const uniqueTimestamps = [...new Set(timestamps)].sort(); - - // Get prices for all timestamps in the range const prices = await this.dependencies.pricingProvider.getTokenPrices( token.priceSourceCode, - uniqueTimestamps, + timestamps, ); return prices; }), diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index eb98258..1c48f22 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -1,8 +1,10 @@ -import { Address } from "viem"; +import { Address, zeroAddress } from "viem"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { EvmProvider } from "@grants-stack-indexer/chain-providers"; import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; +import { IMetadataProvider } from "@grants-stack-indexer/metadata"; +import { IPricingProvider } from "@grants-stack-indexer/pricing"; import { Changeset, IApplicationPayoutRepository, @@ -16,16 +18,20 @@ import { import { RoundNotFoundForId } from "@grants-stack-indexer/repository/dist/src/internal.js"; import { AlloEvent, + AnyIndexerFetchedEvent, ChainId, ContractName, ContractToEventName, EventParams, ExponentialBackoff, Hex, + ICacheable, ILogger, ProcessorEvent, RateLimitError, StrategyEvent, + Token, + TokenCode, } from "@grants-stack-indexer/shared"; import { @@ -58,6 +64,8 @@ describe("Orchestrator", { sequential: true }, () => { let mockIndexerClient: IIndexerClient; let mockEventsRegistry: IEventsRegistry; let mockStrategyRegistry: IStrategyRegistry; + let mockPricingProvider: IPricingProvider & ICacheable; + let mockMetadataProvider: IMetadataProvider & ICacheable; let mockEvmProvider: EvmProvider; let abortController: AbortController; let runPromise: Promise | undefined; @@ -93,6 +101,17 @@ describe("Orchestrator", { sequential: true }, () => { readContract: vi.fn(), } as unknown as EvmProvider; + mockPricingProvider = { + getTokenPrice: vi.fn(), + getTokenPrices: vi.fn(), + clearCache: vi.fn(), + }; + + mockMetadataProvider = { + getMetadata: vi.fn(), + clearCache: vi.fn(), + }; + const dependencies: CoreDependencies = { evmProvider: mockEvmProvider, transactionManager: {} as unknown as ITransactionManager, @@ -101,13 +120,8 @@ describe("Orchestrator", { sequential: true }, () => { applicationRepository: {} as unknown as IApplicationRepository, donationRepository: {} as unknown as IDonationRepository, applicationPayoutRepository: {} as unknown as IApplicationPayoutRepository, - pricingProvider: { - getTokenPrice: vi.fn(), - getTokenPrices: vi.fn(), - }, - metadataProvider: { - getMetadata: vi.fn(), - }, + pricingProvider: mockPricingProvider, + metadataProvider: mockMetadataProvider, }; abortController = new AbortController(); @@ -756,6 +770,152 @@ describe("Orchestrator", { sequential: true }, () => { expect(logger.error).not.toHaveBeenCalled(); }); }); + + describe("getMetadataFromEvents", () => { + it("extracts unique metadata IDs from events", async () => { + const events = [ + { + params: { metadata: [1n, "id1"] }, + }, + { + params: { metadata: [1n, "id1"] }, // Duplicate + }, + { + params: { metadata: [1n, "id2"] }, + }, + { + params: { recipientAddress: "0x123" }, // No metadata + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + const result = await orchestrator["getMetadataFromEvents"](events); + expect(result).toEqual(["id1", "id2"]); + }); + }); + + describe("getTokensFromEvents", () => { + const mockToken: Token = { + address: zeroAddress, + decimals: 18, + code: "ETH" as TokenCode, + priceSourceCode: "ETH" as TokenCode, + }; + + it("collects unique timestamps for each token", async () => { + const events = [ + { + params: { + token: zeroAddress, + amount: "1000000000000000000", // 1 ETH + }, + blockTimestamp: 1000, + }, + { + params: { + token: zeroAddress, + amount: "1000000000000000000", + }, + blockTimestamp: 1000, // Duplicate timestamp + }, + { + params: { + token: zeroAddress, + amount: "1000000000000000000", + }, + blockTimestamp: 2000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + const result = await orchestrator["getTokensFromEvents"](events); + + expect(result).toEqual([ + { + token: mockToken, + timestamps: [1000, 2000], + }, + ]); + }); + + it("ignores events with zero amounts", async () => { + const events = [ + { + params: { + token: "0x123", + amount: "0", + }, + blockTimestamp: 1000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + const result = await orchestrator["getTokensFromEvents"](events); + expect(result).toEqual([]); + }); + + it("ignores events with invalid tokens", async () => { + const events = [ + { + params: { + token: "0xInvalid", + amount: "1000000000000000000", + }, + blockTimestamp: 1000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + const result = await orchestrator["getTokensFromEvents"](events); + expect(result).toEqual([]); + }); + }); + + describe("bulkFetchMetadataAndPricesForBatch", () => { + it("clears cache and fetches metadata and prices in parallel", async () => { + const events = [ + { + params: { + metadata: ["type", "id1"], + token: zeroAddress, + amount: "1000000000000000000", + }, + blockTimestamp: 1000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + vi.spyOn(mockPricingProvider, "getTokenPrices").mockResolvedValue([ + { timestampMs: 1000, priceUsd: 1500 }, + ]); + + vi.spyOn(mockMetadataProvider, "getMetadata").mockResolvedValue({ name: "Test" }); + + await orchestrator["bulkFetchMetadataAndPricesForBatch"](events); + + expect(mockMetadataProvider.clearCache).toHaveBeenCalled(); + expect(mockMetadataProvider.getMetadata).toHaveBeenCalledWith("id1"); + expect(mockPricingProvider.getTokenPrices).toHaveBeenCalledWith("ETH", [1000]); + }); + + it("continues processing even if one fetch fails", async () => { + const events = [ + { + params: { + metadata: ["type", "id1"], + token: "0x123", + amount: "1000000000000000000", + }, + blockTimestamp: 1000, + }, + ] as unknown as AnyIndexerFetchedEvent[]; + + vi.spyOn(mockMetadataProvider, "getMetadata").mockRejectedValue( + new Error("Fetch failed"), + ); + vi.spyOn(mockPricingProvider, "getTokenPrices").mockResolvedValue([]); + + // Should not throw + await expect( + orchestrator["bulkFetchMetadataAndPricesForBatch"](events), + ).resolves.not.toThrow(); + }); + }); }); // Helper function to create mock events diff --git a/packages/metadata/src/providers/ipfs.provider.ts b/packages/metadata/src/providers/ipfs.provider.ts index 9a0cd08..2521c3d 100644 --- a/packages/metadata/src/providers/ipfs.provider.ts +++ b/packages/metadata/src/providers/ipfs.provider.ts @@ -37,12 +37,9 @@ export class IpfsProvider implements IMetadataProvider { return undefined; } - // Create array of gateways starting from current index - const orderedGateways = Array.from({ length: this.gateways.length }, () => - this.getNextGateway(), - ); - - for (const gateway of orderedGateways) { + const startIndex = this.currentGatewayIndex; + for (let i = 0; i < this.gateways.length; i++) { + const gateway = this.gateways[(startIndex + i) % this.gateways.length]!; const url = `${gateway}/ipfs/${ipfsCid}`; try { //TODO: retry policy for each gateway diff --git a/packages/pricing/src/providers/coingecko.provider.ts b/packages/pricing/src/providers/coingecko.provider.ts index b08d1cf..67b9c72 100644 --- a/packages/pricing/src/providers/coingecko.provider.ts +++ b/packages/pricing/src/providers/coingecko.provider.ts @@ -161,10 +161,6 @@ export class CoingeckoProvider implements IPricingProvider { const effectiveMin = Math.min(...timestamps); let effectiveMax = Math.max(...timestamps); - if (effectiveMin === effectiveMax || effectiveMin > effectiveMax) { - return []; - } - if (effectiveMax - effectiveMin < MIN_GRANULARITY_MS) { effectiveMax = effectiveMin + MIN_GRANULARITY_MS; } diff --git a/packages/pricing/test/providers/cachingProxy.provider.spec.ts b/packages/pricing/test/providers/cachingProxy.provider.spec.ts index 1e80581..313bf8f 100644 --- a/packages/pricing/test/providers/cachingProxy.provider.spec.ts +++ b/packages/pricing/test/providers/cachingProxy.provider.spec.ts @@ -1,7 +1,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { ICache, PriceCacheKey } from "@grants-stack-indexer/repository"; -import { ILogger, TokenCode } from "@grants-stack-indexer/shared"; +import { ICacheable, ILogger, TokenCode } from "@grants-stack-indexer/shared"; import { IPricingProvider, TokenPrice } from "../../src/internal.js"; import { CachingPricingProvider } from "../../src/providers/cachingProxy.provider.js"; @@ -9,11 +9,14 @@ import { CachingPricingProvider } from "../../src/providers/cachingProxy.provide describe("CachingPricingProvider", () => { const mockProvider = { getTokenPrice: vi.fn(), - } as unknown as IPricingProvider; + getTokenPrices: vi.fn(), + clearCache: vi.fn(), + } as unknown as IPricingProvider & ICacheable; const mockCache = { get: vi.fn(), - set: vi.fn(), + set: vi.fn().mockImplementation(() => Promise.resolve()), + clearCache: vi.fn(), } as unknown as ICache; const mockLogger = { diff --git a/packages/pricing/test/providers/coingecko.provider.spec.ts b/packages/pricing/test/providers/coingecko.provider.spec.ts index f72e595..38bb757 100644 --- a/packages/pricing/test/providers/coingecko.provider.spec.ts +++ b/packages/pricing/test/providers/coingecko.provider.spec.ts @@ -179,4 +179,45 @@ describe("CoingeckoProvider", () => { ).rejects.toThrow(NetworkError); }); }); + + describe("getTokenPrices", () => { + it("handles empty timestamps array", async () => { + const result = await provider.getTokenPrices("ETH" as TokenCode, []); + expect(result).toEqual([]); + }); + + it("fetches prices within minimum granularity", async () => { + const timestamps = [1000, 1100]; // Less than MIN_GRANULARITY_MS apart + mock.get.mockResolvedValue({ + data: { + prices: [ + [1000, 1500], + [1100, 1600], + ], + }, + }); + + await provider.getTokenPrices("ETH" as TokenCode, timestamps); + expect(mock.get).toHaveBeenCalledWith(expect.stringContaining(`&interval=5m`)); + }); + + it("throws UnsupportedToken for unknown token", async () => { + await expect(provider.getTokenPrices("UNKNOWN" as TokenCode, [1000])).rejects.toThrow( + UnsupportedToken, + ); + }); + + it("handles rate limiting errors", async () => { + mock.get.mockRejectedValueOnce({ + status: 429, + data: "Rate limit exceeded", + isAxiosError: true, + response: { headers: { "retry-after": "60" } }, + }); + + await expect(provider.getTokenPrices("ETH" as TokenCode, [1000])).rejects.toThrow( + RateLimitError, + ); + }); + }); }); diff --git a/packages/pricing/test/providers/dummy.provider.spec.ts b/packages/pricing/test/providers/dummy.provider.spec.ts index 160361b..7c77dfd 100644 --- a/packages/pricing/test/providers/dummy.provider.spec.ts +++ b/packages/pricing/test/providers/dummy.provider.spec.ts @@ -15,4 +15,25 @@ describe("DummyPricingProvider", () => { timestampMs: 11111111, }); }); + + describe("getTokenPrices", () => { + it("returns dummy prices for all timestamps", async () => { + const provider = new DummyPricingProvider(1); + const timestamps = [1000, 2000, 3000]; + + const result = await provider.getTokenPrices("ETH" as TokenCode, timestamps); + expect(result).toEqual( + timestamps.map((ts) => ({ + timestampMs: ts, + priceUsd: 1, + })), + ); + }); + + it("handles empty timestamps array", async () => { + const provider = new DummyPricingProvider(); + const result = await provider.getTokenPrices("ETH" as TokenCode, []); + expect(result).toEqual([]); + }); + }); }); diff --git a/packages/shared/test/retry/retry.spec.ts b/packages/shared/test/retry/retry.spec.ts index 87c9add..b68a306 100644 --- a/packages/shared/test/retry/retry.spec.ts +++ b/packages/shared/test/retry/retry.spec.ts @@ -36,6 +36,16 @@ describe("RetryHandler", () => { expect(result).toBe("success"); }); + it("handles undefined result from operation", async () => { + const handler = new RetryHandler(new ExponentialBackoff(), mockLogger); + const operation = vi.fn().mockResolvedValue(undefined); + + const result = await handler.execute(operation); + + expect(result).toBeUndefined(); + expect(operation).toHaveBeenCalledTimes(1); + }); + it("retries on RetriableError and succeeds", async () => { vi.useFakeTimers(); const handler = new RetryHandler(new ExponentialBackoff(), mockLogger); From b2202fa8b41f2955fbc770f1ec98e193b2cc0fa2 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 21 Jan 2025 22:21:52 -0300 Subject: [PATCH 7/9] style: delete unused function --- packages/metadata/src/providers/ipfs.provider.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/metadata/src/providers/ipfs.provider.ts b/packages/metadata/src/providers/ipfs.provider.ts index 2521c3d..ae4bdf0 100644 --- a/packages/metadata/src/providers/ipfs.provider.ts +++ b/packages/metadata/src/providers/ipfs.provider.ts @@ -22,12 +22,6 @@ export class IpfsProvider implements IMetadataProvider { this.axiosInstance = axios.create(); } - private getNextGateway(): string { - const gateway = this.gateways[this.currentGatewayIndex]!; - this.currentGatewayIndex = (this.currentGatewayIndex + 1) % this.gateways.length; - return gateway; - } - /* @inheritdoc */ async getMetadata( ipfsCid: string, From 6126ed6d876d0dfe2061b6bea379258dd8e60b57 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Wed, 22 Jan 2025 16:46:57 -0300 Subject: [PATCH 8/9] fix: incorrect method name --- packages/pricing/src/providers/coingecko.provider.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/pricing/src/providers/coingecko.provider.ts b/packages/pricing/src/providers/coingecko.provider.ts index 67b9c72..7a3efcb 100644 --- a/packages/pricing/src/providers/coingecko.provider.ts +++ b/packages/pricing/src/providers/coingecko.provider.ts @@ -127,7 +127,7 @@ export class CoingeckoProvider implements IPricingProvider { }; } catch (error: unknown) { if (isAxiosError(error)) { - this.handleAxiosError(error, path); + this.handleAxiosError(error, path, "getTokenPrice"); } const errorMessage = @@ -150,7 +150,7 @@ export class CoingeckoProvider implements IPricingProvider { if (!tokenId) { throw new UnsupportedToken(tokenCode, { className: CoingeckoProvider.name, - methodName: "getTokenPrice", + methodName: "getTokenPrices", }); } @@ -175,7 +175,7 @@ export class CoingeckoProvider implements IPricingProvider { })); } catch (error: unknown) { if (isAxiosError(error)) { - this.handleAxiosError(error, path); + this.handleAxiosError(error, path, "getTokenPrices"); } const errorMessage = @@ -184,7 +184,7 @@ export class CoingeckoProvider implements IPricingProvider { throw new UnknownPricingException(errorMessage, { className: CoingeckoProvider.name, - methodName: "getTokenPrice", + methodName: "getTokenPrices", additionalData: { path, }, @@ -192,10 +192,10 @@ export class CoingeckoProvider implements IPricingProvider { } } - private handleAxiosError(error: AxiosError, path: string): void { + private handleAxiosError(error: AxiosError, path: string, methodName: string): void { const errorContext = { className: CoingeckoProvider.name, - methodName: "getTokenPrice", + methodName, additionalData: { path, }, From deff69e8462538d2212f0fe35995db98503a0075 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Wed, 22 Jan 2025 17:27:22 -0300 Subject: [PATCH 9/9] docs: add missing jsdocs to functions --- packages/data-flow/src/orchestrator.ts | 49 +++++++++++++------ packages/pricing/src/constants/index.ts | 2 + .../src/providers/cachingProxy.provider.ts | 25 ++++++++++ 3 files changed, 62 insertions(+), 14 deletions(-) diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index ea0aa16..407c20a 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -50,6 +50,7 @@ type TokenWithTimestamps = { * The Orchestrator implements a continuous processing loop that: * * 1. Fetches batches of events from the indexer and stores them in an internal queue + * 1.5 Bulk fetches metadata and prices for the batch, improving performance by reducing the number of requests and parallelizing them * 2. Processes each event from the queue: * - For strategy events and PoolCreated from Allo contract, enhances them with strategyId * - Forwards the event to the Events Processor which is in charge of delagating the processing of the event to the correct handler @@ -209,7 +210,12 @@ export class Orchestrator { }); } - private async getMetadataFromEvents(events: AnyIndexerFetchedEvent[]): Promise { + /** + * Extracts unique metadata ids from the events batch. + * @param events - Array of indexer fetched events to process + * @returns Array of unique metadata ids found in the events + */ + private getMetadataFromEvents(events: AnyIndexerFetchedEvent[]): string[] { const ids = new Set(); for (const event of events) { @@ -221,9 +227,12 @@ export class Orchestrator { return Array.from(ids); } - private async getTokensFromEvents( - events: AnyIndexerFetchedEvent[], - ): Promise { + /** + * Extracts unique tokens from the events batch. Leaves out tokens with zero amount and sorts the timestamps. + * @param events - Array of indexer fetched events to process + * @returns Array of unique tokens with timestamps found in the events + */ + private getTokensFromEvents(events: AnyIndexerFetchedEvent[]): TokenWithTimestamps[] { const tokenMap = new Map(); for (const event of events) { @@ -258,6 +267,9 @@ export class Orchestrator { * Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy. * In this case, the event is emitted before the PoolCreated event. We can safely ignore the error * if the PoolCreated event is present in the same block. + * @param error - The error + * @param event - The event + * @returns True if the error should be ignored, false otherwise */ private shouldIgnoreTimestampsUpdatedError( error: Error, @@ -281,6 +293,10 @@ export class Orchestrator { return false; } + /** + * Fetches the next events batch from the indexer + * @returns The next events batch + */ private async getNextEventsBatch(): Promise { const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId); const blockNumber = lastProcessedEvent?.blockNumber ?? 0; @@ -298,18 +314,18 @@ export class Orchestrator { } /** - * Clear caches and fetch metadata and prices for the batch + * Clear pricing and metadata caches and bulk fetch metadata and prices for the batch + * @param events - The events batch */ private async bulkFetchMetadataAndPricesForBatch( events: AnyIndexerFetchedEvent[], ): Promise { - // Clear caches - if (this.dependencies.metadataProvider.clearCache) { - await this.dependencies.metadataProvider.clearCache(); - } + // Clear caches if the provider supports it + await this.dependencies.metadataProvider.clearCache?.(); + await this.dependencies.pricingProvider.clearCache?.(); - const metadataIds = await this.getMetadataFromEvents(events); - const tokens = await this.getTokensFromEvents(events); + const metadataIds = this.getMetadataFromEvents(events); + const tokens = this.getTokensFromEvents(events); await Promise.allSettled([ this.bulkFetchMetadata(metadataIds), @@ -318,7 +334,8 @@ export class Orchestrator { } /** - * Enqueue events and updates new context for the batch + * Enqueue events and updates new context of events by block number for the batch + * @param events - The events batch */ private async enqueueEvents(events: AnyIndexerFetchedEvent[]): Promise { // Clear previous context @@ -334,7 +351,9 @@ export class Orchestrator { } /** - * Fetch all possible metadata for the batch + * Fetch all possible metadata for the batch. + * @param metadataIds - The metadata ids + * @returns The metadata */ private async bulkFetchMetadata(metadataIds: string[]): Promise { const results = await Promise.allSettled( @@ -356,7 +375,9 @@ export class Orchestrator { } /** - * Fetch all possible prices for the batch + * Fetch all tokens prices + * @param tokens - The tokens with timestamps + * @returns The token prices */ private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise { const results = await Promise.allSettled( diff --git a/packages/pricing/src/constants/index.ts b/packages/pricing/src/constants/index.ts index 768b4e9..188876d 100644 --- a/packages/pricing/src/constants/index.ts +++ b/packages/pricing/src/constants/index.ts @@ -1 +1,3 @@ +// This is the minimum granularity we can get data with on Enterprise plans of Coingecko +// Refer to https://support.coingecko.com/hc/en-us/articles/4538747001881-What-granularity-do-you-support-for-historical-data export const MIN_GRANULARITY_MS = 300_000; // 5 minutes diff --git a/packages/pricing/src/providers/cachingProxy.provider.ts b/packages/pricing/src/providers/cachingProxy.provider.ts index 4c63785..c1adba0 100644 --- a/packages/pricing/src/providers/cachingProxy.provider.ts +++ b/packages/pricing/src/providers/cachingProxy.provider.ts @@ -121,6 +121,12 @@ export class CachingPricingProvider implements IPricingProvider, ICacheable { .filter((price): price is TokenPrice => !!price); } + /** + * Fetches cached prices for the given token and timestamps. + * @param tokenCode - The token code + * @param timestamps - The timestamps + * @returns {PromiseSettledResult[]} - The cached prices + */ private async getCachedPrices( tokenCode: TokenCode, timestamps: number[], @@ -143,6 +149,12 @@ export class CachingPricingProvider implements IPricingProvider, ICacheable { ); } + /** + * Gets the timestamps that need to be fetched from the provider. + * @param timestamps - The timestamps + * @param cachedPrices - The cached prices PromiseSettledResult + * @returns The timestamps that need to be fetched + */ private getTimestampsToFetch( timestamps: number[], cachedPrices: PromiseSettledResult[], @@ -154,6 +166,13 @@ export class CachingPricingProvider implements IPricingProvider, ICacheable { }); } + /** + * Gets the closest price from the fetched prices. Updates the cache accordingly. + * @param tokenCode - The token code + * @param timestampsToFetch - The timestamps that need to be fetched + * @param sortedFetchedPrices - The sorted fetched prices + * @returns The closest prices + */ private getClosestPricesWithCache( tokenCode: TokenCode, timestampsToFetch: number[], @@ -184,6 +203,12 @@ export class CachingPricingProvider implements IPricingProvider, ICacheable { .filter((price): price is TokenPrice => price !== null); } + /** + * Builds a price map from cached and fetched prices. + * @param cachedPrices - The cached prices + * @param closestPrices - The fetched prices + * @returns The price map with all prices + */ private buildPriceMap( cachedPrices: PromiseSettledResult[], closestPrices: TokenPrice[],