From ce08fc3c4172f4d9f5a480ec3339b2a044dc5d9d Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Mon, 17 Jun 2024 17:47:09 -0500 Subject: [PATCH] Define and implement QueryKey Cursors (#13385) * Define and implement QueryKey Cursors Cursors in the EVM implementation of QueryKey are constructed to match the current default sorting in LogPoller using block number and log index. Using both of these pieces of data together uniquely identifies a log within the LogPoller database. * add tx hash to SQL query * modify cursor query for accurate block and log querying --- core/chains/evm/logpoller/parser.go | 57 +++++++++++++++++------- core/chains/evm/logpoller/parser_test.go | 28 ++++++------ 2 files changed, 55 insertions(+), 30 deletions(-) diff --git a/core/chains/evm/logpoller/parser.go b/core/chains/evm/logpoller/parser.go index 2e17c92ed60..3860fb2ff12 100644 --- a/core/chains/evm/logpoller/parser.go +++ b/core/chains/evm/logpoller/parser.go @@ -249,11 +249,15 @@ func (v *pgDSLParser) whereClause(expressions []query.Expression, limiter query. segment := "WHERE evm_chain_id = :evm_chain_id" if len(expressions) > 0 { - exp, err := v.combineExpressions(expressions, query.AND) + exp, hasFinalized, err := v.combineExpressions(expressions, query.AND) if err != nil { return "", err } + if limiter.HasCursorLimit() && !hasFinalized { + return "", errors.New("cursor-base queries limited to only finalized blocks") + } + segment = fmt.Sprintf("%s AND %s", segment, exp) } @@ -268,15 +272,14 @@ func (v *pgDSLParser) whereClause(expressions []query.Expression, limiter query. return "", errors.New("invalid cursor direction") } - block, txHash, logIdx, err := valuesFromCursor(limiter.Limit.Cursor) + block, logIdx, _, err := valuesFromCursor(limiter.Limit.Cursor) if err != nil { return "", err } - segment = fmt.Sprintf("%s AND block_number %s= :cursor_block AND tx_hash %s= :cursor_txhash AND log_index %s :cursor_log_index", segment, op, op, op) + segment = fmt.Sprintf("%s AND (block_number %s :cursor_block_number OR (block_number = :cursor_block_number AND log_index %s :cursor_log_index))", segment, op, op) v.args.withField("cursor_block_number", block). - withField("cursor_txhash", common.HexToHash(txHash)). withField("cursor_log_index", logIdx) } @@ -319,7 +322,7 @@ func (v *pgDSLParser) orderClause(limiter query.LimitAndSort) (string, error) { case query.SortByBlock: name = blockFieldName case query.SortBySequence: - sort[idx] = fmt.Sprintf("block_number %s, tx_hash %s, log_index %s", order, order, order) + sort[idx] = fmt.Sprintf("block_number %s, log_index %s, tx_hash %s", order, order, order) continue case query.SortByTimestamp: @@ -352,24 +355,37 @@ func (v *pgDSLParser) getLastExpression() (string, error) { return exp, err } -func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op query.BoolOperator) (string, error) { +func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op query.BoolOperator) (string, bool, error) { grouped := len(expressions) > 1 clauses := make([]string, len(expressions)) + var isFinalized bool + for idx, exp := range expressions { if exp.IsPrimitive() { exp.Primitive.Accept(v) + switch prim := exp.Primitive.(type) { + case *primitives.Confidence: + isFinalized = prim.ConfidenceLevel == primitives.Finalized + case *confirmationsFilter: + isFinalized = prim.Confirmations == evmtypes.Finalized + } + clause, err := v.getLastExpression() if err != nil { - return "", err + return "", isFinalized, err } clauses[idx] = clause } else { - clause, err := v.combineExpressions(exp.BoolExpression.Expressions, exp.BoolExpression.BoolOperator) + clause, fin, err := v.combineExpressions(exp.BoolExpression.Expressions, exp.BoolExpression.BoolOperator) if err != nil { - return "", err + return "", isFinalized, err + } + + if fin { + isFinalized = fin } clauses[idx] = clause @@ -382,7 +398,7 @@ func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op quer output = fmt.Sprintf("(%s)", output) } - return output, nil + return output, isFinalized, nil } func cmpOpToString(op primitives.ComparisonOperator) (string, error) { @@ -415,23 +431,30 @@ func orderToString(dir query.SortDirection) (string, error) { } } -func valuesFromCursor(cursor string) (int64, string, int, error) { +func valuesFromCursor(cursor string) (int64, int, []byte, error) { + partCount := 3 + parts := strings.Split(cursor, "-") - if len(parts) != 3 { - return 0, "", 0, fmt.Errorf("%w: must be composed as block-txhash-logindex", ErrUnexpectedCursorFormat) + if len(parts) != partCount { + return 0, 0, nil, fmt.Errorf("%w: must be composed as block-logindex-txHash", ErrUnexpectedCursorFormat) } block, err := strconv.ParseInt(parts[0], 10, 64) if err != nil { - return 0, "", 0, fmt.Errorf("%w: block number not parsable as int64", ErrUnexpectedCursorFormat) + return 0, 0, nil, fmt.Errorf("%w: block number not parsable as int64", ErrUnexpectedCursorFormat) + } + + logIdx, err := strconv.ParseInt(parts[1], 10, 32) + if err != nil { + return 0, 0, nil, fmt.Errorf("%w: log index not parsable as int", ErrUnexpectedCursorFormat) } - logIdx, err := strconv.ParseInt(parts[2], 10, 32) + txHash, err := hexutil.Decode(parts[2]) if err != nil { - return 0, "", 0, fmt.Errorf("%w: log index not parsable as int", ErrUnexpectedCursorFormat) + return 0, 0, nil, fmt.Errorf("%w: invalid transaction hash: %s", ErrUnexpectedCursorFormat, err.Error()) } - return block, parts[1], int(logIdx), nil + return block, int(logIdx), txHash, nil } type addressFilter struct { diff --git a/core/chains/evm/logpoller/parser_test.go b/core/chains/evm/logpoller/parser_test.go index 521aec14d8c..a12455ea5dd 100644 --- a/core/chains/evm/logpoller/parser_test.go +++ b/core/chains/evm/logpoller/parser_test.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) func assertArgs(t *testing.T, args *queryArgs, numVals int) { @@ -46,22 +47,25 @@ func TestDSLParser(t *testing.T) { expressions := []query.Expression{ NewAddressFilter(common.HexToAddress("0x42")), NewEventSigFilter(common.HexToHash("0x21")), + NewConfirmationsFilter(types.Finalized), } - limiter := query.NewLimitAndSort(query.CursorLimit("10-0x42-5", query.CursorFollowing, 20)) + limiter := query.NewLimitAndSort(query.CursorLimit("10-5-0x42", query.CursorFollowing, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) expected := "SELECT evm.logs.* " + "FROM evm.logs " + "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0) " + - "AND block_number >= :cursor_block AND tx_hash >= :cursor_txhash AND log_index > :cursor_log_index " + - "ORDER BY block_number ASC, tx_hash ASC, log_index ASC " + + "AND (address = :address_0 AND event_sig = :event_sig_0 " + + "AND block_number <= " + + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + + "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + "LIMIT 20" require.NoError(t, err) assert.Equal(t, expected, result) - assertArgs(t, args, 6) + assertArgs(t, args, 5) }) t.Run("query with limit and no order by", func(t *testing.T) { @@ -100,7 +104,7 @@ func TestDSLParser(t *testing.T) { expected := "SELECT evm.logs.* " + "FROM evm.logs " + "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number DESC, tx_hash DESC, log_index DESC" + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC" require.NoError(t, err) assert.Equal(t, expected, result) @@ -138,9 +142,8 @@ func TestDSLParser(t *testing.T) { query.TxHash(common.HexToHash("0x84").String()), query.Block(99, primitives.Neq), query.Confidence(primitives.Finalized), - query.Confidence(primitives.Unconfirmed), } - limiter := query.NewLimitAndSort(query.CursorLimit("10-0x42-20", query.CursorPrevious, 20)) + limiter := query.NewLimitAndSort(query.CursorLimit("10-20-0x42", query.CursorPrevious, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) expected := "SELECT evm.logs.* " + @@ -150,15 +153,14 @@ func TestDSLParser(t *testing.T) { "AND tx_hash = :tx_hash_0 " + "AND block_number != :block_number_0 " + "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + - "AND block_number <= :cursor_block AND tx_hash <= :cursor_txhash AND log_index < :cursor_log_index " + - "ORDER BY block_number DESC, tx_hash DESC, log_index DESC LIMIT 20" + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20" require.NoError(t, err) assert.Equal(t, expected, result) - assertArgs(t, args, 8) + assertArgs(t, args, 6) }) t.Run("query for finality", func(t *testing.T) {