Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation for Row gossiping #443

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4777a6b
refactor(p2p/ipld): move-in DAHeader Share
Wondertan Jun 21, 2021
8bbbf91
refactor(p2p/ipld): rework PutBlock API to PutData
Wondertan Jun 21, 2021
0077eaa
refactor(p2p/ipld): rework RetrieveData to the latest changes
Wondertan Jun 21, 2021
d1f6619
refactor(p2p/ipld): rework ValidateAvailability to the latest changes
Wondertan Jun 21, 2021
f639855
refactor(p2p/ipld): rework tests and test utilities to the latest API…
Wondertan Jun 21, 2021
a1c6149
refactor(types): Extract DAHeader and Shares
Wondertan Jun 21, 2021
236145f
feat(types): introduce RowSet
Wondertan Jun 21, 2021
b1d0b0f
feat: rely on new RowSet method and new API in p2p/ipld in consensus
Wondertan Jun 21, 2021
613d803
fix: tests and build everywhere
Wondertan Jun 21, 2021
9174d8b
fix(state): update to new p2p/ipld API
Wondertan Jun 27, 2021
a1e21c4
feat(libs/bits): ADd new Ones method to BitArray
Wondertan Jun 23, 2021
04533f1
feat(types): finish RowSet implementation
Wondertan Jun 23, 2021
11f450f
feat(types): update row set implementation with more tests
Wondertan Jun 30, 2021
35c52b4
feat(p2p/ipld): add helper methods to DAHeader to show amount of Rows…
Wondertan Jun 30, 2021
7e011b6
fix(p2p/ipld): fix testing utilities bugs
Wondertan Jun 30, 2021
c27591b
fix(state): block exec: don't return PartSet on CreateProposalBlock
Wondertan Jun 30, 2021
d2bf230
faet(consesnsus): implemenent row gossiping in consensus reactor
Wondertan Jun 30, 2021
bad2b9f
feat(proto): update protos for row block gossiping
Wondertan Jun 30, 2021
a034eeb
feat(consensus): implement row gossiping for consensus state
Wondertan Jun 30, 2021
f33be90
tests(consensus): fix state tests according to row gossiping
Wondertan Jun 30, 2021
e64dee7
tests(consensus): fix byzantine tests
Wondertan Jun 30, 2021
9f04c17
tests(consensus): fix reactor tests
Wondertan Jun 30, 2021
1b8c45b
tests(consensus): fix replay and wal tests
Wondertan Jun 30, 2021
c1c884f
tests(node): fix tests
Wondertan Jun 30, 2021
bb13617
fix(consensus): fix reactor block loading in catchup logic broken aft…
Wondertan Jun 30, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 44 additions & 176 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions blockchain/msgs_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package blockchain

import (
"context"
"encoding/hex"
"math"
"testing"

"github.com/gogo/protobuf/proto"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -83,6 +85,9 @@ func TestBlockchainMessageVectors(t *testing.T) {
block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil, types.Messages{}, nil)
block.Version.Block = 11 // overwrite updated protocol version

_, err := block.RowSet(context.TODO(), mdutils.Mock())
require.NoError(t, err)

bpb, err := block.ToProto()
require.NoError(t, err)

Expand Down
9 changes: 7 additions & 2 deletions blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -302,9 +303,13 @@ func makeTxs(height int64) (txs []types.Tx) {
}

func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), nil,
b := state.MakeBlock(height, makeTxs(height), nil,
nil, types.Messages{}, lastCommit, state.Validators.GetProposer().Address)
return block
_, err := b.RowSet(context.TODO(), mdutils.Mock())
if err != nil {
panic(err)
}
return b
}

type testApp struct {
Expand Down
18 changes: 10 additions & 8 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// B sees a commit, A doesn't.
// Heal partition and ensure A sees the commit
func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
t.Skip("This requires DAHeader in Vote")
N := 4
logger := consensusLogger().With("test", "byzantine")
app := newCounter
Expand Down Expand Up @@ -384,7 +385,7 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
// Avoid sending on internalMsgQueue and running consensus state.

// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
block1, blockParts1, blockRows1 := cs.createProposalBlock(cs.privValidatorPubKey.Address())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not asking to do this in this PR but I'm wondering if the blockParts would be removed entirely as part of this work. This also trickles into the storage I guess? As tendermint currently stores the data in parts 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that should also affect storing.

polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
proposal1 := types.NewProposal(height, round, polRound, propBlockID, &block1.DataAvailabilityHeader)
p1, err := proposal1.ToProto()
Expand All @@ -399,7 +400,7 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
deliverTxsRange(cs, 0, 1)

// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
block2, blockParts2, blockRows2 := cs.createProposalBlock(cs.privValidatorPubKey.Address())
polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
proposal2 := types.NewProposal(height, round, polRound, propBlockID, &block2.DataAvailabilityHeader)
p2, err := proposal2.ToProto()
Expand All @@ -418,9 +419,9 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
for i, peer := range peers {
if i < len(peers)/2 {
go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1, blockRows1)
} else {
go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2, blockRows2)
}
}
}
Expand All @@ -433,18 +434,19 @@ func sendProposalAndParts(
proposal *types.Proposal,
blockHash []byte,
parts *types.PartSet,
rows *types.RowSet,
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, MustEncode(msg))

// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
for i := 0; i < rows.Total(); i++ {
row := rows.GetRow(i)
msg := &BlockRowMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
Row: row,
}
peer.Send(DataChannel, MustEncode(msg))
}
Expand Down
12 changes: 10 additions & 2 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
abci "github.com/lazyledger/lazyledger-core/abci/types"
cfg "github.com/lazyledger/lazyledger-core/config"
cstypes "github.com/lazyledger/lazyledger-core/consensus/types"
"github.com/lazyledger/lazyledger-core/crypto"
"github.com/lazyledger/lazyledger-core/ipfs"
tmbytes "github.com/lazyledger/lazyledger-core/libs/bytes"
dbm "github.com/lazyledger/lazyledger-core/libs/db"
Expand Down Expand Up @@ -91,6 +92,11 @@ func newValidatorStub(privValidator types.PrivValidator, valIndex int32) *valida
}
}

func (vs *validatorStub) Address() crypto.Address {
pk, _ := vs.GetPubKey()
return pk.Address()
}

func (vs *validatorStub) signVote(
voteType tmproto.SignedMsgType,
hash []byte,
Expand Down Expand Up @@ -194,9 +200,9 @@ func decideProposal(
vs *validatorStub,
height int64,
round int32,
) (proposal *types.Proposal, block *types.Block) {
) (proposal *types.Proposal, block *types.Block, rows *types.RowSet) {
cs1.mtx.Lock()
block, blockParts := cs1.createProposalBlock()
block, blockParts, rows := cs1.createProposalBlock(vs.Address())
validRound := cs1.ValidRound
chainID := cs1.state.ChainID
cs1.mtx.Unlock()
Expand Down Expand Up @@ -599,6 +605,7 @@ func ensureNewUnlock(unlockCh <-chan tmpubsub.Message, height int64, round int32
"Timeout expired while waiting for NewUnlock event")
}

//nolint:unused
func ensureProposal(proposalCh <-chan tmpubsub.Message, height int64, round int32, propID types.BlockID) {
select {
case <-time.After(ensureTimeout):
Expand Down Expand Up @@ -785,6 +792,7 @@ func randConsensusNetWithPeers(
}
}

//nolint:unused
func getSwitchIndex(switches []*p2p.Switch, peer p2p.Peer) int {
for i, s := range switches {
if peer.NodeInfo().ID() == s.NodeInfo().ID() {
Expand Down
55 changes: 30 additions & 25 deletions consensus/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/lazyledger/lazyledger-core/libs/bits"
tmmath "github.com/lazyledger/lazyledger-core/libs/math"
"github.com/lazyledger/lazyledger-core/p2p"
"github.com/lazyledger/lazyledger-core/p2p/ipld"
tmcons "github.com/lazyledger/lazyledger-core/proto/tendermint/consensus"
tmproto "github.com/lazyledger/lazyledger-core/proto/tendermint/types"
"github.com/lazyledger/lazyledger-core/types"
Expand All @@ -36,16 +37,20 @@ func MsgToProto(msg Message) (*tmcons.Message, error) {
},
}
case *NewValidBlockMessage:
pbPartSetHeader := msg.BlockPartSetHeader.ToProto()
pbBits := msg.BlockParts.ToProto()
dah, err := msg.BlockDAHeader.ToProto()
if err != nil {
return nil, err
}

pbBits := msg.BlockRows.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_NewValidBlock{
NewValidBlock: &tmcons.NewValidBlock{
Height: msg.Height,
Round: msg.Round,
BlockPartSetHeader: pbPartSetHeader,
BlockParts: pbBits,
IsCommit: msg.IsCommit,
Height: msg.Height,
Round: msg.Round,
DaHeader: dah,
BlockParts: pbBits,
IsCommit: msg.IsCommit,
},
},
}
Expand Down Expand Up @@ -73,17 +78,17 @@ func MsgToProto(msg Message) (*tmcons.Message, error) {
},
},
}
case *BlockPartMessage:
parts, err := msg.Part.ToProto()
case *BlockRowMessage:
row, err := msg.Row.ToProto()
if err != nil {
return nil, fmt.Errorf("msg to proto error: %w", err)
}
pb = tmcons.Message{
Sum: &tmcons.Message_BlockPart{
BlockPart: &tmcons.BlockPart{
Sum: &tmcons.Message_BlockRow{
BlockRow: &tmcons.BlockRow{
Height: msg.Height,
Round: msg.Round,
Part: *parts,
Row: row,
},
},
}
Expand Down Expand Up @@ -169,7 +174,7 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
LastCommitRound: msg.NewRoundStep.LastCommitRound,
}
case *tmcons.Message_NewValidBlock:
pbPartSetHeader, err := types.PartSetHeaderFromProto(&msg.NewValidBlock.BlockPartSetHeader)
dah, err := ipld.DataAvailabilityHeaderFromProto(msg.NewValidBlock.DaHeader)
if err != nil {
return nil, fmt.Errorf("parts header to proto error: %w", err)
}
Expand All @@ -181,11 +186,11 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
}

pb = &NewValidBlockMessage{
Height: msg.NewValidBlock.Height,
Round: msg.NewValidBlock.Round,
BlockPartSetHeader: *pbPartSetHeader,
BlockParts: pbBits,
IsCommit: msg.NewValidBlock.IsCommit,
Height: msg.NewValidBlock.Height,
Round: msg.NewValidBlock.Round,
BlockDAHeader: dah,
BlockRows: pbBits,
IsCommit: msg.NewValidBlock.IsCommit,
}
case *tmcons.Message_Proposal:
pbP, err := types.ProposalFromProto(&msg.Proposal.Proposal)
Expand All @@ -207,15 +212,15 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
ProposalPOLRound: msg.ProposalPol.ProposalPolRound,
ProposalPOL: pbBits,
}
case *tmcons.Message_BlockPart:
parts, err := types.PartFromProto(&msg.BlockPart.Part)
case *tmcons.Message_BlockRow:
row, err := types.RowFromProto(msg.BlockRow.Row)
if err != nil {
return nil, fmt.Errorf("blockpart msg to proto error: %w", err)
return nil, fmt.Errorf("block row msg to proto error: %w", err)
}
pb = &BlockPartMessage{
Height: msg.BlockPart.Height,
Round: msg.BlockPart.Round,
Part: parts,
pb = &BlockRowMessage{
Height: msg.BlockRow.Height,
Round: msg.BlockRow.Round,
Row: row,
}
case *tmcons.Message_Vote:
vote, err := types.VoteFromProto(msg.Vote.Vote)
Expand Down
Loading