Skip to content

Commit

Permalink
fix mercury 0.3 DoRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
infiloop2 committed Feb 19, 2024
1 parent fa054c1 commit 1219a12
Showing 1 changed file with 73 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,37 +61,39 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H
}
}

func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) {
func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, [][]byte, encoding.ErrCode, bool, time.Duration, error) {
if len(streamsLookup.Feeds) == 0 {
return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds)
return encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil
}
resultLen := 1 // Only 1 multi-feed request is made for all feeds
ch := make(chan mercury.MercuryData, resultLen)
c.threadCtrl.Go(func(ctx context.Context) {
c.multiFeedsRequest(ctx, ch, streamsLookup)
})

var reqErr error
var retryInterval time.Duration
var errCode encoding.ErrCode
results := make([][]byte, len(streamsLookup.Feeds))
retryable := false
state := encoding.NoPipelineError

m := <-ch
if m.Error != nil {
reqErr = m.Error
retryable = m.Retryable
state = m.State
errCode = m.ErrCode
if retryable {
retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig)
// There was a pipeline error during execution
// If error was non retryable then just return the state and error
if !m.Retryable {
return m.State, nil, m.ErrCode, m.Retryable, 0 * time.Second, m.Error
}
} else {
results = m.Bytes
// If errors were retryable then calculate retry interval
retryInterval := mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig)
if retryInterval != mercury.RetryIntervalTimeout {
// Return the retyrable state with appropriate retry interval
return m.State, nil, m.ErrCode, m.Retryable, retryInterval, m.Error
}

// Now we have exhausted all retries and we have an error code to expose to user expose it with noPipelineError
// otherwise expose the pipeline error to pipeline runner (not the user) as non retryable
if m.ErrCode != encoding.ErrCodeNil {
return encoding.NoPipelineError, nil, m.ErrCode, false, 0 * time.Second, nil
}
return m.State, nil, m.ErrCode, false, 0 * time.Second, m.Error
}

return state, encoding.UpkeepFailureReasonNone, results, retryable, retryInterval, errCode, reqErr
return encoding.NoPipelineError, m.Bytes, encoding.ErrCodeNil, false, 0 * time.Second, nil
}

func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.MercuryData, sl *mercury.StreamsLookup) {
Expand All @@ -112,7 +114,8 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur

req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil)
if err != nil {
ch <- mercury.MercuryData{Index: 0, Error: err, Retryable: false, State: encoding.InvalidMercuryRequest}
// Not a pipeline error, a bad streams request
ch <- mercury.MercuryData{Index: 0, ErrCode: encoding.ErrCodeStreamsBadRequest, State: encoding.NoPipelineError}
return
}

Expand All @@ -130,9 +133,9 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur

// in the case of multiple retries here, use the last attempt's data
state := encoding.NoPipelineError
errCode := encoding.ErrCodeNil
retryable := false
sent := false
errCode := encoding.ErrCodeNil
retryErr := retry.Do(
func() error {
retryable = false
Expand All @@ -147,23 +150,36 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur

body, err := io.ReadAll(resp.Body)
if err != nil {
retryable = false
state = encoding.InvalidMercuryResponse
return err
// Not a pipeline error, a bad streams response, send back error code
ch <- mercury.MercuryData{
Index: 0,
ErrCode: encoding.ErrCodeStreamsBadResponse,
State: encoding.NoPipelineError,
}
sent = true
return nil
}

c.lggr.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
switch resp.StatusCode {
case http.StatusUnauthorized:
retryable = false
state = encoding.UpkeepNotAuthorized
errCode = encoding.HttpToStreamsErrCode(resp.StatusCode)
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
ch <- mercury.MercuryData{
Index: 0,
ErrCode: encoding.ErrCodeStreamsUnauthorized,
State: encoding.NoPipelineError,
}
sent = true
return nil
case http.StatusBadRequest:
retryable = false
state = encoding.InvalidMercuryRequest
errCode = encoding.HttpToStreamsErrCode(resp.StatusCode)
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
ch <- mercury.MercuryData{
Index: 0,
ErrCode: encoding.HttpToStreamsErrCode(resp.StatusCode),
State: encoding.NoPipelineError,
}
sent = true
return nil
case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
retryable = true
state = encoding.MercuryFlakyFailure
Expand All @@ -179,20 +195,28 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur
case http.StatusOK:
// continue
default:
retryable = false
state = encoding.InvalidMercuryRequest
errCode = encoding.ErrCodeStreamsBadRequest
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
// Not considered as a pipeline error, a bad streams response with unknown status code. Send back to user as error code
c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
ch <- mercury.MercuryData{
Index: 0,
ErrCode: encoding.ErrCodeStreamsUnknownError,
State: encoding.NoPipelineError,
}
sent = true
return nil
}
c.lggr.Debugf("at block %s upkeep %s received status code %d from mercury v0.3 with BODY=%s", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode, hexutil.Encode(body))

var response MercuryV03Response
if err := json.Unmarshal(body, &response); err != nil {
c.lggr.Warnf("at timestamp %s upkeep %s failed to unmarshal body to MercuryV03Response from mercury v0.3: %v", sl.Time.String(), sl.UpkeepId.String(), err)
retryable = false
state = encoding.MercuryUnmarshalError
errCode = encoding.ErrCodeStreamsEncodingError
return err
ch <- mercury.MercuryData{
Index: 0,
ErrCode: encoding.ErrCodeStreamsBadResponse,
State: encoding.NoPipelineError,
}
sent = true
return nil
}

// in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract
Expand All @@ -213,18 +237,20 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur
b, err := hexutil.Decode(rsp.FullReport)
if err != nil {
c.lggr.Warnf("at timestamp %s upkeep %s failed to decode reportBlob %s: %v", sl.Time.String(), sl.UpkeepId.String(), rsp.FullReport, err)
retryable = false
state = encoding.InvalidMercuryResponse
errCode = encoding.ErrCodeStreamsEncodingError
return err
ch <- mercury.MercuryData{
Index: 0,
ErrCode: encoding.ErrCodeStreamsBadResponse,
State: encoding.NoPipelineError,
}
sent = true
return nil
}
reportBytes = append(reportBytes, b)
}
ch <- mercury.MercuryData{
Index: 0,
Bytes: reportBytes,
Retryable: false,
State: encoding.NoPipelineError,
Index: 0,
Bytes: reportBytes,
State: encoding.NoPipelineError,
}
sent = true
return nil
Expand Down

0 comments on commit 1219a12

Please sign in to comment.