diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index d0a519bcb98..50ffb8751eb 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -118,6 +118,6 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash, return nil, ErrDisabled } -func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ *EventFilterMapper) ([]Log, error) { +func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ EventFilterMapper) ([]Log, error) { return nil, ErrDisabled } diff --git a/core/chains/evm/logpoller/expression.go b/core/chains/evm/logpoller/expression.go index 9314fce0322..554e74d8c52 100644 --- a/core/chains/evm/logpoller/expression.go +++ b/core/chains/evm/logpoller/expression.go @@ -6,20 +6,21 @@ import ( "strings" "github.com/ethereum/go-ethereum/common" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" ) -type Select struct { +type qSelect struct { Fields []string Table string Limit query.Limit } -func (s Select) String() string { +func (q qSelect) String() string { fields := "*" - fieldsOut := s.Fields + fieldsOut := q.Fields - if hasCursorLimit(s.Limit) { + if hasCursorLimit(q.Limit) { fieldsOut = append(fieldsOut, fmt.Sprintf("%s AS %s", cursorFieldName, cursorFieldAlias)) } @@ -27,10 +28,10 @@ func (s Select) String() string { fields = strings.Join(fieldsOut, ",") } - return fmt.Sprintf("SELECT %s FROM %s", fields, s.Table) + return fmt.Sprintf("SELECT %s FROM %s", fields, q.Table) } -type Where struct { +type qWhere struct { ChainID *big.Int Address common.Address Op query.BoolOperator @@ -38,19 +39,19 @@ type Where struct { Limit query.Limit } -func (w Where) String() (string, map[string]any) { +func (q qWhere) String() (string, map[string]any) { args := newNamedArgs() segment := fmt.Sprintf( "WHERE evm_chain_id = %s AND address = %s AND %s", - args.Add("evm_chain_id", w.ChainID.Uint64()), - args.Add("address", w.Address), - makeExpression(w.Op, w.Expressions, args, w.ChainID), + args.Add("evm_chain_id", q.ChainID.Uint64()), + args.Add("address", q.Address), + makeExpression(q.Op, q.Expressions, args, q.ChainID), ) - if hasCursorLimit(w.Limit) { + if hasCursorLimit(q.Limit) { var op string - switch *w.Limit.CursorDirection { + switch *q.Limit.CursorDirection { case query.Following: op = ">" case query.Previous: @@ -64,23 +65,23 @@ func (w Where) String() (string, map[string]any) { segment, cursorFieldAlias, op, - args.Add(cursorFieldAlias, *w.Limit.Cursor), + args.Add(cursorFieldAlias, *q.Limit.Cursor), ) } return segment, args.Values() } -type Order []query.SortBy +type qOrder []query.SortBy -func (o Order) String() string { - if len(o) == 0 { +func (q qOrder) String() string { + if len(q) == 0 { return "" } - sort := make([]string, len(o)) + sort := make([]string, len(q)) - for idx, sorted := range o { + for idx, sorted := range q { var name string switch sorted.(type) { @@ -98,14 +99,14 @@ func (o Order) String() string { return fmt.Sprintf("ORDER BY (%s)", strings.Join(sort, ", ")) } -type Limit query.Limit +type qLimit query.Limit -func (l Limit) String() string { - if !hasCursorLimit(query.Limit(l)) && l.Count == 0 { +func (q qLimit) String() string { + if !hasCursorLimit(query.Limit(q)) && q.Count == 0 { return "" } - return fmt.Sprintf("LIMIT %d", l.Count) + return fmt.Sprintf("LIMIT %d", q.Count) } func hasCursorLimit(limit query.Limit) bool { @@ -128,7 +129,7 @@ func makeExpression(op query.BoolOperator, expressions []query.Expression, args for _, exp := range expressions { if exp.IsPrimitive() { - v := &PgDSLParser{ + v := &pgDSLParser{ chainID: chainID, args: args, } diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 14a220a4437..c222d5169df 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -66,7 +66,7 @@ type LogPoller interface { LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) // chainlink-common query filtering - FilteredLogs(ctx context.Context, filter query.KeyFilter, sortAndLimit query.LimitAndSort, mapper *EventFilterMapper) ([]Log, error) + FilteredLogs(ctx context.Context, filter query.KeyFilter, sortAndLimit query.LimitAndSort, mapper EventFilterMapper) ([]Log, error) } type LogPollerTest interface { @@ -1345,6 +1345,6 @@ func EvmWord(i uint64) common.Hash { return common.BytesToHash(b) } -func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, sortAndLimit query.LimitAndSort, mapper *EventFilterMapper) ([]Log, error) { +func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, sortAndLimit query.LimitAndSort, mapper EventFilterMapper) ([]Log, error) { return lp.orm.FilteredLogs(ctx, queryFilter, sortAndLimit, mapper) } diff --git a/core/chains/evm/logpoller/mapper.go b/core/chains/evm/logpoller/mapper.go index 18dfd13642e..8eb7c94806d 100644 --- a/core/chains/evm/logpoller/mapper.go +++ b/core/chains/evm/logpoller/mapper.go @@ -44,7 +44,7 @@ func (m *EventFilterMapper) remap(filter query.KeyFilter) (query.KeyFilter, erro } if addEventSigFilter { - remapped.Expressions = append(remapped.Expressions, NewEventBySigFilter(m.Hash)) + remapped.Expressions = append(remapped.Expressions, newEventBySigFilter(m.Hash)) } return remapped, nil @@ -81,97 +81,97 @@ func (m *EventFilterMapper) remapPrimitive(key string, expression query.Expressi // remap chain agnostic primitives to chain specific switch primitive := expression.Primitive.(type) { case *query.ConfirmationsPrimitive: - remapped, err := NewFinalityFilter(primitive) + remapped, err := newFinalityFilter(primitive) return remapped, false, err case *query.ComparerPrimitive: if val, ok := m.EventDataWords[primitive.Name]; ok { - return NewEventByWordFilter(m.Hash, val, primitive.ValueComparers), true, nil + return newEventByWordFilter(m.Hash, val, primitive.ValueComparers), true, nil } - return NewEventByTopicFilter(m.Hash, m.Topics[key].Index, primitive.ValueComparers), true, nil + return newEventByTopicFilter(m.Hash, m.Topics[key].Index, primitive.ValueComparers), true, nil default: return expression, false, nil } } -type EventBySigFilter struct { +type eventBySigFilter struct { EventSig common.Hash } -func NewEventBySigFilter(eventSig common.Hash) query.Expression { - return query.Expression{Primitive: &EventBySigFilter{ +func newEventBySigFilter(eventSig common.Hash) query.Expression { + return query.Expression{Primitive: &eventBySigFilter{ EventSig: eventSig, }} } -func (f *EventBySigFilter) Accept(visitor query.Visitor) { +func (f *eventBySigFilter) Accept(visitor query.Visitor) { switch v := visitor.(type) { - case *PgDSLParser: + case *pgDSLParser: v.VisitEventBySigFilter(f) } } -type FinalityFilter struct { +type finalityFilter struct { Confs evmtypes.Confirmations } -func NewFinalityFilter(filter *query.ConfirmationsPrimitive) (query.Expression, error) { +func newFinalityFilter(filter *query.ConfirmationsPrimitive) (query.Expression, error) { switch filter.ConfirmationLevel { case query.Finalized: - return query.Expression{Primitive: &FinalityFilter{evmtypes.Finalized}}, nil + return query.Expression{Primitive: &finalityFilter{evmtypes.Finalized}}, nil case query.Unconfirmed: - return query.Expression{Primitive: &FinalityFilter{evmtypes.Unconfirmed}}, nil + return query.Expression{Primitive: &finalityFilter{evmtypes.Unconfirmed}}, nil default: return query.Expression{}, fmt.Errorf("invalid finality confirmations filter value %v", filter.ConfirmationLevel) } } -func (f *FinalityFilter) Accept(visitor query.Visitor) { +func (f *finalityFilter) Accept(visitor query.Visitor) { switch v := visitor.(type) { - case *PgDSLParser: + case *pgDSLParser: v.VisitFinalityFilter(f) } } -type EventByWordFilter struct { +type eventByWordFilter struct { EventSig common.Hash WordIndex uint8 ValueComparers []query.ValueComparer } -func NewEventByWordFilter(eventSig common.Hash, wordIndex uint8, valueComparers []query.ValueComparer) query.Expression { - return query.Expression{Primitive: &EventByWordFilter{ +func newEventByWordFilter(eventSig common.Hash, wordIndex uint8, valueComparers []query.ValueComparer) query.Expression { + return query.Expression{Primitive: &eventByWordFilter{ EventSig: eventSig, WordIndex: wordIndex, ValueComparers: valueComparers, }} } -func (f *EventByWordFilter) Accept(visitor query.Visitor) { +func (f *eventByWordFilter) Accept(visitor query.Visitor) { switch v := visitor.(type) { - case *PgDSLParser: + case *pgDSLParser: v.VisitEventByWordFilter(f) } } -type EventByTopicFilter struct { +type eventByTopicFilter struct { EventSig common.Hash Topic uint64 ValueComparers []query.ValueComparer } -func NewEventByTopicFilter(eventSig common.Hash, topicIndex uint64, valueComparers []query.ValueComparer) query.Expression { - return query.Expression{Primitive: &EventByTopicFilter{ +func newEventByTopicFilter(eventSig common.Hash, topicIndex uint64, valueComparers []query.ValueComparer) query.Expression { + return query.Expression{Primitive: &eventByTopicFilter{ EventSig: eventSig, Topic: topicIndex, ValueComparers: valueComparers, }} } -func (f *EventByTopicFilter) Accept(visitor query.Visitor) { +func (f *eventByTopicFilter) Accept(visitor query.Visitor) { switch v := visitor.(type) { - case *PgDSLParser: + case *pgDSLParser: v.VisitEventTopicsByValueFilter(f) } } diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index 978ab287844..494287cc2ce 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -42,7 +42,7 @@ func (_m *LogPoller) Close() error { } // FilteredLogs provides a mock function with given fields: filter, limitAndSrt -func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSrt query.LimitAndSort, mapper *logpoller.EventFilterMapper) ([]logpoller.Log, error) { +func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSrt query.LimitAndSort, mapper logpoller.EventFilterMapper) ([]logpoller.Log, error) { ret := _m.Called(ctx, filter, limitAndSrt, mapper) if len(ret) == 0 { @@ -51,10 +51,10 @@ func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, l var r0 []logpoller.Log var r1 error - if rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, *logpoller.EventFilterMapper) ([]logpoller.Log, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, logpoller.EventFilterMapper) ([]logpoller.Log, error)); ok { return rf(ctx, filter, limitAndSrt, mapper) } - if rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, *logpoller.EventFilterMapper) []logpoller.Log); ok { + if rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, logpoller.EventFilterMapper) []logpoller.Log); ok { r0 = rf(ctx, filter, limitAndSrt, mapper) } else { if ret.Get(0) != nil { @@ -62,7 +62,7 @@ func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, l } } - if rf, ok := ret.Get(1).(func(context.Context, query.KeyFilter, query.LimitAndSort, *logpoller.EventFilterMapper) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, query.KeyFilter, query.LimitAndSort, logpoller.EventFilterMapper) error); ok { r1 = rf(ctx, filter, limitAndSrt, mapper) } else { r1 = ret.Error(1) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 1bbea8cf9c5..27b9261907f 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -3,7 +3,6 @@ package logpoller import ( "context" "database/sql" - "errors" "fmt" "math/big" "strings" @@ -63,7 +62,7 @@ type ORM interface { SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) // FilteredLogs accepts chainlink-common filtering DSL. - FilteredLogs(ctx context.Context, filter query.KeyFilter, sortAndLimit query.LimitAndSort, mapper *EventFilterMapper) ([]Log, error) + FilteredLogs(ctx context.Context, filter query.KeyFilter, sortAndLimit query.LimitAndSort, mapper EventFilterMapper) ([]Log, error) } type DbORM struct { @@ -973,21 +972,15 @@ func (o *DbORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si return logs, nil } -func (o *DbORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limit query.LimitAndSort, mapper *EventFilterMapper) ([]Log, error) { - if mapper == nil { - return nil, errors.New("mapper required for setting contract address") - } - - if mapper != nil { - newFilter, err := mapper.remap(filter) - if err != nil { - return nil, err - } +func (o *DbORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limit query.LimitAndSort, mapper EventFilterMapper) ([]Log, error) { + var err error - filter = newFilter + filter, err = mapper.remap(filter) + if err != nil { + return nil, err } - where := Where{ + where := qWhere{ ChainID: o.chainID, Address: mapper.Address, Op: query.AND, @@ -998,21 +991,21 @@ func (o *DbORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limit whereSegment, args := where.String() qs := fmt.Sprintf( "%s %s %s %s", - Select{ + qSelect{ Table: evmLogsTableName, Limit: limit.Limit, }, whereSegment, - Order(limit.SortBy), - Limit(limit.Limit), + qOrder(limit.SortBy), + qLimit(limit.Limit), ) - var logs []Log query, sqlArgs, err := o.db.BindNamed(qs, args) if err != nil { return nil, err } + var logs []Log if err := o.db.SelectContext(ctx, &logs, query, sqlArgs...); err != nil { return nil, err } diff --git a/core/chains/evm/logpoller/parser.go b/core/chains/evm/logpoller/parser.go index fd1c0fdc8e2..9c0014d160d 100644 --- a/core/chains/evm/logpoller/parser.go +++ b/core/chains/evm/logpoller/parser.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/ethereum/go-ethereum/common" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" ) @@ -20,26 +21,22 @@ const ( evmLogsTableName = "evm.logs" ) -// PgDSLParser is a visitor that builds a postgres query and arguments from a commontypes.QueryFilter -type PgDSLParser struct { +// pgDSLParser is a visitor that builds a postgres query and arguments from a commontypes.QueryFilter +type pgDSLParser struct { chainID *big.Int expression string args *namedArgs } -var _ query.Visitor = (*PgDSLParser)(nil) - -func NewPgParser(evmChainID *big.Int) *PgDSLParser { - return &PgDSLParser{} -} +var _ query.Visitor = (*pgDSLParser)(nil) -func (v *PgDSLParser) ComparerPrimitive(_ query.ComparerPrimitive) { +func (v *pgDSLParser) ComparerPrimitive(_ query.ComparerPrimitive) { // this function only exists to satisfy the query.Visitor interface // the actual usage of this function is expected to be replaced by // the `Visit...` functions } -func (v *PgDSLParser) BlockPrimitive(p query.BlockPrimitive) { +func (v *pgDSLParser) BlockPrimitive(p query.BlockPrimitive) { v.expression = fmt.Sprintf( "%s %s :%s", blockFieldName, @@ -49,7 +46,7 @@ func (v *PgDSLParser) BlockPrimitive(p query.BlockPrimitive) { } // TODO: need to properly handle unconfirmed level with a fallback option -func (v *PgDSLParser) ConfirmationPrimitive(p query.ConfirmationsPrimitive) { +func (v *pgDSLParser) ConfirmationPrimitive(p query.ConfirmationsPrimitive) { if p.ConfirmationLevel == query.Finalized { nested := fmt.Sprintf(`(SELECT finalized_block_number FROM evm.log_poller_blocks @@ -65,7 +62,7 @@ func (v *PgDSLParser) ConfirmationPrimitive(p query.ConfirmationsPrimitive) { } } -func (v *PgDSLParser) TimestampPrimitive(p query.TimestampPrimitive) { +func (v *pgDSLParser) TimestampPrimitive(p query.TimestampPrimitive) { v.expression = fmt.Sprintf( "%s %s :%s", timestampFieldName, @@ -74,7 +71,7 @@ func (v *PgDSLParser) TimestampPrimitive(p query.TimestampPrimitive) { ) } -func (v *PgDSLParser) TxHashPrimitives(p query.TxHashPrimitive) { +func (v *pgDSLParser) TxHashPrimitives(p query.TxHashPrimitive) { v.expression = fmt.Sprintf( "%s = :%s", txHashFieldName, @@ -82,11 +79,11 @@ func (v *PgDSLParser) TxHashPrimitives(p query.TxHashPrimitive) { ) } -func (v *PgDSLParser) VisitEventBySigFilter(p *EventBySigFilter) { +func (v *pgDSLParser) VisitEventBySigFilter(p *eventBySigFilter) { v.expression = eventSigExpression(p.EventSig, v.args) } -func (v *PgDSLParser) VisitFinalityFilter(p *FinalityFilter) { +func (v *pgDSLParser) VisitFinalityFilter(p *finalityFilter) { nested := fmt.Sprintf(`(SELECT greatest(block_number - :%s, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :%s @@ -98,7 +95,7 @@ func (v *PgDSLParser) VisitFinalityFilter(p *FinalityFilter) { v.expression = fmt.Sprintf("%s <= %s", blockFieldName, nested) } -func (v *PgDSLParser) VisitEventByWordFilter(p *EventByWordFilter) { +func (v *pgDSLParser) VisitEventByWordFilter(p *eventByWordFilter) { v.expression = eventSigExpression(p.EventSig, v.args) if len(p.ValueComparers) > 0 { @@ -119,7 +116,7 @@ func (v *PgDSLParser) VisitEventByWordFilter(p *EventByWordFilter) { } } -func (v *PgDSLParser) VisitEventTopicsByValueFilter(p *EventByTopicFilter) { +func (v *pgDSLParser) VisitEventTopicsByValueFilter(p *eventByTopicFilter) { v.expression = eventSigExpression(p.EventSig, v.args) if len(p.ValueComparers) > 0 { diff --git a/core/services/relay/evm/chain_reader.go b/core/services/relay/evm/chain_reader.go index 472991eb7d1..87411f91d11 100644 --- a/core/services/relay/evm/chain_reader.go +++ b/core/services/relay/evm/chain_reader.go @@ -221,7 +221,7 @@ func (cr *chainReader) addEvent(contractName, eventName string, a abi.ABI, chain inputModifier: inputModifier, codecTopicInfo: codecTopicInfo, id: wrapItemType(readKey, false) + uuid.NewString(), - mapper: &logpoller.EventFilterMapper{ + mapper: logpoller.EventFilterMapper{ Hash: event.ID, Topics: make(map[string]logpoller.Topic), EventDataWords: chainReaderDefinition.GenericDataWordNames, diff --git a/core/services/relay/evm/event_binding.go b/core/services/relay/evm/event_binding.go index 1912daaf3e5..97c59f448e5 100644 --- a/core/services/relay/evm/event_binding.go +++ b/core/services/relay/evm/event_binding.go @@ -33,7 +33,7 @@ type eventBinding struct { inputInfo types.CodecEntry inputModifier codec.Modifier codecTopicInfo types.CodecEntry - mapper *logpoller.EventFilterMapper + mapper logpoller.EventFilterMapper id string }