Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FUN-1313] implement retry strategy for external adapter #12391

Merged
merged 11 commits into from
Mar 15, 2024
Merged
2 changes: 2 additions & 0 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ require (
github.com/gtank/merlin v0.1.1 // indirect
github.com/gtank/ristretto255 v0.1.2 // indirect
github.com/hashicorp/consul/sdk v0.14.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-envparse v0.1.0 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-plugin v1.6.0 // indirect
github.com/hashicorp/go-retryablehttp v0.7.5 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ github.com/hashicorp/go-envparse v0.1.0 h1:bE++6bhIsNCPLvgDZkYqo3nA+/PFI51pkrHdm
github.com/hashicorp/go-envparse v0.1.0/go.mod h1:OHheN1GoygLlAkTlXLXvAdnXdZxy8JUweQ1rAXx1xnc=
github.com/hashicorp/go-getter v1.7.1 h1:SWiSWN/42qdpR0MdhaOc/bLR48PLuP1ZQtYLRlM69uY=
github.com/hashicorp/go-getter v1.7.1/go.mod h1:W7TalhMmbPmsSMdNjD0ZskARur/9GJ17cfHTRtXV744=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
Expand All @@ -709,6 +710,8 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M=
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-safetemp v1.0.0 h1:2HR189eFNrjHQyENnQMMpCiBAsRxzbTMIgBhEyExpmo=
github.com/hashicorp/go-safetemp v1.0.0/go.mod h1:oaerMy3BhqiTbVye6QuFhFtIceqFoDHxNAB65b+Rj1I=
Expand Down
43 changes: 29 additions & 14 deletions core/services/functions/external_adapter_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -42,8 +43,10 @@ type ExternalAdapterClient interface {
}

type externalAdapterClient struct {
adapterURL url.URL
maxResponseBytes int64
adapterURL url.URL
maxResponseBytes int64
maxRetries int
exponentialBackoffBase time.Duration
}

var _ ExternalAdapterClient = (*externalAdapterClient)(nil)
Expand All @@ -54,9 +57,11 @@ type BridgeAccessor interface {
}

type bridgeAccessor struct {
bridgeORM bridges.ORM
bridgeName string
maxResponseBytes int64
bridgeORM bridges.ORM
bridgeName string
maxResponseBytes int64
maxRetries int
exponentialBackoffBase time.Duration
}

var _ BridgeAccessor = (*bridgeAccessor)(nil)
Expand Down Expand Up @@ -112,10 +117,12 @@ var (
)
)

func NewExternalAdapterClient(adapterURL url.URL, maxResponseBytes int64) ExternalAdapterClient {
func NewExternalAdapterClient(adapterURL url.URL, maxResponseBytes int64, maxRetries int, exponentialBackoffBase time.Duration) ExternalAdapterClient {
return &externalAdapterClient{
adapterURL: adapterURL,
maxResponseBytes: maxResponseBytes,
adapterURL: adapterURL,
maxResponseBytes: maxResponseBytes,
maxRetries: maxRetries,
exponentialBackoffBase: exponentialBackoffBase,
}
}

Expand Down Expand Up @@ -190,7 +197,13 @@ func (ea *externalAdapterClient) request(
req.Header.Set("Content-Type", "application/json")

start := time.Now()
client := &http.Client{}

// retry will only happen on a 5XX error response code (except 501)
retryClient := retryablehttp.NewClient()
bolekk marked this conversation as resolved.
Show resolved Hide resolved
retryClient.RetryMax = ea.maxRetries
retryClient.RetryWaitMin = ea.exponentialBackoffBase

client := retryClient.StandardClient()
resp, err := client.Do(req)
if err != nil {
promEAClientErrors.WithLabelValues(label).Inc()
Expand Down Expand Up @@ -244,11 +257,13 @@ func (ea *externalAdapterClient) request(
}
}

func NewBridgeAccessor(bridgeORM bridges.ORM, bridgeName string, maxResponseBytes int64) BridgeAccessor {
func NewBridgeAccessor(bridgeORM bridges.ORM, bridgeName string, maxResponseBytes int64, maxRetries int, exponentialBackoffBase time.Duration) BridgeAccessor {
return &bridgeAccessor{
bridgeORM: bridgeORM,
bridgeName: bridgeName,
maxResponseBytes: maxResponseBytes,
bridgeORM: bridgeORM,
bridgeName: bridgeName,
maxResponseBytes: maxResponseBytes,
maxRetries: maxRetries,
exponentialBackoffBase: exponentialBackoffBase,
}
}

Expand All @@ -257,5 +272,5 @@ func (b *bridgeAccessor) NewExternalAdapterClient() (ExternalAdapterClient, erro
if err != nil {
return nil, err
}
return NewExternalAdapterClient(url.URL(bridge.URL), b.maxResponseBytes), nil
return NewExternalAdapterClient(url.URL(bridge.URL), b.maxResponseBytes, b.maxRetries, b.exponentialBackoffBase), nil
}
106 changes: 92 additions & 14 deletions core/services/functions/external_adapter_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func runFetcherTest(t *testing.T, adapterJSONResponse, expectedSecrets, expected
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err, "Unexpected error")

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
encryptedSecrets, userError, err := ea.FetchEncryptedSecrets(testutils.Context(t), []byte("urls to secrets"), "requestID1234", "TestJob")

if expectedError != nil {
Expand All @@ -50,7 +50,7 @@ func runRequestTest(t *testing.T, adapterJSONResponse, expectedUserResult, expec
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err, "Unexpected error")

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
userResult, userError, domains, err := ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "", &functions.RequestData{})

if expectedError != nil {
Expand Down Expand Up @@ -144,15 +144,7 @@ func TestFetchEncryptedSecrets_UnexpectedResult(t *testing.T) {
}

func TestRunComputation_Success(t *testing.T) {
runRequestTest(t, `{
"result": "success",
"data": {
"result": "0x616263646566",
"error": "",
"domains": ["domain1", "domain2"]
},
"statusCode": 200
}`, "abcdef", "", []string{"domain1", "domain2"}, nil)
runRequestTest(t, runComputationSuccessResponse, "abcdef", "", []string{"domain1", "domain2"}, nil)
}

func TestRunComputation_MissingData(t *testing.T) {
Expand All @@ -177,7 +169,7 @@ func TestRunComputation_CorrectAdapterRequest(t *testing.T) {
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
reqData := &functions.RequestData{
Source: "abcd",
Language: 7,
Expand All @@ -199,7 +191,7 @@ func TestRunComputation_HTTP500(t *testing.T) {
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
_, _, _, err = ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.Error(t, err)
}
Expand All @@ -214,10 +206,96 @@ func TestRunComputation_ContextRespected(t *testing.T) {
adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000)
ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 0, 0)
ctx, cancel := context.WithTimeout(testutils.Context(t), 10*time.Millisecond)
defer cancel()
_, _, _, err = ea.RunComputation(ctx, "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.Error(t, err)
close(done)
}

func TestRunComputationRetrial(t *testing.T) {

t.Run("OK-retry_succeeds_after_one_failure", func(t *testing.T) {
counter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch counter {
case 0:
counter++
w.WriteHeader(http.StatusInternalServerError)
return
case 1:
counter++
fmt.Fprintln(w, runComputationSuccessResponse)
return
default:
t.Errorf("invalid amount of retries: %d", counter)
t.FailNow()
}
}))
defer ts.Close()

adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 1, 1*time.Nanosecond)
_, _, _, err = ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.NoError(t, err)
})

t.Run("NOK-retry_fails_after_retrial", func(t *testing.T) {
counter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch counter {
case 0, 1:
counter++
w.WriteHeader(http.StatusInternalServerError)
return
default:
t.Errorf("invalid amount of retries: %d", counter)
t.FailNow()
}
}))
defer ts.Close()

adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 1, 1*time.Nanosecond)
_, _, _, err = ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.Error(t, err)
})

t.Run("NOK-dont_retry_on_4XX_errors", func(t *testing.T) {
counter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch counter {
case 0:
counter++
w.WriteHeader(http.StatusBadRequest)
return
default:
t.Errorf("invalid amount of retries: %d", counter)
t.FailNow()
}
}))
defer ts.Close()

adapterUrl, err := url.Parse(ts.URL)
assert.NoError(t, err)

ea := functions.NewExternalAdapterClient(*adapterUrl, 100_000, 1, 1*time.Nanosecond)
_, _, _, err = ea.RunComputation(testutils.Context(t), "requestID1234", "TestJob", "SubOwner", 1, functions.RequestFlags{}, "secRETS", &functions.RequestData{})
assert.Error(t, err)
})
}

const runComputationSuccessResponse = `{
"result": "success",
"data": {
"result": "0x616263646566",
"error": "",
"domains": ["domain1", "domain2"]
},
"statusCode": 200
}`
58 changes: 30 additions & 28 deletions core/services/ocr2/plugins/functions/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,36 @@ import (

// This config is part of the job spec and is loaded only once on node boot/job creation.
type PluginConfig struct {
EnableRequestSignatureCheck bool `json:"enableRequestSignatureCheck"`
DONID string `json:"donID"`
ContractVersion uint32 `json:"contractVersion"`
MinRequestConfirmations uint32 `json:"minRequestConfirmations"`
MinResponseConfirmations uint32 `json:"minResponseConfirmations"`
MinIncomingConfirmations uint32 `json:"minIncomingConfirmations"`
PastBlocksToPoll uint32 `json:"pastBlocksToPoll"`
LogPollerCacheDurationSec uint32 `json:"logPollerCacheDurationSec"` // Duration to cache previously detected request or response logs such that they can be filtered when calling logpoller_wrapper.LatestEvents()
RequestTimeoutSec uint32 `json:"requestTimeoutSec"`
RequestTimeoutCheckFrequencySec uint32 `json:"requestTimeoutCheckFrequencySec"`
RequestTimeoutBatchLookupSize uint32 `json:"requestTimeoutBatchLookupSize"`
PruneMaxStoredRequests uint32 `json:"pruneMaxStoredRequests"`
PruneCheckFrequencySec uint32 `json:"pruneCheckFrequencySec"`
PruneBatchSize uint32 `json:"pruneBatchSize"`
ListenerEventHandlerTimeoutSec uint32 `json:"listenerEventHandlerTimeoutSec"`
ListenerEventsCheckFrequencyMillis uint32 `json:"listenerEventsCheckFrequencyMillis"`
ContractUpdateCheckFrequencySec uint32 `json:"contractUpdateCheckFrequencySec"`
MaxRequestSizeBytes uint32 `json:"maxRequestSizeBytes"`
MaxRequestSizesList []uint32 `json:"maxRequestSizesList"`
MaxSecretsSizesList []uint32 `json:"maxSecretsSizesList"`
MinimumSubscriptionBalance assets.Link `json:"minimumSubscriptionBalance"`
AllowedHeartbeatInitiators []string `json:"allowedHeartbeatInitiators"`
GatewayConnectorConfig *connector.ConnectorConfig `json:"gatewayConnectorConfig"`
OnchainAllowlist *allowlist.OnchainAllowlistConfig `json:"onchainAllowlist"`
OnchainSubscriptions *subscriptions.OnchainSubscriptionsConfig `json:"onchainSubscriptions"`
RateLimiter *common.RateLimiterConfig `json:"rateLimiter"`
S4Constraints *s4.Constraints `json:"s4Constraints"`
DecryptionQueueConfig *DecryptionQueueConfig `json:"decryptionQueueConfig"`
EnableRequestSignatureCheck bool `json:"enableRequestSignatureCheck"`
DONID string `json:"donID"`
ContractVersion uint32 `json:"contractVersion"`
MinRequestConfirmations uint32 `json:"minRequestConfirmations"`
MinResponseConfirmations uint32 `json:"minResponseConfirmations"`
MinIncomingConfirmations uint32 `json:"minIncomingConfirmations"`
PastBlocksToPoll uint32 `json:"pastBlocksToPoll"`
LogPollerCacheDurationSec uint32 `json:"logPollerCacheDurationSec"` // Duration to cache previously detected request or response logs such that they can be filtered when calling logpoller_wrapper.LatestEvents()
RequestTimeoutSec uint32 `json:"requestTimeoutSec"`
RequestTimeoutCheckFrequencySec uint32 `json:"requestTimeoutCheckFrequencySec"`
RequestTimeoutBatchLookupSize uint32 `json:"requestTimeoutBatchLookupSize"`
PruneMaxStoredRequests uint32 `json:"pruneMaxStoredRequests"`
PruneCheckFrequencySec uint32 `json:"pruneCheckFrequencySec"`
PruneBatchSize uint32 `json:"pruneBatchSize"`
ListenerEventHandlerTimeoutSec uint32 `json:"listenerEventHandlerTimeoutSec"`
ListenerEventsCheckFrequencyMillis uint32 `json:"listenerEventsCheckFrequencyMillis"`
ContractUpdateCheckFrequencySec uint32 `json:"contractUpdateCheckFrequencySec"`
MaxRequestSizeBytes uint32 `json:"maxRequestSizeBytes"`
MaxRequestSizesList []uint32 `json:"maxRequestSizesList"`
MaxSecretsSizesList []uint32 `json:"maxSecretsSizesList"`
MinimumSubscriptionBalance assets.Link `json:"minimumSubscriptionBalance"`
AllowedHeartbeatInitiators []string `json:"allowedHeartbeatInitiators"`
GatewayConnectorConfig *connector.ConnectorConfig `json:"gatewayConnectorConfig"`
OnchainAllowlist *allowlist.OnchainAllowlistConfig `json:"onchainAllowlist"`
OnchainSubscriptions *subscriptions.OnchainSubscriptionsConfig `json:"onchainSubscriptions"`
RateLimiter *common.RateLimiterConfig `json:"rateLimiter"`
S4Constraints *s4.Constraints `json:"s4Constraints"`
DecryptionQueueConfig *DecryptionQueueConfig `json:"decryptionQueueConfig"`
ExternalAdapterMaxRetries *uint32 `json:"externalAdapterMaxRetries"`
ExternalAdapterExponentialBackoffBaseSec *uint32 `json:"externalAdapterExponentialBackoffBaseSec"`
}

type DecryptionQueueConfig struct {
Expand Down
21 changes: 20 additions & 1 deletion core/services/ocr2/plugins/functions/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ const (
FunctionsS4Namespace string = "functions"
MaxAdapterResponseBytes int64 = 1_000_000
DefaultOffchainTransmitterChannelSize uint32 = 1000
DefaultMaxAdapterRetry int = 3
DefaultExponentialBackoffBase = 5 * time.Second
)

// Create all OCR2 plugin Oracles and all extra services needed to run a Functions job.
Expand Down Expand Up @@ -106,7 +108,24 @@ func NewFunctionsServices(ctx context.Context, functionsOracleArgs, thresholdOra

offchainTransmitter := functions.NewOffchainTransmitter(DefaultOffchainTransmitterChannelSize)
listenerLogger := conf.Logger.Named("FunctionsListener")
bridgeAccessor := functions.NewBridgeAccessor(conf.BridgeORM, FunctionsBridgeName, MaxAdapterResponseBytes)

var maxRetries int
if pluginConfig.ExternalAdapterMaxRetries != nil {
maxRetries = int(*pluginConfig.ExternalAdapterMaxRetries)
} else {
maxRetries = DefaultMaxAdapterRetry
}
conf.Logger.Debugf("external adapter maxRetries configured to: %d", maxRetries)

var exponentialBackoffBase time.Duration
if pluginConfig.ExternalAdapterExponentialBackoffBaseSec != nil {
exponentialBackoffBase = time.Duration(*pluginConfig.ExternalAdapterExponentialBackoffBaseSec) * time.Second
} else {
exponentialBackoffBase = DefaultExponentialBackoffBase
}
conf.Logger.Debugf("external adapter exponentialBackoffBase configured to: %g sec", exponentialBackoffBase.Seconds())

bridgeAccessor := functions.NewBridgeAccessor(conf.BridgeORM, FunctionsBridgeName, MaxAdapterResponseBytes, maxRetries, exponentialBackoffBase)
functionsListener := functions.NewFunctionsListener(
conf.Job,
conf.Chain.Client(),
Expand Down
Loading
Loading