Skip to content
This repository was archived by the owner on Apr 15, 2024. It is now read-only.

feat: use archive nodes to query attestations #571

Merged
merged 18 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions cmd/blobstream/deploy/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,31 @@ func Command() *cobra.Command {

encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...)

querier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg)
err = querier.Start()
appQuerier := rpc.NewAppQuerier(logger, config.coreGRPC, encCfg)
err = appQuerier.Start()
if err != nil {
return err
}
defer func() {
err := querier.Stop()
err := appQuerier.Stop()
if err != nil {
logger.Error(err.Error())
}
}()

vs, err := getStartingValset(cmd.Context(), querier, config.startingNonce)
tmQuerier := rpc.NewTmQuerier(config.coreRPC, logger)
err = tmQuerier.Start()
if err != nil {
return err
}
defer func(tmQuerier *rpc.TmQuerier) {
err := tmQuerier.Stop()
if err != nil {
logger.Error(err.Error())
}
}(tmQuerier)

vs, err := getStartingValset(cmd.Context(), *tmQuerier, appQuerier, config.startingNonce)
if err != nil {
logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract")
return errors.Wrap(
Expand Down Expand Up @@ -130,15 +142,29 @@ func Command() *cobra.Command {
}

// getStartingValset get the valset that will be used to init the bridge contract.
func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) {
func getStartingValset(ctx context.Context, tmQuerier rpc.TmQuerier, appQuerier *rpc.AppQuerier, startingNonce string) (*types.Valset, error) {
switch startingNonce {
case "latest":
return querier.QueryLatestValset(ctx)
vs, err := appQuerier.QueryLatestValset(ctx)
if err != nil {
appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error())
currentHeight, err := tmQuerier.QueryHeight(ctx)
if err != nil {
return nil, err
}
return appQuerier.QueryRecursiveLatestValset(ctx, uint64(currentHeight))
}
return vs, nil
case "earliest":
// TODO make the first nonce 1 a const
att, err := querier.QueryAttestationByNonce(ctx, 1)
att, err := appQuerier.QueryAttestationByNonce(ctx, 1)
if err != nil {
return nil, err
appQuerier.Logger.Debug("couldn't get the attestation from node state. trying with historical data if the target node is archival", "nonce", 1, "err", err.Error())
historicalAtt, err := appQuerier.QueryHistoricalAttestationByNonce(ctx, 1, 1)
if err != nil {
return nil, err
}
att = historicalAtt
}
vs, ok := att.(*types.Valset)
if !ok {
Expand All @@ -150,17 +176,23 @@ func getStartingValset(ctx context.Context, querier *rpc.AppQuerier, startingNon
if err != nil {
return nil, err
}
attestation, err := querier.QueryAttestationByNonce(ctx, nonce)
currentHeight, err := tmQuerier.QueryHeight(ctx)
if err != nil {
return nil, err
}
attestation, err := appQuerier.QueryRecursiveHistoricalAttestationByNonce(ctx, nonce, uint64(currentHeight))
if err != nil {
return nil, err
}
if attestation == nil {
return nil, types.ErrNilAttestation
}
value, ok := attestation.(*types.Valset)
if ok {
switch value := attestation.(type) {
case *types.Valset:
return value, nil
case *types.DataCommitment:
return appQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, nonce, value.EndBlock)
}
return querier.QueryLastValsetBeforeNonce(ctx, nonce)
}
return nil, ErrNotFound
}
24 changes: 19 additions & 5 deletions cmd/blobstream/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (
FlagEVMGasLimit = "evm.gas-limit"
FlagCoreGRPCHost = "core.grpc.host"
FlagCoreGRPCPort = "core.grpc.port"
FlagCoreRPCHost = "core.rpc.host"
FlagCoreRPCPort = "core.rpc.port"
FlagStartingNonce = "starting-nonce"
ServiceNameDeployer = "deployer"
)
Expand All @@ -26,6 +28,8 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command {
cmd.Flags().Uint64(FlagEVMChainID, 5, "Specify the evm chain id")
cmd.Flags().String(FlagCoreGRPCHost, "localhost", "Specify the grpc address host")
cmd.Flags().Uint(FlagCoreGRPCPort, 9090, "Specify the grpc address port")
cmd.Flags().String(FlagCoreRPCHost, "localhost", "Specify the rpc address host")
cmd.Flags().Uint(FlagCoreRPCPort, 26657, "Specify the rpc address port")
cmd.Flags().String(FlagEVMRPC, "http://localhost:8545", "Specify the ethereum rpc address")
cmd.Flags().String(
FlagStartingNonce,
Expand All @@ -48,11 +52,12 @@ func addDeployFlags(cmd *cobra.Command) *cobra.Command {

type deployConfig struct {
*base.Config
evmRPC, coreGRPC string
evmChainID uint64
evmAccAddress string
startingNonce string
evmGasLimit uint64
evmRPC string
coreRPC, coreGRPC string
evmChainID uint64
evmAccAddress string
startingNonce string
evmGasLimit uint64
}

func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) {
Expand All @@ -75,6 +80,14 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) {
if err != nil {
return deployConfig{}, err
}
coreRPCHost, err := cmd.Flags().GetString(FlagCoreRPCHost)
if err != nil {
return deployConfig{}, err
}
coreRPCPort, err := cmd.Flags().GetUint(FlagCoreRPCPort)
if err != nil {
return deployConfig{}, err
}
evmRPC, err := cmd.Flags().GetString(FlagEVMRPC)
if err != nil {
return deployConfig{}, err
Expand Down Expand Up @@ -107,6 +120,7 @@ func parseDeployFlags(cmd *cobra.Command) (deployConfig, error) {
evmAccAddress: evmAccAddr,
evmChainID: evmChainID,
coreGRPC: fmt.Sprintf("%s:%d", coreGRPCHost, coreGRPCPort),
coreRPC: fmt.Sprintf("tcp://%s:%d", coreRPCHost, coreRPCPort),
evmRPC: evmRPC,
startingNonce: startingNonce,
evmGasLimit: evmGasLimit,
Expand Down
5 changes: 4 additions & 1 deletion cmd/blobstream/deploy/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ package deploy

import "errors"

var ErrUnmarshallValset = errors.New("couldn't unmarsall valset")
var (
ErrUnmarshallValset = errors.New("couldn't unmarshall valset")
ErrNotFound = errors.New("not found")
)
2 changes: 2 additions & 0 deletions e2e/scripts/deploy_blobstream_contract.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ echo "deploying Blobstream contract..."
--evm.account "${EVM_ACCOUNT}" \
--core.grpc.host "${CORE_GRPC_HOST}" \
--core.grpc.port "${CORE_GRPC_PORT}" \
--core.rpc.host "${CORE_RPC_HOST}" \
--core.rpc.port "${CORE_RPC_PORT}" \
--starting-nonce "${STARTING_NONCE}" \
--evm.rpc "${EVM_ENDPOINT}" \
--evm.passphrase=123 > /opt/output
Expand Down
10 changes: 9 additions & 1 deletion relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,15 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt
r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error())
previousValset, err = r.QueryValsetFromP2PNetworkAndValidateIt(ctx)
if err != nil {
return nil, err
r.logger.Error("failed to query the last valset before nonce from p2p network. trying using an archive node", "err", err.Error())
currentHeight, err := r.TmQuerier.QueryHeight(ctx)
if err != nil {
return nil, err
}
previousValset, err = r.AppQuerier.QueryRecursiveHistoricalLastValsetBeforeNonce(ctx, attI.GetNonce(), uint64(currentHeight))
if err != nil {
return nil, err
}
}
}
switch att := attI.(type) {
Expand Down
154 changes: 154 additions & 0 deletions rpc/app_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package rpc

import (
"context"
"strconv"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
cosmosgrpc "github.com/cosmos/cosmos-sdk/types/grpc"
"google.golang.org/grpc/metadata"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -14,6 +19,8 @@ import (
tmlog "github.com/tendermint/tendermint/libs/log"
)

var BlocksIn20DaysPeriod = 20 * 24 * 60 * 60 / appconsts.TimeoutCommit.Seconds()

// AppQuerier queries the application for attestations and unbonding periods.
type AppQuerier struct {
blobStreamRPC string
Expand Down Expand Up @@ -63,6 +70,57 @@ func (aq *AppQuerier) QueryAttestationByNonce(ctx context.Context, nonce uint64)
return unmarshalledAttestation, nil
}

// QueryHistoricalAttestationByNonce query an attestation by nonce from the state machine at a certain height.
func (aq *AppQuerier) QueryHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)

var header metadata.MD
atResp, err := queryClient.AttestationRequestByNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)), // Add metadata to request
&celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce},
grpc.Header(&header), // Retrieve header from response
)
if err != nil {
return nil, err
}
if atResp.Attestation == nil {
return nil, nil
}

unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation)
if err != nil {
return nil, err
}

return unmarshalledAttestation, nil
}

// QueryRecursiveHistoricalAttestationByNonce query an attestation by nonce from the state machine
// via going over the history step by step starting from height.
func (aq *AppQuerier) QueryRecursiveHistoricalAttestationByNonce(ctx context.Context, nonce uint64, height uint64) (celestiatypes.AttestationRequestI, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)

currentHeight := height
for currentHeight >= 1 {
var header metadata.MD
atResp, err := queryClient.AttestationRequestByNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(currentHeight, 10)), // Add metadata to request
&celestiatypes.QueryAttestationRequestByNonceRequest{Nonce: nonce},
grpc.Header(&header), // Retrieve header from response
)
if err == nil {
unmarshalledAttestation, err := aq.unmarshallAttestation(atResp.Attestation)
if err != nil {
return nil, err
}
return unmarshalledAttestation, nil
}
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
return nil, ErrNotFound
}

// QueryLatestAttestationNonce query the latest attestation nonce from the state machine.
func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)
Expand All @@ -78,6 +136,23 @@ func (aq *AppQuerier) QueryLatestAttestationNonce(ctx context.Context) (uint64,
return resp.Nonce, nil
}

// QueryHistoricalLatestAttestationNonce query the historical latest attestation nonce from the state machine at a certain nonce.
func (aq *AppQuerier) QueryHistoricalLatestAttestationNonce(ctx context.Context, height uint64) (uint64, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)

var header metadata.MD
resp, err := queryClient.LatestAttestationNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)),
&celestiatypes.QueryLatestAttestationNonceRequest{},
grpc.Header(&header),
)
if err != nil {
return 0, err
}

return resp.Nonce, nil
}

// QueryDataCommitmentByNonce query a data commitment by its nonce.
func (aq *AppQuerier) QueryDataCommitmentByNonce(ctx context.Context, nonce uint64) (*celestiatypes.DataCommitment, error) {
attestation, err := aq.QueryAttestationByNonce(ctx, nonce)
Expand Down Expand Up @@ -134,6 +209,24 @@ func (aq *AppQuerier) QueryValsetByNonce(ctx context.Context, nonce uint64) (*ce
return value, nil
}

// QueryHistoricalValsetByNonce query a historical valset by nonce.
func (aq *AppQuerier) QueryHistoricalValsetByNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) {
attestation, err := aq.QueryHistoricalAttestationByNonce(ctx, nonce, height)
if err != nil {
return nil, err
}
if attestation == nil {
return nil, types.ErrAttestationNotFound
}

value, ok := attestation.(*celestiatypes.Valset)
if !ok {
return nil, types.ErrUnmarshalValset
}

return value, nil
}

// QueryLatestValset query the latest recorded valset in the state machine.
func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Valset, error) {
latestNonce, err := aq.QueryLatestAttestationNonce(ctx)
Expand All @@ -153,6 +246,30 @@ func (aq *AppQuerier) QueryLatestValset(ctx context.Context) (*celestiatypes.Val
return latestValset, nil
}

// QueryRecursiveLatestValset query the latest recorded valset in the state machine in history.
func (aq *AppQuerier) QueryRecursiveLatestValset(ctx context.Context, height uint64) (*celestiatypes.Valset, error) {
currentHeight := height
for currentHeight >= 1 {
latestNonce, err := aq.QueryHistoricalLatestAttestationNonce(ctx, currentHeight)
if err != nil {
return nil, err
}

if vs, err := aq.QueryHistoricalValsetByNonce(ctx, latestNonce, currentHeight); err == nil {
return vs, nil
}

latestValset, err := aq.QueryHistoricalLastValsetBeforeNonce(ctx, latestNonce, currentHeight)
if err == nil {
return latestValset, nil
}

aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
return nil, ErrNotFound
}

// QueryLastValsetBeforeNonce returns the last valset before nonce.
// This will be needed when signing to know the validator set at that particular nonce.
// the provided `nonce` can be a valset, but this will return the valset before it.
Expand All @@ -170,6 +287,43 @@ func (aq *AppQuerier) QueryLastValsetBeforeNonce(ctx context.Context, nonce uint
return resp.Valset, nil
}

// QueryHistoricalLastValsetBeforeNonce returns the last historical valset before nonce for a certain height.
func (aq *AppQuerier) QueryHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)
var header metadata.MD
resp, err := queryClient.LatestValsetRequestBeforeNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)),
&celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce},
grpc.Header(&header),
)
if err != nil {
return nil, err
}

return resp.Valset, nil
}

// QueryRecursiveHistoricalLastValsetBeforeNonce recursively looks for the last historical valset before nonce for a certain height until genesis.
func (aq *AppQuerier) QueryRecursiveHistoricalLastValsetBeforeNonce(ctx context.Context, nonce uint64, height uint64) (*celestiatypes.Valset, error) {
Comment on lines +323 to +339
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are both of these used independently, or do we only ever call one of these?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of them queries at an exact height, the other goes range by range of blocks to find the attestation. So yes, both is used

queryClient := celestiatypes.NewQueryClient(aq.clientConn)

currentHeight := height
for currentHeight >= 1 {
var header metadata.MD
resp, err := queryClient.LatestValsetRequestBeforeNonce(
metadata.AppendToOutgoingContext(ctx, cosmosgrpc.GRPCBlockHeightHeader, strconv.FormatUint(height, 10)),
&celestiatypes.QueryLatestValsetRequestBeforeNonceRequest{Nonce: nonce},
grpc.Header(&header),
)
if err == nil {
return resp.Valset, err
}
aq.Logger.Debug("keeping looking for attestation in archival state", "err", err.Error())
currentHeight -= uint64(BlocksIn20DaysPeriod)
}
return nil, ErrNotFound
}

// QueryLastUnbondingHeight query the last unbonding height from state machine.
func (aq *AppQuerier) QueryLastUnbondingHeight(ctx context.Context) (int64, error) {
queryClient := celestiatypes.NewQueryClient(aq.clientConn)
Expand Down
Loading