diff --git a/core/chains/cosmos/cosmostxm/txm.go b/core/chains/cosmos/cosmostxm/txm.go index 48d5c2a13ce..e9fb2f6aca3 100644 --- a/core/chains/cosmos/cosmostxm/txm.go +++ b/core/chains/cosmos/cosmostxm/txm.go @@ -4,6 +4,7 @@ import ( "cmp" "context" "encoding/hex" + "fmt" "slices" "strings" "time" @@ -327,7 +328,13 @@ func (txm *Txm) sendMsgBatchFromAddress(ctx context.Context, gasPrice sdk.DecCoi // Assume transient api issue and retry. return err } - timeoutHeight := uint64(lb.Block.Header.Height) + uint64(txm.cfg.BlocksUntilTxTimeout()) + header, timeout := lb.SdkBlock.Header.Height, txm.cfg.BlocksUntilTxTimeout() + if header < 0 { + return fmt.Errorf("invalid negative header height: %d", header) + } else if timeout < 0 { + return fmt.Errorf("invalid negative blocks until tx timeout: %d", timeout) + } + timeoutHeight := uint64(header) + uint64(timeout) signedTx, err := tc.CreateAndSign(simResults.Succeeded.GetMsgs(), an, sn, gasLimit, txm.cfg.GasLimitMultiplier(), gasPrice, NewKeyWrapper(txm.keystoreAdapter, sender.String()), timeoutHeight) if err != nil { diff --git a/core/chains/cosmos/cosmostxm/txm_internal_test.go b/core/chains/cosmos/cosmostxm/txm_internal_test.go index c236242b84c..d9ad87375d2 100644 --- a/core/chains/cosmos/cosmostxm/txm_internal_test.go +++ b/core/chains/cosmos/cosmostxm/txm_internal_test.go @@ -6,7 +6,6 @@ import ( "time" wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types" - tmtypes "github.com/cometbft/cometbft/proto/tendermint/types" tmservicetypes "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" cosmostypes "github.com/cosmos/cosmos-sdk/types" txtypes "github.com/cosmos/cosmos-sdk/types/tx" @@ -112,8 +111,8 @@ func TestTxm(t *testing.T) { tc.On("SimulateUnsigned", mock.Anything, mock.Anything).Return(&txtypes.SimulateResponse{GasInfo: &cosmostypes.GasInfo{ GasUsed: 1_000_000, }}, nil) - tc.On("LatestBlock").Return(&tmservicetypes.GetLatestBlockResponse{Block: &tmtypes.Block{ - Header: tmtypes.Header{Height: 1}, + tc.On("LatestBlock").Return(&tmservicetypes.GetLatestBlockResponse{SdkBlock: &tmservicetypes.Block{ + Header: tmservicetypes.Header{Height: 1}, }}, nil) tc.On("CreateAndSign", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]byte{0x01}, nil) @@ -168,8 +167,8 @@ func TestTxm(t *testing.T) { tc.On("SimulateUnsigned", mock.Anything, mock.Anything).Return(&txtypes.SimulateResponse{GasInfo: &cosmostypes.GasInfo{ GasUsed: 1_000_000, }}, nil).Once() - tc.On("LatestBlock").Return(&tmservicetypes.GetLatestBlockResponse{Block: &tmtypes.Block{ - Header: tmtypes.Header{Height: 1}, + tc.On("LatestBlock").Return(&tmservicetypes.GetLatestBlockResponse{SdkBlock: &tmservicetypes.Block{ + Header: tmservicetypes.Header{Height: 1}, }}, nil).Once() tc.On("CreateAndSign", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]byte{0x01}, nil).Once() txResp := &cosmostypes.TxResponse{TxHash: "4BF5122F344554C53BDE2EBB8CD2B7E3D1600AD631C385A5D7CCE23C7785459A"} @@ -227,8 +226,8 @@ func TestTxm(t *testing.T) { tc.On("SimulateUnsigned", mock.Anything, mock.Anything).Return(&txtypes.SimulateResponse{GasInfo: &cosmostypes.GasInfo{ GasUsed: 1_000_000, }}, nil).Once() - tc.On("LatestBlock").Return(&tmservicetypes.GetLatestBlockResponse{Block: &tmtypes.Block{ - Header: tmtypes.Header{Height: 1}, + tc.On("LatestBlock").Return(&tmservicetypes.GetLatestBlockResponse{SdkBlock: &tmservicetypes.Block{ + Header: tmservicetypes.Header{Height: 1}, }}, nil).Once() tc.On("CreateAndSign", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]byte{0x01}, nil).Once() } @@ -360,8 +359,8 @@ func TestTxm(t *testing.T) { tc.On("SimulateUnsigned", mock.Anything, mock.Anything).Return(&txtypes.SimulateResponse{GasInfo: &cosmostypes.GasInfo{ GasUsed: 1_000_000, }}, nil) - tc.On("LatestBlock").Return(&tmservicetypes.GetLatestBlockResponse{Block: &tmtypes.Block{ - Header: tmtypes.Header{Height: 1}, + tc.On("LatestBlock").Return(&tmservicetypes.GetLatestBlockResponse{SdkBlock: &tmservicetypes.Block{ + Header: tmservicetypes.Header{Height: 1}, }}, nil) tc.On("CreateAndSign", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]byte{0x01}, nil) txResp := &cosmostypes.TxResponse{TxHash: "4BF5122F344554C53BDE2EBB8CD2B7E3D1600AD631C385A5D7CCE23C7785459A"} diff --git a/core/chains/evm/log/broadcaster.go b/core/chains/evm/log/broadcaster.go index 8ee4018f3c5..bc9ba1cf6cf 100644 --- a/core/chains/evm/log/broadcaster.go +++ b/core/chains/evm/log/broadcaster.go @@ -188,7 +188,7 @@ func NewBroadcaster(orm ORM, ethClient evmclient.Client, config Config, lggr log func (b *broadcaster) Start(context.Context) error { return b.StartOnce("LogBroadcaster", func() error { - b.wgDone.Add(2) + b.wgDone.Add(1) go b.awaitInitialSubscribers() b.mailMon.Monitor(b.changeSubscriberStatus, "LogBroadcaster", "ChangeSubscriber", b.evmChainID.String()) return nil @@ -234,11 +234,11 @@ func (b *broadcaster) awaitInitialSubscribers() { case <-b.DependentAwaiter.AwaitDependents(): // ensure that any queued dependent subscriptions are registered first b.onChangeSubscriberStatus() + b.wgDone.Add(1) go b.startResubscribeLoop() return case <-b.chStop: - b.wgDone.Done() // because startResubscribeLoop won't be called return } } diff --git a/core/chains/evm/log/helpers_test.go b/core/chains/evm/log/helpers_test.go index e82ed31fa17..58e81132b0f 100644 --- a/core/chains/evm/log/helpers_test.go +++ b/core/chains/evm/log/helpers_test.go @@ -58,20 +58,10 @@ type broadcasterHelper struct { pipelineHelper cltest.JobPipelineV2TestHelper } -func newBroadcasterHelper(t *testing.T, blockHeight int64, timesSubscribe int, overridesFn func(*chainlink.Config, *chainlink.Secrets)) *broadcasterHelper { - return broadcasterHelperCfg{}.new(t, blockHeight, timesSubscribe, nil, overridesFn) -} - -type broadcasterHelperCfg struct { - highestSeenHead *evmtypes.Head - db *sqlx.DB -} +func newBroadcasterHelper(t *testing.T, blockHeight int64, timesSubscribe int, filterLogsResult []types.Log, overridesFn func(*chainlink.Config, *chainlink.Secrets)) *broadcasterHelper { + // ensure we check before registering any mock Cleanup assertions + testutils.SkipShortDB(t) -func (c broadcasterHelperCfg) new(t *testing.T, blockHeight int64, timesSubscribe int, filterLogsResult []types.Log, overridesFn func(*chainlink.Config, *chainlink.Secrets)) *broadcasterHelper { - if c.db == nil { - // ensure we check before registering any mock Cleanup assertions - testutils.SkipShortDB(t) - } expectedCalls := mockEthClientExpectedCalls{ SubscribeFilterLogs: timesSubscribe, HeaderByNumber: 1, @@ -81,21 +71,13 @@ func (c broadcasterHelperCfg) new(t *testing.T, blockHeight int64, timesSubscrib chchRawLogs := make(chan evmtest.RawSub[types.Log], timesSubscribe) mockEth := newMockEthClient(t, chchRawLogs, blockHeight, expectedCalls) - helper := c.newWithEthClient(t, mockEth.EthClient, overridesFn) + helper := newBroadcasterHelperWithEthClient(t, mockEth.EthClient, nil, overridesFn) helper.chchRawLogs = chchRawLogs helper.mockEth = mockEth return helper } func newBroadcasterHelperWithEthClient(t *testing.T, ethClient evmclient.Client, highestSeenHead *evmtypes.Head, overridesFn func(*chainlink.Config, *chainlink.Secrets)) *broadcasterHelper { - return broadcasterHelperCfg{highestSeenHead: highestSeenHead}.newWithEthClient(t, ethClient, overridesFn) -} - -func (c broadcasterHelperCfg) newWithEthClient(t *testing.T, ethClient evmclient.Client, overridesFn func(*chainlink.Config, *chainlink.Secrets)) *broadcasterHelper { - if c.db == nil { - c.db = pgtest.NewSqlxDB(t) - } - globalConfig := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { c.Database.LogQueries = ptr(true) finality := uint32(10) @@ -109,25 +91,26 @@ func (c broadcasterHelperCfg) newWithEthClient(t *testing.T, ethClient evmclient lggr := logger.TestLogger(t) mailMon := srvctest.Start(t, utils.NewMailboxMonitor(t.Name())) - orm := log.NewORM(c.db, lggr, config.Database(), cltest.FixtureChainID) - lb := log.NewTestBroadcaster(orm, ethClient, config.EVM(), lggr, c.highestSeenHead, mailMon) - kst := cltest.NewKeyStore(t, c.db, globalConfig.Database()) + db := pgtest.NewSqlxDB(t) + orm := log.NewORM(db, lggr, config.Database(), cltest.FixtureChainID) + lb := log.NewTestBroadcaster(orm, ethClient, config.EVM(), lggr, highestSeenHead, mailMon) + kst := cltest.NewKeyStore(t, db, globalConfig.Database()) cc := evmtest.NewChainRelayExtenders(t, evmtest.TestChainOpts{ Client: ethClient, GeneralConfig: globalConfig, - DB: c.db, + DB: db, KeyStore: kst.Eth(), LogBroadcaster: &log.NullBroadcaster{}, MailMon: mailMon, }) legacyChains := evmrelay.NewLegacyChainsFromRelayerExtenders(cc) - pipelineHelper := cltest.NewJobPipelineV2(t, config.WebServer(), config.JobPipeline(), config.Database(), legacyChains, c.db, kst, nil, nil) + pipelineHelper := cltest.NewJobPipelineV2(t, config.WebServer(), config.JobPipeline(), config.Database(), legacyChains, db, kst, nil, nil) return &broadcasterHelper{ t: t, lb: lb, - db: c.db, + db: db, globalConfig: globalConfig, config: config, pipelineHelper: pipelineHelper, diff --git a/core/chains/evm/log/integration_test.go b/core/chains/evm/log/integration_test.go index 94d11726d10..137b4c7292a 100644 --- a/core/chains/evm/log/integration_test.go +++ b/core/chains/evm/log/integration_test.go @@ -24,9 +24,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/flux_aggregator_wrapper" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - configtest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest/v2" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/srvctest" @@ -37,7 +35,7 @@ func TestBroadcaster_AwaitsInitialSubscribersOnStartup(t *testing.T) { g := gomega.NewWithT(t) const blockHeight int64 = 123 - helper := newBroadcasterHelper(t, blockHeight, 1, nil) + helper := newBroadcasterHelper(t, blockHeight, 1, nil, nil) helper.lb.AddDependents(2) var listener = helper.newLogListenerWithJob("A") @@ -248,13 +246,6 @@ func TestBroadcaster_ReplaysLogs(t *testing.T) { } func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) { - db := pgtest.NewSqlxDB(t) - lggr := logger.TestLogger(t) - cfg := configtest.NewGeneralConfig(t, nil) - - orm := log.NewORM(db, lggr, cfg.Database(), cltest.FixtureChainID) - - helperCfg := broadcasterHelperCfg{db: db} contract1 := newMockContract(t) contract2 := newMockContract(t) @@ -271,92 +262,102 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) { contract1.On("ParseLog", log1).Return(flux_aggregator_wrapper.FluxAggregatorNewRound{}, nil) contract2.On("ParseLog", log2).Return(flux_aggregator_wrapper.FluxAggregatorAnswerUpdated{}, nil) - - // Pool two logs from subscription, then shut down - helper := helperCfg.new(t, 0, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].FinalityDepth = ptr[uint32](confs) - }) - listener := helper.newLogListenerWithJob("one") - listener.SkipMarkingConsumed(true) - listener2 := helper.newLogListenerWithJob("two") - listener2.SkipMarkingConsumed(true) - expBlock := int64(log1.BlockNumber) - helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(0, 2), orm, &expBlock, func() { - chRawLogs := <-helper.chchRawLogs - chRawLogs.TrySend(log1) - chRawLogs.TrySend(log2) + t.Run("pool two logs from subscription, then shut down", func(t *testing.T) { + helper := newBroadcasterHelper(t, 0, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].FinalityDepth = ptr[uint32](confs) + }) + lggr := logger.TestLogger(t) + orm := log.NewORM(helper.db, lggr, helper.config.Database(), cltest.FixtureChainID) + + listener := helper.newLogListenerWithJob("one") + listener.SkipMarkingConsumed(true) + listener2 := helper.newLogListenerWithJob("two") + listener2.SkipMarkingConsumed(true) + expBlock := int64(log1.BlockNumber) + helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(0, 2), orm, &expBlock, func() { + chRawLogs := <-helper.chchRawLogs + chRawLogs.TrySend(log1) + chRawLogs.TrySend(log2) + }) + // Pool min block in DB and neither listener received a broadcast + blockNum, err := orm.GetPendingMinBlock() + require.NoError(t, err) + require.NotNil(t, blockNum) + require.Equal(t, int64(log1.BlockNumber), *blockNum) + require.Empty(t, listener.getUniqueLogs()) + require.Empty(t, listener2.getUniqueLogs()) + helper.requireBroadcastCount(0) }) + t.Run("backfill pool with both, then broadcast one, but don't consume", func(t *testing.T) { + helper := newBroadcasterHelper(t, 2, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].FinalityDepth = ptr[uint32](confs) + }) + lggr := logger.TestLogger(t) + orm := log.NewORM(helper.db, lggr, helper.config.Database(), cltest.FixtureChainID) - // Pool min block in DB and neither listener received a broadcast - blockNum, err := orm.GetPendingMinBlock() - require.NoError(t, err) - require.NotNil(t, blockNum) - require.Equal(t, int64(log1.BlockNumber), *blockNum) - require.Empty(t, listener.getUniqueLogs()) - require.Empty(t, listener2.getUniqueLogs()) - helper.requireBroadcastCount(0) - - // Backfill pool with both, then broadcast one, but don't consume - helper = helperCfg.new(t, 2, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].FinalityDepth = ptr[uint32](confs) - }) - listener = helper.newLogListenerWithJob("one") - listener.SkipMarkingConsumed(true) - listener2 = helper.newLogListenerWithJob("two") - listener2.SkipMarkingConsumed(true) - expBlock = int64(log2.BlockNumber) - helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(2, 5), orm, &expBlock, nil) - - // Pool min block in DB and one listener received but didn't consume - blockNum, err = orm.GetPendingMinBlock() - require.NoError(t, err) - require.NotNil(t, blockNum) - require.Equal(t, int64(log2.BlockNumber), *blockNum) - require.NotEmpty(t, listener.getUniqueLogs()) - require.Empty(t, listener2.getUniqueLogs()) - c, err := orm.WasBroadcastConsumed(log1.BlockHash, log1.Index, listener.JobID()) - require.NoError(t, err) - require.False(t, c) + listener := helper.newLogListenerWithJob("one") + listener.SkipMarkingConsumed(true) + listener2 := helper.newLogListenerWithJob("two") + listener2.SkipMarkingConsumed(true) + expBlock := int64(log2.BlockNumber) + helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(2, 5), orm, &expBlock, nil) - // Backfill pool and broadcast two, but only consume one - helper = helperCfg.new(t, 4, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].FinalityDepth = ptr[uint32](confs) + // Pool min block in DB and one listener received but didn't consume + blockNum, err := orm.GetPendingMinBlock() + require.NoError(t, err) + require.NotNil(t, blockNum) + require.Equal(t, int64(log2.BlockNumber), *blockNum) + require.NotEmpty(t, listener.getUniqueLogs()) + require.Empty(t, listener2.getUniqueLogs()) + c, err := orm.WasBroadcastConsumed(log1.BlockHash, log1.Index, listener.JobID()) + require.NoError(t, err) + require.False(t, c) }) - listener = helper.newLogListenerWithJob("one") - listener2 = helper.newLogListenerWithJob("two") - listener2.SkipMarkingConsumed(true) - helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(5, 8), orm, nil, nil) + t.Run("backfill pool and broadcast two, but only consume one", func(t *testing.T) { + helper := newBroadcasterHelper(t, 4, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].FinalityDepth = ptr[uint32](confs) + }) + lggr := logger.TestLogger(t) + orm := log.NewORM(helper.db, lggr, helper.config.Database(), cltest.FixtureChainID) - // Pool empty and one consumed but other didn't - blockNum, err = orm.GetPendingMinBlock() - require.NoError(t, err) - require.Nil(t, blockNum) - require.NotEmpty(t, listener.getUniqueLogs()) - require.NotEmpty(t, listener2.getUniqueLogs()) - c, err = orm.WasBroadcastConsumed(log1.BlockHash, log1.Index, listener.JobID()) - require.NoError(t, err) - require.True(t, c) - c, err = orm.WasBroadcastConsumed(log2.BlockHash, log2.Index, listener2.JobID()) - require.NoError(t, err) - require.False(t, c) + listener := helper.newLogListenerWithJob("one") + listener2 := helper.newLogListenerWithJob("two") + listener2.SkipMarkingConsumed(true) + helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(5, 8), orm, nil, nil) - // Backfill pool, broadcast and consume one - helper = helperCfg.new(t, 7, 1, logs[1:], func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].FinalityDepth = ptr[uint32](confs) + // Pool empty and one consumed but other didn't + blockNum, err := orm.GetPendingMinBlock() + require.NoError(t, err) + require.Nil(t, blockNum) + require.NotEmpty(t, listener.getUniqueLogs()) + require.NotEmpty(t, listener2.getUniqueLogs()) + c, err := orm.WasBroadcastConsumed(log1.BlockHash, log1.Index, listener.JobID()) + require.NoError(t, err) + require.True(t, c) + c, err = orm.WasBroadcastConsumed(log2.BlockHash, log2.Index, listener2.JobID()) + require.NoError(t, err) + require.False(t, c) }) - listener = helper.newLogListenerWithJob("one") - listener2 = helper.newLogListenerWithJob("two") - helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(8, 9), orm, nil, nil) + t.Run("backfill pool, broadcast and consume one", func(t *testing.T) { + helper := newBroadcasterHelper(t, 7, 1, logs[1:], func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].FinalityDepth = ptr[uint32](confs) + }) + lggr := logger.TestLogger(t) + orm := log.NewORM(helper.db, lggr, helper.config.Database(), cltest.FixtureChainID) + listener := helper.newLogListenerWithJob("one") + listener2 := helper.newLogListenerWithJob("two") + helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(8, 9), orm, nil, nil) - // Pool empty, one broadcasted and consumed - blockNum, err = orm.GetPendingMinBlock() - require.NoError(t, err) - require.Nil(t, blockNum) - require.Empty(t, listener.getUniqueLogs()) - require.NotEmpty(t, listener2.getUniqueLogs()) - c, err = orm.WasBroadcastConsumed(log2.BlockHash, log2.Index, listener2.JobID()) - require.NoError(t, err) - require.True(t, c) + // Pool empty, one broadcasted and consumed + blockNum, err := orm.GetPendingMinBlock() + require.NoError(t, err) + require.Nil(t, blockNum) + require.Empty(t, listener.getUniqueLogs()) + require.NotEmpty(t, listener2.getUniqueLogs()) + c, err := orm.WasBroadcastConsumed(log2.BlockHash, log2.Index, listener2.JobID()) + require.NoError(t, err) + require.True(t, c) + }) } func (helper *broadcasterHelper) simulateHeads(t *testing.T, listener, listener2 *simpleLogListener, @@ -556,7 +557,7 @@ func TestBroadcaster_BackfillALargeNumberOfLogs(t *testing.T) { func TestBroadcaster_BroadcastsToCorrectRecipients(t *testing.T) { const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 1, nil) + helper := newBroadcasterHelper(t, blockHeight, 1, nil, nil) contract1, err := flux_aggregator_wrapper.NewFluxAggregator(testutils.NewAddress(), nil) require.NoError(t, err) @@ -615,7 +616,7 @@ func TestBroadcaster_BroadcastsToCorrectRecipients(t *testing.T) { func TestBroadcaster_BroadcastsAtCorrectHeights(t *testing.T) { const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 1, nil) + helper := newBroadcasterHelper(t, blockHeight, 1, nil, nil) helper.start() contract1, err := flux_aggregator_wrapper.NewFluxAggregator(testutils.NewAddress(), nil) @@ -691,7 +692,7 @@ func TestBroadcaster_BroadcastsAtCorrectHeights(t *testing.T) { func TestBroadcaster_DeletesOldLogsAfterNumberOfHeads(t *testing.T) { const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 1, func(c *chainlink.Config, s *chainlink.Secrets) { + helper := newBroadcasterHelper(t, blockHeight, 1, nil, func(c *chainlink.Config, s *chainlink.Secrets) { c.EVM[0].FinalityDepth = ptr[uint32](1) }) helper.start() @@ -742,7 +743,7 @@ func TestBroadcaster_DeletesOldLogsAfterNumberOfHeads(t *testing.T) { func TestBroadcaster_DeletesOldLogsOnlyAfterFinalityDepth(t *testing.T) { const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 1, func(c *chainlink.Config, s *chainlink.Secrets) { + helper := newBroadcasterHelper(t, blockHeight, 1, nil, func(c *chainlink.Config, s *chainlink.Secrets) { c.EVM[0].FinalityDepth = ptr[uint32](4) }) helper.start() @@ -793,7 +794,7 @@ func TestBroadcaster_DeletesOldLogsOnlyAfterFinalityDepth(t *testing.T) { func TestBroadcaster_FilterByTopicValues(t *testing.T) { const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 1, func(c *chainlink.Config, s *chainlink.Secrets) { + helper := newBroadcasterHelper(t, blockHeight, 1, nil, func(c *chainlink.Config, s *chainlink.Secrets) { c.EVM[0].FinalityDepth = ptr[uint32](3) }) helper.start() @@ -873,7 +874,7 @@ func TestBroadcaster_FilterByTopicValues(t *testing.T) { func TestBroadcaster_BroadcastsWithOneDelayedLog(t *testing.T) { const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 1, func(c *chainlink.Config, s *chainlink.Secrets) { + helper := newBroadcasterHelper(t, blockHeight, 1, nil, func(c *chainlink.Config, s *chainlink.Secrets) { c.EVM[0].FinalityDepth = ptr[uint32](2) }) helper.start() @@ -912,7 +913,7 @@ func TestBroadcaster_BroadcastsWithOneDelayedLog(t *testing.T) { func TestBroadcaster_BroadcastsAtCorrectHeightsWithLogsEarlierThanHeads(t *testing.T) { const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 1, nil) + helper := newBroadcasterHelper(t, blockHeight, 1, nil, nil) helper.start() contract1, err := flux_aggregator_wrapper.NewFluxAggregator(testutils.NewAddress(), nil) @@ -953,7 +954,7 @@ func TestBroadcaster_BroadcastsAtCorrectHeightsWithLogsEarlierThanHeads(t *testi func TestBroadcaster_BroadcastsAtCorrectHeightsWithHeadsEarlierThanLogs(t *testing.T) { const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 1, func(c *chainlink.Config, s *chainlink.Secrets) { + helper := newBroadcasterHelper(t, blockHeight, 1, nil, func(c *chainlink.Config, s *chainlink.Secrets) { c.EVM[0].FinalityDepth = ptr[uint32](2) }) helper.start() @@ -1214,7 +1215,7 @@ func TestBroadcaster_ReceivesAllLogsWhenResubscribing(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { const backfillDepth = 5 - helper := newBroadcasterHelper(t, int64(test.blockHeight1), 2, func(c *chainlink.Config, s *chainlink.Secrets) { + helper := newBroadcasterHelper(t, int64(test.blockHeight1), 2, nil, func(c *chainlink.Config, s *chainlink.Secrets) { // something other than default c.EVM[0].BlockBackfillDepth = ptr[uint32](backfillDepth) }) @@ -1359,7 +1360,7 @@ func TestBroadcaster_AppendLogChannel(t *testing.T) { func TestBroadcaster_InjectsBroadcastRecordFunctions(t *testing.T) { const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 1, nil) + helper := newBroadcasterHelper(t, blockHeight, 1, nil, nil) helper.start() defer helper.stop() @@ -1390,7 +1391,7 @@ func TestBroadcaster_ProcessesLogsFromReorgsAndMissedHead(t *testing.T) { g := gomega.NewWithT(t) const startBlockHeight int64 = 0 - helper := newBroadcasterHelper(t, startBlockHeight, 1, nil) + helper := newBroadcasterHelper(t, startBlockHeight, 1, nil, nil) helper.start() defer helper.stop() @@ -1473,7 +1474,7 @@ func TestBroadcaster_BackfillsForNewListeners(t *testing.T) { g := gomega.NewWithT(t) const blockHeight int64 = 0 - helper := newBroadcasterHelper(t, blockHeight, 2, nil) + helper := newBroadcasterHelper(t, blockHeight, 2, nil, nil) helper.mockEth.EthClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(&evmtypes.Head{Number: blockHeight}, nil).Times(1) helper.mockEth.EthClient.On("FilterLogs", mock.Anything, mock.Anything).Return(nil, nil).Times(1) diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 65a2f97f922..44d1f9c1cb9 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -393,8 +393,8 @@ func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) { // Emit some logs in blocks for i := 0; i < emittedLogs; i++ { - _, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))}) - require.NoError(t, err) + _, err2 := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))}) + require.NoError(t, err2) th.Client.Commit() } @@ -1601,8 +1601,8 @@ func Test_CreatedAfterQueriesWithBackfill(t *testing.T) { // Emit some logs in blocks for i := 0; i < emittedLogs; i++ { - _, err := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))}) - require.NoError(t, err) + _, err2 := th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))}) + require.NoError(t, err2) th.Client.Commit() } diff --git a/core/cmd/shell.go b/core/cmd/shell.go index fb0f1df6a84..d62e6c41bf2 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -181,8 +181,8 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G } if cfg.SolanaEnabled() { solanaCfg := chainlink.SolanaFactoryConfig{ - Keystore: keyStore.Solana(), - SolanaConfigs: cfg.SolanaConfigs(), + Keystore: keyStore.Solana(), + TOMLConfigs: cfg.SolanaConfigs(), } initOps = append(initOps, chainlink.InitSolana(ctx, relayerFactory, solanaCfg)) } diff --git a/core/cmd/shell_test.go b/core/cmd/shell_test.go index 646a266837a..d2fb0738fdf 100644 --- a/core/cmd/shell_test.go +++ b/core/cmd/shell_test.go @@ -351,20 +351,20 @@ func TestSetupSolanaRelayer(t *testing.T) { // config 3 chains but only enable 2 => should only be 2 relayer nEnabledChains := 2 tConfig := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { - c.Solana = solana.SolanaConfigs{ - &solana.SolanaConfig{ + c.Solana = solana.TOMLConfigs{ + &solana.TOMLConfig{ ChainID: ptr[string]("solana-id-1"), Enabled: ptr(true), Chain: solcfg.Chain{}, Nodes: []*solcfg.Node{}, }, - &solana.SolanaConfig{ + &solana.TOMLConfig{ ChainID: ptr[string]("solana-id-2"), Enabled: ptr(true), Chain: solcfg.Chain{}, Nodes: []*solcfg.Node{}, }, - &solana.SolanaConfig{ + &solana.TOMLConfig{ ChainID: ptr[string]("disabled-solana-id-1"), Enabled: ptr(false), Chain: solcfg.Chain{}, @@ -401,14 +401,14 @@ func TestSetupSolanaRelayer(t *testing.T) { // test that duplicate enabled chains is an error when duplicateConfig := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { - c.Solana = solana.SolanaConfigs{ - &solana.SolanaConfig{ + c.Solana = solana.TOMLConfigs{ + &solana.TOMLConfig{ ChainID: ptr[string]("dupe"), Enabled: ptr(true), Chain: solcfg.Chain{}, Nodes: []*solcfg.Node{}, }, - &solana.SolanaConfig{ + &solana.TOMLConfig{ ChainID: ptr[string]("dupe"), Enabled: ptr(true), Chain: solcfg.Chain{}, diff --git a/core/cmd/solana_chains_commands_test.go b/core/cmd/solana_chains_commands_test.go index 3a0265a5b5a..88bc8049247 100644 --- a/core/cmd/solana_chains_commands_test.go +++ b/core/cmd/solana_chains_commands_test.go @@ -16,7 +16,7 @@ func TestShell_IndexSolanaChains(t *testing.T) { t.Parallel() id := solanatest.RandomChainID() - cfg := solana.SolanaConfig{ + cfg := solana.TOMLConfig{ ChainID: &id, Enabled: ptr(true), } diff --git a/core/cmd/solana_node_commands_test.go b/core/cmd/solana_node_commands_test.go index 9a39129e34a..3f95ebc0d84 100644 --- a/core/cmd/solana_node_commands_test.go +++ b/core/cmd/solana_node_commands_test.go @@ -19,7 +19,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" ) -func solanaStartNewApplication(t *testing.T, cfgs ...*solana.SolanaConfig) *cltest.TestApplication { +func solanaStartNewApplication(t *testing.T, cfgs ...*solana.TOMLConfig) *cltest.TestApplication { for i := range cfgs { cfgs[i].SetDefaults() } @@ -41,7 +41,7 @@ func TestShell_IndexSolanaNodes(t *testing.T) { Name: ptr("second"), URL: utils.MustParseURL("https://solana2.example"), } - chain := solana.SolanaConfig{ + chain := solana.TOMLConfig{ ChainID: &id, Nodes: solana.SolanaNodes{&node1, &node2}, } diff --git a/core/cmd/solana_transaction_commands_test.go b/core/cmd/solana_transaction_commands_test.go index 8cd8e39daf3..cdb182cba41 100644 --- a/core/cmd/solana_transaction_commands_test.go +++ b/core/cmd/solana_transaction_commands_test.go @@ -31,7 +31,7 @@ func TestShell_SolanaSendSol(t *testing.T) { Name: ptr(t.Name()), URL: utils.MustParseURL(url), } - cfg := solana.SolanaConfig{ + cfg := solana.TOMLConfig{ ChainID: &chainID, Nodes: solana.SolanaNodes{&node}, Enabled: ptr(true), diff --git a/core/config/docs/docs_test.go b/core/config/docs/docs_test.go index a5fdfbebecb..b5e83a322c3 100644 --- a/core/config/docs/docs_test.go +++ b/core/config/docs/docs_test.go @@ -1,7 +1,6 @@ package docs_test import ( - _ "embed" "strings" "testing" @@ -99,7 +98,7 @@ func TestDoc(t *testing.T) { }) t.Run("Solana", func(t *testing.T) { - var fallbackDefaults solana.SolanaConfig + var fallbackDefaults solana.TOMLConfig fallbackDefaults.SetDefaults() assertTOML(t, fallbackDefaults.Chain, defaults.Solana[0].Chain) diff --git a/core/config/toml/types.go b/core/config/toml/types.go index cc2a85c9a02..b7c8cfbc473 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1,7 +1,6 @@ package toml import ( - _ "embed" "errors" "fmt" "net" @@ -1357,14 +1356,9 @@ var hostnameRegex = regexp.MustCompile(`^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)*$`) func isValidURI(uri string) bool { if strings.Contains(uri, "://") { // Standard URI check - _, err := url.ParseRequestURI(uri) - if err != nil { - // TODO: BCF-2703. All external addresses currently fail validation until we have secure transport to external networks. - return false - } else { - return false - } - + _, _ = url.ParseRequestURI(uri) + // TODO: BCF-2703. Handle error. All external addresses currently fail validation until we have secure transport to external networks. + return false } // For URIs like "otel-collector:4317" diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 7b18c6ecb00..503bcfed8db 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -429,8 +429,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn } if cfg.SolanaEnabled() { solanaCfg := chainlink.SolanaFactoryConfig{ - Keystore: keyStore.Solana(), - SolanaConfigs: cfg.SolanaConfigs(), + Keystore: keyStore.Solana(), + TOMLConfigs: cfg.SolanaConfigs(), } initOps = append(initOps, chainlink.InitSolana(testCtx, relayerFactory, solanaCfg)) } @@ -1338,8 +1338,7 @@ func BatchElemMustMatchParams(t *testing.T, req rpc.BatchElem, hash common.Hash, // SimulateIncomingHeads spawns a goroutine which sends a stream of heads and closes the returned channel when finished. func SimulateIncomingHeads(t *testing.T, heads []*evmtypes.Head, headTrackables ...httypes.HeadTrackable) (done chan struct{}) { // Build the full chain of heads - ctx, cancel := context.WithTimeout(context.Background(), testutils.WaitTimeout(t)) - t.Cleanup(cancel) + ctx := testutils.Context(t) done = make(chan struct{}) go func(t *testing.T) { defer close(done) diff --git a/core/services/chainlink/config.go b/core/services/chainlink/config.go index 6227faac07e..40339581f4f 100644 --- a/core/services/chainlink/config.go +++ b/core/services/chainlink/config.go @@ -38,7 +38,7 @@ type Config struct { Cosmos cosmos.CosmosConfigs `toml:",omitempty"` - Solana solana.SolanaConfigs `toml:",omitempty"` + Solana solana.TOMLConfigs `toml:",omitempty"` Starknet starknet.StarknetConfigs `toml:",omitempty"` } @@ -82,7 +82,7 @@ func (c *Config) setDefaults() { for i := range c.Solana { if c.Solana[i] == nil { - c.Solana[i] = new(solana.SolanaConfig) + c.Solana[i] = new(solana.TOMLConfig) } c.Solana[i].Chain.SetDefaults() } diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go index 4c8b6586486..50e182e9887 100644 --- a/core/services/chainlink/config_general.go +++ b/core/services/chainlink/config_general.go @@ -197,7 +197,7 @@ func (g *generalConfig) CosmosConfigs() cosmos.CosmosConfigs { return g.c.Cosmos } -func (g *generalConfig) SolanaConfigs() solana.SolanaConfigs { +func (g *generalConfig) SolanaConfigs() solana.TOMLConfigs { return g.c.Solana } diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index ac412c0d4bd..91ac0987a56 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -157,7 +157,7 @@ var ( {Name: ptr("secondary"), TendermintURL: relayutils.MustParseURL("http://bombay.cosmos.com")}, }}, }, - Solana: []*solana.SolanaConfig{ + Solana: []*solana.TOMLConfig{ { ChainID: ptr("mainnet"), Chain: solcfg.Chain{ @@ -579,7 +579,7 @@ func TestConfig_Marshal(t *testing.T) { }, }}, } - full.Solana = []*solana.SolanaConfig{ + full.Solana = []*solana.TOMLConfig{ { ChainID: ptr("mainnet"), Enabled: ptr(false), @@ -1410,7 +1410,7 @@ func TestConfig_setDefaults(t *testing.T) { var c Config c.EVM = evmcfg.EVMConfigs{{ChainID: utils.NewBigI(99999133712345)}} c.Cosmos = cosmos.CosmosConfigs{{ChainID: ptr("unknown cosmos chain")}} - c.Solana = solana.SolanaConfigs{{ChainID: ptr("unknown solana chain")}} + c.Solana = solana.TOMLConfigs{{ChainID: ptr("unknown solana chain")}} c.Starknet = starknet.StarknetConfigs{{ChainID: ptr("unknown starknet chain")}} c.setDefaults() if s, err := c.TOMLString(); assert.NoError(t, err) { diff --git a/core/services/chainlink/relayer_chain_interoperators.go b/core/services/chainlink/relayer_chain_interoperators.go index 6fd6e1cb581..79bb4de5d29 100644 --- a/core/services/chainlink/relayer_chain_interoperators.go +++ b/core/services/chainlink/relayer_chain_interoperators.go @@ -144,7 +144,7 @@ func InitCosmos(ctx context.Context, factory RelayerFactory, config CosmosFactor // InitSolana is a option for instantiating Solana relayers func InitSolana(ctx context.Context, factory RelayerFactory, config SolanaFactoryConfig) CoreRelayerChainInitFunc { return func(op *CoreRelayerChainInteroperators) error { - solRelayers, err := factory.NewSolana(config.Keystore, config.SolanaConfigs) + solRelayers, err := factory.NewSolana(config.Keystore, config.TOMLConfigs) if err != nil { return fmt.Errorf("failed to setup Solana relayer: %w", err) } diff --git a/core/services/chainlink/relayer_chain_interoperators_test.go b/core/services/chainlink/relayer_chain_interoperators_test.go index 70dbd73646c..377401f5022 100644 --- a/core/services/chainlink/relayer_chain_interoperators_test.go +++ b/core/services/chainlink/relayer_chain_interoperators_test.go @@ -79,8 +79,8 @@ func TestCoreRelayerChainInteroperators(t *testing.T) { Nodes: evmcfg.EVMNodes{&node2_1}, }) - c.Solana = solana.SolanaConfigs{ - &solana.SolanaConfig{ + c.Solana = solana.TOMLConfigs{ + &solana.TOMLConfig{ ChainID: &solanaChainID1, Enabled: ptr(true), Chain: solcfg.Chain{}, @@ -89,7 +89,7 @@ func TestCoreRelayerChainInteroperators(t *testing.T) { URL: ((*relayutils.URL)(models.MustParseURL("http://localhost:8547").URL())), }}, }, - &solana.SolanaConfig{ + &solana.TOMLConfig{ ChainID: &solanaChainID2, Enabled: ptr(true), Chain: solcfg.Chain{}, @@ -227,8 +227,8 @@ func TestCoreRelayerChainInteroperators(t *testing.T) { initFuncs: []chainlink.CoreRelayerChainInitFunc{ chainlink.InitSolana(testctx, factory, chainlink.SolanaFactoryConfig{ - Keystore: keyStore.Solana(), - SolanaConfigs: cfg.SolanaConfigs()}), + Keystore: keyStore.Solana(), + TOMLConfigs: cfg.SolanaConfigs()}), }, expectedSolanaChainCnt: 2, expectedSolanaNodeCnt: 2, @@ -277,8 +277,8 @@ func TestCoreRelayerChainInteroperators(t *testing.T) { {name: "all chains", initFuncs: []chainlink.CoreRelayerChainInitFunc{chainlink.InitSolana(testctx, factory, chainlink.SolanaFactoryConfig{ - Keystore: keyStore.Solana(), - SolanaConfigs: cfg.SolanaConfigs()}), + Keystore: keyStore.Solana(), + TOMLConfigs: cfg.SolanaConfigs()}), chainlink.InitEVM(testctx, factory, chainlink.EVMFactoryConfig{ ChainOpts: evm.ChainOpts{ AppConfig: cfg, diff --git a/core/services/chainlink/relayer_factory.go b/core/services/chainlink/relayer_factory.go index 3a65a58f257..1a46461d6d8 100644 --- a/core/services/chainlink/relayer_factory.go +++ b/core/services/chainlink/relayer_factory.go @@ -83,10 +83,10 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m type SolanaFactoryConfig struct { Keystore keystore.Solana - solana.SolanaConfigs + solana.TOMLConfigs } -func (r *RelayerFactory) NewSolana(ks keystore.Solana, chainCfgs solana.SolanaConfigs) (map[relay.ID]loop.Relayer, error) { +func (r *RelayerFactory) NewSolana(ks keystore.Solana, chainCfgs solana.TOMLConfigs) (map[relay.ID]loop.Relayer, error) { solanaRelayers := make(map[relay.ID]loop.Relayer) var ( solLggr = r.Logger.Named("Solana") @@ -116,7 +116,7 @@ func (r *RelayerFactory) NewSolana(ks keystore.Solana, chainCfgs solana.SolanaCo // setup the solana relayer to be a LOOP cfgTOML, err := toml.Marshal(struct { - Solana solana.SolanaConfig + Solana solana.TOMLConfig }{Solana: *chainCfg}) if err != nil { diff --git a/core/services/chainlink/types.go b/core/services/chainlink/types.go index 7cfa15f3031..88de7d1ec04 100644 --- a/core/services/chainlink/types.go +++ b/core/services/chainlink/types.go @@ -14,7 +14,7 @@ type GeneralConfig interface { config.AppConfig toml.HasEVMConfigs CosmosConfigs() cosmos.CosmosConfigs - SolanaConfigs() solana.SolanaConfigs + SolanaConfigs() solana.TOMLConfigs StarknetConfigs() starknet.StarknetConfigs // ConfigTOML returns both the user provided and effective configuration as TOML. ConfigTOML() (user, effective string) diff --git a/core/services/fluxmonitorv2/integrations_test.go b/core/services/fluxmonitorv2/integrations_test.go index 5b6a94cac58..2c45ed5ad89 100644 --- a/core/services/fluxmonitorv2/integrations_test.go +++ b/core/services/fluxmonitorv2/integrations_test.go @@ -21,10 +21,11 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/onsi/gomega" - "github.com/smartcontractkit/sqlx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/sqlx" + "github.com/smartcontractkit/chainlink/v2/core/assets" "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" @@ -523,8 +524,9 @@ func TestFluxMonitor_Deviation(t *testing.T) { initialBalance := currentBalance(t, &fa).Int64() jobResponse := cltest.CreateJobViaWeb2(t, app, string(requestBody)) - jobId, err := strconv.Atoi(jobResponse.ID) + i, err := strconv.ParseInt(jobResponse.ID, 10, 32) require.NoError(t, err) + jobID := int32(i) // Waiting for flux monitor to finish Register process in log broadcaster // and then to have log broadcaster backfill logs after the debounceResubscribe period of ~ 1 sec @@ -559,7 +561,7 @@ func TestFluxMonitor_Deviation(t *testing.T) { // Need to wait until NewRound log is consumed - otherwise there is a chance // it will arrive after the next answer is submitted, and cause // DeleteFluxMonitorRoundsBackThrough to delete previous stats - checkLogWasConsumed(t, fa, app.GetSqlxDB(), int32(jobId), receiptBlock, app.GetConfig().Database()) + checkLogWasConsumed(t, fa, app.GetSqlxDB(), jobID, receiptBlock, app.GetConfig().Database()) lggr.Info("Updating price to 103") // Change reported price to a value outside the deviation @@ -588,7 +590,7 @@ func TestFluxMonitor_Deviation(t *testing.T) { // Need to wait until NewRound log is consumed - otherwise there is a chance // it will arrive after the next answer is submitted, and cause // DeleteFluxMonitorRoundsBackThrough to delete previous stats - checkLogWasConsumed(t, fa, app.GetSqlxDB(), int32(jobId), receiptBlock, app.GetConfig().Database()) + checkLogWasConsumed(t, fa, app.GetSqlxDB(), jobID, receiptBlock, app.GetConfig().Database()) // Should not received a submission as it is inside the deviation reportPrice.Store(104) diff --git a/core/services/keeper/registry_synchronizer_helper_test.go b/core/services/keeper/registry_synchronizer_helper_test.go index 6b9bb815a89..99fb54eba4d 100644 --- a/core/services/keeper/registry_synchronizer_helper_test.go +++ b/core/services/keeper/registry_synchronizer_helper_test.go @@ -5,10 +5,11 @@ import ( "time" "github.com/onsi/gomega" - "github.com/smartcontractkit/sqlx" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/sqlx" + evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log" logmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/log/mocks" @@ -59,6 +60,8 @@ func setupRegistrySync(t *testing.T, version keeper.RegistryVersion) ( case keeper.RegistryVersion_1_3: registryMock := cltest.NewContractMockReceiver(t, ethClient, keeper.Registry1_3ABI, contractAddress) registryMock.MockResponse("typeAndVersion", "KeeperRegistry 1.3.0").Once() + case keeper.RegistryVersion_2_0, keeper.RegistryVersion_2_1: + t.Fatalf("Unsupported version: %s", version) } registryWrapper, err := keeper.NewRegistryWrapper(j.KeeperSpec.ContractAddress, ethClient) diff --git a/core/services/ocr/contract_tracker.go b/core/services/ocr/contract_tracker.go index 0c7e288bd43..a7df28e1c76 100644 --- a/core/services/ocr/contract_tracker.go +++ b/core/services/ocr/contract_tracker.go @@ -13,6 +13,7 @@ import ( gethCommon "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/pkg/errors" + "github.com/smartcontractkit/sqlx" "github.com/smartcontractkit/libocr/gethwrappers/offchainaggregator" @@ -399,7 +400,7 @@ func (t *OCRContractTracker) LatestBlockHeight(ctx context.Context) (blockheight // care about the block height; we have no way of getting the L1 block // height anyway return 0, nil - case "", config.ChainArbitrum, config.ChainXDai: + case "", config.ChainArbitrum, config.ChainCelo, config.ChainOptimismBedrock, config.ChainXDai: // continue } latestBlockHeight := t.getLatestBlockHeight() diff --git a/core/services/ocrcommon/peerstore_test.go b/core/services/ocrcommon/peerstore_test.go index 99d44229b19..ba55e0767ab 100644 --- a/core/services/ocrcommon/peerstore_test.go +++ b/core/services/ocrcommon/peerstore_test.go @@ -89,7 +89,7 @@ func Test_Peerstore_WriteToDB(t *testing.T) { err = wrapper.WriteToDB() require.NoError(t, err) - peers := make([]ocrcommon.P2PPeer, 0) + var peers []ocrcommon.P2PPeer err = db.Select(&peers, `SELECT * FROM p2p_peers`) require.NoError(t, err) require.Equal(t, 1, len(peers)) diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index fd124fe9f46..bb9b6af82ee 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -45,7 +45,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) -var _ relaytypes.Relayer = &Relayer{} +var _ relaytypes.Relayer = &Relayer{} //nolint:staticcheck type Relayer struct { db *sqlx.DB diff --git a/core/services/relay/evm/types/types.go b/core/services/relay/evm/types/types.go index 97eddd7d9cf..6a1e66bd096 100644 --- a/core/services/relay/evm/types/types.go +++ b/core/services/relay/evm/types/types.go @@ -13,6 +13,7 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-relay/pkg/services" "github.com/smartcontractkit/chainlink-relay/pkg/types" relaytypes "github.com/smartcontractkit/chainlink-relay/pkg/types" @@ -103,7 +104,7 @@ type RouteUpdateSubscriber interface { // //go:generate mockery --quiet --name LogPollerWrapper --output ./mocks/ --case=underscore type LogPollerWrapper interface { - relaytypes.Service + services.Service LatestEvents() ([]OracleRequest, []OracleResponse, error) // TODO (FUN-668): Remove from the LOOP interface and only use internally within the EVM relayer diff --git a/core/services/relay/relay.go b/core/services/relay/relay.go index 022e7b794e5..4ec7373714c 100644 --- a/core/services/relay/relay.go +++ b/core/services/relay/relay.go @@ -88,7 +88,7 @@ type relayerAdapter struct { // NewRelayerAdapter returns a [loop.Relayer] adapted from a [types.Relayer] and [RelayerExt]. // Unlike NewRelayerServerAdapter which is used to adapt non-LOOPP relayers, this is used to adapt // LOOPP-based relayer which are then server over GRPC (by the relayerServer). -func NewRelayerAdapter(r types.Relayer, e RelayerExt) loop.Relayer { +func NewRelayerAdapter(r types.Relayer, e RelayerExt) loop.Relayer { //nolint:staticcheck return &relayerAdapter{Relayer: r, RelayerExt: e} } @@ -172,9 +172,10 @@ func (r *relayerServerAdapter) NewPluginProvider(ctx context.Context, rargs type return r.NewMercuryProvider(ctx, rargs, pargs) case types.DKG, types.OCR2VRF, types.OCR2Keeper, types.GenericPlugin: return r.relayerAdapter.NewPluginProvider(ctx, rargs, pargs) + case types.CCIPCommit, types.CCIPExecution: + return nil, fmt.Errorf("provider type not supported: %s", rargs.ProviderType) } - - return nil, fmt.Errorf("provider type not supported: %s", rargs.ProviderType) + return nil, fmt.Errorf("provider type not recognized: %s", rargs.ProviderType) } // NewRelayerServerAdapter returns a [loop.Relayer] adapted from a [types.Relayer] and [RelayerExt]. diff --git a/core/services/relay/relay_test.go b/core/services/relay/relay_test.go index 28ee0172c20..25ab76adf3b 100644 --- a/core/services/relay/relay_test.go +++ b/core/services/relay/relay_test.go @@ -115,9 +115,17 @@ func TestRelayerServerAdapter(t *testing.T) { Test: isType[types.MercuryProvider], }, { - ProviderType: "unknown", + ProviderType: string(types.CCIPCommit), + Error: "provider type not supported", + }, + { + ProviderType: string(types.CCIPExecution), Error: "provider type not supported", }, + { + ProviderType: "unknown", + Error: "provider type not recognized", + }, { ProviderType: string(types.GenericPlugin), Error: "unexpected call to NewPluginProvider", diff --git a/core/services/service.go b/core/services/service.go index 5ed4e67f272..066405ac012 100644 --- a/core/services/service.go +++ b/core/services/service.go @@ -1,87 +1,7 @@ package services -import "context" +import ( + "github.com/smartcontractkit/chainlink-relay/pkg/services" +) -// ServiceCtx represents a long-running service inside the Application. -// -// Typically, a ServiceCtx will leverage utils.StartStopOnce to implement these -// calls in a safe manner. -// -// # Template -// -// Mockable Foo service with a run loop -// -// //go:generate mockery --quiet --name Foo --output ../internal/mocks/ --case=underscore -// type ( -// // Expose a public interface so we can mock the service. -// Foo interface { -// service.ServiceCtx -// -// // ... -// } -// -// foo struct { -// // ... -// -// stop chan struct{} -// done chan struct{} -// -// utils.StartStopOnce -// } -// ) -// -// var _ Foo = (*foo)(nil) -// -// func NewFoo() Foo { -// f := &foo{ -// // ... -// } -// -// return f -// } -// -// func (f *foo) Start(ctx context.Context) error { -// return f.StartOnce("Foo", func() error { -// go f.run() -// -// return nil -// }) -// } -// -// func (f *foo) Close() error { -// return f.StopOnce("Foo", func() error { -// // trigger goroutine cleanup -// close(f.stop) -// // wait for cleanup to complete -// <-f.done -// return nil -// }) -// } -// -// func (f *foo) run() { -// // signal cleanup completion -// defer close(f.done) -// -// for { -// select { -// // ... -// case <-f.stop: -// // stop the routine -// return -// } -// } -// -// } -// -// Deprecated: use chainlink-relay/pkg/services.Service -type ServiceCtx interface { - // Start the service. Must quit immediately if the context is cancelled. - // The given context applies to Start function only and must not be retained. - Start(context.Context) error - // Close stops the Service. - // Invariants: Usually after this call the Service cannot be started - // again, you need to build a new Service to do so. - Close() error - - Checkable -} +type ServiceCtx = services.Service diff --git a/core/services/telemetry/manager_test.go b/core/services/telemetry/manager_test.go index 98fa82bd92e..4aaf3280155 100644 --- a/core/services/telemetry/manager_test.go +++ b/core/services/telemetry/manager_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -304,7 +305,7 @@ func TestLegacyMode(t *testing.T) { require.Equal(t, true, tm.legacyMode) require.Len(t, tm.endpoints, 1) - clientSent := make([]synchronization.TelemPayload, 0) + var clientSent []synchronization.TelemPayload clientMock := mocks2.NewTelemetryService(t) clientMock.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { clientSent = append(clientSent, synchronization.TelemPayload{ @@ -329,15 +330,12 @@ func TestLegacyMode(t *testing.T) { e2.SendLog([]byte("endpoint-2-message-1")) e2.SendLog([]byte("endpoint-2-message-2")) e2.SendLog([]byte("endpoint-2-message-3")) - if len(clientSent) != 6 { - t.Fatalf("expected length 6 but got %d", len(clientSent)) - } - - require.Equal(t, 1, obsLogs.Len()) // Deprecation warning for TelemetryIngress.URL and TelemetryIngress.ServerPubKey - require.Equal(t, []byte("endpoint-1-message-1"), clientSent[0].Telemetry) - require.Equal(t, []byte("endpoint-1-message-2"), clientSent[1].Telemetry) - require.Equal(t, []byte("endpoint-1-message-3"), clientSent[2].Telemetry) - require.Equal(t, []byte("endpoint-2-message-1"), clientSent[3].Telemetry) - require.Equal(t, []byte("endpoint-2-message-2"), clientSent[4].Telemetry) - require.Equal(t, []byte("endpoint-2-message-3"), clientSent[5].Telemetry) + require.Len(t, clientSent, 6) + assert.Equal(t, []byte("endpoint-1-message-1"), clientSent[0].Telemetry) + assert.Equal(t, []byte("endpoint-1-message-2"), clientSent[1].Telemetry) + assert.Equal(t, []byte("endpoint-1-message-3"), clientSent[2].Telemetry) + assert.Equal(t, []byte("endpoint-2-message-1"), clientSent[3].Telemetry) + assert.Equal(t, []byte("endpoint-2-message-2"), clientSent[4].Telemetry) + assert.Equal(t, []byte("endpoint-2-message-3"), clientSent[5].Telemetry) + assert.Equal(t, 1, obsLogs.Len()) // Deprecation warning for TelemetryIngress.URL and TelemetryIngress.ServerPubKey } diff --git a/core/services/vrf/delegate.go b/core/services/vrf/delegate.go index a8dd17b6daf..f6b6a460b89 100644 --- a/core/services/vrf/delegate.go +++ b/core/services/vrf/delegate.go @@ -132,16 +132,16 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { for _, task := range pl.Tasks { if _, ok := task.(*pipeline.VRFTaskV2Plus); ok { - if err := CheckFromAddressesExist(jb, d.ks.Eth()); err != nil { - return nil, err + if err2 := CheckFromAddressesExist(jb, d.ks.Eth()); err != nil { + return nil, err2 } if !FromAddressMaxGasPricesAllEqual(jb, chain.Config().EVM().GasEstimator().PriceMaxKey) { return nil, errors.New("key-specific max gas prices of all fromAddresses are not equal, please set them to equal values") } - if err := CheckFromAddressMaxGasPrices(jb, chain.Config().EVM().GasEstimator().PriceMaxKey); err != nil { - return nil, err + if err2 := CheckFromAddressMaxGasPrices(jb, chain.Config().EVM().GasEstimator().PriceMaxKey); err != nil { + return nil, err2 } if vrfOwner != nil { return nil, errors.New("VRF Owner is not supported for VRF V2 Plus") @@ -159,9 +159,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { return nil, errors.Wrap(err, "can't call LINKNATIVEFEED") } - aggregator, err := aggregator_v3_interface.NewAggregatorV3Interface(linkNativeFeedAddress, chain.Client()) - if err != nil { - return nil, errors.Wrap(err, "NewAggregatorV3Interface") + aggregator, err2 := aggregator_v3_interface.NewAggregatorV3Interface(linkNativeFeedAddress, chain.Client()) + if err2 != nil { + return nil, errors.Wrap(err2, "NewAggregatorV3Interface") } return []job.ServiceCtx{v2.New( @@ -188,16 +188,16 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { vrfcommon.NewLogDeduper(int(chain.Config().EVM().FinalityDepth())))}, nil } if _, ok := task.(*pipeline.VRFTaskV2); ok { - if err := CheckFromAddressesExist(jb, d.ks.Eth()); err != nil { - return nil, err + if err2 := CheckFromAddressesExist(jb, d.ks.Eth()); err != nil { + return nil, err2 } if !FromAddressMaxGasPricesAllEqual(jb, chain.Config().EVM().GasEstimator().PriceMaxKey) { return nil, errors.New("key-specific max gas prices of all fromAddresses are not equal, please set them to equal values") } - if err := CheckFromAddressMaxGasPrices(jb, chain.Config().EVM().GasEstimator().PriceMaxKey); err != nil { - return nil, err + if err2 := CheckFromAddressMaxGasPrices(jb, chain.Config().EVM().GasEstimator().PriceMaxKey); err != nil { + return nil, err2 } // Get the LINKETHFEED address with retries diff --git a/core/web/presenters/job.go b/core/web/presenters/job.go index b1f42ebc68f..2aa97730881 100644 --- a/core/web/presenters/job.go +++ b/core/web/presenters/job.go @@ -525,6 +525,8 @@ func NewJobResource(j job.Job) *JobResource { resource.BootstrapSpec = NewBootstrapSpec(j.BootstrapSpec) case job.Gateway: resource.GatewaySpec = NewGatewaySpec(j.GatewaySpec) + case job.LegacyGasStationServer, job.LegacyGasStationSidecar: + // unsupported } jes := []JobError{} diff --git a/core/web/solana_chains_controller_test.go b/core/web/solana_chains_controller_test.go index 78453ad6ddc..5d6dc7424a2 100644 --- a/core/web/solana_chains_controller_test.go +++ b/core/web/solana_chains_controller_test.go @@ -80,7 +80,7 @@ Nodes = [] t.Run(tc.name, func(t *testing.T) { t.Parallel() - controller := setupSolanaChainsControllerTestV2(t, &solana.SolanaConfig{ + controller := setupSolanaChainsControllerTestV2(t, &solana.TOMLConfig{ ChainID: ptr(validId), Chain: config.Chain{ SkipPreflight: ptr(false), @@ -111,13 +111,13 @@ Nodes = [] func Test_SolanaChainsController_Index(t *testing.T) { t.Parallel() - chainA := &solana.SolanaConfig{ + chainA := &solana.TOMLConfig{ ChainID: ptr(fmt.Sprintf("ChainlinktestA-%d", rand.Int31n(999999))), Chain: config.Chain{ TxTimeout: utils.MustNewDuration(time.Hour), }, } - chainB := &solana.SolanaConfig{ + chainB := &solana.TOMLConfig{ ChainID: ptr(fmt.Sprintf("ChainlinktestB-%d", rand.Int31n(999999))), Chain: config.Chain{ SkipPreflight: ptr(false), @@ -175,7 +175,7 @@ type TestSolanaChainsController struct { client cltest.HTTPClientCleaner } -func setupSolanaChainsControllerTestV2(t *testing.T, cfgs ...*solana.SolanaConfig) *TestSolanaChainsController { +func setupSolanaChainsControllerTestV2(t *testing.T, cfgs ...*solana.TOMLConfig) *TestSolanaChainsController { for i := range cfgs { cfgs[i].SetDefaults() } diff --git a/plugins/cmd/chainlink-solana/main.go b/plugins/cmd/chainlink-solana/main.go index 8640c04bd2f..392ec88405e 100644 --- a/plugins/cmd/chainlink-solana/main.go +++ b/plugins/cmd/chainlink-solana/main.go @@ -56,7 +56,7 @@ func (c *pluginRelayer) NewRelayer(ctx context.Context, config string, keystore d := toml.NewDecoder(strings.NewReader(config)) d.DisallowUnknownFields() var cfg struct { - Solana solana.SolanaConfig + Solana solana.TOMLConfig } if err := d.Decode(&cfg); err != nil { diff --git a/plugins/config.go b/plugins/config.go index 23de2164c0f..e84a08f1eac 100644 --- a/plugins/config.go +++ b/plugins/config.go @@ -156,7 +156,7 @@ func GetEnvConfig() (EnvConfig, error) { tracingEnabled: tracingEnabled, tracingCollectorTarget: tracingCollectorTarget, tracingAttributes: tracingAttributes, - tracingSamplingRatio: tracingSamplingRatio, + tracingSamplingRatio: tracingSamplingRatio, }, nil } diff --git a/plugins/loop_registry.go b/plugins/loop_registry.go index b69e8344d97..5dc3bf69920 100644 --- a/plugins/loop_registry.go +++ b/plugins/loop_registry.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/smartcontractkit/chainlink-relay/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/core/config" ) @@ -23,8 +24,8 @@ type RegisteredLoop struct { // LoopRegistry is responsible for assigning ports to plugins that are to be used for the // plugin's prometheus HTTP server, and for passing the tracing configuration to the plugin. type LoopRegistry struct { - mu sync.Mutex - registry map[string]*RegisteredLoop + mu sync.Mutex + registry map[string]*RegisteredLoop lggr logger.Logger cfgTracing config.Tracing diff --git a/plugins/plugin.go b/plugins/plugin.go index 6cb3c606b94..06a59224249 100644 --- a/plugins/plugin.go +++ b/plugins/plugin.go @@ -4,8 +4,7 @@ import ( "sync" "github.com/smartcontractkit/chainlink-relay/pkg/logger" - "github.com/smartcontractkit/chainlink-relay/pkg/types" - "github.com/smartcontractkit/chainlink/v2/core/services" + "github.com/smartcontractkit/chainlink-relay/pkg/services" ) // Base is a base layer for plugins to easily manage sub-[types.Service]s. @@ -13,13 +12,13 @@ type Base struct { Logger logger.Logger mu sync.RWMutex - srvs []types.Service + srvs []services.Service } func (p *Base) Ready() error { return nil } func (p *Base) Name() string { return p.Logger.Name() } -func (p *Base) SubService(s types.Service) { +func (p *Base) SubService(s services.Service) { p.mu.Lock() p.srvs = append(p.srvs, s) p.mu.Unlock()