Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: db event registry repository #42

Merged
merged 7 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { optimism } from "viem/chains";

import { EvmProvider } from "@grants-stack-indexer/chain-providers";
import { Orchestrator } from "@grants-stack-indexer/data-flow";
import {
DatabaseEventRegistry,
DatabaseStrategyRegistry,
InMemoryCachedEventRegistry,
InMemoryCachedStrategyRegistry,
Orchestrator,
} from "@grants-stack-indexer/data-flow";
import { ChainId, Logger } from "@grants-stack-indexer/shared";

import { Environment } from "../config/env.js";
Expand All @@ -10,8 +16,10 @@ import { SharedDependencies, SharedDependenciesService } from "./index.js";
/**
* Processor service application
* - Initializes core dependencies (repositories, providers) via SharedDependenciesService
* - Initializes a StrategyRegistry and loads it with strategies from the database
* For each chain:
* - Sets up EVM provider with configured RPC endpoints
* - Instantiates an EventsRegistry and loads it with the last processed event for the chain
* - Creates an Orchestrator instance to coordinate an specific chain:
* - Fetching on-chain events via indexer client
* - Processing events through registered handlers
Expand All @@ -23,34 +31,63 @@ export class ProcessingService {
private readonly logger = new Logger({ className: "ProcessingService" });
private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"];

private constructor(env: Environment, sharedDependencies: SharedDependencies) {
const { CHAINS: chains } = env;
const { core, registries, indexerClient, kyselyDatabase } = sharedDependencies;
private constructor(
orchestrators: Map<ChainId, Orchestrator>,
kyselyDatabase: SharedDependencies["kyselyDatabase"],
) {
this.orchestrators = orchestrators;
this.kyselyDatabase = kyselyDatabase;
}

static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
const { CHAINS: chains } = env;
const { core, registriesRepositories, indexerClient, kyselyDatabase } = sharedDependencies;
const { eventRegistryRepository, strategyRegistryRepository } = registriesRepositories;
const orchestrators: Map<ChainId, Orchestrator> = new Map();

const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
new Logger({ className: "InMemoryCachedStrategyRegistry" }),
new DatabaseStrategyRegistry(
new Logger({ className: "DatabaseStrategyRegistry" }),
strategyRegistryRepository,
),
);
const eventsRegistry = new DatabaseEventRegistry(
new Logger({ className: "DatabaseEventRegistry" }),
eventRegistryRepository,
);
Comment on lines +49 to +59
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason why you are doing this outside the SharedDependenciesService ?


for (const chain of chains) {
const chainLogger = new Logger({ chainId: chain.id as ChainId });
// Initialize EVM provider
const evmProvider = new EvmProvider(chain.rpcUrls, optimism, chainLogger);

this.orchestrators.set(
// Initialize events registry for the chain
const cachedEventsRegistry = await InMemoryCachedEventRegistry.initialize(
new Logger({ className: "InMemoryCachedEventRegistry" }),
eventsRegistry,
[chain.id as ChainId],
);

orchestrators.set(
chain.id as ChainId,
new Orchestrator(
chain.id as ChainId,
{ ...core, evmProvider },
indexerClient,
registries,
{
eventsRegistry: cachedEventsRegistry,
strategyRegistry,
},
chain.fetchLimit,
chain.fetchDelayMs,
chainLogger,
),
);
}
}

static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
return new ProcessingService(env, sharedDependencies);
return new ProcessingService(orchestrators, kyselyDatabase);
}

/**
Expand Down
41 changes: 15 additions & 26 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import {
CoreDependencies,
DatabaseStrategyRegistry,
IEventsRegistry,
InMemoryCachedStrategyRegistry,
InMemoryEventsRegistry,
IStrategyRegistry,
} from "@grants-stack-indexer/data-flow";
import { CoreDependencies } from "@grants-stack-indexer/data-flow";
import { EnvioIndexerClient } from "@grants-stack-indexer/indexer-client";
import { IpfsProvider } from "@grants-stack-indexer/metadata";
import { PricingProviderFactory } from "@grants-stack-indexer/pricing";
import {
createKyselyDatabase,
IEventRegistryRepository,
IStrategyRegistryRepository,
KyselyApplicationPayoutRepository,
KyselyApplicationRepository,
KyselyDonationRepository,
KyselyEventRegistryRepository,
KyselyProjectRepository,
KyselyRoundRepository,
KyselyStrategyRegistryRepository,
Expand All @@ -24,9 +20,9 @@ import { Environment } from "../config/index.js";

export type SharedDependencies = {
core: Omit<CoreDependencies, "evmProvider">;
registries: {
eventsRegistry: IEventsRegistry;
strategyRegistry: IStrategyRegistry;
registriesRepositories: {
eventRegistryRepository: IEventRegistryRepository;
strategyRegistryRepository: IStrategyRegistryRepository;
};
Comment on lines +23 to +25
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the previous way was ok. I...Registry should be de dependency not I...RegistryRepository. Lmk if iam missing something

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i've changed the way it works by creating an instance per chain but using the same repository, since it's not really necessary to load into memory the lastProcessedEvent of another chain (if we deploy a container per chain, this is more obvious to see).

if this makes sense, the same change applies for strategies (we create an instance per chain) but didn't want to mix and make the change for StrategyRegistry too. if we go for this approach, i'll make the change in another PR

if smth it's not clear, we can discuss it offline 😄

indexerClient: EnvioIndexerClient;
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
Expand All @@ -35,7 +31,7 @@ export type SharedDependencies = {
/**
* Shared dependencies service
* - Initializes core dependencies (repositories, providers)
* - Initializes registries
* - Initializes registries repositories
* - Initializes indexer client
*/
export class SharedDependenciesService {
Expand Down Expand Up @@ -68,20 +64,13 @@ export class SharedDependenciesService {
new Logger({ className: "IpfsProvider" }),
);

// Initialize registries
const eventsRegistry = new InMemoryEventsRegistry(
new Logger({ className: "InMemoryEventsRegistry" }),
);
const strategyRepository = new KyselyStrategyRegistryRepository(
const eventRegistryRepository = new KyselyEventRegistryRepository(
kyselyDatabase,
env.DATABASE_SCHEMA,
);
const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
new Logger({ className: "InMemoryCachedStrategyRegistry" }),
new DatabaseStrategyRegistry(
new Logger({ className: "DatabaseStrategyRegistry" }),
strategyRepository,
),
const strategyRegistryRepository = new KyselyStrategyRegistryRepository(
kyselyDatabase,
env.DATABASE_SCHEMA,
);

// Initialize indexer client
Expand All @@ -100,9 +89,9 @@ export class SharedDependenciesService {
metadataProvider,
applicationPayoutRepository,
},
registries: {
eventsRegistry,
strategyRegistry,
registriesRepositories: {
eventRegistryRepository,
strategyRegistryRepository,
},
indexerClient,
kyselyDatabase,
Expand Down
48 changes: 46 additions & 2 deletions apps/processing/test/unit/processing.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import { EvmProvider } from "@grants-stack-indexer/chain-providers";
import { Orchestrator } from "@grants-stack-indexer/data-flow";
import {
DatabaseEventRegistry,
DatabaseStrategyRegistry,
InMemoryCachedEventRegistry,
InMemoryCachedStrategyRegistry,
Orchestrator,
} from "@grants-stack-indexer/data-flow";

import type { Environment } from "../../src/config/env.js";
import { ProcessingService } from "../../src/services/processing.service.js";
Expand All @@ -10,7 +16,7 @@ vi.mock("../../src/services/sharedDependencies.service.js", () => ({
SharedDependenciesService: {
initialize: vi.fn(() => ({
core: {},
registries: {},
registriesRepositories: {},
indexerClient: {},
kyselyDatabase: {
destroy: vi.fn(),
Expand All @@ -23,6 +29,39 @@ vi.mock("@grants-stack-indexer/chain-providers", () => ({
EvmProvider: vi.fn(),
}));

vi.mock("@grants-stack-indexer/data-flow", async (importOriginal) => {
const actual = await importOriginal<typeof import("@grants-stack-indexer/data-flow")>();
const mockStrategyRegistry = {
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
};

const mockEventRegistry = {
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
};

return {
...actual,
InMemoryCachedStrategyRegistry: {
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
},
DatabaseStrategyRegistry: vi.fn().mockImplementation(() => ({
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
DatabaseEventRegistry: vi.fn().mockImplementation(() => ({
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
})),
InMemoryCachedEventRegistry: {
initialize: vi.fn().mockResolvedValue(mockEventRegistry),
},
};
});

vi.spyOn(Orchestrator.prototype, "run").mockImplementation(async function (signal: AbortSignal) {
while (!signal.aborted) {
await new Promise((resolve) => setTimeout(resolve, 100));
Expand Down Expand Up @@ -62,7 +101,12 @@ describe("ProcessingService", () => {
});

it("initializes multiple orchestrators correctly", () => {
expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1);
expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1);
expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1);
expect(EvmProvider).toHaveBeenCalledTimes(2);
expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2);

// Verify orchestrators were created with correct parameters
expect(processingService["orchestrators"].size).toBe(2);

Expand Down
23 changes: 16 additions & 7 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
KyselyEventRegistryRepository: vi.fn(),
}));

vi.mock("@grants-stack-indexer/pricing", () => ({
Expand All @@ -45,8 +46,12 @@ vi.mock("@grants-stack-indexer/data-flow", () => {
saveStrategyId: vi.fn(),
};

const mockEventRegistry = {
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
};

return {
InMemoryEventsRegistry: vi.fn(),
InMemoryCachedStrategyRegistry: {
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
},
Expand All @@ -55,6 +60,13 @@ vi.mock("@grants-stack-indexer/data-flow", () => {
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
DatabaseEventRegistry: vi.fn().mockImplementation(() => ({
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
})),
InMemoryCachedEventRegistry: {
initialize: vi.fn().mockResolvedValue(mockEventRegistry),
},
};
});

Expand Down Expand Up @@ -98,7 +110,7 @@ describe("SharedDependenciesService", () => {

// Verify structure of returned dependencies
expect(dependencies).toHaveProperty("core");
expect(dependencies).toHaveProperty("registries");
expect(dependencies).toHaveProperty("registriesRepositories");
expect(dependencies).toHaveProperty("indexerClient");
expect(dependencies).toHaveProperty("kyselyDatabase");

Expand All @@ -112,10 +124,7 @@ describe("SharedDependenciesService", () => {
expect(dependencies.core).toHaveProperty("applicationPayoutRepository");

// Verify registries
expect(dependencies.registries).toHaveProperty("eventsRegistry");
expect(dependencies.registries).toHaveProperty("strategyRegistry");

// Verify InMemoryCachedStrategyRegistry initialization
expect(dependencies.registries.strategyRegistry).toBeDefined();
expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository");
expect(dependencies.registriesRepositories).toHaveProperty("strategyRegistryRepository");
});
});
40 changes: 0 additions & 40 deletions packages/data-flow/src/eventsRegistry.ts

This file was deleted.

3 changes: 2 additions & 1 deletion packages/data-flow/src/external.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
export {
DataLoader,
InMemoryEventsRegistry,
InMemoryCachedStrategyRegistry,
InMemoryCachedEventRegistry,
DatabaseEventRegistry,
DatabaseStrategyRegistry,
Orchestrator,
} from "./internal.js";
Expand Down
12 changes: 4 additions & 8 deletions packages/data-flow/src/interfaces/eventsRegistry.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AnyEvent, ChainId, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared";
import { NewProcessedEvent, ProcessedEvent } from "@grants-stack-indexer/repository";
import { ChainId } from "@grants-stack-indexer/shared";

/**
* The events registry saves as a checkpoint to the last processed event by the system.
Expand All @@ -10,16 +11,11 @@ export interface IEventsRegistry {
* @param chainId - The chain id
* @returns The last processed event or undefined if no event has been processed yet.
*/
getLastProcessedEvent(
chainId: ChainId,
): Promise<ProcessorEvent<ContractName, AnyEvent> | undefined>;
getLastProcessedEvent(chainId: ChainId): Promise<ProcessedEvent | undefined>;
/**
* Save the last processed event by the system
* @param chainId - The chain id
* @param event - The event to save.
*/
saveLastProcessedEvent(
chainId: ChainId,
event: ProcessorEvent<ContractName, AnyEvent>,
): Promise<void>;
saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise<void>;
}
Loading
Loading