Skip to content

Commit

Permalink
fix vrf test
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Jan 25, 2024
1 parent 52e1966 commit bdcf530
Show file tree
Hide file tree
Showing 19 changed files with 383 additions and 77 deletions.
28 changes: 28 additions & 0 deletions common/client/mock_rpc_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,3 +633,11 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
}
return n.RPC().TransactionReceipt(ctx, txHash)
}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) FinalizedBlock(ctx context.Context) (head HEAD, wee error) {
n, err := c.selectNode()
if err != nil {
return head, err
}
return n.RPC().FinalizedBlock(ctx)
}
2 changes: 2 additions & 0 deletions common/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"

"github.com/smartcontractkit/chainlink-common/pkg/assets"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)
Expand Down Expand Up @@ -113,6 +114,7 @@ type clientAPI[
BlockByNumber(ctx context.Context, number *big.Int) (HEAD, error)
BlockByHash(ctx context.Context, hash BLOCK_HASH) (HEAD, error)
LatestBlockHeight(context.Context) (*big.Int, error)
FinalizedBlock(ctx context.Context) (HEAD, error)

// Events
FilterEvents(ctx context.Context, query EVENT_OPS) ([]EVENT, error)
Expand Down
160 changes: 117 additions & 43 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,12 +912,38 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
}
}

// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the depth
// of the given chain and ensures that every one has a receipt with a block hash that is
// in the given chain.
//
// If any of the confirmed transactions does not have a receipt in the chain, it has been
// re-org'd out and will be rebroadcast.
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findFinalizedHeadInChain(ctx context.Context, head types.Head[BLOCK_HASH]) (types.Head[BLOCK_HASH], error) {
if ec.chainConfig.FinalityTagEnabled() {
finalizedHash, finalizedBlockNumber, err := ec.client.FinalizedBlockHash(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get finalized block hash: %w", err)
}

for head != nil {
if head.BlockNumber() == finalizedBlockNumber.Int64() && finalizedHash == head.BlockHash() {
return head, nil
}
head = head.GetParent()
}

return nil, nil
}

finalizedBlock := head.BlockNumber() - int64(ec.chainConfig.FinalityDepth())
for head != nil {
if head.BlockNumber() == finalizedBlock {
return head, nil
}
head = head.GetParent()
}

return nil, nil
}

// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes tries to find them in the provided chain or fetches
// it from an RPC.
// If any of the confirmed transactions does not have a receipt in the chain, it has been re-org'd out and will be rebroadcast.
// If finalized receipt was found marks transaction and the receipt as finalized.
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH]) error {
if head.ChainLength() < ec.chainConfig.FinalityDepth() {
logArgs := []interface{}{
Expand All @@ -935,19 +961,20 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens
ec.nConsecutiveBlocksChainTooShort = 0
}

// fetch all confirmed transactions.
confirmedTxs, err := ec.txStore.FindConfirmedTransactions(ctx, ec.chainID)
missingTxs, err := ec.markFinalizedUsingLocalChain(ctx, head, ec.chainConfig.RPCDefaultBatchSize())
if err != nil {
return fmt.Errorf("FindConfirmedTransactions failed: %w", err)
return fmt.Errorf("failed to find tx missing from the local chain: %w", err)
}

finalizedIDs, err := ec.markFinalized(ctx, head, confirmedTxs)
// Tx might be missing from the chain because it was included into block that is deeper than HistoryDepth.
// Mark finalized txes and returns IDs of transactions whose ID's we failed to find
missingTxIDs, err := ec.fetchAndSaveFinalizedReceipts(ctx, missingTxs)
if err != nil {
return err
}

for _, tx := range confirmedTxs {
if _, ok := finalizedIDs[tx.ID]; ok || hasReceiptInLongestChain(*tx, head) || hasReceiptHigherThanHead(*tx, head) {
for _, tx := range missingTxs {
if _, ok := missingTxIDs[tx.ID]; !ok {
continue
}

Expand All @@ -960,15 +987,15 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens
// It is safe to process separate keys concurrently
// NOTE: This design will block one key if another takes a really long time to execute
var wg sync.WaitGroup
errors := []error{}
var errs []error
var errMu sync.Mutex
wg.Add(len(ec.enabledAddresses))
for _, address := range ec.enabledAddresses {
go func(fromAddress ADDR) {
defer wg.Done()
if err := ec.handleAnyInProgressAttempts(ctx, fromAddress, head.BlockNumber()); err != nil {
errMu.Lock()
errors = append(errors, err)
errs = append(errs, err)
errMu.Unlock()
ec.lggr.Errorw("Error in handleAnyInProgressAttempts", "err", err, "fromAddress", fromAddress)
}
Expand All @@ -977,27 +1004,79 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens

wg.Wait()

return multierr.Combine(errors...)
return multierr.Combine(errs...)
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalized(ctx context.Context, head types.Head[BLOCK_HASH],
txs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (map[int64]TX_HASH, error) {
finalizedTxs := make(map[int64]TX_HASH, len(txs))
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedUsingLocalChain(ctx context.Context,
head types.Head[BLOCK_HASH], batchSize uint32) (
[]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
// fetch all confirmed transactions.
confirmedTxs, err := ec.txStore.FindConfirmedTransactions(ctx, ec.chainID)
if err != nil {
return nil, fmt.Errorf("FindConfirmedTransactions failed: %w", err)
}

finalizedBHead, err := ec.findFinalizedHeadInChain(ctx, head)
if err != nil {
return nil, fmt.Errorf("failed to find finalized block in chain: %w", err)
}

txsMissingInChain := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, len(confirmedTxs))
finalizedAttempts := make([]int64, 0, batchSize)
for _, tx := range confirmedTxs {
// transaction is finalized
if finalizedAttempt := findAttemptWithReceiptInChain(*tx, finalizedBHead); finalizedAttempt != nil {
finalizedAttempts = append(finalizedAttempts, finalizedAttempt.ID)

if uint32(len(finalizedAttempts)) >= batchSize {
err := ec.txStore.MarkFinalized(ctx, finalizedAttempts)
if err != nil {
return nil, fmt.Errorf("failed to mark attempts as finalized")
}
finalizedAttempts = finalizedAttempts[:0]
}
continue
}

// tx is present in chain
if confirmedReceipt := findAttemptWithReceiptInChain(*tx, head); confirmedReceipt != nil {
continue
}

// for some reason one of the receipts has block number from the future - it's safer to wait
if hasReceiptHigherThanHead(*tx, head) {
continue
}

txsMissingInChain = append(txsMissingInChain, tx)
}

if len(txs) == 0 {
ec.lggr.Debug("no transactions to mark finalized")
return finalizedTxs, nil
if len(finalizedAttempts) >= 0 {
err := ec.txStore.MarkFinalized(ctx, finalizedAttempts)
if err != nil {
return nil, fmt.Errorf("failed to mark attempts as finalized")
}
}

return txsMissingInChain, nil
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fetchAndSaveFinalizedReceipts(
ctx context.Context,
txs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) (map[int64]struct{}, error) {

missingTxs := make(map[int64]struct{}, len(txs))

attemptsBatch := make([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
0, ec.chainConfig.RPCDefaultBatchSize())
for _, tx := range txs {
missingTxs[tx.ID] = struct{}{}
for _, attempt := range tx.TxAttempts {
// TODO: BCI-2605 skip attempts that were broadcasted after the finalized block
attemptsBatch = append(attemptsBatch, attempt)
// save one for block fetching
if uint32(len(attemptsBatch)+1) >= ec.chainConfig.RPCDefaultBatchSize() {
err := ec.markFinalizedBatch(ctx, attemptsBatch, finalizedTxs)
err := ec.fetchAndSaveFinalizedReceiptsBatch(ctx, attemptsBatch, missingTxs)
if err != nil {
return nil, fmt.Errorf("failed to mark txs finalized: %w", err)
}
Expand All @@ -1007,19 +1086,19 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar
}

if len(attemptsBatch) > 0 {
err := ec.markFinalizedBatch(ctx, attemptsBatch, finalizedTxs)
err := ec.fetchAndSaveFinalizedReceiptsBatch(ctx, attemptsBatch, missingTxs)
if err != nil {
return nil, fmt.Errorf("failed to mark txs finalized: %w", err)
}
}

return finalizedTxs, nil
return missingTxs, nil
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedBatch(
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fetchAndSaveFinalizedReceiptsBatch(
ctx context.Context,
attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
finalizedTxs map[int64]TX_HASH,
missingTxs map[int64]struct{},
) error {

rpcFinalizedBlock, receipts, txErrs, err := ec.client.BatchGetReceiptsWithFinalizedBlock(ctx, attempts,
Expand All @@ -1032,7 +1111,6 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar
const errMsg = "invariant violation expected number of attempts to match number of resulting receipts/errors"
ec.lggr.With("receipts", receipts, "attempts", attempts, "txErrs", txErrs).Criticalw(errMsg)
return errors.New(errMsg)

}

finalizedReceipts := make([]R, 0, len(attempts))
Expand All @@ -1048,18 +1126,14 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar
continue
}

delete(missingTxs, attempt.TxID)
if receipt.GetBlockNumber().Cmp(rpcFinalizedBlock) >= 0 {
l.With("batch_request_finalized_block", rpcFinalizedBlock).Debug("attempt is not finalized")
l.With("rpc_finalized_block", rpcFinalizedBlock).Debug("attempt is not finalized")
continue
}

finalizedReceipts = append(finalizedReceipts, receipt)
if anotherTxHash, ok := finalizedTxs[attempt.TxID]; ok {
const errMsg = "found two finalized attempts for single tx"
l.With("another_tx_hash", anotherTxHash).Critical(errMsg)
return errors.New(errMsg)
}
finalizedTxs[attempts[i].TxID] = attempts[i].Hash
finalizedReceipts = append(finalizedReceipts, receipts[i])
missingTxs[attempts[i].TxID] = struct{}{}
}

err = ec.txStore.SaveFinalizedReceipts(ctx, finalizedReceipts, ec.chainID)
Expand Down Expand Up @@ -1089,26 +1163,26 @@ func hasReceiptHigherThanHead[
return false
}

func hasReceiptInLongestChain[
func findAttemptWithReceiptInChain[
CHAIN_ID types.ID,
ADDR types.Hashable,
TX_HASH, BLOCK_HASH types.Hashable,
SEQ types.Sequence,
FEE feetypes.Fee,
](etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) bool {
for {
](etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
for head != nil {
for _, attempt := range etx.TxAttempts {
for _, receipt := range attempt.Receipts {
if receipt.GetBlockHash().String() == head.BlockHash().String() && receipt.GetBlockNumber().Int64() == head.BlockNumber() {
return true
return &attempt
}
}
}
if head.GetParent() == nil {
return false
}

head = head.GetParent()
}

return nil
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markForRebroadcast(etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) error {
Expand Down
1 change: 1 addition & 0 deletions common/txmgr/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type TxmClient[
attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
useFinalityTag bool, finalityDepth uint32) (
finalizedBlock *big.Int, txReceipt []R, txErr []error, funcErr error)
FinalizedBlockHash(ctx context.Context) (BLOCK_HASH, *big.Int, error)
}

// TransactionClient contains the methods for building, simulating, broadcasting transactions
Expand Down
26 changes: 22 additions & 4 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit bdcf530

Please sign in to comment.