diff --git a/core/chains/evm/logpoller/parser.go b/core/chains/evm/logpoller/parser.go index 2e17c92ed60..3860fb2ff12 100644 --- a/core/chains/evm/logpoller/parser.go +++ b/core/chains/evm/logpoller/parser.go @@ -249,11 +249,15 @@ func (v *pgDSLParser) whereClause(expressions []query.Expression, limiter query. segment := "WHERE evm_chain_id = :evm_chain_id" if len(expressions) > 0 { - exp, err := v.combineExpressions(expressions, query.AND) + exp, hasFinalized, err := v.combineExpressions(expressions, query.AND) if err != nil { return "", err } + if limiter.HasCursorLimit() && !hasFinalized { + return "", errors.New("cursor-base queries limited to only finalized blocks") + } + segment = fmt.Sprintf("%s AND %s", segment, exp) } @@ -268,15 +272,14 @@ func (v *pgDSLParser) whereClause(expressions []query.Expression, limiter query. return "", errors.New("invalid cursor direction") } - block, txHash, logIdx, err := valuesFromCursor(limiter.Limit.Cursor) + block, logIdx, _, err := valuesFromCursor(limiter.Limit.Cursor) if err != nil { return "", err } - segment = fmt.Sprintf("%s AND block_number %s= :cursor_block AND tx_hash %s= :cursor_txhash AND log_index %s :cursor_log_index", segment, op, op, op) + segment = fmt.Sprintf("%s AND (block_number %s :cursor_block_number OR (block_number = :cursor_block_number AND log_index %s :cursor_log_index))", segment, op, op) v.args.withField("cursor_block_number", block). - withField("cursor_txhash", common.HexToHash(txHash)). withField("cursor_log_index", logIdx) } @@ -319,7 +322,7 @@ func (v *pgDSLParser) orderClause(limiter query.LimitAndSort) (string, error) { case query.SortByBlock: name = blockFieldName case query.SortBySequence: - sort[idx] = fmt.Sprintf("block_number %s, tx_hash %s, log_index %s", order, order, order) + sort[idx] = fmt.Sprintf("block_number %s, log_index %s, tx_hash %s", order, order, order) continue case query.SortByTimestamp: @@ -352,24 +355,37 @@ func (v *pgDSLParser) getLastExpression() (string, error) { return exp, err } -func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op query.BoolOperator) (string, error) { +func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op query.BoolOperator) (string, bool, error) { grouped := len(expressions) > 1 clauses := make([]string, len(expressions)) + var isFinalized bool + for idx, exp := range expressions { if exp.IsPrimitive() { exp.Primitive.Accept(v) + switch prim := exp.Primitive.(type) { + case *primitives.Confidence: + isFinalized = prim.ConfidenceLevel == primitives.Finalized + case *confirmationsFilter: + isFinalized = prim.Confirmations == evmtypes.Finalized + } + clause, err := v.getLastExpression() if err != nil { - return "", err + return "", isFinalized, err } clauses[idx] = clause } else { - clause, err := v.combineExpressions(exp.BoolExpression.Expressions, exp.BoolExpression.BoolOperator) + clause, fin, err := v.combineExpressions(exp.BoolExpression.Expressions, exp.BoolExpression.BoolOperator) if err != nil { - return "", err + return "", isFinalized, err + } + + if fin { + isFinalized = fin } clauses[idx] = clause @@ -382,7 +398,7 @@ func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op quer output = fmt.Sprintf("(%s)", output) } - return output, nil + return output, isFinalized, nil } func cmpOpToString(op primitives.ComparisonOperator) (string, error) { @@ -415,23 +431,30 @@ func orderToString(dir query.SortDirection) (string, error) { } } -func valuesFromCursor(cursor string) (int64, string, int, error) { +func valuesFromCursor(cursor string) (int64, int, []byte, error) { + partCount := 3 + parts := strings.Split(cursor, "-") - if len(parts) != 3 { - return 0, "", 0, fmt.Errorf("%w: must be composed as block-txhash-logindex", ErrUnexpectedCursorFormat) + if len(parts) != partCount { + return 0, 0, nil, fmt.Errorf("%w: must be composed as block-logindex-txHash", ErrUnexpectedCursorFormat) } block, err := strconv.ParseInt(parts[0], 10, 64) if err != nil { - return 0, "", 0, fmt.Errorf("%w: block number not parsable as int64", ErrUnexpectedCursorFormat) + return 0, 0, nil, fmt.Errorf("%w: block number not parsable as int64", ErrUnexpectedCursorFormat) + } + + logIdx, err := strconv.ParseInt(parts[1], 10, 32) + if err != nil { + return 0, 0, nil, fmt.Errorf("%w: log index not parsable as int", ErrUnexpectedCursorFormat) } - logIdx, err := strconv.ParseInt(parts[2], 10, 32) + txHash, err := hexutil.Decode(parts[2]) if err != nil { - return 0, "", 0, fmt.Errorf("%w: log index not parsable as int", ErrUnexpectedCursorFormat) + return 0, 0, nil, fmt.Errorf("%w: invalid transaction hash: %s", ErrUnexpectedCursorFormat, err.Error()) } - return block, parts[1], int(logIdx), nil + return block, int(logIdx), txHash, nil } type addressFilter struct { diff --git a/core/chains/evm/logpoller/parser_test.go b/core/chains/evm/logpoller/parser_test.go index 521aec14d8c..a12455ea5dd 100644 --- a/core/chains/evm/logpoller/parser_test.go +++ b/core/chains/evm/logpoller/parser_test.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) func assertArgs(t *testing.T, args *queryArgs, numVals int) { @@ -46,22 +47,25 @@ func TestDSLParser(t *testing.T) { expressions := []query.Expression{ NewAddressFilter(common.HexToAddress("0x42")), NewEventSigFilter(common.HexToHash("0x21")), + NewConfirmationsFilter(types.Finalized), } - limiter := query.NewLimitAndSort(query.CursorLimit("10-0x42-5", query.CursorFollowing, 20)) + limiter := query.NewLimitAndSort(query.CursorLimit("10-5-0x42", query.CursorFollowing, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) expected := "SELECT evm.logs.* " + "FROM evm.logs " + "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0) " + - "AND block_number >= :cursor_block AND tx_hash >= :cursor_txhash AND log_index > :cursor_log_index " + - "ORDER BY block_number ASC, tx_hash ASC, log_index ASC " + + "AND (address = :address_0 AND event_sig = :event_sig_0 " + + "AND block_number <= " + + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + + "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + "LIMIT 20" require.NoError(t, err) assert.Equal(t, expected, result) - assertArgs(t, args, 6) + assertArgs(t, args, 5) }) t.Run("query with limit and no order by", func(t *testing.T) { @@ -100,7 +104,7 @@ func TestDSLParser(t *testing.T) { expected := "SELECT evm.logs.* " + "FROM evm.logs " + "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number DESC, tx_hash DESC, log_index DESC" + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC" require.NoError(t, err) assert.Equal(t, expected, result) @@ -138,9 +142,8 @@ func TestDSLParser(t *testing.T) { query.TxHash(common.HexToHash("0x84").String()), query.Block(99, primitives.Neq), query.Confidence(primitives.Finalized), - query.Confidence(primitives.Unconfirmed), } - limiter := query.NewLimitAndSort(query.CursorLimit("10-0x42-20", query.CursorPrevious, 20)) + limiter := query.NewLimitAndSort(query.CursorLimit("10-20-0x42", query.CursorPrevious, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) expected := "SELECT evm.logs.* " + @@ -150,15 +153,14 @@ func TestDSLParser(t *testing.T) { "AND tx_hash = :tx_hash_0 " + "AND block_number != :block_number_0 " + "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + - "AND block_number <= :cursor_block AND tx_hash <= :cursor_txhash AND log_index < :cursor_log_index " + - "ORDER BY block_number DESC, tx_hash DESC, log_index DESC LIMIT 20" + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20" require.NoError(t, err) assert.Equal(t, expected, result) - assertArgs(t, args, 8) + assertArgs(t, args, 6) }) t.Run("query for finality", func(t *testing.T) {