Skip to content

Commit

Permalink
populate retry interval for mercury requests [DO NOT MERGE] (#11150) (#…
Browse files Browse the repository at this point in the history
…11210)

* populate retry interval for mercury requests

* fix go mod

* populate retry interval based on the work ID counter

* add tests

* use const value

* include check block number in plugin retry key

* address comments

* refactor

* test for plugin retry counter

* Bump ocr2keepers to 0.7.28 (#11181)

* handles 206 response code

* reduce mercury permission cache period

* address comments

* remove special logging for 206

---------

Co-authored-by: Akshay Aggarwal <[email protected]>
  • Loading branch information
FelixFan1992 and infiloop2 authored Nov 7, 2023
1 parent 33144f9 commit 3b9585e
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 82 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545
github.com/smartcontractkit/ocr2keepers v0.7.27
github.com/smartcontractkit/ocr2keepers v0.7.28
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb
github.com/spf13/cobra v1.6.1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1470,8 +1470,8 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ
github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545 h1:qOsw2ETQD/Sb/W2xuYn2KPWjvvsWA0C+l19rWFq8iNg=
github.com/smartcontractkit/libocr v0.0.0-20231020123319-d255366a6545/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0=
github.com/smartcontractkit/ocr2keepers v0.7.27 h1:kwqMrzmEdq6gH4yqNuLQCbdlED0KaIjwZzu3FF+Gves=
github.com/smartcontractkit/ocr2keepers v0.7.27/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas=
github.com/smartcontractkit/ocr2keepers v0.7.28 h1:dufAiYl4+uly9aH0+6GkS2jYzHGujq7tg0LYQE+x6JU=
github.com/smartcontractkit/ocr2keepers v0.7.28/go.mod h1:1QGzJURnoWpysguPowOe2bshV0hNp1YX10HHlhDEsas=
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM=
github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM=
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A=
Expand Down
18 changes: 12 additions & 6 deletions core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ import (
)

const (
defaultPluginRetryExpiration = 30 * time.Minute
// defaultAllowListExpiration decides how long an upkeep's allow list info will be valid for.
defaultAllowListExpiration = 20 * time.Minute
// allowListCleanupInterval decides when the expired items in allowList cache will be deleted.
allowListCleanupInterval = 5 * time.Minute
defaultAllowListExpiration = 10 * time.Minute
// cleanupInterval decides when the expired items in cache will be deleted.
cleanupInterval = 5 * time.Minute
logTriggerRefreshBatchSize = 32
totalFastPluginRetries = 5
totalMediumPluginRetries = 10
)

var (
Expand Down Expand Up @@ -99,9 +102,10 @@ func NewEvmRegistry(
headFunc: func(ocr2keepers.BlockKey) {},
chLog: make(chan logpoller.Log, 1000),
mercury: &MercuryConfig{
cred: mc,
abi: core.StreamsCompatibleABI,
allowListCache: cache.New(defaultAllowListExpiration, allowListCleanupInterval),
cred: mc,
abi: core.StreamsCompatibleABI,
allowListCache: cache.New(defaultAllowListExpiration, cleanupInterval),
pluginRetryCache: cache.New(defaultPluginRetryExpiration, cleanupInterval),
},
hc: http.DefaultClient,
logEventProvider: logEventProvider,
Expand All @@ -125,6 +129,8 @@ type MercuryConfig struct {
abi abi.ABI
// allowListCache stores the upkeeps' privileges. In 2.1, this only includes the JSON bytes indicating whether an upkeep is allowed to use mercury
allowListCache *cache.Cache

pluginRetryCache *cache.Cache
}

type EvmRegistry struct {
Expand Down
88 changes: 68 additions & 20 deletions core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,11 @@ func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keep
func (r *EvmRegistry) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *StreamsLookup, i int, checkResults []ocr2keepers.CheckResult, lggr logger.Logger) {
defer wg.Done()

state, reason, values, retryable, err := r.doMercuryRequest(ctx, lookup, lggr)
state, reason, values, retryable, ri, err := r.doMercuryRequest(ctx, lookup, generatePluginRetryKey(checkResults[i].WorkID, lookup.block), lggr)
if err != nil {
lggr.Errorf("upkeep %s retryable %v doMercuryRequest: %s", lookup.upkeepId, retryable, err.Error())
lggr.Errorf("upkeep %s retryable %v retryInterval %s doMercuryRequest: %s", lookup.upkeepId, retryable, ri, err.Error())
checkResults[i].Retryable = retryable
checkResults[i].RetryInterval = ri
checkResults[i].PipelineExecutionState = uint8(state)
checkResults[i].IneligibilityReason = uint8(reason)
return
Expand Down Expand Up @@ -278,12 +279,12 @@ func (r *EvmRegistry) checkCallback(ctx context.Context, values [][]byte, lookup
}

// doMercuryRequest sends requests to Mercury API to retrieve mercury data.
func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, lggr logger.Logger) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, error) {
func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, prk string, lggr logger.Logger) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error) {
var isMercuryV03 bool
resultLen := len(sl.Feeds)
ch := make(chan MercuryData, resultLen)
if len(sl.Feeds) == 0 {
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds)
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds)
}
if sl.FeedParamKey == feedIdHex && sl.TimeParamKey == blockNumber {
// only mercury v0.2
Expand All @@ -297,10 +298,11 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, l
ch = make(chan MercuryData, resultLen)
go r.multiFeedsRequest(ctx, ch, sl, lggr)
} else {
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds)
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds)
}

var reqErr error
var ri time.Duration
results := make([][]byte, len(sl.Feeds))
retryable := true
allSuccess := true
Expand All @@ -323,8 +325,11 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, l
results[m.Index] = m.Bytes[0]
}
}
if retryable && !allSuccess {
ri = r.calculateRetryConfig(prk)
}
// only retry when not every request succeeded AND every failure is retryable
return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, reqErr
return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, ri, reqErr
}

// singleFeedRequest sends a v0.2 Mercury request for a single feed report.
Expand Down Expand Up @@ -378,7 +383,7 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryDa
return err1
}

if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError {
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusServiceUnavailable || resp.StatusCode == http.StatusGatewayTimeout {
lggr.Warnf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode, sl.Feeds[index])
retryable = true
state = encoding.MercuryFlakyFailure
Expand Down Expand Up @@ -415,9 +420,9 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryDa
sent = true
return nil
},
// only retry when the error is 404 Not Found or 500 Internal Server Error
// only retry when the error is 404 Not Found, 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout
retry.RetryIf(func(err error) bool {
return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError)
return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) || err.Error() == fmt.Sprintf("%d", http.StatusBadGateway) || err.Error() == fmt.Sprintf("%d", http.StatusServiceUnavailable) || err.Error() == fmt.Sprintf("%d", http.StatusGatewayTimeout)
}),
retry.Context(ctx),
retry.Delay(retryDelay),
Expand Down Expand Up @@ -504,15 +509,29 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa
retryable = false
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3 with message: %s", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode, string(body))
} else if resp.StatusCode == http.StatusInternalServerError {
} else if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusServiceUnavailable || resp.StatusCode == http.StatusGatewayTimeout {
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", http.StatusInternalServerError)
} else if resp.StatusCode == 420 {
// in 0.3, this will happen when missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds
retryable = false
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode)
return fmt.Errorf("%d", resp.StatusCode)
} else if resp.StatusCode == http.StatusPartialContent {
//var response MercuryV03Response
//err1 = json.Unmarshal(body, &response)
//if err1 != nil {
// lggr.Warnf("at timestamp %s upkeep %s failed to unmarshal body to MercuryV03Response from mercury v0.3: %v", sl.Time.String(), sl.upkeepId.String(), err1)
// retryable = false
// state = encoding.MercuryUnmarshalError
// return err1
//}
// in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract
// hence, retry in this case. retry will help when we send a very new timestamp and reports are not yet generated
//var receivedFeeds []string
//for _, f := range response.Reports {
// receivedFeeds = append(receivedFeeds, f.FeedID)
//}
lggr.Warnf("at timestamp %s upkeep %s requested [%s] feeds but mercury v0.3 server returned 206 status, treating it as 404 and retrying", sl.Time.String(), sl.upkeepId.String(), sl.Feeds)
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", http.StatusPartialContent)
} else if resp.StatusCode != http.StatusOK {
retryable = false
state = encoding.InvalidMercuryRequest
Expand All @@ -532,8 +551,11 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa
// in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract
// hence, retry in this case. retry will help when we send a very new timestamp and reports are not yet generated
if len(response.Reports) != len(sl.Feeds) {
// TODO: AUTO-5044: calculate what reports are missing and log a warning
lggr.Warnf("at timestamp %s upkeep %s mercury v0.3 server retruned 200 status with %d reports while we requested %d feeds, treating as 404 (not found) and retrying", sl.Time.String(), sl.upkeepId.String(), len(response.Reports), len(sl.Feeds))
var receivedFeeds []string
for _, f := range response.Reports {
receivedFeeds = append(receivedFeeds, f.FeedID)
}
lggr.Warnf("at timestamp %s upkeep %s mercury v0.3 server returned 206 status with [%s] reports while we requested [%s] feeds, retrying", sl.Time.String(), sl.upkeepId.String(), receivedFeeds, sl.Feeds)
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", http.StatusNotFound)
Expand All @@ -558,9 +580,9 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa
sent = true
return nil
},
// only retry when the error is 404 Not Found or 500 Internal Server Error
// only retry when the error is 206 Partial Content, 404 Not Found, 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout
retry.RetryIf(func(err error) bool {
return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError)
return err.Error() == fmt.Sprintf("%d", http.StatusPartialContent) || err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) || err.Error() == fmt.Sprintf("%d", http.StatusBadGateway) || err.Error() == fmt.Sprintf("%d", http.StatusServiceUnavailable) || err.Error() == fmt.Sprintf("%d", http.StatusGatewayTimeout)
}),
retry.Context(ctx),
retry.Delay(retryDelay),
Expand Down Expand Up @@ -593,3 +615,29 @@ func (r *EvmRegistry) generateHMAC(method string, path string, body []byte, clie
userHmac := hex.EncodeToString(signedMessage.Sum(nil))
return userHmac
}

// calculateRetryConfig picks the plugin retry interval for the work item
// identified by prk and records one more attempt for it in pluginRetryCache.
//
// The interval depends on the attempt count currently stored under prk:
//   - no entry yet: 1 second, and the count is initialised to 1
//   - fewer than totalFastPluginRetries: 1 second
//   - fewer than totalMediumPluginRetries: 5 seconds
//   - otherwise: the zero duration, so that the plugin falls back to its own
//     default interval
//
// The updated count is always written back with cache.DefaultExpiration.
func (r *EvmRegistry) calculateRetryConfig(prk string) time.Duration {
	cached, found := r.mercury.pluginRetryCache.Get(prk)
	if !found {
		// first time this key is seen: start counting and retry quickly
		r.mercury.pluginRetryCache.Set(prk, 1, cache.DefaultExpiration)
		return 1 * time.Second
	}

	attempts := cached.(int)

	// interval stays at zero once the medium tier is exhausted; the plugin
	// then applies its default interval.
	var interval time.Duration
	switch {
	case attempts < totalFastPluginRetries:
		interval = 1 * time.Second
	case attempts < totalMediumPluginRetries:
		interval = 5 * time.Second
	}

	r.mercury.pluginRetryCache.Set(prk, attempts+1, cache.DefaultExpiration)
	return interval
}

// generatePluginRetryKey builds the plugin retry cache key for a piece of
// work: the work ID and the decimal block number joined by a "|" separator,
// e.g. workID "abc" at block 42 yields "abc|42".
func generatePluginRetryKey(workID string, block uint64) string {
	return fmt.Sprintf("%s|%d", workID, block)
}
Loading

0 comments on commit 3b9585e

Please sign in to comment.