Skip to content

Commit

Permalink
feat: db event registry repository (#42)
Browse files Browse the repository at this point in the history
# 🤖 Linear

Closes GIT-188

## Description
We want to persist the last event processed for each chain so we can
safely stop and resume the indexer from where we left off.

- Create `EventRegistryRepository` to store the last processed event per
chain
- `DBEventsRegistry` that interfaces with the repo
- a Proxied version that caches in memory and uses the `DBEventRegistry`
(InMemoryCachedEventRegistry) and loads into memory the data stored on
DB on init
- update Processing app to use the InMemoryCachedEventRegistry
- add the migration to create the table `events` for `DBEventsRegistry`
repo


## Checklist before requesting a review

-   [x] I have conducted a self-review of my code.
-   [x] I have conducted a QA.
-   [x] If it is a core feature, I have included comprehensive tests.
  • Loading branch information
0xnigir1 authored Dec 20, 2024
1 parent fbec174 commit 4b4c594
Show file tree
Hide file tree
Showing 30 changed files with 555 additions and 228 deletions.
57 changes: 47 additions & 10 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { optimism } from "viem/chains";

import { EvmProvider } from "@grants-stack-indexer/chain-providers";
import { Orchestrator } from "@grants-stack-indexer/data-flow";
import {
DatabaseEventRegistry,
DatabaseStrategyRegistry,
InMemoryCachedEventRegistry,
InMemoryCachedStrategyRegistry,
Orchestrator,
} from "@grants-stack-indexer/data-flow";
import { ChainId, Logger } from "@grants-stack-indexer/shared";

import { Environment } from "../config/env.js";
Expand All @@ -10,8 +16,10 @@ import { SharedDependencies, SharedDependenciesService } from "./index.js";
/**
* Processor service application
* - Initializes core dependencies (repositories, providers) via SharedDependenciesService
* - Initializes a StrategyRegistry and loads it with strategies from the database
* For each chain:
* - Sets up EVM provider with configured RPC endpoints
* - Instantiates an EventsRegistry and loads it with the last processed event for the chain
* - Creates an Orchestrator instance to coordinate an specific chain:
* - Fetching on-chain events via indexer client
* - Processing events through registered handlers
Expand All @@ -23,34 +31,63 @@ export class ProcessingService {
private readonly logger = new Logger({ className: "ProcessingService" });
private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"];

private constructor(env: Environment, sharedDependencies: SharedDependencies) {
const { CHAINS: chains } = env;
const { core, registries, indexerClient, kyselyDatabase } = sharedDependencies;
private constructor(
orchestrators: Map<ChainId, Orchestrator>,
kyselyDatabase: SharedDependencies["kyselyDatabase"],
) {
this.orchestrators = orchestrators;
this.kyselyDatabase = kyselyDatabase;
}

static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
const { CHAINS: chains } = env;
const { core, registriesRepositories, indexerClient, kyselyDatabase } = sharedDependencies;
const { eventRegistryRepository, strategyRegistryRepository } = registriesRepositories;
const orchestrators: Map<ChainId, Orchestrator> = new Map();

const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
new Logger({ className: "InMemoryCachedStrategyRegistry" }),
new DatabaseStrategyRegistry(
new Logger({ className: "DatabaseStrategyRegistry" }),
strategyRegistryRepository,
),
);
const eventsRegistry = new DatabaseEventRegistry(
new Logger({ className: "DatabaseEventRegistry" }),
eventRegistryRepository,
);

for (const chain of chains) {
const chainLogger = new Logger({ chainId: chain.id as ChainId });
// Initialize EVM provider
const evmProvider = new EvmProvider(chain.rpcUrls, optimism, chainLogger);

this.orchestrators.set(
// Initialize events registry for the chain
const cachedEventsRegistry = await InMemoryCachedEventRegistry.initialize(
new Logger({ className: "InMemoryCachedEventRegistry" }),
eventsRegistry,
[chain.id as ChainId],
);

orchestrators.set(
chain.id as ChainId,
new Orchestrator(
chain.id as ChainId,
{ ...core, evmProvider },
indexerClient,
registries,
{
eventsRegistry: cachedEventsRegistry,
strategyRegistry,
},
chain.fetchLimit,
chain.fetchDelayMs,
chainLogger,
),
);
}
}

static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
return new ProcessingService(env, sharedDependencies);
return new ProcessingService(orchestrators, kyselyDatabase);
}

/**
Expand Down
41 changes: 15 additions & 26 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import {
CoreDependencies,
DatabaseStrategyRegistry,
IEventsRegistry,
InMemoryCachedStrategyRegistry,
InMemoryEventsRegistry,
IStrategyRegistry,
} from "@grants-stack-indexer/data-flow";
import { CoreDependencies } from "@grants-stack-indexer/data-flow";
import { EnvioIndexerClient } from "@grants-stack-indexer/indexer-client";
import { IpfsProvider } from "@grants-stack-indexer/metadata";
import { PricingProviderFactory } from "@grants-stack-indexer/pricing";
import {
createKyselyDatabase,
IEventRegistryRepository,
IStrategyRegistryRepository,
KyselyApplicationPayoutRepository,
KyselyApplicationRepository,
KyselyDonationRepository,
KyselyEventRegistryRepository,
KyselyProjectRepository,
KyselyRoundRepository,
KyselyStrategyRegistryRepository,
Expand All @@ -24,9 +20,9 @@ import { Environment } from "../config/index.js";

export type SharedDependencies = {
core: Omit<CoreDependencies, "evmProvider">;
registries: {
eventsRegistry: IEventsRegistry;
strategyRegistry: IStrategyRegistry;
registriesRepositories: {
eventRegistryRepository: IEventRegistryRepository;
strategyRegistryRepository: IStrategyRegistryRepository;
};
indexerClient: EnvioIndexerClient;
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
Expand All @@ -35,7 +31,7 @@ export type SharedDependencies = {
/**
* Shared dependencies service
* - Initializes core dependencies (repositories, providers)
* - Initializes registries
* - Initializes registries repositories
* - Initializes indexer client
*/
export class SharedDependenciesService {
Expand Down Expand Up @@ -68,20 +64,13 @@ export class SharedDependenciesService {
new Logger({ className: "IpfsProvider" }),
);

// Initialize registries
const eventsRegistry = new InMemoryEventsRegistry(
new Logger({ className: "InMemoryEventsRegistry" }),
);
const strategyRepository = new KyselyStrategyRegistryRepository(
const eventRegistryRepository = new KyselyEventRegistryRepository(
kyselyDatabase,
env.DATABASE_SCHEMA,
);
const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
new Logger({ className: "InMemoryCachedStrategyRegistry" }),
new DatabaseStrategyRegistry(
new Logger({ className: "DatabaseStrategyRegistry" }),
strategyRepository,
),
const strategyRegistryRepository = new KyselyStrategyRegistryRepository(
kyselyDatabase,
env.DATABASE_SCHEMA,
);

// Initialize indexer client
Expand All @@ -100,9 +89,9 @@ export class SharedDependenciesService {
metadataProvider,
applicationPayoutRepository,
},
registries: {
eventsRegistry,
strategyRegistry,
registriesRepositories: {
eventRegistryRepository,
strategyRegistryRepository,
},
indexerClient,
kyselyDatabase,
Expand Down
48 changes: 46 additions & 2 deletions apps/processing/test/unit/processing.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import { EvmProvider } from "@grants-stack-indexer/chain-providers";
import { Orchestrator } from "@grants-stack-indexer/data-flow";
import {
DatabaseEventRegistry,
DatabaseStrategyRegistry,
InMemoryCachedEventRegistry,
InMemoryCachedStrategyRegistry,
Orchestrator,
} from "@grants-stack-indexer/data-flow";

import type { Environment } from "../../src/config/env.js";
import { ProcessingService } from "../../src/services/processing.service.js";
Expand All @@ -10,7 +16,7 @@ vi.mock("../../src/services/sharedDependencies.service.js", () => ({
SharedDependenciesService: {
initialize: vi.fn(() => ({
core: {},
registries: {},
registriesRepositories: {},
indexerClient: {},
kyselyDatabase: {
destroy: vi.fn(),
Expand All @@ -23,6 +29,39 @@ vi.mock("@grants-stack-indexer/chain-providers", () => ({
EvmProvider: vi.fn(),
}));

vi.mock("@grants-stack-indexer/data-flow", async (importOriginal) => {
const actual = await importOriginal<typeof import("@grants-stack-indexer/data-flow")>();
const mockStrategyRegistry = {
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
};

const mockEventRegistry = {
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
};

return {
...actual,
InMemoryCachedStrategyRegistry: {
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
},
DatabaseStrategyRegistry: vi.fn().mockImplementation(() => ({
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
DatabaseEventRegistry: vi.fn().mockImplementation(() => ({
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
})),
InMemoryCachedEventRegistry: {
initialize: vi.fn().mockResolvedValue(mockEventRegistry),
},
};
});

vi.spyOn(Orchestrator.prototype, "run").mockImplementation(async function (signal: AbortSignal) {
while (!signal.aborted) {
await new Promise((resolve) => setTimeout(resolve, 100));
Expand Down Expand Up @@ -62,7 +101,12 @@ describe("ProcessingService", () => {
});

it("initializes multiple orchestrators correctly", () => {
expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1);
expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1);
expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1);
expect(EvmProvider).toHaveBeenCalledTimes(2);
expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2);

// Verify orchestrators were created with correct parameters
expect(processingService["orchestrators"].size).toBe(2);

Expand Down
23 changes: 16 additions & 7 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
KyselyEventRegistryRepository: vi.fn(),
}));

vi.mock("@grants-stack-indexer/pricing", () => ({
Expand All @@ -45,8 +46,12 @@ vi.mock("@grants-stack-indexer/data-flow", () => {
saveStrategyId: vi.fn(),
};

const mockEventRegistry = {
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
};

return {
InMemoryEventsRegistry: vi.fn(),
InMemoryCachedStrategyRegistry: {
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
},
Expand All @@ -55,6 +60,13 @@ vi.mock("@grants-stack-indexer/data-flow", () => {
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
DatabaseEventRegistry: vi.fn().mockImplementation(() => ({
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
})),
InMemoryCachedEventRegistry: {
initialize: vi.fn().mockResolvedValue(mockEventRegistry),
},
};
});

Expand Down Expand Up @@ -98,7 +110,7 @@ describe("SharedDependenciesService", () => {

// Verify structure of returned dependencies
expect(dependencies).toHaveProperty("core");
expect(dependencies).toHaveProperty("registries");
expect(dependencies).toHaveProperty("registriesRepositories");
expect(dependencies).toHaveProperty("indexerClient");
expect(dependencies).toHaveProperty("kyselyDatabase");

Expand All @@ -112,10 +124,7 @@ describe("SharedDependenciesService", () => {
expect(dependencies.core).toHaveProperty("applicationPayoutRepository");

// Verify registries
expect(dependencies.registries).toHaveProperty("eventsRegistry");
expect(dependencies.registries).toHaveProperty("strategyRegistry");

// Verify InMemoryCachedStrategyRegistry initialization
expect(dependencies.registries.strategyRegistry).toBeDefined();
expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository");
expect(dependencies.registriesRepositories).toHaveProperty("strategyRegistryRepository");
});
});
40 changes: 0 additions & 40 deletions packages/data-flow/src/eventsRegistry.ts

This file was deleted.

3 changes: 2 additions & 1 deletion packages/data-flow/src/external.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
export {
DataLoader,
InMemoryEventsRegistry,
InMemoryCachedStrategyRegistry,
InMemoryCachedEventRegistry,
DatabaseEventRegistry,
DatabaseStrategyRegistry,
Orchestrator,
} from "./internal.js";
Expand Down
12 changes: 4 additions & 8 deletions packages/data-flow/src/interfaces/eventsRegistry.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AnyEvent, ChainId, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared";
import { NewProcessedEvent, ProcessedEvent } from "@grants-stack-indexer/repository";
import { ChainId } from "@grants-stack-indexer/shared";

/**
* The events registry saves as a checkpoint to the last processed event by the system.
Expand All @@ -10,16 +11,11 @@ export interface IEventsRegistry {
* @param chainId - The chain id
* @returns The last processed event or undefined if no event has been processed yet.
*/
getLastProcessedEvent(
chainId: ChainId,
): Promise<ProcessorEvent<ContractName, AnyEvent> | undefined>;
getLastProcessedEvent(chainId: ChainId): Promise<ProcessedEvent | undefined>;
/**
* Save the last processed event by the system
* @param chainId - The chain id
* @param event - The event to save.
*/
saveLastProcessedEvent(
chainId: ChainId,
event: ProcessorEvent<ContractName, AnyEvent>,
): Promise<void>;
saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise<void>;
}
Loading

0 comments on commit 4b4c594

Please sign in to comment.