Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix in mem ds cache err handling cherry pick #12920

Merged
merged 3 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/brown-penguins-grin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix in memory data source cache changes/bug that only allowed pipeline results where none of the data sources failed. #bugfix
6 changes: 6 additions & 0 deletions .changeset/poor-masks-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chainlink": minor
---

Move JuelsPerFeeCoinCacheDuration under JuelsPerFeeCoinCache struct in config. Rename JuelsPerFeeCoinCacheDuration to updateInterval. Add stalenessAlertThreshold to JuelsPerFeeCoinCache config.
StalenessAlertThreshold cfg option has a default of 24 hours which means that it doesn't have to be set unless we want to override the duration after which a stale cache should start throwing errors.
6 changes: 4 additions & 2 deletions core/internal/features/ocr2/features_ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ juelsPerFeeCoinSource = """

answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "1m"
`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, blockBeforeConfig.Number().Int64(), chainReaderSpec, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil)
require.NoError(t, err)
err = apps[i].AddJobV2(testutils.Context(t), &ocrJob)
Expand Down Expand Up @@ -840,7 +841,8 @@ juelsPerFeeCoinSource = """

answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "1m"
`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil)
require.NoError(t, err)
err = apps[i].AddJobV2(testutils.Context(t), &ocrJob)
Expand Down
9 changes: 6 additions & 3 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "1m"
`
const BootstrapTestSpecTemplate = `
type = "bootstrap"
Expand Down Expand Up @@ -2189,7 +2190,8 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "30s"
[pluginConfig.juelsPerFeeCoinCache]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this going to break existing specs? if not, why not - it seems like a parsing change

Copy link
Contributor Author

@ilija42 ilija42 Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never distributed specs that have this set, default value is always used

updateInterval = "30s"
`
defn2 = `
name = 'LINK / ETH | version 3 | contract 0x0000000000000000000000000000000000000000'
Expand Down Expand Up @@ -2219,7 +2221,8 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "20m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "20m"
`

jp = &feeds.JobProposal{
Expand Down
24 changes: 15 additions & 9 deletions core/services/ocr2/plugins/median/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ import (

// The PluginConfig struct contains the custom arguments needed for the Median plugin.
type PluginConfig struct {
JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"`
JuelsPerFeeCoinCacheDuration models.Interval `json:"juelsPerFeeCoinCacheDuration"`
JuelsPerFeeCoinCacheDisabled bool `json:"juelsPerFeeCoinCacheDisabled"`
JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"`
// JuelsPerFeeCoinCache is disabled when nil
JuelsPerFeeCoinCache *JuelsPerFeeCoinCache `json:"juelsPerFeeCoinCache"`
}

type JuelsPerFeeCoinCache struct {
UpdateInterval models.Interval `json:"updateInterval"`
StalenessAlertThreshold models.Interval `json:"stalenessAlertThreshold"`
}

// ValidatePluginConfig validates the arguments for the Median plugin.
Expand All @@ -25,12 +30,13 @@ func ValidatePluginConfig(config PluginConfig) error {
return errors.Wrap(err, "invalid juelsPerFeeCoinSource pipeline")
}

// unset duration defaults later
if config.JuelsPerFeeCoinCacheDuration != 0 {
if config.JuelsPerFeeCoinCacheDuration.Duration() < time.Second*30 {
return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is below 30 second minimum", config.JuelsPerFeeCoinCacheDuration.Duration().String())
} else if config.JuelsPerFeeCoinCacheDuration.Duration() > time.Minute*20 {
return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is above 20 minute maximum", config.JuelsPerFeeCoinCacheDuration.Duration().String())
// unset durations have a default set late
if config.JuelsPerFeeCoinCache != nil {
updateInterval := config.JuelsPerFeeCoinCache.UpdateInterval.Duration()
if updateInterval != 0 && updateInterval < time.Second*30 {
return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is below 30 second minimum", updateInterval.String())
} else if updateInterval > time.Minute*20 {
return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is above 20 minute maximum", updateInterval.String())
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/services/ocr2/plugins/median/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func TestValidatePluginConfig(t *testing.T) {

t.Run("cache duration validation", func(t *testing.T) {
for _, tc := range []testCase{
{"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSource cache duration: 29s is below 30 second minimum")},
{"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSource cache duration: 20m1s is above 20 minute maximum")},
{"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 29s is below 30 second minimum")},
{"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 20m1s is above 20 minute maximum")},
} {
t.Run(tc.name, func(t *testing.T) {
assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCacheDuration: tc.cacheDuration}), tc.expectedError.Error())
assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCache: &JuelsPerFeeCoinCache{UpdateInterval: tc.cacheDuration}}), tc.expectedError.Error())
})
}
})
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ func NewMedianServices(ctx context.Context,
CreatedAt: time.Now(),
}, lggr)

if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
if pluginConfig.JuelsPerFeeCoinCache != nil {
lggr.Infof("juelsPerFeeCoin data source caching is enabled")
juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration())
juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, *pluginConfig.JuelsPerFeeCoinCache)
if err2 != nil {
return nil, err2
}
Expand Down
99 changes: 56 additions & 43 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ocrcommon
import (
"context"
"encoding/json"
errjoin "errors"
"fmt"
"math/big"
"sync"
Expand All @@ -19,6 +18,7 @@ import (
serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"

"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -103,8 +103,8 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l
}
}

const defaultCacheFreshness = time.Minute * 5
const defaultCacheFreshnessAlert = time.Hour * 24
const defaultUpdateInterval = time.Minute * 5
const defaultStalenessAlertThreshold = time.Hour * 24
const dataSourceCacheKey = "dscache"

type DataSourceCacheService interface {
Expand All @@ -113,22 +113,27 @@ type DataSourceCacheService interface {
median.DataSource
}

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (DataSourceCacheService, error) {
func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheCfg config.JuelsPerFeeCoinCache) (DataSourceCacheService, error) {
inMemoryDS, ok := ds.(*inMemoryDataSource)
if !ok {
return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds)
}

if cacheFreshness == 0 {
cacheFreshness = defaultCacheFreshness
updateInterval, stalenessAlertThreshold := cacheCfg.UpdateInterval.Duration(), cacheCfg.StalenessAlertThreshold.Duration()
if updateInterval == 0 {
updateInterval = defaultUpdateInterval
}
if stalenessAlertThreshold == 0 {
stalenessAlertThreshold = defaultStalenessAlertThreshold
}

dsCache := &inMemoryDataSourceCache{
kvStore: kvStore,
cacheFreshness: cacheFreshness,
inMemoryDataSource: inMemoryDS,
chStop: make(chan struct{}),
chDone: make(chan struct{}),
inMemoryDataSource: inMemoryDS,
kvStore: kvStore,
updateInterval: updateInterval,
stalenessAlertThreshold: stalenessAlertThreshold,
chStop: make(chan struct{}),
chDone: make(chan struct{}),
}
return dsCache, nil
}
Expand Down Expand Up @@ -231,16 +236,18 @@ func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.R
// If cache update is overdue Observe defaults to standard inMemoryDataSource behaviour.
type inMemoryDataSourceCache struct {
*inMemoryDataSource
// cacheFreshness indicates duration between cache updates.
// Even if updates fail, previous values are returned.
cacheFreshness time.Duration
mu sync.RWMutex
chStop services.StopChan
chDone chan struct{}
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
kvStore job.KVStore
// updateInterval indicates duration between cache updates.
// Even if update fail, previous values are returned.
updateInterval time.Duration
// stalenessAlertThreshold indicates duration before logs raise severity level because of stale cache.
stalenessAlertThreshold time.Duration
mu sync.RWMutex
chStop services.StopChan
chDone chan struct{}
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
kvStore job.KVStore
}

func (ds *inMemoryDataSourceCache) Start(context.Context) error {
Expand All @@ -256,7 +263,7 @@ func (ds *inMemoryDataSourceCache) Close() error {

// updater periodically updates data source cache.
func (ds *inMemoryDataSourceCache) updater() {
ticker := time.NewTicker(ds.cacheFreshness)
ticker := time.NewTicker(ds.updateInterval)
updateCache := func() {
ctx, cancel := ds.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Second*10))
defer cancel()
Expand Down Expand Up @@ -286,31 +293,30 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.mu.Lock()
defer ds.mu.Unlock()

// check for any errors
_, latestTrrs, latestUpdateErr := ds.executeRun(ctx)
if latestTrrs.FinalResult(ds.lggr).HasErrors() {
latestUpdateErr = errjoin.Join(append(latestTrrs.FinalResult(ds.lggr).AllErrors, latestUpdateErr)...)
}

if latestUpdateErr != nil {
_, latestTrrs, err := ds.executeRun(ctx)
if err != nil {
previousUpdateErr := ds.latestUpdateErr
ds.latestUpdateErr = latestUpdateErr
// raise log severity
ds.latestUpdateErr = err
// warn log if previous cache update also errored
if previousUpdateErr != nil {
ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr)
}
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)

return errors.Wrapf(ds.latestUpdateErr, "error updating in memory data source cache for spec ID %v", ds.spec.ID)
}

ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
value, err := ds.inMemoryDataSource.parse(ds.latestResult)
value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult(ds.lggr))
if err != nil {
return errors.Wrapf(err, "invalid result")
ds.latestUpdateErr = errors.Wrapf(err, "invalid result")
return ds.latestUpdateErr
}

// backup in case data source fails continuously and node gets rebooted
// update cache values
ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
ds.latestUpdateErr = nil

// backup in case data source fails continuously and node gets rebooted
timePairBytes, err := json.Marshal(&ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()})
if err != nil {
return fmt.Errorf("failed to marshal result time pair, err: %w", err)
Expand All @@ -333,7 +339,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
ds.mu.RUnlock()

if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache err: %v, returning stale result now, err: %v", err)
ds.lggr.Warnf("failed to update cache, returning stale result now, err: %v", err)
}

ds.mu.RLock()
Expand All @@ -349,15 +355,15 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty

timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey)
if err != nil {
return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err)
return nil, fmt.Errorf("in memory data source cache is empty and failed to get backup persisted value, err: %w", err)
}

if err := json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err)
if err = json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("in memory data source cache is empty and failed to unmarshal backup persisted value, err: %w", err)
}

if time.Since(resTime.Time) >= defaultCacheFreshnessAlert {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr)
if time.Since(resTime.Time) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("in memory data source cache is empty and the persisted value hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}
return resTime.Result.ToInt(), nil
}
Expand All @@ -368,6 +374,13 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty
ConfigDigest: timestamp.ConfigDigest.Hex(),
})

// if last update was unsuccessful, check how much time passed since a successful update
if ds.latestUpdateErr != nil {
if time.Since(ds.latestTrrs.GetTaskRunResultsFinishedAt()) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("in memory cache is old and hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}

}
return ds.parse(latestResult)
}

Expand Down
8 changes: 5 additions & 3 deletions core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/job/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
pipelinemocks "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/mocks"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)

var (
Expand Down Expand Up @@ -78,7 +80,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore := mocks.KVStore{}
mockKVStore.On("Store", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Second * 2)})
require.NoError(t, err)
servicetest.Run(t, dsCache)

Expand Down Expand Up @@ -112,7 +114,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(result, nil)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)})
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)
Expand All @@ -131,7 +133,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, assert.AnError)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)})
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)
Expand Down
11 changes: 11 additions & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ func (result *TaskRunResult) IsTerminal() bool {
// TaskRunResults represents a collection of results for all task runs for one pipeline run
type TaskRunResults []TaskRunResult

// GetTaskRunResultsFinishedAt returns latest finishedAt time from TaskRunResults.
func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time {
var finishedTime time.Time
for _, trr := range trrs {
if trr.FinishedAt.Valid && trr.FinishedAt.Time.After(finishedTime) {
finishedTime = trr.FinishedAt.Time
}
}
return finishedTime
}

// FinalResult pulls the FinalResult for the pipeline_run from the task runs
// It needs to respect the output index of each task
func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult {
Expand Down
4 changes: 2 additions & 2 deletions tools/ci/install_solana
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/env bash

set -euo pipefail
VERSION=v1.13.3
SHASUM=3a063fe58e6f8bc9e9de84a8d1b96da87e9184cb357d462522f7ec8a2c23bec2
VERSION=v1.17.28
SHASUM=97faa4d14becfccd3bc539dbc0aaf28c84cfe9d80d299ec70092fb5844403724

echo "Installing solana@${VERSION}"
curl -sSfL https://release.solana.com/$VERSION/install --output install_solana.sh \
Expand Down
Loading