diff --git a/go.mod b/go.mod index efd45151d..d6b2f0eb1 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cosmos/relayer/v2 go 1.19 require ( + cosmossdk.io/api v0.3.1 cosmossdk.io/errors v1.0.0-beta.7 cosmossdk.io/math v1.0.0-beta.6.0.20230216172121-959ce49135e4 github.com/avast/retry-go/v4 v4.3.2 @@ -43,7 +44,6 @@ require ( cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v0.12.0 // indirect cloud.google.com/go/storage v1.29.0 // indirect - cosmossdk.io/api v0.3.1 // indirect cosmossdk.io/core v0.5.1 // indirect cosmossdk.io/depinject v1.0.0-alpha.3 // indirect cosmossdk.io/tools/rosetta v0.2.1 // indirect diff --git a/relayer/chains/cosmos/broadcast.go b/relayer/chains/cosmos/broadcast.go index 06ff12ff9..b17c3d59f 100644 --- a/relayer/chains/cosmos/broadcast.go +++ b/relayer/chains/cosmos/broadcast.go @@ -1,16 +1,7 @@ package cosmos import ( - "context" - "errors" - "fmt" - "time" - - ctypes "github.com/cometbft/cometbft/rpc/core/types" - tmtypes "github.com/cometbft/cometbft/types" codectypes "github.com/cosmos/cosmos-sdk/codec/types" - sdk "github.com/cosmos/cosmos-sdk/types" - sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) const ( @@ -21,115 +12,6 @@ type _err string func (e _err) Error() string { return string(e) } -type rpcTxBroadcaster interface { - Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) - BroadcastTxSync(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) - - // TODO: implement commit and async as well - // BroadcastTxCommit(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTxCommit, error) - // BroadcastTxAsync(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) -} - -func (cc *CosmosProvider) BroadcastTx(ctx context.Context, tx []byte) (*sdk.TxResponse, error) { - var ( - blockTimeout = defaultBroadcastWaitTimeout - err error - ) - - if cc.PCfg.BlockTimeout != "" { - blockTimeout, err = time.ParseDuration(cc.PCfg.BlockTimeout) - if err != nil { - // Did you call Validate() method on ChainClientConfig struct - // before coming here? - return nil, err - } - } - - return broadcastTx( - ctx, - cc.RPCClient, - cc.Codec.TxConfig.TxDecoder(), - tx, - blockTimeout, - ) -} - -// broadcastTx broadcasts a TX and then waits for the TX to be included in the block. -// The waiting will either be canceled after the waitTimeout has run out or the context -// exited. -func broadcastTx( - ctx context.Context, - broadcaster rpcTxBroadcaster, - txDecoder sdk.TxDecoder, - tx []byte, - waitTimeout time.Duration, -) (*sdk.TxResponse, error) { - // broadcast tx sync waits for check tx to pass - // NOTE: this can return w/ a timeout - // need to investigate if this will leave the tx - // in the mempool or we can retry the broadcast at that - // point - - syncRes, err := broadcaster.BroadcastTxSync(ctx, tx) - if err != nil { - if syncRes == nil { - // There are some cases where BroadcastTxSync will return an error but the associated - // ResultBroadcastTx will be nil. - return nil, err - } - return &sdk.TxResponse{ - Code: syncRes.Code, - Codespace: syncRes.Codespace, - TxHash: syncRes.Hash.String(), - }, err - } - - // ABCIError will return an error other than "unknown" if syncRes.Code is a registered error in syncRes.Codespace - // This catches all of the sdk errors https://github.com/cosmos/cosmos-sdk/blob/f10f5e5974d2ecbf9efc05bc0bfe1c99fdeed4b6/types/errors/errors.go - err = errors.Unwrap(sdkerrors.ABCIError(syncRes.Codespace, syncRes.Code, "error broadcasting transaction")) - if err.Error() != errUnknown { - return nil, err - } - - // TODO: maybe we need to check if the node has tx indexing enabled? - // if not, we need to find a new way to block until inclusion in a block - - // wait for tx to be included in a block - exitAfter := time.After(waitTimeout) - for { - select { - case <-exitAfter: - return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, ErrTimeoutAfterWaitingForTxBroadcast) - // TODO: this is potentially less than optimal and may - // be better as something configurable - case <-time.After(time.Millisecond * 100): - resTx, err := broadcaster.Tx(ctx, syncRes.Hash, false) - if err == nil { - return mkTxResult(txDecoder, resTx) - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } -} - -func mkTxResult(txDecoder sdk.TxDecoder, resTx *ctypes.ResultTx) (*sdk.TxResponse, error) { - txb, err := txDecoder(resTx.Tx) - if err != nil { - return nil, err - } - p, ok := txb.(intoAny) - if !ok { - return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txb) - } - any := p.AsAny() - // TODO: maybe don't make up the time here? - // we can fetch the block for the block time buts thats - // more round trips - // TODO: logs get rendered as base64 encoded, need to fix this somehow - return sdk.NewResponseResultTx(resTx, any, time.Now().Format(time.RFC3339)), nil -} - // Deprecated: this interface is used only internally for scenario we are // deprecating (StdTxConfig support) type intoAny interface { diff --git a/relayer/chains/cosmos/grpc_query.go b/relayer/chains/cosmos/grpc_query.go index 3a84eb47d..32658a337 100644 --- a/relayer/chains/cosmos/grpc_query.go +++ b/relayer/chains/cosmos/grpc_query.go @@ -5,9 +5,12 @@ import ( "fmt" "reflect" "strconv" + "sync" + "time" abci "github.com/cometbft/cometbft/abci/types" gogogrpc "github.com/cosmos/gogoproto/grpc" + "github.com/cosmos/relayer/v2/relayer/provider" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" @@ -16,6 +19,7 @@ import ( "google.golang.org/grpc/status" "github.com/cosmos/cosmos-sdk/codec/types" + sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" "github.com/cosmos/cosmos-sdk/types/tx" @@ -148,13 +152,49 @@ func (cc *CosmosProvider) TxServiceBroadcast(ctx context.Context, req *tx.Broadc return nil, status.Error(codes.InvalidArgument, "invalid empty tx") } - resp, err := cc.BroadcastTx(ctx, req.TxBytes) - if err != nil { + var ( + blockTimeout = defaultBroadcastWaitTimeout + err error + rlyResp *provider.RelayerTxResponse + callbackErr error + wg sync.WaitGroup + ) + + if cc.PCfg.BlockTimeout != "" { + blockTimeout, err = time.ParseDuration(cc.PCfg.BlockTimeout) + if err != nil { + // Did you call Validate() method on CosmosProviderConfig struct + // before coming here? + return nil, err + } + } + + callback := func(rtr *provider.RelayerTxResponse, err error) { + rlyResp = rtr + callbackErr = err + wg.Done() + } + + wg.Add(1) + + if err := cc.broadcastTx(ctx, req.TxBytes, nil, nil, ctx, blockTimeout, callback); err != nil { return nil, err } + wg.Wait() + + if callbackErr != nil { + return nil, callbackErr + } + return &tx.BroadcastTxResponse{ - TxResponse: resp, + TxResponse: &sdk.TxResponse{ + Height: rlyResp.Height, + TxHash: rlyResp.TxHash, + Codespace: rlyResp.Codespace, + Code: rlyResp.Code, + Data: rlyResp.Data, + }, }, nil } diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index 0f61e8b70..a20fb0cab 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -8,6 +8,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/avast/retry-go/v4" @@ -80,105 +81,22 @@ func (cc *CosmosProvider) SendMessage(ctx context.Context, msg provider.RelayerM // of that transaction will be logged. A boolean indicating if a transaction was successfully // sent and executed successfully is returned. func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.RelayerMessage, memo string) (*provider.RelayerTxResponse, bool, error) { - var resp *sdk.TxResponse - var fees sdk.Coins - - // Guard against account sequence number mismatch errors by locking for the specific wallet for - // the account sequence query all the way through the transaction broadcast success/fail. - cc.txMu.Lock() - defer cc.txMu.Unlock() - - if err := retry.Do(func() error { - txBytes, sequence, f, err := cc.buildMessages(ctx, msgs, memo) - fees = f - if err != nil { - errMsg := err.Error() - - // Account sequence mismatch errors can happen on the simulated transaction also. - if strings.Contains(errMsg, sdkerrors.ErrWrongSequence.Error()) { - cc.handleAccountSequenceMismatchError(err) - return err - } - - // Occasionally the client will be out of date, - // and we will receive an RPC error like: - // rpc error: code = InvalidArgument desc = failed to execute message; message index: 1: channel handshake open try failed: failed channel state verification for client (07-tendermint-0): client state height < proof height ({0 58} < {0 59}), please ensure the client has been updated: invalid height: invalid request - // or - // rpc error: code = InvalidArgument desc = failed to execute message; message index: 1: receive packet verification failed: couldn't verify counterparty packet commitment: failed packet commitment verification for client (07-tendermint-0): client state height < proof height ({0 142} < {0 143}), please ensure the client has been updated: invalid height: invalid request - // - // No amount of retrying will fix this. The client needs to be updated. - // Unfortunately, the entirety of that error message originates on the server, - // so there is not an obvious way to access a more structured error value. - // - // If this logic should ever fail due to the string values of the error messages on the server - // changing from the client's version of the library, - // at worst this will run more unnecessary retries. - if strings.Contains(errMsg, sdkerrors.ErrInvalidHeight.Error()) { - cc.log.Info( - "Skipping retry due to invalid height error", - zap.Error(err), - ) - return retry.Unrecoverable(err) - } - - // On a fast retry, it is possible to have an invalid connection state. - // Retrying that message also won't fix the underlying state mismatch, - // so log it and mark it as unrecoverable. - if strings.Contains(errMsg, conntypes.ErrInvalidConnectionState.Error()) { - cc.log.Info( - "Skipping retry due to invalid connection state", - zap.Error(err), - ) - return retry.Unrecoverable(err) - } - - // Also possible to have an invalid channel state on a fast retry. - if strings.Contains(errMsg, chantypes.ErrInvalidChannelState.Error()) { - cc.log.Info( - "Skipping retry due to invalid channel state", - zap.Error(err), - ) - return retry.Unrecoverable(err) - } - - // If the message reported an invalid proof, back off. - // NOTE: this error string ("invalid proof") will match other errors too, - // but presumably it is safe to stop retrying in those cases as well. - if strings.Contains(errMsg, commitmenttypes.ErrInvalidProof.Error()) { - cc.log.Info( - "Skipping retry due to invalid proof", - zap.Error(err), - ) - return retry.Unrecoverable(err) - } - - // Invalid packets should not be retried either. - if strings.Contains(errMsg, chantypes.ErrInvalidPacket.Error()) { - cc.log.Info( - "Skipping retry due to invalid packet", - zap.Error(err), - ) - return retry.Unrecoverable(err) - } - - return err - } - - resp, err = cc.BroadcastTx(ctx, txBytes) - if err != nil { - if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { - cc.handleAccountSequenceMismatchError(err) - return err - } + var ( + rlyResp *provider.RelayerTxResponse + callbackErr error + wg sync.WaitGroup + ) - // Don't retry if BroadcastTx resulted in any other error. - return retry.Unrecoverable(err) - } + callback := func(rtr *provider.RelayerTxResponse, err error) { + rlyResp = rtr + callbackErr = err + wg.Done() + } - // we had a successful tx with this sequence, so update it to the next - cc.updateNextAccountSequence(sequence + 1) + wg.Add(1) - return nil + if err := retry.Do(func() error { + return cc.SendMessagesToMempool(ctx, msgs, memo, ctx, callback) }, retry.Context(ctx), rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) { cc.log.Info( "Error building or broadcasting transaction", @@ -187,32 +105,21 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela zap.Uint("max_attempts", rtyAttNum), zap.Error(err), ) - })); err != nil || resp == nil { + })); err != nil { return nil, false, err } - rlyResp := &provider.RelayerTxResponse{ - Height: resp.Height, - TxHash: resp.TxHash, - Codespace: resp.Codespace, - Code: resp.Code, - Data: resp.Data, - Events: parseEventsFromTxResponse(resp), + wg.Wait() + + if callbackErr != nil { + return rlyResp, false, callbackErr } - // transaction was executed, log the success or failure using the tx response code - // NOTE: error is nil, logic should use the returned error to determine if the - // transaction was successfully executed. if rlyResp.Code != 0 { - cc.LogFailedTx(rlyResp, nil, msgs) - cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees) - return rlyResp, false, fmt.Errorf("transaction failed with code: %d", resp.Code) + return rlyResp, false, fmt.Errorf("transaction failed with code: %d", rlyResp.Code) } - cc.LogSuccessTx(resp, msgs) - cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees) - - return rlyResp, true, nil + return rlyResp, true, callbackErr } // SendMessagesToMempool simulates and broadcasts a transaction with the given msgs and memo. @@ -281,8 +188,10 @@ func (cc *CosmosProvider) broadcastTx( asyncCallback func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion ) error { res, err := cc.RPCClient.BroadcastTxSync(ctx, tx) - if err != nil { - if res == nil { + isErr := err != nil + isFailed := res != nil && res.Code != 0 + if isErr || isFailed { + if isErr && res == nil { // There are some cases where BroadcastTxSync will return an error but the associated // ResultBroadcastTx will be nil. return err @@ -293,6 +202,12 @@ func (cc *CosmosProvider) broadcastTx( Code: res.Code, Data: res.Data.String(), } + if isFailed { + err = cc.sdkError(res.Codespace, res.Code) + if err == nil { + err = fmt.Errorf("transaction failed to execute") + } + } cc.LogFailedTx(rlyResp, err, msgs) return err }