From 791453dbe91e178b660be13ec27adb4ea96fa90e Mon Sep 17 00:00:00 2001 From: CHAMI Rachid Date: Fri, 27 Oct 2023 23:31:33 +0200 Subject: [PATCH] feat: gossiping the latest valset in P2P layer in case they get pruned (#560) * feat: gossiping the latest valset in P2P layer in case they get pruned * Update p2p/dht.go * Update p2p/dht.go * Update p2p/dht.go * Update p2p/dht.go * Update p2p/dht.go * Update p2p/dht.go * Update p2p/errors.go * Update p2p/keys.go * Update types/latest_valset.go * Update p2p/validators.go * Update p2p/validators_test.go * Update p2p/validators_test.go * Update p2p/validators_test.go * Update p2p/validators.go * Update p2p/validators.go * Update p2p/validators.go * Update p2p/validators.go * Update p2p/validators.go * Update p2p/validators.go * Update p2p/validators.go * Update p2p/validators.go * fix: nil pointer * chore: type2 rename * Update types/latest_valset_test.go Co-authored-by: Evan Forbes <42654277+evan-forbes@users.noreply.github.com> * fix: introduce LatestValset type to fix serialization * feat: authenticate the valset to the Blobstream contract * feat: add log message for deployer --------- Co-authored-by: Evan Forbes <42654277+evan-forbes@users.noreply.github.com> --- cmd/blobstream/deploy/cmd.go | 1 + evm/evm_client.go | 8 +++ orchestrator/broadcaster.go | 7 ++ orchestrator/broadcaster_test.go | 36 ++++++++++ orchestrator/orchestrator.go | 10 +++ orchestrator/orchestrator_test.go | 26 +++++++ p2p/dht.go | 42 +++++++++-- p2p/dht_test.go | 105 +++++++++++++++++++++++++++ p2p/errors.go | 6 +- p2p/keys.go | 5 ++ p2p/querier.go | 11 +++ p2p/validators.go | 45 +++++++++++- p2p/validators_test.go | 113 ++++++++++++++++++++++++++++++ relayer/errors.go | 1 + relayer/relayer.go | 53 +++++++++++--- relayer/relayer_test.go | 106 ++++++++++++++++++++++++++++ types/latest_valset.go | 73 +++++++++++++++++++ types/latest_valset_test.go | 56 +++++++++++++++ 18 files changed, 689 insertions(+), 15 deletions(-) create mode 100644 types/latest_valset.go create mode 100644 types/latest_valset_test.go diff --git a/cmd/blobstream/deploy/cmd.go b/cmd/blobstream/deploy/cmd.go index 2866d4e4..211bbfa7 100644 --- a/cmd/blobstream/deploy/cmd.go +++ b/cmd/blobstream/deploy/cmd.go @@ -57,6 +57,7 @@ func Command() *cobra.Command { vs, err := getStartingValset(cmd.Context(), querier, config.startingNonce) if err != nil { + logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract") return errors.Wrap( err, "cannot initialize the Blobstream contract without having a valset request: %s", diff --git a/evm/evm_client.go b/evm/evm_client.go index 33bee506..70a81814 100644 --- a/evm/evm_client.go +++ b/evm/evm_client.go @@ -216,6 +216,14 @@ func (ec *Client) StateLastEventNonce(opts *bind.CallOpts) (uint64, error) { return nonce.Uint64(), nil } +func (ec *Client) StateLastValidatorSetCheckpoint(opts *bind.CallOpts) ([32]byte, error) { + checkpoint, err := ec.Wrapper.StateLastValidatorSetCheckpoint(opts) + if err != nil { + return [32]byte{}, err + } + return checkpoint, nil +} + func (ec *Client) WaitForTransaction( ctx context.Context, backend bind.DeployBackend, diff --git a/orchestrator/broadcaster.go b/orchestrator/broadcaster.go index f6d874c1..b995da71 100644 --- a/orchestrator/broadcaster.go +++ b/orchestrator/broadcaster.go @@ -29,3 +29,10 @@ func (b Broadcaster) ProvideValsetConfirm(ctx context.Context, nonce uint64, con } return b.BlobstreamDHT.PutValsetConfirm(ctx, p2p.GetValsetConfirmKey(nonce, confirm.EthAddress, signBytes), confirm) } + +func (b Broadcaster) ProvideLatestValset(ctx context.Context, latestValset types.LatestValset) error { + if len(b.BlobstreamDHT.RoutingTable().ListPeers()) == 0 { + return ErrEmptyPeersTable + } + return b.BlobstreamDHT.PutLatestValset(ctx, latestValset) +} diff --git a/orchestrator/broadcaster_test.go b/orchestrator/broadcaster_test.go index 519cd694..94066f70 100644 --- a/orchestrator/broadcaster_test.go +++ b/orchestrator/broadcaster_test.go @@ -5,6 +5,9 @@ import ( "encoding/hex" "math/big" "testing" + "time" + + celestiatypes "github.com/celestiaorg/celestia-app/x/qgb/types" "github.com/celestiaorg/orchestrator-relayer/evm" "github.com/ethereum/go-ethereum/accounts/keystore" @@ -97,6 +100,39 @@ func TestBroadcastValsetConfirm(t *testing.T) { assert.Equal(t, *expectedConfirm, actualConfirm) } +func TestBroadcastLatestValset(t *testing.T) { + network := blobstreamtesting.NewDHTNetwork(context.Background(), 4) + defer network.Stop() + + // create a test Valset + expectedValset := celestiatypes.Valset{ + Time: time.UnixMicro(10), + Height: 5, + Members: []celestiatypes.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + // Broadcast the valset + broadcaster := orchestrator.NewBroadcaster(network.DHTs[1]) + err := broadcaster.ProvideLatestValset(context.Background(), *types.ToLatestValset(expectedValset)) + assert.NoError(t, err) + + // try to get the valset from another peer + actualValset, err := network.DHTs[3].GetLatestValset(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, actualValset) + + assert.True(t, types.IsValsetEqualToLatestValset(expectedValset, actualValset)) +} + // TestEmptyPeersTable tests that values are not broadcasted if the DHT peers // table is empty. func TestEmptyPeersTable(t *testing.T) { diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index ee82b2c0..22631257 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -284,6 +284,12 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error { orch.Logger.Debug("validator not part of valset. won't sign", "nonce", nonce) return nil } + + if err == nil && previousValset != nil { + // add the valset to the p2p network + // it's alright if this fails, we can expect other nodes to do it successfully + _ = orch.Broadcaster.ProvideLatestValset(ctx, *types.ToLatestValset(*previousValset)) + } } switch castedAtt := att.(type) { @@ -341,6 +347,10 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error { } func (orch Orchestrator) ProcessValsetEvent(ctx context.Context, valset celestiatypes.Valset) error { + // add the valset to the p2p network + // it's alright if this fails, we can expect other nodes to do it successfully + _ = orch.Broadcaster.ProvideLatestValset(ctx, *types.ToLatestValset(valset)) + signBytes, err := valset.SignBytes() if err != nil { return err diff --git a/orchestrator/orchestrator_test.go b/orchestrator/orchestrator_test.go index e60a2ed1..8c150b92 100644 --- a/orchestrator/orchestrator_test.go +++ b/orchestrator/orchestrator_test.go @@ -79,6 +79,32 @@ func (s *OrchestratorTestSuite) TestProcessValsetEvent() { assert.Equal(t, s.Orchestrator.EvmAccount.Address.Hex(), confirm.EthAddress) } +func (s *OrchestratorTestSuite) TestProcessValsetEventAndProvideValset() { + t := s.T() + _, err := s.Node.CelestiaNetwork.WaitForHeight(50) + require.NoError(t, err) + + vs, err := celestiatypes.NewValset( + 2, + 10, + []*celestiatypes.InternalBridgeValidator{{ + Power: 10, + EVMAddress: s.Orchestrator.EvmAccount.Address, + }}, + time.Now(), + ) + require.NoError(t, err) + + // signing and submitting the signature and also providing the valset event + err = s.Orchestrator.ProcessValsetEvent(s.Node.Context, *vs) + require.NoError(t, err) + + // retrieving the valset + actualVs, err := s.Node.DHTNetwork.DHTs[0].GetLatestValset(s.Node.Context) + require.NoError(t, err) + assert.Equal(t, vs.Nonce, actualVs.Nonce) +} + func TestValidatorPartOfValset(t *testing.T) { tests := []struct { name string diff --git a/p2p/dht.go b/p2p/dht.go index b9e4aeea..7598dcd5 100644 --- a/p2p/dht.go +++ b/p2p/dht.go @@ -4,10 +4,11 @@ import ( "context" "time" + "github.com/libp2p/go-libp2p-kad-dht/providers" + "github.com/celestiaorg/orchestrator-relayer/types" ds "github.com/ipfs/go-datastore" dht "github.com/libp2p/go-libp2p-kad-dht" - "github.com/libp2p/go-libp2p-kad-dht/providers" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" tmlog "github.com/tendermint/tendermint/libs/log" @@ -17,6 +18,7 @@ const ( ProtocolPrefix = "/blobstream/0.1.0" // TODO "/blobstream/" ? DataCommitmentConfirmNamespace = "dcc" ValsetConfirmNamespace = "vc" + LatestValsetNamespace = "lv" ) // BlobstreamDHT wrapper around the `IpfsDHT` implementation. @@ -29,9 +31,9 @@ type BlobstreamDHT struct { // NewBlobstreamDHT create a new IPFS DHT using a suitable configuration for the Blobstream. // If nil is passed for bootstrappers, the DHT will not try to connect to any existing peer. func NewBlobstreamDHT(ctx context.Context, h host.Host, store ds.Batching, bootstrappers []peer.AddrInfo, logger tmlog.Logger) (*BlobstreamDHT, error) { - // this value is set to 23 days, which is the unbonding period. - // we want to have the signatures available for this whole period. - providers.ProvideValidity = time.Hour * 24 * 23 + // this values is set to a year, so that even in super-stable networks, we have at least + // one valset in store for a year. + providers.ProvideValidity = time.Hour * 24 * 365 router, err := dht.New( ctx, @@ -41,6 +43,7 @@ func NewBlobstreamDHT(ctx context.Context, h host.Host, store ds.Batching, boots dht.ProtocolPrefix(ProtocolPrefix), dht.NamespacedValidator(DataCommitmentConfirmNamespace, DataCommitmentConfirmValidator{}), dht.NamespacedValidator(ValsetConfirmNamespace, ValsetConfirmValidator{}), + dht.NamespacedValidator(LatestValsetNamespace, LatestValsetValidator{}), dht.BootstrapPeers(bootstrappers...), dht.DisableProviders(), ) @@ -157,3 +160,34 @@ func (q BlobstreamDHT) GetValsetConfirm(ctx context.Context, key string) (types. } return confirm, nil } + +// PutLatestValset encodes a valset then puts its value to the DHT. +// The key will be returned by the `GetValsetKey` method. +// If the valset is not the latest, it will fail. +// Returns an error if it fails. +func (q BlobstreamDHT) PutLatestValset(ctx context.Context, v types.LatestValset) error { + encodedData, err := types.MarshalLatestValset(v) + if err != nil { + return err + } + err = q.PutValue(ctx, GetLatestValsetKey(), encodedData) + if err != nil { + return err + } + return nil +} + +// GetLatestValset looks for the latest valset in the DHT. +// The key will be returned by the `GetValsetKey` method. +// Returns an error if it fails. +func (q BlobstreamDHT) GetLatestValset(ctx context.Context) (types.LatestValset, error) { + encoded, err := q.GetValue(ctx, GetLatestValsetKey()) // this is a blocking call, we should probably use timeout and channel + if err != nil { + return types.LatestValset{}, err + } + valset, err := types.UnmarshalLatestValset(encoded) + if err != nil { + return types.LatestValset{}, err + } + return valset, nil +} diff --git a/p2p/dht_test.go b/p2p/dht_test.go index 9ebfcecc..3d542b97 100644 --- a/p2p/dht_test.go +++ b/p2p/dht_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + celestiatypes "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/celestiaorg/orchestrator-relayer/evm" "github.com/ethereum/go-ethereum/accounts/keystore" @@ -95,6 +97,109 @@ func TestPutDataCommitmentConfirm(t *testing.T) { assert.Equal(t, expectedConfirm, actualConfirm) } +func TestPutLatestValset(t *testing.T) { + network := blobstreamtesting.NewDHTNetwork(context.Background(), 2) + defer network.Stop() + + // create a test Valset + expectedValset := celestiatypes.Valset{ + Nonce: 10, + Time: time.UnixMicro(10), + Height: 5, + Members: []celestiatypes.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + // put the test Valset in the DHT + err := network.DHTs[0].PutLatestValset(context.Background(), *types.ToLatestValset(expectedValset)) + assert.NoError(t, err) + + // try to get the latest valset from the same peer + actualValset, err := network.DHTs[0].GetLatestValset(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, actualValset) + + assert.True(t, types.IsValsetEqualToLatestValset(expectedValset, actualValset)) +} + +func TestPutMultipleLatestValset(t *testing.T) { + network := blobstreamtesting.NewDHTNetwork(context.Background(), 3) + defer network.Stop() + + // create test Valsets + valset1 := celestiatypes.Valset{ + Nonce: 10, + Time: time.UnixMicro(10), + Height: 5, + Members: []celestiatypes.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + valset2 := celestiatypes.Valset{ + Nonce: 11, + Time: time.UnixMicro(10), + Height: 5, + Members: []celestiatypes.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + valset3 := celestiatypes.Valset{ + Nonce: 9, + Time: time.UnixMicro(10), + Height: 5, + Members: []celestiatypes.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + // put the valsets in the DHT + err := network.DHTs[0].PutLatestValset(context.Background(), *types.ToLatestValset(valset1)) + assert.NoError(t, err) + + err = network.DHTs[1].PutLatestValset(context.Background(), *types.ToLatestValset(valset2)) + assert.NoError(t, err) + + // this one should fail since it puts an older valset than ones in store + err = network.DHTs[2].PutLatestValset(context.Background(), *types.ToLatestValset(valset3)) + assert.Error(t, err) + + // try to get the valset from the same peer + actualValset, err := network.DHTs[0].GetLatestValset(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, actualValset) + + assert.True(t, types.IsValsetEqualToLatestValset(valset2, actualValset)) +} + func TestNetworkPutDataCommitmentConfirm(t *testing.T) { network := blobstreamtesting.NewDHTNetwork(context.Background(), 10) defer network.Stop() diff --git a/p2p/errors.go b/p2p/errors.go index dd1b1aec..77b4731e 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -1,6 +1,8 @@ package p2p -import "errors" +import ( + "errors" +) var ( ErrPeersTimeout = errors.New("timeout while waiting for peers") @@ -17,4 +19,6 @@ var ( ErrEmptyNamespace = errors.New("empty namespace") ErrEmptyEVMAddr = errors.New("empty evm address") ErrEmptyDigest = errors.New("empty digest") + ErrEmptyValset = errors.New("empty valset") + ErrInvalidLatestValsetKey = errors.New("invalid latest valset key") ) diff --git a/p2p/keys.go b/p2p/keys.go index 6b7132c2..86864300 100644 --- a/p2p/keys.go +++ b/p2p/keys.go @@ -32,6 +32,11 @@ func GetValsetConfirmKey(nonce uint64, evmAddr string, signBytes string) string evmAddr + ":" + signBytes } +// GetLatestValsetKey creates the latest valset key. +func GetLatestValsetKey() string { + return "/" + LatestValsetNamespace + "/latest" +} + // ParseKey parses a key and returns its fields. // Will return an error if the key is missing some fields, some fields are empty, or otherwise invalid. func ParseKey(key string) (namespace string, nonce uint64, evmAddr string, digest string, err error) { diff --git a/p2p/querier.go b/p2p/querier.go index 3be409ee..5529b0c6 100644 --- a/p2p/querier.go +++ b/p2p/querier.go @@ -318,3 +318,14 @@ func (q Querier) QueryValsetConfirms(ctx context.Context, nonce uint64, valset c } return confirms, nil } + +// QueryLatestValset get the latest valset from the p2p network. +func (q Querier) QueryLatestValset( + ctx context.Context, +) (*types.LatestValset, error) { + latestValset, err := q.BlobstreamDHT.GetLatestValset(ctx) + if err != nil { + return nil, err + } + return &latestValset, nil +} diff --git a/p2p/validators.go b/p2p/validators.go index 3c99f0e6..733fa08b 100644 --- a/p2p/validators.go +++ b/p2p/validators.go @@ -71,6 +71,49 @@ func (vcv ValsetConfirmValidator) Validate(key string, value []byte) error { return nil } +// LatestValsetValidator runs stateless checks on the latest valset when submitting it to the DHT. +type LatestValsetValidator struct{} + +// Validate runs stateless checks on the provided valset key and value. +func (lcv LatestValsetValidator) Validate(key string, value []byte) error { + vs, err := types.UnmarshalLatestValset(value) + if err != nil { + return err + } + if types.IsEmptyLatestValset(vs) { + return ErrEmptyValset + } + if key != GetLatestValsetKey() { + return ErrInvalidLatestValsetKey + } + return nil +} + +// Select selects a valid dht valset value from multiple ones. +// returns the latest one ordered by nonces. +// returns an error of no valid value is found. +func (lcv LatestValsetValidator) Select(key string, values [][]byte) (int, error) { + if key != GetLatestValsetKey() { + return 0, ErrInvalidLatestValsetKey + } + if len(values) == 0 { + return 0, ErrNoValues + } + latestNonce := uint64(0) + latestIndex := 0 + for index, value := range values { + valset, err := types.UnmarshalLatestValset(value) + if err != nil { + return 0, err + } + if valset.Nonce > latestNonce { + latestIndex = index + } + latestNonce = valset.Nonce + } + return latestIndex, nil +} + // Select selects a valid dht confirm value from multiple ones. // returns an error of no valid value is found. func (vcv ValsetConfirmValidator) Select(key string, values [][]byte) (int, error) { @@ -78,7 +121,7 @@ func (vcv ValsetConfirmValidator) Select(key string, values [][]byte) (int, erro return 0, ErrNoValues } for index, value := range values { - // choose the first correct value + // choose the first correct values if err := vcv.Validate(key, value); err == nil { return index, nil } diff --git a/p2p/validators_test.go b/p2p/validators_test.go index 15a18886..88a07a57 100644 --- a/p2p/validators_test.go +++ b/p2p/validators_test.go @@ -449,3 +449,116 @@ func TestDataCommitmentConfirmSelect(t *testing.T) { }) } } + +func TestLatestValsetValidatorValidate(t *testing.T) { + emptyVs, _ := types.MarshalLatestValset(types.LatestValset{}) + tests := []struct { + name string + key string + value []byte + wantErr bool + }{ + { + name: "valid key and value", + key: GetLatestValsetKey(), + value: []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`), + wantErr: false, + }, + { + name: "invalid key", + key: "invalid_key", + value: []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`), + wantErr: true, + }, + { + name: "empty valset", + key: GetLatestValsetKey(), + value: emptyVs, + wantErr: true, + }, + { + name: "invalid value", + key: GetLatestValsetKey(), + value: []byte(`{"nonce":"invalid nonce","members":[{"power":100,"evm_address":"evm_addr1"}],"height":5"}`), + wantErr: true, + }, + } + + lcv := LatestValsetValidator{} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := lcv.Validate(test.key, test.value) + if test.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestLatestValsetValidatorSelect(t *testing.T) { + tests := []struct { + name string + key string + values [][]byte + wantErr bool + index int + }{ + { + name: "no value", + key: GetLatestValsetKey(), + values: [][]byte{}, + wantErr: true, + }, + { + name: "single value", + key: GetLatestValsetKey(), + values: [][]byte{[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`)}, + wantErr: false, + index: 0, + }, + { + name: "multiple values and last is latest", + key: GetLatestValsetKey(), + values: [][]byte{ + []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`), + []byte(`{"nonce":11,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`), + []byte(`{"nonce":12,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`), + }, + wantErr: false, + index: 2, + }, + { + name: "multiple values and middle one is invalid", + key: GetLatestValsetKey(), + values: [][]byte{ + []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`), + []byte(`{"nonce":"invalid nonce","members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`), + []byte(`{"nonce":12,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`), + }, + wantErr: true, + }, + { + name: "invalid key", + key: "invalid key", + values: [][]byte{[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`)}, + wantErr: true, + }, + } + + lcv := LatestValsetValidator{} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + index, err := lcv.Select(test.key, test.values) + if test.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.index, index) + } + }) + } +} diff --git a/relayer/errors.go b/relayer/errors.go index b21b22e1..6326049d 100644 --- a/relayer/errors.go +++ b/relayer/errors.go @@ -8,4 +8,5 @@ var ( ErrAttestationNotValsetRequest = errors.New("attestation is not a valset request") ErrAttestationNotDataCommitmentRequest = errors.New("attestation is not a data commitment request") ErrAttestationNotFound = errors.New("attestation not found") + ErrValidatorSetMismatch = errors.New("p2p validator set is different from the trusted contract one") ) diff --git a/relayer/relayer.go b/relayer/relayer.go index ea53a2ee..79c7cd78 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -1,7 +1,9 @@ package relayer import ( + "bytes" "context" + "encoding/hex" "fmt" "math/big" "strconv" @@ -141,12 +143,16 @@ func (r *Relayer) Start(ctx context.Context) error { } func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpts, attI celestiatypes.AttestationRequestI) (*coregethtypes.Transaction, error) { - switch att := attI.(type) { - case *celestiatypes.Valset: - previousValset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, att.Nonce) + previousValset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, attI.GetNonce()) + if err != nil { + r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error()) + previousValset, err = r.QueryValsetFromP2PNetworkAndValidateIt(ctx) if err != nil { return nil, err } + } + switch att := attI.(type) { + case *celestiatypes.Valset: signBytes, err := att.SignBytes() if err != nil { return nil, err @@ -165,16 +171,12 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt } return tx, nil case *celestiatypes.DataCommitment: - valset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, att.Nonce) - if err != nil { - return nil, err - } commitment, err := r.TmQuerier.QueryCommitment(ctx, att.BeginBlock, att.EndBlock) if err != nil { return nil, err } dataRootHash := types.DataCommitmentTupleRootSignBytes(big.NewInt(int64(att.Nonce)), commitment) - confirms, err := r.P2PQuerier.QueryTwoThirdsDataCommitmentConfirms(ctx, 30*time.Minute, 10*time.Second, *valset, att.Nonce, dataRootHash.Hex()) + confirms, err := r.P2PQuerier.QueryTwoThirdsDataCommitmentConfirms(ctx, 30*time.Minute, 10*time.Second, *previousValset, att.Nonce, dataRootHash.Hex()) if err != nil { return nil, err } @@ -182,7 +184,7 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt if err != nil { return nil, err } - tx, err := r.SubmitDataRootTupleRoot(opts, *att, *valset, commitment.String(), confirms) + tx, err := r.SubmitDataRootTupleRoot(opts, *att, *previousValset, commitment.String(), confirms) if err != nil { return nil, err } @@ -192,6 +194,39 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt } } +// QueryValsetFromP2PNetworkAndValidateIt Queries the latest valset from the P2P network +// and validates it against the validator set hash used in the contract. +func (r *Relayer) QueryValsetFromP2PNetworkAndValidateIt(ctx context.Context) (*celestiatypes.Valset, error) { + latestValset, err := r.P2PQuerier.QueryLatestValset(ctx) + if err != nil { + return nil, err + } + vs := latestValset.ToValset() + vsHash, err := vs.SignBytes() + if err != nil { + return nil, err + } + r.logger.Info("found the latest valset in P2P network. Authenticating it against the contract to verify it's valid", "nonce", vs.Nonce, "hash", vsHash.Hex()) + + contractHash, err := r.EVMClient.StateLastValidatorSetCheckpoint(&bind.CallOpts{Context: ctx}) + if err != nil { + return nil, err + } + + bzVSHash, err := hex.DecodeString(vsHash.Hex()[2:]) + if err != nil { + return nil, err + } + + if !bytes.Equal(bzVSHash, contractHash[:]) { + r.logger.Error("valset hash from contract mismatches that of P2P one, halting. try running the relayer with an archive node to continue relaying", "contract_vs_hash", ethcmn.Bytes2Hex(contractHash[:]), "p2p_vs_hash", vsHash.Hex()) + return nil, ErrValidatorSetMismatch + } + + r.logger.Info("valset is valid. continuing relaying using the latest valset from P2P network", "nonce", vs.Nonce) + return vs, nil +} + func (r *Relayer) UpdateValidatorSet( ctx context.Context, opts *bind.TransactOpts, diff --git a/relayer/relayer_test.go b/relayer/relayer_test.go index 1b35a4fa..887f1021 100644 --- a/relayer/relayer_test.go +++ b/relayer/relayer_test.go @@ -1,10 +1,17 @@ package relayer_test import ( + "bytes" "context" "math/big" + "testing" "time" + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + "github.com/celestiaorg/celestia-app/test/util/testnode" + qgbtesting "github.com/celestiaorg/orchestrator-relayer/testing" + "github.com/celestiaorg/orchestrator-relayer/p2p" "github.com/ipfs/go-datastore" @@ -47,3 +54,102 @@ func (s *RelayerTestSuite) TestProcessAttestation() { require.NoError(t, err) assert.True(t, has) } + +func TestUseValsetFromP2P(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + codec := encoding.MakeConfig(app.ModuleEncodingRegisters...).Codec + node := qgbtesting.NewTestNode( + ctx, + t, + qgbtesting.CelestiaNetworkParams{ + GenesisOpts: []testnode.GenesisOption{ + testnode.ImmediateProposals(codec), + qgbtesting.SetDataCommitmentWindowParams(codec, types.Params{DataCommitmentWindow: 101}), + }, + TimeIotaMs: 2000000, // so attestations are pruned after they're queried + }, + ) + + // process valset nonce so that it is added to the DHT + orch := qgbtesting.NewOrchestrator(t, node) + vs, err := orch.AppQuerier.QueryLatestValset(ctx) + require.NoError(t, err) + err = orch.ProcessValsetEvent(ctx, *vs) + require.NoError(t, err) + + _, err = node.CelestiaNetwork.WaitForHeight(400) + require.NoError(t, err) + + for { + time.Sleep(time.Second) + // Wait until the valset is pruned + _, err = orch.AppQuerier.QueryLatestValset(ctx) + if err != nil { + break + } + } + + // the valset should be in the DHT + latestValset, err := orch.P2PQuerier.QueryLatestValset(ctx) + require.NoError(t, err) + + att := types.NewDataCommitment(latestValset.Nonce+1, 10, 100, time.Now()) + commitment, err := orch.TmQuerier.QueryCommitment(ctx, att.BeginBlock, att.EndBlock) + require.NoError(t, err) + dataRootTupleRoot := blobstreamtypes.DataCommitmentTupleRootSignBytes(big.NewInt(int64(att.Nonce)), commitment) + err = orch.ProcessDataCommitmentEvent(ctx, *att, dataRootTupleRoot) + require.NoError(t, err) + + relayer := qgbtesting.NewRelayer(t, node) + go node.EVMChain.PeriodicCommit(ctx, time.Millisecond) + _, _, _, err = relayer.EVMClient.DeployBlobstreamContract(node.EVMChain.Auth, node.EVMChain.Backend, *latestValset.ToValset(), latestValset.Nonce, true) + require.NoError(t, err) + + // make sure the relayer is able to relay the signature using the pruned valset + tx, err := relayer.ProcessAttestation(ctx, node.EVMChain.Auth, att) + require.NoError(t, err) + + receipt, err := relayer.EVMClient.WaitForTransaction(ctx, node.EVMChain.Backend, tx) + assert.NoError(t, err) + assert.Equal(t, uint64(1), receipt.Status) + + lastNonce, err := relayer.EVMClient.StateLastEventNonce(nil) + require.NoError(t, err) + assert.Equal(t, att.Nonce, lastNonce) +} + +func (s *RelayerTestSuite) TestQueryValsetFromP2P() { + t := s.T() + _, err := s.Node.CelestiaNetwork.WaitForHeightWithTimeout(400, 30*time.Second) + require.NoError(t, err) + + ctx := context.Background() + + // process valset nonce so that it is added to the DHT + vs, err := s.Orchestrator.AppQuerier.QueryLatestValset(ctx) + require.NoError(t, err) + err = s.Orchestrator.ProcessValsetEvent(ctx, *vs) + require.NoError(t, err) + + // the valset should be in the DHT + _, err = s.Orchestrator.P2PQuerier.QueryLatestValset(ctx) + require.NoError(t, err) + + // query the valset and authenticate it + p2pVS, err := s.Relayer.QueryValsetFromP2PNetworkAndValidateIt(ctx) + require.NoError(t, err) + + // check if the valset is the same + assert.Equal(t, vs.Nonce, p2pVS.Nonce) + assert.Equal(t, vs.Height, p2pVS.Height) + + // check if the hash is the same + appVSHash, err := vs.Hash() + require.NoError(t, err) + p2pVSHash, err := p2pVS.Hash() + require.NoError(t, err) + + assert.True(t, bytes.Equal(appVSHash.Bytes(), p2pVSHash.Bytes())) +} diff --git a/types/latest_valset.go b/types/latest_valset.go new file mode 100644 index 00000000..0bed57e7 --- /dev/null +++ b/types/latest_valset.go @@ -0,0 +1,73 @@ +package types + +import ( + "encoding/json" + "time" + + "github.com/celestiaorg/celestia-app/x/qgb/types" +) + +// LatestValset a replica of the types.Valset to omit marshalling `time` as it bears different results on different machines. +type LatestValset struct { + // Universal nonce defined under: + // https://github.com/celestiaorg/celestia-app/pull/464 + Nonce uint64 `json:"nonce,omitempty"` + // List of BridgeValidator containing the current validator set. + Members []types.BridgeValidator `json:"members"` + // Current chain height + Height uint64 `json:"height,omitempty"` +} + +func (v LatestValset) ToValset() *types.Valset { + return &types.Valset{ + Nonce: v.Nonce, + Members: v.Members, + Height: v.Height, + Time: time.UnixMicro(1), // it's alright to put an arbitrary value in here since the time is not used in hash creation nor the threshold. + } +} + +func ToLatestValset(vs types.Valset) *LatestValset { + return &LatestValset{ + Nonce: vs.Nonce, + Members: vs.Members, + Height: vs.Height, + } +} + +// MarshalLatestValset Encodes a valset to Json bytes. +func MarshalLatestValset(lv LatestValset) ([]byte, error) { + encoded, err := json.Marshal(lv) + if err != nil { + return nil, err + } + return encoded, nil +} + +// UnmarshalLatestValset Decodes a valset from Json bytes. +func UnmarshalLatestValset(encoded []byte) (LatestValset, error) { + var valset LatestValset + err := json.Unmarshal(encoded, &valset) + if err != nil { + return LatestValset{}, err + } + return valset, nil +} + +// IsEmptyLatestValset takes a valset and checks if it is empty. +func IsEmptyLatestValset(latestValset LatestValset) bool { + emptyVs := types.Valset{} + return latestValset.Nonce == emptyVs.Nonce && + latestValset.Height == emptyVs.Height && + len(latestValset.Members) == 0 +} + +func IsValsetEqualToLatestValset(vs types.Valset, lvs LatestValset) bool { + for index, value := range vs.Members { + if value.EvmAddress != lvs.Members[index].EvmAddress || + value.Power != lvs.Members[index].Power { + return false + } + } + return vs.Nonce == lvs.Nonce && vs.Height == lvs.Height +} diff --git a/types/latest_valset_test.go b/types/latest_valset_test.go new file mode 100644 index 00000000..4d2c9728 --- /dev/null +++ b/types/latest_valset_test.go @@ -0,0 +1,56 @@ +package types_test + +import ( + "testing" + "time" + + celestiatypes "github.com/celestiaorg/celestia-app/x/qgb/types" + + "github.com/celestiaorg/orchestrator-relayer/types" + "github.com/stretchr/testify/assert" +) + +func TestMarshalValset(t *testing.T) { + valset := types.LatestValset{ + Nonce: 10, + Height: 5, + Members: []celestiatypes.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + jsonData, err := types.MarshalLatestValset(valset) + assert.NoError(t, err) + expectedJSON := `{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"},{"power":200,"evm_address":"evm_addr2"}],"height":5}` + assert.Equal(t, expectedJSON, string(jsonData)) +} + +func TestUnmarshalValset(t *testing.T) { + jsonData := []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"},{"power":200,"evm_address":"evm_addr2"}],"height":5}`) + expectedValset := celestiatypes.Valset{ + Nonce: 10, + Time: time.UnixMicro(10), + Height: 5, + Members: []celestiatypes.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + valset, err := types.UnmarshalLatestValset(jsonData) + assert.NoError(t, err) + assert.True(t, types.IsValsetEqualToLatestValset(expectedValset, valset)) +}