Skip to content

Commit

Permalink
Implement DSL for LogPoller
Browse files Browse the repository at this point in the history
This commit implements the query DSL within LogPoller by remapping comparer filters to
topic and word filters. A parser then creates an SQL statement from the remapped events
and queries the log poller database directly.
  • Loading branch information
EasterTheBunny committed May 8, 2024
1 parent dc94178 commit 3cbb81a
Show file tree
Hide file tree
Showing 14 changed files with 1,254 additions and 314 deletions.
4 changes: 2 additions & 2 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash,
return nil, ErrDisabled
}

func (d disabled) FilteredLogs(_ query.KeyFilter, _ query.LimitAndSort) ([]Log, error) {
return nil, nil
func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort) ([]Log, error) {
return nil, ErrDisabled
}

func (d disabled) FindLCA(ctx context.Context) (*LogPollerBlock, error) {
Expand Down
7 changes: 4 additions & 3 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type LogPoller interface {
LogsDataWordGreaterThan(ctx context.Context, eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)
LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

FilteredLogs(filter query.KeyFilter, limitAndSrt query.LimitAndSort) ([]Log, error)
// chainlink-common query filtering
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort) ([]Log, error)
}

type LogPollerTest interface {
Expand Down Expand Up @@ -1522,6 +1523,6 @@ func EvmWord(i uint64) common.Hash {
return common.BytesToHash(b)
}

func (lp *logPoller) FilteredLogs(queryFilter query.KeyFilter, sortAndLimit query.LimitAndSort) ([]Log, error) {
return lp.orm.FilteredLogs(queryFilter, sortAndLimit)
func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort) ([]Log, error) {
return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort)
}
18 changes: 9 additions & 9 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 31 additions & 11 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ type ORM interface {
SelectLogsDataWordRange(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)
SelectLogsDataWordGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)
SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// FilteredLogs accepts chainlink-common filtering DSL.
FilteredLogs(filter query.KeyFilter, sortAndLimit query.LimitAndSort) ([]Log, error)
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort) ([]Log, error)
}

type DSORM struct {
Expand Down Expand Up @@ -92,10 +93,10 @@ func (o *DSORM) new(ds sqlutil.DataSource) *DSORM { return NewORM(o.chainID, ds,
// InsertBlock is idempotent to support replays.
func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error {
args, err := newQueryArgs(o.chainID).
withCustomHashArg("block_hash", blockHash).
withCustomArg("block_number", blockNumber).
withCustomArg("block_timestamp", blockTimestamp).
withCustomArg("finalized_block_number", finalizedBlock).
withField("block_hash", blockHash).
withField("block_number", blockNumber).
withField("block_timestamp", blockTimestamp).
withField("finalized_block_number", finalizedBlock).
toArgs()
if err != nil {
return err
Expand All @@ -115,7 +116,7 @@ func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNum
func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (err error) {
topicArrays := []types.HashArray{filter.Topic2, filter.Topic3, filter.Topic4}
args, err := newQueryArgs(o.chainID).
withCustomArg("name", filter.Name).
withField("name", filter.Name).
withRetention(filter.Retention).
withMaxLogsKept(filter.MaxLogsKept).
withLogsPerBlock(filter.LogsPerBlock).
Expand Down Expand Up @@ -930,8 +931,8 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si
withTopicIndex(topicIndex).
withStartBlock(startBlock).
withEndBlock(endBlock).
withCustomHashArg("sigA", sigA).
withCustomHashArg("sigB", sigB).
withField("sigA", sigA).
withField("sigB", sigB).
withConfs(confs).
toArgs()
if err != nil {
Expand Down Expand Up @@ -970,9 +971,28 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si
return logs, nil
}

func (o *DSORM) FilteredLogs(_ query.KeyFilter, _ query.LimitAndSort) ([]Log, error) {
//TODO implement me
panic("implement me")
func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort) ([]Log, error) {
qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter.Expressions, limitAndSort)
if err != nil {
return nil, err
}

values, err := args.toArgs()
if err != nil {
return nil, err
}

query, sqlArgs, err := o.ds.BindNamed(qs, values)
if err != nil {
return nil, err
}

var logs []Log
if err = o.ds.SelectContext(ctx, &logs, query, sqlArgs...); err != nil {
return nil, err
}

return logs, nil
}

func nestedBlockNumberQuery(confs evmtypes.Confirmations) string {
Expand Down
201 changes: 172 additions & 29 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand Down Expand Up @@ -859,18 +861,65 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
}
require.NoError(t, o1.InsertLogs(ctx, inputLogs))

filter := func(sigs []common.Hash, startBlock, endBlock int64) query.KeyFilter {
filters := []query.Expression{
logpoller.NewAddressFilter(sourceAddr),
}

if len(sigs) > 0 {
exp := make([]query.Expression, len(sigs))
for idx, val := range sigs {
exp[idx] = logpoller.NewEventSigFilter(val)
}

filters = append(filters, query.Expression{
BoolExpression: query.BoolExpression{
Expressions: exp,
BoolOperator: query.OR,
},
})
}

filters = append(filters, query.Expression{
BoolExpression: query.BoolExpression{
Expressions: []query.Expression{
query.Block(uint64(startBlock), primitives.Gte),
query.Block(uint64(endBlock), primitives.Lte),
},
BoolOperator: query.AND,
},
})

return query.KeyFilter{
Expressions: filters,
}
}

limiter := query.LimitAndSort{
SortBy: []query.SortBy{query.NewSortBySequence(query.Asc)},
}

assertion := func(t *testing.T, logs []logpoller.Log, err error, startBlock, endBlock int64) {
require.NoError(t, err)
assert.Len(t, logs, 4)
for _, l := range logs {
assert.Equal(t, sourceAddr, l.Address, "wrong log address")
assert.True(t, bytes.Equal(topic.Bytes(), l.EventSig.Bytes()) || bytes.Equal(topic2.Bytes(), l.EventSig.Bytes()), "wrong log topic")
assert.True(t, l.BlockNumber >= startBlock && l.BlockNumber <= endBlock)
}
}

startBlock, endBlock := int64(10), int64(15)
logs, err := o1.SelectLogsWithSigs(ctx, startBlock, endBlock, sourceAddr, []common.Hash{
topic,
topic2,
})
require.NoError(t, err)
assert.Len(t, logs, 4)
for _, l := range logs {
assert.Equal(t, sourceAddr, l.Address, "wrong log address")
assert.True(t, bytes.Equal(topic.Bytes(), l.EventSig.Bytes()) || bytes.Equal(topic2.Bytes(), l.EventSig.Bytes()), "wrong log topic")
assert.True(t, l.BlockNumber >= startBlock && l.BlockNumber <= endBlock)
}

assertion(t, logs, err, startBlock, endBlock)

logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, startBlock, endBlock), limiter)

assertion(t, logs, err, startBlock, endBlock)
}

func TestORM_DeleteBlocksBefore(t *testing.T) {
Expand Down Expand Up @@ -1404,29 +1453,92 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
},
},
}

filter := func(timestamp time.Time, confs evmtypes.Confirmations, topicIdx int, topicVals []common.Hash) query.KeyFilter {
var queryConfs primitives.ConfirmationLevel

switch confs {
case evmtypes.Finalized:
queryConfs = primitives.Finalized
case evmtypes.Unconfirmed:
queryConfs = primitives.Unconfirmed
default:
fmt.Println("default")
queryConfs = primitives.ConfirmationLevel(confs)
}

filters := []query.Expression{
logpoller.NewAddressFilter(address),
logpoller.NewEventSigFilter(event),
}

if len(topicVals) > 0 {
exp := make([]query.Expression, len(topicVals))
for idx, val := range topicVals {
exp[idx] = logpoller.NewEventByTopicFilter(common.Hash{}, uint64(topicIdx), []primitives.ValueComparator{
{Value: val.String(), Operator: primitives.Eq},
})
}

filters = append(filters, query.Expression{
BoolExpression: query.BoolExpression{
Expressions: exp,
BoolOperator: query.OR,
},
})
}

filters = append(filters, []query.Expression{
query.Timestamp(uint64(timestamp.Unix()), primitives.Gt),
query.Confirmation(queryConfs),
}...)

return query.KeyFilter{
Expressions: filters,
}
}

limiter := query.LimitAndSort{
SortBy: []query.SortBy{
query.NewSortBySequence(query.Asc),
},
}

assertion := func(t *testing.T, logs []logpoller.Log, err error, exp []expectedLog) {
require.NoError(t, err)
require.Len(t, logs, len(exp))

for i, log := range logs {
assert.Equal(t, exp[i].block, log.BlockNumber)
assert.Equal(t, exp[i].log, log.LogIndex)
}
}

for _, tt := range tests {
t.Run("SelectLogsCreatedAfter"+tt.name, func(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
logs, err := th.ORM.SelectLogsCreatedAfter(ctx, address, event, tt.after, tt.confs)
require.NoError(t, err)
require.Len(t, logs, len(tt.expectedLogs))

for i, log := range logs {
require.Equal(t, tt.expectedLogs[i].block, log.BlockNumber)
require.Equal(t, tt.expectedLogs[i].log, log.LogIndex)
}
})
assertion(t, logs, err, tt.expectedLogs)

t.Run("SelectIndexedLogsCreatedAfter"+tt.name, func(t *testing.T) {
logs, err := th.ORM.SelectIndexedLogsCreatedAfter(ctx, address, event, 1, []common.Hash{event}, tt.after, tt.confs)
require.NoError(t, err)
require.Len(t, logs, len(tt.expectedLogs))
logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil), limiter)

for i, log := range logs {
require.Equal(t, tt.expectedLogs[i].block, log.BlockNumber)
require.Equal(t, tt.expectedLogs[i].log, log.LogIndex)
}
assertion(t, logs, err, tt.expectedLogs)
})
}

t.Run("SelectIndexedLogsCreatedAfter", func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logs, err := th.ORM.SelectIndexedLogsCreatedAfter(ctx, address, event, 1, []common.Hash{event}, tt.after, tt.confs)

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}), limiter)

assertion(t, logs, err, tt.expectedLogs)
})
}
})
}

func TestNestedLogPollerBlocksQuery(t *testing.T) {
Expand Down Expand Up @@ -1612,6 +1724,12 @@ func TestSelectLogsDataWordBetween(t *testing.T) {
logpoller.NewLogPollerBlock(utils.RandomBytes32(), 10, time.Now(), 1),
)
require.NoError(t, err)
limiter := query.LimitAndSort{
SortBy: []query.SortBy{
query.NewSortByBlock(query.Asc),
query.NewSortBySequence(query.Asc),
},
}

tests := []struct {
name string
Expand Down Expand Up @@ -1640,15 +1758,40 @@ func TestSelectLogsDataWordBetween(t *testing.T) {
},
}

wordFilter := func(word uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(address),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word).Hex(), Operator: primitives.Lte},
}),
logpoller.NewEventByWordFilter(eventSig, 1, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word).Hex(), Operator: primitives.Gte},
}),
query.Confirmation(primitives.Unconfirmed),
},
}
}

assertion := func(t *testing.T, logs []logpoller.Log, err error, expected []int64) {
require.NoError(t, err)
assert.Len(t, logs, len(expected))

for index := range logs {
assert.Equal(t, expected[index], logs[index].BlockNumber)
}
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logs, err1 := th.ORM.SelectLogsDataWordBetween(ctx, address, eventSig, 0, 1, logpoller.EvmWord(tt.wordValue), evmtypes.Unconfirmed)
assert.NoError(t, err1)
assert.Len(t, logs, len(tt.expectedLogs))
logs, err := th.ORM.SelectLogsDataWordBetween(ctx, address, eventSig, 0, 1, logpoller.EvmWord(tt.wordValue), evmtypes.Unconfirmed)

for index := range logs {
assert.Equal(t, tt.expectedLogs[index], logs[index].BlockNumber)
}
assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue), limiter)

assertion(t, logs, err, tt.expectedLogs)
})
}
}
Expand Down
Loading

0 comments on commit 3cbb81a

Please sign in to comment.