diff --git a/bots/package-lock.json b/bots/package-lock.json index e6a7dce8..f69c0050 100644 --- a/bots/package-lock.json +++ b/bots/package-lock.json @@ -10,7 +10,7 @@ "license": "MIT", "dependencies": { "@initia/builder.js": "^0.0.9", - "@initia/initia.js": "^0.1.15", + "@initia/initia.js": "^0.1.19", "@koa/cors": "^4.0.0", "@sentry/node": "^7.60.0", "@types/bluebird": "^3.5.38", @@ -1129,11 +1129,11 @@ } }, "node_modules/@initia/initia.js": { - "version": "0.1.15", - "resolved": "https://registry.npmjs.org/@initia/initia.js/-/initia.js-0.1.15.tgz", - "integrity": "sha512-EKNXGe7D7shd1DiRJR//zkEpRN2ijZBN3WJyTNZ0nnNpTJVAl2BWDLA1uxfQTu62PqkBGIkO8/DjjEtHRCqPlA==", + "version": "0.1.19", + "resolved": "https://registry.npmjs.org/@initia/initia.js/-/initia.js-0.1.19.tgz", + "integrity": "sha512-OKehNvBE4hbeM4HbqOEucb4IHWSOg8S9GnvcmItHqIhqKcDiX6ZkzthqdZz2q9E1xkiUHUckEbwkcZC/gf473g==", "dependencies": { - "@initia/initia.proto": "^0.1.13", + "@initia/initia.proto": "^0.1.18", "@initia/opinit.proto": "^0.0.1", "@ledgerhq/hw-transport": "^6.27.12", "@ledgerhq/hw-transport-webhid": "^6.27.12", @@ -1164,9 +1164,9 @@ } }, "node_modules/@initia/initia.proto": { - "version": "0.1.13", - "resolved": "https://registry.npmjs.org/@initia/initia.proto/-/initia.proto-0.1.13.tgz", - "integrity": "sha512-pJx/ny6J1rcZI0NSlrYx2tkzcXUnbPMXeYedcTxxnvz3LOY6Phe/1wW6c8qPJzmhBsZmLes+Uy6FeytSRotekw==", + "version": "0.1.18", + "resolved": "https://registry.npmjs.org/@initia/initia.proto/-/initia.proto-0.1.18.tgz", + "integrity": "sha512-98hSjstgjjzfasHoGIixWP0DiC1kNONyfpNCZxQ21DJAmK6wn1L/Ae51tF5N79J4UAstwTTXkmKrPDRGla9fJA==", "dependencies": { "@improbable-eng/grpc-web": "^0.15.0", "google-protobuf": "^3.21.0", @@ -17874,11 +17874,11 @@ } }, "@initia/initia.js": { - "version": "0.1.15", - "resolved": "https://registry.npmjs.org/@initia/initia.js/-/initia.js-0.1.15.tgz", - "integrity": "sha512-EKNXGe7D7shd1DiRJR//zkEpRN2ijZBN3WJyTNZ0nnNpTJVAl2BWDLA1uxfQTu62PqkBGIkO8/DjjEtHRCqPlA==", + "version": "0.1.19", + "resolved": "https://registry.npmjs.org/@initia/initia.js/-/initia.js-0.1.19.tgz", + "integrity": "sha512-OKehNvBE4hbeM4HbqOEucb4IHWSOg8S9GnvcmItHqIhqKcDiX6ZkzthqdZz2q9E1xkiUHUckEbwkcZC/gf473g==", "requires": { - "@initia/initia.proto": "^0.1.13", + "@initia/initia.proto": "^0.1.18", "@initia/opinit.proto": "^0.0.1", "@ledgerhq/hw-transport": "^6.27.12", "@ledgerhq/hw-transport-webhid": "^6.27.12", @@ -17908,9 +17908,9 @@ } }, "@initia/initia.proto": { - "version": "0.1.13", - "resolved": "https://registry.npmjs.org/@initia/initia.proto/-/initia.proto-0.1.13.tgz", - "integrity": "sha512-pJx/ny6J1rcZI0NSlrYx2tkzcXUnbPMXeYedcTxxnvz3LOY6Phe/1wW6c8qPJzmhBsZmLes+Uy6FeytSRotekw==", + "version": "0.1.18", + "resolved": "https://registry.npmjs.org/@initia/initia.proto/-/initia.proto-0.1.18.tgz", + "integrity": "sha512-98hSjstgjjzfasHoGIixWP0DiC1kNONyfpNCZxQ21DJAmK6wn1L/Ae51tF5N79J4UAstwTTXkmKrPDRGla9fJA==", "requires": { "@improbable-eng/grpc-web": "^0.15.0", "google-protobuf": "^3.21.0", diff --git a/bots/src/config.ts b/bots/src/config.ts index ae6e697d..a06d28d4 100644 --- a/bots/src/config.ts +++ b/bots/src/config.ts @@ -3,10 +3,10 @@ import { LCDClient } from '@initia/initia.js'; interface ConfigInterface { EXECUTOR_PORT: number; BATCH_PORT: number; - L1_LCD_URI: string; - L1_RPC_URI: string; - L2_LCD_URI: string; - L2_RPC_URI: string; + L1_LCD_URI: string[]; + L1_RPC_URI: string[]; + L2_LCD_URI: string[]; + L2_RPC_URI: string[]; EXECUTOR_URI: string; // only for test BRIDGE_ID: number; OUTPUT_SUBMITTER_MNEMONIC: string; @@ -27,7 +27,7 @@ const defaultConfig = { L1_RPC_URI: 'https://stone-rpc.initia.tech', L2_LCD_URI: 'https://minitia-rest.initia.tech', L2_RPC_URI: 'https://minitia-rpc.initia.tech', - EXECUTOR_URI: 'https://minitia-executor.initia.tech', + EXECUTOR_URI: 'https://minitia-executor.initia.tech', BRIDGE_ID: '', OUTPUT_SUBMITTER_MNEMONIC: '', EXECUTOR_ADDR: '', @@ -42,10 +42,10 @@ export class Config implements ConfigInterface { EXECUTOR_PORT: number; BATCH_PORT: number; - L1_LCD_URI: string; - L1_RPC_URI: string; - L2_LCD_URI: string; - L2_RPC_URI: string; + L1_LCD_URI: string[]; + L1_RPC_URI: string[]; + L2_LCD_URI: string[]; + L2_RPC_URI: string[]; EXECUTOR_URI: string; BRIDGE_ID: number; OUTPUT_SUBMITTER_MNEMONIC: string; @@ -78,10 +78,10 @@ export class Config implements ConfigInterface { this.EXECUTOR_PORT = parseInt(EXECUTOR_PORT); this.BATCH_PORT = parseInt(BATCH_PORT); - this.L1_LCD_URI = L1_LCD_URI; - this.L1_RPC_URI = L1_RPC_URI; - this.L2_LCD_URI = L2_LCD_URI; - this.L2_RPC_URI = L2_RPC_URI; + this.L1_LCD_URI = L1_LCD_URI.split(','); + this.L1_RPC_URI = L1_RPC_URI.split(','); + this.L2_LCD_URI = L2_LCD_URI.split(','); + this.L2_RPC_URI = L2_RPC_URI.split(','); this.EXECUTOR_URI = EXECUTOR_URI; this.BRIDGE_ID = parseInt(BRIDGE_ID); this.OUTPUT_SUBMITTER_MNEMONIC = OUTPUT_SUBMITTER_MNEMONIC; diff --git a/bots/src/lib/query.ts b/bots/src/lib/query.ts index b85dd630..d3e099e7 100644 --- a/bots/src/lib/query.ts +++ b/bots/src/lib/query.ts @@ -15,7 +15,9 @@ const config = getConfig(); export async function getLastOutputInfo( bridgeId: number ): Promise { - const [outputInfos, pagination] = await config.l1lcd.ophost.outputInfos(bridgeId); + const [outputInfos, pagination] = await config.l1lcd.ophost.outputInfos( + bridgeId + ); if (outputInfos.length === 0) return null; return await config.l1lcd.ophost.outputInfo(bridgeId, pagination.total); } diff --git a/bots/src/lib/rpc.ts b/bots/src/lib/rpc.ts index 03e739da..46b01c90 100644 --- a/bots/src/lib/rpc.ts +++ b/bots/src/lib/rpc.ts @@ -1,9 +1,6 @@ import * as winston from 'winston'; import axios, { AxiosRequestConfig } from 'axios'; import * as Websocket from 'ws'; -import { getConfig } from 'config'; - -const config = getConfig(); export class RPCSocket { public ws: Websocket; @@ -14,9 +11,20 @@ export class RPCSocket { public updateTimer: NodeJS.Timer; public latestHeight?: number; logger: winston.Logger; - - constructor(rpcUrl: string, public interval: number, logger: winston.Logger) { - this.wsUrl = rpcUrl.replace('http', 'ws') + '/websocket'; + rpcUrl: string; + curRPCUrlIndex: number; + + constructor( + public rpcUrls: string[], + public interval: number, + logger: winston.Logger + ) { + if (this.rpcUrls.length === 0) { + throw new Error('RPC URLs list cannot be empty'); + } + this.curRPCUrlIndex = 0; + this.rpcUrl = this.rpcUrls[this.curRPCUrlIndex]; + this.wsUrl = this.rpcUrl.replace('http', 'ws') + '/websocket'; this.logger = logger; } @@ -25,6 +33,13 @@ export class RPCSocket { this.updateTimer = setTimeout(() => this.tick(), this.interval); } + public rotateRPC() { + this.curRPCUrlIndex = (this.curRPCUrlIndex + 1) % this.rpcUrls.length; + this.rpcUrl = this.rpcUrls[this.curRPCUrlIndex]; + this.wsUrl = this.rpcUrl.replace('http', 'ws') + '/websocket'; + this.logger.info(`Rotate RPC to ${this.rpcUrl}`); + } + public stop(): void { if (this.ws) this.ws.terminate(); } @@ -76,7 +91,7 @@ export class RPCSocket { this.ws.on('close', (code, reason) => this.onDisconnect(code, reason.toString()) ); - this.ws.on('error', (err) => this.onError(err)); + this.ws.on('error', (error) => this.onError(error)); this.ws.on('message', async (raw) => await this.onRawData(raw)); this.ws.on('ping', () => this.ws.pong()); this.ws.on('pong', () => this.alive()); @@ -104,6 +119,7 @@ export class RPCSocket { } protected onDisconnect(code: number, reason: string): void { + this.rotateRPC(); this.logger.info( `${this.constructor.name}: websocket disconnected (${code}: ${reason})` ); @@ -122,7 +138,7 @@ export class RPCSocket { try { data = JSON.parse(raw); - } catch (err) { + } catch (error) { this.logger.error(`${this.constructor.name}: JSON parse error ${raw}`); return; } @@ -133,74 +149,138 @@ export class RPCSocket { data['result']?.['data']?.['value']['block']['header']['height'] ); } - } catch (err) { - this.logger.error(err); + } catch (error) { + this.logger.error(error); } this.alive(); } } -async function getRequest( - rpc: string, - path: string, - params?: Record -): Promise { - const options: AxiosRequestConfig = { - headers: { - 'Content-Type': 'application/json', - 'User-Agent': 'initia-rollup' +export class RPCClient { + private curRPCUrlIndex = 0; + private rpcUrl: string; + + constructor(public rpcUrls: string[], public logger: winston.Logger) { + if (this.rpcUrls.length === 0) { + throw new Error('RPC URLs list cannot be empty'); } - }; + this.curRPCUrlIndex = 0; + this.rpcUrl = this.rpcUrls[this.curRPCUrlIndex]; + } - let url = `${rpc}${path}`; - params && - Object.keys(params).forEach( - (key) => params[key] === undefined && delete params[key] - ); - const qs = new URLSearchParams(params as any).toString(); - if (qs.length) { - url += `?${qs}`; + public rotateRPC() { + this.curRPCUrlIndex = (this.curRPCUrlIndex + 1) % this.rpcUrls.length; + this.rpcUrl = this.rpcUrls[this.curRPCUrlIndex]; + this.logger.info(`Rotate RPC to ${this.rpcUrl}`); } - try { - const response = await axios.get(url, options); + async getRequest( + path: string, + params?: Record + ): Promise { + const options: AxiosRequestConfig = { + headers: { + 'Content-Type': 'application/json', + 'User-Agent': 'initia-rollup' + } + }; - if (response.status !== 200) { - throw new Error(`Invalid status code: ${response.status}`); + let url = `${this.rpcUrl}${path}`; + params && + Object.keys(params).forEach( + (key) => params[key] === undefined && delete params[key] + ); + const qs = new URLSearchParams(params as any).toString(); + if (qs.length) { + url += `?${qs}`; } - const data = response.data; - if (!data || typeof data.jsonrpc !== 'string') { - throw new Error('Failed to query RPC'); + try { + const response = await axios.get(url, options); + + if (response.status !== 200) { + throw new Error(`Invalid status code: ${response.status}`); + } + + const data = response.data; + if (!data || typeof data.jsonrpc !== 'string') { + throw new Error('Failed to query RPC'); + } + + return data.result; + } catch (e) { + throw new Error(`RPC request to ${url} failed by ${e}`); } + } - return data.result; - } catch (err) { - throw new Error(`RPC request to ${url} failed by ${err}`); + async getBlockchain( + min_height: number, + max_height: number + ): Promise { + const blockchainResult: Blockchain = await this.getRequest(`/blockchain`, { + minHeight: min_height.toString(), + maxHeight: max_height.toString() + }); + + if (!blockchainResult) { + this.logger.error('failed get blockchain from rpc'); + return null; + } + + return blockchainResult; } -} -export interface BlockBulk { - blocks: string[]; -} + async getBlockBulk(start: string, end: string): Promise { + const blockBulksResult: BlockBulk = await this.getRequest(`/block_bulk`, { + start, + end + }); + + if (!blockBulksResult) { + this.logger.error('failed get block bulks from rpc'); + return null; + } + + return blockBulksResult; + } + + async lookupInvalidBlock(): Promise { + const invalidBlockResult: InvalidBlock = await this.getRequest( + `/invalid_block` + ); + + if (invalidBlockResult.reason !== '' && invalidBlockResult.height !== '0') { + return invalidBlockResult; + } -export async function getBlockBulk( - start: string, - end: string -): Promise { - const blockBulksResult: BlockBulk = await getRequest( - config.L2_RPC_URI, - `/block_bulk`, - { start, end } - ); - - if (!blockBulksResult) { - this.logger.error('failed get block bulks from rpc'); return null; } - return blockBulksResult; + async getLatestBlockHeight(): Promise { + const abciInfo: ABCIInfo = await this.getRequest(`/abci_info`); + + if (abciInfo) { + return parseInt(abciInfo.last_block_height); + } + + throw new Error(`failed to get latest block height`); + } +} + +export interface Blockchain { + last_height: string; + block_metas: BlockMeta[]; +} + +export interface BlockMeta { + block_id: any; + block_size: string; + header: any; + num_txs: string; +} +export interface BlockBulk { + blocks: string[]; } interface InvalidBlock { @@ -208,21 +288,9 @@ interface InvalidBlock { height: string; } -/** - * Lookup invalid block on chain and return the response. - * Return null if there is no invalid block. - */ -export async function lookupInvalidBlock( - rpcUrl: string -): Promise { - const invalidBlockResult: InvalidBlock = await getRequest( - rpcUrl, - `/invalid_block` - ); - - if (invalidBlockResult.reason !== '' && invalidBlockResult.height !== '0') { - return invalidBlockResult; - } - - return null; +interface ABCIInfo { + data: string; + version: string; + last_block_height: string; + last_block_app_hash: string; } diff --git a/bots/src/scripts/setupL2.ts b/bots/src/scripts/setupL2.ts index e85f6854..9449f814 100644 --- a/bots/src/scripts/setupL2.ts +++ b/bots/src/scripts/setupL2.ts @@ -4,9 +4,9 @@ import { getConfig } from 'config'; import { executor, challenger, outputSubmitter } from 'test/utils/helper'; const config = getConfig(); -const SUBMISSION_INTERVAL = 10; -const FINALIZED_TIME = 10; -const IBC_METADATA = 'ibc_channel'; +const SUBMISSION_INTERVAL = 3600; // 1 hour +const FINALIZED_TIME = 3600; // 1 hour +const IBC_METADATA = 'channel-3'; class L2Initializer { l2id = config.BRIDGE_ID; diff --git a/bots/src/test/integration.ts b/bots/src/test/integration.ts index 355d3552..633047b6 100644 --- a/bots/src/test/integration.ts +++ b/bots/src/test/integration.ts @@ -26,11 +26,7 @@ async function setupBridge(submissionInterval: number, finalizedTime: number) { async function startBot() { try { - await Promise.all([ - startBatch(), - startExecutor(), - startOutput() - ]); + await Promise.all([startBatch(), startExecutor(), startOutput()]); } catch (err) { console.log(err); } diff --git a/bots/src/worker/batchSubmitter/batchSubmitter.ts b/bots/src/worker/batchSubmitter/batchSubmitter.ts index 354556ac..c987b276 100644 --- a/bots/src/worker/batchSubmitter/batchSubmitter.ts +++ b/bots/src/worker/batchSubmitter/batchSubmitter.ts @@ -1,7 +1,7 @@ import { getDB } from './db'; import { DataSource, EntityManager } from 'typeorm'; -import { batchLogger as logger } from 'lib/logger'; -import { BlockBulk, getBlockBulk } from 'lib/rpc'; +import { batchLogger, batchLogger as logger } from 'lib/logger'; +import { BlockBulk, RPCClient } from 'lib/rpc'; import { compressor } from 'lib/compressor'; import { ExecutorOutputEntity, RecordEntity } from 'orm'; import { Wallet, MnemonicKey, MsgRecordBatch } from '@initia/initia.js'; @@ -19,14 +19,17 @@ export class BatchSubmitter { private submitter: Wallet; private bridgeId: number; private isRunning = false; + private rpcClient: RPCClient; helper: MonitorHelper = new MonitorHelper(); async init() { [this.db] = getDB(); + this.rpcClient = new RPCClient(config.L1_RPC_URI, batchLogger); this.submitter = new Wallet( config.l1lcd, new MnemonicKey({ mnemonic: config.BATCH_SUBMITTER_MNEMONIC }) ); + this.bridgeId = config.BRIDGE_ID; this.isRunning = true; } @@ -81,7 +84,7 @@ export class BatchSubmitter { // Get [start, end] batch from L2 async getBatch(start: number, end: number): Promise { - const bulk: BlockBulk | null = await getBlockBulk( + const bulk: BlockBulk | null = await this.rpcClient.getBlockBulk( start.toString(), end.toString() ); diff --git a/bots/src/worker/bridgeExecutor/L1Monitor.ts b/bots/src/worker/bridgeExecutor/L1Monitor.ts index 1d94b49e..801ed95f 100644 --- a/bots/src/worker/bridgeExecutor/L1Monitor.ts +++ b/bots/src/worker/bridgeExecutor/L1Monitor.ts @@ -12,7 +12,7 @@ import { ExecutorOutputEntity } from 'orm'; import { EntityManager } from 'typeorm'; -import { RPCSocket } from 'lib/rpc'; +import { RPCClient, RPCSocket } from 'lib/rpc'; import { getDB } from './db'; import winston from 'winston'; import { getConfig } from 'config'; @@ -26,8 +26,12 @@ export class L1Monitor extends Monitor { new MnemonicKey({ mnemonic: config.EXECUTOR_MNEMONIC }) ); - constructor(public socket: RPCSocket, logger: winston.Logger) { - super(socket, logger); + constructor( + public socket: RPCSocket, + public rpcClient: RPCClient, + logger: winston.Logger + ) { + super(socket, rpcClient, logger); [this.db] = getDB(); } @@ -70,7 +74,7 @@ export class L1Monitor extends Monitor { ); } - public async handleEvents(manager: EntityManager): Promise { + public async handleEvents(manager: EntityManager): Promise { const msgs: Msg[] = []; const depositEvents = await this.helper.fetchEvents( @@ -79,6 +83,8 @@ export class L1Monitor extends Monitor { 'initiate_token_deposit' ); + if (depositEvents.length === 0) return false; + for (const evt of depositEvents) { const attrMap = this.helper.eventsToAttrMap(evt); if (attrMap['bridge_id'] !== this.bridgeId.toString()) continue; @@ -103,7 +109,7 @@ export class L1Monitor extends Monitor { .catch(async (err) => { const errMsg = err.response?.data ? JSON.stringify(err.response?.data) - : err; + : JSON.stringify(err); this.logger.error( `Failed to submit tx in height: ${this.syncedHeight}\nMsg: ${stringfyMsgs}\n${errMsg}` ); @@ -120,5 +126,7 @@ export class L1Monitor extends Monitor { ); }); } + + return true; } } diff --git a/bots/src/worker/bridgeExecutor/L2Monitor.ts b/bots/src/worker/bridgeExecutor/L2Monitor.ts index c8374ec2..30c9fc5e 100644 --- a/bots/src/worker/bridgeExecutor/L2Monitor.ts +++ b/bots/src/worker/bridgeExecutor/L2Monitor.ts @@ -5,7 +5,7 @@ import { WithdrawalTx } from 'lib/types'; import { EntityManager } from 'typeorm'; import { BlockInfo } from '@initia/initia.js'; import { getDB } from './db'; -import { RPCSocket } from 'lib/rpc'; +import { RPCClient, RPCSocket } from 'lib/rpc'; import winston from 'winston'; import { getConfig } from 'config'; import { getBridgeInfo } from 'lib/query'; @@ -16,8 +16,12 @@ export class L2Monitor extends Monitor { submissionInterval: number; nextSubmissionTimeSec: number; - constructor(public socket: RPCSocket, logger: winston.Logger) { - super(socket, logger); + constructor( + public socket: RPCSocket, + public rpcClient: RPCClient, + logger: winston.Logger + ) { + super(socket, rpcClient, logger); [this.db] = getDB(); this.nextSubmissionTimeSec = this.getCurTimeSec(); } @@ -71,20 +75,21 @@ export class L2Monitor extends Monitor { await this.helper.saveEntity(manager, ExecutorWithdrawalTxEntity, tx); } - public async handleEvents(manager: EntityManager): Promise { + public async handleEvents(manager: EntityManager): Promise { const withdrawalEvents = await this.helper.fetchEvents( config.l2lcd, this.syncedHeight, 'initiate_token_withdrawal' ); + if (withdrawalEvents.length === 0) return false; + for (const evt of withdrawalEvents) { const attrMap = this.helper.eventsToAttrMap(evt); - await this.handleInitiateTokenWithdrawalEvent( - manager, - attrMap - ); + await this.handleInitiateTokenWithdrawalEvent(manager, attrMap); } + + return true; } private async saveMerkleRootAndProof( diff --git a/bots/src/worker/bridgeExecutor/Monitor.ts b/bots/src/worker/bridgeExecutor/Monitor.ts index df418a54..ea73c275 100644 --- a/bots/src/worker/bridgeExecutor/Monitor.ts +++ b/bots/src/worker/bridgeExecutor/Monitor.ts @@ -1,5 +1,5 @@ import * as Bluebird from 'bluebird'; -import { RPCSocket } from 'lib/rpc'; +import { RPCClient, RPCSocket } from 'lib/rpc'; import { StateEntity } from 'orm'; import { DataSource, EntityManager } from 'typeorm'; import MonitorHelper from './MonitorHelper'; @@ -7,15 +7,22 @@ import winston from 'winston'; import { INTERVAL_MONITOR, getConfig } from 'config'; const config = getConfig(); +const MAX_BLOCKS = 20; // DO NOT CHANGE THIS, hard limit is 20 in cometbft. +const MAX_RETRY_INTERVAL = 30_000; export abstract class Monitor { public syncedHeight: number; protected db: DataSource; protected isRunning = false; protected bridgeId: number; + protected retryNum = 0; helper: MonitorHelper = new MonitorHelper(); - constructor(public socket: RPCSocket, public logger: winston.Logger) { + constructor( + public socket: RPCSocket, + public rpcClient: RPCClient, + public logger: winston.Logger + ) { this.bridgeId = config.BRIDGE_ID; } @@ -43,27 +50,64 @@ export abstract class Monitor { this.isRunning = false; } + async fetchBlockchainData() { + const latestHeight = this.socket.latestHeight; + if (!latestHeight || !(latestHeight > this.syncedHeight)) { + return null; + } + return this.rpcClient.getBlockchain( + this.syncedHeight + 1, + Math.min(latestHeight, this.syncedHeight + MAX_BLOCKS) + ); + } + public async monitor(): Promise { while (this.isRunning) { try { - const latestHeight = this.socket.latestHeight; - if (!latestHeight || this.syncedHeight >= latestHeight) continue; - if (this.syncedHeight % 10 == 0 && this.syncedHeight !== 1) { - this.logger.info(`${this.name()} height ${this.syncedHeight}`); - } - await this.db.transaction( - async (transactionalEntityManager: EntityManager) => { - await this.handleEvents(transactionalEntityManager); - await this.handleBlock(transactionalEntityManager); - - this.syncedHeight += 1; - - // update state - await this.db + const blockchainData = await this.fetchBlockchainData(); + if (blockchainData === null) continue; + + await this.db.transaction(async (manager: EntityManager) => { + for (const metadata of blockchainData?.block_metas.reverse()) { + const nextHeight = this.syncedHeight + 1; + if (nextHeight !== parseInt(metadata.header.height)) { + throw new Error( + `expected block meta is the height ${nextHeight}, but got ${metadata.header.height}` + ); + } + + if (nextHeight % 10 === 0) { + this.logger.info(`${this.name()} height ${nextHeight}`); + } + + if (parseInt(metadata.num_txs) === 0) { + this.syncedHeight++; + continue; + } + + // handle event always called when there is a tx in a block, + // so empty means, the tx indexing is still on going. + const ok: boolean = await this.handleEvents(manager); + if (!ok) { + this.retryNum++; + if (this.retryNum * INTERVAL_MONITOR >= MAX_RETRY_INTERVAL) { + // rotate when tx index data is not found during 30s after block stored. + this.rpcClient.rotateRPC(); + } + break; + } + this.retryNum = 0; + await this.handleBlock(manager); + + this.syncedHeight++; + await manager .getRepository(StateEntity) .update({ name: this.name() }, { height: this.syncedHeight }); + + // add delay to prevent spamming + await Bluebird.Promise.delay(INTERVAL_MONITOR); } - ); + }); } catch (err) { this.stop(); console.log(err); @@ -76,7 +120,7 @@ export abstract class Monitor { } // eslint-disable-next-line - public async handleEvents(manager: EntityManager): Promise {} + public async handleEvents(manager: EntityManager): Promise {} // eslint-disable-next-line public async handleBlock(manager: EntityManager): Promise {} diff --git a/bots/src/worker/bridgeExecutor/MonitorHelper.ts b/bots/src/worker/bridgeExecutor/MonitorHelper.ts index e73af61f..ecbbc2ad 100644 --- a/bots/src/worker/bridgeExecutor/MonitorHelper.ts +++ b/bots/src/worker/bridgeExecutor/MonitorHelper.ts @@ -102,12 +102,13 @@ class MonitorHelper { const searchRes = await lcd.tx.search({ events: [{ key: 'tx.height', value: height.toString() }] }); + return searchRes.txs .flatMap((tx) => tx.logs ?? []) .flatMap((log) => log.events) .filter((evt) => evt.type === eventType); } - + public eventsToAttrMap(event: any): { [key: string]: string } { return event.attributes.reduce((obj, attr) => { obj[attr.key] = attr.value; diff --git a/bots/src/worker/bridgeExecutor/index.ts b/bots/src/worker/bridgeExecutor/index.ts index 7cd096ab..7e844051 100644 --- a/bots/src/worker/bridgeExecutor/index.ts +++ b/bots/src/worker/bridgeExecutor/index.ts @@ -1,4 +1,4 @@ -import { RPCSocket } from 'lib/rpc'; +import { RPCClient, RPCSocket } from 'lib/rpc'; import { Monitor } from './Monitor'; import { L1Monitor } from './L1Monitor'; import { L2Monitor } from './L2Monitor'; @@ -15,8 +15,16 @@ let monitors: Monitor[]; async function runBot(): Promise { monitors = [ - new L1Monitor(new RPCSocket(config.L1_RPC_URI, 10000, logger), logger), - new L2Monitor(new RPCSocket(config.L2_RPC_URI, 10000, logger), logger) + new L1Monitor( + new RPCSocket(config.L1_RPC_URI, 10000, logger), + new RPCClient(config.L1_RPC_URI, logger), + logger + ), + new L2Monitor( + new RPCSocket(config.L2_RPC_URI, 10000, logger), + new RPCClient(config.L2_RPC_URI, logger), + logger + ) ]; try { await Promise.all( diff --git a/bots/src/worker/challenger/L1Monitor.ts b/bots/src/worker/challenger/L1Monitor.ts index 9a6985b8..681e05c3 100644 --- a/bots/src/worker/challenger/L1Monitor.ts +++ b/bots/src/worker/challenger/L1Monitor.ts @@ -1,10 +1,10 @@ import { Monitor } from 'worker/bridgeExecutor/Monitor'; import { ChallengerDepositTxEntity, - ChallengerFinalizeWithdrawalTxEntity, + ChallengerFinalizeWithdrawalTxEntity } from 'orm'; import { EntityManager } from 'typeorm'; -import { RPCSocket } from 'lib/rpc'; +import { RPCClient, RPCSocket } from 'lib/rpc'; import { getDB } from './db'; import winston from 'winston'; import { getConfig } from 'config'; @@ -12,8 +12,12 @@ import { getConfig } from 'config'; const config = getConfig(); export class L1Monitor extends Monitor { - constructor(public socket: RPCSocket, logger: winston.Logger) { - super(socket, logger); + constructor( + public socket: RPCSocket, + public rpcClient: RPCClient, + logger: winston.Logger + ) { + super(socket, rpcClient, logger); [this.db] = getDB(); } @@ -57,29 +61,33 @@ export class L1Monitor extends Monitor { .save(entity); } - public async handleEvents(manager: EntityManager): Promise { + public async handleEvents(manager: EntityManager): Promise { const depositEvents = await this.helper.fetchEvents( config.l1lcd, this.syncedHeight, 'initiate_token_deposit' ); - for (const evt of depositEvents) { - const attrMap = this.helper.eventsToAttrMap(evt); - if (attrMap['bridge_id'] !== this.bridgeId.toString()) continue; - await this.handleInitiateTokenDeposit(manager, attrMap); - } - const withdrawalEvents = await this.helper.fetchEvents( config.l1lcd, this.syncedHeight, 'finalize_token_withdrawal' ); + if (depositEvents.length === 0 && withdrawalEvents.length === 0) + return false; + + for (const evt of depositEvents) { + const attrMap = this.helper.eventsToAttrMap(evt); + if (attrMap['bridge_id'] !== this.bridgeId.toString()) continue; + await this.handleInitiateTokenDeposit(manager, attrMap); + } + for (const evt of withdrawalEvents) { const attrMap = this.helper.eventsToAttrMap(evt); if (attrMap['bridge_id'] !== this.bridgeId.toString()) continue; await this.handleFinalizeTokenWithdrawalEvent(manager, attrMap); } + return false; } } diff --git a/bots/src/worker/challenger/L2Monitor.ts b/bots/src/worker/challenger/L2Monitor.ts index 5839ca55..cb497163 100644 --- a/bots/src/worker/challenger/L2Monitor.ts +++ b/bots/src/worker/challenger/L2Monitor.ts @@ -9,7 +9,7 @@ import { Monitor } from 'worker/bridgeExecutor/Monitor'; import { WithdrawStorage } from 'lib/storage'; import { WithdrawalTx } from 'lib/types'; import { EntityManager } from 'typeorm'; -import { RPCSocket } from 'lib/rpc'; +import { RPCClient, RPCSocket } from 'lib/rpc'; import winston from 'winston'; import { getDB } from './db'; import { INTERVAL_MONITOR, getConfig } from 'config'; @@ -23,8 +23,12 @@ export class L2Monitor extends Monitor { outputInfo: OutputInfo; startBlockNumber: number; - constructor(public socket: RPCSocket, logger: winston.Logger) { - super(socket, logger); + constructor( + public socket: RPCSocket, + public rpcClient: RPCClient, + logger: winston.Logger + ) { + super(socket, rpcClient, logger); [this.db] = getDB(); this.outputIndex = 0; } @@ -35,24 +39,32 @@ export class L2Monitor extends Monitor { public async monitor(): Promise { while (this.isRunning) { - const nextOutputInfo = await getOutputInfoByIndex(this.bridgeId, this.outputIndex + 1) - .catch(async() => { - await delay(INTERVAL_MONITOR); - return null; - }); + const nextOutputInfo = await getOutputInfoByIndex( + this.bridgeId, + this.outputIndex + 1 + ).catch(async () => { + await delay(INTERVAL_MONITOR); + return null; + }); if (!nextOutputInfo) continue; - this.startBlockNumber = this.outputIndex === 0 ? 1 : nextOutputInfo.output_proposal.l2_block_number + 1; - this.outputIndex += 1; + this.startBlockNumber = + this.outputIndex === 0 + ? 1 + : nextOutputInfo.output_proposal.l2_block_number + 1; + this.outputIndex += 1; this.outputInfo = nextOutputInfo; await this.db.transaction( async (transactionalEntityManager: EntityManager) => { await this.processL2Monitor(transactionalEntityManager); - }) + } + ); } } async processL2Monitor(manager: EntityManager): Promise { - while (this.syncedHeight <= this.outputInfo.output_proposal.l2_block_number) { + while ( + this.syncedHeight <= this.outputInfo.output_proposal.l2_block_number + ) { try { if (this.syncedHeight % 10 == 0 && this.syncedHeight !== 1) { this.logger.info(`${this.name()} height ${this.syncedHeight}`); @@ -113,7 +125,7 @@ export class L2Monitor extends Monitor { receiver: data['recipient'], l2Denom: data['denom'], amount: data['amount'], - l1Height: parseInt(data['finalize_height']), + l1Height: parseInt(data['finalize_height']) }; await manager.getRepository(ChallengerFinalizeDepositTxEntity).save(entity); } @@ -124,15 +136,12 @@ export class L2Monitor extends Monitor { this.syncedHeight, 'initiate_token_withdrawal' ); - + for (const evt of withdrawalEvents) { const attrMap = this.helper.eventsToAttrMap(evt); - await this.handleInitiateTokenWithdrawalEvent( - manager, - attrMap - ); + await this.handleInitiateTokenWithdrawalEvent(manager, attrMap); } - + const depositEvents = await this.helper.fetchEvents( config.l2lcd, this.syncedHeight, @@ -141,10 +150,7 @@ export class L2Monitor extends Monitor { for (const evt of depositEvents) { const attrMap = this.helper.eventsToAttrMap(evt); - await this.handleFinalizeTokenDepositEvent( - manager, - attrMap - ); + await this.handleFinalizeTokenDepositEvent(manager, attrMap); } } @@ -178,7 +184,8 @@ export class L2Monitor extends Monitor { } public async handleBlock(manager: EntityManager): Promise { - if (this.syncedHeight > this.outputInfo.output_proposal.l2_block_number) return; + if (this.syncedHeight > this.outputInfo.output_proposal.l2_block_number) + return; const blockInfo: BlockInfo = await config.l2lcd.tendermint.blockInfo( this.syncedHeight @@ -191,10 +198,7 @@ export class L2Monitor extends Monitor { this.outputInfo.output_index ); - const storageRoot = await this.saveMerkleRootAndProof( - manager, - txEntities - ); + const storageRoot = await this.saveMerkleRootAndProof(manager, txEntities); const outputEntity = this.helper.calculateOutputEntity( this.outputInfo.output_index, @@ -204,10 +208,6 @@ export class L2Monitor extends Monitor { this.outputInfo.output_proposal.l2_block_number ); - await this.helper.saveEntity( - manager, - ChallengerOutputEntity, - outputEntity - ); + await this.helper.saveEntity(manager, ChallengerOutputEntity, outputEntity); } } diff --git a/bots/src/worker/challenger/challenger.ts b/bots/src/worker/challenger/challenger.ts index 5748b912..4d471fde 100644 --- a/bots/src/worker/challenger/challenger.ts +++ b/bots/src/worker/challenger/challenger.ts @@ -69,7 +69,7 @@ export class Challenger { .findOne({ where: { sequence: this.l1DepositSequenceToChallenge } as any }); - + if (!depositTxFromChallenger) { return; } @@ -95,10 +95,13 @@ export class Challenger { ); }); - if (!depositFinalizeTxFromChallenger)return; + if (!depositFinalizeTxFromChallenger) return; // case 2. not equal deposit tx between L1 and L2 - const pair = await config.l1lcd.ophost.tokenPairByL1Denom(this.bridgeId, depositTxFromChallenger.l1Denom); + const pair = await config.l1lcd.ophost.tokenPairByL1Denom( + this.bridgeId, + depositTxFromChallenger.l1Denom + ); const isEqaul = depositTxFromChallenger.sender === depositFinalizeTxFromChallenger.sender && @@ -106,8 +109,7 @@ export class Challenger { depositFinalizeTxFromChallenger.receiver && depositTxFromChallenger.amount === depositFinalizeTxFromChallenger.amount && - pair.l2_denom === - depositFinalizeTxFromChallenger.l2Denom; + pair.l2_denom === depositFinalizeTxFromChallenger.l2Denom; if (!isEqaul && lastOutputInfo) { await this.deleteOutputProposal( @@ -124,14 +126,16 @@ export class Challenger { this.l1LastChallengedSequence = this.l1DepositSequenceToChallenge; // get next sequence from db with smallest sequence but bigger than last challenged sequence const nextDepositSequenceToChallenge = await manager - .getRepository(ChallengerDepositTxEntity) - .find({ - where: { sequence: MoreThan(this.l1DepositSequenceToChallenge) } as any, - order: { sequence: 'ASC' }, - take: 1 - }) - if (nextDepositSequenceToChallenge.length === 0) return - this.l1DepositSequenceToChallenge = Number(nextDepositSequenceToChallenge[0].sequence) + .getRepository(ChallengerDepositTxEntity) + .find({ + where: { sequence: MoreThan(this.l1DepositSequenceToChallenge) } as any, + order: { sequence: 'ASC' }, + take: 1 + }); + if (nextDepositSequenceToChallenge.length === 0) return; + this.l1DepositSequenceToChallenge = Number( + nextDepositSequenceToChallenge[0].sequence + ); } public stop(): void { @@ -175,9 +179,9 @@ export class Challenger { ).catch(() => { return null; }); - + if (!outputInfoToChallenge) return; - + // case 1. output root not matched const outputRootFromContract = await this.getContractOutputRoot( this.l2OutputIndexToChallenge @@ -186,7 +190,7 @@ export class Challenger { manager, this.l2OutputIndexToChallenge ); - + if (!outputRootFromContract || !outputRootFromChallenger) return; if (outputRootFromContract !== outputRootFromChallenger) { @@ -200,7 +204,7 @@ export class Challenger { logger.info( `[L2 Challenger] output root matched in output index : ${this.l2OutputIndexToChallenge}` ); - this.l2OutputIndexToChallenge += 1 + this.l2OutputIndexToChallenge += 1; } async deleteOutputProposal( diff --git a/bots/src/worker/challenger/index.ts b/bots/src/worker/challenger/index.ts index 2d5cdd44..315ee360 100644 --- a/bots/src/worker/challenger/index.ts +++ b/bots/src/worker/challenger/index.ts @@ -1,4 +1,4 @@ -import { RPCSocket } from 'lib/rpc'; +import { RPCClient, RPCSocket } from 'lib/rpc'; import { L1Monitor } from './L1Monitor'; import { Monitor } from 'worker/bridgeExecutor/Monitor'; import { Challenger } from './challenger'; @@ -7,7 +7,13 @@ import { challengerLogger as logger } from 'lib/logger'; import { once } from 'lodash'; import { L2Monitor } from './L2Monitor'; import { getConfig } from 'config'; -import { ChallengerDepositTxEntity, ChallengerFinalizeDepositTxEntity, ChallengerOutputEntity, ChallengerWithdrawalTxEntity, StateEntity } from 'orm/index'; +import { + ChallengerDepositTxEntity, + ChallengerFinalizeDepositTxEntity, + ChallengerOutputEntity, + ChallengerWithdrawalTxEntity, + StateEntity +} from 'orm/index'; const config = getConfig(); @@ -34,8 +40,16 @@ async function runBot(): Promise { // ) // }); monitors = [ - new L1Monitor(new RPCSocket(config.L1_RPC_URI, 10000, logger), logger), - new L2Monitor(new RPCSocket(config.L2_RPC_URI, 10000, logger), logger), + new L1Monitor( + new RPCSocket(config.L1_RPC_URI, 10000, logger), + new RPCClient(config.L1_RPC_URI, logger), + logger + ), + new L2Monitor( + new RPCSocket(config.L2_RPC_URI, 10000, logger), + new RPCClient(config.L2_RPC_URI, logger), + logger + ), challenger ]; try { diff --git a/bots/src/worker/outputSubmitter/index.ts b/bots/src/worker/outputSubmitter/index.ts index 4042b83d..eee69bb7 100644 --- a/bots/src/worker/outputSubmitter/index.ts +++ b/bots/src/worker/outputSubmitter/index.ts @@ -6,9 +6,7 @@ import { initORM } from './db'; let jobs: OutputSubmitter[]; async function runBot(): Promise { - const outputSubmitter = new OutputSubmitter(); - - jobs = [outputSubmitter]; + jobs = [new OutputSubmitter()]; try { await Promise.all( diff --git a/bots/src/worker/outputSubmitter/outputSubmitter.ts b/bots/src/worker/outputSubmitter/outputSubmitter.ts index b6217bbe..ddcc0fb0 100644 --- a/bots/src/worker/outputSubmitter/outputSubmitter.ts +++ b/bots/src/worker/outputSubmitter/outputSubmitter.ts @@ -87,7 +87,8 @@ export class OutputSubmitter { outputEntity.outputRoot ); - const { account_number, sequence } = await this.submitter.accountNumberAndSequence(); + const { account_number, sequence } = + await this.submitter.accountNumberAndSequence(); await sendTx(this.submitter, [msg], account_number, sequence); diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 00000000..e68ece41 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "OPinit", + "lockfileVersion": 2, + "requires": true, + "packages": {} +}