-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BCF-3178] - Improve err handling and logs for in memory data source cache #12907
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
86fd239
Fix inMemoryDataSourceCache get() warn log formatting
ilija42 5bcf9cd
Fix in mem ds cache updateCache() to save runs even if some ds failed
ilija42 4ab5ed4
Improve in memory data source cache Observe() logs and error messages
ilija42 23ef713
Add changeset
ilija42 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"chainlink": patch | ||
--- | ||
|
||
Fix in memory data source cache changes/bug that only allowed pipeline results where none of the data sources failed. #bugfix |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,6 @@ package ocrcommon | |
import ( | ||
"context" | ||
"encoding/json" | ||
errjoin "errors" | ||
"fmt" | ||
"math/big" | ||
"sync" | ||
|
@@ -294,31 +293,30 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error { | |
ds.mu.Lock() | ||
defer ds.mu.Unlock() | ||
|
||
// check for any errors | ||
_, latestTrrs, latestUpdateErr := ds.executeRun(ctx) | ||
if latestTrrs.FinalResult(ds.lggr).HasErrors() { | ||
latestUpdateErr = errjoin.Join(append(latestTrrs.FinalResult(ds.lggr).AllErrors, latestUpdateErr)...) | ||
} | ||
|
||
if latestUpdateErr != nil { | ||
_, latestTrrs, err := ds.executeRun(ctx) | ||
if err != nil { | ||
previousUpdateErr := ds.latestUpdateErr | ||
ds.latestUpdateErr = latestUpdateErr | ||
// raise log severity | ||
ds.latestUpdateErr = err | ||
// warn log if previous cache update also errored | ||
if previousUpdateErr != nil { | ||
ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr) | ||
} | ||
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID) | ||
|
||
return errors.Wrapf(ds.latestUpdateErr, "error updating in memory data source cache for spec ID %v", ds.spec.ID) | ||
} | ||
|
||
ds.latestTrrs = latestTrrs | ||
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr) | ||
value, err := ds.inMemoryDataSource.parse(ds.latestResult) | ||
value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult(ds.lggr)) | ||
if err != nil { | ||
return errors.Wrapf(err, "invalid result") | ||
ds.latestUpdateErr = errors.Wrapf(err, "invalid result") | ||
return ds.latestUpdateErr | ||
} | ||
|
||
// backup in case data source fails continuously and node gets rebooted | ||
// update cache values | ||
ds.latestTrrs = latestTrrs | ||
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr) | ||
ds.latestUpdateErr = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved this down to be more cautious since we now don't check for all errors, although median task should handle everything properly |
||
|
||
// backup in case data source fails continuously and node gets rebooted | ||
timePairBytes, err := json.Marshal(&ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()}) | ||
if err != nil { | ||
return fmt.Errorf("failed to marshal result time pair, err: %w", err) | ||
|
@@ -341,7 +339,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul | |
ds.mu.RUnlock() | ||
|
||
if err := ds.updateCache(ctx); err != nil { | ||
ds.lggr.Warnf("failed to update cache err: %v, returning stale result now, err: %v", err) | ||
ds.lggr.Warnf("failed to update cache, returning stale result now, err: %v", err) | ||
} | ||
|
||
ds.mu.RLock() | ||
|
@@ -357,15 +355,15 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty | |
|
||
timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err) | ||
return nil, fmt.Errorf("in memory data source cache is empty and failed to get backup persisted value, err: %w", err) | ||
} | ||
|
||
if err := json.Unmarshal(timePairBytes, &resTime); err != nil { | ||
return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err) | ||
if err = json.Unmarshal(timePairBytes, &resTime); err != nil { | ||
return nil, fmt.Errorf("in memory data source cache is empty and failed to unmarshal backup persisted value, err: %w", err) | ||
} | ||
|
||
if time.Since(resTime.Time) >= ds.stalenessAlertThreshold { | ||
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr) | ||
ds.lggr.Errorf("in memory data source cache is empty and the persisted value hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr) | ||
} | ||
return resTime.Result.ToInt(), nil | ||
} | ||
|
@@ -376,6 +374,13 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty | |
ConfigDigest: timestamp.ConfigDigest.Hex(), | ||
}) | ||
|
||
// if last update was unsuccessful, check how much time passed since a successful update | ||
if ds.latestUpdateErr != nil { | ||
if time.Since(ds.latestTrrs.GetTaskRunResultsFinishedAt()) >= ds.stalenessAlertThreshold { | ||
ds.lggr.Errorf("in memory cache is old and hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr) | ||
} | ||
|
||
} | ||
return ds.parse(latestResult) | ||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This checked for any errors in the pipeline results, which doesn't make sense since multiple bridge data sources are used and not always all of them are successful