Skip to content

Commit

Permalink
feat: process fee vault withdrawals (#24)
Browse files Browse the repository at this point in the history
* IMPORTANT: change db type to WithdrawalInitiatedLog

* feat: rename L2ScannedBlock to L2ScannedBlockV2

Rename type name to change the database table, so that discard the
stabled records

* feat(processor): process withdrawalInitiatedLog instead of withdrawToEvent

* feat: add indexer

* fix: check log length

* chore(metrics): adjust db table name

* chore: rename variables

* optimize(indexer): update l2_scanned_blocks every 10min
  • Loading branch information
bendanzhentan authored Mar 7, 2024
1 parent 9836c73 commit df84810
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 177 deletions.
113 changes: 8 additions & 105 deletions cmd/bot/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,19 @@ import (
"context"
"errors"
"fmt"
"math/big"
"net/http"
"strings"
"time"

"github.com/ethereum-optimism/optimism/indexer/config"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/urfave/cli/v2"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

func RunCommand(ctx *cli.Context) error {
Expand Down Expand Up @@ -52,11 +48,12 @@ func RunCommand(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to connect database: %w", err)
}

err = db.AutoMigrate(&core.L2ScannedBlock{})
if err != nil {
return fmt.Errorf("failed to migrate l2_scanned_blocks: %w", err)
}
err = db.AutoMigrate(&core.BotDelegatedWithdrawal{})
err = db.AutoMigrate(&core.WithdrawalInitiatedLog{})
if err != nil {
return fmt.Errorf("failed to migrate withdrawals: %w", err)
}
Expand All @@ -78,7 +75,10 @@ func RunCommand(ctx *cli.Context) error {
}()

go core.StartMetrics(ctx.Context, &cfg, &l1Client.Client, db, logger)
go WatchBotDelegatedWithdrawals(ctx.Context, logger, db, l2Client, l2ScannedBlock, cfg)
go func() {
indexer := core.NewIndexer(logger, db, l2Client, cfg)
indexer.Start(ctx.Context, l2ScannedBlock)
}()
go ProcessBotDelegatedWithdrawals(ctx.Context, logger, db, l1Client, l2Client, cfg)

<-ctx.Context.Done()
Expand Down Expand Up @@ -130,7 +130,7 @@ func ProcessUnprovenBotDelegatedWithdrawals(ctx context.Context, log log.Logger,
processor := core.NewProcessor(log, l1Client, l2Client, cfg)
limit := 1000

unprovens := make([]core.BotDelegatedWithdrawal, 0)
unprovens := make([]core.WithdrawalInitiatedLog, 0)
result := db.Order("id asc").Where("proven_time IS NULL AND initiated_block_number <= ? AND failure_reason IS NULL", latestProposedNumber.Uint64()).Limit(limit).Find(&unprovens)
if result.Error != nil {
log.Error("failed to query withdrawals", "error", result.Error)
Expand Down Expand Up @@ -182,7 +182,7 @@ func ProcessUnfinalizedBotDelegatedWithdrawals(ctx context.Context, log log.Logg
now := time.Now()
maxProvenTime := now.Add(-time.Duration(cfg.ChallengeTimeWindow) * time.Second)

unfinalizeds := make([]core.BotDelegatedWithdrawal, 0)
unfinalizeds := make([]core.WithdrawalInitiatedLog, 0)
result := db.Order("id asc").Where("finalized_time IS NULL AND proven_time IS NOT NULL AND proven_time < ? AND failure_reason IS NULL", maxProvenTime).Limit(limit).Find(&unfinalizeds)
if result.Error != nil {
log.Error("failed to query withdrawals", "error", result.Error)
Expand Down Expand Up @@ -228,103 +228,6 @@ func ProcessUnfinalizedBotDelegatedWithdrawals(ctx context.Context, log log.Logg
}
}

// storeLogs stores the logs in the database
func storeLogs(db *gorm.DB, client *core.ClientExt, logs []types.Log) error {
// save all the logs in this range of blocks
for _, vLog := range logs {
header, err := client.HeaderByHash(context.Background(), vLog.BlockHash)
if err != nil {
return err
}

event := core.BotDelegatedWithdrawal{
TransactionHash: vLog.TxHash.Hex(),
LogIndex: int(vLog.Index),
InitiatedBlockNumber: int64(header.Number.Uint64()),
}

deduped := db.Clauses(
clause.OnConflict{DoNothing: true},
)
result := deduped.Create(&event)
if result.Error != nil {
return result.Error
}
}

return nil
}

// WatchBotDelegatedWithdrawals watches for new bot-delegated withdrawals and stores them in the database.
func WatchBotDelegatedWithdrawals(ctx context.Context, log log.Logger, db *gorm.DB, client *core.ClientExt, l2StartingBlock *core.L2ScannedBlock, cfg core.Config) {
timer := time.NewTimer(0)
fromBlockNumber := big.NewInt(l2StartingBlock.Number)

for {
select {
case <-ctx.Done():
return
case <-timer.C:
timer.Reset(time.Second)
}

toBlockNumber := new(big.Int).Add(fromBlockNumber, big.NewInt(cfg.L2StandardBridgeBot.LogFilterBlockRange))
finalizedHeader, err := client.GetHeaderByTag(context.Background(), "finalized")
if err != nil {
log.Error("call eth_blockNumber", "error", err)
continue
}
if toBlockNumber.Uint64() > finalizedHeader.Number.Uint64() {
toBlockNumber = finalizedHeader.Number
}

if fromBlockNumber.Uint64() > toBlockNumber.Uint64() {
timer.Reset(5 * time.Second)
continue
}

log.Info("Fetching logs from blocks", "fromBlock", fromBlockNumber, "toBlock", toBlockNumber)
logs, err := getLogs(client, fromBlockNumber, toBlockNumber, common.HexToAddress(cfg.L2StandardBridgeBot.ContractAddress), core.WithdrawToEventSig())
if err != nil {
log.Error("eth_getLogs", "error", err)
continue
}

if len(logs) != 0 {
for _, vlog := range logs {
log.Info("fetched bot-delegated withdrawal", "blockNumber", vlog.BlockNumber, "transactionHash", vlog.TxHash.Hex())
}

err = storeLogs(db, client, logs)
if err != nil {
log.Error("storeLogs", "error", err)
continue
}
}

l2StartingBlock.Number = toBlockNumber.Int64()
result := db.Save(l2StartingBlock)
if result.Error != nil {
log.Error("update l2_scanned_blocks", "error", result.Error)
}

fromBlockNumber = new(big.Int).Add(toBlockNumber, big.NewInt(1))
}
}

// getLogs returns the logs for a given contract address and block range
func getLogs(client *core.ClientExt, fromBlock *big.Int, toBlock *big.Int, contractAddress common.Address, eventSig common.Hash) ([]types.Log, error) {
query := ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: toBlock,
Addresses: []common.Address{
contractAddress,
},
Topics: [][]common.Hash{[]common.Hash{eventSig}},
}
return client.FilterLogs(context.Background(), query)
}

// connect connects to the database
func connect(log log.Logger, dbConfig config.DBConfig) (*gorm.DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8&parseTime=True&loc=Local", dbConfig.User, dbConfig.Password, dbConfig.Host, dbConfig.Port, dbConfig.Name)
Expand Down
Loading

0 comments on commit df84810

Please sign in to comment.