From 87d842a607cf7519e9d657a0e9df7acf83753955 Mon Sep 17 00:00:00 2001 From: CHAMI Rachid Date: Mon, 6 Nov 2023 16:30:46 +0100 Subject: [PATCH] feat: use archive nodes to query attestations (#571) * feat: use historical data to query attestations * chore: lint * chore: lint * chore: better error message * chore: use timeout for recursive lookup * test: add tests for historical queries * chore: refactor for less code duplication * test: add relayer test * docs: add comment * chore: golang ci * chore: fix nonce + better logging * chore: conflicts * chore: conflicts --- cmd/blobstream/deploy/cmd.go | 58 +++++-- cmd/blobstream/deploy/config.go | 26 +++- cmd/blobstream/deploy/errors.go | 5 +- e2e/scripts/deploy_blobstream_contract.sh | 2 + orchestrator/orchestrator_test.go | 4 +- orchestrator/suite_test.go | 5 +- relayer/historic_relayer_test.go | 43 ++++++ relayer/historic_suite_test.go | 66 ++++++++ relayer/relayer.go | 13 +- relayer/relayer_test.go | 4 +- rpc/app_historic_querier_test.go | 80 ++++++++++ rpc/app_querier.go | 180 ++++++++++++++++++++++ rpc/errors.go | 5 +- rpc/historic_suite_test.go | 64 ++++++++ testing/celestia_network.go | 15 +- 15 files changed, 539 insertions(+), 31 deletions(-) create mode 100644 relayer/historic_relayer_test.go create mode 100644 relayer/historic_suite_test.go create mode 100644 rpc/app_historic_querier_test.go create mode 100644 rpc/historic_suite_test.go diff --git a/cmd/blobstream/deploy/cmd.go b/cmd/blobstream/deploy/cmd.go index a84d75dc..68d214b8 100644 --- a/cmd/blobstream/deploy/cmd.go +++ b/cmd/blobstream/deploy/cmd.go @@ -43,21 +43,33 @@ func Command() *cobra.Command { encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) - querier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg) - err = querier.Start(config.grpcInsecure) + appQuerier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg) + err = appQuerier.Start(config.grpcInsecure) if err != nil { return err } defer func() { - err := querier.Stop() + err := appQuerier.Stop() if err != nil { logger.Error(err.Error()) } }() - vs, err := getStartingValset(cmd.Context(), querier, config.startingNonce) + tmQuerier := rpc.NewTmQuerier(config.coreRPC, logger) + err = tmQuerier.Start() if err != nil { - logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract") + return err + } + defer func(tmQuerier *rpc.TmQuerier) { + err := tmQuerier.Stop() + if err != nil { + logger.Error(err.Error()) + } + }(tmQuerier) + + vs, err := getStartingValset(cmd.Context(), *tmQuerier, appQuerier, config.startingNonce) + if err != nil { + logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract", "err", err.Error()) return errors.Wrap( err, "cannot initialize the Blobstream contract without having a valset request: %s", @@ -130,15 +142,29 @@ func Command() *cobra.Command { } // getStartingValset get the valset that will be used to init the bridge contract. -func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) { +func getStartingValset(ctx context.Context, tmQuerier rpc.TmQuerier, appQuerier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) { switch startingNonce { case "latest": - return querier.QueryLatestValset(ctx) + vs, err := appQuerier.QueryLatestValset(ctx) + if err != nil { + appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error()) + currentHeight, err := tmQuerier.QueryHeight(ctx) + if err != nil { + return nil, err + } + return appQuerier.QueryRecursiveLatestValset(ctx, uint64(currentHeight)) + } + return vs, nil case "earliest": // TODO make the first nonce 1 a const - att, err := querier.QueryAttestationByNonce(ctx, 1) + att, err := appQuerier.QueryAttestationByNonce(ctx, 1) if err != nil { - return nil, err + appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error()) + historicalAtt, err := appQuerier.QueryHistoricalAttestationByNonce(ctx, 1, 1) + if err != nil { + return nil, err + } + att = historicalAtt } vs, ok := att.(*types.Valset) if !ok { @@ -150,17 +176,23 @@ func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNon if err != nil { return nil, err } - attestation, err := querier.QueryAttestationByNonce(ctx, nonce) + currentHeight, err := tmQuerier.QueryHeight(ctx) + if err != nil { + return nil, err + } + attestation, err := appQuerier.QueryRecursiveHistoricalAttestationByNonce(ctx, nonce, uint64(currentHeight)) if err != nil { return nil, err } if attestation == nil { return nil, types.ErrNilAttestation } - value, ok := attestation.(*types.Valset) - if ok { + switch value := attestation.(type) { + case *types.Valset: return value, nil + case *types.DataCommitment: + return appQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, nonce, value.EndBlock) } - return querier.QueryLastValsetBeforeNonce(ctx, nonce) } + return nil, ErrNotFound } diff --git a/cmd/blobstream/deploy/config.go b/cmd/blobstream/deploy/config.go index d926ec22..c3ac1b52 100644 --- a/cmd/blobstream/deploy/config.go +++ b/cmd/blobstream/deploy/config.go @@ -17,6 +17,8 @@ const ( FlagEVMGasLimit = "evm.gas-limit" FlagCoreGRPCHost = "core.grpc.host" FlagCoreGRPCPort = "core.grpc.port" + FlagCoreRPCHost = "core.rpc.host" + FlagCoreRPCPort = "core.rpc.port" FlagStartingNonce = "starting-nonce" ServiceNameDeployer = "deployer" ) @@ -26,6 +28,8 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command { cmd.Flags().Uint64(FlagEVMChainID, 5, "Specify the evm chain id") cmd.Flags().String(FlagCoreGRPCHost, "localhost", "Specify the grpc address host") cmd.Flags().Uint(FlagCoreGRPCPort, 9090, "Specify the grpc address port") + cmd.Flags().String(FlagCoreRPCHost, "localhost", "Specify the rpc address host") + cmd.Flags().Uint(FlagCoreRPCPort, 26657, "Specify the rpc address port") cmd.Flags().String(FlagEVMRPC, "http://localhost:8545", "Specify the ethereum rpc address") cmd.Flags().String( FlagStartingNonce, @@ -48,12 +52,13 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command { type deployConfig struct { *base.Config - evmRPC, coreGRPC string - evmChainID uint64 - evmAccAddress string - startingNonce string - evmGasLimit uint64 - grpcInsecure bool + evmRPC string + coreRPC, coreGRPC string + evmChainID uint64 + evmAccAddress string + startingNonce string + evmGasLimit uint64 + grpcInsecure bool } func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { @@ -76,6 +81,14 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { if err != nil { return deployConfig{}, err } + coreRPCHost, err := cmd.Flags().GetString(FlagCoreRPCHost) + if err != nil { + return deployConfig{}, err + } + coreRPCPort, err := cmd.Flags().GetUint(FlagCoreRPCPort) + if err != nil { + return deployConfig{}, err + } evmRPC, err := cmd.Flags().GetString(FlagEVMRPC) if err != nil { return deployConfig{}, err @@ -112,6 +125,7 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { evmAccAddress: evmAccAddr, evmChainID: evmChainID, coreGRPC: fmt.Sprintf("%s:%d", coreGRPCHost, coreGRPCPort), + coreRPC: fmt.Sprintf("tcp://%s:%d", coreRPCHost, coreRPCPort), evmRPC: evmRPC, startingNonce: startingNonce, evmGasLimit: evmGasLimit, diff --git a/cmd/blobstream/deploy/errors.go b/cmd/blobstream/deploy/errors.go index f5d814b0..d5149d7e 100644 --- a/cmd/blobstream/deploy/errors.go +++ b/cmd/blobstream/deploy/errors.go @@ -2,4 +2,7 @@ package deploy import "errors" -var ErrUnmarshallValset = errors.New("couldn't unmarsall valset") +var ( + ErrUnmarshallValset = errors.New("couldn't unmarshall valset") + ErrNotFound = errors.New("not found") +) diff --git a/e2e/scripts/deploy_blobstream_contract.sh b/e2e/scripts/deploy_blobstream_contract.sh index d6696ec6..11c1f024 100644 --- a/e2e/scripts/deploy_blobstream_contract.sh +++ b/e2e/scripts/deploy_blobstream_contract.sh @@ -68,6 +68,8 @@ echo "deploying Blobstream contract..." --evm.account "${EVM_ACCOUNT}" \ --core.grpc.host "${CORE_GRPC_HOST}" \ --core.grpc.port "${CORE_GRPC_PORT}" \ + --core.rpc.host "${CORE_RPC_HOST}" \ + --core.rpc.port "${CORE_RPC_PORT}" \ --grpc.insecure \ --starting-nonce "${STARTING_NONCE}" \ --evm.rpc "${EVM_ENDPOINT}" \ diff --git a/orchestrator/orchestrator_test.go b/orchestrator/orchestrator_test.go index 99cafed6..0f03b5ed 100644 --- a/orchestrator/orchestrator_test.go +++ b/orchestrator/orchestrator_test.go @@ -206,7 +206,9 @@ func TestProcessWithoutValsetInStore(t *testing.T) { testnode.ImmediateProposals(codec), qgbtesting.SetDataCommitmentWindowParams(codec, celestiatypes.Params{DataCommitmentWindow: 101}), }, - TimeIotaMs: 6048000, // to have enough time to sign attestations after they're pruned + TimeIotaMs: 6048000, // to have enough time to sign attestations after they're pruned + Pruning: "default", + TimeoutCommit: 5 * time.Millisecond, }, ) _, err := node.CelestiaNetwork.WaitForHeight(400) diff --git a/orchestrator/suite_test.go b/orchestrator/suite_test.go index bafba568..fc0c5cf3 100644 --- a/orchestrator/suite_test.go +++ b/orchestrator/suite_test.go @@ -3,6 +3,7 @@ package orchestrator_test import ( "context" "testing" + "time" "github.com/celestiaorg/celestia-app/app" "github.com/celestiaorg/celestia-app/app/encoding" @@ -31,7 +32,9 @@ func (s *OrchestratorTestSuite) SetupSuite() { testnode.ImmediateProposals(codec), blobstreamtesting.SetDataCommitmentWindowParams(codec, types.Params{DataCommitmentWindow: 101}), }, - TimeIotaMs: 1, + TimeIotaMs: 1, + Pruning: "default", + TimeoutCommit: 5 * time.Millisecond, }, ) s.Orchestrator = blobstreamtesting.NewOrchestrator(t, s.Node) diff --git a/relayer/historic_relayer_test.go b/relayer/historic_relayer_test.go new file mode 100644 index 00000000..7c7170e6 --- /dev/null +++ b/relayer/historic_relayer_test.go @@ -0,0 +1,43 @@ +package relayer_test + +import ( + "context" + "math/big" + "time" + + blobstreamtypes "github.com/celestiaorg/orchestrator-relayer/types" + + "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/stretchr/testify/require" +) + +func (s *HistoricalRelayerTestSuite) TestProcessHistoricAttestation() { + t := s.T() + _, err := s.Node.CelestiaNetwork.WaitForHeightWithTimeout(400, 30*time.Second) + require.NoError(t, err) + + ctx := context.Background() + valset, err := s.Orchestrator.AppQuerier.QueryLatestValset(ctx) + require.NoError(t, err) + + // wait for the valset to be pruned to test if the relayer is able to + // relay using a pruned valset. + for { + _, err = s.Orchestrator.AppQuerier.QueryAttestationByNonce(ctx, valset.Nonce) + if err != nil { + break + } + } + + // sign a test data commitment so that the relayer can relay it + att := types.NewDataCommitment(valset.Nonce+1, 10, 100, time.Now()) + commitment, err := s.Orchestrator.TmQuerier.QueryCommitment(ctx, att.BeginBlock, att.EndBlock) + require.NoError(t, err) + dataRootTupleRoot := blobstreamtypes.DataCommitmentTupleRootSignBytes(big.NewInt(int64(att.Nonce)), commitment) + err = s.Orchestrator.ProcessDataCommitmentEvent(ctx, *att, dataRootTupleRoot) + require.NoError(t, err) + + // process the test data commitment that needs the pruned valset to be relayed. + _, err = s.Relayer.ProcessAttestation(ctx, s.Node.EVMChain.Auth, att) + require.NoError(t, err) +} diff --git a/relayer/historic_suite_test.go b/relayer/historic_suite_test.go new file mode 100644 index 00000000..fa00c9e1 --- /dev/null +++ b/relayer/historic_suite_test.go @@ -0,0 +1,66 @@ +package relayer_test + +import ( + "context" + "testing" + "time" + + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + "github.com/celestiaorg/celestia-app/test/util/testnode" + "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/celestiaorg/orchestrator-relayer/rpc" + + "github.com/celestiaorg/orchestrator-relayer/orchestrator" + + "github.com/celestiaorg/orchestrator-relayer/relayer" + blobstreamtesting "github.com/celestiaorg/orchestrator-relayer/testing" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type HistoricalRelayerTestSuite struct { + suite.Suite + Node *blobstreamtesting.TestNode + Orchestrator *orchestrator.Orchestrator + Relayer *relayer.Relayer +} + +func (s *HistoricalRelayerTestSuite) SetupSuite() { + t := s.T() + if testing.Short() { + t.Skip("skipping relayer tests in short mode.") + } + ctx := context.Background() + s.Node = blobstreamtesting.NewTestNode( + ctx, + t, + blobstreamtesting.CelestiaNetworkParams{ + GenesisOpts: []testnode.GenesisOption{blobstreamtesting.SetDataCommitmentWindowParams( + encoding.MakeConfig(app.ModuleEncodingRegisters...).Codec, + types.Params{DataCommitmentWindow: 101}, + )}, + TimeIotaMs: 3048000, // so that old attestations are deleted as soon as a new one appears + Pruning: "nothing", // make the node an archive one + TimeoutCommit: 20 * time.Millisecond, + }, + ) + _, err := s.Node.CelestiaNetwork.WaitForHeight(2) + require.NoError(t, err) + s.Orchestrator = blobstreamtesting.NewOrchestrator(t, s.Node) + s.Relayer = blobstreamtesting.NewRelayer(t, s.Node) + go s.Node.EVMChain.PeriodicCommit(ctx, time.Millisecond) + initVs, err := s.Relayer.AppQuerier.QueryLatestValset(s.Node.Context) + require.NoError(t, err) + _, _, _, err = s.Relayer.EVMClient.DeployBlobstreamContract(s.Node.EVMChain.Auth, s.Node.EVMChain.Backend, *initVs, initVs.Nonce, true) + require.NoError(t, err) + rpc.BlocksIn20DaysPeriod = 50 +} + +func (s *HistoricalRelayerTestSuite) TearDownSuite() { + s.Node.Close() +} + +func TestHistoricRelayer(t *testing.T) { + suite.Run(t, new(HistoricalRelayerTestSuite)) +} diff --git a/relayer/relayer.go b/relayer/relayer.go index 79c7cd78..f8f55417 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -145,11 +145,20 @@ func (r *Relayer) Start(ctx context.Context) error { func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpts, attI celestiatypes.AttestationRequestI) (*coregethtypes.Transaction, error) { previousValset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, attI.GetNonce()) if err != nil { - r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error()) + r.logger.Debug("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error()) previousValset, err = r.QueryValsetFromP2PNetworkAndValidateIt(ctx) if err != nil { - return nil, err + r.logger.Debug("failed to query the last valset before nonce from p2p network. attempting via using an archive node (might take some time)", "err", err.Error()) + currentHeight, err := r.TmQuerier.QueryHeight(ctx) + if err != nil { + return nil, err + } + previousValset, err = r.AppQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, attI.GetNonce(), uint64(currentHeight)) + if err != nil { + return nil, err + } } + r.logger.Debug("found the needed valset") } switch att := attI.(type) { case *celestiatypes.Valset: diff --git a/relayer/relayer_test.go b/relayer/relayer_test.go index 887f1021..586ccd78 100644 --- a/relayer/relayer_test.go +++ b/relayer/relayer_test.go @@ -68,7 +68,9 @@ func TestUseValsetFromP2P(t *testing.T) { testnode.ImmediateProposals(codec), qgbtesting.SetDataCommitmentWindowParams(codec, types.Params{DataCommitmentWindow: 101}), }, - TimeIotaMs: 2000000, // so attestations are pruned after they're queried + TimeIotaMs: 2000000, // so attestations are pruned after they're queried + Pruning: "default", + TimeoutCommit: 5 * time.Millisecond, }, ) diff --git a/rpc/app_historic_querier_test.go b/rpc/app_historic_querier_test.go new file mode 100644 index 00000000..71c874b0 --- /dev/null +++ b/rpc/app_historic_querier_test.go @@ -0,0 +1,80 @@ +package rpc_test + +import ( + "context" +) + +func (s *HistoricQuerierTestSuite) TestQueryHistoricAttestationByNonce() { + appQuerier := s.setupAppQuerier() + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryAttestationByNonce(context.Background(), 1) + s.Error(err) + + att, err := appQuerier.QueryHistoricalAttestationByNonce(context.Background(), 1, 10) + s.NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} + +func (s *HistoricQuerierTestSuite) TestQueryRecursiveHistoricAttestationByNonce() { + appQuerier := s.setupAppQuerier() + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryAttestationByNonce(context.Background(), 1) + s.Error(err) + + height, err := s.Network.LatestHeight() + s.Require().NoError(err) + att, err := appQuerier.QueryRecursiveHistoricalAttestationByNonce(context.Background(), 1, uint64(height)) + s.Require().NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} + +func (s *HistoricQuerierTestSuite) TestQueryHistoricalLatestAttestationNonce() { + appQuerier := s.setupAppQuerier() + + nonce, err := appQuerier.QueryHistoricalLatestAttestationNonce(context.Background(), 2) + s.Require().NoError(err) + s.Equal(uint64(1), nonce) +} + +func (s *HistoricQuerierTestSuite) TestQueryHistoricalValsetByNonce() { + appQuerier := s.setupAppQuerier() + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryValsetByNonce(context.Background(), 1) + s.Error(err) + + att, err := appQuerier.QueryHistoricalValsetByNonce(context.Background(), 1, 10) + s.Require().NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} + +func (s *HistoricQuerierTestSuite) TestQueryHistoricalLastValsetBeforeNonce() { + appQuerier := s.setupAppQuerier() + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryLastValsetBeforeNonce(context.Background(), 2) + s.Error(err) + + att, err := appQuerier.QueryHistoricalLastValsetBeforeNonce(context.Background(), 2, 102) + s.Require().NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} + +func (s *HistoricQuerierTestSuite) TestQueryRecursiveHistoricalLastValsetBeforeNonce() { + appQuerier := s.setupAppQuerier() + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryLastValsetBeforeNonce(context.Background(), 2) + s.Error(err) + + att, err := appQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(context.Background(), 2, 201) + s.Require().NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} diff --git a/rpc/app_querier.go b/rpc/app_querier.go index c49b3aab..0f0c545f 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -3,6 +3,12 @@ package rpc import ( "context" "crypto/tls" + "strconv" + "time" + + "github.com/celestiaorg/celestia-app/pkg/appconsts" + cosmosgrpc "github.com/cosmos/cosmos-sdk/types/grpc" + "google.golang.org/grpc/metadata" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -16,6 +22,10 @@ import ( tmlog "github.com/tendermint/tendermint/libs/log" ) +// BlocksIn20DaysPeriod represents the number of blocks in 20-days period. +// It uses the timeout commit constant, defined in app, for the computation +var BlocksIn20DaysPeriod = 20 * 24 * 60 * 60 / appconsts.TimeoutCommit.Seconds() + // AppQuerier queries the application for attestations and unbonding periods. type AppQuerier struct { blobStreamRPC string @@ -74,6 +84,66 @@ func (aq *AppQuerier) QueryAttestationByNonce(ctx context.Context, nonce uint64) return unmarshalledAttestation, nil } +// QueryHistoricalAttestationByNonce query an attestation by nonce from the state machine at a certain height. +func (aq *AppQuerier) QueryHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + var header metadata.MD + atResp, err := queryClient.AttestationRequestByNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), // Add metadata to request + &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, + grpc.Header(&header), // Retrieve header from response + ) + if err != nil { + return nil, err + } + if atResp.Attestation == nil { + return nil, nil + } + + unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) + if err != nil { + return nil, err + } + + return unmarshalledAttestation, nil +} + +// QueryRecursiveHistoricalAttestationByNonce query an attestation by nonce from the state machine +// via going over the history step by step starting from height. +func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + currentHeight := height + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + for currentHeight >= 1 { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + var header metadata.MD + atResp, err := queryClient.AttestationRequestByNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request + &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, + grpc.Header(&header), // Retrieve header from response + ) + if err == nil { + unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) + if err != nil { + return nil, err + } + return unmarshalledAttestation, nil + } + if currentHeight <= uint64(BlocksIn20DaysPeriod) { + return nil, ErrNotFound + } + currentHeight -= uint64(BlocksIn20DaysPeriod) + } + } + return nil, ErrNotFound +} + // QueryLatestAttestationNonce query the latest attestation nonce from the state machine. func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64, error) { queryClient := celestiatypes.NewQueryClient(aq.clientConn) @@ -89,6 +159,23 @@ func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64, return resp.Nonce, nil } +// QueryHistoricalLatestAttestationNonce query the historical latest attestation nonce from the state machine at a certain nonce. +func (aq *AppQuerier) QueryHistoricalLatestAttestationNonce(ctx context.Context, height uint64) (uint64, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + var header metadata.MD + resp, err := queryClient.LatestAttestationNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestAttestationNonceRequest{}, + grpc.Header(&header), + ) + if err != nil { + return 0, err + } + + return resp.Nonce, nil +} + // QueryDataCommitmentByNonce query a data commitment by its nonce. func (aq *AppQuerier) QueryDataCommitmentByNonce(ctx context.Context, nonce uint64) (*celestiatypes.DataCommitment, error) { attestation, err := aq.QueryAttestationByNonce(ctx, nonce) @@ -145,6 +232,24 @@ func (aq *AppQuerier) QueryValsetByNonce(ctx context.Context, nonce uint64) (*ce return value, nil } +// QueryHistoricalValsetByNonce query a historical valset by nonce. +func (aq *AppQuerier) QueryHistoricalValsetByNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { + attestation, err := aq.QueryHistoricalAttestationByNonce(ctx, nonce, height) + if err != nil { + return nil, err + } + if attestation == nil { + return nil, types.ErrAttestationNotFound + } + + value, ok := attestation.(*celestiatypes.Valset) + if !ok { + return nil, types.ErrUnmarshalValset + } + + return value, nil +} + // QueryLatestValset query the latest recorded valset in the state machine. func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Valset, error) { latestNonce, err := aq.QueryLatestAttestationNonce(ctx) @@ -164,6 +269,39 @@ func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Val return latestValset, nil } +// QueryRecursiveLatestValset query the latest recorded valset in the state machine in history. +func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uint64) (*celestiatypes.Valset, error) { + currentHeight := height + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + for currentHeight >= 1 { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight) + if err != nil { + return nil, err + } + + if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil { + return vs, nil + } + + latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) + if err == nil { + return latestValset, nil + } + + if currentHeight <= uint64(BlocksIn20DaysPeriod) { + return nil, ErrNotFound + } + currentHeight -= uint64(BlocksIn20DaysPeriod) + } + } + return nil, ErrNotFound +} + // QueryLastValsetBeforeNonce returns the last valset before nonce. // This will be needed when signing to know the validator set at that particular nonce. // the provided `nonce` can be a valset, but this will return the valset before it. @@ -181,6 +319,48 @@ func (aq *AppQuerier) QueryLastValsetBeforeNonce(ctx context.Context, nonce uint return resp.Valset, nil } +// QueryHistoricalLastValsetBeforeNonce returns the last historical valset before nonce for a certain height. +func (aq *AppQuerier) QueryHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + var header metadata.MD + resp, err := queryClient.LatestValsetRequestBeforeNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, + grpc.Header(&header), + ) + if err != nil { + return nil, err + } + + return resp.Valset, nil +} + +// QueryRecursiveHistoricalLastValsetBeforeNonce recursively looks for the last historical valset before nonce for a certain height until genesis. +func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + currentNonce := nonce - 1 + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + if currentNonce == 0 { + return nil, ErrNotFound + } + n, err := aq.QueryRecursiveHistoricalAttestationByNonce(ctx, currentNonce, height) + if err != nil { + return nil, err + } + vs, ok := n.(*celestiatypes.Valset) + if ok { + return vs, nil + } + currentNonce-- + } + } +} + // QueryLastUnbondingHeight query the last unbonding height from state machine. func (aq *AppQuerier) QueryLastUnbondingHeight(ctx context.Context) (int64, error) { queryClient := celestiatypes.NewQueryClient(aq.clientConn) diff --git a/rpc/errors.go b/rpc/errors.go index eabcf003..b91e0f6b 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -2,4 +2,7 @@ package rpc import "errors" -var ErrCouldntReachSpecifiedHeight = errors.New("couldn't reach specified height") +var ( + ErrCouldntReachSpecifiedHeight = errors.New("couldn't reach specified height") + ErrNotFound = errors.New("not found") +) diff --git a/rpc/historic_suite_test.go b/rpc/historic_suite_test.go new file mode 100644 index 00000000..43140a86 --- /dev/null +++ b/rpc/historic_suite_test.go @@ -0,0 +1,64 @@ +package rpc_test + +import ( + "context" + "testing" + "time" + + "github.com/celestiaorg/celestia-app/test/util/testnode" + "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/celestiaorg/orchestrator-relayer/rpc" + + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + tmlog "github.com/tendermint/tendermint/libs/log" + + "github.com/stretchr/testify/require" + + blobstreamtesting "github.com/celestiaorg/orchestrator-relayer/testing" + "github.com/stretchr/testify/suite" +) + +type HistoricQuerierTestSuite struct { + suite.Suite + Network *blobstreamtesting.CelestiaNetwork + EncConf encoding.Config + Logger tmlog.Logger +} + +func (s *HistoricQuerierTestSuite) SetupSuite() { + t := s.T() + ctx := context.Background() + s.EncConf = encoding.MakeConfig(app.ModuleEncodingRegisters...) + s.Network = blobstreamtesting.NewCelestiaNetwork( + ctx, + t, + blobstreamtesting.CelestiaNetworkParams{ + GenesisOpts: []testnode.GenesisOption{blobstreamtesting.SetDataCommitmentWindowParams(s.EncConf.Codec, types.Params{DataCommitmentWindow: 101})}, + TimeIotaMs: 6048000, // so that old attestations are deleted as soon as a new one appears + Pruning: "nothing", // make the node an archive one + TimeoutCommit: 20 * time.Millisecond, + }, + ) + _, err := s.Network.WaitForHeightWithTimeout(401, 30*time.Second) + require.NoError(t, err) + s.Logger = tmlog.NewNopLogger() + rpc.BlocksIn20DaysPeriod = 100 +} + +func TestHistoricQueriers(t *testing.T) { + suite.Run(t, new(HistoricQuerierTestSuite)) +} + +func (s *HistoricQuerierTestSuite) setupAppQuerier() *rpc.AppQuerier { + appQuerier := rpc.NewAppQuerier( + s.Logger, + s.Network.GRPCAddr, + s.EncConf, + ) + require.NoError(s.T(), appQuerier.Start(true)) + s.T().Cleanup(func() { + appQuerier.Stop() //nolint:errcheck + }) + return appQuerier +} diff --git a/testing/celestia_network.go b/testing/celestia_network.go index 5afa8c41..8ec98aec 100644 --- a/testing/celestia_network.go +++ b/testing/celestia_network.go @@ -42,14 +42,18 @@ type CelestiaNetwork struct { } type CelestiaNetworkParams struct { - GenesisOpts []celestiatestnode.GenesisOption - TimeIotaMs int64 + GenesisOpts []celestiatestnode.GenesisOption + TimeIotaMs int64 + Pruning string + TimeoutCommit time.Duration } func DefaultCelestiaNetworkParams() CelestiaNetworkParams { return CelestiaNetworkParams{ - GenesisOpts: nil, - TimeIotaMs: 1, + GenesisOpts: nil, + TimeIotaMs: 1, + Pruning: "default", + TimeoutCommit: 5 * time.Millisecond, } } @@ -73,8 +77,9 @@ func NewCelestiaNetwork(ctx context.Context, t *testing.T, params CelestiaNetwor } tmCfg := celestiatestnode.DefaultTendermintConfig() - tmCfg.Consensus.TimeoutCommit = time.Millisecond * 5 + tmCfg.Consensus.TimeoutCommit = params.TimeoutCommit appConf := celestiatestnode.DefaultAppConfig() + appConf.Pruning = params.Pruning consensusParams := celestiatestnode.DefaultParams() consensusParams.Block.TimeIotaMs = params.TimeIotaMs