Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NONEVM-745 LogPoller db models #921

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.6.2
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.10.9
github.com/pelletier/go-toml/v2 v2.2.0
github.com/prometheus/client_golang v1.17.0
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241112140826-0e2daed34ef6
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241121230942-6e5941293689
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
Expand All @@ -28,7 +29,7 @@ require (

require (
contrib.go.opencensus.io/exporter/stackdriver v0.13.4 // indirect
filippo.io/edwards25519 v1.0.0-rc.1 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
Expand Down Expand Up @@ -59,6 +60,15 @@ require (
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgx/v4 v4.18.3 // indirect
github.com/jmoiron/sqlx v1.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand Down Expand Up @@ -87,6 +97,7 @@ require (
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 // indirect
github.com/scylladb/go-reflectx v1.0.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 // indirect
github.com/streamingfast/logging v0.0.0-20220405224725-2755dab2ce75 // indirect
Expand Down
114 changes: 111 additions & 3 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/lib/pq v1.10.9
github.com/pelletier/go-toml/v2 v2.2.3
github.com/rs/zerolog v1.33.0
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241112140826-0e2daed34ef6
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241121230942-6e5941293689
github.com/smartcontractkit/chainlink-solana v1.1.1-0.20241104202120-39cabce465f6
github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13
github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.5
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1386,8 +1386,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.5-0.20241009152924-78acf19
github.com/smartcontractkit/chainlink-automation v1.0.5-0.20241009152924-78acf196c332/go.mod h1:74ly9zfnQ9EwBtHZH46sIAbxQdOnX56fFjjvSQvn53k=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241106140121-4c9ee21ab422 h1:VfH/AW5NtTmroY9zz6OYCPFbFTqpMyJ2ubgT9ahYf3U=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241106140121-4c9ee21ab422/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241112140826-0e2daed34ef6 h1:yJNBWCdNL/X8+wEs3TGTBe9gssMmw5FTFxxrlo+0mVo=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241112140826-0e2daed34ef6/go.mod h1:ny87uTW6hLjCTLiBqBRNFEhETSXhHWevYlPclT5lSco=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241121230942-6e5941293689 h1:AOEncXa2vHMbkoSQLaBLfEH9hDoGoQFsVZItuFFQ36c=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241121230942-6e5941293689/go.mod h1:H6u1I9PIOErSR8Gy/CA3kGvHrM8g2moEjKPYtJJdmRQ=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
38 changes: 38 additions & 0 deletions pkg/solana/logpoller/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package logpoller

import (
"time"

"github.com/lib/pq"
)

type Filter struct {
ID int64
Name string
Address PublicKey
EventName string
EventSig []byte
StartingBlock int64
EventIDL string
SubkeyPaths SubkeyPaths
Retention time.Duration
MaxLogsKept int64
}

type Log struct {
ID int64
FilterID int64
ChainID string
LogIndex int64
BlockHash Hash
BlockNumber int64
BlockTimestamp time.Time
Address PublicKey
EventSig []byte
SubkeyValues pq.ByteaArray
TxHash Signature
Data []byte
CreatedAt time.Time
ExpiresAt *time.Time
SequenceNum int64
}
158 changes: 158 additions & 0 deletions pkg/solana/logpoller/orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package logpoller

import (
"context"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

type DSORM struct {
chainID string
ds sqlutil.DataSource
lggr logger.Logger
}

// NewORM creates an DSORM scoped to chainID.
func NewORM(chainID string, ds sqlutil.DataSource, lggr logger.Logger) *DSORM {
return &DSORM{
chainID: chainID,
ds: ds,
lggr: lggr,
}
}

func (o *DSORM) Transact(ctx context.Context, fn func(*DSORM) error) (err error) {
return sqlutil.Transact(ctx, o.new, o.ds, nil, fn)
}

// new returns a NewORM like o, but backed by ds.
func (o *DSORM) new(ds sqlutil.DataSource) *DSORM { return NewORM(o.chainID, ds, o.lggr) }

// InsertFilter is idempotent.
//
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
// Returns ID for updated or newly inserted filter.
func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err error) {
args, err := newQueryArgs(o.chainID).
withField("name", filter.Name).
withRetention(filter.Retention).
withMaxLogsKept(filter.MaxLogsKept).
withName(filter.Name).
withAddress(filter.Address).
withEventName(filter.EventName).
withEventSig(filter.EventSig).
withStartingBlock(filter.StartingBlock).
withEventIDL(filter.EventIDL).
withSubkeyPaths(filter.SubkeyPaths).
toArgs()
if err != nil {
return 0, err
}

// '::' has to be escaped in the query string
// https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428
query := `
INSERT INTO solana.log_poller_filters
(chain_id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept)
VALUES (:chain_id, :name, :address, :event_name, :event_sig, :starting_block, :event_idl, :subkey_paths, :retention, :max_logs_kept)
RETURNING id;`

query, sqlArgs, err := o.ds.BindNamed(query, args)
if err != nil {
return 0, err
}
if err = o.ds.GetContext(ctx, &id, query, sqlArgs...); err != nil {
return 0, err
}
return id, nil
}

// GetFilterByID returns filter by ID
func (o *DSORM) GetFilterByID(ctx context.Context, id int64) (Filter, error) {
query := `SELECT id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept
FROM solana.log_poller_filters WHERE id = $1`
var result Filter
err := o.ds.GetContext(ctx, &result, query, id)
return result, err
}

// InsertLogs is idempotent to support replays.
func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error {
if err := o.validateLogs(logs); err != nil {
return err
}
return o.Transact(ctx, func(orm *DSORM) error {
return orm.insertLogsWithinTx(ctx, logs, orm.ds)
})
}

func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error {
batchInsertSize := 4000
for i := 0; i < len(logs); i += batchInsertSize {
start, end := i, i+batchInsertSize
if end > len(logs) {
end = len(logs)
}

query := `INSERT INTO solana.logs
(filter_id, chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, subkey_values, tx_hash, data, created_at, expires_at, sequence_num)
VALUES
(:filter_id, :chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :subkey_values, :tx_hash, :data, NOW(), :expires_at, :sequence_num)
ON CONFLICT DO NOTHING`

_, err := tx.NamedExecContext(ctx, query, logs[start:end])
if err != nil {
if errors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
// In case of DB timeouts, try to insert again with a smaller batch upto a limit
batchInsertSize /= 2
i -= batchInsertSize // counteract +=batchInsertSize on next loop iteration
continue
}
return err
}
}
return nil
}

func (o *DSORM) validateLogs(logs []Log) error {
for _, log := range logs {
if o.chainID != log.ChainID {
return fmt.Errorf("invalid chainID in log got %v want %v", log.ChainID, o.chainID)
}
}
return nil
}

// SelectLogs finds the logs in a given block range.
func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address PublicKey, eventSig []byte) ([]Log, error) {
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
withStartBlock(start).
withEndBlock(end).
toArgs()
if err != nil {
return nil, err
}

query := logsQuery(`
WHERE chain_id = :chain_id
AND address = :address
AND event_sig = :event_sig
AND block_number >= :start_block
AND block_number <= :end_block
ORDER BY block_number, log_index`)

var logs []Log
query, sqlArgs, err := o.ds.BindNamed(query, args)
if err != nil {
return nil, err
}

err = o.ds.SelectContext(ctx, &logs, query, sqlArgs...)
if err != nil {
return nil, err
}
return logs, nil
}
Loading
Loading