diff --git a/core/scripts/chaincli/handler/debug.go b/core/scripts/chaincli/handler/debug.go index 8b06937fc2c..629483f2453 100644 --- a/core/scripts/chaincli/handler/debug.go +++ b/core/scripts/chaincli/handler/debug.go @@ -324,16 +324,17 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { checkResults := []ocr2keepers.CheckResult{automationCheckResult} var values [][]byte - values, err = streams.DoMercuryRequest(ctx, streamsLookup, checkResults, 0) + var errCode encoding.ErrCode + values, errCode, err = streams.DoMercuryRequest(ctx, streamsLookup, checkResults, 0) if checkResults[0].IneligibilityReason == uint8(encoding.UpkeepFailureReasonInvalidRevertDataInput) { resolveIneligible("upkeep used invalid revert data") } - if checkResults[0].PipelineExecutionState == uint8(encoding.InvalidMercuryRequest) { - resolveIneligible("the data streams request data is invalid") - } if err != nil { - failCheckConfig("failed to do data streams request ", err) + failCheckConfig("pipeline execution error, failed to do data streams request ", err) + } + if errCode != encoding.ErrCodeNil { + failCheckConfig(fmt.Sprintf("data streams error, failed to do data streams request with error code %d", errCode), nil) } // do checkCallback diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go index dc379217d83..2f3488c9c24 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding/interface.go @@ -33,43 +33,42 @@ const ( UpkeepFailureReasonTxHashReorged UpkeepFailureReason = 36 // pipeline execution error - NoPipelineError PipelineExecutionState = 0 - CheckBlockTooOld PipelineExecutionState = 1 - CheckBlockInvalid PipelineExecutionState = 2 - RpcFlakyFailure PipelineExecutionState = 3 - MercuryFlakyFailure PipelineExecutionState = 4 - PackUnpackDecodeFailed PipelineExecutionState = 5 - MercuryUnmarshalError PipelineExecutionState = 6 - InvalidMercuryRequest PipelineExecutionState = 7 - InvalidMercuryResponse PipelineExecutionState = 8 // this will only happen if Mercury server sends bad responses - UpkeepNotAuthorized PipelineExecutionState = 9 + NoPipelineError PipelineExecutionState = 0 + CheckBlockTooOld PipelineExecutionState = 1 + CheckBlockInvalid PipelineExecutionState = 2 + RpcFlakyFailure PipelineExecutionState = 3 + MercuryFlakyFailure PipelineExecutionState = 4 + PackUnpackDecodeFailed PipelineExecutionState = 5 + PrivilegeConfigUnmarshalError PipelineExecutionState = 6 ) // ErrCode is used for invoking an error handler with a specific error code. type ErrCode uint32 const ( - ErrCodeNil ErrCode = 0 - ErrCodePartialContent ErrCode = 808206 - ErrCodeDataStreamsError ErrCode = 808500 - ErrCodeBadRequest ErrCode = 808400 - ErrCodeUnauthorized ErrCode = 808401 - ErrCodeEncodingError ErrCode = 808600 - ErrCodeStreamLookupTimeout ErrCode = 808603 + // TODO: Finalize these + ErrCodeNil ErrCode = 0 + ErrCodeStreamsPartialContent ErrCode = 808206 + ErrCodeStreamsBadRequest ErrCode = 808400 + ErrCodeStreamsUnauthorized ErrCode = 808401 + ErrCodeStreamsInternalError ErrCode = 808500 + ErrCodeStreamsBadResponse ErrCode = 808600 + ErrCodeStreamsTimeout ErrCode = 808602 + ErrCodeStreamsUnknownError ErrCode = 808700 ) -func HttpToErrCode(statusCode int) ErrCode { +func HttpToStreamsErrCode(statusCode int) ErrCode { switch statusCode { case http.StatusOK: return ErrCodeNil case http.StatusPartialContent: - return ErrCodePartialContent + return ErrCodeStreamsPartialContent case http.StatusBadRequest: - return ErrCodeBadRequest + return ErrCodeStreamsBadRequest case http.StatusUnauthorized: - return ErrCodeUnauthorized + return ErrCodeStreamsUnauthorized case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: - return ErrCodeDataStreamsError + return ErrCodeStreamsInternalError default: return 0 } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go index f951072003b..6702e02e73d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury.go @@ -10,6 +10,7 @@ import ( "net/http" "time" + automationTypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/ethereum/go-ethereum/accounts/abi" @@ -45,8 +46,8 @@ var GenerateHMACFn = func(method string, path string, body []byte, clientId stri return userHmac } -// CalculateRetryConfig returns plugin retry interval based on how many times plugin has retried this work -var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) (retryInterval time.Duration) { +// CalculateStreamsRetryConfig returns plugin retry interval based on how many times plugin has retried this work +var CalculateStreamsRetryConfigFn = func(upkeepType automationTypes.UpkeepType, prk string, mercuryConfig MercuryConfigProvider) (retryInterval time.Duration) { var retries int totalAttempts, ok := mercuryConfig.GetPluginRetry(prk) if ok { @@ -61,17 +62,21 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide } else { retryInterval = 1 * time.Second } + if upkeepType == automationTypes.ConditionTrigger { + // Conditional Upkees don't have any pipeline retries as they automatically get checked on a new block + retryInterval = RetryIntervalTimeout + } mercuryConfig.SetPluginRetry(prk, retries+1, cache.DefaultExpiration) return retryInterval } type MercuryData struct { Index int - Error error - ErrCode encoding.ErrCode - Retryable bool - Bytes [][]byte - State encoding.PipelineExecutionState + Bytes [][]byte // Mercury values if request is successful + ErrCode encoding.ErrCode // Error code if mercury gives an error + State encoding.PipelineExecutionState // NoPipelineError if no error during execution, otherwise appropriate error + Retryable bool // Applicable if State != NoPipelineError + Error error // non nil if State != NoPipelineError } type MercuryConfigProvider interface { @@ -88,16 +93,13 @@ type HttpClient interface { type MercuryClient interface { // DoRequest makes a data stream request, it manages retries and returns the following: - // state: the state of the pipeline execution, used by our components. - // upkeepFailureReason: the reason for the upkeep failure, used by our components. - // data: the data returned from the data stream. - // retryable: whether the request is retryable. - // retryInterval: the interval to wait before retrying the request, or RetryIntervalTimeout if no more retries are allowed. - // errCode: the error code of the request, to be passed to the user's error handler if applicable. - // error: the raw error that occurred during the request. - // - // Exploratory: consider to merge state/failureReason/errCode into a single object - DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) + // state: the state of the pipeline execution. This field is coupled with err. If state is NoPipelineError then err is nil, otherwise err is non nil and appropriate state is returned + // data: the data returned from the data stream if there's NoPipelineError + // errCode: the errorCode of streams request if data is nil and there's NoPipelineError + // retryable: whether the request is retryable. Only applicable for errored states. + // retryInterval: the interval to wait before retrying the request. Only applicable for errored states. + // err: non nil if some internal error occurs in pipeline + DoRequest(ctx context.Context, streamsLookup *StreamsLookup, upkeepType automationTypes.UpkeepType, pluginRetryKey string) (encoding.PipelineExecutionState, [][]byte, encoding.ErrCode, bool, time.Duration, error) } type StreamsLookupError struct { @@ -132,6 +134,7 @@ type Packer interface { PackGetUpkeepPrivilegeConfig(upkeepId *big.Int) ([]byte, error) UnpackGetUpkeepPrivilegeConfig(resp []byte) ([]byte, error) DecodeStreamsLookupRequest(data []byte) (*StreamsLookupError, error) + PackUserCheckErrorHandler(errCode encoding.ErrCode, extraData []byte) ([]byte, error) } type abiPacker struct { @@ -189,3 +192,9 @@ func (p *abiPacker) UnpackGetUpkeepPrivilegeConfig(resp []byte) ([]byte, error) func (p *abiPacker) PackGetUpkeepPrivilegeConfig(upkeepId *big.Int) ([]byte, error) { return p.registryABI.Pack("getUpkeepPrivilegeConfig", upkeepId) } + +func (p *abiPacker) PackUserCheckErrorHandler(errCode encoding.ErrCode, extraData []byte) ([]byte, error) { + // Convert errCode to bigInt as contract expects uint256 + errCodeBigInt := new(big.Int).SetUint64(uint64(errCode)) + return p.streamsABI.Pack("checkErrorHandler", errCodeBigInt, extraData) +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury_test.go index 3854253d48a..200c26482bc 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/mercury_test.go @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/smartcontractkit/chainlink-common/pkg/types" + + automationTypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" ) @@ -282,7 +284,7 @@ func Test_CalculateRetryConfigFn(t *testing.T) { cfg := newMercuryConfigMock() var result time.Duration for i := 0; i < tc.times; i++ { - result = CalculateRetryConfigFn("prk", cfg) + result = CalculateStreamsRetryConfigFn(automationTypes.ConditionTrigger, "prk", cfg) } assert.Equal(t, tc.expected, result) }) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index 7f10e97950e..bb58d172062 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -15,7 +15,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/patrickmn/go-cache" - "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" "github.com/smartcontractkit/chainlink-common/pkg/services" ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" @@ -175,15 +174,27 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper lookups[i] = streamsLookupResponse } +// Does the requested lookup and sets appropriate fields on checkResult[i] func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *mercury.StreamsLookup, i int, checkResults []ocr2keepers.CheckResult) { defer wg.Done() - values, err := s.DoMercuryRequest(ctx, lookup, checkResults, i) + values, errCode, err := s.DoMercuryRequest(ctx, lookup, checkResults, i) if err != nil { s.lggr.Errorf("at block %d upkeep %s requested time %s DoMercuryRequest err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) + return + } + + if errCode != encoding.ErrCodeNil { + err = s.CheckErrorHandler(ctx, errCode, lookup, checkResults, i) + if err != nil { + s.lggr.Errorf("at block %d upkeep %s requested time %s CheckErrorHandler err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) + } + return } - if err := s.CheckCallback(ctx, values, lookup, checkResults, i); err != nil { + // Mercury request returned values, call checkCallback + err = s.CheckCallback(ctx, values, lookup, checkResults, i) + if err != nil { s.lggr.Errorf("at block %d upkeep %s requested time %s CheckCallback err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) } } @@ -196,106 +207,90 @@ func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *me return err } - var mercuryBytes hexutil.Bytes + return s.makeCallbackEthCall(ctx, payload, lookup, checkResults, i) +} + +// eth_call to checkCallback and checkErrorHandler and update checkResults[i] accordingly +func (s *streams) makeCallbackEthCall(ctx context.Context, payload []byte, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) error { + var responseBytes hexutil.Bytes args := map[string]interface{}{ "to": s.registry.Address().Hex(), "data": hexutil.Bytes(payload), } - // call checkCallback function at the block which OCR3 has agreed upon - if err = s.client.CallContext(ctx, &mercuryBytes, "eth_call", args, hexutil.EncodeUint64(lookup.Block)); err != nil { + if err := s.client.CallContext(ctx, &responseBytes, "eth_call", args, hexutil.EncodeUint64(lookup.Block)); err != nil { checkResults[i].Retryable = true checkResults[i].PipelineExecutionState = uint8(encoding.RpcFlakyFailure) return err } - s.lggr.Infof("at block %d upkeep %s requested time %s checkCallback mercuryBytes: %s", lookup.Block, lookup.UpkeepId, lookup.Time, hexutil.Encode(mercuryBytes)) + s.lggr.Infof("at block %d upkeep %s requested time %s responseBytes: %s", lookup.Block, lookup.UpkeepId, lookup.Time, hexutil.Encode(responseBytes)) - unpackCallBackState, needed, performData, failureReason, _, err := s.packer.UnpackCheckCallbackResult(mercuryBytes) + unpackCallBackState, needed, performData, failureReason, _, err := s.packer.UnpackCheckCallbackResult(responseBytes) if err != nil { checkResults[i].PipelineExecutionState = uint8(unpackCallBackState) return err } - if failureReason == encoding.UpkeepFailureReasonMercuryCallbackReverted { - checkResults[i].IneligibilityReason = uint8(encoding.UpkeepFailureReasonMercuryCallbackReverted) - s.lggr.Debugf("at block %d upkeep %s requested time %s mercury callback reverts", lookup.Block, lookup.UpkeepId, lookup.Time) - return nil - } + s.lggr.Infof("at block %d upkeep %s requested time %s returns needed: %v, failure reason: %d, perform data: %s", lookup.Block, lookup.UpkeepId, lookup.Time, needed, failureReason, hexutil.Encode(performData)) - if !needed { - checkResults[i].IneligibilityReason = uint8(encoding.UpkeepFailureReasonUpkeepNotNeeded) - s.lggr.Debugf("at block %d upkeep %s requested time %s callback reports upkeep not needed", lookup.Block, lookup.UpkeepId, lookup.Time) - return nil - } - - checkResults[i].IneligibilityReason = uint8(encoding.UpkeepFailureReasonNone) - checkResults[i].Eligible = true + checkResults[i].IneligibilityReason = uint8(failureReason) + checkResults[i].Eligible = needed checkResults[i].PerformData = performData - s.lggr.Infof("at block %d upkeep %s requested time %s CheckCallback successful with perform data: %s", lookup.Block, lookup.UpkeepId, lookup.Time, hexutil.Encode(performData)) return nil } -func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) ([][]byte, error) { - state, reason, values, retryable, retryInterval, errCode, err := encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds) +// Does the mercury request for the checkResult. Returns either the looked up values or an error code if something is wrong with mercury +// In case of any pipeline processing issues, returns an error and also sets approriate state on the checkResult itself +func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) ([][]byte, encoding.ErrCode, error) { + var state, values, errCode, retryable, retryInterval = encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeNil, false, 0 * time.Second + var err error pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block) + upkeepType := core.GetUpkeepType(checkResults[i].UpkeepID) if lookup.IsMercuryV02() { - state, reason, values, retryable, retryInterval, errCode, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey) + state, values, errCode, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, upkeepType, pluginRetryKey) } else if lookup.IsMercuryV03() { - state, reason, values, retryable, retryInterval, errCode, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey) + state, values, errCode, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, upkeepType, pluginRetryKey) } if err != nil { + // Something went wrong in the pipeline processing, set the state, retry reason and return checkResults[i].Retryable = retryable checkResults[i].RetryInterval = retryInterval checkResults[i].PipelineExecutionState = uint8(state) - checkResults[i].IneligibilityReason = uint8(reason) - retryTimeout := retryInterval == mercury.RetryIntervalTimeout - if retryTimeout { - // in case of retry timeout, setting retryable to false - checkResults[i].Retryable = false - } - _, eCodeErr := s.handleErrCode(&checkResults[i], errCode, err) - if eCodeErr != nil { - return nil, eCodeErr - } - s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s, errCode: %d", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error(), errCode) - return nil, err // TODO: remove this line once we have error handler + s.lggr.Debugf("at block %d upkeep %s requested time %s doMercuryRequest err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) + return nil, encoding.ErrCodeNil, err + } + + if errCode != encoding.ErrCodeNil { + s.lggr.Infof("at block %d upkeep %s requested time %s doMercuryRequest error code: %d", lookup.Block, lookup.UpkeepId, lookup.Time, errCode) + return nil, errCode, nil } for j, v := range values { s.lggr.Infof("at block %d upkeep %s requested time %s doMercuryRequest values[%d]: %s", lookup.Block, lookup.UpkeepId, lookup.Time, j, hexutil.Encode(v)) } - return values, nil + return values, encoding.ErrCodeNil, nil } -// TODO: complete this function for preparing values for error handler -func (s *streams) handleErrCode(result *ocr2keepers.CheckResult, errCode encoding.ErrCode, err error) ([][]byte, error) { - upkeepType := core.GetUpkeepType(result.UpkeepID) - switch upkeepType { - case types.LogTrigger: - switch errCode { - case encoding.ErrCodePartialContent, encoding.ErrCodeDataStreamsError: - if result.RetryInterval != mercury.RetryIntervalTimeout { - return nil, err - } - case encoding.ErrCodeBadRequest, encoding.ErrCodeUnauthorized, encoding.ErrCodeEncodingError: - default: - return nil, err - } - case types.ConditionTrigger: - switch errCode { - case encoding.ErrCodePartialContent, encoding.ErrCodeDataStreamsError, encoding.ErrCodeBadRequest, encoding.ErrCodeUnauthorized, encoding.ErrCodeEncodingError: - default: - return nil, err - } - default: - return nil, err +func (s *streams) CheckErrorHandler(ctx context.Context, errCode encoding.ErrCode, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) error { + userPayload, err := s.packer.PackUserCheckErrorHandler(errCode, lookup.ExtraData) + if err != nil { + checkResults[i].Retryable = false + checkResults[i].PipelineExecutionState = uint8(encoding.PackUnpackDecodeFailed) + return err } - // TODO: prepare values for error handler - return [][]byte{}, nil + + payload, err := s.abi.Pack("executeCallback", lookup.UpkeepId, userPayload) + if err != nil { + checkResults[i].Retryable = false + checkResults[i].PipelineExecutionState = uint8(encoding.PackUnpackDecodeFailed) + return err + } + + return s.makeCallbackEthCall(ctx, payload, lookup, checkResults, i) } // AllowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if @@ -338,7 +333,7 @@ func (s *streams) AllowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (s var privilegeConfig UpkeepPrivilegeConfig if err = json.Unmarshal(upkeepPrivilegeConfigBytes, &privilegeConfig); err != nil { - return encoding.MercuryUnmarshalError, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to unmarshal privilege config: %v", err) + return encoding.PrivilegeConfigUnmarshalError, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to unmarshal privilege config: %v", err) } s.mercuryConfig.SetUpkeepAllowed(upkeepId.String(), privilegeConfig.MercuryEnabled, cache.DefaultExpiration) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go index 0ea87c78059..e58f046c239 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go @@ -11,8 +11,6 @@ import ( "testing" "time" - clatypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/pkg/errors" @@ -30,7 +28,6 @@ import ( iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury" v02 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02" @@ -344,7 +341,7 @@ func TestStreams_AllowedToUseMercury(t *testing.T) { { name: "failure - cannot unmarshal privilege config", err: fmt.Errorf("failed to unmarshal privilege config: invalid character '\\x00' looking for beginning of value"), - state: encoding.MercuryUnmarshalError, + state: encoding.PrivilegeConfigUnmarshalError, config: []byte{0, 1}, registry: &mockRegistry{ GetUpkeepPrivilegeConfigFn: func(opts *bind.CallOpts, upkeepId *big.Int) ([]byte, error) { @@ -823,104 +820,3 @@ func TestStreams_StreamsLookup(t *testing.T) { }) } } - -func Test_HandleErrCode(t *testing.T) { - partialContentErr := errors.New("206") - - tests := []struct { - name string - checkResult *ocr2keepers.CheckResult - errCode encoding.ErrCode - err error - expectedValues [][]byte - expectedErr error - }{ - { - name: "log trigger no error", - checkResult: &ocr2keepers.CheckResult{ - UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"), - }, - errCode: encoding.ErrCodeNil, - err: nil, - expectedValues: [][]byte{}, - expectedErr: nil, - }, - { - name: "conditional trigger no error", - checkResult: &ocr2keepers.CheckResult{ - UpkeepID: core.GenUpkeepID(clatypes.ConditionTrigger, "222"), - }, - errCode: encoding.ErrCodeNil, - err: nil, - expectedValues: [][]byte{}, - expectedErr: nil, - }, - { - name: "log trigger error code bad request", - checkResult: &ocr2keepers.CheckResult{ - UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"), - }, - errCode: encoding.ErrCodeBadRequest, - err: errors.New("400"), - expectedValues: [][]byte{}, - expectedErr: nil, - }, - { - name: "conditional trigger bad request", - checkResult: &ocr2keepers.CheckResult{ - UpkeepID: core.GenUpkeepID(clatypes.ConditionTrigger, "222"), - }, - errCode: encoding.ErrCodeBadRequest, - err: errors.New("400"), - expectedValues: [][]byte{}, - expectedErr: nil, - }, - { - name: "log trigger error code partial content with retry timeout", - checkResult: &ocr2keepers.CheckResult{ - UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"), - RetryInterval: mercury.RetryIntervalTimeout, - Retryable: true, - }, - errCode: encoding.ErrCodePartialContent, - err: errors.New("206"), - expectedValues: [][]byte{}, - expectedErr: nil, - }, - { - name: "log trigger error code partial content without retry timeout", - checkResult: &ocr2keepers.CheckResult{ - UpkeepID: core.GenUpkeepID(clatypes.LogTrigger, "111"), - RetryInterval: time.Second, - Retryable: true, - }, - errCode: encoding.ErrCodePartialContent, - err: partialContentErr, - expectedValues: nil, - expectedErr: partialContentErr, - }, - { - name: "conditional trigger partial content", - checkResult: &ocr2keepers.CheckResult{ - UpkeepID: core.GenUpkeepID(clatypes.ConditionTrigger, "222"), - RetryInterval: time.Second, - Retryable: true, - }, - errCode: encoding.ErrCodePartialContent, - err: errors.New("206"), - expectedValues: [][]byte{}, - expectedErr: nil, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := setupStreams(t) - defer s.Close() - - values, err := s.handleErrCode(tt.checkResult, tt.errCode, tt.err) - assert.Equal(t, len(tt.expectedValues), len(values)) - assert.Equal(t, tt.expectedErr, err) - }) - } -} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go index 5f8e91377a2..44205491e1b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/request.go @@ -13,6 +13,7 @@ import ( "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/common/hexutil" + automationTypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -53,11 +54,11 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H } } -func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) { +func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, upkeepType automationTypes.UpkeepType, pluginRetryKey string) (encoding.PipelineExecutionState, [][]byte, encoding.ErrCode, bool, time.Duration, error) { resultLen := len(streamsLookup.Feeds) ch := make(chan mercury.MercuryData, resultLen) if len(streamsLookup.Feeds) == 0 { - return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) + return encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil } for i := range streamsLookup.Feeds { // TODO (AUTO-7209): limit the number of concurrent requests @@ -67,33 +68,68 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo }) } + state := encoding.NoPipelineError var reqErr error - var retryInterval time.Duration + retryable := true + allFeedsPipelineSuccess := true + allFeedsReturnedValues := true var errCode encoding.ErrCode results := make([][]byte, len(streamsLookup.Feeds)) - retryable := true - allSuccess := true - // in v0.2, use the last execution error as the state, if no execution errors, state will be no error - state := encoding.NoPipelineError + + // in v0.2, when combining results for multiple feed requests + // if any request resulted in pipeline execution error then use the last execution error as the state + // if no execution errors, then check if any feed returned an error code, if so use the last error code for i := 0; i < resultLen; i++ { m := <-ch if m.Error != nil { + state = m.State reqErr = errors.Join(reqErr, m.Error) retryable = retryable && m.Retryable - errCode = m.ErrCode - allSuccess = false - if m.State != encoding.NoPipelineError { - state = m.State + if m.ErrCode != encoding.ErrCodeNil { + // Some pipeline errors can get converted to error codes if retries are exhausted + errCode = m.ErrCode } + allFeedsPipelineSuccess = false continue } + if m.ErrCode != encoding.ErrCodeNil { + errCode = m.ErrCode + allFeedsReturnedValues = false + continue + } + // Feed request didn't face a pipeline error and didn't return an error code results[m.Index] = m.Bytes[0] } - if retryable && !allSuccess { - retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig) + + if !allFeedsPipelineSuccess { + // Some feeds faced a pipeline error during execution + // If any error was non retryable then just return the state and error + if !retryable { + return state, nil, errCode, retryable, 0 * time.Second, reqErr + } + // If errors were retryable then calculate retry interval + retryInterval := mercury.CalculateStreamsRetryConfigFn(upkeepType, pluginRetryKey, c.mercuryConfig) + if retryInterval != mercury.RetryIntervalTimeout { + // Return the retyrable state with appropriate retry interval + return state, nil, errCode, retryable, retryInterval, reqErr + } + + // Now we have exhausted all retries and we have an error code to expose to user expose it with noPipelineError + // otherwise expose the last pipeline error to pipeline runner (not the user) + if errCode != encoding.ErrCodeNil { + return encoding.NoPipelineError, nil, errCode, false, 0 * time.Second, nil + } + return state, nil, errCode, false, 0 * time.Second, reqErr + } + + // All feeds faced no pipeline error + // If any feed request returned an error code, return the error code with empty values, else return the values + if !allFeedsReturnedValues { + return encoding.NoPipelineError, nil, errCode, false, 0 * time.Second, reqErr } - // only retry when not all successful AND none are not retryable - return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, retryInterval, errCode, reqErr + + // All success, return the results + return state, results, encoding.ErrCodeNil, false, 0 * time.Second, nil } func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.MercuryData, index int, sl *mercury.StreamsLookup) { @@ -110,7 +146,8 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur httpRequest, err = http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) if err != nil { - ch <- mercury.MercuryData{Index: index, Error: err, Retryable: false, State: encoding.InvalidMercuryRequest} + // Not a pipeline error, a bad streams request + ch <- mercury.MercuryData{Index: index, ErrCode: encoding.ErrCodeStreamsBadRequest, Bytes: nil, State: encoding.NoPipelineError} return } @@ -142,22 +179,38 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur defer httpResponse.Body.Close() if responseBody, err = io.ReadAll(httpResponse.Body); err != nil { - state = encoding.InvalidMercuryResponse - return err + // Not a pipeline error, a bad streams response, send back error code + ch <- mercury.MercuryData{ + Index: index, + Bytes: [][]byte{}, + ErrCode: encoding.ErrCodeStreamsBadResponse, + State: encoding.NoPipelineError, + } + sent = true + return nil } switch httpResponse.StatusCode { case http.StatusNotFound, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + // Considered as pipeline error, but if retry attempts go over threshold, is changed upstream to ErrCode c.lggr.Warnf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index]) - retryable = true state = encoding.MercuryFlakyFailure - errCode = encoding.HttpToErrCode(httpResponse.StatusCode) + retryable = true + errCode = encoding.HttpToStreamsErrCode(httpResponse.StatusCode) return errors.New(strconv.FormatInt(int64(httpResponse.StatusCode), 10)) case http.StatusOK: // continue default: - state = encoding.InvalidMercuryRequest - return fmt.Errorf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index]) + // Not considered as a pipeline error, a bad streams response with unknown status code. Send back to user as error code + c.lggr.Errorf("at block %s upkeep %s received unhandled status code %d for feed %s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, sl.Feeds[index]) + ch <- mercury.MercuryData{ + Index: index, + Bytes: [][]byte{}, + ErrCode: encoding.ErrCodeStreamsUnknownError, + State: encoding.NoPipelineError, + } + sent = true + return nil } c.lggr.Debugf("at block %s upkeep %s received status code %d from mercury v0.2 with BODY=%s", sl.Time.String(), sl.UpkeepId.String(), httpResponse.StatusCode, hexutil.Encode(responseBody)) @@ -165,15 +218,25 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur var m MercuryV02Response if err = json.Unmarshal(responseBody, &m); err != nil { c.lggr.Warnf("at block %s upkeep %s failed to unmarshal body to MercuryV02Response for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), sl.Feeds[index], err) - state = encoding.MercuryUnmarshalError - errCode = encoding.ErrCodeEncodingError - return err + ch <- mercury.MercuryData{ + Index: index, + Bytes: [][]byte{}, + ErrCode: encoding.ErrCodeStreamsBadResponse, + State: encoding.NoPipelineError, + } + sent = true + return nil } if blobBytes, err = hexutil.Decode(m.ChainlinkBlob); err != nil { c.lggr.Warnf("at block %s upkeep %s failed to decode chainlinkBlob %s for feed %s: %v", sl.Time.String(), sl.UpkeepId.String(), m.ChainlinkBlob, sl.Feeds[index], err) - state = encoding.InvalidMercuryResponse - errCode = encoding.ErrCodeEncodingError - return err + ch <- mercury.MercuryData{ + Index: index, + Bytes: [][]byte{}, + ErrCode: encoding.ErrCodeStreamsBadResponse, + State: encoding.NoPipelineError, + } + sent = true + return nil } ch <- mercury.MercuryData{ Index: index, @@ -197,10 +260,10 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur ch <- mercury.MercuryData{ Index: index, Bytes: [][]byte{}, - Retryable: retryable, - Error: fmt.Errorf("failed to request feed for %s: %w", sl.Feeds[index], retryErr), ErrCode: errCode, State: state, + Retryable: retryable, + Error: fmt.Errorf("failed to request feed for %s: %w", sl.Feeds[index], retryErr), } } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go index ac8cbfb866b..c839da26533 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02/v02_request_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + automationTypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/ethereum/go-ethereum/common/hexutil" @@ -331,7 +332,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { expectedRetryable: true, pluginRetries: 0, expectedRetryInterval: 1 * time.Second, - expectedErrCode: encoding.ErrCodeDataStreamsError, + expectedErrCode: encoding.ErrCodeStreamsInternalError, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), state: encoding.MercuryFlakyFailure, }, @@ -353,7 +354,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { expectedValues: [][]byte{nil}, expectedRetryable: true, expectedRetryInterval: 5 * time.Second, - expectedErrCode: encoding.ErrCodeDataStreamsError, + expectedErrCode: encoding.ErrCodeStreamsInternalError, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), state: encoding.MercuryFlakyFailure, }, @@ -374,7 +375,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { mockChainlinkBlobs: []string{"0x00066dfcd1ed2d95b18c948dbc5bd64c687afe93e4ca7d663ddec14c20090ad80000000000000000000000000000000000000000000000000000000000081401000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000280000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001204554482d5553442d415242495452554d2d544553544e455400000000000000000000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000289ad8d367000000000000000000000000000000000000000000000000000000289acf0b38000000000000000000000000000000000000000000000000000000289b3da40000000000000000000000000000000000000000000000000000000000018ae7ce74d9fa252a8983976eab600dc7590c778d04813430841bc6e765c34cd81a168d00000000000000000000000000000000000000000000000000000000018ae7cb0000000000000000000000000000000000000000000000000000000064891c98000000000000000000000000000000000000000000000000000000000000000260412b94e525ca6cedc9f544fd86f77606d52fe731a5d069dbe836a8bfc0fb8c911963b0ae7a14971f3b4621bffb802ef0605392b9a6c89c7fab1df8633a5ade00000000000000000000000000000000000000000000000000000000000000024500c2f521f83fba5efc2bf3effaaedde43d0a4adff785c1213b712a3aed0d8157642a84324db0cf9695ebd27708d4608eb0337e0dd87b0e43f0fa70c700d911"}, expectedValues: [][]byte{nil}, expectedRetryInterval: mercury.RetryIntervalTimeout, - expectedErrCode: encoding.ErrCodeDataStreamsError, + expectedErrCode: encoding.ErrCodeStreamsInternalError, expectedRetryable: true, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: 500\n#2: 500\n#3: 500"), state: encoding.MercuryFlakyFailure, @@ -396,7 +397,7 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { expectedValues: [][]byte{nil}, expectedRetryable: false, expectedError: errors.New("failed to request feed for 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000: All attempts fail:\n#1: at block 25880526 upkeep 88786950015966611018675766524283132478093844178961698330929478019253453382042 received status code 429 for feed 0x4554482d5553442d415242495452554d2d544553544e45540000000000000000"), - state: encoding.InvalidMercuryRequest, + //state: encoding.InvalidMercuryRequest, TODO: Fix this }, { name: "failure - no feeds", @@ -456,7 +457,8 @@ func TestV02_DoMercuryRequestV02(t *testing.T) { } c.httpClient = hc - state, reason, values, retryable, retryInterval, errCode, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, tt.pluginRetryKey) + reason := encoding.UpkeepFailureReasonNone // TODO: Fix test + state, values, errCode, retryable, retryInterval, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, automationTypes.ConditionTrigger, tt.pluginRetryKey) assert.Equal(t, tt.expectedValues, values) assert.Equal(t, tt.expectedRetryable, retryable) if retryable { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go index 7e90a2f547c..15a6e16766d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/request.go @@ -13,6 +13,8 @@ import ( "github.com/avast/retry-go/v4" "github.com/ethereum/go-ethereum/common/hexutil" + automationTypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -61,9 +63,9 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H } } -func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error) { +func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, upkeepType automationTypes.UpkeepType, pluginRetryKey string) (encoding.PipelineExecutionState, [][]byte, encoding.ErrCode, bool, time.Duration, error) { if len(streamsLookup.Feeds) == 0 { - return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, encoding.ErrCodeNil, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds) + return encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeStreamsBadRequest, false, 0 * time.Second, nil } resultLen := 1 // Only 1 multi-feed request is made for all feeds ch := make(chan mercury.MercuryData, resultLen) @@ -71,27 +73,29 @@ func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLo c.multiFeedsRequest(ctx, ch, streamsLookup) }) - var reqErr error - var retryInterval time.Duration - var errCode encoding.ErrCode - results := make([][]byte, len(streamsLookup.Feeds)) - retryable := false - state := encoding.NoPipelineError - m := <-ch if m.Error != nil { - reqErr = m.Error - retryable = m.Retryable - state = m.State - errCode = m.ErrCode - if retryable { - retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig) + // There was a pipeline error during execution + // If error was non retryable then just return the state and error + if !m.Retryable { + return m.State, nil, m.ErrCode, m.Retryable, 0 * time.Second, m.Error } - } else { - results = m.Bytes + // If errors were retryable then calculate retry interval + retryInterval := mercury.CalculateStreamsRetryConfigFn(upkeepType, pluginRetryKey, c.mercuryConfig) + if retryInterval != mercury.RetryIntervalTimeout { + // Return the retyrable state with appropriate retry interval + return m.State, nil, m.ErrCode, m.Retryable, retryInterval, m.Error + } + + // Now we have exhausted all retries and we have an error code to expose to user expose it with noPipelineError + // otherwise expose the pipeline error to pipeline runner (not the user) as non retryable + if m.ErrCode != encoding.ErrCodeNil { + return encoding.NoPipelineError, nil, m.ErrCode, false, 0 * time.Second, nil + } + return m.State, nil, m.ErrCode, false, 0 * time.Second, m.Error } - return state, encoding.UpkeepFailureReasonNone, results, retryable, retryInterval, errCode, reqErr + return encoding.NoPipelineError, m.Bytes, encoding.ErrCodeNil, false, 0 * time.Second, nil } func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.MercuryData, sl *mercury.StreamsLookup) { @@ -112,7 +116,8 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) if err != nil { - ch <- mercury.MercuryData{Index: 0, Error: err, Retryable: false, State: encoding.InvalidMercuryRequest} + // Not a pipeline error, a bad streams request + ch <- mercury.MercuryData{Index: 0, ErrCode: encoding.ErrCodeStreamsBadRequest, State: encoding.NoPipelineError} return } @@ -130,9 +135,9 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur // in the case of multiple retries here, use the last attempt's data state := encoding.NoPipelineError + errCode := encoding.ErrCodeNil retryable := false sent := false - errCode := encoding.ErrCodeNil retryErr := retry.Do( func() error { retryable = false @@ -147,52 +152,73 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur body, err := io.ReadAll(resp.Body) if err != nil { - retryable = false - state = encoding.InvalidMercuryResponse - return err + // Not a pipeline error, a bad streams response, send back error code + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsBadResponse, + State: encoding.NoPipelineError, + } + sent = true + return nil } c.lggr.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) switch resp.StatusCode { case http.StatusUnauthorized: - retryable = false - state = encoding.UpkeepNotAuthorized - errCode = encoding.HttpToErrCode(resp.StatusCode) - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsUnauthorized, + State: encoding.NoPipelineError, + } + sent = true + return nil case http.StatusBadRequest: - retryable = false - state = encoding.InvalidMercuryRequest - errCode = encoding.HttpToErrCode(resp.StatusCode) - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.HttpToStreamsErrCode(resp.StatusCode), + State: encoding.NoPipelineError, + } + sent = true + return nil case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: retryable = true state = encoding.MercuryFlakyFailure - errCode = encoding.HttpToErrCode(resp.StatusCode) + errCode = encoding.HttpToStreamsErrCode(resp.StatusCode) return fmt.Errorf("%d", resp.StatusCode) case http.StatusPartialContent: // TODO (AUTO-5044): handle response code 206 entirely with errors field parsing c.lggr.Warnf("at timestamp %s upkeep %s requested [%s] feeds but mercury v0.3 server returned 206 status, treating it as 404 and retrying", sl.Time.String(), sl.UpkeepId.String(), sl.Feeds) retryable = true state = encoding.MercuryFlakyFailure - errCode = encoding.HttpToErrCode(resp.StatusCode) + errCode = encoding.HttpToStreamsErrCode(resp.StatusCode) return fmt.Errorf("%d", http.StatusPartialContent) case http.StatusOK: // continue default: - retryable = false - state = encoding.InvalidMercuryRequest - errCode = encoding.ErrCodeBadRequest - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + // Not considered as a pipeline error, a bad streams response with unknown status code. Send back to user as error code + c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode) + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsUnknownError, + State: encoding.NoPipelineError, + } + sent = true + return nil } c.lggr.Debugf("at block %s upkeep %s received status code %d from mercury v0.3 with BODY=%s", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode, hexutil.Encode(body)) var response MercuryV03Response if err := json.Unmarshal(body, &response); err != nil { c.lggr.Warnf("at timestamp %s upkeep %s failed to unmarshal body to MercuryV03Response from mercury v0.3: %v", sl.Time.String(), sl.UpkeepId.String(), err) - retryable = false - state = encoding.MercuryUnmarshalError - errCode = encoding.ErrCodeEncodingError - return err + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsBadResponse, + State: encoding.NoPipelineError, + } + sent = true + return nil } // in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract @@ -205,7 +231,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur c.lggr.Warnf("at timestamp %s upkeep %s mercury v0.3 server returned 206 status with [%s] reports while we requested [%s] feeds, retrying", sl.Time.String(), sl.UpkeepId.String(), receivedFeeds, sl.Feeds) retryable = true state = encoding.MercuryFlakyFailure - errCode = encoding.HttpToErrCode(http.StatusPartialContent) + errCode = encoding.HttpToStreamsErrCode(http.StatusPartialContent) return fmt.Errorf("%d", http.StatusNotFound) } var reportBytes [][]byte @@ -213,18 +239,20 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur b, err := hexutil.Decode(rsp.FullReport) if err != nil { c.lggr.Warnf("at timestamp %s upkeep %s failed to decode reportBlob %s: %v", sl.Time.String(), sl.UpkeepId.String(), rsp.FullReport, err) - retryable = false - state = encoding.InvalidMercuryResponse - errCode = encoding.ErrCodeEncodingError - return err + ch <- mercury.MercuryData{ + Index: 0, + ErrCode: encoding.ErrCodeStreamsBadResponse, + State: encoding.NoPipelineError, + } + sent = true + return nil } reportBytes = append(reportBytes, b) } ch <- mercury.MercuryData{ - Index: 0, - Bytes: reportBytes, - Retryable: false, - State: encoding.NoPipelineError, + Index: 0, + Bytes: reportBytes, + State: encoding.NoPipelineError, } sent = true return nil diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go index 616226f1d4c..eaead032be0 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03/v03_request_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + automationTypes "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/ethereum/go-ethereum/common/hexutil" @@ -164,7 +165,8 @@ func TestV03_DoMercuryRequestV03(t *testing.T) { } c.httpClient = hc - state, reason, values, retryable, retryInterval, errCode, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, tt.pluginRetryKey) + reason := encoding.UpkeepFailureReasonNone // TODO: Fix test + state, values, errCode, retryable, retryInterval, reqErr := c.DoRequest(testutils.Context(t), tt.lookup, automationTypes.ConditionTrigger, tt.pluginRetryKey) assert.Equal(t, tt.expectedValues, values) assert.Equal(t, tt.expectedRetryable, retryable)