diff --git a/src/services/chain.ts b/src/services/chain.ts index 872a256..ef5fa29 100644 --- a/src/services/chain.ts +++ b/src/services/chain.ts @@ -1,6 +1,5 @@ import { Registry, - ReplayPlan, ConditionalOrderCreatedEvent, Multicall3, ComposableCoW, @@ -18,6 +17,7 @@ import { addContract } from "../domain/events"; import { checkForAndPlaceOrder } from "../domain/polling"; import { ethers, providers } from "ethers"; import { + LoggerWithMethods, composableCowContract, getLogger, isRunningInKubernetesPod, @@ -170,7 +170,7 @@ export class ChainContext { public async warmUp(oneShot?: boolean) { const { provider, chainId } = this; const log = getLogger("chainContext:warmUp", chainId.toString()); - const { lastProcessedBlock } = this.registry; + let { lastProcessedBlock } = this.registry; const { pageSize } = this; // Set the block height metric @@ -185,7 +185,6 @@ export class ChainContext { let currentBlock = await provider.getBlock("latest"); let printSyncInfo = true; // Print sync info only once - let plan: ReplayPlan = {}; let toBlock: "latest" | number = 0; do { do { @@ -228,67 +227,57 @@ export class ChainContext { log.debug(`Found ${events.length} events`); } - // process the events - for (const event of events) { - if (plan[event.blockNumber] === undefined) { - plan[event.blockNumber] = new Set(); + // Get relevant block numbers to process (the ones with relevant events) + const eventsByBlock = events.reduce< + Record + >((acc, event) => { + const events = acc[event.blockNumber]; + if (events) { + events.push(event); + } else { + acc[event.blockNumber] = [event]; } - plan[event.blockNumber].add(event); + return acc; + }, {}); + + // Process blocks in order + for (const blockNumberKey of Object.keys(eventsByBlock).sort()) { + const blockNumber = Number(blockNumberKey); + await processBlockAndPersist({ + context: this, + blockNumber, + events: eventsByBlock[blockNumber], + currentBlock, + log, + provider, + }); } + // Persist "toBlock" as the last block (even if there's no events, we are caught up until this block) + lastProcessedBlock = await persistLastProcessedBlock({ + context: this, + block: await provider.getBlock(toBlock), + log, + }); + // only possible string value for toBlock is 'latest' if (typeof toBlock === "number") { fromBlock = toBlock + 1; } } while (toBlock !== "latest" && toBlock !== currentBlock.number); - // Replay only the blocks that had some events. - for (const [blockNumber, events] of Object.entries(plan)) { - log.debug(`Processing block ${blockNumber}`); - const historicalBlock = await provider.getBlock(Number(blockNumber)); - try { - await processBlock( - this, - historicalBlock, - events, - currentBlock.number, - currentBlock.timestamp - ); - - // Set the last processed block to this iteration's block number - this.registry.lastProcessedBlock = - blockToRegistryBlock(historicalBlock); - await this.registry.write(); - - // Set the block height metric - metrics.blockHeight - .labels(chainId.toString()) - .set(Number(blockNumber)); - } catch (err) { - log.error(`Error processing block ${blockNumber}`, err); - } - - log.debug(`Block ${blockNumber} has been processed`); - } - - // Set the last processed block to the current block number - this.registry.lastProcessedBlock = blockToRegistryBlock(currentBlock); - - // Save the registry - await this.registry.write(); - // It may have taken some time to process the blocks, so refresh the current block number // and check if we are in sync currentBlock = await provider.getBlock("latest"); // If we are in sync, let it be known - if (currentBlock.number === this.registry.lastProcessedBlock.number) { + const lastProcessedBlockNumber = lastProcessedBlock?.number || 0; + if (currentBlock.number === lastProcessedBlockNumber) { this.sync = ChainSync.IN_SYNC; } else { // Otherwise, we need to keep processing blocks - fromBlock = this.registry.lastProcessedBlock.number + 1; - plan = {}; + fromBlock = lastProcessedBlockNumber + 1; } } while (this.sync === ChainSync.SYNCING); @@ -297,9 +286,7 @@ export class ChainContext { oneShot ? "Chain watcher is in sync" : "Chain watcher is warmed up" }` ); - log.debug( - `Last processed block: ${this.registry.lastProcessedBlock.number}` - ); + log.debug(`Last processed block: ${lastProcessedBlock}`); // If one-shot, return if (oneShot) { @@ -325,6 +312,7 @@ export class ChainContext { provider.on("block", async (blockNumber: number) => { try { const block = await provider.getBlock(blockNumber); + log.debug(`New block ${blockNumber}`); // Set the block time metric @@ -350,18 +338,13 @@ export class ChainContext { this ); - try { - await processBlock(this, block, events); - - // Block height metric - this.registry.lastProcessedBlock = blockToRegistryBlock(block); - this.registry.write(); - metrics.blockHeight.labels(chainId.toString()).set(blockNumber); - } catch { - log.error(`Error processing block ${blockNumber}`); - } - - log.debug(`Block ${blockNumber} has been processed`); + await processBlockAndPersist({ + context: this, + blockNumber, + events, + log, + provider, + }); } catch (error) { log.error( `Error in pollContractForEvents for block ${blockNumber}`, @@ -505,6 +488,52 @@ async function processBlock( } } +async function persistLastProcessedBlock(params: { + context: ChainContext; + block: ethers.providers.Block; + log: LoggerWithMethods; +}) { + const { context, block, log } = params; + const blockNumber = block.number; + + // Set the last processed block to the current block number + context.registry.lastProcessedBlock = blockToRegistryBlock(block); + + // Save the registry + await context.registry.write(); + log.debug(`Block ${blockNumber} has been processed`); + + // Set the block height metric + metrics.blockHeight.labels(context.toString()).set(blockNumber); + + return context.registry.lastProcessedBlock; +} + +async function processBlockAndPersist(params: { + context: ChainContext; + blockNumber: number; + events: ConditionalOrderCreatedEvent[]; + currentBlock?: providers.Block; + log: LoggerWithMethods; + provider: ethers.providers.Provider; +}) { + const { context, blockNumber, events, currentBlock, log, provider } = params; + const block = await provider.getBlock(blockNumber); + try { + await processBlock( + context, + block, + events, + currentBlock?.number, + currentBlock?.timestamp + ); + } catch (err) { + log.error(`Error processing block ${block.number}`, err); + } finally { + return persistLastProcessedBlock({ context, block, log }); + } +} + async function pollContractForEvents( fromBlock: number, toBlock: number | "latest", diff --git a/src/types/model.ts b/src/types/model.ts index 3820f37..6171d9d 100644 --- a/src/types/model.ts +++ b/src/types/model.ts @@ -2,7 +2,6 @@ import Slack = require("node-slack"); import { BytesLike, ethers } from "ethers"; -import type { ConditionalOrderCreatedEvent } from "./generated/ComposableCoW"; import { ConditionalOrderParams, PollResult } from "@cowprotocol/cow-sdk"; import { DBService } from "../services"; import { metrics } from "../utils"; @@ -27,11 +26,6 @@ export interface ExecutionContext { storage: DBService; } -// Todo: This should also encompass `MerkleRootSet` -export interface ReplayPlan { - [key: number]: Set; -} - /** * A merkle proof is a set of parameters: * - `merkleRoot`: the merkle root of the conditional order