Skip to content

Commit

Permalink
Merge branch 'cal/app-version' of github.com:celestiaorg/celestia-cor…
Browse files Browse the repository at this point in the history
…e into cal/app-version
  • Loading branch information
cmwaters committed Sep 27, 2023
2 parents 4b6b788 + 1f7a6bd commit 2e2de0b
Show file tree
Hide file tree
Showing 74 changed files with 1,385 additions and 1,531 deletions.
672 changes: 85 additions & 587 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

26 changes: 19 additions & 7 deletions cmd/cometbft/commands/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,44 @@ import (
"github.com/cometbft/cometbft/store"
)

var removeBlock = false

func init() {
RollbackStateCmd.Flags().BoolVar(&removeBlock, "hard", false, "remove last block as well as state")
}

var RollbackStateCmd = &cobra.Command{
Use: "rollback",
Short: "rollback CometBFT state by one height",
Long: `
A state rollback is performed to recover from an incorrect application state transition,
when CometBFT has persisted an incorrect app hash and is thus unable to make
progress. Rollback overwrites a state at height n with the state at height n - 1.
The application should also roll back to height n - 1. No blocks are removed, so upon
restarting CometBFT the transactions in block n will be re-executed against the
application.
The application should also roll back to height n - 1. If the --hard flag is not used,
no blocks will be removed so upon restarting Tendermint the transactions in block n will be
re-executed against the application. Using --hard will also remove block n. This can
be done multiple times.
`,
RunE: func(cmd *cobra.Command, args []string) error {
height, hash, err := RollbackState(config)
height, hash, err := RollbackState(config, removeBlock)
if err != nil {
return fmt.Errorf("failed to rollback state: %w", err)
}

fmt.Printf("Rolled back state to height %d and hash %v", height, hash)
if removeBlock {
fmt.Printf("Rolled back both state and block to height %d and hash %X\n", height, hash)
} else {
fmt.Printf("Rolled back state to height %d and hash %X\n", height, hash)
}

return nil
},
}

// RollbackState takes the state at the current height n and overwrites it with the state
// at height n - 1. Note state here refers to CometBFT state not application state.
// Returns the latest state height and app hash alongside an error if there was one.
func RollbackState(config *cfg.Config) (int64, []byte, error) {
func RollbackState(config *cfg.Config, removeBlock bool) (int64, []byte, error) {
// use the parsed config to load the block and state store
blockStore, stateStore, err := loadStateAndBlockStore(config)
if err != nil {
Expand All @@ -51,7 +63,7 @@ func RollbackState(config *cfg.Config) (int64, []byte, error) {
}()

// rollback the last state
return state.Rollback(blockStore, stateStore)
return state.Rollback(blockStore, stateStore, removeBlock)
}

func loadStateAndBlockStore(config *cfg.Config) (*store.BlockStore, state.Store, error) {
Expand Down
8 changes: 4 additions & 4 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,12 @@ func (h *Handshaker) ReplayBlocks(
}
validatorSet := types.NewValidatorSet(validators)
nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
csParams := types.TM2PB.ConsensusParams(h.genDoc.ConsensusParams)
pbparams := h.genDoc.ConsensusParams.ToProto()
req := abci.RequestInitChain{
Time: h.genDoc.GenesisTime,
ChainId: h.genDoc.ChainID,
InitialHeight: h.genDoc.InitialHeight,
ConsensusParams: csParams,
ConsensusParams: &pbparams,
Validators: nextVals,
AppStateBytes: h.genDoc.AppState,
}
Expand Down Expand Up @@ -344,8 +344,8 @@ func (h *Handshaker) ReplayBlocks(
}

if res.ConsensusParams != nil {
state.ConsensusParams = types.UpdateConsensusParams(state.ConsensusParams, res.ConsensusParams)
state.Version.Consensus.App = state.ConsensusParams.Version.AppVersion
state.ConsensusParams = state.ConsensusParams.Update(res.ConsensusParams)
state.Version.Consensus.App = state.ConsensusParams.Version.App
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
state.LastResultsHash = merkle.HashFromByteSlices(nil)
Expand Down
6 changes: 4 additions & 2 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,14 +1191,14 @@ func stateAndStore(

type mockBlockStore struct {
config *cfg.Config
params cmtproto.ConsensusParams
params types.ConsensusParams
chain []*types.Block
commits []*types.Commit
base int64
}

// TODO: NewBlockStore(db.NewMemDB) ...
func newMockBlockStore(config *cfg.Config, params cmtproto.ConsensusParams) *mockBlockStore {
func newMockBlockStore(config *cfg.Config, params types.ConsensusParams) *mockBlockStore {
return &mockBlockStore{config, params, nil, nil, 0}
}

Expand Down Expand Up @@ -1241,6 +1241,8 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) {
return pruned, nil
}

func (bs *mockBlockStore) DeleteLatestBlock() error { return nil }

//---------------------------------------
// Test handshake/init chain

Expand Down
5 changes: 3 additions & 2 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2250,9 +2250,10 @@ func (cs *State) signVote(
func (cs *State) voteTime() time.Time {
now := cmttime.Now()
minVoteTime := now
// Minimum time increment between blocks
const timeIota = time.Millisecond
// TODO: We should remove next line in case we don't vote for v in case cs.ProposalBlock == nil,
// even if cs.LockedBlock != nil. See https://github.com/cometbft/cometbft/tree/v0.34.x/spec/.
timeIota := time.Duration(cs.state.ConsensusParams.Block.TimeIotaMs) * time.Millisecond
// even if cs.LockedBlock != nil. See https://docs.tendermint.com/master/spec/.
if cs.LockedBlock != nil {
// See the BFT time spec
// https://github.com/cometbft/cometbft/blob/v0.34.x/spec/consensus/bft-time.md
Expand Down
2 changes: 1 addition & 1 deletion evidence/mocks/block_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions evidence/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/cometbft/cometbft/evidence"
"github.com/cometbft/cometbft/evidence/mocks"
"github.com/cometbft/cometbft/libs/log"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
cmtversion "github.com/cometbft/cometbft/proto/tendermint/version"
sm "github.com/cometbft/cometbft/state"
smmocks "github.com/cometbft/cometbft/state/mocks"
Expand Down Expand Up @@ -326,12 +325,12 @@ func TestRecoverPendingEvidence(t *testing.T) {
newStateStore.On("Load").Return(sm.State{
LastBlockTime: defaultEvidenceTime.Add(25 * time.Minute),
LastBlockHeight: height + 15,
ConsensusParams: cmtproto.ConsensusParams{
Block: cmtproto.BlockParams{
ConsensusParams: types.ConsensusParams{
Block: types.BlockParams{
MaxBytes: 22020096,
MaxGas: -1,
},
Evidence: cmtproto.EvidenceParams{
Evidence: types.EvidenceParams{
MaxAgeNumBlocks: 20,
MaxAgeDuration: 20 * time.Minute,
MaxBytes: defaultEvidenceMaxBytes,
Expand Down Expand Up @@ -361,12 +360,12 @@ func initializeStateFromValidatorSet(valSet *types.ValidatorSet, height int64) s
NextValidators: valSet.CopyIncrementProposerPriority(1),
LastValidators: valSet,
LastHeightValidatorsChanged: 1,
ConsensusParams: cmtproto.ConsensusParams{
Block: cmtproto.BlockParams{
ConsensusParams: types.ConsensusParams{
Block: types.BlockParams{
MaxBytes: 22020096,
MaxGas: -1,
},
Evidence: cmtproto.EvidenceParams{
Evidence: types.EvidenceParams{
MaxAgeNumBlocks: 20,
MaxAgeDuration: 20 * time.Minute,
MaxBytes: 1000,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ require (
github.com/Masterminds/semver/v3 v3.2.0
github.com/btcsuite/btcd/btcec/v2 v2.2.1
github.com/btcsuite/btcd/btcutil v1.1.2
github.com/celestiaorg/nmt v0.18.1
github.com/celestiaorg/nmt v0.20.0
github.com/cometbft/cometbft-db v0.7.0
github.com/go-git/go-git/v5 v5.5.1
github.com/vektra/mockery/v2 v2.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ github.com/bufbuild/protocompile v0.1.0/go.mod h1:ix/MMMdsT3fzxfw91dvbfzKW3fRRnu
github.com/butuzov/ireturn v0.1.1 h1:QvrO2QF2+/Cx1WA/vETCIYBKtRjc30vesdoPUNo1EbY=
github.com/butuzov/ireturn v0.1.1/go.mod h1:Wh6Zl3IMtTpaIKbmwzqi6olnM9ptYQxxVacMsOEFPoc=
github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/celestiaorg/nmt v0.18.1 h1:zU3apzW4y0fs0ilQA74XnEYW8FvRv0CUK2LXK66L3rA=
github.com/celestiaorg/nmt v0.18.1/go.mod h1:0l8q6UYRju1xNrxtvV6NwPdW3lfsN6KuZ0htRnModdc=
github.com/celestiaorg/nmt v0.20.0 h1:9i7ultZ8Wv5ytt8ZRaxKQ5KOOMo4A2K2T/aPGjIlSas=
github.com/celestiaorg/nmt v0.20.0/go.mod h1:Oz15Ub6YPez9uJV0heoU4WpFctxazuIhKyUtaYNio7E=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
Expand Down
4 changes: 2 additions & 2 deletions light/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *Client) ConsensusParams(ctx context.Context, height *int64) (*ctypes.Re
}

// Validate res.
if err := types.ValidateConsensusParams(res.ConsensusParams); err != nil {
if err := res.ConsensusParams.ValidateBasic(); err != nil {
return nil, err
}
if res.BlockHeight <= 0 {
Expand All @@ -248,7 +248,7 @@ func (c *Client) ConsensusParams(ctx context.Context, height *int64) (*ctypes.Re
}

// Verify hash.
if cH, tH := types.HashConsensusParams(res.ConsensusParams), l.ConsensusHash; !bytes.Equal(cH, tH) {
if cH, tH := res.ConsensusParams.Hash(), l.ConsensusHash; !bytes.Equal(cH, tH) {
return nil, fmt.Errorf("params hash %X does not match trusted hash %X",
cH, tH)
}
Expand Down
24 changes: 20 additions & 4 deletions mempool/cat/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type TxPool struct {
txsAvailable chan struct{} // one value sent per height when mempool is not empty
preCheckFn mempool.PreCheckFunc
postCheckFn mempool.PostCheckFunc
height int64 // the latest height passed to Update
height int64 // the latest height passed to Update
lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL

// Thread-safe cache of rejected transactions for quick look-up
rejectedTxCache *LRUTxCache
Expand Down Expand Up @@ -186,6 +187,22 @@ func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool {
return txmp.rejectedTxCache.Has(txKey)
}

// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time
// the txpool looped through all transactions and if so, performs a purge of any transaction
// that has expired according to the TTLDuration. This is thread safe.
func (txmp *TxPool) CheckToPurgeExpiredTxs() {
txmp.updateMtx.Lock()
defer txmp.updateMtx.Unlock()
if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration {
expirationAge := time.Now().Add(-txmp.config.TTLDuration)
// a height of 0 means no transactions will be removed because of height
// (in other words, no transaction has a height less than 0)
numExpired := txmp.store.purgeExpiredTxs(0, expirationAge)
txmp.metrics.EvictedTxs.Add(float64(numExpired))
txmp.lastPurgeTime = time.Now()
}
}

// CheckTx adds the given transaction to the mempool if it fits and passes the
// application's ABCI CheckTx method. This should be viewed as the entry method for new transactions
// into the network. In practice this happens via an RPC endpoint
Expand Down Expand Up @@ -464,6 +481,7 @@ func (txmp *TxPool) Update(
if newPostFn != nil {
txmp.postCheckFn = newPostFn
}
txmp.lastPurgeTime = time.Now()
txmp.updateMtx.Unlock()

txmp.metrics.SuccessfulTxs.Add(float64(len(blockTxs)))
Expand Down Expand Up @@ -681,8 +699,6 @@ func (txmp *TxPool) canAddTx(size int64) bool {
// purgeExpiredTxs removes all transactions from the mempool that have exceeded
// their respective height or time-based limits as of the given blockHeight.
// Transactions removed by this operation are not removed from the rejectedTxCache.
//
// The caller must hold txmp.mtx exclusively.
func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {
if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 {
return // nothing to do
Expand All @@ -704,7 +720,7 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {

// purge old evicted and seen transactions
if txmp.config.TTLDuration == 0 {
// ensure that evictedTxs and seenByPeersSet are eventually pruned
// ensure that seenByPeersSet are eventually pruned
expirationAge = now.Add(-time.Hour)
}
txmp.seenByPeersSet.Prune(expirationAge)
Expand Down
43 changes: 29 additions & 14 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,38 @@ func (memR *Reactor) SetLogger(l log.Logger) {

// OnStart implements Service.
func (memR *Reactor) OnStart() error {
if memR.opts.ListenOnly {
if !memR.opts.ListenOnly {
go func() {
for {
select {
case <-memR.Quit():
return

// listen in for any newly verified tx via RPC, then immediately
// broadcast it to all connected peers.
case nextTx := <-memR.mempool.next():
memR.broadcastNewTx(nextTx)
}
}
}()
} else {
memR.Logger.Info("Tx broadcasting is disabled")
return nil
}
go func() {
for {
select {
case <-memR.Quit():
return

// listen in for any newly verified tx via RFC, then immediately
// broadcasts it to all connected peers.
case nextTx := <-memR.mempool.next():
memR.broadcastNewTx(nextTx)
// run a separate go routine to check for time based TTLs
if memR.mempool.config.TTLDuration > 0 {
go func() {
ticker := time.NewTicker(memR.mempool.config.TTLDuration)
for {
select {
case <-ticker.C:
memR.mempool.CheckToPurgeExpiredTxs()
case <-memR.Quit():
return
}
}
}
}()
}()
}

return nil
}

Expand Down
23 changes: 23 additions & 0 deletions mempool/cat/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,29 @@ func TestMempoolVectors(t *testing.T) {
}
}

func TestReactorEventuallyRemovesExpiredTransaction(t *testing.T) {
reactor, _ := setupReactor(t)
reactor.mempool.config.TTLDuration = 100 * time.Millisecond

tx := newDefaultTx("hello")
key := tx.Key()
txMsg := &protomem.Message{
Sum: &protomem.Message_Txs{Txs: &protomem.Txs{Txs: [][]byte{tx}}},
}
txMsgBytes, err := txMsg.Marshal()
require.NoError(t, err)

peer := genPeer()
require.NoError(t, reactor.Start())
reactor.InitPeer(peer)
reactor.Receive(mempool.MempoolChannel, peer, txMsgBytes)
require.True(t, reactor.mempool.Has(key))

// wait for the transaction to expire
time.Sleep(reactor.mempool.config.TTLDuration * 2)
require.False(t, reactor.mempool.Has(key))
}

func TestLegacyReactorReceiveBasic(t *testing.T) {
config := cfg.TestConfig()
// if there were more than two reactors, the order of transactions could not be
Expand Down
16 changes: 15 additions & 1 deletion mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type TxMempool struct {
txsAvailable chan struct{} // one value sent per height when mempool is not empty
preCheck mempool.PreCheckFunc
postCheck mempool.PostCheckFunc
height int64 // the latest height passed to Update
height int64 // the latest height passed to Update
lastPurgeTime time.Time // the last time we attempted to purge transactions via the TTL

txs *clist.CList // valid transactions (passed CheckTx)
txByKey map[types.TxKey]*clist.CElement
Expand Down Expand Up @@ -723,6 +724,17 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
return nil
}

// CheckToPurgeExpiredTxs checks if there has been adequate time since the last time
// the txpool looped through all transactions and if so, performs a purge of any transaction
// that has expired according to the TTLDuration. This is thread safe.
func (txmp *TxMempool) CheckToPurgeExpiredTxs() {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration {
txmp.purgeExpiredTxs(txmp.height)
}
}

// purgeExpiredTxs removes all transactions from the mempool that have exceeded
// their respective height or time-based limits as of the given blockHeight.
// Transactions removed by this operation are not removed from the cache.
Expand Down Expand Up @@ -752,6 +764,8 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}
cur = next
}

txmp.lastPurgeTime = now
}

func (txmp *TxMempool) notifyTxsAvailable() {
Expand Down
Loading

0 comments on commit 2e2de0b

Please sign in to comment.