Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
feat: gossiping the latest valset in P2P layer in case they get pruned
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Oct 25, 2023
1 parent bb0bc42 commit 9270c89
Show file tree
Hide file tree
Showing 15 changed files with 589 additions and 26 deletions.
9 changes: 9 additions & 0 deletions orchestrator/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package orchestrator
import (
"context"

types2 "github.com/celestiaorg/celestia-app/x/qgb/types"

"github.com/celestiaorg/orchestrator-relayer/p2p"

"github.com/celestiaorg/orchestrator-relayer/types"
Expand All @@ -29,3 +31,10 @@ func (b Broadcaster) ProvideValsetConfirm(ctx context.Context, nonce uint64, con
}
return b.BlobstreamDHT.PutValsetConfirm(ctx, p2p.GetValsetConfirmKey(nonce, confirm.EthAddress, signBytes), confirm)
}

func (b Broadcaster) ProvideLatestValset(ctx context.Context, valset types2.Valset) error {
if len(b.BlobstreamDHT.RoutingTable().ListPeers()) == 0 {
return ErrEmptyPeersTable
}
return b.BlobstreamDHT.PutLatestValset(ctx, valset)
}
36 changes: 36 additions & 0 deletions orchestrator/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"encoding/hex"
"math/big"
"testing"
"time"

types2 "github.com/celestiaorg/celestia-app/x/qgb/types"

"github.com/celestiaorg/orchestrator-relayer/evm"
"github.com/ethereum/go-ethereum/accounts/keystore"
Expand Down Expand Up @@ -97,6 +100,39 @@ func TestBroadcastValsetConfirm(t *testing.T) {
assert.Equal(t, *expectedConfirm, actualConfirm)
}

func TestBroadcastLatestValset(t *testing.T) {
network := blobstreamtesting.NewDHTNetwork(context.Background(), 4)
defer network.Stop()

// create a test Valset
expectedValset := types2.Valset{
Time: time.UnixMicro(10),
Height: 5,
Members: []types2.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}

// Broadcast the valset
broadcaster := orchestrator.NewBroadcaster(network.DHTs[1])
err := broadcaster.ProvideLatestValset(context.Background(), expectedValset)
assert.NoError(t, err)

// try to get the valset from another peer
actualConfirm, err := network.DHTs[3].GetLatestValset(context.Background())
assert.NoError(t, err)
assert.NotNil(t, actualConfirm)

assert.Equal(t, expectedValset, actualConfirm)
}

// TestEmptyPeersTable tests that values are not broadcasted if the DHT peers
// table is empty.
func TestEmptyPeersTable(t *testing.T) {
Expand Down
12 changes: 11 additions & 1 deletion orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,13 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error {
previousValset, err := orch.AppQuerier.QueryLastValsetBeforeNonce(ctx, att.GetNonce())
if err != nil {
orch.Logger.Debug("failed to query last valset before nonce (most likely pruned). signing anyway", "err", err.Error())
} else if !ValidatorPartOfValset(previousValset.Members, orch.EvmAccount.Address.Hex()) {
}

// add the valset to the p2p network
// it's alright if this fails, we can expect other nodes to do it successfully
_ = orch.Broadcaster.ProvideLatestValset(ctx, *previousValset)

if !ValidatorPartOfValset(previousValset.Members, orch.EvmAccount.Address.Hex()) {
// no need to sign if the orchestrator is not part of the validator set that needs to sign the attestation
orch.Logger.Debug("validator not part of valset. won't sign", "nonce", nonce)
return nil
Expand Down Expand Up @@ -341,6 +347,10 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error {
}

func (orch Orchestrator) ProcessValsetEvent(ctx context.Context, valset celestiatypes.Valset) error {
// add the valset to the p2p network
// it's alright if this fails, we can expect other nodes to do it successfully
_ = orch.Broadcaster.ProvideLatestValset(ctx, valset)

signBytes, err := valset.SignBytes()
if err != nil {
return err
Expand Down
26 changes: 26 additions & 0 deletions orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,32 @@ func (s *OrchestratorTestSuite) TestProcessValsetEvent() {
assert.Equal(t, s.Orchestrator.EvmAccount.Address.Hex(), confirm.EthAddress)
}

func (s *OrchestratorTestSuite) TestProcessValsetEventAndProvideValset() {
t := s.T()
_, err := s.Node.CelestiaNetwork.WaitForHeight(50)
require.NoError(t, err)

vs, err := celestiatypes.NewValset(
2,
10,
[]*celestiatypes.InternalBridgeValidator{{
Power: 10,
EVMAddress: s.Orchestrator.EvmAccount.Address,
}},
time.Now(),
)
require.NoError(t, err)

// signing and submitting the signature and also providing the valset event
err = s.Orchestrator.ProcessValsetEvent(s.Node.Context, *vs)
require.NoError(t, err)

// retrieving the valset
actualVs, err := s.Node.DHTNetwork.DHTs[0].GetLatestValset(s.Node.Context)
require.NoError(t, err)
assert.Equal(t, vs.Nonce, actualVs.Nonce)
}

func TestValidatorPartOfValset(t *testing.T) {
tests := []struct {
name string
Expand Down
47 changes: 41 additions & 6 deletions p2p/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"time"

types2 "github.com/celestiaorg/celestia-app/x/qgb/types"
"github.com/libp2p/go-libp2p-kad-dht/providers"

"github.com/celestiaorg/orchestrator-relayer/types"
ds "github.com/ipfs/go-datastore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
tmlog "github.com/tendermint/tendermint/libs/log"
Expand All @@ -17,6 +19,7 @@ const (
ProtocolPrefix = "/blobstream/0.1.0" // TODO "/blobstream/<version>" ?
DataCommitmentConfirmNamespace = "dcc"
ValsetConfirmNamespace = "vc"
LatestValsetNamespace = "lv"
)

// BlobstreamDHT wrapper around the `IpfsDHT` implementation.
Expand All @@ -29,9 +32,9 @@ type BlobstreamDHT struct {
// NewBlobstreamDHT create a new IPFS DHT using a suitable configuration for the Blobstream.
// If nil is passed for bootstrappers, the DHT will not try to connect to any existing peer.
func NewBlobstreamDHT(ctx context.Context, h host.Host, store ds.Batching, bootstrappers []peer.AddrInfo, logger tmlog.Logger) (*BlobstreamDHT, error) {
// this value is set to 23 days, which is the unbonding period.
// we want to have the signatures available for this whole period.
providers.ProvideValidity = time.Hour * 24 * 23
// this values is set to a year, so that even in super-stable networks, we have at least
// one valset in store for a year.
providers.ProvideValidity = time.Hour * 24 * 365

router, err := dht.New(
ctx,
Expand All @@ -41,6 +44,7 @@ func NewBlobstreamDHT(ctx context.Context, h host.Host, store ds.Batching, boots
dht.ProtocolPrefix(ProtocolPrefix),
dht.NamespacedValidator(DataCommitmentConfirmNamespace, DataCommitmentConfirmValidator{}),
dht.NamespacedValidator(ValsetConfirmNamespace, ValsetConfirmValidator{}),
dht.NamespacedValidator(LatestValsetNamespace, LatestValsetValidator{}),
dht.BootstrapPeers(bootstrappers...),
dht.DisableProviders(),
)
Expand Down Expand Up @@ -98,7 +102,7 @@ func (q BlobstreamDHT) WaitForPeers(ctx context.Context, timeout time.Duration,
// and valset confirms. The checks are supposed to be handled by the validators under `p2p/validators.go`.
// Same goes for the Marshal and Unmarshal methods (as long as they're using simple Json encoding).

// PutDataCommitmentConfirm encodes a data commitment confirm then puts its value to the DHT.
// PutDataCommitmentConfirm encodes a data commitment confirm then puts its values to the DHT.
// The key can be generated using the `GetDataCommitmentConfirmKey` method.
// Returns an error if it fails to do so.
func (q BlobstreamDHT) PutDataCommitmentConfirm(ctx context.Context, key string, dcc types.DataCommitmentConfirm) error {
Expand Down Expand Up @@ -128,7 +132,7 @@ func (q BlobstreamDHT) GetDataCommitmentConfirm(ctx context.Context, key string)
return confirm, nil
}

// PutValsetConfirm encodes a valset confirm then puts its value to the DHT.
// PutValsetConfirm encodes a valset confirm then puts its values to the DHT.
// The key can be generated using the `GetValsetConfirmKey` method.
// Returns an error if it fails to do so.
func (q BlobstreamDHT) PutValsetConfirm(ctx context.Context, key string, vc types.ValsetConfirm) error {
Expand Down Expand Up @@ -157,3 +161,34 @@ func (q BlobstreamDHT) GetValsetConfirm(ctx context.Context, key string) (types.
}
return confirm, nil
}

// PutLatestValset encodes a valset then puts its values to the DHT.
// The key will be returned by the `GetValsetKey` method.
// If the valset is not the latest, it will fail.
// Returns an error if it fails to do so.
func (q BlobstreamDHT) PutLatestValset(ctx context.Context, v types2.Valset) error {
encodedData, err := types.MarshalValset(v)
if err != nil {
return err
}
err = q.PutValue(ctx, GetLatestValsetKey(), encodedData)
if err != nil {
return err
}
return nil
}

// GetLatestValset looks for a valset referenced by its key in the DHT.
// The key will be returned by the `GetValsetKey` method.
// Returns an error if it fails to get the valset.
func (q BlobstreamDHT) GetLatestValset(ctx context.Context) (types2.Valset, error) {
encoded, err := q.GetValue(ctx, GetLatestValsetKey()) // this is a blocking call, we should probably use timeout and channel
if err != nil {
return types2.Valset{}, err
}
valset, err := types.UnmarshalValset(encoded)
if err != nil {
return types2.Valset{}, err
}
return valset, nil
}
105 changes: 105 additions & 0 deletions p2p/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

types2 "github.com/celestiaorg/celestia-app/x/qgb/types"

"github.com/celestiaorg/orchestrator-relayer/evm"
"github.com/ethereum/go-ethereum/accounts/keystore"

Expand Down Expand Up @@ -95,6 +97,109 @@ func TestPutDataCommitmentConfirm(t *testing.T) {
assert.Equal(t, expectedConfirm, actualConfirm)
}

func TestPutLatestValset(t *testing.T) {
network := blobstreamtesting.NewDHTNetwork(context.Background(), 2)
defer network.Stop()

// create a test Valset
expectedValset := types2.Valset{
Nonce: 10,
Time: time.UnixMicro(10),
Height: 5,
Members: []types2.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}

// put the test Valset in the DHT
err := network.DHTs[0].PutLatestValset(context.Background(), expectedValset)
assert.NoError(t, err)

// try to get the latest valset from the same peer
actualValset, err := network.DHTs[0].GetLatestValset(context.Background())
assert.NoError(t, err)
assert.NotNil(t, actualValset)

assert.Equal(t, expectedValset, actualValset)
}

func TestPutMultipleLatestValset(t *testing.T) {
network := blobstreamtesting.NewDHTNetwork(context.Background(), 3)
defer network.Stop()

// create test Valsets
valset1 := types2.Valset{
Nonce: 10,
Time: time.UnixMicro(10),
Height: 5,
Members: []types2.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}
valset2 := types2.Valset{
Nonce: 11,
Time: time.UnixMicro(10),
Height: 5,
Members: []types2.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}
valset3 := types2.Valset{
Nonce: 9,
Time: time.UnixMicro(10),
Height: 5,
Members: []types2.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}

// put the valsets in the DHT
err := network.DHTs[0].PutLatestValset(context.Background(), valset1)
assert.NoError(t, err)

err = network.DHTs[1].PutLatestValset(context.Background(), valset2)
assert.NoError(t, err)

// this one should fail since it puts an older valset than ones in store
err = network.DHTs[2].PutLatestValset(context.Background(), valset3)
assert.Error(t, err)

// try to get the valset from the same peer
actualValset, err := network.DHTs[0].GetLatestValset(context.Background())
assert.NoError(t, err)
assert.NotNil(t, actualValset)

assert.Equal(t, valset2, actualValset)
}

func TestNetworkPutDataCommitmentConfirm(t *testing.T) {
network := blobstreamtesting.NewDHTNetwork(context.Background(), 10)
defer network.Stop()
Expand Down
8 changes: 6 additions & 2 deletions p2p/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package p2p

import "errors"
import (
"errors"
)

var (
ErrPeersTimeout = errors.New("timeout while waiting for peers")
Expand All @@ -13,8 +15,10 @@ var (
ErrNotTheSameEVMAddress = errors.New("not the same evm address")
ErrInvalidConfirmKey = errors.New("invalid confirm key")
ErrNoValues = errors.New("can't select from no values")
ErrNoValidValueFound = errors.New("no valid dht confirm value found")
ErrNoValidValueFound = errors.New("no valid dht confirm values found")
ErrEmptyNamespace = errors.New("empty namespace")
ErrEmptyEVMAddr = errors.New("empty evm address")
ErrEmptyDigest = errors.New("empty digest")
ErrEmptyValset = errors.New("empty valset")
ErrInvalidLatestValsetKey = errors.New("invalid latest valset key")
)
5 changes: 5 additions & 0 deletions p2p/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func GetValsetConfirmKey(nonce uint64, evmAddr string, signBytes string) string
evmAddr + ":" + signBytes
}

// GetLatestValsetKey creates the latest valset key.
func GetLatestValsetKey() string {
return "/" + LatestValsetNamespace + "/" + "latest"
}

// ParseKey parses a key and returns its fields.
// Will return an error if the key is missing some fields, some fields are empty, or otherwise invalid.
func ParseKey(key string) (namespace string, nonce uint64, evmAddr string, digest string, err error) {
Expand Down
11 changes: 11 additions & 0 deletions p2p/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,14 @@ func (q Querier) QueryValsetConfirms(ctx context.Context, nonce uint64, valset c
}
return confirms, nil
}

// QueryLatestValset get the latest valset from the p2p network.
func (q Querier) QueryLatestValset(
ctx context.Context,
) (*celestiatypes.Valset, error) {
valset, err := q.BlobstreamDHT.GetLatestValset(ctx)
if err != nil {
return nil, err
}
return &valset, nil
}
Loading

0 comments on commit 9270c89

Please sign in to comment.