Skip to content

Commit

Permalink
use slices and heap packages for slices
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny authored and reductionista committed Dec 12, 2024
1 parent cde38a3 commit c6777c5
Showing 1 changed file with 34 additions and 30 deletions.
64 changes: 34 additions & 30 deletions pkg/solana/logpoller/loader.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package logpoller

import (
"container/heap"
"context"
"errors"
"fmt"
"reflect"
"sort"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -314,7 +314,7 @@ type orderedParser struct {
// internal state
parser ProgramEventProcessor
mu sync.Mutex
blocks []uint64
blocks *blockHeap
ready []uint64
expect map[uint64]int
actual map[uint64][]ProgramEvent
Expand All @@ -323,7 +323,7 @@ type orderedParser struct {
func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *orderedParser {
op := &orderedParser{
parser: parser,
blocks: make([]uint64, 0),
blocks: &blockHeap{},
ready: make([]uint64, 0),
expect: make(map[uint64]int),
actual: make(map[uint64][]ProgramEvent),
Expand All @@ -335,19 +335,16 @@ func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *ordered
Close: op.close,
}.NewServiceEngine(lggr)

heap.Init(op.blocks)

return op
}

func (p *orderedParser) ExpectBlock(block uint64) {
p.mu.Lock()
defer p.mu.Unlock()

p.blocks = append(p.blocks, block)

// ensure sort ascending
sort.Slice(p.blocks, func(i, j int) bool {
return p.blocks[i] < p.blocks[j]
})
heap.Push(p.blocks, block)
}

func (p *orderedParser) ExpectTxs(block uint64, quantity int) {
Expand Down Expand Up @@ -434,7 +431,7 @@ func (p *orderedParser) sendReadySlots() error {
rmvIdx := make([]int, 0)

// start at the lowest block and find ready blocks
for idx, block := range p.blocks {
for idx, block := range *p.blocks {
// if no expectations are set, we are still waiting on information for the block.
// if expectations set and not met, we are still waiting on information for the block
// no other block data should be sent until this is resolved
Expand All @@ -451,12 +448,9 @@ func (p *orderedParser) sendReadySlots() error {
continue
}

// if expectations set and met -> forward, remove, and continue

// to ensure ordered delivery, break from the loop if a ready block isn't found
// this function should be preceded by clearEmptyBlocks
rIdx, ok := getIdx(p.ready, block)
if !ok {
rIdx := slices.Index(p.ready, block)
if rIdx < 0 {
return nil
}

Expand All @@ -475,34 +469,44 @@ func (p *orderedParser) sendReadySlots() error {
return errs
}

p.ready = remove(p.ready, rIdx)
p.ready = slices.Delete(p.ready, rIdx, rIdx+1)
rmvIdx = append(rmvIdx, idx)

delete(p.expect, block)
delete(p.actual, block)
}

for count, idx := range rmvIdx {
p.blocks = remove(p.blocks, idx-count)
p.blocks.Delete(idx - count)
}

return nil
}

func getIdx[T any](slice []T, match T) (int, bool) {
for idx, value := range slice {
if reflect.DeepEqual(value, match) {
return idx, true
}
}
var (
errExpectationsNotSet = errors.New("expectations not set")
)

type blockHeap []uint64

return -1, false
func (h blockHeap) Len() int { return len(h) }
func (h blockHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h blockHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *blockHeap) Push(x any) {
*h = append(*h, x.(uint64))
}

func remove[T any](slice []T, s int) []T {
return append(slice[:s], slice[s+1:]...)
func (h *blockHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]

return x
}

var (
errExpectationsNotSet = errors.New("expectations not set")
)
func (h *blockHeap) Delete(idx int) {
old := *h
*h = slices.Delete(old, idx, idx+1)
}

0 comments on commit c6777c5

Please sign in to comment.