Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: processor service integration #22

Merged
merged 7 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
},
"dependencies": {
"chai": "4.3.10",
"envio": "2.5.2",
"envio": "2.7.2",
"ethers": "6.8.0",
"yaml": "2.5.1"
},
Expand Down
44 changes: 23 additions & 21 deletions apps/indexer/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions apps/processing/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# configuration for Optimism
RPC_URLS=["https://optimism.llamarpc.com","https://rpc.ankr.com/optimism","https://optimism.gateway.tenderly.co","https://optimism.blockpi.network/v1/rpc/public","https://mainnet.optimism.io","https://opt-mainnet.g.alchemy.com/v2/demo"]
CHAIN_ID=10

FETCH_LIMIT=500
FETCH_DELAY_MS=3000

DATABASE_URL=
DATABASE_SCHEMA=chainDataSchema

INDEXER_GRAPHQL_URL=
INDEXER_ADMIN_SECRET=

IPFS_GATEWAYS_URL=["https://ipfs.io","https://gateway.pinata.cloud","https://dweb.link", "https://ipfs.eth.aragon.network"]

COINGECKO_API_KEY=
COINGECKO_API_TYPE= #demo or pro
61 changes: 61 additions & 0 deletions apps/processing/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Processor Service

This service is the main application that runs the core processing pipeline:

- Instantiates and coordinates components from [Grants Stack Indexer packages](../../packages/)
- Creates and manages an Orchestrator per chain to process blockchain events

## Requirements

- A running instance of PostgreSQL Data Layer with migrations applied
- A running instance of Envio Indexer

## Setup

1. Install dependencies running `pnpm install`
2. Build the app using `pnpm build`

### ⚙️ Setting up env variables

- Create `.env` file and copy paste `.env.example` content in there or run the following command:

```
$ cp .env.example .env
```

Available options:
| Name | Description | Default | Required | Notes |
|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------|-----------|----------------------------------|-----------------------------------------------------------------|
| `RPC_URLS` | Array of RPC URLs | N/A | Yes | Multiple URLs for redundancy |
| `CHAIN_ID` | Chain ID | N/A | Yes | At the moment only Optimism is supported (10) |
| `FETCH_LIMIT` | Maximum number of items to fetch in one batch | 500 | No | |
| `FETCH_DELAY_MS` | Delay between fetch operations in milliseconds | 1000 | No | |
| `DATABASE_URL` | PostgreSQL Data Layer database connection URL | N/A | Yes | |
| `DATABASE_SCHEMA` | PostgreSQL Data Layer database schema name | public | Yes | |
| `INDEXER_GRAPHQL_URL` | GraphQL endpoint for the indexer | N/A | Yes | |
| `INDEXER_ADMIN_SECRET` | Admin secret for indexer authentication | N/A | Yes | |
| `IPFS_GATEWAYS_URL` | Array of IPFS gateway URLs | N/A | Yes | Multiple gateways for redundancy |
| `COINGECKO_API_KEY` | API key for CoinGecko service | N/A | Yes | |
| `COINGECKO_API_TYPE` | CoinGecko API tier (demo or pro) | pro | No | |

## Available Scripts

Available scripts that can be run using `pnpm`:

| Script | Description |
| ------------- | ------------------------------------------------------- |
| `build` | Build library using tsc |
| `check-types` | Check types issues using tsc |
| `clean` | Remove `dist` folder |
| `dev` | Run the app in development mode using tsx |
| `dev:watch` | Run the app in watch mode for development |
| `lint` | Run ESLint to check for coding standards |
| `lint:fix` | Run linter and automatically fix code formatting issues |
| `format` | Check code formatting and style using Prettier |
| `format:fix` | Run formatter and automatically fix issues |
| `start` | Run the compiled app from dist folder |
| `test` | Run tests using vitest |
| `test:cov` | Run tests with coverage report |

TODO: e2e tests
TODO: Docker image
36 changes: 36 additions & 0 deletions apps/processing/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"name": "@grants-stack-indexer/processor",
"version": "0.0.1",
"type": "module",
"main": "./dist/index.js",
"scripts": {
"build": "tsc -p tsconfig.build.json",
"check-types": "tsc --noEmit -p ./tsconfig.json",
"clean": "rm -rf dist",
"dev": "tsx src/index.ts",
"dev:watch": "tsx watch src/index.ts",
"format": "prettier --check \"{src,test}/**/*.{js,ts,json}\"",
"format:fix": "prettier --write \"{src,test}/**/*.{js,ts,json}\"",
"lint": "eslint \"{src,test}/**/*.{js,ts,json}\"",
"lint:fix": "pnpm lint --fix",
"start": "node dist/index.js",
"test": "vitest run --config vitest.config.ts --passWithNoTests",
"test:cov": "vitest run --config vitest.config.ts --coverage --passWithNoTests"
},
"dependencies": {
"@grants-stack-indexer/chain-providers": "workspace:*",
"@grants-stack-indexer/data-flow": "workspace:*",
"@grants-stack-indexer/indexer-client": "workspace:*",
"@grants-stack-indexer/metadata": "workspace:*",
"@grants-stack-indexer/pricing": "workspace:*",
"@grants-stack-indexer/repository": "workspace:*",
"@grants-stack-indexer/shared": "workspace:*",
"dotenv": "16.4.5",
"pg": "8.13.0",
"viem": "2.19.6",
"zod": "3.23.8"
},
"devDependencies": {
"tsx": "4.19.2"
}
}
42 changes: 42 additions & 0 deletions apps/processing/src/config/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import dotenv from "dotenv";
import { z } from "zod";

dotenv.config();

const stringToJSONSchema = z.string().transform((str, ctx): z.infer<ReturnType<typeof Object>> => {
try {
return JSON.parse(str);
} catch (e) {
ctx.addIssue({ code: "custom", message: "Invalid JSON" });
return z.NEVER;
}
});

const validationSchema = z.object({
RPC_URLS: stringToJSONSchema.pipe(z.array(z.string().url())),
CHAIN_ID: z.coerce.number().int().positive(),
FETCH_LIMIT: z.coerce.number().int().positive().default(500),
FETCH_DELAY_MS: z.coerce.number().int().positive().default(1000),
DATABASE_URL: z.string(),
DATABASE_SCHEMA: z.string().default("public"),
INDEXER_GRAPHQL_URL: z.string().url(),
INDEXER_ADMIN_SECRET: z.string(),
COINGECKO_API_KEY: z.string(),
COINGECKO_API_TYPE: z.enum(["demo", "pro"]).default("pro"),
IPFS_GATEWAYS_URL: stringToJSONSchema
.pipe(z.array(z.string().url()))
.default('["https://ipfs.io"]'),
});

const env = validationSchema.safeParse(process.env);

if (!env.success) {
console.error(
"Invalid environment variables:",
env.error.issues.map((issue) => JSON.stringify(issue)).join("\n"),
);
process.exit(1);
}

export const environment = env.data;
export type Environment = z.infer<typeof validationSchema>;
1 change: 1 addition & 0 deletions apps/processing/src/config/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./env.js";
33 changes: 33 additions & 0 deletions apps/processing/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { inspect } from "util";

import { environment } from "./config/index.js";
import { ProcessingService } from "./services/processing.service.js";

let processor: ProcessingService;

const main = async (): Promise<void> => {
processor = new ProcessingService(environment);
await processor.start();
};

process.on("unhandledRejection", (reason, p) => {
console.error(`Unhandled Rejection at: \n${inspect(p, undefined, 100)}, \nreason: ${reason}`);
process.exit(1);
});

process.on("uncaughtException", (error: Error) => {
console.error(
`An uncaught exception occurred: ${error}\n` + `Exception origin: ${error.stack}`,
);
process.exit(1);
});

main()
.catch((err) => {
console.error(`Caught error in main handler: ${err}`);
process.exit(1);
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
.finally(async () => {
await processor?.releaseResources();
});
2 changes: 2 additions & 0 deletions apps/processing/src/services/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./sharedDependencies.service.js";
export * from "./processing.service.js";
86 changes: 86 additions & 0 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { optimism } from "viem/chains";

import { EvmProvider } from "@grants-stack-indexer/chain-providers";
import { Orchestrator } from "@grants-stack-indexer/data-flow";
import { ChainId, Logger } from "@grants-stack-indexer/shared";

import { Environment } from "../config/env.js";
import { SharedDependencies, SharedDependenciesService } from "./index.js";

/**
* Processor service application
* - Initializes core dependencies (repositories, providers) via SharedDependenciesService
* - Sets up EVM provider with configured RPC endpoints
* - Creates an Orchestrator instance to coordinate an specific chain:
* - Fetching on-chain events via indexer client
* - Processing events through registered handlers
* - Storing processed data in PostgreSQL via repositories
* - Manages graceful shutdown on termination signals
*
* TODO: support multichain
*/
export class ProcessingService {
private readonly logger = Logger.getInstance();
private readonly orchestrator: Orchestrator;
private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"];

constructor(private readonly env: Environment) {
const { core, registries, indexerClient, kyselyDatabase } =
SharedDependenciesService.initialize(env);
this.kyselyDatabase = kyselyDatabase;

// Initialize EVM provider
const evmProvider = new EvmProvider(env.RPC_URLS, optimism, this.logger);

this.orchestrator = new Orchestrator(
env.CHAIN_ID as ChainId,
{ ...core, evmProvider },
indexerClient,
registries,
env.FETCH_LIMIT,
env.FETCH_DELAY_MS,
);
}

/**
* Start the processor service
*
* The processor runs indefinitely until it is terminated.
*/
async start(): Promise<void> {
this.logger.info("Starting processor service...");

const abortController = new AbortController();

// Handle graceful shutdown
process.on("SIGINT", () => {
this.logger.info("Received SIGINT signal. Shutting down...");
abortController.abort();
});

process.on("SIGTERM", () => {
this.logger.info("Received SIGTERM signal. Shutting down...");
abortController.abort();
});

try {
await this.orchestrator.run(abortController.signal);
} catch (error) {
this.logger.error(`Processor service failed: ${error}`);
throw error;
}
}

/**
* Call this function when the processor service is terminated
* - Releases database resources
*/
async releaseResources(): Promise<void> {
try {
this.logger.info("Releasing resources...");
await this.kyselyDatabase.destroy();
} catch (error) {
this.logger.error(`Error releasing resources: ${error}`);
}
}
}
Loading