Skip to content

Commit

Permalink
Define and implement QueryKey Cursors (#13385)
Browse files Browse the repository at this point in the history
* Define and implement QueryKey Cursors

Cursors in the EVM implementation of QueryKey are constructed to match the current
default sorting in LogPoller using block number and log index. Using both of these
pieces of data together uniquely identifies a log within the LogPoller database.

* add tx hash to SQL query

* modify cursor query for accurate block and log querying
  • Loading branch information
EasterTheBunny authored Jun 17, 2024
1 parent facd3b9 commit ce08fc3
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 30 deletions.
57 changes: 40 additions & 17 deletions core/chains/evm/logpoller/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,15 @@ func (v *pgDSLParser) whereClause(expressions []query.Expression, limiter query.
segment := "WHERE evm_chain_id = :evm_chain_id"

if len(expressions) > 0 {
exp, err := v.combineExpressions(expressions, query.AND)
exp, hasFinalized, err := v.combineExpressions(expressions, query.AND)
if err != nil {
return "", err
}

if limiter.HasCursorLimit() && !hasFinalized {
return "", errors.New("cursor-base queries limited to only finalized blocks")
}

segment = fmt.Sprintf("%s AND %s", segment, exp)
}

Expand All @@ -268,15 +272,14 @@ func (v *pgDSLParser) whereClause(expressions []query.Expression, limiter query.
return "", errors.New("invalid cursor direction")
}

block, txHash, logIdx, err := valuesFromCursor(limiter.Limit.Cursor)
block, logIdx, _, err := valuesFromCursor(limiter.Limit.Cursor)
if err != nil {
return "", err
}

segment = fmt.Sprintf("%s AND block_number %s= :cursor_block AND tx_hash %s= :cursor_txhash AND log_index %s :cursor_log_index", segment, op, op, op)
segment = fmt.Sprintf("%s AND (block_number %s :cursor_block_number OR (block_number = :cursor_block_number AND log_index %s :cursor_log_index))", segment, op, op)

v.args.withField("cursor_block_number", block).
withField("cursor_txhash", common.HexToHash(txHash)).
withField("cursor_log_index", logIdx)
}

Expand Down Expand Up @@ -319,7 +322,7 @@ func (v *pgDSLParser) orderClause(limiter query.LimitAndSort) (string, error) {
case query.SortByBlock:
name = blockFieldName
case query.SortBySequence:
sort[idx] = fmt.Sprintf("block_number %s, tx_hash %s, log_index %s", order, order, order)
sort[idx] = fmt.Sprintf("block_number %s, log_index %s, tx_hash %s", order, order, order)

continue
case query.SortByTimestamp:
Expand Down Expand Up @@ -352,24 +355,37 @@ func (v *pgDSLParser) getLastExpression() (string, error) {
return exp, err
}

func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op query.BoolOperator) (string, error) {
func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op query.BoolOperator) (string, bool, error) {
grouped := len(expressions) > 1
clauses := make([]string, len(expressions))

var isFinalized bool

for idx, exp := range expressions {
if exp.IsPrimitive() {
exp.Primitive.Accept(v)

switch prim := exp.Primitive.(type) {
case *primitives.Confidence:
isFinalized = prim.ConfidenceLevel == primitives.Finalized
case *confirmationsFilter:
isFinalized = prim.Confirmations == evmtypes.Finalized
}

clause, err := v.getLastExpression()
if err != nil {
return "", err
return "", isFinalized, err
}

clauses[idx] = clause
} else {
clause, err := v.combineExpressions(exp.BoolExpression.Expressions, exp.BoolExpression.BoolOperator)
clause, fin, err := v.combineExpressions(exp.BoolExpression.Expressions, exp.BoolExpression.BoolOperator)
if err != nil {
return "", err
return "", isFinalized, err
}

if fin {
isFinalized = fin
}

clauses[idx] = clause
Expand All @@ -382,7 +398,7 @@ func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op quer
output = fmt.Sprintf("(%s)", output)
}

return output, nil
return output, isFinalized, nil
}

func cmpOpToString(op primitives.ComparisonOperator) (string, error) {
Expand Down Expand Up @@ -415,23 +431,30 @@ func orderToString(dir query.SortDirection) (string, error) {
}
}

func valuesFromCursor(cursor string) (int64, string, int, error) {
func valuesFromCursor(cursor string) (int64, int, []byte, error) {
partCount := 3

parts := strings.Split(cursor, "-")
if len(parts) != 3 {
return 0, "", 0, fmt.Errorf("%w: must be composed as block-txhash-logindex", ErrUnexpectedCursorFormat)
if len(parts) != partCount {
return 0, 0, nil, fmt.Errorf("%w: must be composed as block-logindex-txHash", ErrUnexpectedCursorFormat)
}

block, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil {
return 0, "", 0, fmt.Errorf("%w: block number not parsable as int64", ErrUnexpectedCursorFormat)
return 0, 0, nil, fmt.Errorf("%w: block number not parsable as int64", ErrUnexpectedCursorFormat)
}

logIdx, err := strconv.ParseInt(parts[1], 10, 32)
if err != nil {
return 0, 0, nil, fmt.Errorf("%w: log index not parsable as int", ErrUnexpectedCursorFormat)
}

logIdx, err := strconv.ParseInt(parts[2], 10, 32)
txHash, err := hexutil.Decode(parts[2])
if err != nil {
return 0, "", 0, fmt.Errorf("%w: log index not parsable as int", ErrUnexpectedCursorFormat)
return 0, 0, nil, fmt.Errorf("%w: invalid transaction hash: %s", ErrUnexpectedCursorFormat, err.Error())
}

return block, parts[1], int(logIdx), nil
return block, int(logIdx), txHash, nil
}

type addressFilter struct {
Expand Down
28 changes: 15 additions & 13 deletions core/chains/evm/logpoller/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
)

func assertArgs(t *testing.T, args *queryArgs, numVals int) {
Expand Down Expand Up @@ -46,22 +47,25 @@ func TestDSLParser(t *testing.T) {
expressions := []query.Expression{
NewAddressFilter(common.HexToAddress("0x42")),
NewEventSigFilter(common.HexToHash("0x21")),
NewConfirmationsFilter(types.Finalized),
}
limiter := query.NewLimitAndSort(query.CursorLimit("10-0x42-5", query.CursorFollowing, 20))
limiter := query.NewLimitAndSort(query.CursorLimit("10-5-0x42", query.CursorFollowing, 20))

result, args, err := parser.buildQuery(chainID, expressions, limiter)
expected := "SELECT evm.logs.* " +
"FROM evm.logs " +
"WHERE evm_chain_id = :evm_chain_id " +
"AND (address = :address_0 AND event_sig = :event_sig_0) " +
"AND block_number >= :cursor_block AND tx_hash >= :cursor_txhash AND log_index > :cursor_log_index " +
"ORDER BY block_number ASC, tx_hash ASC, log_index ASC " +
"AND (address = :address_0 AND event_sig = :event_sig_0 " +
"AND block_number <= " +
"(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " +
"AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " +
"ORDER BY block_number ASC, log_index ASC, tx_hash ASC " +
"LIMIT 20"

require.NoError(t, err)
assert.Equal(t, expected, result)

assertArgs(t, args, 6)
assertArgs(t, args, 5)
})

t.Run("query with limit and no order by", func(t *testing.T) {
Expand Down Expand Up @@ -100,7 +104,7 @@ func TestDSLParser(t *testing.T) {
expected := "SELECT evm.logs.* " +
"FROM evm.logs " +
"WHERE evm_chain_id = :evm_chain_id " +
"ORDER BY block_number DESC, tx_hash DESC, log_index DESC"
"ORDER BY block_number DESC, log_index DESC, tx_hash DESC"

require.NoError(t, err)
assert.Equal(t, expected, result)
Expand Down Expand Up @@ -138,9 +142,8 @@ func TestDSLParser(t *testing.T) {
query.TxHash(common.HexToHash("0x84").String()),
query.Block(99, primitives.Neq),
query.Confidence(primitives.Finalized),
query.Confidence(primitives.Unconfirmed),
}
limiter := query.NewLimitAndSort(query.CursorLimit("10-0x42-20", query.CursorPrevious, 20))
limiter := query.NewLimitAndSort(query.CursorLimit("10-20-0x42", query.CursorPrevious, 20))

result, args, err := parser.buildQuery(chainID, expressions, limiter)
expected := "SELECT evm.logs.* " +
Expand All @@ -150,15 +153,14 @@ func TestDSLParser(t *testing.T) {
"AND tx_hash = :tx_hash_0 " +
"AND block_number != :block_number_0 " +
"AND block_number <= " +
"(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " +
"AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " +
"AND block_number <= :cursor_block AND tx_hash <= :cursor_txhash AND log_index < :cursor_log_index " +
"ORDER BY block_number DESC, tx_hash DESC, log_index DESC LIMIT 20"
"(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " +
"AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " +
"ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20"

require.NoError(t, err)
assert.Equal(t, expected, result)

assertArgs(t, args, 8)
assertArgs(t, args, 6)
})

t.Run("query for finality", func(t *testing.T) {
Expand Down

0 comments on commit ce08fc3

Please sign in to comment.