From 5f2e7de8d96ba69074fa2833a1a7d34e00c0dd6b Mon Sep 17 00:00:00 2001 From: Hoon <48665813+sh-cha@users.noreply.github.com> Date: Mon, 18 Nov 2024 21:41:35 +0900 Subject: [PATCH] Fix/tx checker (#44) * find tx using txsearch with tx hash * change sequence * query block header instead of block * handle tx failed * remove redunt sequence err --- node/broadcaster/account.go | 8 +-- node/broadcaster/broadcaster.go | 7 ++- node/broadcaster/process.go | 99 +++++++++++++-------------------- node/broadcaster/tx.go | 11 ++-- node/node.go | 32 +++++------ node/process.go | 78 ++++++++++++++++---------- types/errors.go | 2 + types/retry.go | 7 ++- 8 files changed, 124 insertions(+), 120 deletions(-) diff --git a/node/broadcaster/account.go b/node/broadcaster/account.go index c741a00..f0866f6 100644 --- a/node/broadcaster/account.go +++ b/node/broadcaster/account.go @@ -18,8 +18,8 @@ import ( var _ client.AccountRetriever = &Broadcaster{} -func (b *Broadcaster) loadAccount() error { - account, err := b.GetAccount(b.getClientCtx(), b.keyAddress) +func (b *Broadcaster) loadAccount(ctx context.Context) error { + account, err := b.GetAccount(b.getClientCtx(ctx), b.keyAddress) if err != nil { return err } @@ -45,7 +45,7 @@ func (b *Broadcaster) GetAccount(clientCtx client.Context, addr sdk.AccAddress) // GetAccountWithHeight queries for an account given an address. Returns the // height of the query with the account. An error is returned if the query // or decoding fails. -func (b *Broadcaster) GetAccountWithHeight(_ client.Context, addr sdk.AccAddress) (client.Account, int64, error) { +func (b *Broadcaster) GetAccountWithHeight(clienCtx client.Context, addr sdk.AccAddress) (client.Account, int64, error) { var header metadata.MD address, err := keys.EncodeBech32AccAddr(addr, b.cfg.Bech32Prefix) if err != nil { @@ -53,7 +53,7 @@ func (b *Broadcaster) GetAccountWithHeight(_ client.Context, addr sdk.AccAddress } queryClient := authtypes.NewQueryClient(b.rpcClient) - res, err := queryClient.Account(context.Background(), &authtypes.QueryAccountRequest{Address: address}, grpc.Header(&header)) + res, err := queryClient.Account(clienCtx.CmdContext, &authtypes.QueryAccountRequest{Address: address}, grpc.Header(&header)) if err != nil { return nil, 0, err } diff --git a/node/broadcaster/broadcaster.go b/node/broadcaster/broadcaster.go index 32bbbc8..b922120 100644 --- a/node/broadcaster/broadcaster.go +++ b/node/broadcaster/broadcaster.go @@ -122,12 +122,13 @@ func (b *Broadcaster) Initialize(ctx context.Context, status *rpccoretypes.Resul return b.prepareBroadcaster(ctx, status.SyncInfo.LatestBlockTime) } -func (b Broadcaster) getClientCtx() client.Context { +func (b Broadcaster) getClientCtx(ctx context.Context) client.Context { return client.Context{}.WithClient(b.rpcClient). WithInterfaceRegistry(b.cdc.InterfaceRegistry()). WithChainID(b.cfg.ChainID). WithCodec(b.cdc). - WithFromAddress(b.keyAddress) + WithFromAddress(b.keyAddress). + WithCmdContext(ctx) } func (b Broadcaster) GetTxf() tx.Factory { @@ -144,7 +145,7 @@ func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time WithKeybase(b.keyBase). WithSignMode(signing.SignMode_SIGN_MODE_DIRECT) - err := b.loadAccount() + err := b.loadAccount(ctx) if err != nil { return err } diff --git a/node/broadcaster/process.go b/node/broadcaster/process.go index 78998c9..16c10da 100644 --- a/node/broadcaster/process.go +++ b/node/broadcaster/process.go @@ -20,85 +20,60 @@ func (b Broadcaster) GetHeight() int64 { return b.lastProcessedBlockHeight + 1 } -// HandleNewBlock is called when a new block is received. -func (b *Broadcaster) HandleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight int64) error { - // check pending txs first - for _, tx := range block.Block.Txs { - if b.LenLocalPendingTx() == 0 { - break +// CheckPendingTx query tx info to check if pending tx is processed. +func (b *Broadcaster) CheckPendingTx(ctx context.Context, pendingTx btypes.PendingTxInfo) (*rpccoretypes.ResultTx, time.Time, error) { + txHash, err := hex.DecodeString(pendingTx.TxHash) + if err != nil { + return nil, time.Time{}, err + } + res, txerr := b.rpcClient.QueryTx(ctx, txHash) + if txerr != nil { + // if the tx is not found, it means the tx is not processed yet + // or the tx is not indexed by the node in rare cases. + lastHeader, err := b.rpcClient.Header(ctx, nil) + if err != nil { + return nil, time.Time{}, err } - - // check if the first pending tx is included in the block - if pendingTx := b.peekLocalPendingTx(); btypes.TxHash(tx) == pendingTx.TxHash { - err := b.RemovePendingTx(block.Block.Height, pendingTx.TxHash, pendingTx.Sequence, pendingTx.MsgTypes) + pendingTxTime := time.Unix(0, pendingTx.Timestamp) + + // before timeout + if lastHeader.Header.Time.Before(pendingTxTime.Add(b.cfg.TxTimeout)) { + b.logger.Debug("failed to query tx", zap.String("tx_hash", pendingTx.TxHash), zap.String("error", txerr.Error())) + return nil, time.Time{}, types.ErrTxNotFound + } else { + // timeout case + account, err := b.GetAccount(b.getClientCtx(ctx), b.keyAddress) if err != nil { - return err + return nil, time.Time{}, err } - } - } - // check timeout of pending txs - // @sh-cha: should we rebroadcast pending txs? or raising monitoring alert? - if length := b.LenLocalPendingTx(); length > 0 { - b.logger.Debug("remaining pending txs", zap.Int64("height", block.Block.Height), zap.Int("count", length)) - pendingTxTime := time.Unix(0, b.peekLocalPendingTx().Timestamp) - if block.Block.Time.After(pendingTxTime.Add(b.cfg.TxTimeout)) { - panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", block.Block.Time.UTC().String(), pendingTxTime.UTC().String())) + // if sequence is larger than the sequence of the pending tx, + // handle it as the tx has already been processed + if pendingTx.Sequence < account.GetSequence() { + return nil, time.Time{}, nil + } + panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", time.Now().UTC().String(), pendingTxTime.UTC().String())) } + } else if res.TxResult.Code != 0 { + panic(fmt.Errorf("tx failed, tx hash: %s, code: %d, log: %s; you might need to check gas adjustment config or balance", pendingTx.TxHash, res.TxResult.Code, res.TxResult.Log)) } - // update last processed block height - b.lastProcessedBlockHeight = latestChainHeight - - return nil -} - -// CheckPendingTx query tx info to check if pending tx is processed. -func (b *Broadcaster) CheckPendingTx(ctx context.Context) (*btypes.PendingTxInfo, *rpccoretypes.ResultTx, time.Time, error) { - if b.LenLocalPendingTx() == 0 { - return nil, nil, time.Time{}, nil - } - - pendingTx := b.peekLocalPendingTx() - pendingTxTime := time.Unix(0, b.peekLocalPendingTx().Timestamp) - - lastBlockResult, err := b.rpcClient.Block(ctx, nil) - if err != nil { - return nil, nil, time.Time{}, err - } - if lastBlockResult.Block.Time.After(pendingTxTime.Add(b.cfg.TxTimeout)) { - // @sh-cha: should we rebroadcast pending txs? or raising monitoring alert? - panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", time.Now().UTC().String(), pendingTxTime.UTC().String())) - } - - txHash, err := hex.DecodeString(pendingTx.TxHash) - if err != nil { - return nil, nil, time.Time{}, err - } - res, err := b.rpcClient.QueryTx(ctx, txHash) - if err != nil { - b.logger.Debug("failed to query tx", zap.String("tx_hash", pendingTx.TxHash), zap.String("error", err.Error())) - return nil, nil, time.Time{}, nil - } - - blockResult, err := b.rpcClient.Block(ctx, &res.Height) + header, err := b.rpcClient.Header(ctx, &res.Height) if err != nil { - return nil, nil, time.Time{}, err + return nil, time.Time{}, err } - return &pendingTx, res, blockResult.Block.Time, nil + return res, header.Header.Time, nil } // RemovePendingTx remove pending tx from local pending txs. // It is called when the pending tx is included in the block. -func (b *Broadcaster) RemovePendingTx(blockHeight int64, txHash string, sequence uint64, msgTypes []string) error { +func (b *Broadcaster) RemovePendingTx(sequence uint64) error { err := b.deletePendingTx(sequence) if err != nil { return err } - b.logger.Info("tx inserted", zap.Int64("height", blockHeight), zap.Uint64("sequence", sequence), zap.String("tx_hash", txHash), zap.Strings("msg_types", msgTypes)) b.dequeueLocalPendingTx() - return nil } @@ -124,7 +99,9 @@ func (b *Broadcaster) Start(ctx context.Context) error { break } b.logger.Warn(fmt.Sprintf("retry to handle processed msgs after %d seconds", int(2*math.Exp2(float64(retry)))), zap.Int("count", retry), zap.String("error", err.Error())) - types.SleepWithRetry(ctx, retry) + if types.SleepWithRetry(ctx, retry) { + return nil + } } if err != nil { return errors.Wrap(err, "failed to handle processed msgs") diff --git a/node/broadcaster/tx.go b/node/broadcaster/tx.go index eb2ef15..e16f7dc 100644 --- a/node/broadcaster/tx.go +++ b/node/broadcaster/tx.go @@ -10,6 +10,8 @@ import ( "go.uber.org/zap" + "github.com/pkg/errors" + sdkerrors "cosmossdk.io/errors" abci "github.com/cometbft/cometbft/abci/types" @@ -50,8 +52,6 @@ func (b *Broadcaster) handleMsgError(err error) error { b.txf = b.txf.WithSequence(expected) } - // account sequence mismatched - // TODO: handle mismatched sequence return err } @@ -237,11 +237,14 @@ func (b *Broadcaster) enqueueLocalPendingTx(tx btypes.PendingTxInfo) { b.pendingTxs = append(b.pendingTxs, tx) } -func (b *Broadcaster) peekLocalPendingTx() btypes.PendingTxInfo { +func (b *Broadcaster) PeekLocalPendingTx() (btypes.PendingTxInfo, error) { b.pendingTxMu.Lock() defer b.pendingTxMu.Unlock() - return b.pendingTxs[0] + if len(b.pendingTxs) == 0 { + return btypes.PendingTxInfo{}, errors.New("no pending txs") + } + return b.pendingTxs[0], nil } func (b Broadcaster) LenLocalPendingTx() int { diff --git a/node/node.go b/node/node.go index c1065ed..8b877d0 100644 --- a/node/node.go +++ b/node/node.go @@ -134,23 +134,9 @@ func (n *Node) Start(ctx context.Context) { n.broadcaster.BroadcastPendingProcessedMsgs() } - if n.cfg.ProcessType == nodetypes.PROCESS_TYPE_ONLY_BROADCAST { - if n.broadcaster == nil { - panic("broadcaster cannot be nil with nodetypes.PROCESS_TYPE_ONLY_BROADCAST") - } - - errGrp.Go(func() (err error) { - defer func() { - n.logger.Info("tx checker looper stopped") - if r := recover(); r != nil { - n.logger.Error("tx checker panic", zap.Any("recover", r)) - err = fmt.Errorf("tx checker panic: %v", r) - } - }() - - return n.txChecker(ctx) - }) - } else { + enableEventHandler := true + if n.cfg.ProcessType != nodetypes.PROCESS_TYPE_ONLY_BROADCAST { + enableEventHandler = false errGrp.Go(func() (err error) { defer func() { n.logger.Info("block process looper stopped") @@ -163,6 +149,18 @@ func (n *Node) Start(ctx context.Context) { return n.blockProcessLooper(ctx, n.cfg.ProcessType) }) } + + errGrp.Go(func() (err error) { + defer func() { + n.logger.Info("tx checker looper stopped") + if r := recover(); r != nil { + n.logger.Error("tx checker panic", zap.Any("recover", r)) + err = fmt.Errorf("tx checker panic: %v", r) + } + }() + + return n.txChecker(ctx, enableEventHandler) + }) } func (n Node) AccountCodec() address.Codec { diff --git a/node/process.go b/node/process.go index 6d08d47..9613d46 100644 --- a/node/process.go +++ b/node/process.go @@ -24,7 +24,9 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo case <-ctx.Done(): return nil case <-timer.C: - types.SleepWithRetry(ctx, consecutiveErrors) + if types.SleepWithRetry(ctx, consecutiveErrors) { + return nil + } consecutiveErrors++ } @@ -130,14 +132,6 @@ func (n *Node) handleNewBlock(ctx context.Context, block *rpccoretypes.ResultBlo return err } - // handle broadcaster first to check pending txs - if n.broadcaster != nil { - err := n.broadcaster.HandleNewBlock(block, blockResult, latestChainHeight) - if err != nil { - return err - } - } - if n.beginBlockHandler != nil { err := n.beginBlockHandler(ctx, nodetypes.BeginBlockArgs{ BlockID: block.BlockID.Hash, @@ -212,7 +206,10 @@ func (n *Node) handleEvent(ctx context.Context, blockHeight int64, blockTime tim } // txChecker checks pending txs and handle events if the tx is included in the block -func (n *Node) txChecker(ctx context.Context) error { +// in the case that the tx hash is not indexed by the node even if the tx is processed, +// event handler will not be called. +// so, it is recommended to use the event handler only for the check event (e.g. logs) +func (n *Node) txChecker(ctx context.Context, enableEventHandler bool) error { if n.broadcaster == nil { return nil } @@ -225,39 +222,64 @@ func (n *Node) txChecker(ctx context.Context) error { case <-ctx.Done(): return nil case <-timer.C: - types.SleepWithRetry(ctx, consecutiveErrors) + if n.broadcaster.LenLocalPendingTx() == 0 { + continue + } + + n.logger.Debug("remaining pending txs", zap.Int("count", n.broadcaster.LenLocalPendingTx())) + + if types.SleepWithRetry(ctx, consecutiveErrors) { + return nil + } consecutiveErrors++ } - pendingTx, res, blockTime, err := n.broadcaster.CheckPendingTx(ctx) + pendingTx, err := n.broadcaster.PeekLocalPendingTx() if err != nil { return err - } else if pendingTx == nil || res == nil { - // tx not found - continue } - if len(n.eventHandlers) != 0 { - events := res.TxResult.GetEvents() - for eventIndex, event := range events { - select { - case <-ctx.Done(): - return nil - default: - } + height := int64(0) - err := n.handleEvent(ctx, res.Height, blockTime, 0, event) - if err != nil { - n.logger.Error("failed to handle event", zap.String("tx_hash", pendingTx.TxHash), zap.Int("event_index", eventIndex), zap.String("error", err.Error())) - break + res, blockTime, err := n.broadcaster.CheckPendingTx(ctx, pendingTx) + if errors.Is(err, types.ErrTxNotFound) { + // tx not found + continue + } else if err != nil { + return err + } else if res != nil { + // tx found + height = res.Height + // it only handles the tx if node is only broadcasting txs, not processing blocks + if enableEventHandler && len(n.eventHandlers) != 0 { + events := res.TxResult.GetEvents() + for eventIndex, event := range events { + select { + case <-ctx.Done(): + return nil + default: + } + + err := n.handleEvent(ctx, res.Height, blockTime, 0, event) + if err != nil { + n.logger.Error("failed to handle event", zap.String("tx_hash", pendingTx.TxHash), zap.Int("event_index", eventIndex), zap.String("error", err.Error())) + break + } } } } - err = n.broadcaster.RemovePendingTx(res.Height, pendingTx.TxHash, pendingTx.Sequence, pendingTx.MsgTypes) + err = n.broadcaster.RemovePendingTx(pendingTx.Sequence) if err != nil { return err } + n.logger.Info("tx inserted", + zap.Int64("height", height), + zap.Uint64("sequence", pendingTx.Sequence), + zap.String("tx_hash", pendingTx.TxHash), + zap.Strings("msg_types", pendingTx.MsgTypes), + zap.Int("pending_txs", n.broadcaster.LenLocalPendingTx()), + ) consecutiveErrors = 0 } } diff --git a/types/errors.go b/types/errors.go index d8d8ee1..76dce6f 100644 --- a/types/errors.go +++ b/types/errors.go @@ -3,3 +3,5 @@ package types import "errors" var ErrKeyNotSet = errors.New("key not set") +var ErrAccountSequenceMismatch = errors.New("account sequence mismatch") +var ErrTxNotFound = errors.New("tx not found") diff --git a/types/retry.go b/types/retry.go index f874ec2..1174934 100644 --- a/types/retry.go +++ b/types/retry.go @@ -9,12 +9,12 @@ import ( const MaxRetryCount = 7 -func SleepWithRetry(ctx context.Context, retry int) { +func SleepWithRetry(ctx context.Context, retry int) bool { // to avoid to sleep too long if retry > MaxRetryCount { retry = MaxRetryCount } else if retry == 0 { - return + return false } sleepTime := 2 * math.Exp2(float64(retry)) @@ -23,7 +23,8 @@ func SleepWithRetry(ctx context.Context, retry int) { defer timer.Stop() select { case <-ctx.Done(): - return + return true case <-timer.C: } + return false }