Skip to content

Commit

Permalink
Make state/transmit configurable + polling state (#124)
Browse files Browse the repository at this point in the history
* standardize rpc commitment + move preflight check to parameter

* add SkipPreflight to spec

* make commitment level configurable

* fix relay test cases

* update monitoring with fetching state changes

* update e2e test with state fetching change

* preliminary polling - need test case, polling configurability

* polling test case

* add state polling to start up

* remove blocking calls on state fetching

* lint fixes

* use utils.StartStopOnce

* fix lint: pass lock by value

* use ContextFromChanWithDeadline

* initial race condition fixes

* add parameters to relay config

* update libocr to 5d2b1d5f424ba9183c4240ed4a6062141c82c7b5

* add RWMutex to prevent race conditions

* fix rebase damage

* include preliminary state timeout error

* make stale timeout configurable, default to skip preflight

* go fmt

* add new params to simple job spec

* fix commitment typo

* feedback fixes: fix comment, block close until loop exits, etc

* add polling jitter
  • Loading branch information
aalu1418 authored Jan 21, 2022
1 parent 01061f9 commit 5b4d7cd
Show file tree
Hide file tree
Showing 14 changed files with 347 additions and 418 deletions.
5 changes: 5 additions & 0 deletions examples/spec/ocr2-oracle-simple.spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,8 @@ nodeEndpointHTTP = "http:..."
ocr2ProgramID = "<insert solana ocr2 program ID>"
transmissionsID = "<insert solana ocr2 transmissions account>"
storeProgramID = "<insert solana ocr2 store account>"
usePreflight = false
commitment = "confirmed"
pollingInterval = "1s"
pollingCtxTimeout = "2s"
staleTimeout = "1m"
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/smartcontractkit/chainlink v1.0.1-0.20211209223503-68928efa429a
github.com/smartcontractkit/helmenv v1.0.24
github.com/smartcontractkit/integrations-framework v1.0.31
github.com/smartcontractkit/libocr v0.0.0-20211210213233-5443fb9db7f7
github.com/smartcontractkit/libocr v0.0.0-20220121130134-5d2b1d5f424b
github.com/stretchr/testify v1.7.0
)

Expand All @@ -28,6 +28,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/riferrei/srclient v0.4.0
github.com/rs/zerolog v1.26.1
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.1
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down Expand Up @@ -182,7 +183,6 @@ require (
github.com/xlab/treeprint v1.1.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.starlark.net v0.0.0-20211013185944-b0039bd2cfe3 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/ratelimit v0.2.0 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
Expand Down
321 changes: 2 additions & 319 deletions go.sum

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions pkg/monitoring/account_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
pkgSolana "github.com/smartcontractkit/chainlink-solana/pkg/solana"
)

const (
commitment = rpc.CommitmentConfirmed
)

// AccountReader is a wrapper on top of *rpc.Client
type AccountReader interface {
Read(ctx context.Context, account solana.PublicKey) (interface{}, error)
Expand All @@ -28,7 +32,7 @@ type trReader struct {
}

func (t *trReader) Read(ctx context.Context, transmissionsAccount solana.PublicKey) (interface{}, error) {
answer, blockNum, err := pkgSolana.GetLatestTransmission(ctx, t.client, transmissionsAccount)
answer, blockNum, err := pkgSolana.GetLatestTransmission(ctx, t.client, transmissionsAccount, commitment)
return TransmissionEnvelope{answer, blockNum}, err
}

Expand All @@ -46,7 +50,7 @@ type StateEnvelope struct {
}

func (s *stReader) Read(ctx context.Context, stateAccount solana.PublicKey) (interface{}, error) {
state, blockNum, err := pkgSolana.GetState(ctx, s.client, stateAccount)
state, blockNum, err := pkgSolana.GetState(ctx, s.client, stateAccount, commitment)
if err != nil {
return nil, fmt.Errorf("failed to fetch state : %w", err)
}
Expand Down
48 changes: 43 additions & 5 deletions pkg/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,63 @@ package solana

import (
"context"
"time"

"github.com/gagliardetto/solana-go/rpc"
"golang.org/x/sync/singleflight"
)

// Client contains the rpc and requestGroup for a given network
type Client struct {
rpc *rpc.Client
rpc *rpc.Client
skipPreflight bool // to enable or disable preflight checks
commitment rpc.CommitmentType
pollingInterval time.Duration
contextDuration time.Duration

// provides a duplicate function call suppression mechanism
requestGroup *singleflight.Group
}

// NewClient will bundle the RPC and requestGroup together as a network Client
func NewClient(rpcEndpoint string) *Client {
return &Client{
rpc: rpc.New(rpcEndpoint),
requestGroup: &singleflight.Group{},
func NewClient(spec OCR2Spec, logger Logger) *Client {
client := &Client{
rpc: rpc.New(spec.NodeEndpointHTTP),
skipPreflight: !spec.UsePreflight,
requestGroup: &singleflight.Group{},
}

// parse commitment level (defaults to confirmed)
switch spec.Commitment {
case "processed":
client.commitment = rpc.CommitmentProcessed
case "finalized":
client.commitment = rpc.CommitmentFinalized
default:
client.commitment = rpc.CommitmentConfirmed
}

// parse poll interval, if errors: use 1 second default
pollInterval, err := time.ParseDuration(spec.PollingInterval)
if err != nil {
logger.Warnf("could not parse polling interval using default 1s")
pollInterval = 1 * time.Second
}

// parse context lenght, if errors, use 2x poll interval
ctxInterval, err := time.ParseDuration(spec.PollingCtxTimeout)
if err != nil {
logger.Warnf("could not parse polling context duration using default 2x polling interval")
ctxInterval = 2 * pollInterval
}

client.pollingInterval = pollInterval
client.contextDuration = ctxInterval

// log client configuration
logger.Debugf("NewClient configuration: %+v", client)

return client
}

// GetBlockHeight returns the height of the most recent processed block in the chain, coalescing requests.
Expand Down
13 changes: 7 additions & 6 deletions pkg/solana/config_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2/types"
)

func (c ContractTracker) Notify() <-chan struct{} {
func (c *ContractTracker) Notify() <-chan struct{} {
return nil // not using websocket, config changes will be handled by polling in libocr
}

// LatestConfigDetails returns information about the latest configuration,
// but not the configuration itself.
func (c *ContractTracker) LatestConfigDetails(ctx context.Context) (changedInBlock uint64, configDigest types.ConfigDigest, err error) {
err = c.fetchState(ctx)
return c.state.Config.LatestConfigBlockNumber, c.state.Config.LatestConfigDigest, err
state, _, err := c.ReadState()
return state.Config.LatestConfigBlockNumber, state.Config.LatestConfigDigest, err
}

func configFromState(state State) (types.ContractConfig, error) {
Expand Down Expand Up @@ -53,13 +53,14 @@ func configFromState(state State) (types.ContractConfig, error) {

// LatestConfig returns the latest configuration.
func (c *ContractTracker) LatestConfig(ctx context.Context, changedInBlock uint64) (types.ContractConfig, error) {
if err := c.fetchState(ctx); err != nil {
state, _, err := c.ReadState()
if err != nil {
return types.ContractConfig{}, err
}
return configFromState(c.state)
return configFromState(state)
}

// LatestBlockHeight returns the height of the most recent block in the chain.
func (c *ContractTracker) LatestBlockHeight(ctx context.Context) (blockHeight uint64, err error) {
return c.client.GetBlockHeight(ctx, rpc.CommitmentProcessed)
return c.client.GetBlockHeight(ctx, rpc.CommitmentProcessed) // this returns the latest height through CommitmentProcessed
}
3 changes: 2 additions & 1 deletion pkg/solana/config_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"testing"

"github.com/gagliardetto/solana-go/rpc"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/stretchr/testify/assert"
)

func TestLatestBlockHeight(t *testing.T) {
ctx := context.Background()
c := &ContractTracker{
client: NewClient(rpc.DevNet_RPC),
client: NewClient(OCR2Spec{NodeEndpointHTTP: rpc.DevNet_RPC}, logger.TestLogger(t)),
}

h, err := c.LatestBlockHeight(ctx)
Expand Down
Loading

0 comments on commit 5b4d7cd

Please sign in to comment.