From ca72c10eb391d0607c72e755998841001a28d315 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Sun, 10 Sep 2023 09:53:01 -0500 Subject: [PATCH] G601: Implicit memory aliasing in for loop. receiver-naming: receiver name X should be consistent with previous receiver name Y for T `X` is a misspelling of `Y` (misspell) --- core/chains/evm/config/toml/defaults.go | 3 +- core/chains/evm/txmgr/confirmer_test.go | 41 +- core/chains/evm/txmgr/evm_tx_store.go | 240 +++---- core/chains/evm/txmgr/evm_tx_store_test.go | 3 +- core/cmd/evm_transaction_commands_test.go | 4 +- core/cmd/shell_local.go | 6 +- core/services/blockhashstore/feeder_test.go | 632 +++++++++--------- .../block_header_feeder_test.go | 122 ++-- core/services/cron/cron.go | 2 +- core/services/directrequest/delegate.go | 2 +- core/services/fluxmonitorv2/flux_monitor.go | 4 +- .../fluxmonitorv2/flux_monitor_test.go | 16 +- core/services/keeper/upkeep_executer.go | 2 +- core/services/ocr/delegate.go | 2 +- core/services/ocr2/delegate.go | 16 +- core/services/ocr2/plugins/median/services.go | 2 +- core/services/ocr2/plugins/mercury/plugin.go | 2 +- .../ocr2keeper/evm21/logprovider/recoverer.go | 2 +- .../evm21/registry_check_pipeline_test.go | 18 +- core/services/ocrcommon/data_source.go | 10 +- core/services/ocrcommon/data_source_test.go | 18 +- core/services/ocrcommon/run_saver.go | 8 +- core/services/ocrcommon/run_saver_test.go | 4 +- .../ocrcommon/transmitter_pipeline.go | 2 +- core/services/pipeline/mocks/runner.go | 12 +- core/services/pipeline/runner.go | 14 +- core/services/pipeline/runner_test.go | 8 +- core/services/pipeline/scheduler_test.go | 2 +- .../relay/evm/mercury/mocks/pipeline.go | 4 +- .../relay/evm/mercury/v1/data_source.go | 12 +- .../relay/evm/mercury/v1/data_source_test.go | 2 +- .../relay/evm/mercury/v2/data_source.go | 12 +- .../relay/evm/mercury/v3/data_source.go | 12 +- .../services/transmission/integration_test.go | 2 +- core/services/vrf/v1/listener_v1.go | 2 +- .../vrf/v2/integration_v2_plus_test.go | 2 +- core/services/vrf/v2/integration_v2_test.go | 11 +- core/services/vrf/v2/listener_v2.go | 4 +- core/services/vrf/v2/listener_v2_types.go | 4 +- core/services/webhook/delegate.go | 2 +- core/store/models/common_test.go | 2 +- core/utils/big_test.go | 2 +- core/web/bridge_types_controller_test.go | 3 +- ...ipeline_job_spec_errors_controller_test.go | 2 +- 44 files changed, 651 insertions(+), 624 deletions(-) diff --git a/core/chains/evm/config/toml/defaults.go b/core/chains/evm/config/toml/defaults.go index 8c32b81301d..239a97f585b 100644 --- a/core/chains/evm/config/toml/defaults.go +++ b/core/chains/evm/config/toml/defaults.go @@ -164,7 +164,8 @@ func (c *Chain) SetFrom(f *Chain) { c.GasEstimator.setFrom(&f.GasEstimator) if ks := f.KeySpecific; ks != nil { - for _, v := range ks { + for i := range ks { + v := ks[i] if i := slices.IndexFunc(c.KeySpecific, func(k KeySpecific) bool { return k.Key == v.Key }); i == -1 { c.KeySpecific = append(c.KeySpecific, v) } else { diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index 555ea09ff3a..e0070e35b17 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -1399,7 +1399,8 @@ func TestEthConfirmer_FindTxsRequiringRebroadcast(t *testing.T) { etx1 := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress) nonce++ attempt1_1 := etx1.TxAttempts[0] - dbAttempt := txmgr.DbEthTxAttemptFromEthTxAttempt(&attempt1_1) + var dbAttempt txmgr.DbEthTxAttempt + dbAttempt.FromTxAttempt(&attempt1_1) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, tooNew, attempt1_1.ID)) attempt1_2 := newBroadcastLegacyEthTxAttempt(t, etx1.ID) attempt1_2.BroadcastBeforeBlockNum = &onTheMoney @@ -1416,7 +1417,8 @@ func TestEthConfirmer_FindTxsRequiringRebroadcast(t *testing.T) { etx2 := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress) nonce++ attempt2_1 := etx2.TxAttempts[0] - dbAttempt = txmgr.DbEthTxAttemptFromEthTxAttempt(&attempt2_1) + dbAttempt = txmgr.DbEthTxAttempt{} + dbAttempt.FromTxAttempt(&attempt2_1) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, tooNew, attempt2_1.ID)) t.Run("returns nothing when the transaction has attempts that are too new", func(t *testing.T) { @@ -1463,13 +1465,15 @@ func TestEthConfirmer_FindTxsRequiringRebroadcast(t *testing.T) { etx3 := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress) nonce++ attempt3_1 := etx3.TxAttempts[0] - dbAttempt = txmgr.DbEthTxAttemptFromEthTxAttempt(&attempt3_1) + dbAttempt = txmgr.DbEthTxAttempt{} + dbAttempt.FromTxAttempt(&attempt3_1) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt3_1.ID)) // NOTE: It should ignore qualifying eth_txes from a different address etxOther := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, otherAddress) attemptOther1 := etxOther.TxAttempts[0] - dbAttempt = txmgr.DbEthTxAttemptFromEthTxAttempt(&attemptOther1) + dbAttempt = txmgr.DbEthTxAttempt{} + dbAttempt.FromTxAttempt(&attemptOther1) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attemptOther1.ID)) t.Run("returns the transaction if it is unconfirmed with an attempt that is older than gasBumpThreshold blocks", func(t *testing.T) { @@ -1519,14 +1523,16 @@ func TestEthConfirmer_FindTxsRequiringRebroadcast(t *testing.T) { etx4 := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress) nonce++ attempt4_1 := etx4.TxAttempts[0] - dbAttempt = txmgr.DbEthTxAttemptFromEthTxAttempt(&attemptOther1) + dbAttempt = txmgr.DbEthTxAttempt{} + dbAttempt.FromTxAttempt(&attempt4_1) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt4_1.ID)) t.Run("ignores pending transactions for another key", func(t *testing.T) { // Re-use etx3 nonce for another key, it should not affect the results for this key etxOther := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, (*etx3.Sequence).Int64(), otherAddress) aOther := etxOther.TxAttempts[0] - dbAttempt = txmgr.DbEthTxAttemptFromEthTxAttempt(&aOther) + dbAttempt = txmgr.DbEthTxAttempt{} + dbAttempt.FromTxAttempt(&aOther) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, aOther.ID)) etxs, err := ec.FindTxsRequiringRebroadcast(testutils.Context(t), lggr, evmFromAddress, currentHead, gasBumpThreshold, 6, 0, &cltest.FixtureChainID) @@ -1659,7 +1665,8 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WithConnectivityCheck(t *testing etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress, originalBroadcastAt) attempt1 := etx.TxAttempts[0] - dbAttempt := txmgr.DbEthTxAttemptFromEthTxAttempt(&attempt1) + var dbAttempt txmgr.DbEthTxAttempt + dbAttempt.FromTxAttempt(&attempt1) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt1.ID)) // Send transaction and assume success. @@ -1703,7 +1710,8 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WithConnectivityCheck(t *testing etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastDynamicFeeAttempt(t, txStore, nonce, fromAddress, originalBroadcastAt) attempt1 := etx.TxAttempts[0] - dbAttempt := txmgr.DbEthTxAttemptFromEthTxAttempt(&attempt1) + var dbAttempt txmgr.DbEthTxAttempt + dbAttempt.FromTxAttempt(&attempt1) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt1.ID)) // Send transaction and assume success. @@ -1974,7 +1982,7 @@ func TestEthConfirmer_RebroadcastWhereNecessary(t *testing.T) { require.Equal(t, txmgrtypes.TxAttemptBroadcast, etx.TxAttempts[3].State) }) - // Mark original tx as confirmed so we won't pick it up any more + // Mark original tx as confirmed, so we won't pick it up anymore pgtest.MustExec(t, db, `UPDATE evm.txes SET state = 'confirmed'`) etx2 := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress) @@ -2083,7 +2091,7 @@ func TestEthConfirmer_RebroadcastWhereNecessary(t *testing.T) { assert.Equal(t, txmgrtypes.TxAttemptBroadcast, etx2.TxAttempts[2].State) }) - // Original tx is confirmed so we won't pick it up any more + // Original tx is confirmed, so we won't pick it up anymore etx3 := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress) nonce++ attempt3_1 := etx3.TxAttempts[0] @@ -2214,7 +2222,7 @@ func TestEthConfirmer_RebroadcastWhereNecessary(t *testing.T) { ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { return evmtypes.Nonce(tx.Nonce()) == *etx3.Sequence && gasPrice.Cmp(tx.GasPrice()) == 0 - }), fromAddress).Return(clienttypes.Successful, errors.New("already known")).Once() // we already submitted at this price, now its time to bump and submit again but since we simply resubmitted rather than increasing gas price, geth already knows about this tx + }), fromAddress).Return(clienttypes.Successful, errors.New("already known")).Once() // we already submitted at this price, now it's time to bump and submit again but since we simply resubmitted rather than increasing gas price, geth already knows about this tx // Do the thing require.NoError(t, ec2.RebroadcastWhereNecessary(testutils.Context(t), currentHead)) @@ -2245,7 +2253,7 @@ func TestEthConfirmer_RebroadcastWhereNecessary(t *testing.T) { ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { return evmtypes.Nonce(tx.Nonce()) == *etx3.Sequence && gasPrice.Cmp(tx.GasPrice()) == 0 - }), fromAddress).Return(clienttypes.Successful, errors.New("already known")).Once() // we already submitted at this price, now its time to bump and submit again but since we simply resubmitted rather than increasing gas price, geth already knows about this tx + }), fromAddress).Return(clienttypes.Successful, errors.New("already known")).Once() // we already submitted at this price, now it's time to bump and submit again but since we simply resubmitted rather than increasing gas price, geth already knows about this tx // Do the thing require.NoError(t, ec2.RebroadcastWhereNecessary(testutils.Context(t), currentHead)) @@ -2430,7 +2438,8 @@ func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyUnderpriced_ThenGoesTh etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress) nonce++ legacyAttempt := etx.TxAttempts[0] - dbAttempt := txmgr.DbEthTxAttemptFromEthTxAttempt(&legacyAttempt) + var dbAttempt txmgr.DbEthTxAttempt + dbAttempt.FromTxAttempt(&legacyAttempt) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, legacyAttempt.ID)) // Fail a few times with terminally underpriced @@ -2462,7 +2471,8 @@ func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyUnderpriced_ThenGoesTh etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastDynamicFeeAttempt(t, txStore, nonce, fromAddress) nonce++ dxFeeAttempt := etx.TxAttempts[0] - dbAttempt := txmgr.DbEthTxAttemptFromEthTxAttempt(&dxFeeAttempt) + var dbAttempt txmgr.DbEthTxAttempt + dbAttempt.FromTxAttempt(&dxFeeAttempt) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, dxFeeAttempt.ID)) // Fail a few times with terminally underpriced @@ -2513,7 +2523,8 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) { etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress) nonce++ attempt1_1 := etx.TxAttempts[0] - dbAttempt := txmgr.DbEthTxAttemptFromEthTxAttempt(&attempt1_1) + var dbAttempt txmgr.DbEthTxAttempt + dbAttempt.FromTxAttempt(&attempt1_1) require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt1_1.ID)) var attempt1_2 txmgr.TxAttempt diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 4585d868603..52cd50cba32 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -182,66 +182,62 @@ type DbEthTx struct { InitialBroadcastAt *time.Time } -func DbEthTxFromEthTx(ethTx *Tx) DbEthTx { - tx := DbEthTx{ - ID: ethTx.ID, - FromAddress: ethTx.FromAddress, - ToAddress: ethTx.ToAddress, - EncodedPayload: ethTx.EncodedPayload, - Value: assets.Eth(ethTx.Value), - GasLimit: ethTx.FeeLimit, - Error: ethTx.Error, - BroadcastAt: ethTx.BroadcastAt, - CreatedAt: ethTx.CreatedAt, - State: ethTx.State, - Meta: ethTx.Meta, - Subject: ethTx.Subject, - PipelineTaskRunID: ethTx.PipelineTaskRunID, - MinConfirmations: ethTx.MinConfirmations, - TransmitChecker: ethTx.TransmitChecker, - InitialBroadcastAt: ethTx.InitialBroadcastAt, - } - - if ethTx.ChainID != nil { - tx.EVMChainID = *utils.NewBig(ethTx.ChainID) - } - if ethTx.Sequence != nil { - n := ethTx.Sequence.Int64() - tx.Nonce = &n - } - - return tx -} - -func DbEthTxToEthTx(dbEthTx DbEthTx, evmEthTx *Tx) { - evmEthTx.ID = dbEthTx.ID - if dbEthTx.Nonce != nil { - n := evmtypes.Nonce(*dbEthTx.Nonce) - evmEthTx.Sequence = &n - } - evmEthTx.IdempotencyKey = dbEthTx.IdempotencyKey - evmEthTx.FromAddress = dbEthTx.FromAddress - evmEthTx.ToAddress = dbEthTx.ToAddress - evmEthTx.EncodedPayload = dbEthTx.EncodedPayload - evmEthTx.Value = *dbEthTx.Value.ToInt() - evmEthTx.FeeLimit = dbEthTx.GasLimit - evmEthTx.Error = dbEthTx.Error - evmEthTx.BroadcastAt = dbEthTx.BroadcastAt - evmEthTx.CreatedAt = dbEthTx.CreatedAt - evmEthTx.State = dbEthTx.State - evmEthTx.Meta = dbEthTx.Meta - evmEthTx.Subject = dbEthTx.Subject - evmEthTx.PipelineTaskRunID = dbEthTx.PipelineTaskRunID - evmEthTx.MinConfirmations = dbEthTx.MinConfirmations - evmEthTx.ChainID = dbEthTx.EVMChainID.ToInt() - evmEthTx.TransmitChecker = dbEthTx.TransmitChecker - evmEthTx.InitialBroadcastAt = dbEthTx.InitialBroadcastAt +func (db *DbEthTx) FromTx(tx *Tx) { + db.ID = tx.ID + db.FromAddress = tx.FromAddress + db.ToAddress = tx.ToAddress + db.EncodedPayload = tx.EncodedPayload + db.Value = assets.Eth(tx.Value) + db.GasLimit = tx.FeeLimit + db.Error = tx.Error + db.BroadcastAt = tx.BroadcastAt + db.CreatedAt = tx.CreatedAt + db.State = tx.State + db.Meta = tx.Meta + db.Subject = tx.Subject + db.PipelineTaskRunID = tx.PipelineTaskRunID + db.MinConfirmations = tx.MinConfirmations + db.TransmitChecker = tx.TransmitChecker + db.InitialBroadcastAt = tx.InitialBroadcastAt + + if tx.ChainID != nil { + db.EVMChainID = *utils.NewBig(tx.ChainID) + } + if tx.Sequence != nil { + n := tx.Sequence.Int64() + db.Nonce = &n + } +} + +func (db DbEthTx) ToTx(tx *Tx) { + tx.ID = db.ID + if db.Nonce != nil { + n := evmtypes.Nonce(*db.Nonce) + tx.Sequence = &n + } + tx.IdempotencyKey = db.IdempotencyKey + tx.FromAddress = db.FromAddress + tx.ToAddress = db.ToAddress + tx.EncodedPayload = db.EncodedPayload + tx.Value = *db.Value.ToInt() + tx.FeeLimit = db.GasLimit + tx.Error = db.Error + tx.BroadcastAt = db.BroadcastAt + tx.CreatedAt = db.CreatedAt + tx.State = db.State + tx.Meta = db.Meta + tx.Subject = db.Subject + tx.PipelineTaskRunID = db.PipelineTaskRunID + tx.MinConfirmations = db.MinConfirmations + tx.ChainID = db.EVMChainID.ToInt() + tx.TransmitChecker = db.TransmitChecker + tx.InitialBroadcastAt = db.InitialBroadcastAt } func dbEthTxsToEvmEthTxs(dbEthTxs []DbEthTx) []Tx { evmEthTxs := make([]Tx, len(dbEthTxs)) for i, dbTx := range dbEthTxs { - DbEthTxToEthTx(dbTx, &evmEthTxs[i]) + dbTx.ToTx(&evmEthTxs[i]) } return evmEthTxs } @@ -249,7 +245,7 @@ func dbEthTxsToEvmEthTxs(dbEthTxs []DbEthTx) []Tx { func dbEthTxsToEvmEthTxPtrs(dbEthTxs []DbEthTx, evmEthTxs []*Tx) { for i, dbTx := range dbEthTxs { evmEthTxs[i] = &Tx{} - DbEthTxToEthTx(dbTx, evmEthTxs[i]) + dbTx.ToTx(evmEthTxs[i]) } } @@ -270,29 +266,25 @@ type DbEthTxAttempt struct { GasFeeCap *assets.Wei } -func DbEthTxAttemptFromEthTxAttempt(ethTxAttempt *TxAttempt) DbEthTxAttempt { - dbTx := DbEthTxAttempt{ - ID: ethTxAttempt.ID, - EthTxID: ethTxAttempt.TxID, - GasPrice: ethTxAttempt.TxFee.Legacy, - SignedRawTx: ethTxAttempt.SignedRawTx, - Hash: ethTxAttempt.Hash, - BroadcastBeforeBlockNum: ethTxAttempt.BroadcastBeforeBlockNum, - CreatedAt: ethTxAttempt.CreatedAt, - ChainSpecificGasLimit: ethTxAttempt.ChainSpecificFeeLimit, - TxType: ethTxAttempt.TxType, - GasTipCap: ethTxAttempt.TxFee.DynamicTipCap, - GasFeeCap: ethTxAttempt.TxFee.DynamicFeeCap, - } +func (db *DbEthTxAttempt) FromTxAttempt(attempt *TxAttempt) { + db.ID = attempt.ID + db.EthTxID = attempt.TxID + db.GasPrice = attempt.TxFee.Legacy + db.SignedRawTx = attempt.SignedRawTx + db.Hash = attempt.Hash + db.BroadcastBeforeBlockNum = attempt.BroadcastBeforeBlockNum + db.CreatedAt = attempt.CreatedAt + db.ChainSpecificGasLimit = attempt.ChainSpecificFeeLimit + db.TxType = attempt.TxType + db.GasTipCap = attempt.TxFee.DynamicTipCap + db.GasFeeCap = attempt.TxFee.DynamicFeeCap // handle state naming difference between generic + EVM - if ethTxAttempt.State == txmgrtypes.TxAttemptInsufficientFunds { - dbTx.State = "insufficient_eth" + if attempt.State == txmgrtypes.TxAttemptInsufficientFunds { + db.State = "insufficient_eth" } else { - dbTx.State = ethTxAttempt.State.String() + db.State = attempt.State.String() } - - return dbTx } func DbEthTxAttemptStateToTxAttemptState(state string) txmgrtypes.TxAttemptState { @@ -302,27 +294,27 @@ func DbEthTxAttemptStateToTxAttemptState(state string) txmgrtypes.TxAttemptState return txmgrtypes.NewTxAttemptState(state) } -func DbEthTxAttemptToEthTxAttempt(dbEthTxAttempt DbEthTxAttempt, evmAttempt *TxAttempt) { - evmAttempt.ID = dbEthTxAttempt.ID - evmAttempt.TxID = dbEthTxAttempt.EthTxID - evmAttempt.SignedRawTx = dbEthTxAttempt.SignedRawTx - evmAttempt.Hash = dbEthTxAttempt.Hash - evmAttempt.BroadcastBeforeBlockNum = dbEthTxAttempt.BroadcastBeforeBlockNum - evmAttempt.State = DbEthTxAttemptStateToTxAttemptState(dbEthTxAttempt.State) - evmAttempt.CreatedAt = dbEthTxAttempt.CreatedAt - evmAttempt.ChainSpecificFeeLimit = dbEthTxAttempt.ChainSpecificGasLimit - evmAttempt.TxType = dbEthTxAttempt.TxType - evmAttempt.TxFee = gas.EvmFee{ - Legacy: dbEthTxAttempt.GasPrice, - DynamicTipCap: dbEthTxAttempt.GasTipCap, - DynamicFeeCap: dbEthTxAttempt.GasFeeCap, +func (db DbEthTxAttempt) ToTxAttempt(attempt *TxAttempt) { + attempt.ID = db.ID + attempt.TxID = db.EthTxID + attempt.SignedRawTx = db.SignedRawTx + attempt.Hash = db.Hash + attempt.BroadcastBeforeBlockNum = db.BroadcastBeforeBlockNum + attempt.State = DbEthTxAttemptStateToTxAttemptState(db.State) + attempt.CreatedAt = db.CreatedAt + attempt.ChainSpecificFeeLimit = db.ChainSpecificGasLimit + attempt.TxType = db.TxType + attempt.TxFee = gas.EvmFee{ + Legacy: db.GasPrice, + DynamicTipCap: db.GasTipCap, + DynamicFeeCap: db.GasFeeCap, } } func dbEthTxAttemptsToEthTxAttempts(dbEthTxAttempt []DbEthTxAttempt) []TxAttempt { evmEthTxAttempt := make([]TxAttempt, len(dbEthTxAttempt)) for i, dbTxAttempt := range dbEthTxAttempt { - DbEthTxAttemptToEthTxAttempt(dbTxAttempt, &evmEthTxAttempt[i]) + dbTxAttempt.ToTxAttempt(&evmEthTxAttempt[i]) } return evmEthTxAttempt } @@ -379,7 +371,7 @@ func (o *evmTxStore) preloadTxAttempts(txs []Tx) error { for i, tx := range txs { if tx.ID == dbAttempt.EthTxID { var attempt TxAttempt - DbEthTxAttemptToEthTxAttempt(dbAttempt, &attempt) + dbAttempt.ToTxAttempt(&attempt) txs[i].TxAttempts = append(txs[i].TxAttempts, attempt) } } @@ -405,7 +397,7 @@ func (o *evmTxStore) PreloadTxes(attempts []TxAttempt, qopts ...pg.QOpt) error { } for _, dbEtx := range dbEthTxs { etx := ethTxM[dbEtx.ID] - DbEthTxToEthTx(dbEtx, &etx) + dbEtx.ToTx(&etx) ethTxM[etx.ID] = etx } for i, attempt := range attempts { @@ -475,7 +467,7 @@ func (o *evmTxStore) FindTxAttempt(hash common.Hash) (*TxAttempt, error) { } // reuse the preload var attempt TxAttempt - DbEthTxAttemptToEthTxAttempt(dbTxAttempt, &attempt) + dbTxAttempt.ToTxAttempt(&attempt) attempts := []TxAttempt{attempt} err := o.PreloadTxes(attempts) return &attempts[0], err @@ -502,7 +494,7 @@ func (o *evmTxStore) FindTxByHash(hash common.Hash) (*Tx, error) { }, pg.OptReadOnlyTx()) var etx Tx - DbEthTxToEthTx(dbEtx, &etx) + dbEtx.ToTx(&etx) return &etx, pkgerrors.Wrap(err, "FindEthTxByHash failed") } @@ -514,17 +506,19 @@ func (o *evmTxStore) InsertTx(etx *Tx) error { const insertEthTxSQL = `INSERT INTO evm.txes (nonce, from_address, to_address, encoded_payload, value, gas_limit, error, broadcast_at, initial_broadcast_at, created_at, state, meta, subject, pipeline_task_run_id, min_confirmations, evm_chain_id, transmit_checker) VALUES ( :nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit, :error, :broadcast_at, :initial_broadcast_at, :created_at, :state, :meta, :subject, :pipeline_task_run_id, :min_confirmations, :evm_chain_id, :transmit_checker ) RETURNING *` - dbTx := DbEthTxFromEthTx(etx) + var dbTx DbEthTx + dbTx.FromTx(etx) err := o.q.GetNamed(insertEthTxSQL, &dbTx, &dbTx) - DbEthTxToEthTx(dbTx, etx) + dbTx.ToTx(etx) return pkgerrors.Wrap(err, "InsertTx failed") } // InsertTxAttempt inserts a new txAttempt into the database func (o *evmTxStore) InsertTxAttempt(attempt *TxAttempt) error { - dbTxAttempt := DbEthTxAttemptFromEthTxAttempt(attempt) + var dbTxAttempt DbEthTxAttempt + dbTxAttempt.FromTxAttempt(attempt) err := o.q.GetNamed(insertIntoEthTxAttemptsQuery, &dbTxAttempt, &dbTxAttempt) - DbEthTxAttemptToEthTxAttempt(dbTxAttempt, attempt) + dbTxAttempt.ToTxAttempt(attempt) return pkgerrors.Wrap(err, "InsertTxAttempt failed") } @@ -548,7 +542,7 @@ func (o *evmTxStore) FindTxWithAttempts(etxID int64) (etx Tx, err error) { if err = tx.Get(&dbEtx, `SELECT * FROM evm.txes WHERE id = $1 ORDER BY created_at ASC, id ASC`, etxID); err != nil { return pkgerrors.Wrapf(err, "failed to find eth_tx with id %d", etxID) } - DbEthTxToEthTx(dbEtx, &etx) + dbEtx.ToTx(&etx) if err = o.LoadTxAttempts(&etx, pg.WithQueryer(tx)); err != nil { return pkgerrors.Wrapf(err, "failed to load evm.tx_attempts for eth_tx with id %d", etxID) } @@ -591,7 +585,7 @@ func (o *evmTxStore) LoadTxesAttempts(etxs []*Tx, qopts ...pg.QOpt) error { for _, dbAttempt := range dbTxAttempts { etx := ethTxesM[dbAttempt.EthTxID] var attempt TxAttempt - DbEthTxAttemptToEthTxAttempt(dbAttempt, &attempt) + dbAttempt.ToTxAttempt(&attempt) etx.TxAttempts = append(etx.TxAttempts, attempt) } return nil @@ -919,7 +913,7 @@ func (o *evmTxStore) FindTxWithIdempotencyKey(idempotencyKey string, chainID *bi return nil, pkgerrors.Wrap(err, "FindTxWithIdempotencyKey failed to load evm.txes") } etx = new(Tx) - DbEthTxToEthTx(dbEtx, etx) + dbEtx.ToTx(etx) return } @@ -934,7 +928,7 @@ SELECT * FROM evm.txes WHERE from_address = $1 AND nonce = $2 AND state IN ('con if err != nil { return pkgerrors.Wrap(err, "FindEthTxWithNonce failed to load evm.txes") } - DbEthTxToEthTx(dbEtx, etx) + dbEtx.ToTx(etx) err = o.LoadTxAttempts(etx, pg.WithQueryer(tx)) return pkgerrors.Wrap(err, "FindEthTxWithNonce failed to load evm.tx_attempts") }, pg.OptReadOnlyTx()) @@ -1008,7 +1002,8 @@ ORDER BY nonce ASC func saveAttemptWithNewState(q pg.Queryer, timeout time.Duration, logger logger.Logger, attempt TxAttempt, broadcastAt time.Time) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) - dbAttempt := DbEthTxAttemptFromEthTxAttempt(&attempt) + var dbAttempt DbEthTxAttempt + dbAttempt.FromTxAttempt(&attempt) defer cancel() return pg.SqlxTransaction(ctx, q, logger, func(tx pg.Queryer) error { // In case of null broadcast_at (shouldn't happen) we don't want to @@ -1075,7 +1070,8 @@ func (o *evmTxStore) SaveInProgressAttempt(attempt *TxAttempt) error { if attempt.State != txmgrtypes.TxAttemptInProgress { return errors.New("SaveInProgressAttempt failed: attempt state must be in_progress") } - dbAttempt := DbEthTxAttemptFromEthTxAttempt(attempt) + var dbAttempt DbEthTxAttempt + dbAttempt.FromTxAttempt(attempt) // Insert is the usual mode because the attempt is new if attempt.ID == 0 { query, args, e := o.q.BindNamed(insertIntoEthTxAttemptsQuery, &dbAttempt) @@ -1083,7 +1079,7 @@ func (o *evmTxStore) SaveInProgressAttempt(attempt *TxAttempt) error { return pkgerrors.Wrap(e, "SaveInProgressAttempt failed to BindNamed") } e = o.q.Get(&dbAttempt, query, args...) - DbEthTxAttemptToEthTxAttempt(dbAttempt, attempt) + dbAttempt.ToTxAttempt(attempt) return pkgerrors.Wrap(e, "SaveInProgressAttempt failed to insert into evm.tx_attempts") } // Update only applies to case of insufficient eth and simply changes the state to in_progress @@ -1265,13 +1261,14 @@ func (o *evmTxStore) SaveReplacementInProgressAttempt(oldAttempt TxAttempt, repl if _, err := tx.Exec(`DELETE FROM evm.tx_attempts WHERE id=$1`, oldAttempt.ID); err != nil { return pkgerrors.Wrap(err, "saveReplacementInProgressAttempt failed to delete from evm.tx_attempts") } - dbAttempt := DbEthTxAttemptFromEthTxAttempt(replacementAttempt) + var dbAttempt DbEthTxAttempt + dbAttempt.FromTxAttempt(replacementAttempt) query, args, e := tx.BindNamed(insertIntoEthTxAttemptsQuery, &dbAttempt) if e != nil { return pkgerrors.Wrap(e, "saveReplacementInProgressAttempt failed to BindNamed") } e = tx.Get(&dbAttempt, query, args...) - DbEthTxAttemptToEthTxAttempt(dbAttempt, replacementAttempt) + dbAttempt.ToTxAttempt(replacementAttempt) return pkgerrors.Wrap(e, "saveReplacementInProgressAttempt failed to insert replacement attempt") }) } @@ -1281,7 +1278,7 @@ func (o *evmTxStore) FindNextUnstartedTransactionFromAddress(etx *Tx, fromAddres qq := o.q.WithOpts(qopts...) var dbEtx DbEthTx err := qq.Get(&dbEtx, `SELECT * FROM evm.txes WHERE from_address = $1 AND state = 'unstarted' AND evm_chain_id = $2 ORDER BY value ASC, created_at ASC, id ASC`, fromAddress, chainID.String()) - DbEthTxToEthTx(dbEtx, etx) + dbEtx.ToTx(etx) return pkgerrors.Wrap(err, "failed to FindNextUnstartedTransactionFromAddress") } @@ -1302,9 +1299,10 @@ func (o *evmTxStore) UpdateTxFatalError(etx *Tx, qopts ...pg.QOpt) error { if _, err := tx.Exec(`DELETE FROM evm.tx_attempts WHERE eth_tx_id = $1`, etx.ID); err != nil { return pkgerrors.Wrapf(err, "saveFatallyErroredTransaction failed to delete eth_tx_attempt with eth_tx.ID %v", etx.ID) } - dbEtx := DbEthTxFromEthTx(etx) + var dbEtx DbEthTx + dbEtx.FromTx(etx) err := pkgerrors.Wrap(tx.Get(&dbEtx, `UPDATE evm.txes SET state=$1, error=$2, broadcast_at=NULL, initial_broadcast_at=NULL, nonce=NULL WHERE id=$3 RETURNING *`, etx.State, etx.Error, etx.ID), "saveFatallyErroredTransaction failed to save eth_tx") - DbEthTxToEthTx(dbEtx, etx) + dbEtx.ToTx(etx) return err }) } @@ -1336,12 +1334,14 @@ func (o *evmTxStore) UpdateTxAttemptInProgressToBroadcast(etx *Tx, attempt TxAtt if err := incrNextNonceCallback(tx); err != nil { return pkgerrors.Wrap(err, "SaveEthTxAttempt failed on incrNextNonceCallback") } - dbEtx := DbEthTxFromEthTx(etx) + var dbEtx DbEthTx + dbEtx.FromTx(etx) if err := tx.Get(&dbEtx, `UPDATE evm.txes SET state=$1, error=$2, broadcast_at=$3, initial_broadcast_at=$4 WHERE id = $5 RETURNING *`, dbEtx.State, dbEtx.Error, dbEtx.BroadcastAt, dbEtx.InitialBroadcastAt, dbEtx.ID); err != nil { return pkgerrors.Wrap(err, "SaveEthTxAttempt failed to save eth_tx") } - DbEthTxToEthTx(dbEtx, etx) - dbAttempt := DbEthTxAttemptFromEthTxAttempt(&attempt) + dbEtx.ToTx(etx) + var dbAttempt DbEthTxAttempt + dbAttempt.FromTxAttempt(&attempt) if err := tx.Get(&dbAttempt, `UPDATE evm.tx_attempts SET state = $1 WHERE id = $2 RETURNING *`, dbAttempt.State, dbAttempt.ID); err != nil { return pkgerrors.Wrap(err, "SaveEthTxAttempt failed to save eth_tx_attempt") } @@ -1380,7 +1380,8 @@ func (o *evmTxStore) UpdateTxUnstartedToInProgress(etx *Tx, attempt *TxAttempt, return err } - dbAttempt := DbEthTxAttemptFromEthTxAttempt(attempt) + var dbAttempt DbEthTxAttempt + dbAttempt.FromTxAttempt(attempt) query, args, e := tx.BindNamed(insertIntoEthTxAttemptsQuery, &dbAttempt) if e != nil { return pkgerrors.Wrap(e, "failed to BindNamed") @@ -1397,10 +1398,11 @@ func (o *evmTxStore) UpdateTxUnstartedToInProgress(etx *Tx, attempt *TxAttempt, return pkgerrors.Wrap(err, "UpdateTxUnstartedToInProgress failed to create eth_tx_attempt") } } - DbEthTxAttemptToEthTxAttempt(dbAttempt, attempt) - dbEtx := DbEthTxFromEthTx(etx) + dbAttempt.ToTxAttempt(attempt) + var dbEtx DbEthTx + dbEtx.FromTx(etx) err = tx.Get(&dbEtx, `UPDATE evm.txes SET nonce=$1, state=$2, broadcast_at=$3, initial_broadcast_at=$4 WHERE id=$5 RETURNING *`, etx.Sequence, etx.State, etx.BroadcastAt, etx.InitialBroadcastAt, etx.ID) - DbEthTxToEthTx(dbEtx, etx) + dbEtx.ToTx(etx) return pkgerrors.Wrap(err, "UpdateTxUnstartedToInProgress failed to update eth_tx") }) } @@ -1424,7 +1426,7 @@ func (o *evmTxStore) GetTxInProgress(fromAddress common.Address, qopts ...pg.QOp } else if err != nil { return pkgerrors.Wrap(err, "GetTxInProgress failed while loading eth tx") } - DbEthTxToEthTx(dbEtx, etx) + dbEtx.ToTx(etx) if err = o.LoadTxAttempts(etx, pg.WithQueryer(tx)); err != nil { return pkgerrors.Wrap(err, "GetTxInProgress failed while loading EthTxAttempts") } @@ -1536,7 +1538,7 @@ RETURNING "txes".* return nil }) var etx Tx - DbEthTxToEthTx(dbEtx, &etx) + dbEtx.ToTx(&etx) return etx, err } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index c4837f81e87..913652b7c8b 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1100,7 +1100,8 @@ func TestORM_LoadEthTxesAttempts(t *testing.T) { q := pg.NewQ(db, logger.TestLogger(t), cfg.Database()) newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, etx.ID) - dbAttempt := txmgr.DbEthTxAttemptFromEthTxAttempt(&newAttempt) + var dbAttempt txmgr.DbEthTxAttempt + dbAttempt.FromTxAttempt(&newAttempt) err := q.Transaction(func(tx pg.Queryer) error { const insertEthTxAttemptSQL = `INSERT INTO evm.tx_attempts (eth_tx_id, gas_price, signed_raw_tx, hash, broadcast_before_block_num, state, created_at, chain_specific_gas_limit, tx_type, gas_tip_cap, gas_fee_cap) VALUES ( :eth_tx_id, :gas_price, :signed_raw_tx, :hash, :broadcast_before_block_num, :state, NOW(), :chain_specific_gas_limit, :tx_type, :gas_tip_cap, :gas_fee_cap diff --git a/core/cmd/evm_transaction_commands_test.go b/core/cmd/evm_transaction_commands_test.go index f213aefb154..eb421b03968 100644 --- a/core/cmd/evm_transaction_commands_test.go +++ b/core/cmd/evm_transaction_commands_test.go @@ -181,7 +181,7 @@ func TestShell_SendEther_From_Txm(t *testing.T) { assert.Equal(t, dbEvmTx.Value.String(), output.Value) assert.Equal(t, fmt.Sprintf("%d", *dbEvmTx.Nonce), output.Nonce) - dbEvmTxAttempt := txmgr.DbEthTxAttempt{} + var dbEvmTxAttempt txmgr.DbEthTxAttempt require.NoError(t, db.Get(&dbEvmTxAttempt, `SELECT * FROM evm.tx_attempts`)) assert.Equal(t, dbEvmTxAttempt.Hash, output.Hash) } @@ -246,7 +246,7 @@ func TestShell_SendEther_From_Txm_WEI(t *testing.T) { assert.Equal(t, dbEvmTx.Value.String(), output.Value) assert.Equal(t, fmt.Sprintf("%d", *dbEvmTx.Nonce), output.Nonce) - dbEvmTxAttempt := txmgr.DbEthTxAttempt{} + var dbEvmTxAttempt txmgr.DbEthTxAttempt require.NoError(t, db.Get(&dbEvmTxAttempt, `SELECT * FROM evm.tx_attempts`)) assert.Equal(t, dbEvmTxAttempt.Hash, output.Hash) } diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 051f28d8e7a..af8c1528de8 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -712,7 +712,7 @@ func (s *Shell) validateDB(c *cli.Context) error { } // ResetDatabase drops, creates and migrates the database specified by CL_DATABASE_URL or Database.URL -// in secrets TOML. This is useful to setup the database for testing +// in secrets TOML. This is useful to set up the database for testing func (s *Shell) ResetDatabase(c *cli.Context) error { cfg := s.Config.Database() parsed := cfg.URL() @@ -819,7 +819,7 @@ func dropDanglingTestDBs(lggr logger.Logger, db *sqlx.DB) (err error) { return } -// PrepareTestDatabase calls ResetDatabase then loads fixtures required for local +// PrepareTestDatabaseUserOnly calls ResetDatabase then loads only user fixtures required for local // testing against testnets. Does not include fake chain fixtures. func (s *Shell) PrepareTestDatabaseUserOnly(c *cli.Context) error { if err := s.ResetDatabase(c); err != nil { @@ -852,7 +852,7 @@ func (s *Shell) MigrateDatabase(_ *cli.Context) error { return nil } -// VersionDatabase displays the current database version. +// RollbackDatabase rolls back the database via down migrations. func (s *Shell) RollbackDatabase(c *cli.Context) error { var version null.Int if c.Args().Present() { diff --git a/core/services/blockhashstore/feeder_test.go b/core/services/blockhashstore/feeder_test.go index 08d7c0e9c46..d9e2c1bacdb 100644 --- a/core/services/blockhashstore/feeder_test.go +++ b/core/services/blockhashstore/feeder_test.go @@ -31,16 +31,12 @@ import ( const ( // VRF-only events. - randomWordsRequestedV2Plus string = "RandomWordsRequested" - randomWordsFulfilledV2Plus string = "RandomWordsFulfilled" - randomWordsRequestedV2 string = "RandomWordsRequested" - randomWordsFulfilledV2 string = "RandomWordsFulfilled" - randomWordsRequestedV1 string = "RandomnessRequest" - randomWordsFulfilledV1 string = "RandomnessRequestFulfilled" - randomnessFulfillmentRequestedEvent string = "RandomnessFulfillmentRequested" - randomWordsFulfilledEvent string = "RandomWordsFulfilled" - newTransmissionEvent string = "NewTransmission" - outputsServedEvent string = "OutputsServed" + randomWordsRequestedV2Plus string = "RandomWordsRequested" + randomWordsFulfilledV2Plus string = "RandomWordsFulfilled" + randomWordsRequestedV2 string = "RandomWordsRequested" + randomWordsFulfilledV2 string = "RandomWordsFulfilled" + randomWordsRequestedV1 string = "RandomnessRequest" + randomWordsFulfilledV1 string = "RandomnessRequestFulfilled" ) var ( @@ -50,18 +46,7 @@ var ( _ Coordinator = &TestCoordinator{} _ BHS = &TestBHS{} - tests = []struct { - name string - requests []Event - fulfillments []Event - wait int - lookback int - latest uint64 - bhs TestBHS - expectedStored []uint64 - expectedStoredMapBlocks []uint64 // expected state of stored map in Feeder struct - expectedErrMsg string - }{ + tests = []testCase{ { name: "single unfulfilled request", requests: []Event{{Block: 150, ID: "1000"}}, @@ -363,327 +348,344 @@ func TestStartHeartbeats(t *testing.T) { }) } -func TestFeeder(t *testing.T) { +type testCase struct { + name string + requests []Event + fulfillments []Event + wait int + lookback int + latest uint64 + bhs TestBHS + expectedStored []uint64 + expectedStoredMapBlocks []uint64 // expected state of stored map in Feeder struct + expectedErrMsg string +} +func TestFeeder(t *testing.T) { for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - coordinator := &TestCoordinator{ - RequestEvents: test.requests, - FulfillmentEvents: test.fulfillments, - } - - lp := &mocklp.LogPoller{} - feeder := NewFeeder( - logger.TestLogger(t), - coordinator, - &test.bhs, - lp, - 0, - test.wait, - test.lookback, - 600*time.Second, - func(ctx context.Context) (uint64, error) { - return test.latest, nil - }) - - err := feeder.Run(testutils.Context(t)) - if test.expectedErrMsg == "" { - require.NoError(t, err) - } else { - require.EqualError(t, err, test.expectedErrMsg) - } - - require.ElementsMatch(t, test.expectedStored, test.bhs.Stored) - require.ElementsMatch(t, test.expectedStoredMapBlocks, maps.Keys(feeder.stored)) + t.Run(test.name, test.testFeeder) + } +} + +func (test testCase) testFeeder(t *testing.T) { + coordinator := &TestCoordinator{ + RequestEvents: test.requests, + FulfillmentEvents: test.fulfillments, + } + + lp := &mocklp.LogPoller{} + feeder := NewFeeder( + logger.TestLogger(t), + coordinator, + &test.bhs, + lp, + 0, + test.wait, + test.lookback, + 600*time.Second, + func(ctx context.Context) (uint64, error) { + return test.latest, nil }) + + err := feeder.Run(testutils.Context(t)) + if test.expectedErrMsg == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, test.expectedErrMsg) } + + require.ElementsMatch(t, test.expectedStored, test.bhs.Stored) + require.ElementsMatch(t, test.expectedStoredMapBlocks, maps.Keys(feeder.stored)) } func TestFeederWithLogPollerVRFv1(t *testing.T) { + for _, test := range tests { + t.Run(test.name, test.testFeederWithLogPollerVRFv1) + } +} +func (test testCase) testFeederWithLogPollerVRFv1(t *testing.T) { var coordinatorAddress = common.HexToAddress("0x514910771AF9Ca656af840dff83E8264EcF986CA") - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - // Instantiate log poller & coordinator. - lp := &mocklp.LogPoller{} - lp.On("RegisterFilter", mock.Anything).Return(nil) - c, err := solidity_vrf_coordinator_interface.NewVRFCoordinator(coordinatorAddress, nil) - require.NoError(t, err) - coordinator := &V1Coordinator{ - c: c, - lp: lp, - } - - // Assert search window. - latest := int64(test.latest) - fromBlock := mathutil.Max(latest-int64(test.lookback), 0) - toBlock := mathutil.Max(latest-int64(test.wait), 0) - - // Construct request logs. - var requestLogs []logpoller.Log - for _, r := range test.requests { - if r.Block < uint64(fromBlock) || r.Block > uint64(toBlock) { - continue // do not include blocks outside our search window - } - requestLogs = append( - requestLogs, - newRandomnessRequestedLogV1(t, r.Block, r.ID, coordinatorAddress), - ) - } - - // Construct fulfillment logs. - var fulfillmentLogs []logpoller.Log - for _, r := range test.fulfillments { - fulfillmentLogs = append( - fulfillmentLogs, - newRandomnessFulfilledLogV1(t, r.Block, r.ID, coordinatorAddress), - ) - } - - // Mock log poller. - lp.On("LatestBlock", mock.Anything). - Return(latest, nil) - lp.On( - "LogsWithSigs", - fromBlock, - toBlock, - []common.Hash{ - solidity_vrf_coordinator_interface.VRFCoordinatorRandomnessRequest{}.Topic(), - }, - coordinatorAddress, - mock.Anything, - ).Return(requestLogs, nil) - lp.On( - "LogsWithSigs", - fromBlock, - latest, - []common.Hash{ - solidity_vrf_coordinator_interface.VRFCoordinatorRandomnessRequestFulfilled{}.Topic(), - }, - coordinatorAddress, - mock.Anything, - ).Return(fulfillmentLogs, nil) - - // Instantiate feeder. - feeder := NewFeeder( - logger.TestLogger(t), - coordinator, - &test.bhs, - lp, - 0, - test.wait, - test.lookback, - 600*time.Second, - func(ctx context.Context) (uint64, error) { - return test.latest, nil - }) - - // Run feeder and assert correct results. - err = feeder.Run(testutils.Context(t)) - if test.expectedErrMsg == "" { - require.NoError(t, err) - } else { - require.EqualError(t, err, test.expectedErrMsg) - } - require.ElementsMatch(t, test.expectedStored, test.bhs.Stored) - require.ElementsMatch(t, test.expectedStoredMapBlocks, maps.Keys(feeder.stored)) + // Instantiate log poller & coordinator. + lp := &mocklp.LogPoller{} + lp.On("RegisterFilter", mock.Anything).Return(nil) + c, err := solidity_vrf_coordinator_interface.NewVRFCoordinator(coordinatorAddress, nil) + require.NoError(t, err) + coordinator := &V1Coordinator{ + c: c, + lp: lp, + } + + // Assert search window. + latest := int64(test.latest) + fromBlock := mathutil.Max(latest-int64(test.lookback), 0) + toBlock := mathutil.Max(latest-int64(test.wait), 0) + + // Construct request logs. + var requestLogs []logpoller.Log + for _, r := range test.requests { + if r.Block < uint64(fromBlock) || r.Block > uint64(toBlock) { + continue // do not include blocks outside our search window + } + requestLogs = append( + requestLogs, + newRandomnessRequestedLogV1(t, r.Block, r.ID, coordinatorAddress), + ) + } + + // Construct fulfillment logs. + var fulfillmentLogs []logpoller.Log + for _, r := range test.fulfillments { + fulfillmentLogs = append( + fulfillmentLogs, + newRandomnessFulfilledLogV1(t, r.Block, r.ID, coordinatorAddress), + ) + } + + // Mock log poller. + lp.On("LatestBlock", mock.Anything). + Return(latest, nil) + lp.On( + "LogsWithSigs", + fromBlock, + toBlock, + []common.Hash{ + solidity_vrf_coordinator_interface.VRFCoordinatorRandomnessRequest{}.Topic(), + }, + coordinatorAddress, + mock.Anything, + ).Return(requestLogs, nil) + lp.On( + "LogsWithSigs", + fromBlock, + latest, + []common.Hash{ + solidity_vrf_coordinator_interface.VRFCoordinatorRandomnessRequestFulfilled{}.Topic(), + }, + coordinatorAddress, + mock.Anything, + ).Return(fulfillmentLogs, nil) + + // Instantiate feeder. + feeder := NewFeeder( + logger.TestLogger(t), + coordinator, + &test.bhs, + lp, + 0, + test.wait, + test.lookback, + 600*time.Second, + func(ctx context.Context) (uint64, error) { + return test.latest, nil }) + + // Run feeder and assert correct results. + err = feeder.Run(testutils.Context(t)) + if test.expectedErrMsg == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, test.expectedErrMsg) } + require.ElementsMatch(t, test.expectedStored, test.bhs.Stored) + require.ElementsMatch(t, test.expectedStoredMapBlocks, maps.Keys(feeder.stored)) } func TestFeederWithLogPollerVRFv2(t *testing.T) { + for _, test := range tests { + t.Run(test.name, test.testFeederWithLogPollerVRFv2) + } +} +func (test testCase) testFeederWithLogPollerVRFv2(t *testing.T) { var coordinatorAddress = common.HexToAddress("0x514910771AF9Ca656af840dff83E8264EcF986CA") - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - // Instantiate log poller & coordinator. - lp := &mocklp.LogPoller{} - lp.On("RegisterFilter", mock.Anything).Return(nil) - c, err := vrf_coordinator_v2.NewVRFCoordinatorV2(coordinatorAddress, nil) - require.NoError(t, err) - coordinator := &V2Coordinator{ - c: c, - lp: lp, - } - - // Assert search window. - latest := int64(test.latest) - fromBlock := mathutil.Max(latest-int64(test.lookback), 0) - toBlock := mathutil.Max(latest-int64(test.wait), 0) - - // Construct request logs. - var requestLogs []logpoller.Log - for _, r := range test.requests { - if r.Block < uint64(fromBlock) || r.Block > uint64(toBlock) { - continue // do not include blocks outside our search window - } - reqId, ok := big.NewInt(0).SetString(r.ID, 10) - require.True(t, ok) - requestLogs = append( - requestLogs, - newRandomnessRequestedLogV2(t, r.Block, reqId, coordinatorAddress), - ) - } - - // Construct fulfillment logs. - var fulfillmentLogs []logpoller.Log - for _, r := range test.fulfillments { - reqId, ok := big.NewInt(0).SetString(r.ID, 10) - require.True(t, ok) - fulfillmentLogs = append( - fulfillmentLogs, - newRandomnessFulfilledLogV2(t, r.Block, reqId, coordinatorAddress), - ) - } - - // Mock log poller. - lp.On("LatestBlock", mock.Anything). - Return(latest, nil) - lp.On( - "LogsWithSigs", - fromBlock, - toBlock, - []common.Hash{ - vrf_coordinator_v2.VRFCoordinatorV2RandomWordsRequested{}.Topic(), - }, - coordinatorAddress, - mock.Anything, - ).Return(requestLogs, nil) - lp.On( - "LogsWithSigs", - fromBlock, - latest, - []common.Hash{ - vrf_coordinator_v2.VRFCoordinatorV2RandomWordsFulfilled{}.Topic(), - }, - coordinatorAddress, - mock.Anything, - ).Return(fulfillmentLogs, nil) - - // Instantiate feeder. - feeder := NewFeeder( - logger.TestLogger(t), - coordinator, - &test.bhs, - lp, - 0, - test.wait, - test.lookback, - 600*time.Second, - func(ctx context.Context) (uint64, error) { - return test.latest, nil - }) - - // Run feeder and assert correct results. - err = feeder.Run(testutils.Context(t)) - if test.expectedErrMsg == "" { - require.NoError(t, err) - } else { - require.EqualError(t, err, test.expectedErrMsg) - } - require.ElementsMatch(t, test.expectedStored, test.bhs.Stored) - require.ElementsMatch(t, test.expectedStoredMapBlocks, maps.Keys(feeder.stored)) + // Instantiate log poller & coordinator. + lp := &mocklp.LogPoller{} + lp.On("RegisterFilter", mock.Anything).Return(nil) + c, err := vrf_coordinator_v2.NewVRFCoordinatorV2(coordinatorAddress, nil) + require.NoError(t, err) + coordinator := &V2Coordinator{ + c: c, + lp: lp, + } + + // Assert search window. + latest := int64(test.latest) + fromBlock := mathutil.Max(latest-int64(test.lookback), 0) + toBlock := mathutil.Max(latest-int64(test.wait), 0) + + // Construct request logs. + var requestLogs []logpoller.Log + for _, r := range test.requests { + if r.Block < uint64(fromBlock) || r.Block > uint64(toBlock) { + continue // do not include blocks outside our search window + } + reqId, ok := big.NewInt(0).SetString(r.ID, 10) + require.True(t, ok) + requestLogs = append( + requestLogs, + newRandomnessRequestedLogV2(t, r.Block, reqId, coordinatorAddress), + ) + } + + // Construct fulfillment logs. + var fulfillmentLogs []logpoller.Log + for _, r := range test.fulfillments { + reqId, ok := big.NewInt(0).SetString(r.ID, 10) + require.True(t, ok) + fulfillmentLogs = append( + fulfillmentLogs, + newRandomnessFulfilledLogV2(t, r.Block, reqId, coordinatorAddress), + ) + } + + // Mock log poller. + lp.On("LatestBlock", mock.Anything). + Return(latest, nil) + lp.On( + "LogsWithSigs", + fromBlock, + toBlock, + []common.Hash{ + vrf_coordinator_v2.VRFCoordinatorV2RandomWordsRequested{}.Topic(), + }, + coordinatorAddress, + mock.Anything, + ).Return(requestLogs, nil) + lp.On( + "LogsWithSigs", + fromBlock, + latest, + []common.Hash{ + vrf_coordinator_v2.VRFCoordinatorV2RandomWordsFulfilled{}.Topic(), + }, + coordinatorAddress, + mock.Anything, + ).Return(fulfillmentLogs, nil) + + // Instantiate feeder. + feeder := NewFeeder( + logger.TestLogger(t), + coordinator, + &test.bhs, + lp, + 0, + test.wait, + test.lookback, + 600*time.Second, + func(ctx context.Context) (uint64, error) { + return test.latest, nil }) + + // Run feeder and assert correct results. + err = feeder.Run(testutils.Context(t)) + if test.expectedErrMsg == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, test.expectedErrMsg) } + require.ElementsMatch(t, test.expectedStored, test.bhs.Stored) + require.ElementsMatch(t, test.expectedStoredMapBlocks, maps.Keys(feeder.stored)) } func TestFeederWithLogPollerVRFv2Plus(t *testing.T) { + for _, test := range tests { + t.Run(test.name, test.testFeederWithLogPollerVRFv2Plus) + } +} +func (test testCase) testFeederWithLogPollerVRFv2Plus(t *testing.T) { var coordinatorAddress = common.HexToAddress("0x514910771AF9Ca656af840dff83E8264EcF986CA") - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - // Instantiate log poller & coordinator. - lp := &mocklp.LogPoller{} - lp.On("RegisterFilter", mock.Anything).Return(nil) - c, err := vrf_coordinator_v2plus.NewVRFCoordinatorV2Plus(coordinatorAddress, nil) - require.NoError(t, err) - coordinator := &V2PlusCoordinator{ - c: c, - lp: lp, - } - - // Assert search window. - latest := int64(test.latest) - fromBlock := mathutil.Max(latest-int64(test.lookback), 0) - toBlock := mathutil.Max(latest-int64(test.wait), 0) - - // Construct request logs. - var requestLogs []logpoller.Log - for _, r := range test.requests { - if r.Block < uint64(fromBlock) || r.Block > uint64(toBlock) { - continue // do not include blocks outside our search window - } - reqId, ok := big.NewInt(0).SetString(r.ID, 10) - require.True(t, ok) - requestLogs = append( - requestLogs, - newRandomnessRequestedLogV2Plus(t, r.Block, reqId, coordinatorAddress), - ) - } - - // Construct fulfillment logs. - var fulfillmentLogs []logpoller.Log - for _, r := range test.fulfillments { - reqId, ok := big.NewInt(0).SetString(r.ID, 10) - require.True(t, ok) - fulfillmentLogs = append( - fulfillmentLogs, - newRandomnessFulfilledLogV2Plus(t, r.Block, reqId, coordinatorAddress), - ) - } - - // Mock log poller. - lp.On("LatestBlock", mock.Anything). - Return(latest, nil) - lp.On( - "LogsWithSigs", - fromBlock, - toBlock, - []common.Hash{ - vrf_coordinator_v2plus.VRFCoordinatorV2PlusRandomWordsRequested{}.Topic(), - }, - coordinatorAddress, - mock.Anything, - ).Return(requestLogs, nil) - lp.On( - "LogsWithSigs", - fromBlock, - latest, - []common.Hash{ - vrf_coordinator_v2plus.VRFCoordinatorV2PlusRandomWordsFulfilled{}.Topic(), - }, - coordinatorAddress, - mock.Anything, - ).Return(fulfillmentLogs, nil) - - // Instantiate feeder. - feeder := NewFeeder( - logger.TestLogger(t), - coordinator, - &test.bhs, - lp, - 0, - test.wait, - test.lookback, - 600*time.Second, - func(ctx context.Context) (uint64, error) { - return test.latest, nil - }) - - // Run feeder and assert correct results. - err = feeder.Run(testutils.Context(t)) - if test.expectedErrMsg == "" { - require.NoError(t, err) - } else { - require.EqualError(t, err, test.expectedErrMsg) - } - require.ElementsMatch(t, test.expectedStored, test.bhs.Stored) - require.ElementsMatch(t, test.expectedStoredMapBlocks, maps.Keys(feeder.stored)) + // Instantiate log poller & coordinator. + lp := &mocklp.LogPoller{} + lp.On("RegisterFilter", mock.Anything).Return(nil) + c, err := vrf_coordinator_v2plus.NewVRFCoordinatorV2Plus(coordinatorAddress, nil) + require.NoError(t, err) + coordinator := &V2PlusCoordinator{ + c: c, + lp: lp, + } + + // Assert search window. + latest := int64(test.latest) + fromBlock := mathutil.Max(latest-int64(test.lookback), 0) + toBlock := mathutil.Max(latest-int64(test.wait), 0) + + // Construct request logs. + var requestLogs []logpoller.Log + for _, r := range test.requests { + if r.Block < uint64(fromBlock) || r.Block > uint64(toBlock) { + continue // do not include blocks outside our search window + } + reqId, ok := big.NewInt(0).SetString(r.ID, 10) + require.True(t, ok) + requestLogs = append( + requestLogs, + newRandomnessRequestedLogV2Plus(t, r.Block, reqId, coordinatorAddress), + ) + } + + // Construct fulfillment logs. + var fulfillmentLogs []logpoller.Log + for _, r := range test.fulfillments { + reqId, ok := big.NewInt(0).SetString(r.ID, 10) + require.True(t, ok) + fulfillmentLogs = append( + fulfillmentLogs, + newRandomnessFulfilledLogV2Plus(t, r.Block, reqId, coordinatorAddress), + ) + } + + // Mock log poller. + lp.On("LatestBlock", mock.Anything). + Return(latest, nil) + lp.On( + "LogsWithSigs", + fromBlock, + toBlock, + []common.Hash{ + vrf_coordinator_v2plus.VRFCoordinatorV2PlusRandomWordsRequested{}.Topic(), + }, + coordinatorAddress, + mock.Anything, + ).Return(requestLogs, nil) + lp.On( + "LogsWithSigs", + fromBlock, + latest, + []common.Hash{ + vrf_coordinator_v2plus.VRFCoordinatorV2PlusRandomWordsFulfilled{}.Topic(), + }, + coordinatorAddress, + mock.Anything, + ).Return(fulfillmentLogs, nil) + + // Instantiate feeder. + feeder := NewFeeder( + logger.TestLogger(t), + coordinator, + &test.bhs, + lp, + 0, + test.wait, + test.lookback, + 600*time.Second, + func(ctx context.Context) (uint64, error) { + return test.latest, nil }) + + // Run feeder and assert correct results. + err = feeder.Run(testutils.Context(t)) + if test.expectedErrMsg == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, test.expectedErrMsg) } + require.ElementsMatch(t, test.expectedStored, test.bhs.Stored) + require.ElementsMatch(t, test.expectedStoredMapBlocks, maps.Keys(feeder.stored)) } func TestFeeder_CachesStoredBlocks(t *testing.T) { diff --git a/core/services/blockheaderfeeder/block_header_feeder_test.go b/core/services/blockheaderfeeder/block_header_feeder_test.go index 0e52ee9447d..6c1ec0946e7 100644 --- a/core/services/blockheaderfeeder/block_header_feeder_test.go +++ b/core/services/blockheaderfeeder/block_header_feeder_test.go @@ -16,25 +16,27 @@ import ( keystoremocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" ) +type testCase struct { + name string + requests []blockhashstore.Event + fulfillments []blockhashstore.Event + wait int + lookback int + latest uint64 + alreadyStored []uint64 + expectedStored []uint64 + expectedErrMsg string + getBatchSize uint16 + storeBatchSize uint16 + getBatchCallCount uint16 + storeBatchCallCount uint16 + storedEarliest bool + bhs blockhashstore.TestBHS + batchBHS blockhashstore.TestBatchBHS +} + func TestFeeder(t *testing.T) { - tests := []struct { - name string - requests []blockhashstore.Event - fulfillments []blockhashstore.Event - wait int - lookback int - latest uint64 - alreadyStored []uint64 - expectedStored []uint64 - expectedErrMsg string - getBatchSize uint16 - storeBatchSize uint16 - getBatchCallCount uint16 - storeBatchCallCount uint16 - storedEarliest bool - bhs blockhashstore.TestBHS - batchBHS blockhashstore.TestBatchBHS - }{ + tests := []testCase{ { name: "single missing block", requests: []blockhashstore.Event{{Block: 150, ID: "request"}}, @@ -182,51 +184,55 @@ func TestFeeder(t *testing.T) { } for _, test := range tests { - lggr := logger.TestLogger(t) - lggr.Debugf("running test case: %s", test.name) - coordinator := &blockhashstore.TestCoordinator{ - RequestEvents: test.requests, - FulfillmentEvents: test.fulfillments, - } + t.Run(test.name, test.testFeeder) + } +} - test.batchBHS.Stored = append(test.batchBHS.Stored, test.alreadyStored...) +func (test testCase) testFeeder(t *testing.T) { + lggr := logger.TestLogger(t) + lggr.Debugf("running test case: %s", test.name) + coordinator := &blockhashstore.TestCoordinator{ + RequestEvents: test.requests, + FulfillmentEvents: test.fulfillments, + } - blockHeaderProvider := &blockhashstore.TestBlockHeaderProvider{} - fromAddress := "0x469aA2CD13e037DC5236320783dCfd0e641c0559" - fromAddresses := []ethkey.EIP55Address{(ethkey.EIP55Address(fromAddress))} - ks := keystoremocks.NewEth(t) - ks.On("GetRoundRobinAddress", testutils.FixtureChainID, mock.Anything).Maybe().Return(common.HexToAddress(fromAddress), nil) + test.batchBHS.Stored = append(test.batchBHS.Stored, test.alreadyStored...) - feeder := NewBlockHeaderFeeder( - lggr, - coordinator, - &test.bhs, - &test.batchBHS, - blockHeaderProvider, - test.wait, - test.lookback, - func(ctx context.Context) (uint64, error) { - return test.latest, nil - }, - ks, - test.getBatchSize, - test.storeBatchSize, - fromAddresses, - testutils.FixtureChainID, - ) + blockHeaderProvider := &blockhashstore.TestBlockHeaderProvider{} + fromAddress := "0x469aA2CD13e037DC5236320783dCfd0e641c0559" + fromAddresses := []ethkey.EIP55Address{ethkey.EIP55Address(fromAddress)} + ks := keystoremocks.NewEth(t) + ks.On("GetRoundRobinAddress", testutils.FixtureChainID, mock.Anything).Maybe().Return(common.HexToAddress(fromAddress), nil) - err := feeder.Run(testutils.Context(t)) - if test.expectedErrMsg == "" { - require.NoError(t, err) - } else { - require.EqualError(t, err, test.expectedErrMsg) - } + feeder := NewBlockHeaderFeeder( + lggr, + coordinator, + &test.bhs, + &test.batchBHS, + blockHeaderProvider, + test.wait, + test.lookback, + func(ctx context.Context) (uint64, error) { + return test.latest, nil + }, + ks, + test.getBatchSize, + test.storeBatchSize, + fromAddresses, + testutils.FixtureChainID, + ) - require.ElementsMatch(t, test.expectedStored, test.batchBHS.Stored) - require.Equal(t, test.storedEarliest, test.bhs.StoredEarliest) - require.Equal(t, test.getBatchCallCount, test.batchBHS.GetBlockhashesCallCounter) - require.Equal(t, test.storeBatchCallCount, test.batchBHS.StoreVerifyHeaderCallCounter) + err := feeder.Run(testutils.Context(t)) + if test.expectedErrMsg == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, test.expectedErrMsg) } + + require.ElementsMatch(t, test.expectedStored, test.batchBHS.Stored) + require.Equal(t, test.storedEarliest, test.bhs.StoredEarliest) + require.Equal(t, test.getBatchCallCount, test.batchBHS.GetBlockhashesCallCounter) + require.Equal(t, test.storeBatchCallCount, test.batchBHS.StoreVerifyHeaderCallCounter) } func TestFeeder_CachesStoredBlocks(t *testing.T) { @@ -238,7 +244,7 @@ func TestFeeder_CachesStoredBlocks(t *testing.T) { batchBHS := &blockhashstore.TestBatchBHS{Stored: []uint64{75}} blockHeaderProvider := &blockhashstore.TestBlockHeaderProvider{} fromAddress := "0x469aA2CD13e037DC5236320783dCfd0e641c0559" - fromAddresses := []ethkey.EIP55Address{(ethkey.EIP55Address(fromAddress))} + fromAddresses := []ethkey.EIP55Address{ethkey.EIP55Address(fromAddress)} ks := keystoremocks.NewEth(t) ks.On("GetRoundRobinAddress", testutils.FixtureChainID, mock.Anything).Maybe().Return(common.HexToAddress(fromAddress), nil) diff --git a/core/services/cron/cron.go b/core/services/cron/cron.go index 56c67096e50..e89dd1ceabd 100644 --- a/core/services/cron/cron.go +++ b/core/services/cron/cron.go @@ -79,7 +79,7 @@ func (cr *Cron) runPipeline() { run := pipeline.NewRun(*cr.jobSpec.PipelineSpec, vars) - _, err := cr.pipelineRunner.Run(ctx, &run, cr.logger, false, nil) + _, err := cr.pipelineRunner.Run(ctx, run, cr.logger, false, nil) if err != nil { cr.logger.Errorf("Error executing new run for jobSpec ID %v", cr.jobSpec.ID) } diff --git a/core/services/directrequest/delegate.go b/core/services/directrequest/delegate.go index f0ba5276ce7..39564b8c8bd 100644 --- a/core/services/directrequest/delegate.go +++ b/core/services/directrequest/delegate.go @@ -370,7 +370,7 @@ func (l *listener) handleOracleRequest(request *operator_wrapper.OperatorOracleR }, }) run := pipeline.NewRun(*l.job.PipelineSpec, vars) - _, err := l.pipelineRunner.Run(ctx, &run, l.logger, true, func(tx pg.Queryer) error { + _, err := l.pipelineRunner.Run(ctx, run, l.logger, true, func(tx pg.Queryer) error { l.markLogConsumed(lb, pg.WithQueryer(tx)) return nil }) diff --git a/core/services/fluxmonitorv2/flux_monitor.go b/core/services/fluxmonitorv2/flux_monitor.go index 11e9b25be04..0b09655707d 100644 --- a/core/services/fluxmonitorv2/flux_monitor.go +++ b/core/services/fluxmonitorv2/flux_monitor.go @@ -769,7 +769,7 @@ func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggr } err = fm.q.Transaction(func(tx pg.Queryer) error { - if err2 := fm.runner.InsertFinishedRun(&run, false, pg.WithQueryer(tx)); err2 != nil { + if err2 := fm.runner.InsertFinishedRun(run, false, pg.WithQueryer(tx)); err2 != nil { return err2 } if err2 := fm.queueTransactionForTxm(tx, run.ID, answer, roundState.RoundId, &log); err2 != nil { @@ -993,7 +993,7 @@ func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker } err = fm.q.Transaction(func(tx pg.Queryer) error { - if err2 := fm.runner.InsertFinishedRun(&run, true, pg.WithQueryer(tx)); err2 != nil { + if err2 := fm.runner.InsertFinishedRun(run, true, pg.WithQueryer(tx)); err2 != nil { return err2 } if err2 := fm.queueTransactionForTxm(tx, run.ID, answer, roundState.RoundId, nil); err2 != nil { diff --git a/core/services/fluxmonitorv2/flux_monitor_test.go b/core/services/fluxmonitorv2/flux_monitor_test.go index e165ce68205..27d40cd69c7 100644 --- a/core/services/fluxmonitorv2/flux_monitor_test.go +++ b/core/services/fluxmonitorv2/flux_monitor_test.go @@ -455,7 +455,7 @@ func TestFluxMonitor_PollIfEligible(t *testing.T) { }, }, ), mock.Anything). - Return(run, pipeline.TaskRunResults{ + Return(&run, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(answers.polledAnswer), @@ -584,7 +584,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { tm.orm.On("MostRecentFluxMonitorRoundID", contractAddress).Return(uint32(4), nil) // Round 1 - run := pipeline.Run{ID: 1} + run := &pipeline.Run{ID: 1} tm.orm. On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(1), mock.Anything). Return(fluxmonitorv2.FluxMonitorRoundStatsV2{ @@ -624,7 +624,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { Return(nil).Once() // Round 3 - run = pipeline.Run{ID: 2} + run = &pipeline.Run{ID: 2} tm.orm. On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(3), mock.Anything). Return(fluxmonitorv2.FluxMonitorRoundStatsV2{ @@ -663,7 +663,7 @@ func TestPollingDeviationChecker_BuffersLogs(t *testing.T) { Return(nil).Once() // Round 4 - run = pipeline.Run{ID: 3} + run = &pipeline.Run{ID: 3} tm.orm. On("FindOrCreateFluxMonitorRoundStats", contractAddress, uint32(4), mock.Anything). Return(fluxmonitorv2.FluxMonitorRoundStatsV2{ @@ -1484,7 +1484,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { answer = 100 ) - run := pipeline.Run{ID: 1} + run := &pipeline.Run{ID: 1} tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once() tm.logBroadcaster.On("IsConnected").Return(true).Maybe() @@ -1600,7 +1600,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { answer = 100 ) - run := pipeline.Run{ID: 1} + run := &pipeline.Run{ID: 1} tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once() tm.logBroadcaster.On("IsConnected").Return(true).Maybe() @@ -1696,7 +1696,7 @@ func TestFluxMonitor_DoesNotDoubleSubmit(t *testing.T) { roundID = 3 answer = 100 ) - run := pipeline.Run{ID: 1} + run := &pipeline.Run{ID: 1} tm.keyStore.On("EnabledKeysForChain", testutils.FixtureChainID).Return([]ethkey.KeyV2{{Address: nodeAddr}}, nil).Once() tm.logBroadcaster.On("IsConnected").Return(true).Maybe() @@ -1901,7 +1901,7 @@ func TestFluxMonitor_DrumbeatTicker(t *testing.T) { }, }, ), mock.Anything). - Return(pipeline.Run{ID: runID}, pipeline.TaskRunResults{ + Return(&pipeline.Run{ID: runID}, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: decimal.NewFromInt(fetchedAnswer), diff --git a/core/services/keeper/upkeep_executer.go b/core/services/keeper/upkeep_executer.go index 249d3217469..435b245792c 100644 --- a/core/services/keeper/upkeep_executer.go +++ b/core/services/keeper/upkeep_executer.go @@ -223,7 +223,7 @@ func (ex *UpkeepExecuter) execute(upkeep UpkeepRegistration, head *evmtypes.Head ex.job.PipelineSpec.DotDagSource = pipeline.KeepersObservationSource run := pipeline.NewRun(*ex.job.PipelineSpec, vars) - if _, err := ex.pr.Run(ctxService, &run, svcLogger, true, nil); err != nil { + if _, err := ex.pr.Run(ctxService, run, svcLogger, true, nil); err != nil { svcLogger.Error(errors.Wrap(err, "failed executing run")) return } diff --git a/core/services/ocr/delegate.go b/core/services/ocr/delegate.go index 9cb736a58f3..9ed22d01e72 100644 --- a/core/services/ocr/delegate.go +++ b/core/services/ocr/delegate.go @@ -274,7 +274,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) (services []job effectiveTransmitterAddress, ) - runResults := make(chan pipeline.Run, chain.Config().JobPipeline().ResultWriteQueueDepth()) + runResults := make(chan *pipeline.Run, chain.Config().JobPipeline().ResultWriteQueueDepth()) var configOverrider ocrtypes.ConfigOverrider configOverriderService, err := d.maybeCreateConfigOverrider(lggr, chain, concreteSpec.ContractAddress) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 2d1ff41ac12..4168ed6c61b 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -401,7 +401,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job, qopts ...pg.QOpt) ([]job.ServiceC spec.CaptureEATelemetry = d.cfg.OCR2().CaptureEATelemetry() - runResults := make(chan pipeline.Run, d.cfg.JobPipeline().ResultWriteQueueDepth()) + runResults := make(chan *pipeline.Run, d.cfg.JobPipeline().ResultWriteQueueDepth()) ctx := lggrCtx.ContextWithValues(context.Background()) switch spec.PluginType { @@ -479,7 +479,7 @@ func (d *Delegate) newServicesMercury( ctx context.Context, lggr logger.SugaredLogger, jb job.Job, - runResults chan pipeline.Run, + runResults chan *pipeline.Run, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, @@ -565,7 +565,7 @@ func (d *Delegate) newServicesMedian( ctx context.Context, lggr logger.SugaredLogger, jb job.Job, - runResults chan pipeline.Run, + runResults chan *pipeline.Run, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, @@ -677,7 +677,7 @@ func (d *Delegate) newServicesDKG( func (d *Delegate) newServicesOCR2VRF( lggr logger.SugaredLogger, jb job.Job, - runResults chan pipeline.Run, + runResults chan *pipeline.Run, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, @@ -865,7 +865,7 @@ func (d *Delegate) newServicesOCR2VRF( func (d *Delegate) newServicesOCR2Keepers( lggr logger.SugaredLogger, jb job.Job, - runResults chan pipeline.Run, + runResults chan *pipeline.Run, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, @@ -895,7 +895,7 @@ func (d *Delegate) newServicesOCR2Keepers( func (d *Delegate) newServicesOCR2Keepers21( lggr logger.SugaredLogger, jb job.Job, - runResults chan pipeline.Run, + runResults chan *pipeline.Run, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, @@ -1014,7 +1014,7 @@ func (d *Delegate) newServicesOCR2Keepers21( func (d *Delegate) newServicesOCR2Keepers20( lggr logger.SugaredLogger, jb job.Job, - runResults chan pipeline.Run, + runResults chan *pipeline.Run, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, @@ -1148,7 +1148,7 @@ func (d *Delegate) newServicesOCR2Keepers20( func (d *Delegate) newServicesOCR2Functions( lggr logger.SugaredLogger, jb job.Job, - runResults chan pipeline.Run, + runResults chan *pipeline.Run, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, functionsOcrDB *db, diff --git a/core/services/ocr2/plugins/median/services.go b/core/services/ocr2/plugins/median/services.go index e435ee747f5..562c0fa34f1 100644 --- a/core/services/ocr2/plugins/median/services.go +++ b/core/services/ocr2/plugins/median/services.go @@ -49,7 +49,7 @@ func NewMedianServices(ctx context.Context, isNewlyCreatedJob bool, relayer loop.Relayer, pipelineRunner pipeline.Runner, - runResults chan pipeline.Run, + runResults chan *pipeline.Run, lggr logger.Logger, argsNoPlugin libocr.OCR2OracleArgs, cfg MedianConfig, diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index 05e7e968f8b..31fb8ab8c4b 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -32,7 +32,7 @@ func NewServices( jb job.Job, ocr2Provider relaytypes.MercuryProvider, pipelineRunner pipeline.Runner, - runResults chan pipeline.Run, + runResults chan *pipeline.Run, lggr logger.Logger, argsNoPlugin libocr2.MercuryOracleArgs, cfg Config, diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index c3c1b99787b..3994f1d8413 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -375,7 +375,7 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB // If recoverer is lagging by a lot (more than 100x recoveryLogsBuffer), allow // a range of recoveryLogsBurst // Exploratory: Store lastRePollBlock in DB to prevent bursts during restarts - // (while also taking into account exisitng pending payloads) + // (while also taking into account existing pending payloads) end = start + recoveryLogsBurst } if end > offsetBlock { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline_test.go index cdb5e50b5b1..ee213643194 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline_test.go @@ -75,7 +75,7 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) { tests := []struct { name string checkBlock *big.Int - latestBlock ocr2keepers.BlockKey + latestBlock *ocr2keepers.BlockKey upkeepId *big.Int checkHash common.Hash payload ocr2keepers.UpkeepPayload @@ -88,7 +88,7 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) { { name: "for an invalid check block number, if hash does not match the check hash, return CheckBlockInvalid", checkBlock: big.NewInt(500), - latestBlock: ocr2keepers.BlockKey{Number: 560}, + latestBlock: &ocr2keepers.BlockKey{Number: 560}, upkeepId: big.NewInt(12345), checkHash: common.HexToHash("0x5bff03de234fe771ac0d685f9ee0fb0b757ea02ec9e6f10e8e2ee806db1b6b83"), payload: ocr2keepers.UpkeepPayload{ @@ -112,7 +112,7 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) { { name: "for an invalid check block number, if hash does match the check hash, return NoPipelineError", checkBlock: big.NewInt(500), - latestBlock: ocr2keepers.BlockKey{Number: 560}, + latestBlock: &ocr2keepers.BlockKey{Number: 560}, upkeepId: big.NewInt(12345), checkHash: common.HexToHash("0x5bff03de234fe771ac0d685f9ee0fb0b757ea02ec9e6f10e8e2ee806db1b6b83"), payload: ocr2keepers.UpkeepPayload{ @@ -136,7 +136,7 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) { { name: "check block hash does not match", checkBlock: big.NewInt(500), - latestBlock: ocr2keepers.BlockKey{Number: 560}, + latestBlock: &ocr2keepers.BlockKey{Number: 560}, upkeepId: big.NewInt(12345), checkHash: common.HexToHash("0x5bff03de234fe771ac0d685f9ee0fb0b757ea02ec9e6f10e8e2ee806db1b6b83"), payload: ocr2keepers.UpkeepPayload{ @@ -161,7 +161,7 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) { { name: "check block is valid", checkBlock: big.NewInt(500), - latestBlock: ocr2keepers.BlockKey{Number: 560}, + latestBlock: &ocr2keepers.BlockKey{Number: 560}, upkeepId: big.NewInt(12345), checkHash: common.HexToHash("0x5bff03de234fe771ac0d685f9ee0fb0b757ea02ec9e6f10e8e2ee806db1b6b83"), payload: ocr2keepers.UpkeepPayload{ @@ -182,7 +182,7 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) { latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{}, blocks: tc.blocks, } - bs.latestBlock.Store(&tc.latestBlock) + bs.latestBlock.Store(tc.latestBlock) e := &EvmRegistry{ lggr: lggr, bs: bs, @@ -392,7 +392,7 @@ func TestRegistry_CheckUpkeeps(t *testing.T) { name string inputs []ocr2keepers.UpkeepPayload blocks map[int64]string - latestBlock ocr2keepers.BlockKey + latestBlock *ocr2keepers.BlockKey results []ocr2keepers.CheckResult err error ethCalls map[string]bool @@ -427,7 +427,7 @@ func TestRegistry_CheckUpkeeps(t *testing.T) { 570: "0x1222d75217e2dd461cc77e4091c37abe76277430d97f1963a822b4e94ebb83fc", 575: "0x9840e5b709bfccf6a1b44f34c884bc39403f57923f3f5ead6243cc090546b857", }, - latestBlock: ocr2keepers.BlockKey{Number: 580}, + latestBlock: &ocr2keepers.BlockKey{Number: 580}, results: []ocr2keepers.CheckResult{ { PipelineExecutionState: uint8(encoding.CheckBlockInvalid), @@ -494,7 +494,7 @@ func TestRegistry_CheckUpkeeps(t *testing.T) { latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{}, blocks: tc.blocks, } - bs.latestBlock.Store(&tc.latestBlock) + bs.latestBlock.Store(tc.latestBlock) e := &EvmRegistry{ lggr: lggr, bs: bs, diff --git a/core/services/ocrcommon/data_source.go b/core/services/ocrcommon/data_source.go index 4ea7cb7a9a9..ed832e45fcf 100644 --- a/core/services/ocrcommon/data_source.go +++ b/core/services/ocrcommon/data_source.go @@ -35,7 +35,7 @@ type inMemoryDataSource struct { type dataSourceBase struct { inMemoryDataSource - runResults chan<- pipeline.Run + runResults chan<- *pipeline.Run } // dataSource implements dataSourceBase with the proper Observe return type for ocr1 @@ -55,7 +55,7 @@ type ObservationTimestamp struct { ConfigDigest string } -func NewDataSourceV1(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, runResults chan<- pipeline.Run, chEnhancedTelemetry chan EnhancedTelemetryData) ocr1types.DataSource { +func NewDataSourceV1(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, runResults chan<- *pipeline.Run, chEnhancedTelemetry chan EnhancedTelemetryData) ocr1types.DataSource { return &dataSource{ dataSourceBase: dataSourceBase{ inMemoryDataSource: inMemoryDataSource{ @@ -70,7 +70,7 @@ func NewDataSourceV1(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr lo } } -func NewDataSourceV2(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, runResults chan<- pipeline.Run, enhancedTelemChan chan EnhancedTelemetryData) median.DataSource { +func NewDataSourceV2(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, runResults chan<- *pipeline.Run, enhancedTelemChan chan EnhancedTelemetryData) median.DataSource { return &dataSourceV2{ dataSourceBase: dataSourceBase{ inMemoryDataSource: inMemoryDataSource{ @@ -113,7 +113,7 @@ func (ds *inMemoryDataSource) currentAnswer() (*big.Int, *big.Int) { // The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). // Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. -func (ds *inMemoryDataSource) executeRun(ctx context.Context, timestamp ObservationTimestamp) (pipeline.Run, pipeline.FinalResult, error) { +func (ds *inMemoryDataSource) executeRun(ctx context.Context, timestamp ObservationTimestamp) (*pipeline.Run, pipeline.FinalResult, error) { md, err := bridges.MarshalBridgeMetaData(ds.currentAnswer()) if err != nil { ds.lggr.Warnw("unable to attach metadata for run", "err", err) @@ -132,7 +132,7 @@ func (ds *inMemoryDataSource) executeRun(ctx context.Context, timestamp Observat run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) if err != nil { - return pipeline.Run{}, pipeline.FinalResult{}, errors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) + return nil, pipeline.FinalResult{}, errors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) } finalResult := trrs.FinalResult(ds.lggr) promSetBridgeParseMetrics(ds, &trrs) diff --git a/core/services/ocrcommon/data_source_test.go b/core/services/ocrcommon/data_source_test.go index f40637cd999..51a004f1f05 100644 --- a/core/services/ocrcommon/data_source_test.go +++ b/core/services/ocrcommon/data_source_test.go @@ -28,7 +28,7 @@ var ( func Test_InMemoryDataSource(t *testing.T) { runner := pipelinemocks.NewRunner(t) runner.On("ExecuteRun", mock.Anything, mock.AnythingOfType("pipeline.Spec"), mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(&pipeline.Run{}, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: mockValue, @@ -65,7 +65,7 @@ func Test_InMemoryDataSourceWithProm(t *testing.T) { }}, []pipeline.Task{}, 2) runner.On("ExecuteRun", mock.Anything, mock.AnythingOfType("pipeline.Spec"), mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults([]pipeline.TaskRunResult{ + Return(&pipeline.Run{}, pipeline.TaskRunResults([]pipeline.TaskRunResult{ { Task: &bridgeTask, Result: pipeline.Result{}, @@ -96,7 +96,7 @@ func Test_InMemoryDataSourceWithProm(t *testing.T) { func Test_NewDataSourceV2(t *testing.T) { runner := pipelinemocks.NewRunner(t) runner.On("ExecuteRun", mock.Anything, mock.AnythingOfType("pipeline.Spec"), mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(&pipeline.Run{}, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: mockValue, @@ -106,18 +106,18 @@ func Test_NewDataSourceV2(t *testing.T) { }, }, nil) - resChan := make(chan pipeline.Run, 100) + resChan := make(chan *pipeline.Run, 100) ds := ocrcommon.NewDataSourceV2(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t), resChan, nil) val, err := ds.Observe(testutils.Context(t), types.ReportTimestamp{}) require.NoError(t, err) - assert.Equal(t, mockValue, val.String()) // returns expected value after pipeline run - assert.Equal(t, pipeline.Run{}, <-resChan) // expected data properly passed to channel + assert.Equal(t, mockValue, val.String()) // returns expected value after pipeline run + assert.Equal(t, &pipeline.Run{}, <-resChan) // expected data properly passed to channel } func Test_NewDataSourceV1(t *testing.T) { runner := pipelinemocks.NewRunner(t) runner.On("ExecuteRun", mock.Anything, mock.AnythingOfType("pipeline.Spec"), mock.Anything, mock.Anything). - Return(pipeline.Run{}, pipeline.TaskRunResults{ + Return(&pipeline.Run{}, pipeline.TaskRunResults{ { Result: pipeline.Result{ Value: mockValue, @@ -127,10 +127,10 @@ func Test_NewDataSourceV1(t *testing.T) { }, }, nil) - resChan := make(chan pipeline.Run, 100) + resChan := make(chan *pipeline.Run, 100) ds := ocrcommon.NewDataSourceV1(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t), resChan, nil) val, err := ds.Observe(testutils.Context(t), ocrtypes.ReportTimestamp{}) require.NoError(t, err) assert.Equal(t, mockValue, new(big.Int).Set(val).String()) // returns expected value after pipeline run - assert.Equal(t, pipeline.Run{}, <-resChan) // expected data properly passed to channel + assert.Equal(t, &pipeline.Run{}, <-resChan) // expected data properly passed to channel } diff --git a/core/services/ocrcommon/run_saver.go b/core/services/ocrcommon/run_saver.go index 7a7ea0c9d0a..3aa3aff876e 100644 --- a/core/services/ocrcommon/run_saver.go +++ b/core/services/ocrcommon/run_saver.go @@ -12,7 +12,7 @@ type RunResultSaver struct { utils.StartStopOnce maxSuccessfulRuns uint64 - runResults <-chan pipeline.Run + runResults <-chan *pipeline.Run pipelineRunner pipeline.Runner done chan struct{} logger logger.Logger @@ -24,7 +24,7 @@ func (r *RunResultSaver) HealthReport() map[string]error { func (r *RunResultSaver) Name() string { return r.logger.Name() } -func NewResultRunSaver(runResults <-chan pipeline.Run, pipelineRunner pipeline.Runner, done chan struct{}, +func NewResultRunSaver(runResults <-chan *pipeline.Run, pipelineRunner pipeline.Runner, done chan struct{}, logger logger.Logger, maxSuccessfulRuns uint64, ) *RunResultSaver { return &RunResultSaver{ @@ -51,7 +51,7 @@ func (r *RunResultSaver) Start(context.Context) error { r.logger.Tracew("RunSaver: saving job run", "run", run) // We do not want save successful TaskRuns as OCR runs very frequently so a lot of records // are produced and the successful TaskRuns do not provide value. - if err := r.pipelineRunner.InsertFinishedRun(&run, false); err != nil { + if err := r.pipelineRunner.InsertFinishedRun(run, false); err != nil { r.logger.Errorw("error inserting finished results", "err", err) } case <-r.done: @@ -73,7 +73,7 @@ func (r *RunResultSaver) Close() error { select { case run := <-r.runResults: r.logger.Infow("RunSaver: saving job run before exiting", "run", run) - if err := r.pipelineRunner.InsertFinishedRun(&run, false); err != nil { + if err := r.pipelineRunner.InsertFinishedRun(run, false); err != nil { r.logger.Errorw("error inserting finished results", "err", err) } default: diff --git a/core/services/ocrcommon/run_saver_test.go b/core/services/ocrcommon/run_saver_test.go index 0f24f93e97d..7d20a7a202e 100644 --- a/core/services/ocrcommon/run_saver_test.go +++ b/core/services/ocrcommon/run_saver_test.go @@ -14,7 +14,7 @@ import ( func TestRunSaver(t *testing.T) { pipelineRunner := mocks.NewRunner(t) - rr := make(chan pipeline.Run, 100) + rr := make(chan *pipeline.Run, 100) rs := NewResultRunSaver( rr, pipelineRunner, @@ -31,7 +31,7 @@ func TestRunSaver(t *testing.T) { args.Get(0).(*pipeline.Run).ID = int64(d) }). Once() - rr <- pipeline.Run{ID: int64(i)} + rr <- &pipeline.Run{ID: int64(i)} } require.NoError(t, rs.Close()) } diff --git a/core/services/ocrcommon/transmitter_pipeline.go b/core/services/ocrcommon/transmitter_pipeline.go index d07be5a5409..e62f745a941 100644 --- a/core/services/ocrcommon/transmitter_pipeline.go +++ b/core/services/ocrcommon/transmitter_pipeline.go @@ -81,7 +81,7 @@ func (t *pipelineTransmitter) CreateEthTransaction(ctx context.Context, toAddres t.spec.PipelineSpec.DotDagSource = txObservationSource run := pipeline.NewRun(*t.spec.PipelineSpec, vars) - if _, err := t.pr.Run(ctx, &run, t.lgr, true, nil); err != nil { + if _, err := t.pr.Run(ctx, run, t.lgr, true, nil); err != nil { return errors.Wrap(err, "Skipped OCR transmission") } diff --git a/core/services/pipeline/mocks/runner.go b/core/services/pipeline/mocks/runner.go index a43498c100e..e2cc70378e5 100644 --- a/core/services/pipeline/mocks/runner.go +++ b/core/services/pipeline/mocks/runner.go @@ -66,19 +66,21 @@ func (_m *Runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec pipeline } // ExecuteRun provides a mock function with given fields: ctx, spec, vars, l -func (_m *Runner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (pipeline.Run, pipeline.TaskRunResults, error) { +func (_m *Runner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (*pipeline.Run, pipeline.TaskRunResults, error) { ret := _m.Called(ctx, spec, vars, l) - var r0 pipeline.Run + var r0 *pipeline.Run var r1 pipeline.TaskRunResults var r2 error - if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) (pipeline.Run, pipeline.TaskRunResults, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) (*pipeline.Run, pipeline.TaskRunResults, error)); ok { return rf(ctx, spec, vars, l) } - if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) pipeline.Run); ok { + if rf, ok := ret.Get(0).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) *pipeline.Run); ok { r0 = rf(ctx, spec, vars, l) } else { - r0 = ret.Get(0).(pipeline.Run) + if ret.Get(0) != nil { + r0 = ret.Get(0).(*pipeline.Run) + } } if rf, ok := ret.Get(1).(func(context.Context, pipeline.Spec, pipeline.Vars, logger.Logger) pipeline.TaskRunResults); ok { diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index 7a755e8fe11..3366a177ba8 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -38,7 +38,7 @@ type Runner interface { // ExecuteRun executes a new run in-memory according to a spec and returns the results. // We expect spec.JobID and spec.JobName to be set for logging/prometheus. - ExecuteRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger) (run Run, trrs TaskRunResults, err error) + ExecuteRun(ctx context.Context, spec Spec, vars Vars, l logger.Logger) (run *Run, trrs TaskRunResults, err error) // InsertFinishedRun saves the run results in the database. InsertFinishedRun(run *Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error InsertFinishedRuns(runs []*Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error @@ -196,8 +196,8 @@ func (err ErrRunPanicked) Error() string { return fmt.Sprintf("goroutine panicked when executing run: %v", err.v) } -func NewRun(spec Spec, vars Vars) Run { - return Run{ +func NewRun(spec Spec, vars Vars) *Run { + return &Run{ State: RunStatusRunning, PipelineSpec: spec, PipelineSpecID: spec.ID, @@ -218,16 +218,16 @@ func (r *runner) ExecuteRun( spec Spec, vars Vars, l logger.Logger, -) (Run, TaskRunResults, error) { +) (*Run, TaskRunResults, error) { run := NewRun(spec, vars) - pipeline, err := r.initializePipeline(&run) + pipeline, err := r.initializePipeline(run) if err != nil { return run, nil, err } - taskRunResults := r.run(ctx, pipeline, &run, vars, l) + taskRunResults := r.run(ctx, pipeline, run, vars, l) if run.Pending { return run, nil, pkgerrors.Wrapf(err, "unexpected async run for spec ID %v, tried executing via ExecuteAndInsertFinishedRun", spec.ID) @@ -505,7 +505,7 @@ func (r *runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, var return 0, finalResult, nil } - if err = r.orm.InsertFinishedRun(&run, saveSuccessfulTaskRuns); err != nil { + if err = r.orm.InsertFinishedRun(run, saveSuccessfulTaskRuns); err != nil { return 0, finalResult, pkgerrors.Wrapf(err, "error inserting finished results for spec ID %v", spec.ID) } return run.ID, finalResult, nil diff --git a/core/services/pipeline/runner_test.go b/core/services/pipeline/runner_test.go index 2554315a46c..22b70829ba5 100644 --- a/core/services/pipeline/runner_test.go +++ b/core/services/pipeline/runner_test.go @@ -635,7 +635,7 @@ ds5 [type=http method="GET" url="%s" index=2] }).Once() orm.On("StoreRun", mock.AnythingOfType("*pipeline.Run"), mock.Anything).Return(false, nil).Once() lggr := logger.TestLogger(t) - incomplete, err := r.Run(testutils.Context(t), &run, lggr, false, nil) + incomplete, err := r.Run(testutils.Context(t), run, lggr, false, nil) require.NoError(t, err) require.Len(t, run.PipelineTaskRuns, 9) // 3 tasks are suspended: ds1_parse, ds1_multiply, median. ds1 is present, but contains ErrPending require.Equal(t, true, incomplete) // still incomplete @@ -644,7 +644,7 @@ ds5 [type=http method="GET" url="%s" index=2] // Trigger run resumption with no new data orm.On("StoreRun", mock.AnythingOfType("*pipeline.Run")).Return(false, nil).Once() - incomplete, err = r.Run(testutils.Context(t), &run, lggr, false, nil) + incomplete, err = r.Run(testutils.Context(t), run, lggr, false, nil) require.NoError(t, err) require.Equal(t, true, incomplete) // still incomplete @@ -657,7 +657,7 @@ ds5 [type=http method="GET" url="%s" index=2] } // Trigger run resumption orm.On("StoreRun", mock.AnythingOfType("*pipeline.Run"), mock.Anything).Return(false, nil).Once() - incomplete, err = r.Run(testutils.Context(t), &run, lggr, false, nil) + incomplete, err = r.Run(testutils.Context(t), run, lggr, false, nil) require.NoError(t, err) require.Equal(t, false, incomplete) // done require.Len(t, run.PipelineTaskRuns, 12) @@ -773,7 +773,7 @@ ds5 [type=http method="GET" url="%s" index=2] }).Once() // StoreRun is called again to store the final result orm.On("StoreRun", mock.AnythingOfType("*pipeline.Run"), mock.Anything).Return(false, nil).Once() - incomplete, err := r.Run(testutils.Context(t), &run, logger.TestLogger(t), false, nil) + incomplete, err := r.Run(testutils.Context(t), run, logger.TestLogger(t), false, nil) require.NoError(t, err) require.Len(t, run.PipelineTaskRuns, 12) require.Equal(t, false, incomplete) // run is complete diff --git a/core/services/pipeline/scheduler_test.go b/core/services/pipeline/scheduler_test.go index bbb9ee80b9b..1d7da59da9d 100644 --- a/core/services/pipeline/scheduler_test.go +++ b/core/services/pipeline/scheduler_test.go @@ -135,7 +135,7 @@ func TestScheduler(t *testing.T) { require.NoError(t, err) vars := NewVarsFrom(nil) run := NewRun(Spec{}, vars) - s := newScheduler(p, &run, vars, logger.TestLogger(t)) + s := newScheduler(p, run, vars, logger.TestLogger(t)) go s.Run() diff --git a/core/services/relay/evm/mercury/mocks/pipeline.go b/core/services/relay/evm/mercury/mocks/pipeline.go index 317404e4409..f553ba98509 100644 --- a/core/services/relay/evm/mercury/mocks/pipeline.go +++ b/core/services/relay/evm/mercury/mocks/pipeline.go @@ -13,8 +13,8 @@ type MockRunner struct { Err error } -func (m *MockRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run pipeline.Run, trrs pipeline.TaskRunResults, err error) { - return pipeline.Run{ID: 42}, m.Trrs, m.Err +func (m *MockRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { + return &pipeline.Run{ID: 42}, m.Trrs, m.Err } var _ pipeline.Task = &MockTask{} diff --git a/core/services/relay/evm/mercury/v1/data_source.go b/core/services/relay/evm/mercury/v1/data_source.go index ff7a2c0ab7f..5c1f55ddab7 100644 --- a/core/services/relay/evm/mercury/v1/data_source.go +++ b/core/services/relay/evm/mercury/v1/data_source.go @@ -26,7 +26,7 @@ import ( ) type Runner interface { - ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run pipeline.Run, trrs pipeline.TaskRunResults, err error) + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) } // Fetcher fetcher data from Mercury server @@ -40,7 +40,7 @@ type datasource struct { jb job.Job spec pipeline.Spec lggr logger.Logger - runResults chan<- pipeline.Run + runResults chan<- *pipeline.Run orm types.DataSourceORM codec reportcodec.ReportCodec feedID [32]byte @@ -55,7 +55,7 @@ type datasource struct { var _ relaymercuryv1.DataSource = &datasource{} -func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker types.ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64, feedID [32]byte) *datasource { +func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker types.ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64, feedID [32]byte) *datasource { return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber} } @@ -115,7 +115,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam wg.Add(1) go func() { defer wg.Done() - var run pipeline.Run + var run *pipeline.Run run, trrs, err = ds.executeRun(ctx) if err != nil { err = fmt.Errorf("Observe failed while executing run: %w", err) @@ -238,7 +238,7 @@ func setAsk(o *parseOutput, res pipeline.Result) error { // The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). // Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. -func (ds *datasource) executeRun(ctx context.Context) (pipeline.Run, pipeline.TaskRunResults, error) { +func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { vars := pipeline.NewVarsFrom(map[string]interface{}{ "jb": map[string]interface{}{ "databaseID": ds.jb.ID, @@ -249,7 +249,7 @@ func (ds *datasource) executeRun(ctx context.Context) (pipeline.Run, pipeline.Ta run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) if err != nil { - return pipeline.Run{}, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) + return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) } return run, trrs, err diff --git a/core/services/relay/evm/mercury/v1/data_source_test.go b/core/services/relay/evm/mercury/v1/data_source_test.go index a0932990f02..6e460951301 100644 --- a/core/services/relay/evm/mercury/v1/data_source_test.go +++ b/core/services/relay/evm/mercury/v1/data_source_test.go @@ -308,8 +308,8 @@ func TestMercury_Observe(t *testing.T) { trrs[i].Result.Value = "123" trrs[i].Result.Error = nil } + ch := make(chan *pipeline.Run, 1) - ch := make(chan pipeline.Run, 1) ds.runResults = ch _, err := ds.Observe(ctx, repts, false) diff --git a/core/services/relay/evm/mercury/v2/data_source.go b/core/services/relay/evm/mercury/v2/data_source.go index 632278a3c56..caeae8d278a 100644 --- a/core/services/relay/evm/mercury/v2/data_source.go +++ b/core/services/relay/evm/mercury/v2/data_source.go @@ -25,7 +25,7 @@ import ( ) type Runner interface { - ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run pipeline.Run, trrs pipeline.TaskRunResults, err error) + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) } type LatestReportFetcher interface { @@ -39,7 +39,7 @@ type datasource struct { spec pipeline.Spec feedID mercuryutils.FeedID lggr logger.Logger - runResults chan<- pipeline.Run + runResults chan<- *pipeline.Run orm types.DataSourceORM codec reportcodec.ReportCodec @@ -54,7 +54,7 @@ type datasource struct { var _ relaymercuryv2.DataSource = &datasource{} -func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, feedID mercuryutils.FeedID, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, fetcher LatestReportFetcher, linkFeedID, nativeFeedID mercuryutils.FeedID) *datasource { +func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, feedID mercuryutils.FeedID, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, fetcher LatestReportFetcher, linkFeedID, nativeFeedID mercuryutils.FeedID) *datasource { return &datasource{pr, jb, spec, feedID, lggr, rr, orm, reportcodec.ReportCodec{}, fetcher, linkFeedID, nativeFeedID, sync.RWMutex{}, enhancedTelemChan} } @@ -84,7 +84,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam go func() { defer wg.Done() var trrs pipeline.TaskRunResults - var run pipeline.Run + var run *pipeline.Run run, trrs, err = ds.executeRun(ctx) if err != nil { cancel() @@ -218,7 +218,7 @@ func setBenchmarkPrice(o *parseOutput, res pipeline.Result) error { // The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). // Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. -func (ds *datasource) executeRun(ctx context.Context) (pipeline.Run, pipeline.TaskRunResults, error) { +func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { vars := pipeline.NewVarsFrom(map[string]interface{}{ "jb": map[string]interface{}{ "databaseID": ds.jb.ID, @@ -229,7 +229,7 @@ func (ds *datasource) executeRun(ctx context.Context) (pipeline.Run, pipeline.Ta run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) if err != nil { - return pipeline.Run{}, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) + return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) } return run, trrs, err diff --git a/core/services/relay/evm/mercury/v3/data_source.go b/core/services/relay/evm/mercury/v3/data_source.go index 8d3895cd62b..79f6c536efd 100644 --- a/core/services/relay/evm/mercury/v3/data_source.go +++ b/core/services/relay/evm/mercury/v3/data_source.go @@ -26,7 +26,7 @@ import ( ) type Runner interface { - ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run pipeline.Run, trrs pipeline.TaskRunResults, err error) + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) } type LatestReportFetcher interface { @@ -40,7 +40,7 @@ type datasource struct { spec pipeline.Spec feedID mercuryutils.FeedID lggr logger.Logger - runResults chan<- pipeline.Run + runResults chan<- *pipeline.Run orm types.DataSourceORM codec reportcodec.ReportCodec @@ -55,7 +55,7 @@ type datasource struct { var _ relaymercuryv3.DataSource = &datasource{} -func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, feedID mercuryutils.FeedID, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, fetcher LatestReportFetcher, linkFeedID, nativeFeedID mercuryutils.FeedID) *datasource { +func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, feedID mercuryutils.FeedID, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, fetcher LatestReportFetcher, linkFeedID, nativeFeedID mercuryutils.FeedID) *datasource { return &datasource{pr, jb, spec, feedID, lggr, rr, orm, reportcodec.ReportCodec{}, fetcher, linkFeedID, nativeFeedID, sync.RWMutex{}, enhancedTelemChan} } @@ -85,7 +85,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam go func() { defer wg.Done() var trrs pipeline.TaskRunResults - var run pipeline.Run + var run *pipeline.Run run, trrs, err = ds.executeRun(ctx) if err != nil { cancel() @@ -256,7 +256,7 @@ func setAsk(o *parseOutput, res pipeline.Result) error { // The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). // Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. -func (ds *datasource) executeRun(ctx context.Context) (pipeline.Run, pipeline.TaskRunResults, error) { +func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { vars := pipeline.NewVarsFrom(map[string]interface{}{ "jb": map[string]interface{}{ "databaseID": ds.jb.ID, @@ -267,7 +267,7 @@ func (ds *datasource) executeRun(ctx context.Context) (pipeline.Run, pipeline.Ta run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) if err != nil { - return pipeline.Run{}, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) + return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) } return run, trrs, err diff --git a/core/services/transmission/integration_test.go b/core/services/transmission/integration_test.go index 3aa025f0ae0..0484b1d8cd6 100644 --- a/core/services/transmission/integration_test.go +++ b/core/services/transmission/integration_test.go @@ -63,7 +63,7 @@ func deployTransmissionUniverse(t *testing.T) *EntryPointUniverse { holder1Key := cltest.MustGenerateRandomKey(t) t.Log("Holder key:", holder1Key.String()) - // Construct simulated blockchain environmnet. + // Construct simulated blockchain environment. holder1Transactor, err := bind.NewKeyedTransactorWithChainID(holder1Key.ToEcdsaPrivKey(), testutils.SimulatedChainID) require.NoError(t, err) var ( diff --git a/core/services/vrf/v1/listener_v1.go b/core/services/vrf/v1/listener_v1.go index 39aec367b4a..03b92bc15cb 100644 --- a/core/services/vrf/v1/listener_v1.go +++ b/core/services/vrf/v1/listener_v1.go @@ -429,7 +429,7 @@ func (lsn *Listener) ProcessRequest(ctx context.Context, req request) bool { run := pipeline.NewRun(*lsn.Job.PipelineSpec, vars) // The VRF pipeline has no async tasks, so we don't need to check for `incomplete` - if _, err = lsn.PipelineRunner.Run(ctx, &run, lggr, true, func(tx pg.Queryer) error { + if _, err = lsn.PipelineRunner.Run(ctx, run, lggr, true, func(tx pg.Queryer) error { // Always mark consumed regardless of whether the proof failed or not. if err = lsn.LogBroadcaster.MarkConsumed(req.lb, pg.WithQueryer(tx)); err != nil { lggr.Errorw("Failed mark consumed", "err", err) diff --git a/core/services/vrf/v2/integration_v2_plus_test.go b/core/services/vrf/v2/integration_v2_plus_test.go index 8923bfac643..7e05c5b347c 100644 --- a/core/services/vrf/v2/integration_v2_plus_test.go +++ b/core/services/vrf/v2/integration_v2_plus_test.go @@ -253,7 +253,7 @@ func newVRFCoordinatorV2PlusUniverse(t *testing.T, key ethkey.KeyV2, numConsumer big.NewInt(1e16), // 0.01 eth per link fallbackLinkPrice vrf_coordinator_v2plus.VRFCoordinatorV2PlusFeeConfig{ FulfillmentFlatFeeLinkPPM: uint32(1000), // 0.001 LINK premium - FulfillmentFlatFeeEthPPM: uint32(5), // 0.000005 ETH preimum + FulfillmentFlatFeeEthPPM: uint32(5), // 0.000005 ETH premium }, ) require.NoError(t, err, "failed to set coordinator configuration") diff --git a/core/services/vrf/v2/integration_v2_test.go b/core/services/vrf/v2/integration_v2_test.go index f4054333808..33e613733d5 100644 --- a/core/services/vrf/v2/integration_v2_test.go +++ b/core/services/vrf/v2/integration_v2_test.go @@ -803,7 +803,7 @@ func mineBatch(t *testing.T, requestIDs []*big.Int, subID *big.Int, backend *bac require.NoError(t, err) for _, tx := range txs { var evmTx txmgr.Tx - txmgr.DbEthTxToEthTx(tx, &evmTx) + tx.ToTx(&evmTx) meta, err := evmTx.GetMeta() require.NoError(t, err) for _, requestID := range meta.RequestIDs { @@ -2105,7 +2105,8 @@ func TestStartingCountsV1(t *testing.T) { sql := `INSERT INTO evm.txes (nonce, from_address, to_address, encoded_payload, value, gas_limit, state, created_at, broadcast_at, initial_broadcast_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id) VALUES (:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit, :state, :created_at, :broadcast_at, :initial_broadcast_at, :meta, :subject, :evm_chain_id, :min_confirmations, :pipeline_task_run_id);` for _, tx := range append(confirmedTxes, unconfirmedTxes...) { - dbEtx := txmgr.DbEthTxFromEthTx(&tx) + var dbEtx txmgr.DbEthTx + dbEtx.FromTx(&tx) //nolint:gosec // just copying fields _, err = db.NamedExec(sql, &dbEtx) require.NoError(t, err) } @@ -2143,10 +2144,10 @@ VALUES (:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit sql = `INSERT INTO evm.tx_attempts (eth_tx_id, gas_price, signed_raw_tx, hash, state, created_at, chain_specific_gas_limit) VALUES (:eth_tx_id, :gas_price, :signed_raw_tx, :hash, :state, :created_at, :chain_specific_gas_limit)` for _, attempt := range txAttempts { - dbAttempt := txmgr.DbEthTxAttemptFromEthTxAttempt(&attempt) //nolint:gosec // just copying fields + var dbAttempt txmgr.DbEthTxAttempt + dbAttempt.FromTxAttempt(&attempt) //nolint:gosec // just copying fields _, err = db.NamedExec(sql, &dbAttempt) require.NoError(t, err) - txmgr.DbEthTxAttemptToEthTxAttempt(dbAttempt, &attempt) //nolint:gosec // just copying fields } // add evm.receipts @@ -2164,7 +2165,7 @@ VALUES (:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit sql = `INSERT INTO evm.receipts (block_hash, tx_hash, block_number, transaction_index, receipt, created_at) VALUES (:block_hash, :tx_hash, :block_number, :transaction_index, :receipt, :created_at)` for _, r := range receipts { - _, err := db.NamedExec(sql, &r) + _, err := db.NamedExec(sql, r) require.NoError(t, err) } diff --git a/core/services/vrf/v2/listener_v2.go b/core/services/vrf/v2/listener_v2.go index 047a1e6e29a..ff915fba34e 100644 --- a/core/services/vrf/v2/listener_v2.go +++ b/core/services/vrf/v2/listener_v2.go @@ -181,7 +181,7 @@ type vrfPipelineResult struct { // fundsNeeded indicates a "minimum balance" in juels or wei that must be held in the // subscription's account in order to fulfill the request. fundsNeeded *big.Int - run pipeline.Run + run *pipeline.Run payload string gasLimit uint32 req pendingRequest @@ -1098,7 +1098,7 @@ func (lsn *listenerV2) processRequestsPerSubHelper( ll.Infow("Enqueuing fulfillment") var transaction txmgr.Tx err = lsn.q.Transaction(func(tx pg.Queryer) error { - if err = lsn.pipelineRunner.InsertFinishedRun(&p.run, true, pg.WithQueryer(tx)); err != nil { + if err = lsn.pipelineRunner.InsertFinishedRun(p.run, true, pg.WithQueryer(tx)); err != nil { return err } if err = lsn.logBroadcaster.MarkConsumed(p.req.lb, pg.WithQueryer(tx)); err != nil { diff --git a/core/services/vrf/v2/listener_v2_types.go b/core/services/vrf/v2/listener_v2_types.go index 4ad645ac17c..e8c3a8ccb13 100644 --- a/core/services/vrf/v2/listener_v2_types.go +++ b/core/services/vrf/v2/listener_v2_types.go @@ -41,7 +41,7 @@ func newBatchFulfillment(result vrfPipelineResult, fromAddress common.Address, v }, totalGasLimit: result.gasLimit, runs: []*pipeline.Run{ - &result.run, + result.run, }, reqIDs: []*big.Int{ result.req.req.RequestID(), @@ -95,7 +95,7 @@ func (b *batchFulfillments) addRun(result vrfPipelineResult, fromAddress common. currBatch.proofs = append(currBatch.proofs, result.proof) currBatch.commitments = append(currBatch.commitments, result.reqCommitment) currBatch.totalGasLimit += result.gasLimit - currBatch.runs = append(currBatch.runs, &result.run) + currBatch.runs = append(currBatch.runs, result.run) currBatch.reqIDs = append(currBatch.reqIDs, result.req.req.RequestID()) currBatch.lbs = append(currBatch.lbs, result.req.lb) currBatch.maxFees = append(currBatch.maxFees, result.maxFee) diff --git a/core/services/webhook/delegate.go b/core/services/webhook/delegate.go index e373ff8087a..ca85a4d1621 100644 --- a/core/services/webhook/delegate.go +++ b/core/services/webhook/delegate.go @@ -172,7 +172,7 @@ func (r *webhookJobRunner) RunJob(ctx context.Context, jobUUID uuid.UUID, reques run := pipeline.NewRun(*spec.PipelineSpec, vars) - _, err := r.runner.Run(ctx, &run, jobLggr, true, nil) + _, err := r.runner.Run(ctx, run, jobLggr, true, nil) if err != nil { jobLggr.Errorw("Error running pipeline for webhook job", "err", err) return 0, err diff --git a/core/store/models/common_test.go b/core/store/models/common_test.go index 1f514142b80..57b7ca73c6b 100644 --- a/core/store/models/common_test.go +++ b/core/store/models/common_test.go @@ -203,7 +203,7 @@ func TestDuration_MarshalJSON(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - b, err := json.Marshal(&test.input) + b, err := json.Marshal(test.input) assert.NoError(t, err) assert.Equal(t, test.want, string(b)) }) diff --git a/core/utils/big_test.go b/core/utils/big_test.go index ca8be3f90b8..e46d46a0651 100644 --- a/core/utils/big_test.go +++ b/core/utils/big_test.go @@ -19,7 +19,7 @@ func TestBigFloatMarshal(t *testing.T) { } for _, tc := range tests { - buf, err := json.Marshal(&tc.obj) + buf, err := json.Marshal(tc.obj) require.NoError(t, err) assert.Equal(t, tc.exp, string(buf)) } diff --git a/core/web/bridge_types_controller_test.go b/core/web/bridge_types_controller_test.go index c875df94539..7184b05f5e0 100644 --- a/core/web/bridge_types_controller_test.go +++ b/core/web/bridge_types_controller_test.go @@ -105,7 +105,8 @@ func TestValidateBridgeType(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - result := web.ValidateBridgeType(&test.request) + req := test.request + result := web.ValidateBridgeType(&req) assert.Equal(t, test.want, result) }) } diff --git a/core/web/pipeline_job_spec_errors_controller_test.go b/core/web/pipeline_job_spec_errors_controller_test.go index 13c02379674..8ec77a84f05 100644 --- a/core/web/pipeline_job_spec_errors_controller_test.go +++ b/core/web/pipeline_job_spec_errors_controller_test.go @@ -23,7 +23,7 @@ func TestPipelineJobSpecErrorsController_Delete_2(t *testing.T) { j, err := app.JobORM().FindJob(testutils.Context(t), jID) require.NoError(t, err) t.Log(j.JobSpecErrors) - require.GreaterOrEqual(t, len(j.JobSpecErrors), 1) // second 'got nil head' error may have occured also + require.GreaterOrEqual(t, len(j.JobSpecErrors), 1) // second 'got nil head' error may have occurred also var id int64 = -1 for i := range j.JobSpecErrors { jse := j.JobSpecErrors[i]