Skip to content

Commit

Permalink
fix mercury v0.2 request handling
Browse files Browse the repository at this point in the history
  • Loading branch information
infiloop2 committed Feb 19, 2024
1 parent ed7baf4 commit fa054c1
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ const (
MercuryFlakyFailure PipelineExecutionState = 4
PackUnpackDecodeFailed PipelineExecutionState = 5
MercuryUnmarshalError PipelineExecutionState = 6
InvalidMercuryRequest PipelineExecutionState = 7
InvalidMercuryResponse PipelineExecutionState = 8 // this will only happen if Mercury server sends bad responses
UpkeepNotAuthorized PipelineExecutionState = 9
UpkeepNotAuthorized PipelineExecutionState = 7
)

// ErrCode is used for invoking an error handler with a specific error code.
Expand All @@ -54,8 +52,9 @@ const (
ErrCodeStreamsBadRequest ErrCode = 808400
ErrCodeStreamsUnauthorized ErrCode = 808401
ErrCodeStreamsInternalError ErrCode = 808500
ErrCodeStreamsEncodingError ErrCode = 808600
ErrCodeStreamsTimeout ErrCode = 808603
ErrCodeStreamsBadResponse ErrCode = 808600
ErrCodeStreamsTimeout ErrCode = 808602
ErrCodeStreamsUnknownError ErrCode = 808700
)

func HttpToStreamsErrCode(statusCode int) ErrCode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide

type MercuryData struct {
Index int
Error error
ErrCode encoding.ErrCode
Retryable bool
Bytes [][]byte
State encoding.PipelineExecutionState
Bytes [][]byte // Mercury values is request is successful
ErrCode encoding.ErrCode // Error code if mercury gives an error
State encoding.PipelineExecutionState // NoPipelineError if no error during execution, otherwise appropriate error
Retryable bool // Applicable if State != NoPipelineError
Error error // non nil if State != NoPipelineError
}

type MercuryConfigProvider interface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H
}
}

func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) {
func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, [][]byte, encoding.ErrCode, bool, time.Duration, error) {
resultLen := len(streamsLookup.Feeds)
ch := make(chan mercury.MercuryData, resultLen)
if len(streamsLookup.Feeds) == 0 {
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds)
return encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil
}
for i := range streamsLookup.Feeds {
// TODO (AUTO-7209): limit the number of concurrent requests
Expand All @@ -67,33 +67,69 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo
})
}

state := encoding.NoPipelineError
var reqErr error
var retryInterval time.Duration
retryable := true
allFeedsPipelineSuccess := true
allFeedsReturnedValues := true
var errCode encoding.ErrCode
results := make([][]byte, len(streamsLookup.Feeds))
retryable := true
allSuccess := true
// in v0.2, use the last execution error as the state, if no execution errors, state will be no error
state := encoding.NoPipelineError

// in v0.2, when combining results for multiple feed requests
// if any request resulted in pipeline execution error then use the last execution error as the state
// if no execution errors, then check if any feed returned an error code
// When combining multiple results,
for i := 0; i < resultLen; i++ {
m := <-ch
if m.Error != nil {
state = m.State
reqErr = errors.Join(reqErr, m.Error)
retryable = retryable && m.Retryable
errCode = m.ErrCode
allSuccess = false
if m.State != encoding.NoPipelineError {
state = m.State
if m.ErrCode != encoding.ErrCodeNil {
// Some pipeline errors can get converted to error codes if retries are exhausted
errCode = m.ErrCode
}
allFeedsPipelineSuccess = false
continue
}
if m.ErrCode != encoding.ErrCodeNil {
errCode = m.ErrCode
allFeedsReturnedValues = false
continue
}
// Feed request didn't face a pipeline error and didn't return an error code
results[m.Index] = m.Bytes[0]
}
if retryable && !allSuccess {
retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig)

if !allFeedsPipelineSuccess {
// Some feeds faced a pipeline error during execution
// If any error was non retryable then just return the state and error
if !retryable {
return state, nil, errCode, retryable, 0 * time.Second, reqErr
}
// If errors were retryable then calculate retry interval
retryInterval := mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig)
if retryInterval == mercury.RetryIntervalTimeout {
// If we have exhausted all retries and we have an error code to expose to user expose it with noPipelineError
// otherwise expose the last pipeline error to pipeline runner (not the user)
if errCode != encoding.ErrCodeNil {
return encoding.NoPipelineError, nil, errCode, false, 0 * time.Second, nil
}
return state, nil, errCode, false, 0 * time.Second, reqErr
}

// Return the retyrable state with appropriate retry interval
return state, nil, errCode, retryable, retryInterval, reqErr
}

// All feeds faced no pipeline error
// If any feed request returned an error code, return the error code with empty values, else return the values
if !allFeedsReturnedValues {
return encoding.NoPipelineError, nil, errCode, false, 0 * time.Second, reqErr
}
// only retry when not all successful AND none are not retryable
return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, retryInterval, errCode, reqErr

// All success, return the results
return state, results, encoding.ErrCodeNil, false, 0 * time.Second, nil
}

func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.MercuryData, index int, sl *mercury.StreamsLookup) {
Expand All @@ -110,7 +146,8 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur

httpRequest, err = http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil)
if err != nil {
ch <- mercury.MercuryData{Index: index, Error: err, Retryable: false, State: encoding.InvalidMercuryRequest}
// Not a pipeline error, a bad streams request
ch <- mercury.MercuryData{Index: index, ErrCode: encoding.ErrCodeStreamsBadRequest, Bytes: nil, State: encoding.NoPipelineError}
return
}

Expand Down Expand Up @@ -142,38 +179,64 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
defer httpResponse.Body.Close()

if responseBody, err = io.ReadAll(httpResponse.Body); err != nil {
state = encoding.InvalidMercuryResponse
return err
// Not a pipeline error, a bad streams response, send back error code
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
ErrCode: encoding.ErrCodeStreamsBadResponse,
State: encoding.NoPipelineError,
}
sent = true
return nil
}

switch httpResponse.StatusCode {
case http.StatusNotFound, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
// Considered as pipeline error, but if retry attempts go over threshold, is changed upstream to ErrCode
c.lggr.Warnf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index])
retryable = true
state = encoding.MercuryFlakyFailure
retryable = true
errCode = encoding.HttpToStreamsErrCode(httpResponse.StatusCode)
return errors.New(strconv.FormatInt(int64(httpResponse.StatusCode), 10))
case http.StatusOK:
// continue
default:
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index])
// Not considered as a pipeline error, a bad streams response with unknown status code. Send back to user as error code
c.lggr.Errorf("at block %s upkeep %s received unhandled status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index])
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
ErrCode: encoding.ErrCodeStreamsUnknownError,
State: encoding.NoPipelineError,
}
sent = true
return nil
}

c.lggr.Debugf("at block %s upkeep %s received status code %d from mercury v0.2 with BODY=%s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, hexutil.Encode(responseBody))

var m MercuryV02Response
if err = json.Unmarshal(responseBody, &m); err != nil {
c.lggr.Warnf("at block %s upkeep %s failed to unmarshal body to MercuryV02Response for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), sl.Feeds[index], err)
state = encoding.MercuryUnmarshalError
errCode = encoding.ErrCodeStreamsEncodingError
return err
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
ErrCode: encoding.ErrCodeStreamsBadResponse,
State: encoding.NoPipelineError,
}
sent = true
return nil
}
if blobBytes, err = hexutil.Decode(m.ChainlinkBlob); err != nil {
c.lggr.Warnf("at block %s upkeep %s failed to decode chainlinkBlob %s for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), m.ChainlinkBlob, sl.Feeds[index], err)
state = encoding.InvalidMercuryResponse
errCode = encoding.ErrCodeStreamsEncodingError
return err
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
ErrCode: encoding.ErrCodeStreamsBadResponse,
State: encoding.NoPipelineError,
}
sent = true
return nil
}
ch <- mercury.MercuryData{
Index: index,
Expand All @@ -197,10 +260,10 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
ch <- mercury.MercuryData{
Index: index,
Bytes: [][]byte{},
Retryable: retryable,
Error: fmt.Errorf("failed to request feed for %s: %w", sl.Feeds[index], retryErr),
ErrCode: errCode,
State: state,
Retryable: retryable,
Error: fmt.Errorf("failed to request feed for %s: %w", sl.Feeds[index], retryErr),
}
}
}
Expand Down

0 comments on commit fa054c1

Please sign in to comment.