From 9f73b4d28c6eddb0c9c710ab5409470873937717 Mon Sep 17 00:00:00 2001 From: Vasyl Ivanchuk Date: Wed, 6 Mar 2024 13:28:40 +0200 Subject: [PATCH] fix: add graceful shutdown timeout --- packages/api/.env.example | 1 + packages/api/src/config/index.spec.ts | 1 + packages/api/src/config/index.ts | 2 ++ .../api/src/health/health.controller.spec.ts | 36 +++++++++++++++++++ packages/api/src/health/health.controller.ts | 25 ++++++++++--- packages/data-fetcher/.env.example | 2 ++ packages/data-fetcher/src/config.spec.ts | 1 + packages/data-fetcher/src/config.ts | 2 ++ .../src/health/health.controller.spec.ts | 34 ++++++++++++++++-- .../src/health/health.controller.ts | 25 +++++++------ packages/data-fetcher/src/logger.ts | 2 +- packages/worker/src/block/block.processor.ts | 27 +++++++------- packages/worker/src/block/block.service.ts | 1 + 13 files changed, 128 insertions(+), 31 deletions(-) diff --git a/packages/api/.env.example b/packages/api/.env.example index ed85f208b3..03ca26849f 100644 --- a/packages/api/.env.example +++ b/packages/api/.env.example @@ -1,3 +1,4 @@ +GRACEFUL_SHUTDOWN_TIMEOUT_MS=0 DATABASE_URL=postgres://postgres:postgres@localhost:5432/block-explorer DATABASE_REPLICA_URL_0= DATABASE_CONNECTION_POOL_SIZE=50 diff --git a/packages/api/src/config/index.spec.ts b/packages/api/src/config/index.spec.ts index f7a1b7bf14..6803237f97 100644 --- a/packages/api/src/config/index.spec.ts +++ b/packages/api/src/config/index.spec.ts @@ -46,6 +46,7 @@ describe("config", () => { feature1Enabled: true, feature2Enabled: false, }, + gracefulShutdownTimeoutMs: 0, }); }); diff --git a/packages/api/src/config/index.ts b/packages/api/src/config/index.ts index 6b9c5f2a57..840259b1fa 100644 --- a/packages/api/src/config/index.ts +++ b/packages/api/src/config/index.ts @@ -12,6 +12,7 @@ export default () => { DATABASE_CONNECTION_IDLE_TIMEOUT_MS, DATABASE_STATEMENT_TIMEOUT_MS, CONTRACT_VERIFICATION_API_URL, + GRACEFUL_SHUTDOWN_TIMEOUT_MS, } = process.env; const MAX_NUMBER_OF_REPLICA = 100; @@ -74,5 +75,6 @@ export default () => { typeORM: getTypeOrmModuleOptions(), contractVerificationApiUrl: CONTRACT_VERIFICATION_API_URL || "http://127.0.0.1:3070", featureFlags, + gracefulShutdownTimeoutMs: parseInt(GRACEFUL_SHUTDOWN_TIMEOUT_MS, 10) || 0, }; }; diff --git a/packages/api/src/health/health.controller.spec.ts b/packages/api/src/health/health.controller.spec.ts index 54770ddda1..0dde57d1df 100644 --- a/packages/api/src/health/health.controller.spec.ts +++ b/packages/api/src/health/health.controller.spec.ts @@ -1,14 +1,24 @@ import { Test, TestingModule } from "@nestjs/testing"; import { HealthCheckService, TypeOrmHealthIndicator, HealthCheckResult } from "@nestjs/terminus"; import { mock } from "jest-mock-extended"; +import { ConfigService } from "@nestjs/config"; +import { setTimeout } from "node:timers/promises"; import { HealthController } from "./health.controller"; +jest.mock("node:timers/promises", () => ({ + setTimeout: jest.fn().mockResolvedValue(null), +})); + describe("HealthController", () => { let healthCheckServiceMock: HealthCheckService; let dbHealthCheckerMock: TypeOrmHealthIndicator; + let configServiceMock: ConfigService; let healthController: HealthController; beforeEach(async () => { + configServiceMock = mock({ + get: jest.fn().mockReturnValue(1), + }); healthCheckServiceMock = mock({ check: jest.fn().mockImplementation((healthChecks) => { for (const healthCheck of healthChecks) { @@ -30,6 +40,10 @@ describe("HealthController", () => { provide: TypeOrmHealthIndicator, useValue: dbHealthCheckerMock, }, + { + provide: ConfigService, + useValue: configServiceMock, + }, ], }).compile(); @@ -50,4 +64,26 @@ describe("HealthController", () => { expect(result).toBe(healthCheckResult); }); }); + + describe("beforeApplicationShutdown", () => { + beforeEach(() => { + (setTimeout as jest.Mock).mockReset(); + }); + + it("defined and returns void", async () => { + const result = await healthController.beforeApplicationShutdown(); + expect(result).toBeUndefined(); + }); + + it("awaits configured shutdown timeout", async () => { + await healthController.beforeApplicationShutdown("SIGTERM"); + expect(setTimeout).toBeCalledTimes(1); + expect(setTimeout).toBeCalledWith(1); + }); + + it("does not await shutdown timeout if signal is not SIGTERM", async () => { + await healthController.beforeApplicationShutdown("SIGINT"); + expect(setTimeout).toBeCalledTimes(0); + }); + }); }); diff --git a/packages/api/src/health/health.controller.ts b/packages/api/src/health/health.controller.ts index d5b5e7bf8b..d1a5db6640 100644 --- a/packages/api/src/health/health.controller.ts +++ b/packages/api/src/health/health.controller.ts @@ -1,18 +1,35 @@ -import { Controller, Get } from "@nestjs/common"; +import { Logger, Controller, Get, BeforeApplicationShutdown } from "@nestjs/common"; import { HealthCheckService, TypeOrmHealthIndicator, HealthCheck, HealthCheckResult } from "@nestjs/terminus"; import { ApiExcludeController } from "@nestjs/swagger"; +import { ConfigService } from "@nestjs/config"; +import { setTimeout } from "node:timers/promises"; @ApiExcludeController() @Controller(["health", "ready"]) -export class HealthController { +export class HealthController implements BeforeApplicationShutdown { + private readonly logger: Logger; + private readonly gracefulShutdownTimeoutMs: number; + constructor( private readonly healthCheckService: HealthCheckService, - private readonly dbHealthChecker: TypeOrmHealthIndicator - ) {} + private readonly dbHealthChecker: TypeOrmHealthIndicator, + configService: ConfigService + ) { + this.logger = new Logger(HealthController.name); + this.gracefulShutdownTimeoutMs = configService.get("gracefulShutdownTimeoutMs"); + } @Get() @HealthCheck() public async check(): Promise { return await this.healthCheckService.check([() => this.dbHealthChecker.pingCheck("database")]); } + + public async beforeApplicationShutdown(signal?: string): Promise { + if (this.gracefulShutdownTimeoutMs && signal === "SIGTERM") { + this.logger.debug(`Awaiting ${this.gracefulShutdownTimeoutMs}ms before shutdown`); + await setTimeout(this.gracefulShutdownTimeoutMs); + this.logger.debug(`Timeout reached, shutting down now`); + } + } } diff --git a/packages/data-fetcher/.env.example b/packages/data-fetcher/.env.example index e8fff235c8..5d001d848f 100644 --- a/packages/data-fetcher/.env.example +++ b/packages/data-fetcher/.env.example @@ -1,6 +1,8 @@ LOG_LEVEL=debug PORT=3040 +GRACEFUL_SHUTDOWN_TIMEOUT_MS=0 + BLOCKCHAIN_RPC_URL=http://localhost:3050 RPC_CALLS_DEFAULT_RETRY_TIMEOUT=30000 diff --git a/packages/data-fetcher/src/config.spec.ts b/packages/data-fetcher/src/config.spec.ts index b519910372..a26d65a490 100644 --- a/packages/data-fetcher/src/config.spec.ts +++ b/packages/data-fetcher/src/config.spec.ts @@ -25,6 +25,7 @@ describe("config", () => { useWebSocketsForTransactions: false, }, maxBlocksBatchSize: 20, + gracefulShutdownTimeoutMs: 0, }); }); }); diff --git a/packages/data-fetcher/src/config.ts b/packages/data-fetcher/src/config.ts index da399ecc91..3a80a0a938 100644 --- a/packages/data-fetcher/src/config.ts +++ b/packages/data-fetcher/src/config.ts @@ -13,6 +13,7 @@ export default () => { WS_MAX_CONNECTIONS, USE_WEBSOCKETS_FOR_TRANSACTIONS, MAX_BLOCKS_BATCH_SIZE, + GRACEFUL_SHUTDOWN_TIMEOUT_MS, } = process.env; return { @@ -31,5 +32,6 @@ export default () => { useWebSocketsForTransactions: USE_WEBSOCKETS_FOR_TRANSACTIONS === "true", }, maxBlocksBatchSize: parseInt(MAX_BLOCKS_BATCH_SIZE, 10) || 20, + gracefulShutdownTimeoutMs: parseInt(GRACEFUL_SHUTDOWN_TIMEOUT_MS, 10) || 0, }; }; diff --git a/packages/data-fetcher/src/health/health.controller.spec.ts b/packages/data-fetcher/src/health/health.controller.spec.ts index 065fcd36cb..a4efa52496 100644 --- a/packages/data-fetcher/src/health/health.controller.spec.ts +++ b/packages/data-fetcher/src/health/health.controller.spec.ts @@ -2,15 +2,25 @@ import { ServiceUnavailableException, Logger } from "@nestjs/common"; import { Test, TestingModule } from "@nestjs/testing"; import { HealthCheckService, HealthCheckResult } from "@nestjs/terminus"; import { mock } from "jest-mock-extended"; +import { ConfigService } from "@nestjs/config"; +import { setTimeout } from "node:timers/promises"; import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health"; import { HealthController } from "./health.controller"; +jest.mock("node:timers/promises", () => ({ + setTimeout: jest.fn().mockResolvedValue(null), +})); + describe("HealthController", () => { let healthCheckServiceMock: HealthCheckService; let jsonRpcHealthIndicatorMock: JsonRpcHealthIndicator; + let configServiceMock: ConfigService; let healthController: HealthController; beforeEach(async () => { + configServiceMock = mock({ + get: jest.fn().mockReturnValue(1), + }); healthCheckServiceMock = mock({ check: jest.fn().mockImplementation((healthChecks) => { for (const healthCheck of healthChecks) { @@ -18,7 +28,6 @@ describe("HealthController", () => { } }), }); - jsonRpcHealthIndicatorMock = mock(); const app: TestingModule = await Test.createTestingModule({ @@ -32,6 +41,10 @@ describe("HealthController", () => { provide: JsonRpcHealthIndicator, useValue: jsonRpcHealthIndicatorMock, }, + { + provide: ConfigService, + useValue: configServiceMock, + }, ], }).compile(); @@ -82,10 +95,25 @@ describe("HealthController", () => { }); }); - describe("onApplicationShutdown", () => { + describe("beforeApplicationShutdown", () => { + beforeEach(() => { + (setTimeout as jest.Mock).mockReset(); + }); + it("defined and returns void", async () => { - const result = healthController.onApplicationShutdown(); + const result = await healthController.beforeApplicationShutdown(); expect(result).toBeUndefined(); }); + + it("awaits configured shutdown timeout", async () => { + await healthController.beforeApplicationShutdown("SIGTERM"); + expect(setTimeout).toBeCalledTimes(1); + expect(setTimeout).toBeCalledWith(1); + }); + + it("does not await shutdown timeout if signal is not SIGTERM", async () => { + await healthController.beforeApplicationShutdown("SIGINT"); + expect(setTimeout).toBeCalledTimes(0); + }); }); }); diff --git a/packages/data-fetcher/src/health/health.controller.ts b/packages/data-fetcher/src/health/health.controller.ts index 0568b6c115..bede1a9da1 100644 --- a/packages/data-fetcher/src/health/health.controller.ts +++ b/packages/data-fetcher/src/health/health.controller.ts @@ -1,34 +1,39 @@ -import { Logger, Controller, Get, OnApplicationShutdown } from "@nestjs/common"; +import { Logger, Controller, Get, BeforeApplicationShutdown } from "@nestjs/common"; import { HealthCheckService, HealthCheck, HealthCheckResult } from "@nestjs/terminus"; +import { ConfigService } from "@nestjs/config"; +import { setTimeout } from "node:timers/promises"; import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health"; @Controller(["health", "ready"]) -export class HealthController implements OnApplicationShutdown { +export class HealthController implements BeforeApplicationShutdown { private readonly logger: Logger; + private readonly gracefulShutdownTimeoutMs: number; constructor( private readonly healthCheckService: HealthCheckService, - private readonly jsonRpcHealthIndicator: JsonRpcHealthIndicator + private readonly jsonRpcHealthIndicator: JsonRpcHealthIndicator, + configService: ConfigService ) { this.logger = new Logger(HealthController.name); + this.gracefulShutdownTimeoutMs = configService.get("gracefulShutdownTimeoutMs"); } @Get() @HealthCheck() public async check(): Promise { try { - const healthCheckResult = await this.healthCheckService.check([ - () => this.jsonRpcHealthIndicator.isHealthy("jsonRpcProvider"), - ]); - this.logger.debug({ message: "Health check result", ...healthCheckResult }); - return healthCheckResult; + return await this.healthCheckService.check([() => this.jsonRpcHealthIndicator.isHealthy("jsonRpcProvider")]); } catch (error) { this.logger.error({ message: error.message, response: error.getResponse() }, error.stack); throw error; } } - onApplicationShutdown(signal?: string): void { - this.logger.debug({ message: "Received a signal", signal }); + public async beforeApplicationShutdown(signal?: string): Promise { + if (this.gracefulShutdownTimeoutMs && signal === "SIGTERM") { + this.logger.debug(`Awaiting ${this.gracefulShutdownTimeoutMs}ms before shutdown`); + await setTimeout(this.gracefulShutdownTimeoutMs); + this.logger.debug(`Timeout reached, shutting down now`); + } } } diff --git a/packages/data-fetcher/src/logger.ts b/packages/data-fetcher/src/logger.ts index 48476324d8..76f0e87c08 100644 --- a/packages/data-fetcher/src/logger.ts +++ b/packages/data-fetcher/src/logger.ts @@ -9,7 +9,7 @@ const loggerFormatters: Logform.Format[] = [ format: "DD/MM/YYYY HH:mm:ss.SSS", }), format.ms(), - utilities.format.nestLike("Worker", {}), + utilities.format.nestLike("DataFetcher", {}), ]; if (NODE_ENV === "production") { diff --git a/packages/worker/src/block/block.processor.ts b/packages/worker/src/block/block.processor.ts index b6c3ecaed7..69ea1bf2b6 100644 --- a/packages/worker/src/block/block.processor.ts +++ b/packages/worker/src/block/block.processor.ts @@ -87,12 +87,11 @@ export class BlockProcessor { if (!allBlocksExist) { // We don't need to handle this potential revert as these blocks are not in DB yet, // try again later once these blocks are present in blockchain again. - this.logger.warn( - "Not all the requested blocks from the next blocks to process range exist in blockchain, likely revert has happened", - { - lastDbBlockNumber, - } - ); + this.logger.warn({ + message: + "Not all the requested blocks from the next blocks to process range exist in blockchain, likely revert has happened", + lastDbBlockNumber, + }); return false; } const isBlocksLinkingValid = validateBlocksLinking(blocksToProcess); @@ -100,12 +99,11 @@ export class BlockProcessor { // We don't need to handle this revert as these blocks are not in DB yet, // we just need to wait for blockchain to complete this revert before inserting these blocks. // This is very unlikely to ever happen. - this.logger.warn( - "Some of the requested blocks from the next blocks to process range have invalid link to previous block, likely revert has happened", - { - lastDbBlockNumber: lastDbBlockNumber, - } - ); + this.logger.warn({ + message: + "Some of the requested blocks from the next blocks to process range have invalid link to previous block, likely revert has happened", + lastDbBlockNumber: lastDbBlockNumber, + }); return false; } @@ -137,7 +135,10 @@ export class BlockProcessor { } private triggerBlocksRevertEvent(detectedIncorrectBlockNumber: number) { - this.logger.warn("Blocks revert detected", { detectedIncorrectBlockNumber }); + this.logger.warn({ + message: "Blocks revert detected", + detectedIncorrectBlockNumber, + }); if (!this.disableBlocksRevert) { this.eventEmitter.emit(BLOCKS_REVERT_DETECTED_EVENT, { detectedIncorrectBlockNumber, diff --git a/packages/worker/src/block/block.service.ts b/packages/worker/src/block/block.service.ts index 37bdc78797..9e92983f4a 100644 --- a/packages/worker/src/block/block.service.ts +++ b/packages/worker/src/block/block.service.ts @@ -33,6 +33,7 @@ export class BlockService extends Worker { this.logger.error(`Error on processing next block range, waiting ${nextIterationDelay} ms to retry`, error.stack); } if (nextIterationDelay) { + this.logger.debug(`Waiting for ${nextIterationDelay}ms`); await waitFor(() => !this.currentProcessPromise, nextIterationDelay); } if (!this.currentProcessPromise) {