Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into docs/packages-and-apps
Browse files Browse the repository at this point in the history
  • Loading branch information
0xkenj1 committed Nov 8, 2024
2 parents 878d78c + 36632dc commit 08cc6b0
Show file tree
Hide file tree
Showing 26 changed files with 211 additions and 84 deletions.
4 changes: 3 additions & 1 deletion apps/processing/src/config/env.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import dotenv from "dotenv";
import { z } from "zod";

import { stringify } from "@grants-stack-indexer/shared";

dotenv.config();

const stringToJSONSchema = z.string().transform((str, ctx): z.infer<ReturnType<typeof Object>> => {
Expand Down Expand Up @@ -33,7 +35,7 @@ const env = validationSchema.safeParse(process.env);
if (!env.success) {
console.error(
"Invalid environment variables:",
env.error.issues.map((issue) => JSON.stringify(issue)).join("\n"),
env.error.issues.map((issue) => stringify(issue)).join("\n"),
);
process.exit(1);
}
Expand Down
3 changes: 2 additions & 1 deletion apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export class ProcessingService {

constructor(private readonly env: Environment) {
const { core, registries, indexerClient, kyselyDatabase } =
SharedDependenciesService.initialize(env);
SharedDependenciesService.initialize(env, this.logger);
this.kyselyDatabase = kyselyDatabase;

// Initialize EVM provider
Expand All @@ -39,6 +39,7 @@ export class ProcessingService {
registries,
env.FETCH_LIMIT,
env.FETCH_DELAY_MS,
this.logger,
);
}

Expand Down
20 changes: 12 additions & 8 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
KyselyProjectRepository,
KyselyRoundRepository,
} from "@grants-stack-indexer/repository";
import { ILogger } from "@grants-stack-indexer/shared";

import { Environment } from "../config/index.js";

Expand All @@ -32,7 +33,7 @@ export type SharedDependencies = {
* - Initializes indexer client
*/
export class SharedDependenciesService {
static initialize(env: Environment): SharedDependencies {
static initialize(env: Environment, logger: ILogger): SharedDependencies {
// Initialize repositories
const kyselyDatabase = createKyselyDatabase({
connectionString: env.DATABASE_URL,
Expand All @@ -44,16 +45,19 @@ export class SharedDependenciesService {
kyselyDatabase,
env.DATABASE_SCHEMA,
);
const pricingProvider = new CoingeckoProvider({
apiKey: env.COINGECKO_API_KEY,
apiType: env.COINGECKO_API_TYPE,
});
const pricingProvider = new CoingeckoProvider(
{
apiKey: env.COINGECKO_API_KEY,
apiType: env.COINGECKO_API_TYPE,
},
logger,
);

const metadataProvider = new IpfsProvider(env.IPFS_GATEWAYS_URL);
const metadataProvider = new IpfsProvider(env.IPFS_GATEWAYS_URL, logger);

// Initialize registries
const eventsRegistry = new InMemoryEventsRegistry();
const strategyRegistry = new InMemoryStrategyRegistry();
const eventsRegistry = new InMemoryEventsRegistry(logger);
const strategyRegistry = new InMemoryStrategyRegistry(logger);

// Initialize indexer client
const indexerClient = new EnvioIndexerClient(
Expand Down
4 changes: 3 additions & 1 deletion packages/data-flow/src/data-loader/dataLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
IProjectRepository,
IRoundRepository,
} from "@grants-stack-indexer/repository";
import { ILogger, stringify } from "@grants-stack-indexer/shared";

import { ExecutionResult, IDataLoader, InvalidChangeset } from "../internal.js";
import {
Expand Down Expand Up @@ -35,6 +36,7 @@ export class DataLoader implements IDataLoader {
round: IRoundRepository;
application: IApplicationRepository;
},
private readonly logger: ILogger,
) {
this.handlers = {
...createProjectHandlers(repositories.project),
Expand Down Expand Up @@ -73,7 +75,7 @@ export class DataLoader implements IDataLoader {
error instanceof Error ? error.message : String(error)
}`,
);
console.error(error);
this.logger.error(`${stringify(error, Object.getOwnPropertyNames(error))}`);
break;
}
}
Expand Down
6 changes: 5 additions & 1 deletion packages/data-flow/src/eventsRegistry.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared";
import type { AnyEvent, ContractName, ILogger, ProcessorEvent } from "@grants-stack-indexer/shared";
import { stringify } from "@grants-stack-indexer/shared";

import type { IEventsRegistry } from "./internal.js";

Expand All @@ -9,6 +10,8 @@ import type { IEventsRegistry } from "./internal.js";
export class InMemoryEventsRegistry implements IEventsRegistry {
private lastProcessedEvent: ProcessorEvent<ContractName, AnyEvent> | undefined;

constructor(private logger: ILogger) {}

/**
* @inheritdoc
*/
Expand All @@ -20,6 +23,7 @@ export class InMemoryEventsRegistry implements IEventsRegistry {
* @inheritdoc
*/
async saveLastProcessedEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
this.logger.debug(`Saving last processed event: ${stringify(event, undefined, 4)}`);
this.lastProcessedEvent = event;
}
}
30 changes: 19 additions & 11 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
ChainId,
ContractName,
Hex,
ILogger,
isAlloEvent,
isStrategyEvent,
ProcessorEvent,
Expand Down Expand Up @@ -75,16 +76,23 @@ export class Orchestrator {
},
private fetchLimit: number = 1000,
private fetchDelayInMs: number = 10000,
private logger: ILogger,
) {
this.eventsFetcher = new EventsFetcher(this.indexerClient);
this.eventsProcessor = new EventsProcessor(this.chainId, this.dependencies);
this.eventsProcessor = new EventsProcessor(this.chainId, {
...this.dependencies,
logger: this.logger,
});
this.eventsRegistry = registries.eventsRegistry;
this.strategyRegistry = registries.strategyRegistry;
this.dataLoader = new DataLoader({
project: this.dependencies.projectRepository,
round: this.dependencies.roundRepository,
application: this.dependencies.applicationRepository,
});
this.dataLoader = new DataLoader(
{
project: this.dependencies.projectRepository,
round: this.dependencies.roundRepository,
application: this.dependencies.applicationRepository,
},
this.logger,
);
this.eventsQueue = new Queue<ProcessorEvent<ContractName, AnyEvent>>(fetchLimit);
}

Expand All @@ -107,7 +115,7 @@ export class Orchestrator {
if (!existsHandler(event.strategyId)) {
//TODO: save to registry as unsupported strategy, so when the strategy is handled it will be backwards compatible and process all of the events
//TODO: decide if we want to log this
// console.log(
// this.logger.info(
// `No handler found for strategyId: ${event.strategyId}. Event: ${stringify(
// event,
// )}`,
Expand All @@ -121,7 +129,7 @@ export class Orchestrator {

if (executionResult.numFailed > 0) {
//TODO: should we retry the failed changesets?
console.error(
this.logger.error(
`Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(
event,
)}`,
Expand All @@ -134,16 +142,16 @@ export class Orchestrator {
error instanceof InvalidEvent ||
error instanceof UnsupportedEventException
) {
// console.error(
// this.logger.error(
// `Current event cannot be handled. ${error.name}: ${error.message}. Event: ${stringify(event)}`,
// );
} else {
console.error(`Error processing event: ${stringify(event)}`, error);
this.logger.error(`Error processing event: ${stringify(event)} ${error}`);
}
}
}

console.log("Shutdown signal received. Exiting...");
this.logger.info("Shutdown signal received. Exiting...");
}

/**
Expand Down
4 changes: 4 additions & 0 deletions packages/data-flow/src/strategyRegistry.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { Address, Hex } from "viem";

import { ILogger } from "@grants-stack-indexer/shared";

import type { IStrategyRegistry } from "./internal.js";

/**
Expand All @@ -8,6 +10,7 @@ import type { IStrategyRegistry } from "./internal.js";
//TODO: Implement storage to persist strategies. since we're using address, do we need ChainId?
export class InMemoryStrategyRegistry implements IStrategyRegistry {
private strategiesMap: Map<Address, Hex> = new Map();
constructor(private logger: ILogger) {}

/** @inheritdoc */
async getStrategyId(strategyAddress: Address): Promise<Hex | undefined> {
Expand All @@ -16,6 +19,7 @@ export class InMemoryStrategyRegistry implements IStrategyRegistry {

/** @inheritdoc */
async saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise<void> {
this.logger.debug(`Saving strategy id ${strategyId} for address ${strategyAddress}`);
this.strategiesMap.set(strategyAddress, strategyId);
}
}
20 changes: 15 additions & 5 deletions packages/data-flow/test/data-loader/dataLoader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
IProjectRepository,
IRoundRepository,
} from "@grants-stack-indexer/repository";
import { ILogger } from "@grants-stack-indexer/shared";

import { DataLoader } from "../../src/data-loader/dataLoader.js";
import { InvalidChangeset } from "../../src/internal.js";
Expand All @@ -26,12 +27,21 @@ describe("DataLoader", () => {
updateApplication: vi.fn(),
} as unknown as IApplicationRepository;

const logger: ILogger = {
debug: vi.fn(),
error: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
};
const createDataLoader = (): DataLoader =>
new DataLoader({
project: mockProjectRepository,
round: mockRoundRepository,
application: mockApplicationRepository,
});
new DataLoader(
{
project: mockProjectRepository,
round: mockRoundRepository,
application: mockApplicationRepository,
},
logger,
);

beforeEach(() => {
vi.clearAllMocks();
Expand Down
16 changes: 11 additions & 5 deletions packages/data-flow/test/unit/eventsRegistry.spec.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";

import { ChainId, ProcessorEvent } from "@grants-stack-indexer/shared";
import { ChainId, ILogger, ProcessorEvent } from "@grants-stack-indexer/shared";

import { InMemoryEventsRegistry } from "../../src/eventsRegistry.js";

describe("InMemoryEventsRegistry", () => {
const logger: ILogger = {
debug: vi.fn(),
error: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
};
it("return null when no event has been saved", async () => {
const registry = new InMemoryEventsRegistry();
const registry = new InMemoryEventsRegistry(logger);
const lastEvent = await registry.getLastProcessedEvent();
expect(lastEvent).toBeUndefined();
});

it("save and retrieve the last processed event", async () => {
const registry = new InMemoryEventsRegistry();
const registry = new InMemoryEventsRegistry(logger);
const mockEvent: ProcessorEvent<"Allo", "PoolCreated"> = {
contractName: "Allo",
eventName: "PoolCreated",
Expand Down Expand Up @@ -43,7 +49,7 @@ describe("InMemoryEventsRegistry", () => {
});

it("should update the last processed event when saving multiple times", async () => {
const registry = new InMemoryEventsRegistry();
const registry = new InMemoryEventsRegistry(logger);

const firstEvent: ProcessorEvent<"Allo", "PoolCreated"> = {
contractName: "Allo",
Expand Down
14 changes: 10 additions & 4 deletions packages/data-flow/test/unit/orchestrator.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
ContractToEventName,
EventParams,
Hex,
ILogger,
ProcessorEvent,
StrategyEvent,
stringify,
Expand Down Expand Up @@ -59,6 +60,12 @@ describe("Orchestrator", { sequential: true }, () => {
const chainId = 1 as ChainId;
const mockFetchLimit = 10;
const mockFetchDelay = 100;
const logger: ILogger = {
debug: vi.fn(),
error: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
};

beforeEach(() => {
// Setup mock implementations
Expand Down Expand Up @@ -105,6 +112,7 @@ describe("Orchestrator", { sequential: true }, () => {
},
mockFetchLimit,
mockFetchDelay,
logger,
);
});

Expand Down Expand Up @@ -468,7 +476,6 @@ describe("Orchestrator", { sequential: true }, () => {

it("keeps running when there is an error", async () => {
const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent");
const consoleSpy = vi.spyOn(console, "error");
const errorEvent = createMockEvent("Allo", "Unknown" as unknown as AlloEvent, 1);
const error = new Error("test");

Expand Down Expand Up @@ -503,10 +510,9 @@ describe("Orchestrator", { sequential: true }, () => {
expect(eventsProcessorSpy).toHaveBeenCalledTimes(2);
expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1);
expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledTimes(2);
expect(consoleSpy).toHaveBeenCalledTimes(1);
expect(consoleSpy).toHaveBeenCalledWith(
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining(`Error processing event: ${stringify(errorEvent)}`),
error,
);
});

Expand Down
16 changes: 12 additions & 4 deletions packages/data-flow/test/unit/strategyRegistry.spec.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import { Address, Hex } from "viem";
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";

import { ILogger } from "@grants-stack-indexer/shared";

import { InMemoryStrategyRegistry } from "../../src/strategyRegistry.js";

describe("InMemoryStrategyRegistry", () => {
const logger: ILogger = {
debug: vi.fn(),
error: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
};
it("return undefined for non-existent strategy address", async () => {
const registry = new InMemoryStrategyRegistry();
const registry = new InMemoryStrategyRegistry(logger);
const strategyAddress = "0x123" as Address;

const strategyId = await registry.getStrategyId(strategyAddress);
expect(strategyId).toBeUndefined();
});

it("save and retrieve strategy id", async () => {
const registry = new InMemoryStrategyRegistry();
const registry = new InMemoryStrategyRegistry(logger);
const strategyAddress = "0x123" as Address;
const strategyId = "0xabc" as Hex;

Expand All @@ -24,7 +32,7 @@ describe("InMemoryStrategyRegistry", () => {
});

it("handle multiple strategy addresses independently", async () => {
const registry = new InMemoryStrategyRegistry();
const registry = new InMemoryStrategyRegistry(logger);
const firstAddress = "0x123" as Address;
const secondAddress = "0x456" as Address;
const firstStrategyId = "0xabc" as Hex;
Expand Down
Loading

0 comments on commit 08cc6b0

Please sign in to comment.