TXM In-memory: add priority queue: STEP 1 #12121

Closed · wants to merge 25 commits into develop from jtw/step-1-in-memory-work

Changes from 1 commit (25 commits total):
c0cfa05  add priority queue (poopoothegorilla, Feb 21, 2024)
d54cd8d  Merge branch 'develop' into jtw/step-1-in-memory-work (poopoothegorilla, Feb 29, 2024)
a516d5e  move RWMutex to PriorityQueue (poopoothegorilla, Feb 29, 2024)
f6d3d0d  Merge branch 'develop' into jtw/step-1-in-memory-work (poopoothegorilla, Mar 6, 2024)
51d413b  unexport and move priority queue work (poopoothegorilla, Mar 6, 2024)
24aa17f  rename parameter from maxUnstarted to capacity (poopoothegorilla, Mar 6, 2024)
fd6d36b  apply initial recommendations (poopoothegorilla, Mar 6, 2024)
40f434a  implement all unit tests for TxPriorityQueue (poopoothegorilla, Mar 6, 2024)
c9865ab  fix test name (poopoothegorilla, Mar 6, 2024)
9a2f956  remove popped ID from idToIndex map (poopoothegorilla, Mar 6, 2024)
6e5913a  fix TxPriorityQueue (poopoothegorilla, Mar 6, 2024)
d57096c  fix potential race condition (poopoothegorilla, Mar 7, 2024)
e1756d0  panic if misconfigured capacity for priority heap (poopoothegorilla, Mar 8, 2024)
4306f3e  Merge branch 'develop' into jtw/step-1-in-memory-work (poopoothegorilla, Mar 8, 2024)
dd11aa8  update changeset (poopoothegorilla, Mar 8, 2024)
5b541ce  address some comments (poopoothegorilla, Mar 11, 2024)
960f6dc  add back line that sets element to nil (poopoothegorilla, Mar 11, 2024)
6f82130  minor nit (poopoothegorilla, Mar 11, 2024)
41a1b76  Merge branch 'develop' into jtw/step-1-in-memory-work (poopoothegorilla, Mar 13, 2024)
40f2b22  address comments (poopoothegorilla, Mar 14, 2024)
1d51ff0  move len to caller (poopoothegorilla, Mar 21, 2024)
130ec9a  Merge branch 'develop' into jtw/step-1-in-memory-work (poopoothegorilla, Mar 21, 2024)
ffb5ac8  Merge branch 'develop' into jtw/step-1-in-memory-work (poopoothegorilla, Mar 22, 2024)
108ad38  Merge branch 'develop' into jtw/step-1-in-memory-work (amit-momin, Jun 26, 2024)
7adbba8  Updated changeset with tag and fixed lint error (amit-momin, Jun 26, 2024)
101 changes: 101 additions & 0 deletions common/txmgr/priority_queue.go
@@ -0,0 +1,101 @@
package txmgr

import (
	"container/heap"
	"sync"

	feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
	txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
	"github.com/smartcontractkit/chainlink/v2/common/types"
)

// TxPriorityQueue is a priority queue of transactions prioritized by creation time. The oldest transaction is at the front of the queue.
type TxPriorityQueue[
	CHAIN_ID types.ID,
	ADDR, TX_HASH, BLOCK_HASH types.Hashable,
	R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
	SEQ types.Sequence,
	FEE feetypes.Fee,
] struct {
	sync.RWMutex
	*txmgrtypes.PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}

// NewTxPriorityQueue returns a new TxPriorityQueue instance
func NewTxPriorityQueue[
	CHAIN_ID types.ID,
	ADDR, TX_HASH, BLOCK_HASH types.Hashable,
	R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
	SEQ types.Sequence,
	FEE feetypes.Fee,
](maxUnstarted int) *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
	pq := TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
		PriorityQueue: txmgrtypes.NewPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted),
	}

	return &pq
}

// AddTx adds a transaction to the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTx(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
	pq.Lock()
	defer pq.Unlock()

	heap.Push(pq, tx)
}

// RemoveNextTx removes the next transaction to be processed from the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
	pq.Lock()
	defer pq.Unlock()

	return heap.Pop(pq).(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE])

Review thread on this line:

Contributor: Would it make sense to capture these low level ops as a separate clean generic heap, without the tx queue specifics? Then this can just have a field of type Heap[*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]] without mixing in explicit casts.

Contributor Author: Hmm... I might not be understanding what you are proposing fully. We will need a method which has the ability to remove a Tx by ID (for pruning purposes).

Contributor Author: Let's hold off on refactoring to make this more generic in a future PR, after all this stuff gets merged.

}

// RemoveTxByID removes the transaction with the given ID from the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveTxByID(id int64) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
	pq.Lock()
	defer pq.Unlock()

	if i := pq.FindIndexByID(id); i != -1 {
		return heap.Remove(pq, i).(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE])
	}

	return nil
}

// PruneByTxIDs removes the transactions with the given IDs from the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneByTxIDs(ids []int64) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
	pq.Lock()
	defer pq.Unlock()

	removed := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
	for _, id := range ids {
		// Remove inline rather than calling RemoveTxByID, which would try to
		// re-acquire the non-reentrant lock already held here and deadlock.
		if i := pq.FindIndexByID(id); i != -1 {
			tx := heap.Remove(pq, i).(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE])
			removed = append(removed, *tx)
		}
	}

	return removed
}

// PeekNextTx returns the next transaction to be processed without removing it from the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
	pq.Lock()
	defer pq.Unlock()

	return pq.Peek()
}

// Close clears the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() {
	pq.Lock()
	defer pq.Unlock()

	pq.PriorityQueue.Close()
}

// Cap returns the capacity of the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Cap() int {
	return pq.PriorityQueue.Cap()
}
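
The review thread above suggests pulling the low-level heap operations into a standalone generic type so that the tx-specific wrapper no longer needs explicit type assertions; the author deferred that refactor to a future PR. As a rough, hypothetical sketch of that direction (the names heaps, sliceHeap, Heap, and NewHeap are made up and do not appear in this diff), a comparison-function-driven heap over container/heap might look like this:

package heaps

import "container/heap"

// sliceHeap adapts a slice plus a less function to heap.Interface so that
// callers never touch container/heap or type assertions directly.
type sliceHeap[T any] struct {
	items []T
	less  func(a, b T) bool
}

func (s *sliceHeap[T]) Len() int           { return len(s.items) }
func (s *sliceHeap[T]) Less(i, j int) bool { return s.less(s.items[i], s.items[j]) }
func (s *sliceHeap[T]) Swap(i, j int)      { s.items[i], s.items[j] = s.items[j], s.items[i] }
func (s *sliceHeap[T]) Push(x any)         { s.items = append(s.items, x.(T)) }
func (s *sliceHeap[T]) Pop() any {
	n := len(s.items)
	x := s.items[n-1]
	var zero T
	s.items[n-1] = zero // release the reference held by the backing array
	s.items = s.items[:n-1]
	return x
}

// Heap is the typed facade: Push, Pop, and Peek operate on T, not on any.
type Heap[T any] struct {
	inner sliceHeap[T]
}

func NewHeap[T any](less func(a, b T) bool, capacity int) *Heap[T] {
	return &Heap[T]{inner: sliceHeap[T]{items: make([]T, 0, capacity), less: less}}
}

func (h *Heap[T]) Len() int { return h.inner.Len() }
func (h *Heap[T]) Push(v T) { heap.Push(&h.inner, v) }
func (h *Heap[T]) Pop() T   { return heap.Pop(&h.inner).(T) }

// Peek returns the minimum element; it panics if the heap is empty.
func (h *Heap[T]) Peek() T { return h.inner.items[0] }

With something along these lines, TxPriorityQueue could hold a Heap[*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]] field and the heap.Push/heap.Pop casts would disappear from the wrapper methods, though removal by ID would still need an index as the author notes.
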
86 changes: 86 additions & 0 deletions common/txmgr/types/priority_queue.go
@@ -0,0 +1,86 @@
package types

import (
	feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
	"github.com/smartcontractkit/chainlink/v2/common/types"
)

type PriorityQueue[
	CHAIN_ID types.ID,
	ADDR, TX_HASH, BLOCK_HASH types.Hashable,
	R ChainReceipt[TX_HASH, BLOCK_HASH],
	SEQ types.Sequence,
	FEE feetypes.Fee,
] struct {
	txs       []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
	idToIndex map[int64]int
}

// NewPriorityQueue returns a new PriorityQueue instance
func NewPriorityQueue[
	CHAIN_ID types.ID,
	ADDR, TX_HASH, BLOCK_HASH types.Hashable,
	R ChainReceipt[TX_HASH, BLOCK_HASH],
	SEQ types.Sequence,
	FEE feetypes.Fee,
](maxUnstarted int) *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
	pq := PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
		txs:       make([]*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, maxUnstarted),
		idToIndex: make(map[int64]int),
	}

	return &pq
}

// Close clears the queue
func (pq *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() {
	clear(pq.txs)
	clear(pq.idToIndex)
}

// FindIndexByID returns the index of the transaction with the given ID
func (pq *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindIndexByID(id int64) int {
	i, ok := pq.idToIndex[id]
	if !ok {
		return -1
	}
	return i
}

// Peek returns the next transaction to be processed
func (pq *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Peek() *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
	if len(pq.txs) == 0 {
		return nil
	}
	return pq.txs[0]
}

// Cap returns the capacity of the queue
func (pq *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Cap() int {
	return cap(pq.txs)
}

// Len, Less, Swap, Push, and Pop methods implement the heap interface
func (pq *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Len() int {
	return len(pq.txs)
}
func (pq *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Less(i, j int) bool {
	// We want Pop to give us the oldest, not newest, transaction based on creation time
	return pq.txs[i].CreatedAt.Before(pq.txs[j].CreatedAt)
}
func (pq *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Swap(i, j int) {
	pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i]
	// keep the ID-to-index map in sync with the elements' new positions
	pq.idToIndex[pq.txs[i].ID] = i
	pq.idToIndex[pq.txs[j].ID] = j
}
func (pq *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Push(tx any) {
	t := tx.(*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE])
	// record the new element's index so FindIndexByID can locate it
	pq.idToIndex[t.ID] = len(pq.txs)
	pq.txs = append(pq.txs, t)
}
func (pq *PriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pop() any {
	old := pq.txs
	n := len(old)
	tx := old[n-1]
	old[n-1] = nil // avoid memory leak
	delete(pq.idToIndex, tx.ID) // remove the popped ID from the index map
	pq.txs = old[0 : n-1]
	return tx
}
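
To make the heap mechanics above concrete, here is a small self-contained sketch (the tx struct is a simplified stand-in, not the real txmgrtypes.Tx) showing how container/heap drives Len/Less/Swap/Push/Pop so that the oldest transaction pops first, and how the ID-to-index map has to be maintained on every move, which is the bookkeeping that later commits in this PR ("remove popped ID from idToIndex map", "fix TxPriorityQueue") tighten up:

package main

import (
	"container/heap"
	"fmt"
	"time"
)

// tx is a simplified stand-in for txmgrtypes.Tx: just the fields the heap uses.
type tx struct {
	ID        int64
	CreatedAt time.Time
}

// txHeap mirrors the PriorityQueue above: a slice ordered by CreatedAt plus an
// ID-to-index map so a transaction can be located (and removed) by its ID.
type txHeap struct {
	txs       []*tx
	idToIndex map[int64]int
}

func (h *txHeap) Len() int           { return len(h.txs) }
func (h *txHeap) Less(i, j int) bool { return h.txs[i].CreatedAt.Before(h.txs[j].CreatedAt) }
func (h *txHeap) Swap(i, j int) {
	h.txs[i], h.txs[j] = h.txs[j], h.txs[i]
	// after the swap, the element at index i maps to i and likewise for j
	h.idToIndex[h.txs[i].ID] = i
	h.idToIndex[h.txs[j].ID] = j
}
func (h *txHeap) Push(x any) {
	t := x.(*tx)
	h.idToIndex[t.ID] = len(h.txs) // index the new element before appending
	h.txs = append(h.txs, t)
}
func (h *txHeap) Pop() any {
	n := len(h.txs)
	t := h.txs[n-1]
	h.txs[n-1] = nil // avoid holding a stale reference
	delete(h.idToIndex, t.ID)
	h.txs = h.txs[:n-1]
	return t
}

func main() {
	h := &txHeap{idToIndex: map[int64]int{}}
	now := time.Now()
	heap.Push(h, &tx{ID: 3, CreatedAt: now.Add(2 * time.Second)})
	heap.Push(h, &tx{ID: 1, CreatedAt: now})
	heap.Push(h, &tx{ID: 2, CreatedAt: now.Add(time.Second)})

	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(*tx).ID) // prints 1, 2, 3: oldest first
	}
}

Running it prints the IDs in creation order, and after each heap.Pop the popped ID is no longer present in idToIndex.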