Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NONEVM-745 LogPoller db models #921

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ require (
github.com/go-viper/mapstructure/v2 v2.1.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.6.2-0.20240829161738-06afb6d7ae99
github.com/jackc/pgx/v4 v4.18.3
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.10.9
github.com/pelletier/go-toml/v2 v2.2.0
github.com/prometheus/client_golang v1.17.0
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241023204219-86c89e29937d
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241102004624-9035c2a5cb36
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
Expand All @@ -28,7 +30,7 @@ require (

require (
contrib.go.opencensus.io/exporter/stackdriver v0.13.4 // indirect
filippo.io/edwards25519 v1.0.0-rc.1 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
Expand Down Expand Up @@ -59,6 +61,14 @@ require (
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jmoiron/sqlx v1.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand Down Expand Up @@ -87,6 +97,7 @@ require (
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 // indirect
github.com/scylladb/go-reflectx v1.0.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 // indirect
github.com/streamingfast/logging v0.0.0-20220405224725-2755dab2ce75 // indirect
Expand Down
114 changes: 111 additions & 3 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/lib/pq v1.10.9
github.com/pelletier/go-toml/v2 v2.2.3
github.com/rs/zerolog v1.33.0
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241102004624-9035c2a5cb36
github.com/smartcontractkit/chainlink-solana v1.1.1-0.20241024132041-a3eb2e31b4c4
github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13
github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.5
Expand Down
1 change: 1 addition & 0 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,7 @@ github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb h1
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241025085158-0f6dce5d1fdb/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595 h1:H6i0LEvXB0se/63E3jE9N0/7TugOYLpK4e6TT6a0omc=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241025132045-cfad02139595/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241102004624-9035c2a5cb36/go.mod h1:SXPN46nO9o7FjfQCUUVEkVXDgw8l1myGDZ8z273k3SI=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
38 changes: 38 additions & 0 deletions pkg/solana/logpoller/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package logpoller

import (
"time"

"github.com/lib/pq"
)

type Filter struct {
ID int64
Name string
Address PublicKey
EventName string
EventSig []byte
StartingBlock int64
EventIDL string
SubKeyPaths SubKeyPaths
Retention time.Duration
MaxLogsKept int64
}

type Log struct {
ID int64
FilterId int64
ChainId string
LogIndex int64
BlockHash Hash
BlockNumber int64
BLockTimestamp time.Time
Address PublicKey
EventSig []byte
SubKeyValues pq.ByteaArray
TxHash Signature
Data []byte
CreatedAt time.Time
ExpiresAt *time.Time
SequenceNum int64
}
160 changes: 160 additions & 0 deletions pkg/solana/logpoller/orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package logpoller

import (
"context"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

type DSORM struct {
chainID string
ds sqlutil.DataSource
lggr logger.Logger
}

// NewORM creates an DSORM scoped to chainID.
func NewORM(chainID string, ds sqlutil.DataSource, lggr logger.Logger) *DSORM {
return &DSORM{
chainID: chainID,
ds: ds,
lggr: lggr,
}
}

func (o *DSORM) Transact(ctx context.Context, fn func(*DSORM) error) (err error) {
return sqlutil.Transact(ctx, o.new, o.ds, nil, fn)
}

// new returns a NewORM like o, but backed by ds.
func (o *DSORM) new(ds sqlutil.DataSource) *DSORM { return NewORM(o.chainID, ds, o.lggr) }

// InsertFilter is idempotent.
//
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
// Returns ID for updated or newly inserted filter.
func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err error) {
args, err := newQueryArgs(o.chainID).
withField("name", filter.Name).
withRetention(filter.Retention).
withMaxLogsKept(filter.MaxLogsKept).
withName(filter.Name).
withAddress(filter.Address).
withEventName(filter.EventName).
withEventSig(filter.EventSig).
withStartingBlock(filter.StartingBlock).
withEventIDL(filter.EventIDL).
withSubKeyPaths(filter.SubKeyPaths).
toArgs()
if err != nil {
return 0, err
}

// '::' has to be escaped in the query string
// https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428
query := `
INSERT INTO solana.log_poller_filters
(chain_id, name, address, event_name, event_sig, starting_block, event_idl, sub_key_paths, retention, max_logs_kept)
VALUES (:chain_id, :name, :address, :event_name, :event_sig, :starting_block, :event_idl, :sub_key_paths, :retention, :max_logs_kept)
ON CONFLICT (solana.f_log_poller_filter_hash(name, chain_id, address, event_sig, sub_key_paths))
DO UPDATE SET retention=:retention ::::BIGINT, max_logs_kept=:max_logs_kept ::::NUMERIC, starting_block=:starting_block ::::NUMERIC
RETURNING id;`

query, sqlArgs, err := o.ds.BindNamed(query, args)
if err != nil {
return 0, err
}
if err = o.ds.GetContext(ctx, &id, query, sqlArgs...); err != nil {
return 0, err
}
return id, nil
}

// GetFilterByID returns filter by ID
func (o *DSORM) GetFilterByID(ctx context.Context, id int64) (Filter, error) {
query := `SELECT id, name, address, event_name, event_sig, starting_block, event_idl, sub_key_paths, retention, max_logs_kept
FROM solana.log_poller_filters WHERE id = $1`
var result Filter
err := o.ds.GetContext(ctx, &result, query, id)
return result, err
}

// InsertLogs is idempotent to support replays.
func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error {
if err := o.validateLogs(logs); err != nil {
return err
}
return o.Transact(ctx, func(orm *DSORM) error {
return orm.insertLogsWithinTx(ctx, logs, orm.ds)
})
}

func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error {
batchInsertSize := 4000
for i := 0; i < len(logs); i += batchInsertSize {
start, end := i, i+batchInsertSize
if end > len(logs) {
end = len(logs)
}

query := `INSERT INTO solana.logs
(filter_id, chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, subkey_values, tx_hash, data, created_at, expires_at, sequence_num)
VALUES
(:filter_id, :chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :subkey_values, :tx_hash, :data, NOW(), :expires_at, :sequence_num)
ON CONFLICT DO NOTHING`

_, err := tx.NamedExecContext(ctx, query, logs[start:end])
if err != nil {
if errors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
// In case of DB timeouts, try to insert again with a smaller batch upto a limit
batchInsertSize /= 2
i -= batchInsertSize // counteract +=batchInsertSize on next loop iteration
continue
}
return err
}
}
return nil
}

func (o *DSORM) validateLogs(logs []Log) error {
for _, log := range logs {
if o.chainID != log.ChainId {
return fmt.Errorf("invalid chainID in log got %v want %v", log.ChainId, o.chainID)
}
}
return nil
}

// SelectLogs finds the logs in a given block range.
func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address PublicKey, eventSig []byte) ([]Log, error) {
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
withStartBlock(start).
withEndBlock(end).
toArgs()
if err != nil {
return nil, err
}

query := logsQuery(`
WHERE chain_id = :chain_id
AND address = :address
AND event_sig = :event_sig
AND block_number >= :start_block
AND block_number <= :end_block
ORDER BY block_number, log_index`)

var logs []Log
query, sqlArgs, err := o.ds.BindNamed(query, args)
if err != nil {
return nil, err
}

err = o.ds.SelectContext(ctx, &logs, query, sqlArgs...)
if err != nil {
return nil, err
}
return logs, nil
}
135 changes: 135 additions & 0 deletions pkg/solana/logpoller/orm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//go:build db_tests

package logpoller

import (
"os"
"testing"
"time"

"github.com/gagliardetto/solana-go"
"github.com/google/uuid"
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/lib/pq"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/pg"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/stretchr/testify/require"
)

// NOTE: at the moment it's not possible to run all db tests at once. This issue will be addressed separately

func TestLogPollerFilters(t *testing.T) {
lggr := logger.Test(t)
dbURL, ok := os.LookupEnv("CL_DATABASE_URL")
require.True(t, ok, "CL_DATABASE_URL must be set")
chainID := uuid.NewString()
dbx := pg.NewSqlxDB(t, dbURL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm really confused... I have a PR open to move NewSqlDB and some other things over from chainlink to chainlink-common, but as far as I know it hasn't been merged yet. 🤔

I wonder if someone else moved it over already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's not merged. I've used your PR as a dependency to unblock myself.

orm := NewORM(chainID, dbx, lggr)

privateKey, err := solana.NewRandomPrivateKey()
require.NoError(t, err)
pubKey := privateKey.PublicKey()
filters := []Filter{
{
Name: "happy path",
Address: PublicKey(pubKey),
EventName: "event",
EventSig: []byte{1, 2, 3},
StartingBlock: 1,
EventIDL: "{}",
SubKeyPaths: SubKeyPaths([][]string{{"a", "b"}, {"c"}}),
Retention: 1000,
MaxLogsKept: 3,
},
{
Name: "empty sub key paths",
Address: PublicKey(pubKey),
EventName: "event",
EventSig: []byte{1, 2, 3},
StartingBlock: 1,
EventIDL: "{}",
SubKeyPaths: SubKeyPaths([][]string{}),
Retention: 1000,
MaxLogsKept: 3,
},
{
Name: "nil sub key paths",
Address: PublicKey(pubKey),
EventName: "event",
EventSig: []byte{1, 2, 3},
StartingBlock: 1,
EventIDL: "{}",
SubKeyPaths: nil,
Retention: 1000,
MaxLogsKept: 3,
},
}

for _, filter := range filters {
t.Run("Save filter: "+filter.Name, func(t *testing.T) {
ctx := tests.Context(t)
id, err := orm.InsertFilter(ctx, filter)
require.NoError(t, err)
filter.ID = id
dbFilter, err := orm.GetFilterByID(ctx, id)
require.NoError(t, err)
require.Equal(t, filter, dbFilter)

// subsequent insert of the same filter won't produce new db row
secondID, err := orm.InsertFilter(ctx, filter)
require.NoError(t, err)
require.Equal(t, secondID, id)
})
}
}

func TestLogPollerLogs(t *testing.T) {
lggr := logger.Test(t)
dbURL, ok := os.LookupEnv("CL_DATABASE_URL")
require.True(t, ok, "CL_DATABASE_URL must be set")
chainID := uuid.NewString()
dbx := pg.NewSqlxDB(t, dbURL)
orm := NewORM(chainID, dbx, lggr)

privateKey, err := solana.NewRandomPrivateKey()
require.NoError(t, err)
pubKey := privateKey.PublicKey()

ctx := tests.Context(t)
// create filter as it's required for a log
filterID, err := orm.InsertFilter(ctx, Filter{
Name: "awesome filter",
Address: PublicKey(pubKey),
EventName: "event",
EventSig: []byte{1, 2, 3},
StartingBlock: 1,
EventIDL: "{}",
SubKeyPaths: [][]string{{"a", "b"}, {"c"}},
Retention: 1000,
MaxLogsKept: 3,
})
require.NoError(t, err)
data := []byte("solana is fun")
signature, err := privateKey.Sign(data)
require.NoError(t, err)
log := Log{
FilterId: filterID,
ChainId: chainID,
LogIndex: 1,
BlockHash: Hash(pubKey),
BlockNumber: 10,
BLockTimestamp: time.Now(),
Address: PublicKey(pubKey),
EventSig: []byte{3, 2, 1},
SubKeyValues: pq.ByteaArray([][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}),
TxHash: Signature(signature),
Data: data,
}
err = orm.InsertLogs(ctx, []Log{log})
require.NoError(t, err)
dbLogs, err := orm.SelectLogs(ctx, 0, 100, log.Address, log.EventSig)
require.NoError(t, err)
require.Len(t, dbLogs, 1)
require.Equal(t, log, dbLogs[0])
}
Loading