Skip to content

Commit

Permalink
feat: add replay functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Jul 1, 2024
1 parent 6c0faa8 commit 041fd7a
Show file tree
Hide file tree
Showing 8 changed files with 875 additions and 42 deletions.
44 changes: 44 additions & 0 deletions cmd/blobstream-ops/common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package common

import (
"context"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"

"github.com/cosmos/cosmos-sdk/server"
"github.com/rs/zerolog"
tmconfig "github.com/tendermint/tendermint/config"
tmlog "github.com/tendermint/tendermint/libs/log"
)

// GetLogger creates a new logger and returns
func GetLogger(level string, format string) (tmlog.Logger, error) {
logLvl, err := zerolog.ParseLevel(level)
if err != nil {
return nil, fmt.Errorf("failed to parse log level (%s): %w", level, err)
}
var logWriter io.Writer
if strings.ToLower(format) == tmconfig.LogFormatPlain {
logWriter = zerolog.ConsoleWriter{Out: os.Stderr}
} else {
logWriter = os.Stderr
}

return server.ZeroLogWrapper{Logger: zerolog.New(logWriter).Level(logLvl).With().Timestamp().Logger()}, nil
}

// TrapSignal will listen for any OS signal and cancel the context to exit gracefully.
func TrapSignal(logger tmlog.Logger, cancel context.CancelFunc) {
sigCh := make(chan os.Signal, 1)

signal.Notify(sigCh, syscall.SIGTERM)
signal.Notify(sigCh, syscall.SIGINT)

sig := <-sigCh
logger.Info("caught signal; shutting down...", "signal", sig.String())
cancel()
}
160 changes: 160 additions & 0 deletions cmd/blobstream-ops/replay/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package replay

import (
"context"
"fmt"

"github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/common"
"github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version"
"github.com/celestiaorg/blobstream-ops/replayer"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcmn "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/spf13/cobra"
blobstreamxwrapper "github.com/succinctlabs/blobstreamx/bindings"
"github.com/tendermint/tendermint/rpc/client/http"
)

// Command the replay command
func Command() *cobra.Command {
cmd := &cobra.Command{
Use: "replay",
Short: "BlobstreamX deployment verification",
Long: "verifies that a BlobstreamX contract is committing to valid data",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
config, err := parseFlags(cmd)
if err != nil {
return err
}
if err := config.ValidateBasics(); err != nil {
return err
}

logger, err := common.GetLogger(config.LogLevel, config.LogFormat)
if err != nil {
return err
}

buildInfo := version.GetBuildInfo()
logger.Info("initializing replay service", "version", buildInfo.SemanticVersion, "build_date", buildInfo.BuildTime)

ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

// Listen for and trap any OS signal to graceful shutdown and exit
go common.TrapSignal(logger, cancel)

// connecting to the source BlobstreamX contract
sourceEVMClient, err := ethclient.Dial(config.SourceEVMRPC)
if err != nil {
return err
}
defer sourceEVMClient.Close()

sourceBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller(
ethcmn.HexToAddress(config.SourceContractAddress),
sourceEVMClient,
)
if err != nil {
return err
}

// connecting to the target BlobstreamX contract
targetEVMClient, err := ethclient.Dial(config.TargetEVMRPC)
if err != nil {
return err
}
defer targetEVMClient.Close()

targetBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller(
ethcmn.HexToAddress(config.TargetContractAddress),
targetEVMClient,
)
if err != nil {
return err
}

logger.Info(
"starting replay service",
"evm.source.rpc",
config.SourceEVMRPC,
"evm.source.contract-address",
config.SourceContractAddress,
"evm.target.rpc",
config.TargetEVMRPC,
"evm.target.contract-address",
config.TargetContractAddress,
"core.rpc",
config.CoreRPC,
)

latestSourceNonce, err := sourceBlobstreamReader.StateProofNonce(&bind.CallOpts{})
if err != nil {
return err
}
logger.Info("found latest source blobstreamX contract nonce", "nonce", latestSourceNonce.Int64())

latestTargetNonce, err := targetBlobstreamReader.StateProofNonce(&bind.CallOpts{})
if err != nil {
return err
}
logger.Info("found latest target blobstreamX contract nonce", "nonce", latestTargetNonce.Int64())

var trpc *http.HTTP
if config.Verify {
trpc, err = http.New(config.CoreRPC, "/websocket")
if err != nil {
return err
}
err = trpc.Start()
if err != nil {
return err
}
defer func(trpc *http.HTTP) {
err := trpc.Stop()
if err != nil {
fmt.Println(err.Error())
}
}(trpc)
}

if latestSourceNonce.Int64() > latestTargetNonce.Int64() {
err = replayer.Catchup(
ctx,
logger,
config.Verify,
trpc,
sourceEVMClient,
targetEVMClient,
config.SourceContractAddress,
config.TargetContractAddress,
config.TargetChainGateway,
config.PrivateKey,
)
if err != nil {
return err
}
} else {
logger.Info("target contract is already up to date")
}

return replayer.Follow(
ctx,
logger,
config.Verify,
trpc,
sourceEVMClient,
targetEVMClient,
config.SourceContractAddress,
config.TargetContractAddress,
config.TargetChainGateway,
config.PrivateKey,
)
},
}

cmd.SetHelpCommand(&cobra.Command{})

return addFlags(cmd)
}
160 changes: 160 additions & 0 deletions cmd/blobstream-ops/replay/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package replay

import (
"crypto/ecdsa"
"errors"
"fmt"

"github.com/ethereum/go-ethereum/crypto"

ethcmn "github.com/ethereum/go-ethereum/common"
"github.com/spf13/cobra"
)

const (
FlagSourceEVMRPC = "evm.source.rpc"
FlagSourceEVMContractAddress = "evm.source.contract-address"
FlagTargetEVMRPC = "evm.target.rpc"
FlagTargetEVMContractAddress = "evm.target.contract-address"
FlagTargetChainGateway = "evm.target.gateway"
FlagEVMPrivateKey = "evm.private-key"

FlagVerify = "verify"

FlagLogLevel = "log.level"
FlagLogFormat = "log.format"

FlagCoreRPC = "core.rpc"
)

func addFlags(cmd *cobra.Command) *cobra.Command {
cmd.Flags().String(FlagSourceEVMRPC, "http://localhost:8545", "Specify the Ethereum rpc address of the source EVM chain")
cmd.Flags().String(FlagTargetEVMRPC, "http://localhost:8545", "Specify the Ethereum rpc address of the target EVM chain")
cmd.Flags().String(FlagSourceEVMContractAddress, "", "Specify the source contract at which the source BlobstreamX contract is deployed")
cmd.Flags().String(FlagTargetEVMContractAddress, "", "Specify the target contract at which the target BlobstreamX contract is deployed")
cmd.Flags().String(FlagTargetChainGateway, "", "Specify the target chain succinct gateway contract address")
cmd.Flags().String(
FlagLogLevel,
"info",
"The logging level (trace|debug|info|warn|error|fatal|panic)",
)
cmd.Flags().String(
FlagLogFormat,
"plain",
"The logging format (json|plain)",
)
cmd.Flags().String(
FlagCoreRPC,
"tcp://localhost:26657",
"The celestia app rpc address",
)
cmd.Flags().Bool(FlagVerify, false, "Set to verify the commitments before replaying their proofs. Require the core rpc flag to be set")
cmd.Flags().String(FlagEVMPrivateKey, "", "Specify the EVM private key, in hex format without the leading 0x, to use for replaying transaction in the target chain. Corresponding account should be funded")
return cmd
}

type Config struct {
SourceEVMRPC string
TargetEVMRPC string
SourceContractAddress string
TargetContractAddress string
TargetChainGateway string
LogLevel string
LogFormat string
CoreRPC string
Verify bool
PrivateKey *ecdsa.PrivateKey
}

func (cfg Config) ValidateBasics() error {
if err := ValidateEVMAddress(cfg.SourceContractAddress); err != nil {
return fmt.Errorf("%s: flag --%s", err.Error(), FlagSourceEVMContractAddress)
}
if err := ValidateEVMAddress(cfg.TargetContractAddress); err != nil {
return fmt.Errorf("%s: flag --%s", err.Error(), FlagTargetEVMContractAddress)
}
if err := ValidateEVMAddress(cfg.TargetChainGateway); err != nil {
return fmt.Errorf("%s: flag --%s", err.Error(), FlagTargetChainGateway)
}
if cfg.Verify && cfg.CoreRPC == "" {
return fmt.Errorf("flag --%s is set but the core RPC flag --%s is not set", FlagVerify, FlagCoreRPC)
}
return nil
}

func ValidateEVMAddress(addr string) error {
if addr == "" {
return fmt.Errorf("the EVM address cannot be empty")
}
if !ethcmn.IsHexAddress(addr) {
return errors.New("valid EVM address is required")
}
return nil
}

func parseFlags(cmd *cobra.Command) (Config, error) {
// TODO add support for env variables
sourceContractAddress, err := cmd.Flags().GetString(FlagSourceEVMContractAddress)
if err != nil {
return Config{}, err
}

targetContractAddress, err := cmd.Flags().GetString(FlagTargetEVMContractAddress)
if err != nil {
return Config{}, err
}

targetChainGateway, err := cmd.Flags().GetString(FlagTargetChainGateway)
if err != nil {
return Config{}, err
}

sourceEVMRPC, err := cmd.Flags().GetString(FlagSourceEVMRPC)
if err != nil {
return Config{}, err
}

targetEVMRPC, err := cmd.Flags().GetString(FlagTargetEVMRPC)
if err != nil {
return Config{}, err
}

coreRPC, err := cmd.Flags().GetString(FlagCoreRPC)
if err != nil {
return Config{}, err
}

logLevel, err := cmd.Flags().GetString(FlagLogLevel)
if err != nil {
return Config{}, err
}

logFormat, err := cmd.Flags().GetString(FlagLogFormat)
if err != nil {
return Config{}, err
}

rawPrivateKey, err := cmd.Flags().GetString(FlagEVMPrivateKey)
if err != nil {
return Config{}, err
}
if rawPrivateKey == "" {
return Config{}, fmt.Errorf("please set the private key --%s", FlagEVMPrivateKey)
}
privateKey, err := crypto.HexToECDSA(rawPrivateKey)
if err != nil {
return Config{}, fmt.Errorf("failed to hex-decode Ethereum ECDSA Private Key: %w", err)
}

return Config{
SourceEVMRPC: sourceEVMRPC,
TargetEVMRPC: targetEVMRPC,
SourceContractAddress: sourceContractAddress,
TargetContractAddress: targetContractAddress,
TargetChainGateway: targetChainGateway,
CoreRPC: coreRPC,
LogLevel: logLevel,
LogFormat: logFormat,
PrivateKey: privateKey,
}, nil
}
Loading

0 comments on commit 041fd7a

Please sign in to comment.