Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ZK Overflow Status Checker and Idempotency Key Management #13496

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chilled-icons-turn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Introducing a TXM status checker for CCIP to retrieve zk overflow transactions for a specific messageId
53 changes: 53 additions & 0 deletions core/services/ocrcommon/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ocrcommon

import (
"context"
"fmt"
"math/big"
"slices"

Expand All @@ -11,6 +12,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker"
)

type roundRobinKeystore interface {
Expand All @@ -35,6 +37,7 @@ type transmitter struct {
checker txmgr.TransmitCheckerSpec
chainID *big.Int
keystore roundRobinKeystore
statuschecker statuschecker.TransactionStatusChecker // Used for CCIP's idempotency key generation
}

// NewTransmitter creates a new eth transmitter
Expand Down Expand Up @@ -65,6 +68,39 @@ func NewTransmitter(
}, nil
}

// NewTransmitterWithStatusChecker creates a new eth transmitter that additionally holds a
// TransactionStatusChecker (used for CCIP's idempotency key generation).
//
// It returns an error when either keystore or statuschecker is nil; all other arguments are
// stored on the transmitter as given, without validation.
func NewTransmitterWithStatusChecker(
txm txManager,
fromAddresses []common.Address,
gasLimit uint64,
effectiveTransmitterAddress common.Address,
strategy types.TxStrategy,
checker txmgr.TransmitCheckerSpec,
chainID *big.Int,
keystore roundRobinKeystore,
statuschecker statuschecker.TransactionStatusChecker,
0xnogo marked this conversation as resolved.
Show resolved Hide resolved
) (Transmitter, error) {
// Ensure that a keystore is provided.
if keystore == nil {
return nil, errors.New("nil keystore provided to transmitter")
}

// Ensure that a status checker is provided; callers that do not need one use NewTransmitter.
if statuschecker == nil {
return nil, errors.New("nil statuschecker provided to transmitter")
}

return &transmitter{
txm: txm,
fromAddresses: fromAddresses,
gasLimit: gasLimit,
effectiveTransmitterAddress: effectiveTransmitterAddress,
strategy: strategy,
checker: checker,
chainID: chainID,
keystore: keystore,
statuschecker: statuschecker,
}, nil
}

type txManagerOCR2 interface {
CreateTransaction(ctx context.Context, txRequest txmgr.TxRequest) (tx txmgr.Tx, err error)
GetForwarderForEOAOCR2Feeds(ctx context.Context, eoa, ocr2AggregatorID common.Address) (forwarder common.Address, err error)
Expand Down Expand Up @@ -116,7 +152,24 @@ func (t *transmitter) CreateEthTransaction(ctx context.Context, toAddress common
return errors.Wrap(err, "skipped OCR transmission, error getting round-robin address")
}

// idempotencyKey stays nil unless the transmission carries message IDs and a status checker is set.
var idempotencyKey *string

// Define idempotency key for CCIP transactions. The key has the form <first message ID>-<n>,
// where n is one past the counter reported by the status checker for that message.
// txMeta is a pointer, so guard against nil before reading MessageIDs.
if txMeta != nil && len(txMeta.MessageIDs) > 0 && t.statuschecker != nil {
	messageID := txMeta.MessageIDs[0]
	_, count, statusErr := t.statuschecker.CheckMessageStatus(ctx, messageID)
	if statusErr != nil {
		// Wrap the status checker's own error. Wrapping the outer err (already handled
		// above) would yield nil, because errors.Wrap returns nil for a nil error, and
		// the failed transmission would be reported as a success.
		return errors.Wrap(statusErr, "skipped OCR transmission, error getting message status")
	}
	key := fmt.Sprintf("%s-%d", messageID, count+1)
	idempotencyKey = &key
}

_, err = t.txm.CreateTransaction(ctx, txmgr.TxRequest{
IdempotencyKey: idempotencyKey,
FromAddress: roundRobinFromAddress,
ToAddress: toAddress,
EncodedPayload: payload,
Expand Down
52 changes: 52 additions & 0 deletions core/services/ocrcommon/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker"
)

func newMockTxStrategy(t *testing.T) *commontxmmocks.TxStrategy {
Expand Down Expand Up @@ -169,3 +171,53 @@ func Test_DefaultTransmitter_Forwarding_Enabled_CreateEthTransaction_No_Keystore
)
require.Error(t, err)
}

func Test_Transmitter_With_StatusChecker_CreateEthTransaction(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
ethKeyStore := cltest.NewKeyStore(t, db).Eth()

_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)

gasLimit := uint64(1000)
chainID := big.NewInt(0)
effectiveTransmitterAddress := fromAddress
txm := txmmocks.NewMockEvmTxManager(t)
strategy := newMockTxStrategy(t)
statusChecker := mocks.NewTransactionStatusChecker(t)
toAddress := testutils.NewAddress()
payload := []byte{1, 2, 3}
idempotencyKey := "1-0"
txMeta := &txmgr.TxMeta{MessageIDs: []string{"1"}}

transmitter, err := ocrcommon.NewTransmitterWithStatusChecker(
txm,
[]common.Address{fromAddress},
gasLimit,
effectiveTransmitterAddress,
strategy,
txmgr.TransmitCheckerSpec{},
chainID,
ethKeyStore,
statusChecker,
)
require.NoError(t, err)

statusChecker.On("CheckMessageStatus", mock.Anything, "1").Return([]statuschecker.TransactionStatus{}, -1, nil).Once()

txm.On("CreateTransaction", mock.Anything, txmgr.TxRequest{
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
ToAddress: toAddress,
EncodedPayload: payload,
FeeLimit: gasLimit,
ForwarderAddress: common.Address{},
Meta: txMeta,
Strategy: strategy,
}).Return(txmgr.Tx{}, nil).Once()

require.NoError(t, transmitter.CreateEthTransaction(testutils.Context(t), toAddress, payload, txMeta))
// check that the status checker was called
statusChecker.AssertExpectations(t)
}
14 changes: 14 additions & 0 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
reportcodecv2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2/reportcodec"
reportcodecv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

Expand Down Expand Up @@ -597,6 +598,19 @@ func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rarg
configWatcher.chain.ID(),
ethKeystore,
)
case commontypes.CCIPExecution:
transmitter, err = ocrcommon.NewTransmitterWithStatusChecker(
configWatcher.chain.TxManager(),
fromAddresses,
gasLimit,
effectiveTransmitterAddress,
strategy,
checker,
configWatcher.chain.ID(),
ethKeystore,
statuschecker.NewTransactionStatusChecker(nil), // TODO: remove after TXM changes are merged
// statuschecker.NewTransactionStatusChecker(configWatcher.chain.TxManager()), // TODO: uncomment after TXM changes are merged
)
default:
transmitter, err = ocrcommon.NewTransmitter(
configWatcher.chain.TxManager(),
Expand Down
66 changes: 66 additions & 0 deletions core/services/relay/evm/mocks/transaction_status_checker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 66 additions & 0 deletions core/services/relay/evm/statuschecker/txm_status_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package statuschecker

import (
"context"
"fmt"
"strings"
)

//go:generate mockery --quiet --name TransactionStatusChecker --output ../mocks/ --case=underscore

// TransactionStatus is the status value a TxManager reports for a transaction.
//
// TODO replace with actual implementation coming from "github.com/smartcontractkit/chainlink-common/pkg/types"
type TransactionStatus int

// Possible TransactionStatus values, numbered from zero in declaration order.
// Unknown is the zero value of the type.
const (
Unknown TransactionStatus = iota
Unconfirmed
Finalized
Failed
Fatal
)

// TxManager is the part of a transaction manager that the status checker depends on:
// looking up the status of a single transaction by its ID.
type TxManager interface {
// GetTransactionStatus returns the status of the transaction identified by transactionID.
// CheckMessageStatus passes keys of the form <msgID>-<counter> as transactionID.
GetTransactionStatus(ctx context.Context, transactionID string) (TransactionStatus, error)
}

// TransactionStatusChecker reports the statuses of the transactions associated with a message ID.
type TransactionStatusChecker interface {
// CheckMessageStatus returns the statuses found for msgID together with an int counter.
// In the TxmStatusChecker implementation the int is the counter of the last transaction
// found (-1 if none were found).
CheckMessageStatus(ctx context.Context, msgID string) ([]TransactionStatus, int, error)
0xnogo marked this conversation as resolved.
Show resolved Hide resolved
}

// TxmStatusChecker checks message statuses by querying a TxManager.
type TxmStatusChecker struct {
// txManager is queried once per <msgID>-<counter> key by CheckMessageStatus.
txManager TxManager
}

// NoOpTxManager is a placeholder transaction manager whose only method does nothing.
//
// NOTE(review): GetTransactionStatus below takes no arguments and returns only an error, so
// *NoOpTxManager does not satisfy the TxManager interface declared in this file (which expects
// (ctx, transactionID) and returns (TransactionStatus, error)) — confirm whether it should.
type NoOpTxManager struct{}

// GetTransactionStatus always returns nil.
func (n *NoOpTxManager) GetTransactionStatus() error {
return nil
}
0xnogo marked this conversation as resolved.
Show resolved Hide resolved

// NewTransactionStatusChecker builds a TxmStatusChecker that uses txManager for its lookups.
// The argument is stored as given; no validation is performed.
func NewTransactionStatusChecker(txManager TxManager) *TxmStatusChecker {
	checker := new(TxmStatusChecker)
	checker.txManager = txManager
	return checker
}

// CheckMessageStatus checks the status of a message by checking the status of all transactions associated with the message ID.
// It returns a slice of all statuses and the counter of the last transaction found, i.e. the number of
// transactions found minus one (-1 if none).
// The key will follow the format: <msgID>-<counter>, with the counter starting at 0. TXM will be queried for each key until a NotFound error is returned.
// Any other error stops the lookup and is returned together with a nil slice.
// The goal is to find all transactions associated with a message ID and snooze messages if they are fatal in the Execution Plugin.
//
// NOTE(review): the loop below has no upper bound on the counter, and tsc.txManager is called
// without a nil check, so a checker constructed with a nil TxManager will panic here.
func (tsc *TxmStatusChecker) CheckMessageStatus(ctx context.Context, msgID string) ([]TransactionStatus, int, error) {
0xnogo marked this conversation as resolved.
Show resolved Hide resolved
var allStatuses []TransactionStatus
var counter int

// Probe <msgID>-0, <msgID>-1, ... until the TXM reports a key it does not know.
for {
transactionID := fmt.Sprintf("%s-%d", msgID, counter)
status, err := tsc.txManager.GetTransactionStatus(ctx, transactionID)
if err != nil {
// "Not found" is recognised by matching the error text; it ends the scan without an error.
if status == Unknown && strings.Contains(err.Error(), fmt.Sprintf("failed to find transaction with IdempotencyKey %s", transactionID)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If err != nil then status may not have a usable value, or at least, shouldn't. Let's make sure of the API contract here, because this is kind of unidiomatic code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/smartcontractkit/chainlink/pull/13040/files

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID string) (status commontypes.TransactionStatus, err error) {
	// Loads attempts and receipts in the transaction
	tx, err := b.txStore.FindTxWithIdempotencyKey(ctx, transactionID, b.chainID)
	if err != nil {
		return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s: %w", transactionID, err)
	}
	// This check is required since a no-rows error returns nil err
	if tx == nil {
		return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s", transactionID)
	}
	switch tx.State {
	case TxUnconfirmed, TxConfirmedMissingReceipt:
		// Return unconfirmed for ConfirmedMissingReceipt since a receipt is required to determine if it is finalized
		return commontypes.Unconfirmed, nil
	case TxConfirmed:
		// TODO: Check for finality and return finalized status
		// Return unconfirmed if tx receipt's block is newer than the latest finalized block
		return commontypes.Unconfirmed, nil
	case TxFatalError:
		// Use an ErrorClassifier to determine if the transaction is considered Fatal
		txErr := b.newErrorClassifier(tx.GetError())
		if txErr != nil && txErr.IsFatal() {
			return commontypes.Fatal, tx.GetError()
		}
		// Return failed for all other tx's marked as FatalError
		return commontypes.Failed, tx.GetError()
	default:
		// Unstarted and InProgress are classified as unknown since they are not supported by the ChainWriter interface
		return commontypes.Unknown, nil
	}
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Status will always return the default value Unknown in this case. But I agree that this check is useless. We could just rely on the error check.

break
}
return nil, counter - 1, err
}
allStatuses = append(allStatuses, status)
counter++
}

return allStatuses, counter - 1, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need all statuses or just the latest is good enough? Need all right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to put a cap (even if its like 1,000) on the # of statuses to return to avoid getting into an infinite loop here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably would want to skip that msg anyway if it was retried 1000s of times

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need all statuses or just the latest is good enough? Need all right?

Yes, we would need all of them. The main scenario is a message (mi-0) that is submitted at round r0 and only gets resolved at r2. If we only fetch the status of mi-2, we won't know that the message is a zko, as only mi-0 has been resolved.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to put a cap (even if its like 1,000) on the # of statuses to return to avoid getting into an infinite loop here?

Good point. And yes, we should probably return an error and consider that something is wrong with the message. So returning an error here will signal the batching logic to snooze it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rough math:
deltaRound of 30s and 1000 retries gives 1000 * 30 / 3600 ≈ 8.3h.

Say the chain has an outage, we will allow up to 8 hours of outage before dropping the message. It is unlikely to happen but it is leaving us enough room

}
Loading
Loading