-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: processor service integration #22
Changes from 5 commits
559e88f
92a51a8
15782cc
a9e835d
8b5afdb
a6c3566
fe0da62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# configuration for Optimism | ||
RPC_URLS=["https://optimism.llamarpc.com","https://rpc.ankr.com/optimism","https://optimism.gateway.tenderly.co","https://optimism.blockpi.network/v1/rpc/public","https://mainnet.optimism.io","https://opt-mainnet.g.alchemy.com/v2/demo"] | ||
CHAIN_ID=10 | ||
|
||
FETCH_LIMIT=500 | ||
FETCH_DELAY_MS=3000 | ||
|
||
DATABASE_URL= | ||
DATABASE_SCHEMA=chainDataSchema | ||
|
||
INDEXER_GRAPHQL_URL= | ||
INDEXER_ADMIN_SECRET= | ||
|
||
IPFS_GATEWAYS_URL=["https://ipfs.io","https://gateway.pinata.cloud","https://dweb.link", "https://ipfs.eth.aragon.network"] | ||
|
||
COINGECKO_API_KEY= | ||
COINGECKO_API_TYPE= #demo or pro |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
# Processor Service | ||
|
||
This service is the main application that runs the core processing pipeline: | ||
|
||
- Instantiates and coordinates components from [Grants Stack Indexer packages](../../packages/) | ||
- Creates and manages an Orchestrator per chain to process blockchain events | ||
|
||
## Requirements | ||
|
||
- A running instance of PostgreSQL Data Layer with migrations applied | ||
- A running instance of Envio Indexer | ||
|
||
## Setup | ||
|
||
1. Install dependencies running `pnpm install` | ||
2. Build the app using `pnpm build` | ||
|
||
### ⚙️ Setting up env variables | ||
|
||
- Create `.env` file and copy paste `.env.example` content in there. | ||
|
||
``` | ||
$ cp .env.example .env | ||
``` | ||
|
||
Available options: | ||
| Name | Description | Default | Required | Notes | | ||
|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------|-----------|----------------------------------|-----------------------------------------------------------------| | ||
| `RPC_URLS` | Array of RPC URLs | N/A | Yes | Multiple URLs for redundancy | | ||
| `CHAIN_ID` | Chain ID | N/A | Yes | At the moment only Optimism is supported (10) | | ||
| `FETCH_LIMIT` | Maximum number of items to fetch in one batch | 500 | Yes | | | ||
| `FETCH_DELAY_MS` | Delay between fetch operations in milliseconds | 3000 | Yes | | | ||
| `DATABASE_URL` | PostgreSQL Data Layer database connection URL | N/A | Yes | | | ||
| `DATABASE_SCHEMA` | PostgreSQL Data Layer database schema name | chainDataSchema | Yes | | | ||
| `INDEXER_GRAPHQL_URL` | GraphQL endpoint for the indexer | N/A | Yes | | | ||
| `INDEXER_ADMIN_SECRET` | Admin secret for indexer authentication | N/A | Yes | | | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should change this, admin secret is just for admins, should be a way to generate some token for client auth There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| `IPFS_GATEWAYS_URL` | Array of IPFS gateway URLs | N/A | Yes | Multiple gateways for redundancy | | ||
| `COINGECKO_API_KEY` | API key for CoinGecko service | N/A | Yes | | | ||
| `COINGECKO_API_TYPE` | CoinGecko API tier (demo or pro) | N/A | Yes | | | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets use default There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. give me my Pro Key xd |
||
|
||
## Available Scripts | ||
|
||
Available scripts that can be run using `pnpm`: | ||
|
||
| Script | Description | | ||
| ------------- | ------------------------------------------------------- | | ||
| `build` | Build library using tsc | | ||
| `check-types` | Check types issues using tsc | | ||
| `clean` | Remove `dist` folder | | ||
| `dev` | Run the app in development mode using tsx | | ||
| `dev:watch` | Run the app in watch mode for development | | ||
| `lint` | Run ESLint to check for coding standards | | ||
| `lint:fix` | Run linter and automatically fix code formatting issues | | ||
| `format` | Check code formatting and style using Prettier | | ||
| `format:fix` | Run formatter and automatically fix issues | | ||
| `start` | Run the compiled app from dist folder | | ||
| `test` | Run tests using vitest | | ||
| `test:cov` | Run tests with coverage report | | ||
|
||
TODO: e2e tests | ||
TODO: Docker image |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
{ | ||
"name": "@grants-stack-indexer/processor", | ||
"version": "0.0.1", | ||
"type": "module", | ||
"main": "./dist/index.js", | ||
"scripts": { | ||
"build": "tsc -p tsconfig.build.json", | ||
"check-types": "tsc --noEmit -p ./tsconfig.json", | ||
"clean": "rm -rf dist", | ||
"dev": "tsx src/index.ts", | ||
"dev:watch": "tsx watch src/index.ts", | ||
"format": "prettier --check \"{src,test}/**/*.{js,ts,json}\"", | ||
"format:fix": "prettier --write \"{src,test}/**/*.{js,ts,json}\"", | ||
"lint": "eslint \"{src,test}/**/*.{js,ts,json}\"", | ||
"lint:fix": "pnpm lint --fix", | ||
"start": "node dist/index.js", | ||
"test": "vitest run --config vitest.config.ts --passWithNoTests", | ||
"test:cov": "vitest run --config vitest.config.ts --coverage --passWithNoTests" | ||
}, | ||
"dependencies": { | ||
"@grants-stack-indexer/chain-providers": "workspace:*", | ||
"@grants-stack-indexer/data-flow": "workspace:*", | ||
"@grants-stack-indexer/indexer-client": "workspace:*", | ||
"@grants-stack-indexer/metadata": "workspace:*", | ||
"@grants-stack-indexer/pricing": "workspace:*", | ||
"@grants-stack-indexer/repository": "workspace:*", | ||
"@grants-stack-indexer/shared": "workspace:*", | ||
"dotenv": "16.4.5", | ||
"pg": "8.13.0", | ||
"viem": "2.19.6", | ||
"zod": "3.23.8" | ||
}, | ||
"devDependencies": { | ||
"tsx": "4.19.2" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import dotenv from "dotenv"; | ||
import { z } from "zod"; | ||
|
||
dotenv.config(); | ||
|
||
const stringToJSONSchema = z.string().transform((str, ctx): z.infer<ReturnType<typeof Object>> => { | ||
try { | ||
return JSON.parse(str); | ||
} catch (e) { | ||
ctx.addIssue({ code: "custom", message: "Invalid JSON" }); | ||
return z.NEVER; | ||
} | ||
}); | ||
|
||
const validationSchema = z.object({ | ||
RPC_URLS: stringToJSONSchema.pipe(z.array(z.string().url())), | ||
CHAIN_ID: z.coerce.number().int().positive(), | ||
FETCH_LIMIT: z.coerce.number().int().positive().default(1000), | ||
FETCH_DELAY_MS: z.coerce.number().int().positive().default(10000), | ||
DATABASE_URL: z.string(), | ||
DATABASE_SCHEMA: z.string().default("public"), | ||
INDEXER_GRAPHQL_URL: z.string().url(), | ||
INDEXER_ADMIN_SECRET: z.string(), | ||
COINGECKO_API_KEY: z.string(), | ||
COINGECKO_API_TYPE: z.enum(["demo", "pro"]).default("demo"), | ||
IPFS_GATEWAYS_URL: stringToJSONSchema | ||
.pipe(z.array(z.string().url())) | ||
.default('["https://ipfs.io"]'), | ||
}); | ||
|
||
const env = validationSchema.safeParse(process.env); | ||
|
||
if (!env.success) { | ||
console.error( | ||
"Invalid environment variables:", | ||
env.error.issues.map((issue) => JSON.stringify(issue)).join("\n"), | ||
); | ||
process.exit(1); | ||
} | ||
|
||
export const environment = env.data; | ||
export type Environment = z.infer<typeof validationSchema>; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
export * from "./env.js"; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
import { inspect } from "util"; | ||
|
||
import { environment } from "./config/index.js"; | ||
import { ProcessorService } from "./services/processor.service.js"; | ||
|
||
let processor: ProcessorService; | ||
|
||
const main = async (): Promise<void> => { | ||
processor = new ProcessorService(environment); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. aaa okay makes sense, Processor was a more natural naming. the app folder should also be |
||
await processor.start(); | ||
}; | ||
|
||
process.on("unhandledRejection", (reason, p) => { | ||
console.error(`Unhandled Rejection at: \n${inspect(p, undefined, 100)}, \nreason: ${reason}`); | ||
process.exit(1); | ||
}); | ||
|
||
process.on("uncaughtException", (error: Error) => { | ||
console.error( | ||
`An uncaught exception occurred: ${error}\n` + `Exception origin: ${error.stack}`, | ||
); | ||
process.exit(1); | ||
}); | ||
|
||
main() | ||
.catch((err) => { | ||
console.error(`Caught error in main handler: ${err}`); | ||
process.exit(1); | ||
}) | ||
// eslint-disable-next-line @typescript-eslint/no-misused-promises | ||
.finally(async () => { | ||
await processor?.releaseResources(); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
export * from "./sharedDependencies.service.js"; | ||
export * from "./processor.service.js"; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
import { optimism } from "viem/chains"; | ||
|
||
import { EvmProvider } from "@grants-stack-indexer/chain-providers"; | ||
import { Orchestrator } from "@grants-stack-indexer/data-flow"; | ||
import { ChainId, Logger } from "@grants-stack-indexer/shared"; | ||
|
||
import { Environment } from "../config/env.js"; | ||
import { SharedDependencies, SharedDependenciesService } from "./index.js"; | ||
|
||
/** | ||
* Processor service application | ||
* - Initializes core dependencies (repositories, providers) via SharedDependenciesService | ||
* - Sets up EVM provider with configured RPC endpoints | ||
* - Creates an Orchestrator instance to coordinate an specific chain: | ||
* - Fetching on-chain events via indexer client | ||
* - Processing events through registered handlers | ||
* - Storing processed data in PostgreSQL via repositories | ||
* - Manages graceful shutdown on termination signals | ||
* | ||
* TODO: support multichain | ||
*/ | ||
export class ProcessorService { | ||
private readonly logger = Logger.getInstance(); | ||
private readonly orchestrator: Orchestrator; | ||
private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"]; | ||
|
||
constructor(private readonly env: Environment) { | ||
const { core, registries, indexerClient, kyselyDatabase } = | ||
SharedDependenciesService.initialize(env); | ||
this.kyselyDatabase = kyselyDatabase; | ||
|
||
// Initialize EVM provider | ||
const evmProvider = new EvmProvider(env.RPC_URLS, optimism, this.logger); | ||
|
||
this.orchestrator = new Orchestrator( | ||
env.CHAIN_ID as ChainId, | ||
{ ...core, evmProvider }, | ||
indexerClient, | ||
registries, | ||
env.FETCH_LIMIT, | ||
env.FETCH_DELAY_MS, | ||
); | ||
} | ||
|
||
/** | ||
* Start the processor service | ||
* | ||
* The processor runs indefinitely until it is terminated. | ||
*/ | ||
async start(): Promise<void> { | ||
this.logger.info("Starting processor service..."); | ||
|
||
const abortController = new AbortController(); | ||
|
||
// Handle graceful shutdown | ||
process.on("SIGINT", () => { | ||
this.logger.info("Received SIGINT signal. Shutting down..."); | ||
abortController.abort(); | ||
}); | ||
|
||
process.on("SIGTERM", () => { | ||
this.logger.info("Received SIGTERM signal. Shutting down..."); | ||
abortController.abort(); | ||
}); | ||
|
||
try { | ||
await this.orchestrator.run(abortController.signal); | ||
} catch (error) { | ||
this.logger.error(`Processor service failed: ${error}`); | ||
throw error; | ||
} | ||
} | ||
|
||
/** | ||
* Call this function when the processor service is terminated | ||
* - Releases database resources | ||
*/ | ||
async releaseResources(): Promise<void> { | ||
try { | ||
this.logger.info("Releasing resources..."); | ||
await this.kyselyDatabase.destroy(); | ||
} catch (error) { | ||
this.logger.error(`Error releasing resources: ${error}`); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add: "or run the below command"