Skip to content

Commit

Permalink
added extensive log poller orm tests for query capability
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny committed May 8, 2024
1 parent f91545a commit 3da40ac
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 7 deletions.
215 changes: 214 additions & 1 deletion core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,47 +597,156 @@ func TestORM_IndexedLogs(t *testing.T) {
insertLogsTopicValueRange(t, th.ChainID, o1, addr, 1, eventSig, 1, 3)
insertLogsTopicValueRange(t, th.ChainID, o1, addr, 2, eventSig, 4, 4) // unconfirmed

filtersForTopics := func(topicIdx uint64, topicValues []uint64) query.Expression {
topicFilters := query.BoolExpression{
Expressions: make([]query.Expression, len(topicValues)),
BoolOperator: query.OR,
}

for idx, value := range topicValues {
topicFilters.Expressions[idx] = logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(value).Hex(), Operator: primitives.Eq},
})
}

return query.Expression{BoolExpression: topicFilters}
}

limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Asc))
standardFilter := func(topicIdx uint64, topicValues []uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
filtersForTopics(topicIdx, topicValues),
query.Confirmation(primitives.Unconfirmed),
},
}
}

lgs, err := o1.SelectIndexedLogs(ctx, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(1)}, 0)
require.NoError(t, err)
require.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes())

lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}), limiter)
require.NoError(t, err)
require.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes())

lgs, err = o1.SelectIndexedLogs(ctx, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(1), logpoller.EvmWord(2)}, 0)
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}), limiter)
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

blockRangeFilter := func(start, end uint64, topicIdx uint64, topicValues []uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
filtersForTopics(topicIdx, topicValues),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
},
}
}

lgs, err = o1.SelectIndexedLogsByBlockRange(ctx, 1, 1, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(1)})
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 1, 1, []uint64{1}), limiter)
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.SelectIndexedLogsByBlockRange(ctx, 1, 2, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(2)})
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 1, []uint64{2}), limiter)
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.SelectIndexedLogsByBlockRange(ctx, 1, 2, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(1)})
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 1, []uint64{1}), limiter)
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

_, err = o1.SelectIndexedLogsByBlockRange(ctx, 1, 2, addr, eventSig, 0, []common.Hash{logpoller.EvmWord(1)})
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid index for topic: 0")

_, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 0, []uint64{1}), limiter)
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid index for topic: 0")

_, err = o1.SelectIndexedLogsByBlockRange(ctx, 1, 2, addr, eventSig, 4, []common.Hash{logpoller.EvmWord(1)})
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid index for topic: 4")

_, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 4, []uint64{1}), limiter)
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid index for topic: 4")

lgs, err = o1.SelectIndexedLogsTopicGreaterThan(ctx, addr, eventSig, 1, logpoller.EvmWord(2), 0)
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

filter := query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByTopicFilter(1, []primitives.ValueComparator{
{Value: logpoller.EvmWord(2).Hex(), Operator: primitives.Gte},
}),
query.Confirmation(primitives.Unconfirmed),
},
}

lgs, err = o1.FilteredLogs(ctx, filter, limiter)
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

rangeFilter := func(topicIdx uint64, min, max uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte},
}),
query.Confirmation(primitives.Unconfirmed),
},
}
}

lgs, err = o1.SelectIndexedLogsTopicRange(ctx, addr, eventSig, 1, logpoller.EvmWord(3), logpoller.EvmWord(3), 0)
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(3).Bytes(), lgs[0].GetTopics()[1].Bytes())

lgs, err = o1.FilteredLogs(ctx, rangeFilter(1, 3, 3), limiter)
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(3).Bytes(), lgs[0].GetTopics()[1].Bytes())

lgs, err = o1.SelectIndexedLogsTopicRange(ctx, addr, eventSig, 1, logpoller.EvmWord(1), logpoller.EvmWord(3), 0)
require.NoError(t, err)
assert.Equal(t, 3, len(lgs))

lgs, err = o1.FilteredLogs(ctx, rangeFilter(1, 1, 3), limiter)
require.NoError(t, err)
assert.Equal(t, 3, len(lgs))

// Check confirmations work as expected.
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x2"), 2, time.Now(), 0))
lgs, err = o1.SelectIndexedLogsTopicRange(ctx, addr, eventSig, 1, logpoller.EvmWord(4), logpoller.EvmWord(4), 1)
Expand Down Expand Up @@ -714,6 +823,22 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) {
require.Equal(t, 2, len(retrievedLogs))
require.Equal(t, retrievedLogs[0].LogIndex, logs[0].LogIndex)
require.Equal(t, retrievedLogs[1].LogIndex, logs[1].LogIndex)

limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Asc))
filter := query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
query.TxHash(txHash.Hex()),
},
}

retrievedLogs, err = o1.FilteredLogs(ctx, filter, limiter)
require.NoError(t, err)

require.Equal(t, 2, len(retrievedLogs))
require.Equal(t, retrievedLogs[0].LogIndex, logs[0].LogIndex)
require.Equal(t, retrievedLogs[1].LogIndex, logs[1].LogIndex)
}

func TestORM_DataWords(t *testing.T) {
Expand Down Expand Up @@ -748,36 +873,92 @@ func TestORM_DataWords(t *testing.T) {
Data: append(logpoller.EvmWord(2).Bytes(), logpoller.EvmWord(3).Bytes()...),
},
}))

wordFilter := func(wordIdx uint8, word1, word2 uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte},
}),
query.Confirmation(primitives.Unconfirmed),
},
}
}

limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Asc))

// Outside range should fail.
lgs, err := o1.SelectLogsDataWordRange(ctx, addr, eventSig, 0, logpoller.EvmWord(2), logpoller.EvmWord(2), 0)
require.NoError(t, err)
require.Equal(t, 0, len(lgs))

lgs, err = o1.FilteredLogs(ctx, wordFilter(0, 2, 2), limiter)
require.NoError(t, err)
require.Equal(t, 0, len(lgs))

// Range including log should succeed
lgs, err = o1.SelectLogsDataWordRange(ctx, addr, eventSig, 0, logpoller.EvmWord(1), logpoller.EvmWord(2), 0)
require.NoError(t, err)
require.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, wordFilter(0, 1, 2), limiter)
require.NoError(t, err)
require.Equal(t, 1, len(lgs))

// Range only covering log should succeed
lgs, err = o1.SelectLogsDataWordRange(ctx, addr, eventSig, 0, logpoller.EvmWord(1), logpoller.EvmWord(1), 0)
require.NoError(t, err)
require.Equal(t, 1, len(lgs))

lgs, err = o1.FilteredLogs(ctx, wordFilter(0, 1, 1), limiter)
require.NoError(t, err)
require.Equal(t, 1, len(lgs))

// Cannot query for unconfirmed second log.
lgs, err = o1.SelectLogsDataWordRange(ctx, addr, eventSig, 1, logpoller.EvmWord(3), logpoller.EvmWord(3), 0)
require.NoError(t, err)
require.Equal(t, 0, len(lgs))

lgs, err = o1.FilteredLogs(ctx, wordFilter(1, 3, 3), limiter)
require.NoError(t, err)
require.Equal(t, 0, len(lgs))

// Confirm it, then can query.
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x2"), 2, time.Now(), 0))
lgs, err = o1.SelectLogsDataWordRange(ctx, addr, eventSig, 1, logpoller.EvmWord(3), logpoller.EvmWord(3), 0)
require.NoError(t, err)
require.Equal(t, 1, len(lgs))
require.Equal(t, lgs[0].Data, append(logpoller.EvmWord(2).Bytes(), logpoller.EvmWord(3).Bytes()...))

lgs, err = o1.FilteredLogs(ctx, wordFilter(1, 3, 3), limiter)
require.NoError(t, err)
require.Equal(t, 1, len(lgs))
require.Equal(t, lgs[0].Data, append(logpoller.EvmWord(2).Bytes(), logpoller.EvmWord(3).Bytes()...))

// Check greater than 1 yields both logs.
lgs, err = o1.SelectLogsDataWordGreaterThan(ctx, addr, eventSig, 0, logpoller.EvmWord(1), 0)
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

filter := query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{
{Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte},
}),
query.Confirmation(primitives.Unconfirmed),
},
}

lgs, err = o1.FilteredLogs(ctx, filter, limiter)
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))
}

func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
Expand Down Expand Up @@ -979,6 +1160,17 @@ func TestLogPoller_Logs(t *testing.T) {
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[4].BlockHash.String())
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[5].BlockHash.String())

logFilter := func(start, end uint64, address common.Address) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(address),
logpoller.NewEventSigFilter(event1),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
},
}
}

// Filter by Address and topic
lgs, err = th.ORM.SelectLogs(ctx, 1, 3, address1, event1)
require.NoError(t, err)
Expand All @@ -989,6 +1181,17 @@ func TestLogPoller_Logs(t *testing.T) {
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[1].BlockHash.String())
assert.Equal(t, address1, lgs[1].Address)

lgs, err = th.ORM.FilteredLogs(ctx, logFilter(1, 3, address1), query.LimitAndSort{
SortBy: []query.SortBy{query.NewSortBySequence(query.Asc)},
})
require.NoError(t, err)
require.Equal(t, 2, len(lgs))
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000003", lgs[0].BlockHash.String())
assert.Equal(t, address1, lgs[0].Address)
assert.Equal(t, event1.Bytes(), lgs[0].Topics[0])
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[1].BlockHash.String())
assert.Equal(t, address1, lgs[1].Address)

// Filter by block
lgs, err = th.ORM.SelectLogs(ctx, 2, 2, address2, event1)
require.NoError(t, err)
Expand All @@ -997,6 +1200,16 @@ func TestLogPoller_Logs(t *testing.T) {
assert.Equal(t, int64(1), lgs[0].LogIndex)
assert.Equal(t, address2, lgs[0].Address)
assert.Equal(t, event1.Bytes(), lgs[0].Topics[0])

lgs, err = th.ORM.FilteredLogs(ctx, logFilter(2, 2, address2), query.LimitAndSort{
SortBy: []query.SortBy{query.NewSortBySequence(query.Asc)},
})
require.NoError(t, err)
require.Equal(t, 1, len(lgs))
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000004", lgs[0].BlockHash.String())
assert.Equal(t, int64(1), lgs[0].LogIndex)
assert.Equal(t, address2, lgs[0].Address)
assert.Equal(t, event1.Bytes(), lgs[0].Topics[0])
}

func BenchmarkLogs(b *testing.B) {
Expand Down Expand Up @@ -1475,7 +1688,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
if len(topicVals) > 0 {
exp := make([]query.Expression, len(topicVals))
for idx, val := range topicVals {
exp[idx] = logpoller.NewEventByTopicFilter(common.Hash{}, uint64(topicIdx), []primitives.ValueComparator{
exp[idx] = logpoller.NewEventByTopicFilter(uint64(topicIdx), []primitives.ValueComparator{
{Value: val.String(), Operator: primitives.Eq},
})
}
Expand Down
13 changes: 9 additions & 4 deletions core/chains/evm/logpoller/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,14 @@ func (v *pgDSLParser) VisitEventByWordFilter(p *eventByWordFilter) {

func (v *pgDSLParser) VisitEventTopicsByValueFilter(p *eventByTopicFilter) {
if len(p.ValueComparers) > 0 {
topicIdx := v.args.withIndexedField("topic_index", p.Topic)
if !(p.Topic == 1 || p.Topic == 2 || p.Topic == 3) {
v.err = fmt.Errorf("invalid index for topic: %d", p.Topic)

return
}

// Add 1 since postgresql arrays are 1-indexed.
topicIdx := v.args.withIndexedField("topic_index", p.Topic+1)

comps := make([]string, len(p.ValueComparers))
for idx, comp := range p.ValueComparers {
Expand Down Expand Up @@ -478,14 +485,12 @@ func (f *eventByWordFilter) Accept(visitor primitives.Visitor) {
}

type eventByTopicFilter struct {
EventSig common.Hash
Topic uint64
ValueComparers []primitives.ValueComparator
}

func NewEventByTopicFilter(eventSig common.Hash, topicIndex uint64, valueComparers []primitives.ValueComparator) query.Expression {
func NewEventByTopicFilter(topicIndex uint64, valueComparers []primitives.ValueComparator) query.Expression {
return query.Expression{Primitive: &eventByTopicFilter{
EventSig: eventSig,
Topic: topicIndex,
ValueComparers: valueComparers,
}}
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestDSLParser(t *testing.T) {
t.Run("query for event topic", func(t *testing.T) {
t.Parallel()

topicFilter := NewEventByTopicFilter(common.HexToHash("0x42"), 8, []primitives.ValueComparator{
topicFilter := NewEventByTopicFilter(8, []primitives.ValueComparator{
{Value: "a", Operator: primitives.Gt},
{Value: "b", Operator: primitives.Lt},
})
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/event_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (e *eventBinding) remapPrimitive(key string, expression query.Expression) (
return logpoller.NewEventByWordFilter(e.hash, val, primitive.ValueComparators), nil
}

return logpoller.NewEventByTopicFilter(e.hash, e.topics[key].Index, primitive.ValueComparators), nil
return logpoller.NewEventByTopicFilter(e.topics[key].Index, primitive.ValueComparators), nil
default:
return expression, nil
}
Expand Down

0 comments on commit 3da40ac

Please sign in to comment.