Skip to content

Commit

Permalink
ft_watcher: PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: bingyuyap <[email protected]>
  • Loading branch information
bingyuyap authored and evan-gray committed Aug 4, 2024
1 parent 42d1808 commit ece11a5
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 19 deletions.
1 change: 0 additions & 1 deletion watcher/src/databases/BigtableDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export class BigtableDatabase extends Database {
this.signedVAAsTableId = assertEnvironmentVariable('BIGTABLE_SIGNED_VAAS_TABLE_ID');
this.vaasByTxHashTableId = assertEnvironmentVariable('BIGTABLE_VAAS_BY_TX_HASH_TABLE_ID');
this.instanceId = assertEnvironmentVariable('BIGTABLE_INSTANCE_ID');
// TODO: make these const?
this.latestCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_COLLECTION');
this.latestNTTCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_NTT_COLLECTION');
this.latestFTCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_FT_COLLECTION');
Expand Down
59 changes: 44 additions & 15 deletions watcher/src/watchers/FTEVMWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { MarketOrder } from '../fastTransfer/types';
import { Block } from './EVMWatcher';
import { BigNumber } from 'ethers';
import axios from 'axios';

import { sleep } from '@wormhole-foundation/wormhole-monitor-common';
export type BlockTag = 'finalized' | 'safe' | 'latest';

export class FTEVMWatcher extends Watcher {
Expand Down Expand Up @@ -39,7 +39,6 @@ export class FTEVMWatcher extends Watcher {
this.rpc = RPCS_BY_CHAIN[this.network][this.chain]!;
this.parser = new TokenRouterParser(this.network, chain, this.provider);
this.logger.debug('FTWatcher', network, chain, finalizedBlockTag);

// hacky way to not connect to the db in tests
// this is to allow ci to run without a db
if (isTest) {
Expand Down Expand Up @@ -128,28 +127,58 @@ export class FTEVMWatcher extends Watcher {
const { results, lastBlockTime } = await this.parser.getFTResultsInRange(fromBlock, toBlock);

if (results.length) {
try {
await this.saveFastTransfers(results);
} catch (e) {
this.logger.error(e);
}
await this.saveFastTransfers(results, fromBlock, toBlock);
}
return makeBlockKey(toBlock.toString(), lastBlockTime.toString());
}

async saveFastTransfers(fastTransfers: MarketOrder[]): Promise<void> {
// this is to allow ci to run without a db
// saves fast transfers in smaller batches to reduce the impact in any case anything fails
// retry with exponential backoff is used here
async saveFastTransfers(
fastTransfers: MarketOrder[],
fromBlock: number,
toBlock: number
): Promise<void> {
if (!this.pg) {
return;
}
this.logger.debug(`saving ${fastTransfers.length} fast transfers`);

// Batch insert the fast transfers
try {
await this.pg('market_orders').insert(fastTransfers).onConflict('fast_vaa_id').merge();
} catch (e) {
this.logger.error(`Error saving fast transfers ${e}`);
const batchSize = 50;
const maxRetries = 3;
const totalBatches = Math.ceil(fastTransfers.length / batchSize);

this.logger.debug(
`Attempting to save ${fastTransfers.length} fast transfers in batches of ${batchSize}`
);

for (let batchIndex = 0; batchIndex < fastTransfers.length; batchIndex += batchSize) {
const batch = fastTransfers.slice(batchIndex, batchIndex + batchSize);
const batchNumber = Math.floor(batchIndex / batchSize) + 1;

for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await this.pg('market_orders').insert(batch).onConflict('fast_vaa_id').merge();
this.logger.info(
`Successfully saved batch ${batchNumber}/${totalBatches} (${batch.length} transfers)`
);
break;
} catch (e) {
if (attempt === maxRetries) {
this.logger.error(
`Failed to save batch ${batchNumber}/${totalBatches} from block ${fromBlock} - ${toBlock} after ${maxRetries} attempts`,
e
);
} else {
// Wait before retrying (exponential backoff)
this.logger.warn(
`Attempt ${attempt} failed for batch ${batchNumber}/${totalBatches}. Retrying...`
);
await sleep(1000 * Math.pow(2, attempt - 1));
}
}
}
}
this.logger.info(`Completed saving fast transfers from block ${fromBlock} - ${toBlock}`);
}
}

Expand Down
1 change: 0 additions & 1 deletion watcher/src/watchers/FTSolanaWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ export class FTSolanaWatcher extends SolanaWatcher {
});
}

// TODO: Modify this so watcher can actually call this function (Add enum for mode)
async getFtMessagesForBlocks(fromSlot: number, toSlot: number): Promise<string> {
if (fromSlot > toSlot) throw new Error('solana: invalid block range');

Expand Down
4 changes: 2 additions & 2 deletions watcher/src/watchers/Watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ export class Watcher {
this.chain = chain;
this.mode = mode;

// `vaa` -> ''
// `vaa` -> 'VAA_'
// `ntt` -> 'NTT_'
// `ft` -> 'FT_'
const loggerPrefix = mode === 'vaa' ? '' : mode.toUpperCase() + '_';
const loggerPrefix = mode.toUpperCase() + '_';
this.logger = getLogger(loggerPrefix + chain);
}

Expand Down

0 comments on commit ece11a5

Please sign in to comment.