Skip to content

Commit

Permalink
[FUN-1313] implement retry strategy for external adapter (#12391)
Browse files Browse the repository at this point in the history
* feat: implement retry strategy with exponential backoff for external adapter

* fix: integration-tests go.mod

* fix: core/scripts go.mod

* fix: integration-tests/load go.mod

* chore: set retryMax and exponentialBackoffBase as jobspec config parameters

* chore: rename maxRetry to maxRetries

* chore: adjust exponential backoff base and improve logs

* chore: remove redundant log
  • Loading branch information
agparadiso authored Mar 15, 2024
1 parent 9f44174 commit d0a88b8
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 61 deletions.
2 changes: 2 additions & 0 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ require (
github.com/gtank/merlin v0.1.1 // indirect
github.com/gtank/ristretto255 v0.1.2 // indirect
github.com/hashicorp/consul/sdk v0.14.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-envparse v0.1.0 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-plugin v1.6.0 // indirect
github.com/hashicorp/go-retryablehttp v0.7.5 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ github.com/hashicorp/go-envparse v0.1.0 h1:bE++6bhIsNCPLvgDZkYqo3nA+/PFI51pkrHdm
github.com/hashicorp/go-envparse v0.1.0/go.mod h1:OHheN1GoygLlAkTlXLXvAdnXdZxy8JUweQ1rAXx1xnc=
github.com/hashicorp/go-getter v1.7.1 h1:SWiSWN/42qdpR0MdhaOc/bLR48PLuP1ZQtYLRlM69uY=
github.com/hashicorp/go-getter v1.7.1/go.mod h1:W7TalhMmbPmsSMdNjD0ZskARur/9GJ17cfHTRtXV744=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
Expand All @@ -709,6 +710,8 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M=
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-safetemp v1.0.0 h1:2HR189eFNrjHQyENnQMMpCiBAsRxzbTMIgBhEyExpmo=
github.com/hashicorp/go-safetemp v1.0.0/go.mod h1:oaerMy3BhqiTbVye6QuFhFtIceqFoDHxNAB65b+Rj1I=
Expand Down
43 changes: 29 additions & 14 deletions core/services/functions/external_adapter_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -42,8 +43,10 @@ type ExternalAdapterClient interface {
}

type externalAdapterClient struct {
adapterURL url.URL
maxResponseBytes int64
adapterURL url.URL
maxResponseBytes int64
maxRetries int
exponentialBackoffBase time.Duration
}

var _ ExternalAdapterClient = (*externalAdapterClient)(nil)
Expand All @@ -54,9 +57,11 @@ type BridgeAccessor interface {
}

type bridgeAccessor struct {
bridgeORM bridges.ORM
bridgeName string
maxResponseBytes int64
bridgeORM bridges.ORM
bridgeName string
maxResponseBytes int64
maxRetries int
exponentialBackoffBase time.Duration
}

var _ BridgeAccessor = (*bridgeAccessor)(nil)
Expand Down Expand Up @@ -112,10 +117,12 @@ var (
)
)

func NewExternalAdapterClient(adapterURL url.URL, maxResponseBytes int64) ExternalAdapterClient {
func NewExternalAdapterClient(adapterURL url.URL, maxResponseBytes int64, maxRetries int, exponentialBackoffBase time.Duration) ExternalAdapterClient {
return &externalAdapterClient{
adapterURL: adapterURL,
maxResponseBytes: maxResponseBytes,
adapterURL: adapterURL,
maxResponseBytes: maxResponseBytes,
maxRetries: maxRetries,
exponentialBackoffBase: exponentialBackoffBase,
}
}

Expand Down Expand Up @@ -190,7 +197,13 @@ func (ea *externalAdapterClient) request(
req.Header.Set("Content-Type", "application/json")

start := time.Now()
client := &http.Client{}

// retry will only happen on a 5XX error response code (except 501)
retryClient := retryablehttp.NewClient()
retryClient.RetryMax = ea.maxRetries
retryClient.RetryWaitMin = ea.exponentialBackoffBase

client := retryClient.StandardClient()
resp, err := client.Do(req)
if err != nil {
promEAClientErrors.WithLabelValues(label).Inc()
Expand Down Expand Up @@ -244,11 +257,13 @@ func (ea *externalAdapterClient) request(
}
}

func NewBridgeAccessor(bridgeORM bridges.ORM, bridgeName string, maxResponseBytes int64) BridgeAccessor {
func NewBridgeAccessor(bridgeORM bridges.ORM, bridgeName string, maxResponseBytes int64, maxRetries int, exponentialBackoffBase time.Duration) BridgeAccessor {
return &bridgeAccessor{
bridgeORM: bridgeORM,
bridgeName: bridgeName,
maxResponseBytes: maxResponseBytes,
bridgeORM: bridgeORM,
bridgeName: bridgeName,
maxResponseBytes: maxResponseBytes,
maxRetries: maxRetries,
exponentialBackoffBase: exponentialBackoffBase,
}
}

Expand All @@ -257,5 +272,5 @@ func (b *bridgeAccessor) NewExternalAdapterClient() (ExternalAdapterClient, erro
if err != nil {
return nil, err
}
return NewExternalAdapterClient(url.URL(bridge.URL), b.maxResponseBytes), nil
return NewExternalAdapterClient(url.URL(bridge.URL), b.maxResponseBytes, b.maxRetries, b.exponentialBackoffBase), nil
}
106 changes: 92 additions & 14 deletions core/services/functions/external_adapter_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func runFetcherTest(t *testing.T, adapterJSONResponse, expectedSecrets, expected
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err, "Unexpected error")

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
encryptedSecrets, userError, err := ea.FetchEncryptedSecrets(testutils.Context(t), []byte("urls to secrets"), "requestID1234", "TestJob")

if expectedError != nil {
Expand All @@ -50,7 +50,7 @@ func runRequestTest(t *testing.T, adapterJSONResponse, expectedUserResult, expec
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err, "Unexpected error")

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
userResult, userError, domains, err := ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "", &functions.RequestData{})

if expectedError != nil {
Expand Down Expand Up @@ -144,15 +144,7 @@ func TestFetchEncryptedSecrets_UnexpectedResult(t *testing.T) {
}

func TestRunComputation_Success(t *testing.T) {
runRequestTest(t, `{
"result": "success",
"data": {
"result": "0x616263646566",
"error": "",
"domains": ["domain1", "domain2"]
},
"statusCode": 200
}`, "abcdef", "", []string{"domain1", "domain2"}, nil)
runRequestTest(t, runComputationSuccessResponse, "abcdef", "", []string{"domain1", "domain2"}, nil)
}

func TestRunComputation_MissingData(t *testing.T) {
Expand All @@ -177,7 +169,7 @@ func TestRunComputation_CorrectAdapterRequest(t *testing.T) {
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
reqData := &functions.RequestData{
Source: "abcd",
Language: 7,
Expand All @@ -199,7 +191,7 @@ func TestRunComputation_HTTP500(t *testing.T) {
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
_, _, _, err = ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.Error(t, err)
}
Expand All @@ -214,10 +206,96 @@ func TestRunComputation_ContextRespected(t *testing.T) {
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
ctx, cancel := context.WithTimeout(testutils.Context(t), 10*time.Millisecond)
defer cancel()
_, _, _, err = ea.RunComputation(ctx, "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.Error(t, err)
close(done)
}

func TestRunComputationRetrial(t *testing.T) {

t.Run("OK-retry_succeeds_after_one_failure", func(t *testing.T) {
counter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch counter {
case 0:
counter++
w.WriteHeader(http.StatusInternalServerError)
return
case 1:
counter++
fmt.Fprintln(w, runComputationSuccessResponse)
return
default:
t.Errorf("invalid amount of retries: %d", counter)
t.FailNow()
}
}))
defer ts.Close()

adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 1, 1*time.Nanosecond)
_, _, _, err = ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.NoError(t, err)
})

t.Run("NOK-retry_fails_after_retrial", func(t *testing.T) {
counter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch counter {
case 0, 1:
counter++
w.WriteHeader(http.StatusInternalServerError)
return
default:
t.Errorf("invalid amount of retries: %d", counter)
t.FailNow()
}
}))
defer ts.Close()

adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 1, 1*time.Nanosecond)
_, _, _, err = ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.Error(t, err)
})

t.Run("NOK-dont_retry_on_4XX_errors", func(t *testing.T) {
counter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch counter {
case 0:
counter++
w.WriteHeader(http.StatusBadRequest)
return
default:
t.Errorf("invalid amount of retries: %d", counter)
t.FailNow()
}
}))
defer ts.Close()

adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 1, 1*time.Nanosecond)
_, _, _, err = ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.Error(t, err)
})
}

const runComputationSuccessResponse = `{
"result": "success",
"data": {
"result": "0x616263646566",
"error": "",
"domains": ["domain1", "domain2"]
},
"statusCode": 200
}`
58 changes: 30 additions & 28 deletions core/services/ocr2/plugins/functions/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,36 @@ import (

// This config is part of the job spec and is loaded only once on node boot/job creation.
type PluginConfig struct {
EnableRequestSignatureCheck bool `json:"enableRequestSignatureCheck"`
DONID string `json:"donID"`
ContractVersion uint32 `json:"contractVersion"`
MinRequestConfirmations uint32 `json:"minRequestConfirmations"`
MinResponseConfirmations uint32 `json:"minResponseConfirmations"`
MinIncomingConfirmations uint32 `json:"minIncomingConfirmations"`
PastBlocksToPoll uint32 `json:"pastBlocksToPoll"`
LogPollerCacheDurationSec uint32 `json:"logPollerCacheDurationSec"` // Duration to cache previously detected request or response logs such that they can be filtered when calling logpoller_wrapper.LatestEvents()
RequestTimeoutSec uint32 `json:"requestTimeoutSec"`
RequestTimeoutCheckFrequencySec uint32 `json:"requestTimeoutCheckFrequencySec"`
RequestTimeoutBatchLookupSize uint32 `json:"requestTimeoutBatchLookupSize"`
PruneMaxStoredRequests uint32 `json:"pruneMaxStoredRequests"`
PruneCheckFrequencySec uint32 `json:"pruneCheckFrequencySec"`
PruneBatchSize uint32 `json:"pruneBatchSize"`
ListenerEventHandlerTimeoutSec uint32 `json:"listenerEventHandlerTimeoutSec"`
ListenerEventsCheckFrequencyMillis uint32 `json:"listenerEventsCheckFrequencyMillis"`
ContractUpdateCheckFrequencySec uint32 `json:"contractUpdateCheckFrequencySec"`
MaxRequestSizeBytes uint32 `json:"maxRequestSizeBytes"`
MaxRequestSizesList []uint32 `json:"maxRequestSizesList"`
MaxSecretsSizesList []uint32 `json:"maxSecretsSizesList"`
MinimumSubscriptionBalance assets.Link `json:"minimumSubscriptionBalance"`
AllowedHeartbeatInitiators []string `json:"allowedHeartbeatInitiators"`
GatewayConnectorConfig *connector.ConnectorConfig `json:"gatewayConnectorConfig"`
OnchainAllowlist *allowlist.OnchainAllowlistConfig `json:"onchainAllowlist"`
OnchainSubscriptions *subscriptions.OnchainSubscriptionsConfig `json:"onchainSubscriptions"`
RateLimiter *common.RateLimiterConfig `json:"rateLimiter"`
S4Constraints *s4.Constraints `json:"s4Constraints"`
DecryptionQueueConfig *DecryptionQueueConfig `json:"decryptionQueueConfig"`
EnableRequestSignatureCheck bool `json:"enableRequestSignatureCheck"`
DONID string `json:"donID"`
ContractVersion uint32 `json:"contractVersion"`
MinRequestConfirmations uint32 `json:"minRequestConfirmations"`
MinResponseConfirmations uint32 `json:"minResponseConfirmations"`
MinIncomingConfirmations uint32 `json:"minIncomingConfirmations"`
PastBlocksToPoll uint32 `json:"pastBlocksToPoll"`
LogPollerCacheDurationSec uint32 `json:"logPollerCacheDurationSec"` // Duration to cache previously detected request or response logs such that they can be filtered when calling logpoller_wrapper.LatestEvents()
RequestTimeoutSec uint32 `json:"requestTimeoutSec"`
RequestTimeoutCheckFrequencySec uint32 `json:"requestTimeoutCheckFrequencySec"`
RequestTimeoutBatchLookupSize uint32 `json:"requestTimeoutBatchLookupSize"`
PruneMaxStoredRequests uint32 `json:"pruneMaxStoredRequests"`
PruneCheckFrequencySec uint32 `json:"pruneCheckFrequencySec"`
PruneBatchSize uint32 `json:"pruneBatchSize"`
ListenerEventHandlerTimeoutSec uint32 `json:"listenerEventHandlerTimeoutSec"`
ListenerEventsCheckFrequencyMillis uint32 `json:"listenerEventsCheckFrequencyMillis"`
ContractUpdateCheckFrequencySec uint32 `json:"contractUpdateCheckFrequencySec"`
MaxRequestSizeBytes uint32 `json:"maxRequestSizeBytes"`
MaxRequestSizesList []uint32 `json:"maxRequestSizesList"`
MaxSecretsSizesList []uint32 `json:"maxSecretsSizesList"`
MinimumSubscriptionBalance assets.Link `json:"minimumSubscriptionBalance"`
AllowedHeartbeatInitiators []string `json:"allowedHeartbeatInitiators"`
GatewayConnectorConfig *connector.ConnectorConfig `json:"gatewayConnectorConfig"`
OnchainAllowlist *allowlist.OnchainAllowlistConfig `json:"onchainAllowlist"`
OnchainSubscriptions *subscriptions.OnchainSubscriptionsConfig `json:"onchainSubscriptions"`
RateLimiter *common.RateLimiterConfig `json:"rateLimiter"`
S4Constraints *s4.Constraints `json:"s4Constraints"`
DecryptionQueueConfig *DecryptionQueueConfig `json:"decryptionQueueConfig"`
ExternalAdapterMaxRetries *uint32 `json:"externalAdapterMaxRetries"`
ExternalAdapterExponentialBackoffBaseSec *uint32 `json:"externalAdapterExponentialBackoffBaseSec"`
}

type DecryptionQueueConfig struct {
Expand Down
21 changes: 20 additions & 1 deletion core/services/ocr2/plugins/functions/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ const (
FunctionsS4Namespace string = "functions"
MaxAdapterResponseBytes int64 = 1_000_000
DefaultOffchainTransmitterChannelSize uint32 = 1000
DefaultMaxAdapterRetry int = 3
DefaultExponentialBackoffBase = 5 * time.Second
)

// Create all OCR2 plugin Oracles and all extra services needed to run a Functions job.
Expand Down Expand Up @@ -106,7 +108,24 @@ func NewFunctionsServices(ctx context.Context, functionsOracleArgs, thresholdOra

offchainTransmitter := functions.NewOffchainTransmitter(DefaultOffchainTransmitterChannelSize)
listenerLogger := conf.Logger.Named("FunctionsListener")
bridgeAccessor := functions.NewBridgeAccessor(conf.BridgeORM, FunctionsBridgeName, MaxAdapterResponseBytes)

var maxRetries int
if pluginConfig.ExternalAdapterMaxRetries != nil {
maxRetries = int(*pluginConfig.ExternalAdapterMaxRetries)
} else {
maxRetries = DefaultMaxAdapterRetry
}
conf.Logger.Debugf("external adapter maxRetries configured to: %d", maxRetries)

var exponentialBackoffBase time.Duration
if pluginConfig.ExternalAdapterExponentialBackoffBaseSec != nil {
exponentialBackoffBase = time.Duration(*pluginConfig.ExternalAdapterExponentialBackoffBaseSec) * time.Second
} else {
exponentialBackoffBase = DefaultExponentialBackoffBase
}
conf.Logger.Debugf("external adapter exponentialBackoffBase configured to: %g sec", exponentialBackoffBase.Seconds())

bridgeAccessor := functions.NewBridgeAccessor(conf.BridgeORM, FunctionsBridgeName, MaxAdapterResponseBytes, maxRetries, exponentialBackoffBase)
functionsListener := functions.NewFunctionsListener(
conf.Job,
conf.Chain.Client(),
Expand Down
Loading

0 comments on commit d0a88b8

Please sign in to comment.