Skip to content

Commit

Permalink
make tests pass again
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny authored and reductionista committed Dec 12, 2024
1 parent 78b036c commit cde38a3
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 140 deletions.
8 changes: 7 additions & 1 deletion pkg/solana/logpoller/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,18 @@ func (j *processEventJob) Run(_ context.Context) error {
return j.parser.Process(j.event)
}

type wrappedParser interface {
ProgramEventProcessor
ExpectBlock(uint64)
ExpectTxs(uint64, int)
}

// getTransactionsFromBlockJob is a job that fetches transaction signatures from a block and loads
// the job queue with getTransactionLogsJobs for each transaction found in the block.
type getTransactionsFromBlockJob struct {
slotNumber uint64
client RPCClient
parser *orderedParser
parser wrappedParser
chJobs chan Job
}

Expand Down
139 changes: 97 additions & 42 deletions pkg/solana/logpoller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type EncodedLogCollector struct {

// dependencies and configuration
client RPCClient
parser *orderedParser
ordered *orderedParser
unordered *unorderedParser
lggr logger.Logger
rpcTimeLimit time.Duration

Expand All @@ -66,7 +67,7 @@ func NewEncodedLogCollector(
) *EncodedLogCollector {
c := &EncodedLogCollector{
client: client,
parser: newOrderedParser(parser),
unordered: newUnorderedParser(parser),
chSlot: make(chan uint64),
chBlock: make(chan uint64, 1),
chJobs: make(chan Job, 1),
Expand All @@ -78,8 +79,9 @@ func NewEncodedLogCollector(
Name: "EncodedLogCollector",
NewSubServices: func(lggr logger.Logger) []services.Service {
c.workers = NewWorkerGroup(DefaultWorkerCount, lggr)
c.ordered = newOrderedParser(parser, lggr)

return []services.Service{c.workers}
return []services.Service{c.workers, c.ordered}
},
Start: c.start,
Close: c.close,
Expand Down Expand Up @@ -131,7 +133,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st
if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{
slotNumber: sig.Slot,
client: c.client,
parser: c.parser,
parser: c.unordered,
chJobs: c.chJobs,
}); err != nil {
return err
Expand All @@ -142,7 +144,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st
return nil
}

func (c *EncodedLogCollector) start(ctx context.Context) error {
func (c *EncodedLogCollector) start(_ context.Context) error {
c.engine.Go(c.runSlotPolling)
c.engine.Go(c.runSlotProcessing)
c.engine.Go(c.runBlockProcessing)
Expand Down Expand Up @@ -227,7 +229,7 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) {
if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{
slotNumber: slot,
client: c.client,
parser: c.parser,
parser: c.ordered,
chJobs: c.chJobs,
}); err != nil {
c.lggr.Errorf("failed to add job to queue: %s", err)
Expand Down Expand Up @@ -279,7 +281,7 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en
}

for _, block := range result {
c.parser.ExpectBlock(block)
c.ordered.ExpectBlock(block)
select {
case <-ctx.Done():
return nil
Expand All @@ -290,7 +292,26 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en
return nil
}

type unorderedParser struct {
parser ProgramEventProcessor
}

func newUnorderedParser(parser ProgramEventProcessor) *unorderedParser {
return &unorderedParser{parser: parser}
}

func (p *unorderedParser) ExpectBlock(_ uint64) {}
func (p *unorderedParser) ExpectTxs(_ uint64, _ int) {}
func (p *unorderedParser) Process(evt ProgramEvent) error {
return p.parser.Process(evt)
}

type orderedParser struct {
// service state management
services.Service
engine *services.Engine

// internal state
parser ProgramEventProcessor
mu sync.Mutex
blocks []uint64
Expand All @@ -299,14 +320,22 @@ type orderedParser struct {
actual map[uint64][]ProgramEvent
}

func newOrderedParser(parser ProgramEventProcessor) *orderedParser {
return &orderedParser{
func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *orderedParser {
op := &orderedParser{
parser: parser,
blocks: make([]uint64, 0),
ready: make([]uint64, 0),
expect: make(map[uint64]int),
actual: make(map[uint64][]ProgramEvent),
}

op.Service, op.engine = services.Config{
Name: "OrderedParser",
Start: op.start,
Close: op.close,
}.NewServiceEngine(lggr)

return op
}

func (p *orderedParser) ExpectBlock(block uint64) {
Expand All @@ -333,71 +362,97 @@ func (p *orderedParser) Process(event ProgramEvent) error {
p.mu.Lock()
defer p.mu.Unlock()

meetsExpectations, err := p.addAndCompareExpectations(event)
if err != nil {
return err
}

// incoming event does not meet expectations for transaction
// event is added to actual and no error is returned
if !meetsExpectations {
if err := p.addToExpectations(event); err != nil {
// TODO: log error because this is an unrecoverable error
return nil
}

p.clearEmptyBlocks()
p.setReady(event.SlotNumber)

return p.sendReadySlots()
}

func (p *orderedParser) addAndCompareExpectations(evt ProgramEvent) (bool, error) {
func (p *orderedParser) start(_ context.Context) error {
p.engine.GoTick(services.NewTicker(time.Second), p.run)

return nil
}

func (p *orderedParser) close() error {
return nil
}

func (p *orderedParser) addToExpectations(evt ProgramEvent) error {
expectations, ok := p.expect[evt.SlotNumber]
if !ok {
return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
}

evts, ok := p.actual[evt.SlotNumber]
if !ok {
return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
}

p.actual[evt.SlotNumber] = append(evts, evt)

return expectations == len(evts)+1, nil
}

func (p *orderedParser) clearEmptyBlocks() {
rmvIdx := make([]int, 0)

for idx, block := range p.blocks {
exp, ok := p.expect[block]
if !ok {
// transaction expectations have not been applied for this block yet
continue
}
if expectations == len(evts)+1 {
p.setReady(evt.SlotNumber)
}

if exp == 0 {
rmvIdx = append(rmvIdx, idx)
return nil
}

delete(p.expect, block)
delete(p.actual, block)
}
func (p *orderedParser) expectations(block uint64) (int, bool, error) {
expectations, ok := p.expect[block]
if !ok {
return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block)
}

for count, idx := range rmvIdx {
p.blocks = remove(p.blocks, idx-count)
evts, ok := p.actual[block]
if !ok {
return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block)
}

return expectations, expectations == len(evts), nil
}

func (p *orderedParser) clearExpectations(block uint64) {
delete(p.expect, block)
delete(p.actual, block)
}

func (p *orderedParser) setReady(slot uint64) {
p.ready = append(p.ready, slot)
}

func (p *orderedParser) run(_ context.Context) {
p.mu.Lock()
defer p.mu.Unlock()

_ = p.sendReadySlots()
}

func (p *orderedParser) sendReadySlots() error {
rmvIdx := make([]int, 0)

// start at the lowest block and find ready blocks
for idx, block := range p.blocks {
// if no expectations are set, we are still waiting on information for the block.
// if expectations set and not met, we are still waiting on information for the block
// no other block data should be sent until this is resolved
exp, met, err := p.expectations(block)
if err != nil || !met {
break
}

// if expectations are 0 -> remove and continue
if exp == 0 {
p.clearExpectations(block)
rmvIdx = append(rmvIdx, idx)

continue
}

// if expectations set and met -> forward, remove, and continue

// to ensure ordered delivery, break from the loop if a ready block isn't found
// this function should be preceded by clearEmptyBlocks
rIdx, ok := getIdx(p.ready, block)
Expand Down
Loading

0 comments on commit cde38a3

Please sign in to comment.