diff --git a/watcher/src/databases/BigtableDatabase.ts b/watcher/src/databases/BigtableDatabase.ts index 33244d9a..0e69b7cd 100644 --- a/watcher/src/databases/BigtableDatabase.ts +++ b/watcher/src/databases/BigtableDatabase.ts @@ -49,7 +49,6 @@ export class BigtableDatabase extends Database { this.signedVAAsTableId = assertEnvironmentVariable('BIGTABLE_SIGNED_VAAS_TABLE_ID'); this.vaasByTxHashTableId = assertEnvironmentVariable('BIGTABLE_VAAS_BY_TX_HASH_TABLE_ID'); this.instanceId = assertEnvironmentVariable('BIGTABLE_INSTANCE_ID'); - // TODO: make these const? this.latestCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_COLLECTION'); this.latestNTTCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_NTT_COLLECTION'); this.latestFTCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_FT_COLLECTION'); diff --git a/watcher/src/watchers/FTEVMWatcher.ts b/watcher/src/watchers/FTEVMWatcher.ts index af306672..1ba86934 100644 --- a/watcher/src/watchers/FTEVMWatcher.ts +++ b/watcher/src/watchers/FTEVMWatcher.ts @@ -11,7 +11,7 @@ import { MarketOrder } from '../fastTransfer/types'; import { Block } from './EVMWatcher'; import { BigNumber } from 'ethers'; import axios from 'axios'; - +import { sleep } from '@wormhole-foundation/wormhole-monitor-common'; export type BlockTag = 'finalized' | 'safe' | 'latest'; export class FTEVMWatcher extends Watcher { @@ -39,7 +39,6 @@ export class FTEVMWatcher extends Watcher { this.rpc = RPCS_BY_CHAIN[this.network][this.chain]!; this.parser = new TokenRouterParser(this.network, chain, this.provider); this.logger.debug('FTWatcher', network, chain, finalizedBlockTag); - // hacky way to not connect to the db in tests // this is to allow ci to run without a db if (isTest) { @@ -128,28 +127,58 @@ export class FTEVMWatcher extends Watcher { const { results, lastBlockTime } = await this.parser.getFTResultsInRange(fromBlock, toBlock); if (results.length) { - try { - await this.saveFastTransfers(results); - } catch (e) { - this.logger.error(e); - } + await this.saveFastTransfers(results, fromBlock, toBlock); } return makeBlockKey(toBlock.toString(), lastBlockTime.toString()); } - async saveFastTransfers(fastTransfers: MarketOrder[]): Promise { - // this is to allow ci to run without a db + // saves fast transfers in smaller batches to reduce the impact in any case anything fails + // retry with exponential backoff is used here + async saveFastTransfers( + fastTransfers: MarketOrder[], + fromBlock: number, + toBlock: number + ): Promise { if (!this.pg) { return; } - this.logger.debug(`saving ${fastTransfers.length} fast transfers`); - // Batch insert the fast transfers - try { - await this.pg('market_orders').insert(fastTransfers).onConflict('fast_vaa_id').merge(); - } catch (e) { - this.logger.error(`Error saving fast transfers ${e}`); + const batchSize = 50; + const maxRetries = 3; + const totalBatches = Math.ceil(fastTransfers.length / batchSize); + + this.logger.debug( + `Attempting to save ${fastTransfers.length} fast transfers in batches of ${batchSize}` + ); + + for (let batchIndex = 0; batchIndex < fastTransfers.length; batchIndex += batchSize) { + const batch = fastTransfers.slice(batchIndex, batchIndex + batchSize); + const batchNumber = Math.floor(batchIndex / batchSize) + 1; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + await this.pg('market_orders').insert(batch).onConflict('fast_vaa_id').merge(); + this.logger.info( + `Successfully saved batch ${batchNumber}/${totalBatches} (${batch.length} transfers)` + ); + break; + } catch (e) { + if (attempt === maxRetries) { + this.logger.error( + `Failed to save batch ${batchNumber}/${totalBatches} from block ${fromBlock} - ${toBlock} after ${maxRetries} attempts`, + e + ); + } else { + // Wait before retrying (exponential backoff) + this.logger.warn( + `Attempt ${attempt} failed for batch ${batchNumber}/${totalBatches}. Retrying...` + ); + await sleep(1000 * Math.pow(2, attempt - 1)); + } + } + } } + this.logger.info(`Completed saving fast transfers from block ${fromBlock} - ${toBlock}`); } } diff --git a/watcher/src/watchers/FTSolanaWatcher.ts b/watcher/src/watchers/FTSolanaWatcher.ts index eb1a38e0..f0b7828e 100644 --- a/watcher/src/watchers/FTSolanaWatcher.ts +++ b/watcher/src/watchers/FTSolanaWatcher.ts @@ -107,7 +107,6 @@ export class FTSolanaWatcher extends SolanaWatcher { }); } - // TODO: Modify this so watcher can actually call this function (Add enum for mode) async getFtMessagesForBlocks(fromSlot: number, toSlot: number): Promise { if (fromSlot > toSlot) throw new Error('solana: invalid block range'); diff --git a/watcher/src/watchers/Watcher.ts b/watcher/src/watchers/Watcher.ts index 4e33f9b0..b87b1a8f 100644 --- a/watcher/src/watchers/Watcher.ts +++ b/watcher/src/watchers/Watcher.ts @@ -23,10 +23,10 @@ export class Watcher { this.chain = chain; this.mode = mode; - // `vaa` -> '' + // `vaa` -> 'VAA_' // `ntt` -> 'NTT_' // `ft` -> 'FT_' - const loggerPrefix = mode === 'vaa' ? '' : mode.toUpperCase() + '_'; + const loggerPrefix = mode.toUpperCase() + '_'; this.logger = getLogger(loggerPrefix + chain); }