diff --git a/.changeset/eighty-hotels-sit.md b/.changeset/eighty-hotels-sit.md new file mode 100644 index 00000000000..e83b70c7695 --- /dev/null +++ b/.changeset/eighty-hotels-sit.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fix panic if mercury server returns error #bugfix diff --git a/.changeset/hungry-carpets-flow.md b/.changeset/hungry-carpets-flow.md new file mode 100644 index 00000000000..19835b99c17 --- /dev/null +++ b/.changeset/hungry-carpets-flow.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Added a mechanism to validate forwarders for OCR2 and fallback to EOA if necessary #added diff --git a/CHANGELOG.md b/CHANGELOG.md index 314626a0bd6..6bb68416133 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -189,7 +189,7 @@ You may disable if this results in excessive log volume. Disable like so: ``` - [Pipeline] + [JobPipeline] VerboseLogging = false ``` @@ -219,7 +219,7 @@ - [#12404](https://github.com/smartcontractkit/chainlink/pull/12404) [`b74079b672`](https://github.com/smartcontractkit/chainlink/commit/b74079b672f36fb0c241f90ea1e875ea3a9524da) Thanks [@HenryNguyen5](https://github.com/HenryNguyen5)! - Add OCR3 capability contract wrapper -- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option Pipeline.VerboseLogging +- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option JobPipeline.VerboseLogging VerboseLogging enables detailed logging of pipeline execution steps. This is disabled by default because it increases log volume for pipeline runs, but can @@ -230,7 +230,7 @@ Set it like the following example: ``` - [Pipeline] + [JobPipeline] VerboseLogging = true ``` diff --git a/common/txmgr/mocks/tx_manager.go b/common/txmgr/mocks/tx_manager.go index 935e7313817..a3e8c489314 100644 --- a/common/txmgr/mocks/tx_manager.go +++ b/common/txmgr/mocks/tx_manager.go @@ -301,6 +301,34 @@ func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetFor return r0, r1 } +// GetForwarderForEOAOCR2Feeds provides a mock function with given fields: eoa, ocr2AggregatorID +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetForwarderForEOAOCR2Feeds(eoa ADDR, ocr2AggregatorID ADDR) (ADDR, error) { + ret := _m.Called(eoa, ocr2AggregatorID) + + if len(ret) == 0 { + panic("no return value specified for GetForwarderForEOAOCR2Feeds") + } + + var r0 ADDR + var r1 error + if rf, ok := ret.Get(0).(func(ADDR, ADDR) (ADDR, error)); ok { + return rf(eoa, ocr2AggregatorID) + } + if rf, ok := ret.Get(0).(func(ADDR, ADDR) ADDR); ok { + r0 = rf(eoa, ocr2AggregatorID) + } else { + r0 = ret.Get(0).(ADDR) + } + + if rf, ok := ret.Get(1).(func(ADDR, ADDR) error); ok { + r1 = rf(eoa, ocr2AggregatorID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // HealthReport provides a mock function with given fields: func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) HealthReport() map[string]error { ret := _m.Called() diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 4d4eabe5c40..1c8b59a55cc 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -47,6 +47,7 @@ type TxManager[ Trigger(addr ADDR) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error) + GetForwarderForEOAOCR2Feeds(eoa, ocr2AggregatorID ADDR) (forwarder ADDR, err error) RegisterResumeCallback(fn ResumeCallback) SendNativeToken(ctx context.Context, chainID CHAIN_ID, from, to ADDR, value big.Int, gasLimit uint64) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) Reset(addr ADDR, abandon bool) error @@ -553,6 +554,15 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForward return } +// GetForwarderForEOAOCR2Feeds calls forwarderMgr to get a proper forwarder for a given EOA and checks if its set as a transmitter on the OCR2Aggregator contract. +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForwarderForEOAOCR2Feeds(eoa, ocr2Aggregator ADDR) (forwarder ADDR, err error) { + if !b.txConfig.ForwardersEnabled() { + return forwarder, fmt.Errorf("forwarding is not enabled, to enable set Transactions.ForwardersEnabled =true") + } + forwarder, err = b.fwdMgr.ForwarderForOCR2Feeds(eoa, ocr2Aggregator) + return +} + func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) checkEnabled(ctx context.Context, addr ADDR) error { if err := b.keyStore.CheckEnabled(ctx, addr, b.chainID); err != nil { return fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", addr, b.chainID.String(), err) @@ -649,6 +659,10 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Cre func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetForwarderForEOA(addr ADDR) (fwdr ADDR, err error) { return fwdr, err } +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetForwarderForEOAOCR2Feeds(_, _ ADDR) (fwdr ADDR, err error) { + return fwdr, err +} + func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Reset(addr ADDR, abandon bool) error { return nil } diff --git a/common/txmgr/types/forwarder_manager.go b/common/txmgr/types/forwarder_manager.go index 4d70b730004..3e51ffb1524 100644 --- a/common/txmgr/types/forwarder_manager.go +++ b/common/txmgr/types/forwarder_manager.go @@ -9,6 +9,7 @@ import ( type ForwarderManager[ADDR types.Hashable] interface { services.Service ForwarderFor(addr ADDR) (forwarder ADDR, err error) + ForwarderForOCR2Feeds(eoa, ocr2Aggregator ADDR) (forwarder ADDR, err error) // Converts payload to be forwarder-friendly ConvertPayload(dest ADDR, origPayload []byte) ([]byte, error) } diff --git a/common/txmgr/types/mocks/forwarder_manager.go b/common/txmgr/types/mocks/forwarder_manager.go index fe40e7bb5e2..1021e776e9d 100644 --- a/common/txmgr/types/mocks/forwarder_manager.go +++ b/common/txmgr/types/mocks/forwarder_manager.go @@ -91,6 +91,34 @@ func (_m *ForwarderManager[ADDR]) ForwarderFor(addr ADDR) (ADDR, error) { return r0, r1 } +// ForwarderForOCR2Feeds provides a mock function with given fields: eoa, ocr2Aggregator +func (_m *ForwarderManager[ADDR]) ForwarderForOCR2Feeds(eoa ADDR, ocr2Aggregator ADDR) (ADDR, error) { + ret := _m.Called(eoa, ocr2Aggregator) + + if len(ret) == 0 { + panic("no return value specified for ForwarderForOCR2Feeds") + } + + var r0 ADDR + var r1 error + if rf, ok := ret.Get(0).(func(ADDR, ADDR) (ADDR, error)); ok { + return rf(eoa, ocr2Aggregator) + } + if rf, ok := ret.Get(0).(func(ADDR, ADDR) ADDR); ok { + r0 = rf(eoa, ocr2Aggregator) + } else { + r0 = ret.Get(0).(ADDR) + } + + if rf, ok := ret.Get(1).(func(ADDR, ADDR) error); ok { + r1 = rf(eoa, ocr2Aggregator) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // HealthReport provides a mock function with given fields: func (_m *ForwarderManager[ADDR]) HealthReport() map[string]error { ret := _m.Called() diff --git a/core/chains/evm/forwarders/forwarder_manager.go b/core/chains/evm/forwarders/forwarder_manager.go index 68015229307..9505cdfbbbf 100644 --- a/core/chains/evm/forwarders/forwarder_manager.go +++ b/core/chains/evm/forwarders/forwarder_manager.go @@ -2,6 +2,7 @@ package forwarders import ( "context" + "slices" "sync" "time" @@ -9,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" pkgerrors "github.com/pkg/errors" + "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -131,6 +133,42 @@ func (f *FwdMgr) ForwarderFor(addr common.Address) (forwarder common.Address, er return common.Address{}, pkgerrors.Errorf("Cannot find forwarder for given EOA") } +func (f *FwdMgr) ForwarderForOCR2Feeds(eoa, ocr2Aggregator common.Address) (forwarder common.Address, err error) { + fwdrs, err := f.ORM.FindForwardersByChain(f.ctx, big.Big(*f.evmClient.ConfiguredChainID())) + if err != nil { + return common.Address{}, err + } + + offchainAggregator, err := ocr2aggregator.NewOCR2Aggregator(ocr2Aggregator, f.evmClient) + if err != nil { + return common.Address{}, err + } + + transmitters, err := offchainAggregator.GetTransmitters(&bind.CallOpts{Context: f.ctx}) + if err != nil { + return common.Address{}, pkgerrors.Errorf("failed to get ocr2 aggregator transmitters: %s", err.Error()) + } + + for _, fwdr := range fwdrs { + if !slices.Contains(transmitters, fwdr.Address) { + f.logger.Criticalw("Forwarder is not set as a transmitter", "forwarder", fwdr.Address, "ocr2Aggregator", ocr2Aggregator, "err", err) + continue + } + + eoas, err := f.getContractSenders(fwdr.Address) + if err != nil { + f.logger.Errorw("Failed to get forwarder senders", "forwarder", fwdr.Address, "err", err) + continue + } + for _, addr := range eoas { + if addr == eoa { + return fwdr.Address, nil + } + } + } + return common.Address{}, pkgerrors.Errorf("Cannot find forwarder for given EOA") +} + func (f *FwdMgr) ConvertPayload(dest common.Address, origPayload []byte) ([]byte, error) { databytes, err := f.getForwardedPayload(dest, origPayload) if err != nil { diff --git a/core/chains/evm/forwarders/forwarder_manager_test.go b/core/chains/evm/forwarders/forwarder_manager_test.go index 3a515e7ab39..993efacac4a 100644 --- a/core/chains/evm/forwarders/forwarder_manager_test.go +++ b/core/chains/evm/forwarders/forwarder_manager_test.go @@ -2,19 +2,23 @@ package forwarders_test import ( "math/big" + "slices" "testing" "time" - "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" - + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind/backends" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/libocr/gethwrappers2/testocr2aggregator" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/utils" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/testhelpers" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders" @@ -150,3 +154,105 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) { err = fwdMgr.Close() require.NoError(t, err) } + +func TestFwdMgr_InvalidForwarderForOCR2FeedsStates(t *testing.T) { + lggr := logger.Test(t) + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + cfg := configtest.NewTestGeneralConfig(t) + evmcfg := evmtest.NewChainScopedConfig(t, cfg) + owner := testutils.MustNewSimTransactor(t) + ec := backends.NewSimulatedBackend(map[common.Address]core.GenesisAccount{ + owner.From: { + Balance: big.NewInt(0).Mul(big.NewInt(10), big.NewInt(1e18)), + }, + }, 10e6) + t.Cleanup(func() { ec.Close() }) + linkAddr := common.HexToAddress("0x01BE23585060835E02B77ef475b0Cc51aA1e0709") + operatorAddr, _, _, err := operator_wrapper.DeployOperator(owner, ec, linkAddr, owner.From) + require.NoError(t, err) + + forwarderAddr, _, forwarder, err := authorized_forwarder.DeployAuthorizedForwarder(owner, ec, linkAddr, owner.From, operatorAddr, []byte{}) + require.NoError(t, err) + ec.Commit() + + accessAddress, _, _, err := testocr2aggregator.DeploySimpleWriteAccessController(owner, ec) + require.NoError(t, err, "failed to deploy test access controller contract") + ocr2Address, _, ocr2, err := testocr2aggregator.DeployOCR2Aggregator( + owner, + ec, + linkAddr, + big.NewInt(0), + big.NewInt(10), + accessAddress, + accessAddress, + 9, + "TEST", + ) + require.NoError(t, err, "failed to deploy ocr2 test aggregator") + ec.Commit() + + evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), evmClient, lggr, lpOpts) + fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM()) + fwdMgr.ORM = forwarders.NewORM(db) + + _, err = fwdMgr.ORM.CreateForwarder(ctx, forwarderAddr, ubig.Big(*testutils.FixtureChainID)) + require.NoError(t, err) + lst, err := fwdMgr.ORM.FindForwardersByChain(ctx, ubig.Big(*testutils.FixtureChainID)) + require.NoError(t, err) + require.Equal(t, len(lst), 1) + require.Equal(t, lst[0].Address, forwarderAddr) + + fwdMgr = forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM()) + require.NoError(t, fwdMgr.Start(testutils.Context(t))) + // cannot find forwarder because it isn't authorized nor added as a transmitter + addr, err := fwdMgr.ForwarderForOCR2Feeds(owner.From, ocr2Address) + require.ErrorContains(t, err, "Cannot find forwarder for given EOA") + require.True(t, utils.IsZero(addr)) + + _, err = forwarder.SetAuthorizedSenders(owner, []common.Address{owner.From}) + require.NoError(t, err) + ec.Commit() + + authorizedSenders, err := forwarder.GetAuthorizedSenders(&bind.CallOpts{Context: ctx}) + require.NoError(t, err) + require.Equal(t, owner.From, authorizedSenders[0]) + + // cannot find forwarder because it isn't added as a transmitter + addr, err = fwdMgr.ForwarderForOCR2Feeds(owner.From, ocr2Address) + require.ErrorContains(t, err, "Cannot find forwarder for given EOA") + require.True(t, utils.IsZero(addr)) + + onchainConfig, err := testhelpers.GenerateDefaultOCR2OnchainConfig(big.NewInt(0), big.NewInt(10)) + require.NoError(t, err) + + _, err = ocr2.SetConfig(owner, + []common.Address{testutils.NewAddress(), testutils.NewAddress(), testutils.NewAddress(), testutils.NewAddress()}, + []common.Address{forwarderAddr, testutils.NewAddress(), testutils.NewAddress(), testutils.NewAddress()}, + 1, + onchainConfig, + 0, + []byte{}) + require.NoError(t, err) + ec.Commit() + + transmitters, err := ocr2.GetTransmitters(&bind.CallOpts{Context: ctx}) + require.NoError(t, err) + require.True(t, slices.Contains(transmitters, forwarderAddr)) + + // create new fwd to have an empty cache that has to fetch authorized forwarders from log poller + fwdMgr = forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM()) + require.NoError(t, fwdMgr.Start(testutils.Context(t))) + addr, err = fwdMgr.ForwarderForOCR2Feeds(owner.From, ocr2Address) + require.NoError(t, err, "forwarder should be valid and found because it is both authorized and set as a transmitter") + require.Equal(t, forwarderAddr, addr) + require.NoError(t, fwdMgr.Close()) +} diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 542bdece430..bc59f086ca9 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -21,7 +21,7 @@ require ( github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10 github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c diff --git a/core/scripts/go.sum b/core/scripts/go.sum index a20ad0cce4f..fee9295d617 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1185,8 +1185,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c h1:KiG8PAwUrdYn/AGBQ+B4p6erEUbEB+g6LJKhAaDjJ2s= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10 h1:IwJKWZHPBJbbh4oI3BGX8VNT3c/ChNiPZ/XI4iq6c0E= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69 h1:Sec/GpBpUVaTEax1kSHlTvkzF/+d3w5roAQXaj5+SLA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69/go.mod h1:ZQKf+0OLzCLYIisH/OdOIQuFRI6bDuw+jPBTATyHfFM= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 74e7f9e496f..8ea43582126 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -496,14 +496,22 @@ func GetEVMEffectiveTransmitterID(jb *job.Job, chain legacyevm.Chain, lggr logge if chain == nil { return "", fmt.Errorf("job forwarding requires non-nil chain") } - effectiveTransmitterID, err := chain.TxManager().GetForwarderForEOA(common.HexToAddress(spec.TransmitterID.String)) + + var err error + var effectiveTransmitterID common.Address + // Median forwarders need special handling because of OCR2Aggregator transmitters whitelist. + if spec.PluginType == types.Median { + effectiveTransmitterID, err = chain.TxManager().GetForwarderForEOAOCR2Feeds(common.HexToAddress(spec.TransmitterID.String), common.HexToAddress(spec.ContractID)) + } else { + effectiveTransmitterID, err = chain.TxManager().GetForwarderForEOA(common.HexToAddress(spec.TransmitterID.String)) + } + if err == nil { return effectiveTransmitterID.String(), nil } else if !spec.TransmitterID.Valid { return "", errors.New("failed to get forwarder address and transmitterID is not set") } lggr.Warnw("Skipping forwarding for job, will fallback to default behavior", "job", jb.Name, "err", err) - // this shouldn't happen unless behaviour above was changed } return spec.TransmitterID.String, nil diff --git a/core/services/ocr2/delegate_test.go b/core/services/ocr2/delegate_test.go index 720ad308348..bc5c2df2bbe 100644 --- a/core/services/ocr2/delegate_test.go +++ b/core/services/ocr2/delegate_test.go @@ -67,10 +67,17 @@ func TestGetEVMEffectiveTransmitterID(t *testing.T) { jb.OCR2OracleSpec.RelayConfig["sendingKeys"] = tc.sendingKeys jb.ForwardingAllowed = tc.forwardingEnabled + args := []interface{}{tc.getForwarderForEOAArg} + getForwarderMethodName := "GetForwarderForEOA" + if tc.pluginType == types.Median { + getForwarderMethodName = "GetForwarderForEOAOCR2Feeds" + args = append(args, common.HexToAddress(jb.OCR2OracleSpec.ContractID)) + } + if tc.forwardingEnabled && tc.getForwarderForEOAErr { - txManager.Mock.On("GetForwarderForEOA", tc.getForwarderForEOAArg).Return(common.HexToAddress("0x0"), errors.New("random error")).Once() + txManager.Mock.On(getForwarderMethodName, args...).Return(common.HexToAddress("0x0"), errors.New("random error")).Once() } else if tc.forwardingEnabled { - txManager.Mock.On("GetForwarderForEOA", tc.getForwarderForEOAArg).Return(common.HexToAddress(tc.expectedTransmitterID), nil).Once() + txManager.Mock.On(getForwarderMethodName, args...).Return(common.HexToAddress(tc.expectedTransmitterID), nil).Once() } } diff --git a/core/services/relay/evm/chain_reader_test.go b/core/services/relay/evm/chain_reader_test.go index 72332f38a06..662d5258bfb 100644 --- a/core/services/relay/evm/chain_reader_test.go +++ b/core/services/relay/evm/chain_reader_test.go @@ -46,12 +46,12 @@ const ( triggerWithAllTopics = "TriggeredWithFourTopics" ) -func TestChainReaderGetLatestValue(t *testing.T) { +func TestChainReaderInterfaceTests(t *testing.T) { t.Parallel() it := &chainReaderInterfaceTester{} - RunChainReaderGetLatestValueInterfaceTests(t, it) - RunChainReaderGetLatestValueInterfaceTests(t, commontestutils.WrapChainReaderTesterForLoop(it)) + RunChainReaderInterfaceTests(t, it) + RunChainReaderInterfaceTests(t, commontestutils.WrapChainReaderTesterForLoop(it)) t.Run("Dynamically typed topics can be used to filter and have type correct in return", func(t *testing.T) { it.Setup(t) @@ -110,14 +110,6 @@ func TestChainReaderGetLatestValue(t *testing.T) { }) } -func TestChainReaderQueryKey(t *testing.T) { - t.Parallel() - it := &chainReaderInterfaceTester{} - - RunQueryKeyInterfaceTests(t, it) - RunQueryKeyInterfaceTests(t, commontestutils.WrapChainReaderTesterForLoop(it)) -} - func triggerFourTopics(t *testing.T, it *chainReaderInterfaceTester, i1, i2, i3 int32) { tx, err := it.evmTest.ChainReaderTesterTransactor.TriggerWithFourTopics(it.auth, i1, i2, i3) require.NoError(t, err) diff --git a/core/services/relay/evm/mercury/queue.go b/core/services/relay/evm/mercury/queue.go index 80d0c77f46b..1923b091531 100644 --- a/core/services/relay/evm/mercury/queue.go +++ b/core/services/relay/evm/mercury/queue.go @@ -25,7 +25,7 @@ type asyncDeleter interface { AsyncDelete(req *pb.TransmitRequest) } -var _ services.Service = (*TransmitQueue)(nil) +var _ services.Service = (*transmitQueue)(nil) var transmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "mercury_transmit_queue_load", @@ -40,7 +40,7 @@ const promInterval = 6500 * time.Millisecond // TransmitQueue is the high-level package that everything outside of this file should be using // It stores pending transmissions, yielding the latest (highest priority) first to the caller -type TransmitQueue struct { +type transmitQueue struct { services.StateMachine cond sync.Cond @@ -62,11 +62,20 @@ type Transmission struct { ReportCtx ocrtypes.ReportContext // contains priority information (latest epoch/round wins) } +type TransmitQueue interface { + services.Service + + BlockingPop() (t *Transmission) + Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) + Init(transmissions []*Transmission) + IsEmpty() bool +} + // maxlen controls how many items will be stored in the queue // 0 means unlimited - be careful, this can cause memory leaks -func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) *TransmitQueue { +func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) TransmitQueue { mu := new(sync.RWMutex) - return &TransmitQueue{ + return &transmitQueue{ services.StateMachine{}, sync.Cond{L: mu}, lggr.Named("TransmitQueue"), @@ -80,13 +89,13 @@ func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, } } -func (tq *TransmitQueue) Init(transmissions []*Transmission) { +func (tq *transmitQueue) Init(transmissions []*Transmission) { pq := priorityQueue(transmissions) heap.Init(&pq) // ensure the heap is ordered tq.pq = &pq } -func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { +func (tq *transmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { tq.cond.L.Lock() defer tq.cond.L.Unlock() @@ -111,7 +120,7 @@ func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.Report // BlockingPop will block until at least one item is in the heap, and then return it // If the queue is closed, it will immediately return nil -func (tq *TransmitQueue) BlockingPop() (t *Transmission) { +func (tq *transmitQueue) BlockingPop() (t *Transmission) { tq.cond.L.Lock() defer tq.cond.L.Unlock() if tq.closed { @@ -126,13 +135,13 @@ func (tq *TransmitQueue) BlockingPop() (t *Transmission) { return t } -func (tq *TransmitQueue) IsEmpty() bool { +func (tq *transmitQueue) IsEmpty() bool { tq.mu.RLock() defer tq.mu.RUnlock() return tq.pq.Len() == 0 } -func (tq *TransmitQueue) Start(context.Context) error { +func (tq *transmitQueue) Start(context.Context) error { return tq.StartOnce("TransmitQueue", func() error { t := time.NewTicker(utils.WithJitter(promInterval)) wg := new(sync.WaitGroup) @@ -148,7 +157,7 @@ func (tq *TransmitQueue) Start(context.Context) error { }) } -func (tq *TransmitQueue) Close() error { +func (tq *transmitQueue) Close() error { return tq.StopOnce("TransmitQueue", func() error { tq.cond.L.Lock() tq.closed = true @@ -159,7 +168,7 @@ func (tq *TransmitQueue) Close() error { }) } -func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) { +func (tq *transmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() for { @@ -172,25 +181,25 @@ func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, } } -func (tq *TransmitQueue) report() { +func (tq *transmitQueue) report() { tq.mu.RLock() length := tq.pq.Len() tq.mu.RUnlock() tq.transmitQueueLoad.Set(float64(length)) } -func (tq *TransmitQueue) Ready() error { +func (tq *transmitQueue) Ready() error { return nil } -func (tq *TransmitQueue) Name() string { return tq.lggr.Name() } -func (tq *TransmitQueue) HealthReport() map[string]error { +func (tq *transmitQueue) Name() string { return tq.lggr.Name() } +func (tq *transmitQueue) HealthReport() map[string]error { report := map[string]error{tq.Name(): errors.Join( tq.status(), )} return report } -func (tq *TransmitQueue) status() (merr error) { +func (tq *transmitQueue) status() (merr error) { tq.mu.RLock() length := tq.pq.Len() closed := tq.closed @@ -206,7 +215,7 @@ func (tq *TransmitQueue) status() (merr error) { // pop latest Transmission from the heap // Not thread-safe -func (tq *TransmitQueue) pop() *Transmission { +func (tq *transmitQueue) pop() *Transmission { if tq.pq.Len() == 0 { return nil } diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 3ddc285a6fc..2d54b4b1f17 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -153,10 +153,12 @@ type server struct { c wsrpc.Client pm *PersistenceManager - q *TransmitQueue + q TransmitQueue deleteQueue chan *pb.TransmitRequest + url string + transmitSuccessCount prometheus.Counter transmitDuplicateCount prometheus.Counter transmitConnectionErrorCount prometheus.Counter @@ -268,7 +270,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed s.transmitDuplicateCount.Inc() s.lggr.Debugw("Transmit report success; duplicate report", "payload", hexutil.Encode(t.Req.Payload), "response", res, "repts", t.ReportCtx.ReportTimestamp) default: - transmitServerErrorCount.WithLabelValues(feedIDHex, fmt.Sprintf("%d", res.Code)).Inc() + transmitServerErrorCount.WithLabelValues(feedIDHex, s.url, fmt.Sprintf("%d", res.Code)).Inc() s.lggr.Errorw("Transmit report failed; mercury server returned error", "response", res, "reportCtx", t.ReportCtx, "err", res.Error, "code", res.Code) } } @@ -281,26 +283,31 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed } } +func newServer(lggr logger.Logger, cfg TransmitterConfig, client wsrpc.Client, pm *PersistenceManager, serverURL, feedIDHex string) *server { + return &server{ + lggr, + cfg.TransmitTimeout().Duration(), + client, + pm, + NewTransmitQueue(lggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm), + make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())), + serverURL, + transmitSuccessCount.WithLabelValues(feedIDHex, serverURL), + transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL), + transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL), + transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex, serverURL), + transmitQueueInsertErrorCount.WithLabelValues(feedIDHex, serverURL), + transmitQueuePushErrorCount.WithLabelValues(feedIDHex, serverURL), + } +} + func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter { feedIDHex := fmt.Sprintf("0x%x", feedID[:]) servers := make(map[string]*server, len(clients)) for serverURL, client := range clients { cLggr := lggr.Named(serverURL).With("serverURL", serverURL) pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency) - servers[serverURL] = &server{ - cLggr, - cfg.TransmitTimeout().Duration(), - client, - pm, - NewTransmitQueue(cLggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm), - make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())), - transmitSuccessCount.WithLabelValues(feedIDHex, serverURL), - transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL), - transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL), - transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex, serverURL), - transmitQueueInsertErrorCount.WithLabelValues(feedIDHex, serverURL), - transmitQueuePushErrorCount.WithLabelValues(feedIDHex, serverURL), - } + servers[serverURL] = newServer(cLggr, cfg, client, pm, serverURL, feedIDHex) } return &mercuryTransmitter{ services.StateMachine{}, diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index 25f8a918e4e..4eedc0c24a8 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -3,6 +3,7 @@ package mercury import ( "context" "math/big" + "sync" "testing" "time" @@ -15,6 +16,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -56,8 +58,8 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) t.Run("v2 report transmission successfully enqueued", func(t *testing.T) { report := sampleV2Report @@ -70,8 +72,8 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) t.Run("v3 report transmission successfully enqueued", func(t *testing.T) { report := sampleV3Report @@ -84,8 +86,8 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) t.Run("v3 report transmission sent only to trigger service", func(t *testing.T) { report := sampleV3Report @@ -98,7 +100,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) require.NoError(t, err) // queue is empty - require.Equal(t, mt.servers[sURL].q.pq.Len(), 0) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 0) }) }) @@ -119,12 +121,12 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) - require.Equal(t, mt.servers[sURL2].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL2].q.pq.Pop().(*Transmission).Req.Payload, report) - require.Equal(t, mt.servers[sURL3].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL3].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL2].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL2].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL3].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL3].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) } @@ -413,3 +415,166 @@ func Test_sortReportsLatestFirst(t *testing.T) { assert.Nil(t, reports[6]) assert.Nil(t, reports[7]) } + +type mockQ struct { + ch chan *Transmission +} + +func newMockQ() *mockQ { + return &mockQ{make(chan *Transmission, 100)} +} + +func (m *mockQ) Start(context.Context) error { return nil } +func (m *mockQ) Close() error { + m.ch <- nil + return nil +} +func (m *mockQ) Ready() error { return nil } +func (m *mockQ) HealthReport() map[string]error { return nil } +func (m *mockQ) Name() string { return "" } +func (m *mockQ) BlockingPop() (t *Transmission) { + val := <-m.ch + return val +} +func (m *mockQ) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { + m.ch <- &Transmission{Req: req, ReportCtx: reportCtx} + return true +} +func (m *mockQ) Init(transmissions []*Transmission) {} +func (m *mockQ) IsEmpty() bool { return false } + +func Test_MercuryTransmitter_runQueueLoop(t *testing.T) { + feedIDHex := utils.NewHash().Hex() + lggr := logger.TestLogger(t) + c := &mocks.MockWSRPCClient{} + db := pgtest.NewSqlxDB(t) + orm := NewORM(db) + pm := NewPersistenceManager(lggr, sURL, orm, 0, 0, 0, 0) + cfg := mockCfg{} + + s := newServer(lggr, cfg, c, pm, sURL, feedIDHex) + + req := &pb.TransmitRequest{ + Payload: []byte{1, 2, 3}, + ReportFormat: 32, + } + + t.Run("pulls from queue and transmits successfully", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{Code: 0, Error: ""}, nil + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + + go s.runQueueLoop(nil, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected a transmit request to be sent") + } + + q.Close() + wg.Wait() + }) + + t.Run("on duplicate, success", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{Code: DuplicateReport, Error: ""}, nil + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + + go s.runQueueLoop(nil, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected a transmit request to be sent") + } + + q.Close() + wg.Wait() + }) + t.Run("on server-side error, does not retry", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{Code: DuplicateReport, Error: ""}, nil + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + + go s.runQueueLoop(nil, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected a transmit request to be sent") + } + + q.Close() + wg.Wait() + }) + t.Run("on transmit error, retries", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{}, errors.New("transmission error") + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + stopCh := make(chan struct{}, 1) + + go s.runQueueLoop(stopCh, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + cnt := 0 + Loop: + for { + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + if cnt > 2 { + break Loop + } + cnt++ + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected 3 transmit requests to be sent") + } + } + + close(stopCh) + wg.Wait() + }) +} diff --git a/go.mod b/go.mod index f46e4b78333..9220ad3e63d 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chain-selectors v1.0.10 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240422130241-13c17a91b2ab diff --git a/go.sum b/go.sum index 5fa3897fc21..3d8d24b481b 100644 --- a/go.sum +++ b/go.sum @@ -1171,8 +1171,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c h1:KiG8PAwUrdYn/AGBQ+B4p6erEUbEB+g6LJKhAaDjJ2s= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10 h1:IwJKWZHPBJbbh4oI3BGX8VNT3c/ChNiPZ/XI4iq6c0E= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69 h1:Sec/GpBpUVaTEax1kSHlTvkzF/+d3w5roAQXaj5+SLA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69/go.mod h1:ZQKf+0OLzCLYIisH/OdOIQuFRI6bDuw+jPBTATyHfFM= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index c831f3f181d..610e7067ec2 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -27,7 +27,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10 github.com/smartcontractkit/chainlink-testing-framework v1.28.12 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index dfacb294fd9..63f81dd8252 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1512,8 +1512,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c h1:KiG8PAwUrdYn/AGBQ+B4p6erEUbEB+g6LJKhAaDjJ2s= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10 h1:IwJKWZHPBJbbh4oI3BGX8VNT3c/ChNiPZ/XI4iq6c0E= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69 h1:Sec/GpBpUVaTEax1kSHlTvkzF/+d3w5roAQXaj5+SLA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69/go.mod h1:ZQKf+0OLzCLYIisH/OdOIQuFRI6bDuw+jPBTATyHfFM= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 782408dff3c..575986038e2 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -16,7 +16,7 @@ require ( github.com/rs/zerolog v1.30.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10 github.com/smartcontractkit/chainlink-testing-framework v1.28.12 github.com/smartcontractkit/chainlink/integration-tests v0.0.0-20240214231432-4ad5eb95178c github.com/smartcontractkit/chainlink/v2 v2.9.0-beta0.0.20240216210048-da02459ddad8 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 00112420705..a045ea3fd34 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1502,8 +1502,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c h1:KiG8PAwUrdYn/AGBQ+B4p6erEUbEB+g6LJKhAaDjJ2s= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10 h1:IwJKWZHPBJbbh4oI3BGX8VNT3c/ChNiPZ/XI4iq6c0E= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69 h1:Sec/GpBpUVaTEax1kSHlTvkzF/+d3w5roAQXaj5+SLA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69/go.mod h1:ZQKf+0OLzCLYIisH/OdOIQuFRI6bDuw+jPBTATyHfFM= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/smoke/forwarders_ocr2_test.go b/integration-tests/smoke/forwarders_ocr2_test.go index 9dd5d5c39a4..00fd3583ea6 100644 --- a/integration-tests/smoke/forwarders_ocr2_test.go +++ b/integration-tests/smoke/forwarders_ocr2_test.go @@ -94,9 +94,6 @@ func TestForwarderOCR2Basic(t *testing.T) { ocrInstances, err := actions_seth.DeployOCRv2Contracts(l, sethClient, 1, common.HexToAddress(lt.Address()), transmitters, ocrOffchainOptions) require.NoError(t, err, "Error deploying OCRv2 contracts with forwarders") - err = actions.CreateOCRv2JobsLocal(ocrInstances, bootstrapNode, workerNodes, env.MockAdapter, "ocr2", 5, uint64(sethClient.ChainID), true, false) - require.NoError(t, err, "Error creating OCRv2 jobs with forwarders") - ocrv2Config, err := actions.BuildMedianOCR2ConfigLocal(workerNodes, ocrOffchainOptions) require.NoError(t, err, "Error building OCRv2 config") ocrv2Config.Transmitters = authorizedForwarders @@ -104,6 +101,9 @@ func TestForwarderOCR2Basic(t *testing.T) { err = actions_seth.ConfigureOCRv2AggregatorContracts(ocrv2Config, ocrInstances) require.NoError(t, err, "Error configuring OCRv2 aggregator contracts") + err = actions.CreateOCRv2JobsLocal(ocrInstances, bootstrapNode, workerNodes, env.MockAdapter, "ocr2", 5, uint64(sethClient.ChainID), true, false) + require.NoError(t, err, "Error creating OCRv2 jobs with forwarders") + err = actions_seth.WatchNewOCRRound(l, sethClient, 1, contracts.V2OffChainAgrregatorToOffChainAggregatorWithRounds(ocrInstances), time.Duration(10*time.Minute)) require.NoError(t, err, "error watching for new OCRv2 round") diff --git a/integration-tests/smoke/ocr2_test.go b/integration-tests/smoke/ocr2_test.go index d2df0c858c0..19bf4a5aa07 100644 --- a/integration-tests/smoke/ocr2_test.go +++ b/integration-tests/smoke/ocr2_test.go @@ -25,17 +25,27 @@ import ( "github.com/smartcontractkit/chainlink/integration-tests/types/config/node" ) +type ocr2test struct { + name string + env map[string]string + chainReaderAndCodec bool +} + +func defaultTestData() ocr2test { + return ocr2test{ + name: "n/a", + env: make(map[string]string), + chainReaderAndCodec: false, + } +} + // Tests a basic OCRv2 median feed func TestOCRv2Basic(t *testing.T) { t.Parallel() noMedianPlugin := map[string]string{string(env.MedianPlugin.Cmd): ""} medianPlugin := map[string]string{string(env.MedianPlugin.Cmd): "chainlink-feeds"} - for _, test := range []struct { - name string - env map[string]string - chainReaderAndCodec bool - }{ + for _, test := range []ocr2test{ {"legacy", noMedianPlugin, false}, {"legacy-chain-reader", noMedianPlugin, true}, {"plugins", medianPlugin, false}, @@ -46,7 +56,7 @@ func TestOCRv2Basic(t *testing.T) { t.Parallel() l := logging.GetTestLogger(t) - env, aggregatorContracts, sethClient := prepareORCv2SmokeTestEnv(t, l, 5) + env, aggregatorContracts, sethClient := prepareORCv2SmokeTestEnv(t, test, l, 5) err := env.MockAdapter.SetAdapterBasedIntValuePath("ocr2", []string{http.MethodGet, http.MethodPost}, 10) require.NoError(t, err) @@ -68,7 +78,7 @@ func TestOCRv2Request(t *testing.T) { t.Parallel() l := logging.GetTestLogger(t) - _, aggregatorContracts, sethClient := prepareORCv2SmokeTestEnv(t, l, 5) + _, aggregatorContracts, sethClient := prepareORCv2SmokeTestEnv(t, defaultTestData(), l, 5) // Keep the mockserver value the same and continually request new rounds for round := 2; round <= 4; round++ { @@ -90,7 +100,7 @@ func TestOCRv2JobReplacement(t *testing.T) { t.Parallel() l := logging.GetTestLogger(t) - env, aggregatorContracts, sethClient := prepareORCv2SmokeTestEnv(t, l, 5) + env, aggregatorContracts, sethClient := prepareORCv2SmokeTestEnv(t, defaultTestData(), l, 5) nodeClients := env.ClCluster.NodeAPIs() bootstrapNode, workerNodes := nodeClients[0], nodeClients[1:] @@ -126,7 +136,7 @@ func TestOCRv2JobReplacement(t *testing.T) { ) } -func prepareORCv2SmokeTestEnv(t *testing.T, l zerolog.Logger, firstRoundResult int) (*test_env.CLClusterTestEnv, []contracts.OffchainAggregatorV2, *seth.Client) { +func prepareORCv2SmokeTestEnv(t *testing.T, testData ocr2test, l zerolog.Logger, firstRoundResult int) (*test_env.CLClusterTestEnv, []contracts.OffchainAggregatorV2, *seth.Client) { config, err := tc.GetConfig("Smoke", tc.OCR2) if err != nil { t.Fatal(err) @@ -145,6 +155,7 @@ func prepareORCv2SmokeTestEnv(t *testing.T, l zerolog.Logger, firstRoundResult i node.WithP2Pv2(), node.WithTracing(), )). + WithCLNodeOptions(test_env.WithNodeEnvVars(testData.env)). WithCLNodes(6). WithFunding(big.NewFloat(.1)). WithStandardCleanup(). @@ -179,7 +190,7 @@ func prepareORCv2SmokeTestEnv(t *testing.T, l zerolog.Logger, firstRoundResult i aggregatorContracts, err := actions_seth.DeployOCRv2Contracts(l, sethClient, 1, common.HexToAddress(linkContract.Address()), transmitters, ocrOffchainOptions) require.NoError(t, err, "Error deploying OCRv2 aggregator contracts") - err = actions.CreateOCRv2JobsLocal(aggregatorContracts, bootstrapNode, workerNodes, env.MockAdapter, "ocr2", 5, uint64(sethClient.ChainID), false, false) + err = actions.CreateOCRv2JobsLocal(aggregatorContracts, bootstrapNode, workerNodes, env.MockAdapter, "ocr2", 5, uint64(sethClient.ChainID), false, testData.chainReaderAndCodec) require.NoError(t, err, "Error creating OCRv2 jobs") ocrv2Config, err := actions.BuildMedianOCR2ConfigLocal(workerNodes, ocrOffchainOptions)