diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go index 9bc4e72dc09..96f62e3853b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go @@ -61,9 +61,9 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H } } -func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) { +func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, [][]byte, encoding.ErrCode, bool, time.Duration, error) { if len(streamsLookup.Feeds) == 0 { - return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) + return encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil } resultLen := 1 // Only 1 multi-feed request is made for all feeds ch := make(chan mercury.MercuryData, resultLen) @@ -71,27 +71,29 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo c.multiFeedsRequest(ctx, ch, streamsLookup) }) - var reqErr error - var retryInterval time.Duration - var errCode encoding.ErrCode - results := make([][]byte, len(streamsLookup.Feeds)) - retryable := false - state := encoding.NoPipelineError - m := <-ch if m.Error != nil { - reqErr = m.Error - retryable = m.Retryable - state = m.State - errCode = m.ErrCode - if retryable { - retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig) + // There was a pipeline error during execution + // If error was non retryable then just return the state and error + if !m.Retryable { + return m.State, nil, m.ErrCode, m.Retryable, 0 * time.Second, m.Error } - } else { - results = m.Bytes + // If errors were retryable then calculate retry interval + retryInterval := mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig) + if retryInterval != mercury.RetryIntervalTimeout { + // Return the retyrable state with appropriate retry interval + return m.State, nil, m.ErrCode, m.Retryable, retryInterval, m.Error + } + + // Now we have exhausted all retries and we have an error code to expose to user expose it with noPipelineError + // otherwise expose the pipeline error to pipeline runner (not the user) as non retryable + if m.ErrCode != encoding.ErrCodeNil { + return encoding.NoPipelineError, nil, m.ErrCode, false, 0 * time.Second, nil + } + return m.State, nil, m.ErrCode, false, 0 * time.Second, m.Error } - return state, encoding.UpkeepFailureReasonNone, results, retryable, retryInterval, errCode, reqErr + return encoding.NoPipelineError, m.Bytes, encoding.ErrCodeNil, false, 0 * time.Second, nil } func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.MercuryData, sl *mercury.StreamsLookup) { @@ -112,7 +114,8 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) if err != nil { - ch <- mercury.MercuryData{Index: 0, Error: err, Retryable: false, State: encoding.InvalidMercuryRequest} + // Not a pipeline error, a bad streams request + ch <- mercury.MercuryData{Index: 0, ErrCode: encoding.ErrCodeStreamsBadRequest, State: encoding.NoPipelineError} return } @@ -130,9 +133,9 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur // in the case of multiple retries here, use the last attempt's data state := encoding.NoPipelineError + errCode := encoding.ErrCodeNil retryable := false sent := false - errCode := encoding.ErrCodeNil retryErr := retry.Do( func() error { retryable = false @@ -147,23 +150,36 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur body, err := io.ReadAll(resp.Body) if err != nil { - retryable = false - state = encoding.InvalidMercuryResponse - return err + // Not a pipeline error, a bad streams response, send back error code + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsBadResponse, + State: encoding.NoPipelineError, + } + sent = true + return nil } c.lggr.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) switch resp.StatusCode { case http.StatusUnauthorized: - retryable = false - state = encoding.UpkeepNotAuthorized - errCode = encoding.HttpToStreamsErrCode(resp.StatusCode) - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsUnauthorized, + State: encoding.NoPipelineError, + } + sent = true + return nil case http.StatusBadRequest: - retryable = false - state = encoding.InvalidMercuryRequest - errCode = encoding.HttpToStreamsErrCode(resp.StatusCode) - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.HttpToStreamsErrCode(resp.StatusCode), + State: encoding.NoPipelineError, + } + sent = true + return nil case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: retryable = true state = encoding.MercuryFlakyFailure @@ -179,20 +195,28 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur case http.StatusOK: // continue default: - retryable = false - state = encoding.InvalidMercuryRequest - errCode = encoding.ErrCodeStreamsBadRequest - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + // Not considered as a pipeline error, a bad streams response with unknown status code. Send back to user as error code + c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsUnknownError, + State: encoding.NoPipelineError, + } + sent = true + return nil } c.lggr.Debugf("at block %s upkeep %s received status code %d from mercury v0.3 with BODY=%s", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode, hexutil.Encode(body)) var response MercuryV03Response if err := json.Unmarshal(body, &response); err != nil { c.lggr.Warnf("at timestamp %s upkeep %s failed to unmarshal body to MercuryV03Response from mercury v0.3: %v", sl.Time.String(), sl.UpkeepId.String(), err) - retryable = false - state = encoding.MercuryUnmarshalError - errCode = encoding.ErrCodeStreamsEncodingError - return err + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsBadResponse, + State: encoding.NoPipelineError, + } + sent = true + return nil } // in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract @@ -213,18 +237,20 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur b, err := hexutil.Decode(rsp.FullReport) if err != nil { c.lggr.Warnf("at timestamp %s upkeep %s failed to decode reportBlob %s: %v", sl.Time.String(), sl.UpkeepId.String(), rsp.FullReport, err) - retryable = false - state = encoding.InvalidMercuryResponse - errCode = encoding.ErrCodeStreamsEncodingError - return err + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsBadResponse, + State: encoding.NoPipelineError, + } + sent = true + return nil } reportBytes = append(reportBytes, b) } ch <- mercury.MercuryData{ - Index: 0, - Bytes: reportBytes, - Retryable: false, - State: encoding.NoPipelineError, + Index: 0, + Bytes: reportBytes, + State: encoding.NoPipelineError, } sent = true return nil