Skip to content

Commit

Permalink
feat: persisting strategy registry into db (#40)
Browse files Browse the repository at this point in the history
# 🤖 Linear

Closes GIT-185

## Description
We want to persist `strategyId`s on `PoolCreated` events so we could
later pre-process all of the past events when we implement in code a new
Strategy Handler.

- Create `StrategyRepository` to store seen strategyIds with their
corresponding address and whether it was handled or not
- `DBStrategyRegistry` that interfaces with the repo
- a Proxied version that caches in memory and uses the
`DBStrategyRegistry` (InMemoryCachedStrategyRegistry)
- update Processing app to use the InMemoryCachedStrategyRegistry

## Checklist before requesting a review

-   [x] I have conducted a self-review of my code.
-   [x] I have conducted a QA.
-   [x] If it is a core feature, I have included comprehensive tests.
  • Loading branch information
0xnigir1 authored Dec 16, 2024
1 parent 1d83d5d commit 79f4681
Show file tree
Hide file tree
Showing 37 changed files with 820 additions and 168 deletions.
2 changes: 1 addition & 1 deletion apps/processing/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ProcessingService } from "./services/processing.service.js";
let processor: ProcessingService;

const main = async (): Promise<void> => {
processor = new ProcessingService(environment);
processor = await ProcessingService.initialize(environment);
await processor.start();
};

Expand Down
10 changes: 7 additions & 3 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ export class ProcessingService {
private readonly logger = new Logger({ className: "ProcessingService" });
private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"];

constructor(private readonly env: Environment) {
private constructor(env: Environment, sharedDependencies: SharedDependencies) {
const { CHAINS: chains } = env;
const { core, registries, indexerClient, kyselyDatabase } =
SharedDependenciesService.initialize(env);
const { core, registries, indexerClient, kyselyDatabase } = sharedDependencies;
this.kyselyDatabase = kyselyDatabase;

for (const chain of chains) {
Expand All @@ -49,6 +48,11 @@ export class ProcessingService {
}
}

static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
return new ProcessingService(env, sharedDependencies);
}

/**
* Start the processor service
*
Expand Down
24 changes: 18 additions & 6 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import {
CoreDependencies,
DatabaseStrategyRegistry,
IEventsRegistry,
InMemoryCachedStrategyRegistry,
InMemoryEventsRegistry,
InMemoryStrategyRegistry,
IStrategyRegistry,
} from "@grants-stack-indexer/data-flow";
import { EnvioIndexerClient } from "@grants-stack-indexer/indexer-client";
import { IpfsProvider } from "@grants-stack-indexer/metadata";
Expand All @@ -13,6 +16,7 @@ import {
KyselyDonationRepository,
KyselyProjectRepository,
KyselyRoundRepository,
KyselyStrategyRegistryRepository,
} from "@grants-stack-indexer/repository";
import { Logger } from "@grants-stack-indexer/shared";

Expand All @@ -21,8 +25,8 @@ import { Environment } from "../config/index.js";
export type SharedDependencies = {
core: Omit<CoreDependencies, "evmProvider">;
registries: {
eventsRegistry: InMemoryEventsRegistry;
strategyRegistry: InMemoryStrategyRegistry;
eventsRegistry: IEventsRegistry;
strategyRegistry: IStrategyRegistry;
};
indexerClient: EnvioIndexerClient;
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
Expand All @@ -35,7 +39,7 @@ export type SharedDependencies = {
* - Initializes indexer client
*/
export class SharedDependenciesService {
static initialize(env: Environment): SharedDependencies {
static async initialize(env: Environment): Promise<SharedDependencies> {
// Initialize repositories
const kyselyDatabase = createKyselyDatabase({
connectionString: env.DATABASE_URL,
Expand Down Expand Up @@ -68,8 +72,16 @@ export class SharedDependenciesService {
const eventsRegistry = new InMemoryEventsRegistry(
new Logger({ className: "InMemoryEventsRegistry" }),
);
const strategyRegistry = new InMemoryStrategyRegistry(
new Logger({ className: "InMemoryStrategyRegistry" }),
const strategyRepository = new KyselyStrategyRegistryRepository(
kyselyDatabase,
env.DATABASE_SCHEMA,
);
const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
new Logger({ className: "InMemoryCachedStrategyRegistry" }),
new DatabaseStrategyRegistry(
new Logger({ className: "DatabaseStrategyRegistry" }),
strategyRepository,
),
);

// Initialize indexer client
Expand Down
4 changes: 2 additions & 2 deletions apps/processing/test/unit/processing.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ describe("ProcessingService", () => {
DATABASE_SCHEMA: "public",
};

beforeEach(() => {
beforeEach(async () => {
vi.clearAllMocks();
processingService = new ProcessingService(mockEnv as Environment);
processingService = await ProcessingService.initialize(mockEnv as Environment);
});

afterEach(() => {
Expand Down
32 changes: 30 additions & 2 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ vi.mock("@grants-stack-indexer/repository", () => ({
KyselyApplicationRepository: vi.fn(),
KyselyDonationRepository: vi.fn(),
KyselyApplicationPayoutRepository: vi.fn(),
KyselyStrategyRegistryRepository: vi.fn().mockImplementation(() => ({
getStrategies: vi.fn().mockResolvedValue([]),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
}));

vi.mock("@grants-stack-indexer/pricing", () => ({
Expand All @@ -33,6 +38,26 @@ vi.mock("@grants-stack-indexer/indexer-client", () => ({
EnvioIndexerClient: vi.fn(),
}));

vi.mock("@grants-stack-indexer/data-flow", () => {
const mockStrategyRegistry = {
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
};

return {
InMemoryEventsRegistry: vi.fn(),
InMemoryCachedStrategyRegistry: {
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
},
DatabaseStrategyRegistry: vi.fn().mockImplementation(() => ({
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
};
});

describe("SharedDependenciesService", () => {
const mockEnv: Pick<
Environment,
Expand All @@ -51,8 +76,8 @@ describe("SharedDependenciesService", () => {
PRICING_SOURCE: "dummy",
};

it("initializes all dependencies correctly", () => {
const dependencies = SharedDependenciesService.initialize(mockEnv as Environment);
it("initializes all dependencies correctly", async () => {
const dependencies = await SharedDependenciesService.initialize(mockEnv as Environment);

// Verify database initialization
expect(createKyselyDatabase).toHaveBeenCalledWith({
Expand Down Expand Up @@ -89,5 +114,8 @@ describe("SharedDependenciesService", () => {
// Verify registries
expect(dependencies.registries).toHaveProperty("eventsRegistry");
expect(dependencies.registries).toHaveProperty("strategyRegistry");

// Verify InMemoryCachedStrategyRegistry initialization
expect(dependencies.registries.strategyRegistry).toBeDefined();
});
});
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
"build": "turbo run build",
"check-types": "turbo run check-types",
"clean": "turbo run clean",
"db:migrate": "pnpm run --filter @grants-stack-indexer/migrations db:migrate",
"db:reset": "pnpm run --filter @grants-stack-indexer/migrations db:reset",
"dev": "turbo run dev",
"format": "turbo run format",
"format:fix": "turbo run format:fix",
"preinstall": "npx only-allow pnpm",
"lint": "turbo run lint",
"lint:fix": "turbo run lint:fix",
"prepare": "husky",
"script:db:migrate": "pnpm run --filter @grants-stack-indexer/migrations script:db:migrate",
"script:db:reset": "pnpm run --filter @grants-stack-indexer/migrations script:db:reset",
"start": "turbo run start",
"test": "turbo run test",
"test:cov": "turbo run test:cov",
Expand Down
6 changes: 5 additions & 1 deletion packages/data-flow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,13 @@ The `EventsProcessor` class is responsible for processing events in the processi

The `EventsFetcher` class is responsible for fetching events from the blockchain.

### [StrategyRegistry](./src/strategyRegistry.ts)
### [StrategyRegistry](./src/registries/)

The `StrategyRegistry` stores strategy IDs to populate strategy events with them given the Strategy address.
There are 3 implementations:

- `DatabaseStrategyRegistry`: persists data to database using IStrategyRepository
- `InMemoryCachedStrategyRegistry`: stores map in-memory as cache and persists to database

### [DataLoader](./src/data-loader/dataLoader.ts)

Expand Down
3 changes: 2 additions & 1 deletion packages/data-flow/src/external.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
export {
DataLoader,
InMemoryEventsRegistry,
InMemoryStrategyRegistry,
InMemoryCachedStrategyRegistry,
DatabaseStrategyRegistry,
Orchestrator,
} from "./internal.js";

Expand Down
30 changes: 23 additions & 7 deletions packages/data-flow/src/interfaces/strategyRegistry.interface.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
import { Address, Hex } from "viem";

import { Strategy } from "@grants-stack-indexer/repository";
import { ChainId } from "@grants-stack-indexer/shared";

/**
* The strategy registry saves the mapping between the strategy address and the strategy id. Serves as a Cache
* to avoid having to read from the chain to get the strategy id every time.
*/
//TODO: implement a mechanism to record Strategy that we still don't have a corresponding handler
// we need to store and mark that this strategy is not handled yet, so when it's supported we can process all of the pending events for it
export interface IStrategyRegistry {
/**
* Get the strategy id by the strategy address
* Get the strategy id by the strategy address and chain id
*
* @param chainId - The chain id
* @param strategyAddress - The strategy address
* @returns The strategy id or undefined if the strategy address is not registered
* @returns The strategy or undefined if the strategy address is not registered
*/
getStrategyId(strategyAddress: Address): Promise<Hex | undefined>;
getStrategyId(chainId: ChainId, strategyAddress: Address): Promise<Strategy | undefined>;
/**
* Save the strategy id by the strategy address
* Save the strategy id by the strategy address and chain id
* @param chainId - The chain id
* @param strategyAddress - The strategy address
* @param strategyId - The strategy id
* @param handled - Whether the strategy is handled
*/
saveStrategyId(
chainId: ChainId,
strategyAddress: Address,
strategyId: Hex,
handled: boolean,
): Promise<void>;

/**
* Get all the strategies
* @returns The strategies
*/
saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise<void>;
getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise<Strategy[]>;
}
2 changes: 1 addition & 1 deletion packages/data-flow/src/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export * from "./abis/index.js";
export * from "./utils/index.js";
export * from "./data-loader/index.js";
export * from "./eventsFetcher.js";
export * from "./strategyRegistry.js";
export * from "./registries/index.js";
export * from "./eventsRegistry.js";
export * from "./eventsProcessor.js";
export * from "./orchestrator.js";
42 changes: 28 additions & 14 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,17 @@ export class Orchestrator {
await this.eventsRegistry.saveLastProcessedEvent(this.chainId, event);

event = await this.enhanceStrategyId(event);
if (event.contractName === "Strategy" && "strategyId" in event) {
if (this.isPoolCreated(event)) {
const handleable = existsHandler(event.strategyId);
await this.strategyRegistry.saveStrategyId(
this.chainId,
event.srcAddress,
event.strategyId,
handleable,
);
} else if (event.contractName === "Strategy" && "strategyId" in event) {
if (!existsHandler(event.strategyId)) {
//TODO: save to registry as unsupported strategy, so when the strategy is handled it will be backwards compatible and process all of the events
//TODO: decide if we want to log this
// this.logger.info(
// `No handler found for strategyId: ${event.strategyId}. Event: ${stringify(
// event,
// )}`,
// );
// we skip the event if the strategy id is not handled yet
continue;
}
}
Expand Down Expand Up @@ -216,9 +218,12 @@ export class Orchestrator {
* @returns The strategy id
*/
private async getOrFetchStrategyId(strategyAddress: Address): Promise<Hex> {
const existingId = await this.strategyRegistry.getStrategyId(strategyAddress);
if (existingId) {
return existingId;
const cachedStrategy = await this.strategyRegistry.getStrategyId(
this.chainId,
strategyAddress,
);
if (cachedStrategy) {
return cachedStrategy.id;
}

const strategyId = await this.dependencies.evmProvider.readContract(
Expand All @@ -227,11 +232,20 @@ export class Orchestrator {
"getStrategyId",
);

await this.strategyRegistry.saveStrategyId(strategyAddress, strategyId);

return strategyId;
}

/**
* Check if the event is a PoolCreated event from Allo contract
* @param event - The event
* @returns True if the event is a PoolCreated event from Allo contract, false otherwise
*/
private isPoolCreated(
event: ProcessorEvent<ContractName, AnyEvent>,
): event is ProcessorEvent<"Allo", "PoolCreated"> {
return isAlloEvent(event) && event.eventName === "PoolCreated";
}

/**
* Check if the event requires a strategy id
* @param event - The event
Expand All @@ -240,6 +254,6 @@ export class Orchestrator {
private requiresStrategyId(
event: ProcessorEvent<ContractName, AnyEvent>,
): event is ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent> {
return (isAlloEvent(event) && event.eventName === "PoolCreated") || isStrategyEvent(event);
return this.isPoolCreated(event) || isStrategyEvent(event);
}
}
1 change: 1 addition & 0 deletions packages/data-flow/src/registries/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./strategy/index.js";
Loading

0 comments on commit 79f4681

Please sign in to comment.