Skip to content

Commit

Permalink
update after finalizer merged
Browse files Browse the repository at this point in the history
  • Loading branch information
huangzhen1997 committed Aug 6, 2024
1 parent 913533c commit ca6a2d4
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 43 deletions.
6 changes: 3 additions & 3 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro

if ec.resumeCallback != nil {
mark = time.Now()
if err := ec.ResumePendingTaskRuns(ctx, head.BlockNumber()); err != nil {
if err := ec.ResumePendingTaskRuns(ctx); err != nil {
return fmt.Errorf("ResumePendingTaskRuns failed: %w", err)
}

Expand Down Expand Up @@ -1188,8 +1188,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen
}

// ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, headNum int64) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, headNum, ec.chainID)
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, ec.chainID)
if err != nil {
return err
}
Expand Down
29 changes: 14 additions & 15 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TxStore[
TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]

// Find confirmed txes with finalized state that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
FindTxesPendingCallback(ctx context.Context, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error
Expand Down
8 changes: 4 additions & 4 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2976,7 +2976,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
// It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE, callback_completed = TRUE WHERE id = $2`, &tr.ID, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number)
err := ec.ResumePendingTaskRuns(tests.Context(t))
require.NoError(t, err)
})

Expand Down Expand Up @@ -3004,7 +3004,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number)
err2 := ec.ResumePendingTaskRuns(tests.Context(t))
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3058,7 +3058,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number)
err2 := ec.ResumePendingTaskRuns(tests.Context(t))
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3095,7 +3095,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, &tr.ID, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number)
err := ec.ResumePendingTaskRuns(tests.Context(t))
require.Error(t, err)

// Retrieve Tx to check if callback completed flag was left unchanged
Expand Down
7 changes: 3 additions & 4 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,21 +1015,20 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e
}

// Find confirmed txes requiring callback but have not yet been signaled
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
var rs []dbReceiptPlus

var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()

// TODO update the query to use tx finality state instead of use block range after Finalizer PR https://github.com/smartcontractkit/chainlink/pull/13638 merged
err = o.q.SelectContext(ctx, &rs, `
SELECT evm.txes.pipeline_task_run_id, evm.receipts.receipt, COALESCE((evm.txes.meta->>'FailOnRevert')::boolean, false) "FailOnRevert" FROM evm.txes
INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id
INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash
WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE
AND evm.receipts.block_number <= $1 AND evm.txes.evm_chain_id = $2
`, blockNum, chainID.String())
AND evm.txes.state = 'confirmed' AND evm.txes.evm_chain_id = $1
`, chainID.String())
if err != nil {
return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 7, 1, fromAddress)

// Search evm.txes table for tx requiring callback
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID())
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Len(t, receiptsPlus, 2)
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
Expand Down
29 changes: 14 additions & 15 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ca6a2d4

Please sign in to comment.