From 417b04463c7de1f0b702e8452e9e69280033ce8a Mon Sep 17 00:00:00 2001 From: Vasyl Ivanchuk Date: Mon, 4 Mar 2024 17:20:46 +0200 Subject: [PATCH] fix: data fetcher add graceful shutdown timeout --- packages/data-fetcher/.env.example | 2 ++ packages/data-fetcher/src/config.ts | 2 ++ .../src/health/health.controller.ts | 17 ++++++++++++++--- packages/data-fetcher/src/main.ts | 4 ---- packages/worker/src/block/block.watcher.ts | 17 +++++------------ 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/packages/data-fetcher/.env.example b/packages/data-fetcher/.env.example index e8fff235c8..5d001d848f 100644 --- a/packages/data-fetcher/.env.example +++ b/packages/data-fetcher/.env.example @@ -1,6 +1,8 @@ LOG_LEVEL=debug PORT=3040 +GRACEFUL_SHUTDOWN_TIMEOUT_MS=0 + BLOCKCHAIN_RPC_URL=http://localhost:3050 RPC_CALLS_DEFAULT_RETRY_TIMEOUT=30000 diff --git a/packages/data-fetcher/src/config.ts b/packages/data-fetcher/src/config.ts index da399ecc91..3a80a0a938 100644 --- a/packages/data-fetcher/src/config.ts +++ b/packages/data-fetcher/src/config.ts @@ -13,6 +13,7 @@ export default () => { WS_MAX_CONNECTIONS, USE_WEBSOCKETS_FOR_TRANSACTIONS, MAX_BLOCKS_BATCH_SIZE, + GRACEFUL_SHUTDOWN_TIMEOUT_MS, } = process.env; return { @@ -31,5 +32,6 @@ export default () => { useWebSocketsForTransactions: USE_WEBSOCKETS_FOR_TRANSACTIONS === "true", }, maxBlocksBatchSize: parseInt(MAX_BLOCKS_BATCH_SIZE, 10) || 20, + gracefulShutdownTimeoutMs: parseInt(GRACEFUL_SHUTDOWN_TIMEOUT_MS, 10) || 0, }; }; diff --git a/packages/data-fetcher/src/health/health.controller.ts b/packages/data-fetcher/src/health/health.controller.ts index 631a88434a..906c04fa02 100644 --- a/packages/data-fetcher/src/health/health.controller.ts +++ b/packages/data-fetcher/src/health/health.controller.ts @@ -1,18 +1,23 @@ import { Logger, Controller, Get, OnApplicationShutdown, BeforeApplicationShutdown } from "@nestjs/common"; import { HealthCheckService, HealthCheck, HealthCheckResult } from "@nestjs/terminus"; import { HttpAdapterHost } from "@nestjs/core"; +import { ConfigService } from "@nestjs/config"; import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health"; +import { setTimeout } from "node:timers/promises"; @Controller(["health", "ready"]) export class HealthController implements OnApplicationShutdown, BeforeApplicationShutdown { private readonly logger: Logger; + private readonly gracefulShutdownTimeoutMs: number; constructor( private readonly healthCheckService: HealthCheckService, private readonly jsonRpcHealthIndicator: JsonRpcHealthIndicator, - private readonly httpAdapter: HttpAdapterHost + private readonly httpAdapter: HttpAdapterHost, + configService: ConfigService ) { this.logger = new Logger(HealthController.name); + this.gracefulShutdownTimeoutMs = configService.get("gracefulShutdownTimeoutMs"); } @Get() @@ -27,6 +32,7 @@ export class HealthController implements OnApplicationShutdown, BeforeApplicatio } async beforeApplicationShutdown(signal?: string): Promise { + this.logger.debug(`Received termination signal ${signal || ""}`); const httpServer = this.httpAdapter.httpAdapter.getHttpServer(); await new Promise((resolve) => { httpServer.getConnections((_, count) => { @@ -34,10 +40,16 @@ export class HealthController implements OnApplicationShutdown, BeforeApplicatio resolve(true); }); }); - this.logger.debug({ message: "Before application shutdown called", signal }); + + if (signal === "SIGTERM") { + this.logger.debug(`Awaiting ${this.gracefulShutdownTimeoutMs}ms before shutdown`); + await setTimeout(this.gracefulShutdownTimeoutMs); + this.logger.debug(`Timeout reached, shutting down now`); + } } async onApplicationShutdown(signal?: string): Promise { + this.logger.debug({ message: "On application shutdown called", signal }); const httpServer = this.httpAdapter.httpAdapter.getHttpServer(); await new Promise((resolve) => { httpServer.getConnections((_, count) => { @@ -45,6 +57,5 @@ export class HealthController implements OnApplicationShutdown, BeforeApplicatio resolve(true); }); }); - this.logger.debug({ message: "On application shutdown called", signal }); } } diff --git a/packages/data-fetcher/src/main.ts b/packages/data-fetcher/src/main.ts index 127090d85d..109d16815c 100644 --- a/packages/data-fetcher/src/main.ts +++ b/packages/data-fetcher/src/main.ts @@ -31,10 +31,6 @@ async function bootstrap() { app.enableShutdownHooks(); app.useGlobalInterceptors(new ResponseTransformInterceptor()); await app.listen(configService.get("port")); - - setInterval(() => { - logger.debug({ message: "I'm alive", context: "Main" }); - }, 1000); } bootstrap(); diff --git a/packages/worker/src/block/block.watcher.ts b/packages/worker/src/block/block.watcher.ts index 0c981032ff..15d9ce99da 100644 --- a/packages/worker/src/block/block.watcher.ts +++ b/packages/worker/src/block/block.watcher.ts @@ -90,18 +90,11 @@ export class BlockWatcher implements OnModuleInit, OnModuleDestroy { private async getBlockInfoFromBlockChain(blockNumber: number): Promise { this.logger.debug({ message: "Getting block from the blockchain", blockNumber }); - //const stopGetBlockInfoDurationMetric = this.getBlockInfoDurationMetric.startTimer(); - this.dataFetchService - .getBlockData(blockNumber) - .then((_) => { - this.logger.debug({ message: "Received block from the blockchain", blockNumber }); - }) - .catch((e) => { - this.logger.error(e); - }); - //stopGetBlockInfoDurationMetric(); - - return null; + const stopGetBlockInfoDurationMetric = this.getBlockInfoDurationMetric.startTimer(); + const blockData = await this.dataFetchService.getBlockData(blockNumber); + stopGetBlockInfoDurationMetric(); + + return blockData; } public async onModuleInit(): Promise {