diff --git a/core/scripts/chaincli/handler/debug.go b/core/scripts/chaincli/handler/debug.go index 5947337b187..815a0c9a030 100644 --- a/core/scripts/chaincli/handler/debug.go +++ b/core/scripts/chaincli/handler/debug.go @@ -18,13 +18,19 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + evm21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury" + "github.com/smartcontractkit/chainlink/core/scripts/chaincli/config" "github.com/smartcontractkit/chainlink/core/scripts/common" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1" iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams" @@ -35,12 +41,7 @@ import ( const ( ConditionTrigger uint8 = iota LogTrigger - - blockNumber = "blockNumber" expectedTypeAndVersion = "KeeperRegistry 2.1.0" - feedIdHex = "feedIdHex" - feedIDs = "feedIDs" - timestamp = "timestamp" ) var packer = encoding.NewAbiPacker() @@ -51,24 +52,29 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { if len(args) < 1 { failCheckArgs("no upkeepID supplied", nil) } + // test that we are connected to an archive node _, err := k.client.BalanceAt(ctx, gethcommon.Address{}, big.NewInt(1)) if err != nil { failCheckConfig("you are not connected to an archive node; try using infura or alchemy", err) } + chainIDBig, err := k.client.ChainID(ctx) if err != nil { failUnknown("unable to retrieve chainID from rpc client", err) } chainID := chainIDBig.Int64() + + var triggerCallOpts *bind.CallOpts // use latest block for conditionals, but use block from tx for log triggers + latestCallOpts := &bind.CallOpts{Context: ctx} // always use latest block + // connect to registry contract - latestCallOpts := &bind.CallOpts{Context: ctx} // always use latest block - triggerCallOpts := &bind.CallOpts{Context: ctx} // use latest block for conditionals, but use block from tx for log triggers registryAddress := gethcommon.HexToAddress(k.cfg.RegistryAddress) keeperRegistry21, err := iregistry21.NewIKeeperRegistryMaster(registryAddress, k.client) if err != nil { failUnknown("failed to connect to registry contract", err) } + // verify contract is correct typeAndVersion, err := keeperRegistry21.TypeAndVersion(latestCallOpts) if err != nil { @@ -124,17 +130,21 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { var checkResult iregistry21.CheckUpkeep var blockNum uint64 var performData []byte + var workID [32]byte + var trigger ocr2keepers.Trigger upkeepNeeded := false // check upkeep if triggerType == ConditionTrigger { message("upkeep identified as conditional trigger") - tmpCheckResult, err := keeperRegistry21.CheckUpkeep0(latestCallOpts, upkeepID) + var tmpCheckResult iregistry21.CheckUpkeep0 + tmpCheckResult, err = keeperRegistry21.CheckUpkeep0(latestCallOpts, upkeepID) if err != nil { failUnknown("failed to check upkeep: ", err) } checkResult = iregistry21.CheckUpkeep(tmpCheckResult) // do tenderly simulation - rawCall, err := core.RegistryABI.Pack("checkUpkeep", upkeepID, []byte{}) + var rawCall []byte + rawCall, err = core.RegistryABI.Pack("checkUpkeep", upkeepID, []byte{}) if err != nil { failUnknown("failed to pack raw checkUpkeep call", err) } @@ -146,19 +156,26 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { failCheckArgs("txHash and log index must be supplied to command in order to debug log triggered upkeeps", nil) } txHash := gethcommon.HexToHash(args[1]) - logIndex, err := strconv.ParseInt(args[2], 10, 64) + + var logIndex int64 + logIndex, err = strconv.ParseInt(args[2], 10, 64) if err != nil { failCheckArgs("unable to parse log index", err) } - // find transaction receipt - _, isPending, err := k.client.TransactionByHash(ctx, txHash) + + // check that tx is confirmed + var isPending bool + _, isPending, err = k.client.TransactionByHash(ctx, txHash) if err != nil { log.Fatal("failed to get tx by hash", err) } if isPending { resolveIneligible(fmt.Sprintf("tx %s is still pending confirmation", txHash)) } - receipt, err := k.client.TransactionReceipt(ctx, txHash) + + // find transaction receipt + var receipt *types.Receipt + receipt, err = k.client.TransactionReceipt(ctx, txHash) if err != nil { failCheckArgs("failed to fetch tx receipt", err) } @@ -176,9 +193,11 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { } // check that tx for this upkeep / tx was not already performed message(fmt.Sprintf("LogTrigger{blockNum: %d, blockHash: %s, txHash: %s, logIndex: %d}", blockNum, receipt.BlockHash.Hex(), txHash, logIndex)) - workID := mustUpkeepWorkID(upkeepID, blockNum, receipt.BlockHash, txHash, logIndex) + trigger = mustAutomationTrigger(txHash, logIndex, blockNum, receipt.BlockHash) + workID = mustUpkeepWorkID(upkeepID, trigger) message(fmt.Sprintf("workID computed: %s", hex.EncodeToString(workID[:]))) - hasKey, err := keeperRegistry21.HasDedupKey(latestCallOpts, workID) + var hasKey bool + hasKey, err = keeperRegistry21.HasDedupKey(latestCallOpts, workID) if err != nil { failUnknown("failed to check if upkeep was already performed: ", err) } @@ -186,11 +205,13 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { resolveIneligible("upkeep was already performed") } triggerCallOpts = &bind.CallOpts{Context: ctx, BlockNumber: big.NewInt(receipt.BlockNumber.Int64())} - rawTriggerConfig, err := keeperRegistry21.GetUpkeepTriggerConfig(triggerCallOpts, upkeepID) + var rawTriggerConfig []byte + rawTriggerConfig, err = keeperRegistry21.GetUpkeepTriggerConfig(triggerCallOpts, upkeepID) if err != nil { failUnknown("failed to fetch trigger config for upkeep", err) } - triggerConfig, err := packer.UnpackLogTriggerConfig(rawTriggerConfig) + var triggerConfig automation_utils_2_1.LogTriggerConfig + triggerConfig, err = packer.UnpackLogTriggerConfig(rawTriggerConfig) if err != nil { failUnknown("failed to unpack trigger config", err) } @@ -200,11 +221,13 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { if !logMatchesTriggerConfig(triggeringEvent, triggerConfig) { resolveIneligible("log does not match trigger config") } - header, err := k.client.HeaderByHash(ctx, receipt.BlockHash) + var header *types.Header + header, err = k.client.HeaderByHash(ctx, receipt.BlockHash) if err != nil { failUnknown("failed to find block", err) } - triggerData, err := packTriggerData(triggeringEvent, header.Time) + var triggerData []byte + triggerData, err = packTriggerData(triggeringEvent, header.Time) if err != nil { failUnknown("failed to pack trigger data", err) } @@ -213,7 +236,8 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { failUnknown("failed to check upkeep", err) } // do tenderly simulations - rawCall, err := core.RegistryABI.Pack("checkUpkeep", upkeepID, triggerData) + var rawCall []byte + rawCall, err = core.RegistryABI.Pack("checkUpkeep", upkeepID, triggerData) if err != nil { failUnknown("failed to pack raw checkUpkeep call", err) } @@ -228,73 +252,88 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { if checkResult.UpkeepFailureReason != 0 { message(fmt.Sprintf("checkUpkeep failed with UpkeepFailureReason %d", checkResult.UpkeepFailureReason)) } + if checkResult.UpkeepFailureReason == uint8(encoding.UpkeepFailureReasonTargetCheckReverted) { - // TODO use the new streams lookup lib - //mc := &models.MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey} - //mercuryConfig := evm.NewMercuryConfig(mc, core.StreamsCompatibleABI) - //lggr, _ := logger.NewLogger() - //blockSub := &blockSubscriber{k.client} - //_ = streams.NewStreamsLookup(packer, mercuryConfig, blockSub, keeperRegistry21, k.rpcClient, lggr) - - streamsLookupErr, err := packer.DecodeStreamsLookupRequest(checkResult.PerformData) + mc := &models.MercuryCredentials{LegacyURL: k.cfg.MercuryLegacyURL, URL: k.cfg.MercuryURL, Username: k.cfg.MercuryID, Password: k.cfg.MercuryKey} + mercuryConfig := evm21.NewMercuryConfig(mc, core.StreamsCompatibleABI) + lggr, _ := logger.NewLogger() + blockSub := &blockSubscriber{k.client} + streams := streams.NewStreamsLookup(packer, mercuryConfig, blockSub, k.rpcClient, keeperRegistry21, lggr) + + var streamsLookupErr *mercury.StreamsLookupError + streamsLookupErr, err = packer.DecodeStreamsLookupRequest(checkResult.PerformData) if err == nil { message("upkeep reverted with StreamsLookup") message(fmt.Sprintf("StreamsLookup data: {FeedParamKey: %s, Feeds: %v, TimeParamKey: %s, Time: %d, ExtraData: %s}", streamsLookupErr.FeedParamKey, streamsLookupErr.Feeds, streamsLookupErr.TimeParamKey, streamsLookupErr.Time.Uint64(), hexutil.Encode(streamsLookupErr.ExtraData))) - if streamsLookupErr.FeedParamKey == feedIdHex && streamsLookupErr.TimeParamKey == blockNumber { + + streamsLookup := &mercury.StreamsLookup{ + StreamsLookupError: &mercury.StreamsLookupError{ + FeedParamKey: streamsLookupErr.FeedParamKey, + Feeds: streamsLookupErr.Feeds, + TimeParamKey: streamsLookupErr.TimeParamKey, + Time: streamsLookupErr.Time, + ExtraData: streamsLookupErr.ExtraData, + }, + UpkeepId: upkeepID, + Block: blockNum, + } + + if streamsLookup.IsMercuryV02() { message("using mercury lookup v0.2") - // handle v0.2 - cfg, err := keeperRegistry21.GetUpkeepPrivilegeConfig(triggerCallOpts, upkeepID) + // check if upkeep is allowed to use mercury v0.2 + var allowed bool + _, _, _, allowed, err = streams.AllowedToUseMercury(latestCallOpts, upkeepID) if err != nil { - failUnknown("failed to get upkeep privilege config ", err) - } - allowed := false - if len(cfg) > 0 { - var privilegeConfig streams.UpkeepPrivilegeConfig - if err := json.Unmarshal(cfg, &privilegeConfig); err != nil { - failUnknown("failed to unmarshal privilege config ", err) - } - allowed = privilegeConfig.MercuryEnabled + failUnknown("failed to check if upkeep is allowed to use mercury", err) } if !allowed { resolveIneligible("upkeep reverted with StreamsLookup but is not allowed to access streams") } - } else if streamsLookupErr.FeedParamKey != feedIDs || streamsLookupErr.TimeParamKey != timestamp { + } else if streamsLookup.IsMercuryV03() { // handle v0.3 - resolveIneligible("upkeep reverted with StreamsLookup but the configuration is invalid") - } else { message("using mercury lookup v0.3") + } else { + resolveIneligible("upkeep reverted with StreamsLookup but the configuration is invalid") } - streamsLookup := &StreamsLookup{streamsLookupErr.FeedParamKey, streamsLookupErr.Feeds, streamsLookupErr.TimeParamKey, streamsLookupErr.Time, streamsLookupErr.ExtraData, upkeepID, blockNum} if k.cfg.MercuryLegacyURL == "" || k.cfg.MercuryURL == "" || k.cfg.MercuryID == "" || k.cfg.MercuryKey == "" { failCheckConfig("Mercury configs not set properly, check your MERCURY_LEGACY_URL, MERCURY_URL, MERCURY_ID and MERCURY_KEY", nil) } - handler := NewMercuryLookupHandler(&MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey}, k.rpcClient) - state, failureReason, values, _, err := handler.doMercuryRequest(ctx, streamsLookup) - if failureReason == UpkeepFailureReasonInvalidRevertDataInput { + + // do mercury request + automationCheckResult := mustAutomationCheckResult(upkeepID, checkResult, trigger) + checkResults := []ocr2keepers.CheckResult{automationCheckResult} + + var values [][]byte + values, err = streams.DoMercuryRequest(ctx, streamsLookup, checkResults, 0) + + if automationCheckResult.IneligibilityReason == uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) { resolveIneligible("upkeep used invalid revert data") } - if state == InvalidMercuryRequest { + if automationCheckResult.PipelineExecutionState == uint8(mercury.InvalidMercuryRequest) { resolveIneligible("the mercury request data is invalid") } if err != nil { failCheckConfig("failed to do mercury request ", err) } - callbackResult, err := keeperRegistry21.CheckCallback(triggerCallOpts, upkeepID, values, streamsLookup.extraData) + + // do checkCallback + err = streams.CheckCallback(ctx, values, streamsLookup, checkResults, 0) if err != nil { failUnknown("failed to execute mercury callback ", err) } - if callbackResult.UpkeepFailureReason != 0 { - message(fmt.Sprintf("checkCallback failed with UpkeepFailureReason %d", checkResult.UpkeepFailureReason)) + if automationCheckResult.IneligibilityReason != 0 { + message(fmt.Sprintf("checkCallback failed with UpkeepFailureReason %d", automationCheckResult.IneligibilityReason)) } - upkeepNeeded, performData = callbackResult.UpkeepNeeded, callbackResult.PerformData - // do tenderly simulations - rawCall, err := core.RegistryABI.Pack("checkCallback", upkeepID, values, streamsLookup.extraData) + upkeepNeeded, performData = automationCheckResult.Eligible, automationCheckResult.PerformData + // do tenderly simulations for checkCallback + var rawCall []byte + rawCall, err = core.RegistryABI.Pack("checkCallback", upkeepID, values, streamsLookup.ExtraData) if err != nil { failUnknown("failed to pack raw checkCallback call", err) } addLink("checkCallback simulation", tenderlySimLink(k.cfg, chainID, blockNum, rawCall, registryAddress)) - rawCall, err = core.StreamsCompatibleABI.Pack("checkCallback", values, streamsLookup.extraData) + rawCall, err = core.StreamsCompatibleABI.Pack("checkCallback", values, streamsLookup.ExtraData) if err != nil { failUnknown("failed to pack raw checkCallback (direct) call", err) } @@ -316,6 +355,39 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { } } +func mustAutomationCheckResult(upkeepID *big.Int, checkResult iregistry21.CheckUpkeep, trigger ocr2keepers.Trigger) ocr2keepers.CheckResult { + upkeepIdentifier := mustUpkeepIdentifier(upkeepID) + checkResult2 := ocr2keepers.CheckResult{ + Eligible: checkResult.UpkeepNeeded, + IneligibilityReason: checkResult.UpkeepFailureReason, + UpkeepID: upkeepIdentifier, + Trigger: trigger, + WorkID: core.UpkeepWorkID(upkeepIdentifier, trigger), + GasAllocated: 0, + PerformData: checkResult.PerformData, + FastGasWei: checkResult.FastGasWei, + LinkNative: checkResult.LinkNative, + } + + return checkResult2 +} + +type blockSubscriber struct { + ethClient *ethclient.Client +} + +func (bs *blockSubscriber) LatestBlock() *ocr2keepers.BlockKey { + header, err := bs.ethClient.HeaderByNumber(context.Background(), nil) + if err != nil { + return nil + } + + return &ocr2keepers.BlockKey{ + Number: ocr2keepers.BlockNumber(header.Number.Uint64()), + Hash: header.Hash(), + } +} + func logMatchesTriggerConfig(log *types.Log, config automation_utils_2_1.LogTriggerConfig) bool { if log.Topics[0] != config.Topic0 { return false @@ -353,9 +425,27 @@ func packTriggerData(log *types.Log, blockTime uint64) ([]byte, error) { return b, nil } -func mustUpkeepWorkID(upkeepID *big.Int, blockNum uint64, blockHash [32]byte, txHash [32]byte, logIndex int64) [32]byte { - // TODO - this is a copy of the code in core.UpkeepWorkID - // We should refactor that code to be more easily exported ex not rely on Trigger structs +func mustUpkeepWorkID(upkeepID *big.Int, trigger ocr2keepers.Trigger) [32]byte { + upkeepIdentifier := mustUpkeepIdentifier(upkeepID) + + workID := core.UpkeepWorkID(upkeepIdentifier, trigger) + workIDBytes, err := hex.DecodeString(workID) + if err != nil { + failUnknown("failed to decode workID", err) + } + + var result [32]byte + copy(result[:], workIDBytes[:]) + return result +} + +func mustUpkeepIdentifier(upkeepID *big.Int) ocr2keepers.UpkeepIdentifier { + upkeepIdentifier := &ocr2keepers.UpkeepIdentifier{} + upkeepIdentifier.FromBigInt(upkeepID) + return *upkeepIdentifier +} + +func mustAutomationTrigger(txHash [32]byte, logIndex int64, blockNum uint64, blockHash [32]byte) ocr2keepers.Trigger { trigger := ocr2keepers.Trigger{ LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ TxHash: txHash, @@ -364,16 +454,7 @@ func mustUpkeepWorkID(upkeepID *big.Int, blockNum uint64, blockHash [32]byte, tx BlockHash: blockHash, }, } - upkeepIdentifier := &ocr2keepers.UpkeepIdentifier{} - upkeepIdentifier.FromBigInt(upkeepID) - workID := core.UpkeepWorkID(*upkeepIdentifier, trigger) - workIDBytes, err := hex.DecodeString(workID) - if err != nil { - failUnknown("failed to decode workID", err) - } - var result [32]byte - copy(result[:], workIDBytes[:]) - return result + return trigger } func message(msg string) { @@ -385,11 +466,11 @@ func warning(msg string) { } func resolveIneligible(msg string) { - exit(fmt.Sprintf("✅ %s: this upkeep is not currently elligible", msg), nil, 0) + exit(fmt.Sprintf("✅ %s: this upkeep is not currently eligible", msg), nil, 0) } func resolveEligible() { - exit("❌ this upkeep is currently elligible", nil, 0) + exit("❌ this upkeep is currently eligible", nil, 0) } func rerun(msg string, err error) { @@ -490,5 +571,3 @@ func tenderlySimLink(cfg *config.Config, chainID int64, blockNumber uint64, inpu } return common.TenderlySimLink(responseJSON.Simulation.Id) } - -// TODO - link to performUpkeep tx if exists diff --git a/core/scripts/chaincli/handler/mercury_lookup_handler.go b/core/scripts/chaincli/handler/mercury_lookup_handler.go deleted file mode 100644 index 1165d83921d..00000000000 --- a/core/scripts/chaincli/handler/mercury_lookup_handler.go +++ /dev/null @@ -1,536 +0,0 @@ -package handler - -import ( - "context" - "crypto/hmac" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "fmt" - "io" - "math/big" - "net/http" - "net/url" - "strconv" - "strings" - "time" - - "github.com/avast/retry-go" - ethabi "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/rpc" - "github.com/pkg/errors" -) - -// MercuryLookupHandler is responsible for initiating the calls to the Mercury server -// to determine whether the upkeeps are eligible -type MercuryLookupHandler struct { - credentials *MercuryCredentials - httpClient HttpClient - rpcClient *rpc.Client -} - -func NewMercuryLookupHandler( - credentials *MercuryCredentials, - rpcClient *rpc.Client, -) *MercuryLookupHandler { - return &MercuryLookupHandler{ - credentials: credentials, - httpClient: http.DefaultClient, - rpcClient: rpcClient, - } -} - -type MercuryVersion string - -type StreamsLookup struct { - feedParamKey string - feeds []string - timeParamKey string - time *big.Int - extraData []byte - upkeepId *big.Int - block uint64 -} - -//go:generate mockery --quiet --name HttpClient --output ./mocks/ --case=underscore -type HttpClient interface { - Do(req *http.Request) (*http.Response, error) -} - -type MercuryCredentials struct { - LegacyURL string - URL string - ClientID string - ClientKey string -} - -func (mc *MercuryCredentials) Validate() bool { - return mc.URL != "" && mc.ClientID != "" && mc.ClientKey != "" -} - -type MercuryData struct { - Index int - Error error - Retryable bool - Bytes [][]byte - State PipelineExecutionState -} - -// MercuryV02Response represents a JSON structure used by Mercury v0.2 -type MercuryV02Response struct { - ChainlinkBlob string `json:"chainlinkBlob"` -} - -// MercuryV03Response represents a JSON structure used by Mercury v0.3 -type MercuryV03Response struct { - Reports []MercuryV03Report `json:"reports"` -} - -type MercuryV03Report struct { - FeedID string `json:"feedID"` // feed id in hex encoded - ValidFromTimestamp uint32 `json:"validFromTimestamp"` - ObservationsTimestamp uint32 `json:"observationsTimestamp"` - FullReport string `json:"fullReport"` // the actual hex encoded mercury report of this feed, can be sent to verifier -} - -const ( - // DefaultAllowListExpiration decides how long an upkeep's allow list info will be valid for. - DefaultAllowListExpiration = 20 * time.Minute - // CleanupInterval decides when the expired items in cache will be deleted. - CleanupInterval = 25 * time.Minute -) - -const ( - ApplicationJson = "application/json" - BlockNumber = "blockNumber" // valid for v0.2 - FeedIDs = "feedIDs" // valid for v0.3 - FeedIdHex = "feedIdHex" // valid for v0.2 - HeaderAuthorization = "Authorization" - HeaderContentType = "Content-Type" - HeaderTimestamp = "X-Authorization-Timestamp" - HeaderSignature = "X-Authorization-Signature-SHA256" - HeaderUpkeepId = "X-Authorization-Upkeep-Id" - MercuryPathV2 = "/client?" // only used to access mercury v0.2 server - MercuryBatchPathV3 = "/api/v1/reports/bulk?" // only used to access mercury v0.3 server - RetryDelay = 500 * time.Millisecond - Timestamp = "timestamp" // valid for v0.3 - TotalAttempt = 3 - UserId = "userId" -) - -type UpkeepFailureReason uint8 -type PipelineExecutionState uint8 - -const ( - // upkeep failure onchain reasons - UpkeepFailureReasonNone UpkeepFailureReason = 0 - UpkeepFailureReasonUpkeepCancelled UpkeepFailureReason = 1 - UpkeepFailureReasonUpkeepPaused UpkeepFailureReason = 2 - UpkeepFailureReasonTargetCheckReverted UpkeepFailureReason = 3 - UpkeepFailureReasonUpkeepNotNeeded UpkeepFailureReason = 4 - UpkeepFailureReasonPerformDataExceedsLimit UpkeepFailureReason = 5 - UpkeepFailureReasonInsufficientBalance UpkeepFailureReason = 6 - UpkeepFailureReasonMercuryCallbackReverted UpkeepFailureReason = 7 - UpkeepFailureReasonRevertDataExceedsLimit UpkeepFailureReason = 8 - UpkeepFailureReasonRegistryPaused UpkeepFailureReason = 9 - // leaving a gap here for more onchain failure reasons in the future - // upkeep failure offchain reasons - UpkeepFailureReasonMercuryAccessNotAllowed UpkeepFailureReason = 32 - UpkeepFailureReasonTxHashNoLongerExists UpkeepFailureReason = 33 - UpkeepFailureReasonInvalidRevertDataInput UpkeepFailureReason = 34 - UpkeepFailureReasonSimulationFailed UpkeepFailureReason = 35 - UpkeepFailureReasonTxHashReorged UpkeepFailureReason = 36 - - // pipeline execution error - NoPipelineError PipelineExecutionState = 0 - CheckBlockTooOld PipelineExecutionState = 1 - CheckBlockInvalid PipelineExecutionState = 2 - RpcFlakyFailure PipelineExecutionState = 3 - MercuryFlakyFailure PipelineExecutionState = 4 - PackUnpackDecodeFailed PipelineExecutionState = 5 - MercuryUnmarshalError PipelineExecutionState = 6 - InvalidMercuryRequest PipelineExecutionState = 7 - InvalidMercuryResponse PipelineExecutionState = 8 // this will only happen if Mercury server sends bad responses - UpkeepNotAuthorized PipelineExecutionState = 9 -) - -// UpkeepPrivilegeConfig represents the administrative offchain config for each upkeep. It can be set by s_upkeepPrivilegeManager -// role on the registry. Upkeeps allowed to use Mercury server will have this set to true. -type UpkeepPrivilegeConfig struct { - MercuryEnabled bool `json:"mercuryEnabled"` -} - -// generateHMAC calculates a user HMAC for Mercury server authentication. -func (mlh *MercuryLookupHandler) generateHMAC(method string, path string, body []byte, clientId string, secret string, ts int64) string { - bodyHash := sha256.New() - bodyHash.Write(body) - hashString := fmt.Sprintf("%s %s %s %s %d", - method, - path, - hex.EncodeToString(bodyHash.Sum(nil)), - clientId, - ts) - signedMessage := hmac.New(sha256.New, []byte(secret)) - signedMessage.Write([]byte(hashString)) - userHmac := hex.EncodeToString(signedMessage.Sum(nil)) - return userHmac -} - -// singleFeedRequest sends a v0.2 Mercury request for a single feed report. -func (mlh *MercuryLookupHandler) singleFeedRequest(ctx context.Context, ch chan<- MercuryData, index int, ml *StreamsLookup) { - q := url.Values{ - ml.feedParamKey: {ml.feeds[index]}, - ml.timeParamKey: {ml.time.String()}, - } - mercuryURL := mlh.credentials.LegacyURL - reqUrl := fmt.Sprintf("%s%s%s", mercuryURL, MercuryPathV2, q.Encode()) - // mlh.logger.Debugf("request URL for upkeep %s feed %s: %s", ml.upkeepId.String(), ml.feeds[index], reqUrl) - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) - if err != nil { - ch <- MercuryData{Index: index, Error: err, Retryable: false, State: InvalidMercuryRequest} - return - } - - ts := time.Now().UTC().UnixMilli() - signature := mlh.generateHMAC(http.MethodGet, MercuryPathV2+q.Encode(), []byte{}, mlh.credentials.ClientID, mlh.credentials.ClientKey, ts) - req.Header.Set(HeaderContentType, ApplicationJson) - req.Header.Set(HeaderAuthorization, mlh.credentials.ClientID) - req.Header.Set(HeaderTimestamp, strconv.FormatInt(ts, 10)) - req.Header.Set(HeaderSignature, signature) - - // in the case of multiple retries here, use the last attempt's data - state := NoPipelineError - retryable := false - sent := false - retryErr := retry.Do( - func() error { - retryable = false - resp, err1 := mlh.httpClient.Do(req) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup GET request failed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "feed", ml.feeds[index], "error", err1) - retryable = true - state = MercuryFlakyFailure - return err1 - } - defer func(Body io.ReadCloser) { - err := Body.Close() - if err != nil { - _ = "" // placate linter - // mlh.logger.Errorf("Encountered error when closing the body of the response in single feed: %s", err) - } - }(resp.Body) - - body, err1 := io.ReadAll(resp.Body) - if err1 != nil { - retryable = false - state = InvalidMercuryResponse - return err1 - } - - if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError { - // mlh.logger.Errorw("StreamsLookup received retryable status code", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "statusCode", resp.StatusCode, "feed", ml.feeds[index]) - retryable = true - state = MercuryFlakyFailure - return errors.New(strconv.FormatInt(int64(resp.StatusCode), 10)) - } else if resp.StatusCode != http.StatusOK { - retryable = false - state = InvalidMercuryRequest - return fmt.Errorf("StreamsLookup upkeep %s block %s received status code %d for feed %s", ml.upkeepId.String(), ml.time.String(), resp.StatusCode, ml.feeds[index]) - } - - // mlh.logger.Debugf("at block %s upkeep %s received status code %d from mercury v0.2 with BODY=%s", ml.time.String(), ml.upkeepId.String(), resp.StatusCode, hexutil.Encode(body)) - - var m MercuryV02Response - err1 = json.Unmarshal(body, &m) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup failed to unmarshal body to MercuryResponse", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "feed", ml.feeds[index], "error", err1) - retryable = false - state = MercuryUnmarshalError - return err1 - } - blobBytes, err1 := hexutil.Decode(m.ChainlinkBlob) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup failed to decode chainlinkBlob for feed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "blob", m.ChainlinkBlob, "feed", ml.feeds[index], "error", err1) - retryable = false - state = InvalidMercuryResponse - return err1 - } - ch <- MercuryData{ - Index: index, - Bytes: [][]byte{blobBytes}, - Retryable: false, - State: NoPipelineError, - } - sent = true - return nil - }, - // only retry when the error is 404 Not Found or 500 Internal Server Error - retry.RetryIf(func(err error) bool { - return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) - }), - retry.Context(ctx), - retry.Delay(RetryDelay), - retry.Attempts(TotalAttempt)) - - if !sent { - md := MercuryData{ - Index: index, - Bytes: [][]byte{}, - Retryable: retryable, - Error: fmt.Errorf("failed to request feed for %s: %w", ml.feeds[index], retryErr), - State: state, - } - ch <- md - } -} - -// multiFeedsRequest sends a Mercury v0.3 request for a multi-feed report -func (mlh *MercuryLookupHandler) multiFeedsRequest(ctx context.Context, ch chan<- MercuryData, ml *StreamsLookup) { - params := fmt.Sprintf("%s=%s&%s=%s", FeedIDs, strings.Join(ml.feeds, ","), Timestamp, ml.time.String()) - reqUrl := fmt.Sprintf("%s%s%s", mlh.credentials.URL, MercuryBatchPathV3, params) - // mlh.logger.Debugf("request URL for upkeep %s userId %s: %s", ml.upkeepId.String(), mlh.credentials.ClientID, reqUrl) - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) - if err != nil { - ch <- MercuryData{Index: 0, Error: err, Retryable: false, State: InvalidMercuryRequest} - return - } - - ts := time.Now().UTC().UnixMilli() - signature := mlh.generateHMAC(http.MethodGet, MercuryBatchPathV3+params, []byte{}, mlh.credentials.ClientID, mlh.credentials.ClientKey, ts) - req.Header.Set(HeaderContentType, ApplicationJson) - // username here is often referred to as user id - req.Header.Set(HeaderAuthorization, mlh.credentials.ClientID) - req.Header.Set(HeaderTimestamp, strconv.FormatInt(ts, 10)) - req.Header.Set(HeaderSignature, signature) - // mercury will inspect authorization headers above to make sure this user (in automation's context, this node) is eligible to access mercury - // and if it has an automation role. it will then look at this upkeep id to check if it has access to all the requested feeds. - req.Header.Set(HeaderUpkeepId, ml.upkeepId.String()) - - // in the case of multiple retries here, use the last attempt's data - state := NoPipelineError - retryable := false - sent := false - retryErr := retry.Do( - func() error { - retryable = false - resp, err1 := mlh.httpClient.Do(req) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup GET request fails for multi feed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "error", err1) - retryable = true - state = MercuryFlakyFailure - return err1 - } - defer func(Body io.ReadCloser) { - err := Body.Close() - if err != nil { - _ = "" // placate linter - // mlh.logger.Errorf("Encountered error when closing the body of the response in the multi feed: %s", err) - } - }(resp.Body) - body, err1 := io.ReadAll(resp.Body) - if err1 != nil { - retryable = false - state = InvalidMercuryResponse - return err1 - } - - // mlh.logger.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - if resp.StatusCode == http.StatusUnauthorized { - retryable = false - state = UpkeepNotAuthorized - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - } else if resp.StatusCode == http.StatusBadRequest { - retryable = false - state = InvalidMercuryRequest - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - } else if resp.StatusCode == http.StatusInternalServerError { - retryable = true - state = MercuryFlakyFailure - return fmt.Errorf("%d", http.StatusInternalServerError) - } else if resp.StatusCode == 420 { - // in 0.3, this will happen when missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds - retryable = false - state = InvalidMercuryRequest - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - } else if resp.StatusCode != http.StatusOK { - retryable = false - state = InvalidMercuryRequest - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - } - - var response MercuryV03Response - err1 = json.Unmarshal(body, &response) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup failed to unmarshal body to MercuryResponse for multi feed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "error", err1) - retryable = false - state = MercuryUnmarshalError - return err1 - } - // in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract - // hence, retry in this case. retry will help when we send a very new timestamp and reports are not yet generated - if len(response.Reports) != len(ml.feeds) { - // TODO: AUTO-5044: calculate what reports are missing and log a warning - retryable = true - state = MercuryFlakyFailure - return fmt.Errorf("%d", http.StatusNotFound) - } - var reportBytes [][]byte - for _, rsp := range response.Reports { - b, err := hexutil.Decode(rsp.FullReport) - if err != nil { - retryable = false - state = InvalidMercuryResponse - return err - } - reportBytes = append(reportBytes, b) - } - ch <- MercuryData{ - Index: 0, - Bytes: reportBytes, - Retryable: false, - State: NoPipelineError, - } - sent = true - return nil - }, - // only retry when the error is 404 Not Found or 500 Internal Server Error - retry.RetryIf(func(err error) bool { - return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) - }), - retry.Context(ctx), - retry.Delay(RetryDelay), - retry.Attempts(TotalAttempt)) - - if !sent { - md := MercuryData{ - Index: 0, - Bytes: [][]byte{}, - Retryable: retryable, - Error: retryErr, - State: state, - } - ch <- md - } -} - -// doMercuryRequest sends requests to Mercury API to retrieve ChainlinkBlob. -func (mlh *MercuryLookupHandler) doMercuryRequest(ctx context.Context, ml *StreamsLookup) (PipelineExecutionState, UpkeepFailureReason, [][]byte, bool, error) { - var isMercuryV03 bool - resultLen := len(ml.feeds) - ch := make(chan MercuryData, resultLen) - if len(ml.feeds) == 0 { - return NoPipelineError, UpkeepFailureReasonInvalidRevertDataInput, nil, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", ml.feedParamKey, ml.timeParamKey, ml.feeds) - } - if ml.feedParamKey == FeedIdHex && ml.timeParamKey == BlockNumber { - // only v0.2 - for i := range ml.feeds { - go mlh.singleFeedRequest(ctx, ch, i, ml) - } - } else if ml.feedParamKey == FeedIDs && ml.timeParamKey == Timestamp { - // only v0.3 - resultLen = 1 - isMercuryV03 = true - ch = make(chan MercuryData, resultLen) - go mlh.multiFeedsRequest(ctx, ch, ml) - } else { - return NoPipelineError, UpkeepFailureReasonInvalidRevertDataInput, nil, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", ml.feedParamKey, ml.timeParamKey, ml.feeds) - } - - var reqErr error - results := make([][]byte, len(ml.feeds)) - retryable := true - allSuccess := true - // in v0.2, use the last execution error as the state, if no execution errors, state will be no error - state := NoPipelineError - for i := 0; i < resultLen; i++ { - m := <-ch - if m.Error != nil { - if reqErr == nil { - reqErr = errors.New(m.Error.Error()) - } else { - reqErr = errors.New(reqErr.Error() + m.Error.Error()) - } - retryable = retryable && m.Retryable - allSuccess = false - if m.State != NoPipelineError { - state = m.State - } - continue - } - if isMercuryV03 { - results = m.Bytes - } else { - results[m.Index] = m.Bytes[0] - } - } - // only retry when not all successful AND none are not retryable - return state, UpkeepFailureReasonNone, results, retryable && !allSuccess, reqErr -} - -// decodeStreamsLookup decodes the revert error StreamsLookup(string feedParamKey, string[] feeds, string timeParamKey, uint256 time, byte[] extraData) -// func (mlh *MercuryLookupHandler) decodeStreamsLookup(data []byte) (*StreamsLookup, error) { -// e := mlh.mercuryConfig.Abi.Errors["StreamsLookup"] -// unpack, err := e.Unpack(data) -// if err != nil { -// return nil, fmt.Errorf("unpack error: %w", err) -// } -// errorParameters := unpack.([]interface{}) - -// return &StreamsLookup{ -// feedParamKey: *abi.ConvertType(errorParameters[0], new(string)).(*string), -// feeds: *abi.ConvertType(errorParameters[1], new([]string)).(*[]string), -// timeParamKey: *abi.ConvertType(errorParameters[2], new(string)).(*string), -// time: *abi.ConvertType(errorParameters[3], new(*big.Int)).(**big.Int), -// extraData: *abi.ConvertType(errorParameters[4], new([]byte)).(*[]byte), -// }, nil -// } - -// allowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if -// this upkeep is allowed to use Mercury service. -// func (mlh *MercuryLookupHandler) allowedToUseMercury(upkeep models.Upkeep) (bool, error) { -// allowed, ok := mlh.mercuryConfig.AllowListCache.Get(upkeep.Admin.Hex()) -// if ok { -// return allowed.(bool), nil -// } - -// if upkeep.UpkeepPrivilegeConfig == nil { -// return false, fmt.Errorf("the upkeep privilege config was not retrieved for upkeep with ID %s", upkeep.UpkeepID) -// } - -// if len(upkeep.UpkeepPrivilegeConfig) == 0 { -// return false, fmt.Errorf("the upkeep privilege config is empty") -// } - -// var a UpkeepPrivilegeConfig -// err := json.Unmarshal(upkeep.UpkeepPrivilegeConfig, &a) -// if err != nil { -// return false, fmt.Errorf("failed to unmarshal privilege config for upkeep ID %s: %v", upkeep.UpkeepID, err) -// } - -// mlh.mercuryConfig.AllowListCache.Set(upkeep.Admin.Hex(), a.MercuryEnabled, cache.DefaultExpiration) -// return a.MercuryEnabled, nil -// } - -func (mlh *MercuryLookupHandler) CheckCallback(ctx context.Context, values [][]byte, lookup *StreamsLookup, registryABI ethabi.ABI, registryAddress common.Address) (hexutil.Bytes, error) { - payload, err := registryABI.Pack("checkCallback", lookup.upkeepId, values, lookup.extraData) - if err != nil { - return nil, err - } - - var theBytes hexutil.Bytes - args := map[string]interface{}{ - "to": registryAddress.Hex(), - "data": hexutil.Bytes(payload), - } - - // call checkCallback function at the block which OCR3 has agreed upon - err = mlh.rpcClient.CallContext(ctx, &theBytes, "eth_call", args, hexutil.EncodeUint64(lookup.block)) - if err != nil { - return nil, err - } - return theBytes, nil -} diff --git a/core/scripts/go.mod b/core/scripts/go.mod index db71535723e..895363a53f8 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -6,7 +6,6 @@ go 1.21.3 replace github.com/smartcontractkit/chainlink/v2 => ../../ require ( - github.com/avast/retry-go v3.0.0+incompatible github.com/docker/docker v24.0.7+incompatible github.com/docker/go-connections v0.4.0 github.com/ethereum/go-ethereum v1.12.0 @@ -18,7 +17,6 @@ require ( github.com/montanaflynn/stats v0.7.1 github.com/olekukonko/tablewriter v0.0.5 github.com/pelletier/go-toml/v2 v2.1.1 - github.com/pkg/errors v0.9.1 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.1 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 @@ -217,6 +215,7 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/pressly/goose/v3 v3.16.0 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 1c48074ea3a..dcf47d392cc 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -145,8 +145,6 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= -github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= -github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o= github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc= github.com/aws/aws-sdk-go v1.22.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index 70db40c26bb..d64a9e7ef83 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -16,7 +16,6 @@ import ( "github.com/patrickmn/go-cache" ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - "github.com/smartcontractkit/chainlink-common/pkg/services" iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" @@ -131,7 +130,6 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper // tried to call mercury lookupLggr.Infof("at block %d upkeep %s trying to DecodeStreamsLookupRequest performData=%s", block, upkeepId, hexutil.Encode(checkResults[i].PerformData)) streamsLookupErr, err := s.packer.DecodeStreamsLookupRequest(checkResult.PerformData) - if err != nil { lookupLggr.Debugf("at block %d upkeep %s DecodeStreamsLookupRequest failed: %v", block, upkeepId, err) // user contract did not revert with StreamsLookup error @@ -149,7 +147,7 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper if streamsLookupResponse.IsMercuryV02() { // check permission on the registry for mercury v0.2 opts := s.buildCallOpts(ctx, block) - if state, reason, retryable, allowed, err := s.allowedToUseMercury(opts, upkeepId.BigInt()); err != nil { + if state, reason, retryable, allowed, err := s.AllowedToUseMercury(opts, upkeepId.BigInt()); err != nil { lookupLggr.Warnf("at block %s upkeep %s failed to query mercury allow list: %s", block, upkeepId, err) checkResults[i].PipelineExecutionState = uint8(state) checkResults[i].IneligibilityReason = uint8(reason) @@ -178,65 +176,93 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *mercury.StreamsLookup, i int, checkResults []ocr2keepers.CheckResult) { defer wg.Done() - state, reason, values, retryable, retryInterval, err := mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds) - pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block) + values, err := s.DoMercuryRequest(ctx, lookup, checkResults, i) + if err != nil { + s.lggr.Errorf("at block %d upkeep %s requested time %s DoMercuryRequest err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) + } - if lookup.IsMercuryV02() { - state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey) - } else if lookup.IsMercuryV03() { - state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey) + if err := s.CheckCallback(ctx, values, lookup, checkResults, i); err != nil { + s.lggr.Errorf("at block %d upkeep %s requested time %s CheckCallback err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) } +} +func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) error { + payload, err := s.abi.Pack("checkCallback", lookup.UpkeepId, values, lookup.ExtraData) if err != nil { - s.lggr.Errorf("at block %d upkeep %s requested time %s retryable %v retryInterval %s doMercuryRequest: %s", lookup.Block, lookup.UpkeepId, lookup.Time, retryable, retryInterval, err.Error()) - checkResults[i].Retryable = retryable - checkResults[i].RetryInterval = retryInterval - checkResults[i].PipelineExecutionState = uint8(state) - checkResults[i].IneligibilityReason = uint8(reason) - return + checkResults[i].Retryable = false + checkResults[i].PipelineExecutionState = uint8(mercury.PackUnpackDecodeFailed) + return err } - for j, v := range values { - s.lggr.Infof("at block %d upkeep %s requested time %s doMercuryRequest values[%d]: %s", lookup.Block, lookup.UpkeepId, lookup.Time, j, hexutil.Encode(v)) + var mercuryBytes hexutil.Bytes + args := map[string]interface{}{ + "to": s.registry.Address().Hex(), + "data": hexutil.Bytes(payload), } - state, retryable, mercuryBytes, err := s.checkCallback(ctx, values, lookup) - if err != nil { - s.lggr.Errorf("at block %d upkeep %s checkCallback err: %s", lookup.Block, lookup.UpkeepId, err.Error()) - checkResults[i].Retryable = retryable - checkResults[i].PipelineExecutionState = uint8(state) - return + // call checkCallback function at the block which OCR3 has agreed upon + if err = s.client.CallContext(ctx, &mercuryBytes, "eth_call", args, hexutil.EncodeUint64(lookup.Block)); err != nil { + checkResults[i].Retryable = true + checkResults[i].PipelineExecutionState = uint8(mercury.RpcFlakyFailure) + return err } + s.lggr.Infof("at block %d upkeep %s requested time %s checkCallback mercuryBytes: %s", lookup.Block, lookup.UpkeepId, lookup.Time, hexutil.Encode(mercuryBytes)) unpackCallBackState, needed, performData, failureReason, _, err := s.packer.UnpackCheckCallbackResult(mercuryBytes) if err != nil { - s.lggr.Errorf("at block %d upkeep %s requested time %s UnpackCheckCallbackResult err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) checkResults[i].PipelineExecutionState = unpackCallBackState - return + return err } if failureReason == uint8(mercury.MercuryUpkeepFailureReasonMercuryCallbackReverted) { checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonMercuryCallbackReverted) s.lggr.Debugf("at block %d upkeep %s requested time %s mercury callback reverts", lookup.Block, lookup.UpkeepId, lookup.Time) - return + return nil + } if !needed { checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonUpkeepNotNeeded) s.lggr.Debugf("at block %d upkeep %s requested time %s callback reports upkeep not needed", lookup.Block, lookup.UpkeepId, lookup.Time) - return + return nil } checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonNone) checkResults[i].Eligible = true checkResults[i].PerformData = performData s.lggr.Infof("at block %d upkeep %s requested time %s successful with perform data: %s", lookup.Block, lookup.UpkeepId, lookup.Time, hexutil.Encode(performData)) + + return nil } -// allowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if +func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) ([][]byte, error) { + state, reason, values, retryable, retryInterval, err := mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds) + pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block) + + if lookup.IsMercuryV02() { + state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey) + } else if lookup.IsMercuryV03() { + state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey) + } + + if err != nil { + checkResults[i].Retryable = retryable + checkResults[i].RetryInterval = retryInterval + checkResults[i].PipelineExecutionState = uint8(state) + checkResults[i].IneligibilityReason = uint8(reason) + return nil, err + } + + for j, v := range values { + s.lggr.Infof("at block %d upkeep %s requested time %s doMercuryRequest values[%d]: %s", lookup.Block, lookup.UpkeepId, lookup.Time, j, hexutil.Encode(v)) + } + return values, nil +} + +// AllowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if // this upkeep is allowed to use Mercury service. -func (s *streams) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state mercury.MercuryUpkeepState, reason mercury.MercuryUpkeepFailureReason, retryable bool, allow bool, err error) { +func (s *streams) AllowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state mercury.MercuryUpkeepState, reason mercury.MercuryUpkeepFailureReason, retryable bool, allow bool, err error) { allowed, ok := s.mercuryConfig.IsUpkeepAllowed(upkeepId.String()) if ok { return mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonNone, false, allowed.(bool), nil @@ -256,7 +282,6 @@ func (s *streams) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (s "data": hexutil.Bytes(payload), } - // call checkCallback function at the block which OCR3 has agreed upon if err = s.client.CallContext(opts.Context, &resultBytes, "eth_call", args, hexutil.EncodeBig(opts.BlockNumber)); err != nil { return mercury.RpcFlakyFailure, mercury.MercuryUpkeepFailureReasonNone, true, false, fmt.Errorf("failed to get upkeep privilege config: %v", err) } @@ -282,26 +307,6 @@ func (s *streams) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (s return mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonNone, false, privilegeConfig.MercuryEnabled, nil } -func (s *streams) checkCallback(ctx context.Context, values [][]byte, lookup *mercury.StreamsLookup) (mercury.MercuryUpkeepState, bool, hexutil.Bytes, error) { - payload, err := s.abi.Pack("checkCallback", lookup.UpkeepId, values, lookup.ExtraData) - if err != nil { - return mercury.PackUnpackDecodeFailed, false, nil, err - } - - var b hexutil.Bytes - args := map[string]interface{}{ - "to": s.registry.Address().Hex(), - "data": hexutil.Bytes(payload), - } - - // call checkCallback function at the block which OCR3 has agreed upon - if err := s.client.CallContext(ctx, &b, "eth_call", args, hexutil.EncodeUint64(lookup.Block)); err != nil { - return mercury.RpcFlakyFailure, true, nil, err - } - - return mercury.NoPipelineError, false, b, nil -} - func (s *streams) buildCallOpts(ctx context.Context, block *big.Int) *bind.CallOpts { opts := bind.CallOpts{ Context: ctx, diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go index 32041d0f18b..caa3efb974b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go @@ -127,6 +127,7 @@ func TestStreams_CheckCallback(t *testing.T) { tests := []struct { name string lookup *mercury.StreamsLookup + input []ocr2keepers.CheckResult values [][]byte statusCode int @@ -154,6 +155,9 @@ func TestStreams_CheckCallback(t *testing.T) { UpkeepId: upkeepId, Block: bn, }, + input: []ocr2keepers.CheckResult{ + {}, + }, values: values, statusCode: http.StatusOK, callbackResp: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 48, 120, 48, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -186,6 +190,9 @@ func TestStreams_CheckCallback(t *testing.T) { UpkeepId: upkeepId, Block: bn, }, + input: []ocr2keepers.CheckResult{ + {}, + }, values: values, statusCode: http.StatusOK, callbackResp: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -217,6 +224,9 @@ func TestStreams_CheckCallback(t *testing.T) { UpkeepId: upkeepId, Block: bn, }, + input: []ocr2keepers.CheckResult{ + {}, + }, values: values, statusCode: http.StatusOK, callbackResp: []byte{}, @@ -256,10 +266,10 @@ func TestStreams_CheckCallback(t *testing.T) { }).Once() s.client = client - state, retryable, _, err := s.checkCallback(testutils.Context(t), tt.values, tt.lookup) - tt.wantErr(t, err, fmt.Sprintf("Error asserion failed: %v", tt.name)) - assert.Equal(t, tt.state, state) - assert.Equal(t, tt.retryable, retryable) + err = s.CheckCallback(testutils.Context(t), tt.values, tt.lookup, tt.input, 0) + tt.wantErr(t, err, fmt.Sprintf("Error assertion failed: %v", tt.name)) + assert.Equal(t, uint8(tt.state), tt.input[0].PipelineExecutionState) + assert.Equal(t, tt.retryable, tt.input[0].Retryable) }) } } @@ -435,7 +445,7 @@ func TestStreams_AllowedToUseMercury(t *testing.T) { BlockNumber: big.NewInt(10), } - state, reason, retryable, allowed, err := s.allowedToUseMercury(opts, upkeepId) + state, reason, retryable, allowed, err := s.AllowedToUseMercury(opts, upkeepId) assert.Equal(t, tt.err, err) assert.Equal(t, tt.allowed, allowed) assert.Equal(t, tt.state, state)