Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: health checks for worker and fetcher #374

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion packages/data-fetcher/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ RPC_BATCH_MAX_COUNT=10
RPC_BATCH_MAX_SIZE_BYTES=1048576
RPC_BATCH_STALL_TIME_MS=0

MAX_BLOCKS_BATCH_SIZE=20
MAX_BLOCKS_BATCH_SIZE=20

RPC_HEALTH_CHECK_TIMEOUT_MS=20_000
2 changes: 2 additions & 0 deletions packages/data-fetcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
"test:e2e": "jest --config ./test/jest-e2e.json"
},
"dependencies": {
"@nestjs/axios": "^3.1.3",
"@nestjs/common": "^9.0.0",
"@nestjs/config": "^2.2.0",
"@nestjs/core": "^9.0.0",
"@nestjs/platform-express": "^9.0.0",
"@nestjs/terminus": "^9.1.2",
"@willsoto/nestjs-prometheus": "^4.7.0",
"axios": "^1.7.9",
"ethers": "6.13.4",
"nest-winston": "^1.7.0",
"prom-client": "^14.1.0",
Expand Down
3 changes: 3 additions & 0 deletions packages/data-fetcher/src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ describe("config", () => {
},
maxBlocksBatchSize: 20,
gracefulShutdownTimeoutMs: 0,
healthChecks: {
rpcHealthCheckTimeoutMs: 20_000,
},
};
});

Expand Down
4 changes: 4 additions & 0 deletions packages/data-fetcher/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export default () => {
RPC_BATCH_STALL_TIME_MS,
MAX_BLOCKS_BATCH_SIZE,
GRACEFUL_SHUTDOWN_TIMEOUT_MS,
RPC_HEALTH_CHECK_TIMEOUT_MS,
} = process.env;

return {
Expand Down Expand Up @@ -42,5 +43,8 @@ export default () => {
},
maxBlocksBatchSize: parseInt(MAX_BLOCKS_BATCH_SIZE, 10) || 20,
gracefulShutdownTimeoutMs: parseInt(GRACEFUL_SHUTDOWN_TIMEOUT_MS, 10) || 0,
healthChecks: {
rpcHealthCheckTimeoutMs: parseInt(RPC_HEALTH_CHECK_TIMEOUT_MS, 10) || 20_000,
},
};
};
3 changes: 2 additions & 1 deletion packages/data-fetcher/src/health/health.module.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Module } from "@nestjs/common";
import { TerminusModule } from "@nestjs/terminus";
import { HttpModule } from "@nestjs/axios";
import { HealthController } from "./health.controller";
import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health";

@Module({
controllers: [HealthController],
imports: [TerminusModule],
imports: [TerminusModule, HttpModule],
providers: [JsonRpcHealthIndicator],
})
export class HealthModule {}
105 changes: 80 additions & 25 deletions packages/data-fetcher/src/health/jsonRpcProvider.health.spec.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,111 @@
import { Test, TestingModule } from "@nestjs/testing";
import { Logger } from "@nestjs/common";
import { mock } from "jest-mock-extended";
import { HealthCheckError } from "@nestjs/terminus";
import { JsonRpcProviderBase } from "../rpcProvider";
import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health";
import { ConfigService } from "@nestjs/config";
import { HttpService } from "@nestjs/axios";
import { of, throwError } from "rxjs";
import { AxiosError } from "axios";

describe("JsonRpcHealthIndicator", () => {
const healthIndicatorKey = "rpcProvider";
let jsonRpcProviderMock: JsonRpcProviderBase;
let jsonRpcHealthIndicator: JsonRpcHealthIndicator;
let httpService: HttpService;
let configService: ConfigService;

beforeEach(async () => {
jsonRpcProviderMock = mock<JsonRpcProviderBase>();

const getHealthIndicator = async () => {
const app: TestingModule = await Test.createTestingModule({
providers: [
JsonRpcHealthIndicator,
{
provide: JsonRpcProviderBase,
useValue: jsonRpcProviderMock,
},
{
provide: HttpService,
useValue: httpService,
},
{
provide: ConfigService,
useValue: configService,
},
],
}).compile();

jsonRpcHealthIndicator = app.get<JsonRpcHealthIndicator>(JsonRpcHealthIndicator);
app.useLogger(mock<Logger>());
return app.get<JsonRpcHealthIndicator>(JsonRpcHealthIndicator);
};

beforeEach(async () => {
jsonRpcProviderMock = mock<JsonRpcProviderBase>();

httpService = mock<HttpService>({
post: jest.fn(),
});

configService = mock<ConfigService>({
get: jest.fn().mockImplementation((key: string) => {
if (key === "blockchain.rpcUrl") return "http://localhost:3050";
if (key === "healthChecks.rpcHealthCheckTimeoutMs") return 5000;
return null;
}),
});

jsonRpcHealthIndicator = await getHealthIndicator();
});

describe("isHealthy", () => {
describe("when rpcProvider is open", () => {
beforeEach(() => {
jest.spyOn(jsonRpcProviderMock, "getState").mockReturnValueOnce("open");
});
const rpcRequest = {
id: 1,
jsonrpc: "2.0",
method: "eth_chainId",
params: [],
};

it("returns OK health indicator result", async () => {
const result = await jsonRpcHealthIndicator.isHealthy(healthIndicatorKey);
expect(result).toEqual({ [healthIndicatorKey]: { rpcProviderState: "open", status: "up" } });
it("returns healthy status when RPC responds successfully", async () => {
(httpService.post as jest.Mock).mockReturnValueOnce(of({ data: { result: "0x1" } }));
const result = await jsonRpcHealthIndicator.isHealthy("jsonRpcProvider");
expect(result).toEqual({
jsonRpcProvider: {
status: "up",
},
});
expect(httpService.post).toHaveBeenCalledWith("http://localhost:3050", rpcRequest, { timeout: 5000 });
});

describe("when rpcProvider is closed", () => {
beforeEach(() => {
jest.spyOn(jsonRpcProviderMock, "getState").mockReturnValueOnce("closed");
});
it("throws HealthCheckError when RPC request fails", async () => {
const error = new AxiosError();
error.response = {
status: 503,
data: "Service Unavailable",
} as any;

it("throws HealthCheckError error", async () => {
expect.assertions(2);
try {
await jsonRpcHealthIndicator.isHealthy(healthIndicatorKey);
} catch (error) {
expect(error).toBeInstanceOf(HealthCheckError);
expect(error.message).toBe("JSON RPC provider is not in open state");
}
(httpService.post as jest.Mock).mockReturnValueOnce(throwError(() => error));
await expect(jsonRpcHealthIndicator.isHealthy("jsonRpcProvider")).rejects.toThrow();
expect(httpService.post).toHaveBeenCalledWith("http://localhost:3050", rpcRequest, { timeout: 5000 });
});

it("throws HealthCheckError when RPC request times out", async () => {
const error = new AxiosError();
error.code = "ECONNABORTED";

(httpService.post as jest.Mock).mockReturnValueOnce(throwError(() => error));
await expect(jsonRpcHealthIndicator.isHealthy("jsonRpcProvider")).rejects.toThrow();
expect(httpService.post).toHaveBeenCalledWith("http://localhost:3050", rpcRequest, { timeout: 5000 });
});

it("uses configured timeout from config service", async () => {
(configService.get as jest.Mock).mockImplementation((key: string) => {
if (key === "blockchain.rpcUrl") return "http://localhost:3050";
if (key === "healthChecks.rpcHealthCheckTimeoutMs") return 10000;
return null;
});
jsonRpcHealthIndicator = await getHealthIndicator();

(httpService.post as jest.Mock).mockReturnValueOnce(of({ data: { result: "0x1" } }));
await jsonRpcHealthIndicator.isHealthy("jsonRpcProvider");
expect(httpService.post).toHaveBeenCalledWith("http://localhost:3050", rpcRequest, { timeout: 10000 });
});
});
});
54 changes: 48 additions & 6 deletions packages/data-fetcher/src/health/jsonRpcProvider.health.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,64 @@
import { Injectable } from "@nestjs/common";
import { HealthIndicator, HealthIndicatorResult, HealthCheckError } from "@nestjs/terminus";
import { JsonRpcProviderBase } from "../rpcProvider";
import { ConfigService } from "@nestjs/config";
import { Logger } from "@nestjs/common";
import { HttpService } from "@nestjs/axios";
import { catchError, firstValueFrom } from "rxjs";
import { AxiosError } from "axios";

@Injectable()
export class JsonRpcHealthIndicator extends HealthIndicator {
constructor(private readonly provider: JsonRpcProviderBase) {
private readonly rpcUrl: string;
private readonly healthCheckTimeoutMs: number;
private readonly logger: Logger;

constructor(configService: ConfigService, private readonly httpService: HttpService) {
super();
this.logger = new Logger(JsonRpcHealthIndicator.name);
this.rpcUrl = configService.get<string>("blockchain.rpcUrl");
this.healthCheckTimeoutMs = configService.get<number>("healthChecks.rpcHealthCheckTimeoutMs");
}

async isHealthy(key: string): Promise<HealthIndicatorResult> {
const rpcProviderState = this.provider.getState();
const isHealthy = rpcProviderState === "open";
const result = this.getStatus(key, isHealthy, { rpcProviderState });
let isHealthy = true;
try {
// Check RPC health with a pure HTTP request to remove SDK out of the picture
// and avoid any SDK-specific issues.
// Use eth_chainId call as it is the lightest one and return a static value from the memory.
await firstValueFrom(
this.httpService
.post(
this.rpcUrl,
{
id: 1,
jsonrpc: "2.0",
method: "eth_chainId",
params: [],
},
{ timeout: this.healthCheckTimeoutMs }
)
.pipe(
catchError((error: AxiosError) => {
this.logger.error({
message: `Failed to ping RPC`,
stack: error.stack,
status: error.response?.status,
response: error.response?.data,
});
throw error;
})
)
);
} catch {
isHealthy = false;
}

const result = this.getStatus(key, isHealthy, { status: isHealthy ? "up" : "down" });

if (isHealthy) {
return result;
}

throw new HealthCheckError("JSON RPC provider is not in open state", result);
throw new HealthCheckError("JSON RPC provider is down or not reachable", result);
}
}
3 changes: 3 additions & 0 deletions packages/worker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ RPC_BATCH_STALL_TIME_MS=0
COLLECT_DB_CONNECTION_POOL_METRICS_INTERVAL=10000
COLLECT_BLOCKS_TO_PROCESS_METRIC_INTERVAL=10000

RPC_HEALTH_CHECK_TIMEOUT_MS=20000
DB_HEALTH_CHECK_TIMEOUT_MS=20000

DISABLE_MISSING_BLOCKS_METRIC=false
CHECK_MISSING_BLOCKS_METRIC_INTERVAL=86400000

Expand Down
8 changes: 8 additions & 0 deletions packages/worker/src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ describe("config", () => {
interval: 86_400_000,
},
},
healthChecks: {
rpcHealthCheckTimeoutMs: 20_000,
dbHealthCheckTimeoutMs: 20_000,
},
};
});

Expand Down Expand Up @@ -123,6 +127,10 @@ describe("config", () => {
interval: 86_400_000,
},
},
healthChecks: {
rpcHealthCheckTimeoutMs: 20_000,
dbHealthCheckTimeoutMs: 20_000,
},
});
});

Expand Down
6 changes: 6 additions & 0 deletions packages/worker/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export default () => {
COINGECKO_API_KEY,
DISABLE_MISSING_BLOCKS_METRIC,
CHECK_MISSING_BLOCKS_METRIC_INTERVAL,
RPC_HEALTH_CHECK_TIMEOUT_MS,
DB_HEALTH_CHECK_TIMEOUT_MS,
} = process.env;

return {
Expand Down Expand Up @@ -97,5 +99,9 @@ export default () => {
interval: parseInt(CHECK_MISSING_BLOCKS_METRIC_INTERVAL, 10) || 86_400_000, // 1 day
},
},
healthChecks: {
rpcHealthCheckTimeoutMs: parseInt(RPC_HEALTH_CHECK_TIMEOUT_MS, 10) || 20_000,
dbHealthCheckTimeoutMs: parseInt(DB_HEALTH_CHECK_TIMEOUT_MS, 10) || 20_000,
},
};
};
Loading
Loading