Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: persisting strategy registry into db #40

Merged
merged 8 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/processing/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ProcessingService } from "./services/processing.service.js";
let processor: ProcessingService;

const main = async (): Promise<void> => {
processor = new ProcessingService(environment);
processor = await ProcessingService.initialize(environment);
await processor.start();
};

Expand Down
10 changes: 7 additions & 3 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ export class ProcessingService {
private readonly logger = new Logger({ className: "ProcessingService" });
private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"];

constructor(private readonly env: Environment) {
private constructor(env: Environment, sharedDependencies: SharedDependencies) {
const { CHAINS: chains } = env;
const { core, registries, indexerClient, kyselyDatabase } =
SharedDependenciesService.initialize(env);
const { core, registries, indexerClient, kyselyDatabase } = sharedDependencies;
this.kyselyDatabase = kyselyDatabase;

for (const chain of chains) {
Expand All @@ -49,6 +48,11 @@ export class ProcessingService {
}
}

static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
return new ProcessingService(env, sharedDependencies);
}

/**
* Start the processor service
*
Expand Down
24 changes: 18 additions & 6 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import {
CoreDependencies,
DatabaseStrategyRegistry,
IEventsRegistry,
InMemoryCachedStrategyRegistry,
InMemoryEventsRegistry,
InMemoryStrategyRegistry,
IStrategyRegistry,
} from "@grants-stack-indexer/data-flow";
import { EnvioIndexerClient } from "@grants-stack-indexer/indexer-client";
import { IpfsProvider } from "@grants-stack-indexer/metadata";
Expand All @@ -13,6 +16,7 @@ import {
KyselyDonationRepository,
KyselyProjectRepository,
KyselyRoundRepository,
KyselyStrategyRegistryRepository,
} from "@grants-stack-indexer/repository";
import { Logger } from "@grants-stack-indexer/shared";

Expand All @@ -21,8 +25,8 @@ import { Environment } from "../config/index.js";
export type SharedDependencies = {
core: Omit<CoreDependencies, "evmProvider">;
registries: {
eventsRegistry: InMemoryEventsRegistry;
strategyRegistry: InMemoryStrategyRegistry;
eventsRegistry: IEventsRegistry;
strategyRegistry: IStrategyRegistry;
};
indexerClient: EnvioIndexerClient;
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
Expand All @@ -35,7 +39,7 @@ export type SharedDependencies = {
* - Initializes indexer client
*/
export class SharedDependenciesService {
static initialize(env: Environment): SharedDependencies {
static async initialize(env: Environment): Promise<SharedDependencies> {
// Initialize repositories
const kyselyDatabase = createKyselyDatabase({
connectionString: env.DATABASE_URL,
Expand Down Expand Up @@ -68,8 +72,16 @@ export class SharedDependenciesService {
const eventsRegistry = new InMemoryEventsRegistry(
new Logger({ className: "InMemoryEventsRegistry" }),
);
const strategyRegistry = new InMemoryStrategyRegistry(
new Logger({ className: "InMemoryStrategyRegistry" }),
const strategyRepository = new KyselyStrategyRegistryRepository(
kyselyDatabase,
env.DATABASE_SCHEMA,
);
const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
new Logger({ className: "InMemoryCachedStrategyRegistry" }),
new DatabaseStrategyRegistry(
new Logger({ className: "DatabaseStrategyRegistry" }),
Comment on lines +80 to +82
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little concerned about using multiple logger instances. I think it adds tons of overhead, think about a logger that sends logs to a server or prints all the logs into a file, for each instance you will have a connection (1st case) or a dedicated file (2nd case) correct me if iam wrong pls.

i would love to hear @0xyaco 's opinion about having multiple logger instances across the application

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm i see what you're pointing, we can better discuss it on the Error handling and Logging milestone but a solution could be a Context object on each call so we know which file or chain emitted a log?

strategyRepository,
),
);

// Initialize indexer client
Expand Down
4 changes: 2 additions & 2 deletions apps/processing/test/unit/processing.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ describe("ProcessingService", () => {
DATABASE_SCHEMA: "public",
};

beforeEach(() => {
beforeEach(async () => {
vi.clearAllMocks();
processingService = new ProcessingService(mockEnv as Environment);
processingService = await ProcessingService.initialize(mockEnv as Environment);
});

afterEach(() => {
Expand Down
32 changes: 30 additions & 2 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ vi.mock("@grants-stack-indexer/repository", () => ({
KyselyApplicationRepository: vi.fn(),
KyselyDonationRepository: vi.fn(),
KyselyApplicationPayoutRepository: vi.fn(),
KyselyStrategyRegistryRepository: vi.fn().mockImplementation(() => ({
getStrategies: vi.fn().mockResolvedValue([]),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
}));

vi.mock("@grants-stack-indexer/pricing", () => ({
Expand All @@ -33,6 +38,26 @@ vi.mock("@grants-stack-indexer/indexer-client", () => ({
EnvioIndexerClient: vi.fn(),
}));

vi.mock("@grants-stack-indexer/data-flow", () => {
const mockStrategyRegistry = {
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
};

return {
InMemoryEventsRegistry: vi.fn(),
InMemoryCachedStrategyRegistry: {
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
},
DatabaseStrategyRegistry: vi.fn().mockImplementation(() => ({
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
};
});

describe("SharedDependenciesService", () => {
const mockEnv: Pick<
Environment,
Expand All @@ -51,8 +76,8 @@ describe("SharedDependenciesService", () => {
PRICING_SOURCE: "dummy",
};

it("initializes all dependencies correctly", () => {
const dependencies = SharedDependenciesService.initialize(mockEnv as Environment);
it("initializes all dependencies correctly", async () => {
const dependencies = await SharedDependenciesService.initialize(mockEnv as Environment);

// Verify database initialization
expect(createKyselyDatabase).toHaveBeenCalledWith({
Expand Down Expand Up @@ -89,5 +114,8 @@ describe("SharedDependenciesService", () => {
// Verify registries
expect(dependencies.registries).toHaveProperty("eventsRegistry");
expect(dependencies.registries).toHaveProperty("strategyRegistry");

// Verify InMemoryCachedStrategyRegistry initialization
expect(dependencies.registries.strategyRegistry).toBeDefined();
});
});
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
"build": "turbo run build",
"check-types": "turbo run check-types",
"clean": "turbo run clean",
"db:migrate": "pnpm run --filter @grants-stack-indexer/migrations db:migrate",
"db:reset": "pnpm run --filter @grants-stack-indexer/migrations db:reset",
"dev": "turbo run dev",
"format": "turbo run format",
"format:fix": "turbo run format:fix",
"preinstall": "npx only-allow pnpm",
"lint": "turbo run lint",
"lint:fix": "turbo run lint:fix",
"prepare": "husky",
"script:db:migrate": "pnpm run --filter @grants-stack-indexer/migrations script:db:migrate",
"script:db:reset": "pnpm run --filter @grants-stack-indexer/migrations script:db:reset",
"start": "turbo run start",
"test": "turbo run test",
"test:cov": "turbo run test:cov",
Expand Down
6 changes: 5 additions & 1 deletion packages/data-flow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,13 @@ The `EventsProcessor` class is responsible for processing events in the processi

The `EventsFetcher` class is responsible for fetching events from the blockchain.

### [StrategyRegistry](./src/strategyRegistry.ts)
### [StrategyRegistry](./src/registries/)

The `StrategyRegistry` stores strategy IDs to populate strategy events with them given the Strategy address.
There are 3 implementations:

- `DatabaseStrategyRegistry`: persists data to database using IStrategyRepository
- `InMemoryCachedStrategyRegistry`: stores map in-memory as cache and persists to database

### [DataLoader](./src/data-loader/dataLoader.ts)

Expand Down
3 changes: 2 additions & 1 deletion packages/data-flow/src/external.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
export {
DataLoader,
InMemoryEventsRegistry,
InMemoryStrategyRegistry,
InMemoryCachedStrategyRegistry,
DatabaseStrategyRegistry,
Orchestrator,
} from "./internal.js";

Expand Down
30 changes: 23 additions & 7 deletions packages/data-flow/src/interfaces/strategyRegistry.interface.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
import { Address, Hex } from "viem";

import { Strategy } from "@grants-stack-indexer/repository";
import { ChainId } from "@grants-stack-indexer/shared";

/**
* The strategy registry saves the mapping between the strategy address and the strategy id. Serves as a Cache
* to avoid having to read from the chain to get the strategy id every time.
*/
//TODO: implement a mechanism to record Strategy that we still don't have a corresponding handler
// we need to store and mark that this strategy is not handled yet, so when it's supported we can process all of the pending events for it
export interface IStrategyRegistry {
/**
* Get the strategy id by the strategy address
* Get the strategy id by the strategy address and chain id
*
* @param chainId - The chain id
* @param strategyAddress - The strategy address
* @returns The strategy id or undefined if the strategy address is not registered
* @returns The strategy or undefined if the strategy address is not registered
*/
getStrategyId(strategyAddress: Address): Promise<Hex | undefined>;
getStrategyId(chainId: ChainId, strategyAddress: Address): Promise<Strategy | undefined>;
/**
* Save the strategy id by the strategy address
* Save the strategy id by the strategy address and chain id
* @param chainId - The chain id
* @param strategyAddress - The strategy address
* @param strategyId - The strategy id
* @param handled - Whether the strategy is handled
*/
saveStrategyId(
chainId: ChainId,
strategyAddress: Address,
strategyId: Hex,
handled: boolean,
): Promise<void>;

/**
* Get all the strategies
* @returns The strategies
*/
saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise<void>;
getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise<Strategy[]>;
}
2 changes: 1 addition & 1 deletion packages/data-flow/src/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export * from "./abis/index.js";
export * from "./utils/index.js";
export * from "./data-loader/index.js";
export * from "./eventsFetcher.js";
export * from "./strategyRegistry.js";
export * from "./registries/index.js";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will we add a strategy directory within registries in the future ? i mean, once we develop the events registry we will need it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that was what i had in mind but will refactor it now

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

export * from "./eventsRegistry.js";
export * from "./eventsProcessor.js";
export * from "./orchestrator.js";
42 changes: 28 additions & 14 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,17 @@ export class Orchestrator {
await this.eventsRegistry.saveLastProcessedEvent(this.chainId, event);

event = await this.enhanceStrategyId(event);
if (event.contractName === "Strategy" && "strategyId" in event) {
if (this.isPoolCreated(event)) {
const handleable = existsHandler(event.strategyId);
await this.strategyRegistry.saveStrategyId(
this.chainId,
event.srcAddress,
event.strategyId,
handleable,
);
} else if (event.contractName === "Strategy" && "strategyId" in event) {
if (!existsHandler(event.strategyId)) {
//TODO: save to registry as unsupported strategy, so when the strategy is handled it will be backwards compatible and process all of the events
//TODO: decide if we want to log this
// this.logger.info(
// `No handler found for strategyId: ${event.strategyId}. Event: ${stringify(
// event,
// )}`,
// );
// we skip the event if the strategy id is not handled yet
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we save it with handled=false here ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe handleable is more representative than handled

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to, we register on every PoolCreated the strategyAddress to strategyId entry with handled=false since it's only the pool creation. then when an event from the Strategy arrives, if it can be handled then we update the handled=true, we don't need to mark it again as handled=false

continue;
}
}
Expand Down Expand Up @@ -216,9 +218,12 @@ export class Orchestrator {
* @returns The strategy id
*/
private async getOrFetchStrategyId(strategyAddress: Address): Promise<Hex> {
const existingId = await this.strategyRegistry.getStrategyId(strategyAddress);
if (existingId) {
return existingId;
const cachedStrategy = await this.strategyRegistry.getStrategyId(
this.chainId,
strategyAddress,
);
if (cachedStrategy) {
return cachedStrategy.id;
}

const strategyId = await this.dependencies.evmProvider.readContract(
Expand All @@ -227,11 +232,20 @@ export class Orchestrator {
"getStrategyId",
);

await this.strategyRegistry.saveStrategyId(strategyAddress, strategyId);

return strategyId;
}

/**
* Check if the event is a PoolCreated event from Allo contract
* @param event - The event
* @returns True if the event is a PoolCreated event from Allo contract, false otherwise
*/
private isPoolCreated(
event: ProcessorEvent<ContractName, AnyEvent>,
): event is ProcessorEvent<"Allo", "PoolCreated"> {
return isAlloEvent(event) && event.eventName === "PoolCreated";
}

/**
* Check if the event requires a strategy id
* @param event - The event
Expand All @@ -240,6 +254,6 @@ export class Orchestrator {
private requiresStrategyId(
event: ProcessorEvent<ContractName, AnyEvent>,
): event is ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent> {
return (isAlloEvent(event) && event.eventName === "PoolCreated") || isStrategyEvent(event);
return this.isPoolCreated(event) || isStrategyEvent(event);
}
}
1 change: 1 addition & 0 deletions packages/data-flow/src/registries/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./strategy/index.js";
Loading
Loading