From 96390f039d9683627fbeb60adaccd34191eb961b Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Fri, 25 Aug 2023 10:56:51 -0500 Subject: [PATCH] Log Trigger Pipeline Retry Integration Test This commit offers a proposal for completing an integration test that runs log trigger upkeeps, a full check pipeline, fails mercury server calls, and allows the application to retry the upkeeps. Additions include: - mock mercury server that can have a custom handler per test - upkeep setup helpers for creating/registering log triggered upkeeps - integration test for log event retry flow --- .../plugins/ocr2keeper/integration_21_test.go | 452 +++++++++++++++++- .../plugins/ocr2keeper/integration_test.go | 85 +++- 2 files changed, 523 insertions(+), 14 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go index 91e422ce949..3d0e3c20f97 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_21_test.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "math/big" + "net/http" "strings" "sync" "testing" @@ -33,11 +34,13 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" automationForwarderLogic "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_forwarder_logic" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/basic_upkeep_contract" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/dummy_protocol_wrapper" iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" registrylogica21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_logic_a_wrapper_2_1" registrylogicb21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_logic_b_wrapper_2_1" registry21 
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/keeper_registry_wrapper_2_1" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/link_token_interface" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_triggered_feed_lookup_wrapper" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_upkeep_counter_wrapper" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/mock_v3_aggregator_contract" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" @@ -46,6 +49,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper" + evm21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" ) @@ -109,7 +113,7 @@ func TestIntegration_KeeperPluginConditionalUpkeep(t *testing.T) { require.NoError(t, err) registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) - nodes := setupNodes(t, nodeKeys, registry, backend, steve) + nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve) <-time.After(time.Second * 5) @@ -197,7 +201,7 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { require.NoError(t, err) registry := deployKeeper21Registry(t, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) - nodes := setupNodes(t, nodeKeys, registry, backend, steve) + nodes, _ := setupNodes(t, nodeKeys, registry, backend, steve) // wait for nodes to start // TODO: find a better way to do this <-time.After(time.Second * 10) @@ -264,6 +268,162 @@ func TestIntegration_KeeperPluginLogUpkeep(t *testing.T) { }) } +func TestIntegration_KeeperPluginLogUpkeep_Retry(t *testing.T) { + t.Skip() + /* + In trying to create 
the retry integration test I ran into an issue with + the simulated chain not being able to query old blocks: + + `simulatedBackend cannot access blocks other than the latest block` + + What I haven't figured out yet is why it only affects parts of the + pipeline and not others. + + 1. `checkUpkeeps` seems to be unaffected + 2. `feedLookup` fails when the func `allowedToUseMercury` calls the contract for mercury privileges + 3. `feedLookup` fails when the func `checkCallback` calls the contract callback function + 4. `simulatePerformUpkeeps` seems to be unaffected + + I know we ran into this issue early on with conditional upkeeps and I'm + guessing that the current integration tests are only ever using check + and simulate perform. The only difference that stands out to me is the + unaffected ones are using BatchCallContext and the ones that fail use + CallContext. + + The difference on the simulated chain is CallContract does a check on + the latest block number while PendingCallContract does not. I have not + yet made any connections between the client methods and the simulated + chain methods. + + I think the functions that currently use CallContext should be updated + to use BatchCallContext to stay consistent with the current usages of + batching. This would improve the latency of the current pipeline and + possibly eliminate this issue with the simulated chain if the assumption + above is true. 
+ */ + + g := gomega.NewWithT(t) + + // setup blockchain + linkOwner := testutils.MustNewSimTransactor(t) // owns all the link + registryOwner := testutils.MustNewSimTransactor(t) // registry owner + upkeepOwner := testutils.MustNewSimTransactor(t) // upkeep owner + genesisData := core.GenesisAlloc{ + linkOwner.From: {Balance: assets.Ether(10000).ToInt()}, + registryOwner.From: {Balance: assets.Ether(10000).ToInt()}, + upkeepOwner.From: {Balance: assets.Ether(10000).ToInt()}, + } + + // Generate 5 keys for nodes (1 bootstrap + 4 ocr nodes) and fund them with ether + var nodeKeys [5]ethkey.KeyV2 + for i := int64(0); i < 5; i++ { + nodeKeys[i] = cltest.MustGenerateRandomKey(t) + genesisData[nodeKeys[i].Address] = core.GenesisAccount{Balance: assets.Ether(1000).ToInt()} + } + + backend := cltest.NewSimulatedBackend(t, genesisData, uint32(ethconfig.Defaults.Miner.GasCeil)) + stopMining := cltest.Mine(backend, 3*time.Second) // Should be greater than deltaRound since we cannot access old blocks on simulated blockchain + defer stopMining() + + // Deploy registry + linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(linkOwner, backend) + require.NoError(t, err) + + gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(60000000000)) + require.NoError(t, err) + + linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(registryOwner, backend, 18, big.NewInt(2000000000000000000)) + require.NoError(t, err) + + registry := deployKeeper21Registry(t, registryOwner, backend, linkAddr, linkFeedAddr, gasFeedAddr) + + nodes, mercuryServer := setupNodes(t, nodeKeys, registry, backend, registryOwner) + + const upkeepCount = 10 + + // testing with the mercury server involves mocking responses. 
currently, + // there is not a way to connect a mercury call to an upkeep id (though we + // could add custom headers) so the test must be fairly basic and just + // count calls before switching to successes + var ( + mu sync.Mutex + count int + ) + + mercuryServer.RegisterHandler(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + defer mu.Unlock() + + count++ + + _ = r.ParseForm() + + // TODO: validate request URI + t.Logf("MercuryHTTPServe:RequestURI: %s", r.RequestURI) + + // TODO: validate form values + for key, value := range r.Form { + t.Logf("MercuryHTTPServe:FormValue: key: %s; value: %s;", key, value) + } + + // upkeepCount * 2 should be two rounds of calling the mercury endpoint + if count <= upkeepCount*2 { + // start sending success messages + output := `{"chainlinkBlob":"0x0001c38d71fed6c320b90e84b6f559459814d068e2a1700adc931ca9717d4fe70000000000000000000000000000000000000000000000000000000001a80b52b4bf1233f9cb71144a253a1791b202113c4ab4a92fa1b176d684b4959666ff8200000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001004254432d5553442d415242495452554d2d544553544e4554000000000000000000000000000000000000000000000000000000000000000000000000645570be000000000000000000000000000000000000000000000000000002af2b818dc5000000000000000000000000000000000000000000000000000002af2426faf3000000000000000000000000000000000000000000000000000002af32dc209700000000000000000000000000000000000000000000000000000000012130f8df0a9745bb6ad5e2df605e158ba8ad8a33ef8a0acf9851f0f01668a3a3f2b68600000000000000000000000000000000000000000000000000000000012130f60000000000000000000000000000000000000000000000000000000000000002c4a7958dce105089cf5edb68dad7dcfe8618d7784eb397f97d5a5fade78c11a58275aebda478968e545f7e3657aba9dcbe8d44605
e4c6fde3e24edd5e22c94270000000000000000000000000000000000000000000000000000000000000002459c12d33986018a8959566d145225f0c4a4e61a9a3f50361ccff397899314f0018162cf10cd89897635a0bb62a822355bd199d09f4abe76e4d05261bb44733d"}` + + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(output)) + + return + } + + w.WriteHeader(http.StatusNotFound) + }) + + // wait for nodes to start + // TODO: find a better way to do this + <-time.After(10 * time.Second) + + defer mercuryServer.Stop() + + _, err = linkToken.Transfer(linkOwner, upkeepOwner.From, big.NewInt(0).Mul(oneHunEth, big.NewInt(int64(upkeepCount+1)))) + require.NoError(t, err) + + backend.Commit() + + feeds, err := newFeedLookupUpkeepController(backend, registryOwner) + require.NoError(t, err, "no error expected from creating a feed lookup controller") + + // deploy multiple upkeeps that listen to a log emitter and need to be + // performed for each log event + _ = feeds.DeployUpkeeps(t, backend, upkeepOwner, upkeepCount) + _ = feeds.RegisterAndFund(t, registerAndFund(registry, registryOwner, backend, linkToken)) + _ = feeds.EnableMercury(t, backend, registry, registryOwner) + _ = feeds.VerifyEnv(t, backend, registry, registryOwner) + + // using sleep here is not the right way to go, but it currently works on + // a local machine. 
this does have issues in CI + time.Sleep(10 * time.Second) + + // start emitting events in a separate go-routine + // feed lookup relies on a single contract event log to perform multiple + // listener contracts + go func() { + // only 1 event is necessary to make all 10 upkeeps eligible + _ = feeds.EmitEvents(t, backend, 1, func() { + // pause per emit for expected block production time + time.Sleep(3 * time.Second) + }) + }() + + listener, done := listenPerformed(t, backend, registry, feeds.UpkeepsIds(), int64(1)) + g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue()) + + done() + + _ = checkPipelineRuns(t, nodes, 1*len(nodes)) // TODO: TBD +} + func waitPipelineRuns(t *testing.T, nodes []Node, n int, timeout, interval time.Duration) { ctx, cancel := context.WithTimeout(testutils.Context(t), timeout) defer cancel() @@ -321,29 +481,37 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry cache := &sync.Map{} ctx, cancel := context.WithCancel(testutils.Context(t)) start := startBlock + go func() { for ctx.Err() == nil { bl := backend.Blockchain().CurrentBlock().Number.Uint64() + sc := make([]bool, len(ids)) for i := range sc { sc[i] = true } + iter, err := registry.FilterUpkeepPerformed(&bind.FilterOpts{ Start: uint64(start), End: &bl, Context: ctx, }, ids, sc) + if ctx.Err() != nil { return } + require.NoError(t, err) + for iter.Next() { if iter.Event != nil { t.Logf("[automation-ocr3 | EvmRegistry] upkeep performed event emitted for id %s", iter.Event.Id.String()) cache.Store(iter.Event.Id.String(), true) } } + require.NoError(t, iter.Close()) + time.Sleep(time.Second) } }() @@ -351,11 +519,14 @@ func listenPerformed(t *testing.T, backend *backends.SimulatedBackend, registry return mapListener(cache, 0), cancel } -func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IKeeperRegistryMaster, backend *backends.SimulatedBackend, usr *bind.TransactOpts) 
[]Node { +func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IKeeperRegistryMaster, backend *backends.SimulatedBackend, usr *bind.TransactOpts) ([]Node, *SimulatedMercuryServer) { lggr := logger.TestLogger(t) + mServer := NewSimulatedMercuryServer() + mServer.Start() + // Setup bootstrap + oracle nodes bootstrapNodePort := int64(19599) - appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil) + appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil, mServer) bootstrapNode := Node{ appBootstrap, bootstrapTransmitter, bootstrapKb, } @@ -368,7 +539,7 @@ func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IK app, peerID, transmitter, kb := setupNode(t, bootstrapNodePort+i+1, fmt.Sprintf("oracle_keeper%d", i), nodeKeys[i+1], backend, []commontypes.BootstrapperLocator{ // Supply the bootstrap IP and port as a V2 peer address {PeerID: bootstrapPeerID, Addrs: []string{fmt.Sprintf("127.0.0.1:%d", bootstrapNodePort)}}, - }) + }, mServer) nodes = append(nodes, Node{ app, transmitter, kb, @@ -441,7 +612,7 @@ func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IK "fallbackLinkPrice": big.NewInt(2000000000000000000), "transcoder": testutils.NewAddress(), "registrars": []common.Address{testutils.NewAddress()}, - "upkeepPrivilegeManager": testutils.NewAddress(), + "upkeepPrivilegeManager": usr.From, }, configType) require.NoError(t, err) rawCfg, err := json.Marshal(config.OffchainConfig{ @@ -499,7 +670,7 @@ func setupNodes(t *testing.T, nodeKeys [5]ethkey.KeyV2, registry *iregistry21.IK require.NoError(t, err) backend.Commit() - return nodes + return nodes, mServer } func deployUpkeeps(t *testing.T, backend *backends.SimulatedBackend, carrol, steve *bind.TransactOpts, linkToken *link_token_interface.LinkToken, 
registry *iregistry21.IKeeperRegistryMaster, n int) ([]*big.Int, []common.Address, []*log_upkeep_counter_wrapper.LogUpkeepCounter) { @@ -602,3 +773,270 @@ func getUpkeepIdFromTx21(t *testing.T, registry *iregistry21.IKeeperRegistryMast require.NoError(t, err) return parsedLog.Id } + +// ------- below this line could be added to a test helpers package +type registerAndFundFunc func(*testing.T, common.Address, *bind.TransactOpts, uint8, []byte) *big.Int + +func registerAndFund( + registry *iregistry21.IKeeperRegistryMaster, + registryOwner *bind.TransactOpts, + backend *backends.SimulatedBackend, + linkToken *link_token_interface.LinkToken, +) registerAndFundFunc { + return func(t *testing.T, upkeepAddr common.Address, upkeepOwner *bind.TransactOpts, trigger uint8, config []byte) *big.Int { + // register the upkeep on the host registry contract + registrationTx, err := registry.RegisterUpkeep( + registryOwner, + upkeepAddr, + 2_500_000, + upkeepOwner.From, + trigger, + []byte{}, + config, + []byte{}, + ) + require.NoError(t, err) + + backend.Commit() + + receipt, err := backend.TransactionReceipt(testutils.Context(t), registrationTx.Hash()) + require.NoError(t, err) + + parsedLog, err := registry.ParseUpkeepRegistered(*receipt.Logs[0]) + require.NoError(t, err) + + upkeepID := parsedLog.Id + + // Fund the upkeep + _, err = linkToken.Approve(upkeepOwner, registry.Address(), oneHunEth) + require.NoError(t, err) + + _, err = registry.AddFunds(upkeepOwner, upkeepID, oneHunEth) + require.NoError(t, err) + + backend.Commit() + + return upkeepID + } +} + +type feedLookupUpkeepController struct { + // address for dummy protocol + logSrcAddr common.Address + // dummy protocol is a log event source + protocol *dummy_protocol_wrapper.DummyProtocol + protocolOwner *bind.TransactOpts + // log trigger listener contracts react to logs produced from protocol + count int + upkeepIds []*big.Int + addresses []common.Address + contracts 
[]*log_triggered_feed_lookup_wrapper.LogTriggeredFeedLookup + contractsOwner *bind.TransactOpts +} + +func newFeedLookupUpkeepController( + backend *backends.SimulatedBackend, + protocolOwner *bind.TransactOpts, +) (*feedLookupUpkeepController, error) { + addr, _, contract, err := dummy_protocol_wrapper.DeployDummyProtocol(protocolOwner, backend) + if err != nil { + return nil, err + } + + backend.Commit() + + return &feedLookupUpkeepController{ + logSrcAddr: addr, + protocol: contract, + protocolOwner: protocolOwner, + }, nil +} + +func (c *feedLookupUpkeepController) DeployUpkeeps( + t *testing.T, + backend *backends.SimulatedBackend, + owner *bind.TransactOpts, + count int, +) error { + addresses := make([]common.Address, count) + contracts := make([]*log_triggered_feed_lookup_wrapper.LogTriggeredFeedLookup, count) + + // deploy n upkeep contracts + for x := 0; x < count; x++ { + addr, _, contract, err := log_triggered_feed_lookup_wrapper.DeployLogTriggeredFeedLookup( + owner, + backend, + false, + false, + ) + + if err != nil { + require.NoError(t, err, "test dependent on contract deployment") + + return err + } + + addresses[x] = addr + contracts[x] = contract + } + + backend.Commit() + + c.count = count + c.addresses = addresses + c.contracts = contracts + c.contractsOwner = owner + + return nil +} + +func (c *feedLookupUpkeepController) RegisterAndFund( + t *testing.T, + f registerAndFundFunc, +) error { + ids := make([]*big.Int, len(c.contracts)) + + t.Logf("address: %s", c.logSrcAddr.Hex()) + + logTriggerConfigType := abi.MustNewType("tuple(address contractAddress, uint8 filterSelector, bytes32 topic0, bytes32 topic1, bytes32 topic2, bytes32 topic3)") + config, err := abi.Encode(map[string]interface{}{ + "contractAddress": c.logSrcAddr, + "filterSelector": 0, // no indexed topics filtered + "topic0": "0xd1ffe9e45581c11d7d9f2ed5f75217cd4be9f8b7eee6af0f6d03f46de53956cd", // LimitOrderExecuted event for dummy protocol + "topic1": "0x", + "topic2": "0x", + 
"topic3": "0x", + }, logTriggerConfigType) + + require.NoError(t, err) + + for x := range c.contracts { + ids[x] = f(t, c.addresses[x], c.contractsOwner, 1, config) + } + + c.upkeepIds = ids + + return nil +} + +func (c *feedLookupUpkeepController) EnableMercury( + t *testing.T, + backend *backends.SimulatedBackend, + registry *iregistry21.IKeeperRegistryMaster, + registryOwner *bind.TransactOpts, +) error { + adminBytes, _ := json.Marshal(evm21.UpkeepPrivilegeConfig{ + MercuryEnabled: true, + }) + + for _, id := range c.upkeepIds { + if _, err := registry.SetUpkeepPrivilegeConfig(registryOwner, id, adminBytes); err != nil { + require.NoError(t, err) + + return err + } + } + + bl, _ := backend.BlockByHash(testutils.Context(t), backend.Commit()) + t.Logf("block number after mercury enabled: %d", bl.NumberU64()) + + return nil +} + +func (c *feedLookupUpkeepController) VerifyEnv( + t *testing.T, + backend *backends.SimulatedBackend, + registry *iregistry21.IKeeperRegistryMaster, + registryOwner *bind.TransactOpts, +) error { + t.Log("verifying number of active upkeeps") + + ids, err := registry.GetActiveUpkeepIDs(&bind.CallOpts{ + Context: testutils.Context(t), + From: registryOwner.From, + }, big.NewInt(0), big.NewInt(100)) + + require.NoError(t, err) + require.Len(t, ids, c.count, "active upkeep ids does not match count") + require.Len(t, ids, len(c.upkeepIds)) + + t.Log("verifying total number of contracts") + require.Len(t, c.contracts, len(c.upkeepIds), "one contract for each upkeep id expected") + + // call individual contracts to see that they revert + for _, contract := range c.contracts { + _, err := contract.CheckLog(c.contractsOwner, log_triggered_feed_lookup_wrapper.Log{ + Index: big.NewInt(0), + TxIndex: big.NewInt(0), + TxHash: common.HexToHash("0x1"), + BlockNumber: big.NewInt(0), + BlockHash: common.HexToHash("0x14"), + Source: common.HexToAddress("0x2"), + Topics: [][32]byte{ + 
common.HexToHash("0xd1ffe9e45581c11d7d9f2ed5f75217cd4be9f8b7eee6af0f6d03f46de53956cd"), // matches executedSig and should result in a feedlookup revert + common.HexToHash("0x"), + common.HexToHash("0x"), + common.HexToHash("0x"), + }, + Data: []byte{}, + }, []byte("0x")) + + require.Error(t, err, "check log contract call should revert: %s", err) + } + + return nil +} + +func (c *feedLookupUpkeepController) EmitEvents( + t *testing.T, + backend *backends.SimulatedBackend, + count int, + afterEmit func(), +) error { + ctx := testutils.Context(t) + + for i := 0; i < count && ctx.Err() == nil; i++ { + _, err := c.protocol.ExecuteLimitOrder(c.protocolOwner, big.NewInt(1000), big.NewInt(10000), c.logSrcAddr) + require.NoError(t, err, "no error expected from limit order exec") + + if err != nil { + return err + } + + backend.Commit() + + // verify event was emitted + block, _ := backend.BlockByHash(context.Background(), backend.Commit()) + t.Logf("block number after emit event: %d", block.NumberU64()) + + iter, _ := c.protocol.FilterLimitOrderExecuted( + &bind.FilterOpts{ + Context: testutils.Context(t), + Start: block.NumberU64() - 1, + }, + []*big.Int{big.NewInt(1000)}, + []*big.Int{big.NewInt(10000)}, + []common.Address{c.logSrcAddr}, + ) + + var eventEmitted bool + for iter.Next() { + if iter.Event != nil { + eventEmitted = true + } + } + + require.True(t, eventEmitted, "event expected on backend") + if !eventEmitted { + return fmt.Errorf("event was not emitted") + } + + afterEmit() + } + + return nil +} + +func (c *feedLookupUpkeepController) UpkeepsIds() []*big.Int { + return c.upkeepIds +} diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_test.go index 8b7e92c40fe..de27b457bb5 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_test.go @@ -7,7 +7,10 @@ import ( "encoding/json" "fmt" "math/big" + "net/http" + 
"net/http/httptest" "strings" + "sync" "testing" "time" @@ -108,6 +111,7 @@ func setupNode( nodeKey ethkey.KeyV2, backend *backends.SimulatedBackend, p2pV2Bootstrappers []commontypes.BootstrapperLocator, + mercury MercuryEndpoint, ) (chainlink.Application, string, common.Address, ocr2key.KeyBundle) { p2pKey, err := p2pkey.NewV2() require.NoError(t, err) @@ -133,9 +137,9 @@ func setupNode( c.EVM[0].GasEstimator.Mode = ptr("FixedPrice") s.Mercury.Credentials = map[string]toml.MercuryCredentials{ MercuryCredName: { - URL: models.MustSecretURL("https://mercury.chain.link"), - Username: models.NewSecret("username1"), - Password: models.NewSecret("password1"), + URL: models.MustSecretURL(mercury.URL()), + Username: models.NewSecret(mercury.Username()), + Password: models.NewSecret(mercury.Password()), }, } }) @@ -232,7 +236,7 @@ func TestIntegration_KeeperPluginBasic(t *testing.T) { // Setup bootstrap + oracle nodes bootstrapNodePort := int64(19599) - appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil) + appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil, NewSimulatedMercuryServer()) bootstrapNode := Node{ appBootstrap, bootstrapTransmitter, bootstrapKb, } @@ -245,7 +249,7 @@ func TestIntegration_KeeperPluginBasic(t *testing.T) { app, peerID, transmitter, kb := setupNode(t, bootstrapNodePort+i+1, fmt.Sprintf("oracle_keeper%d", i), nodeKeys[i+1], backend, []commontypes.BootstrapperLocator{ // Supply the bootstrap IP and port as a V2 peer address {PeerID: bootstrapPeerID, Addrs: []string{fmt.Sprintf("127.0.0.1:%d", bootstrapNodePort)}}, - }) + }, NewSimulatedMercuryServer()) nodes = append(nodes, Node{ app, transmitter, kb, @@ -492,7 +496,7 @@ func TestIntegration_KeeperPluginForwarderEnabled(t *testing.T) { effectiveTransmitters := make([]common.Address, 0) // Setup bootstrap + 
oracle nodes bootstrapNodePort := int64(19599) - appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil) + appBootstrap, bootstrapPeerID, bootstrapTransmitter, bootstrapKb := setupNode(t, bootstrapNodePort, "bootstrap_keeper_ocr", nodeKeys[0], backend, nil, NewSimulatedMercuryServer()) bootstrapNode := Node{ appBootstrap, bootstrapTransmitter, bootstrapKb, @@ -506,7 +510,7 @@ func TestIntegration_KeeperPluginForwarderEnabled(t *testing.T) { app, peerID, transmitter, kb := setupNode(t, bootstrapNodePort+i+1, fmt.Sprintf("oracle_keeper%d", i), nodeKeys[i+1], backend, []commontypes.BootstrapperLocator{ // Supply the bootstrap IP and port as a V2 peer address {PeerID: bootstrapPeerID, Addrs: []string{fmt.Sprintf("127.0.0.1:%d", bootstrapNodePort)}}, - }) + }, NewSimulatedMercuryServer()) nodeForwarder := setupForwarderForNode(t, app, sergey, backend, transmitter, linkAddr) effectiveTransmitters = append(effectiveTransmitters, nodeForwarder) @@ -723,3 +727,70 @@ func TestFilterNamesFromSpec20(t *testing.T) { _, err = ocr2keeper.FilterNamesFromSpec20(spec) require.ErrorContains(t, err, "not a valid EIP55 formatted address") } + +// ------- below this line could be added to a test helpers package +type MercuryEndpoint interface { + URL() string + Username() string + Password() string + CallCount() int + RegisterHandler(http.HandlerFunc) +} + +type SimulatedMercuryServer struct { + server *httptest.Server + handler http.HandlerFunc + + mu sync.RWMutex + callCount int +} + +func NewSimulatedMercuryServer() *SimulatedMercuryServer { + srv := &SimulatedMercuryServer{ + handler: func(w http.ResponseWriter, r *http.Request) {}, + } + + srv.server = httptest.NewUnstartedServer(srv) + + return srv +} + +func (ms *SimulatedMercuryServer) URL() string { + return ms.server.URL +} + +func (ms *SimulatedMercuryServer) Username() string { + return "username1" +} + +func (ms 
*SimulatedMercuryServer) Password() string { + return "password1" +} + +func (ms *SimulatedMercuryServer) CallCount() int { + ms.mu.RLock() + defer ms.mu.RUnlock() + + return ms.callCount +} + +func (ms *SimulatedMercuryServer) RegisterHandler(h http.HandlerFunc) { + ms.handler = h +} + +func (ms *SimulatedMercuryServer) Start() { + ms.server.Start() +} + +func (ms *SimulatedMercuryServer) Stop() { + ms.server.Close() +} + +func (ms *SimulatedMercuryServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ms.mu.Lock() + defer ms.mu.Unlock() + + ms.callCount++ + + ms.handler.ServeHTTP(w, r) +}