Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

feat: use archive nodes to query attestations #571

Merged
merged 18 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions cmd/blobstream/deploy/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,33 @@ func Command() *cobra.Command {

encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...)

querier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg)
err = querier.Start(config.grpcInsecure)
appQuerier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg)
err = appQuerier.Start(config.grpcInsecure)
if err != nil {
return err
}
defer func() {
err := querier.Stop()
err := appQuerier.Stop()
if err != nil {
logger.Error(err.Error())
}
}()

vs, err := getStartingValset(cmd.Context(), querier, config.startingNonce)
tmQuerier := rpc.NewTmQuerier(config.coreRPC, logger)
err = tmQuerier.Start()
if err != nil {
logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract")
return err
}
defer func(tmQuerier *rpc.TmQuerier) {
err := tmQuerier.Stop()
if err != nil {
logger.Error(err.Error())
}
}(tmQuerier)

vs, err := getStartingValset(cmd.Context(), *tmQuerier, appQuerier, config.startingNonce)
if err != nil {
logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract", "err", err.Error())
return errors.Wrap(
err,
"cannot initialize the Blobstream contract without having a valset request: %s",
Expand Down Expand Up @@ -130,15 +142,29 @@ func Command() *cobra.Command {
}

// getStartingValset get the valset that will be used to init the bridge contract.
func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) {
func getStartingValset(ctx context.Context, tmQuerier rpc.TmQuerier, appQuerier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) {
switch startingNonce {
case "latest":
return querier.QueryLatestValset(ctx)
vs, err := appQuerier.QueryLatestValset(ctx)
if err != nil {
appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error())
currentHeight, err := tmQuerier.QueryHeight(ctx)
if err != nil {
return nil, err
}
return appQuerier.QueryRecursiveLatestValset(ctx, uint64(currentHeight))
}
return vs, nil
case "earliest":
// TODO make the first nonce 1 a const
att, err := querier.QueryAttestationByNonce(ctx, 1)
att, err := appQuerier.QueryAttestationByNonce(ctx, 1)
if err != nil {
return nil, err
appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error())
historicalAtt, err := appQuerier.QueryHistoricalAttestationByNonce(ctx, 1, 1)
if err != nil {
return nil, err
}
att = historicalAtt
}
vs, ok := att.(*types.Valset)
if !ok {
Expand All @@ -150,17 +176,23 @@ func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNon
if err != nil {
return nil, err
}
attestation, err := querier.QueryAttestationByNonce(ctx, nonce)
currentHeight, err := tmQuerier.QueryHeight(ctx)
if err != nil {
return nil, err
}
attestation, err := appQuerier.QueryRecursiveHistoricalAttestationByNonce(ctx, nonce, uint64(currentHeight))
if err != nil {
return nil, err
}
if attestation == nil {
return nil, types.ErrNilAttestation
}
value, ok := attestation.(*types.Valset)
if ok {
switch value := attestation.(type) {
case *types.Valset:
return value, nil
case *types.DataCommitment:
return appQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, nonce, value.EndBlock)
}
return querier.QueryLastValsetBeforeNonce(ctx, nonce)
}
return nil, ErrNotFound
}
26 changes: 20 additions & 6 deletions cmd/blobstream/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (
FlagEVMGasLimit = "evm.gas-limit"
FlagCoreGRPCHost = "core.grpc.host"
FlagCoreGRPCPort = "core.grpc.port"
FlagCoreRPCHost = "core.rpc.host"
FlagCoreRPCPort = "core.rpc.port"
FlagStartingNonce = "starting-nonce"
ServiceNameDeployer = "deployer"
)
Expand All @@ -26,6 +28,8 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command {
cmd.Flags().Uint64(FlagEVMChainID, 5, "Specify the evm chain id")
cmd.Flags().String(FlagCoreGRPCHost, "localhost", "Specify the grpc address host")
cmd.Flags().Uint(FlagCoreGRPCPort, 9090, "Specify the grpc address port")
cmd.Flags().String(FlagCoreRPCHost, "localhost", "Specify the rpc address host")
cmd.Flags().Uint(FlagCoreRPCPort, 26657, "Specify the rpc address port")
cmd.Flags().String(FlagEVMRPC, "http://localhost:8545", "Specify the ethereum rpc address")
cmd.Flags().String(
FlagStartingNonce,
Expand All @@ -48,12 +52,13 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command {

type deployConfig struct {
*base.Config
evmRPC, coreGRPC string
evmChainID uint64
evmAccAddress string
startingNonce string
evmGasLimit uint64
grpcInsecure bool
evmRPC string
coreRPC, coreGRPC string
evmChainID uint64
evmAccAddress string
startingNonce string
evmGasLimit uint64
grpcInsecure bool
}

func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) {
Expand All @@ -76,6 +81,14 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) {
if err != nil {
return deployConfig{}, err
}
coreRPCHost, err := cmd.Flags().GetString(FlagCoreRPCHost)
if err != nil {
return deployConfig{}, err
}
coreRPCPort, err := cmd.Flags().GetUint(FlagCoreRPCPort)
if err != nil {
return deployConfig{}, err
}
evmRPC, err := cmd.Flags().GetString(FlagEVMRPC)
if err != nil {
return deployConfig{}, err
Expand Down Expand Up @@ -112,6 +125,7 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) {
evmAccAddress: evmAccAddr,
evmChainID: evmChainID,
coreGRPC: fmt.Sprintf("%s:%d", coreGRPCHost, coreGRPCPort),
coreRPC: fmt.Sprintf("tcp://%s:%d", coreRPCHost, coreRPCPort),
evmRPC: evmRPC,
startingNonce: startingNonce,
evmGasLimit: evmGasLimit,
Expand Down
5 changes: 4 additions & 1 deletion cmd/blobstream/deploy/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ package deploy

import "errors"

var ErrUnmarshallValset = errors.New("couldn't unmarsall valset")
var (
ErrUnmarshallValset = errors.New("couldn't unmarshall valset")
ErrNotFound = errors.New("not found")
)
2 changes: 2 additions & 0 deletions e2e/scripts/deploy_blobstream_contract.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ echo "deploying Blobstream contract..."
--evm.account "${EVM_ACCOUNT}" \
--core.grpc.host "${CORE_GRPC_HOST}" \
--core.grpc.port "${CORE_GRPC_PORT}" \
--core.rpc.host "${CORE_RPC_HOST}" \
--core.rpc.port "${CORE_RPC_PORT}" \
--grpc.insecure \
--starting-nonce "${STARTING_NONCE}" \
--evm.rpc "${EVM_ENDPOINT}" \
Expand Down
4 changes: 3 additions & 1 deletion orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ func TestProcessWithoutValsetInStore(t *testing.T) {
testnode.ImmediateProposals(codec),
qgbtesting.SetDataCommitmentWindowParams(codec, celestiatypes.Params{DataCommitmentWindow: 101}),
},
TimeIotaMs: 6048000, // to have enough time to sign attestations after they're pruned
TimeIotaMs: 6048000, // to have enough time to sign attestations after they're pruned
Pruning: "default",
TimeoutCommit: 5 * time.Millisecond,
},
)
_, err := node.CelestiaNetwork.WaitForHeight(400)
Expand Down
5 changes: 4 additions & 1 deletion orchestrator/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package orchestrator_test
import (
"context"
"testing"
"time"

"github.com/celestiaorg/celestia-app/app"
"github.com/celestiaorg/celestia-app/app/encoding"
Expand Down Expand Up @@ -31,7 +32,9 @@ func (s *OrchestratorTestSuite) SetupSuite() {
testnode.ImmediateProposals(codec),
blobstreamtesting.SetDataCommitmentWindowParams(codec, types.Params{DataCommitmentWindow: 101}),
},
TimeIotaMs: 1,
TimeIotaMs: 1,
Pruning: "default",
TimeoutCommit: 5 * time.Millisecond,
},
)
s.Orchestrator = blobstreamtesting.NewOrchestrator(t, s.Node)
Expand Down
43 changes: 43 additions & 0 deletions relayer/historic_relayer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package relayer_test

import (
"context"
"math/big"
"time"

blobstreamtypes "github.com/celestiaorg/orchestrator-relayer/types"

"github.com/celestiaorg/celestia-app/x/qgb/types"
"github.com/stretchr/testify/require"
)

func (s *HistoricalRelayerTestSuite) TestProcessHistoricAttestation() {
t := s.T()
_, err := s.Node.CelestiaNetwork.WaitForHeightWithTimeout(400, 30*time.Second)
require.NoError(t, err)

ctx := context.Background()
valset, err := s.Orchestrator.AppQuerier.QueryLatestValset(ctx)
require.NoError(t, err)

// wait for the valset to be pruned to test if the relayer is able to
// relay using a pruned valset.
for {
_, err = s.Orchestrator.AppQuerier.QueryAttestationByNonce(ctx, valset.Nonce)
if err != nil {
break
}
}

// sign a test data commitment so that the relayer can relay it
att := types.NewDataCommitment(valset.Nonce+1, 10, 100, time.Now())
commitment, err := s.Orchestrator.TmQuerier.QueryCommitment(ctx, att.BeginBlock, att.EndBlock)
require.NoError(t, err)
dataRootTupleRoot := blobstreamtypes.DataCommitmentTupleRootSignBytes(big.NewInt(int64(att.Nonce)), commitment)
err = s.Orchestrator.ProcessDataCommitmentEvent(ctx, *att, dataRootTupleRoot)
require.NoError(t, err)

// process the test data commitment that needs the pruned valset to be relayed.
_, err = s.Relayer.ProcessAttestation(ctx, s.Node.EVMChain.Auth, att)
require.NoError(t, err)
}
66 changes: 66 additions & 0 deletions relayer/historic_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package relayer_test

import (
"context"
"testing"
"time"

"github.com/celestiaorg/celestia-app/app"
"github.com/celestiaorg/celestia-app/app/encoding"
"github.com/celestiaorg/celestia-app/test/util/testnode"
"github.com/celestiaorg/celestia-app/x/qgb/types"
"github.com/celestiaorg/orchestrator-relayer/rpc"

"github.com/celestiaorg/orchestrator-relayer/orchestrator"

"github.com/celestiaorg/orchestrator-relayer/relayer"
blobstreamtesting "github.com/celestiaorg/orchestrator-relayer/testing"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type HistoricalRelayerTestSuite struct {
suite.Suite
Node *blobstreamtesting.TestNode
Orchestrator *orchestrator.Orchestrator
Relayer *relayer.Relayer
}

func (s *HistoricalRelayerTestSuite) SetupSuite() {
t := s.T()
if testing.Short() {
t.Skip("skipping relayer tests in short mode.")
}
ctx := context.Background()
s.Node = blobstreamtesting.NewTestNode(
ctx,
t,
blobstreamtesting.CelestiaNetworkParams{
GenesisOpts: []testnode.GenesisOption{blobstreamtesting.SetDataCommitmentWindowParams(
encoding.MakeConfig(app.ModuleEncodingRegisters...).Codec,
types.Params{DataCommitmentWindow: 101},
)},
TimeIotaMs: 3048000, // so that old attestations are deleted as soon as a new one appears
Pruning: "nothing", // make the node an archive one
TimeoutCommit: 20 * time.Millisecond,
},
)
_, err := s.Node.CelestiaNetwork.WaitForHeight(2)
require.NoError(t, err)
s.Orchestrator = blobstreamtesting.NewOrchestrator(t, s.Node)
s.Relayer = blobstreamtesting.NewRelayer(t, s.Node)
go s.Node.EVMChain.PeriodicCommit(ctx, time.Millisecond)
initVs, err := s.Relayer.AppQuerier.QueryLatestValset(s.Node.Context)
require.NoError(t, err)
_, _, _, err = s.Relayer.EVMClient.DeployBlobstreamContract(s.Node.EVMChain.Auth, s.Node.EVMChain.Backend, *initVs, initVs.Nonce, true)
require.NoError(t, err)
rpc.BlocksIn20DaysPeriod = 50
}

func (s *HistoricalRelayerTestSuite) TearDownSuite() {
s.Node.Close()
}

func TestHistoricRelayer(t *testing.T) {
suite.Run(t, new(HistoricalRelayerTestSuite))
}
13 changes: 11 additions & 2 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,20 @@ func (r *Relayer) Start(ctx context.Context) error {
func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpts, attI celestiatypes.AttestationRequestI) (*coregethtypes.Transaction, error) {
previousValset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, attI.GetNonce())
if err != nil {
r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error())
r.logger.Debug("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error())
previousValset, err = r.QueryValsetFromP2PNetworkAndValidateIt(ctx)
if err != nil {
return nil, err
r.logger.Debug("failed to query the last valset before nonce from p2p network. attempting via using an archive node (might take some time)", "err", err.Error())
currentHeight, err := r.TmQuerier.QueryHeight(ctx)
if err != nil {
return nil, err
}
previousValset, err = r.AppQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, attI.GetNonce(), uint64(currentHeight))
if err != nil {
return nil, err
}
}
r.logger.Debug("found the needed valset")
}
switch att := attI.(type) {
case *celestiatypes.Valset:
Expand Down
4 changes: 3 additions & 1 deletion relayer/relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func TestUseValsetFromP2P(t *testing.T) {
testnode.ImmediateProposals(codec),
qgbtesting.SetDataCommitmentWindowParams(codec, types.Params{DataCommitmentWindow: 101}),
},
TimeIotaMs: 2000000, // so attestations are pruned after they're queried
TimeIotaMs: 2000000, // so attestations are pruned after they're queried
Pruning: "default",
TimeoutCommit: 5 * time.Millisecond,
},
)

Expand Down
Loading
Loading