Skip to content

Commit

Permalink
BCF-2854 Jfpc pipeline cache (#12094)
Browse files Browse the repository at this point in the history
* Add rough implementation for data source caching

* Add JuelsPerFeeCoinCacheDuration validation

* Make sonar happy

* Set EATelemetry for dataSource in observe to reuse executeRun for cache

* Fix races in inMemoryDataSourceCache

* Fix JuelsPerFeeCoinCacheDuration validation

* Shorten NewInMemoryDataSourceCache signature

* Infer juels fee per coin caching from non-zero duration instead of bool

* Change how jfpc cache handles cache updater cron errors

* Change jfpc cache duration validation

* Set default jfpc data source expiry duration to 5minutes if not set

* Improve jfpc updateCache err handling to account for result errors

* Make juels fee per coin get cached by default, and add cache tests

* Make jfpc cache updater run on start

* Increase jfpc data source timeout

* Change log priority in cache updater

* Fix test flakiness

* Change log severity in jfpc cache updater

* Add cfg option to disable jfpc caching

* Remove ptr from inMemoryDataSourceCache mutex

* Fix JuelsPerFeeCoinCacheDuration encoding type to use models.Interval

* Add tests for validating juels per fee coin data source cache config

* Add changelog

* Fix typo in test

* Fix log
  • Loading branch information
ilija42 authored Feb 27, 2024
1 parent 728a7ab commit 23f767f
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 36 deletions.
2 changes: 2 additions & 0 deletions core/internal/features/ocr2/features_ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ juelsPerFeeCoinSource = """
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, blockBeforeConfig.Number().Int64(), chainReaderSpec, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i))
require.NoError(t, err)
err = apps[i].AddJobV2(testutils.Context(t), &ocrJob)
Expand Down Expand Up @@ -839,6 +840,7 @@ juelsPerFeeCoinSource = """
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i))
require.NoError(t, err)
err = apps[i].AddJobV2(testutils.Context(t), &ocrJob)
Expand Down
3 changes: 3 additions & 0 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
`
const BootstrapTestSpecTemplate = `
type = "bootstrap"
Expand Down Expand Up @@ -2186,6 +2187,7 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "30s"
`
defn2 = `
name = 'LINK / ETH | version 3 | contract 0x0000000000000000000000000000000000000000'
Expand Down Expand Up @@ -2215,6 +2217,7 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "20m"
`

jp = &feeds.JobProposal{
Expand Down
16 changes: 15 additions & 1 deletion core/services/ocr2/plugins/median/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@
package config

import (
"time"

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)

// The PluginConfig struct contains the custom arguments needed for the Median plugin.
type PluginConfig struct {
JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"`
JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"`
JuelsPerFeeCoinCacheDuration models.Interval `json:"juelsPerFeeCoinCacheDuration"`
JuelsPerFeeCoinCacheDisabled bool `json:"juelsPerFeeCoinCacheDisabled"`
}

// ValidatePluginConfig validates the arguments for the Median plugin.
Expand All @@ -20,5 +25,14 @@ func ValidatePluginConfig(config PluginConfig) error {
return errors.Wrap(err, "invalid juelsPerFeeCoinSource pipeline")
}

// unset duration defaults later
if config.JuelsPerFeeCoinCacheDuration != 0 {
if config.JuelsPerFeeCoinCacheDuration.Duration() < time.Second*30 {
return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is below 30 second minimum", config.JuelsPerFeeCoinCacheDuration.Duration().String())
} else if config.JuelsPerFeeCoinCacheDuration.Duration() > time.Minute*20 {
return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is above 20 minute maximum", config.JuelsPerFeeCoinCacheDuration.Duration().String())
}
}

return nil
}
55 changes: 44 additions & 11 deletions core/services/ocr2/plugins/median/config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,55 @@
package config

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink/v2/core/store/models"
)

func TestValidatePluginConfig(t *testing.T) {
for _, s := range []struct {
name string
pipeline string
}{
{"empty", ""},
{"blank", " "},
{"foo", "foo"},
} {
t.Run(s.name, func(t *testing.T) {
assert.Error(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: s.pipeline}))
})
type testCase struct {
name string
pipeline string
cacheDuration models.Interval
expectedError error
}

t.Run("pipeline validation", func(t *testing.T) {
for _, tc := range []testCase{
{"empty pipeline", "", models.Interval(time.Minute), fmt.Errorf("invalid juelsPerFeeCoinSource pipeline: empty pipeline")},
{"blank pipeline", " ", models.Interval(time.Minute), fmt.Errorf("invalid juelsPerFeeCoinSource pipeline: empty pipeline")},
{"foo pipeline", "foo", models.Interval(time.Minute), fmt.Errorf("invalid juelsPerFeeCoinSource pipeline: UnmarshalTaskFromMap: unknown task type: \"\"")},
} {
t.Run(tc.name, func(t *testing.T) {
assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline}), tc.expectedError.Error())
})
}
})

t.Run("cache duration validation", func(t *testing.T) {
for _, tc := range []testCase{
{"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSource cache duration: 29s is below 30 second minimum")},
{"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSource cache duration: 20m1s is above 20 minute maximum")},
} {
t.Run(tc.name, func(t *testing.T) {
assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCacheDuration: tc.cacheDuration}), tc.expectedError.Error())
})
}
})

t.Run("valid values", func(t *testing.T) {
for _, s := range []testCase{
{"valid 0 cache duration and valid pipeline", `ds1 [type=bridge name=voter_turnout];`, 0, nil},
{"valid duration and valid pipeline", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 30), nil},
{"valid duration and valid pipeline", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute * 20), nil},
} {
t.Run(s.name, func(t *testing.T) {
assert.Nil(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: s.pipeline}))
})
}
})
}
14 changes: 11 additions & 3 deletions core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,26 @@ func NewMedianServices(ctx context.Context,
}
}

dataSource, juelsPerFeeCoinSource := ocrcommon.NewDataSourceV2(pipelineRunner,
dataSource := ocrcommon.NewDataSourceV2(pipelineRunner,
jb,
*jb.PipelineSpec,
lggr,
runSaver,
chEnhancedTelem,
), ocrcommon.NewInMemoryDataSource(pipelineRunner, jb, pipeline.Spec{
chEnhancedTelem)

juelsPerFeeCoinSource := ocrcommon.NewInMemoryDataSource(pipelineRunner, jb, pipeline.Spec{
ID: jb.ID,
DotDagSource: pluginConfig.JuelsPerFeeCoinPipeline,
CreatedAt: time.Now(),
}, lggr)

if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
lggr.Infof("juelsPerFeeCoin data source caching is enabled")
if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
return nil, err
}
}

if cmdName := env.MedianPlugin.Cmd.Get(); cmdName != "" {
// use unique logger names so we can use it to register a loop
medianLggr := lggr.Named("Median").Named(spec.ContractID).Named(spec.GetID())
Expand Down
138 changes: 117 additions & 21 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ocrcommon

import (
"context"
errjoin "errors"
"math/big"
"sync"
"time"
Expand Down Expand Up @@ -98,8 +99,42 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l
}
}

const defaultInMemoryCacheDuration = time.Minute * 5

func NewInMemoryDataSourceCache(ds median.DataSource, cacheExpiryDuration time.Duration) (median.DataSource, error) {
inMemoryDS, ok := ds.(*inMemoryDataSource)
if !ok {
return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds)
}

if cacheExpiryDuration == 0 {
cacheExpiryDuration = defaultInMemoryCacheDuration
}

dsCache := &inMemoryDataSourceCache{
cacheExpiration: cacheExpiryDuration,
inMemoryDataSource: inMemoryDS,
}
go func() { dsCache.updater() }()
return dsCache, nil
}

var _ ocr1types.DataSource = (*dataSource)(nil)

func setEATelemetry(ds *inMemoryDataSource, finalResult pipeline.FinalResult, trrs pipeline.TaskRunResults, timestamp ObservationTimestamp) {
promSetFinalResultMetrics(ds, &finalResult)
promSetBridgeParseMetrics(ds, &trrs)
if ShouldCollectEnhancedTelemetry(&ds.jb) {
EnqueueEnhancedTelem(ds.chEnhancedTelemetry, EnhancedTelemetryData{
TaskRunResults: trrs,
FinalResults: finalResult,
RepTimestamp: timestamp,
})
} else {
ds.lggr.Infow("Enhanced telemetry is disabled for job", "job", ds.jb.Name)
}
}

func (ds *inMemoryDataSource) updateAnswer(a *big.Int) {
ds.mu.Lock()
defer ds.mu.Unlock()
Expand All @@ -117,7 +152,7 @@ func (ds *inMemoryDataSource) currentAnswer() (*big.Int, *big.Int) {

// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod).
// Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod.
func (ds *inMemoryDataSource) executeRun(ctx context.Context, timestamp ObservationTimestamp) (*pipeline.Run, pipeline.FinalResult, error) {
func (ds *inMemoryDataSource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
md, err := bridges.MarshalBridgeMetaData(ds.currentAnswer())
if err != nil {
ds.lggr.Warnw("unable to attach metadata for run", "err", err)
Expand All @@ -136,23 +171,10 @@ func (ds *inMemoryDataSource) executeRun(ctx context.Context, timestamp Observat

run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr)
if err != nil {
return nil, pipeline.FinalResult{}, errors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID)
}
finalResult := trrs.FinalResult(ds.lggr)
promSetBridgeParseMetrics(ds, &trrs)
promSetFinalResultMetrics(ds, &finalResult)

if ShouldCollectEnhancedTelemetry(&ds.jb) {
EnqueueEnhancedTelem(ds.chEnhancedTelemetry, EnhancedTelemetryData{
TaskRunResults: trrs,
FinalResults: finalResult,
RepTimestamp: timestamp,
})
} else {
ds.lggr.Infow("Enhanced telemetry is disabled for job", "job", ds.jb.Name)
return nil, pipeline.TaskRunResults{}, errors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID)
}

return run, finalResult, err
return run, trrs, err
}

// parse uses the FinalResult into a big.Int and stores it in the bridge metadata
Expand All @@ -176,19 +198,90 @@ func (ds *inMemoryDataSource) parse(finalResult pipeline.FinalResult) (*big.Int,

// Observe without saving to DB
func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (*big.Int, error) {
_, finalResult, err := ds.executeRun(ctx, ObservationTimestamp{
_, trrs, err := ds.executeRun(ctx)
if err != nil {
return nil, err
}

finalResult := trrs.FinalResult(ds.lggr)
setEATelemetry(ds, finalResult, trrs, ObservationTimestamp{
Round: timestamp.Round,
Epoch: timestamp.Epoch,
ConfigDigest: timestamp.ConfigDigest.Hex(),
})
if err != nil {
return nil, err
}

return ds.parse(finalResult)
}

// inMemoryDataSourceCache is a time based cache wrapper for inMemoryDataSource.
// If cache update is overdue Observe defaults to standard inMemoryDataSource behaviour.
type inMemoryDataSourceCache struct {
*inMemoryDataSource
cacheExpiration time.Duration
mu sync.RWMutex
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
}

// updater periodically updates data source cache.
func (ds *inMemoryDataSourceCache) updater() {
ticker := time.NewTicker(ds.cacheExpiration)
for ; true; <-ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := ds.updateCache(ctx); err != nil {
ds.lggr.Infow("failed to update cache", "err", err)
}
cancel()
}
}

func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.mu.Lock()
defer ds.mu.Unlock()
_, ds.latestTrrs, ds.latestUpdateErr = ds.executeRun(ctx)
if ds.latestUpdateErr != nil {
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
} else if ds.latestTrrs.FinalResult(ds.lggr).HasErrors() {
ds.latestUpdateErr = errjoin.Join(ds.latestTrrs.FinalResult(ds.lggr).AllErrors...)
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
}

ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
return nil
}

func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResult, pipeline.TaskRunResults) {
ds.mu.RLock()
// updater didn't error, so we know that the latestResult is fresh
if ds.latestUpdateErr == nil {
defer ds.mu.RUnlock()
return ds.latestResult, ds.latestTrrs
}
ds.mu.RUnlock()

if err := ds.updateCache(ctx); err != nil {
ds.lggr.Errorw("failed to update cache, returning stale result now", "err", err)
}

ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.latestResult, ds.latestTrrs
}

func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (*big.Int, error) {
latestResult, latestTrrs := ds.get(ctx)
setEATelemetry(ds.inMemoryDataSource, latestResult, latestTrrs, ObservationTimestamp{
Round: timestamp.Round,
Epoch: timestamp.Epoch,
ConfigDigest: timestamp.ConfigDigest.Hex(),
})

return ds.parse(latestResult)
}

func (ds *dataSourceBase) observe(ctx context.Context, timestamp ObservationTimestamp) (*big.Int, error) {
run, finalResult, err := ds.inMemoryDataSource.executeRun(ctx, timestamp)
run, trrs, err := ds.inMemoryDataSource.executeRun(ctx)
if err != nil {
return nil, err
}
Expand All @@ -201,6 +294,9 @@ func (ds *dataSourceBase) observe(ctx context.Context, timestamp ObservationTime
// a db write block that.
ds.saver.Save(run)

finalResult := trrs.FinalResult(ds.lggr)
setEATelemetry(&ds.inMemoryDataSource, finalResult, trrs, timestamp)

return ds.inMemoryDataSource.parse(finalResult)
}

Expand Down
Loading

0 comments on commit 23f767f

Please sign in to comment.