diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go new file mode 100644 index 000000000..44e1e7d94 --- /dev/null +++ b/pkg/solana/logpoller/filters.go @@ -0,0 +1,251 @@ +package logpoller + +import ( + "context" + "errors" + "fmt" + "iter" + "slices" + "sync" + "sync/atomic" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +type filters struct { + orm ORM + lggr logger.SugaredLogger + + filtersByName map[string]Filter + filtersByAddress map[PublicKey]map[EventSignature][]Filter + filtersToBackfill []Filter + filtersToDelete []Filter // populated on start from db and pruned on first iteration of run + filtersMutex sync.RWMutex + loadedFilters atomic.Bool +} + +func newFilters(lggr logger.SugaredLogger, orm ORM) *filters { + return &filters{ + orm: orm, + lggr: lggr, + + filtersByName: make(map[string]Filter), + filtersByAddress: map[PublicKey]map[EventSignature][]Filter{}, + } +} + +// PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs. +func (lp *filters) PruneFilters(ctx context.Context) error { + err := lp.LoadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + lp.filtersMutex.Lock() + filtersToDelete := lp.filtersToDelete + lp.filtersToDelete = nil + lp.filtersMutex.Unlock() + + if len(filtersToDelete) == 0 { + return nil + } + + err = lp.orm.DeleteFilters(ctx, filtersToDelete) + if err != nil { + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + lp.filtersToDelete = append(lp.filtersToDelete, filtersToDelete...) + return fmt.Errorf("failed to delete filters: %w", err) + } + + return nil +} + +// RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address +// that matches filter.EventSig signature will be captured starting from filter.StartingBlock. +// The filter may be unregistered later by filter.Name. +// In case of Filter.Name collision (within the chain scope) returns ErrFilterNameConflict if +// one of the fields defining resulting logs (Address, EventSig, EventIDL, SubkeyPaths) does not match original filter. +// Otherwise, updates remaining fields and schedules backfill. +// Warnings/debug information is keyed by filter name. +func (lp *filters) RegisterFilter(ctx context.Context, filter Filter) error { + if len(filter.Name) == 0 { + return errors.New("name is required") + } + + err := lp.LoadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + + if existingFilter, ok := lp.filtersByName[filter.Name]; ok { + if !existingFilter.MatchSameLogs(filter) { + return ErrFilterNameConflict + } + + lp.removeFilterFromIndexes(existingFilter) + } + + filterID, err := lp.orm.InsertFilter(ctx, filter) + if err != nil { + return fmt.Errorf("failed to insert filter: %w", err) + } + + filter.ID = filterID + lp.filtersByName[filter.Name] = filter + filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + if !ok { + filtersByEventSig = make(map[EventSignature][]Filter) + lp.filtersByAddress[filter.Address] = filtersByEventSig + } + + filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) + lp.filtersToBackfill = append(lp.filtersToBackfill, filter) + return nil +} + +// UnregisterFilter will remove the filter with the given name and async prune all corresponding logs. +// If the name does not exist, it will log an error but not return an error. 
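+// Deletion of the filter and its logs from the database is deferred to PruneFilters, which runs in the background worker.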
+// Warnings/debug information is keyed by filter name. +func (lp *filters) UnregisterFilter(ctx context.Context, name string) error { + err := lp.LoadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + + filter, ok := lp.filtersByName[name] + if !ok { + lp.lggr.Warnw("Filter not found in filtersByName", "name", name) + return nil + } + + if err := lp.orm.MarkFilterDeleted(ctx, filter.ID); err != nil { + return fmt.Errorf("failed to mark filter deleted: %w", err) + } + + lp.removeFilterFromIndexes(filter) + + lp.filtersToDelete = append(lp.filtersToDelete, filter) + return nil +} + +func (lp *filters) removeFilterFromIndexes(filter Filter) { + delete(lp.filtersByName, filter.Name) + lp.filtersToBackfill, _ = removeFilterFromSlice(lp.filtersToBackfill, filter) + + filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + if !ok { + lp.lggr.Warnw("Filter not found in filtersByAddress", "name", filter.Name, "address", filter.Address) + return + } + + filtersByEventSig[filter.EventSig], ok = removeFilterFromSlice(filtersByEventSig[filter.EventSig], filter) + if !ok { + lp.lggr.Warnw("Filter not found in filtersByEventSig", "name", filter.Name, "address", filter.Address) + return + } + + if len(filtersByEventSig[filter.EventSig]) == 0 { + delete(filtersByEventSig, filter.EventSig) + } + + if len(lp.filtersByAddress[filter.Address]) == 0 { + delete(lp.filtersByAddress, filter.Address) + } +} + +// MatchingFilters - returns iterator to go through all matching filters. +// Requires LoadFilters to be called at least once. +func (lp *filters) MatchingFilters(ctx context.Context, addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { + return func(yield func(Filter) bool) { + lp.filtersMutex.RLock() + defer lp.filtersMutex.RUnlock() + filters, ok := lp.filtersByAddress[addr] + if !ok { + return + } + + for _, filter := range filters[eventSignature] { + if !yield(filter) { + return + } + } + } +} + +// ConsumeFiltersToBackfill - removes all filters from the backfill queue and returns them to caller. +// Requires LoadFilters to be called at least once. +func (lp *filters) ConsumeFiltersToBackfill(ctx context.Context) []Filter { + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + filtersToBackfill := lp.filtersToBackfill + lp.filtersToBackfill = nil + return filtersToBackfill +} + +// LoadFilters - loads filters from database. Can be called multiple times without side effects. 
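+// Once filters have been loaded successfully, subsequent calls are no-ops; on failure the in-memory indexes stay empty and the next call retries the read.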
+func (lp *filters) LoadFilters(ctx context.Context) error { + if lp.loadedFilters.Load() { + return nil + } + + lp.lggr.Debugw("Loading filters from db") + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + // reset filters' indexes to ensure we do not have partial data from the previous run + lp.filtersByAddress = make(map[PublicKey]map[EventSignature][]Filter) + lp.filtersByName = make(map[string]Filter) + lp.filtersToBackfill = nil + lp.filtersToDelete = nil + + filters, err := lp.orm.SelectFilters(ctx) + if err != nil { + return fmt.Errorf("failed to select filters from db: %w", err) + } + + for _, filter := range filters { + if filter.IsDeleted { + lp.filtersToDelete = append(lp.filtersToDelete, filter) + continue + } + + if _, ok := lp.filtersByName[filter.Name]; ok { + errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name) + lp.lggr.Critical(errMsg) + return errors.New(errMsg) + } + + lp.filtersByName[filter.Name] = filter + filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + if !ok { + filtersByEventSig = make(map[EventSignature][]Filter) + lp.filtersByAddress[filter.Address] = filtersByEventSig + } + + filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) + lp.filtersToBackfill = append(lp.filtersToBackfill, filter) + } + + lp.loadedFilters.Store(true) + + return nil +} + +func removeFilterFromSlice(filters []Filter, filter Filter) ([]Filter, bool) { + index := slices.IndexFunc(filters, func(item Filter) bool { + return item.ID == filter.ID + }) + if index == -1 { + return filters, false + } + + lastIdx := len(filters) - 1 + filters[index], filters[lastIdx] = filters[lastIdx], filters[index] + return filters[:lastIdx], true +} diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/filters_test.go similarity index 57% rename from pkg/solana/logpoller/log_poller_test.go rename to pkg/solana/logpoller/filters_test.go index 56b89dac0..f4d8a7046 100644 --- a/pkg/solana/logpoller/log_poller_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -2,17 +2,20 @@ package logpoller import ( "errors" + "fmt" "testing" + "github.com/gagliardetto/solana-go" + "github.com/google/uuid" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func TestLogPoller_LoadFilters(t *testing.T) { +func TestFilters_LoadFilters(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(logger.Sugared(logger.Test(t)), orm) + fs := newFilters(logger.Sugared(logger.Test(t)), orm) ctx := tests.Context(t) orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() deleted := Filter{ @@ -34,144 +37,171 @@ func TestLogPoller_LoadFilters(t *testing.T) { happyPath2, }, nil).Once() - err := lp.loadFilters(ctx) + err := fs.LoadFilters(ctx) require.EqualError(t, err, "failed to select filters from db: db failed") - err = lp.loadFilters(ctx) + err = fs.LoadFilters(ctx) require.NoError(t, err) // only one filter to delete - require.Len(t, lp.filtersToDelete, 1) - require.Equal(t, deleted, lp.filtersToDelete[0]) + require.Len(t, fs.filtersToDelete, 1) + require.Equal(t, deleted, fs.filtersToDelete[0]) // backfill and happy path both indexed - require.Len(t, lp.filtersByAddress, 1) - require.Len(t, lp.filtersByAddress[happyPath.Address], 1) - require.Len(t, lp.filtersByAddress[happyPath.Address][happyPath.EventSig], 2) - 
require.Contains(t, lp.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath) - require.Contains(t, lp.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath2) - require.Len(t, lp.filtersByName, 2) - require.Equal(t, lp.filtersByName[happyPath.Name], happyPath) - require.Equal(t, lp.filtersByName[happyPath2.Name], happyPath2) + require.Len(t, fs.filtersByAddress, 1) + require.Len(t, fs.filtersByAddress[happyPath.Address], 1) + require.Len(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], 2) + require.Contains(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath) + require.Contains(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath2) + require.Len(t, fs.filtersByName, 2) + require.Equal(t, fs.filtersByName[happyPath.Name], happyPath) + require.Equal(t, fs.filtersByName[happyPath2.Name], happyPath2) // any call following successful should be noop - err = lp.loadFilters(ctx) + err = fs.LoadFilters(ctx) require.NoError(t, err) } -func TestLogPoller_RegisterFilter(t *testing.T) { +func TestFilters_RegisterFilter(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) t.Run("Returns an error if name is empty", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) - err := lp.RegisterFilter(tests.Context(t), Filter{}) + fs := newFilters(lggr, orm) + err := fs.RegisterFilter(tests.Context(t), Filter{}) require.EqualError(t, err, "name is required") }) t.Run("Returns an error if fails to load filters from db", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: "Filter"}) + err := fs.RegisterFilter(tests.Context(t), Filter{Name: "Filter"}) require.EqualError(t, err, "failed to load filters: failed to select filters from db: db failed") }) - t.Run("Returns an error if filter with the same name is present in db", func(t *testing.T) { - orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) - const filterName = "Filter" - orm.On("SelectFilters", mock.Anything).Return([]Filter{{Name: filterName}}, nil).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) - require.EqualError(t, err, ErrFilterNameConflict.Error()) - }) - t.Run("Returns an error if adding the same filter twice", func(t *testing.T) { - orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) - const filterName = "Filter" - orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() - orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) - require.NoError(t, err) - err = lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) - require.EqualError(t, err, ErrFilterNameConflict.Error()) + t.Run("Returns an error if trying to update primary fields", func(t *testing.T) { + testCases := []struct { + Name string + ModifyField func(*Filter) + }{ + { + Name: "Address", + ModifyField: func(f *Filter) { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + f.Address = PublicKey(privateKey.PublicKey()) + }, + }, + { + Name: "EventSig", + ModifyField: func(f *Filter) { + f.EventSig = EventSignature{3, 2, 1} + }, + }, + { + Name: "EventIDL", + ModifyField: func(f *Filter) { + f.EventIDL = uuid.NewString() + }, + }, + { + Name: "SubkeyPaths", + ModifyField: func(f *Filter) { + f.SubkeyPaths = 
[][]string{{uuid.NewString()}} + }, + }, + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("Updating %s", tc.Name), func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + const filterName = "Filter" + dbFilter := Filter{Name: filterName} + orm.On("SelectFilters", mock.Anything).Return([]Filter{dbFilter}, nil).Once() + newFilter := dbFilter + tc.ModifyField(&newFilter) + err := fs.RegisterFilter(tests.Context(t), newFilter) + require.EqualError(t, err, ErrFilterNameConflict.Error()) + }) + } }) t.Run("Happy path", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(0), errors.New("failed to insert")).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + err := fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) require.Error(t, err) // can readd after db issue is resovled orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() - err = lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + err = fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) require.NoError(t, err) }) t.Run("Can reregister after unregister", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() const filterID = int64(10) orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID, nil).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + err := fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) require.NoError(t, err) orm.On("MarkFilterDeleted", mock.Anything, filterID).Return(nil).Once() - err = lp.UnregisterFilter(tests.Context(t), filterName) + err = fs.UnregisterFilter(tests.Context(t), filterName) require.NoError(t, err) orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID+1, nil).Once() - err = lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + err = fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) require.NoError(t, err) - require.Len(t, lp.filtersToDelete, 1) - require.Contains(t, lp.filtersToDelete, Filter{Name: filterName, ID: filterID}) - require.Len(t, lp.filtersToBackfill, 1) - require.Contains(t, lp.filtersToBackfill, Filter{Name: filterName, ID: filterID + 1}) + require.Len(t, fs.filtersToDelete, 1) + require.Contains(t, fs.filtersToDelete, Filter{Name: filterName, ID: filterID}) + require.Len(t, fs.filtersToBackfill, 1) + require.Contains(t, fs.filtersToBackfill, Filter{Name: filterName, ID: filterID + 1}) }) } -func TestLogPoller_UnregisterFilter(t *testing.T) { +func TestFilters_UnregisterFilter(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) t.Run("Returns an error if fails to load filters from db", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() - err := lp.UnregisterFilter(tests.Context(t), "Filter") + err := fs.UnregisterFilter(tests.Context(t), "Filter") require.EqualError(t, err, "failed to load filters: failed to select filters from db: db failed") }) t.Run("Noop if filter is not present", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = 
"Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() - err := lp.UnregisterFilter(tests.Context(t), filterName) + err := fs.UnregisterFilter(tests.Context(t), filterName) require.NoError(t, err) }) t.Run("Returns error if fails to mark filter as deleted", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = "Filter" const id int64 = 10 orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() orm.On("MarkFilterDeleted", mock.Anything, id).Return(errors.New("db query failed")).Once() - err := lp.UnregisterFilter(tests.Context(t), filterName) + err := fs.UnregisterFilter(tests.Context(t), filterName) require.EqualError(t, err, "failed to mark filter deleted: db query failed") }) t.Run("Happy path", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = "Filter" const id int64 = 10 orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() orm.On("MarkFilterDeleted", mock.Anything, id).Return(nil).Once() - err := lp.UnregisterFilter(tests.Context(t), filterName) + err := fs.UnregisterFilter(tests.Context(t), filterName) require.NoError(t, err) - require.Len(t, lp.filtersToDelete, 1) - require.Len(t, lp.filtersToBackfill, 0) - require.Len(t, lp.filtersByName, 0) - require.Len(t, lp.filtersByAddress, 0) + require.Len(t, fs.filtersToDelete, 1) + require.Len(t, fs.filtersToBackfill, 0) + require.Len(t, fs.filtersByName, 0) + require.Len(t, fs.filtersByAddress, 0) }) } -func TestLogPoller_pruneFilters(t *testing.T) { +func TestFilters_pruneFilters(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) t.Run("Happy path", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) toDelete := Filter{ ID: 1, Name: "To delete", @@ -185,13 +215,13 @@ func TestLogPoller_pruneFilters(t *testing.T) { }, }, nil).Once() orm.On("DeleteFilters", mock.Anything, []Filter{toDelete}).Return(nil).Once() - err := lp.pruneFilters(tests.Context(t)) + err := fs.PruneFilters(tests.Context(t)) require.NoError(t, err) - require.Len(t, lp.filtersToDelete, 0) + require.Len(t, fs.filtersToDelete, 0) }) t.Run("If DB removal fails will add filters back into removal slice ", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) toDelete := Filter{ ID: 1, Name: "To delete", @@ -211,11 +241,11 @@ func TestLogPoller_pruneFilters(t *testing.T) { orm.On("DeleteFilters", mock.Anything, []Filter{toDelete}).Return(errors.New("db failed")).Run(func(_ mock.Arguments) { orm.On("MarkFilterDeleted", mock.Anything, newToDelete.ID).Return(nil).Once() orm.On("InsertFilter", mock.Anything, mock.Anything).Return(newToDelete.ID, nil).Once() - require.NoError(t, lp.RegisterFilter(tests.Context(t), newToDelete)) - require.NoError(t, lp.UnregisterFilter(tests.Context(t), newToDelete.Name)) + require.NoError(t, fs.RegisterFilter(tests.Context(t), newToDelete)) + require.NoError(t, fs.UnregisterFilter(tests.Context(t), newToDelete.Name)) }).Once() - err := lp.pruneFilters(tests.Context(t)) + err := fs.PruneFilters(tests.Context(t)) require.EqualError(t, err, "failed to delete filters: db failed") - require.Equal(t, lp.filtersToDelete, []Filter{newToDelete, toDelete}) + require.Equal(t, fs.filtersToDelete, []Filter{newToDelete, toDelete}) }) } diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go 
index 8bcbf37ec..b075a7098 100644
--- a/pkg/solana/logpoller/log_poller.go
+++ b/pkg/solana/logpoller/log_poller.go
@@ -3,11 +3,7 @@ package logpoller
 import (
 	"context"
 	"errors"
-	"fmt"
-	"iter"
-	"slices"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -30,24 +26,18 @@ type LogPoller struct {
 	lggr logger.SugaredLogger
 	orm  ORM
 
-	filtersByName     map[string]Filter
-	filtersByAddress  map[PublicKey]map[EventSignature][]Filter
-	filtersToBackfill []Filter
-	filtersToDelete   []Filter // populated on start from db and pruned on first iteration of run
-	filtersMutex      sync.RWMutex
-	loadedFilters     atomic.Bool
+	filters *filters
 
 	chStop services.StopChan
 	wg     sync.WaitGroup
 }
 
 func NewLogPoller(lggr logger.SugaredLogger, orm ORM) *LogPoller {
+	lggr = logger.Sugared(logger.Named(lggr, "LogPoller"))
 	return &LogPoller{
-		orm:  orm,
-		lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
-
-		filtersByName:    make(map[string]Filter),
-		filtersByAddress: map[PublicKey]map[EventSignature][]Filter{},
+		orm:     orm,
+		lggr:    lggr,
+		filters: newFilters(lggr, orm),
 	}
 }
 
@@ -68,10 +58,48 @@ func (lp *LogPoller) Close() error {
 	})
 }
 
+// RegisterFilter - refer to filters.RegisterFilter for details.
+func (lp *LogPoller) RegisterFilter(ctx context.Context, filter Filter) error {
+	ctx, cancel := lp.chStop.Ctx(ctx)
+	defer cancel()
+	return lp.filters.RegisterFilter(ctx, filter)
+}
+
+// UnregisterFilter - refer to filters.UnregisterFilter for details.
+func (lp *LogPoller) UnregisterFilter(ctx context.Context, name string) error {
+	ctx, cancel := lp.chStop.Ctx(ctx)
+	defer cancel()
+	return lp.filters.UnregisterFilter(ctx, name)
+}
+
+func (lp *LogPoller) loadFilters(ctx context.Context) error {
+	retryTicker := services.TickerConfig{Initial: 0, JitterPct: services.DefaultJitter}.NewTicker(time.Second)
+	defer retryTicker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-retryTicker.C:
+		}
+		err := lp.filters.LoadFilters(ctx)
+		if err != nil {
+			lp.lggr.Errorw("Failed loading filters in init logpoller loop, retrying later", "err", err)
+			continue
+		}
+		return nil
+	}
+}
+
 func (lp *LogPoller) run() {
 	defer lp.wg.Done()
 	ctx, cancel := lp.chStop.NewCtx()
 	defer cancel()
+	err := lp.loadFilters(ctx)
+	if err != nil {
+		lp.lggr.Warnw("Failed loading filters", "err", err)
+		return
+	}
 
 	var blocks chan struct {
 		BlockNumber int64
@@ -83,10 +111,8 @@ func (lp *LogPoller) run() {
 		case <-ctx.Done():
 			return
 		case block := <-blocks:
-			lp.filtersMutex.Lock()
-			filtersToBackfill := lp.filtersToBackfill
-			lp.filtersToBackfill = nil
-			lp.filtersMutex.Unlock()
+			filtersToBackfill := lp.filters.ConsumeFiltersToBackfill(ctx)
+
 			// TODO: NONEVM-916 parse, filters and persist logs
 			// NOTE: removal of filters occurs in the separate goroutine, so there is a chance that upon insert
 			// of log corresponding filter won't be present in the db.
Ensure to refilter and retry on insert error @@ -109,7 +135,7 @@ func (lp *LogPoller) backgroundWorkerRun() { case <-ctx.Done(): return case <-pruneFilters.C: - err := lp.pruneFilters(ctx) + err := lp.filters.PruneFilters(ctx) if err != nil { lp.lggr.Errorw("Failed to prune filters", "err", err) } @@ -117,200 +143,7 @@ func (lp *LogPoller) backgroundWorkerRun() { } } -func (lp *LogPoller) pruneFilters(ctx context.Context) error { - err := lp.loadFilters(ctx) - if err != nil { - return fmt.Errorf("failed to load filters: %w", err) - } - - lp.filtersMutex.Lock() - filtersToDelete := lp.filtersToDelete - lp.filtersToDelete = nil - lp.filtersMutex.Unlock() - - if len(filtersToDelete) == 0 { - return nil - } - - err = lp.orm.DeleteFilters(ctx, filtersToDelete) - if err != nil { - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - lp.filtersToDelete = append(lp.filtersToDelete, filtersToDelete...) - return fmt.Errorf("failed to delete filters: %w", err) - } - - return nil -} - func (lp *LogPoller) startFilterBackfill(ctx context.Context, filter Filter, toBlock int64) { // TODO: NONEVM-916 start backfill lp.lggr.Debugw("Starting filter backfill", "filter", filter) } - -// RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address -// that matches filter.EventSig signature will be captured starting from filter.StartingBlock. -// filter.Name must be unique otherwise ErrFilterNameConflict is returned. // TODO: not sure this is a good idea. Callers are most likely going to ignore this error -// The filter may be unregistered later by Filter.Name -// Warnings/debug information is keyed by filter name. -func (lp *LogPoller) RegisterFilter(ctx context.Context, filter Filter) error { - if len(filter.Name) == 0 { - return errors.New("name is required") - } - - err := lp.loadFilters(ctx) - if err != nil { - return fmt.Errorf("failed to load filters: %w", err) - } - - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - - if _, ok := lp.filtersByName[filter.Name]; ok { - return ErrFilterNameConflict - } - - filterID, err := lp.orm.InsertFilter(ctx, filter) - if err != nil { - return fmt.Errorf("failed to insert filter: %w", err) - } - - filter.ID = filterID - lp.filtersByName[filter.Name] = filter - filtersByEventSig, ok := lp.filtersByAddress[filter.Address] - if !ok { - filtersByEventSig = make(map[EventSignature][]Filter) - lp.filtersByAddress[filter.Address] = filtersByEventSig - } - - filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) - lp.filtersToBackfill = append(lp.filtersToBackfill, filter) - return nil -} - -// UnregisterFilter will remove the filter with the given name and prune all corresponding logs. -// If the name does not exist, it will log an error but not return an error. -// Warnings/debug information is keyed by filter name. 
-func (lp *LogPoller) UnregisterFilter(ctx context.Context, name string) error { - err := lp.loadFilters(ctx) - if err != nil { - return fmt.Errorf("failed to load filters: %w", err) - } - - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - - filter, ok := lp.filtersByName[name] - if !ok { - lp.lggr.Warnw("Filter not found in filtersByName", "name", name) - return nil - } - - if err := lp.orm.MarkFilterDeleted(ctx, filter.ID); err != nil { - return fmt.Errorf("failed to mark filter deleted: %w", err) - } - - delete(lp.filtersByName, filter.Name) - filtersByEventSig, ok := lp.filtersByAddress[filter.Address] - if !ok { - lp.lggr.Warnw("Filter not found in filtersByAddress", "name", name, "address", filter.Address) - return nil - } - - filtersByEventSig[filter.EventSig], ok = removeFilterFromSlice(filtersByEventSig[filter.EventSig], filter) - if !ok { - lp.lggr.Warnw("Filter not found in filtersByEventSig", "name", name, "address", filter.Address) - } - - if len(filtersByEventSig[filter.EventSig]) == 0 { - delete(filtersByEventSig, filter.EventSig) - } - - if len(lp.filtersByAddress[filter.Address]) == 0 { - delete(lp.filtersByAddress, filter.Address) - } - - // remove or ensure that filters was not present in the slice to backfill - lp.filtersToBackfill, _ = removeFilterFromSlice(lp.filtersToBackfill, filter) - lp.filtersToDelete = append(lp.filtersToDelete, filter) - return nil -} - -func (lp *LogPoller) loadFilters(ctx context.Context) error { - if lp.loadedFilters.Load() { - return nil - } - - lp.lggr.Debugw("Loading filters from db") - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - // reset filters' indexes to ensure we do not have partial data from the previous run - lp.filtersByAddress = make(map[PublicKey]map[EventSignature][]Filter) - lp.filtersByName = make(map[string]Filter) - lp.filtersToBackfill = nil - lp.filtersToDelete = nil - - ctx, cancel := lp.chStop.Ctx(ctx) - defer cancel() - filters, err := lp.orm.SelectFilters(ctx) - if err != nil { - return fmt.Errorf("failed to select filters from db: %w", err) - } - - for _, filter := range filters { - if filter.IsDeleted { - lp.filtersToDelete = append(lp.filtersToDelete, filter) - continue - } - - if _, ok := lp.filtersByName[filter.Name]; ok { - errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name) - lp.lggr.Critical(errMsg) - return errors.New(errMsg) - } - - lp.filtersByName[filter.Name] = filter - filtersByEventSig, ok := lp.filtersByAddress[filter.Address] - if !ok { - filtersByEventSig = make(map[EventSignature][]Filter) - lp.filtersByAddress[filter.Address] = filtersByEventSig - } - - filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) - lp.filtersToBackfill = append(lp.filtersToBackfill, filter) - } - - lp.loadedFilters.Store(true) - return nil -} - -func removeFilterFromSlice(filters []Filter, filter Filter) ([]Filter, bool) { - index := slices.IndexFunc(filters, func(item Filter) bool { - return item.ID == filter.ID - }) - if index == -1 { - return filters, false - } - - lastIdx := len(filters) - 1 - filters[index], filters[lastIdx] = filters[lastIdx], filters[index] - return filters[:lastIdx], true -} - -// matchingFilters - allows to iterate through filters that match provided keys -func (lp *LogPoller) matchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { - return func(yield func(Filter) bool) { - lp.filtersMutex.RLock() - defer lp.filtersMutex.RUnlock() - filters, ok 
:= lp.filtersByAddress[addr] - if !ok { - return - } - - for _, filter := range filters[eventSignature] { - if !yield(filter) { - return - } - } - } -} diff --git a/pkg/solana/logpoller/models.go b/pkg/solana/logpoller/models.go index b04a5ae38..cee0ea381 100644 --- a/pkg/solana/logpoller/models.go +++ b/pkg/solana/logpoller/models.go @@ -7,7 +7,7 @@ import ( ) type Filter struct { - ID int64 + ID int64 // only for internal usage. Values set externally are ignored. Name string Address PublicKey EventName string @@ -17,7 +17,11 @@ type Filter struct { SubkeyPaths SubkeyPaths Retention time.Duration MaxLogsKept int64 - IsDeleted bool + IsDeleted bool // only for internal usage. Values set externally are ignored. +} + +func (f Filter) MatchSameLogs(other Filter) bool { + return f.Address == other.Address && f.EventSig == other.EventSig && f.EventIDL == other.EventIDL && f.SubkeyPaths.Equal(other.SubkeyPaths) } type Log struct { diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index 247a3d611..f79e242fb 100644 --- a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -60,6 +60,11 @@ func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err INSERT INTO solana.log_poller_filters (chain_id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept) VALUES (:chain_id, :name, :address, :event_name, :event_sig, :starting_block, :event_idl, :subkey_paths, :retention, :max_logs_kept) + ON CONFLICT (chain_id, name) WHERE NOT is_deleted DO UPDATE SET + event_name = EXCLUDED.event_name, + starting_block = EXCLUDED.starting_block, + retention = EXCLUDED.retention, + max_logs_kept = EXCLUDED.max_logs_kept RETURNING id;` query, sqlArgs, err := o.ds.BindNamed(query, args) diff --git a/pkg/solana/logpoller/orm_test.go b/pkg/solana/logpoller/orm_test.go index 217bccb17..ca533d0eb 100644 --- a/pkg/solana/logpoller/orm_test.go +++ b/pkg/solana/logpoller/orm_test.go @@ -76,16 +76,25 @@ func TestLogPollerFilters(t *testing.T) { }) } }) - t.Run("Returns and error if name is not unique", func(t *testing.T) { + t.Run("Updates non primary fields if name and chainID is not unique", func(t *testing.T) { chainID := uuid.NewString() dbx := pg.NewSqlxDB(t, dbURL) orm := NewORM(chainID, dbx, lggr) filter := newRandomFilter(t) ctx := tests.Context(t) - _, err := orm.InsertFilter(ctx, filter) + id, err := orm.InsertFilter(ctx, filter) require.NoError(t, err) - _, err = orm.InsertFilter(ctx, filter) - require.EqualError(t, err, `ERROR: duplicate key value violates unique constraint "solana_log_poller_filter_name" (SQLSTATE 23505)`) + filter.EventName = uuid.NewString() + filter.StartingBlock++ + filter.Retention++ + filter.MaxLogsKept++ + id2, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + require.Equal(t, id, id2) + dbFilter, err := orm.GetFilterByID(ctx, id) + require.NoError(t, err) + filter.ID = id + require.Equal(t, filter, dbFilter) }) t.Run("Allows reuse name of a filter marked as deleted", func(t *testing.T) { chainID := uuid.NewString() @@ -106,6 +115,18 @@ func TestLogPollerFilters(t *testing.T) { require.NoError(t, err) require.NotEqual(t, newFilterID, filterID, "expected db to generate new filter as we can not be sure that new one matches the same logs") }) + t.Run("Allows reuse name for a filter with different chainID", func(t *testing.T) { + dbx := pg.NewSqlxDB(t, dbURL) + orm1 := NewORM(uuid.NewString(), dbx, lggr) + orm2 := NewORM(uuid.NewString(), dbx, lggr) + filter := newRandomFilter(t) + 
ctx := tests.Context(t) + filterID1, err := orm1.InsertFilter(ctx, filter) + require.NoError(t, err) + filterID2, err := orm2.InsertFilter(ctx, filter) + require.NoError(t, err) + require.NotEqual(t, filterID1, filterID2) + }) t.Run("Deletes log on parent filter deletion", func(t *testing.T) { dbx := pg.NewSqlxDB(t, dbURL) chainID := uuid.NewString() diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index 95122bc86..ab918c0d3 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -4,6 +4,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "slices" "github.com/gagliardetto/solana-go" ) @@ -98,6 +99,10 @@ func (p *SubkeyPaths) Scan(src interface{}) error { } +func (k SubkeyPaths) Equal(o SubkeyPaths) bool { + return slices.EqualFunc(k, o, slices.Equal) +} + const EventSignatureLength = 8 type EventSignature [EventSignatureLength]byte