Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
fix: introduce LatestValset type to fix serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Oct 26, 2023
1 parent 4df2e5a commit 755ff25
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 64 deletions.
6 changes: 2 additions & 4 deletions orchestrator/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package orchestrator
import (
"context"

celestiatypes "github.com/celestiaorg/celestia-app/x/qgb/types"

"github.com/celestiaorg/orchestrator-relayer/p2p"

"github.com/celestiaorg/orchestrator-relayer/types"
Expand Down Expand Up @@ -32,9 +30,9 @@ func (b Broadcaster) ProvideValsetConfirm(ctx context.Context, nonce uint64, con
return b.BlobstreamDHT.PutValsetConfirm(ctx, p2p.GetValsetConfirmKey(nonce, confirm.EthAddress, signBytes), confirm)
}

func (b Broadcaster) ProvideLatestValset(ctx context.Context, valset celestiatypes.Valset) error {
func (b Broadcaster) ProvideLatestValset(ctx context.Context, latestValset types.LatestValset) error {
if len(b.BlobstreamDHT.RoutingTable().ListPeers()) == 0 {
return ErrEmptyPeersTable
}
return b.BlobstreamDHT.PutLatestValset(ctx, valset)
return b.BlobstreamDHT.PutLatestValset(ctx, latestValset)
}
8 changes: 4 additions & 4 deletions orchestrator/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ func TestBroadcastLatestValset(t *testing.T) {

// Broadcast the valset
broadcaster := orchestrator.NewBroadcaster(network.DHTs[1])
err := broadcaster.ProvideLatestValset(context.Background(), expectedValset)
err := broadcaster.ProvideLatestValset(context.Background(), *types.ToLatestValset(expectedValset))
assert.NoError(t, err)

// try to get the valset from another peer
actualConfirm, err := network.DHTs[3].GetLatestValset(context.Background())
actualValset, err := network.DHTs[3].GetLatestValset(context.Background())
assert.NoError(t, err)
assert.NotNil(t, actualConfirm)
assert.NotNil(t, actualValset)

assert.Equal(t, expectedValset, actualConfirm)
assert.True(t, types.IsValsetEqualToLatestValset(expectedValset, actualValset))
}

// TestEmptyPeersTable tests that values are not broadcasted if the DHT peers
Expand Down
4 changes: 2 additions & 2 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error {
if err == nil && previousValset != nil {
// add the valset to the p2p network
// it's alright if this fails, we can expect other nodes to do it successfully
_ = orch.Broadcaster.ProvideLatestValset(ctx, *previousValset)
_ = orch.Broadcaster.ProvideLatestValset(ctx, *types.ToLatestValset(*previousValset))
}
}

Expand Down Expand Up @@ -349,7 +349,7 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error {
func (orch Orchestrator) ProcessValsetEvent(ctx context.Context, valset celestiatypes.Valset) error {
// add the valset to the p2p network
// it's alright if this fails, we can expect other nodes to do it successfully
_ = orch.Broadcaster.ProvideLatestValset(ctx, valset)
_ = orch.Broadcaster.ProvideLatestValset(ctx, *types.ToLatestValset(valset))

signBytes, err := valset.SignBytes()
if err != nil {
Expand Down
13 changes: 6 additions & 7 deletions p2p/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

celestiatypes "github.com/celestiaorg/celestia-app/x/qgb/types"
"github.com/libp2p/go-libp2p-kad-dht/providers"

"github.com/celestiaorg/orchestrator-relayer/types"
Expand Down Expand Up @@ -166,8 +165,8 @@ func (q BlobstreamDHT) GetValsetConfirm(ctx context.Context, key string) (types.
// The key will be returned by the `GetValsetKey` method.
// If the valset is not the latest, it will fail.
// Returns an error if it fails.
func (q BlobstreamDHT) PutLatestValset(ctx context.Context, v celestiatypes.Valset) error {
encodedData, err := types.MarshalValset(v)
func (q BlobstreamDHT) PutLatestValset(ctx context.Context, v types.LatestValset) error {
encodedData, err := types.MarshalLatestValset(v)
if err != nil {
return err
}
Expand All @@ -181,14 +180,14 @@ func (q BlobstreamDHT) PutLatestValset(ctx context.Context, v celestiatypes.Vals
// GetLatestValset looks for the latest valset in the DHT.
// The key will be returned by the `GetValsetKey` method.
// Returns an error if it fails.
func (q BlobstreamDHT) GetLatestValset(ctx context.Context) (celestiatypes.Valset, error) {
func (q BlobstreamDHT) GetLatestValset(ctx context.Context) (types.LatestValset, error) {
encoded, err := q.GetValue(ctx, GetLatestValsetKey()) // this is a blocking call, we should probably use timeout and channel
if err != nil {
return celestiatypes.Valset{}, err
return types.LatestValset{}, err
}
valset, err := types.UnmarshalValset(encoded)
valset, err := types.UnmarshalLatestValset(encoded)
if err != nil {
return celestiatypes.Valset{}, err
return types.LatestValset{}, err
}
return valset, nil
}
12 changes: 6 additions & 6 deletions p2p/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ func TestPutLatestValset(t *testing.T) {
}

// put the test Valset in the DHT
err := network.DHTs[0].PutLatestValset(context.Background(), expectedValset)
err := network.DHTs[0].PutLatestValset(context.Background(), *types.ToLatestValset(expectedValset))
assert.NoError(t, err)

// try to get the latest valset from the same peer
actualValset, err := network.DHTs[0].GetLatestValset(context.Background())
assert.NoError(t, err)
assert.NotNil(t, actualValset)

assert.Equal(t, expectedValset, actualValset)
assert.True(t, types.IsValsetEqualToLatestValset(expectedValset, actualValset))
}

func TestPutMultipleLatestValset(t *testing.T) {
Expand Down Expand Up @@ -182,22 +182,22 @@ func TestPutMultipleLatestValset(t *testing.T) {
}

// put the valsets in the DHT
err := network.DHTs[0].PutLatestValset(context.Background(), valset1)
err := network.DHTs[0].PutLatestValset(context.Background(), *types.ToLatestValset(valset1))
assert.NoError(t, err)

err = network.DHTs[1].PutLatestValset(context.Background(), valset2)
err = network.DHTs[1].PutLatestValset(context.Background(), *types.ToLatestValset(valset2))
assert.NoError(t, err)

// this one should fail since it puts an older valset than ones in store
err = network.DHTs[2].PutLatestValset(context.Background(), valset3)
err = network.DHTs[2].PutLatestValset(context.Background(), *types.ToLatestValset(valset3))
assert.Error(t, err)

// try to get the valset from the same peer
actualValset, err := network.DHTs[0].GetLatestValset(context.Background())
assert.NoError(t, err)
assert.NotNil(t, actualValset)

assert.Equal(t, valset2, actualValset)
assert.True(t, types.IsValsetEqualToLatestValset(valset2, actualValset))
}

func TestNetworkPutDataCommitmentConfirm(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions p2p/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,10 @@ func (q Querier) QueryValsetConfirms(ctx context.Context, nonce uint64, valset c
// QueryLatestValset get the latest valset from the p2p network.
func (q Querier) QueryLatestValset(
ctx context.Context,
) (*celestiatypes.Valset, error) {
valset, err := q.BlobstreamDHT.GetLatestValset(ctx)
) (*types.LatestValset, error) {
latestValset, err := q.BlobstreamDHT.GetLatestValset(ctx)
if err != nil {
return nil, err
}
return &valset, nil
return &latestValset, nil
}
6 changes: 3 additions & 3 deletions p2p/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ type LatestValsetValidator struct{}

// Validate runs stateless checks on the provided valset key and value.
func (lcv LatestValsetValidator) Validate(key string, value []byte) error {
vs, err := types.UnmarshalValset(value)
vs, err := types.UnmarshalLatestValset(value)
if err != nil {
return err
}
if types.IsEmptyValset(vs) {
if types.IsEmptyLatestValset(vs) {
return ErrEmptyValset
}
if key != GetLatestValsetKey() {
Expand All @@ -102,7 +102,7 @@ func (lcv LatestValsetValidator) Select(key string, values [][]byte) (int, error
latestNonce := uint64(0)
latestIndex := 0
for index, value := range values {
valset, err := types.UnmarshalValset(value)
valset, err := types.UnmarshalLatestValset(value)
if err != nil {
return 0, err
}
Expand Down
26 changes: 12 additions & 14 deletions p2p/validators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"math/big"
"testing"

celestiatypes "github.com/celestiaorg/celestia-app/x/qgb/types"

"github.com/celestiaorg/orchestrator-relayer/evm"
"github.com/ethereum/go-ethereum/accounts/keystore"

Expand Down Expand Up @@ -453,7 +451,7 @@ func TestDataCommitmentConfirmSelect(t *testing.T) {
}

func TestLatestValsetValidatorValidate(t *testing.T) {
emptyVs, _ := types.MarshalValset(celestiatypes.Valset{})
emptyVs, _ := types.MarshalLatestValset(types.LatestValset{})
tests := []struct {
name string
key string
Expand All @@ -463,13 +461,13 @@ func TestLatestValsetValidatorValidate(t *testing.T) {
{
name: "valid key and value",
key: GetLatestValsetKey(),
value: []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`),
value: []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`),
wantErr: false,
},
{
name: "invalid key",
key: "invalid_key",
value: []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`),
value: []byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`),
wantErr: true,
},
{
Expand All @@ -481,7 +479,7 @@ func TestLatestValsetValidatorValidate(t *testing.T) {
{
name: "invalid value",
key: GetLatestValsetKey(),
value: []byte(`{"nonce":"invalid nonce","members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`),
value: []byte(`{"nonce":"invalid nonce","members":[{"power":100,"evm_address":"evm_addr1"}],"height":5"}`),
wantErr: true,
},
}
Expand Down Expand Up @@ -517,17 +515,17 @@ func TestLatestValsetValidatorSelect(t *testing.T) {
{
name: "single value",
key: GetLatestValsetKey(),
values: [][]byte{[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`)},
values: [][]byte{[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`)},
wantErr: false,
index: 0,
},
{
name: "multiple values and last is latest",
key: GetLatestValsetKey(),
values: [][]byte{
[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`),
[]byte(`{"nonce":11,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`),
[]byte(`{"nonce":12,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`),
[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`),
[]byte(`{"nonce":11,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`),
[]byte(`{"nonce":12,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`),
},
wantErr: false,
index: 2,
Expand All @@ -536,16 +534,16 @@ func TestLatestValsetValidatorSelect(t *testing.T) {
name: "multiple values and middle one is invalid",
key: GetLatestValsetKey(),
values: [][]byte{
[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`),
[]byte(`{"nonce":"invalid nonce","members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`),
[]byte(`{"nonce":12,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`),
[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`),
[]byte(`{"nonce":"invalid nonce","members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`),
[]byte(`{"nonce":12,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`),
},
wantErr: true,
},
{
name: "invalid key",
key: "invalid key",
values: [][]byte{[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5,"time":"1970-01-01T01:00:00.00001+01:00"}`)},
values: [][]byte{[]byte(`{"nonce":10,"members":[{"power":100,"evm_address":"evm_addr1"}],"height":5}`)},
wantErr: true,
},
}
Expand Down
3 changes: 2 additions & 1 deletion relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,11 @@ func (r *Relayer) ProcessAttestation(ctx context.Context, opts *bind.TransactOpt
previousValset, err := r.AppQuerier.QueryLastValsetBeforeNonce(ctx, attI.GetNonce())
if err != nil {
r.logger.Error("failed to query the last valset before nonce (probably pruned). recovering via falling back to the P2P network", "err", err.Error())
previousValset, err = r.P2PQuerier.QueryLatestValset(ctx)
latestValset, err := r.P2PQuerier.QueryLatestValset(ctx)
if err != nil {
return nil, err
}
previousValset = latestValset.ToValset()
r.logger.Info("using the latest valset from P2P network. if the valset is malicious, the Blobstream contract will not accept it", "nonce", previousValset.Nonce)
}
switch att := attI.(type) {
Expand Down
2 changes: 1 addition & 1 deletion relayer/relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestUseValsetFromP2P(t *testing.T) {

relayer := qgbtesting.NewRelayer(t, node)
go node.EVMChain.PeriodicCommit(ctx, time.Millisecond)
_, _, _, err = relayer.EVMClient.DeployBlobstreamContract(node.EVMChain.Auth, node.EVMChain.Backend, *latestValset, latestValset.Nonce, true)
_, _, _, err = relayer.EVMClient.DeployBlobstreamContract(node.EVMChain.Auth, node.EVMChain.Backend, *latestValset.ToValset(), latestValset.Nonce, true)
require.NoError(t, err)

// make sure the relayer is able to relay the signature using the pruned valset
Expand Down
62 changes: 50 additions & 12 deletions types/latest_valset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,72 @@ package types

import (
"encoding/json"
"time"

"github.com/celestiaorg/celestia-app/x/qgb/types"
)

// MarshalValset Encodes a valset to Json bytes.
func MarshalValset(lv types.Valset) ([]byte, error) {
// LatestValset a replica of the types.Valset to omit marshalling `time` as it bears different results on different machines.
type LatestValset struct {
// Universal nonce defined under:
// https://github.com/celestiaorg/celestia-app/pull/464
Nonce uint64 `json:"nonce,omitempty"`
// List of BridgeValidator containing the current validator set.
Members []types.BridgeValidator `json:"members"`
// Current chain height
Height uint64 `json:"height,omitempty"`
}

func (v LatestValset) ToValset() *types.Valset {
return &types.Valset{
Nonce: v.Nonce,
Members: v.Members,
Height: v.Height,
Time: time.UnixMicro(1), // it's alright to put an arbitrary value in here since the time is not used in hash creation nor the threshold.
}
}

func ToLatestValset(vs types.Valset) *LatestValset {
return &LatestValset{
Nonce: vs.Nonce,
Members: vs.Members,
Height: vs.Height,
}
}

// MarshalLatestValset Encodes a valset to Json bytes.
func MarshalLatestValset(lv LatestValset) ([]byte, error) {
encoded, err := json.Marshal(lv)
if err != nil {
return nil, err
}
return encoded, nil
}

// UnmarshalValset Decodes a valset from Json bytes.
func UnmarshalValset(encoded []byte) (types.Valset, error) {
var valset types.Valset
// UnmarshalLatestValset Decodes a valset from Json bytes.
func UnmarshalLatestValset(encoded []byte) (LatestValset, error) {
var valset LatestValset
err := json.Unmarshal(encoded, &valset)
if err != nil {
return types.Valset{}, err
return LatestValset{}, err
}
return valset, nil
}

// IsEmptyValset takes a valset and checks if it is empty.
func IsEmptyValset(valset types.Valset) bool {
// IsEmptyLatestValset takes a valset and checks if it is empty.
func IsEmptyLatestValset(latestValset LatestValset) bool {
emptyVs := types.Valset{}
return valset.Time.Equal(emptyVs.Time) &&
valset.Nonce == emptyVs.Nonce &&
valset.Height == emptyVs.Height &&
len(valset.Members) == 0
return latestValset.Nonce == emptyVs.Nonce &&
latestValset.Height == emptyVs.Height &&
len(latestValset.Members) == 0
}

func IsValsetEqualToLatestValset(vs types.Valset, lvs LatestValset) bool {
for index, value := range vs.Members {
if value.EvmAddress != lvs.Members[index].EvmAddress ||
value.Power != lvs.Members[index].Power {
return false
}
}
return vs.Nonce == lvs.Nonce && vs.Height == lvs.Height
}
Loading

0 comments on commit 755ff25

Please sign in to comment.