Skip to content

Commit

Permalink
implement register unregister filter operations
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Nov 21, 2024
1 parent 3186314 commit ee63576
Show file tree
Hide file tree
Showing 9 changed files with 815 additions and 48 deletions.
317 changes: 317 additions & 0 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
package logpoller

import (
"context"
"errors"
"fmt"
"iter"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

var (
ErrFilterNameConflict = errors.New("filter with such name already exists")
)

//go:generate mockery --name ORM --inpackage --structname mockORM --filename mock_orm.go
type ORM interface {
InsertFilter(ctx context.Context, filter Filter) (id int64, err error)
SelectFilters(ctx context.Context) ([]Filter, error)
DeleteFilters(ctx context.Context, filters []Filter) error
MarkFilterDeleted(ctx context.Context, id int64) (err error)
}

type LogPoller struct {
services.StateMachine
lggr logger.SugaredLogger
orm ORM

filtersByName map[string]Filter
filtersByAddress map[PublicKey]map[EventSignature][]Filter
filtersToBackfill []Filter
filtersToDelete []Filter // populated on start from db and pruned on first iteration of run
filtersMutex sync.RWMutex
loadedFilters atomic.Bool

chStop services.StopChan
wg sync.WaitGroup
}

func NewLogPoller(lggr logger.SugaredLogger, orm ORM) *LogPoller {
return &LogPoller{
orm: orm,
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),

filtersByName: make(map[string]Filter),
filtersByAddress: map[PublicKey]map[EventSignature][]Filter{},
}
}

func (lp *LogPoller) Start(context.Context) error {
return lp.StartOnce("LogPoller", func() error {
lp.wg.Add(2)
go lp.run()
go lp.backgroundWorkerRun()
return nil
})
}

func (lp *LogPoller) Close() error {
return lp.StopOnce("LogPoller", func() error {
close(lp.chStop)
lp.wg.Wait()
return nil
})
}

func (lp *LogPoller) run() {
defer lp.wg.Done()
ctx, cancel := lp.chStop.NewCtx()
defer cancel()

var blocks chan struct {
BlockNumber int64
Logs any // to be defined
}

for {
select {
case <-ctx.Done():
return
case block := <-blocks:
lp.filtersMutex.Lock()
filtersToBackfill := lp.filtersToBackfill
lp.filtersToBackfill = nil
lp.filtersMutex.Unlock()
// TODO: NONEVM-916 parse, filters and persist logs
// NOTE: removal of filters occurs in the separate goroutine, so there is a chance that upon insert
// of log corresponding filter won't be present in the db. Ensure to refilter and retry on insert error
for _, filter := range filtersToBackfill {
go lp.startFilterBackfill(ctx, filter, block.BlockNumber)
}
}
}
}

func (lp *LogPoller) backgroundWorkerRun() {
defer lp.wg.Done()
ctx, cancel := lp.chStop.NewCtx()
defer cancel()

pruneFilters := services.NewTicker(time.Minute + 618*time.Millisecond) // try to minimize collisions with one-second period
defer pruneFilters.Stop()
for {
select {
case <-ctx.Done():
return
case <-pruneFilters.C:
err := lp.pruneFilters(ctx)
if err != nil {
lp.lggr.Errorw("Failed to prune filters", "err", err)
}
}
}
}

func (lp *LogPoller) pruneFilters(ctx context.Context) error {
err := lp.loadFilters(ctx)
if err != nil {
return fmt.Errorf("failed to load filters: %w", err)
}

lp.filtersMutex.Lock()
filtersToDelete := lp.filtersToDelete
lp.filtersToDelete = nil
lp.filtersMutex.Unlock()

if len(filtersToDelete) == 0 {
return nil
}

err = lp.orm.DeleteFilters(ctx, filtersToDelete)
if err != nil {
lp.filtersMutex.Lock()
defer lp.filtersMutex.Unlock()
lp.filtersToDelete = append(lp.filtersToDelete, filtersToDelete...)
return fmt.Errorf("failed to delete filters: %w", err)
}

return nil
}

func (lp *LogPoller) startFilterBackfill(ctx context.Context, filter Filter, toBlock int64) {
// TODO: NONEVM-916 start backfill
lp.lggr.Debugw("Starting filter backfill", "filter", filter)
}

// RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address
// that matches filter.EventSig signature will be captured starting from filter.StartingBlock.
// filter.Name must be unique otherwise ErrFilterNameConflict is returned. // TODO: not sure this is a good idea. Callers are most likely going to ignore this error
// The filter may be unregistered later by Filter.Name
// Warnings/debug information is keyed by filter name.
func (lp *LogPoller) RegisterFilter(ctx context.Context, filter Filter) error {
if len(filter.Name) == 0 {
return errors.New("name is required")
}

err := lp.loadFilters(ctx)
if err != nil {
return fmt.Errorf("failed to load filters: %w", err)
}

lp.filtersMutex.Lock()
defer lp.filtersMutex.Unlock()

if _, ok := lp.filtersByName[filter.Name]; ok {
return ErrFilterNameConflict
}

filterID, err := lp.orm.InsertFilter(ctx, filter)
if err != nil {
return fmt.Errorf("failed to insert filter: %w", err)
}

filter.ID = filterID
lp.filtersByName[filter.Name] = filter
filtersByEventSig, ok := lp.filtersByAddress[filter.Address]
if !ok {
filtersByEventSig = make(map[EventSignature][]Filter)
lp.filtersByAddress[filter.Address] = filtersByEventSig
}

filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter)
lp.filtersToBackfill = append(lp.filtersToBackfill, filter)
return nil
}

// UnregisterFilter will remove the filter with the given name and prune all corresponding logs.
// If the name does not exist, it will log an error but not return an error.
// Warnings/debug information is keyed by filter name.
func (lp *LogPoller) UnregisterFilter(ctx context.Context, name string) error {
err := lp.loadFilters(ctx)
if err != nil {
return fmt.Errorf("failed to load filters: %w", err)
}

lp.filtersMutex.Lock()
defer lp.filtersMutex.Unlock()

filter, ok := lp.filtersByName[name]
if !ok {
lp.lggr.Warnw("Filter not found in filtersByName", "name", name)
return nil
}

if err := lp.orm.MarkFilterDeleted(ctx, filter.ID); err != nil {
return fmt.Errorf("failed to mark filter deleted: %w", err)
}

delete(lp.filtersByName, filter.Name)
filtersByEventSig, ok := lp.filtersByAddress[filter.Address]
if !ok {
lp.lggr.Warnw("Filter not found in filtersByAddress", "name", name, "address", filter.Address)
return nil
}

filtersByEventSig[filter.EventSig], ok = removeFilterFromSlice(filtersByEventSig[filter.EventSig], filter)
if !ok {
lp.lggr.Warnw("Filter not found in filtersByEventSig", "name", name, "address", filter.Address)
}

if len(filtersByEventSig[filter.EventSig]) == 0 {
delete(filtersByEventSig, filter.EventSig)
}

if len(lp.filtersByAddress[filter.Address]) == 0 {
delete(lp.filtersByAddress, filter.Address)
}

// remove or ensure that filters was not present in the slice to backfill
lp.filtersToBackfill, _ = removeFilterFromSlice(lp.filtersToBackfill, filter)
lp.filtersToDelete = append(lp.filtersToDelete, filter)
return nil
}

func (lp *LogPoller) loadFilters(ctx context.Context) error {
if lp.loadedFilters.Load() {
return nil
}

lp.lggr.Debugw("Loading filters from db")
lp.filtersMutex.Lock()
defer lp.filtersMutex.Unlock()
// reset filters' indexes to ensure we do not have partial data from the previous run
lp.filtersByAddress = make(map[PublicKey]map[EventSignature][]Filter)
lp.filtersByName = make(map[string]Filter)
lp.filtersToBackfill = nil
lp.filtersToDelete = nil

ctx, cancel := lp.chStop.Ctx(ctx)
defer cancel()
filters, err := lp.orm.SelectFilters(ctx)
if err != nil {
return fmt.Errorf("failed to select filters from db: %w", err)
}

for _, filter := range filters {
if filter.IsDeleted {
lp.filtersToDelete = append(lp.filtersToDelete, filter)
continue
}

if _, ok := lp.filtersByName[filter.Name]; ok {
errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name)
lp.lggr.Critical(errMsg)
return errors.New(errMsg)
}

lp.filtersByName[filter.Name] = filter
filtersByEventSig, ok := lp.filtersByAddress[filter.Address]
if !ok {
filtersByEventSig = make(map[EventSignature][]Filter)
lp.filtersByAddress[filter.Address] = filtersByEventSig
}

filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter)
lp.filtersToBackfill = append(lp.filtersToBackfill, filter)
}

lp.loadedFilters.Store(true)
return nil
}

func removeFilterFromSlice(filters []Filter, filter Filter) ([]Filter, bool) {
index := slices.IndexFunc(filters, func(item Filter) bool {
return item.ID == filter.ID
})
if index == -1 {
return filters, false
}

lastIdx := len(filters) - 1
filters[index], filters[lastIdx] = filters[lastIdx], filters[index]
return filters[:lastIdx], true
}

// matchingFilters - allows to iterate through filters that match provided keys
func (lp *LogPoller) matchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] {
return func(yield func(Filter) bool) {
lp.filtersMutex.RLock()
defer lp.filtersMutex.RUnlock()
filters, ok := lp.filtersByAddress[addr]
if !ok {
return
}

for _, filter := range filters[eventSignature] {
if !yield(filter) {
return
}
}
}
}
Loading

0 comments on commit ee63576

Please sign in to comment.