diff --git a/.changeset/brown-penguins-grin.md b/.changeset/brown-penguins-grin.md new file mode 100644 index 00000000000..24a06a030fc --- /dev/null +++ b/.changeset/brown-penguins-grin.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fix in memory data source cache changes/bug that only allowed pipeline results where none of the data sources failed. #bugfix diff --git a/core/services/ocrcommon/data_source.go b/core/services/ocrcommon/data_source.go index f07cfc0ab7a..9ca111dea68 100644 --- a/core/services/ocrcommon/data_source.go +++ b/core/services/ocrcommon/data_source.go @@ -3,7 +3,6 @@ package ocrcommon import ( "context" "encoding/json" - errjoin "errors" "fmt" "math/big" "sync" @@ -294,31 +293,30 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error { ds.mu.Lock() defer ds.mu.Unlock() - // check for any errors - _, latestTrrs, latestUpdateErr := ds.executeRun(ctx) - if latestTrrs.FinalResult(ds.lggr).HasErrors() { - latestUpdateErr = errjoin.Join(append(latestTrrs.FinalResult(ds.lggr).AllErrors, latestUpdateErr)...) - } - - if latestUpdateErr != nil { + _, latestTrrs, err := ds.executeRun(ctx) + if err != nil { previousUpdateErr := ds.latestUpdateErr - ds.latestUpdateErr = latestUpdateErr - // raise log severity + ds.latestUpdateErr = err + // warn log if previous cache update also errored if previousUpdateErr != nil { ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr) } - return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID) + + return errors.Wrapf(ds.latestUpdateErr, "error updating in memory data source cache for spec ID %v", ds.spec.ID) } - ds.latestTrrs = latestTrrs - ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr) - value, err := ds.inMemoryDataSource.parse(ds.latestResult) + value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult(ds.lggr)) if err != nil { - return errors.Wrapf(err, "invalid result") + ds.latestUpdateErr = errors.Wrapf(err, "invalid result") + return ds.latestUpdateErr } - // backup in case data source fails continuously and node gets rebooted + // update cache values + ds.latestTrrs = latestTrrs + ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr) + ds.latestUpdateErr = nil + // backup in case data source fails continuously and node gets rebooted timePairBytes, err := json.Marshal(&ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()}) if err != nil { return fmt.Errorf("failed to marshal result time pair, err: %w", err) @@ -341,7 +339,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul ds.mu.RUnlock() if err := ds.updateCache(ctx); err != nil { - ds.lggr.Warnf("failed to update cache err: %v, returning stale result now, err: %v", err) + ds.lggr.Warnf("failed to update cache, returning stale result now, err: %v", err) } ds.mu.RLock() @@ -357,15 +355,15 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey) if err != nil { - return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err) + return nil, fmt.Errorf("in memory data source cache is empty and failed to get backup persisted value, err: %w", err) } - if err := json.Unmarshal(timePairBytes, &resTime); err != nil { - return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err) + if err = json.Unmarshal(timePairBytes, &resTime); err != nil { + return nil, fmt.Errorf("in memory data source cache is empty and failed to unmarshal backup persisted value, err: %w", err) } if time.Since(resTime.Time) >= ds.stalenessAlertThreshold { - ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr) + ds.lggr.Errorf("in memory data source cache is empty and the persisted value hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr) } return resTime.Result.ToInt(), nil } @@ -376,6 +374,13 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty ConfigDigest: timestamp.ConfigDigest.Hex(), }) + // if last update was unsuccessful, check how much time passed since a successful update + if ds.latestUpdateErr != nil { + if time.Since(ds.latestTrrs.GetTaskRunResultsFinishedAt()) >= ds.stalenessAlertThreshold { + ds.lggr.Errorf("in memory cache is old and hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr) + } + + } return ds.parse(latestResult) } diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index a88b2165a2e..a0fc28c6862 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -217,6 +217,17 @@ func (result *TaskRunResult) IsTerminal() bool { // TaskRunResults represents a collection of results for all task runs for one pipeline run type TaskRunResults []TaskRunResult +// GetTaskRunResultsFinishedAt returns latest finishedAt time from TaskRunResults. +func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time { + var finishedTime time.Time + for _, trr := range trrs { + if trr.FinishedAt.Valid && trr.FinishedAt.Time.After(finishedTime) { + finishedTime = trr.FinishedAt.Time + } + } + return finishedTime +} + // FinalResult pulls the FinalResult for the pipeline_run from the task runs // It needs to respect the output index of each task func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult {