Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using minmax heap for Mercury transmission queue to evict oldest transmission correctly #10427

Merged
merged 6 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/esote/minmaxheap v1.0.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA=
github.com/esote/minmaxheap v1.0.0 h1:rgA7StnXXpZG6qlM0S7pUmEv1KpWe32rYT4x8J8ntaA=
github.com/esote/minmaxheap v1.0.0/go.mod h1:Ln8+i7fS1k3PLgZI2JAo0iA1as95QnIYiGCrqSJ5FZk=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/ethereum/go-ethereum v1.12.0 h1:bdnhLPtqETd4m3mS8BGMNvBTf36bO5bx/hxE2zljOa0=
github.com/ethereum/go-ethereum v1.12.0/go.mod h1:/oo2X/dZLJjf2mJ6YT9wcWxa4nNJDBKDBU6sFIpx1Gs=
Expand Down
29 changes: 9 additions & 20 deletions core/services/relay/evm/mercury/queue.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package mercury

import (
"container/heap"
"context"
"errors"
"fmt"
"sync"
"time"

heap "github.com/esote/minmaxheap"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand Down Expand Up @@ -59,11 +59,6 @@ type TransmitQueue struct {
type Transmission struct {
Req *pb.TransmitRequest // the payload to transmit
ReportCtx ocrtypes.ReportContext // contains priority information (latest epoch/round wins)

// The index is needed by update and is maintained by the heap.Interface
// methods
// It should NOT be set manually
index int // the index of the item in the heap
}

// maxlen controls how many items will be stored in the queue
Expand Down Expand Up @@ -97,13 +92,13 @@ func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.Report
if tq.maxlen != 0 && tq.pq.Len() == tq.maxlen {
// evict oldest entry to make room
tq.lggr.Criticalf("Transmit queue is full; dropping oldest transmission (reached max length of %d)", tq.maxlen)
removed := heap.Remove(tq.pq, tq.pq.Len()-1)
removed := heap.PopMax(tq.pq)
if transmission, ok := removed.(*Transmission); ok {
tq.asyncDeleter.AsyncDelete(transmission.Req)
}
}

heap.Push(tq.pq, &Transmission{req, reportCtx, -1})
heap.Push(tq.pq, &Transmission{req, reportCtx})
tq.cond.Signal()

return true
Expand Down Expand Up @@ -231,28 +226,22 @@ func (pq priorityQueue) Less(i, j int) bool {
pq[i].ReportCtx.ReportTimestamp.Round > pq[j].ReportCtx.ReportTimestamp.Round
}

func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}

func (pq *priorityQueue) Pop() any {
n := len(*pq)
if n == 0 {
return nil
}
old := *pq
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
old[n-1] = nil // avoid memory leak
*pq = old[0 : n-1]
return item
}

func (pq *priorityQueue) Push(x any) {
n := len(*pq)
item := x.(*Transmission)
item.index = n
*pq = append(*pq, item)
}

func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
*pq = append(*pq, x.(*Transmission))
}
14 changes: 5 additions & 9 deletions core/services/relay/evm/mercury/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -81,7 +80,7 @@ func Test_Queue(t *testing.T) {
})

t.Run("transmit queue is more than 50% full", func(t *testing.T) {
transmitQueue.Push(testTransmissions[2].tr, testTransmissions[0].ctx)
transmitQueue.Push(testTransmissions[2].tr, testTransmissions[2].ctx)
report := transmitQueue.HealthReport()
assert.Equal(t, report[transmitQueue.Name()].Error(), "transmit priority queue is greater than 50% full (4/7)")
})
Expand All @@ -92,21 +91,18 @@ func Test_Queue(t *testing.T) {
})

t.Run("transmit queue is full and evicts the oldest transmission", func(t *testing.T) {
// There is a bug with queue eviction where the evicted transmission is NOT the oldest.
// TODO: MERC-1049 Once this bug is fixed, replace the expectation with the one below.
// deleter.On("AsyncDelete", testTransmissions[0].tr).Once()
deleter.On("AsyncDelete", mock.Anything).Once()
deleter.On("AsyncDelete", testTransmissions[0].tr).Once()

// add 5 more transmissions to overflow the queue
// add 5 more transmissions to overflow the queue by 1
for i := 0; i < 5; i++ {
transmitQueue.Push(testTransmissions[1].tr, testTransmissions[1].ctx)
}

// expecting testTransmissions[0] to get evicted and not present in the queue anymore
testutils.WaitForLogMessage(t, observedLogs, "Transmit queue is full; dropping oldest transmission (reached max length of 7)")
for i := 0; i < 7; i++ {
tr := transmitQueue.BlockingPop()
// TODO: Should be tr.Req instead of tr.
assert.NotEqual(t, tr, testTransmissions[0].tr)
assert.NotEqual(t, tr.Req, testTransmissions[0].tr)
}
})

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/cometbft/cometbft v0.37.2
github.com/cosmos/cosmos-sdk v0.47.4
github.com/danielkov/gin-helmet v0.0.0-20171108135313-1387e224435e
github.com/esote/minmaxheap v1.0.0
github.com/ethereum/go-ethereum v1.12.0
github.com/fatih/color v1.15.0
github.com/fxamacker/cbor/v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA=
github.com/esote/minmaxheap v1.0.0 h1:rgA7StnXXpZG6qlM0S7pUmEv1KpWe32rYT4x8J8ntaA=
github.com/esote/minmaxheap v1.0.0/go.mod h1:Ln8+i7fS1k3PLgZI2JAo0iA1as95QnIYiGCrqSJ5FZk=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/ethereum/go-ethereum v1.12.0 h1:bdnhLPtqETd4m3mS8BGMNvBTf36bO5bx/hxE2zljOa0=
github.com/ethereum/go-ethereum v1.12.0/go.mod h1:/oo2X/dZLJjf2mJ6YT9wcWxa4nNJDBKDBU6sFIpx1Gs=
Expand Down
1 change: 1 addition & 0 deletions integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ require (
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.10.1 // indirect
github.com/esote/minmaxheap v1.0.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,8 @@ github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J
github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w=
github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss=
github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8=
github.com/esote/minmaxheap v1.0.0 h1:rgA7StnXXpZG6qlM0S7pUmEv1KpWe32rYT4x8J8ntaA=
github.com/esote/minmaxheap v1.0.0/go.mod h1:Ln8+i7fS1k3PLgZI2JAo0iA1as95QnIYiGCrqSJ5FZk=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/ethereum/go-ethereum v1.12.0 h1:bdnhLPtqETd4m3mS8BGMNvBTf36bO5bx/hxE2zljOa0=
github.com/ethereum/go-ethereum v1.12.0/go.mod h1:/oo2X/dZLJjf2mJ6YT9wcWxa4nNJDBKDBU6sFIpx1Gs=
Expand Down
Loading