diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/interface.go b/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/interface.go index ee7074faf08..1f36cadb4d7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/interface.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/interface.go @@ -54,4 +54,6 @@ type Packer interface { UnpackLogTriggerConfig(raw []byte) (automation_utils_2_1.LogTriggerConfig, error) PackReport(report automation_utils_2_1.KeeperRegistryBase21Report) ([]byte, error) UnpackReport(raw []byte) (automation_utils_2_1.KeeperRegistryBase21Report, error) + PackGetUpkeepPrivilegeConfig(upkeepId *big.Int) ([]byte, error) + UnpackGetUpkeepPrivilegeConfig(resp []byte) ([]byte, error) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/packer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/packer.go index 45d5736cb72..824a98172bd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/packer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/packer.go @@ -66,6 +66,21 @@ func (p *abiPacker) UnpackCheckResult(payload ocr2keepers.UpkeepPayload, raw str return result, nil } +func (p *abiPacker) PackGetUpkeepPrivilegeConfig(upkeepId *big.Int) ([]byte, error) { + return p.abi.Pack("getUpkeepPrivilegeConfig", upkeepId) +} + +func (p *abiPacker) UnpackGetUpkeepPrivilegeConfig(resp []byte) ([]byte, error) { + out, err := p.abi.Methods["getUpkeepPrivilegeConfig"].Outputs.UnpackValues(resp) + if err != nil { + return nil, fmt.Errorf("%w: unpack getUpkeepPrivilegeConfig return", err) + } + + bts := *abi.ConvertType(out[0], new([]byte)).(*[]byte) + + return bts, nil +} + func (p *abiPacker) UnpackCheckCallbackResult(callbackResp []byte) (PipelineExecutionState, bool, []byte, uint8, *big.Int, error) { out, err := p.abi.Methods["checkCallback"].Outputs.UnpackValues(callbackResp) if err != nil { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/packer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/packer_test.go index ccc84765baa..b333a695bf8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/packer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/encoding/packer_test.go @@ -1,6 +1,7 @@ package encoding import ( + "encoding/json" "fmt" "math/big" "strings" @@ -355,6 +356,97 @@ func TestPacker_PackReport_UnpackReport(t *testing.T) { assert.Equal(t, hexutil.Encode(res), expected) } +func TestPacker_PackGetUpkeepPrivilegeConfig(t *testing.T) { + tests := []struct { + name string + upkeepId *big.Int + raw []byte + errored bool + }{ + { + name: "happy path", + upkeepId: func() *big.Int { + id, _ := new(big.Int).SetString("52236098515066839510538748191966098678939830769967377496848891145101407612976", 10) + + return id + }(), + raw: func() []byte { + b, _ := hexutil.Decode("0x19d97a94737c9583000000000000000000000001ea8ed6d0617dd5b3b87374020efaf030") + + return b + }(), + errored: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + packer, err := newPacker() + require.NoError(t, err, "valid packer required for test") + + b, err := packer.PackGetUpkeepPrivilegeConfig(test.upkeepId) + + if !test.errored { + require.NoError(t, err, "no error expected from packing") + + assert.Equal(t, test.raw, b, "raw bytes for output should match expected") + } else { + assert.NotNil(t, err, "error expected from packing function") + } + }) + } +} + +func TestPacker_UnpackGetUpkeepPrivilegeConfig(t *testing.T) { + tests := []struct { + name string + raw []byte + errored bool + }{ + { + name: "happy path", + raw: func() []byte { + b, _ := hexutil.Decode("0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000177b226d657263757279456e61626c6564223a747275657d000000000000000000") + + return b + }(), + errored: false, + }, + { + name: "error empty config", + raw: func() []byte { + b, _ := hexutil.Decode("0x") + + return b + }(), + errored: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + packer, err := newPacker() + require.NoError(t, err, "valid packer required for test") + + b, err := packer.UnpackGetUpkeepPrivilegeConfig(test.raw) + + if !test.errored { + require.NoError(t, err, "should unpack bytes from abi encoded value") + + // the actual struct to unmarshal into is not available to this + // package so basic json encoding is the limit of the following test + var data map[string]interface{} + err = json.Unmarshal(b, &data) + + assert.NoError(t, err, "packed data should unmarshal using json encoding") + assert.Equal(t, []byte(`{"mercuryEnabled":true}`), b) + } else { + assert.NotNil(t, err, "error expected from unpack function") + } + }) + } +} + func newPacker() (*abiPacker, error) { keepersABI, err := abi.JSON(strings.NewReader(iregistry21.IKeeperRegistryMasterABI)) if err != nil { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index c7345e4ed2f..1651e2251da 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -220,22 +220,44 @@ func (r *EvmRegistry) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int return encoding.NoPipelineError, encoding.UpkeepFailureReasonNone, false, allowed.(bool), nil } - cfg, err := r.registry.GetUpkeepPrivilegeConfig(opts, upkeepId) + payload, err := r.packer.PackGetUpkeepPrivilegeConfig(upkeepId) + if err != nil { + // pack error, no retryable + r.lggr.Warnf("failed to pack getUpkeepPrivilegeConfig data for upkeepId %s: %s", upkeepId, err) + + return encoding.PackUnpackDecodeFailed, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to pack upkeepId: %w", err) + } + + var resultBytes hexutil.Bytes + args := map[string]interface{}{ + "to": r.addr.Hex(), + "data": hexutil.Bytes(payload), + } + + // call checkCallback function at the block which OCR3 has agreed upon + err = r.client.CallContext(opts.Context, &resultBytes, "eth_call", args, opts.BlockNumber) if err != nil { return encoding.RpcFlakyFailure, encoding.UpkeepFailureReasonNone, true, false, fmt.Errorf("failed to get upkeep privilege config: %v", err) } + + cfg, err := r.packer.UnpackGetUpkeepPrivilegeConfig(resultBytes) + if err != nil { + return encoding.PackUnpackDecodeFailed, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to get upkeep privilege config: %v", err) + } + if len(cfg) == 0 { r.mercury.allowListCache.Set(upkeepId.String(), false, cache.DefaultExpiration) return encoding.NoPipelineError, encoding.UpkeepFailureReasonMercuryAccessNotAllowed, false, false, fmt.Errorf("upkeep privilege config is empty") } - var a UpkeepPrivilegeConfig - err = json.Unmarshal(cfg, &a) - if err != nil { + var privilegeConfig UpkeepPrivilegeConfig + if err := json.Unmarshal(cfg, &privilegeConfig); err != nil { return encoding.MercuryUnmarshalError, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to unmarshal privilege config: %v", err) } - r.mercury.allowListCache.Set(upkeepId.String(), a.MercuryEnabled, cache.DefaultExpiration) - return encoding.NoPipelineError, encoding.UpkeepFailureReasonNone, false, a.MercuryEnabled, nil + + r.mercury.allowListCache.Set(upkeepId.String(), privilegeConfig.MercuryEnabled, cache.DefaultExpiration) + + return encoding.NoPipelineError, encoding.UpkeepFailureReasonNone, false, privilegeConfig.MercuryEnabled, nil } // decodeStreamsLookup decodes the revert error StreamsLookup(string feedParamKey, string[] feeds, string feedParamKey, uint256 time, byte[] extraData) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go index 95b43c98195..95e81b2e1c8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup_test.go @@ -12,6 +12,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/patrickmn/go-cache" @@ -184,14 +185,30 @@ func TestEvmRegistry_StreamsLookup(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + client := new(evmClientMocks.Client) + r.client = client if !tt.cachedAdminCfg && !tt.hasError { - mockReg := mocks.NewRegistry(t) cfg := UpkeepPrivilegeConfig{MercuryEnabled: tt.hasPermission} - b, err := json.Marshal(cfg) - assert.Nil(t, err) - mockReg.On("GetUpkeepPrivilegeConfig", mock.Anything, upkeepId).Return(b, nil) - r.registry = mockReg + bCfg, err := json.Marshal(cfg) + require.Nil(t, err) + + bContractCfg, err := r.abi.Methods["getUpkeepPrivilegeConfig"].Outputs.PackValues([]interface{}{bCfg}) + require.Nil(t, err) + + payload, err := r.abi.Pack("getUpkeepPrivilegeConfig", upkeepId) + require.Nil(t, err) + + args := map[string]interface{}{ + "to": r.addr.Hex(), + "data": hexutil.Bytes(payload), + } + + client.On("CallContext", mock.Anything, mock.AnythingOfType("*hexutil.Bytes"), "eth_call", args, mock.AnythingOfType("*big.Int")).Return(nil). + Run(func(args mock.Arguments) { + b := args.Get(1).(*hexutil.Bytes) + *b = bContractCfg + }).Once() } if len(tt.blobs) > 0 { @@ -227,13 +244,11 @@ func TestEvmRegistry_StreamsLookup(t *testing.T) { "to": r.addr.Hex(), "data": hexutil.Bytes(payload), } - client := new(evmClientMocks.Client) client.On("CallContext", mock.Anything, mock.AnythingOfType("*hexutil.Bytes"), "eth_call", args, hexutil.EncodeUint64(uint64(blockNum))).Return(nil). Run(func(args mock.Arguments) { b := args.Get(1).(*hexutil.Bytes) *b = tt.checkCallbackResp }).Once() - r.client = client } got := r.streamsLookup(context.Background(), tt.input) @@ -337,24 +352,59 @@ func TestEvmRegistry_AllowedToUseMercury(t *testing.T) { t.Run(tt.name, func(t *testing.T) { r := setupEVMRegistry(t) + client := new(evmClientMocks.Client) + r.client = client + if tt.cached { r.mercury.allowListCache.Set(upkeepId.String(), tt.allowed, cache.DefaultExpiration) } else { if tt.err != nil { - mockReg := mocks.NewRegistry(t) - mockReg.On("GetUpkeepPrivilegeConfig", mock.Anything, upkeepId).Return(tt.config, tt.ethCallErr) - r.registry = mockReg + bContractCfg, err := r.abi.Methods["getUpkeepPrivilegeConfig"].Outputs.PackValues([]interface{}{tt.config}) + require.Nil(t, err) + + payload, err := r.abi.Pack("getUpkeepPrivilegeConfig", upkeepId) + require.Nil(t, err) + + args := map[string]interface{}{ + "to": r.addr.Hex(), + "data": hexutil.Bytes(payload), + } + + client.On("CallContext", mock.Anything, mock.AnythingOfType("*hexutil.Bytes"), "eth_call", args, mock.AnythingOfType("*big.Int")). + Return(tt.ethCallErr). + Run(func(args mock.Arguments) { + b := args.Get(1).(*hexutil.Bytes) + *b = bContractCfg + }).Once() } else { - mockReg := mocks.NewRegistry(t) cfg := UpkeepPrivilegeConfig{MercuryEnabled: tt.allowed} - b, err := json.Marshal(cfg) - assert.Nil(t, err) - mockReg.On("GetUpkeepPrivilegeConfig", mock.Anything, upkeepId).Return(b, nil) - r.registry = mockReg + bCfg, err := json.Marshal(cfg) + require.Nil(t, err) + + bContractCfg, err := r.abi.Methods["getUpkeepPrivilegeConfig"].Outputs.PackValues([]interface{}{bCfg}) + require.Nil(t, err) + + payload, err := r.abi.Pack("getUpkeepPrivilegeConfig", upkeepId) + require.Nil(t, err) + + args := map[string]interface{}{ + "to": r.addr.Hex(), + "data": hexutil.Bytes(payload), + } + + client.On("CallContext", mock.Anything, mock.AnythingOfType("*hexutil.Bytes"), "eth_call", args, mock.AnythingOfType("*big.Int")).Return(nil). + Run(func(args mock.Arguments) { + b := args.Get(1).(*hexutil.Bytes) + *b = bContractCfg + }).Once() } } - state, reason, retryable, allowed, err := r.allowedToUseMercury(nil, upkeepId) + opts := &bind.CallOpts{ + BlockNumber: big.NewInt(10), + } + + state, reason, retryable, allowed, err := r.allowedToUseMercury(opts, upkeepId) assert.Equal(t, tt.err, err) assert.Equal(t, tt.allowed, allowed) assert.Equal(t, tt.state, state) diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index 7d69ea6522d..61748e83ae8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "math/big" + "net/http" "strings" "sync" "testing" @@ -34,11 +35,13 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" automationForwarderLogic "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_forwarder_logic" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/basic_upkeep_contract" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/dummy_protocol_wrapper" iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" registrylogica21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_logic_a_wrapper_2_1" registrylogicb21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_logic_b_wrapper_2_1" registry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_wrapper_2_1" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/link_token_interface" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_triggered_streams_lookup_wrapper" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_upkeep_counter_wrapper" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/mock_v3_aggregator_contract" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" @@ -47,6 +50,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper" + evm21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" ) @@ -110,7 +114,7 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) { require.NoError(t, err) registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) - nodes := setupNodes(t, nodeKeys, registry, backend, steve) + nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve) <-time.After(time.Second * 5) @@ -195,10 +199,9 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { require.NoError(t, err) linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(steve, backend, 18, big.NewInt(2000000000000000000)) require.NoError(t, err) - registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) - - nodes := setupNodes(t, nodeKeys, registry, backend, steve) + registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) + nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve) upkeeps := 1 _, err = linkToken.Transfer(sergey, carrol.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeeps+1)))) @@ -254,6 +257,124 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { }) } +func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) { + g := gomega.NewWithT(t) + + // setup blockchain + linkOwner := testutils.MustNewSimTransactor(t) // owns all the link + registryOwner := testutils.MustNewSimTransactor(t) // registry owner + upkeepOwner := testutils.MustNewSimTransactor(t) // upkeep owner + genesisData := core.GenesisAlloc{ + linkOwner.From: {Balance: assets.Ether(10000).ToInt()}, + registryOwner.From: {Balance: assets.Ether(10000).ToInt()}, + upkeepOwner.From: {Balance: assets.Ether(10000).ToInt()}, + } + + // Generate 5 keys for nodes (1 bootstrap + 4 ocr nodes) and fund them with ether + var nodeKeys [5]ethkey.KeyV2 + for i := int64(0); i < 5; i++ { + nodeKeys[i] = cltest.MustGenerateRandomKey(t) + genesisData[nodeKeys[i].Address] = core.GenesisAccount{Balance: assets.Ether(1000).ToInt()} + } + + backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil)) + stopMining := cltest.Mine(backend, 3*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain + defer stopMining() + + // Deploy registry + linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(linkOwner, backend) + require.NoError(t, err) + + gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(60000000000)) + require.NoError(t, err) + + linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(2000000000000000000)) + require.NoError(t, err) + + registry := deployKeeper21Registry(t, registryOwner, backend, linkAddr, linkFeedAddr, gasFeedAddr) + + nodes, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner) + + const upkeepCount = 10 + const mercuryFailCount = upkeepCount * 3 * 2 + + // testing with the mercury server involves mocking responses. currently, + // there is not a way to connect a mercury call to an upkeep id (though we + // could add custom headers) so the test must be fairly basic and just + // count calls before switching to successes + var ( + mu sync.Mutex + count int + ) + + mercuryServer.RegisterHandler(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + defer mu.Unlock() + + count++ + + _ = r.ParseForm() + + t.Logf("MercuryHTTPServe:RequestURI: %s", r.RequestURI) + + for key, value := range r.Form { + t.Logf("MercuryHTTPServe:FormValue: key: %s; value: %s;", key, value) + } + + // the streams lookup retries against the remote server 3 times before + // returning a result as retryable. + // the simulation here should force the streams lookup process to return + // retryable 2 times. + // the total count of failures should be (upkeepCount * 3 * tryCount) + if count <= mercuryFailCount { + w.WriteHeader(http.StatusNotFound) + + return + } + + // start sending success messages + output := `{"chainlinkBlob":"0x0001c38d71fed6c320b90e84b6f559459814d068e2a1700adc931ca9717d4fe70000000000000000000000000000000000000000000000000000000001a80b52b4bf1233f9cb71144a253a1791b202113c4ab4a92fa1b176d684b4959666ff8200000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001004254432d5553442d415242495452554d2d544553544e4554000000000000000000000000000000000000000000000000000000000000000000000000645570be000000000000000000000000000000000000000000000000000002af2b818dc5000000000000000000000000000000000000000000000000000002af2426faf3000000000000000000000000000000000000000000000000000002af32dc209700000000000000000000000000000000000000000000000000000000012130f8df0a9745bb6ad5e2df605e158ba8ad8a33ef8a0acf9851f0f01668a3a3f2b68600000000000000000000000000000000000000000000000000000000012130f60000000000000000000000000000000000000000000000000000000000000002c4a7958dce105089cf5edb68dad7dcfe8618d7784eb397f97d5a5fade78c11a58275aebda478968e545f7e3657aba9dcbe8d44605e4c6fde3e24edd5e22c94270000000000000000000000000000000000000000000000000000000000000002459c12d33986018a8959566d145225f0c4a4e61a9a3f50361ccff397899314f0018162cf10cd89897635a0bb62a822355bd199d09f4abe76e4d05261bb44733d"}` + + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(output)) + }) + + defer mercuryServer.Stop() + + _, err = linkToken.Transfer(linkOwner, upkeepOwner.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeepCount+1)))) + require.NoError(t, err) + + backend.Commit() + + feeds, err := newFeedLookupUpkeepController(backend, registryOwner) + require.NoError(t, err, "no error expected from creating a feed lookup controller") + + // deploy multiple upkeeps that listen to a log emitter and need to be + // performed for each log event + _ = feeds.DeployUpkeeps(t, backend, upkeepOwner, upkeepCount) + _ = feeds.RegisterAndFund(t, registry, registryOwner, backend, linkToken) + _ = feeds.EnableMercury(t, backend, registry, registryOwner) + _ = feeds.VerifyEnv(t, backend, registry, registryOwner) + + // start emitting events in a separate go-routine + // feed lookup relies on a single contract event log to perform multiple + // listener contracts + go func() { + // only 1 event is necessary to make all 10 upkeeps eligible + _ = feeds.EmitEvents(t, backend, 1, func() { + // pause per emit for expected block production time + time.Sleep(3 * time.Second) + }) + }() + + listener, done := listenPerformed(t, backend, registry, feeds.UpkeepsIds(), int64(1)) + g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue()) + + done() + + _ = checkPipelineRuns(t, nodes, 1*len(nodes)) // TODO: TBD +} + func waitPipelineRuns(t *testing.T, nodes []Node, n int, timeout, interval time.Duration) { ctx, cancel := context.WithTimeout(testutils.Context(t), timeout) defer cancel() @@ -311,29 +432,37 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry cache := &sync.Map{} ctx, cancel := context.WithCancel(testutils.Context(t)) start := startBlock + go func() { for ctx.Err() == nil { bl := backend.Blockchain().CurrentBlock().Number.Uint64() + sc := make([]bool, len(ids)) for i := range sc { sc[i] = true } + iter, err := registry.FilterUpkeepPerformed(&bind.FilterOpts{ Start: uint64(start), End: &bl, Context: ctx, }, ids, sc) + if ctx.Err() != nil { return } + require.NoError(t, err) + for iter.Next() { if iter.Event != nil { t.Logf("[automation-ocr3 | EvmRegistry] upkeep performed event emitted for id %s", iter.Event.Id.String()) cache.Store(iter.Event.Id.String(), true) } } + require.NoError(t, iter.Close()) + time.Sleep(time.Second) } }() @@ -341,11 +470,14 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry return mapListener(cache, 0), cancel } -func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IKeeperRegistryMaster, backend *backends.SimulatedBackend, usr *bind.TransactOpts) []Node { +func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IKeeperRegistryMaster, backend *backends.SimulatedBackend, usr *bind.TransactOpts) ([]Node, *SimulatedMercuryServer) { lggr := logger.TestLogger(t) + mServer := NewSimulatedMercuryServer() + mServer.Start() + // Setup bootstrap + oracle nodes bootstrapNodePort := int64(19599) - appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil) + appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil, mServer) bootstrapNode := Node{ appBootstrap, bootstrapTransmitter, bootstrapKb, } @@ -358,7 +490,7 @@ func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IK app, peerID, transmitter, kb := setupNode(t, bootstrapNodePort+i+1, fmt.Sprintf("oracle_keeper%d", i), nodeKeys[i+1], backend, []commontypes.BootstrapperLocator{ // Supply the bootstrap IP and port as a V2 peer address {PeerID: bootstrapPeerID, Addrs: []string{fmt.Sprintf("127.0.0.1:%d", bootstrapNodePort)}}, - }) + }, mServer) nodes = append(nodes, Node{ app, transmitter, kb, @@ -431,7 +563,7 @@ func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IK "fallbackLinkPrice": big.NewInt(2000000000000000000), "transcoder": testutils.NewAddress(), "registrars": []common.Address{testutils.NewAddress()}, - "upkeepPrivilegeManager": testutils.NewAddress(), + "upkeepPrivilegeManager": usr.From, }, configType) require.NoError(t, err) rawCfg, err := json.Marshal(config.OffchainConfig{ @@ -489,7 +621,7 @@ func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IK require.NoError(t, err) backend.Commit() - return nodes + return nodes, mServer } func deployUpkeeps(t *testing.T, backend *backends.SimulatedBackend, carrol, steve *bind.TransactOpts, linkToken *link_token_interface.LinkToken, registry *iregistry21.IKeeperRegistryMaster, n int) ([]*big.Int, []common.Address, []*log_upkeep_counter_wrapper.LogUpkeepCounter) { @@ -592,3 +724,297 @@ func getUpkeepIdFromTx21(t *testing.T, registry *iregistry21.IKeeperRegistryMast require.NoError(t, err) return parsedLog.Id } + +// ------- below this line could be added to a test helpers package +type registerAndFundFunc func(*testing.T, common.Address, *bind.TransactOpts, uint8, []byte) *big.Int + +func registerAndFund( + registry *iregistry21.IKeeperRegistryMaster, + registryOwner *bind.TransactOpts, + backend *backends.SimulatedBackend, + linkToken *link_token_interface.LinkToken, +) registerAndFundFunc { + return func(t *testing.T, upkeepAddr common.Address, upkeepOwner *bind.TransactOpts, trigger uint8, config []byte) *big.Int { + // register the upkeep on the host registry contract + registrationTx, err := registry.RegisterUpkeep( + registryOwner, + upkeepAddr, + 2_500_000, + upkeepOwner.From, + trigger, + []byte{}, + config, + []byte{}, + ) + require.NoError(t, err) + + backend.Commit() + + receipt, err := backend.TransactionReceipt(testutils.Context(t), registrationTx.Hash()) + require.NoError(t, err) + + parsedLog, err := registry.ParseUpkeepRegistered(*receipt.Logs[0]) + require.NoError(t, err) + + upkeepID := parsedLog.Id + + // Fund the upkeep + _, err = linkToken.Approve(upkeepOwner, registry.Address(), oneHunEth) + require.NoError(t, err) + + _, err = registry.AddFunds(upkeepOwner, upkeepID, oneHunEth) + require.NoError(t, err) + + backend.Commit() + + return upkeepID + } +} + +type feedLookupUpkeepController struct { + // address for dummy protocol + logSrcAddr common.Address + // dummy protocol is a log event source + protocol *dummy_protocol_wrapper.DummyProtocol + protocolOwner *bind.TransactOpts + // log trigger listener contracts react to logs produced from protocol + count int + upkeepIds []*big.Int + addresses []common.Address + contracts []*log_triggered_streams_lookup_wrapper.LogTriggeredStreamsLookup + contractsOwner *bind.TransactOpts +} + +func newFeedLookupUpkeepController( + backend *backends.SimulatedBackend, + protocolOwner *bind.TransactOpts, +) (*feedLookupUpkeepController, error) { + addr, _, contract, err := dummy_protocol_wrapper.DeployDummyProtocol(protocolOwner, backend) + if err != nil { + return nil, err + } + + backend.Commit() + + return &feedLookupUpkeepController{ + logSrcAddr: addr, + protocol: contract, + protocolOwner: protocolOwner, + }, nil +} + +func (c *feedLookupUpkeepController) DeployUpkeeps( + t *testing.T, + backend *backends.SimulatedBackend, + owner *bind.TransactOpts, + count int, +) error { + addresses := make([]common.Address, count) + contracts := make([]*log_triggered_streams_lookup_wrapper.LogTriggeredStreamsLookup, count) + + // deploy n upkeep contracts + for x := 0; x < count; x++ { + addr, _, contract, err := log_triggered_streams_lookup_wrapper.DeployLogTriggeredStreamsLookup( + owner, + backend, + false, + false, + ) + + if err != nil { + require.NoError(t, err, "test dependent on contract deployment") + + return err + } + + addresses[x] = addr + contracts[x] = contract + } + + backend.Commit() + + c.count = count + c.addresses = addresses + c.contracts = contracts + c.contractsOwner = owner + + return nil +} + +func (c *feedLookupUpkeepController) RegisterAndFund( + t *testing.T, + registry *iregistry21.IKeeperRegistryMaster, + registryOwner *bind.TransactOpts, + backend *backends.SimulatedBackend, + linkToken *link_token_interface.LinkToken, +) error { + ids := make([]*big.Int, len(c.contracts)) + + t.Logf("address: %s", c.logSrcAddr.Hex()) + + logTriggerConfigType := abi.MustNewType("tuple(address contractAddress, uint8 filterSelector, bytes32 topic0, bytes32 topic1, bytes32 topic2, bytes32 topic3)") + config, err := abi.Encode(map[string]interface{}{ + "contractAddress": c.logSrcAddr, + "filterSelector": 0, // no indexed topics filtered + "topic0": "0xd1ffe9e45581c11d7d9f2ed5f75217cd4be9f8b7eee6af0f6d03f46de53956cd", // LimitOrderExecuted event for dummy protocol + "topic1": "0x", + "topic2": "0x", + "topic3": "0x", + }, logTriggerConfigType) + + require.NoError(t, err) + + registerFunc := registerAndFund(registry, registryOwner, backend, linkToken) + + for x := range c.contracts { + ids[x] = registerFunc(t, c.addresses[x], c.contractsOwner, 1, config) + } + + c.upkeepIds = ids + + return nil +} + +func (c *feedLookupUpkeepController) EnableMercury( + t *testing.T, + backend *backends.SimulatedBackend, + registry *iregistry21.IKeeperRegistryMaster, + registryOwner *bind.TransactOpts, +) error { + adminBytes, _ := json.Marshal(evm21.UpkeepPrivilegeConfig{ + MercuryEnabled: true, + }) + + for _, id := range c.upkeepIds { + if _, err := registry.SetUpkeepPrivilegeConfig(registryOwner, id, adminBytes); err != nil { + require.NoError(t, err) + + return err + } + + callOpts := &bind.CallOpts{ + Pending: true, + From: registryOwner.From, + Context: context.Background(), + } + + bts, err := registry.GetUpkeepPrivilegeConfig(callOpts, id) + if err != nil { + require.NoError(t, err) + + return err + } + + var checkBytes evm21.UpkeepPrivilegeConfig + if err := json.Unmarshal(bts, &checkBytes); err != nil { + require.NoError(t, err) + + return err + } + + require.True(t, checkBytes.MercuryEnabled) + } + + bl, _ := backend.BlockByHash(testutils.Context(t), backend.Commit()) + t.Logf("block number after mercury enabled: %d", bl.NumberU64()) + + return nil +} + +func (c *feedLookupUpkeepController) VerifyEnv( + t *testing.T, + backend *backends.SimulatedBackend, + registry *iregistry21.IKeeperRegistryMaster, + registryOwner *bind.TransactOpts, +) error { + t.Log("verifying number of active upkeeps") + + ids, err := registry.GetActiveUpkeepIDs(&bind.CallOpts{ + Context: testutils.Context(t), + From: registryOwner.From, + }, big.NewInt(0), big.NewInt(100)) + + require.NoError(t, err) + require.Len(t, ids, c.count, "active upkeep ids does not match count") + require.Len(t, ids, len(c.upkeepIds)) + + t.Log("verifying total number of contracts") + require.Len(t, c.contracts, len(c.upkeepIds), "one contract for each upkeep id expected") + + // call individual contracts to see that they revert + for _, contract := range c.contracts { + _, err := contract.CheckLog(c.contractsOwner, log_triggered_streams_lookup_wrapper.Log{ + Index: big.NewInt(0), + TxIndex: big.NewInt(0), + TxHash: common.HexToHash("0x1"), + BlockNumber: big.NewInt(0), + BlockHash: common.HexToHash("0x14"), + Source: common.HexToAddress("0x2"), + Topics: [][32]byte{ + common.HexToHash("0xd1ffe9e45581c11d7d9f2ed5f75217cd4be9f8b7eee6af0f6d03f46de53956cd"), // matches executedSig and should result in a feedlookup revert + common.HexToHash("0x"), + common.HexToHash("0x"), + common.HexToHash("0x"), + }, + Data: []byte{}, + }, []byte("0x")) + + require.Error(t, err, "check log contract call should revert: %s", err) + } + + return nil +} + +func (c *feedLookupUpkeepController) EmitEvents( + t *testing.T, + backend *backends.SimulatedBackend, + count int, + afterEmit func(), +) error { + ctx := testutils.Context(t) + + for i := 0; i < count && ctx.Err() == nil; i++ { + _, err := c.protocol.ExecuteLimitOrder(c.protocolOwner, big.NewInt(1000), big.NewInt(10000), c.logSrcAddr) + require.NoError(t, err, "no error expected from limit order exec") + + if err != nil { + return err + } + + backend.Commit() + + // verify event was emitted + block, _ := backend.BlockByHash(context.Background(), backend.Commit()) + t.Logf("block number after emit event: %d", block.NumberU64()) + + iter, _ := c.protocol.FilterLimitOrderExecuted( + &bind.FilterOpts{ + Context: testutils.Context(t), + Start: block.NumberU64() - 1, + }, + []*big.Int{big.NewInt(1000)}, + []*big.Int{big.NewInt(10000)}, + []common.Address{c.logSrcAddr}, + ) + + var eventEmitted bool + for iter.Next() { + if iter.Event != nil { + eventEmitted = true + } + } + + require.True(t, eventEmitted, "event expected on backend") + if !eventEmitted { + return fmt.Errorf("event was not emitted") + } + + afterEmit() + } + + return nil +} + +func (c *feedLookupUpkeepController) UpkeepsIds() []*big.Int { + return c.upkeepIds +} diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_test.go index f51aa43e199..eea9c1574cf 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_test.go @@ -7,7 +7,10 @@ import ( "encoding/json" "fmt" "math/big" + "net/http" + "net/http/httptest" "strings" + "sync" "testing" "time" @@ -110,6 +113,7 @@ func setupNode( nodeKey ethkey.KeyV2, backend *backends.SimulatedBackend, p2pV2Bootstrappers []commontypes.BootstrapperLocator, + mercury MercuryEndpoint, ) (chainlink.Application, string, common.Address, ocr2key.KeyBundle) { p2pKey, err := p2pkey.NewV2() require.NoError(t, err) @@ -135,10 +139,10 @@ func setupNode( c.EVM[0].GasEstimator.Mode = ptr("FixedPrice") s.Mercury.Credentials = map[string]toml.MercuryCredentials{ MercuryCredName: { - LegacyURL: models.MustSecretURL("https://old.api.link"), - URL: models.MustSecretURL("https://new.api.link"), - Username: models.NewSecret("username1"), - Password: models.NewSecret("password1"), + LegacyURL: models.MustSecretURL(mercury.URL()), + URL: models.MustSecretURL(mercury.URL()), + Username: models.NewSecret(mercury.Username()), + Password: models.NewSecret(mercury.Password()), }, } }) @@ -235,7 +239,7 @@ func TestIntegration_KeeperPluginBasic(t *testing.T) { // Setup bootstrap + oracle nodes bootstrapNodePort := int64(19599) - appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil) + appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil, NewSimulatedMercuryServer()) bootstrapNode := Node{ appBootstrap, bootstrapTransmitter, bootstrapKb, } @@ -248,7 +252,7 @@ func TestIntegration_KeeperPluginBasic(t *testing.T) { app, peerID, transmitter, kb := setupNode(t, bootstrapNodePort+i+1, fmt.Sprintf("oracle_keeper%d", i), nodeKeys[i+1], backend, []commontypes.BootstrapperLocator{ // Supply the bootstrap IP and port as a V2 peer address {PeerID: bootstrapPeerID, Addrs: []string{fmt.Sprintf("127.0.0.1:%d", bootstrapNodePort)}}, - }) + }, NewSimulatedMercuryServer()) nodes = append(nodes, Node{ app, transmitter, kb, @@ -495,7 +499,7 @@ func TestIntegration_KeeperPluginForwarderEnabled(t *testing.T) { effectiveTransmitters := make([]common.Address, 0) // Setup bootstrap + oracle nodes bootstrapNodePort := int64(19599) - appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil) + appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil, NewSimulatedMercuryServer()) bootstrapNode := Node{ appBootstrap, bootstrapTransmitter, bootstrapKb, @@ -509,7 +513,7 @@ func TestIntegration_KeeperPluginForwarderEnabled(t *testing.T) { app, peerID, transmitter, kb := setupNode(t, bootstrapNodePort+i+1, fmt.Sprintf("oracle_keeper%d", i), nodeKeys[i+1], backend, []commontypes.BootstrapperLocator{ // Supply the bootstrap IP and port as a V2 peer address {PeerID: bootstrapPeerID, Addrs: []string{fmt.Sprintf("127.0.0.1:%d", bootstrapNodePort)}}, - }) + }, NewSimulatedMercuryServer()) nodeForwarder := setupForwarderForNode(t, app, sergey, backend, transmitter, linkAddr) effectiveTransmitters = append(effectiveTransmitters, nodeForwarder) @@ -726,3 +730,72 @@ func TestFilterNamesFromSpec20(t *testing.T) { _, err = ocr2keeper.FilterNamesFromSpec20(spec) require.ErrorContains(t, err, "not a valid EIP55 formatted address") } + +// ------- below this line could be added to a test helpers package +type MercuryEndpoint interface { + URL() string + Username() string + Password() string + CallCount() int + RegisterHandler(http.HandlerFunc) +} + +type SimulatedMercuryServer struct { + server *httptest.Server + handler http.HandlerFunc + + mu sync.RWMutex + callCount int +} + +func NewSimulatedMercuryServer() *SimulatedMercuryServer { + srv := &SimulatedMercuryServer{ + handler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + }, + } + + srv.server = httptest.NewUnstartedServer(srv) + + return srv +} + +func (ms *SimulatedMercuryServer) URL() string { + return ms.server.URL +} + +func (ms *SimulatedMercuryServer) Username() string { + return "username1" +} + +func (ms *SimulatedMercuryServer) Password() string { + return "password1" +} + +func (ms *SimulatedMercuryServer) CallCount() int { + ms.mu.RLock() + defer ms.mu.RUnlock() + + return ms.callCount +} + +func (ms *SimulatedMercuryServer) RegisterHandler(h http.HandlerFunc) { + ms.handler = h +} + +func (ms *SimulatedMercuryServer) Start() { + ms.server.Start() +} + +func (ms *SimulatedMercuryServer) Stop() { + ms.server.Close() +} + +func (ms *SimulatedMercuryServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ms.mu.Lock() + defer ms.mu.Unlock() + + ms.callCount++ + + ms.handler.ServeHTTP(w, r) +}