Skip to content

Commit

Permalink
Fix/tx checker (#44)
Browse files Browse the repository at this point in the history
* find tx using txsearch with tx hash

* change sequence

* query block header instead of block

* handle tx failed

* remove redundant sequence err
  • Loading branch information
sh-cha authored Nov 18, 2024
1 parent 4434dfb commit 5f2e7de
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 120 deletions.
8 changes: 4 additions & 4 deletions node/broadcaster/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (

var _ client.AccountRetriever = &Broadcaster{}

func (b *Broadcaster) loadAccount() error {
account, err := b.GetAccount(b.getClientCtx(), b.keyAddress)
func (b *Broadcaster) loadAccount(ctx context.Context) error {
account, err := b.GetAccount(b.getClientCtx(ctx), b.keyAddress)
if err != nil {
return err
}
Expand All @@ -45,15 +45,15 @@ func (b *Broadcaster) GetAccount(clientCtx client.Context, addr sdk.AccAddress)
// GetAccountWithHeight queries for an account given an address. Returns the
// height of the query with the account. An error is returned if the query
// or decoding fails.
func (b *Broadcaster) GetAccountWithHeight(_ client.Context, addr sdk.AccAddress) (client.Account, int64, error) {
func (b *Broadcaster) GetAccountWithHeight(clienCtx client.Context, addr sdk.AccAddress) (client.Account, int64, error) {
var header metadata.MD
address, err := keys.EncodeBech32AccAddr(addr, b.cfg.Bech32Prefix)
if err != nil {
return nil, 0, err
}

queryClient := authtypes.NewQueryClient(b.rpcClient)
res, err := queryClient.Account(context.Background(), &authtypes.QueryAccountRequest{Address: address}, grpc.Header(&header))
res, err := queryClient.Account(clienCtx.CmdContext, &authtypes.QueryAccountRequest{Address: address}, grpc.Header(&header))
if err != nil {
return nil, 0, err
}
Expand Down
7 changes: 4 additions & 3 deletions node/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ func (b *Broadcaster) Initialize(ctx context.Context, status *rpccoretypes.Resul
return b.prepareBroadcaster(ctx, status.SyncInfo.LatestBlockTime)
}

func (b Broadcaster) getClientCtx() client.Context {
func (b Broadcaster) getClientCtx(ctx context.Context) client.Context {
return client.Context{}.WithClient(b.rpcClient).
WithInterfaceRegistry(b.cdc.InterfaceRegistry()).
WithChainID(b.cfg.ChainID).
WithCodec(b.cdc).
WithFromAddress(b.keyAddress)
WithFromAddress(b.keyAddress).
WithCmdContext(ctx)
}

func (b Broadcaster) GetTxf() tx.Factory {
Expand All @@ -144,7 +145,7 @@ func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time
WithKeybase(b.keyBase).
WithSignMode(signing.SignMode_SIGN_MODE_DIRECT)

err := b.loadAccount()
err := b.loadAccount(ctx)
if err != nil {
return err
}
Expand Down
99 changes: 38 additions & 61 deletions node/broadcaster/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,85 +20,60 @@ func (b Broadcaster) GetHeight() int64 {
return b.lastProcessedBlockHeight + 1
}

// HandleNewBlock is called when a new block is received.
func (b *Broadcaster) HandleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight int64) error {
// check pending txs first
for _, tx := range block.Block.Txs {
if b.LenLocalPendingTx() == 0 {
break
// CheckPendingTx queries the tx info to check whether the pending tx has been processed.
func (b *Broadcaster) CheckPendingTx(ctx context.Context, pendingTx btypes.PendingTxInfo) (*rpccoretypes.ResultTx, time.Time, error) {
txHash, err := hex.DecodeString(pendingTx.TxHash)
if err != nil {
return nil, time.Time{}, err
}
res, txerr := b.rpcClient.QueryTx(ctx, txHash)
if txerr != nil {
// if the tx is not found, either the tx has not been processed yet
// or, in rare cases, the node has not indexed it.
lastHeader, err := b.rpcClient.Header(ctx, nil)
if err != nil {
return nil, time.Time{}, err
}

// check if the first pending tx is included in the block
if pendingTx := b.peekLocalPendingTx(); btypes.TxHash(tx) == pendingTx.TxHash {
err := b.RemovePendingTx(block.Block.Height, pendingTx.TxHash, pendingTx.Sequence, pendingTx.MsgTypes)
pendingTxTime := time.Unix(0, pendingTx.Timestamp)

// before timeout
if lastHeader.Header.Time.Before(pendingTxTime.Add(b.cfg.TxTimeout)) {
b.logger.Debug("failed to query tx", zap.String("tx_hash", pendingTx.TxHash), zap.String("error", txerr.Error()))
return nil, time.Time{}, types.ErrTxNotFound
} else {
// timeout case
account, err := b.GetAccount(b.getClientCtx(ctx), b.keyAddress)
if err != nil {
return err
return nil, time.Time{}, err
}
}
}

// check timeout of pending txs
// @sh-cha: should we rebroadcast pending txs? or raising monitoring alert?
if length := b.LenLocalPendingTx(); length > 0 {
b.logger.Debug("remaining pending txs", zap.Int64("height", block.Block.Height), zap.Int("count", length))
pendingTxTime := time.Unix(0, b.peekLocalPendingTx().Timestamp)
if block.Block.Time.After(pendingTxTime.Add(b.cfg.TxTimeout)) {
panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", block.Block.Time.UTC().String(), pendingTxTime.UTC().String()))
// if the account sequence is larger than the sequence of the pending tx,
// treat the tx as already processed
if pendingTx.Sequence < account.GetSequence() {
return nil, time.Time{}, nil
}
panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", time.Now().UTC().String(), pendingTxTime.UTC().String()))
}
} else if res.TxResult.Code != 0 {
panic(fmt.Errorf("tx failed, tx hash: %s, code: %d, log: %s; you might need to check gas adjustment config or balance", pendingTx.TxHash, res.TxResult.Code, res.TxResult.Log))
}

// update last processed block height
b.lastProcessedBlockHeight = latestChainHeight

return nil
}

// CheckPendingTx queries the tx info to check whether the pending tx has been processed.
func (b *Broadcaster) CheckPendingTx(ctx context.Context) (*btypes.PendingTxInfo, *rpccoretypes.ResultTx, time.Time, error) {
if b.LenLocalPendingTx() == 0 {
return nil, nil, time.Time{}, nil
}

pendingTx := b.peekLocalPendingTx()
pendingTxTime := time.Unix(0, b.peekLocalPendingTx().Timestamp)

lastBlockResult, err := b.rpcClient.Block(ctx, nil)
if err != nil {
return nil, nil, time.Time{}, err
}
if lastBlockResult.Block.Time.After(pendingTxTime.Add(b.cfg.TxTimeout)) {
// @sh-cha: should we rebroadcast pending txs? or raising monitoring alert?
panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", time.Now().UTC().String(), pendingTxTime.UTC().String()))
}

txHash, err := hex.DecodeString(pendingTx.TxHash)
if err != nil {
return nil, nil, time.Time{}, err
}
res, err := b.rpcClient.QueryTx(ctx, txHash)
if err != nil {
b.logger.Debug("failed to query tx", zap.String("tx_hash", pendingTx.TxHash), zap.String("error", err.Error()))
return nil, nil, time.Time{}, nil
}

blockResult, err := b.rpcClient.Block(ctx, &res.Height)
header, err := b.rpcClient.Header(ctx, &res.Height)
if err != nil {
return nil, nil, time.Time{}, err
return nil, time.Time{}, err
}
return &pendingTx, res, blockResult.Block.Time, nil
return res, header.Header.Time, nil
}

// RemovePendingTx removes the pending tx from the local pending txs.
// It is called when the pending tx is included in the block.
func (b *Broadcaster) RemovePendingTx(blockHeight int64, txHash string, sequence uint64, msgTypes []string) error {
func (b *Broadcaster) RemovePendingTx(sequence uint64) error {
err := b.deletePendingTx(sequence)
if err != nil {
return err
}

b.logger.Info("tx inserted", zap.Int64("height", blockHeight), zap.Uint64("sequence", sequence), zap.String("tx_hash", txHash), zap.Strings("msg_types", msgTypes))
b.dequeueLocalPendingTx()

return nil
}

Expand All @@ -124,7 +99,9 @@ func (b *Broadcaster) Start(ctx context.Context) error {
break
}
b.logger.Warn(fmt.Sprintf("retry to handle processed msgs after %d seconds", int(2*math.Exp2(float64(retry)))), zap.Int("count", retry), zap.String("error", err.Error()))
types.SleepWithRetry(ctx, retry)
if types.SleepWithRetry(ctx, retry) {
return nil
}
}
if err != nil {
return errors.Wrap(err, "failed to handle processed msgs")
Expand Down
11 changes: 7 additions & 4 deletions node/broadcaster/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"go.uber.org/zap"

"github.com/pkg/errors"

sdkerrors "cosmossdk.io/errors"
abci "github.com/cometbft/cometbft/abci/types"

Expand Down Expand Up @@ -50,8 +52,6 @@ func (b *Broadcaster) handleMsgError(err error) error {
b.txf = b.txf.WithSequence(expected)
}

// account sequence mismatched
// TODO: handle mismatched sequence
return err
}

Expand Down Expand Up @@ -237,11 +237,14 @@ func (b *Broadcaster) enqueueLocalPendingTx(tx btypes.PendingTxInfo) {
b.pendingTxs = append(b.pendingTxs, tx)
}

func (b *Broadcaster) peekLocalPendingTx() btypes.PendingTxInfo {
func (b *Broadcaster) PeekLocalPendingTx() (btypes.PendingTxInfo, error) {
b.pendingTxMu.Lock()
defer b.pendingTxMu.Unlock()

return b.pendingTxs[0]
if len(b.pendingTxs) == 0 {
return btypes.PendingTxInfo{}, errors.New("no pending txs")
}
return b.pendingTxs[0], nil
}

func (b Broadcaster) LenLocalPendingTx() int {
Expand Down
32 changes: 15 additions & 17 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,23 +134,9 @@ func (n *Node) Start(ctx context.Context) {
n.broadcaster.BroadcastPendingProcessedMsgs()
}

if n.cfg.ProcessType == nodetypes.PROCESS_TYPE_ONLY_BROADCAST {
if n.broadcaster == nil {
panic("broadcaster cannot be nil with nodetypes.PROCESS_TYPE_ONLY_BROADCAST")
}

errGrp.Go(func() (err error) {
defer func() {
n.logger.Info("tx checker looper stopped")
if r := recover(); r != nil {
n.logger.Error("tx checker panic", zap.Any("recover", r))
err = fmt.Errorf("tx checker panic: %v", r)
}
}()

return n.txChecker(ctx)
})
} else {
enableEventHandler := true
if n.cfg.ProcessType != nodetypes.PROCESS_TYPE_ONLY_BROADCAST {
enableEventHandler = false
errGrp.Go(func() (err error) {
defer func() {
n.logger.Info("block process looper stopped")
Expand All @@ -163,6 +149,18 @@ func (n *Node) Start(ctx context.Context) {
return n.blockProcessLooper(ctx, n.cfg.ProcessType)
})
}

errGrp.Go(func() (err error) {
defer func() {
n.logger.Info("tx checker looper stopped")
if r := recover(); r != nil {
n.logger.Error("tx checker panic", zap.Any("recover", r))
err = fmt.Errorf("tx checker panic: %v", r)
}
}()

return n.txChecker(ctx, enableEventHandler)
})
}

func (n Node) AccountCodec() address.Codec {
Expand Down
78 changes: 50 additions & 28 deletions node/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo
case <-ctx.Done():
return nil
case <-timer.C:
types.SleepWithRetry(ctx, consecutiveErrors)
if types.SleepWithRetry(ctx, consecutiveErrors) {
return nil
}
consecutiveErrors++
}

Expand Down Expand Up @@ -130,14 +132,6 @@ func (n *Node) handleNewBlock(ctx context.Context, block *rpccoretypes.ResultBlo
return err
}

// handle broadcaster first to check pending txs
if n.broadcaster != nil {
err := n.broadcaster.HandleNewBlock(block, blockResult, latestChainHeight)
if err != nil {
return err
}
}

if n.beginBlockHandler != nil {
err := n.beginBlockHandler(ctx, nodetypes.BeginBlockArgs{
BlockID: block.BlockID.Hash,
Expand Down Expand Up @@ -212,7 +206,10 @@ func (n *Node) handleEvent(ctx context.Context, blockHeight int64, blockTime tim
}

// txChecker checks pending txs and handles events if the tx is included in the block.
func (n *Node) txChecker(ctx context.Context) error {
// If the tx hash is not indexed by the node even though the tx was processed,
// the event handler will not be called.
// It is therefore recommended to use the event handler only for check events (e.g. logs).
func (n *Node) txChecker(ctx context.Context, enableEventHandler bool) error {
if n.broadcaster == nil {
return nil
}
Expand All @@ -225,39 +222,64 @@ func (n *Node) txChecker(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-timer.C:
types.SleepWithRetry(ctx, consecutiveErrors)
if n.broadcaster.LenLocalPendingTx() == 0 {
continue
}

n.logger.Debug("remaining pending txs", zap.Int("count", n.broadcaster.LenLocalPendingTx()))

if types.SleepWithRetry(ctx, consecutiveErrors) {
return nil
}
consecutiveErrors++
}

pendingTx, res, blockTime, err := n.broadcaster.CheckPendingTx(ctx)
pendingTx, err := n.broadcaster.PeekLocalPendingTx()
if err != nil {
return err
} else if pendingTx == nil || res == nil {
// tx not found
continue
}

if len(n.eventHandlers) != 0 {
events := res.TxResult.GetEvents()
for eventIndex, event := range events {
select {
case <-ctx.Done():
return nil
default:
}
height := int64(0)

err := n.handleEvent(ctx, res.Height, blockTime, 0, event)
if err != nil {
n.logger.Error("failed to handle event", zap.String("tx_hash", pendingTx.TxHash), zap.Int("event_index", eventIndex), zap.String("error", err.Error()))
break
res, blockTime, err := n.broadcaster.CheckPendingTx(ctx, pendingTx)
if errors.Is(err, types.ErrTxNotFound) {
// tx not found
continue
} else if err != nil {
return err
} else if res != nil {
// tx found
height = res.Height
// events are handled here only when the node is only broadcasting txs, not processing blocks
if enableEventHandler && len(n.eventHandlers) != 0 {
events := res.TxResult.GetEvents()
for eventIndex, event := range events {
select {
case <-ctx.Done():
return nil
default:
}

err := n.handleEvent(ctx, res.Height, blockTime, 0, event)
if err != nil {
n.logger.Error("failed to handle event", zap.String("tx_hash", pendingTx.TxHash), zap.Int("event_index", eventIndex), zap.String("error", err.Error()))
break
}
}
}
}

err = n.broadcaster.RemovePendingTx(res.Height, pendingTx.TxHash, pendingTx.Sequence, pendingTx.MsgTypes)
err = n.broadcaster.RemovePendingTx(pendingTx.Sequence)
if err != nil {
return err
}
n.logger.Info("tx inserted",
zap.Int64("height", height),
zap.Uint64("sequence", pendingTx.Sequence),
zap.String("tx_hash", pendingTx.TxHash),
zap.Strings("msg_types", pendingTx.MsgTypes),
zap.Int("pending_txs", n.broadcaster.LenLocalPendingTx()),
)
consecutiveErrors = 0
}
}
2 changes: 2 additions & 0 deletions types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ package types
import "errors"

var ErrKeyNotSet = errors.New("key not set")
var ErrAccountSequenceMismatch = errors.New("account sequence mismatch")
var ErrTxNotFound = errors.New("tx not found")
Loading

0 comments on commit 5f2e7de

Please sign in to comment.