From 041fd7a34b41096995b138954494bd7a9442d652 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 2 Jul 2024 00:35:38 +0100 Subject: [PATCH 1/6] feat: add replay functionality --- cmd/blobstream-ops/common/utils.go | 44 ++++ cmd/blobstream-ops/replay/cmd.go | 160 ++++++++++++++ cmd/blobstream-ops/replay/config.go | 160 ++++++++++++++ cmd/blobstream-ops/verify/cmd.go | 42 +--- go.mod | 3 +- go.sum | 6 +- replayer/evm.go | 180 ++++++++++++++++ replayer/replayer.go | 322 ++++++++++++++++++++++++++++ 8 files changed, 875 insertions(+), 42 deletions(-) create mode 100644 cmd/blobstream-ops/common/utils.go create mode 100644 cmd/blobstream-ops/replay/cmd.go create mode 100644 cmd/blobstream-ops/replay/config.go create mode 100644 replayer/evm.go create mode 100644 replayer/replayer.go diff --git a/cmd/blobstream-ops/common/utils.go b/cmd/blobstream-ops/common/utils.go new file mode 100644 index 0000000..0613007 --- /dev/null +++ b/cmd/blobstream-ops/common/utils.go @@ -0,0 +1,44 @@ +package common + +import ( + "context" + "fmt" + "io" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/cosmos/cosmos-sdk/server" + "github.com/rs/zerolog" + tmconfig "github.com/tendermint/tendermint/config" + tmlog "github.com/tendermint/tendermint/libs/log" +) + +// GetLogger creates a new logger and returns +func GetLogger(level string, format string) (tmlog.Logger, error) { + logLvl, err := zerolog.ParseLevel(level) + if err != nil { + return nil, fmt.Errorf("failed to parse log level (%s): %w", level, err) + } + var logWriter io.Writer + if strings.ToLower(format) == tmconfig.LogFormatPlain { + logWriter = zerolog.ConsoleWriter{Out: os.Stderr} + } else { + logWriter = os.Stderr + } + + return server.ZeroLogWrapper{Logger: zerolog.New(logWriter).Level(logLvl).With().Timestamp().Logger()}, nil +} + +// TrapSignal will listen for any OS signal and cancel the context to exit gracefully. +func TrapSignal(logger tmlog.Logger, cancel context.CancelFunc) { + sigCh := make(chan os.Signal, 1) + + signal.Notify(sigCh, syscall.SIGTERM) + signal.Notify(sigCh, syscall.SIGINT) + + sig := <-sigCh + logger.Info("caught signal; shutting down...", "signal", sig.String()) + cancel() +} diff --git a/cmd/blobstream-ops/replay/cmd.go b/cmd/blobstream-ops/replay/cmd.go new file mode 100644 index 0000000..0e6a67d --- /dev/null +++ b/cmd/blobstream-ops/replay/cmd.go @@ -0,0 +1,160 @@ +package replay + +import ( + "context" + "fmt" + + "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/common" + "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version" + "github.com/celestiaorg/blobstream-ops/replayer" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethcmn "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/spf13/cobra" + blobstreamxwrapper "github.com/succinctlabs/blobstreamx/bindings" + "github.com/tendermint/tendermint/rpc/client/http" +) + +// Command the replay command +func Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "replay", + Short: "BlobstreamX deployment verification", + Long: "verifies that a BlobstreamX contract is committing to valid data", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + config, err := parseFlags(cmd) + if err != nil { + return err + } + if err := config.ValidateBasics(); err != nil { + return err + } + + logger, err := common.GetLogger(config.LogLevel, config.LogFormat) + if err != nil { + return err + } + + buildInfo := version.GetBuildInfo() + logger.Info("initializing replay service", "version", buildInfo.SemanticVersion, "build_date", buildInfo.BuildTime) + + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + // Listen for and trap any OS signal to graceful shutdown and exit + go common.TrapSignal(logger, cancel) + + // connecting to the source BlobstreamX contract + sourceEVMClient, err := ethclient.Dial(config.SourceEVMRPC) + if err != nil { + return err + } + defer sourceEVMClient.Close() + + sourceBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller( + ethcmn.HexToAddress(config.SourceContractAddress), + sourceEVMClient, + ) + if err != nil { + return err + } + + // connecting to the target BlobstreamX contract + targetEVMClient, err := ethclient.Dial(config.TargetEVMRPC) + if err != nil { + return err + } + defer targetEVMClient.Close() + + targetBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller( + ethcmn.HexToAddress(config.TargetContractAddress), + targetEVMClient, + ) + if err != nil { + return err + } + + logger.Info( + "starting replay service", + "evm.source.rpc", + config.SourceEVMRPC, + "evm.source.contract-address", + config.SourceContractAddress, + "evm.target.rpc", + config.TargetEVMRPC, + "evm.target.contract-address", + config.TargetContractAddress, + "core.rpc", + config.CoreRPC, + ) + + latestSourceNonce, err := sourceBlobstreamReader.StateProofNonce(&bind.CallOpts{}) + if err != nil { + return err + } + logger.Info("found latest source blobstreamX contract nonce", "nonce", latestSourceNonce.Int64()) + + latestTargetNonce, err := targetBlobstreamReader.StateProofNonce(&bind.CallOpts{}) + if err != nil { + return err + } + logger.Info("found latest target blobstreamX contract nonce", "nonce", latestTargetNonce.Int64()) + + var trpc *http.HTTP + if config.Verify { + trpc, err = http.New(config.CoreRPC, "/websocket") + if err != nil { + return err + } + err = trpc.Start() + if err != nil { + return err + } + defer func(trpc *http.HTTP) { + err := trpc.Stop() + if err != nil { + fmt.Println(err.Error()) + } + }(trpc) + } + + if latestSourceNonce.Int64() > latestTargetNonce.Int64() { + err = replayer.Catchup( + ctx, + logger, + config.Verify, + trpc, + sourceEVMClient, + targetEVMClient, + config.SourceContractAddress, + config.TargetContractAddress, + config.TargetChainGateway, + config.PrivateKey, + ) + if err != nil { + return err + } + } else { + logger.Info("target contract is already up to date") + } + + return replayer.Follow( + ctx, + logger, + config.Verify, + trpc, + sourceEVMClient, + targetEVMClient, + config.SourceContractAddress, + config.TargetContractAddress, + config.TargetChainGateway, + config.PrivateKey, + ) + }, + } + + cmd.SetHelpCommand(&cobra.Command{}) + + return addFlags(cmd) +} diff --git a/cmd/blobstream-ops/replay/config.go b/cmd/blobstream-ops/replay/config.go new file mode 100644 index 0000000..dbd1b80 --- /dev/null +++ b/cmd/blobstream-ops/replay/config.go @@ -0,0 +1,160 @@ +package replay + +import ( + "crypto/ecdsa" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/crypto" + + ethcmn "github.com/ethereum/go-ethereum/common" + "github.com/spf13/cobra" +) + +const ( + FlagSourceEVMRPC = "evm.source.rpc" + FlagSourceEVMContractAddress = "evm.source.contract-address" + FlagTargetEVMRPC = "evm.target.rpc" + FlagTargetEVMContractAddress = "evm.target.contract-address" + FlagTargetChainGateway = "evm.target.gateway" + FlagEVMPrivateKey = "evm.private-key" + + FlagVerify = "verify" + + FlagLogLevel = "log.level" + FlagLogFormat = "log.format" + + FlagCoreRPC = "core.rpc" +) + +func addFlags(cmd *cobra.Command) *cobra.Command { + cmd.Flags().String(FlagSourceEVMRPC, "http://localhost:8545", "Specify the Ethereum rpc address of the source EVM chain") + cmd.Flags().String(FlagTargetEVMRPC, "http://localhost:8545", "Specify the Ethereum rpc address of the target EVM chain") + cmd.Flags().String(FlagSourceEVMContractAddress, "", "Specify the source contract at which the source BlobstreamX contract is deployed") + cmd.Flags().String(FlagTargetEVMContractAddress, "", "Specify the target contract at which the target BlobstreamX contract is deployed") + cmd.Flags().String(FlagTargetChainGateway, "", "Specify the target chain succinct gateway contract address") + cmd.Flags().String( + FlagLogLevel, + "info", + "The logging level (trace|debug|info|warn|error|fatal|panic)", + ) + cmd.Flags().String( + FlagLogFormat, + "plain", + "The logging format (json|plain)", + ) + cmd.Flags().String( + FlagCoreRPC, + "tcp://localhost:26657", + "The celestia app rpc address", + ) + cmd.Flags().Bool(FlagVerify, false, "Set to verify the commitments before replaying their proofs. Require the core rpc flag to be set") + cmd.Flags().String(FlagEVMPrivateKey, "", "Specify the EVM private key, in hex format without the leading 0x, to use for replaying transaction in the target chain. Corresponding account should be funded") + return cmd +} + +type Config struct { + SourceEVMRPC string + TargetEVMRPC string + SourceContractAddress string + TargetContractAddress string + TargetChainGateway string + LogLevel string + LogFormat string + CoreRPC string + Verify bool + PrivateKey *ecdsa.PrivateKey +} + +func (cfg Config) ValidateBasics() error { + if err := ValidateEVMAddress(cfg.SourceContractAddress); err != nil { + return fmt.Errorf("%s: flag --%s", err.Error(), FlagSourceEVMContractAddress) + } + if err := ValidateEVMAddress(cfg.TargetContractAddress); err != nil { + return fmt.Errorf("%s: flag --%s", err.Error(), FlagTargetEVMContractAddress) + } + if err := ValidateEVMAddress(cfg.TargetChainGateway); err != nil { + return fmt.Errorf("%s: flag --%s", err.Error(), FlagTargetChainGateway) + } + if cfg.Verify && cfg.CoreRPC == "" { + return fmt.Errorf("flag --%s is set but the core RPC flag --%s is not set", FlagVerify, FlagCoreRPC) + } + return nil +} + +func ValidateEVMAddress(addr string) error { + if addr == "" { + return fmt.Errorf("the EVM address cannot be empty") + } + if !ethcmn.IsHexAddress(addr) { + return errors.New("valid EVM address is required") + } + return nil +} + +func parseFlags(cmd *cobra.Command) (Config, error) { + // TODO add support for env variables + sourceContractAddress, err := cmd.Flags().GetString(FlagSourceEVMContractAddress) + if err != nil { + return Config{}, err + } + + targetContractAddress, err := cmd.Flags().GetString(FlagTargetEVMContractAddress) + if err != nil { + return Config{}, err + } + + targetChainGateway, err := cmd.Flags().GetString(FlagTargetChainGateway) + if err != nil { + return Config{}, err + } + + sourceEVMRPC, err := cmd.Flags().GetString(FlagSourceEVMRPC) + if err != nil { + return Config{}, err + } + + targetEVMRPC, err := cmd.Flags().GetString(FlagTargetEVMRPC) + if err != nil { + return Config{}, err + } + + coreRPC, err := cmd.Flags().GetString(FlagCoreRPC) + if err != nil { + return Config{}, err + } + + logLevel, err := cmd.Flags().GetString(FlagLogLevel) + if err != nil { + return Config{}, err + } + + logFormat, err := cmd.Flags().GetString(FlagLogFormat) + if err != nil { + return Config{}, err + } + + rawPrivateKey, err := cmd.Flags().GetString(FlagEVMPrivateKey) + if err != nil { + return Config{}, err + } + if rawPrivateKey == "" { + return Config{}, fmt.Errorf("please set the private key --%s", FlagEVMPrivateKey) + } + privateKey, err := crypto.HexToECDSA(rawPrivateKey) + if err != nil { + return Config{}, fmt.Errorf("failed to hex-decode Ethereum ECDSA Private Key: %w", err) + } + + return Config{ + SourceEVMRPC: sourceEVMRPC, + TargetEVMRPC: targetEVMRPC, + SourceContractAddress: sourceContractAddress, + TargetContractAddress: targetContractAddress, + TargetChainGateway: targetChainGateway, + CoreRPC: coreRPC, + LogLevel: logLevel, + LogFormat: logFormat, + PrivateKey: privateKey, + }, nil +} diff --git a/cmd/blobstream-ops/verify/cmd.go b/cmd/blobstream-ops/verify/cmd.go index d2bd9f3..2120ec4 100644 --- a/cmd/blobstream-ops/verify/cmd.go +++ b/cmd/blobstream-ops/verify/cmd.go @@ -4,22 +4,14 @@ import ( "bytes" "context" "fmt" - "io" - "os" - "os/signal" - "strings" - "syscall" + "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/common" "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version" - "github.com/cosmos/cosmos-sdk/server" "github.com/ethereum/go-ethereum/accounts/abi/bind" ethcmn "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" - "github.com/rs/zerolog" "github.com/spf13/cobra" blobstreamxwrapper "github.com/succinctlabs/blobstreamx/bindings" - tmconfig "github.com/tendermint/tendermint/config" - tmlog "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/rpc/client/http" ) @@ -58,7 +50,7 @@ func VerifyContractCommand() *cobra.Command { return err } - logger, err := GetLogger(config.LogLevel, config.LogFormat) + logger, err := common.GetLogger(config.LogLevel, config.LogFormat) if err != nil { return err } @@ -85,7 +77,7 @@ func VerifyContractCommand() *cobra.Command { } // Listen for and trap any OS signal to graceful shutdown and exit - go TrapSignal(logger, cancel) + go common.TrapSignal(logger, cancel) logger.Info( "starting verifier", @@ -189,31 +181,3 @@ func VerifyContractCommand() *cobra.Command { } return addStartFlags(command) } - -// GetLogger creates a new logger and returns -func GetLogger(level string, format string) (tmlog.Logger, error) { - logLvl, err := zerolog.ParseLevel(level) - if err != nil { - return nil, fmt.Errorf("failed to parse log level (%s): %w", level, err) - } - var logWriter io.Writer - if strings.ToLower(format) == tmconfig.LogFormatPlain { - logWriter = zerolog.ConsoleWriter{Out: os.Stderr} - } else { - logWriter = os.Stderr - } - - return server.ZeroLogWrapper{Logger: zerolog.New(logWriter).Level(logLvl).With().Timestamp().Logger()}, nil -} - -// TrapSignal will listen for any OS signal and cancel the context to exit gracefully. -func TrapSignal(logger tmlog.Logger, cancel context.CancelFunc) { - sigCh := make(chan os.Signal, 1) - - signal.Notify(sigCh, syscall.SIGTERM) - signal.Notify(sigCh, syscall.SIGINT) - - sig := <-sigCh - logger.Info("caught signal; shutting down...", "signal", sig.String()) - cancel() -} diff --git a/go.mod b/go.mod index 2535d78..6b134be 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/rs/zerolog v1.33.0 github.com/spf13/cobra v1.8.1 github.com/succinctlabs/blobstreamx v0.0.0-20240115194141-5649c689a7fe + github.com/succinctlabs/succinctx v1.1.0 github.com/tendermint/tendermint v0.35.9 ) @@ -35,7 +36,7 @@ require ( github.com/cometbft/cometbft-db v0.9.1 // indirect github.com/confio/ics23/go v0.9.0 // indirect github.com/consensys/bavard v0.1.13 // indirect - github.com/consensys/gnark-crypto v0.12.1 // indirect + github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb // indirect github.com/cosmos/btcutil v1.0.5 // indirect github.com/cosmos/cosmos-proto v1.0.0-beta.3 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect diff --git a/go.sum b/go.sum index 3e821b5..71fe92d 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,8 @@ github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/Yj github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= github.com/consensys/gnark-crypto v0.4.1-0.20210426202927-39ac3d4b3f1f/go.mod h1:815PAHg3wvysy0SyIqanF8gZ0Y1wjk/hrDHD/iT88+Q= github.com/consensys/gnark-crypto v0.5.3/go.mod h1:hOdPlWQV1gDLp7faZVeg8Y0iEPFaOUnCc4XeCCk96p0= -github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M= -github.com/consensys/gnark-crypto v0.12.1/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= +github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb h1:f0BMgIjhZy4lSRHCXFbQst85f5agZAjtDMixQqBWNpc= +github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -994,6 +994,8 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/succinctlabs/blobstreamx v0.0.0-20240115194141-5649c689a7fe h1:lRAlEnfW3/+9ZZuD41TASapvcJZKy8FHMkH9U9Wc7aY= github.com/succinctlabs/blobstreamx v0.0.0-20240115194141-5649c689a7fe/go.mod h1:8ZvZV7KHR9olj1/Hdf5wJYlYjzmLms3ue/P1gzqGxTg= +github.com/succinctlabs/succinctx v1.1.0 h1:8Mz2Ig1eDV8zEq5y7V5JfIftSE5TPjPpOvSTJ1i0lW8= +github.com/succinctlabs/succinctx v1.1.0/go.mod h1:9/IDJr415PLecNQNTMg1BLxWJvrD44uA7ra+C9g/QAs= github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4= github.com/supranational/blst v0.3.11/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= diff --git a/replayer/evm.go b/replayer/evm.go new file mode 100644 index 0000000..f48d0c1 --- /dev/null +++ b/replayer/evm.go @@ -0,0 +1,180 @@ +package replayer + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethcmn "github.com/ethereum/go-ethereum/common" + coregethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + bindings2 "github.com/succinctlabs/blobstreamx/bindings" + "github.com/succinctlabs/succinctx/bindings" + tmlog "github.com/tendermint/tendermint/libs/log" +) + +type fulfillCallArgs struct { + FunctionID [32]byte `json:"_functionId"` + Input []byte `json:"_input"` + Output []byte `json:"_output"` + Proof []byte `json:"_proof"` + CallbackAddress ethcmn.Address `json:"_callbackAddress"` + CallbackData []byte `json:"_callbackData"` +} + +func toFulfillCallArgs(args map[string]interface{}) (fulfillCallArgs, error) { + fID, ok := args["_functionId"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _functionId in map") + } + input, ok := args["_input"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _input in map") + } + output, ok := args["_output"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _output in map") + } + proof, ok := args["_proof"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _proof in map") + } + callbackAddress, ok := args["_callbackAddress"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _callbackAddress in map") + } + callbackData, ok := args["_callbackData"] + if !ok { + return fulfillCallArgs{}, fmt.Errorf("couldn't find the _callbackData in map") + } + + return fulfillCallArgs{ + FunctionID: fID.([32]byte), + Input: input.([]byte), + Output: output.([]byte), + Proof: proof.([]byte), + CallbackAddress: callbackAddress.(ethcmn.Address), + CallbackData: callbackData.([]byte), + }, nil +} + +type transactOpsBuilder func(ctx context.Context, client *ethclient.Client, gasLim uint64) (*bind.TransactOpts, error) + +func newTransactOptsBuilder(privKey *ecdsa.PrivateKey) transactOpsBuilder { + publicKey := privKey.Public() + publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) + if !ok { + panic(fmt.Errorf("invalid public key; expected: %T, got: %T", &ecdsa.PublicKey{}, publicKey)) + } + + evmAddress := crypto.PubkeyToAddress(*publicKeyECDSA) + return func(ctx context.Context, client *ethclient.Client, gasLim uint64) (*bind.TransactOpts, error) { + nonce, err := client.PendingNonceAt(ctx, evmAddress) + if err != nil { + return nil, err + } + + ethChainID, err := client.ChainID(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get Ethereum chain ID: %w", err) + } + + auth, err := bind.NewKeyedTransactorWithChainID(privKey, ethChainID) + if err != nil { + return nil, fmt.Errorf("failed to create Ethereum transactor: %w", err) + } + + bigGasPrice, err := client.SuggestGasPrice(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get Ethereum gas estimate: %w", err) + } + + auth.Nonce = new(big.Int).SetUint64(nonce) + auth.Value = big.NewInt(0) // in wei + auth.GasLimit = gasLim // in units + auth.GasPrice = bigGasPrice + + return auth, nil + } +} + +func submitProof( + ctx context.Context, + logger tmlog.Logger, + client *ethclient.Client, + opts *bind.TransactOpts, + succinctGateway *bindings.SuccinctGateway, + targetBlobstreamXContract *bindings2.BlobstreamX, + args fulfillCallArgs, + proofNonce int64, + waitTimeout time.Duration, +) error { + for i := 0; i < 10; i++ { + logger.Info("submitting proof", "nonce", proofNonce, "gas_price", opts.GasPrice.Int64()) + tx, err := succinctGateway.FulfillCall( + opts, + args.FunctionID, + args.Input, + args.Output, + args.Proof, + args.CallbackAddress, + args.CallbackData, + ) + if err != nil { + return err + } + _, err = waitForTransaction(ctx, logger, client, tx, waitTimeout) + if err != nil { + actualNonce, err := targetBlobstreamXContract.StateProofNonce(&bind.CallOpts{}) + if err != nil { + return err + } + if actualNonce.Int64() > proofNonce { + logger.Info("no need to replay this nonce, the contract has already committed to it", "nonce", actualNonce) + return nil + } + + if errors.Is(err, context.DeadlineExceeded) { + // we need to speed up the transaction by increasing the gas price + bigGasPrice, err := client.SuggestGasPrice(ctx) + if err != nil { + return fmt.Errorf("failed to get Ethereum gas estimate: %w", err) + } + + // 20% increase of the suggested gas price + opts.GasPrice = big.NewInt(bigGasPrice.Int64() + bigGasPrice.Int64()/5) + continue + } else { + return err + } + } + return nil + } + return fmt.Errorf("failed to submit proof nonce %d", proofNonce) +} + +func waitForTransaction( + ctx context.Context, + logger tmlog.Logger, + backend bind.DeployBackend, + tx *coregethtypes.Transaction, + timeout time.Duration, +) (*coregethtypes.Receipt, error) { + logger.Debug("waiting for transaction to be confirmed", "hash", tx.Hash().String()) + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + receipt, err := bind.WaitMined(ctx, backend, tx) + if err == nil && receipt != nil && receipt.Status == 1 { + logger.Info("transaction confirmed", "hash", tx.Hash().String(), "block", receipt.BlockNumber.Uint64()) + return receipt, nil + } + + return receipt, err +} diff --git a/replayer/replayer.go b/replayer/replayer.go new file mode 100644 index 0000000..8248c54 --- /dev/null +++ b/replayer/replayer.go @@ -0,0 +1,322 @@ +package replayer + +import ( + "bytes" + "context" + "crypto/ecdsa" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethcmn "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + blobstreamxwrapper "github.com/succinctlabs/blobstreamx/bindings" + "github.com/succinctlabs/succinctx/bindings" + tmlog "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/rpc/client/http" +) + +func Follow( + ctx context.Context, + logger tmlog.Logger, + verify bool, + trpc *http.HTTP, + sourceEVMClient *ethclient.Client, + targetEVMClient *ethclient.Client, + sourceBlobstreamContractAddress string, + targetBlobstreamContractAddress string, + targetChainGatewayAddress string, + privateKey *ecdsa.PrivateKey, +) error { + logger.Info("listening for new proofs on the source chain") + sourceBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(sourceBlobstreamContractAddress), sourceEVMClient) + if err != nil { + return err + } + + targetBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(targetBlobstreamContractAddress), sourceEVMClient) + if err != nil { + return err + } + + newEvents := make(chan *blobstreamxwrapper.BlobstreamXDataCommitmentStored) + subscription, err := sourceBlobstreamX.WatchDataCommitmentStored(&bind.WatchOpts{Context: ctx}, newEvents, nil, nil, nil) + if err != nil { + return err + } + defer subscription.Unsubscribe() + + gateway, err := bindings.NewSuccinctGateway(ethcmn.HexToAddress(targetChainGatewayAddress), targetEVMClient) + if err != nil { + return err + } + abi, err := bindings.SuccinctGatewayMetaData.GetAbi() + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return nil + case event := <-newEvents: + latestTargetContractNonce, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if event.ProofNonce.Int64() < int64(latestTargetContractNonce) { + logger.Info("the target contract is at a higher nonce, waiting for new events", "event_nonce", event.ProofNonce, "target_contract_latest_nonce", latestTargetContractNonce) + continue + } else if event.ProofNonce.Int64() > int64(latestTargetContractNonce) { + logger.Info("the target contract needs to catchup", "event_nonce", event.ProofNonce, "target_contract_latest_nonce", latestTargetContractNonce) + err = Catchup(ctx, logger, verify, trpc, sourceEVMClient, targetEVMClient, sourceBlobstreamContractAddress, targetBlobstreamContractAddress, targetChainGatewayAddress, privateKey) + if err != nil { + return err + } + } + logger.Debug("getting transaction containing the proof", "nonce", event.ProofNonce.Int64(), "hash", event.Raw.TxHash.Hex()) + tx, _, err := sourceEVMClient.TransactionByHash(ctx, event.Raw.TxHash) + if err != nil { + return err + } + + logger.Debug("decoding the proof") + rawMap := make(map[string]interface{}) + err = abi.UnpackIntoMap(rawMap, "fulfillCall", tx.Data()) + if err != nil { + return err + } + + decodedArgs, err := toFulfillCallArgs(rawMap) + if err != nil { + return err + } + + // update the address to be the target blobstreamX contract for the callback + decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress) + + logger.Info("replaying the proof", "nonce", event.ProofNonce.Int64()) + opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000) + if err != nil { + return err + } + err = submitProof( + ctx, + logger, + targetEVMClient, + opts, + gateway, + targetBlobstreamX, + decodedArgs, + event.ProofNonce.Int64(), + 3*time.Minute, + ) + if err != nil { + return err + } + logger.Info("successfully replayed proof", "nonce", event.ProofNonce.Int64()) + } + } +} + +func Catchup( + ctx context.Context, + logger tmlog.Logger, + verify bool, + trpc *http.HTTP, + sourceEVMClient *ethclient.Client, + targetEVMClient *ethclient.Client, + sourceBlobstreamContractAddress string, + targetBlobstreamContractAddress string, + targetChainGatewayAddress string, + privateKey *ecdsa.PrivateKey, +) error { + filterRange := int64(5000) + + lookupStartHeight, err := sourceEVMClient.BlockNumber(ctx) + if err != nil { + return err + } + + sourceBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(sourceBlobstreamContractAddress), sourceEVMClient) + if err != nil { + return err + } + + targetBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(targetBlobstreamContractAddress), sourceEVMClient) + if err != nil { + return err + } + + latestSourceContractNonce, err := sourceBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + + latestTargetContractNonce, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + + logger.Info("catching up", "latest_source_contract_nonce", latestSourceContractNonce, "latest_target_contract_nonce", latestTargetContractNonce) + + // TODO: this could be improved in the future to only get the events needed + dataCommitmentEvents, err := getAllDataCommitmentStoredEvents( + ctx, + logger, + &sourceBlobstreamX.BlobstreamXFilterer, + int64(lookupStartHeight), + filterRange, + int64(latestSourceContractNonce), + ) + if err != nil { + return err + } + + gateway, err := bindings.NewSuccinctGateway(ethcmn.HexToAddress(targetChainGatewayAddress), targetEVMClient) + if err != nil { + return err + } + abi, err := bindings.SuccinctGatewayMetaData.GetAbi() + if err != nil { + return err + } + + for nonce := latestTargetContractNonce; nonce < latestSourceContractNonce; nonce++ { + event, exists := dataCommitmentEvents[int(nonce)] + if !exists { + return fmt.Errorf("couldn't find nonce %d in events", nonce) + } + + if verify { + logger.Info("verifying data root tuple root", "nonce", event.ProofNonce, "start_block", event.StartBlock, "end_block", event.EndBlock) + coreDataCommitment, err := trpc.DataCommitment(ctx, event.StartBlock, event.EndBlock) + if err != nil { + return err + } + if bytes.Equal(coreDataCommitment.DataCommitment.Bytes(), event.DataCommitment[:]) { + logger.Info("data commitment verified") + } else { + logger.Error("data commitment mismatch!! quitting", "nonce", event.ProofNonce) + return fmt.Errorf("data commitment mistmatch. nonce %d", event.ProofNonce) + } + } + + latestSourceBlock, err := sourceBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + + latestTargetBlock, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if latestTargetBlock > latestSourceBlock { + // contract already up to date + return nil + } + if latestTargetBlock != event.StartBlock { + return fmt.Errorf("can't replay event to contract. mismatch latest target block %d and start block %d", latestTargetBlock, event.StartBlock) + } + + logger.Debug("getting transaction containing the proof", "nonce", nonce, "hash", event.Raw.TxHash.Hex()) + tx, _, err := sourceEVMClient.TransactionByHash(ctx, event.Raw.TxHash) + if err != nil { + return err + } + + logger.Debug("decoding the proof") + rawMap := make(map[string]interface{}) + err = abi.UnpackIntoMap(rawMap, "fulfillCall", tx.Data()) + if err != nil { + return err + } + + decodedArgs, err := toFulfillCallArgs(rawMap) + if err != nil { + return err + } + + // update the address to be the target blobstreamX contract for the callback + decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress) + + logger.Info("replaying the proof", "nonce", nonce) + opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000) + if err != nil { + return err + } + err = submitProof( + ctx, + logger, + targetEVMClient, + opts, + gateway, + targetBlobstreamX, + decodedArgs, + int64(nonce), + 3*time.Minute, + ) + if err != nil { + return err + } + } + + latestTargetContractNonce, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + + logger.Info("contract up to date", "latest_nonce", latestTargetContractNonce) + return nil +} + +func getAllDataCommitmentStoredEvents( + ctx context.Context, + logger tmlog.Logger, + blobstreamLogFilterer *blobstreamxwrapper.BlobstreamXFilterer, + lookupStartHeight int64, + filterRange int64, + latestSourceContractNonce int64, +) (map[int]blobstreamxwrapper.BlobstreamXDataCommitmentStored, error) { + logger.Info("querying all the data commitment stored events in the source contract...") + dataCommitmentEvents := make(map[int]blobstreamxwrapper.BlobstreamXDataCommitmentStored) + for eventLookupEnd := lookupStartHeight; eventLookupEnd > 0; eventLookupEnd -= filterRange { + logger.Debug("querying all the data commitment stored events", "evm_block_start", eventLookupEnd, "evm_block_end", eventLookupEnd-filterRange) + rangeStart := eventLookupEnd - filterRange + rangeEnd := uint64(eventLookupEnd) + events, err := blobstreamLogFilterer.FilterDataCommitmentStored( + &bind.FilterOpts{ + Context: ctx, + Start: uint64(rangeStart), + End: &rangeEnd, + }, + nil, + nil, + nil, + ) + if err != nil { + return nil, err + } + + for { + if events.Event != nil { + _, exists := dataCommitmentEvents[int(events.Event.ProofNonce.Int64())] + if exists { + continue + } else { + dataCommitmentEvents[int(events.Event.ProofNonce.Int64())] = *events.Event + } + } + if !events.Next() { + break + } + } + if int64(len(dataCommitmentEvents)) >= latestSourceContractNonce-1 { + // found all the events + logger.Info("found all events", "count", len(dataCommitmentEvents)) + break + } + logger.Info("found events", "count", len(dataCommitmentEvents)) + } + return dataCommitmentEvents, nil +} From 0c93b679f70d4e631774e148bae2b970f2ca540a Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 2 Jul 2024 13:51:34 +0100 Subject: [PATCH 2/6] fix: use blocks instead of heights to find the right event --- cmd/blobstream-ops/replay/cmd.go | 16 ++++---- cmd/blobstream-ops/root/cmd.go | 2 + {replayer => replay}/evm.go | 2 +- {replayer => replay}/replayer.go | 65 +++++++++++++++++--------------- 4 files changed, 45 insertions(+), 40 deletions(-) rename {replayer => replay}/evm.go (99%) rename {replayer => replay}/replayer.go (74%) diff --git a/cmd/blobstream-ops/replay/cmd.go b/cmd/blobstream-ops/replay/cmd.go index 0e6a67d..566f7fc 100644 --- a/cmd/blobstream-ops/replay/cmd.go +++ b/cmd/blobstream-ops/replay/cmd.go @@ -6,7 +6,7 @@ import ( "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/common" "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version" - "github.com/celestiaorg/blobstream-ops/replayer" + "github.com/celestiaorg/blobstream-ops/replay" "github.com/ethereum/go-ethereum/accounts/abi/bind" ethcmn "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" @@ -89,17 +89,17 @@ func Command() *cobra.Command { config.CoreRPC, ) - latestSourceNonce, err := sourceBlobstreamReader.StateProofNonce(&bind.CallOpts{}) + latestSourceBlock, err := sourceBlobstreamReader.LatestBlock(&bind.CallOpts{}) if err != nil { return err } - logger.Info("found latest source blobstreamX contract nonce", "nonce", latestSourceNonce.Int64()) + logger.Info("found source blobstreamX contract", "latest_block", latestSourceBlock) - latestTargetNonce, err := targetBlobstreamReader.StateProofNonce(&bind.CallOpts{}) + latestTargetBlock, err := targetBlobstreamReader.LatestBlock(&bind.CallOpts{}) if err != nil { return err } - logger.Info("found latest target blobstreamX contract nonce", "nonce", latestTargetNonce.Int64()) + logger.Info("found target blobstreamX contract", "latest_block", latestTargetBlock) var trpc *http.HTTP if config.Verify { @@ -119,8 +119,8 @@ func Command() *cobra.Command { }(trpc) } - if latestSourceNonce.Int64() > latestTargetNonce.Int64() { - err = replayer.Catchup( + if latestSourceBlock > latestTargetBlock { + err = replay.Catchup( ctx, logger, config.Verify, @@ -139,7 +139,7 @@ func Command() *cobra.Command { logger.Info("target contract is already up to date") } - return replayer.Follow( + return replay.Follow( ctx, logger, config.Verify, diff --git a/cmd/blobstream-ops/root/cmd.go b/cmd/blobstream-ops/root/cmd.go index fe44046..cb6a917 100644 --- a/cmd/blobstream-ops/root/cmd.go +++ b/cmd/blobstream-ops/root/cmd.go @@ -1,6 +1,7 @@ package root import ( + "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/replay" "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/verify" "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version" "github.com/spf13/cobra" @@ -18,6 +19,7 @@ func Cmd() *cobra.Command { rootCmd.AddCommand( version.Cmd, verify.Command(), + replay.Command(), ) rootCmd.SetHelpCommand(&cobra.Command{}) diff --git a/replayer/evm.go b/replay/evm.go similarity index 99% rename from replayer/evm.go rename to replay/evm.go index f48d0c1..359f90e 100644 --- a/replayer/evm.go +++ b/replay/evm.go @@ -1,4 +1,4 @@ -package replayer +package replay import ( "context" diff --git a/replayer/replayer.go b/replay/replayer.go similarity index 74% rename from replayer/replayer.go rename to replay/replayer.go index 8248c54..eb41718 100644 --- a/replayer/replayer.go +++ b/replay/replayer.go @@ -1,4 +1,4 @@ -package replayer +package replay import ( "bytes" @@ -60,21 +60,21 @@ func Follow( case <-ctx.Done(): return nil case event := <-newEvents: - latestTargetContractNonce, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + latestTargetContractBlock, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) if err != nil { return err } - if event.ProofNonce.Int64() < int64(latestTargetContractNonce) { - logger.Info("the target contract is at a higher nonce, waiting for new events", "event_nonce", event.ProofNonce, "target_contract_latest_nonce", latestTargetContractNonce) + if event.StartBlock < latestTargetContractBlock { + logger.Info("the target contract is at a higher block, waiting for new events", "event_start_block", event.StartBlock, "target_contract_latest_block", latestTargetContractBlock) continue - } else if event.ProofNonce.Int64() > int64(latestTargetContractNonce) { - logger.Info("the target contract needs to catchup", "event_nonce", event.ProofNonce, "target_contract_latest_nonce", latestTargetContractNonce) + } else if event.StartBlock > latestTargetContractBlock { + logger.Info("the target contract needs to catchup", "event_start_block", event.StartBlock, "target_contract_latest_block", latestTargetContractBlock) err = Catchup(ctx, logger, verify, trpc, sourceEVMClient, targetEVMClient, sourceBlobstreamContractAddress, targetBlobstreamContractAddress, targetChainGatewayAddress, privateKey) if err != nil { return err } } - logger.Debug("getting transaction containing the proof", "nonce", event.ProofNonce.Int64(), "hash", event.Raw.TxHash.Hex()) + logger.Debug("getting transaction containing the proof", "nonce", event.ProofNonce.Int64(), "hash", event.Raw.TxHash.Hex(), "start_block", event.StartBlock) tx, _, err := sourceEVMClient.TransactionByHash(ctx, event.Raw.TxHash) if err != nil { return err @@ -148,17 +148,22 @@ func Catchup( return err } - latestSourceContractNonce, err := sourceBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + latestSourceContractBlock, err := sourceBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) if err != nil { return err } - latestTargetContractNonce, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + latestTargetContractBlock, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) if err != nil { return err } - logger.Info("catching up", "latest_source_contract_nonce", latestSourceContractNonce, "latest_target_contract_nonce", latestTargetContractNonce) + logger.Info("catching up", "latest_source_contract_block", latestSourceContractBlock, "latest_target_contract_block", latestTargetContractBlock) + + latestSourceContractNonce, err := sourceBlobstreamX.StateProofNonce(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } // TODO: this could be improved in the future to only get the events needed dataCommitmentEvents, err := getAllDataCommitmentStoredEvents( @@ -167,7 +172,7 @@ func Catchup( &sourceBlobstreamX.BlobstreamXFilterer, int64(lookupStartHeight), filterRange, - int64(latestSourceContractNonce), + latestSourceContractNonce.Int64(), ) if err != nil { return err @@ -182,14 +187,14 @@ func Catchup( return err } - for nonce := latestTargetContractNonce; nonce < latestSourceContractNonce; nonce++ { - event, exists := dataCommitmentEvents[int(nonce)] + for startHeight := latestTargetContractBlock; startHeight < latestSourceContractBlock; { + event, exists := dataCommitmentEvents[int64(startHeight)] if !exists { - return fmt.Errorf("couldn't find nonce %d in events", nonce) + return fmt.Errorf("couldn't find a proof that starts at height %d in events", startHeight) } if verify { - logger.Info("verifying data root tuple root", "nonce", event.ProofNonce, "start_block", event.StartBlock, "end_block", event.EndBlock) + logger.Info("verifying data root tuple root", "proof_nonce_in_source_contract", event.ProofNonce, "start_block", event.StartBlock, "end_block", event.EndBlock) coreDataCommitment, err := trpc.DataCommitment(ctx, event.StartBlock, event.EndBlock) if err != nil { return err @@ -197,8 +202,8 @@ func Catchup( if bytes.Equal(coreDataCommitment.DataCommitment.Bytes(), event.DataCommitment[:]) { logger.Info("data commitment verified") } else { - logger.Error("data commitment mismatch!! quitting", "nonce", event.ProofNonce) - return fmt.Errorf("data commitment mistmatch. nonce %d", event.ProofNonce) + logger.Error("data commitment mismatch!! quitting", "proof_nonce_in_source_contract", event.ProofNonce, "start_block", event.StartBlock, "end_block", event.EndBlock) + return fmt.Errorf("data commitment mistmatch. start height %d end height %d", event.StartBlock, event.EndBlock) } } @@ -207,19 +212,16 @@ func Catchup( return err } - latestTargetBlock, err := targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) if err != nil { return err } - if latestTargetBlock > latestSourceBlock { + if latestTargetContractBlock >= latestSourceBlock { // contract already up to date return nil } - if latestTargetBlock != event.StartBlock { - return fmt.Errorf("can't replay event to contract. mismatch latest target block %d and start block %d", latestTargetBlock, event.StartBlock) - } - logger.Debug("getting transaction containing the proof", "nonce", nonce, "hash", event.Raw.TxHash.Hex()) + logger.Debug("getting transaction containing the proof", "startHeight", startHeight, "hash", event.Raw.TxHash.Hex()) tx, _, err := sourceEVMClient.TransactionByHash(ctx, event.Raw.TxHash) if err != nil { return err @@ -240,7 +242,7 @@ func Catchup( // update the address to be the target blobstreamX contract for the callback decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress) - logger.Info("replaying the proof", "nonce", nonce) + logger.Info("replaying the proof", "startHeight", startHeight) opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000) if err != nil { return err @@ -253,20 +255,21 @@ func Catchup( gateway, targetBlobstreamX, decodedArgs, - int64(nonce), + int64(startHeight), 3*time.Minute, ) if err != nil { return err } + startHeight = event.EndBlock } - latestTargetContractNonce, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) if err != nil { return err } - logger.Info("contract up to date", "latest_nonce", latestTargetContractNonce) + logger.Info("contract up to date", "latest_nonce", latestTargetContractBlock) return nil } @@ -277,9 +280,9 @@ func getAllDataCommitmentStoredEvents( lookupStartHeight int64, filterRange int64, latestSourceContractNonce int64, -) (map[int]blobstreamxwrapper.BlobstreamXDataCommitmentStored, error) { +) (map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored, error) { logger.Info("querying all the data commitment stored events in the source contract...") - dataCommitmentEvents := make(map[int]blobstreamxwrapper.BlobstreamXDataCommitmentStored) + dataCommitmentEvents := make(map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored) for eventLookupEnd := lookupStartHeight; eventLookupEnd > 0; eventLookupEnd -= filterRange { logger.Debug("querying all the data commitment stored events", "evm_block_start", eventLookupEnd, "evm_block_end", eventLookupEnd-filterRange) rangeStart := eventLookupEnd - filterRange @@ -300,11 +303,11 @@ func getAllDataCommitmentStoredEvents( for { if events.Event != nil { - _, exists := dataCommitmentEvents[int(events.Event.ProofNonce.Int64())] + _, exists := dataCommitmentEvents[int64(events.Event.StartBlock)] if exists { continue } else { - dataCommitmentEvents[int(events.Event.ProofNonce.Int64())] = *events.Event + dataCommitmentEvents[int64(events.Event.StartBlock)] = *events.Event } } if !events.Next() { From 34146400fcea63834953574e0ddd5fcafdbee5e1 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 2 Jul 2024 17:21:11 +0100 Subject: [PATCH 3/6] fix: logs + data decrypt --- replay/evm.go | 3 ++- replay/replayer.go | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/replay/evm.go b/replay/evm.go index 359f90e..583919f 100644 --- a/replay/evm.go +++ b/replay/evm.go @@ -115,7 +115,7 @@ func submitProof( waitTimeout time.Duration, ) error { for i := 0; i < 10; i++ { - logger.Info("submitting proof", "nonce", proofNonce, "gas_price", opts.GasPrice.Int64()) + logger.Info("submitting transaction for proof", "nonce", proofNonce, "gas_price", opts.GasPrice.Int64()) tx, err := succinctGateway.FulfillCall( opts, args.FunctionID, @@ -128,6 +128,7 @@ func submitProof( if err != nil { return err } + logger.Info("transaction submitted", "hash", tx.Hash().Hex()) _, err = waitForTransaction(ctx, logger, client, tx, waitTimeout) if err != nil { actualNonce, err := targetBlobstreamXContract.StateProofNonce(&bind.CallOpts{}) diff --git a/replay/replayer.go b/replay/replayer.go index eb41718..12a7e45 100644 --- a/replay/replayer.go +++ b/replay/replayer.go @@ -82,7 +82,7 @@ func Follow( logger.Debug("decoding the proof") rawMap := make(map[string]interface{}) - err = abi.UnpackIntoMap(rawMap, "fulfillCall", tx.Data()) + err = abi.UnpackIntoMap(rawMap, "fulfillCall", tx.Data()[4:]) if err != nil { return err } @@ -229,7 +229,8 @@ func Catchup( logger.Debug("decoding the proof") rawMap := make(map[string]interface{}) - err = abi.UnpackIntoMap(rawMap, "fulfillCall", tx.Data()) + inputArgs := abi.Methods["fulfillCall"].Inputs + err = inputArgs.UnpackIntoMap(rawMap, tx.Data()[4:]) if err != nil { return err } From 98a1ada5f6749fd10fa900700c17d52aaf93dac9 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 4 Jul 2024 11:00:26 +0100 Subject: [PATCH 4/6] fix: working listener + more control --- cmd/blobstream-ops/replay/cmd.go | 9 +++-- cmd/blobstream-ops/replay/config.go | 55 +++++++++++++++++++++++++++++ cmd/blobstream-ops/verify/cmd.go | 2 +- replay/evm.go | 10 ++++-- 4 files changed, 70 insertions(+), 6 deletions(-) diff --git a/cmd/blobstream-ops/replay/cmd.go b/cmd/blobstream-ops/replay/cmd.go index 566f7fc..bc4a290 100644 --- a/cmd/blobstream-ops/replay/cmd.go +++ b/cmd/blobstream-ops/replay/cmd.go @@ -2,7 +2,6 @@ package replay import ( "context" - "fmt" "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/common" "github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version" @@ -114,7 +113,7 @@ func Command() *cobra.Command { defer func(trpc *http.HTTP) { err := trpc.Stop() if err != nil { - fmt.Println(err.Error()) + logger.Error("error stopping tendermint RPC", "err", err.Error()) } }(trpc) } @@ -131,6 +130,9 @@ func Command() *cobra.Command { config.TargetContractAddress, config.TargetChainGateway, config.PrivateKey, + config.HeaderRangeFunctionID, + config.NextHeaderFunctionID, + config.FilterRange, ) if err != nil { return err @@ -150,6 +152,9 @@ func Command() *cobra.Command { config.TargetContractAddress, config.TargetChainGateway, config.PrivateKey, + config.HeaderRangeFunctionID, + config.NextHeaderFunctionID, + config.FilterRange, ) }, } diff --git a/cmd/blobstream-ops/replay/config.go b/cmd/blobstream-ops/replay/config.go index dbd1b80..d200aab 100644 --- a/cmd/blobstream-ops/replay/config.go +++ b/cmd/blobstream-ops/replay/config.go @@ -2,6 +2,7 @@ package replay import ( "crypto/ecdsa" + "encoding/hex" "errors" "fmt" @@ -18,6 +19,10 @@ const ( FlagTargetEVMContractAddress = "evm.target.contract-address" FlagTargetChainGateway = "evm.target.gateway" FlagEVMPrivateKey = "evm.private-key" + FlagEVMFilterRange = "evm.filter-range" + + FlagHeaderRangeFunctionID = "circuits.header-range.functionID" + FlagNextHeaderFunctionID = "circuits.next-header.functionID" FlagVerify = "verify" @@ -50,6 +55,9 @@ func addFlags(cmd *cobra.Command) *cobra.Command { ) cmd.Flags().Bool(FlagVerify, false, "Set to verify the commitments before replaying their proofs. Require the core rpc flag to be set") cmd.Flags().String(FlagEVMPrivateKey, "", "Specify the EVM private key, in hex format without the leading 0x, to use for replaying transaction in the target chain. Corresponding account should be funded") + cmd.Flags().String(FlagHeaderRangeFunctionID, "", "Specify the function ID of the header range circuit in the target BlobstreamX contract, in hex format without the leading 0x") + cmd.Flags().String(FlagNextHeaderFunctionID, "", "Specify the function ID of the next header circuit in the target BlobstreamX contract, in hex format without the leading 0x") + cmd.Flags().Int64(FlagEVMFilterRange, 5000, "Specify the eth_getLogs filter range") return cmd } @@ -64,6 +72,9 @@ type Config struct { CoreRPC string Verify bool PrivateKey *ecdsa.PrivateKey + HeaderRangeFunctionID [32]byte + NextHeaderFunctionID [32]byte + FilterRange int64 } func (cfg Config) ValidateBasics() error { @@ -146,6 +157,46 @@ func parseFlags(cmd *cobra.Command) (Config, error) { return Config{}, fmt.Errorf("failed to hex-decode Ethereum ECDSA Private Key: %w", err) } + strHeaderRange, err := cmd.Flags().GetString(FlagHeaderRangeFunctionID) + if err != nil { + return Config{}, err + } + if strHeaderRange == "" { + return Config{}, fmt.Errorf("please set the header range function ID --%s", FlagHeaderRangeFunctionID) + } + decodedHeaderRange, err := hex.DecodeString(strHeaderRange) + if err != nil { + return Config{}, err + } + var bzHeaderRange [32]byte + copy(bzHeaderRange[:], decodedHeaderRange) + + strNextHeader, err := cmd.Flags().GetString(FlagNextHeaderFunctionID) + if err != nil { + return Config{}, err + } + if strNextHeader == "" { + return Config{}, fmt.Errorf("please set the header range function ID --%s", FlagHeaderRangeFunctionID) + } + decodedNextHeader, err := hex.DecodeString(strNextHeader) + if err != nil { + return Config{}, err + } + var bzNextHeader [32]byte + copy(bzNextHeader[:], decodedNextHeader) + + filterRange, err := cmd.Flags().GetInt64(FlagEVMFilterRange) + if err != nil { + return Config{}, err + } + + verify, err := cmd.Flags().GetBool(FlagVerify) + if err != nil { + return Config{}, err + } + + // TODO add rate limiting flag + // TODO add gas price multiplier flag return Config{ SourceEVMRPC: sourceEVMRPC, TargetEVMRPC: targetEVMRPC, @@ -156,5 +207,9 @@ func parseFlags(cmd *cobra.Command) (Config, error) { LogLevel: logLevel, LogFormat: logFormat, PrivateKey: privateKey, + NextHeaderFunctionID: bzNextHeader, + HeaderRangeFunctionID: bzHeaderRange, + FilterRange: filterRange, + Verify: verify, }, nil } diff --git a/cmd/blobstream-ops/verify/cmd.go b/cmd/blobstream-ops/verify/cmd.go index 2120ec4..5fe2c80 100644 --- a/cmd/blobstream-ops/verify/cmd.go +++ b/cmd/blobstream-ops/verify/cmd.go @@ -154,7 +154,7 @@ func VerifyContractCommand() *cobra.Command { defer func(trpc *http.HTTP) { err := trpc.Stop() if err != nil { - fmt.Println(err.Error()) + logger.Error("error stopping tendermint RPC", "err", err.Error()) } }(trpc) diff --git a/replay/evm.go b/replay/evm.go index 583919f..498afcf 100644 --- a/replay/evm.go +++ b/replay/evm.go @@ -131,9 +131,9 @@ func submitProof( logger.Info("transaction submitted", "hash", tx.Hash().Hex()) _, err = waitForTransaction(ctx, logger, client, tx, waitTimeout) if err != nil { - actualNonce, err := targetBlobstreamXContract.StateProofNonce(&bind.CallOpts{}) - if err != nil { - return err + actualNonce, err2 := targetBlobstreamXContract.StateProofNonce(&bind.CallOpts{}) + if err2 != nil { + return err2 } if actualNonce.Int64() > proofNonce { logger.Info("no need to replay this nonce, the contract has already committed to it", "nonce", actualNonce) @@ -141,6 +141,7 @@ func submitProof( } if errors.Is(err, context.DeadlineExceeded) { + logger.Debug("transaction still not included, accelerating...") // we need to speed up the transaction by increasing the gas price bigGasPrice, err := client.SuggestGasPrice(ctx) if err != nil { @@ -149,8 +150,11 @@ func submitProof( // 20% increase of the suggested gas price opts.GasPrice = big.NewInt(bigGasPrice.Int64() + bigGasPrice.Int64()/5) + logger.Debug("transaction still not included, accelerating...", "new_gas_price", opts.GasPrice.Int64()) continue } else { + logger.Error("transaction failed", "err", err.Error()) + logger.Debug("retrying...") return err } } From 7c4b57d5c64fb400af7290360e00eb9677316a2b Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 4 Jul 2024 11:01:07 +0100 Subject: [PATCH 5/6] fix: working listener + more control --- replay/replayer.go | 91 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/replay/replayer.go b/replay/replayer.go index 12a7e45..49716e8 100644 --- a/replay/replayer.go +++ b/replay/replayer.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/ecdsa" + "encoding/hex" "fmt" "time" @@ -27,6 +28,9 @@ func Follow( targetBlobstreamContractAddress string, targetChainGatewayAddress string, privateKey *ecdsa.PrivateKey, + headerRangeFunctionID [32]byte, + nextHeaderFunctionID [32]byte, + filterRange int64, ) error { logger.Info("listening for new proofs on the source chain") sourceBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(sourceBlobstreamContractAddress), sourceEVMClient) @@ -69,10 +73,33 @@ func Follow( continue } else if event.StartBlock > latestTargetContractBlock { logger.Info("the target contract needs to catchup", "event_start_block", event.StartBlock, "target_contract_latest_block", latestTargetContractBlock) - err = Catchup(ctx, logger, verify, trpc, sourceEVMClient, targetEVMClient, sourceBlobstreamContractAddress, targetBlobstreamContractAddress, targetChainGatewayAddress, privateKey) + err = Catchup( + ctx, + logger, + verify, + trpc, + sourceEVMClient, + targetEVMClient, + sourceBlobstreamContractAddress, + targetBlobstreamContractAddress, + targetChainGatewayAddress, + privateKey, + headerRangeFunctionID, + nextHeaderFunctionID, + filterRange, + ) if err != nil { return err } + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if event.EndBlock == latestTargetContractBlock { + // the contract is already up to date + logger.Info("contract up to date", "target_contract_latest_block", event.EndBlock) + continue + } } logger.Debug("getting transaction containing the proof", "nonce", event.ProofNonce.Int64(), "hash", event.Raw.TxHash.Hex(), "start_block", event.StartBlock) tx, _, err := sourceEVMClient.TransactionByHash(ctx, event.Raw.TxHash) @@ -82,7 +109,8 @@ func Follow( logger.Debug("decoding the proof") rawMap := make(map[string]interface{}) - err = abi.UnpackIntoMap(rawMap, "fulfillCall", tx.Data()[4:]) + inputArgs := abi.Methods["fulfillCall"].Inputs + err = inputArgs.UnpackIntoMap(rawMap, tx.Data()[4:]) if err != nil { return err } @@ -94,6 +122,13 @@ func Follow( // update the address to be the target blobstreamX contract for the callback decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress) + if event.EndBlock-event.StartBlock > 1 { + // this is a header range proof + decodedArgs.FunctionID = headerRangeFunctionID + } else { + // this is a next header proof + decodedArgs.FunctionID = nextHeaderFunctionID + } logger.Info("replaying the proof", "nonce", event.ProofNonce.Int64()) opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000) @@ -130,9 +165,10 @@ func Catchup( targetBlobstreamContractAddress string, targetChainGatewayAddress string, privateKey *ecdsa.PrivateKey, + headerRangeFunctionID [32]byte, + nextHeaderFunctionID [32]byte, + filterRange int64, ) error { - filterRange := int64(5000) - lookupStartHeight, err := sourceEVMClient.BlockNumber(ctx) if err != nil { return err @@ -165,7 +201,6 @@ func Catchup( return err } - // TODO: this could be improved in the future to only get the events needed dataCommitmentEvents, err := getAllDataCommitmentStoredEvents( ctx, logger, @@ -173,6 +208,7 @@ func Catchup( int64(lookupStartHeight), filterRange, latestSourceContractNonce.Int64(), + int64(latestTargetContractBlock), ) if err != nil { return err @@ -202,7 +238,19 @@ func Catchup( if bytes.Equal(coreDataCommitment.DataCommitment.Bytes(), event.DataCommitment[:]) { logger.Info("data commitment verified") } else { - logger.Error("data commitment mismatch!! quitting", "proof_nonce_in_source_contract", event.ProofNonce, "start_block", event.StartBlock, "end_block", event.EndBlock) + logger.Error( + "data commitment mismatch!! quitting", + "proof_nonce_in_source_contract", + event.ProofNonce, + "start_block", + event.StartBlock, + "end_block", + event.EndBlock, + "expected_data_commitment", + hex.EncodeToString(coreDataCommitment.DataCommitment.Bytes()), + "actual_data_commitment", + hex.EncodeToString(event.DataCommitment[:]), + ) return fmt.Errorf("data commitment mistmatch. start height %d end height %d", event.StartBlock, event.EndBlock) } } @@ -243,6 +291,14 @@ func Catchup( // update the address to be the target blobstreamX contract for the callback decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress) + if event.EndBlock-event.StartBlock > 1 { + // this is a header range proof + decodedArgs.FunctionID = headerRangeFunctionID + } else { + // this is a next header proof + decodedArgs.FunctionID = nextHeaderFunctionID + } + logger.Info("replaying the proof", "startHeight", startHeight) opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000) if err != nil { @@ -262,7 +318,17 @@ func Catchup( if err != nil { return err } - startHeight = event.EndBlock + // make sure the contract was updated + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if latestTargetContractBlock == event.EndBlock { + // contract updated successfully, we can advance + startHeight = event.EndBlock + } else { + logger.Error("contract did not update successfully, retrying the same proof", "expected_target_height", event.EndBlock, "actual_target_height", latestTargetContractBlock) + } } latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) @@ -270,7 +336,7 @@ func Catchup( return err } - logger.Info("contract up to date", "latest_nonce", latestTargetContractBlock) + logger.Info("contract up to date", "latest_target_contract_block", latestTargetContractBlock) return nil } @@ -281,6 +347,7 @@ func getAllDataCommitmentStoredEvents( lookupStartHeight int64, filterRange int64, latestSourceContractNonce int64, + latestTargetContractBlock int64, ) (map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored, error) { logger.Info("querying all the data commitment stored events in the source contract...") dataCommitmentEvents := make(map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored) @@ -302,6 +369,7 @@ func getAllDataCommitmentStoredEvents( return nil, err } + gatheredTheNecessaryEvents := false for { if events.Event != nil { _, exists := dataCommitmentEvents[int64(events.Event.StartBlock)] @@ -309,6 +377,9 @@ func getAllDataCommitmentStoredEvents( continue } else { dataCommitmentEvents[int64(events.Event.StartBlock)] = *events.Event + if int64(events.Event.StartBlock) < latestTargetContractBlock { + gatheredTheNecessaryEvents = true + } } } if !events.Next() { @@ -320,6 +391,10 @@ func getAllDataCommitmentStoredEvents( logger.Info("found all events", "count", len(dataCommitmentEvents)) break } + if gatheredTheNecessaryEvents { + logger.Info("found enough events to cover the needed range", "count", len(dataCommitmentEvents)) + break + } logger.Info("found events", "count", len(dataCommitmentEvents)) } return dataCommitmentEvents, nil From 6734331f834a31cb26e565364160d178a1417edb Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 4 Jul 2024 11:04:51 +0100 Subject: [PATCH 6/6] chore: lint --- cmd/blobstream-ops/replay/cmd.go | 2 +- replay/evm.go | 7 +++---- replay/replayer.go | 9 ++++----- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/cmd/blobstream-ops/replay/cmd.go b/cmd/blobstream-ops/replay/cmd.go index bc4a290..3825032 100644 --- a/cmd/blobstream-ops/replay/cmd.go +++ b/cmd/blobstream-ops/replay/cmd.go @@ -21,7 +21,7 @@ func Command() *cobra.Command { Short: "BlobstreamX deployment verification", Long: "verifies that a BlobstreamX contract is committing to valid data", SilenceUsage: true, - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(cmd *cobra.Command, _ []string) error { config, err := parseFlags(cmd) if err != nil { return err diff --git a/replay/evm.go b/replay/evm.go index 498afcf..0ae91b5 100644 --- a/replay/evm.go +++ b/replay/evm.go @@ -152,11 +152,10 @@ func submitProof( opts.GasPrice = big.NewInt(bigGasPrice.Int64() + bigGasPrice.Int64()/5) logger.Debug("transaction still not included, accelerating...", "new_gas_price", opts.GasPrice.Int64()) continue - } else { - logger.Error("transaction failed", "err", err.Error()) - logger.Debug("retrying...") - return err } + logger.Error("transaction failed", "err", err.Error()) + logger.Debug("retrying...") + return err } return nil } diff --git a/replay/replayer.go b/replay/replayer.go index 49716e8..eb442c7 100644 --- a/replay/replayer.go +++ b/replay/replayer.go @@ -375,11 +375,10 @@ func getAllDataCommitmentStoredEvents( _, exists := dataCommitmentEvents[int64(events.Event.StartBlock)] if exists { continue - } else { - dataCommitmentEvents[int64(events.Event.StartBlock)] = *events.Event - if int64(events.Event.StartBlock) < latestTargetContractBlock { - gatheredTheNecessaryEvents = true - } + } + dataCommitmentEvents[int64(events.Event.StartBlock)] = *events.Event + if int64(events.Event.StartBlock) < latestTargetContractBlock { + gatheredTheNecessaryEvents = true } } if !events.Next() {