diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index 2fbce7ed00d..40444ffd3e0 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -232,6 +232,17 @@ func init() { } } +// overtimeContext returns a modified context for overtime work, since tasks are expected to keep running and return +// results, even after context cancellation. +func overtimeContext(ctx context.Context) (context.Context, context.CancelFunc) { + if d, ok := ctx.Deadline(); ok { + // extend deadline + return context.WithDeadline(context.WithoutCancel(ctx), d.Add(overtime)) + } + // remove cancellation + return context.WithoutCancel(ctx), func() {} +} + func (r *runner) ExecuteRun( ctx context.Context, spec Spec, diff --git a/core/services/pipeline/task.bridge.go b/core/services/pipeline/task.bridge.go index 7995cf99296..103e5664666 100644 --- a/core/services/pipeline/task.bridge.go +++ b/core/services/pipeline/task.bridge.go @@ -109,7 +109,10 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp return Result{Error: errors.Errorf("headers must have an even number of elements")}, runInfo } - url, err := t.getBridgeURLFromName(ctx, name) + overtimeCtx, cancel := overtimeContext(ctx) + defer cancel() + + url, err := t.getBridgeURLFromName(overtimeCtx, name) if err != nil { return Result{Error: err}, runInfo } @@ -181,7 +184,7 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp } var cacheErr error - responseBytes, cacheErr = t.orm.GetCachedResponse(ctx, t.dotID, t.specId, cacheDuration) + responseBytes, cacheErr = t.orm.GetCachedResponse(overtimeCtx, t.dotID, t.specId, cacheDuration) if cacheErr != nil { promBridgeCacheErrors.WithLabelValues(t.Name).Inc() if !errors.Is(cacheErr, sql.ErrNoRows) { @@ -217,7 +220,7 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp } if !cachedResponse && cacheTTL > 0 { - err := t.orm.UpsertBridgeResponse(ctx, t.dotID, t.specId, responseBytes) + err := t.orm.UpsertBridgeResponse(overtimeCtx, t.dotID, t.specId, responseBytes) if err != nil { lggr.Errorw("Bridge task: failed to upsert response in bridge cache", "err", err) } @@ -241,7 +244,7 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp return result, runInfo } -func (t BridgeTask) getBridgeURLFromName(ctx context.Context, name StringParam) (URLParam, error) { +func (t *BridgeTask) getBridgeURLFromName(ctx context.Context, name StringParam) (URLParam, error) { bt, err := t.orm.FindBridge(ctx, bridges.BridgeName(name)) if err != nil { return URLParam{}, errors.Wrapf(err, "could not find bridge with name '%s'", name) diff --git a/core/services/pipeline/task.bridge_test.go b/core/services/pipeline/task.bridge_test.go index e95aef4984c..b707aa62dcf 100644 --- a/core/services/pipeline/task.bridge_test.go +++ b/core/services/pipeline/task.bridge_test.go @@ -1,6 +1,7 @@ package pipeline_test import ( + "context" "encoding/json" "fmt" "io" @@ -1139,3 +1140,74 @@ func TestBridgeTask_AdapterResponseStatusFailure(t *testing.T) { require.False(t, runInfo.IsRetryable) require.False(t, runInfo.IsPending) } + +func TestBridgeTask_AdapterTimeout(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.WebServer.BridgeCacheTTL = commonconfig.MustNewDuration(1 * time.Minute) + }) + + s1 := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(time.Second) // delay enough to time-out + })) + defer s1.Close() + + feedURL, err := url.ParseRequestURI(s1.URL) + require.NoError(t, err) + + orm := bridges.NewORM(db) + _, bridge := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{URL: feedURL.String()}) + + task := pipeline.BridgeTask{ + BaseTask: pipeline.NewBaseTask(0, "bridge", nil, nil, 0), + Name: bridge.Name.String(), + RequestData: btcUSDPairing, + } + c := clhttptest.NewTestLocalOnlyHTTPClient() + trORM := pipeline.NewORM(db, logger.TestLogger(t), cfg.JobPipeline().MaxSuccessfulRuns()) + specID, err := trORM.CreateSpec(ctx, pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute)) + require.NoError(t, err) + task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c) + + // Insert entry 1m in the past, stale value, should not be used in case of EA failure. + _, err = db.ExecContext(ctx, `INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at) + VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey + DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now()) + require.NoError(t, err) + + vars := pipeline.NewVarsFrom( + map[string]interface{}{ + "jobRun": map[string]interface{}{ + "meta": map[string]interface{}{ + "shouldFail": true, + }, + }, + }, + ) + + t.Run("pre-cancelled", func(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + cancel() // pre-cancelled + result, runInfo := task.Run(ctx, logger.TestLogger(t), vars, nil) + + require.NoError(t, result.Error) + require.NotNil(t, result.Value) + require.False(t, runInfo.IsRetryable) + require.False(t, runInfo.IsPending) + }) + + t.Run("short", func(t *testing.T) { + ctx, cancel := context.WithTimeout(testutils.Context(t), time.Millisecond) + t.Cleanup(cancel) + result, runInfo := task.Run(ctx, logger.TestLogger(t), vars, nil) + + require.NoError(t, result.Error) + require.NotNil(t, result.Value) + require.False(t, runInfo.IsRetryable) + require.False(t, runInfo.IsPending) + }) +}