Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prototype: compact blocks #1180

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,9 +696,7 @@ func DefaultFuzzConnConfig() *FuzzConnConfig {
// MempoolConfig defines the configuration options for the CometBFT mempool
type MempoolConfig struct {
// Mempool version to use:
// 1) "v0" - FIFO mempool.
// 2) "v1" - (default) prioritized mempool.
// 3) "v2" - content addressable transaction pool
// 1) "v2" - (default) content addressable transaction pool
Version string `mapstructure:"version"`
// RootDir is the root directory for all data. This should be configured via
// the $CMTHOME env variable or --home cmd flag rather than overriding this
Expand Down Expand Up @@ -767,7 +765,7 @@ type MempoolConfig struct {
// DefaultMempoolConfig returns a default configuration for the CometBFT mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
Version: MempoolV1,
Version: MempoolV2,
Recheck: true,
Broadcast: true,
WalPath: "",
Expand Down Expand Up @@ -802,6 +800,9 @@ func (cfg *MempoolConfig) WalEnabled() bool {
// ValidateBasic performs basic validation (checking param bounds, etc.) and
// returns an error if any check fails.
func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.Version != MempoolV2 {
return errors.New("only v2 mempool is supported for compact blocks")
}
if cfg.Size < 0 {
return errors.New("size can't be negative")
}
Expand Down
4 changes: 1 addition & 3 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,7 @@ dial_timeout = "{{ .P2P.DialTimeout }}"
[mempool]

# Mempool version to use:
# 1) "v0" - FIFO mempool.
# 2) "v1" - (default) prioritized mempool.
# 3) "v2" - content addressable transaction pool
# 1) "v2" - (default) content addressable transaction pool
version = "{{ .Mempool.Version }}"

# Recheck (default: true) defines whether CometBFT should recheck the
Expand Down
61 changes: 25 additions & 36 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ import (
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/libs/service"
cmtsync "github.com/cometbft/cometbft/libs/sync"
mempl "github.com/cometbft/cometbft/mempool"

cfg "github.com/cometbft/cometbft/config"
mempoolv2 "github.com/cometbft/cometbft/mempool/cat"
mempoolv0 "github.com/cometbft/cometbft/mempool/v0"
mempoolv1 "github.com/cometbft/cometbft/mempool/v1"
cat "github.com/cometbft/cometbft/mempool/cat"
"github.com/cometbft/cometbft/p2p"
cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
Expand All @@ -48,6 +44,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

genDoc, privVals := randGenesisDoc(nValidators, false, 30)
css := make([]*State, nValidators)
catReactors := make([]*cat.Reactor, nValidators)

for i := 0; i < nValidators; i++ {
logger := consensusLogger().With("test", "byzantine", "validator", i)
Expand All @@ -72,33 +69,21 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)

// Make Mempool
var mempool mempl.Mempool

switch thisConfig.Mempool.Version {
case cfg.MempoolV0:
mempool = mempoolv0.NewCListMempool(config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
case cfg.MempoolV1:
mempool = mempoolv1.NewTxMempool(logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}
mempool := cat.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
cat.WithPreCheck(sm.TxPreCheck(state)),
cat.WithPostCheck(sm.TxPostCheck(state)),
)
var err error
catReactors[i], err = cat.NewReactor(mempool, &cat.ReactorOptions{
ListenOnly: !config.Mempool.Broadcast,
MaxTxSize: config.Mempool.MaxTxBytes,
MaxGossipDelay: config.Mempool.MaxGossipDelay,
})
require.NoError(t, err)

if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
Expand All @@ -112,7 +97,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, catReactors[i], evpool)
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
Expand Down Expand Up @@ -154,6 +139,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.AddReactor("MEMPOOL", catReactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
return s
}, p2p.Connect2Switches)
Expand Down Expand Up @@ -230,9 +216,10 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
proposerAddr := lazyProposer.privValidatorPubKey.Address()

block, blockParts := lazyProposer.blockExec.CreateProposalBlock(
block := lazyProposer.blockExec.CreateProposalBlock(
lazyProposer.Height, lazyProposer.state, commit, proposerAddr,
)
blockParts := block.MakePartSet(types.BlockPartSizeBytes)

// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
// and the privValidator will refuse to sign anything.
Expand Down Expand Up @@ -480,7 +467,8 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
// Avoid sending on internalMsgQueue and running consensus state.

// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
block1 := cs.createProposalBlock()
blockParts1 := block1.MakePartSet(types.BlockPartSizeBytes)
polRound := cs.TwoThirdPrevoteRound
propBlockID := types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
proposal1 := types.NewProposal(height, round, polRound, propBlockID)
Expand All @@ -495,7 +483,8 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
deliverTxsRange(cs, 0, 1)

// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
block2 := cs.createProposalBlock()
blockParts2 := block2.MakePartSet(types.BlockPartSizeBytes)
polRound = cs.TwoThirdPrevoteRound
propBlockID = types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
proposal2 := types.NewProposal(height, round, polRound, propBlockID)
Expand Down
6 changes: 4 additions & 2 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func decideProposal(
round int32,
) (proposal *types.Proposal, block *types.Block) {
cs1.mtx.Lock()
block, blockParts := cs1.createProposalBlock()
block = cs1.createProposalBlock()
blockParts := block.MakePartSet(types.BlockPartSizeBytes)

validRound := cs1.TwoThirdPrevoteRound
chainID := cs1.state.ChainID
cs1.mtx.Unlock()
Expand Down Expand Up @@ -447,7 +449,7 @@ func newStateWithConfigAndBlockStore(
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, nil, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

Expand Down
50 changes: 50 additions & 0 deletions consensus/metrics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package consensus

import (
"encoding/json"
"fmt"
"path/filepath"
"strings"
"time"

cstypes "github.com/cometbft/cometbft/consensus/types"
"github.com/cometbft/cometbft/libs/os"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"

Expand Down Expand Up @@ -290,3 +294,49 @@ func (m *Metrics) MarkStep(s cstypes.RoundStepType) {
}
m.stepStart = time.Now()
}

type JSONMetrics struct {
dir string
interval int
StartTime time.Time
EndTime time.Time
Blocks uint64
Rounds uint64
SentConsensusBytes uint64
SentCompactBlocks uint64
SentCompactBytes uint64
CompactBlockFailures uint64
SentBlockParts uint64
SentBlockPartsBytes uint64
}

func NewJSONMetrics(dir string) *JSONMetrics {
return &JSONMetrics{
dir: dir,
StartTime: time.Now().UTC(),
}
}

func (m *JSONMetrics) Save() {
m.EndTime = time.Now().UTC()
content, err := json.MarshalIndent(m, "", " ")
if err != nil {
panic(err)
}
path := filepath.Join(m.dir, fmt.Sprintf("metrics_%d.json", m.interval))
os.MustWriteFile(path, content, 0644)
m.StartTime = m.EndTime
m.interval++
m.reset()
}

func (m *JSONMetrics) reset() {
m.Blocks = 0
m.Rounds = 0
m.SentConsensusBytes = 0
m.SentBlockParts = 0
m.SentBlockPartsBytes = 0
m.SentCompactBlocks = 0
m.SentCompactBytes = 0
m.CompactBlockFailures = 0
}
19 changes: 19 additions & 0 deletions consensus/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ func MsgToProto(msg Message) (*cmtcons.Message, error) {
}
return m.Wrap().(*cmtcons.Message), nil

case *CompactBlockMessage:
block, err := msg.Block.ToProto()
if err != nil {
return nil, fmt.Errorf("msg to proto error: %w", err)
}
m := &cmtcons.CompactBlock{
Block: block,
}
return m.Wrap().(*cmtcons.Message), nil

case *BlockPartMessage:
parts, err := msg.Part.ToProto()
if err != nil {
Expand Down Expand Up @@ -188,6 +198,15 @@ func MsgFromProto(p *cmtcons.Message) (Message, error) {
Round: msg.Round,
Part: parts,
}
case *cmtcons.CompactBlock:
block, err := types.BlockFromProto(msg.Block)
if err != nil {
return nil, fmt.Errorf("compactBlock msg to proto error: %w", err)
}
pb = &CompactBlockMessage{
Block: block,
}

case *cmtcons.Vote:
vote, err := types.VoteFromProto(msg.Vote)
if err != nil {
Expand Down
57 changes: 44 additions & 13 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *CompactBlockMessage:
ps.SetHasBlock(msg.Block.Height, ps.PRS.Round)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
Expand All @@ -358,6 +361,8 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
// If we receive a vote we can deduce that an honest peer also has the block
ps.SetHasBlock(msg.Vote.Height, msg.Vote.Round)

cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()}

Expand Down Expand Up @@ -578,25 +583,22 @@ OUTER_LOOP:
rs := conR.getRoundState()
prs := ps.GetRoundState()

// Send proposal Block parts?
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
parts, err := part.ToProto()
// Send compact block
if !prs.Block && rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
logger.Info("Peer has proposal but not block", "height", prs.Height, "round", prs.Round)
if rs.ProposalCompactBlock != nil {
compactBlock, err := rs.ProposalCompactBlock.ToProto()
if err != nil {
panic(err)
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
logger.Info("Sending compact block", "height", prs.Height, "round", prs.Round)
if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: DataChannel,
Message: &cmtcons.BlockPart{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: *parts,
Message: &cmtcons.CompactBlock{
Block: compactBlock,
},
}, logger) {
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.TransferTypeUpload)
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
ps.SetHasBlock(prs.Height, prs.Round)
}
continue OUTER_LOOP
}
Expand Down Expand Up @@ -641,7 +643,7 @@ OUTER_LOOP:
if rs.Proposal != nil && !prs.Proposal {
// Proposal: share the proposal metadata with peer.
{
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
logger.Info("Sending proposal", "height", prs.Height, "round", prs.Round)
if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: DataChannel,
Message: &cmtcons.Proposal{Proposal: *rs.Proposal.ToProto()},
Expand Down Expand Up @@ -1162,6 +1164,18 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
ps.PRS.ProposalBlockParts.SetIndex(index, true)
}

// SetHasCompactBlock sets the given block part index as known for the peer.
func (ps *PeerState) SetHasBlock(height int64, round int32) {
ps.mtx.Lock()
defer ps.mtx.Unlock()

if ps.PRS.Height != height || ps.PRS.Round != round {
return
}

ps.PRS.Block = true
}

// PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
Expand Down Expand Up @@ -1692,6 +1706,23 @@ func (m *ProposalPOLMessage) String() string {

//-------------------------------------

// CompactBlockMessage is sent when gossipping a piece of the proposed block.
type CompactBlockMessage struct {
Block *types.Block
}

// ValidateBasic performs basic validation.
func (m *CompactBlockMessage) ValidateBasic() error {
return m.Block.ValidateBasic()
}

// String returns a string representation.
func (m *CompactBlockMessage) String() string {
return fmt.Sprintf("[CompactBlock H:%v]", m.Block.Height)
}

//-------------------------------------

// BlockPartMessage is sent when gossipping a piece of the proposed block.
type BlockPartMessage struct {
Height int64
Expand Down
2 changes: 1 addition & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestReactorWithEvidence(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, nil, evpool2)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

Expand Down
Loading
Loading