Skip to content

Commit

Permalink
Merge pull request #7 from celestiaorg/add-commands
Browse files Browse the repository at this point in the history
feat: add replay functionality
  • Loading branch information
rach-id authored Jul 4, 2024
2 parents 12f10f7 + 6734331 commit e496e2b
Show file tree
Hide file tree
Showing 9 changed files with 1,020 additions and 43 deletions.
44 changes: 44 additions & 0 deletions cmd/blobstream-ops/common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package common

import (
"context"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"

"github.com/cosmos/cosmos-sdk/server"
"github.com/rs/zerolog"
tmconfig "github.com/tendermint/tendermint/config"
tmlog "github.com/tendermint/tendermint/libs/log"
)

// GetLogger creates a new logger and returns
func GetLogger(level string, format string) (tmlog.Logger, error) {
logLvl, err := zerolog.ParseLevel(level)
if err != nil {
return nil, fmt.Errorf("failed to parse log level (%s): %w", level, err)
}
var logWriter io.Writer
if strings.ToLower(format) == tmconfig.LogFormatPlain {
logWriter = zerolog.ConsoleWriter{Out: os.Stderr}
} else {
logWriter = os.Stderr
}

return server.ZeroLogWrapper{Logger: zerolog.New(logWriter).Level(logLvl).With().Timestamp().Logger()}, nil
}

// TrapSignal will listen for any OS signal and cancel the context to exit gracefully.
func TrapSignal(logger tmlog.Logger, cancel context.CancelFunc) {
sigCh := make(chan os.Signal, 1)

signal.Notify(sigCh, syscall.SIGTERM)
signal.Notify(sigCh, syscall.SIGINT)

sig := <-sigCh
logger.Info("caught signal; shutting down...", "signal", sig.String())
cancel()
}
165 changes: 165 additions & 0 deletions cmd/blobstream-ops/replay/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package replay

import (
"context"

"github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/common"
"github.com/celestiaorg/blobstream-ops/cmd/blobstream-ops/version"
"github.com/celestiaorg/blobstream-ops/replay"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcmn "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/spf13/cobra"
blobstreamxwrapper "github.com/succinctlabs/blobstreamx/bindings"
"github.com/tendermint/tendermint/rpc/client/http"
)

// Command the replay command
func Command() *cobra.Command {
cmd := &cobra.Command{
Use: "replay",
Short: "BlobstreamX deployment verification",
Long: "verifies that a BlobstreamX contract is committing to valid data",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, _ []string) error {
config, err := parseFlags(cmd)
if err != nil {
return err
}
if err := config.ValidateBasics(); err != nil {
return err
}

logger, err := common.GetLogger(config.LogLevel, config.LogFormat)
if err != nil {
return err
}

buildInfo := version.GetBuildInfo()
logger.Info("initializing replay service", "version", buildInfo.SemanticVersion, "build_date", buildInfo.BuildTime)

ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

// Listen for and trap any OS signal to graceful shutdown and exit
go common.TrapSignal(logger, cancel)

// connecting to the source BlobstreamX contract
sourceEVMClient, err := ethclient.Dial(config.SourceEVMRPC)
if err != nil {
return err
}
defer sourceEVMClient.Close()

sourceBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller(
ethcmn.HexToAddress(config.SourceContractAddress),
sourceEVMClient,
)
if err != nil {
return err
}

// connecting to the target BlobstreamX contract
targetEVMClient, err := ethclient.Dial(config.TargetEVMRPC)
if err != nil {
return err
}
defer targetEVMClient.Close()

targetBlobstreamReader, err := blobstreamxwrapper.NewBlobstreamXCaller(
ethcmn.HexToAddress(config.TargetContractAddress),
targetEVMClient,
)
if err != nil {
return err
}

logger.Info(
"starting replay service",
"evm.source.rpc",
config.SourceEVMRPC,
"evm.source.contract-address",
config.SourceContractAddress,
"evm.target.rpc",
config.TargetEVMRPC,
"evm.target.contract-address",
config.TargetContractAddress,
"core.rpc",
config.CoreRPC,
)

latestSourceBlock, err := sourceBlobstreamReader.LatestBlock(&bind.CallOpts{})
if err != nil {
return err
}
logger.Info("found source blobstreamX contract", "latest_block", latestSourceBlock)

latestTargetBlock, err := targetBlobstreamReader.LatestBlock(&bind.CallOpts{})
if err != nil {
return err
}
logger.Info("found target blobstreamX contract", "latest_block", latestTargetBlock)

var trpc *http.HTTP
if config.Verify {
trpc, err = http.New(config.CoreRPC, "/websocket")
if err != nil {
return err
}
err = trpc.Start()
if err != nil {
return err
}
defer func(trpc *http.HTTP) {
err := trpc.Stop()
if err != nil {
logger.Error("error stopping tendermint RPC", "err", err.Error())
}
}(trpc)
}

if latestSourceBlock > latestTargetBlock {
err = replay.Catchup(
ctx,
logger,
config.Verify,
trpc,
sourceEVMClient,
targetEVMClient,
config.SourceContractAddress,
config.TargetContractAddress,
config.TargetChainGateway,
config.PrivateKey,
config.HeaderRangeFunctionID,
config.NextHeaderFunctionID,
config.FilterRange,
)
if err != nil {
return err
}
} else {
logger.Info("target contract is already up to date")
}

return replay.Follow(
ctx,
logger,
config.Verify,
trpc,
sourceEVMClient,
targetEVMClient,
config.SourceContractAddress,
config.TargetContractAddress,
config.TargetChainGateway,
config.PrivateKey,
config.HeaderRangeFunctionID,
config.NextHeaderFunctionID,
config.FilterRange,
)
},
}

cmd.SetHelpCommand(&cobra.Command{})

return addFlags(cmd)
}
Loading

0 comments on commit e496e2b

Please sign in to comment.