Skip to content

Commit

Permalink
move filters management into separate struct & allow update of non pr…
Browse files Browse the repository at this point in the history
…imary fields
  • Loading branch information
dhaidashenko committed Nov 25, 2024
1 parent 1567d41 commit f29db30
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 290 deletions.
251 changes: 251 additions & 0 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package logpoller

import (
"context"
"errors"
"fmt"
"iter"
"slices"
"sync"
"sync/atomic"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

type filters struct {
orm ORM
lggr logger.SugaredLogger

filtersByName map[string]Filter
filtersByAddress map[PublicKey]map[EventSignature][]Filter
filtersToBackfill []Filter
filtersToDelete []Filter // populated on start from db and pruned on first iteration of run
filtersMutex sync.RWMutex
loadedFilters atomic.Bool
}

func newFilters(lggr logger.SugaredLogger, orm ORM) *filters {
return &filters{
orm: orm,
lggr: lggr,

filtersByName: make(map[string]Filter),
filtersByAddress: map[PublicKey]map[EventSignature][]Filter{},
}
}

// PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs.
func (lp *filters) PruneFilters(ctx context.Context) error {
err := lp.LoadFilters(ctx)
if err != nil {
return fmt.Errorf("failed to load filters: %w", err)
}

lp.filtersMutex.Lock()
filtersToDelete := lp.filtersToDelete
lp.filtersToDelete = nil
lp.filtersMutex.Unlock()

if len(filtersToDelete) == 0 {
return nil
}

err = lp.orm.DeleteFilters(ctx, filtersToDelete)
if err != nil {
lp.filtersMutex.Lock()
defer lp.filtersMutex.Unlock()
lp.filtersToDelete = append(lp.filtersToDelete, filtersToDelete...)
return fmt.Errorf("failed to delete filters: %w", err)
}

return nil
}

// RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address
// that matches filter.EventSig signature will be captured starting from filter.StartingBlock.
// The filter may be unregistered later by filter.Name.
// In case of Filter.Name collision (within the chain scope) returns ErrFilterNameConflict if
// one of the fields defining resulting logs (Address, EventSig, EventIDL, SubkeyPaths) does not match original filter.
// Otherwise, updates remaining fields and schedules backfill.
// Warnings/debug information is keyed by filter name.
func (lp *filters) RegisterFilter(ctx context.Context, filter Filter) error {
if len(filter.Name) == 0 {
return errors.New("name is required")
}

err := lp.LoadFilters(ctx)
if err != nil {
return fmt.Errorf("failed to load filters: %w", err)
}

lp.filtersMutex.Lock()
defer lp.filtersMutex.Unlock()

if existingFilter, ok := lp.filtersByName[filter.Name]; ok {
if !existingFilter.MatchSameLogs(filter) {
return ErrFilterNameConflict
}

lp.removeFilterFromIndexes(existingFilter)
}

filterID, err := lp.orm.InsertFilter(ctx, filter)
if err != nil {
return fmt.Errorf("failed to insert filter: %w", err)
}

filter.ID = filterID
lp.filtersByName[filter.Name] = filter
filtersByEventSig, ok := lp.filtersByAddress[filter.Address]
if !ok {
filtersByEventSig = make(map[EventSignature][]Filter)
lp.filtersByAddress[filter.Address] = filtersByEventSig
}

filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter)
lp.filtersToBackfill = append(lp.filtersToBackfill, filter)
return nil
}

// UnregisterFilter will remove the filter with the given name and async prune all corresponding logs.
// If the name does not exist, it will log an error but not return an error.
// Warnings/debug information is keyed by filter name.
func (lp *filters) UnregisterFilter(ctx context.Context, name string) error {
err := lp.LoadFilters(ctx)
if err != nil {
return fmt.Errorf("failed to load filters: %w", err)
}

lp.filtersMutex.Lock()
defer lp.filtersMutex.Unlock()

filter, ok := lp.filtersByName[name]
if !ok {
lp.lggr.Warnw("Filter not found in filtersByName", "name", name)
return nil
}

if err := lp.orm.MarkFilterDeleted(ctx, filter.ID); err != nil {
return fmt.Errorf("failed to mark filter deleted: %w", err)
}

lp.removeFilterFromIndexes(filter)

lp.filtersToDelete = append(lp.filtersToDelete, filter)
return nil
}

func (lp *filters) removeFilterFromIndexes(filter Filter) {
delete(lp.filtersByName, filter.Name)
lp.filtersToBackfill, _ = removeFilterFromSlice(lp.filtersToBackfill, filter)

filtersByEventSig, ok := lp.filtersByAddress[filter.Address]
if !ok {
lp.lggr.Warnw("Filter not found in filtersByAddress", "name", filter.Name, "address", filter.Address)
return
}

filtersByEventSig[filter.EventSig], ok = removeFilterFromSlice(filtersByEventSig[filter.EventSig], filter)
if !ok {
lp.lggr.Warnw("Filter not found in filtersByEventSig", "name", filter.Name, "address", filter.Address)
return
}

if len(filtersByEventSig[filter.EventSig]) == 0 {
delete(filtersByEventSig, filter.EventSig)
}

if len(lp.filtersByAddress[filter.Address]) == 0 {
delete(lp.filtersByAddress, filter.Address)
}
}

// MatchingFilters - returns iterator to go through all matching filters.
// Requires LoadFilters to be called at least once.
func (lp *filters) MatchingFilters(ctx context.Context, addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] {
return func(yield func(Filter) bool) {
lp.filtersMutex.RLock()
defer lp.filtersMutex.RUnlock()
filters, ok := lp.filtersByAddress[addr]
if !ok {
return
}

for _, filter := range filters[eventSignature] {
if !yield(filter) {
return
}
}
}
}

// ConsumeFiltersToBackfill - removes all filters from the backfill queue and returns them to caller.
// Requires LoadFilters to be called at least once.
func (lp *filters) ConsumeFiltersToBackfill(ctx context.Context) []Filter {
lp.filtersMutex.Lock()
defer lp.filtersMutex.Unlock()
filtersToBackfill := lp.filtersToBackfill
lp.filtersToBackfill = nil
return filtersToBackfill
}

// LoadFilters - loads filters from database. Can be called multiple times without side effects.
func (lp *filters) LoadFilters(ctx context.Context) error {
if lp.loadedFilters.Load() {
return nil
}

lp.lggr.Debugw("Loading filters from db")
lp.filtersMutex.Lock()
defer lp.filtersMutex.Unlock()
// reset filters' indexes to ensure we do not have partial data from the previous run
lp.filtersByAddress = make(map[PublicKey]map[EventSignature][]Filter)
lp.filtersByName = make(map[string]Filter)
lp.filtersToBackfill = nil
lp.filtersToDelete = nil

filters, err := lp.orm.SelectFilters(ctx)
if err != nil {
return fmt.Errorf("failed to select filters from db: %w", err)
}

for _, filter := range filters {
if filter.IsDeleted {
lp.filtersToDelete = append(lp.filtersToDelete, filter)
continue
}

if _, ok := lp.filtersByName[filter.Name]; ok {
errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name)
lp.lggr.Critical(errMsg)
return errors.New(errMsg)
}

lp.filtersByName[filter.Name] = filter
filtersByEventSig, ok := lp.filtersByAddress[filter.Address]
if !ok {
filtersByEventSig = make(map[EventSignature][]Filter)
lp.filtersByAddress[filter.Address] = filtersByEventSig
}

filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter)
lp.filtersToBackfill = append(lp.filtersToBackfill, filter)
}

lp.loadedFilters.Store(true)

return nil
}

func removeFilterFromSlice(filters []Filter, filter Filter) ([]Filter, bool) {
index := slices.IndexFunc(filters, func(item Filter) bool {
return item.ID == filter.ID
})
if index == -1 {
return filters, false
}

lastIdx := len(filters) - 1
filters[index], filters[lastIdx] = filters[lastIdx], filters[index]
return filters[:lastIdx], true
}
Loading

0 comments on commit f29db30

Please sign in to comment.