diff --git a/config/config.go b/config/config.go index b78636fa8..9ad5150f1 100644 --- a/config/config.go +++ b/config/config.go @@ -831,6 +831,24 @@ type MempoolConfig struct { // it's insertion time into the mempool is beyond TTLDuration. TTLNumBlocks int64 `mapstructure:"ttl-num-blocks"` + // TxSendRateLimit is the rate limit for sending transactions to peers, in + // transactions per second. If zero, the rate limiter is disabled. + // + // Default: 0 + TxSendRateLimit float64 `mapstructure:"tx-send-rate-limit"` + + // TxRecvRateLimit is the rate limit for receiving transactions from peers, in + // transactions per second. If zero, the rate limiter is disabled. + // + // Default: 0 + TxRecvRateLimit float64 `mapstructure:"tx-recv-rate-limit"` + + // TxRecvRatePunishPeer set to true means that when the rate limit set in TxRecvRateLimit is reached, the + // peer will be punished (disconnected). If set to false, the peer will be throttled (messages will be dropped). + // + // Default: false + TxRecvRatePunishPeer bool `mapstructure:"tx-recv-rate-punish-peer"` + // TxEnqueueTimeout defines how long new mempool transaction will wait when internal // processing queue is full (most likely due to busy CheckTx execution). // Once the timeout is reached, the transaction will be silently dropped. diff --git a/config/toml.go b/config/toml.go index c05ea6fb0..44f8b38ce 100644 --- a/config/toml.go +++ b/config/toml.go @@ -433,6 +433,25 @@ ttl-duration = "{{ .Mempool.TTLDuration }}" # it's insertion time into the mempool is beyond ttl-duration. ttl-num-blocks = {{ .Mempool.TTLNumBlocks }} + +# tx-send-rate-limit is the rate limit for sending transactions to peers, in transactions per second. +# If zero, the rate limiter is disabled. +# +# Default: 0 +tx-send-rate-limit = {{ .Mempool.TxSendRateLimit }} + +# tx-recv-rate-limit is the rate limit for receiving transactions from peers, in transactions per second. +# If zero, the rate limiter is disabled. +# +# Default: 0 +tx-recv-rate-limit = {{ .Mempool.TxRecvRateLimit }} + +# tx-recv-rate-punish-peer set to true means that when tx-recv-rate-limit is reached, the peer will be punished +# (disconnected). If set to false, the peer will be throttled (messages will be dropped). +# +# Default: false +tx-recv-rate-punish-peer = {{ .Mempool.TxRecvRatePunishPeer }} + # TxEnqueueTimeout defines how many nanoseconds new mempool transaction (received # from other nodes) will wait when internal processing queue is full # (most likely due to busy CheckTx execution).Once the timeout is reached, the transaction diff --git a/go.mod b/go.mod index c521ea9e8..0d8c3a5b9 100644 --- a/go.mod +++ b/go.mod @@ -114,7 +114,6 @@ require ( go.opentelemetry.io/otel v1.8.0 // indirect go.opentelemetry.io/otel/trace v1.8.0 // indirect go.tmz.dev/musttag v0.7.2 // indirect - golang.org/x/time v0.1.0 // indirect google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect ) @@ -307,4 +306,5 @@ require ( github.com/tendermint/go-amino v0.16.0 github.com/tyler-smith/go-bip39 v1.1.0 golang.org/x/exp v0.0.0-20240119083558-1b970713d09a + golang.org/x/time v0.1.0 ) diff --git a/internal/p2p/channel.go b/internal/p2p/channel.go index 9e8cb3283..c77b4f39f 100644 --- a/internal/p2p/channel.go +++ b/internal/p2p/channel.go @@ -12,11 +12,17 @@ import ( "github.com/gogo/protobuf/proto" "github.com/rs/zerolog" sync "github.com/sasha-s/go-deadlock" + "golang.org/x/time/rate" + log "github.com/dashpay/tenderdash/libs/log" "github.com/dashpay/tenderdash/proto/tendermint/p2p" "github.com/dashpay/tenderdash/types" ) +var ( + ErrRecvRateLimitExceeded = errors.New("receive rate limit exceeded") +) + // Envelope contains a message with sender/receiver routing info. type Envelope struct { From types.NodeID // sender (empty if outbound) @@ -117,7 +123,7 @@ type Channel interface { Send(context.Context, Envelope) error SendError(context.Context, PeerError) error - Receive(context.Context) *ChannelIterator + Receive(context.Context) ChannelIterator } // PeerError is a peer error reported via Channel.Error. @@ -194,8 +200,8 @@ func (ch *legacyChannel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s // Receive returns a new unbuffered iterator to receive messages from ch. // The iterator runs until ctx ends. -func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator { - iter := &ChannelIterator{ +func (ch *legacyChannel) Receive(ctx context.Context) ChannelIterator { + iter := &channelIterator{ pipe: make(chan Envelope), // unbuffered } go func(pipe chan<- Envelope) { @@ -216,32 +222,38 @@ func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator { return iter } -// ChannelIterator provides a context-aware path for callers +// ChannelIterator is an iterator for receiving messages from a Channel. +type ChannelIterator interface { + // Next returns true when the Envelope value has advanced, and false + // when the context is canceled or iteration should stop. If an iterator has returned false, + // it will never return true again. + // in general, use Next, as in: + // + // for iter.Next(ctx) { + // envelope := iter.Envelope() + // // ... do things ... + // } + Next(ctx context.Context) bool + Envelope() *Envelope +} + +// channelIterator provides a context-aware path for callers // (reactors) to process messages from the P2P layer without relying // on the implementation details of the P2P layer. Channel provides // access to it's Outbound stream as an iterator, and the // MergedChannelIterator makes it possible to combine multiple // channels into a single iterator. -type ChannelIterator struct { +type channelIterator struct { pipe chan Envelope current *Envelope } -// NewChannelIterator returns a new instance of ChannelIterator -func NewChannelIterator(pipe chan Envelope) *ChannelIterator { - return &ChannelIterator{pipe: pipe} +// NewChannelIterator returns a new instance of channelIterator +func NewChannelIterator(pipe chan Envelope) ChannelIterator { + return &channelIterator{pipe: pipe} } -// Next returns true when the Envelope value has advanced, and false -// when the context is canceled or iteration should stop. If an iterator has returned false, -// it will never return true again. -// in general, use Next, as in: -// -// for iter.Next(ctx) { -// envelope := iter.Envelope() -// // ... do things ... -// } -func (iter *ChannelIterator) Next(ctx context.Context) bool { +func (iter *channelIterator) Next(ctx context.Context) bool { select { case <-ctx.Done(): iter.current = nil @@ -262,15 +274,15 @@ func (iter *ChannelIterator) Next(ctx context.Context) bool { // iterator. When the last call to Next returned true, Envelope will // return a non-nil object. If Next returned false then Envelope is // always nil. -func (iter *ChannelIterator) Envelope() *Envelope { return iter.current } +func (iter *channelIterator) Envelope() *Envelope { return iter.current } // MergedChannelIterator produces an iterator that merges the // messages from the given channels in arbitrary order. // // This allows the caller to consume messages from multiple channels // without needing to manage the concurrency separately. -func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator { - iter := &ChannelIterator{ +func MergedChannelIterator(ctx context.Context, chs ...Channel) ChannelIterator { + iter := &channelIterator{ pipe: make(chan Envelope), // unbuffered } wg := new(sync.WaitGroup) @@ -304,3 +316,88 @@ func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator return iter } + +type throttledChannelIterator struct { + innerChan Channel + innerIter ChannelIterator + limiter *rate.Limiter + reportErr bool + logger log.Logger +} + +// ThrottledChannelIterator wraps an existing channel iterator with a rate limiter. +// +// ## Arguments +// - ctx: the context in which the iterator will run +// - limiter: the rate limiter to use +// - innerIterator: the underlying iterator to use +// - reportError: if true, errors will be sent to the channel whenever the rate limit is exceeded; otherwise +// the messages will be dropped without error +// - innerChannel: the channel related; errors will be sent to this channel, also used for logging +// - logger: the logger to use +func ThrottledChannelIterator(_ context.Context, limiter *rate.Limiter, innerIterator ChannelIterator, + reportError bool, innerChannel Channel, logger log.Logger) (ChannelIterator, error) { + if innerChannel == nil { + if reportError { + return nil, fmt.Errorf("inner channel is required to report errors") + } + } else { + logger = logger.With("channel", innerChannel) + } + + throttledChannelIterator := &throttledChannelIterator{ + innerChan: innerChannel, + innerIter: innerIterator, + limiter: limiter, + reportErr: reportError, + logger: logger, + } + + return throttledChannelIterator, nil +} + +func (tci *throttledChannelIterator) Next(ctx context.Context) bool { + if tci.innerIter == nil { + tci.logger.Error("inner channel iterator is nil", "channel", tci.innerChan) + return false + } + + for { + if ctx.Err() != nil { + return false + } + + if !tci.innerIter.Next(ctx) { + return false + } + + // If the limiter allows the message to be sent, we break the loop + if tci.limiter.Allow() { + break + } + e := tci.innerIter.Envelope() + if tci.reportErr && e != nil { + msg := PeerError{ + NodeID: e.From, + Err: ErrRecvRateLimitExceeded, + Fatal: true, + } + if err := tci.innerChan.SendError(ctx, msg); err != nil { + tci.logger.Error("error sending error message", "err", err, "msg", msg) + } + } else { + tci.logger.Trace("dropping message due to rate limit", "channel", tci.innerChan, "rate", tci.limiter.Limit()) + } + } + + return true +} + +func (tci *throttledChannelIterator) Envelope() *Envelope { + if tci.innerIter == nil { + tci.logger.Error("inner channel iterator is nil", "channel", tci.innerChan) + return nil + } + + return tci.innerIter.Envelope() +} diff --git a/internal/p2p/channel_params.go b/internal/p2p/channel_params.go index 62c168b2a..14b3676b1 100644 --- a/internal/p2p/channel_params.go +++ b/internal/p2p/channel_params.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/gogo/protobuf/proto" + "golang.org/x/time/rate" "github.com/dashpay/tenderdash/config" "github.com/dashpay/tenderdash/proto/tendermint/blocksync" @@ -68,6 +69,11 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { RecvMessageCapacity: mempoolBatchSize(cfg.Mempool.MaxTxBytes), RecvBufferCapacity: 128, Name: "mempool", + SendRateLimit: rate.Limit(cfg.Mempool.TxSendRateLimit), + SendRateBurst: int(5 * cfg.Mempool.TxSendRateLimit), + RecvRateLimit: rate.Limit(cfg.Mempool.TxRecvRateLimit), + RecvRateBurst: int(10 * cfg.Mempool.TxRecvRateLimit), // twice as big as send, to avoid false punishment + RecvRateShouldErr: cfg.Mempool.TxRecvRatePunishPeer, EnqueueTimeout: cfg.Mempool.TxEnqueueTimeout, }, SnapshotChannel: { diff --git a/internal/p2p/client/chanstore.go b/internal/p2p/client/chanstore.go index e49c3994f..e25858f6a 100644 --- a/internal/p2p/client/chanstore.go +++ b/internal/p2p/client/chanstore.go @@ -31,7 +31,7 @@ func newChanStore(descriptors map[p2p.ChannelID]*p2p.ChannelDescriptor, creator return store } -func (c *chanStore) iter(ctx context.Context, chanIDs ...p2p.ChannelID) (*p2p.ChannelIterator, error) { +func (c *chanStore) iter(ctx context.Context, chanIDs ...p2p.ChannelID) (p2p.ChannelIterator, error) { chans := make([]p2p.Channel, 0, len(chanIDs)) for _, chanID := range chanIDs { ch, err := c.get(ctx, chanID) diff --git a/internal/p2p/client/client.go b/internal/p2p/client/client.go index 3ea400909..8aaa4206b 100644 --- a/internal/p2p/client/client.go +++ b/internal/p2p/client/client.go @@ -272,7 +272,7 @@ func (c *Client) Consume(ctx context.Context, params ConsumerParams) error { return c.iter(ctx, iter, params.Handler) } -func (c *Client) iter(ctx context.Context, iter *p2p.ChannelIterator, handler ConsumerHandler) error { +func (c *Client) iter(ctx context.Context, iter p2p.ChannelIterator, handler ConsumerHandler) error { for iter.Next(ctx) { envelope := iter.Envelope() if isMessageResolvable(envelope.Message) { diff --git a/internal/p2p/client/client_test.go b/internal/p2p/client/client_test.go index 5a02123db..50a88c4bb 100644 --- a/internal/p2p/client/client_test.go +++ b/internal/p2p/client/client_test.go @@ -185,7 +185,7 @@ func (suite *ChannelTestSuite) TestConsumeHandle() { suite.p2pChannel. On("Receive", ctx). Once(). - Return(func(ctx context.Context) *p2p.ChannelIterator { + Return(func(ctx context.Context) p2p.ChannelIterator { return p2p.NewChannelIterator(outCh) }) consumer := newMockConsumer(suite.T()) @@ -226,7 +226,7 @@ func (suite *ChannelTestSuite) TestConsumeResolve() { suite.p2pChannel. On("Receive", ctx). Once(). - Return(func(ctx context.Context) *p2p.ChannelIterator { + Return(func(ctx context.Context) p2p.ChannelIterator { return p2p.NewChannelIterator(outCh) }) resCh := suite.client.addPending(reqID) @@ -278,7 +278,7 @@ func (suite *ChannelTestSuite) TestConsumeError() { suite.p2pChannel. On("Receive", ctx). Once(). - Return(func(ctx context.Context) *p2p.ChannelIterator { + Return(func(ctx context.Context) p2p.ChannelIterator { return p2p.NewChannelIterator(outCh) }) consumer := newMockConsumer(suite.T()) diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 6602781fe..58fcb1a5f 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -14,6 +14,7 @@ import ( "time" sync "github.com/sasha-s/go-deadlock" + "golang.org/x/time/rate" "github.com/gogo/protobuf/proto" @@ -616,6 +617,18 @@ type ChannelDescriptor struct { // RecvMessageCapacity defines the max message size for a given p2p Channel. RecvMessageCapacity int + /// SendRateLimit is used to limit the rate of sending messages, per second. + SendRateLimit rate.Limit + SendRateBurst int + + /// RecvRateLimit is used to limit the rate of receiving messages, per second. + RecvRateLimit rate.Limit + RecvRateBurst int + // RecvRateShouldErr is used to determine if the rate limiter should + // report an error whenever recv rate limit is exceeded, most likely + // causing the peer to disconnect. + RecvRateShouldErr bool + // RecvBufferCapacity defines the max number of inbound messages for a // given p2p Channel queue. RecvBufferCapacity int diff --git a/internal/p2p/mocks/channel.go b/internal/p2p/mocks/channel.go index b24f2d48a..ae54580e6 100644 --- a/internal/p2p/mocks/channel.go +++ b/internal/p2p/mocks/channel.go @@ -33,19 +33,19 @@ func (_m *Channel) Err() error { } // Receive provides a mock function with given fields: _a0 -func (_m *Channel) Receive(_a0 context.Context) *p2p.ChannelIterator { +func (_m *Channel) Receive(_a0 context.Context) p2p.ChannelIterator { ret := _m.Called(_a0) if len(ret) == 0 { panic("no return value specified for Receive") } - var r0 *p2p.ChannelIterator - if rf, ok := ret.Get(0).(func(context.Context) *p2p.ChannelIterator); ok { + var r0 p2p.ChannelIterator + if rf, ok := ret.Get(0).(func(context.Context) p2p.ChannelIterator); ok { r0 = rf(_a0) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*p2p.ChannelIterator) + r0 = ret.Get(0).(p2p.ChannelIterator) } } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 31830139b..43bdde816 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -263,6 +263,12 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Ch outCh := make(chan Envelope, chDesc.RecvBufferCapacity) errCh := make(chan PeerError, chDesc.RecvBufferCapacity) channel := NewChannel(chDesc.ID, chDesc.Name, queue.dequeue(), outCh, errCh) + if chDesc.SendRateLimit > 0 || chDesc.RecvRateLimit > 0 { + channel = NewThrottledChannel(channel, + chDesc.SendRateLimit, chDesc.SendRateBurst, + chDesc.RecvRateLimit, chDesc.RecvRateBurst, chDesc.RecvRateShouldErr, + r.logger) + } r.channelQueues[id] = queue diff --git a/internal/p2p/throttled_channel.go b/internal/p2p/throttled_channel.go new file mode 100644 index 000000000..f82f0374b --- /dev/null +++ b/internal/p2p/throttled_channel.go @@ -0,0 +1,91 @@ +package p2p + +import ( + "context" + + "golang.org/x/time/rate" + + "github.com/dashpay/tenderdash/libs/log" +) + +// / Channel that will block if the send limit is reached +type ThrottledChannel struct { + channel Channel + sendLimiter *rate.Limiter + recvLimiter *rate.Limiter + recvShouldErr bool + + logger log.Logger +} + +// NewThrottledChannel creates a new throttled channel. +// The rate is specified in messages per second. +// The burst is specified in messages. +func NewThrottledChannel(channel Channel, sendLimit rate.Limit, sendBurst int, + recvLimit rate.Limit, recvBurst int, recvShouldErr bool, + logger log.Logger) *ThrottledChannel { + + var ( + sendLimiter *rate.Limiter + recvLimiter *rate.Limiter + ) + if sendLimit > 0 { + sendLimiter = rate.NewLimiter(sendLimit, sendBurst) + } + if recvLimit > 0 { + recvLimiter = rate.NewLimiter(recvLimit, recvBurst) + } + + return &ThrottledChannel{ + channel: channel, + sendLimiter: sendLimiter, + recvLimiter: recvLimiter, + recvShouldErr: recvShouldErr, + logger: logger, + } +} + +var _ Channel = (*ThrottledChannel)(nil) + +func (ch *ThrottledChannel) Send(ctx context.Context, envelope Envelope) error { + // Wait until limiter allows us to proceed. + if ch.sendLimiter != nil { + if err := ch.sendLimiter.Wait(ctx); err != nil { + return err + } + } + + return ch.channel.Send(ctx, envelope) +} + +func (ch *ThrottledChannel) SendError(ctx context.Context, pe PeerError) error { + // Wait until limiter allows us to proceed. + if err := ch.sendLimiter.Wait(ctx); err != nil { + return err + } + + return ch.channel.SendError(ctx, pe) +} + +func (ch *ThrottledChannel) Receive(ctx context.Context) ChannelIterator { + if ch.recvLimiter == nil { + return ch.channel.Receive(ctx) + } + + innerIter := ch.channel.Receive(ctx) + iter, err := ThrottledChannelIterator(ctx, ch.recvLimiter, innerIter, ch.recvShouldErr, ch.channel, ch.logger) + if err != nil { + ch.logger.Error("error creating ThrottledChannelIterator", "err", err) + return nil + } + + return iter +} + +func (ch *ThrottledChannel) Err() error { + return ch.channel.Err() +} + +func (ch *ThrottledChannel) String() string { + return ch.channel.String() +} diff --git a/internal/p2p/throttled_channel_test.go b/internal/p2p/throttled_channel_test.go new file mode 100644 index 000000000..4571d89f1 --- /dev/null +++ b/internal/p2p/throttled_channel_test.go @@ -0,0 +1,189 @@ +package p2p_test + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + + "github.com/dashpay/tenderdash/internal/p2p" + "github.com/dashpay/tenderdash/libs/log" +) + +type mockChannel struct { + sent atomic.Uint32 + received atomic.Uint32 + errored atomic.Uint32 +} + +func (c *mockChannel) SendCount() int { + return int(c.sent.Load()) +} + +func (c *mockChannel) RecvCount() int { + return int(c.received.Load()) +} + +func (c *mockChannel) Send(_ context.Context, _e p2p.Envelope) error { + c.sent.Add(1) + return nil +} + +func (c *mockChannel) SendError(_ context.Context, _ p2p.PeerError) error { + c.errored.Add(1) + return nil +} + +func (c *mockChannel) Receive(ctx context.Context) p2p.ChannelIterator { + var pipe = make(chan p2p.Envelope, 5) + + go func() { + for { + select { + case pipe <- p2p.Envelope{}: + c.received.Add(1) + case <-ctx.Done(): + close(pipe) + return + } + } + }() + + return p2p.NewChannelIterator(pipe) +} + +func (c *mockChannel) Err() error { + if e := c.errored.Load(); e > 0 { + return fmt.Errorf("mock_channel_error: error count: %d", e) + } + return nil +} + +func (c *mockChannel) String() string { + return "mock_channel" +} + +func TestThrottledChannelSend(t *testing.T) { + const n = 31 + const rate rate.Limit = 10 + const burst = int(rate) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + logger := log.NewTestingLogger(t) + + mock := &mockChannel{} + + ch := p2p.NewThrottledChannel(mock, rate, burst, 0, 0, false, logger) // 1 message per second + + wg := sync.WaitGroup{} + start := time.Now() + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + err := ch.Send(ctx, p2p.Envelope{}) + require.NoError(t, err) + wg.Done() + }() + } + + time.Sleep(time.Second) + assert.LessOrEqual(t, mock.SendCount(), burst+int(rate)) + + // Wait until all finish + wg.Wait() + took := time.Since(start) + assert.Equal(t, n, mock.SendCount()) + assert.GreaterOrEqual(t, took.Seconds(), 2.0) +} + +// Given some thrrottled channel that generates messages all the time, we should error out after receiving a rate +// of 10 messages per second. +func TestThrottledChannelRecvError(t *testing.T) { + const rate rate.Limit = 10 + const burst = int(rate) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + logger := log.NewTestingLogger(t) + + mock := &mockChannel{} + + ch := p2p.NewThrottledChannel(mock, rate, burst, rate, burst, true, logger) // 1 message per second + + start := time.Now() + + assert.NoError(t, mock.Err()) + + iter := ch.Receive(ctx) + + for i := 0; i < burst+int(rate)+1; i++ { + assert.True(t, iter.Next(ctx)) + + e := iter.Envelope() + if e == nil { + t.Error("nil envelope") + } + } + + err := mock.Err() + t.Logf("expected mock error: %v", err) + assert.Error(t, mock.Err()) + + // Wait until all finish + cancel() + + took := time.Since(start) + assert.GreaterOrEqual(t, took.Seconds(), 1.0) +} + +// Given some thrrottled channel that generates messages all the time, we should be able to receive them at a rate +// of 10 messages per second. +func TestThrottledChannelRecv(t *testing.T) { + const rate rate.Limit = 10 + const burst = int(rate) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + logger := log.NewTestingLogger(t) + + mock := &mockChannel{} + + ch := p2p.NewThrottledChannel(mock, rate, burst, rate, burst, false, logger) // 1 message per second + + start := time.Now() + count := 0 + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + iter := ch.Receive(ctx) + for iter.Next(ctx) { + e := iter.Envelope() + if e == nil { + t.Error("nil envelope") + } + count++ + } + wg.Done() + }() + + time.Sleep(time.Second) + assert.LessOrEqual(t, mock.SendCount(), burst+int(rate)) + + // Wait until all finish + cancel() + wg.Wait() + + took := time.Since(start) + assert.Greater(t, mock.RecvCount(), count*100, "we should generate much more messages than we can receive") + assert.GreaterOrEqual(t, took.Seconds(), 1.0) +}