Skip to content

Commit

Permalink
fix: configurable watchdog timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
mfw78 committed Oct 6, 2023
1 parent ec69080 commit f994854
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 33 deletions.
11 changes: 9 additions & 2 deletions src/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ import { ApiService } from "../utils/api";
*/
export async function run(options: RunOptions) {
const log = getLogger("commands:run");
const { rpc, deploymentBlock, oneShot, disableApi, apiPort } = options;
const {
rpc,
deploymentBlock,
oneShot,
disableApi,
apiPort,
watchdogTimeout,
} = options;

// Start the API server if it's not disabled
if (!disableApi) {
Expand Down Expand Up @@ -45,7 +52,7 @@ export async function run(options: RunOptions) {

// Run the block watcher after warm up for each chain
const runPromises = chainContexts.map(async (context) => {
return context.warmUp(oneShot);
return context.warmUp(watchdogTimeout, oneShot);
});

// Run all the chain contexts
Expand Down
16 changes: 9 additions & 7 deletions src/domain/chainContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { composableCowContract, DBService, getLogger } from "../utils";
import { MetricsService } from "../utils/metrics";

const WATCHDOG_FREQUENCY = 5 * 1000; // 5 seconds
const WATCHDOG_KILL_THRESHOLD = 30 * 1000; // 30 seconds

const MULTICALL3 = "0xcA11bde05977b3631167028862bE2a173976CA11";

Expand Down Expand Up @@ -92,10 +91,11 @@ export class ChainContext {
/**
* Warm up the chain watcher by fetching the latest block number and
* checking if the chain is in sync.
* @param watchdogTimeout the timeout for the watchdog
* @param oneShot if true, only warm up the chain watcher and return
* @returns the run promises for what needs to be watched
*/
public async warmUp(oneShot?: boolean) {
public async warmUp(watchdogTimeout: number, oneShot?: boolean) {
const { provider, chainId } = this;
const log = getLogger(`chainContext:warmUp:${chainId}`);
const { lastProcessedBlock } = this.registry;
Expand Down Expand Up @@ -225,19 +225,19 @@ export class ChainContext {
}

// Otherwise, run the block watcher
return await this.runBlockWatcher();
return await this.runBlockWatcher(watchdogTimeout);
}

/**
* Run the block watcher for the chain. As new blocks come in:
* 1. Check if there are any `ConditionalOrderCreated` events, and index these.
* 2. Check if any orders want to create discrete orders.
*/
private async runBlockWatcher() {
private async runBlockWatcher(watchdogTimeout: number) {
const { provider, registry, chainId } = this;
const log = getLogger(`chainContext:runBlockWatcher:${chainId}`);
// Watch for new blocks
log.info("👀 Start block watcher");
log.info(`👀 Start block watcher with ${watchdogTimeout}s timeout`);
let lastBlockReceived = 0;
let timeLastBlockProcessed = new Date().getTime();
provider.on("block", async (blockNumber: number) => {
Expand Down Expand Up @@ -294,8 +294,10 @@ export class ChainContext {
log.debug(`Time since last block processed: ${timeElapsed}ms`);

// If we haven't received a block within the configured watchdog timeout, exit so that the process manager can restart us
if (timeElapsed >= WATCHDOG_KILL_THRESHOLD) {
log.error(`Watchdog timeout`);
if (timeElapsed >= watchdogTimeout * 1000) {
log.error(
`Watchdog timeout (RPC failed, or chain is stuck / not issuing blocks)`
);
await registry.storage.close();
process.exit(1);
}
Expand Down
61 changes: 37 additions & 24 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import "dotenv/config";

import { program, Option } from "@commander-js/extra-typings";
import {
program,
Option,
InvalidArgumentError,
} from "@commander-js/extra-typings";
import { ReplayTxOptions } from "./types";
import { dumpDb, replayBlock, replayTx, run } from "./commands";
import { initLogging } from "./utils";
Expand All @@ -25,10 +29,10 @@ async function main() {
"--deployment-block <deploymentBlock...>",
"Block number at which the contracts were deployed"
)
.option(
"--page-size <pageSize>",
"Number of blocks to fetch per page",
"5000"
.addOption(
new Option("--page-size <pageSize>", "Number of blocks to fetch per page")
.default("5000")
.argParser(parseIntOption)
)
.option("--dry-run", "Do not publish orders to the OrderBook API", false)
.addOption(
Expand All @@ -37,45 +41,46 @@ async function main() {
.default(false)
)
.option("--disable-api", "Disable the REST API", false)
.option("--api-port <apiPort>", "Port for the REST API", "8080")
.addOption(
new Option("--api-port <apiPort>", "Port for the REST API")
.default("8080")
.argParser(parseIntOption)
)
.option("--slack-webhook <slackWebhook>", "Slack webhook URL")
.option("--one-shot", "Run the watchtower once and exit", false)
.addOption(
new Option(
"--watchdog-timeout <watchdogTimeout>",
"Watchdog timeout (in seconds)"
)
.default("30")
.argParser(parseIntOption)
)
.addOption(logLevelOption)
.action((options) => {
const { logLevel } = options;
const [pageSize, apiPort, watchdogTimeout] = [
options.pageSize,
options.apiPort,
options.watchdogTimeout,
].map((value) => Number(value));

initLogging({ logLevel });
const {
rpc,
deploymentBlock: deploymentBlockEnv,
pageSize: pageSizeEnv,
} = options;
const { rpc, deploymentBlock: deploymentBlockEnv } = options;

// Ensure that the deployment blocks are all numbers
const deploymentBlock = deploymentBlockEnv.map((block) => Number(block));
if (deploymentBlock.some((block) => isNaN(block))) {
throw new Error("Deployment blocks must be numbers");
}

// Ensure that pageSize is a number
const pageSize = Number(pageSizeEnv);
if (isNaN(pageSize)) {
throw new Error("Page size must be a number");
}

// Ensure that the port is a number
const apiPort = Number(options.apiPort);
if (isNaN(apiPort)) {
throw new Error("API port must be a number");
}

// Ensure that the RPCs and deployment blocks are the same length
if (rpc.length !== deploymentBlock.length) {
throw new Error("RPC and deployment blocks must be the same length");
}

// Run the watchtower
run({ ...options, deploymentBlock, pageSize, apiPort });
run({ ...options, deploymentBlock, pageSize, apiPort, watchdogTimeout });
});

program
Expand Down Expand Up @@ -134,6 +139,14 @@ async function main() {
await program.parseAsync();
}

/**
 * Argument parser for integer-valued CLI options (used via
 * `Option.argParser` for `--page-size`, `--api-port` and
 * `--watchdog-timeout`).
 *
 * The validated value is returned as a canonical decimal string, not a
 * number, so that it has the same type as the string defaults declared on
 * the options; the command action converts it with `Number(...)` later.
 *
 * @param option the raw text supplied on the command line. Despite the
 *   parameter name this is the option's *value*, not its flag.
 * @param _value unused second argument supplied by the caller of the parser
 *   (presumably commander's previous/default value — confirm against the
 *   commander documentation).
 * @returns the parsed integer rendered as a decimal string
 * @throws InvalidArgumentError if the input is blank or is not a safe integer
 */
function parseIntOption(option: string, _value: string) {
  const parsed = Number(option);
  // `Number("")` and `Number("   ")` evaluate to 0, so blank input has to be
  // rejected explicitly. `isSafeInteger` additionally rules out NaN,
  // ±Infinity, fractional values and magnitudes that would lose precision.
  if (option.trim() === "" || !Number.isSafeInteger(parsed)) {
    throw new InvalidArgumentError(`${option} must be an integer`);
  }
  return parsed.toString();
}

main().catch((error) => {
console.error(error);
process.exit(1);
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface RunOptions extends WatchtowerOptions {
oneShot: boolean;
disableApi: boolean;
apiPort: number;
watchdogTimeout: number;
}

export type SingularRunOptions = Omit<RunOptions, "rpc" | "deploymentBlock"> & {
Expand Down

0 comments on commit f994854

Please sign in to comment.