Skip to content

Commit

Permalink
fix: add graceful shutdown timeout (#196)
Browse files Browse the repository at this point in the history
# What ❔

Add graceful shutdown timeout.

## Why ❔

To ensure that we can gracefully process all incoming HTTP requests
before shutting down the service.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [X] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [X] Tests for the changes have been added / updated.
  • Loading branch information
vasyl-ivanchuk authored Mar 6, 2024
1 parent 18925c4 commit 19b0099
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 31 deletions.
1 change: 1 addition & 0 deletions packages/api/.env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
GRACEFUL_SHUTDOWN_TIMEOUT_MS=0
DATABASE_URL=postgres://postgres:postgres@localhost:5432/block-explorer
DATABASE_REPLICA_URL_0=
DATABASE_CONNECTION_POOL_SIZE=50
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/config/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ describe("config", () => {
feature1Enabled: true,
feature2Enabled: false,
},
gracefulShutdownTimeoutMs: 0,
});
});

Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export default () => {
DATABASE_CONNECTION_IDLE_TIMEOUT_MS,
DATABASE_STATEMENT_TIMEOUT_MS,
CONTRACT_VERIFICATION_API_URL,
GRACEFUL_SHUTDOWN_TIMEOUT_MS,
} = process.env;

const MAX_NUMBER_OF_REPLICA = 100;
Expand Down Expand Up @@ -74,5 +75,6 @@ export default () => {
typeORM: getTypeOrmModuleOptions(),
contractVerificationApiUrl: CONTRACT_VERIFICATION_API_URL || "http://127.0.0.1:3070",
featureFlags,
gracefulShutdownTimeoutMs: parseInt(GRACEFUL_SHUTDOWN_TIMEOUT_MS, 10) || 0,
};
};
36 changes: 36 additions & 0 deletions packages/api/src/health/health.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import { Test, TestingModule } from "@nestjs/testing";
import { HealthCheckService, TypeOrmHealthIndicator, HealthCheckResult } from "@nestjs/terminus";
import { mock } from "jest-mock-extended";
import { ConfigService } from "@nestjs/config";
import { setTimeout } from "node:timers/promises";
import { HealthController } from "./health.controller";

jest.mock("node:timers/promises", () => ({
setTimeout: jest.fn().mockResolvedValue(null),
}));

describe("HealthController", () => {
let healthCheckServiceMock: HealthCheckService;
let dbHealthCheckerMock: TypeOrmHealthIndicator;
let configServiceMock: ConfigService;
let healthController: HealthController;

beforeEach(async () => {
configServiceMock = mock<ConfigService>({
get: jest.fn().mockReturnValue(1),
});
healthCheckServiceMock = mock<HealthCheckService>({
check: jest.fn().mockImplementation((healthChecks) => {
for (const healthCheck of healthChecks) {
Expand All @@ -30,6 +40,10 @@ describe("HealthController", () => {
provide: TypeOrmHealthIndicator,
useValue: dbHealthCheckerMock,
},
{
provide: ConfigService,
useValue: configServiceMock,
},
],
}).compile();

Expand All @@ -50,4 +64,26 @@ describe("HealthController", () => {
expect(result).toBe(healthCheckResult);
});
});

describe("beforeApplicationShutdown", () => {
beforeEach(() => {
(setTimeout as jest.Mock).mockReset();
});

it("defined and returns void", async () => {
const result = await healthController.beforeApplicationShutdown();
expect(result).toBeUndefined();
});

it("awaits configured shutdown timeout", async () => {
await healthController.beforeApplicationShutdown("SIGTERM");
expect(setTimeout).toBeCalledTimes(1);
expect(setTimeout).toBeCalledWith(1);
});

it("does not await shutdown timeout if signal is not SIGTERM", async () => {
await healthController.beforeApplicationShutdown("SIGINT");
expect(setTimeout).toBeCalledTimes(0);
});
});
});
25 changes: 21 additions & 4 deletions packages/api/src/health/health.controller.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
import { Controller, Get } from "@nestjs/common";
import { Logger, Controller, Get, BeforeApplicationShutdown } from "@nestjs/common";
import { HealthCheckService, TypeOrmHealthIndicator, HealthCheck, HealthCheckResult } from "@nestjs/terminus";
import { ApiExcludeController } from "@nestjs/swagger";
import { ConfigService } from "@nestjs/config";
import { setTimeout } from "node:timers/promises";

@ApiExcludeController()
@Controller(["health", "ready"])
export class HealthController {
export class HealthController implements BeforeApplicationShutdown {
private readonly logger: Logger;
private readonly gracefulShutdownTimeoutMs: number;

constructor(
private readonly healthCheckService: HealthCheckService,
private readonly dbHealthChecker: TypeOrmHealthIndicator
) {}
private readonly dbHealthChecker: TypeOrmHealthIndicator,
configService: ConfigService
) {
this.logger = new Logger(HealthController.name);
this.gracefulShutdownTimeoutMs = configService.get<number>("gracefulShutdownTimeoutMs");
}

@Get()
@HealthCheck()
public async check(): Promise<HealthCheckResult> {
return await this.healthCheckService.check([() => this.dbHealthChecker.pingCheck("database")]);
}

public async beforeApplicationShutdown(signal?: string): Promise<void> {
if (this.gracefulShutdownTimeoutMs && signal === "SIGTERM") {
this.logger.debug(`Awaiting ${this.gracefulShutdownTimeoutMs}ms before shutdown`);
await setTimeout(this.gracefulShutdownTimeoutMs);
this.logger.debug(`Timeout reached, shutting down now`);
}
}
}
2 changes: 2 additions & 0 deletions packages/data-fetcher/.env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
LOG_LEVEL=debug
PORT=3040

GRACEFUL_SHUTDOWN_TIMEOUT_MS=0

BLOCKCHAIN_RPC_URL=http://localhost:3050

RPC_CALLS_DEFAULT_RETRY_TIMEOUT=30000
Expand Down
1 change: 1 addition & 0 deletions packages/data-fetcher/src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ describe("config", () => {
useWebSocketsForTransactions: false,
},
maxBlocksBatchSize: 20,
gracefulShutdownTimeoutMs: 0,
});
});
});
2 changes: 2 additions & 0 deletions packages/data-fetcher/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export default () => {
WS_MAX_CONNECTIONS,
USE_WEBSOCKETS_FOR_TRANSACTIONS,
MAX_BLOCKS_BATCH_SIZE,
GRACEFUL_SHUTDOWN_TIMEOUT_MS,
} = process.env;

return {
Expand All @@ -31,5 +32,6 @@ export default () => {
useWebSocketsForTransactions: USE_WEBSOCKETS_FOR_TRANSACTIONS === "true",
},
maxBlocksBatchSize: parseInt(MAX_BLOCKS_BATCH_SIZE, 10) || 20,
gracefulShutdownTimeoutMs: parseInt(GRACEFUL_SHUTDOWN_TIMEOUT_MS, 10) || 0,
};
};
34 changes: 31 additions & 3 deletions packages/data-fetcher/src/health/health.controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,32 @@ import { ServiceUnavailableException, Logger } from "@nestjs/common";
import { Test, TestingModule } from "@nestjs/testing";
import { HealthCheckService, HealthCheckResult } from "@nestjs/terminus";
import { mock } from "jest-mock-extended";
import { ConfigService } from "@nestjs/config";
import { setTimeout } from "node:timers/promises";
import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health";
import { HealthController } from "./health.controller";

jest.mock("node:timers/promises", () => ({
setTimeout: jest.fn().mockResolvedValue(null),
}));

describe("HealthController", () => {
let healthCheckServiceMock: HealthCheckService;
let jsonRpcHealthIndicatorMock: JsonRpcHealthIndicator;
let configServiceMock: ConfigService;
let healthController: HealthController;

beforeEach(async () => {
configServiceMock = mock<ConfigService>({
get: jest.fn().mockReturnValue(1),
});
healthCheckServiceMock = mock<HealthCheckService>({
check: jest.fn().mockImplementation((healthChecks) => {
for (const healthCheck of healthChecks) {
healthCheck();
}
}),
});

jsonRpcHealthIndicatorMock = mock<JsonRpcHealthIndicator>();

const app: TestingModule = await Test.createTestingModule({
Expand All @@ -32,6 +41,10 @@ describe("HealthController", () => {
provide: JsonRpcHealthIndicator,
useValue: jsonRpcHealthIndicatorMock,
},
{
provide: ConfigService,
useValue: configServiceMock,
},
],
}).compile();

Expand Down Expand Up @@ -82,10 +95,25 @@ describe("HealthController", () => {
});
});

describe("onApplicationShutdown", () => {
describe("beforeApplicationShutdown", () => {
beforeEach(() => {
(setTimeout as jest.Mock).mockReset();
});

it("defined and returns void", async () => {
const result = healthController.onApplicationShutdown();
const result = await healthController.beforeApplicationShutdown();
expect(result).toBeUndefined();
});

it("awaits configured shutdown timeout", async () => {
await healthController.beforeApplicationShutdown("SIGTERM");
expect(setTimeout).toBeCalledTimes(1);
expect(setTimeout).toBeCalledWith(1);
});

it("does not await shutdown timeout if signal is not SIGTERM", async () => {
await healthController.beforeApplicationShutdown("SIGINT");
expect(setTimeout).toBeCalledTimes(0);
});
});
});
25 changes: 15 additions & 10 deletions packages/data-fetcher/src/health/health.controller.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,39 @@
import { Logger, Controller, Get, OnApplicationShutdown } from "@nestjs/common";
import { Logger, Controller, Get, BeforeApplicationShutdown } from "@nestjs/common";
import { HealthCheckService, HealthCheck, HealthCheckResult } from "@nestjs/terminus";
import { ConfigService } from "@nestjs/config";
import { setTimeout } from "node:timers/promises";
import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health";

@Controller(["health", "ready"])
export class HealthController implements OnApplicationShutdown {
export class HealthController implements BeforeApplicationShutdown {
private readonly logger: Logger;
private readonly gracefulShutdownTimeoutMs: number;

constructor(
private readonly healthCheckService: HealthCheckService,
private readonly jsonRpcHealthIndicator: JsonRpcHealthIndicator
private readonly jsonRpcHealthIndicator: JsonRpcHealthIndicator,
configService: ConfigService
) {
this.logger = new Logger(HealthController.name);
this.gracefulShutdownTimeoutMs = configService.get<number>("gracefulShutdownTimeoutMs");
}

@Get()
@HealthCheck()
public async check(): Promise<HealthCheckResult> {
try {
const healthCheckResult = await this.healthCheckService.check([
() => this.jsonRpcHealthIndicator.isHealthy("jsonRpcProvider"),
]);
this.logger.debug({ message: "Health check result", ...healthCheckResult });
return healthCheckResult;
return await this.healthCheckService.check([() => this.jsonRpcHealthIndicator.isHealthy("jsonRpcProvider")]);
} catch (error) {
this.logger.error({ message: error.message, response: error.getResponse() }, error.stack);
throw error;
}
}

onApplicationShutdown(signal?: string): void {
this.logger.debug({ message: "Received a signal", signal });
public async beforeApplicationShutdown(signal?: string): Promise<void> {
if (this.gracefulShutdownTimeoutMs && signal === "SIGTERM") {
this.logger.debug(`Awaiting ${this.gracefulShutdownTimeoutMs}ms before shutdown`);
await setTimeout(this.gracefulShutdownTimeoutMs);
this.logger.debug(`Timeout reached, shutting down now`);
}
}
}
2 changes: 1 addition & 1 deletion packages/data-fetcher/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const loggerFormatters: Logform.Format[] = [
format: "DD/MM/YYYY HH:mm:ss.SSS",
}),
format.ms(),
utilities.format.nestLike("Worker", {}),
utilities.format.nestLike("DataFetcher", {}),
];

if (NODE_ENV === "production") {
Expand Down
27 changes: 14 additions & 13 deletions packages/worker/src/block/block.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,25 +87,23 @@ export class BlockProcessor {
if (!allBlocksExist) {
// We don't need to handle this potential revert as these blocks are not in DB yet,
// try again later once these blocks are present in blockchain again.
this.logger.warn(
"Not all the requested blocks from the next blocks to process range exist in blockchain, likely revert has happened",
{
lastDbBlockNumber,
}
);
this.logger.warn({
message:
"Not all the requested blocks from the next blocks to process range exist in blockchain, likely revert has happened",
lastDbBlockNumber,
});
return false;
}
const isBlocksLinkingValid = validateBlocksLinking(blocksToProcess);
if (!isBlocksLinkingValid) {
// We don't need to handle this revert as these blocks are not in DB yet,
// we just need to wait for blockchain to complete this revert before inserting these blocks.
// This is very unlikely to ever happen.
this.logger.warn(
"Some of the requested blocks from the next blocks to process range have invalid link to previous block, likely revert has happened",
{
lastDbBlockNumber: lastDbBlockNumber,
}
);
this.logger.warn({
message:
"Some of the requested blocks from the next blocks to process range have invalid link to previous block, likely revert has happened",
lastDbBlockNumber: lastDbBlockNumber,
});
return false;
}

Expand Down Expand Up @@ -137,7 +135,10 @@ export class BlockProcessor {
}

private triggerBlocksRevertEvent(detectedIncorrectBlockNumber: number) {
this.logger.warn("Blocks revert detected", { detectedIncorrectBlockNumber });
this.logger.warn({
message: "Blocks revert detected",
detectedIncorrectBlockNumber,
});
if (!this.disableBlocksRevert) {
this.eventEmitter.emit(BLOCKS_REVERT_DETECTED_EVENT, {
detectedIncorrectBlockNumber,
Expand Down
1 change: 1 addition & 0 deletions packages/worker/src/block/block.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class BlockService extends Worker {
this.logger.error(`Error on processing next block range, waiting ${nextIterationDelay} ms to retry`, error.stack);
}
if (nextIterationDelay) {
this.logger.debug(`Waiting for ${nextIterationDelay}ms`);
await waitFor(() => !this.currentProcessPromise, nextIterationDelay);
}
if (!this.currentProcessPromise) {
Expand Down

0 comments on commit 19b0099

Please sign in to comment.