diff --git a/src/domain/checkForAndPlaceOrder.ts b/src/domain/checkForAndPlaceOrder.ts index 36250cb..9945d51 100644 --- a/src/domain/checkForAndPlaceOrder.ts +++ b/src/domain/checkForAndPlaceOrder.ts @@ -5,8 +5,7 @@ import { computeOrderUid, } from "@cowprotocol/contracts"; -import { ethers } from "ethers"; -import { BytesLike } from "ethers/lib/utils"; +import { ethers, utils } from "ethers"; import { ConditionalOrder, OrderStatus } from "../types"; import { @@ -28,9 +27,13 @@ import { SigningScheme, SupportedChainId, formatEpoch, + COW_PROTOCOL_SETTLEMENT_CONTRACT_ADDRESS, } from "@cowprotocol/cow-sdk"; import { ChainContext, SDK_BACKOFF_NUM_OF_ATTEMPTS } from "./chainContext"; import { + pollingDurationSeconds, + pollingByOwnerDurationSeconds, + pollingPostProcessingDurationSeconds, pollingOnChainDurationSeconds, activeOrdersTotal, activeOwnersTotal, @@ -45,8 +48,6 @@ import { import { FilterAction } from "../utils/filterPolicy"; import { validateOrder } from "../utils/filterOrder"; -const GPV2SETTLEMENT = "0x9008D19f58AAbD9eD0D60971565AA8510560ab41"; - const HANDLED_RESULT_CODES = [ PollResultCode.SUCCESS, PollResultCode.TRY_AT_EPOCH, @@ -92,9 +93,7 @@ export async function checkForAndPlaceOrder( const blockNumber = blockNumberOverride || block.number; const blockTimestamp = blockTimestampOverride || block.timestamp; - let hasErrors = false; let ownerCounter = 0; - let orderCounter = 0; const log = getLogger( "checkForAndPlaceOrder:checkForAndPlaceOrder", @@ -103,159 +102,267 @@ export async function checkForAndPlaceOrder( ); log.debug(`Total number of orders: ${numOrders}`); - for (const [owner, conditionalOrders] of ownerOrders.entries()) { - ownerCounter++; - const log = getLogger( - "checkForAndPlaceOrder:checkForAndPlaceOrder", - chainId.toString(), - blockNumber.toString(), - ownerCounter.toString() - ); - const ordersPendingDelete = []; - // enumerate all the `ConditionalOrder`s for a given owner - log.debug(`Process owner ${owner} (${conditionalOrders.size} orders)`); - for (const conditionalOrder of conditionalOrders) { - orderCounter++; - const ownerRef = `${ownerCounter}.${orderCounter}`; - const orderRef = `${chainId}:${ownerRef}@${blockNumber}`; + const blockTimer = pollingDurationSeconds + .labels(chainId.toString(), blockNumber.toString()) + .startTimer(); + + // No guarantee is made that the order of the owners is the same over multiple blocks + const ownerPromises = Array.from(ownerOrders.entries()).map( + async ([owner, conditionalOrders]) => { + const ownerTimer = pollingByOwnerDurationSeconds + .labels(owner, chainId.toString(), blockNumber.toString()) + .startTimer(); const log = getLogger( "checkForAndPlaceOrder:checkForAndPlaceOrder", chainId.toString(), blockNumber.toString(), - ownerRef + (ownerCounter++).toString() ); - const logOrderDetails = `Processing order from TX ${conditionalOrder.tx} with params:`; - - const { result: lastHint } = conditionalOrder.pollResult || {}; - - // Apply filtering policy - if (filterPolicy) { - const filterResult = filterPolicy.preFilter({ - owner, - conditionalOrderParams: conditionalOrder.params, - }); - - switch (filterResult) { - case FilterAction.DROP: - log.debug("Dropping conditional order. Reason: AcceptPolicy: DROP"); - ordersPendingDelete.push(conditionalOrder); - - continue; - case FilterAction.SKIP: - log.debug("Skipping conditional order. Reason: AcceptPolicy: SKIP"); - continue; - } - } - // Check if the order is due (by epoch) - if ( - lastHint?.result === PollResultCode.TRY_AT_EPOCH && - blockTimestamp < lastHint.epoch - ) { - log.debug( - `Skipping conditional order. Reason: Not due yet (TRY_AT_EPOCH=${ - lastHint.epoch - }, ${formatEpoch(lastHint.epoch)}). ${logOrderDetails}`, - conditionalOrder.params - ); - continue; - } + let orderCounter = 0; + // enumerate all the `ConditionalOrder`s for a given owner + log.debug(`Process owner ${owner} (${conditionalOrders.size} orders)`); + const orderPromises = Array.from(conditionalOrders.values()).map( + async (order) => { + const ownerRef = `${ownerCounter}.${(orderCounter++).toString()}`; + const orderRef = `${chainId}:${ownerRef}@${blockNumber}`; + const log = getLogger( + "checkForAndPlaceOrder:checkForAndPlaceOrder", + chainId.toString(), + blockNumber.toString(), + ownerRef + ); + const logOrderDetails = `Processing order from TX ${order.tx} with params:`; + + const { result: lastHint } = order.pollResult || {}; + + // Apply filtering policy + if (filterPolicy) { + const filterResult = filterPolicy.preFilter({ + owner, + conditionalOrderParams: order.params, + }); + + switch (filterResult) { + case FilterAction.DROP: + log.debug( + "Dropping conditional order. Reason: AcceptPolicy: DROP" + ); + + return { + order, + _delete: true, + lastExecutionTimestamp: blockTimestamp, + blockNumber: blockNumber, + unexpectedError: false, + }; + + case FilterAction.SKIP: + log.debug( + "Skipping conditional order. Reason: AcceptPolicy: SKIP" + ); + + return; + } + } + + // Check if the order is due (by epoch) + if ( + lastHint?.result === PollResultCode.TRY_AT_EPOCH && + blockTimestamp < lastHint.epoch + ) { + log.debug( + `Skipping conditional order. Reason: Not due yet (TRY_AT_EPOCH=${ + lastHint.epoch + }, ${formatEpoch(lastHint.epoch)}). ${logOrderDetails}`, + order.params + ); + + return; + } + + // Check if the order is due (by blockNumber) + if ( + lastHint?.result === PollResultCode.TRY_ON_BLOCK && + blockNumber < lastHint.blockNumber + ) { + log.debug( + `Skipping conditional order. Reason: Not due yet (TRY_ON_BLOCK=${ + lastHint.blockNumber + }, in ${ + lastHint.blockNumber - blockNumber + } blocks). ${logOrderDetails}`, + order.params + ); + + return; + } + + // Proceed with the normal check + log.info(`${logOrderDetails}`, order.params); + + const returnValue = { + order, + pollResult: await _processConditionalOrder( + owner, + order, + blockTimestamp, + blockNumber, + context, + orderRef + ), + lastExecutionTimestamp: blockTimestamp, + blockNumber: blockNumber, + _delete: false, + unexpectedError: false, + }; + + const { result } = returnValue.pollResult; + + // Don't try again the same order, in case that's the poll result + if (result === PollResultCode.DONT_TRY_AGAIN) { + returnValue._delete = true; + } + + // Log the result + returnValue.unexpectedError = + result === PollResultCode.UNEXPECTED_ERROR; + + // Print the polling result + const resultDescription = + result + + (result !== PollResultCode.SUCCESS && returnValue.pollResult.reason + ? `. Reason: ${returnValue.pollResult.reason}` + : ""); + + log[returnValue.unexpectedError ? "error" : "info"]( + `Check conditional order result: ${getEmojiByPollResult( + result + )} ${resultDescription}` + ); + if (result === PollResultCode.UNEXPECTED_ERROR) { + log.error( + `UNEXPECTED_ERROR Details:`, + returnValue.pollResult.error + ); + } + + return returnValue; + } + ); - // Check if the order is due (by blockNumber) - if ( - lastHint?.result === PollResultCode.TRY_ON_BLOCK && - blockNumber < lastHint.blockNumber - ) { - log.debug( - `Skipping conditional order. Reason: Not due yet (TRY_ON_BLOCK=${ - lastHint.blockNumber - }, in ${ - lastHint.blockNumber - blockNumber - } blocks). ${logOrderDetails}`, - conditionalOrder.params - ); - continue; - } + // Get all the results and filter out the undefined ones + const results = (await Promise.all(orderPromises)).filter((r) => !!r); - // Proceed with the normal check - log.info(`${logOrderDetails}`, conditionalOrder.params); + // Stop the timer + ownerTimer(); - const pollResult = await _processConditionalOrder( + return { owner, - conditionalOrder, - blockTimestamp, - blockNumber, - context, - orderRef - ); + results, + }; + } + ); - // Don't try again the same order, in case that's the poll result - if (pollResult.result === PollResultCode.DONT_TRY_AGAIN) { - ordersPendingDelete.push(conditionalOrder); - } + // Get all the results + const ownerResults = await Promise.all(ownerPromises); - // Save the latest poll result - conditionalOrder.pollResult = { - lastExecutionTimestamp: blockTimestamp, - blockNumber: blockNumber, + // Stop the timer + blockTimer(); - result: pollResult, - }; + // Start the post-processing timer + const postProcessingTimer = pollingPostProcessingDurationSeconds + .labels(chainId.toString(), blockNumber.toString()) + .startTimer(); - // Log the result - const unexpectedError = - pollResult?.result === PollResultCode.UNEXPECTED_ERROR; + // Now that we have all the results, we can update the registry synchronously + let hasErrors = false; - // Print the polling result - const isError = pollResult.result !== PollResultCode.SUCCESS; - const resultDescription = - pollResult.result + - (isError && pollResult.reason ? `. Reason: ${pollResult.reason}` : ""); + // Process all the orders. We do this in a try/catch so that we can apply the + // post-processing timer. + try { + for (const { owner, results } of ownerResults) { + const conditionalOrders = ownerOrders.get(owner); - log[unexpectedError ? "error" : "info"]( - `Check conditional order result: ${getEmojiByPollResult( - pollResult?.result - )} ${resultDescription}` - ); - if (unexpectedError) { - log.error(`UNEXPECTED_ERROR Details:`, pollResult.error); + if (conditionalOrders === undefined && results.length > 0) { + throw new Error( + "Unexpected error: conditionalOrders is undefined but results is not empty" + ); + } else if (conditionalOrders) { + // Process all the orders + for (const result of results) { + if (!result) { + throw new Error("Unexpected error: orderResult is undefined"); + } + + const { order, _delete, unexpectedError } = result; + + if (unexpectedError) { + hasErrors = true; + } + + // First calculate the `conditionalOrderId` from the `ConditionalOrder` params + const id = ConditionalOrderSDK.leafToId(order.params); + + // Search for the order in the registry and update / delete it + for (const o of Array.from(conditionalOrders.values())) { + if (ConditionalOrderSDK.leafToId(o.params) === id) { + // Delete the order if it was marked for deletion + if (_delete) { + log.debug(`Delete order ${order.tx}`); + conditionalOrders.delete(o); + + // Decrement the total number of orders + activeOrdersTotal.labels(chainId.toString()).dec(); + continue; + } + + // Otherwise, update the order + conditionalOrders.delete(o); + conditionalOrders.add(order); + } + } + } + + // Update the registry + registry.ownerOrders.set(owner, conditionalOrders); } + } - hasErrors ||= unexpectedError; + // It may be handy in other versions of the watch tower implemented in other languages + // to not delete owners, so we can keep track of them. + for (const [owner, conditionalOrders] of Array.from( + ownerOrders.entries() + )) { + if (conditionalOrders.size === 0) { + ownerOrders.delete(owner); + activeOwnersTotal.labels(chainId.toString()).dec(); + } } - // Delete orders we don't want to keep watching - for (const conditionalOrder of ordersPendingDelete) { - const deleted = conditionalOrders.delete(conditionalOrder); - const action = deleted ? "Stop Watching" : "Failed to stop watching"; + // save the registry - don't catch errors here, as it's now a docker container + // and we want to crash if there's an error + await registry.write(); - log.debug(`${action} conditional order from TX ${conditionalOrder.tx}`); - activeOrdersTotal.labels(chainId.toString()).dec(); - } - } + log.debug( + `Total orders after processing all conditional orders: ${registry.numOrders}` + ); - // It may be handy in other versions of the watch tower implemented in other languages - // to not delete owners, so we can keep track of them. - for (const [owner, conditionalOrders] of Array.from(ownerOrders.entries())) { - if (conditionalOrders.size === 0) { - ownerOrders.delete(owner); - activeOwnersTotal.labels(chainId.toString()).dec(); + // Throw execution error if there was at least one error + if (hasErrors) { + throw Error( + `At least one unexpected error processing conditional orders` + ); } + } catch (e: any) { + postProcessingTimer(); + throw e; } - // save the registry - don't catch errors here, as it's now a docker container - // and we want to crash if there's an error - await registry.write(); - - log.debug( - `Total orders after processing all conditional orders: ${registry.numOrders}` - ); - - // Throw execution error if there was at least one error - if (hasErrors) { - throw Error(`At least one unexpected error processing conditional orders`); - } + // Stop the timer + postProcessingTimer(); } + async function _processConditionalOrder( owner: string, conditionalOrder: ConditionalOrder, @@ -410,7 +517,7 @@ function _getOrderUid( name: "Gnosis Protocol", version: "v2", chainId: chainId, - verifyingContract: GPV2SETTLEMENT, + verifyingContract: COW_PROTOCOL_SETTLEMENT_CONTRACT_ADDRESS[chainId], }, { ...orderToSubmit, @@ -428,7 +535,9 @@ function _getOrderUid( * * @param orders All the orders that are being tracked */ -export const _printUnfilledOrders = (orders: Map) => { +export const _printUnfilledOrders = ( + orders: Map +) => { const unfilledOrders = Array.from(orders.entries()) .filter(([_orderUid, status]) => status === OrderStatus.SUBMITTED) // as SUBMITTED != FILLED .map(([orderUid, _status]) => orderUid) diff --git a/src/utils/metrics.ts b/src/utils/metrics.ts index 853094a..3d5293d 100644 --- a/src/utils/metrics.ts +++ b/src/utils/metrics.ts @@ -141,6 +141,24 @@ export const pollingRunsTotal = new client.Counter({ labelNames: ["chain_id", "handler", "owner", "id"], }); +export const pollingDurationSeconds = new client.Histogram({ + name: "watch_tower_polling_duration_seconds", + help: "Duration of polling run", + labelNames: ["chain_id", "block"], +}); + +export const pollingByOwnerDurationSeconds = new client.Histogram({ + name: "watch_tower_polling_by_owner_duration_seconds", + help: "Duration of polling run", + labelNames: ["chain_id", "block", "owner"], +}); + +export const pollingPostProcessingDurationSeconds = new client.Histogram({ + name: "watch_tower_polling_post_processing_duration_seconds", + help: "Duration of polling post processing", + labelNames: ["chain_id", "block"], +}); + export const pollingOnChainChecksTotal = new client.Counter({ name: "watch_tower_polling_onchain_checks_total", help: "Total number of on-chain hint checks",