feat(p2p): channels with limit of send and recv rate (#753)
* feat(p2p): throttled channel

(cherry picked from commit 5e41958)

* feat: limit mempool broadcast to 5/s

(cherry picked from commit cdde233)

* feat(p2p): channel recv rate limiting - not tested

(cherry picked from commit f7f7ce7)

* feat(p2p): channel recv rate limiting - continued

(cherry picked from commit 54e00f7)

* chore(p2p): regen channel mocks

* feat(config): mempool tx-send-rate-limit, tx-recv-rate-limit, tx-recv-rate-punish-peer

* chore: lint

* chore(mempool): burst recv twice as big as burst send

* chore: lint

* chore: remove not needed log

* chore: fixes after merge
lklimek authored Mar 12, 2024
1 parent 5764ab3 commit 2c28a4f
Showing 13 changed files with 470 additions and 31 deletions.
18 changes: 18 additions & 0 deletions config/config.go
@@ -831,6 +831,24 @@ type MempoolConfig struct {
// its insertion time into the mempool is beyond TTLDuration.
TTLNumBlocks int64 `mapstructure:"ttl-num-blocks"`

// TxSendRateLimit is the rate limit for sending transactions to peers, in
// transactions per second. If zero, the rate limiter is disabled.
//
// Default: 0
TxSendRateLimit float64 `mapstructure:"tx-send-rate-limit"`

// TxRecvRateLimit is the rate limit for receiving transactions from peers, in
// transactions per second. If zero, the rate limiter is disabled.
//
// Default: 0
TxRecvRateLimit float64 `mapstructure:"tx-recv-rate-limit"`

// TxRecvRatePunishPeer, when set to true, means that a peer that exceeds the rate limit set in
// TxRecvRateLimit will be punished (disconnected). If set to false, the peer will be throttled
// instead (excess messages will be dropped).
//
// Default: false
TxRecvRatePunishPeer bool `mapstructure:"tx-recv-rate-punish-peer"`

// TxEnqueueTimeout defines how long a new mempool transaction will wait when the internal
// processing queue is full (most likely due to busy CheckTx execution).
// Once the timeout is reached, the transaction will be silently dropped.
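
To make the new fields concrete, here is a minimal sketch (not part of the diff; the helper name mempoolLimiter and the package main wrapper are hypothetical) of how a zero value disables limiting while a positive value becomes a golang.org/x/time/rate token bucket. The burst multipliers mirror those applied in internal/p2p/channel_params.go below.

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// mempoolLimiter returns nil (limiter disabled) for a zero rate, and a
// token bucket whose burst is a multiple of the per-second rate otherwise.
func mempoolLimiter(txRate, burstMultiplier float64) *rate.Limiter {
	if txRate <= 0 {
		return nil
	}
	return rate.NewLimiter(rate.Limit(txRate), int(burstMultiplier*txRate))
}

func main() {
	send := mempoolLimiter(5, 5)  // tx-send-rate-limit = 5 -> 5 tx/s, burst 25
	recv := mempoolLimiter(5, 10) // tx-recv-rate-limit = 5 -> 5 tx/s, burst 50
	fmt.Println(send.Burst(), recv.Burst()) // prints: 25 50
}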
19 changes: 19 additions & 0 deletions config/toml.go
@@ -433,6 +433,25 @@ ttl-duration = "{{ .Mempool.TTLDuration }}"
# its insertion time into the mempool is beyond ttl-duration.
ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}
# tx-send-rate-limit is the rate limit for sending transactions to peers, in transactions per second.
# If zero, the rate limiter is disabled.
#
# Default: 0
tx-send-rate-limit = {{ .Mempool.TxSendRateLimit }}
# tx-recv-rate-limit is the rate limit for receiving transactions from peers, in transactions per second.
# If zero, the rate limiter is disabled.
#
# Default: 0
tx-recv-rate-limit = {{ .Mempool.TxRecvRateLimit }}
# tx-recv-rate-punish-peer set to true means that when tx-recv-rate-limit is reached, the peer will be punished
# (disconnected). If set to false, the peer will be throttled (messages will be dropped).
#
# Default: false
tx-recv-rate-punish-peer = {{ .Mempool.TxRecvRatePunishPeer }}
# tx-enqueue-timeout defines how long a new mempool transaction (received
# from other nodes) will wait when the internal processing queue is full
# (most likely due to busy CheckTx execution). Once the timeout is reached, the transaction
2 changes: 1 addition & 1 deletion go.mod
@@ -114,7 +114,6 @@ require (
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.tmz.dev/musttag v0.7.2 // indirect
golang.org/x/time v0.1.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
)

@@ -307,4 +306,5 @@ require (
github.com/tendermint/go-amino v0.16.0
github.com/tyler-smith/go-bip39 v1.1.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/time v0.1.0
)
139 changes: 118 additions & 21 deletions internal/p2p/channel.go
@@ -12,11 +12,17 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/rs/zerolog"
sync "github.com/sasha-s/go-deadlock"
"golang.org/x/time/rate"

log "github.com/dashpay/tenderdash/libs/log"
"github.com/dashpay/tenderdash/proto/tendermint/p2p"
"github.com/dashpay/tenderdash/types"
)

var (
ErrRecvRateLimitExceeded = errors.New("receive rate limit exceeded")
)

// Envelope contains a message with sender/receiver routing info.
type Envelope struct {
From types.NodeID // sender (empty if outbound)
@@ -117,7 +123,7 @@ type Channel interface {

Send(context.Context, Envelope) error
SendError(context.Context, PeerError) error
Receive(context.Context) *ChannelIterator
Receive(context.Context) ChannelIterator
}

// PeerError is a peer error reported via Channel.Error.
@@ -194,8 +200,8 @@ func (ch *legacyChannel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s

// Receive returns a new unbuffered iterator to receive messages from ch.
// The iterator runs until ctx ends.
func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator {
iter := &ChannelIterator{
func (ch *legacyChannel) Receive(ctx context.Context) ChannelIterator {
iter := &channelIterator{
pipe: make(chan Envelope), // unbuffered
}
go func(pipe chan<- Envelope) {
@@ -216,32 +222,38 @@ func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator {
return iter
}

// ChannelIterator provides a context-aware path for callers
// ChannelIterator is an iterator for receiving messages from a Channel.
type ChannelIterator interface {
// Next returns true when the Envelope value has advanced, and false
// when the context is canceled or iteration should stop. If an iterator has returned false,
// it will never return true again.
// In general, use Next, as in:
//
// for iter.Next(ctx) {
// envelope := iter.Envelope()
// // ... do things ...
// }
Next(ctx context.Context) bool
Envelope() *Envelope
}

// channelIterator provides a context-aware path for callers
// (reactors) to process messages from the P2P layer without relying
// on the implementation details of the P2P layer. Channel provides
// access to its Outbound stream as an iterator, and the
// MergedChannelIterator makes it possible to combine multiple
// channels into a single iterator.
type ChannelIterator struct {
type channelIterator struct {
pipe chan Envelope
current *Envelope
}

// NewChannelIterator returns a new instance of ChannelIterator
func NewChannelIterator(pipe chan Envelope) *ChannelIterator {
return &ChannelIterator{pipe: pipe}
// NewChannelIterator returns a new instance of channelIterator
func NewChannelIterator(pipe chan Envelope) ChannelIterator {
return &channelIterator{pipe: pipe}
}

// Next returns true when the Envelope value has advanced, and false
// when the context is canceled or iteration should stop. If an iterator has returned false,
// it will never return true again.
// in general, use Next, as in:
//
// for iter.Next(ctx) {
// envelope := iter.Envelope()
// // ... do things ...
// }
func (iter *ChannelIterator) Next(ctx context.Context) bool {
func (iter *channelIterator) Next(ctx context.Context) bool {
select {
case <-ctx.Done():
iter.current = nil
@@ -262,15 +274,15 @@ func (iter *ChannelIterator) Next(ctx context.Context) bool {
// iterator. When the last call to Next returned true, Envelope will
// return a non-nil object. If Next returned false then Envelope is
// always nil.
func (iter *ChannelIterator) Envelope() *Envelope { return iter.current }
func (iter *channelIterator) Envelope() *Envelope { return iter.current }

// MergedChannelIterator produces an iterator that merges the
// messages from the given channels in arbitrary order.
//
// This allows the caller to consume messages from multiple channels
// without needing to manage the concurrency separately.
func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator {
iter := &ChannelIterator{
func MergedChannelIterator(ctx context.Context, chs ...Channel) ChannelIterator {
iter := &channelIterator{
pipe: make(chan Envelope), // unbuffered
}
wg := new(sync.WaitGroup)
@@ -304,3 +316,88 @@ func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator

return iter
}

type throttledChannelIterator struct {
innerChan Channel
innerIter ChannelIterator
limiter *rate.Limiter
reportErr bool
logger log.Logger
}

// ThrottledChannelIterator wraps an existing channel iterator with a rate limiter.
//
// ## Arguments
// - ctx: the context in which the iterator will run
// - limiter: the rate limiter to use
// - innerIterator: the underlying iterator to use
// - reportError: if true, errors will be sent to the channel whenever the rate limit is exceeded; otherwise
// the messages will be dropped without error
// - innerChannel: the related channel; errors will be sent to this channel, and it is also used for logging
// - logger: the logger to use
func ThrottledChannelIterator(_ context.Context, limiter *rate.Limiter, innerIterator ChannelIterator,
reportError bool, innerChannel Channel, logger log.Logger) (ChannelIterator, error) {
if innerChannel == nil {
if reportError {
return nil, fmt.Errorf("inner channel is required to report errors")
}
} else {
logger = logger.With("channel", innerChannel)
}

throttledChannelIterator := &throttledChannelIterator{
innerChan: innerChannel,
innerIter: innerIterator,
limiter: limiter,
reportErr: reportError,
logger: logger,
}

return throttledChannelIterator, nil
}

func (tci *throttledChannelIterator) Next(ctx context.Context) bool {
if tci.innerIter == nil {
tci.logger.Error("inner channel iterator is nil", "channel", tci.innerChan)
return false
}

for {
if ctx.Err() != nil {
return false
}

if !tci.innerIter.Next(ctx) {
return false
}

// If the limiter allows the message to be sent, we break the loop
if tci.limiter.Allow() {
break
}
e := tci.innerIter.Envelope()
if tci.reportErr && e != nil {
msg := PeerError{
NodeID: e.From,
Err: ErrRecvRateLimitExceeded,
Fatal: true,
}
if err := tci.innerChan.SendError(ctx, msg); err != nil {
tci.logger.Error("error sending error message", "err", err, "msg", msg)
}
} else {
tci.logger.Trace("dropping message due to rate limit", "channel", tci.innerChan, "rate", tci.limiter.Limit())
}
}

return true
}

func (tci *throttledChannelIterator) Envelope() *Envelope {
if tci.innerIter == nil {
tci.logger.Error("inner channel iterator is nil", "channel", tci.innerChan)
return nil
}

return tci.innerIter.Envelope()
}
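
A hedged usage sketch of the new throttled iterator follows (the consuming function, its package, and the chosen limits are hypothetical; the internal packages are only importable from within tenderdash itself). With reportError set to true, a peer exceeding the limit receives a fatal PeerError carrying ErrRecvRateLimitExceeded and is disconnected.

package reactor

import (
	"context"

	"golang.org/x/time/rate"

	"github.com/dashpay/tenderdash/internal/p2p"
	log "github.com/dashpay/tenderdash/libs/log"
)

// consumeThrottled caps inbound processing at 100 envelopes/s with a burst
// of 200, wrapping the channel's own receive iterator.
func consumeThrottled(ctx context.Context, ch p2p.Channel, logger log.Logger) error {
	limiter := rate.NewLimiter(rate.Limit(100), 200)
	iter, err := p2p.ThrottledChannelIterator(ctx, limiter, ch.Receive(ctx), true, ch, logger)
	if err != nil {
		return err
	}
	for iter.Next(ctx) {
		envelope := iter.Envelope()
		_ = envelope // handle envelope.Message here
	}
	return nil
}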
6 changes: 6 additions & 0 deletions internal/p2p/channel_params.go
@@ -4,6 +4,7 @@ import (
"fmt"

"github.com/gogo/protobuf/proto"
"golang.org/x/time/rate"

"github.com/dashpay/tenderdash/config"
"github.com/dashpay/tenderdash/proto/tendermint/blocksync"
@@ -68,6 +69,11 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
RecvMessageCapacity: mempoolBatchSize(cfg.Mempool.MaxTxBytes),
RecvBufferCapacity: 128,
Name: "mempool",
SendRateLimit: rate.Limit(cfg.Mempool.TxSendRateLimit),
SendRateBurst: int(5 * cfg.Mempool.TxSendRateLimit),
RecvRateLimit: rate.Limit(cfg.Mempool.TxRecvRateLimit),
RecvRateBurst: int(10 * cfg.Mempool.TxRecvRateLimit), // twice as big as send, to avoid false punishment
RecvRateShouldErr: cfg.Mempool.TxRecvRatePunishPeer,
EnqueueTimeout: cfg.Mempool.TxEnqueueTimeout,
},
SnapshotChannel: {
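
Worked through for an assumed configuration of tx-send-rate-limit = 4 and tx-recv-rate-limit = 4, the burst arithmetic above yields:

package main

import "fmt"

func main() {
	txSendRateLimit, txRecvRateLimit := 4.0, 4.0
	sendBurst := int(5 * txSendRateLimit)  // 20: up to 20 txs sent back-to-back
	recvBurst := int(10 * txRecvRateLimit) // 40: twice the send burst, so a peer
	// flushing a full legitimate send burst is not falsely punished
	fmt.Println(sendBurst, recvBurst) // prints: 20 40
}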
2 changes: 1 addition & 1 deletion internal/p2p/client/chanstore.go
@@ -31,7 +31,7 @@ func newChanStore(descriptors map[p2p.ChannelID]*p2p.ChannelDescriptor, creator
return store
}

func (c *chanStore) iter(ctx context.Context, chanIDs ...p2p.ChannelID) (*p2p.ChannelIterator, error) {
func (c *chanStore) iter(ctx context.Context, chanIDs ...p2p.ChannelID) (p2p.ChannelIterator, error) {
chans := make([]p2p.Channel, 0, len(chanIDs))
for _, chanID := range chanIDs {
ch, err := c.get(ctx, chanID)
2 changes: 1 addition & 1 deletion internal/p2p/client/client.go
@@ -272,7 +272,7 @@ func (c *Client) Consume(ctx context.Context, params ConsumerParams) error {
return c.iter(ctx, iter, params.Handler)
}

func (c *Client) iter(ctx context.Context, iter *p2p.ChannelIterator, handler ConsumerHandler) error {
func (c *Client) iter(ctx context.Context, iter p2p.ChannelIterator, handler ConsumerHandler) error {
for iter.Next(ctx) {
envelope := iter.Envelope()
if isMessageResolvable(envelope.Message) {
6 changes: 3 additions & 3 deletions internal/p2p/client/client_test.go
@@ -185,7 +185,7 @@ func (suite *ChannelTestSuite) TestConsumeHandle() {
suite.p2pChannel.
On("Receive", ctx).
Once().
Return(func(ctx context.Context) *p2p.ChannelIterator {
Return(func(ctx context.Context) p2p.ChannelIterator {
return p2p.NewChannelIterator(outCh)
})
consumer := newMockConsumer(suite.T())
@@ -226,7 +226,7 @@ func (suite *ChannelTestSuite) TestConsumeResolve() {
suite.p2pChannel.
On("Receive", ctx).
Once().
Return(func(ctx context.Context) *p2p.ChannelIterator {
Return(func(ctx context.Context) p2p.ChannelIterator {
return p2p.NewChannelIterator(outCh)
})
resCh := suite.client.addPending(reqID)
@@ -278,7 +278,7 @@ func (suite *ChannelTestSuite) TestConsumeError() {
suite.p2pChannel.
On("Receive", ctx).
Once().
Return(func(ctx context.Context) *p2p.ChannelIterator {
Return(func(ctx context.Context) p2p.ChannelIterator {
return p2p.NewChannelIterator(outCh)
})
consumer := newMockConsumer(suite.T())
13 changes: 13 additions & 0 deletions internal/p2p/conn/connection.go
@@ -14,6 +14,7 @@ import (
"time"

sync "github.com/sasha-s/go-deadlock"
"golang.org/x/time/rate"

"github.com/gogo/protobuf/proto"

@@ -616,6 +617,18 @@ type ChannelDescriptor struct {
// RecvMessageCapacity defines the max message size for a given p2p Channel.
RecvMessageCapacity int

// SendRateLimit is used to limit the rate of sending messages, per second.
SendRateLimit rate.Limit
SendRateBurst int

// RecvRateLimit is used to limit the rate of receiving messages, per second.
RecvRateLimit rate.Limit
RecvRateBurst int
// RecvRateShouldErr is used to determine if the rate limiter should
// report an error whenever the receive rate limit is exceeded, most likely
// causing the peer to disconnect.
RecvRateShouldErr bool

// RecvBufferCapacity defines the max number of inbound messages for a
// given p2p Channel queue.
RecvBufferCapacity int
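
To pin down what these fields mean operationally, here is a small self-contained demonstration of the golang.org/x/time/rate token bucket behind them (the values are illustrative):

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// Equivalent to RecvRateLimit = 2 with RecvRateBurst = 4: tokens refill
	// at 2 per second, and at most 4 can be consumed at once.
	lim := rate.NewLimiter(rate.Limit(2), 4)
	for i := 0; i < 6; i++ {
		// The bucket starts full, so the first 4 calls return true; the
		// remaining calls return false until tokens refill.
		fmt.Println(i, lim.Allow())
	}
}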
8 changes: 4 additions & 4 deletions internal/p2p/mocks/channel.go

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions internal/p2p/router.go
@@ -263,6 +263,12 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Ch
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
channel := NewChannel(chDesc.ID, chDesc.Name, queue.dequeue(), outCh, errCh)
if chDesc.SendRateLimit > 0 || chDesc.RecvRateLimit > 0 {
channel = NewThrottledChannel(channel,
chDesc.SendRateLimit, chDesc.SendRateBurst,
chDesc.RecvRateLimit, chDesc.RecvRateBurst, chDesc.RecvRateShouldErr,
r.logger)
}

r.channelQueues[id] = queue

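
Putting the pieces together, a hedged sketch of how a caller might obtain a rate-limited channel (the function is hypothetical and written as if it lived inside the p2p package, where context and golang.org/x/time/rate are already imported; the limits are illustrative):

// openThrottledMempoolChannel opens a channel whose Send and Receive are
// throttled transparently; OpenChannel applies NewThrottledChannel because
// at least one of the limits below is positive.
func openThrottledMempoolChannel(ctx context.Context, r *Router) (Channel, error) {
	desc := &ChannelDescriptor{
		Name:              "mempool",
		SendRateLimit:     rate.Limit(5),
		SendRateBurst:     25,
		RecvRateLimit:     rate.Limit(5),
		RecvRateBurst:     50,
		RecvRateShouldErr: true, // disconnect peers exceeding the recv limit
	}
	return r.OpenChannel(ctx, desc)
}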