Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prototype: compact blocks #1180

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,9 +696,7 @@ func DefaultFuzzConnConfig() *FuzzConnConfig {
// MempoolConfig defines the configuration options for the CometBFT mempool
type MempoolConfig struct {
// Mempool version to use:
// 1) "v0" - FIFO mempool.
// 2) "v1" - (default) prioritized mempool.
// 3) "v2" - content addressable transaction pool
// 1) "v2" - (default) content addressable transaction pool
Version string `mapstructure:"version"`
// RootDir is the root directory for all data. This should be configured via
// the $CMTHOME env variable or --home cmd flag rather than overriding this
Expand Down Expand Up @@ -767,7 +765,7 @@ type MempoolConfig struct {
// DefaultMempoolConfig returns a default configuration for the CometBFT mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
Version: MempoolV1,
Version: MempoolV2,
Recheck: true,
Broadcast: true,
WalPath: "",
Expand Down Expand Up @@ -802,6 +800,9 @@ func (cfg *MempoolConfig) WalEnabled() bool {
// ValidateBasic performs basic validation (checking param bounds, etc.) and
// returns an error if any check fails.
func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.Version != MempoolV2 {
return errors.New("only v2 mempool is supported for compact blocks")
}
if cfg.Size < 0 {
return errors.New("size can't be negative")
}
Expand Down
4 changes: 1 addition & 3 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,7 @@ dial_timeout = "{{ .P2P.DialTimeout }}"
[mempool]

# Mempool version to use:
# 1) "v0" - FIFO mempool.
# 2) "v1" - (default) prioritized mempool.
# 3) "v2" - content addressable transaction pool
# 1) "v2" - (default) content addressable transaction pool
version = "{{ .Mempool.Version }}"

# Recheck (default: true) defines whether CometBFT should recheck the
Expand Down
61 changes: 25 additions & 36 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ import (
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/libs/service"
cmtsync "github.com/cometbft/cometbft/libs/sync"
mempl "github.com/cometbft/cometbft/mempool"

cfg "github.com/cometbft/cometbft/config"
mempoolv2 "github.com/cometbft/cometbft/mempool/cat"
mempoolv0 "github.com/cometbft/cometbft/mempool/v0"
mempoolv1 "github.com/cometbft/cometbft/mempool/v1"
cat "github.com/cometbft/cometbft/mempool/cat"
"github.com/cometbft/cometbft/p2p"
cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
Expand All @@ -48,6 +44,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

genDoc, privVals := randGenesisDoc(nValidators, false, 30)
css := make([]*State, nValidators)
catReactors := make([]*cat.Reactor, nValidators)

for i := 0; i < nValidators; i++ {
logger := consensusLogger().With("test", "byzantine", "validator", i)
Expand All @@ -72,33 +69,21 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)

// Make Mempool
var mempool mempl.Mempool

switch thisConfig.Mempool.Version {
case cfg.MempoolV0:
mempool = mempoolv0.NewCListMempool(config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
case cfg.MempoolV1:
mempool = mempoolv1.NewTxMempool(logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}
mempool := cat.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
cat.WithPreCheck(sm.TxPreCheck(state)),
cat.WithPostCheck(sm.TxPostCheck(state)),
)
var err error
catReactors[i], err = cat.NewReactor(mempool, &cat.ReactorOptions{
ListenOnly: !config.Mempool.Broadcast,
MaxTxSize: config.Mempool.MaxTxBytes,
MaxGossipDelay: config.Mempool.MaxGossipDelay,
})
require.NoError(t, err)

if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
Expand All @@ -112,7 +97,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, catReactors[i], evpool)
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
Expand Down Expand Up @@ -154,6 +139,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.AddReactor("MEMPOOL", catReactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
return s
}, p2p.Connect2Switches)
Expand Down Expand Up @@ -230,9 +216,10 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
proposerAddr := lazyProposer.privValidatorPubKey.Address()

block, blockParts := lazyProposer.blockExec.CreateProposalBlock(
block := lazyProposer.blockExec.CreateProposalBlock(
lazyProposer.Height, lazyProposer.state, commit, proposerAddr,
)
blockParts := block.MakePartSet(types.BlockPartSizeBytes)

// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
// and the privValidator will refuse to sign anything.
Expand Down Expand Up @@ -480,7 +467,8 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
// Avoid sending on internalMsgQueue and running consensus state.

// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
block1 := cs.createProposalBlock()
blockParts1 := block1.MakePartSet(types.BlockPartSizeBytes)
polRound := cs.TwoThirdPrevoteRound
propBlockID := types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
proposal1 := types.NewProposal(height, round, polRound, propBlockID)
Expand All @@ -495,7 +483,8 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
deliverTxsRange(cs, 0, 1)

// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
block2 := cs.createProposalBlock()
blockParts2 := block2.MakePartSet(types.BlockPartSizeBytes)
polRound = cs.TwoThirdPrevoteRound
propBlockID = types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
proposal2 := types.NewProposal(height, round, polRound, propBlockID)
Expand Down
6 changes: 4 additions & 2 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func decideProposal(
round int32,
) (proposal *types.Proposal, block *types.Block) {
cs1.mtx.Lock()
block, blockParts := cs1.createProposalBlock()
block = cs1.createProposalBlock()
blockParts := block.MakePartSet(types.BlockPartSizeBytes)

validRound := cs1.TwoThirdPrevoteRound
chainID := cs1.state.ChainID
cs1.mtx.Unlock()
Expand Down Expand Up @@ -447,7 +449,7 @@ func newStateWithConfigAndBlockStore(
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, nil, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

Expand Down
50 changes: 50 additions & 0 deletions consensus/metrics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package consensus

import (
"encoding/json"
"fmt"
"path/filepath"
"strings"
"time"

cstypes "github.com/cometbft/cometbft/consensus/types"
"github.com/cometbft/cometbft/libs/os"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"

Expand Down Expand Up @@ -290,3 +294,49 @@ func (m *Metrics) MarkStep(s cstypes.RoundStepType) {
}
m.stepStart = time.Now()
}

type JSONMetrics struct {
dir string
interval int
StartTime time.Time
EndTime time.Time
Blocks uint64
Rounds uint64
SentConsensusBytes uint64
SentCompactBlocks uint64
SentCompactBytes uint64
CompactBlockFailures uint64
SentBlockParts uint64
SentBlockPartsBytes uint64
}

func NewJSONMetrics(dir string) *JSONMetrics {
return &JSONMetrics{
dir: dir,
StartTime: time.Now().UTC(),
}
}

func (m *JSONMetrics) Save() {
m.EndTime = time.Now().UTC()
content, err := json.MarshalIndent(m, "", " ")
if err != nil {
panic(err)
}
path := filepath.Join(m.dir, fmt.Sprintf("metrics_%d.json", m.interval))
os.MustWriteFile(path, content, 0644)
m.StartTime = m.EndTime
m.interval++
m.reset()
}

func (m *JSONMetrics) reset() {
m.Blocks = 0
m.Rounds = 0
m.SentConsensusBytes = 0
m.SentBlockParts = 0
m.SentBlockPartsBytes = 0
m.SentCompactBlocks = 0
m.SentCompactBytes = 0
m.CompactBlockFailures = 0
}
2 changes: 1 addition & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestReactorWithEvidence(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, nil, evpool2)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

Expand Down
4 changes: 2 additions & 2 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
pb.cs.Wait()

newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.txFetcher, pb.cs.evpool)
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()

Expand Down Expand Up @@ -333,7 +333,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
blockStore, mempool, nil, evpool)

consensusState.SetEventBus(eventBus)
return consensusState
Expand Down
8 changes: 4 additions & 4 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ := css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock := css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts := propBlock.MakePartSet(partSize)
blockID := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}

Expand Down Expand Up @@ -402,7 +402,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}

Expand Down Expand Up @@ -439,7 +439,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
newVss := make([]*validatorStub, nVals+1)
Expand Down Expand Up @@ -514,7 +514,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
newVss = make([]*validatorStub, nVals+3)
Expand Down
Loading
Loading