From f94c48d2c956b6ce5480ca642cd38bd3c0d6f39b Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Mon, 23 Dec 2024 16:55:52 -0300 Subject: [PATCH 1/2] feat: retroactive processing --- apps/processing/package.json | 1 + .../src/retroactiveHandleStrategies.ts | 33 ++ .../src/services/processing.service.ts | 51 +- packages/data-flow/src/external.ts | 2 + packages/data-flow/src/internal.ts | 1 + .../data-flow/src/retroactiveProcessor.ts | 264 ++++++++++ .../test/unit/retroactiveProcessor.spec.ts | 482 ++++++++++++++++++ 7 files changed, 817 insertions(+), 17 deletions(-) create mode 100644 apps/processing/src/retroactiveHandleStrategies.ts create mode 100644 packages/data-flow/src/retroactiveProcessor.ts create mode 100644 packages/data-flow/test/unit/retroactiveProcessor.spec.ts diff --git a/apps/processing/package.json b/apps/processing/package.json index 4b280e7..3577de8 100644 --- a/apps/processing/package.json +++ b/apps/processing/package.json @@ -13,6 +13,7 @@ "format:fix": "prettier --write \"{src,test}/**/*.{js,ts,json}\"", "lint": "eslint \"{src,test}/**/*.{js,ts,json}\"", "lint:fix": "pnpm lint --fix", + "retroactive": "tsx src/retroactiveHandleStrategies.ts", "start": "node dist/index.js", "test": "vitest run --config vitest.config.ts --passWithNoTests", "test:cov": "vitest run --config vitest.config.ts --coverage --passWithNoTests" diff --git a/apps/processing/src/retroactiveHandleStrategies.ts b/apps/processing/src/retroactiveHandleStrategies.ts new file mode 100644 index 0000000..18b764d --- /dev/null +++ b/apps/processing/src/retroactiveHandleStrategies.ts @@ -0,0 +1,33 @@ +import { inspect } from "util"; + +import { environment } from "./config/index.js"; +import { ProcessingService } from "./services/processing.service.js"; + +let processor: ProcessingService; + +const main = async (): Promise => { + processor = await ProcessingService.initialize(environment); + await processor.processRetroactiveEvents(); +}; + +process.on("unhandledRejection", (reason, p) => { + console.error(`Unhandled Rejection at: \n${inspect(p, undefined, 100)}, \nreason: ${reason}`); + process.exit(1); +}); + +process.on("uncaughtException", (error: Error) => { + console.error( + `An uncaught exception occurred: ${error}\n` + `Exception origin: ${error.stack}`, + ); + process.exit(1); +}); + +main() + .catch((err) => { + console.error(`Caught error in main handler: ${err}`); + process.exit(1); + }) + // eslint-disable-next-line @typescript-eslint/no-misused-promises + .finally(async () => { + await processor?.releaseResources(); + }); diff --git a/apps/processing/src/services/processing.service.ts b/apps/processing/src/services/processing.service.ts index 0cbe94a..5e31d13 100644 --- a/apps/processing/src/services/processing.service.ts +++ b/apps/processing/src/services/processing.service.ts @@ -7,6 +7,7 @@ import { InMemoryCachedEventRegistry, InMemoryCachedStrategyRegistry, Orchestrator, + RetroactiveProcessor, } from "@grants-stack-indexer/data-flow"; import { ChainId, Logger } from "@grants-stack-indexer/shared"; @@ -27,12 +28,12 @@ import { SharedDependencies, SharedDependenciesService } from "./index.js"; * - Manages graceful shutdown on termination signals */ export class ProcessingService { - private readonly orchestrators: Map = new Map(); + private readonly orchestrators: Map = new Map(); private readonly logger = new Logger({ className: "ProcessingService" }); private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"]; private constructor( - 
orchestrators: Map, + orchestrators: Map, kyselyDatabase: SharedDependencies["kyselyDatabase"], ) { this.orchestrators = orchestrators; @@ -44,7 +45,7 @@ export class ProcessingService { const { CHAINS: chains } = env; const { core, registriesRepositories, indexerClient, kyselyDatabase } = sharedDependencies; const { eventRegistryRepository, strategyRegistryRepository } = registriesRepositories; - const orchestrators: Map = new Map(); + const orchestrators: Map = new Map(); const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize( new Logger({ className: "InMemoryCachedStrategyRegistry" }), @@ -70,21 +71,31 @@ export class ProcessingService { [chain.id as ChainId], ); - orchestrators.set( + const orchestrator = new Orchestrator( chain.id as ChainId, - new Orchestrator( - chain.id as ChainId, - { ...core, evmProvider }, - indexerClient, - { - eventsRegistry: cachedEventsRegistry, - strategyRegistry, - }, - chain.fetchLimit, - chain.fetchDelayMs, - chainLogger, - ), + { ...core, evmProvider }, + indexerClient, + { + eventsRegistry: cachedEventsRegistry, + strategyRegistry, + }, + chain.fetchLimit, + chain.fetchDelayMs, + chainLogger, ); + const retroactiveProcessor = new RetroactiveProcessor( + chain.id as ChainId, + { ...core, evmProvider }, + indexerClient, + { + eventsRegistry: cachedEventsRegistry, + strategyRegistry, + }, + chain.fetchLimit, + chainLogger, + ); + + orchestrators.set(chain.id as ChainId, [orchestrator, retroactiveProcessor]); } return new ProcessingService(orchestrators, kyselyDatabase); @@ -114,7 +125,7 @@ export class ProcessingService { }); try { - for (const orchestrator of this.orchestrators.values()) { + for (const [orchestrator, _] of this.orchestrators.values()) { this.logger.info(`Starting orchestrator for chain ${orchestrator.chainId}...`); orchestratorProcesses.push(orchestrator.run(abortController.signal)); } @@ -126,6 +137,12 @@ export class ProcessingService { } } + async processRetroactiveEvents(): Promise { + for (const [_, retroactiveProcessor] of this.orchestrators.values()) { + await retroactiveProcessor.processRetroactiveStrategies(); + } + } + /** * Call this function when the processor service is terminated * - Releases database resources diff --git a/packages/data-flow/src/external.ts b/packages/data-flow/src/external.ts index 954c0b7..8a217ce 100644 --- a/packages/data-flow/src/external.ts +++ b/packages/data-flow/src/external.ts @@ -10,3 +10,5 @@ export { export type { IEventsRegistry, IStrategyRegistry, IDataLoader } from "./internal.js"; export type { CoreDependencies } from "./internal.js"; + +export { RetroactiveProcessor } from "./retroactiveProcessor.js"; diff --git a/packages/data-flow/src/internal.ts b/packages/data-flow/src/internal.ts index 9224340..9e5bde1 100644 --- a/packages/data-flow/src/internal.ts +++ b/packages/data-flow/src/internal.ts @@ -8,3 +8,4 @@ export * from "./eventsFetcher.js"; export * from "./registries/index.js"; export * from "./eventsProcessor.js"; export * from "./orchestrator.js"; +export * from "./retroactiveProcessor.js"; diff --git a/packages/data-flow/src/retroactiveProcessor.ts b/packages/data-flow/src/retroactiveProcessor.ts new file mode 100644 index 0000000..5456cb1 --- /dev/null +++ b/packages/data-flow/src/retroactiveProcessor.ts @@ -0,0 +1,264 @@ +import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; +import { existsHandler, UnsupportedEventException } from "@grants-stack-indexer/processors"; +import { + Address, + AnyEvent, + ChainId, + ContractName, + Hex, + ILogger, 
+ ProcessorEvent, + stringify, +} from "@grants-stack-indexer/shared"; + +import { EventsProcessor } from "./eventsProcessor.js"; +import { + CoreDependencies, + DataLoader, + EventsFetcher, + IEventsFetcher, + IEventsRegistry, + InvalidEvent, + IStrategyRegistry, + Queue, +} from "./internal.js"; + +/** + * Represents a pointer to a specific event in the blockchain + */ +type EventPointer = { + /** The block number where the event occurred */ + blockNumber: number; + /** The log index within the block */ + logIndex: number; +}; + +/** + * The RetroactiveProcessor is responsible for processing historical events from strategies + * that were previously unsupported but are now handleable. This allows the system to + * catch up on missed events and maintain data consistency. + * + * Key responsibilities: + * 1. Identify newly handleable strategies + * 2. Fetch historical events for these strategies + * 3. Process events through the appropriate handlers + * 4. Update strategy registry with processed status + */ +export class RetroactiveProcessor { + private readonly eventsFetcher: IEventsFetcher; + private readonly eventsProcessor: EventsProcessor; + private readonly eventsRegistry: IEventsRegistry; + private readonly strategyRegistry: IStrategyRegistry; + private readonly dataLoader: DataLoader; + + /** + * Creates a new instance of RetroactiveProcessor + * @param chainId - The blockchain network identifier + * @param dependencies - Core system dependencies for data access and processing + * @param indexerClient - Client for fetching blockchain events + * @param registries - Event and strategy registries for tracking processing state + * @param fetchLimit - Maximum number of events to fetch in a single batch + * @param logger - Logger instance for debugging and monitoring + */ + constructor( + public readonly chainId: ChainId, + private dependencies: Readonly, + private indexerClient: IIndexerClient, + private registries: { + eventsRegistry: IEventsRegistry; + strategyRegistry: IStrategyRegistry; + }, + private fetchLimit: number = 1000, + private logger: ILogger, + ) { + this.eventsFetcher = new EventsFetcher(this.indexerClient); + this.eventsProcessor = new EventsProcessor(this.chainId, { + ...this.dependencies, + logger: this.logger, + }); + this.eventsRegistry = registries.eventsRegistry; + this.strategyRegistry = registries.strategyRegistry; + this.dataLoader = new DataLoader( + { + project: this.dependencies.projectRepository, + round: this.dependencies.roundRepository, + application: this.dependencies.applicationRepository, + donation: this.dependencies.donationRepository, + applicationPayout: this.dependencies.applicationPayoutRepository, + }, + this.logger, + ); + } + + /** + * Process historical events for all strategies that are now handleable but weren't before + * @returns Promise that resolves when all retroactive processing is complete + */ + async processRetroactiveStrategies(): Promise { + const newHandleableStrategies = await this.findNewHandleableStrategies(); + + if (newHandleableStrategies.size === 0) { + this.logger.info("No new handleable strategies found"); + return; + } + + const lastEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId); + const lastEventPointer: EventPointer = { + blockNumber: lastEvent?.blockNumber ?? 0, + logIndex: lastEvent?.logIndex ?? 
0, + }; + + const results = await Promise.allSettled( + Array.from(newHandleableStrategies.entries()).map( + async ([strategyId, strategyAddresses]) => { + try { + await this.processRetroactiveStrategy( + strategyId, + strategyAddresses, + lastEventPointer, + ); + } catch (error) { + this.logger.error( + `Failed to process strategy ${strategyId}: ${error instanceof Error ? error.message : String(error)}`, + ); + throw error; + } + }, + ), + ); + + // Log results summary + const succeeded = results.filter((r) => r.status === "fulfilled").length; + const failed = results.filter((r) => r.status === "rejected").length; + this.logger.info( + `Retroactive processing complete. Succeeded: ${succeeded}, Failed: ${failed}`, + ); + } + + /** + * Process historical events for a specific strategy + * @param strategyId - Identifier of the strategy to process + * @param strategyAddresses - Set of contract addresses implementing this strategy + * @param lastEventPointer - Latest processed event pointer to process up to + */ + private async processRetroactiveStrategy( + strategyId: Hex, + strategyAddresses: Set
, + lastEventPointer: Readonly, + ): Promise { + const currentPointer: EventPointer = { blockNumber: 0, logIndex: 0 }; + const events = new Queue & { strategyId?: Hex }>(); + let event: (ProcessorEvent & { strategyId?: Hex }) | undefined; + + while (true) { + try { + await this.enqueueEventsIfEmpty( + events, + strategyAddresses, + currentPointer, + lastEventPointer, + ); + + event = events.pop(); + if (!event) break; + + currentPointer.blockNumber = event.blockNumber; + currentPointer.logIndex = event.logIndex; + + if (this.hasReachedLastEvent(currentPointer, lastEventPointer)) break; + + event.strategyId = strategyId; + const changesets = await this.eventsProcessor.processEvent(event); + const executionResult = await this.dataLoader.applyChanges(changesets); + + if (executionResult.numFailed > 0) { + this.logger.error( + `Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(event)}`, + ); + } + + // Update pointer + } catch (error) { + if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) { + // Expected errors that we can safely ignore + this.logger.debug(`Skipping error for ${error.name}: ${stringify(event)}`); + } else { + this.logger.error(`Error processing event: ${stringify(event)} ${error}`); + } + } + } + + await this.markStrategyAsHandled(strategyId, strategyAddresses); + } + + private async enqueueEventsIfEmpty( + queue: Queue & { strategyId?: Hex }>, + strategyAddresses: Set
, + currentPointer: EventPointer, + lastEventPointer: EventPointer, + ): Promise { + if (queue.isEmpty()) { + const fetchedEvents = await this.eventsFetcher.fetchEvents({ + chainId: this.chainId, + srcAddresses: Array.from(strategyAddresses), + from: currentPointer, + to: lastEventPointer, + limit: this.fetchLimit, + }); + if (fetchedEvents.length > 0) queue.push(...fetchedEvents); + } + } + + /** + * Find strategies that were previously unhandled but now have handlers available + * @returns Map of strategy IDs to their implementation addresses + */ + private async findNewHandleableStrategies(): Promise>> { + const unhandledStrategies = await this.strategyRegistry.getStrategies({ + handled: false, + chainId: this.chainId, + }); + + const newHandleableStrategies = new Map>(); + for (const strategy of unhandledStrategies) { + if (existsHandler(strategy.id)) { + if (!newHandleableStrategies.has(strategy.id)) { + newHandleableStrategies.set(strategy.id, new Set()); + } + newHandleableStrategies.get(strategy.id)?.add(strategy.address); + } + } + + return newHandleableStrategies; + } + + private hasReachedLastEvent(current: EventPointer, last: EventPointer): boolean { + return ( + current.blockNumber > last.blockNumber || + (current.blockNumber === last.blockNumber && current.logIndex >= last.logIndex) + ); + } + + private async markStrategyAsHandled(strategyId: Hex, addresses: Set
): Promise { + this.logger.info(`Processed retroactively strategy ${strategyId}`); + + await Promise.all( + Array.from(addresses).map(async (address) => { + this.logger.debug( + `Marking strategy ${strategyId} as handled for address ${address}`, + ); + try { + await this.strategyRegistry.saveStrategyId( + this.chainId, + address, + strategyId, + true, + ); + } catch (error: unknown) { + this.logger.error(`Failed to mark strategy ${strategyId} as handled: ${error}`); + throw error; + } + }), + ); + } +} diff --git a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts new file mode 100644 index 0000000..69f5add --- /dev/null +++ b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts @@ -0,0 +1,482 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { EvmProvider } from "@grants-stack-indexer/chain-providers"; +import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; +import { + IApplicationPayoutRepository, + IApplicationRepository, + IDonationRepository, + IProjectRepository, + IRoundRepository, + Strategy, +} from "@grants-stack-indexer/repository"; +import { + ChainId, + ContractToEventName, + DeepPartial, + EventParams, + Hex, + ILogger, + mergeDeep, + ProcessorEvent, +} from "@grants-stack-indexer/shared"; + +import { + CoreDependencies, + DataLoader, + EventsProcessor, + IEventsFetcher, + IEventsRegistry, + InvalidEvent, + IStrategyRegistry, +} from "../../src/internal.js"; +import { RetroactiveProcessor } from "../../src/retroactiveProcessor.js"; + +vi.mock("../../src/eventsProcessor.js", () => { + const EventsProcessor = vi.fn(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + EventsProcessor.prototype.processEvent = vi.fn(); + return { + EventsProcessor, + }; +}); +vi.mock("../../src/data-loader/dataLoader.js", () => { + const DataLoader = vi.fn(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + DataLoader.prototype.applyChanges = vi.fn(); + return { + DataLoader, + }; +}); + +vi.mock("../../src/eventsFetcher.js", () => { + const EventsFetcher = vi.fn(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + EventsFetcher.prototype.fetchEvents = vi.fn(); + return { + EventsFetcher, + }; +}); + +describe("RetroactiveProcessor", () => { + let processor: RetroactiveProcessor; + let mockIndexerClient: IIndexerClient; + let mockEventsRegistry: IEventsRegistry; + let mockStrategyRegistry: IStrategyRegistry; + let mockEvmProvider: EvmProvider; + let mockLogger: ILogger; + let mockEventsProcessor: EventsProcessor; + let mockDataLoader: DataLoader; + let mockEventsFetcher: IEventsFetcher; + + const chainId = 1 as ChainId; + const mockFetchLimit = 10; + const existentStrategyId = + "0x103732a8e473467a510d4128ee11065262bdd978f0d9dad89ba68f2c56127e27" as Hex; + const eventName = "TimestampsUpdated"; + const defaultParams = { + startTime: "1704067200", // 2024-01-01 00:00:00 + endTime: "1704153600", // 2024-01-02 00:00:00 + sender: "0xcBf407C33d68a55CB594Ffc8f4fD1416Bba39DA5", + } as const; + + const mockValidStrategies: Strategy[] = [ + { + address: "0x1234", + id: existentStrategyId, + chainId, + handled: false, + }, + { + address: "0x4567", + id: existentStrategyId, + chainId, + handled: false, + }, + ]; + + beforeEach(() => { + // Setup mock implementations + mockIndexerClient = { + getEventsAfterBlockNumberAndLogIndex: vi.fn(), + } as unknown as IIndexerClient; + + mockEventsRegistry = { + 
getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + + mockStrategyRegistry = { + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + getStrategies: vi.fn(), + }; + + mockEvmProvider = { + readContract: vi.fn(), + } as unknown as EvmProvider; + + mockLogger = { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }; + + const dependencies: CoreDependencies = { + evmProvider: mockEvmProvider, + projectRepository: {} as IProjectRepository, + roundRepository: {} as IRoundRepository, + applicationRepository: {} as IApplicationRepository, + donationRepository: {} as IDonationRepository, + applicationPayoutRepository: {} as IApplicationPayoutRepository, + pricingProvider: { + getTokenPrice: vi.fn(), + }, + metadataProvider: { + getMetadata: vi.fn(), + }, + }; + + processor = new RetroactiveProcessor( + chainId, + dependencies, + mockIndexerClient, + { + eventsRegistry: mockEventsRegistry, + strategyRegistry: mockStrategyRegistry, + }, + mockFetchLimit, + mockLogger, + ); + + mockEventsProcessor = processor["eventsProcessor"]; + mockDataLoader = processor["dataLoader"]; + mockEventsFetcher = processor["eventsFetcher"]; + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe("processRetroactiveStrategies", () => { + it("exits early if all strategies are marked as handled", async () => { + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue([]); + + await processor.processRetroactiveStrategies(); + + expect(mockLogger.info).toHaveBeenCalledWith("No new handleable strategies found"); + expect(mockEventsRegistry.getLastProcessedEvent).not.toHaveBeenCalled(); + }); + + it("exits early if Handler doesn't exist for strategy", async () => { + const mockStrategy: Strategy = { + address: "0x1234", + id: "0xnohandler", + chainId, + handled: false, + }; + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue([mockStrategy]); + + await processor.processRetroactiveStrategies(); + + expect(mockLogger.info).toHaveBeenCalledWith("No new handleable strategies found"); + expect(mockEventsRegistry.getLastProcessedEvent).not.toHaveBeenCalled(); + }); + + it("process new handleable strategies", async () => { + const mockEvent = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 90, + logIndex: 0, + }); + + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue(mockValidStrategies); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue({ + blockNumber: 100, + logIndex: 1, + chainId, + blockTimestamp: 1234567890, + }); + vi.spyOn(processor["eventsFetcher"], "fetchEvents") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); + + await processor.processRetroactiveStrategies(); + + expect(mockLogger.info).toHaveBeenCalledWith( + "Retroactive processing complete. 
Succeeded: 1, Failed: 0", + ); + expect(mockEventsProcessor.processEvent).toHaveBeenCalledTimes(1); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledTimes(2); + }); + + it("process multiple new handleable strategies", async () => { + const strategies: Strategy[] = [ + ...mockValidStrategies, + { + address: "0x9abc", + id: "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf", + chainId, + handled: false, + }, + ]; + const mockEvent = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 50, + logIndex: 0, + }); + + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue(strategies); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue({ + blockNumber: 100, + logIndex: 1, + chainId, + blockTimestamp: 1234567890, + }); + + const fetchEventsSpy = vi.spyOn(mockEventsFetcher, "fetchEvents"); + fetchEventsSpy.mockImplementation(async (params) => { + if (params.from?.blockNumber === 0 && params.from?.logIndex === 0) { + return [mockEvent]; + } + return []; + }); + vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); + + await processor.processRetroactiveStrategies(); + + expect(mockLogger.info).toHaveBeenCalledWith( + "Retroactive processing complete. Succeeded: 2, Failed: 0", + ); + expect(mockEventsFetcher.fetchEvents).toHaveBeenCalledTimes(4); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledTimes(3); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledWith( + chainId, + "0x9abc", + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf", + true, + ); + }); + + it("breaks loop if event is older than last processed", async () => { + const mockEvent = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 100, + logIndex: 2, + }); + + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue(mockValidStrategies); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue({ + blockNumber: 100, + logIndex: 1, + chainId, + blockTimestamp: 1234567890, + }); + vi.spyOn(processor["eventsFetcher"], "fetchEvents").mockResolvedValueOnce([mockEvent]); + + vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); + + await processor.processRetroactiveStrategies(); + + expect(mockLogger.info).toHaveBeenCalledWith( + "Retroactive processing complete. 
Succeeded: 1, Failed: 0", + ); + expect(mockEventsProcessor.processEvent).toHaveBeenCalledTimes(0); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledTimes(2); + }); + + it("keep fetching events if available", async () => { + const mockEvent = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 90, + logIndex: 0, + }); + const mockEvent2 = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 95, + logIndex: 4, + }); + + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue(mockValidStrategies); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue({ + blockNumber: 100, + logIndex: 1, + chainId, + blockTimestamp: 1234567890, + }); + vi.spyOn(processor["eventsFetcher"], "fetchEvents") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValueOnce([mockEvent2]) + .mockResolvedValue([]); + + vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); + + await processor.processRetroactiveStrategies(); + + expect(mockEventsProcessor.processEvent).toHaveBeenCalledTimes(2); + expect(mockEventsFetcher.fetchEvents).toHaveBeenCalledTimes(3); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledTimes(2); + }); + + describe("error handling", () => { + it("handles expected errors silently and continues processing", async () => { + const mockEvent1 = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 50, + logIndex: 0, + }); + const mockEvent2 = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 60, + logIndex: 0, + }); + + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue( + mockValidStrategies, + ); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue({ + blockNumber: 100, + logIndex: 1, + chainId, + blockTimestamp: 1234567890, + }); + + // First event throws InvalidEvent, second one processes successfully + vi.spyOn(mockEventsFetcher, "fetchEvents") + .mockResolvedValueOnce([mockEvent1, mockEvent2]) + .mockResolvedValue([]); + + const processEventSpy = vi.spyOn(mockEventsProcessor, "processEvent"); + processEventSpy + .mockRejectedValueOnce(new InvalidEvent(mockEvent1)) + .mockResolvedValueOnce([]); + + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + + await processor.processRetroactiveStrategies(); + + // Verify that processing continued after the error + expect(processEventSpy).toHaveBeenCalledTimes(2); + expect(mockLogger.debug).toHaveBeenCalledWith( + expect.stringContaining("Skipping error for InvalidEvent"), + ); + expect(mockLogger.error).not.toHaveBeenCalled(); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledTimes(2); + }); + + it("handles error on marking strategy as handled", async () => { + const unexpectedError = new Error("Unexpected database error"); + const mockEvent = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 50, + logIndex: 0, + }); + + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue( + mockValidStrategies, + ); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue({ + blockNumber: 100, + logIndex: 1, + chainId, + blockTimestamp: 1234567890, + }); + + 
vi.spyOn(mockEventsFetcher, "fetchEvents") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + vi.spyOn(mockEventsProcessor, "processEvent").mockRejectedValue(unexpectedError); + vi.spyOn(mockStrategyRegistry, "saveStrategyId") + .mockResolvedValueOnce() + .mockRejectedValue(unexpectedError); + + await processor.processRetroactiveStrategies(); + + expect(mockLogger.info).toHaveBeenCalledWith( + "Retroactive processing complete. Succeeded: 0, Failed: 1", + ); + }); + }); + }); +}); + +/** + * Creates a mock event for testing. + * + * @param eventName - The name of the event. + * @param params - The parameters of the event. + * @param strategyId - The ID of the strategy. + * @param overrides - The overrides for the event. + * @returns A mock event. + * + * @default + * srcAddress: "0x1234567890123456789012345678901234567890", + * blockNumber: 118034410, + * blockTimestamp: 1000000000, + * chainId: 10 as ChainId, + * contractName: "Strategy", + * logIndex: 1, + * transactionFields: { + * hash: "0xd2352acdcd59e312370831ea927d51a1917654697a72434cd905a60897a5bb8b", + * transactionIndex: 1, + * from: "0xcBf407C33d68a55CB594Ffc8f4fD1416Bba39DA5", + * }, + */ +export const createMockEvent = >( + eventName: T, + params: EventParams<"Strategy", T>, + strategyId: Hex, + overrides: DeepPartial> = {}, +): ProcessorEvent<"Strategy", T> => { + const defaultEvent: ProcessorEvent<"Strategy", T> = { + eventName, + params, + srcAddress: "0x1234567890123456789012345678901234567890", + blockNumber: 118034410, + blockTimestamp: 1000000000, + chainId: 10 as ChainId, + contractName: "Strategy", + logIndex: 1, + transactionFields: { + hash: "0xd2352acdcd59e312370831ea927d51a1917654697a72434cd905a60897a5bb8b", + transactionIndex: 1, + from: "0xcBf407C33d68a55CB594Ffc8f4fD1416Bba39DA5", + }, + strategyId, + }; + + return mergeDeep(defaultEvent, overrides); +}; From 1ccb40896af53ac7d0dc3693ba55af19dfb201a4 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Mon, 23 Dec 2024 21:27:46 -0300 Subject: [PATCH 2/2] feat: add event processing checkpoint on retroactive processing --- apps/processing/README.md | 1 + .../src/services/processing.service.ts | 7 +- .../services/sharedDependencies.service.ts | 7 + .../test/unit/processing.service.spec.ts | 193 ++++++++++-------- .../unit/sharedDependencies.service.spec.ts | 4 + packages/data-flow/README.md | 4 + packages/data-flow/src/orchestrator.ts | 1 - .../data-flow/src/retroactiveProcessor.ts | 49 ++++- .../test/unit/retroactiveProcessor.spec.ts | 65 +++++- packages/repository/src/db/connection.ts | 2 + packages/repository/src/external.ts | 4 + packages/repository/src/interfaces/index.ts | 1 + ...rocessingCheckpointRepository.interface.ts | 28 +++ .../src/repositories/kysely/index.ts | 1 + ...strategyProcessingCheckpoint.repository.ts | 60 ++++++ packages/repository/src/types/index.ts | 1 + .../strategyProcessingCheckpoint.types.ts | 15 ++ ...000_add_strategy_processing_checkpoints.ts | 27 +++ 18 files changed, 374 insertions(+), 96 deletions(-) create mode 100644 packages/repository/src/interfaces/strategyProcessingCheckpointRepository.interface.ts create mode 100644 packages/repository/src/repositories/kysely/strategyProcessingCheckpoint.repository.ts create mode 100644 packages/repository/src/types/strategyProcessingCheckpoint.types.ts create mode 100644 scripts/migrations/src/migrations/20241223T000000_add_strategy_processing_checkpoints.ts diff --git a/apps/processing/README.md b/apps/processing/README.md index 
96f7b4f..03e5392 100644 --- a/apps/processing/README.md +++ b/apps/processing/README.md @@ -55,6 +55,7 @@ Available scripts that can be run using `pnpm`: | `start` | Run the compiled app from dist folder | | `test` | Run tests using vitest | | `test:cov` | Run tests with coverage report | +| `retroactive` | Run retroactive processing for all chains | TODO: e2e tests TODO: Docker image diff --git a/apps/processing/src/services/processing.service.ts b/apps/processing/src/services/processing.service.ts index 5e31d13..9a87df2 100644 --- a/apps/processing/src/services/processing.service.ts +++ b/apps/processing/src/services/processing.service.ts @@ -44,7 +44,11 @@ export class ProcessingService { const sharedDependencies = await SharedDependenciesService.initialize(env); const { CHAINS: chains } = env; const { core, registriesRepositories, indexerClient, kyselyDatabase } = sharedDependencies; - const { eventRegistryRepository, strategyRegistryRepository } = registriesRepositories; + const { + eventRegistryRepository, + strategyRegistryRepository, + strategyProcessingCheckpointRepository, + } = registriesRepositories; const orchestrators: Map = new Map(); const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize( @@ -90,6 +94,7 @@ export class ProcessingService { { eventsRegistry: cachedEventsRegistry, strategyRegistry, + checkpointRepository: strategyProcessingCheckpointRepository, }, chain.fetchLimit, chainLogger, diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 3fb29fe..df70dc1 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -5,6 +5,7 @@ import { PricingProviderFactory } from "@grants-stack-indexer/pricing"; import { createKyselyDatabase, IEventRegistryRepository, + IStrategyProcessingCheckpointRepository, IStrategyRegistryRepository, KyselyApplicationPayoutRepository, KyselyApplicationRepository, @@ -12,6 +13,7 @@ import { KyselyEventRegistryRepository, KyselyProjectRepository, KyselyRoundRepository, + KyselyStrategyProcessingCheckpointRepository, KyselyStrategyRegistryRepository, } from "@grants-stack-indexer/repository"; import { Logger } from "@grants-stack-indexer/shared"; @@ -23,6 +25,7 @@ export type SharedDependencies = { registriesRepositories: { eventRegistryRepository: IEventRegistryRepository; strategyRegistryRepository: IStrategyRegistryRepository; + strategyProcessingCheckpointRepository: IStrategyProcessingCheckpointRepository; }; indexerClient: EnvioIndexerClient; kyselyDatabase: ReturnType; @@ -73,6 +76,9 @@ export class SharedDependenciesService { env.DATABASE_SCHEMA, ); + const strategyProcessingCheckpointRepository = + new KyselyStrategyProcessingCheckpointRepository(kyselyDatabase, env.DATABASE_SCHEMA); + // Initialize indexer client const indexerClient = new EnvioIndexerClient( env.INDEXER_GRAPHQL_URL, @@ -92,6 +98,7 @@ export class SharedDependenciesService { registriesRepositories: { eventRegistryRepository, strategyRegistryRepository, + strategyProcessingCheckpointRepository, }, indexerClient, kyselyDatabase, diff --git a/apps/processing/test/unit/processing.service.spec.ts b/apps/processing/test/unit/processing.service.spec.ts index 15b59ff..ce71a27 100644 --- a/apps/processing/test/unit/processing.service.spec.ts +++ b/apps/processing/test/unit/processing.service.spec.ts @@ -7,6 +7,7 @@ import { InMemoryCachedEventRegistry, InMemoryCachedStrategyRegistry, Orchestrator, + 
RetroactiveProcessor, } from "@grants-stack-indexer/data-flow"; import type { Environment } from "../../src/config/env.js"; @@ -67,6 +68,11 @@ vi.spyOn(Orchestrator.prototype, "run").mockImplementation(async function (signa await new Promise((resolve) => setTimeout(resolve, 100)); } }); +vi.spyOn(RetroactiveProcessor.prototype, "processRetroactiveStrategies").mockImplementation( + async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + }, +); describe("ProcessingService", () => { let processingService: ProcessingService; @@ -100,96 +106,109 @@ describe("ProcessingService", () => { vi.clearAllMocks(); }); - it("initializes multiple orchestrators correctly", () => { - expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1); - expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1); - expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1); - expect(EvmProvider).toHaveBeenCalledTimes(2); - expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2); - - // Verify orchestrators were created with correct parameters - expect(processingService["orchestrators"].size).toBe(2); - - // Verify first chain initialization - expect(EvmProvider).toHaveBeenNthCalledWith( - 1, - ["http://localhost:8545"], - expect.any(Object), - expect.any(Object), - ); - - // Verify second chain initialization - expect(EvmProvider).toHaveBeenNthCalledWith( - 2, - ["http://localhost:8546"], - expect.any(Object), - expect.any(Object), - ); - }); - - it("starts all orchestrators and handles shutdown signals", async () => { - const abortSpy = vi.spyOn(AbortController.prototype, "abort"); - const runSpy = vi.mocked(Orchestrator.prototype.run); - const logSpy = vi.spyOn(processingService["logger"], "info"); - - const startPromise = processingService.start(); - - // Wait for orchestrators to start - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Verify both orchestrators are running - // const orchestratorInstances = vi.mocked(Orchestrator).mock.results; - // Verify both orchestrators are running - expect(runSpy).toHaveBeenCalledTimes(2); - expect(runSpy.mock.calls.map((call) => call[0])).toEqual([ - expect.any(AbortSignal), - expect.any(AbortSignal), - ]); - expect(logSpy).toHaveBeenNthCalledWith(2, "Starting orchestrator for chain 1..."); - expect(logSpy).toHaveBeenNthCalledWith(3, "Starting orchestrator for chain 2..."); - - // Simulate SIGINT - process.emit("SIGINT"); - expect(abortSpy).toHaveBeenCalled(); - - // Wait for orchestrators to shut down - await startPromise; - - // Verify all orchestrators were properly shut down - expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy(); - }); - - it("handles SIGTERM signal", async () => { - const abortSpy = vi.spyOn(AbortController.prototype, "abort"); - const startPromise = processingService.start(); - const runSpy = vi.mocked(Orchestrator.prototype.run); - - // Wait for orchestrators to start - await new Promise((resolve) => setTimeout(resolve, 100)); - - // Simulate SIGTERM - process.emit("SIGTERM"); - expect(abortSpy).toHaveBeenCalled(); - - await startPromise; - - // Verify all orchestrators were properly shut down - expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy(); - }); - - it("releases resources correctly", async () => { - await processingService.releaseResources(); - - expect(processingService["kyselyDatabase"].destroy).toHaveBeenCalled(); + describe("start", () => { + it("initializes multiple orchestrators correctly", () => { + 
expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1); + expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1); + expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1); + expect(EvmProvider).toHaveBeenCalledTimes(2); + expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2); + + // Verify orchestrators were created with correct parameters + expect(processingService["orchestrators"].size).toBe(2); + + // Verify first chain initialization + expect(EvmProvider).toHaveBeenNthCalledWith( + 1, + ["http://localhost:8545"], + expect.any(Object), + expect.any(Object), + ); + + // Verify second chain initialization + expect(EvmProvider).toHaveBeenNthCalledWith( + 2, + ["http://localhost:8546"], + expect.any(Object), + expect.any(Object), + ); + }); + + it("starts all orchestrators and handles shutdown signals", async () => { + const abortSpy = vi.spyOn(AbortController.prototype, "abort"); + const runSpy = vi.mocked(Orchestrator.prototype.run); + const logSpy = vi.spyOn(processingService["logger"], "info"); + + const startPromise = processingService.start(); + + // Wait for orchestrators to start + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify both orchestrators are running + // const orchestratorInstances = vi.mocked(Orchestrator).mock.results; + // Verify both orchestrators are running + expect(runSpy).toHaveBeenCalledTimes(2); + expect(runSpy.mock.calls.map((call) => call[0])).toEqual([ + expect.any(AbortSignal), + expect.any(AbortSignal), + ]); + expect(logSpy).toHaveBeenNthCalledWith(2, "Starting orchestrator for chain 1..."); + expect(logSpy).toHaveBeenNthCalledWith(3, "Starting orchestrator for chain 2..."); + + // Simulate SIGINT + process.emit("SIGINT"); + expect(abortSpy).toHaveBeenCalled(); + + // Wait for orchestrators to shut down + await startPromise; + + // Verify all orchestrators were properly shut down + expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy(); + }); + + it("handles SIGTERM signal", async () => { + const abortSpy = vi.spyOn(AbortController.prototype, "abort"); + const startPromise = processingService.start(); + const runSpy = vi.mocked(Orchestrator.prototype.run); + + // Wait for orchestrators to start + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Simulate SIGTERM + process.emit("SIGTERM"); + expect(abortSpy).toHaveBeenCalled(); + + await startPromise; + + // Verify all orchestrators were properly shut down + expect(runSpy.mock.results.every((result) => result.value)).toBeTruthy(); + }); + + it("releases resources correctly", async () => { + await processingService.releaseResources(); + + expect(processingService["kyselyDatabase"].destroy).toHaveBeenCalled(); + }); + + it("logs error during resource release", async () => { + const mockError = new Error("Database error"); + const logSpy = vi.spyOn(processingService["logger"], "error"); + vi.mocked(processingService["kyselyDatabase"].destroy).mockRejectedValueOnce(mockError); + + await processingService.releaseResources(); + + expect(logSpy).toHaveBeenCalledWith(`Error releasing resources: ${mockError}`); + }); }); - it("logs error during resource release", async () => { - const mockError = new Error("Database error"); - const logSpy = vi.spyOn(processingService["logger"], "error"); - vi.mocked(processingService["kyselyDatabase"].destroy).mockRejectedValueOnce(mockError); + describe("retroactiveProcessing", () => { + it("processes retroactive strategies", async () => { + const runSpy = 
vi.mocked(RetroactiveProcessor.prototype.processRetroactiveStrategies); - await processingService.releaseResources(); + await processingService.processRetroactiveEvents(); - expect(logSpy).toHaveBeenCalledWith(`Error releasing resources: ${mockError}`); + // Verify both retroactive processors were run + expect(runSpy).toHaveBeenCalledTimes(2); + }); }); }); diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index bff277e..97b722b 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -23,6 +23,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({ saveStrategyId: vi.fn(), })), KyselyEventRegistryRepository: vi.fn(), + KyselyStrategyProcessingCheckpointRepository: vi.fn(), })); vi.mock("@grants-stack-indexer/pricing", () => ({ @@ -126,5 +127,8 @@ describe("SharedDependenciesService", () => { // Verify registries expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository"); expect(dependencies.registriesRepositories).toHaveProperty("strategyRegistryRepository"); + expect(dependencies.registriesRepositories).toHaveProperty( + "strategyProcessingCheckpointRepository", + ); }); }); diff --git a/packages/data-flow/README.md b/packages/data-flow/README.md index d6ab601..9fc328a 100644 --- a/packages/data-flow/README.md +++ b/packages/data-flow/README.md @@ -101,3 +101,7 @@ There are 3 implementations: ### [DataLoader](./src/data-loader/dataLoader.ts) The `DataLoader` is responsible for applying changesets to the database. + +### [RetroactiveProcessor](./src/retroactiveProcessor.ts) + +The `RetroactiveProcessor` is an independent runner class for retroactively processing strategies from strategies that were previously unsupported. diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 0a2f53e..1a02aee 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -48,7 +48,6 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from * - Registry tracking of supported/unsupported strategies and events * * TODO: Enhance the error handling/retries, logging and observability - * TODO: Handle unhandled strategies appropriately */ export class Orchestrator { private readonly eventsQueue: IQueue>; diff --git a/packages/data-flow/src/retroactiveProcessor.ts b/packages/data-flow/src/retroactiveProcessor.ts index 5456cb1..d9fbd80 100644 --- a/packages/data-flow/src/retroactiveProcessor.ts +++ b/packages/data-flow/src/retroactiveProcessor.ts @@ -1,5 +1,6 @@ import { IIndexerClient } from "@grants-stack-indexer/indexer-client"; import { existsHandler, UnsupportedEventException } from "@grants-stack-indexer/processors"; +import { IStrategyProcessingCheckpointRepository } from "@grants-stack-indexer/repository"; import { Address, AnyEvent, @@ -39,10 +40,20 @@ type EventPointer = { * catch up on missed events and maintain data consistency. * * Key responsibilities: - * 1. Identify newly handleable strategies - * 2. Fetch historical events for these strategies - * 3. Process events through the appropriate handlers + * 1. Identify newly handleable strategies that were previously unsupported + * 2. Fetch historical events for these strategies from the Indexer client + * 3. Process events through the appropriate handlers to update system state * 4. Update strategy registry with processed status + * 5. 
Track processing progress via checkpoints to enable resumability + * + * The checkpoint registry maintains processing state for each strategy, storing: + * - Last processed block number and log index + * + * This enables the processor to: + * - Resume processing from last checkpoint after interruption + * - Track multiple strategies independently + * - Provide processing status visibility + * - Ensure exactly-once processing semantics */ export class RetroactiveProcessor { private readonly eventsFetcher: IEventsFetcher; @@ -50,6 +61,7 @@ export class RetroactiveProcessor { private readonly eventsRegistry: IEventsRegistry; private readonly strategyRegistry: IStrategyRegistry; private readonly dataLoader: DataLoader; + private readonly checkpointRepository: IStrategyProcessingCheckpointRepository; /** * Creates a new instance of RetroactiveProcessor @@ -67,6 +79,7 @@ export class RetroactiveProcessor { private registries: { eventsRegistry: IEventsRegistry; strategyRegistry: IStrategyRegistry; + checkpointRepository: IStrategyProcessingCheckpointRepository; }, private fetchLimit: number = 1000, private logger: ILogger, @@ -78,6 +91,7 @@ export class RetroactiveProcessor { }); this.eventsRegistry = registries.eventsRegistry; this.strategyRegistry = registries.strategyRegistry; + this.checkpointRepository = registries.checkpointRepository; this.dataLoader = new DataLoader( { project: this.dependencies.projectRepository, @@ -146,7 +160,16 @@ export class RetroactiveProcessor { strategyAddresses: Set
, lastEventPointer: Readonly, ): Promise { - const currentPointer: EventPointer = { blockNumber: 0, logIndex: 0 }; + // Check if we have a checkpoint for this strategy + const checkpoint = await this.checkpointRepository.getCheckpoint(this.chainId, strategyId); + + const currentPointer: EventPointer = checkpoint + ? { + blockNumber: checkpoint.lastProcessedBlockNumber, + logIndex: checkpoint.lastProcessedLogIndex, + } + : { blockNumber: 0, logIndex: 0 }; + const events = new Queue & { strategyId?: Hex }>(); let event: (ProcessorEvent & { strategyId?: Hex }) | undefined; @@ -176,8 +199,6 @@ export class RetroactiveProcessor { `Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(event)}`, ); } - - // Update pointer } catch (error) { if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) { // Expected errors that we can safely ignore @@ -186,9 +207,25 @@ export class RetroactiveProcessor { this.logger.error(`Error processing event: ${stringify(event)} ${error}`); } } + + // Update checkpoint after processing + await this.updateCheckpoint(strategyId, currentPointer); } await this.markStrategyAsHandled(strategyId, strategyAddresses); + // Delete checkpoint after processing of all events + await this.checkpointRepository.deleteCheckpoint(this.chainId, strategyId); + } + + private async updateCheckpoint(strategyId: Hex, currentPointer: EventPointer): Promise { + const checkpointData = { + chainId: this.chainId, + strategyId, + lastProcessedBlockNumber: currentPointer.blockNumber, + lastProcessedLogIndex: currentPointer.logIndex, + }; + + await this.checkpointRepository.upsertCheckpoint(checkpointData); } private async enqueueEventsIfEmpty( diff --git a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts index 69f5add..d4d3a85 100644 --- a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts +++ b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts @@ -8,6 +8,7 @@ import { IDonationRepository, IProjectRepository, IRoundRepository, + IStrategyProcessingCheckpointRepository, Strategy, } from "@grants-stack-indexer/repository"; import { @@ -64,6 +65,7 @@ describe("RetroactiveProcessor", () => { let mockEventsRegistry: IEventsRegistry; let mockStrategyRegistry: IStrategyRegistry; let mockEvmProvider: EvmProvider; + let mockCheckpointRepository: IStrategyProcessingCheckpointRepository; let mockLogger: ILogger; let mockEventsProcessor: EventsProcessor; let mockDataLoader: DataLoader; @@ -123,6 +125,12 @@ describe("RetroactiveProcessor", () => { warn: vi.fn(), }; + mockCheckpointRepository = { + upsertCheckpoint: vi.fn(), + deleteCheckpoint: vi.fn(), + getCheckpoint: vi.fn(), + }; + const dependencies: CoreDependencies = { evmProvider: mockEvmProvider, projectRepository: {} as IProjectRepository, @@ -145,6 +153,7 @@ describe("RetroactiveProcessor", () => { { eventsRegistry: mockEventsRegistry, strategyRegistry: mockStrategyRegistry, + checkpointRepository: mockCheckpointRepository, }, mockFetchLimit, mockLogger, @@ -217,6 +226,8 @@ describe("RetroactiveProcessor", () => { ); expect(mockEventsProcessor.processEvent).toHaveBeenCalledTimes(1); expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledTimes(2); + expect(mockCheckpointRepository.upsertCheckpoint).toHaveBeenCalledTimes(1); + expect(mockCheckpointRepository.deleteCheckpoint).toHaveBeenCalledTimes(1); }); it("process multiple new handleable strategies", async () => { @@ -272,6 +283,59 @@ 
describe("RetroactiveProcessor", () => { "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf", true, ); + expect(mockCheckpointRepository.upsertCheckpoint).toHaveBeenCalledTimes(2); + expect(mockCheckpointRepository.deleteCheckpoint).toHaveBeenCalledTimes(2); + }); + + it("starts from checkpoint if exists", async () => { + const mockEvent = createMockEvent(eventName, defaultParams, existentStrategyId, { + blockNumber: 95, + logIndex: 4, + }); + + vi.spyOn(mockStrategyRegistry, "getStrategies").mockResolvedValue(mockValidStrategies); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue({ + blockNumber: 100, + logIndex: 1, + chainId, + blockTimestamp: 1234567890, + }); + vi.spyOn(mockCheckpointRepository, "getCheckpoint").mockResolvedValue({ + chainId, + strategyId: existentStrategyId, + lastProcessedBlockNumber: 90, + lastProcessedLogIndex: 0, + }); + + vi.spyOn(mockEventsFetcher, "fetchEvents") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ + numFailed: 0, + errors: [], + changesets: [], + numExecuted: 1, + numSuccessful: 1, + }); + vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); + + await processor.processRetroactiveStrategies(); + + expect(mockLogger.info).toHaveBeenCalledWith( + "Retroactive processing complete. Succeeded: 1, Failed: 0", + ); + + expect(mockEventsProcessor.processEvent).toHaveBeenCalledTimes(1); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledTimes(2); + expect(mockEventsFetcher.fetchEvents).not.toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + from: { blockNumber: 0, logIndex: 0 }, + }), + ); + expect(mockCheckpointRepository.upsertCheckpoint).toHaveBeenCalledTimes(1); + expect(mockCheckpointRepository.deleteCheckpoint).toHaveBeenCalledTimes(1); }); it("breaks loop if event is older than last processed", async () => { @@ -432,7 +496,6 @@ describe("RetroactiveProcessor", () => { }); }); }); - /** * Creates a mock event for testing. 
* diff --git a/packages/repository/src/db/connection.ts b/packages/repository/src/db/connection.ts index d7398a4..f243832 100644 --- a/packages/repository/src/db/connection.ts +++ b/packages/repository/src/db/connection.ts @@ -21,6 +21,7 @@ import { Round, RoundRole as RoundRoleTable, StatusSnapshot, + StrategyProcessingCheckpoint as StrategyProcessingCheckpointTable, Strategy as StrategyRegistryTable, } from "../internal.js"; @@ -63,6 +64,7 @@ export interface Database { applicationsPayouts: ApplicationPayoutTable; strategiesRegistry: StrategyRegistryTable; eventsRegistry: EventRegistryTable; + strategyProcessingCheckpoints: StrategyProcessingCheckpointTable; } /** diff --git a/packages/repository/src/external.ts b/packages/repository/src/external.ts index 08222ec..9966150 100644 --- a/packages/repository/src/external.ts +++ b/packages/repository/src/external.ts @@ -10,6 +10,7 @@ export type { IApplicationPayoutRepository, IStrategyRegistryRepository, IEventRegistryRepository, + IStrategyProcessingCheckpointRepository, DatabaseConfig, } from "./internal.js"; @@ -57,6 +58,7 @@ export { KyselyApplicationPayoutRepository, KyselyStrategyRegistryRepository, KyselyEventRegistryRepository, + KyselyStrategyProcessingCheckpointRepository, } from "./repositories/kysely/index.js"; export { @@ -66,4 +68,6 @@ export { ProjectByRoleNotFound, } from "./internal.js"; +export type { StrategyProcessingCheckpoint, NewStrategyProcessingCheckpoint } from "./internal.js"; + export { createKyselyPostgresDb as createKyselyDatabase } from "./internal.js"; diff --git a/packages/repository/src/interfaces/index.ts b/packages/repository/src/interfaces/index.ts index f1a27be..f8818a5 100644 --- a/packages/repository/src/interfaces/index.ts +++ b/packages/repository/src/interfaces/index.ts @@ -5,3 +5,4 @@ export * from "./donationRepository.interface.js"; export * from "./applicationPayoutRepository.interface.js"; export * from "./strategyRepository.interface.js"; export * from "./eventsRepository.interface.js"; +export * from "./strategyProcessingCheckpointRepository.interface.js"; diff --git a/packages/repository/src/interfaces/strategyProcessingCheckpointRepository.interface.ts b/packages/repository/src/interfaces/strategyProcessingCheckpointRepository.interface.ts new file mode 100644 index 0000000..2ad8d7f --- /dev/null +++ b/packages/repository/src/interfaces/strategyProcessingCheckpointRepository.interface.ts @@ -0,0 +1,28 @@ +import { ChainId, Hex } from "@grants-stack-indexer/shared"; + +import { NewStrategyProcessingCheckpoint, StrategyProcessingCheckpoint } from "../internal.js"; + +export interface IStrategyProcessingCheckpointRepository { + /** + * Get the latest checkpoint for a strategy + * @param chainId - The chain ID + * @param strategyId - The strategy ID + */ + getCheckpoint( + chainId: ChainId, + strategyId: Hex, + ): Promise; + + /** + * Upsert a checkpoint for a strategy + * @param checkpoint - The checkpoint data to upsert + */ + upsertCheckpoint(checkpoint: NewStrategyProcessingCheckpoint): Promise; + + /** + * Delete the checkpoint for a strategy + * @param chainId - The chain ID + * @param strategyId - The strategy ID + */ + deleteCheckpoint(chainId: ChainId, strategyId: Hex): Promise; +} diff --git a/packages/repository/src/repositories/kysely/index.ts b/packages/repository/src/repositories/kysely/index.ts index 66febbe..fcdd4e3 100644 --- a/packages/repository/src/repositories/kysely/index.ts +++ b/packages/repository/src/repositories/kysely/index.ts @@ -5,3 +5,4 @@ export * from 
"./donation.repository.js"; export * from "./applicationPayout.repository.js"; export * from "./strategyRegistry.repository.js"; export * from "./eventRegistry.repository.js"; +export * from "./strategyProcessingCheckpoint.repository.js"; diff --git a/packages/repository/src/repositories/kysely/strategyProcessingCheckpoint.repository.ts b/packages/repository/src/repositories/kysely/strategyProcessingCheckpoint.repository.ts new file mode 100644 index 0000000..80c6e34 --- /dev/null +++ b/packages/repository/src/repositories/kysely/strategyProcessingCheckpoint.repository.ts @@ -0,0 +1,60 @@ +import { Kysely } from "kysely"; + +import { ChainId, Hex } from "@grants-stack-indexer/shared"; + +import { Database } from "../../db/connection.js"; +import { IStrategyProcessingCheckpointRepository } from "../../interfaces/strategyProcessingCheckpointRepository.interface.js"; +import { NewStrategyProcessingCheckpoint, StrategyProcessingCheckpoint } from "../../internal.js"; + +export class KyselyStrategyProcessingCheckpointRepository + implements IStrategyProcessingCheckpointRepository +{ + constructor( + private readonly db: Kysely, + private readonly schemaName: string, + ) {} + + /** @inheritdoc */ + async getCheckpoint( + chainId: ChainId, + strategyId: Hex, + ): Promise { + return this.db + .withSchema(this.schemaName) + .selectFrom("strategyProcessingCheckpoints") + .where("chainId", "=", chainId) + .where("strategyId", "=", strategyId) + .selectAll() + .executeTakeFirst(); + } + + /** @inheritdoc */ + async upsertCheckpoint(checkpoint: NewStrategyProcessingCheckpoint): Promise { + await this.db + .withSchema(this.schemaName) + .insertInto("strategyProcessingCheckpoints") + .values({ + ...checkpoint, + createdAt: new Date(), + updatedAt: new Date(), + }) + .onConflict((oc) => + oc.columns(["chainId", "strategyId"]).doUpdateSet({ + lastProcessedBlockNumber: checkpoint.lastProcessedBlockNumber, + lastProcessedLogIndex: checkpoint.lastProcessedLogIndex, + updatedAt: new Date(), + }), + ) + .execute(); + } + + /** @inheritdoc */ + async deleteCheckpoint(chainId: ChainId, strategyId: Hex): Promise { + await this.db + .withSchema(this.schemaName) + .deleteFrom("strategyProcessingCheckpoints") + .where("chainId", "=", chainId) + .where("strategyId", "=", strategyId) + .execute(); + } +} diff --git a/packages/repository/src/types/index.ts b/packages/repository/src/types/index.ts index 9832417..64da8f9 100644 --- a/packages/repository/src/types/index.ts +++ b/packages/repository/src/types/index.ts @@ -6,3 +6,4 @@ export * from "./donation.types.js"; export * from "./applicationPayout.types.js"; export * from "./strategy.types.js"; export * from "./event.types.js"; +export * from "./strategyProcessingCheckpoint.types.js"; diff --git a/packages/repository/src/types/strategyProcessingCheckpoint.types.ts b/packages/repository/src/types/strategyProcessingCheckpoint.types.ts new file mode 100644 index 0000000..1d88cad --- /dev/null +++ b/packages/repository/src/types/strategyProcessingCheckpoint.types.ts @@ -0,0 +1,15 @@ +import { ChainId, Hex } from "@grants-stack-indexer/shared"; + +export type StrategyProcessingCheckpoint = { + chainId: ChainId; + strategyId: Hex; + lastProcessedBlockNumber: number; + lastProcessedLogIndex: number; + createdAt?: Date; + updatedAt?: Date; +}; + +export type NewStrategyProcessingCheckpoint = Omit< + StrategyProcessingCheckpoint, + "createdAt" | "updatedAt" +>; diff --git a/scripts/migrations/src/migrations/20241223T000000_add_strategy_processing_checkpoints.ts 
b/scripts/migrations/src/migrations/20241223T000000_add_strategy_processing_checkpoints.ts
new file mode 100644
index 0000000..6a633c1
--- /dev/null
+++ b/scripts/migrations/src/migrations/20241223T000000_add_strategy_processing_checkpoints.ts
@@ -0,0 +1,27 @@
+import { Kysely, sql } from "kysely";
+
+/**
+ * The up function is called when you update your database schema to the next version and down when you go back to the previous version.
+ * The only argument for the functions is an instance of Kysely<any>. It's important to use Kysely<any> and not Kysely<YourDatabase>.
+ * ref: https://kysely.dev/docs/migrations#migration-files
+ */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export async function up(db: Kysely<any>): Promise<void> {
+    const CHAIN_ID_TYPE = "integer";
+
+    await db.schema
+        .createTable("strategy_processing_checkpoints")
+        .addColumn("chainId", CHAIN_ID_TYPE)
+        .addColumn("strategyId", "text")
+        .addColumn("lastProcessedBlockNumber", "integer")
+        .addColumn("lastProcessedLogIndex", "integer")
+        .addColumn("createdAt", "timestamptz", (col) => col.defaultTo(sql`now()`))
+        .addColumn("updatedAt", "timestamptz", (col) => col.defaultTo(sql`now()`))
+        .addPrimaryKeyConstraint("strategy_processing_checkpoints_pkey", ["chainId", "strategyId"])
+        .execute();
+}
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export async function down(db: Kysely<any>): Promise<void> {
+    await db.schema.dropTable("strategy_processing_checkpoints").execute();
+}
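
Reviewer note (not part of the diff): a minimal sketch of how the retroactive flow added in PATCH 1/2 and 2/2 is meant to be invoked end to end, assuming only the entry point, package script, and ProcessingService API introduced above; the numbered comments summarize RetroactiveProcessor.processRetroactiveStrategies as implemented in this patch, and nothing beyond that is asserted.

// Illustrative only: mirrors apps/processing/src/retroactiveHandleStrategies.ts from this patch.
// Run via the new package script: `pnpm retroactive`.
import { environment } from "./config/index.js";
import { ProcessingService } from "./services/processing.service.js";

const main = async (): Promise<void> => {
    const processor = await ProcessingService.initialize(environment);
    try {
        // For each configured chain, the paired RetroactiveProcessor:
        // 1. loads strategies with handled = false and keeps those for which existsHandler(strategyId) is true;
        // 2. replays their historical events up to the last event already processed by the Orchestrator,
        //    resuming from the strategy_processing_checkpoints table when a checkpoint exists;
        // 3. marks the strategy addresses as handled and deletes the checkpoint once replay completes.
        await processor.processRetroactiveEvents();
    } finally {
        await processor.releaseResources();
    }
};

main().catch((err) => {
    console.error(`Caught error in main handler: ${err}`);
    process.exit(1);
});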