Skip to content

Commit

Permalink
Improve in memory data source cache Observe() logs and error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ilija42 committed Apr 21, 2024
1 parent 5bcf9cd commit 4ab5ed4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
15 changes: 11 additions & 4 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,15 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty

timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey)
if err != nil {
return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err)
return nil, fmt.Errorf("in memory data source cache is empty and failed to get backup persisted value, err: %w", err)
}

if err := json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err)
if err = json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("in memory data source cache is empty and failed to unmarshal backup persisted value, err: %w", err)
}

if time.Since(resTime.Time) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
ds.lggr.Errorf("in memory data source cache is empty and the persisted value hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}
return resTime.Result.ToInt(), nil
}
Expand All @@ -374,6 +374,13 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty
ConfigDigest: timestamp.ConfigDigest.Hex(),
})

// if last update was unsuccessful, check how much time passed since a successful update
if ds.latestUpdateErr != nil {
if time.Since(ds.latestTrrs.GetTaskRunResultsFinishedAt()) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("in memory cache is old and hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}

}
return ds.parse(latestResult)
}

Expand Down
11 changes: 11 additions & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ func (result *TaskRunResult) IsTerminal() bool {
// TaskRunResults represents a collection of results for all task runs for one pipeline run
type TaskRunResults []TaskRunResult

// GetTaskRunResultsFinishedAt returns latest finishedAt time from TaskRunResults.
func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time {
var finishedTime time.Time
for _, trr := range trrs {
if trr.FinishedAt.Valid && trr.FinishedAt.Time.After(finishedTime) {
finishedTime = trr.FinishedAt.Time
}
}
return finishedTime
}

// FinalResult pulls the FinalResult for the pipeline_run from the task runs
// It needs to respect the output index of each task
func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult {
Expand Down

0 comments on commit 4ab5ed4

Please sign in to comment.