Skip to content

Commit

Permalink
refactor: signer to use txstatus (#3767)
Browse files Browse the repository at this point in the history
  • Loading branch information
ninabarbakadze authored Aug 16, 2024
1 parent dec69e4 commit a28b9e7
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 57 deletions.
3 changes: 2 additions & 1 deletion app/test/big_blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func (s *BigBlobSuite) TestErrBlobsTooLarge() {
res, err := txClient.SubmitPayForBlob(subCtx, []*share.Blob{tc.blob}, user.SetGasLimitAndGasPrice(1e9, appconsts.DefaultMinGasPrice))
require.Error(t, err)
require.NotNil(t, res)
require.Equal(t, tc.want, res.Code, res.Logs)
// FIXME: assert RawLog once TxStatus supports it.
require.Equal(t, tc.want, res.Code)
})
}
}
8 changes: 7 additions & 1 deletion app/test/prepare_proposal_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cosmos/cosmos-sdk/crypto/hd"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
sdk "github.com/cosmos/cosmos-sdk/types"
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
vestingtypes "github.com/cosmos/cosmos-sdk/x/auth/vesting/types"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -80,8 +81,13 @@ func TestTimeInPrepareProposalContext(t *testing.T) {
msgs, _ := tt.msgFunc()
res, err := txClient.SubmitTx(cctx.GoContext(), msgs, user.SetGasLimit(1000000), user.SetFee(2000))
require.NoError(t, err)
// FIXME: Temporary way of querying the raw log.
// TxStatus will natively support this in the future.
serviceClient := sdktx.NewServiceClient(cctx.GRPCClient)
getTxResp, err := serviceClient.GetTx(cctx.GoContext(), &sdktx.GetTxRequest{Hash: res.TxHash})
require.NoError(t, err)
require.NotNil(t, res)
assert.Equal(t, abci.CodeTypeOK, res.Code, res.RawLog)
assert.Equal(t, abci.CodeTypeOK, res.Code, getTxResp.TxResponse.RawLog)
})
}
}
8 changes: 7 additions & 1 deletion app/test/square_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/celestiaorg/celestia-app/v3/test/util/testnode"
blobtypes "github.com/celestiaorg/celestia-app/v3/x/blob/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
v1 "github.com/cosmos/cosmos-sdk/x/gov/types/v1"
oldgov "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1"
"github.com/cosmos/cosmos-sdk/x/params/types/proposal"
Expand Down Expand Up @@ -173,7 +174,12 @@ func (s *SquareSizeIntegrationTest) setBlockSizeParams(t *testing.T, squareSize,

res, err := txClient.SubmitTx(s.cctx.GoContext(), []sdk.Msg{msg}, blobfactory.DefaultTxOpts()...)
require.NoError(t, err)
require.Equal(t, res.Code, abci.CodeTypeOK, res.RawLog)
// FIXME: Temporary way of querying the raw log.
// TxStatus will natively support this in the future.
serviceClient := sdktx.NewServiceClient(s.cctx.GRPCClient)
getTxResp, err := serviceClient.GetTx(s.cctx.GoContext(), &sdktx.GetTxRequest{Hash: res.TxHash})
require.NoError(t, err)
require.Equal(t, res.Code, abci.CodeTypeOK, getTxResp.TxResponse.RawLog)

require.NoError(t, s.cctx.WaitForNextBlock())

Expand Down
8 changes: 7 additions & 1 deletion app/test/std_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cosmos/cosmos-sdk/crypto/keyring"
"github.com/cosmos/cosmos-sdk/testutil/mock"
sdk "github.com/cosmos/cosmos-sdk/types"
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
vestingtypes "github.com/cosmos/cosmos-sdk/x/auth/vesting/types"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
disttypes "github.com/cosmos/cosmos-sdk/x/distribution/types"
Expand Down Expand Up @@ -325,8 +326,13 @@ func (s *StandardSDKIntegrationTestSuite) TestStandardSDK() {
} else {
require.NoError(t, err)
}
// FIXME: Temporary way of querying the raw log.
// TxStatus will natively support this in the future.
serviceClient := sdktx.NewServiceClient(s.cctx.GRPCClient)
getTxResp, err := serviceClient.GetTx(s.cctx.GoContext(), &sdktx.GetTxRequest{Hash: res.TxHash})
require.NoError(t, err)
require.NotNil(t, res)
assert.Equal(t, tt.expectedCode, res.Code, res.RawLog)
assert.Equal(t, tt.expectedCode, res.Code, getTxResp.TxResponse.RawLog)
})
}
}
Expand Down
85 changes: 57 additions & 28 deletions pkg/user/tx_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/celestiaorg/celestia-app/v3/app"
"github.com/celestiaorg/celestia-app/v3/app/encoding"
apperrors "github.com/celestiaorg/celestia-app/v3/app/errors"
"github.com/celestiaorg/celestia-app/v3/app/grpc/tx"
"github.com/celestiaorg/celestia-app/v3/pkg/appconsts"
"github.com/celestiaorg/celestia-app/v3/x/blob/types"
"github.com/celestiaorg/celestia-app/v3/x/minfee"
Expand All @@ -39,6 +40,14 @@ const (

type Option func(client *TxClient)

// TxResponse is a response from the chain after
// a transaction has been submitted.
type TxResponse struct {
Height int64
TxHash string
Code uint32
}

// WithGasMultiplier is a functional option allows to configure the gas multiplier.
func WithGasMultiplier(multiplier float64) Option {
return func(c *TxClient) {
Expand Down Expand Up @@ -200,19 +209,25 @@ func SetupTxClient(

// SubmitPayForBlob forms a transaction from the provided blobs, signs it, and submits it to the chain.
// TxOptions may be provided to set the fee and gas limit.
func (client *TxClient) SubmitPayForBlob(ctx context.Context, blobs []*share.Blob, opts ...TxOption) (*sdktypes.TxResponse, error) {
func (client *TxClient) SubmitPayForBlob(ctx context.Context, blobs []*share.Blob, opts ...TxOption) (*TxResponse, error) {
resp, err := client.BroadcastPayForBlob(ctx, blobs, opts...)
if err != nil {
return resp, err
if err != nil && resp != nil {
return &TxResponse{Code: resp.Code, TxHash: resp.TxHash}, fmt.Errorf("failed to broadcast pay for blob: %v", err)
} else if err != nil {
return &TxResponse{}, fmt.Errorf("failed to broadcast pay for blob: %v", err)
}

return client.ConfirmTx(ctx, resp.TxHash)
}

func (client *TxClient) SubmitPayForBlobWithAccount(ctx context.Context, account string, blobs []*share.Blob, opts ...TxOption) (*sdktypes.TxResponse, error) {
// SubmitPayForBlobWithAccount forms a transaction from the provided blobs, signs it with the provided account, and submits it to the chain.
// TxOptions may be provided to set the fee and gas limit.
func (client *TxClient) SubmitPayForBlobWithAccount(ctx context.Context, account string, blobs []*share.Blob, opts ...TxOption) (*TxResponse, error) {
resp, err := client.BroadcastPayForBlobWithAccount(ctx, account, blobs, opts...)
if err != nil {
return resp, err
if err != nil && resp != nil {
return &TxResponse{Code: resp.Code, TxHash: resp.TxHash}, fmt.Errorf("failed to broadcast pay for blob with account: %v", err)
} else if err != nil {
return &TxResponse{}, fmt.Errorf("failed to broadcast pay for blob with account: %v", err)
}

return client.ConfirmTx(ctx, resp.TxHash)
Expand Down Expand Up @@ -253,10 +268,12 @@ func (client *TxClient) BroadcastPayForBlobWithAccount(ctx context.Context, acco

// SubmitTx forms a transaction from the provided messages, signs it, and submits it to the chain. TxOptions
// may be provided to set the fee and gas limit.
func (client *TxClient) SubmitTx(ctx context.Context, msgs []sdktypes.Msg, opts ...TxOption) (*sdktypes.TxResponse, error) {
func (client *TxClient) SubmitTx(ctx context.Context, msgs []sdktypes.Msg, opts ...TxOption) (*TxResponse, error) {
resp, err := client.BroadcastTx(ctx, msgs, opts...)
if err != nil {
return resp, err
if err != nil && resp != nil {
return &TxResponse{Code: resp.Code, TxHash: resp.TxHash}, fmt.Errorf("failed to broadcast tx: %v", err)
} else if err != nil {
return &TxResponse{}, fmt.Errorf("failed to broadcast tx: %v", err)
}

return client.ConfirmTx(ctx, resp.TxHash)
Expand Down Expand Up @@ -414,32 +431,44 @@ func (client *TxClient) retryBroadcastingTx(ctx context.Context, txBytes []byte)
// ConfirmTx periodically pings the provided node for the commitment of a transaction by its
// hash. It will continually loop until the context is cancelled, the tx is found or an error
// is encountered.
func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*sdktypes.TxResponse, error) {
txClient := sdktx.NewServiceClient(client.grpc)
func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxResponse, error) {
txClient := tx.NewTxClient(client.grpc)

pollTicker := time.NewTicker(client.pollTime)
defer pollTicker.Stop()

for {
resp, err := txClient.GetTx(ctx, &sdktx.GetTxRequest{Hash: txHash})
if err == nil {
if resp.TxResponse.Code != 0 {
return resp.TxResponse, fmt.Errorf("tx was included but failed with code %d: %s", resp.TxResponse.Code, resp.TxResponse.RawLog)
}
return resp.TxResponse, nil
}
// FIXME: this is a relatively brittle of working out whether to retry or not. The tx might be not found for other
// reasons. It may have been removed from the mempool at a later point. We should build an endpoint that gives the
// signer more information on the status of their transaction and then update the logic here
if !strings.Contains(err.Error(), "not found") {
return &sdktypes.TxResponse{}, err
resp, err := txClient.TxStatus(ctx, &tx.TxStatusRequest{TxId: txHash})
if err != nil {
return &TxResponse{}, err
}

// Wait for the next round.
select {
case <-ctx.Done():
return &sdktypes.TxResponse{}, ctx.Err()
case <-pollTicker.C:
if err == nil && resp != nil {
switch resp.Status {
// FIXME: replace hardcoded status with constants
case "PENDING":
// Continue polling if the transaction is still pending
select {
case <-ctx.Done():
return &TxResponse{}, ctx.Err()
case <-pollTicker.C:
continue
}
case "COMMITTED":
txResponse := &TxResponse{
Height: resp.Height,
TxHash: txHash,
Code: resp.ExecutionCode,
}
if resp.ExecutionCode != 0 {
return txResponse, fmt.Errorf("tx was included but failed with code %d: %s", resp.ExecutionCode, resp.Status)
}
return txResponse, nil
case "EVICTED":
return &TxResponse{}, fmt.Errorf("tx: %s was evicted from the mempool", txHash)
default:
return &TxResponse{}, fmt.Errorf("unknown tx: %s", txHash)
}
}
}
}
Expand Down
59 changes: 42 additions & 17 deletions pkg/user/tx_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
bank "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -30,9 +31,10 @@ func TestTxClientTestSuite(t *testing.T) {
type TxClientTestSuite struct {
suite.Suite

ctx testnode.Context
encCfg encoding.Config
txClient *user.TxClient
ctx testnode.Context
encCfg encoding.Config
txClient *user.TxClient
serviceClient sdktx.ServiceClient
}

func (suite *TxClientTestSuite) SetupSuite() {
Expand All @@ -45,6 +47,9 @@ func (suite *TxClientTestSuite) SetupSuite() {
suite.Require().NoError(err)
suite.txClient, err = user.SetupTxClient(suite.ctx.GoContext(), suite.ctx.Keyring, suite.ctx.GRPCClient, suite.encCfg, user.WithGasMultiplier(1.2))
suite.Require().NoError(err)
// FIXME: Temporary way of querying the raw log.
// TxStatus will natively support this in the future.
suite.serviceClient = sdktx.NewServiceClient(suite.ctx.GRPCClient)
}

func (suite *TxClientTestSuite) TestSubmitPayForBlob() {
Expand All @@ -57,24 +62,30 @@ func (suite *TxClientTestSuite) TestSubmitPayForBlob() {
t.Run("submit blob without provided fee and gas limit", func(t *testing.T) {
resp, err := suite.txClient.SubmitPayForBlob(subCtx, blobs)
require.NoError(t, err)
getTxResp, err := suite.serviceClient.GetTx(subCtx, &sdktx.GetTxRequest{Hash: resp.TxHash})
require.NoError(t, err)
require.EqualValues(t, 0, resp.Code)
require.Greater(t, resp.GasWanted, int64(0))
require.Greater(t, getTxResp.TxResponse.GasWanted, int64(0))
})

t.Run("submit blob with provided fee and gas limit", func(t *testing.T) {
fee := user.SetFee(1e6)
gas := user.SetGasLimit(1e6)
resp, err := suite.txClient.SubmitPayForBlob(subCtx, blobs, fee, gas)
require.NoError(t, err)
getTxResp, err := suite.serviceClient.GetTx(subCtx, &sdktx.GetTxRequest{Hash: resp.TxHash})
require.NoError(t, err)
require.EqualValues(t, 0, resp.Code)
require.EqualValues(t, resp.GasWanted, 1e6)
require.EqualValues(t, getTxResp.TxResponse.GasWanted, 1e6)
})

t.Run("submit blob with different account", func(t *testing.T) {
resp, err := suite.txClient.SubmitPayForBlobWithAccount(subCtx, "c", blobs, user.SetFee(1e6), user.SetGasLimit(1e6))
require.NoError(t, err)
getTxResp, err := suite.serviceClient.GetTx(subCtx, &sdktx.GetTxRequest{Hash: resp.TxHash})
require.NoError(t, err)
require.EqualValues(t, 0, resp.Code)
require.EqualValues(t, resp.GasWanted, 1e6)
require.EqualValues(t, getTxResp.TxResponse.GasWanted, 1e6)
})

t.Run("try submit a blob with an account that doesn't exist", func(t *testing.T) {
Expand All @@ -96,14 +107,18 @@ func (suite *TxClientTestSuite) TestSubmitTx() {
resp, err := suite.txClient.SubmitTx(suite.ctx.GoContext(), []sdk.Msg{msg})
require.NoError(t, err)
require.Equal(t, abci.CodeTypeOK, resp.Code)
require.Greater(t, resp.GasWanted, int64(0))
getTxResp, err := suite.serviceClient.GetTx(suite.ctx.GoContext(), &sdktx.GetTxRequest{Hash: resp.TxHash})
require.NoError(t, err)
require.Greater(t, getTxResp.TxResponse.GasWanted, int64(0))
})

t.Run("submit tx with provided gas limit", func(t *testing.T) {
resp, err := suite.txClient.SubmitTx(suite.ctx.GoContext(), []sdk.Msg{msg}, gasLimitOption)
require.NoError(t, err)
require.Equal(t, abci.CodeTypeOK, resp.Code)
require.EqualValues(t, gasLimit, resp.GasWanted)
getTxResp, err := suite.serviceClient.GetTx(suite.ctx.GoContext(), &sdktx.GetTxRequest{Hash: resp.TxHash})
require.NoError(t, err)
require.EqualValues(t, int64(gasLimit), getTxResp.TxResponse.GasWanted)
})

t.Run("submit tx with provided fee", func(t *testing.T) {
Expand All @@ -116,7 +131,9 @@ func (suite *TxClientTestSuite) TestSubmitTx() {
resp, err := suite.txClient.SubmitTx(suite.ctx.GoContext(), []sdk.Msg{msg}, feeOption, gasLimitOption)
require.NoError(t, err)
require.Equal(t, abci.CodeTypeOK, resp.Code)
require.EqualValues(t, gasLimit, resp.GasWanted)
getTxResp, err := suite.serviceClient.GetTx(suite.ctx.GoContext(), &sdktx.GetTxRequest{Hash: resp.TxHash})
require.NoError(t, err)
require.EqualValues(t, int64(gasLimit), getTxResp.TxResponse.GasWanted)
})

t.Run("submit tx with a different account", func(t *testing.T) {
Expand Down Expand Up @@ -145,16 +162,21 @@ func (suite *TxClientTestSuite) TestConfirmTx() {
t.Run("deadline exceeded when the context times out", func(t *testing.T) {
ctx, cancel := context.WithTimeout(suite.ctx.GoContext(), time.Second)
defer cancel()
_, err := suite.txClient.ConfirmTx(ctx, "E32BD15CAF57AF15D17B0D63CF4E63A9835DD1CEBB059C335C79586BC3013728")

msg := bank.NewMsgSend(suite.txClient.DefaultAddress(), testnode.RandomAddress().(sdk.AccAddress), sdk.NewCoins(sdk.NewInt64Coin(app.BondDenom, 10)))
resp, err := suite.txClient.BroadcastTx(ctx, []sdk.Msg{msg})
require.NoError(t, err)

_, err = suite.txClient.ConfirmTx(ctx, resp.TxHash)
require.Error(t, err)
require.Contains(t, err.Error(), context.DeadlineExceeded.Error())
})

t.Run("should error when tx is not found", func(t *testing.T) {
ctx, cancel := context.WithTimeout(suite.ctx.GoContext(), 5*time.Second)
defer cancel()
_, err := suite.txClient.ConfirmTx(ctx, "not found tx")
require.Error(t, err)
_, err := suite.txClient.ConfirmTx(ctx, "E32BD15CAF57AF15D17B0D63CF4E63A9835DD1CEBB059C335C79586BC3013728")
require.Contains(t, err.Error(), "unknown tx: E32BD15CAF57AF15D17B0D63CF4E63A9835DD1CEBB059C335C79586BC3013728")
})

t.Run("should success when tx is found immediately", func(t *testing.T) {
Expand All @@ -165,9 +187,9 @@ func (suite *TxClientTestSuite) TestConfirmTx() {
require.NotNil(t, resp)
ctx, cancel := context.WithTimeout(suite.ctx.GoContext(), 30*time.Second)
defer cancel()
resp, err = suite.txClient.ConfirmTx(ctx, resp.TxHash)
confirmTxResp, err := suite.txClient.ConfirmTx(ctx, resp.TxHash)
require.NoError(t, err)
require.Equal(t, abci.CodeTypeOK, resp.Code)
require.Equal(t, abci.CodeTypeOK, confirmTxResp.Code)
})

t.Run("should error when tx is found with a non-zero error code", func(t *testing.T) {
Expand All @@ -178,9 +200,9 @@ func (suite *TxClientTestSuite) TestConfirmTx() {
resp, err := suite.txClient.BroadcastTx(suite.ctx.GoContext(), []sdk.Msg{msg}, fee, gas)
require.NoError(t, err)
require.NotNil(t, resp)
resp, err = suite.txClient.ConfirmTx(suite.ctx.GoContext(), resp.TxHash)
confirmTxResp, err := suite.txClient.ConfirmTx(suite.ctx.GoContext(), resp.TxHash)
require.Error(t, err)
require.NotEqual(t, abci.CodeTypeOK, resp.Code)
require.NotEqual(t, abci.CodeTypeOK, confirmTxResp.Code)
})
}

Expand Down Expand Up @@ -221,8 +243,11 @@ func (suite *TxClientTestSuite) TestGasConsumption() {
amountDeducted := balanceBefore - balanceAfter - utiaToSend
require.Equal(t, int64(fee), amountDeducted)

res, err := suite.serviceClient.GetTx(suite.ctx.GoContext(), &sdktx.GetTxRequest{Hash: resp.TxHash})
require.NoError(t, err)

// verify that the amount deducted does not depend on the actual gas used.
gasUsedBasedDeduction := resp.GasUsed * gasPrice
gasUsedBasedDeduction := res.TxResponse.GasUsed * gasPrice
require.NotEqual(t, gasUsedBasedDeduction, amountDeducted)
// The gas used based deduction should be less than the fee because the fee is 1 TIA.
require.Less(t, gasUsedBasedDeduction, int64(fee))
Expand Down
2 changes: 1 addition & 1 deletion test/txsim/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (am *AccountManager) Submit(ctx context.Context, op Operation) error {
}

var (
res *types.TxResponse
res *user.TxResponse
err error
)
if len(op.Blobs) > 0 {
Expand Down
Loading

0 comments on commit a28b9e7

Please sign in to comment.