Skip to content

Commit

Permalink
Fix some tests & bugs causing failures
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Aug 21, 2024
1 parent 628457f commit 29e383a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 96 deletions.
61 changes: 34 additions & 27 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,27 +186,33 @@ func (o *DSORM) LoadFilters(ctx context.Context) (map[string]Filter, error) {
return filters, err
}

const (
blocksFields = `evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at`
logsFields = `evm_chain_id, log_index, block_hash, block_number, address, event_sig, topics, tx_hash, data, created_at, block_timestamp`
)

func blocksQuery(clause string) string {
return fmt.Sprintf(`SELECT %s FROM evm.log_poller_blocks %s`, blocksFields, clause)
return fmt.Sprintf(`SELECT %s FROM evm.log_poller_blocks %s`, strings.Join(blocksFields[:], ", "), clause)
}
func logsQuery(clause string) string {
return fmt.Sprintf(`SELECT %s FROM evm.logs %s`, logsFields, clause)
return fmt.Sprintf(`SELECT %s FROM evm.logs %s`, strings.Join(logsFields[:], ", "), clause)
}

func logsQueryWithTablePrefix(schema string, clause string) string {
var s strings.Builder
for i, field := range logsFields {
if i > 0 {
s.WriteString(", ")
}
s.WriteString(fmt.Sprintf("%s.%s", schema, field))
}
return fmt.Sprintf(`SELECT %s FROM evm.logs AS %s %s`, s.String(), schema, clause)
}

func withConfs(query string, confs evmtypes.Confirmations) string {
var lastConfirmedBlock string

if confs == evmtypes.Finalized {
lastConfirmedBlock = `MAX(finalized_block)`
lastConfirmedBlock = `MAX(finalized_block_number)`
} else {
lastConfirmedBlock = `MAX(block_number) - :confs`
}
return fmt.Sprintf(`%s AND block_number <= (
return fmt.Sprintf(`%sblock_number <= (
SELECT %s
FROM evm.log_poller_blocks
WHERE evm_chain_id = :evm_chain_id)`, query, lastConfirmedBlock)
Expand Down Expand Up @@ -236,7 +242,7 @@ func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPo
func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error) {
var b LogPollerBlock
if err := o.ds.GetContext(ctx, &b,
blocksQuery(`WHERE block_number = $1 AND evm_chain_id = $2)`), n, ubig.New(o.chainID),
blocksQuery(`WHERE block_number = $1 AND evm_chain_id = $2`), n, ubig.New(o.chainID),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -274,7 +280,7 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig
query := logsQueryWithConfs(
`WHERE evm_chain_id = :evm_chain_id
AND event_sig = :event_sig
AND address = :address`, confs) +
AND address = :address AND `, confs) +
`ORDER BY block_number desc, log_index DESC LIMIT 1`
var l Log

Expand Down Expand Up @@ -578,7 +584,7 @@ func (o *DSORM) SelectLogsCreatedAfter(ctx context.Context, address common.Addre
`WHERE evm_chain_id = :evm_chain_id
AND address = :address
AND event_sig = :event_sig
AND block_timestamp > :block_timestamp_after`, confs) +
AND block_timestamp > :block_timestamp_after AND `, confs) +
`ORDER BY block_number, log_index`

var logs []Log
Expand Down Expand Up @@ -670,7 +676,7 @@ func (o *DSORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, from
WHERE evm_chain_id = :evm_chain_id
AND event_sig = ANY(:event_sig_array)
AND address = ANY(:address_array)
AND block_number > :start_block`, confs) +
AND block_number > :start_block AND `, confs) +
`GROUP BY event_sig, address)
ORDER BY block_number ASC`

Expand Down Expand Up @@ -701,7 +707,7 @@ func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context,
WHERE evm_chain_id = :evm_chain_id
AND event_sig = ANY(:event_sig_array)
AND address = ANY(:address_array)
AND block_number > :start_block`, confs)
AND block_number > :start_block AND `, confs)

var blockNumber int64
query, sqlArgs, err := o.ds.BindNamed(query, args)
Expand Down Expand Up @@ -730,7 +736,7 @@ func (o *DSORM) SelectLogsDataWordRange(ctx context.Context, address common.Addr
AND address = :address
AND event_sig = :event_sig
AND substring(data from 32*:word_index+1 for 32) >= :word_value_min
AND substring(data from 32*:word_index+1 for 32) <= :word_value_max`, confs) +
AND substring(data from 32*:word_index+1 for 32) <= :word_value_max AND `, confs) +
`ORDER BY block_number, log_index`

var logs []Log
Expand Down Expand Up @@ -758,7 +764,7 @@ func (o *DSORM) SelectLogsDataWordGreaterThan(ctx context.Context, address commo
query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id
AND address = :address
AND event_sig = :event_sig
AND substring(data from 32*:word_index+1 for 32) >= :word_value_min`, confs) +
AND substring(data from 32*:word_index+1 for 32) >= :word_value_min AND `, confs) +
`ORDER BY block_number, log_index`

var logs []Log
Expand Down Expand Up @@ -787,7 +793,7 @@ func (o *DSORM) SelectLogsDataWordBetween(ctx context.Context, address common.Ad
AND address = :address
AND event_sig = :event_sig
AND substring(data from 32*:word_index_min+1 for 32) <= :word_value
AND substring(data from 32*:word_index_max+1 for 32) >= :word_value`, confs) +
AND substring(data from 32*:word_index_max+1 for 32) >= :word_value AND `, confs) +
`ORDER BY block_number, log_index`

var logs []Log
Expand Down Expand Up @@ -815,7 +821,7 @@ func (o *DSORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address c
query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id
AND address = :address
AND event_sig = :event_sig
AND topics[:topic_index] >= :topic_value_min`, confs) +
AND topics[:topic_index] >= :topic_value_min AND `, confs) +
`ORDER BY block_number, log_index`

var logs []Log
Expand Down Expand Up @@ -845,7 +851,7 @@ func (o *DSORM) SelectIndexedLogsTopicRange(ctx context.Context, address common.
AND address = :address
AND event_sig = :event_sig
AND topics[:topic_index] >= :topic_value_min
AND topics[:topic_index] <= :topic_value_max`, confs) +
AND topics[:topic_index] <= :topic_value_max AND `, confs) +
`ORDER BY block_number, log_index`

var logs []Log
Expand Down Expand Up @@ -874,7 +880,7 @@ func (o *DSORM) SelectIndexedLogs(ctx context.Context, address common.Address, e
WHERE evm_chain_id = :evm_chain_id
AND address = :address
AND event_sig = :event_sig
AND topics[:topic_index] = ANY(:topic_values)`, confs) +
AND topics[:topic_index] = ANY(:topic_values) AND `, confs) +
`ORDER BY block_number, log_index`

var logs []Log
Expand Down Expand Up @@ -939,7 +945,7 @@ func (o *DSORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address commo
AND address = :address
AND event_sig = :event_sig
AND topics[:topic_index] = ANY(:topic_values)
AND block_timestamp > :block_timestamp_after`, confs) +
AND block_timestamp > :block_timestamp_after AND `, confs) +
`ORDER BY block_number, log_index`

var logs []Log
Expand Down Expand Up @@ -1003,17 +1009,18 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si
WHERE evm_chain_id = :evm_chain_id
AND address = :address
AND event_sig = :sigA
AND block_number BETWEEN :start_block AND :end_block`, confs) +
withConfs(`EXCEPT
SELECT a.* FROM evm.logs AS a
INNER JOIN evm.logs B
AND block_number BETWEEN :start_block AND :end_block AND `, confs) +
` EXCEPT ` +
withConfs(logsQueryWithTablePrefix("a", `
INNER JOIN evm.logs AS b
ON a.evm_chain_id = b.evm_chain_id
AND a.address = b.address
AND a.topics[:topic_index] = b.topics[:topic_index]
AND a.event_sig = :sigA
AND b.event_sig = :sigB
AND b.block_number BETWEEN :start_block AND :end_block`, confs) +
`ORDER BY block_number, log_index`
AND b.block_number BETWEEN :start_block AND :end_block
AND b.`), confs) +
` ORDER BY block_number, log_index`

var logs []Log
query, sqlArgs, err := o.ds.BindNamed(query, args)
Expand Down
7 changes: 6 additions & 1 deletion core/chains/evm/logpoller/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
)

Expand All @@ -26,6 +27,10 @@ const (

var (
ErrUnexpectedCursorFormat = errors.New("unexpected cursor format")
logsFields = [...]string{"evm_chain_id", "log_index", "block_hash", "block_number",
"address", "event_sig", "topics", "tx_hash", "data", "created_at", "block_timestamp"}
blocksFields = [...]string{"evm_chain_id", "block_hash", "block_number", "block_timestamp",
"finalized_block_number", "created_at"}
)

// The parser builds SQL expressions piece by piece for each Accept function call and resets the error and expression
Expand Down Expand Up @@ -220,7 +225,7 @@ func (v *pgDSLParser) buildQuery(chainID *big.Int, expressions []query.Expressio
v.err = nil

// build the query string
clauses := []string{"SELECT evm.logs.* FROM evm.logs"}
clauses := []string{logsQuery("")}

where, err := v.whereClause(expressions, limiter)
if err != nil {
Expand Down
Loading

0 comments on commit 29e383a

Please sign in to comment.