Skip to content

Commit

Permalink
mempool: add a safety check, write tests for mempoolIDs (tendermint#3487
Browse files Browse the repository at this point in the history
)

* mempool: add a safety check, write tests for mempoolIDs

and document 65536 limit in the mempool reactor spec

follow-up to tendermint#2778

* rename the test

* fixes after Ismail's review
  • Loading branch information
melekes authored Mar 27, 2019
1 parent 5a25b75 commit 5fa540b
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 159 deletions.
6 changes: 3 additions & 3 deletions consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
p2pdummy "github.com/tendermint/tendermint/p2p/dummy"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
"github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -1548,7 +1548,7 @@ func TestStateHalt1(t *testing.T) {
func TestStateOutputsBlockPartsStats(t *testing.T) {
// create dummy peer
cs, _ := randConsensusState(1)
peer := p2pdummy.NewPeer()
peer := p2pmock.NewPeer(nil)

// 1) new block part
parts := types.NewPartSetFromData(cmn.RandBytes(100), 10)
Expand Down Expand Up @@ -1591,7 +1591,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) {
func TestStateOutputVoteStats(t *testing.T) {
cs, vss := randConsensusState(2)
// create dummy peer
peer := p2pdummy.NewPeer()
peer := p2pmock.NewPeer(nil)

vote := signVote(vss[1], types.PrecommitType, []byte("test"), types.PartSetHeader{})

Expand Down
8 changes: 7 additions & 1 deletion docs/spec/reactors/mempool/reactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@ for details.
Sending incorrectly encoded data or data exceeding `maxMsgSize` will result
in stopping the peer.

The mempool will not send a tx back to any peer which it received it from.
The mempool will not send a tx back to any peer which it received it from.

The reactor assigns an `uint16` number for each peer and maintains a map from
p2p.ID to `uint16`. Each mempool transaction carries a list of all the senders
(`[]uint16`). The list is updated every time mempool receives a transaction it
is already seen. `uint16` assumes that a node will never have over 65535 active
peers (0 is reserved for unknown source - e.g. RPC).
5 changes: 4 additions & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,10 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo
if !mem.cache.Push(tx) {
// record the sender
e, ok := mem.txsMap[sha256.Sum256(tx)]
if ok { // tx may be in cache, but not in the mempool
// The check is needed because tx may be in cache, but not in the mempool.
// E.g. after we've committed a block, txs are removed from the mempool,
// but not from the cache.
if ok {
memTx := e.Value.(*mempoolTx)
if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
// TODO: consider punishing peer for dups,
Expand Down
24 changes: 18 additions & 6 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mempool

import (
"fmt"
"math"
"reflect"
"sync"
"time"
Expand All @@ -26,6 +27,8 @@ const (
// UnknownPeerID is the peer ID to use when running CheckTx when there is
// no peer (e.g. RPC)
UnknownPeerID uint16 = 0

maxActiveIDs = math.MaxUint16
)

// MempoolReactor handles mempool tx broadcasting amongst peers.
Expand All @@ -45,7 +48,8 @@ type mempoolIDs struct {
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
}

// Reserve searches for the next unused ID and assignes it to the peer.
// Reserve searches for the next unused ID and assignes it to the
// peer.
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
ids.mtx.Lock()
defer ids.mtx.Unlock()
Expand All @@ -58,6 +62,10 @@ func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
// nextPeerID returns the next unused peer ID to use.
// This assumes that ids's mutex is already locked.
func (ids *mempoolIDs) nextPeerID() uint16 {
if len(ids.activeIDs) == maxActiveIDs {
panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs))
}

_, idExists := ids.activeIDs[ids.nextID]
for idExists {
ids.nextID++
Expand Down Expand Up @@ -88,16 +96,20 @@ func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
return ids.peerMap[peer.ID()]
}

func newMempoolIDs() *mempoolIDs {
return &mempoolIDs{
peerMap: make(map[p2p.ID]uint16),
activeIDs: map[uint16]struct{}{0: {}},
nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
}
}

// NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
memR := &MempoolReactor{
config: config,
Mempool: mempool,
ids: &mempoolIDs{
peerMap: make(map[p2p.ID]uint16),
activeIDs: map[uint16]struct{}{0: {}},
nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
},
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
return memR
Expand Down
35 changes: 35 additions & 0 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mempool

import (
"fmt"
"net"
"sync"
"testing"
"time"
Expand All @@ -15,6 +16,7 @@ import (
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/mock"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -189,3 +191,36 @@ func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) {
// i.e. broadcastTxRoutine finishes when reactor is stopped
leaktest.CheckTimeout(t, 10*time.Second)()
}

func TestMempoolIDsBasic(t *testing.T) {
ids := newMempoolIDs()

peer := mock.NewPeer(net.IP{127, 0, 0, 1})

ids.ReserveForPeer(peer)
assert.EqualValues(t, 1, ids.GetForPeer(peer))
ids.Reclaim(peer)

ids.ReserveForPeer(peer)
assert.EqualValues(t, 2, ids.GetForPeer(peer))
ids.Reclaim(peer)
}

func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
if testing.Short() {
return
}

// 0 is already reserved for UnknownPeerID
ids := newMempoolIDs()

for i := 0; i < maxActiveIDs-1; i++ {
peer := mock.NewPeer(net.IP{127, 0, 0, 1})
ids.ReserveForPeer(peer)
}

assert.Panics(t, func() {
peer := mock.NewPeer(net.IP{127, 0, 0, 1})
ids.ReserveForPeer(peer)
})
}
100 changes: 0 additions & 100 deletions p2p/dummy/peer.go

This file was deleted.

68 changes: 68 additions & 0 deletions p2p/mock/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package mock

import (
"net"

"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
)

type Peer struct {
*cmn.BaseService
ip net.IP
id p2p.ID
addr *p2p.NetAddress
kv map[string]interface{}
Outbound, Persistent bool
}

// NewPeer creates and starts a new mock peer. If the ip
// is nil, random routable address is used.
func NewPeer(ip net.IP) *Peer {
var netAddr *p2p.NetAddress
if ip == nil {
_, netAddr = p2p.CreateRoutableAddr()
} else {
netAddr = p2p.NewNetAddressIPPort(ip, 26656)
}
nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()}
netAddr.ID = nodeKey.ID()
mp := &Peer{
ip: ip,
id: nodeKey.ID(),
addr: netAddr,
kv: make(map[string]interface{}),
}
mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp)
mp.Start()
return mp
}

func (mp *Peer) FlushStop() { mp.Stop() }
func (mp *Peer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *Peer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
ID_: mp.addr.ID,
ListenAddr: mp.addr.DialString(),
}
}
func (mp *Peer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
func (mp *Peer) ID() p2p.ID { return mp.id }
func (mp *Peer) IsOutbound() bool { return mp.Outbound }
func (mp *Peer) IsPersistent() bool { return mp.Persistent }
func (mp *Peer) Get(key string) interface{} {
if value, ok := mp.kv[key]; ok {
return value
}
return nil
}
func (mp *Peer) Set(key string, value interface{}) {
mp.kv[key] = value
}
func (mp *Peer) RemoteIP() net.IP { return mp.ip }
func (mp *Peer) OriginalAddr() *p2p.NetAddress { return mp.addr }
func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *Peer) CloseConn() error { return nil }
Loading

0 comments on commit 5fa540b

Please sign in to comment.