diff --git a/core/scripts/go.mod b/core/scripts/go.mod index d9f2cc7c9c3..175859f91eb 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -98,6 +98,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect + github.com/esote/minmaxheap v1.0.0 // indirect github.com/fatih/color v1.15.0 // indirect github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 26a31aafde8..ef0ba7b8ef2 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -320,6 +320,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= +github.com/esote/minmaxheap v1.0.0 h1:rgA7StnXXpZG6qlM0S7pUmEv1KpWe32rYT4x8J8ntaA= +github.com/esote/minmaxheap v1.0.0/go.mod h1:Ln8+i7fS1k3PLgZI2JAo0iA1as95QnIYiGCrqSJ5FZk= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/ethereum/go-ethereum v1.12.0 h1:bdnhLPtqETd4m3mS8BGMNvBTf36bO5bx/hxE2zljOa0= github.com/ethereum/go-ethereum v1.12.0/go.mod h1:/oo2X/dZLJjf2mJ6YT9wcWxa4nNJDBKDBU6sFIpx1Gs= diff --git a/core/services/relay/evm/mercury/queue.go b/core/services/relay/evm/mercury/queue.go index 743a6553dff..44042c57725 100644 --- a/core/services/relay/evm/mercury/queue.go +++ b/core/services/relay/evm/mercury/queue.go @@ -1,13 +1,13 @@ package mercury import ( - "container/heap" "context" "errors" "fmt" "sync" "time" + heap "github.com/esote/minmaxheap" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -59,11 +59,6 @@ type TransmitQueue struct { type Transmission struct { Req *pb.TransmitRequest // the payload to transmit ReportCtx ocrtypes.ReportContext // contains priority information (latest epoch/round wins) - - // The index is needed by update and is maintained by the heap.Interface - // methods - // It should NOT be set manually - index int // the index of the item in the heap } // maxlen controls how many items will be stored in the queue @@ -97,13 +92,13 @@ func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.Report if tq.maxlen != 0 && tq.pq.Len() == tq.maxlen { // evict oldest entry to make room tq.lggr.Criticalf("Transmit queue is full; dropping oldest transmission (reached max length of %d)", tq.maxlen) - removed := heap.Remove(tq.pq, tq.pq.Len()-1) + removed := heap.PopMax(tq.pq) if transmission, ok := removed.(*Transmission); ok { tq.asyncDeleter.AsyncDelete(transmission.Req) } } - heap.Push(tq.pq, &Transmission{req, reportCtx, -1}) + heap.Push(tq.pq, &Transmission{req, reportCtx}) tq.cond.Signal() return true @@ -231,6 +226,10 @@ func (pq priorityQueue) Less(i, j int) bool { pq[i].ReportCtx.ReportTimestamp.Round > pq[j].ReportCtx.ReportTimestamp.Round } +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] +} + func (pq *priorityQueue) Pop() any { n := len(*pq) if n == 0 { @@ -238,21 +237,11 @@ func (pq *priorityQueue) Pop() any { } old := *pq item := old[n-1] - old[n-1] = nil // avoid memory leak - item.index = -1 // for safety + old[n-1] = nil // avoid memory leak *pq = old[0 : n-1] return item } func (pq *priorityQueue) Push(x any) { - n := len(*pq) - item := x.(*Transmission) - item.index = n - *pq = append(*pq, item) -} - -func (pq priorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] - pq[i].index = i - pq[j].index = j + *pq = append(*pq, x.(*Transmission)) } diff --git a/core/services/relay/evm/mercury/queue_test.go b/core/services/relay/evm/mercury/queue_test.go index 9fa2520147d..de2f64f9fe9 100644 --- a/core/services/relay/evm/mercury/queue_test.go +++ b/core/services/relay/evm/mercury/queue_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -81,7 +80,7 @@ func Test_Queue(t *testing.T) { }) t.Run("transmit queue is more than 50% full", func(t *testing.T) { - transmitQueue.Push(testTransmissions[2].tr, testTransmissions[0].ctx) + transmitQueue.Push(testTransmissions[2].tr, testTransmissions[2].ctx) report := transmitQueue.HealthReport() assert.Equal(t, report[transmitQueue.Name()].Error(), "transmit priority queue is greater than 50% full (4/7)") }) @@ -92,21 +91,18 @@ func Test_Queue(t *testing.T) { }) t.Run("transmit queue is full and evicts the oldest transmission", func(t *testing.T) { - // There is a bug with queue eviction where the evicted transmission is NOT the oldest. - // TODO: MERC-1049 Once this bug is fixed, replace the expectation with the one below. - // deleter.On("AsyncDelete", testTransmissions[0].tr).Once() - deleter.On("AsyncDelete", mock.Anything).Once() + deleter.On("AsyncDelete", testTransmissions[0].tr).Once() - // add 5 more transmissions to overflow the queue + // add 5 more transmissions to overflow the queue by 1 for i := 0; i < 5; i++ { transmitQueue.Push(testTransmissions[1].tr, testTransmissions[1].ctx) } + // expecting testTransmissions[0] to get evicted and not present in the queue anymore testutils.WaitForLogMessage(t, observedLogs, "Transmit queue is full; dropping oldest transmission (reached max length of 7)") for i := 0; i < 7; i++ { tr := transmitQueue.BlockingPop() - // TODO: Should be tr.Req instead of tr. - assert.NotEqual(t, tr, testTransmissions[0].tr) + assert.NotEqual(t, tr.Req, testTransmissions[0].tr) } }) diff --git a/go.mod b/go.mod index 7d5b1757992..c0e7c4f1376 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cometbft/cometbft v0.37.2 github.com/cosmos/cosmos-sdk v0.47.4 github.com/danielkov/gin-helmet v0.0.0-20171108135313-1387e224435e + github.com/esote/minmaxheap v1.0.0 github.com/ethereum/go-ethereum v1.12.0 github.com/fatih/color v1.15.0 github.com/fxamacker/cbor/v2 v2.4.0 diff --git a/go.sum b/go.sum index a0243097bc0..d277e898692 100644 --- a/go.sum +++ b/go.sum @@ -314,6 +314,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= +github.com/esote/minmaxheap v1.0.0 h1:rgA7StnXXpZG6qlM0S7pUmEv1KpWe32rYT4x8J8ntaA= +github.com/esote/minmaxheap v1.0.0/go.mod h1:Ln8+i7fS1k3PLgZI2JAo0iA1as95QnIYiGCrqSJ5FZk= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/ethereum/go-ethereum v1.12.0 h1:bdnhLPtqETd4m3mS8BGMNvBTf36bO5bx/hxE2zljOa0= github.com/ethereum/go-ethereum v1.12.0/go.mod h1:/oo2X/dZLJjf2mJ6YT9wcWxa4nNJDBKDBU6sFIpx1Gs= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index d3a8b5a19cb..60ba06590fa 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -131,6 +131,7 @@ require ( github.com/dvsekhvalnov/jose2go v1.5.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/emicklei/go-restful/v3 v3.10.1 // indirect + github.com/esote/minmaxheap v1.0.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 5e77b2bddcc..d24961515fb 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -896,6 +896,8 @@ github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= +github.com/esote/minmaxheap v1.0.0 h1:rgA7StnXXpZG6qlM0S7pUmEv1KpWe32rYT4x8J8ntaA= +github.com/esote/minmaxheap v1.0.0/go.mod h1:Ln8+i7fS1k3PLgZI2JAo0iA1as95QnIYiGCrqSJ5FZk= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/ethereum/go-ethereum v1.12.0 h1:bdnhLPtqETd4m3mS8BGMNvBTf36bO5bx/hxE2zljOa0= github.com/ethereum/go-ethereum v1.12.0/go.mod h1:/oo2X/dZLJjf2mJ6YT9wcWxa4nNJDBKDBU6sFIpx1Gs=