Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
Browse files Browse the repository at this point in the history
…checkLokiUpdates
  • Loading branch information
kalverra committed Oct 9, 2024
2 parents 9d0a9ed + 5ca0d1f commit b22050e
Show file tree
Hide file tree
Showing 30 changed files with 713 additions and 102 deletions.
9 changes: 9 additions & 0 deletions .changeset/brave-ads-explode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"chainlink": patch
---

Remove finality depth as the default value for minConfirmation for tx jobs.
Update the sql query for fetching pending callback transactions:
if minConfirmation is not null, we check difference if the current block - tx block > minConfirmation
else we check if the tx block is <= finalizedBlock
#updated
5 changes: 5 additions & 0 deletions .changeset/chilled-months-bow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added oracle support in standard capabilities
6 changes: 3 additions & 3 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro

if ec.resumeCallback != nil {
mark = time.Now()
if err := ec.ResumePendingTaskRuns(ctx, head); err != nil {
if err := ec.ResumePendingTaskRuns(ctx, head.BlockNumber(), latestFinalizedHead.BlockNumber()); err != nil {
return fmt.Errorf("ResumePendingTaskRuns failed: %w", err)
}

Expand Down Expand Up @@ -1259,8 +1259,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen
}

// ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID)
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, latest, finalized int64) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, latest, finalized, ec.chainID)

if err != nil {
return err
Expand Down
31 changes: 16 additions & 15 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TxStore[
TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]

// Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3055,7 +3055,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
// It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.NoError(t, err)
})

Expand All @@ -3073,7 +3073,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.NoError(t, err)
})

Expand Down Expand Up @@ -3101,7 +3101,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3155,7 +3155,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3192,7 +3192,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.Error(t, err)

// Retrieve Tx to check if callback completed flag was left unchanged
Expand Down
10 changes: 7 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e
}

// Find confirmed txes requiring callback but have not yet been signaled
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
var rs []dbReceiptPlus

var cancel context.CancelFunc
Expand All @@ -1066,8 +1066,12 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64
INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id
INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash
WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE
AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2
`, blockNum, chainID.String())
AND (
(evm.txes.min_confirmations IS NOT NULL AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations))
OR (evm.txes.min_confirmations IS NULL AND evm.receipts.block_number <= $2)
)
AND evm.txes.evm_chain_id = $3
`, latest, finalized, chainID.String())
if err != nil {
return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err)
}
Expand Down
24 changes: 20 additions & 4 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`)
attempt1 := etx1.TxAttempts[0]
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash)
etxBlockNum := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash).BlockNumber
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID)

// Callback to pipeline service completed. Should be ignored
Expand Down Expand Up @@ -685,10 +685,26 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID())
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Len(t, receiptsPlus, 1)
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}

// Clear min_confirmations
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = NULL WHERE id = $1`, etx1.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Empty(t, receiptsPlus)

// Search evm.txes table for tx requiring callback, with block 1 finalized
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, etxBlockNum, ethClient.ConfiguredChainID())
require.NoError(t, err)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}
}

func Test_FindTxWithIdempotencyKey(t *testing.T) {
Expand Down
31 changes: 16 additions & 15 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_golang v1.20.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/spf13/cobra v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1099,8 +1099,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q=
Expand Down
23 changes: 14 additions & 9 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,15 +453,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
pipelineRunner,
cfg.JobPipeline(),
),
job.StandardCapabilities: standardcapabilities.NewDelegate(
globalLogger,
opts.DS, jobORM,
opts.CapabilitiesRegistry,
loopRegistrarConfig,
telemetryManager,
pipelineRunner,
opts.RelayerChainInteroperators,
gatewayConnectorWrapper),
}
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
)
Expand Down Expand Up @@ -501,6 +492,20 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("P2P stack required for OCR or OCR2")
}

// If peer wrapper is initialized, Oracle Factory dependency will be available to standard capabilities
delegates[job.StandardCapabilities] = standardcapabilities.NewDelegate(
globalLogger,
opts.DS, jobORM,
opts.CapabilitiesRegistry,
loopRegistrarConfig,
telemetryManager,
pipelineRunner,
opts.RelayerChainInteroperators,
gatewayConnectorWrapper,
keyStore,
peerWrapper,
)

if cfg.OCR().Enabled() {
delegates[job.OffchainReporting] = ocr.NewDelegate(
opts.DS,
Expand Down
Loading

0 comments on commit b22050e

Please sign in to comment.