Skip to content

Commit

Permalink
Implement LLO plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Nov 14, 2023
1 parent 7be17c9 commit 47185db
Show file tree
Hide file tree
Showing 22 changed files with 1,357 additions and 14 deletions.
118 changes: 118 additions & 0 deletions ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package llo

import (
"context"
"fmt"
"math/big"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

type Runner interface {
ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error)
}

// TODO: Generalize to beyond simply an int
type DataPoint *big.Int

type Stream interface {
Observe(ctx context.Context) (DataPoint, error)
}

type stream struct {
id string
lggr logger.Logger
runResults chan<- *pipeline.Run
spec pipeline.Spec
runner Runner
}

func NewStream(lggr logger.Logger, id string, runResults chan<- *pipeline.Run, pipelineSpec pipeline.Spec, pipelineRunner Runner) Stream {
return newStream(lggr, id, runResults, pipelineSpec, pipelineRunner)
}

func newStream(lggr logger.Logger, id string, runResults chan<- *pipeline.Run, pipelineSpec pipeline.Spec, pipelineRunner Runner) *stream {
return &stream{id, lggr, runResults, pipelineSpec, pipelineRunner}
}

func (s *stream) Observe(ctx context.Context) (DataPoint, error) {
var run *pipeline.Run
run, trrs, err := s.executeRun(ctx)
if err != nil {
return nil, fmt.Errorf("Observe failed while executing run: %w", err)
}
select {
case s.runResults <- run:
default:
s.lggr.Warnf("unable to enqueue run save for job ID %d, buffer full", s.spec.JobID)
}

// NOTE: trrs comes back as _all_ tasks, but we only want the terminal ones
// They are guaranteed to be sorted by index asc so should be in the correct order
var finaltrrs []pipeline.TaskRunResult
for _, trr := range trrs {
if trr.IsTerminal() {
finaltrrs = append(finaltrrs, trr)
}
}

// FIXME: How to handle arbitrary-shaped inputs?
// For now just assume everything is one *big.Int
var parsed parseOutput
parsed, pipelineExecutionErr = ds.parse(finaltrrs)
if pipelineExecutionErr != nil {
pipelineExecutionErr = fmt.Errorf("Observe failed while parsing run results: %w", pipelineExecutionErr)
return
}
obs.BenchmarkPrice = parsed.benchmarkPrice
obs.Bid = parsed.bid
obs.Ask = parsed.ask

}

// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod).
// Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod.
func (s *stream) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
vars := pipeline.NewVarsFrom(map[string]interface{}{
"jb": map[string]interface{}{
"databaseID": ds.jb.ID,
"externalJobID": ds.jb.ExternalJobID,
"name": ds.jb.Name.ValueOrZero(),
},
})

run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr)
if err != nil {
return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID)
}

return run, trrs, err
}

// returns error on parse errors: if something is the wrong type
func (ds *datasource) parse(trrs pipeline.TaskRunResults) (*big.Int, error) {
var finaltrrs []pipeline.TaskRunResult
for _, trr := range trrs {
// only return terminal trrs from executeRun
if trr.IsTerminal() {
finaltrrs = append(finaltrrs, trr)
}
}

// pipeline.TaskRunResults comes ordered asc by index, this is guaranteed
// by the pipeline executor
if len(finaltrrs) != 1 {
return o, fmt.Errorf("invalid number of results, expected: 1, got: %d", len(finaltrrs))
}
res := finaltrrs[0].Result
if res.Error != nil {
o.benchmarkPrice.Err = res.Error
} else if val, err := toBigInt(res.Value); err != nil {
return fmt.Errorf("failed to parse BenchmarkPrice: %w", err)
} else {
o.benchmarkPrice.Val = val
}

return o, merr
}
63 changes: 63 additions & 0 deletions contracts/src/v0.8/llo-feeds/ConfigurationStore.sol
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SPDX-License-Identifier: UNLICENSED
pragma solidity ^0.8.13;

struct ChannelDefinition {
// e.g. evm, solana, CosmWasm, kalechain, etc...
string reportFormat;
// Specifies the chain on which this channel can be verified. Currently uses
// CCIP chain selectors, but lots of other schemes are possible as well.
uint64 chainSelector;
// We assume that StreamIDs is always non-empty and that the 0-th stream
// contains the verification price in LINK and the 1-st stream contains the
// verification price in the native coin.
string[] streamIDs;
}

contract ConfigurationStore {
////////////////////////
// protocol instance management
////////////////////////

ChannelDefinition[] private s_channelDefinitions;

// setProductionConfig() onlyOwner -- the usual OCR way
// sets config for the production protocol instance

// setStagingConfig() onlyOwner -- the usual OCR way
// sets config for the staging protocol instance

// promoteStagingConfig() onlyOwner
// this will trigger the following:
// - offchain ShouldRetireCache will start returning true for the old (production)
// protocol instance
// - once the old production instance retires it will generate a handover
// retirement report
// - the staging instance will become the new production instance once
// any honest oracle that is on both instances forward the retirement
// report from the old instance to the new instace via the
// PredecessorRetirementReportCache
//
// Note: the promotion flow only works if the previous production instance
// is working correctly & generating reports. If that's not the case, the
// owner is expected to "setProductionConfig" directly instead. This will
// cause "gaps" to be created, but that seems unavoidable in such a scenario.

////////////////////////
// channel management
////////////////////////

addChannel(ChannelDefinition) onlyOwner {
// TODO
}

removeChannel(bytes32 channelId) onlyOwner {
// TODO

}

getChannelDefinitions() onlyEOA public view returns (ChannelDefinition[] memory) {
// TODO

}
// used by ChannelDefinitionCache
}
52 changes: 52 additions & 0 deletions core/services/llo/channel_definition_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package llo

import (
"context"
"maps"

relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo"
"github.com/smartcontractkit/chainlink-relay/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type ChannelDefinitionCache interface {
relayllo.ChannelDefinitionCache
services.Service
}

var _ ChannelDefinitionCache = &channelDefinitionCache{}

type channelDefinitionCache struct {
services.StateMachine

lggr logger.Logger
definitions relayllo.ChannelDefinitions
}

func NewChannelDefinitionCache() ChannelDefinitionCache {
return &channelDefinitionCache{}
}

func (c *channelDefinitionCache) Start(ctx context.Context) error {
// TODO: Initial load, then poll
// TODO: needs to be populated asynchronously from onchain ConfigurationStore
return nil
}

func (c *channelDefinitionCache) Close() error {
// TODO
return nil
}

func (c *channelDefinitionCache) HealthReport() map[string]error {
report := map[string]error{c.Name(): c.Healthy()}
return report
}

func (c *channelDefinitionCache) Name() string { return c.lggr.Name() }

func (c *channelDefinitionCache) Definitions() relayllo.ChannelDefinitions {
c.StateMachine.RLock()
defer c.StateMachine.RUnlock()
return maps.Clone(c.definitions)
}
9 changes: 9 additions & 0 deletions core/services/llo/channel_definition_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package llo

import "testing"

func Test_ChannelDefinitionCache(t *testing.T) {
t.Run("Definitions", func(t *testing.T) {
t.Fatal("TODO")
})
}
91 changes: 91 additions & 0 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package llo

// TODO: llo datasource
import (
"context"
"fmt"
"math/big"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var (
promMissingStreamCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "llo_stream_missing_count",
Help: "Number of times we tried to observe a stream, but it was missing",
},
[]string{"streamID"},
)
promObservationErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "llo_stream_observation_error_count",
Help: "Number of times we tried to observe a stream, but it failed with an error",
},
[]string{"streamID"},
)
)

type ErrMissingStream struct {
id string
}

func (e ErrMissingStream) Error() string {
return fmt.Sprintf("missing stream definition for: %q", e.id)
}

var _ relayllo.DataSource = &dataSource{}

type dataSource struct {
lggr logger.Logger
streamCache StreamCache
}

func NewDataSource(lggr logger.Logger, streamCache StreamCache) relayllo.DataSource {
// TODO: lggr should include job ID
return &dataSource{lggr, streamCache}
}

func (d *dataSource) Observe(ctx context.Context, streamIDs map[relayllo.StreamID]struct{}) (relayllo.StreamValues, error) {
// There is no "observationSource" (AKA pipeline)
// Need a concept of "streams"
// Streams are referenced by ID from the on-chain config
// Each stream contains its own pipeline
// See: https://docs.google.com/document/d/1l1IiDOL1QSteLTnhmiGnJAi6QpcSpyOe0nkqS7D3SvU/edit for stream ID naming

var wg sync.WaitGroup
wg.Add(len(streamIDs))
sv := make(relayllo.StreamValues)
var mu sync.Mutex

for streamID := range streamIDs {
go func() {
defer wg.Done()

var res relayllo.ObsResult[*big.Int]

stream, exists := d.streamCache.Get(streamID)
if exists {
res.Val, res.Err = stream.Observe(ctx)
if res.Err != nil {
d.lggr.Debugw("Observation failed for stream", "err", res.Err, "streamID", streamID)
promObservationErrorCount.WithLabelValues(streamID.String()).Inc()
}
} else {
d.lggr.Errorw(fmt.Sprintf("Missing stream: %q"), "streamID", streamID)
promMissingStreamCount.WithLabelValues(streamID.String()).Inc()
res.Err = ErrMissingStream{streamID.String()}
}

mu.Lock()
defer mu.Unlock()
sv[streamID] = res
}()
}

wg.Wait()

return sv, nil
}
9 changes: 9 additions & 0 deletions core/services/llo/data_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package llo

import "testing"

func Test_DataSource(t *testing.T) {
t.Run("Observe", func(t *testing.T) {
t.Fatal("TODO")
})
}
Loading

0 comments on commit 47185db

Please sign in to comment.