diff --git a/apps/processing/src/services/processing.service.ts b/apps/processing/src/services/processing.service.ts index cf75f58..0cbe94a 100644 --- a/apps/processing/src/services/processing.service.ts +++ b/apps/processing/src/services/processing.service.ts @@ -1,7 +1,13 @@ import { optimism } from "viem/chains"; import { EvmProvider } from "@grants-stack-indexer/chain-providers"; -import { Orchestrator } from "@grants-stack-indexer/data-flow"; +import { + DatabaseEventRegistry, + DatabaseStrategyRegistry, + InMemoryCachedEventRegistry, + InMemoryCachedStrategyRegistry, + Orchestrator, +} from "@grants-stack-indexer/data-flow"; import { ChainId, Logger } from "@grants-stack-indexer/shared"; import { Environment } from "../config/env.js"; @@ -10,8 +16,10 @@ import { SharedDependencies, SharedDependenciesService } from "./index.js"; /** * Processor service application * - Initializes core dependencies (repositories, providers) via SharedDependenciesService + * - Initializes a StrategyRegistry and loads it with strategies from the database * For each chain: * - Sets up EVM provider with configured RPC endpoints + * - Instantiates an EventsRegistry and loads it with the last processed event for the chain * - Creates an Orchestrator instance to coordinate an specific chain: * - Fetching on-chain events via indexer client * - Processing events through registered handlers @@ -23,34 +31,63 @@ export class ProcessingService { private readonly logger = new Logger({ className: "ProcessingService" }); private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"]; - private constructor(env: Environment, sharedDependencies: SharedDependencies) { - const { CHAINS: chains } = env; - const { core, registries, indexerClient, kyselyDatabase } = sharedDependencies; + private constructor( + orchestrators: Map, + kyselyDatabase: SharedDependencies["kyselyDatabase"], + ) { + this.orchestrators = orchestrators; this.kyselyDatabase = kyselyDatabase; + } + + static async initialize(env: Environment): Promise { + const sharedDependencies = await SharedDependenciesService.initialize(env); + const { CHAINS: chains } = env; + const { core, registriesRepositories, indexerClient, kyselyDatabase } = sharedDependencies; + const { eventRegistryRepository, strategyRegistryRepository } = registriesRepositories; + const orchestrators: Map = new Map(); + + const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize( + new Logger({ className: "InMemoryCachedStrategyRegistry" }), + new DatabaseStrategyRegistry( + new Logger({ className: "DatabaseStrategyRegistry" }), + strategyRegistryRepository, + ), + ); + const eventsRegistry = new DatabaseEventRegistry( + new Logger({ className: "DatabaseEventRegistry" }), + eventRegistryRepository, + ); for (const chain of chains) { const chainLogger = new Logger({ chainId: chain.id as ChainId }); // Initialize EVM provider const evmProvider = new EvmProvider(chain.rpcUrls, optimism, chainLogger); - this.orchestrators.set( + // Initialize events registry for the chain + const cachedEventsRegistry = await InMemoryCachedEventRegistry.initialize( + new Logger({ className: "InMemoryCachedEventRegistry" }), + eventsRegistry, + [chain.id as ChainId], + ); + + orchestrators.set( chain.id as ChainId, new Orchestrator( chain.id as ChainId, { ...core, evmProvider }, indexerClient, - registries, + { + eventsRegistry: cachedEventsRegistry, + strategyRegistry, + }, chain.fetchLimit, chain.fetchDelayMs, chainLogger, ), ); } - } - static async initialize(env: Environment): Promise { - const sharedDependencies = await SharedDependenciesService.initialize(env); - return new ProcessingService(env, sharedDependencies); + return new ProcessingService(orchestrators, kyselyDatabase); } /** diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index b2e05ae..3fb29fe 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -1,19 +1,15 @@ -import { - CoreDependencies, - DatabaseStrategyRegistry, - IEventsRegistry, - InMemoryCachedStrategyRegistry, - InMemoryEventsRegistry, - IStrategyRegistry, -} from "@grants-stack-indexer/data-flow"; +import { CoreDependencies } from "@grants-stack-indexer/data-flow"; import { EnvioIndexerClient } from "@grants-stack-indexer/indexer-client"; import { IpfsProvider } from "@grants-stack-indexer/metadata"; import { PricingProviderFactory } from "@grants-stack-indexer/pricing"; import { createKyselyDatabase, + IEventRegistryRepository, + IStrategyRegistryRepository, KyselyApplicationPayoutRepository, KyselyApplicationRepository, KyselyDonationRepository, + KyselyEventRegistryRepository, KyselyProjectRepository, KyselyRoundRepository, KyselyStrategyRegistryRepository, @@ -24,9 +20,9 @@ import { Environment } from "../config/index.js"; export type SharedDependencies = { core: Omit; - registries: { - eventsRegistry: IEventsRegistry; - strategyRegistry: IStrategyRegistry; + registriesRepositories: { + eventRegistryRepository: IEventRegistryRepository; + strategyRegistryRepository: IStrategyRegistryRepository; }; indexerClient: EnvioIndexerClient; kyselyDatabase: ReturnType; @@ -35,7 +31,7 @@ export type SharedDependencies = { /** * Shared dependencies service * - Initializes core dependencies (repositories, providers) - * - Initializes registries + * - Initializes registries repositories * - Initializes indexer client */ export class SharedDependenciesService { @@ -68,20 +64,13 @@ export class SharedDependenciesService { new Logger({ className: "IpfsProvider" }), ); - // Initialize registries - const eventsRegistry = new InMemoryEventsRegistry( - new Logger({ className: "InMemoryEventsRegistry" }), - ); - const strategyRepository = new KyselyStrategyRegistryRepository( + const eventRegistryRepository = new KyselyEventRegistryRepository( kyselyDatabase, env.DATABASE_SCHEMA, ); - const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize( - new Logger({ className: "InMemoryCachedStrategyRegistry" }), - new DatabaseStrategyRegistry( - new Logger({ className: "DatabaseStrategyRegistry" }), - strategyRepository, - ), + const strategyRegistryRepository = new KyselyStrategyRegistryRepository( + kyselyDatabase, + env.DATABASE_SCHEMA, ); // Initialize indexer client @@ -100,9 +89,9 @@ export class SharedDependenciesService { metadataProvider, applicationPayoutRepository, }, - registries: { - eventsRegistry, - strategyRegistry, + registriesRepositories: { + eventRegistryRepository, + strategyRegistryRepository, }, indexerClient, kyselyDatabase, diff --git a/apps/processing/test/unit/processing.service.spec.ts b/apps/processing/test/unit/processing.service.spec.ts index 6623fde..15b59ff 100644 --- a/apps/processing/test/unit/processing.service.spec.ts +++ b/apps/processing/test/unit/processing.service.spec.ts @@ -1,7 +1,13 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { EvmProvider } from "@grants-stack-indexer/chain-providers"; -import { Orchestrator } from "@grants-stack-indexer/data-flow"; +import { + DatabaseEventRegistry, + DatabaseStrategyRegistry, + InMemoryCachedEventRegistry, + InMemoryCachedStrategyRegistry, + Orchestrator, +} from "@grants-stack-indexer/data-flow"; import type { Environment } from "../../src/config/env.js"; import { ProcessingService } from "../../src/services/processing.service.js"; @@ -10,7 +16,7 @@ vi.mock("../../src/services/sharedDependencies.service.js", () => ({ SharedDependenciesService: { initialize: vi.fn(() => ({ core: {}, - registries: {}, + registriesRepositories: {}, indexerClient: {}, kyselyDatabase: { destroy: vi.fn(), @@ -23,6 +29,39 @@ vi.mock("@grants-stack-indexer/chain-providers", () => ({ EvmProvider: vi.fn(), })); +vi.mock("@grants-stack-indexer/data-flow", async (importOriginal) => { + const actual = await importOriginal(); + const mockStrategyRegistry = { + getStrategies: vi.fn(), + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + }; + + const mockEventRegistry = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + + return { + ...actual, + InMemoryCachedStrategyRegistry: { + initialize: vi.fn().mockResolvedValue(mockStrategyRegistry), + }, + DatabaseStrategyRegistry: vi.fn().mockImplementation(() => ({ + getStrategies: vi.fn(), + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + })), + DatabaseEventRegistry: vi.fn().mockImplementation(() => ({ + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + })), + InMemoryCachedEventRegistry: { + initialize: vi.fn().mockResolvedValue(mockEventRegistry), + }, + }; +}); + vi.spyOn(Orchestrator.prototype, "run").mockImplementation(async function (signal: AbortSignal) { while (!signal.aborted) { await new Promise((resolve) => setTimeout(resolve, 100)); @@ -62,7 +101,12 @@ describe("ProcessingService", () => { }); it("initializes multiple orchestrators correctly", () => { + expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1); + expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1); + expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1); expect(EvmProvider).toHaveBeenCalledTimes(2); + expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2); + // Verify orchestrators were created with correct parameters expect(processingService["orchestrators"].size).toBe(2); diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index 9d309b9..bff277e 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -22,6 +22,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({ getStrategyId: vi.fn(), saveStrategyId: vi.fn(), })), + KyselyEventRegistryRepository: vi.fn(), })); vi.mock("@grants-stack-indexer/pricing", () => ({ @@ -45,8 +46,12 @@ vi.mock("@grants-stack-indexer/data-flow", () => { saveStrategyId: vi.fn(), }; + const mockEventRegistry = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + return { - InMemoryEventsRegistry: vi.fn(), InMemoryCachedStrategyRegistry: { initialize: vi.fn().mockResolvedValue(mockStrategyRegistry), }, @@ -55,6 +60,13 @@ vi.mock("@grants-stack-indexer/data-flow", () => { getStrategyId: vi.fn(), saveStrategyId: vi.fn(), })), + DatabaseEventRegistry: vi.fn().mockImplementation(() => ({ + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + })), + InMemoryCachedEventRegistry: { + initialize: vi.fn().mockResolvedValue(mockEventRegistry), + }, }; }); @@ -98,7 +110,7 @@ describe("SharedDependenciesService", () => { // Verify structure of returned dependencies expect(dependencies).toHaveProperty("core"); - expect(dependencies).toHaveProperty("registries"); + expect(dependencies).toHaveProperty("registriesRepositories"); expect(dependencies).toHaveProperty("indexerClient"); expect(dependencies).toHaveProperty("kyselyDatabase"); @@ -112,10 +124,7 @@ describe("SharedDependenciesService", () => { expect(dependencies.core).toHaveProperty("applicationPayoutRepository"); // Verify registries - expect(dependencies.registries).toHaveProperty("eventsRegistry"); - expect(dependencies.registries).toHaveProperty("strategyRegistry"); - - // Verify InMemoryCachedStrategyRegistry initialization - expect(dependencies.registries.strategyRegistry).toBeDefined(); + expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository"); + expect(dependencies.registriesRepositories).toHaveProperty("strategyRegistryRepository"); }); }); diff --git a/packages/data-flow/src/eventsRegistry.ts b/packages/data-flow/src/eventsRegistry.ts deleted file mode 100644 index 42bed46..0000000 --- a/packages/data-flow/src/eventsRegistry.ts +++ /dev/null @@ -1,40 +0,0 @@ -import type { - AnyEvent, - ChainId, - ContractName, - ILogger, - ProcessorEvent, -} from "@grants-stack-indexer/shared"; -import { stringify } from "@grants-stack-indexer/shared"; - -import type { IEventsRegistry } from "./internal.js"; - -/** - * Class to store the last processed event in memory - */ -//TODO: Implement storage version to persist the last processed event. we need to store it by chainId -export class InMemoryEventsRegistry implements IEventsRegistry { - private lastProcessedEvent: Map> = new Map(); - - constructor(private logger: ILogger) {} - - /** - * @inheritdoc - */ - async getLastProcessedEvent( - chainId: ChainId, - ): Promise | undefined> { - return this.lastProcessedEvent.get(chainId); - } - - /** - * @inheritdoc - */ - async saveLastProcessedEvent( - chainId: ChainId, - event: ProcessorEvent, - ): Promise { - this.logger.debug(`Saving last processed event: ${stringify(event, undefined, 4)}`); - this.lastProcessedEvent.set(chainId, event); - } -} diff --git a/packages/data-flow/src/external.ts b/packages/data-flow/src/external.ts index 4a22f36..954c0b7 100644 --- a/packages/data-flow/src/external.ts +++ b/packages/data-flow/src/external.ts @@ -1,7 +1,8 @@ export { DataLoader, - InMemoryEventsRegistry, InMemoryCachedStrategyRegistry, + InMemoryCachedEventRegistry, + DatabaseEventRegistry, DatabaseStrategyRegistry, Orchestrator, } from "./internal.js"; diff --git a/packages/data-flow/src/interfaces/eventsRegistry.interface.ts b/packages/data-flow/src/interfaces/eventsRegistry.interface.ts index 6c063e1..16e9fd7 100644 --- a/packages/data-flow/src/interfaces/eventsRegistry.interface.ts +++ b/packages/data-flow/src/interfaces/eventsRegistry.interface.ts @@ -1,4 +1,5 @@ -import { AnyEvent, ChainId, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; +import { NewProcessedEvent, ProcessedEvent } from "@grants-stack-indexer/repository"; +import { ChainId } from "@grants-stack-indexer/shared"; /** * The events registry saves as a checkpoint to the last processed event by the system. @@ -10,16 +11,11 @@ export interface IEventsRegistry { * @param chainId - The chain id * @returns The last processed event or undefined if no event has been processed yet. */ - getLastProcessedEvent( - chainId: ChainId, - ): Promise | undefined>; + getLastProcessedEvent(chainId: ChainId): Promise; /** * Save the last processed event by the system * @param chainId - The chain id * @param event - The event to save. */ - saveLastProcessedEvent( - chainId: ChainId, - event: ProcessorEvent, - ): Promise; + saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise; } diff --git a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts index 965ad44..b75eabb 100644 --- a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts +++ b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts @@ -21,13 +21,13 @@ export interface IStrategyRegistry { * @param chainId - The chain id * @param strategyAddress - The strategy address * @param strategyId - The strategy id - * @param handled - Whether the strategy is handled + * @param handleable - Whether the strategy is handled */ saveStrategyId( chainId: ChainId, strategyAddress: Address, strategyId: Hex, - handled: boolean, + handleable: boolean, ): Promise; /** diff --git a/packages/data-flow/src/internal.ts b/packages/data-flow/src/internal.ts index cb9a1a9..9224340 100644 --- a/packages/data-flow/src/internal.ts +++ b/packages/data-flow/src/internal.ts @@ -6,6 +6,5 @@ export * from "./utils/index.js"; export * from "./data-loader/index.js"; export * from "./eventsFetcher.js"; export * from "./registries/index.js"; -export * from "./eventsRegistry.js"; export * from "./eventsProcessor.js"; export * from "./orchestrator.js"; diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 2eb2266..0a2f53e 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -110,7 +110,10 @@ export class Orchestrator { await delay(this.fetchDelayInMs); continue; } - await this.eventsRegistry.saveLastProcessedEvent(this.chainId, event); + await this.eventsRegistry.saveLastProcessedEvent(this.chainId, { + ...event, + rawEvent: event, + }); event = await this.enhanceStrategyId(event); if (this.isPoolCreated(event)) { diff --git a/packages/data-flow/src/registries/event/cachedEventRegistry.ts b/packages/data-flow/src/registries/event/cachedEventRegistry.ts new file mode 100644 index 0000000..0c3c344 --- /dev/null +++ b/packages/data-flow/src/registries/event/cachedEventRegistry.ts @@ -0,0 +1,69 @@ +import { NewProcessedEvent, ProcessedEvent } from "@grants-stack-indexer/repository"; +import { ChainId, ILogger, stringify } from "@grants-stack-indexer/shared"; + +import { IEventsRegistry } from "../../internal.js"; + +/** + * Proxy class to cache the events in memory or fallback to another event registry + */ +export class InMemoryCachedEventRegistry implements IEventsRegistry { + private cache: Map = new Map(); + + private constructor( + private readonly logger: ILogger, + private readonly eventRegistry: IEventsRegistry, + cache: Map, + ) { + this.cache = structuredClone(cache); + } + + /** @inheritdoc */ + async getLastProcessedEvent(chainId: ChainId): Promise { + const cachedEvent = this.cache.get(chainId); + if (cachedEvent) { + return cachedEvent; + } + const event = await this.eventRegistry.getLastProcessedEvent(chainId); + if (event) { + this.cache.set(chainId, event); + } + return event; + } + + /** @inheritdoc */ + async saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise { + this.logger.debug(`Saving last processed event: ${stringify(event, undefined, 4)}`); + await this.eventRegistry.saveLastProcessedEvent(chainId, event); + this.cache.set(chainId, { ...event, chainId }); + } + + /** + * Creates a new cached event registry instance. It will load the events into memory and cache them and + * fallback to the event registry if the event is not found in the cache. + * + * @param logger - The logger instance + * @param eventRegistry - The event registry instance + * @param chainIds - The chain ids to load the events for + * @returns The initialized cached event registry + */ + static async initialize( + logger: ILogger, + eventRegistry: IEventsRegistry, + chainIds: ChainId[], + ): Promise { + const events = await Promise.allSettled( + chainIds.map(async (chainId) => await eventRegistry.getLastProcessedEvent(chainId)), + ); + const cache = new Map(); + + logger.debug(`Loading events into memory...`); + + for (const event of events) { + if (event.status === "fulfilled" && event.value) { + cache.set(event.value.chainId, event.value); + } + } + + return new InMemoryCachedEventRegistry(logger, eventRegistry, cache); + } +} diff --git a/packages/data-flow/src/registries/event/dbEventRegistry.ts b/packages/data-flow/src/registries/event/dbEventRegistry.ts new file mode 100644 index 0000000..0196111 --- /dev/null +++ b/packages/data-flow/src/registries/event/dbEventRegistry.ts @@ -0,0 +1,28 @@ +import { + IEventRegistryRepository, + NewProcessedEvent, + ProcessedEvent, +} from "@grants-stack-indexer/repository"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import { IEventsRegistry } from "../../internal.js"; + +/** + * Class to store last processed event of a chain in Database + */ +export class DatabaseEventRegistry implements IEventsRegistry { + constructor( + private logger: ILogger, + private eventRepository: IEventRegistryRepository, + ) {} + + /** @inheritdoc */ + async getLastProcessedEvent(chainId: ChainId): Promise { + return this.eventRepository.getLastProcessedEvent(chainId); + } + + /** @inheritdoc */ + async saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise { + return this.eventRepository.saveLastProcessedEvent(chainId, event); + } +} diff --git a/packages/data-flow/src/registries/event/index.ts b/packages/data-flow/src/registries/event/index.ts new file mode 100644 index 0000000..547781d --- /dev/null +++ b/packages/data-flow/src/registries/event/index.ts @@ -0,0 +1,2 @@ +export * from "./cachedEventRegistry.js"; +export * from "./dbEventRegistry.js"; diff --git a/packages/data-flow/src/registries/index.ts b/packages/data-flow/src/registries/index.ts index c6319bf..36ea2a7 100644 --- a/packages/data-flow/src/registries/index.ts +++ b/packages/data-flow/src/registries/index.ts @@ -1 +1,2 @@ export * from "./strategy/index.js"; +export * from "./event/index.js"; diff --git a/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts b/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts index 16899cc..05cfec8 100644 --- a/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts +++ b/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts @@ -58,7 +58,11 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry { const strategy = await this.strategyRegistry.getStrategyId(chainId, strategyAddress); if (strategy) { - this.cache.get(chainId)?.set(strategyAddress, strategy); + if (!this.cache.has(strategy.chainId)) { + this.cache.set(strategy.chainId, new Map()); + } + + this.cache.get(strategy.chainId)?.set(strategyAddress, strategy); } return strategy; } @@ -79,6 +83,10 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry { ); await this.strategyRegistry.saveStrategyId(chainId, strategyAddress, strategyId, handled); + if (!this.cache.has(chainId)) { + this.cache.set(chainId, new Map()); + } + this.cache.get(chainId)?.set(strategyAddress, { address: strategyAddress, id: strategyId, diff --git a/packages/data-flow/test/registries/cachedEventRegistry.spec.ts b/packages/data-flow/test/registries/cachedEventRegistry.spec.ts new file mode 100644 index 0000000..6e9caad --- /dev/null +++ b/packages/data-flow/test/registries/cachedEventRegistry.spec.ts @@ -0,0 +1,123 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { NewProcessedEvent, ProcessedEvent } from "@grants-stack-indexer/repository"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import { IEventsRegistry } from "../../src/internal.js"; +import { InMemoryCachedEventRegistry } from "../../src/registries/event/cachedEventRegistry.js"; + +describe("InMemoryCachedEventRegistry", () => { + const logger: ILogger = { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }; + + const mockEventRegistry: IEventsRegistry = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + + const chainId = 1 as ChainId; + const mockEvent: ProcessedEvent = { + chainId, + blockNumber: 100, + blockTimestamp: 1234567890, + logIndex: 1, + }; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("initialize with existing events", async () => { + vi.mocked(mockEventRegistry.getLastProcessedEvent).mockResolvedValue(mockEvent); + + const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [ + chainId, + ]); + + const cached = await registry.getLastProcessedEvent(chainId); + expect(cached).toEqual(mockEvent); + expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(1); + }); + + it("fetch from underlying registry when not in cache", async () => { + vi.mocked(mockEventRegistry.getLastProcessedEvent) + .mockResolvedValueOnce(undefined) // For initialization + .mockResolvedValueOnce(mockEvent); // For actual fetch + + const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [ + chainId, + ]); + + const result = await registry.getLastProcessedEvent(chainId); + expect(result).toEqual(mockEvent); + expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(2); + }); + + it("save event and update cache", async () => { + const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [ + chainId, + ]); + + const newEvent: NewProcessedEvent = { + blockNumber: 200, + blockTimestamp: 1234577890, + logIndex: 2, + }; + + await registry.saveLastProcessedEvent(chainId, newEvent); + + // Verify the event was saved to underlying registry + expect(mockEventRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(chainId, newEvent); + + // Verify the cache was updated + const cached = await registry.getLastProcessedEvent(chainId); + expect(cached).toEqual({ + ...newEvent, + chainId, + }); + + // Verify no additional calls to underlying registry + expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(1); + }); + + it("initialize with multiple chain ids", async () => { + const chainId2 = 5 as ChainId; + const mockEvent2: ProcessedEvent = { ...mockEvent, chainId: chainId2 }; + + vi.mocked(mockEventRegistry.getLastProcessedEvent).mockImplementation(async (chain) => + chain === chainId ? mockEvent : mockEvent2, + ); + + const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [ + chainId, + chainId2, + ]); + + const cached1 = await registry.getLastProcessedEvent(chainId); + const cached2 = await registry.getLastProcessedEvent(chainId2); + + expect(cached1).toEqual(mockEvent); + expect(cached2).toEqual(mockEvent2); + expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(2); + }); + + it("throws error when underlying registry throws error", async () => { + vi.mocked(mockEventRegistry.saveLastProcessedEvent).mockRejectedValue( + new Error("Saving error"), + ); + + const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [ + chainId, + ]); + const cacheSetSpy = vi.spyOn(registry["cache"], "set"); + + await expect(registry.saveLastProcessedEvent(chainId, mockEvent)).rejects.toThrow( + "Saving error", + ); + expect(cacheSetSpy).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/data-flow/test/registries/dbEventRegistry.spec.ts b/packages/data-flow/test/registries/dbEventRegistry.spec.ts new file mode 100644 index 0000000..679c000 --- /dev/null +++ b/packages/data-flow/test/registries/dbEventRegistry.spec.ts @@ -0,0 +1,66 @@ +import { describe, expect, it, vi } from "vitest"; + +import { + IEventRegistryRepository, + NewProcessedEvent, + ProcessedEvent, +} from "@grants-stack-indexer/repository"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import { DatabaseEventRegistry } from "../../src/registries/event/dbEventRegistry.js"; + +describe("DatabaseEventRegistry", () => { + const logger: ILogger = { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }; + + const mockEventRepository: IEventRegistryRepository = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + + const chainId = 1 as ChainId; + + it("return undefined for non-existent last processed event", async () => { + const registry = new DatabaseEventRegistry(logger, mockEventRepository); + + vi.mocked(mockEventRepository.getLastProcessedEvent).mockResolvedValue(undefined); + + const event = await registry.getLastProcessedEvent(chainId); + expect(event).toBeUndefined(); + expect(mockEventRepository.getLastProcessedEvent).toHaveBeenCalledWith(chainId); + }); + + it("return last processed event when it exists", async () => { + const registry = new DatabaseEventRegistry(logger, mockEventRepository); + const mockEvent: ProcessedEvent = { + chainId, + blockNumber: 100, + blockTimestamp: 1234567890, + logIndex: 1, + }; + + vi.mocked(mockEventRepository.getLastProcessedEvent).mockResolvedValue(mockEvent); + + const event = await registry.getLastProcessedEvent(chainId); + expect(event).toEqual(mockEvent); + expect(mockEventRepository.getLastProcessedEvent).toHaveBeenCalledWith(chainId); + }); + + it("save last processed event", async () => { + const registry = new DatabaseEventRegistry(logger, mockEventRepository); + const newEvent: NewProcessedEvent = { + blockNumber: 100, + blockTimestamp: 1234567890, + logIndex: 1, + }; + + vi.mocked(mockEventRepository.saveLastProcessedEvent).mockResolvedValue(); + + await registry.saveLastProcessedEvent(chainId, newEvent); + expect(mockEventRepository.saveLastProcessedEvent).toHaveBeenCalledWith(chainId, newEvent); + }); +}); diff --git a/packages/data-flow/test/unit/eventsRegistry.spec.ts b/packages/data-flow/test/unit/eventsRegistry.spec.ts deleted file mode 100644 index 1ddf8b7..0000000 --- a/packages/data-flow/test/unit/eventsRegistry.spec.ts +++ /dev/null @@ -1,105 +0,0 @@ -import { describe, expect, it, vi } from "vitest"; - -import { ChainId, ILogger, ProcessorEvent } from "@grants-stack-indexer/shared"; - -import { InMemoryEventsRegistry } from "../../src/eventsRegistry.js"; - -describe("InMemoryEventsRegistry", () => { - const logger: ILogger = { - debug: vi.fn(), - error: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - }; - const chainId = 1 as ChainId; - it("return null when no event has been saved", async () => { - const registry = new InMemoryEventsRegistry(logger); - const lastEvent = await registry.getLastProcessedEvent(chainId); - expect(lastEvent).toBeUndefined(); - }); - - it("save and retrieve the last processed event", async () => { - const registry = new InMemoryEventsRegistry(logger); - const mockEvent: ProcessorEvent<"Allo", "PoolCreated"> = { - contractName: "Allo", - eventName: "PoolCreated", - blockNumber: 1, - blockTimestamp: 1234567890, - chainId: 1 as ChainId, - logIndex: 0, - srcAddress: "0x123", - strategyId: "0xstrategy", - params: { - poolId: "1", - profileId: "0x456", - strategy: "0x789", - token: "0xtoken", - amount: "0", - metadata: ["1", "0xmetadata"], - }, - transactionFields: { - hash: "0xabc", - transactionIndex: 0, - }, - }; - - await registry.saveLastProcessedEvent(chainId, mockEvent); - const retrievedEvent = await registry.getLastProcessedEvent(chainId); - - expect(retrievedEvent).toEqual(mockEvent); - }); - - it("updates the last processed event when saving multiple times", async () => { - const registry = new InMemoryEventsRegistry(logger); - - const firstEvent: ProcessorEvent<"Allo", "PoolCreated"> = { - contractName: "Allo", - eventName: "PoolCreated", - blockNumber: 1, - blockTimestamp: 1234567890, - chainId: 1 as ChainId, - logIndex: 0, - srcAddress: "0x123", - strategyId: "0xstrategy", - params: { - poolId: "1", - profileId: "0x456", - strategy: "0x789", - token: "0xtoken", - amount: "0", - metadata: ["1", "0xmetadata"], - }, - transactionFields: { - hash: "0xabc", - transactionIndex: 0, - }, - }; - - const secondEvent: ProcessorEvent<"Strategy", "RegisteredWithSender"> = { - contractName: "Strategy", - eventName: "RegisteredWithSender", - blockNumber: 1, - blockTimestamp: 1234567890, - chainId: 1 as ChainId, - logIndex: 0, - srcAddress: "0x123", - strategyId: "0xstrategy", - params: { - recipientId: "0xrecipient", - data: "0xdata", - sender: "0xsender", - }, - transactionFields: { - hash: "0xabc", - transactionIndex: 0, - }, - }; - - await registry.saveLastProcessedEvent(chainId, firstEvent); - await registry.saveLastProcessedEvent(chainId, secondEvent); - - const lastEvent = await registry.getLastProcessedEvent(chainId); - expect(lastEvent).toEqual(secondEvent); - expect(lastEvent).not.toEqual(firstEvent); - }); -}); diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index 7b492c2..7b90c04 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -178,14 +178,7 @@ describe("Orchestrator", { sequential: true }, () => { expect(eventsProcessorSpy).toHaveBeenCalledWith(mockEvents[0]); expect(eventsProcessorSpy).toHaveBeenCalledWith(mockEvents[1]); - expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith( - chainId, - mockEvents[0], - ); - expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith( - chainId, - mockEvents[1], - ); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledTimes(2); }); it("wait and keep polling on empty queue", async () => { @@ -274,10 +267,7 @@ describe("Orchestrator", { sequential: true }, () => { }); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledWith(changesets); - expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith( - chainId, - mockEvent, - ); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalled(); }); it("save strategyId to registry on PoolCreated event", async () => { @@ -409,10 +399,7 @@ describe("Orchestrator", { sequential: true }, () => { ); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledWith(changesets); - expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith( - chainId, - mockEvent, - ); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalled(); }); } diff --git a/packages/repository/src/db/connection.ts b/packages/repository/src/db/connection.ts index d9e5568..d7398a4 100644 --- a/packages/repository/src/db/connection.ts +++ b/packages/repository/src/db/connection.ts @@ -12,6 +12,7 @@ import { Application, ApplicationPayout, Donation as DonationTable, + ProcessedEvent as EventRegistryTable, MatchingDistribution, PendingProjectRole as PendingProjectRoleTable, PendingRoundRole as PendingRoundRoleTable, @@ -20,7 +21,7 @@ import { Round, RoundRole as RoundRoleTable, StatusSnapshot, - Strategy as StrategyTable, + Strategy as StrategyRegistryTable, } from "../internal.js"; const { Pool } = pg; @@ -60,7 +61,8 @@ export interface Database { applications: ApplicationTable; donations: DonationTable; applicationsPayouts: ApplicationPayoutTable; - strategies: StrategyTable; + strategiesRegistry: StrategyRegistryTable; + eventsRegistry: EventRegistryTable; } /** diff --git a/packages/repository/src/external.ts b/packages/repository/src/external.ts index fec981b..08222ec 100644 --- a/packages/repository/src/external.ts +++ b/packages/repository/src/external.ts @@ -9,6 +9,7 @@ export type { IDonationRepository, IApplicationPayoutRepository, IStrategyRegistryRepository, + IEventRegistryRepository, DatabaseConfig, } from "./internal.js"; @@ -37,6 +38,7 @@ export type { Donation, NewDonation } from "./types/index.js"; export type { NewApplicationPayout, ApplicationPayout } from "./types/index.js"; export type { Strategy, NewStrategy } from "./types/index.js"; +export type { ProcessedEvent, NewProcessedEvent } from "./types/index.js"; export type { Changeset, @@ -54,6 +56,7 @@ export { KyselyDonationRepository, KyselyApplicationPayoutRepository, KyselyStrategyRegistryRepository, + KyselyEventRegistryRepository, } from "./repositories/kysely/index.js"; export { diff --git a/packages/repository/src/interfaces/eventsRepository.interface.ts b/packages/repository/src/interfaces/eventsRepository.interface.ts new file mode 100644 index 0000000..5e3ee73 --- /dev/null +++ b/packages/repository/src/interfaces/eventsRepository.interface.ts @@ -0,0 +1,19 @@ +import { ChainId } from "@grants-stack-indexer/shared"; + +import { NewProcessedEvent, ProcessedEvent } from "../types/index.js"; + +export interface IEventRegistryRepository { + /** + * Get the last processed event for a given chain. + * @param chainId - The chain ID. + * @returns The last processed event or undefined if none exists. + */ + getLastProcessedEvent: (chainId: ChainId) => Promise; + + /** + * Save the last processed event for a given chain. + * @param chainId - The chain ID. + * @param event - The new processed event to save. + */ + saveLastProcessedEvent: (chainId: ChainId, event: NewProcessedEvent) => Promise; +} diff --git a/packages/repository/src/interfaces/index.ts b/packages/repository/src/interfaces/index.ts index 3562351..f1a27be 100644 --- a/packages/repository/src/interfaces/index.ts +++ b/packages/repository/src/interfaces/index.ts @@ -4,3 +4,4 @@ export * from "./applicationRepository.interface.js"; export * from "./donationRepository.interface.js"; export * from "./applicationPayoutRepository.interface.js"; export * from "./strategyRepository.interface.js"; +export * from "./eventsRepository.interface.js"; diff --git a/packages/repository/src/repositories/kysely/eventRegistry.repository.ts b/packages/repository/src/repositories/kysely/eventRegistry.repository.ts new file mode 100644 index 0000000..5a88d8e --- /dev/null +++ b/packages/repository/src/repositories/kysely/eventRegistry.repository.ts @@ -0,0 +1,42 @@ +import { Kysely } from "kysely"; + +import { ChainId } from "@grants-stack-indexer/shared"; + +import { IEventRegistryRepository } from "../../interfaces/index.js"; +import { Database, NewProcessedEvent, ProcessedEvent } from "../../internal.js"; + +export class KyselyEventRegistryRepository implements IEventRegistryRepository { + constructor( + private readonly db: Kysely, + private readonly schemaName: string, + ) {} + + /** @inheritdoc */ + async getLastProcessedEvent(chainId: ChainId): Promise { + return this.db + .withSchema(this.schemaName) + .selectFrom("eventsRegistry") + .where("chainId", "=", chainId) + .selectAll() + .executeTakeFirst(); + } + + /** @inheritdoc */ + async saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise { + const { blockNumber, blockTimestamp, logIndex, rawEvent } = event; // Extract only the fields from NewProcessedEvent + await this.db + .withSchema(this.schemaName) + .insertInto("eventsRegistry") + .values({ blockNumber, blockTimestamp, logIndex, chainId, rawEvent }) + .onConflict((oc) => + oc.columns(["chainId"]).doUpdateSet({ + blockNumber, + blockTimestamp, + logIndex, + rawEvent, + chainId, + }), + ) + .execute(); + } +} diff --git a/packages/repository/src/repositories/kysely/index.ts b/packages/repository/src/repositories/kysely/index.ts index ae4143a..66febbe 100644 --- a/packages/repository/src/repositories/kysely/index.ts +++ b/packages/repository/src/repositories/kysely/index.ts @@ -4,3 +4,4 @@ export * from "./application.repository.js"; export * from "./donation.repository.js"; export * from "./applicationPayout.repository.js"; export * from "./strategyRegistry.repository.js"; +export * from "./eventRegistry.repository.js"; diff --git a/packages/repository/src/repositories/kysely/strategyRegistry.repository.ts b/packages/repository/src/repositories/kysely/strategyRegistry.repository.ts index 2b1b011..eef386c 100644 --- a/packages/repository/src/repositories/kysely/strategyRegistry.repository.ts +++ b/packages/repository/src/repositories/kysely/strategyRegistry.repository.ts @@ -18,7 +18,7 @@ export class KyselyStrategyRegistryRepository implements IStrategyRegistryReposi ): Promise { return this.db .withSchema(this.schemaName) - .selectFrom("strategies") + .selectFrom("strategiesRegistry") .where("chainId", "=", chainId) .where("address", "=", strategyAddress) .selectAll() @@ -29,7 +29,7 @@ export class KyselyStrategyRegistryRepository implements IStrategyRegistryReposi async saveStrategy(strategy: Strategy): Promise { await this.db .withSchema(this.schemaName) - .insertInto("strategies") + .insertInto("strategiesRegistry") .values(strategy) .onConflict((oc) => oc.columns(["chainId", "address"]).doUpdateSet(strategy)) .execute(); @@ -37,7 +37,10 @@ export class KyselyStrategyRegistryRepository implements IStrategyRegistryReposi /** @inheritdoc */ async getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise { - let query = this.db.withSchema(this.schemaName).selectFrom("strategies").selectAll(); + let query = this.db + .withSchema(this.schemaName) + .selectFrom("strategiesRegistry") + .selectAll(); if (filters?.chainId) { query = query.where("chainId", "=", filters.chainId); diff --git a/packages/repository/src/types/event.types.ts b/packages/repository/src/types/event.types.ts new file mode 100644 index 0000000..1e0c5ca --- /dev/null +++ b/packages/repository/src/types/event.types.ts @@ -0,0 +1,11 @@ +import { AnyEvent, ChainId, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; + +export type ProcessedEvent = { + chainId: ChainId; + blockNumber: number; + blockTimestamp: number; + logIndex: number; + rawEvent?: Partial>; +}; + +export type NewProcessedEvent = Omit; diff --git a/packages/repository/src/types/index.ts b/packages/repository/src/types/index.ts index c34de7d..9832417 100644 --- a/packages/repository/src/types/index.ts +++ b/packages/repository/src/types/index.ts @@ -5,3 +5,4 @@ export * from "./changeset.types.js"; export * from "./donation.types.js"; export * from "./applicationPayout.types.js"; export * from "./strategy.types.js"; +export * from "./event.types.js"; diff --git a/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts b/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts index 45a5936..fbef0cb 100644 --- a/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts +++ b/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts @@ -11,17 +11,17 @@ export async function up(db: Kysely): Promise { const CHAIN_ID_TYPE = "integer"; await db.schema - .createTable("strategies") + .createTable("strategies_registry") .addColumn("address", ADDRESS_TYPE) .addColumn("id", "text") .addColumn("chainId", CHAIN_ID_TYPE) .addColumn("handled", "boolean") - .addPrimaryKeyConstraint("strategies_pkey", ["address", "chainId"]) + .addPrimaryKeyConstraint("strategies_registry_pkey", ["address", "chainId"]) .execute(); } // eslint-disable-next-line @typescript-eslint/no-explicit-any export async function down(db: Kysely): Promise { // Drop everything in reverse order - await db.schema.dropTable("strategies").execute(); + await db.schema.dropTable("strategies_registry").execute(); } diff --git a/scripts/migrations/src/migrations/20241216T160549_event_registry.ts b/scripts/migrations/src/migrations/20241216T160549_event_registry.ts new file mode 100644 index 0000000..3950876 --- /dev/null +++ b/scripts/migrations/src/migrations/20241216T160549_event_registry.ts @@ -0,0 +1,27 @@ +import { Kysely } from "kysely"; + +/** + * The up function is called when you update your database schema to the next version and down when you go back to previous version. + * The only argument for the functions is an instance of Kysely. It's important to use Kysely and not Kysely. + * ref: https://kysely.dev/docs/migrations#migration-files + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export async function up(db: Kysely): Promise { + const CHAIN_ID_TYPE = "integer"; + + await db.schema + .createTable("events_registry") + .addColumn("chainId", CHAIN_ID_TYPE) + .addColumn("blockNumber", "integer") + .addColumn("blockTimestamp", "integer") + .addColumn("logIndex", "integer") + .addColumn("rawEvent", "jsonb") + .addPrimaryKeyConstraint("events_registry_pkey", ["chainId"]) + .execute(); +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export async function down(db: Kysely): Promise { + // Drop everything in reverse order + await db.schema.dropTable("events_registry").execute(); +}