Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCI-3730] - Remove dependence on MinConfirmations in EVM TXM code #13973

Open
wants to merge 59 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
fcc4ef5
inject trimmed HT API with LatestAndFinalizedBlock
Farber98 Jul 31, 2024
18665f5
types changes
Farber98 Jul 31, 2024
c7f79c6
tests
Farber98 Jul 31, 2024
d56cafc
Merge branch 'develop' into BCI-3730-remove-minconfirmations-evm-txm
Farber98 Jul 31, 2024
2df05f7
mocks
Farber98 Jul 31, 2024
df0eeef
refactor
huangzhen1997 Jul 31, 2024
2ae25fa
update goland format to fix lint, add changeset file
huangzhen1997 Aug 1, 2024
eec4380
update changeset
huangzhen1997 Aug 1, 2024
82156bf
update
huangzhen1997 Aug 1, 2024
92b5974
fix test
huangzhen1997 Aug 1, 2024
9c28bbb
more test fix
huangzhen1997 Aug 1, 2024
cdb12d3
fix integration test, and added important comments
huangzhen1997 Aug 1, 2024
d30eb3a
fix commit signing
huangzhen1997 Aug 1, 2024
fe3eb8b
update comments
huangzhen1997 Aug 2, 2024
8b34faa
rephrase comments
huangzhen1997 Aug 2, 2024
ab755c1
undo typo
huangzhen1997 Aug 2, 2024
c32d061
small refactor
huangzhen1997 Aug 2, 2024
be447e8
update comments
huangzhen1997 Aug 2, 2024
e9f5874
Merge branch 'develop' into BCI-3730-remove-minconfirmations-evm-txm
huangzhen1997 Aug 2, 2024
c9033db
fix
huangzhen1997 Aug 2, 2024
a4557a4
remove multiline comments
huangzhen1997 Aug 2, 2024
2a0c8f7
Address PR comments
huangzhen1997 Aug 2, 2024
1370e51
fix test, update mock
huangzhen1997 Aug 2, 2024
77d5a55
fix lint, and remove unused
huangzhen1997 Aug 2, 2024
424253b
fix test
huangzhen1997 Aug 2, 2024
0783aab
update tests
huangzhen1997 Aug 3, 2024
3c9b965
cleanup more test
huangzhen1997 Aug 3, 2024
5818d5c
formatting
huangzhen1997 Aug 3, 2024
df44a3a
Merge branch 'develop' into BCI-3730-remove-minconfirmations-evm-txm
huangzhen1997 Aug 3, 2024
daab2d6
rename
huangzhen1997 Aug 3, 2024
76a9510
Merge branch 'BCI-3730-remove-minconfirmations-evm-txm' of github.com…
huangzhen1997 Aug 3, 2024
913533c
Merge branch 'develop' into BCI-3730-remove-minconfirmations-evm-txm
huangzhen1997 Aug 6, 2024
ca6a2d4
update after finalizer merged
huangzhen1997 Aug 6, 2024
9d3f425
Merge branch 'develop' into BCI-3730-remove-minconfirmations-evm-txm
huangzhen1997 Aug 6, 2024
943224d
Merge branch 'develop' into BCI-3730-remove-minconfirmations-evm-txm
huangzhen1997 Aug 7, 2024
09bf8ca
address comments
huangzhen1997 Aug 8, 2024
2799272
merge conflict
huangzhen1997 Aug 8, 2024
995636a
Merge branch 'develop' into BCI-3730-remove-minconfirmations-evm-txm
huangzhen1997 Aug 8, 2024
67a5f8a
fix unit tests
huangzhen1997 Aug 8, 2024
7091ba8
Merge branch 'develop' into BCI-3730-remove-minconfirmations-evm-txm
huangzhen1997 Aug 8, 2024
cdcbb99
fix test attempt
huangzhen1997 Aug 8, 2024
82fde8d
update test
huangzhen1997 Aug 8, 2024
acc3a8a
refactor TransactionStore interface
huangzhen1997 Aug 9, 2024
e0573cb
revert some temp changes
huangzhen1997 Aug 9, 2024
c24408b
one more
huangzhen1997 Aug 9, 2024
bc148c9
revert
huangzhen1997 Aug 9, 2024
2b195c7
Amit comments, more work needed
huangzhen1997 Aug 9, 2024
755593c
fix test
huangzhen1997 Aug 9, 2024
f8463ac
add one deleted test back
huangzhen1997 Aug 9, 2024
e2332fb
fix lint
huangzhen1997 Aug 9, 2024
b45fb5d
fix test attempt
huangzhen1997 Aug 9, 2024
07f5cc3
revert
huangzhen1997 Aug 9, 2024
8b45996
fix test
huangzhen1997 Aug 10, 2024
ae734e9
fix race condition for feature test, and fix simulated backend endpoint
huangzhen1997 Aug 12, 2024
f025054
add minConfirmation field in db
huangzhen1997 Aug 12, 2024
be13b71
Merge branch 'develop' into BCI-3730-remove-minconfirmations-evm-txm
huangzhen1997 Aug 12, 2024
85d8a92
add comment
huangzhen1997 Aug 12, 2024
f5094bd
Merge branch 'BCI-3730-remove-minconfirmations-evm-txm' of github.com…
huangzhen1997 Aug 12, 2024
2fdff68
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
huangzhen1997 Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tidy-planets-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Remove dependency of MinConfirmations in EVM TXM code #update #deprecation_notice
7 changes: 3 additions & 4 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro

if ec.resumeCallback != nil {
mark = time.Now()
if err := ec.ResumePendingTaskRuns(ctx, head); err != nil {
if err := ec.ResumePendingTaskRuns(ctx); err != nil {
return fmt.Errorf("ResumePendingTaskRuns failed: %w", err)
}

Expand Down Expand Up @@ -1231,9 +1231,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen
}

// ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID)

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, ec.chainID)
if err != nil {
return err
}
Expand Down
29 changes: 14 additions & 15 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ type TxRequest[ADDR types.Hashable, TX_HASH types.Hashable] struct {

// Pipeline variables - if you aren't calling this from chain tx task within
// the pipeline, you don't need these variables
MinConfirmations clnull.Uint32
PipelineTaskRunID *uuid.UUID
// deprecated
MinConfirmations clnull.Uint32

Strategy TxStrategy

Expand Down
4 changes: 2 additions & 2 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type TxStore[
TxHistoryReaper[CHAIN_ID]
TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]

// Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Find finalized txes that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error
Expand Down
19 changes: 11 additions & 8 deletions core/chains/evm/client/simulated_backend_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,14 +703,17 @@ func (c *SimulatedBackendClient) ethGetHeaderByNumber(ctx context.Context, resul
}

func (c *SimulatedBackendClient) LatestFinalizedBlock(ctx context.Context) (*evmtypes.Head, error) {
block := c.b.Blockchain().CurrentFinalBlock()
return &evmtypes.Head{
EVMChainID: ubig.NewI(c.chainId.Int64()),
Hash: block.Hash(),
Number: block.Number.Int64(),
ParentHash: block.ParentHash,
Timestamp: time.Unix(int64(block.Time), 0),
}, nil
if block := c.b.Blockchain().CurrentFinalBlock(); block != nil {
return &evmtypes.Head{
EVMChainID: ubig.NewI(c.chainId.Int64()),
Hash: block.Hash(),
Number: block.Number.Int64(),
ParentHash: block.ParentHash,
Timestamp: time.Unix(int64(block.Time), 0),
}, nil
}

return nil, fmt.Errorf("SimulatedBackendClient failed to find latest finalized block")
}

func (c *SimulatedBackendClient) ethGetLogs(ctx context.Context, result interface{}, args ...interface{}) error {
Expand Down
66 changes: 32 additions & 34 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3033,9 +3033,6 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
},
},
}

minConfirmations := int64(2)

pgtest.MustExec(t, db, `SET CONSTRAINTS fk_pipeline_runs_pruning_key DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`)

Expand All @@ -3049,16 +3046,16 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID)

etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 1, 1, fromAddress)
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash)
// Setting both signal_callback and callback_completed to TRUE to simulate a completed pipeline task
// It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE, callback_completed = TRUE WHERE id = $2`, &tr.ID, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t))
require.NoError(t, err)
})

t.Run("doesn't process task runs where the receipt is younger than minConfirmations", func(t *testing.T) {
t.Run("doesn't process task runs where the receipt is not finalized", func(t *testing.T) {
ec := newEthConfirmer(t, txStore, ethClient, config, evmcfg, ethKeyStore, func(context.Context, uuid.UUID, interface{}, error) error {
t.Fatal("No value expected")
return nil
Expand All @@ -3070,13 +3067,13 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 2, 1, fromAddress)
mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash)

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE, callback_completed = TRUE WHERE id = $2`, &tr.ID, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t))
require.NoError(t, err)
})

t.Run("processes eth_txes with receipts older than minConfirmations", func(t *testing.T) {
t.Run("processes eth_txes with receipts for finalized tx", func(t *testing.T) {
ch := make(chan interface{})
nonce := evmtypes.Nonce(3)
var err error
Expand All @@ -3090,25 +3087,25 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID)
pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run.ID)

etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress)
etx := cltest.MustInsertFinalizedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`)
receipt := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
receipt := mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash)

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, &tr.ID, etx.ID)

done := make(chan struct{})
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t))
if !assert.NoError(t, err2) {
return
}
// Retrieve Tx to check if callback completed flag was set to true
updateTx, err3 := txStore.FindTxWithSequence(tests.Context(t), fromAddress, nonce)
if assert.NoError(t, err3) {
assert.Equal(t, true, updateTx.CallbackCompleted)
}
txs, err3 := txStore.FindTxesByFromAddressAndNonce(tests.Context(t), fromAddress, int64(nonce))
assert.Nil(t, err3)
assert.Equal(t, 1, len(txs))
assert.Equal(t, true, txs[0].CallbackCompleted)
Comment on lines +3105 to +3108
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this changed?

Copy link
Contributor

@huangzhen1997 huangzhen1997 Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the new logic in confirmer, we only fetch finalized tx for callback ,FindTxWithSequence doesn't work anymore because it returns only the state IN ('confirmed', 'confirmed_missing_receipt', 'unconfirmed')

}()

select {
Expand All @@ -3126,7 +3123,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {

pgtest.MustExec(t, db, `DELETE FROM pipeline_runs`)

t.Run("processes eth_txes with receipt older than minConfirmations that reverted", func(t *testing.T) {
t.Run("processes eth_txes with receipt for finalized tx that reverted", func(t *testing.T) {
type data struct {
value any
error
Expand All @@ -3142,27 +3139,27 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID)
pgtest.MustExec(t, db, `UPDATE pipeline_runs SET state = 'suspended' WHERE id = $1`, run.ID)

etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress)
etx := cltest.MustInsertFinalizedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`)

// receipt is not passed through as a value since it reverted and caused an error
mustInsertRevertedEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
mustInsertRevertedEthReceipt(t, txStore, head.Number-2, head.Hash, etx.TxAttempts[0].Hash)

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, &tr.ID, etx.ID)

done := make(chan struct{})
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t))
if !assert.NoError(t, err2) {
return
}
// Retrieve Tx to check if callback completed flag was set to true
updateTx, err3 := txStore.FindTxWithSequence(tests.Context(t), fromAddress, nonce)
if assert.NoError(t, err3) {
assert.Equal(t, true, updateTx.CallbackCompleted)
}
txs, err := txStore.FindTxesByFromAddressAndNonce(tests.Context(t), fromAddress, int64(nonce))
assert.Nil(t, err)
assert.Equal(t, 1, len(txs))
assert.Equal(t, true, txs[0].CallbackCompleted)
}()

select {
Expand All @@ -3187,17 +3184,18 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
run := cltest.MustInsertPipelineRun(t, db)
tr := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run.ID)

etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress)
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)
etx := cltest.MustInsertFinalizedEthTxWithLegacyAttempt(t, txStore, int64(nonce), 1, fromAddress)
mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, &tr.ID, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t))
require.Error(t, err)

// Retrieve Tx to check if callback completed flag was left unchanged
updateTx, err := txStore.FindTxWithSequence(tests.Context(t), fromAddress, nonce)
require.NoError(t, err)
require.Equal(t, false, updateTx.CallbackCompleted)
txs, err := txStore.FindTxesByFromAddressAndNonce(tests.Context(t), fromAddress, int64(nonce))
assert.Nil(t, err)
assert.Equal(t, 1, len(txs))
assert.Equal(t, false, txs[0].CallbackCompleted)
})
}

Expand Down
25 changes: 19 additions & 6 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type TestEvmTxStore interface {
GetAllTxAttempts(ctx context.Context) (attempts []TxAttempt, err error)
CountTxesByStateAndSubject(ctx context.Context, state txmgrtypes.TxState, subject uuid.UUID) (count int, err error)
FindTxesByFromAddressAndState(ctx context.Context, fromAddress common.Address, state string) (txes []*Tx, err error)
FindTxesByFromAddressAndNonce(ctx context.Context, fromAddress common.Address, nonce int64) (txes []*Tx, err error)
UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error
}

Expand Down Expand Up @@ -250,7 +251,6 @@ func (db DbEthTx) ToTx(tx *Tx) {
tx.Meta = db.Meta
tx.Subject = db.Subject
tx.PipelineTaskRunID = db.PipelineTaskRunID
tx.MinConfirmations = db.MinConfirmations
tx.ChainID = db.EVMChainID.ToInt()
tx.TransmitChecker = db.TransmitChecker
tx.InitialBroadcastAt = db.InitialBroadcastAt
Expand Down Expand Up @@ -1056,19 +1056,20 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e
}

// Find confirmed txes requiring callback but have not yet been signaled
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
var rs []dbReceiptPlus

var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()

err = o.q.SelectContext(ctx, &rs, `
SELECT evm.txes.pipeline_task_run_id, evm.receipts.receipt, COALESCE((evm.txes.meta->>'FailOnRevert')::boolean, false) "FailOnRevert" FROM evm.txes
INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id
INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash
WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE
AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2
`, blockNum, chainID.String())
AND evm.txes.state = 'finalized' AND evm.txes.evm_chain_id = $1
`, chainID.String())
if err != nil {
return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err)
}
Expand Down Expand Up @@ -1852,10 +1853,10 @@ func (o *evmTxStore) CreateTransaction(ctx context.Context, txRequest TxRequest,
err = orm.q.GetContext(ctx, &dbEtx, `
INSERT INTO evm.txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker, idempotency_key, signal_callback)
VALUES (
$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11,$12,$13
$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,NULL,$9,$10,$11,$12
)
RETURNING "txes".*
`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker, txRequest.IdempotencyKey, txRequest.SignalCallback)
`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.PipelineTaskRunID, txRequest.Checker, txRequest.IdempotencyKey, txRequest.SignalCallback)
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert evm tx")
}
Expand Down Expand Up @@ -2100,6 +2101,18 @@ func (o *evmTxStore) FindTxesByFromAddressAndState(ctx context.Context, fromAddr
return txes, err
}

func (o *evmTxStore) FindTxesByFromAddressAndNonce(ctx context.Context, fromAddress common.Address, nonce int64) (txes []*Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
sql := "SELECT * FROM evm.txes WHERE from_address = $1 AND nonce = $2"
var dbEtxs []DbEthTx
err = o.q.SelectContext(ctx, &dbEtxs, sql, fromAddress, nonce)
txes = make([]*Tx, len(dbEtxs))
dbEthTxsToEvmEthTxPtrs(dbEtxs, txes)
return txes, err
}

func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
Expand Down
Loading
Loading