Skip to content

Commit

Permalink
core/services/pipeline: bridge task must continue after cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored and reductionista committed May 31, 2024
1 parent a06b2f2 commit b95fe98
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 4 deletions.
11 changes: 11 additions & 0 deletions core/services/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,17 @@ func init() {
}
}

// overtimeContext returns a modified context for overtime work, since tasks are expected to keep running and return
// results, even after context cancellation.
func overtimeContext(ctx context.Context) (context.Context, context.CancelFunc) {
if d, ok := ctx.Deadline(); ok {
// extend deadline
return context.WithDeadline(context.WithoutCancel(ctx), d.Add(overtime))
}
// remove cancellation
return context.WithoutCancel(ctx), func() {}
}

func (r *runner) ExecuteRun(
ctx context.Context,
spec Spec,
Expand Down
11 changes: 7 additions & 4 deletions core/services/pipeline/task.bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp
return Result{Error: errors.Errorf("headers must have an even number of elements")}, runInfo
}

url, err := t.getBridgeURLFromName(ctx, name)
overtimeCtx, cancel := overtimeContext(ctx)
defer cancel()

url, err := t.getBridgeURLFromName(overtimeCtx, name)
if err != nil {
return Result{Error: err}, runInfo
}
Expand Down Expand Up @@ -181,7 +184,7 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp
}

var cacheErr error
responseBytes, cacheErr = t.orm.GetCachedResponse(ctx, t.dotID, t.specId, cacheDuration)
responseBytes, cacheErr = t.orm.GetCachedResponse(overtimeCtx, t.dotID, t.specId, cacheDuration)
if cacheErr != nil {
promBridgeCacheErrors.WithLabelValues(t.Name).Inc()
if !errors.Is(cacheErr, sql.ErrNoRows) {
Expand Down Expand Up @@ -217,7 +220,7 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp
}

if !cachedResponse && cacheTTL > 0 {
err := t.orm.UpsertBridgeResponse(ctx, t.dotID, t.specId, responseBytes)
err := t.orm.UpsertBridgeResponse(overtimeCtx, t.dotID, t.specId, responseBytes)
if err != nil {
lggr.Errorw("Bridge task: failed to upsert response in bridge cache", "err", err)
}
Expand All @@ -241,7 +244,7 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp
return result, runInfo
}

func (t BridgeTask) getBridgeURLFromName(ctx context.Context, name StringParam) (URLParam, error) {
func (t *BridgeTask) getBridgeURLFromName(ctx context.Context, name StringParam) (URLParam, error) {
bt, err := t.orm.FindBridge(ctx, bridges.BridgeName(name))
if err != nil {
return URLParam{}, errors.Wrapf(err, "could not find bridge with name '%s'", name)
Expand Down
72 changes: 72 additions & 0 deletions core/services/pipeline/task.bridge_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pipeline_test

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -1139,3 +1140,74 @@ func TestBridgeTask_AdapterResponseStatusFailure(t *testing.T) {
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)
}

func TestBridgeTask_AdapterTimeout(t *testing.T) {
t.Parallel()
ctx := testutils.Context(t)

db := pgtest.NewSqlxDB(t)
cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
c.WebServer.BridgeCacheTTL = commonconfig.MustNewDuration(1 * time.Minute)
})

s1 := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Second) // delay enough to time-out
}))
defer s1.Close()

feedURL, err := url.ParseRequestURI(s1.URL)
require.NoError(t, err)

orm := bridges.NewORM(db)
_, bridge := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{URL: feedURL.String()})

task := pipeline.BridgeTask{
BaseTask: pipeline.NewBaseTask(0, "bridge", nil, nil, 0),
Name: bridge.Name.String(),
RequestData: btcUSDPairing,
}
c := clhttptest.NewTestLocalOnlyHTTPClient()
trORM := pipeline.NewORM(db, logger.TestLogger(t), cfg.JobPipeline().MaxSuccessfulRuns())
specID, err := trORM.CreateSpec(ctx, pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute))
require.NoError(t, err)
task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c)

// Insert entry 1m in the past, stale value, should not be used in case of EA failure.
_, err = db.ExecContext(ctx, `INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at)
VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey
DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now())
require.NoError(t, err)

vars := pipeline.NewVarsFrom(
map[string]interface{}{
"jobRun": map[string]interface{}{
"meta": map[string]interface{}{
"shouldFail": true,
},
},
},
)

t.Run("pre-cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(testutils.Context(t))
cancel() // pre-cancelled
result, runInfo := task.Run(ctx, logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)
})

t.Run("short", func(t *testing.T) {
ctx, cancel := context.WithTimeout(testutils.Context(t), time.Millisecond)
t.Cleanup(cancel)
result, runInfo := task.Run(ctx, logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)
})
}

0 comments on commit b95fe98

Please sign in to comment.