From b50d5b49e773818dcee9c378cc63d34499f23fdb Mon Sep 17 00:00:00 2001 From: Leandro Date: Wed, 9 Oct 2024 11:26:48 +0100 Subject: [PATCH] fix: attempt to fix memory leak (#161) # Description I noticed that since my last changes #159 and #160 , there has been a very visible memory leak, specially on arb1. ![image](https://github.com/user-attachments/assets/f447e5dd-c503-4eed-bce9-3c50771bdcbd) [graph link](https://g-0263500beb.grafana-workspace.eu-central-1.amazonaws.com/goto/sjeXFwkHg?orgId=1) While I have not been able to detect where the memory leak is, this PR has several improvements that might help. # Changes - [x] Remove a couple of unnecessary Array conversions, using iterators instead - [x] Use a single logger instance in the Registry class - [x] Move block skipping logic early, avoiding unnecessary RPC calls - [x] Reuse `block` instance if already queried ## How to test Run it locally: should work ![image](https://github.com/user-attachments/assets/f2583ed1-4ebf-4d20-a53d-86824d7c7ac2) ~I'll also run it on staging for awhile before taking this PR out of draft.~ can't apply it, pulumi is not happy with me. --- src/domain/polling/index.ts | 5 +- src/services/chain.ts | 100 ++++++++++++++++++++---------------- src/types/model.ts | 20 ++++---- 3 files changed, 69 insertions(+), 56 deletions(-) diff --git a/src/domain/polling/index.ts b/src/domain/polling/index.ts index 7b93eef..a575019 100644 --- a/src/domain/polling/index.ts +++ b/src/domain/polling/index.ts @@ -266,7 +266,7 @@ export async function checkForAndPlaceOrder( // It may be handy in other versions of the watch tower implemented in other languages // to not delete owners, so we can keep track of them. - for (const [owner, conditionalOrders] of Array.from(ownerOrders.entries())) { + for (const [owner, conditionalOrders] of ownerOrders) { if (conditionalOrders.size === 0) { ownerOrders.delete(owner); metrics.activeOwnersTotal.labels(chainId.toString()).dec(); @@ -293,7 +293,8 @@ function _deleteOrders( log: LoggerWithMethods, chainId: SupportedChainId ) { - log.debug(`${ordersPendingDelete.length} to delete`); + ordersPendingDelete.length && + log.debug(`${ordersPendingDelete.length} to delete`); for (const conditionalOrder of ordersPendingDelete) { const deleted = conditionalOrders.delete(conditionalOrder); diff --git a/src/services/chain.ts b/src/services/chain.ts index cb314cc..1abe59e 100644 --- a/src/services/chain.ts +++ b/src/services/chain.ts @@ -320,7 +320,17 @@ export class ChainContext { let lastBlockReceived = lastProcessedBlock; provider.on("block", async (blockNumber: number) => { try { + // Decide if we should process this block before polling for events + const shouldProcessBlock = + blockNumber % this.processEveryNumBlocks === 0; + + if (!shouldProcessBlock) { + log.debug(`Skipping block ${blockNumber}`); + return; + } + log.debug(`New block ${blockNumber}`); + const block = await provider.getBlock(blockNumber); // Set the block time metric @@ -348,6 +358,7 @@ export class ChainContext { await processBlockAndPersist({ context: this, + block, blockNumber, events, log, @@ -442,7 +453,7 @@ async function processBlock( blockNumberOverride?: number, blockTimestampOverride?: number ) { - const { provider, chainId, processEveryNumBlocks } = context; + const { provider, chainId } = context; const timer = metrics.processBlockDurationSeconds .labels(context.chainId.toString()) .startTimer(); @@ -471,31 +482,27 @@ async function processBlock( } } - // Decide if we should process this block - const shouldProcessBlock = block.number % processEveryNumBlocks === 0; - // Check programmatic orders and place orders if necessary - if (shouldProcessBlock) { - const result = await checkForAndPlaceOrder( - context, - block, - blockNumberOverride, - blockTimestampOverride - ) - .then(() => true) - .catch(() => { - hasErrors = true; - log.error(`Error running "checkForAndPlaceOrder" action`); - return false; - }); - log.debug( - `Result of "checkForAndPlaceOrder" action for block ${ - block.number - }: ${_formatResult(result)}` - ); - } + const result = await checkForAndPlaceOrder( + context, + block, + blockNumberOverride, + blockTimestampOverride + ) + .then(() => true) + .catch(() => { + hasErrors = true; + log.error(`Error running "checkForAndPlaceOrder" action`); + return false; + }); + log.debug( + `Result of "checkForAndPlaceOrder" action for block ${ + block.number + }: ${_formatResult(result)}` + ); timer(); + if (hasErrors) { throw new Error("Errors found in processing block"); } @@ -524,26 +531,31 @@ async function persistLastProcessedBlock(params: { async function processBlockAndPersist(params: { context: ChainContext; + block?: providers.Block; blockNumber: number; events: ConditionalOrderCreatedEvent[]; currentBlock?: providers.Block; log: LoggerWithMethods; provider: ethers.providers.Provider; }) { - const { context, blockNumber, events, currentBlock, log, provider } = params; - const block = await provider.getBlock(blockNumber); + const { context, block, blockNumber, events, currentBlock, log, provider } = + params; + + // Accept optional block object, in case it was already fetched + const _block = block || (await provider.getBlock(blockNumber)); + try { await processBlock( context, - block, + _block, events, currentBlock?.number, currentBlock?.timestamp ); } catch (err) { - log.error(`Error processing block ${block.number}`, err); + log.error(`Error processing block ${_block.number}`, err); } finally { - return persistLastProcessedBlock({ context, block, log }); + return persistLastProcessedBlock({ context, block: _block, log }); } } @@ -563,27 +575,25 @@ async function pollContractForEvents( topics: [topic], }); - return logs - .map((event) => { - try { - const decoded = composableCow.interface.decodeEventLog( - topic, - event.data, - event.topics - ) as unknown as ConditionalOrderCreatedEvent; + return logs.reduce((acc, event) => { + try { + const decoded = composableCow.interface.decodeEventLog( + topic, + event.data, + event.topics + ) as unknown as ConditionalOrderCreatedEvent; - return { + if (!addresses || addresses.includes(decoded.args.owner)) { + acc.push({ ...decoded, ...event, - }; - } catch { - return null; + }); } - }) - .filter((e): e is ConditionalOrderCreatedEvent => e !== null) - .filter((e): e is ConditionalOrderCreatedEvent => { - return addresses ? addresses.includes(e.args.owner) : true; - }); + } catch { + // Ignore errors and do not add to the accumulator + } + return acc; + }, []); } function _formatResult(result: boolean) { diff --git a/src/types/model.ts b/src/types/model.ts index e6fe803..adc2503 100644 --- a/src/types/model.ts +++ b/src/types/model.ts @@ -106,6 +106,7 @@ export class Registry { network: string; lastNotifiedError: Date | null; lastProcessedBlock: RegistryBlock | null; + readonly logger = getLogger("Registry"); /** * Instantiates a registry. @@ -189,7 +190,11 @@ export class Registry { } get numOrders(): number { - return Array.from(this.ownerOrders.values()).flatMap((o) => o).length; + let count = 0; + for (const orders of this.ownerOrders.values()) { + count += orders.size; // Count each set's size directly + } + return count; } /** @@ -241,13 +246,12 @@ export class Registry { // Write all atomically await batch.write(); - const log = getLogger( - `Registry:write:${this.version}:${this.network}:${ + this.logger.debug( + `write:${this.version}:${this.network}:${ this.lastProcessedBlock?.number - }:${this.lastNotifiedError || ""}` + }:${this.lastNotifiedError || ""}`, + "batch written 📝" ); - - log.debug("batch written 📝"); } public stringifyOrders(): string { @@ -300,9 +304,7 @@ async function loadOwnerOrders( ); // Parse conditional orders registry (for the persisted version, converting it to the last version) - const ownerOrders = parseConditionalOrders(!!str ? str : undefined, version); - - return ownerOrders; + return parseConditionalOrders(!!str ? str : undefined, version); } function parseConditionalOrders(