Skip to content

Commit

Permalink
feat: improve first sync (#154)
Browse files Browse the repository at this point in the history
# Description
This PR is an attempt to improve the first sync for watch-tower

## Context: Before this PR
The issue with the current model is that
1) It enter SYNC mode
2) In SYNC mode it will check all pending blocks from last processed
until the tip of the blockchains
- Instead of actually processing the blocks, its just "saving" the block
for processing later (its creating a sync plan)
3) Once in SYNC, it will apply the plan
- This time it will process for real the blocks

The issues we experience is becasue between 2 and 3 there's a delay and
the metric of "blocks processed" goes down. Thats our main metric.

The solution is to not make a plan and process blocks as we find them
instead of waiting to be fully catched up.

## Proposed solution
I just refactored the 2 points where we do the processing of blocks
(applying the plan, and when watching for new blocks).

Now I use an auxiliary function that will process the block, update the
metrics, and persist in the database.

This should help in several ways:
- We will improve the metric of "blocks processed". This metric is key
for our alerts. We don't want to be notified if during a restart
watchtower needs some time to consume all pending blocks
- Additionally should help with very big syncs, like first syncs or big
downtimes. Before this PR it was indexing all blocks before processing
any block. This approach should be better because in case of a crash,
next run will resume the work where it left it.
- Derived from the item above, I suspect the memory issues could be
related to applying a big plan when the pod restarts

## Test
I haven't tested this much and is very sensitive change. I would love to
get some feedback first, then I'd like to test it in staging.


I just did a minimal test of running it in Arbitrum locally:

<img width="1429" alt="image"
src="https://github.com/cowprotocol/watch-tower/assets/2352112/362d997e-c17b-458b-be79-ce309986c433">


The watch-tower arrived to a SYNC state and processed blocks as it found
them:
<img width="1089" alt="image"
src="https://github.com/cowprotocol/watch-tower/assets/2352112/93cfa98f-faa8-4e1d-9157-0644159abaff">
  • Loading branch information
anxolin authored Jun 25, 2024
1 parent 8a4c044 commit fd653a7
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 67 deletions.
151 changes: 90 additions & 61 deletions src/services/chain.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
Registry,
ReplayPlan,
ConditionalOrderCreatedEvent,
Multicall3,
ComposableCoW,
Expand All @@ -18,6 +17,7 @@ import { addContract } from "../domain/events";
import { checkForAndPlaceOrder } from "../domain/polling";
import { ethers, providers } from "ethers";
import {
LoggerWithMethods,
composableCowContract,
getLogger,
isRunningInKubernetesPod,
Expand Down Expand Up @@ -170,7 +170,7 @@ export class ChainContext {
public async warmUp(oneShot?: boolean) {
const { provider, chainId } = this;
const log = getLogger("chainContext:warmUp", chainId.toString());
const { lastProcessedBlock } = this.registry;
let { lastProcessedBlock } = this.registry;
const { pageSize } = this;

// Set the block height metric
Expand All @@ -185,7 +185,6 @@ export class ChainContext {
let currentBlock = await provider.getBlock("latest");

let printSyncInfo = true; // Print sync info only once
let plan: ReplayPlan = {};
let toBlock: "latest" | number = 0;
do {
do {
Expand Down Expand Up @@ -228,67 +227,57 @@ export class ChainContext {
log.debug(`Found ${events.length} events`);
}

// process the events
for (const event of events) {
if (plan[event.blockNumber] === undefined) {
plan[event.blockNumber] = new Set();
// Get relevant block numbers to process (the ones with relevant events)
const eventsByBlock = events.reduce<
Record<number, ConditionalOrderCreatedEvent[]>
>((acc, event) => {
const events = acc[event.blockNumber];
if (events) {
events.push(event);
} else {
acc[event.blockNumber] = [event];
}

plan[event.blockNumber].add(event);
return acc;
}, {});

// Process blocks in order
for (const blockNumberKey of Object.keys(eventsByBlock).sort()) {
const blockNumber = Number(blockNumberKey);
await processBlockAndPersist({
context: this,
blockNumber,
events: eventsByBlock[blockNumber],
currentBlock,
log,
provider,
});
}

// Persist "toBlock" as the last block (even if there's no events, we are caught up until this block)
lastProcessedBlock = await persistLastProcessedBlock({
context: this,
block: await provider.getBlock(toBlock),
log,
});

// only possible string value for toBlock is 'latest'
if (typeof toBlock === "number") {
fromBlock = toBlock + 1;
}
} while (toBlock !== "latest" && toBlock !== currentBlock.number);

// Replay only the blocks that had some events.
for (const [blockNumber, events] of Object.entries(plan)) {
log.debug(`Processing block ${blockNumber}`);
const historicalBlock = await provider.getBlock(Number(blockNumber));
try {
await processBlock(
this,
historicalBlock,
events,
currentBlock.number,
currentBlock.timestamp
);

// Set the last processed block to this iteration's block number
this.registry.lastProcessedBlock =
blockToRegistryBlock(historicalBlock);
await this.registry.write();

// Set the block height metric
metrics.blockHeight
.labels(chainId.toString())
.set(Number(blockNumber));
} catch (err) {
log.error(`Error processing block ${blockNumber}`, err);
}

log.debug(`Block ${blockNumber} has been processed`);
}

// Set the last processed block to the current block number
this.registry.lastProcessedBlock = blockToRegistryBlock(currentBlock);

// Save the registry
await this.registry.write();

// It may have taken some time to process the blocks, so refresh the current block number
// and check if we are in sync
currentBlock = await provider.getBlock("latest");

// If we are in sync, let it be known
if (currentBlock.number === this.registry.lastProcessedBlock.number) {
const lastProcessedBlockNumber = lastProcessedBlock?.number || 0;
if (currentBlock.number === lastProcessedBlockNumber) {
this.sync = ChainSync.IN_SYNC;
} else {
// Otherwise, we need to keep processing blocks
fromBlock = this.registry.lastProcessedBlock.number + 1;
plan = {};
fromBlock = lastProcessedBlockNumber + 1;
}
} while (this.sync === ChainSync.SYNCING);

Expand All @@ -297,9 +286,7 @@ export class ChainContext {
oneShot ? "Chain watcher is in sync" : "Chain watcher is warmed up"
}`
);
log.debug(
`Last processed block: ${this.registry.lastProcessedBlock.number}`
);
log.debug(`Last processed block: ${lastProcessedBlock}`);

// If one-shot, return
if (oneShot) {
Expand All @@ -325,6 +312,7 @@ export class ChainContext {
provider.on("block", async (blockNumber: number) => {
try {
const block = await provider.getBlock(blockNumber);

log.debug(`New block ${blockNumber}`);

// Set the block time metric
Expand All @@ -350,18 +338,13 @@ export class ChainContext {
this
);

try {
await processBlock(this, block, events);

// Block height metric
this.registry.lastProcessedBlock = blockToRegistryBlock(block);
this.registry.write();
metrics.blockHeight.labels(chainId.toString()).set(blockNumber);
} catch {
log.error(`Error processing block ${blockNumber}`);
}

log.debug(`Block ${blockNumber} has been processed`);
await processBlockAndPersist({
context: this,
blockNumber,
events,
log,
provider,
});
} catch (error) {
log.error(
`Error in pollContractForEvents for block ${blockNumber}`,
Expand Down Expand Up @@ -505,6 +488,52 @@ async function processBlock(
}
}

async function persistLastProcessedBlock(params: {
context: ChainContext;
block: ethers.providers.Block;
log: LoggerWithMethods;
}) {
const { context, block, log } = params;
const blockNumber = block.number;

// Set the last processed block to the current block number
context.registry.lastProcessedBlock = blockToRegistryBlock(block);

// Save the registry
await context.registry.write();
log.debug(`Block ${blockNumber} has been processed`);

// Set the block height metric
metrics.blockHeight.labels(context.toString()).set(blockNumber);

return context.registry.lastProcessedBlock;
}

async function processBlockAndPersist(params: {
context: ChainContext;
blockNumber: number;
events: ConditionalOrderCreatedEvent[];
currentBlock?: providers.Block;
log: LoggerWithMethods;
provider: ethers.providers.Provider;
}) {
const { context, blockNumber, events, currentBlock, log, provider } = params;
const block = await provider.getBlock(blockNumber);
try {
await processBlock(
context,
block,
events,
currentBlock?.number,
currentBlock?.timestamp
);
} catch (err) {
log.error(`Error processing block ${block.number}`, err);
} finally {
return persistLastProcessedBlock({ context, block, log });
}
}

async function pollContractForEvents(
fromBlock: number,
toBlock: number | "latest",
Expand Down
6 changes: 0 additions & 6 deletions src/types/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import Slack = require("node-slack");

import { BytesLike, ethers } from "ethers";

import type { ConditionalOrderCreatedEvent } from "./generated/ComposableCoW";
import { ConditionalOrderParams, PollResult } from "@cowprotocol/cow-sdk";
import { DBService } from "../services";
import { metrics } from "../utils";
Expand All @@ -27,11 +26,6 @@ export interface ExecutionContext {
storage: DBService;
}

// Todo: This should also encompass `MerkleRootSet`
export interface ReplayPlan {
[key: number]: Set<ConditionalOrderCreatedEvent>;
}

/**
* A merkle proof is a set of parameters:
* - `merkleRoot`: the merkle root of the conditional order
Expand Down

0 comments on commit fd653a7

Please sign in to comment.