From c2f01efae7b27cd447545613b22a343ae71c26ec Mon Sep 17 00:00:00 2001 From: rachid Date: Tue, 31 Oct 2023 13:54:56 +0100 Subject: [PATCH 01/13] feat: use historical data to query attestations --- cmd/blobstream/deploy/cmd.go | 56 ++++++-- cmd/blobstream/deploy/config.go | 21 ++- cmd/blobstream/deploy/errors.go | 5 +- e2e/scripts/deploy_blobstream_contract.sh | 2 + relayer/relayer.go | 10 +- rpc/app_querier.go | 154 ++++++++++++++++++++++ rpc/errors.go | 5 +- 7 files changed, 233 insertions(+), 20 deletions(-) diff --git a/cmd/blobstream/deploy/cmd.go b/cmd/blobstream/deploy/cmd.go index 211bbfa7..8be8f56d 100644 --- a/cmd/blobstream/deploy/cmd.go +++ b/cmd/blobstream/deploy/cmd.go @@ -43,19 +43,31 @@ func Command() *cobra.Command { encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) - querier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg) - err = querier.Start() + appQuerier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg) + err = appQuerier.Start() if err != nil { return err } defer func() { - err := querier.Stop() + err := appQuerier.Stop() if err != nil { logger.Error(err.Error()) } }() - vs, err := getStartingValset(cmd.Context(), querier, config.startingNonce) + tmQuerier := rpc.NewTmQuerier(config.coreRPC, logger) + err = tmQuerier.Start() + if err != nil { + return err + } + defer func(tmQuerier *rpc.TmQuerier) { + err := tmQuerier.Stop() + if err != nil { + logger.Error(err.Error()) + } + }(tmQuerier) + + vs, err := getStartingValset(cmd.Context(), *tmQuerier, appQuerier, config.startingNonce) if err != nil { logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract") return errors.Wrap( @@ -130,15 +142,29 @@ func Command() *cobra.Command { } // getStartingValset get the valset that will be used to init the bridge contract. -func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) { +func getStartingValset(ctx context.Context, tmQuerier rpc.TmQuerier, appQuerier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) { switch startingNonce { case "latest": - return querier.QueryLatestValset(ctx) + vs, err := appQuerier.QueryLatestValset(ctx) + if err != nil { + appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error()) + currentHeight, err := tmQuerier.QueryHeight(ctx) + if err != nil { + return nil, err + } + return appQuerier.QueryRecursiveLatestValset(ctx, uint64(currentHeight)) + } + return vs, nil case "earliest": // TODO make the first nonce 1 a const - att, err := querier.QueryAttestationByNonce(ctx, 1) + att, err := appQuerier.QueryAttestationByNonce(ctx, 1) if err != nil { - return nil, err + appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error()) + historicalAtt, err := appQuerier.QueryHistoricalAttestationByNonce(ctx, 1, 1) + if err != nil { + return nil, err + } + att = historicalAtt } vs, ok := att.(*types.Valset) if !ok { @@ -150,17 +176,23 @@ func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNon if err != nil { return nil, err } - attestation, err := querier.QueryAttestationByNonce(ctx, nonce) + currentHeight, err := tmQuerier.QueryHeight(ctx) + if err != nil { + return nil, err + } + attestation, err := appQuerier.QueryRecursiveHistoricalAttestationByNonce(ctx, nonce, uint64(currentHeight)) if err != nil { return nil, err } if attestation == nil { return nil, types.ErrNilAttestation } - value, ok := attestation.(*types.Valset) - if ok { + switch value := attestation.(type) { + case *types.Valset: return value, nil + case *types.DataCommitment: + return appQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, nonce, value.EndBlock) } - return querier.QueryLastValsetBeforeNonce(ctx, nonce) } + return nil, ErrNotFound } diff --git a/cmd/blobstream/deploy/config.go b/cmd/blobstream/deploy/config.go index f84cdc3b..47b479e6 100644 --- a/cmd/blobstream/deploy/config.go +++ b/cmd/blobstream/deploy/config.go @@ -17,6 +17,8 @@ const ( FlagEVMGasLimit = "evm.gas-limit" FlagCoreGRPCHost = "core.grpc.host" FlagCoreGRPCPort = "core.grpc.port" + FlagCoreRPCHost = "core.rpc.host" + FlagCoreRPCPort = "core.rpc.port" FlagStartingNonce = "starting-nonce" ServiceNameDeployer = "deployer" ) @@ -26,6 +28,8 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command { cmd.Flags().Uint64(FlagEVMChainID, 5, "Specify the evm chain id") cmd.Flags().String(FlagCoreGRPCHost, "localhost", "Specify the grpc address host") cmd.Flags().Uint(FlagCoreGRPCPort, 9090, "Specify the grpc address port") + cmd.Flags().String(FlagCoreRPCHost, "localhost", "Specify the rpc address host") + cmd.Flags().Uint(FlagCoreRPCPort, 26657, "Specify the rpc address port") cmd.Flags().String(FlagEVMRPC, "http://localhost:8545", "Specify the ethereum rpc address") cmd.Flags().String( FlagStartingNonce, @@ -48,11 +52,12 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command { type deployConfig struct { *base.Config - evmRPC, coreGRPC string - evmChainID uint64 - evmAccAddress string - startingNonce string - evmGasLimit uint64 + evmRPC string + coreRPC, coreGRPC string + evmChainID uint64 + evmAccAddress string + startingNonce string + evmGasLimit uint64 } func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { @@ -72,6 +77,11 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { return deployConfig{}, err } coreGRPCPort, err := cmd.Flags().GetUint(FlagCoreGRPCPort) + coreRPCHost, err := cmd.Flags().GetString(FlagCoreRPCHost) + if err != nil { + return deployConfig{}, err + } + coreRPCPort, err := cmd.Flags().GetUint(FlagCoreRPCPort) if err != nil { return deployConfig{}, err } @@ -107,6 +117,7 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { evmAccAddress: evmAccAddr, evmChainID: evmChainID, coreGRPC: fmt.Sprintf("%s:%d", coreGRPCHost, coreGRPCPort), + coreRPC: fmt.Sprintf("tcp://%s:%d", coreRPCHost, coreRPCPort), evmRPC: evmRPC, startingNonce: startingNonce, evmGasLimit: evmGasLimit, diff --git a/cmd/blobstream/deploy/errors.go b/cmd/blobstream/deploy/errors.go index f5d814b0..d5149d7e 100644 --- a/cmd/blobstream/deploy/errors.go +++ b/cmd/blobstream/deploy/errors.go @@ -2,4 +2,7 @@ package deploy import "errors" -var ErrUnmarshallValset = errors.New("couldn't unmarsall valset") +var ( + ErrUnmarshallValset = errors.New("couldn't unmarshall valset") + ErrNotFound = errors.New("not found") +) diff --git a/e2e/scripts/deploy_blobstream_contract.sh b/e2e/scripts/deploy_blobstream_contract.sh index b6270353..077ca8a9 100644 --- a/e2e/scripts/deploy_blobstream_contract.sh +++ b/e2e/scripts/deploy_blobstream_contract.sh @@ -68,6 +68,8 @@ echo "deploying Blobstream contract..." --evm.account "${EVM_ACCOUNT}" \ --core.grpc.host "${CORE_GRPC_HOST}" \ --core.grpc.port "${CORE_GRPC_PORT}" \ + --core.rpc.host "${CORE_RPC_HOST}" \ + --core.rpc.port "${CORE_RPC_PORT}" \ --starting-nonce "${STARTING_NONCE}" \ --evm.rpc "${EVM_ENDPOINT}" \ --evm.passphrase=123 > /opt/output diff --git a/relayer/relayer.go b/relayer/relayer.go index 79c7cd78..f77a0593 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -148,7 +148,15 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error()) previousValset, err = r.QueryValsetFromP2PNetworkAndValidateIt(ctx) if err != nil { - return nil, err + r.logger.Error("failed to query the last valset before nonce from p2p network. trying using an archive node", "err", err.Error()) + currentHeight, err := r.TmQuerier.QueryHeight(ctx) + if err != nil { + return nil, err + } + previousValset, err = r.AppQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, attI.GetNonce(), uint64(currentHeight)) + if err != nil { + return nil, err + } } } switch att := attI.(type) { diff --git a/rpc/app_querier.go b/rpc/app_querier.go index 1a4d873d..366a9e55 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -2,6 +2,11 @@ package rpc import ( "context" + "strconv" + + "github.com/celestiaorg/celestia-app/pkg/appconsts" + cosmosgrpc "github.com/cosmos/cosmos-sdk/types/grpc" + "google.golang.org/grpc/metadata" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -14,6 +19,8 @@ import ( tmlog "github.com/tendermint/tendermint/libs/log" ) +var BlocksIn20DaysPeriod = 20 * 24 * 60 * 60 / appconsts.TimeoutCommit.Seconds() + // AppQuerier queries the application for attestations and unbonding periods. type AppQuerier struct { blobStreamRPC string @@ -63,6 +70,57 @@ func (aq *AppQuerier) QueryAttestationByNonce(ctx context.Context, nonce uint64) return unmarshalledAttestation, nil } +// QueryHistoricalAttestationByNonce query an attestation by nonce from the state machine at a certain height. +func (aq *AppQuerier) QueryHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + var header metadata.MD + atResp, err := queryClient.AttestationRequestByNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), // Add metadata to request + &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, + grpc.Header(&header), // Retrieve header from response + ) + if err != nil { + return nil, err + } + if atResp.Attestation == nil { + return nil, nil + } + + unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) + if err != nil { + return nil, err + } + + return unmarshalledAttestation, nil +} + +// QueryRecursiveHistoricalAttestationByNonce query an attestation by nonce from the state machine +// via going over the history step by step starting from height. +func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + currentHeight := height + for currentHeight >= 1 { + var header metadata.MD + atResp, err := queryClient.AttestationRequestByNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request + &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, + grpc.Header(&header), // Retrieve header from response + ) + if err == nil { + unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) + if err != nil { + return nil, err + } + return unmarshalledAttestation, nil + } + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) + } + return nil, ErrNotFound +} + // QueryLatestAttestationNonce query the latest attestation nonce from the state machine. func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64, error) { queryClient := celestiatypes.NewQueryClient(aq.clientConn) @@ -78,6 +136,23 @@ func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64, return resp.Nonce, nil } +// QueryHistoricalLatestAttestationNonce query the historical latest attestation nonce from the state machine at a certain nonce. +func (aq *AppQuerier) QueryHistoricalLatestAttestationNonce(ctx context.Context, height uint64) (uint64, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + var header metadata.MD + resp, err := queryClient.LatestAttestationNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestAttestationNonceRequest{}, + grpc.Header(&header), + ) + if err != nil { + return 0, err + } + + return resp.Nonce, nil +} + // QueryDataCommitmentByNonce query a data commitment by its nonce. func (aq *AppQuerier) QueryDataCommitmentByNonce(ctx context.Context, nonce uint64) (*celestiatypes.DataCommitment, error) { attestation, err := aq.QueryAttestationByNonce(ctx, nonce) @@ -134,6 +209,24 @@ func (aq *AppQuerier) QueryValsetByNonce(ctx context.Context, nonce uint64) (*ce return value, nil } +// QueryHistoricalValsetByNonce query a historical valset by nonce. +func (aq *AppQuerier) QueryHistoricalValsetByNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { + attestation, err := aq.QueryHistoricalAttestationByNonce(ctx, nonce, height) + if err != nil { + return nil, err + } + if attestation == nil { + return nil, types.ErrAttestationNotFound + } + + value, ok := attestation.(*celestiatypes.Valset) + if !ok { + return nil, types.ErrUnmarshalValset + } + + return value, nil +} + // QueryLatestValset query the latest recorded valset in the state machine. func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Valset, error) { latestNonce, err := aq.QueryLatestAttestationNonce(ctx) @@ -153,6 +246,30 @@ func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Val return latestValset, nil } +// QueryRecursiveLatestValset query the latest recorded valset in the state machine in history. +func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uint64) (*celestiatypes.Valset, error) { + currentHeight := height + for currentHeight >= 1 { + latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight) + if err != nil { + return nil, err + } + + var latestValset *celestiatypes.Valset + if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil { + latestValset = vs + } else { + latestValset, err = aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) + if err == nil { + return latestValset, nil + } + } + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) + } + return nil, ErrNotFound +} + // QueryLastValsetBeforeNonce returns the last valset before nonce. // This will be needed when signing to know the validator set at that particular nonce. // the provided `nonce` can be a valset, but this will return the valset before it. @@ -170,6 +287,43 @@ func (aq *AppQuerier) QueryLastValsetBeforeNonce(ctx context.Context, nonce uint return resp.Valset, nil } +// QueryHistoricalLastValsetBeforeNonce returns the last historical valset before nonce for a certain height. +func (aq *AppQuerier) QueryHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + var header metadata.MD + resp, err := queryClient.LatestValsetRequestBeforeNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, + grpc.Header(&header), + ) + if err != nil { + return nil, err + } + + return resp.Valset, nil +} + +// QueryRecursiveHistoricalLastValsetBeforeNonce recursively looks for the last historical valset before nonce for a certain height until genesis. +func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { + queryClient := celestiatypes.NewQueryClient(aq.clientConn) + + currentHeight := height + for currentHeight >= 1 { + var header metadata.MD + resp, err := queryClient.LatestValsetRequestBeforeNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, + grpc.Header(&header), + ) + if err == nil { + return resp.Valset, err + } + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) + } + return nil, ErrNotFound +} + // QueryLastUnbondingHeight query the last unbonding height from state machine. func (aq *AppQuerier) QueryLastUnbondingHeight(ctx context.Context) (int64, error) { queryClient := celestiatypes.NewQueryClient(aq.clientConn) diff --git a/rpc/errors.go b/rpc/errors.go index eabcf003..b91e0f6b 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -2,4 +2,7 @@ package rpc import "errors" -var ErrCouldntReachSpecifiedHeight = errors.New("couldn't reach specified height") +var ( + ErrCouldntReachSpecifiedHeight = errors.New("couldn't reach specified height") + ErrNotFound = errors.New("not found") +) From dedad028cd3f879fdac588643a5d83e4dfb09401 Mon Sep 17 00:00:00 2001 From: rachid Date: Tue, 31 Oct 2023 14:13:55 +0100 Subject: [PATCH 02/13] chore: lint --- cmd/blobstream/deploy/config.go | 3 +++ rpc/app_querier.go | 5 ++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cmd/blobstream/deploy/config.go b/cmd/blobstream/deploy/config.go index 47b479e6..47c6e48d 100644 --- a/cmd/blobstream/deploy/config.go +++ b/cmd/blobstream/deploy/config.go @@ -77,6 +77,9 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { return deployConfig{}, err } coreGRPCPort, err := cmd.Flags().GetUint(FlagCoreGRPCPort) + if err != nil { + return deployConfig{}, err + } coreRPCHost, err := cmd.Flags().GetString(FlagCoreRPCHost) if err != nil { return deployConfig{}, err diff --git a/rpc/app_querier.go b/rpc/app_querier.go index 366a9e55..a8b57612 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -255,11 +255,10 @@ func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uin return nil, err } - var latestValset *celestiatypes.Valset if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil { - latestValset = vs + return vs, nil } else { - latestValset, err = aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) + latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) if err == nil { return latestValset, nil } From a8ed811842fac197e12f61faae8821f3ff3865d2 Mon Sep 17 00:00:00 2001 From: rachid Date: Tue, 31 Oct 2023 14:17:37 +0100 Subject: [PATCH 03/13] chore: lint --- rpc/app_querier.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rpc/app_querier.go b/rpc/app_querier.go index a8b57612..ac556d06 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -257,12 +257,13 @@ func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uin if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil { return vs, nil - } else { - latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) - if err == nil { - return latestValset, nil - } } + + latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) + if err == nil { + return latestValset, nil + } + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) currentHeight -= uint64(BlocksIn20DaysPeriod) } From 0dec09b4c5f727b9bc565237ed18aee45f312c2d Mon Sep 17 00:00:00 2001 From: rachid Date: Tue, 31 Oct 2023 15:42:55 +0100 Subject: [PATCH 04/13] chore: better error message --- cmd/blobstream/deploy/cmd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/blobstream/deploy/cmd.go b/cmd/blobstream/deploy/cmd.go index 8be8f56d..e0b5ff23 100644 --- a/cmd/blobstream/deploy/cmd.go +++ b/cmd/blobstream/deploy/cmd.go @@ -69,7 +69,7 @@ func Command() *cobra.Command { vs, err := getStartingValset(cmd.Context(), *tmQuerier, appQuerier, config.startingNonce) if err != nil { - logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract") + logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract", "err", err.Error()) return errors.Wrap( err, "cannot initialize the Blobstream contract without having a valset request: %s", From e1d3468166e024ffb410b25b27d239210027c8b3 Mon Sep 17 00:00:00 2001 From: rachid Date: Tue, 31 Oct 2023 16:16:02 +0100 Subject: [PATCH 05/13] chore: use timeout for recursive lookup --- rpc/app_querier.go | 94 ++++++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 36 deletions(-) diff --git a/rpc/app_querier.go b/rpc/app_querier.go index ac556d06..86047244 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -3,6 +3,7 @@ package rpc import ( "context" "strconv" + "time" "github.com/celestiaorg/celestia-app/pkg/appconsts" cosmosgrpc "github.com/cosmos/cosmos-sdk/types/grpc" @@ -101,22 +102,29 @@ func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Con queryClient := celestiatypes.NewQueryClient(aq.clientConn) currentHeight := height + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() for currentHeight >= 1 { - var header metadata.MD - atResp, err := queryClient.AttestationRequestByNonce( - metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request - &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, - grpc.Header(&header), // Retrieve header from response - ) - if err == nil { - unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) - if err != nil { - return nil, err + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + var header metadata.MD + atResp, err := queryClient.AttestationRequestByNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request + &celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce}, + grpc.Header(&header), // Retrieve header from response + ) + if err == nil { + unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation) + if err != nil { + return nil, err + } + return unmarshalledAttestation, nil } - return unmarshalledAttestation, nil + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) } - aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) - currentHeight -= uint64(BlocksIn20DaysPeriod) } return nil, ErrNotFound } @@ -249,23 +257,30 @@ func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Val // QueryRecursiveLatestValset query the latest recorded valset in the state machine in history. func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uint64) (*celestiatypes.Valset, error) { currentHeight := height + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() for currentHeight >= 1 { - latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight) - if err != nil { - return nil, err - } + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight) + if err != nil { + return nil, err + } - if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil { - return vs, nil - } + if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil { + return vs, nil + } - latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) - if err == nil { - return latestValset, nil - } + latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight) + if err == nil { + return latestValset, nil + } - aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) - currentHeight -= uint64(BlocksIn20DaysPeriod) + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) + } } return nil, ErrNotFound } @@ -308,18 +323,25 @@ func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context. queryClient := celestiatypes.NewQueryClient(aq.clientConn) currentHeight := height + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() for currentHeight >= 1 { - var header metadata.MD - resp, err := queryClient.LatestValsetRequestBeforeNonce( - metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), - &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, - grpc.Header(&header), - ) - if err == nil { - return resp.Valset, err + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + var header metadata.MD + resp, err := queryClient.LatestValsetRequestBeforeNonce( + metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), + &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, + grpc.Header(&header), + ) + if err == nil { + return resp.Valset, err + } + aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) + currentHeight -= uint64(BlocksIn20DaysPeriod) } - aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) - currentHeight -= uint64(BlocksIn20DaysPeriod) } return nil, ErrNotFound } From 01f8e7469fddb6ad46088f914f1e1b1fdda5ec27 Mon Sep 17 00:00:00 2001 From: rachid Date: Thu, 2 Nov 2023 12:40:19 +0100 Subject: [PATCH 06/13] test: add tests for historical queries --- orchestrator/orchestrator_test.go | 4 +- orchestrator/suite_test.go | 5 +- relayer/relayer_test.go | 4 +- rpc/app_historic_querier_test.go | 120 ++++++++++++++++++++++++++++++ rpc/app_querier.go | 9 +++ rpc/historic_suite_test.go | 51 +++++++++++++ testing/celestia_network.go | 15 ++-- 7 files changed, 200 insertions(+), 8 deletions(-) create mode 100644 rpc/app_historic_querier_test.go create mode 100644 rpc/historic_suite_test.go diff --git a/orchestrator/orchestrator_test.go b/orchestrator/orchestrator_test.go index 8c150b92..680d1bbf 100644 --- a/orchestrator/orchestrator_test.go +++ b/orchestrator/orchestrator_test.go @@ -206,7 +206,9 @@ func TestProcessWithoutValsetInStore(t *testing.T) { testnode.ImmediateProposals(codec), qgbtesting.SetDataCommitmentWindowParams(codec, celestiatypes.Params{DataCommitmentWindow: 101}), }, - TimeIotaMs: 6048000, // to have enough time to sign attestations after they're pruned + TimeIotaMs: 6048000, // to have enough time to sign attestations after they're pruned + Pruning: "default", + TimeoutCommit: 5 * time.Millisecond, }, ) _, err := node.CelestiaNetwork.WaitForHeight(400) diff --git a/orchestrator/suite_test.go b/orchestrator/suite_test.go index bafba568..fc0c5cf3 100644 --- a/orchestrator/suite_test.go +++ b/orchestrator/suite_test.go @@ -3,6 +3,7 @@ package orchestrator_test import ( "context" "testing" + "time" "github.com/celestiaorg/celestia-app/app" "github.com/celestiaorg/celestia-app/app/encoding" @@ -31,7 +32,9 @@ func (s *OrchestratorTestSuite) SetupSuite() { testnode.ImmediateProposals(codec), blobstreamtesting.SetDataCommitmentWindowParams(codec, types.Params{DataCommitmentWindow: 101}), }, - TimeIotaMs: 1, + TimeIotaMs: 1, + Pruning: "default", + TimeoutCommit: 5 * time.Millisecond, }, ) s.Orchestrator = blobstreamtesting.NewOrchestrator(t, s.Node) diff --git a/relayer/relayer_test.go b/relayer/relayer_test.go index 887f1021..586ccd78 100644 --- a/relayer/relayer_test.go +++ b/relayer/relayer_test.go @@ -68,7 +68,9 @@ func TestUseValsetFromP2P(t *testing.T) { testnode.ImmediateProposals(codec), qgbtesting.SetDataCommitmentWindowParams(codec, types.Params{DataCommitmentWindow: 101}), }, - TimeIotaMs: 2000000, // so attestations are pruned after they're queried + TimeIotaMs: 2000000, // so attestations are pruned after they're queried + Pruning: "default", + TimeoutCommit: 5 * time.Millisecond, }, ) diff --git a/rpc/app_historic_querier_test.go b/rpc/app_historic_querier_test.go new file mode 100644 index 00000000..9d9eb5e5 --- /dev/null +++ b/rpc/app_historic_querier_test.go @@ -0,0 +1,120 @@ +package rpc_test + +import ( + "context" + + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/orchestrator-relayer/rpc" +) + +func (s *HistoricQuerierTestSuite) TestQueryHistoricAttestationByNonce() { + appQuerier := rpc.NewAppQuerier( + s.Logger, + s.Network.GRPCAddr, + s.EncConf, + ) + require.NoError(s.T(), appQuerier.Start()) + defer appQuerier.Stop() //nolint:errcheck + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryAttestationByNonce(context.Background(), 1) + s.Error(err) + + att, err := appQuerier.QueryHistoricalAttestationByNonce(context.Background(), 1, 10) + s.NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} + +func (s *HistoricQuerierTestSuite) TestQueryRecursiveHistoricAttestationByNonce() { + appQuerier := rpc.NewAppQuerier( + s.Logger, + s.Network.GRPCAddr, + s.EncConf, + ) + require.NoError(s.T(), appQuerier.Start()) + defer appQuerier.Stop() //nolint:errcheck + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryAttestationByNonce(context.Background(), 1) + s.Error(err) + + height, err := s.Network.LatestHeight() + s.Require().NoError(err) + att, err := appQuerier.QueryRecursiveHistoricalAttestationByNonce(context.Background(), 1, uint64(height)) + s.Require().NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} + +func (s *HistoricQuerierTestSuite) TestQueryHistoricalLatestAttestationNonce() { + appQuerier := rpc.NewAppQuerier( + s.Logger, + s.Network.GRPCAddr, + s.EncConf, + ) + require.NoError(s.T(), appQuerier.Start()) + defer appQuerier.Stop() //nolint:errcheck + + nonce, err := appQuerier.QueryHistoricalLatestAttestationNonce(context.Background(), 2) + s.Require().NoError(err) + s.Equal(uint64(1), nonce) +} + +func (s *HistoricQuerierTestSuite) TestQueryHistoricalValsetByNonce() { + appQuerier := rpc.NewAppQuerier( + s.Logger, + s.Network.GRPCAddr, + s.EncConf, + ) + require.NoError(s.T(), appQuerier.Start()) + defer appQuerier.Stop() //nolint:errcheck + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryValsetByNonce(context.Background(), 1) + s.Error(err) + + att, err := appQuerier.QueryHistoricalValsetByNonce(context.Background(), 1, 10) + s.Require().NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} + +func (s *HistoricQuerierTestSuite) TestQueryHistoricalLastValsetBeforeNonce() { + appQuerier := rpc.NewAppQuerier( + s.Logger, + s.Network.GRPCAddr, + s.EncConf, + ) + require.NoError(s.T(), appQuerier.Start()) + defer appQuerier.Stop() //nolint:errcheck + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryLastValsetBeforeNonce(context.Background(), 2) + s.Error(err) + + att, err := appQuerier.QueryHistoricalLastValsetBeforeNonce(context.Background(), 2, 102) + s.Require().NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} + +func (s *HistoricQuerierTestSuite) TestQueryRecursiveHistoricalLastValsetBeforeNonce() { + appQuerier := rpc.NewAppQuerier( + s.Logger, + s.Network.GRPCAddr, + s.EncConf, + ) + require.NoError(s.T(), appQuerier.Start()) + defer appQuerier.Stop() //nolint:errcheck + + // this one should fail because the attestation is deleted from the state + _, err := appQuerier.QueryLastValsetBeforeNonce(context.Background(), 2) + s.Error(err) + + att, err := appQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(context.Background(), 2, 201) + s.Require().NoError(err) + s.NotNil(att) + s.Equal(uint64(1), att.GetNonce()) +} diff --git a/rpc/app_querier.go b/rpc/app_querier.go index 86047244..de6082da 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -122,6 +122,9 @@ func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Con } return unmarshalledAttestation, nil } + if currentHeight <= uint64(BlocksIn20DaysPeriod) { + return nil, ErrNotFound + } aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) currentHeight -= uint64(BlocksIn20DaysPeriod) } @@ -278,6 +281,9 @@ func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uin return latestValset, nil } + if currentHeight <= uint64(BlocksIn20DaysPeriod) { + return nil, ErrNotFound + } aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) currentHeight -= uint64(BlocksIn20DaysPeriod) } @@ -339,6 +345,9 @@ func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context. if err == nil { return resp.Valset, err } + if currentHeight <= uint64(BlocksIn20DaysPeriod) { + return nil, ErrNotFound + } aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) currentHeight -= uint64(BlocksIn20DaysPeriod) } diff --git a/rpc/historic_suite_test.go b/rpc/historic_suite_test.go new file mode 100644 index 00000000..1213a456 --- /dev/null +++ b/rpc/historic_suite_test.go @@ -0,0 +1,51 @@ +package rpc_test + +import ( + "context" + "testing" + "time" + + "github.com/celestiaorg/celestia-app/test/util/testnode" + "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/celestiaorg/orchestrator-relayer/rpc" + + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + tmlog "github.com/tendermint/tendermint/libs/log" + + "github.com/stretchr/testify/require" + + blobstreamtesting "github.com/celestiaorg/orchestrator-relayer/testing" + "github.com/stretchr/testify/suite" +) + +type HistoricQuerierTestSuite struct { + suite.Suite + Network *blobstreamtesting.CelestiaNetwork + EncConf encoding.Config + Logger tmlog.Logger +} + +func (s *HistoricQuerierTestSuite) SetupSuite() { + t := s.T() + ctx := context.Background() + s.EncConf = encoding.MakeConfig(app.ModuleEncodingRegisters...) + s.Network = blobstreamtesting.NewCelestiaNetwork( + ctx, + t, + blobstreamtesting.CelestiaNetworkParams{ + GenesisOpts: []testnode.GenesisOption{blobstreamtesting.SetDataCommitmentWindowParams(s.EncConf.Codec, types.Params{DataCommitmentWindow: 101})}, + TimeIotaMs: 6048000, // so that old attestations are deleted as soon as a new one appears + Pruning: "nothing", // make the node an archive one + TimeoutCommit: 20 * time.Millisecond, + }, + ) + _, err := s.Network.WaitForHeightWithTimeout(401, 30*time.Second) + s.Logger = tmlog.NewNopLogger() + require.NoError(t, err) + rpc.BlocksIn20DaysPeriod = 100 +} + +func TestHistoricQueriers(t *testing.T) { + suite.Run(t, new(HistoricQuerierTestSuite)) +} diff --git a/testing/celestia_network.go b/testing/celestia_network.go index 5afa8c41..8ec98aec 100644 --- a/testing/celestia_network.go +++ b/testing/celestia_network.go @@ -42,14 +42,18 @@ type CelestiaNetwork struct { } type CelestiaNetworkParams struct { - GenesisOpts []celestiatestnode.GenesisOption - TimeIotaMs int64 + GenesisOpts []celestiatestnode.GenesisOption + TimeIotaMs int64 + Pruning string + TimeoutCommit time.Duration } func DefaultCelestiaNetworkParams() CelestiaNetworkParams { return CelestiaNetworkParams{ - GenesisOpts: nil, - TimeIotaMs: 1, + GenesisOpts: nil, + TimeIotaMs: 1, + Pruning: "default", + TimeoutCommit: 5 * time.Millisecond, } } @@ -73,8 +77,9 @@ func NewCelestiaNetwork(ctx context.Context, t *testing.T, params CelestiaNetwor } tmCfg := celestiatestnode.DefaultTendermintConfig() - tmCfg.Consensus.TimeoutCommit = time.Millisecond * 5 + tmCfg.Consensus.TimeoutCommit = params.TimeoutCommit appConf := celestiatestnode.DefaultAppConfig() + appConf.Pruning = params.Pruning consensusParams := celestiatestnode.DefaultParams() consensusParams.Block.TimeIotaMs = params.TimeIotaMs From 875df3539bf89aeb11ec08fccb8b5baee1f8897b Mon Sep 17 00:00:00 2001 From: rachid Date: Thu, 2 Nov 2023 14:10:16 +0100 Subject: [PATCH 07/13] chore: refactor for less code duplication --- rpc/app_historic_querier_test.go | 52 ++++---------------------------- rpc/historic_suite_test.go | 15 ++++++++- 2 files changed, 20 insertions(+), 47 deletions(-) diff --git a/rpc/app_historic_querier_test.go b/rpc/app_historic_querier_test.go index 9d9eb5e5..71c874b0 100644 --- a/rpc/app_historic_querier_test.go +++ b/rpc/app_historic_querier_test.go @@ -2,20 +2,10 @@ package rpc_test import ( "context" - - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/orchestrator-relayer/rpc" ) func (s *HistoricQuerierTestSuite) TestQueryHistoricAttestationByNonce() { - appQuerier := rpc.NewAppQuerier( - s.Logger, - s.Network.GRPCAddr, - s.EncConf, - ) - require.NoError(s.T(), appQuerier.Start()) - defer appQuerier.Stop() //nolint:errcheck + appQuerier := s.setupAppQuerier() // this one should fail because the attestation is deleted from the state _, err := appQuerier.QueryAttestationByNonce(context.Background(), 1) @@ -28,13 +18,7 @@ func (s *HistoricQuerierTestSuite) TestQueryHistoricAttestationByNonce() { } func (s *HistoricQuerierTestSuite) TestQueryRecursiveHistoricAttestationByNonce() { - appQuerier := rpc.NewAppQuerier( - s.Logger, - s.Network.GRPCAddr, - s.EncConf, - ) - require.NoError(s.T(), appQuerier.Start()) - defer appQuerier.Stop() //nolint:errcheck + appQuerier := s.setupAppQuerier() // this one should fail because the attestation is deleted from the state _, err := appQuerier.QueryAttestationByNonce(context.Background(), 1) @@ -49,13 +33,7 @@ func (s *HistoricQuerierTestSuite) TestQueryRecursiveHistoricAttestationByNonce( } func (s *HistoricQuerierTestSuite) TestQueryHistoricalLatestAttestationNonce() { - appQuerier := rpc.NewAppQuerier( - s.Logger, - s.Network.GRPCAddr, - s.EncConf, - ) - require.NoError(s.T(), appQuerier.Start()) - defer appQuerier.Stop() //nolint:errcheck + appQuerier := s.setupAppQuerier() nonce, err := appQuerier.QueryHistoricalLatestAttestationNonce(context.Background(), 2) s.Require().NoError(err) @@ -63,13 +41,7 @@ func (s *HistoricQuerierTestSuite) TestQueryHistoricalLatestAttestationNonce() { } func (s *HistoricQuerierTestSuite) TestQueryHistoricalValsetByNonce() { - appQuerier := rpc.NewAppQuerier( - s.Logger, - s.Network.GRPCAddr, - s.EncConf, - ) - require.NoError(s.T(), appQuerier.Start()) - defer appQuerier.Stop() //nolint:errcheck + appQuerier := s.setupAppQuerier() // this one should fail because the attestation is deleted from the state _, err := appQuerier.QueryValsetByNonce(context.Background(), 1) @@ -82,13 +54,7 @@ func (s *HistoricQuerierTestSuite) TestQueryHistoricalValsetByNonce() { } func (s *HistoricQuerierTestSuite) TestQueryHistoricalLastValsetBeforeNonce() { - appQuerier := rpc.NewAppQuerier( - s.Logger, - s.Network.GRPCAddr, - s.EncConf, - ) - require.NoError(s.T(), appQuerier.Start()) - defer appQuerier.Stop() //nolint:errcheck + appQuerier := s.setupAppQuerier() // this one should fail because the attestation is deleted from the state _, err := appQuerier.QueryLastValsetBeforeNonce(context.Background(), 2) @@ -101,13 +67,7 @@ func (s *HistoricQuerierTestSuite) TestQueryHistoricalLastValsetBeforeNonce() { } func (s *HistoricQuerierTestSuite) TestQueryRecursiveHistoricalLastValsetBeforeNonce() { - appQuerier := rpc.NewAppQuerier( - s.Logger, - s.Network.GRPCAddr, - s.EncConf, - ) - require.NoError(s.T(), appQuerier.Start()) - defer appQuerier.Stop() //nolint:errcheck + appQuerier := s.setupAppQuerier() // this one should fail because the attestation is deleted from the state _, err := appQuerier.QueryLastValsetBeforeNonce(context.Background(), 2) diff --git a/rpc/historic_suite_test.go b/rpc/historic_suite_test.go index 1213a456..9c296f01 100644 --- a/rpc/historic_suite_test.go +++ b/rpc/historic_suite_test.go @@ -41,11 +41,24 @@ func (s *HistoricQuerierTestSuite) SetupSuite() { }, ) _, err := s.Network.WaitForHeightWithTimeout(401, 30*time.Second) - s.Logger = tmlog.NewNopLogger() require.NoError(t, err) + s.Logger = tmlog.NewNopLogger() rpc.BlocksIn20DaysPeriod = 100 } func TestHistoricQueriers(t *testing.T) { suite.Run(t, new(HistoricQuerierTestSuite)) } + +func (s *HistoricQuerierTestSuite) setupAppQuerier() *rpc.AppQuerier { + appQuerier := rpc.NewAppQuerier( + s.Logger, + s.Network.GRPCAddr, + s.EncConf, + ) + require.NoError(s.T(), appQuerier.Start()) + s.T().Cleanup(func() { + appQuerier.Stop() //nolint:errcheck + }) + return appQuerier +} From 816989232463822cc500a167f8e1b99bf007e253 Mon Sep 17 00:00:00 2001 From: rachid Date: Thu, 2 Nov 2023 22:31:33 +0100 Subject: [PATCH 08/13] test: add relayer test --- relayer/historic_relayer_test.go | 39 +++++++++++++++++++ relayer/historic_suite_test.go | 66 ++++++++++++++++++++++++++++++++ rpc/app_querier.go | 25 +++++------- 3 files changed, 114 insertions(+), 16 deletions(-) create mode 100644 relayer/historic_relayer_test.go create mode 100644 relayer/historic_suite_test.go diff --git a/relayer/historic_relayer_test.go b/relayer/historic_relayer_test.go new file mode 100644 index 00000000..5503493d --- /dev/null +++ b/relayer/historic_relayer_test.go @@ -0,0 +1,39 @@ +package relayer_test + +import ( + "context" + "math/big" + "time" + + blobstreamtypes "github.com/celestiaorg/orchestrator-relayer/types" + + "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/stretchr/testify/require" +) + +func (s *HistoricalRelayerTestSuite) TestProcessHistoricAttestation() { + t := s.T() + _, err := s.Node.CelestiaNetwork.WaitForHeightWithTimeout(400, 30*time.Second) + require.NoError(t, err) + + ctx := context.Background() + valset, err := s.Orchestrator.AppQuerier.QueryLatestValset(ctx) + require.NoError(t, err) + + for { + _, err = s.Orchestrator.AppQuerier.QueryAttestationByNonce(ctx, valset.Nonce) + if err != nil { + break + } + } + + att := types.NewDataCommitment(valset.Nonce+1, 10, 100, time.Now()) + commitment, err := s.Orchestrator.TmQuerier.QueryCommitment(ctx, att.BeginBlock, att.EndBlock) + require.NoError(t, err) + dataRootTupleRoot := blobstreamtypes.DataCommitmentTupleRootSignBytes(big.NewInt(int64(att.Nonce)), commitment) + err = s.Orchestrator.ProcessDataCommitmentEvent(ctx, *att, dataRootTupleRoot) + require.NoError(t, err) + + _, err = s.Relayer.ProcessAttestation(ctx, s.Node.EVMChain.Auth, att) + require.NoError(t, err) +} diff --git a/relayer/historic_suite_test.go b/relayer/historic_suite_test.go new file mode 100644 index 00000000..fa00c9e1 --- /dev/null +++ b/relayer/historic_suite_test.go @@ -0,0 +1,66 @@ +package relayer_test + +import ( + "context" + "testing" + "time" + + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + "github.com/celestiaorg/celestia-app/test/util/testnode" + "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/celestiaorg/orchestrator-relayer/rpc" + + "github.com/celestiaorg/orchestrator-relayer/orchestrator" + + "github.com/celestiaorg/orchestrator-relayer/relayer" + blobstreamtesting "github.com/celestiaorg/orchestrator-relayer/testing" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type HistoricalRelayerTestSuite struct { + suite.Suite + Node *blobstreamtesting.TestNode + Orchestrator *orchestrator.Orchestrator + Relayer *relayer.Relayer +} + +func (s *HistoricalRelayerTestSuite) SetupSuite() { + t := s.T() + if testing.Short() { + t.Skip("skipping relayer tests in short mode.") + } + ctx := context.Background() + s.Node = blobstreamtesting.NewTestNode( + ctx, + t, + blobstreamtesting.CelestiaNetworkParams{ + GenesisOpts: []testnode.GenesisOption{blobstreamtesting.SetDataCommitmentWindowParams( + encoding.MakeConfig(app.ModuleEncodingRegisters...).Codec, + types.Params{DataCommitmentWindow: 101}, + )}, + TimeIotaMs: 3048000, // so that old attestations are deleted as soon as a new one appears + Pruning: "nothing", // make the node an archive one + TimeoutCommit: 20 * time.Millisecond, + }, + ) + _, err := s.Node.CelestiaNetwork.WaitForHeight(2) + require.NoError(t, err) + s.Orchestrator = blobstreamtesting.NewOrchestrator(t, s.Node) + s.Relayer = blobstreamtesting.NewRelayer(t, s.Node) + go s.Node.EVMChain.PeriodicCommit(ctx, time.Millisecond) + initVs, err := s.Relayer.AppQuerier.QueryLatestValset(s.Node.Context) + require.NoError(t, err) + _, _, _, err = s.Relayer.EVMClient.DeployBlobstreamContract(s.Node.EVMChain.Auth, s.Node.EVMChain.Backend, *initVs, initVs.Nonce, true) + require.NoError(t, err) + rpc.BlocksIn20DaysPeriod = 50 +} + +func (s *HistoricalRelayerTestSuite) TearDownSuite() { + s.Node.Close() +} + +func TestHistoricRelayer(t *testing.T) { + suite.Run(t, new(HistoricalRelayerTestSuite)) +} diff --git a/rpc/app_querier.go b/rpc/app_querier.go index de6082da..1abd2608 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -326,30 +326,23 @@ func (aq *AppQuerier) QueryHistoricalLastValsetBeforeNonce(ctx context.Context, // QueryRecursiveHistoricalLastValsetBeforeNonce recursively looks for the last historical valset before nonce for a certain height until genesis. func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) { - queryClient := celestiatypes.NewQueryClient(aq.clientConn) - - currentHeight := height ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() - for currentHeight >= 1 { + currentNonce := nonce - 1 + for currentNonce > 0 { select { case <-ctx.Done(): return nil, ctx.Err() default: - var header metadata.MD - resp, err := queryClient.LatestValsetRequestBeforeNonce( - metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), - &celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce}, - grpc.Header(&header), - ) - if err == nil { - return resp.Valset, err + n, err := aq.QueryRecursiveHistoricalAttestationByNonce(ctx, currentNonce, height) + if err != nil { + return nil, err } - if currentHeight <= uint64(BlocksIn20DaysPeriod) { - return nil, ErrNotFound + vs, ok := n.(*celestiatypes.Valset) + if ok { + return vs, nil } - aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) - currentHeight -= uint64(BlocksIn20DaysPeriod) + nonce -= 1 } } return nil, ErrNotFound From dfe7f7daba9f60f994d730033aa7ebcebc12d458 Mon Sep 17 00:00:00 2001 From: rachid Date: Thu, 2 Nov 2023 22:41:15 +0100 Subject: [PATCH 09/13] docs: add comment --- relayer/historic_relayer_test.go | 4 ++++ rpc/app_querier.go | 2 ++ 2 files changed, 6 insertions(+) diff --git a/relayer/historic_relayer_test.go b/relayer/historic_relayer_test.go index 5503493d..7c7170e6 100644 --- a/relayer/historic_relayer_test.go +++ b/relayer/historic_relayer_test.go @@ -20,6 +20,8 @@ func (s *HistoricalRelayerTestSuite) TestProcessHistoricAttestation() { valset, err := s.Orchestrator.AppQuerier.QueryLatestValset(ctx) require.NoError(t, err) + // wait for the valset to be pruned to test if the relayer is able to + // relay using a pruned valset. for { _, err = s.Orchestrator.AppQuerier.QueryAttestationByNonce(ctx, valset.Nonce) if err != nil { @@ -27,6 +29,7 @@ func (s *HistoricalRelayerTestSuite) TestProcessHistoricAttestation() { } } + // sign a test data commitment so that the relayer can relay it att := types.NewDataCommitment(valset.Nonce+1, 10, 100, time.Now()) commitment, err := s.Orchestrator.TmQuerier.QueryCommitment(ctx, att.BeginBlock, att.EndBlock) require.NoError(t, err) @@ -34,6 +37,7 @@ func (s *HistoricalRelayerTestSuite) TestProcessHistoricAttestation() { err = s.Orchestrator.ProcessDataCommitmentEvent(ctx, *att, dataRootTupleRoot) require.NoError(t, err) + // process the test data commitment that needs the pruned valset to be relayed. _, err = s.Relayer.ProcessAttestation(ctx, s.Node.EVMChain.Auth, att) require.NoError(t, err) } diff --git a/rpc/app_querier.go b/rpc/app_querier.go index 1abd2608..c0c3f1a9 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -20,6 +20,8 @@ import ( tmlog "github.com/tendermint/tendermint/libs/log" ) +// BlocksIn20DaysPeriod represents the number of blocks in 20-days period. +// It uses the timeout commit constant, defined in app, for the computation var BlocksIn20DaysPeriod = 20 * 24 * 60 * 60 / appconsts.TimeoutCommit.Seconds() // AppQuerier queries the application for attestations and unbonding periods. From c5e3ae7d29fb2995780bbb34f94ff10325c0916a Mon Sep 17 00:00:00 2001 From: rachid Date: Thu, 2 Nov 2023 22:41:40 +0100 Subject: [PATCH 10/13] chore: golang ci --- rpc/app_querier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/app_querier.go b/rpc/app_querier.go index c0c3f1a9..2cc7090a 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -344,7 +344,7 @@ func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context. if ok { return vs, nil } - nonce -= 1 + nonce-- } } return nil, ErrNotFound From bda6cb552f4e8e72b3277a872d8c7108761fe16f Mon Sep 17 00:00:00 2001 From: rachid Date: Fri, 3 Nov 2023 11:33:10 +0100 Subject: [PATCH 11/13] chore: fix nonce + better logging --- relayer/relayer.go | 5 +++-- rpc/app_querier.go | 10 +++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/relayer/relayer.go b/relayer/relayer.go index f77a0593..f8f55417 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -145,10 +145,10 @@ func (r *Relayer) Start(ctx context.Context) error { func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpts, attI celestiatypes.AttestationRequestI) (*coregethtypes.Transaction, error) { previousValset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, attI.GetNonce()) if err != nil { - r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error()) + r.logger.Debug("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error()) previousValset, err = r.QueryValsetFromP2PNetworkAndValidateIt(ctx) if err != nil { - r.logger.Error("failed to query the last valset before nonce from p2p network. trying using an archive node", "err", err.Error()) + r.logger.Debug("failed to query the last valset before nonce from p2p network. attempting via using an archive node (might take some time)", "err", err.Error()) currentHeight, err := r.TmQuerier.QueryHeight(ctx) if err != nil { return nil, err @@ -158,6 +158,7 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt return nil, err } } + r.logger.Debug("found the needed valset") } switch att := attI.(type) { case *celestiatypes.Valset: diff --git a/rpc/app_querier.go b/rpc/app_querier.go index 2cc7090a..cc375e24 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -127,7 +127,6 @@ func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Con if currentHeight <= uint64(BlocksIn20DaysPeriod) { return nil, ErrNotFound } - aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) currentHeight -= uint64(BlocksIn20DaysPeriod) } } @@ -286,7 +285,6 @@ func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uin if currentHeight <= uint64(BlocksIn20DaysPeriod) { return nil, ErrNotFound } - aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error()) currentHeight -= uint64(BlocksIn20DaysPeriod) } } @@ -331,11 +329,14 @@ func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context. ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() currentNonce := nonce - 1 - for currentNonce > 0 { + for { select { case <-ctx.Done(): return nil, ctx.Err() default: + if currentNonce == 0 { + return nil, ErrNotFound + } n, err := aq.QueryRecursiveHistoricalAttestationByNonce(ctx, currentNonce, height) if err != nil { return nil, err @@ -344,10 +345,9 @@ func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context. if ok { return vs, nil } - nonce-- + currentNonce-- } } - return nil, ErrNotFound } // QueryLastUnbondingHeight query the last unbonding height from state machine. From d269745a45f91cdfed23ff2ae11a84df492bc437 Mon Sep 17 00:00:00 2001 From: rachid Date: Mon, 6 Nov 2023 11:53:56 +0100 Subject: [PATCH 12/13] chore: conflicts --- cmd/blobstream/deploy/config.go | 2 +- rpc/app_querier.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/blobstream/deploy/config.go b/cmd/blobstream/deploy/config.go index f4f675e2..c3ac1b52 100644 --- a/cmd/blobstream/deploy/config.go +++ b/cmd/blobstream/deploy/config.go @@ -58,7 +58,7 @@ type deployConfig struct { evmAccAddress string startingNonce string evmGasLimit uint64 - grpcInsecure bool + grpcInsecure bool } func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) { diff --git a/rpc/app_querier.go b/rpc/app_querier.go index 5e2d4852..0f0c545f 100644 --- a/rpc/app_querier.go +++ b/rpc/app_querier.go @@ -2,13 +2,13 @@ package rpc import ( "context" + "crypto/tls" "strconv" "time" "github.com/celestiaorg/celestia-app/pkg/appconsts" cosmosgrpc "github.com/cosmos/cosmos-sdk/types/grpc" "google.golang.org/grpc/metadata" - "crypto/tls" "google.golang.org/grpc" "google.golang.org/grpc/credentials" From 6de00da287bb9292217a4412ce86cd6c9fa83d20 Mon Sep 17 00:00:00 2001 From: rachid Date: Mon, 6 Nov 2023 11:55:34 +0100 Subject: [PATCH 13/13] chore: conflicts --- cmd/blobstream/deploy/cmd.go | 2 +- rpc/historic_suite_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/blobstream/deploy/cmd.go b/cmd/blobstream/deploy/cmd.go index 901fbeb3..68d214b8 100644 --- a/cmd/blobstream/deploy/cmd.go +++ b/cmd/blobstream/deploy/cmd.go @@ -44,7 +44,7 @@ func Command() *cobra.Command { encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) appQuerier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg) - err = querier.Start(config.grpcInsecure) + err = appQuerier.Start(config.grpcInsecure) if err != nil { return err } diff --git a/rpc/historic_suite_test.go b/rpc/historic_suite_test.go index 9c296f01..43140a86 100644 --- a/rpc/historic_suite_test.go +++ b/rpc/historic_suite_test.go @@ -56,7 +56,7 @@ func (s *HistoricQuerierTestSuite) setupAppQuerier() *rpc.AppQuerier { s.Network.GRPCAddr, s.EncConf, ) - require.NoError(s.T(), appQuerier.Start()) + require.NoError(s.T(), appQuerier.Start(true)) s.T().Cleanup(func() { appQuerier.Stop() //nolint:errcheck })