diff --git a/orchestrator/broadcaster.go b/orchestrator/broadcaster.go index f6d874c1..71800afa 100644 --- a/orchestrator/broadcaster.go +++ b/orchestrator/broadcaster.go @@ -3,6 +3,8 @@ package orchestrator import ( "context" + types2 "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/celestiaorg/orchestrator-relayer/p2p" "github.com/celestiaorg/orchestrator-relayer/types" @@ -29,3 +31,10 @@ func (b Broadcaster) ProvideValsetConfirm(ctx context.Context, nonce uint64, con } return b.BlobstreamDHT.PutValsetConfirm(ctx, p2p.GetValsetConfirmKey(nonce, confirm.EthAddress, signBytes), confirm) } + +func (b Broadcaster) ProvideLatestValset(ctx context.Context, valset types2.Valset) error { + if len(b.BlobstreamDHT.RoutingTable().ListPeers()) == 0 { + return ErrEmptyPeersTable + } + return b.BlobstreamDHT.PutLatestValset(ctx, valset) +} diff --git a/orchestrator/broadcaster_test.go b/orchestrator/broadcaster_test.go index 519cd694..a04deb46 100644 --- a/orchestrator/broadcaster_test.go +++ b/orchestrator/broadcaster_test.go @@ -5,6 +5,9 @@ import ( "encoding/hex" "math/big" "testing" + "time" + + types2 "github.com/celestiaorg/celestia-app/x/qgb/types" "github.com/celestiaorg/orchestrator-relayer/evm" "github.com/ethereum/go-ethereum/accounts/keystore" @@ -97,6 +100,39 @@ func TestBroadcastValsetConfirm(t *testing.T) { assert.Equal(t, *expectedConfirm, actualConfirm) } +func TestBroadcastLatestValset(t *testing.T) { + network := blobstreamtesting.NewDHTNetwork(context.Background(), 4) + defer network.Stop() + + // create a test Valset + expectedValset := types2.Valset{ + Time: time.UnixMicro(10), + Height: 5, + Members: []types2.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + // Broadcast the valset + broadcaster := orchestrator.NewBroadcaster(network.DHTs[1]) + err := broadcaster.ProvideLatestValset(context.Background(), expectedValset) + assert.NoError(t, err) + + // try to get the valset from another peer + actualConfirm, err := network.DHTs[3].GetLatestValset(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, actualConfirm) + + assert.Equal(t, expectedValset, actualConfirm) +} + // TestEmptyPeersTable tests that values are not broadcasted if the DHT peers // table is empty. func TestEmptyPeersTable(t *testing.T) { diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index ee82b2c0..97d49fc7 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -279,7 +279,13 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error { previousValset, err := orch.AppQuerier.QueryLastValsetBeforeNonce(ctx, att.GetNonce()) if err != nil { orch.Logger.Debug("failed to query last valset before nonce (most likely pruned). signing anyway", "err", err.Error()) - } else if !ValidatorPartOfValset(previousValset.Members, orch.EvmAccount.Address.Hex()) { + } + + // add the valset to the p2p network + // it's alright if this fails, we can expect other nodes to do it successfully + _ = orch.Broadcaster.ProvideLatestValset(ctx, *previousValset) + + if !ValidatorPartOfValset(previousValset.Members, orch.EvmAccount.Address.Hex()) { // no need to sign if the orchestrator is not part of the validator set that needs to sign the attestation orch.Logger.Debug("validator not part of valset. won't sign", "nonce", nonce) return nil @@ -341,6 +347,10 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error { } func (orch Orchestrator) ProcessValsetEvent(ctx context.Context, valset celestiatypes.Valset) error { + // add the valset to the p2p network + // it's alright if this fails, we can expect other nodes to do it successfully + _ = orch.Broadcaster.ProvideLatestValset(ctx, valset) + signBytes, err := valset.SignBytes() if err != nil { return err diff --git a/orchestrator/orchestrator_test.go b/orchestrator/orchestrator_test.go index e60a2ed1..8c150b92 100644 --- a/orchestrator/orchestrator_test.go +++ b/orchestrator/orchestrator_test.go @@ -79,6 +79,32 @@ func (s *OrchestratorTestSuite) TestProcessValsetEvent() { assert.Equal(t, s.Orchestrator.EvmAccount.Address.Hex(), confirm.EthAddress) } +func (s *OrchestratorTestSuite) TestProcessValsetEventAndProvideValset() { + t := s.T() + _, err := s.Node.CelestiaNetwork.WaitForHeight(50) + require.NoError(t, err) + + vs, err := celestiatypes.NewValset( + 2, + 10, + []*celestiatypes.InternalBridgeValidator{{ + Power: 10, + EVMAddress: s.Orchestrator.EvmAccount.Address, + }}, + time.Now(), + ) + require.NoError(t, err) + + // signing and submitting the signature and also providing the valset event + err = s.Orchestrator.ProcessValsetEvent(s.Node.Context, *vs) + require.NoError(t, err) + + // retrieving the valset + actualVs, err := s.Node.DHTNetwork.DHTs[0].GetLatestValset(s.Node.Context) + require.NoError(t, err) + assert.Equal(t, vs.Nonce, actualVs.Nonce) +} + func TestValidatorPartOfValset(t *testing.T) { tests := []struct { name string diff --git a/p2p/dht.go b/p2p/dht.go index b9e4aeea..9d5b1777 100644 --- a/p2p/dht.go +++ b/p2p/dht.go @@ -4,10 +4,12 @@ import ( "context" "time" + types2 "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/libp2p/go-libp2p-kad-dht/providers" + "github.com/celestiaorg/orchestrator-relayer/types" ds "github.com/ipfs/go-datastore" dht "github.com/libp2p/go-libp2p-kad-dht" - "github.com/libp2p/go-libp2p-kad-dht/providers" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" tmlog "github.com/tendermint/tendermint/libs/log" @@ -17,6 +19,7 @@ const ( ProtocolPrefix = "/blobstream/0.1.0" // TODO "/blobstream/" ? DataCommitmentConfirmNamespace = "dcc" ValsetConfirmNamespace = "vc" + LatestValsetNamespace = "lv" ) // BlobstreamDHT wrapper around the `IpfsDHT` implementation. @@ -29,9 +32,9 @@ type BlobstreamDHT struct { // NewBlobstreamDHT create a new IPFS DHT using a suitable configuration for the Blobstream. // If nil is passed for bootstrappers, the DHT will not try to connect to any existing peer. func NewBlobstreamDHT(ctx context.Context, h host.Host, store ds.Batching, bootstrappers []peer.AddrInfo, logger tmlog.Logger) (*BlobstreamDHT, error) { - // this value is set to 23 days, which is the unbonding period. - // we want to have the signatures available for this whole period. - providers.ProvideValidity = time.Hour * 24 * 23 + // this values is set to a year, so that even in super-stable networks, we have at least + // one valset in store for a year. + providers.ProvideValidity = time.Hour * 24 * 365 router, err := dht.New( ctx, @@ -41,6 +44,7 @@ func NewBlobstreamDHT(ctx context.Context, h host.Host, store ds.Batching, boots dht.ProtocolPrefix(ProtocolPrefix), dht.NamespacedValidator(DataCommitmentConfirmNamespace, DataCommitmentConfirmValidator{}), dht.NamespacedValidator(ValsetConfirmNamespace, ValsetConfirmValidator{}), + dht.NamespacedValidator(LatestValsetNamespace, LatestValsetValidator{}), dht.BootstrapPeers(bootstrappers...), dht.DisableProviders(), ) @@ -98,7 +102,7 @@ func (q BlobstreamDHT) WaitForPeers(ctx context.Context, timeout time.Duration, // and valset confirms. The checks are supposed to be handled by the validators under `p2p/validators.go`. // Same goes for the Marshal and Unmarshal methods (as long as they're using simple Json encoding). -// PutDataCommitmentConfirm encodes a data commitment confirm then puts its value to the DHT. +// PutDataCommitmentConfirm encodes a data commitment confirm then puts its values to the DHT. // The key can be generated using the `GetDataCommitmentConfirmKey` method. // Returns an error if it fails to do so. func (q BlobstreamDHT) PutDataCommitmentConfirm(ctx context.Context, key string, dcc types.DataCommitmentConfirm) error { @@ -128,7 +132,7 @@ func (q BlobstreamDHT) GetDataCommitmentConfirm(ctx context.Context, key string) return confirm, nil } -// PutValsetConfirm encodes a valset confirm then puts its value to the DHT. +// PutValsetConfirm encodes a valset confirm then puts its values to the DHT. // The key can be generated using the `GetValsetConfirmKey` method. // Returns an error if it fails to do so. func (q BlobstreamDHT) PutValsetConfirm(ctx context.Context, key string, vc types.ValsetConfirm) error { @@ -157,3 +161,34 @@ func (q BlobstreamDHT) GetValsetConfirm(ctx context.Context, key string) (types. } return confirm, nil } + +// PutLatestValset encodes a valset then puts its values to the DHT. +// The key will be returned by the `GetValsetKey` method. +// If the valset is not the latest, it will fail. +// Returns an error if it fails to do so. +func (q BlobstreamDHT) PutLatestValset(ctx context.Context, v types2.Valset) error { + encodedData, err := types.MarshalValset(v) + if err != nil { + return err + } + err = q.PutValue(ctx, GetLatestValsetKey(), encodedData) + if err != nil { + return err + } + return nil +} + +// GetLatestValset looks for a valset referenced by its key in the DHT. +// The key will be returned by the `GetValsetKey` method. +// Returns an error if it fails to get the valset. +func (q BlobstreamDHT) GetLatestValset(ctx context.Context) (types2.Valset, error) { + encoded, err := q.GetValue(ctx, GetLatestValsetKey()) // this is a blocking call, we should probably use timeout and channel + if err != nil { + return types2.Valset{}, err + } + valset, err := types.UnmarshalValset(encoded) + if err != nil { + return types2.Valset{}, err + } + return valset, nil +} diff --git a/p2p/dht_test.go b/p2p/dht_test.go index 9ebfcecc..7435b01e 100644 --- a/p2p/dht_test.go +++ b/p2p/dht_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + types2 "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/celestiaorg/orchestrator-relayer/evm" "github.com/ethereum/go-ethereum/accounts/keystore" @@ -95,6 +97,109 @@ func TestPutDataCommitmentConfirm(t *testing.T) { assert.Equal(t, expectedConfirm, actualConfirm) } +func TestPutLatestValset(t *testing.T) { + network := blobstreamtesting.NewDHTNetwork(context.Background(), 2) + defer network.Stop() + + // create a test Valset + expectedValset := types2.Valset{ + Nonce: 10, + Time: time.UnixMicro(10), + Height: 5, + Members: []types2.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + // put the test Valset in the DHT + err := network.DHTs[0].PutLatestValset(context.Background(), expectedValset) + assert.NoError(t, err) + + // try to get the latest valset from the same peer + actualValset, err := network.DHTs[0].GetLatestValset(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, actualValset) + + assert.Equal(t, expectedValset, actualValset) +} + +func TestPutMultipleLatestValset(t *testing.T) { + network := blobstreamtesting.NewDHTNetwork(context.Background(), 3) + defer network.Stop() + + // create test Valsets + valset1 := types2.Valset{ + Nonce: 10, + Time: time.UnixMicro(10), + Height: 5, + Members: []types2.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + valset2 := types2.Valset{ + Nonce: 11, + Time: time.UnixMicro(10), + Height: 5, + Members: []types2.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + valset3 := types2.Valset{ + Nonce: 9, + Time: time.UnixMicro(10), + Height: 5, + Members: []types2.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + // put the valsets in the DHT + err := network.DHTs[0].PutLatestValset(context.Background(), valset1) + assert.NoError(t, err) + + err = network.DHTs[1].PutLatestValset(context.Background(), valset2) + assert.NoError(t, err) + + // this one should fail since it puts an older valset than ones in store + err = network.DHTs[2].PutLatestValset(context.Background(), valset3) + assert.Error(t, err) + + // try to get the valset from the same peer + actualValset, err := network.DHTs[0].GetLatestValset(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, actualValset) + + assert.Equal(t, valset2, actualValset) +} + func TestNetworkPutDataCommitmentConfirm(t *testing.T) { network := blobstreamtesting.NewDHTNetwork(context.Background(), 10) defer network.Stop() diff --git a/p2p/errors.go b/p2p/errors.go index dd1b1aec..02922326 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -1,6 +1,8 @@ package p2p -import "errors" +import ( + "errors" +) var ( ErrPeersTimeout = errors.New("timeout while waiting for peers") @@ -13,8 +15,10 @@ var ( ErrNotTheSameEVMAddress = errors.New("not the same evm address") ErrInvalidConfirmKey = errors.New("invalid confirm key") ErrNoValues = errors.New("can't select from no values") - ErrNoValidValueFound = errors.New("no valid dht confirm value found") + ErrNoValidValueFound = errors.New("no valid dht confirm values found") ErrEmptyNamespace = errors.New("empty namespace") ErrEmptyEVMAddr = errors.New("empty evm address") ErrEmptyDigest = errors.New("empty digest") + ErrEmptyValset = errors.New("empty valset") + ErrInvalidLatestValsetKey = errors.New("invalid latest valset key") ) diff --git a/p2p/keys.go b/p2p/keys.go index 6b7132c2..68be6f9e 100644 --- a/p2p/keys.go +++ b/p2p/keys.go @@ -32,6 +32,11 @@ func GetValsetConfirmKey(nonce uint64, evmAddr string, signBytes string) string evmAddr + ":" + signBytes } +// GetLatestValsetKey creates the latest valset key. +func GetLatestValsetKey() string { + return "/" + LatestValsetNamespace + "/" + "latest" +} + // ParseKey parses a key and returns its fields. // Will return an error if the key is missing some fields, some fields are empty, or otherwise invalid. func ParseKey(key string) (namespace string, nonce uint64, evmAddr string, digest string, err error) { diff --git a/p2p/querier.go b/p2p/querier.go index 3be409ee..5748b268 100644 --- a/p2p/querier.go +++ b/p2p/querier.go @@ -318,3 +318,14 @@ func (q Querier) QueryValsetConfirms(ctx context.Context, nonce uint64, valset c } return confirms, nil } + +// QueryLatestValset get the latest valset from the p2p network. +func (q Querier) QueryLatestValset( + ctx context.Context, +) (*celestiatypes.Valset, error) { + valset, err := q.BlobstreamDHT.GetLatestValset(ctx) + if err != nil { + return nil, err + } + return &valset, nil +} diff --git a/p2p/validators.go b/p2p/validators.go index 3c99f0e6..a3ab3fc0 100644 --- a/p2p/validators.go +++ b/p2p/validators.go @@ -12,7 +12,7 @@ import ( // ValsetConfirmValidator runs stateless checks on valset confirms when submitting them to the DHT. type ValsetConfirmValidator struct{} -// Validate runs stateless checks on the provided confirm key and value. +// Validate runs stateless checks on the provided confirm key and values. func (vcv ValsetConfirmValidator) Validate(key string, value []byte) error { namespace, _, evmAddr, signBytes, err := ParseKey(key) if err != nil { @@ -71,14 +71,57 @@ func (vcv ValsetConfirmValidator) Validate(key string, value []byte) error { return nil } -// Select selects a valid dht confirm value from multiple ones. -// returns an error of no valid value is found. +// LatestValsetValidator runs stateless checks on the latest valset when submitting it to the DHT. +type LatestValsetValidator struct{} + +// Validate runs stateless checks on the provided valset key and values. +func (lcv LatestValsetValidator) Validate(key string, value []byte) error { + vs, err := types.UnmarshalValset(value) + if err != nil { + return err + } + if types.IsEmptyValset(vs) { + return ErrEmptyValset + } + if key != GetLatestValsetKey() { + return ErrInvalidLatestValsetKey + } + return nil +} + +// Select selects a valid dht valset values from multiple ones. +// returns the latest one ordered by nonces. +// returns an error of no valid values is found. +func (lcv LatestValsetValidator) Select(key string, values [][]byte) (int, error) { + if key != GetLatestValsetKey() { + return 0, ErrInvalidLatestValsetKey + } + if len(values) == 0 { + return 0, ErrNoValues + } + latestNonce := uint64(0) + latestIndex := 0 + for index, value := range values { + valset, err := types.UnmarshalValset(value) + if err != nil { + return 0, err + } + if valset.Nonce > latestNonce { + latestIndex = index + } + latestNonce = valset.Nonce + } + return latestIndex, nil +} + +// Select selects a valid dht confirm values from multiple ones. +// returns an error of no valid values is found. func (vcv ValsetConfirmValidator) Select(key string, values [][]byte) (int, error) { if len(values) == 0 { return 0, ErrNoValues } for index, value := range values { - // choose the first correct value + // choose the first correct values if err := vcv.Validate(key, value); err == nil { return index, nil } @@ -89,7 +132,7 @@ func (vcv ValsetConfirmValidator) Select(key string, values [][]byte) (int, erro // DataCommitmentConfirmValidator runs stateless checks on data commitment confirms when submitting to the DHT. type DataCommitmentConfirmValidator struct{} -// Validate runs stateless checks on the provided confirm key and value. +// Validate runs stateless checks on the provided confirm key and values. func (dcv DataCommitmentConfirmValidator) Validate(key string, value []byte) error { namespace, _, evmAddr, dataRootTupleRoot, err := ParseKey(key) if err != nil { @@ -148,14 +191,14 @@ func (dcv DataCommitmentConfirmValidator) Validate(key string, value []byte) err return nil } -// Select selects a valid dht confirm value from multiple ones. -// returns an error of no valid value is found. +// Select selects a valid dht confirm values from multiple ones. +// returns an error of no valid values is found. func (dcv DataCommitmentConfirmValidator) Select(key string, values [][]byte) (int, error) { if len(values) == 0 { return 0, ErrNoValues } for index, value := range values { - // choose the first correct value + // choose the first correct values if err := dcv.Validate(key, value); err == nil { return index, nil } diff --git a/p2p/validators_test.go b/p2p/validators_test.go index 15a18886..6485a562 100644 --- a/p2p/validators_test.go +++ b/p2p/validators_test.go @@ -5,6 +5,8 @@ import ( "math/big" "testing" + types2 "github.com/celestiaorg/celestia-app/x/qgb/types" + "github.com/celestiaorg/orchestrator-relayer/evm" "github.com/ethereum/go-ethereum/accounts/keystore" @@ -449,3 +451,116 @@ func TestDataCommitmentConfirmSelect(t *testing.T) { }) } } + +func TestLatestValsetValidatorValidate(t *testing.T) { + emptyVs, _ := types.MarshalValset(types2.Valset{}) + tests := []struct { + name string + key string + value []byte + wantErr bool + }{ + { + name: "valid key and values", + key: GetLatestValsetKey(), + value: []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`), + wantErr: false, + }, + { + name: "invalid key", + key: "invalid_key", + value: []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`), + wantErr: true, + }, + { + name: "empty valset", + key: GetLatestValsetKey(), + value: emptyVs, + wantErr: true, + }, + { + name: "invalid values", + key: GetLatestValsetKey(), + value: []byte(`{"nonce":"invalid nonce","members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`), + wantErr: true, + }, + } + + lcv := LatestValsetValidator{} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := lcv.Validate(test.key, test.value) + if test.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestLatestValsetValidatorSelect(t *testing.T) { + tests := []struct { + name string + key string + values [][]byte + wantErr bool + index int + }{ + { + name: "no values", + key: GetLatestValsetKey(), + values: [][]byte{}, + wantErr: true, + }, + { + name: "single value", + key: GetLatestValsetKey(), + values: [][]byte{[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`)}, + wantErr: false, + index: 0, + }, + { + name: "multiple values and last is latest", + key: GetLatestValsetKey(), + values: [][]byte{ + []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`), + []byte(`{"nonce":11,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`), + []byte(`{"nonce":12,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`), + }, + wantErr: false, + index: 2, + }, + { + name: "multiple values and middle one is invalid", + key: GetLatestValsetKey(), + values: [][]byte{ + []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`), + []byte(`{"nonce":"invalid nonce","members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`), + []byte(`{"nonce":12,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`), + }, + wantErr: true, + }, + { + name: "invalid key", + key: "invalid key", + values: [][]byte{[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`)}, + wantErr: true, + }, + } + + lcv := LatestValsetValidator{} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + index, err := lcv.Select(test.key, test.values) + if test.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.index, index) + } + }) + } +} diff --git a/relayer/relayer.go b/relayer/relayer.go index ea53a2ee..0c10af72 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -141,12 +141,17 @@ func (r *Relayer) Start(ctx context.Context) error { } func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpts, attI celestiatypes.AttestationRequestI) (*coregethtypes.Transaction, error) { - switch att := attI.(type) { - case *celestiatypes.Valset: - previousValset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, att.Nonce) + previousValset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, attI.GetNonce()) + if err != nil { + r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error()) + previousValset, err = r.P2PQuerier.QueryLatestValset(ctx) if err != nil { return nil, err } + r.logger.Info("using the latest valset from P2P network. if the valset is malicious, the Blobstream contract will not accept it", "nonce", previousValset.Nonce) + } + switch att := attI.(type) { + case *celestiatypes.Valset: signBytes, err := att.SignBytes() if err != nil { return nil, err @@ -165,16 +170,12 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt } return tx, nil case *celestiatypes.DataCommitment: - valset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, att.Nonce) - if err != nil { - return nil, err - } commitment, err := r.TmQuerier.QueryCommitment(ctx, att.BeginBlock, att.EndBlock) if err != nil { return nil, err } dataRootHash := types.DataCommitmentTupleRootSignBytes(big.NewInt(int64(att.Nonce)), commitment) - confirms, err := r.P2PQuerier.QueryTwoThirdsDataCommitmentConfirms(ctx, 30*time.Minute, 10*time.Second, *valset, att.Nonce, dataRootHash.Hex()) + confirms, err := r.P2PQuerier.QueryTwoThirdsDataCommitmentConfirms(ctx, 30*time.Minute, 10*time.Second, *previousValset, att.Nonce, dataRootHash.Hex()) if err != nil { return nil, err } @@ -182,7 +183,7 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt if err != nil { return nil, err } - tx, err := r.SubmitDataRootTupleRoot(opts, *att, *valset, commitment.String(), confirms) + tx, err := r.SubmitDataRootTupleRoot(opts, *att, *previousValset, commitment.String(), confirms) if err != nil { return nil, err } diff --git a/relayer/relayer_test.go b/relayer/relayer_test.go index 1b35a4fa..139dbe5d 100644 --- a/relayer/relayer_test.go +++ b/relayer/relayer_test.go @@ -3,8 +3,14 @@ package relayer_test import ( "context" "math/big" + "testing" "time" + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + "github.com/celestiaorg/celestia-app/test/util/testnode" + qgbtesting "github.com/celestiaorg/orchestrator-relayer/testing" + "github.com/celestiaorg/orchestrator-relayer/p2p" "github.com/ipfs/go-datastore" @@ -47,3 +53,68 @@ func (s *RelayerTestSuite) TestProcessAttestation() { require.NoError(t, err) assert.True(t, has) } + +func TestUseValsetFromP2P(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + codec := encoding.MakeConfig(app.ModuleEncodingRegisters...).Codec + node := qgbtesting.NewTestNode( + ctx, + t, + qgbtesting.CelestiaNetworkParams{ + GenesisOpts: []testnode.GenesisOption{ + testnode.ImmediateProposals(codec), + qgbtesting.SetDataCommitmentWindowParams(codec, types.Params{DataCommitmentWindow: 101}), + }, + TimeIotaMs: 2000000, // so attestations are pruned after they're queried + }, + ) + + // process valset nonce so that it is added to the DHT + orch := qgbtesting.NewOrchestrator(t, node) + vs, err := orch.AppQuerier.QueryLatestValset(ctx) + require.NoError(t, err) + err = orch.ProcessValsetEvent(ctx, *vs) + require.NoError(t, err) + + _, err = node.CelestiaNetwork.WaitForHeight(400) + require.NoError(t, err) + + for { + time.Sleep(time.Second) + // Wait until the valset is pruned + _, err = orch.AppQuerier.QueryLatestValset(ctx) + if err != nil { + break + } + } + + // the valset should be in the DHT + latestValset, err := orch.P2PQuerier.QueryLatestValset(ctx) + require.NoError(t, err) + + att := types.NewDataCommitment(latestValset.Nonce+1, 10, 100, time.Now()) + commitment, err := orch.TmQuerier.QueryCommitment(ctx, att.BeginBlock, att.EndBlock) + require.NoError(t, err) + dataRootTupleRoot := blobstreamtypes.DataCommitmentTupleRootSignBytes(big.NewInt(int64(att.Nonce)), commitment) + err = orch.ProcessDataCommitmentEvent(ctx, *att, dataRootTupleRoot) + require.NoError(t, err) + + relayer := qgbtesting.NewRelayer(t, node) + go node.EVMChain.PeriodicCommit(ctx, time.Millisecond) + _, _, _, err = relayer.EVMClient.DeployBlobstreamContract(node.EVMChain.Auth, node.EVMChain.Backend, *latestValset, latestValset.Nonce, true) + require.NoError(t, err) + + // make sure the relayer is able to relay the signature using the pruned valset + tx, err := relayer.ProcessAttestation(ctx, node.EVMChain.Auth, att) + require.NoError(t, err) + + receipt, err := relayer.EVMClient.WaitForTransaction(ctx, node.EVMChain.Backend, tx) + assert.NoError(t, err) + assert.Equal(t, uint64(1), receipt.Status) + + lastNonce, err := relayer.EVMClient.StateLastEventNonce(nil) + require.NoError(t, err) + assert.Equal(t, att.Nonce, lastNonce) +} diff --git a/types/latest_valset.go b/types/latest_valset.go new file mode 100644 index 00000000..cf878889 --- /dev/null +++ b/types/latest_valset.go @@ -0,0 +1,35 @@ +package types + +import ( + "encoding/json" + + "github.com/celestiaorg/celestia-app/x/qgb/types" +) + +// MarshalValset Encodes a valset to Json bytes. +func MarshalValset(lv types.Valset) ([]byte, error) { + encoded, err := json.Marshal(lv) + if err != nil { + return nil, err + } + return encoded, nil +} + +// UnmarshalValset Decodes a valset from Json bytes. +func UnmarshalValset(encoded []byte) (types.Valset, error) { + var valset types.Valset + err := json.Unmarshal(encoded, &valset) + if err != nil { + return types.Valset{}, err + } + return valset, nil +} + +// IsEmptyValset takes a msg valset and checks if it is empty. +func IsEmptyValset(valset types.Valset) bool { + emptyVs := types.Valset{} + return valset.Time.Equal(emptyVs.Time) && + valset.Nonce == emptyVs.Nonce && + valset.Height == emptyVs.Height && + len(valset.Members) == 0 +} diff --git a/types/latest_valset_test.go b/types/latest_valset_test.go new file mode 100644 index 00000000..c9b90036 --- /dev/null +++ b/types/latest_valset_test.go @@ -0,0 +1,57 @@ +package types_test + +import ( + "testing" + "time" + + types2 "github.com/celestiaorg/celestia-app/x/qgb/types" + + "github.com/celestiaorg/orchestrator-relayer/types" + "github.com/stretchr/testify/assert" +) + +func TestMarshalValset(t *testing.T) { + valset := types2.Valset{ + Nonce: 10, + Time: time.UnixMicro(10), + Height: 5, + Members: []types2.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + jsonData, err := types.MarshalValset(valset) + assert.NoError(t, err) + expectedJSON := `{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"},{"power":200,"evm_address":"evm_addr2"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}` + assert.Equal(t, string(jsonData), expectedJSON) +} + +func TestUnmarshalValset(t *testing.T) { + jsonData := []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"},{"power":200,"evm_address":"evm_addr2"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`) + expectedValset := types2.Valset{ + Nonce: 10, + Time: time.UnixMicro(10), + Height: 5, + Members: []types2.BridgeValidator{ + { + Power: 100, + EvmAddress: "evm_addr1", + }, + { + Power: 200, + EvmAddress: "evm_addr2", + }, + }, + } + + valset, err := types.UnmarshalValset(jsonData) + assert.NoError(t, err) + assert.Equal(t, valset, expectedValset) +}