Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipeline/task.bridge: do not cache invalid external adapter response objects. #11725

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions core/services/pipeline/internal/eautils/eautils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package eautils

import (
"encoding/json"
"net/http"
)

type AdapterStatus struct {
ErrorMessage *string `json:"errorMessage"`
Error any `json:"error"`
StatusCode *int `json:"statusCode"`
ProviderStatusCode *int `json:"providerStatusCode"`
}

func BestEffortExtractEAStatus(responseBytes []byte) (code int, ok bool) {
var status AdapterStatus
err := json.Unmarshal(responseBytes, &status)
if err != nil {
return 0, false
}

if status.StatusCode == nil {
return 0, false
}

if *status.StatusCode != http.StatusOK {
return *status.StatusCode, true
}

if status.ProviderStatusCode != nil && *status.ProviderStatusCode != http.StatusOK {
return *status.ProviderStatusCode, true
}

if status.Error != nil {
return http.StatusInternalServerError, true
}

return *status.StatusCode, true
}
61 changes: 61 additions & 0 deletions core/services/pipeline/internal/eautils/eautils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package eautils

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"
)

func TestBestEffortExtractEAStatus(t *testing.T) {
tests := []struct {
name string
arg []byte
expectCode int
expectOk bool
}{
{
name: "invalid object",
arg: []byte(`{"error": "invalid json object" `),
expectCode: 0,
expectOk: false,
},
{
name: "no status code in object",
arg: []byte(`{}`),
expectCode: 0,
expectOk: false,
},
{
name: "invalid status code",
arg: []byte(`{"statusCode":400}`),
expectCode: http.StatusBadRequest,
expectOk: true,
},
{
name: "invalid provider status code",
arg: []byte(`{"statusCode":200, "providerStatusCode":500}`),
expectCode: http.StatusInternalServerError,
expectOk: true,
},
{
name: "valid statuses with error message",
arg: []byte(`{"statusCode":200, "providerStatusCode":200, "error": "unexpected error"}`),
expectCode: http.StatusInternalServerError,
expectOk: true,
},
{
name: "valid status code",
arg: []byte(`{"statusCode":200}`),
expectCode: http.StatusOK,
expectOk: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
code, ok := BestEffortExtractEAStatus(tt.arg)
assert.Equal(t, tt.expectCode, code)
assert.Equal(t, tt.expectOk, ok)
})
}
}
9 changes: 8 additions & 1 deletion core/services/pipeline/task.bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline/internal/eautils"
)

// NOTE: These metrics generate a new label per bridge, this should be safe
Expand Down Expand Up @@ -167,7 +168,13 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp

var cachedResponse bool
responseBytes, statusCode, headers, elapsed, err := makeHTTPRequest(requestCtx, lggr, "POST", url, reqHeaders, requestData, t.httpClient, t.config.DefaultHTTPLimit())
if err != nil {

// check for external adapter response object status
if code, ok := eautils.BestEffortExtractEAStatus(responseBytes); ok {
statusCode = code
}

if err != nil || statusCode != http.StatusOK {
promBridgeErrors.WithLabelValues(t.Name).Inc()
if cacheTTL == 0 {
return Result{Error: err}, RunInfo{IsRetryable: isRetryableHTTPError(statusCode, err)}
Expand Down
139 changes: 131 additions & 8 deletions core/services/pipeline/task.bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline/internal/eautils"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
Expand Down Expand Up @@ -59,11 +59,43 @@ type adapterResponseData struct {
// adapterResponse is the HTTP response as defined by the external adapter:
// https://github.com/smartcontractkit/bnc-adapter
type adapterResponse struct {
Data adapterResponseData `json:"data"`
ErrorMessage null.String `json:"errorMessage"`
eautils.AdapterStatus
Data adapterResponseData `json:"data"`
}

func (pr adapterResponse) Result() *decimal.Decimal {
func (pr *adapterResponse) SetStatusCode(code int) {
pr.StatusCode = &code
}

func (pr *adapterResponse) UnsetStatusCode() {
pr.StatusCode = nil
}

func (pr *adapterResponse) SetProviderStatusCode(code int) {
pr.ProviderStatusCode = &code
}

func (pr *adapterResponse) UnsetProviderStatusCode() {
pr.ProviderStatusCode = nil
}

func (pr *adapterResponse) SetError(msg string) {
pr.Error = msg
}

func (pr *adapterResponse) UnsetError() {
pr.Error = nil
}

func (pr *adapterResponse) SetErrorMessage(msg string) {
pr.ErrorMessage = &msg
}

func (pr *adapterResponse) UnsetErrorMessage() {
pr.ErrorMessage = nil
}

func (pr *adapterResponse) Result() *decimal.Decimal {
return pr.Data.Result
}

Expand Down Expand Up @@ -295,7 +327,7 @@ func TestBridgeTask_DoesNotReturnStaleResults(t *testing.T) {
task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c)

// Insert entry 1m in the past, stale value, should not be used in case of EA failure.
err = queryer.ExecQ(`INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at)
err = queryer.ExecQ(`INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at)
VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey
DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now().Add(-1*time.Minute))
require.NoError(t, err)
Expand Down Expand Up @@ -786,9 +818,10 @@ func TestBridgeTask_ErrorMessage(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusTooManyRequests)
err := json.NewEncoder(w).Encode(adapterResponse{
ErrorMessage: null.StringFrom("could not hit data fetcher"),
})

resp := &adapterResponse{}
resp.SetErrorMessage("could not hit data fetcher")
err := json.NewEncoder(w).Encode(resp)
require.NoError(t, err)
})

Expand Down Expand Up @@ -1016,3 +1049,93 @@ func TestBridgeTask_Headers(t *testing.T) {
assert.Equal(t, []string{"Content-Length", "38", "Content-Type", "footype", "User-Agent", "Go-http-client/1.1", "X-Header-1", "foo", "X-Header-2", "bar"}, allHeaders(headers))
})
}

func TestBridgeTask_AdapterResponseStatusFailure(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
c.WebServer.BridgeCacheTTL = commonconfig.MustNewDuration(1 * time.Minute)
})

testAdapterResponse := &adapterResponse{
Data: adapterResponseData{Result: &decimal.Zero},
}

queryer := pg.NewQ(db, logger.TestLogger(t), cfg.Database())
s1 := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := json.NewEncoder(w).Encode(testAdapterResponse)
require.NoError(t, err)
}))
defer s1.Close()

feedURL, err := url.ParseRequestURI(s1.URL)
require.NoError(t, err)

orm := bridges.NewORM(db, logger.TestLogger(t), cfg.Database())
_, bridge := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{URL: feedURL.String()}, cfg.Database())

task := pipeline.BridgeTask{
BaseTask: pipeline.NewBaseTask(0, "bridge", nil, nil, 0),
Name: bridge.Name.String(),
RequestData: btcUSDPairing,
}
c := clhttptest.NewTestLocalOnlyHTTPClient()
trORM := pipeline.NewORM(db, logger.TestLogger(t), cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns())
specID, err := trORM.CreateSpec(pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute), pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c)

// Insert entry 1m in the past, stale value, should not be used in case of EA failure.
err = queryer.ExecQ(`INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at)
VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey
DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now())
require.NoError(t, err)

vars := pipeline.NewVarsFrom(
map[string]interface{}{
"jobRun": map[string]interface{}{
"meta": map[string]interface{}{
"shouldFail": true,
},
},
},
)

// expect all external adapter response status failures to be served from the cache
testAdapterResponse.SetStatusCode(http.StatusBadRequest)
result, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)

testAdapterResponse.SetStatusCode(http.StatusOK)
testAdapterResponse.SetProviderStatusCode(http.StatusBadRequest)
result, runInfo = task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)

testAdapterResponse.SetStatusCode(http.StatusOK)
testAdapterResponse.SetProviderStatusCode(http.StatusOK)
testAdapterResponse.SetError("some error")
result, runInfo = task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)

testAdapterResponse.SetStatusCode(http.StatusInternalServerError)
result, runInfo = task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
require.False(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)
}
7 changes: 3 additions & 4 deletions core/services/pipeline/task.http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
Expand Down Expand Up @@ -264,9 +263,9 @@ func TestHTTPTask_ErrorMessage(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusTooManyRequests)
err := json.NewEncoder(w).Encode(adapterResponse{
ErrorMessage: null.StringFrom("could not hit data fetcher"),
})
resp := &adapterResponse{}
resp.SetErrorMessage("could not hit data fetcher")
err := json.NewEncoder(w).Encode(resp)
require.NoError(t, err)
})

Expand Down
Loading