From b64a53066286b9ef21b580821c8a9d13647c4488 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 11 Apr 2023 21:05:17 +0200 Subject: [PATCH 01/10] tx simulator prototype --- testing/txsim/account.go | 442 +++++++++++++++++++++++++++++++++ testing/txsim/blob.go | 102 ++++++++ testing/txsim/client.go | 275 ++++++++++++++++++++ testing/txsim/run.go | 101 ++++++++ testing/txsim/run_test.go | 152 ++++++++++++ testing/txsim/send.go | 57 +++++ testing/txsim/sequence.go | 47 ++++ testing/txsim/stake.go | 109 ++++++++ testutil/testfactory/utils.go | 2 +- testutil/testnode/full_node.go | 12 +- testutil/testnode/read.go | 54 ++++ 11 files changed, 1346 insertions(+), 7 deletions(-) create mode 100644 testing/txsim/account.go create mode 100644 testing/txsim/blob.go create mode 100644 testing/txsim/client.go create mode 100644 testing/txsim/run.go create mode 100644 testing/txsim/run_test.go create mode 100644 testing/txsim/send.go create mode 100644 testing/txsim/sequence.go create mode 100644 testing/txsim/stake.go create mode 100644 testutil/testnode/read.go diff --git a/testing/txsim/account.go b/testing/txsim/account.go new file mode 100644 index 0000000000..06aab2eee9 --- /dev/null +++ b/testing/txsim/account.go @@ -0,0 +1,442 @@ +package txsim + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/celestiaorg/celestia-app/app" + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/crypto/hd" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/tx/signing" + authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" + auth "github.com/cosmos/cosmos-sdk/x/auth/types" + bank "github.com/cosmos/cosmos-sdk/x/bank/types" + "github.com/cosmos/cosmos-sdk/x/feegrant" + "github.com/rs/zerolog/log" +) + +const ( + gasLimit = 1000000 + feeAmount = 1000 +) + +type AccountManager struct { + mtx sync.Mutex + keys keyring.Keyring + masterAccount *Account + accounts map[string]*Account + pending []*Account + tx *TxClient + query *QueryClient +} + +type Account struct { + Address types.AccAddress + PubKey cryptotypes.PubKey + Sequence uint64 + AccountNumber uint64 + Balance int64 +} + +func NewAccountManager(ctx context.Context, keys keyring.Keyring, txClient *TxClient, queryClient *QueryClient) (*AccountManager, error) { + records, err := keys.List() + if err != nil { + return nil, err + } + + if len(records) == 0 { + return nil, fmt.Errorf("no accounts found in keyring") + } + + am := &AccountManager{ + keys: keys, + accounts: make(map[string]*Account), + pending: make([]*Account, 0), + tx: txClient, + query: queryClient, + } + + if err := am.setupMasterAccount(ctx); err != nil { + return nil, err + } + + log.Info(). + Str("address", am.masterAccount.Address.String()). + Int64("balance", am.masterAccount.Balance). + Msg("set master account") + am.accounts[am.masterAccount.Address.String()] = am.masterAccount + + return am, nil +} + +// setupMasterAccount loops through all accounts in the keyring and picks out the one with +// the highest balance as the master account. Accounts that don't yet exist on chain are +// ignored. +func (am *AccountManager) setupMasterAccount(ctx context.Context) error { + records, err := am.keys.List() + if err != nil { + return err + } + + for _, record := range records { + address, err := record.GetAddress() + if err != nil { + return fmt.Errorf("error getting address for account %s: %w", record.Name, err) + } + + // search for the account on chain + balance, err := am.getBalance(ctx, address) + if err != nil { + log.Err(err).Str("account", record.Name).Msg("error getting initial account balance") + continue + } + + // the master account is the account with the highest balance + if am.masterAccount == nil || balance > am.masterAccount.Balance { + accountNumber, sequence, err := am.getAccountDetails(ctx, address) + if err != nil { + log.Err(err).Str("account", record.Name).Msg("error getting initial account details") + continue + } + pk, err := record.GetPubKey() + if err != nil { + return fmt.Errorf("error getting public key for account %s: %w", record.Name, err) + } + am.masterAccount = &Account{ + Address: address, + PubKey: pk, + Sequence: sequence, + AccountNumber: accountNumber, + Balance: balance, + } + } + } + + if am.masterAccount == nil { + return fmt.Errorf("no suitable account found") + } + + return nil +} + +// AllocateAccounts is used by sequences to specify the number of accounts +// and the balance of each of those accounts. Not concurrently safe. +func (am *AccountManager) AllocateAccounts(n, balance int) []types.AccAddress { + if n < 1 { + panic("n must be greater than 0") + } + if balance < 1 { + panic("balance must be greater than 0") + } + + path := hd.CreateHDPath(118, 0, 0).String() + addresses := make([]types.AccAddress, n) + for i := 0; i < n; i++ { + record, _, err := am.keys.NewMnemonic(am.nextAccountName(), keyring.English, path, keyring.DefaultBIP39Passphrase, hd.Secp256k1) + if err != nil { + panic(err) + } + addresses[i], err = record.GetAddress() + if err != nil { + panic(err) + } + + pk, err := record.GetPubKey() + if err != nil { + panic(err) + } + + am.pending = append(am.pending, &Account{ + Address: addresses[i], + PubKey: pk, + Balance: int64(balance), + }) + } + return addresses +} + +func (am *AccountManager) Submit(ctx context.Context, op Operation) error { + for _, msg := range op.Msgs { + if err := msg.ValidateBasic(); err != nil { + return fmt.Errorf("error validating message: %w", err) + } + } + + // create the tx builder and add the messages + builder := am.tx.Tx() + err := builder.SetMsgs(op.Msgs...) + if err != nil { + return fmt.Errorf("error setting messages: %w", err) + } + + builder.SetFeeAmount(types.NewCoins(types.NewInt64Coin(app.BondDenom, feeAmount))) + // the master account is responsible for paying the fees + builder.SetFeeGranter(am.masterAccount.Address) + builder.SetGasLimit(gasLimit) + + if err := am.signTransaction(builder); err != nil { + return err + } + + // If the sequence specified a delay, then wait for those blocks to be produced + if op.Delay != 0 { + if err := am.tx.WaitForNBlocks(ctx, op.Delay); err != nil { + return fmt.Errorf("error waiting for blocks: %w", err) + } + } + + // broadcast the transaction + resp, err := am.tx.Broadcast(ctx, builder, op.Blobs) + if err != nil { + return fmt.Errorf("error broadcasting transaction: %w", err) + } + + signers := builder.GetTx().GetSigners() + + log.Info(). + Int64("height", resp.Height). + Str("signers", addrsToString(signers)). + Str("msgs", msgsToString(op.Msgs)). + Msg("tx committed") + + // increment the sequence number for all the signers + for _, signer := range signers { + am.accounts[signer.String()].Sequence++ + } + + return nil +} + +// Generate the pending accounts by sending the adequate funds and setting up the feegrant permissions. +// This operation is not concurrently safe. +func (am *AccountManager) GenerateAccounts(ctx context.Context) error { + if len(am.pending) == 0 { + return nil + } + + msgs := make([]types.Msg, 0) + // batch together all the messages needed to create all the accounts + for _, acc := range am.pending { + accMsgs, err := am.setupAccountMsgs(acc) + if err != nil { + return fmt.Errorf("generating account %s: %w", acc.Address, err) + } + + msgs = append(msgs, accMsgs...) + } + + err := am.Submit(ctx, Operation{Msgs: msgs}) + if err != nil { + return fmt.Errorf("error funding accounts: %w", err) + } + + // check that the account now exists + for _, acc := range am.pending { + acc.AccountNumber, acc.Sequence, err = am.getAccountDetails(ctx, acc.Address) + if err != nil { + return fmt.Errorf("getting account %s: %w", acc.Address, err) + } + + // set the account + am.accounts[acc.Address.String()] = acc + log.Info(). + Str("address", acc.Address.String()). + Int64("balance", acc.Balance). + Str("pubkey", acc.PubKey.String()). + Uint64("account number", acc.AccountNumber). + Uint64("sequence", acc.Sequence). + Msg("initialized account") + } + + // update master account + err = am.updateAccount(ctx, am.masterAccount) + if err != nil { + return fmt.Errorf("updating master account: %w", err) + } + + // clear the pending accounts + am.pending = nil + return nil +} + +// setupAccount initializes the account on chain with the given balance. It also sets up +// a grant such that the master account covers the fees of any message sent. +func (am *AccountManager) setupAccountMsgs(account *Account) ([]types.Msg, error) { + if am.masterAccount.Balance < account.Balance { + return nil, fmt.Errorf("master account has insufficient funds") + } + + // create a feegrant message so that the master account pays for all the fees of the sub accounts + feegrantMsg, err := feegrant.NewMsgGrantAllowance(&feegrant.BasicAllowance{}, am.masterAccount.Address, account.Address) + if err != nil { + return nil, fmt.Errorf("error creating feegrant message: %w", err) + } + bankMsg := bank.NewMsgSend(am.masterAccount.Address, account.Address, types.NewCoins(types.NewInt64Coin(app.BondDenom, account.Balance))) + return []types.Msg{feegrantMsg, bankMsg}, nil +} + +func (am *AccountManager) signTransaction(builder client.TxBuilder) error { + am.mtx.Lock() + defer am.mtx.Unlock() + signers := builder.GetTx().GetSigners() + for _, signer := range signers { + _, ok := am.accounts[signer.String()] + if !ok { + return fmt.Errorf("account %s not found", signer.String()) + } + } + + // To ensure we have the correct bytes to sign over we produce + // a dry run of the signing data + draftsigV2 := make([]signing.SignatureV2, len(signers)) + index := 0 + for _, signer := range signers { + acc := am.accounts[signer.String()] + record, err := am.keys.KeyByAddress(signer) + if err != nil { + return fmt.Errorf("error getting key for account %s: %w", signer.String(), err) + } + pk, _ := record.GetPubKey() + if !pk.Equals(acc.PubKey) { + return fmt.Errorf("public key (%s != %s) mismatch for account %s", pk.String(), acc.PubKey.String(), signer.String()) + } + draftsigV2[index] = signing.SignatureV2{ + PubKey: acc.PubKey, + Data: &signing.SingleSignatureData{ + SignMode: signing.SignMode_SIGN_MODE_DIRECT, + Signature: nil, + }, + Sequence: acc.Sequence, + } + index++ + } + + err := builder.SetSignatures(draftsigV2...) + if err != nil { + return fmt.Errorf("error setting draft signatures: %w", err) + } + + // now we can use the data to produce the signature from each signer + index = 0 + sigV2 := make([]signing.SignatureV2, len(signers)) + for _, signer := range signers { + acc := am.accounts[signer.String()] + signature, err := am.createSignature(acc, builder) + if err != nil { + return fmt.Errorf("error creating signature: %w", err) + } + fmt.Printf("signing msg with pubkey %s\n", acc.PubKey.String()) + sigV2[index] = signing.SignatureV2{ + PubKey: acc.PubKey, + Data: &signing.SingleSignatureData{ + SignMode: signing.SignMode_SIGN_MODE_DIRECT, + Signature: signature, + }, + Sequence: acc.Sequence, + } + index++ + } + + err = builder.SetSignatures(sigV2...) + if err != nil { + return fmt.Errorf("error setting signatures: %w", err) + } + + return nil +} + +func (am *AccountManager) createSignature(account *Account, builder client.TxBuilder) ([]byte, error) { + signerData := authsigning.SignerData{ + Address: account.Address.String(), + ChainID: am.tx.ChainID(), + AccountNumber: account.AccountNumber, + Sequence: account.Sequence, + PubKey: account.PubKey, + } + + bytesToSign, err := am.tx.encCfg.TxConfig.SignModeHandler().GetSignBytes( + signing.SignMode_SIGN_MODE_DIRECT, + signerData, + builder.GetTx(), + ) + if err != nil { + return nil, fmt.Errorf("error getting sign bytes: %w", err) + } + + signature, _, err := am.keys.SignByAddress(account.Address, bytesToSign) + if err != nil { + return nil, fmt.Errorf("error signing bytes: %w", err) + } + + return signature, nil +} + +func (am *AccountManager) updateAccount(ctx context.Context, account *Account) error { + var err error + account.Balance, err = am.getBalance(ctx, account.Address) + if err != nil { + return fmt.Errorf("getting account balance: %w", err) + } + account.AccountNumber, account.Sequence, err = am.getAccountDetails(ctx, account.Address) + if err != nil { + return fmt.Errorf("getting account details: %w", err) + } + return nil +} + +// getBalance returns the balance for the given address +func (am *AccountManager) getBalance(ctx context.Context, address types.AccAddress) (int64, error) { + balanceResp, err := am.query.Bank().Balance(ctx, &bank.QueryBalanceRequest{ + Address: address.String(), + Denom: app.BondDenom, + }) + if err != nil { + return 0, fmt.Errorf("error getting balance for %s: %w", address.String(), err) + } + return balanceResp.GetBalance().Amount.Int64(), nil +} + +// getAccountDetails returns the account number and sequence for the given address +func (am *AccountManager) getAccountDetails(ctx context.Context, address types.AccAddress) (uint64, uint64, error) { + accountResp, err := am.query.Auth().Account(ctx, &auth.QueryAccountRequest{ + Address: address.String(), + }) + if err != nil { + return 0, 0, fmt.Errorf("error getting account state for %s: %w", address.String(), err) + } + + var acc auth.AccountI + err = am.tx.encCfg.InterfaceRegistry.UnpackAny(accountResp.Account, &acc) + if err != nil { + return 0, 0, fmt.Errorf("error unpacking account: %w", err) + } + + return acc.GetAccountNumber(), acc.GetSequence(), nil +} + +func (am *AccountManager) nextAccountName() string { + return accountName(len(am.pending) + len(am.accounts)) +} + +func accountName(n int) string { return fmt.Sprintf("tx-sim-%d", n) } + +func addrsToString(addrs []types.AccAddress) string { + addrsStr := make([]string, len(addrs)) + for i, addr := range addrs { + addrsStr[i] = addr.String() + } + return strings.Join(addrsStr, ",") +} + +func msgsToString(msgs []types.Msg) string { + msgsStr := make([]string, len(msgs)) + for i, msg := range msgs { + msgsStr[i] = types.MsgTypeURL(msg) + } + return strings.Join(msgsStr, ",") +} diff --git a/testing/txsim/blob.go b/testing/txsim/blob.go new file mode 100644 index 0000000000..8a09cd60e8 --- /dev/null +++ b/testing/txsim/blob.go @@ -0,0 +1,102 @@ +package txsim + +import ( + "context" + "fmt" + "math/rand" + + ns "github.com/celestiaorg/celestia-app/pkg/namespace" + "github.com/celestiaorg/celestia-app/testutil/blobfactory" + blob "github.com/celestiaorg/celestia-app/x/blob/types" + "github.com/cosmos/cosmos-sdk/types" + "github.com/gogo/protobuf/grpc" +) + +var _ Sequence = &BlobSequence{} + +// BlobSequence defines a pattern whereby a single user repeatedly sends a pay for blob +// message roughly every height. The PFB may consist of several blobs +type BlobSequence struct { + namespace ns.Namespace + sizes Range + blobsPerPFB Range + + account types.AccAddress +} + +func NewBlobSequence(sizes Range, blobsPerPFB Range) *BlobSequence { + return &BlobSequence{ + sizes: sizes, + blobsPerPFB: blobsPerPFB, + } +} + +// WithNamespace provides the option of fixing a predefined namespace for +// all blobs. +func (s *BlobSequence) WithNamespace(namespace ns.Namespace) *BlobSequence { + s.namespace = namespace + return s +} + +func (s *BlobSequence) Clone(n int) []Sequence { + sequenceGroup := make([]Sequence, n) + for i := 0; i < n; i++ { + sequenceGroup[i] = &BlobSequence{ + namespace: s.namespace, + sizes: s.sizes, + blobsPerPFB: s.blobsPerPFB, + } + } + return sequenceGroup +} + +func (s *BlobSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { + s.account = allocateAccounts(1, 1)[0] +} + +func (s *BlobSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) { + numBlobs := s.blobsPerPFB.Rand(rand) + sizes := make([]int, numBlobs) + namespaces := make([]ns.Namespace, numBlobs) + for i := range sizes { + if s.namespace.ID != nil { + namespaces[i] = s.namespace + } else { + // generate a random namespace for the blob + namespace := make([]byte, ns.NamespaceVersionZeroIDSize) + _, err := rand.Read(namespace) + if err != nil { + return Operation{}, fmt.Errorf("generating random namespace: %w", err) + } + namespaces[i] = ns.MustNewV0(namespace) + } + sizes[i] = s.sizes.Rand(rand) + } + // generate the blobs + blobs := blobfactory.RandBlobsWithNamespace(namespaces, sizes) + // derive the pay for blob message + msg, err := blob.NewMsgPayForBlobs(s.account.String(), blobs...) + if err != nil { + return Operation{}, err + } + return Operation{ + Msgs: []types.Msg{msg}, + Blobs: blobs, + }, nil +} + +type Range struct { + Min int + Max int +} + +func NewRange(min, max int) Range { + return Range{Min: min, Max: max} +} + +func (r Range) Rand(rand *rand.Rand) int { + if r.Max <= r.Min { + return r.Min + } + return rand.Intn(r.Max-r.Min) + r.Min +} diff --git a/testing/txsim/client.go b/testing/txsim/client.go new file mode 100644 index 0000000000..d177dde97a --- /dev/null +++ b/testing/txsim/client.go @@ -0,0 +1,275 @@ +package txsim + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/celestiaorg/celestia-app/app/encoding" + blob "github.com/celestiaorg/celestia-app/x/blob/types" + sdkclient "github.com/cosmos/cosmos-sdk/client" + auth "github.com/cosmos/cosmos-sdk/x/auth/types" + bank "github.com/cosmos/cosmos-sdk/x/bank/types" + protogrpc "github.com/gogo/protobuf/grpc" + "github.com/tendermint/tendermint/rpc/client/http" + coretypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// how often to poll the network for the latest height +const ( + DefaultPollTime = 3 * time.Second + maxRetries = 20 +) + +var errTimedOutWaitingForTx = errors.New("timed out waiting for tx to be committed (1 minute)") + +// TxClient is a client for submitting transactions to one of several nodes. +type TxClient struct { + rpcClients []*http.HTTP + encCfg encoding.Config + chainID string + pollTime time.Duration + + mtx sync.Mutex + sequence int + height int64 + lastUpdated time.Time +} + +func NewTxClient(ctx context.Context, encCfg encoding.Config, pollTime time.Duration, rpcEndpoints []string) (*TxClient, error) { + if len(rpcEndpoints) == 0 { + return nil, errors.New("must have at least one endpoint specified") + } + + // setup all the rpc clients to communicate with full nodes + rpcClients := make([]*http.HTTP, len(rpcEndpoints)) + var ( + err error + chainID string + height int64 + ) + for i, endpoint := range rpcEndpoints { + rpcClients[i], err = http.New(endpoint, "/websocket") + if err != nil { + return nil, fmt.Errorf("error creating rpc client with endpoint %s: %w", endpoint, err) + } + + // check that the node is up + status, err := rpcClients[i].Status(ctx) + if err != nil { + return nil, fmt.Errorf("error getting status from rpc server %s: %w", endpoint, err) + } + + // set the chainID + if chainID == "" { + chainID = status.NodeInfo.Network + } + + // set the latest height + if status.SyncInfo.EarliestBlockHeight > height { + height = status.SyncInfo.EarliestBlockHeight + } + } + return &TxClient{ + rpcClients: rpcClients, + encCfg: encCfg, + chainID: chainID, + pollTime: pollTime, + height: height, + lastUpdated: time.Now(), + }, nil +} + +func (tc *TxClient) Tx() sdkclient.TxBuilder { + builder := tc.encCfg.TxConfig.NewTxBuilder() + return builder +} + +func (tc *TxClient) ChainID() string { + return tc.chainID +} + +func (tc *TxClient) Height() int64 { + tc.mtx.Lock() + defer tc.mtx.Unlock() + return tc.height +} + +func (tc *TxClient) updateHeight(newHeight int64) int64 { + tc.mtx.Lock() + defer tc.mtx.Unlock() + if newHeight > tc.height { + tc.height = newHeight + tc.lastUpdated = time.Now() + return newHeight + } + return tc.height +} + +func (tc *TxClient) LastUpdated() time.Time { + tc.mtx.Lock() + defer tc.mtx.Unlock() + return tc.lastUpdated +} + +// WaitForNBlocks uses WaitForHeight to wait for the given number of blocks to +// be produced. +func (tc *TxClient) WaitForNBlocks(ctx context.Context, blocks int64) error { + return tc.WaitForHeight(ctx, tc.Height()+blocks) +} + +// WaitForHeight continually polls the network for the latest height. It is +// concurrently safe. +func (tc *TxClient) WaitForHeight(ctx context.Context, height int64) error { + // check if we can immediately return + if height <= tc.Height() { + return nil + } + + ticker := time.NewTicker(tc.pollTime) + for { + select { + case <-ticker.C: + // check if we've reached the target height + if height <= tc.Height() { + return nil + } + // check when the last time we polled to avoid concurrent processes + // from polling the network too often + if time.Since(tc.LastUpdated()) < tc.pollTime { + continue + } + + // ping a node for their latest height + status, err := tc.Client().Status(ctx) + if err != nil { + return fmt.Errorf("error getting status from rpc server: %w", err) + } + + latestHeight := tc.updateHeight(status.SyncInfo.LatestBlockHeight) + // check if the new latest height is greater or equal than the target height + if latestHeight >= height { + return nil + } + + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (tc *TxClient) WaitForTx(ctx context.Context, txID []byte) (*coretypes.ResultTx, error) { + for i := 0; i < maxRetries; i++ { + resp, err := tc.Client().Tx(ctx, txID, false) + if err != nil { + // tx still no longer exists + if strings.Contains(err.Error(), "not found") { + time.Sleep(tc.pollTime) + continue + } + return nil, err + } + + if resp.TxResult.Code != 0 { + return nil, fmt.Errorf("non zero code delivering tx (%d): %s", resp.TxResult.Code, resp.TxResult.Log) + } + + return resp, nil + } + return nil, errTimedOutWaitingForTx +} + +// Client multiplexes the RPC clients +func (tc *TxClient) Client() *http.HTTP { + tc.mtx.Lock() + defer tc.mtx.Unlock() + defer tc.next() + return tc.rpcClients[tc.sequence] +} + +// Broadcast encodes and broadcasts a transaction to the network. If CheckTx fails, +// the error will be returned. The method does not wait for the transaction to be +// included in a block. +func (tc *TxClient) Broadcast(ctx context.Context, txBuilder sdkclient.TxBuilder, blobs []*blob.Blob) (*coretypes.ResultTx, error) { + tx, err := tc.encCfg.TxConfig.TxEncoder()(txBuilder.GetTx()) + if err != nil { + return nil, fmt.Errorf("error encoding tx: %w", err) + } + + // If blobs exist, these are bundled into the existing tx. + if len(blobs) > 0 { + txWithBlobs, err := types.MarshalBlobTx(tx, blobs...) + if err != nil { + return nil, err + } + tx = txWithBlobs + } + + resp, err := tc.Client().BroadcastTxSync(ctx, tx) + if err != nil { + return nil, fmt.Errorf("broadcast commit: %w", err) + } + + if resp.Code != 0 { + return nil, fmt.Errorf("non zero code checking tx (%d): %s", resp.Code, resp.Log) + } + + return tc.WaitForTx(ctx, resp.Hash) +} + +func (tc *TxClient) next() { + tc.sequence = (tc.sequence + 1) % len(tc.rpcClients) +} + +type QueryClient struct { + connections []*grpc.ClientConn + sequence int +} + +func NewQueryClient(grpcEndpoints []string) (*QueryClient, error) { + connections := make([]*grpc.ClientConn, len(grpcEndpoints)) + for idx, endpoint := range grpcEndpoints { + conn, err := grpc.Dial(grpcEndpoints[0], grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("dialing %s: %w", endpoint, err) + } + connections[idx] = conn + } + + return &QueryClient{ + connections: connections, + }, nil +} + +func (qc *QueryClient) next() { + qc.sequence = (qc.sequence + 1) % len(qc.connections) +} + +func (qc *QueryClient) Conn() protogrpc.ClientConn { + defer qc.next() + return qc.connections[qc.sequence] +} + +func (qc *QueryClient) Bank() bank.QueryClient { + defer qc.next() + return bank.NewQueryClient(qc.connections[qc.sequence]) +} + +func (qc *QueryClient) Auth() auth.QueryClient { + defer qc.next() + return auth.NewQueryClient(qc.connections[qc.sequence]) +} + +func (qc *QueryClient) Close() error { + var err error + for _, conn := range qc.connections { + err = conn.Close() + } + return err +} diff --git a/testing/txsim/run.go b/testing/txsim/run.go new file mode 100644 index 0000000000..73911ad374 --- /dev/null +++ b/testing/txsim/run.go @@ -0,0 +1,101 @@ +package txsim + +import ( + "context" + "errors" + "fmt" + "math/rand" + "time" + + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/rs/zerolog/log" +) + +// Run is the entrypoint function for starting the txsim client. The lifecycle of the client is managed +// through the context. At least one grpc and rpc endpoint must be provided. The client relies on a +// single funded master account present in the keyring. The client allocates subaccounts for sequences +// at runtime. A seed can be provided for deterministic randomness. +// +// This should be used for testing purposes only. +// +// All sequences can be scaled up using the `Clone` method. This allows for a single sequence that +// repeatedly sends random PFBs to be scaled up to 1000 accounts sending PFBs. +func Run( + ctx context.Context, + rpcEndpoints, grpcEndpoints []string, + keys keyring.Keyring, + seed int64, + pollTime time.Duration, + sequences ...Sequence, +) error { + rand := rand.New(rand.NewSource(seed)) + + txClient, err := NewTxClient(ctx, encoding.MakeConfig(app.ModuleEncodingRegisters...), pollTime, rpcEndpoints) + if err != nil { + return err + } + + queryClient, err := NewQueryClient(grpcEndpoints) + if err != nil { + return err + } + + // Create the account manager to handle account transactions. + manager, err := NewAccountManager(ctx, keys, txClient, queryClient) + if err != nil { + return err + } + + // Initiaize each of the sequences by allowing them to allocate accounts. + for _, sequence := range sequences { + sequence.Init(ctx, manager.query.Conn(), manager.AllocateAccounts, rand) + } + + // Generate the allotted accounts on chain by sending them sufficient funds + if err := manager.GenerateAccounts(ctx); err != nil { + return err + } + + errCh := make(chan error, len(sequences)) + + // Spin up a task group to run each of the sequences concurrently. + for idx, sequence := range sequences { + go func(seqID int, sequence Sequence, errCh chan<- error) { + opNum := 0 + // each sequence loops through the next set of operations, the new messages are then + // submitted on chain + for { + ops, err := sequence.Next(ctx, manager.query.Conn(), rand) + if err != nil { + errCh <- fmt.Errorf("sequence %d: %w", seqID, err) + return + } + + // Submit the messages to the chain. + if err := manager.Submit(ctx, ops); err != nil { + errCh <- fmt.Errorf("sequence %d: %w", seqID, err) + return + } + opNum++ + } + }(idx, sequence, errCh) + } + + var finalErr error + for i := 0; i < len(sequences); i++ { + err := <-errCh + if err == nil { // should never happen + continue + } + if errors.Is(err, EndOfSequence) { + log.Info().Err(err).Msg("sequence terminated") + continue + } + log.Error().Err(err).Msg("sequence failed") + finalErr = err + } + + return finalErr +} diff --git a/testing/txsim/run_test.go b/testing/txsim/run_test.go new file mode 100644 index 0000000000..83f0e0b87b --- /dev/null +++ b/testing/txsim/run_test.go @@ -0,0 +1,152 @@ +package txsim_test + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/celestiaorg/celestia-app/testing/txsim" + "github.com/celestiaorg/celestia-app/testutil/testnode" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + sdk "github.com/cosmos/cosmos-sdk/types" + + blob "github.com/celestiaorg/celestia-app/x/blob/types" + bank "github.com/cosmos/cosmos-sdk/x/bank/types" + distribution "github.com/cosmos/cosmos-sdk/x/distribution/types" + staking "github.com/cosmos/cosmos-sdk/x/staking/types" + "github.com/stretchr/testify/require" +) + +func TestTxSimulator(t *testing.T) { + testCases := []struct { + name string + sequences []txsim.Sequence + expMessages map[string]int64 + }{ + { + name: "send sequence", + sequences: []txsim.Sequence{txsim.NewSendSequence(2, 1000)}, + // we expect at least 5 bank send messages within 30 seconds + expMessages: map[string]int64{sdk.MsgTypeURL(&bank.MsgSend{}): 5}, + }, + { + name: "stake sequence", + sequences: []txsim.Sequence{txsim.NewStakeSequence(1000)}, + expMessages: map[string]int64{ + sdk.MsgTypeURL(&staking.MsgDelegate{}): 1, + sdk.MsgTypeURL(&distribution.MsgWithdrawDelegatorReward{}): 5, + // NOTE: this sequence also makes redelegations but because the + // testnet has only one validator, this never happens + }, + }, + { + name: "blob sequence", + sequences: []txsim.Sequence{ + txsim.NewBlobSequence( + txsim.NewRange(100, 1000), + txsim.NewRange(1, 3)), + }, + expMessages: map[string]int64{sdk.MsgTypeURL(&blob.MsgPayForBlobs{}): 10}, + }, + { + name: "multi blob sequence", + sequences: txsim.NewBlobSequence( + txsim.NewRange(100, 1000), + txsim.NewRange(1, 3), + ).Clone(4), + expMessages: map[string]int64{sdk.MsgTypeURL(&blob.MsgPayForBlobs{}): 20}, + }, + { + name: "multi mixed sequence", + sequences: append(append( + txsim.NewSendSequence(2, 1000).Clone(3), + txsim.NewStakeSequence(1000).Clone(3)...), + txsim.NewBlobSequence(txsim.NewRange(100, 400), txsim.NewRange(1, 4)).Clone(2)...), + expMessages: map[string]int64{ + sdk.MsgTypeURL(&bank.MsgSend{}): 15, + sdk.MsgTypeURL(&staking.MsgDelegate{}): 2, + sdk.MsgTypeURL(&distribution.MsgWithdrawDelegatorReward{}): 10, + sdk.MsgTypeURL(&blob.MsgPayForBlobs{}): 10, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + keyring, rpcAddr, grpcAddr := Setup(t) + + err := txsim.Run( + ctx, + []string{rpcAddr}, + []string{grpcAddr}, + keyring, + 9001, + time.Second, + tc.sequences..., + ) + // Expect all sequences to run for at least 30 seconds without error + require.True(t, errors.Is(err, context.DeadlineExceeded), err.Error()) + + blocks, err := testnode.ReadBlockchain(context.Background(), rpcAddr) + require.NoError(t, err) + for _, block := range blocks { + msgs, err := testnode.DecodeBlockData(block.Data) + require.NoError(t, err, block.Height) + for _, msg := range msgs { + if _, ok := tc.expMessages[sdk.MsgTypeURL(msg)]; ok { + tc.expMessages[sdk.MsgTypeURL(msg)]-- + } + } + } + for msg, count := range tc.expMessages { + if count > 0 { + t.Errorf("missing %d messages of type %s (blocks: %d)", count, msg, len(blocks)) + } + } + }) + } +} + +func Setup(t *testing.T) (keyring.Keyring, string, string) { + t.Helper() + genesis, keyring, err := testnode.DefaultGenesisState() + require.NoError(t, err) + + tmCfg := testnode.DefaultTendermintConfig() + tmCfg.RPC.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", testnode.GetFreePort()) + tmCfg.P2P.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", testnode.GetFreePort()) + tmCfg.RPC.GRPCListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", testnode.GetFreePort()) + + node, app, cctx, err := testnode.New( + t, + testnode.DefaultParams(), + tmCfg, + true, + genesis, + keyring, + "testnet", + ) + require.NoError(t, err) + + cctx, stopNode, err := testnode.StartNode(node, cctx) + require.NoError(t, err) + + appConf := testnode.DefaultAppConfig() + appConf.GRPC.Address = fmt.Sprintf("127.0.0.1:%d", testnode.GetFreePort()) + appConf.API.Address = fmt.Sprintf("tcp://127.0.0.1:%d", testnode.GetFreePort()) + + _, cleanupGRPC, err := testnode.StartGRPCServer(app, appConf, cctx) + require.NoError(t, err) + + t.Cleanup(func() { + t.Log("tearing down testnode") + require.NoError(t, stopNode()) + require.NoError(t, cleanupGRPC()) + }) + + return keyring, tmCfg.RPC.ListenAddress, appConf.GRPC.Address +} diff --git a/testing/txsim/send.go b/testing/txsim/send.go new file mode 100644 index 0000000000..758d4d122d --- /dev/null +++ b/testing/txsim/send.go @@ -0,0 +1,57 @@ +package txsim + +import ( + "context" + "math/rand" + + "github.com/celestiaorg/celestia-app/app" + "github.com/cosmos/cosmos-sdk/types" + bank "github.com/cosmos/cosmos-sdk/x/bank/types" + "github.com/gogo/protobuf/grpc" + "github.com/rs/zerolog/log" +) + +var _ Sequence = &SendSequence{} + +// SendSequence sets up an endless sequence of send transactions, moving tokens +// between a set of accounts +type SendSequence struct { + numAccounts int + sendAmount int + maxHeightDelay int + accounts []types.AccAddress + sequence int +} + +func NewSendSequence(numAccounts int, sendAmount int) *SendSequence { + return &SendSequence{ + numAccounts: numAccounts, + sendAmount: sendAmount, + maxHeightDelay: 5, + } +} + +func (s *SendSequence) Clone(n int) []Sequence { + sequenceGroup := make([]Sequence, n) + for i := 0; i < n; i++ { + sequenceGroup[i] = NewSendSequence(s.numAccounts, s.sendAmount) + } + return sequenceGroup +} + +func (s *SendSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { + s.accounts = allocateAccounts(s.numAccounts, s.sendAmount) +} + +// Next sumbits a transaction to remove funds from one account to the next +func (s *SendSequence) Next(_ context.Context, _ grpc.ClientConn, rand *rand.Rand) (Operation, error) { + op := Operation{ + Msgs: []types.Msg{ + bank.NewMsgSend(s.accounts[s.sequence%s.numAccounts], s.accounts[(s.sequence+1)%s.numAccounts], types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(s.sendAmount)))), + }, + Delay: rand.Int63n(int64(s.maxHeightDelay)), + } + log.Info().Msg("next send sequence") + s.sequence++ + return op, nil +} diff --git a/testing/txsim/sequence.go b/testing/txsim/sequence.go new file mode 100644 index 0000000000..0e9beacf20 --- /dev/null +++ b/testing/txsim/sequence.go @@ -0,0 +1,47 @@ +package txsim + +import ( + "context" + "errors" + "math/rand" + + blob "github.com/celestiaorg/celestia-app/x/blob/types" + "github.com/cosmos/cosmos-sdk/types" + "github.com/gogo/protobuf/grpc" +) + +// Sequence is the basic unit for programmatic transaction generation. +// It embodies a pattern of transactions which are executed among a group +// of accounts in isolation from the rest of the state machine. +type Sequence interface { + // Clone replicates n instances of the sequence for scaling up the load + // on a network. This is called before `Init` + Clone(n int) []Sequence + + // Init allows the sequence to initialize itself. It may read the current state of + // the chain and provision accounts for usage throughout the sequence. + // For any randomness, use the rand source provided. + Init(ctx context.Context, querier grpc.ClientConn, accountAllocator AccountAllocator, rand *rand.Rand) + + // Next returns the next operation in the sequence. It returns EndOfSequence + // when the sequence has been exhausted. The sequence may make use of the + // grpc connection to query the state of the network as well as the deterministic + // random number generator. Any error will abort the rest of the sequence. + Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) +} + +// An operation represents a series of messages and blobs that are to be bundled in a +// single transaction. A delay (in heights) may also be set before the transaction is sent. +type Operation struct { + Msgs []types.Msg + Blobs []*blob.Blob + Delay int64 +} + +// EndOfSequence is a special error which indicates that the sequence has been terminated +// nolint: revive +var EndOfSequence = errors.New("end of sequence") + +// AccountAllocator reserves and funds a series of accounts to be used exclusively by +// the Sequence. +type AccountAllocator func(n, balance int) []types.AccAddress diff --git a/testing/txsim/stake.go b/testing/txsim/stake.go new file mode 100644 index 0000000000..d2da16fa23 --- /dev/null +++ b/testing/txsim/stake.go @@ -0,0 +1,109 @@ +package txsim + +import ( + "context" + "math/rand" + + "github.com/celestiaorg/celestia-app/app" + "github.com/cosmos/cosmos-sdk/types" + distribution "github.com/cosmos/cosmos-sdk/x/distribution/types" + staking "github.com/cosmos/cosmos-sdk/x/staking/types" + "github.com/gogo/protobuf/grpc" +) + +var _ Sequence = &StakeSequence{} + +// StakeSequence sets up an endless sequence whereby an account delegates to a validator, continuously claims +// the reward, and occasionally redelegates to another validator at random. The account only ever delegates +// to a single validator at a time. TODO: Allow for multiple delegations +type StakeSequence struct { + initialStake int + redelegatePropability int + delegatedTo string + account types.AccAddress +} + +func NewStakeSequence(initialStake int) *StakeSequence { + return &StakeSequence{ + initialStake: initialStake, + redelegatePropability: 10, // 1 in every 10 + } +} + +func (s *StakeSequence) Clone(n int) []Sequence { + sequenceGroup := make([]Sequence, n) + for i := 0; i < n; i++ { + sequenceGroup[i] = NewStakeSequence(s.initialStake) + } + return sequenceGroup +} + +func (s *StakeSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { + s.account = allocateAccounts(1, s.initialStake)[0] +} + +func (s *StakeSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) { + var op Operation + + // for the first operation, the account delegates to a validator + if s.delegatedTo == "" { + val, err := getRandomValidator(ctx, querier, rand) + if err != nil { + return Operation{}, err + } + s.delegatedTo = val.OperatorAddress + return Operation{ + Msgs: []types.Msg{ + &staking.MsgDelegate{ + DelegatorAddress: s.account.String(), + ValidatorAddress: s.delegatedTo, + Amount: types.NewInt64Coin(app.BondDenom, int64(s.initialStake)), + }, + }, + }, nil + } + + // occasionally redelegate the initial stake to another validator at random + if rand.Intn(s.redelegatePropability) == 0 { + val, err := getRandomValidator(ctx, querier, rand) + if err != nil { + return Operation{}, err + } + if val.OperatorAddress != s.delegatedTo { + op = Operation{ + Msgs: []types.Msg{ + &staking.MsgBeginRedelegate{ + DelegatorAddress: s.account.String(), + ValidatorSrcAddress: s.delegatedTo, + ValidatorDstAddress: val.OperatorAddress, + // NOTE: only the initial stake is redelgated (not the entire balance) + Amount: types.NewInt64Coin(app.BondDenom, int64(s.initialStake)), + }, + }, + } + s.delegatedTo = val.OperatorAddress + return op, nil + } + } + + // claim pending rewards + op = Operation{ + Msgs: []types.Msg{ + &distribution.MsgWithdrawDelegatorReward{ + DelegatorAddress: s.account.String(), + ValidatorAddress: s.delegatedTo, + }, + }, + Delay: rand.Int63n(20), + } + + return op, nil +} + +func getRandomValidator(ctx context.Context, conn grpc.ClientConn, rand *rand.Rand) (staking.Validator, error) { + resp, err := staking.NewQueryClient(conn).Validators(ctx, &staking.QueryValidatorsRequest{}) + if err != nil { + return staking.Validator{}, err + } + return resp.Validators[rand.Intn(len(resp.Validators))], nil +} diff --git a/testutil/testfactory/utils.go b/testutil/testfactory/utils.go index 8c9b7ba1ab..160759afe6 100644 --- a/testutil/testfactory/utils.go +++ b/testutil/testfactory/utils.go @@ -91,7 +91,7 @@ func FundKeyringAccounts(cdc codec.Codec, accounts ...string) (keyring.Keyring, ) genBalances[i] = banktypes.Balance{Address: addr.String(), Coins: balances.Sort()} - genAccounts[i] = authtypes.NewBaseAccount(addr, nil, 0, 0) + genAccounts[i] = authtypes.NewBaseAccount(addr, nil, uint64(i), 0) } return kr, genBalances, genAccounts } diff --git a/testutil/testnode/full_node.go b/testutil/testnode/full_node.go index efe6880c2a..315cbd30a4 100644 --- a/testutil/testnode/full_node.go +++ b/testutil/testnode/full_node.go @@ -190,9 +190,9 @@ func DefaultNetwork(t *testing.T, blockTime time.Duration) (accounts []string, c tmCfg := DefaultTendermintConfig() tmCfg.Consensus.TimeoutCommit = blockTime - tmCfg.RPC.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", getFreePort()) - tmCfg.P2P.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", getFreePort()) - tmCfg.RPC.GRPCListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", getFreePort()) + tmCfg.RPC.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", GetFreePort()) + tmCfg.P2P.ListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", GetFreePort()) + tmCfg.RPC.GRPCListenAddress = fmt.Sprintf("tcp://127.0.0.1:%d", GetFreePort()) genState, kr, err := DefaultGenesisState(accounts...) require.NoError(t, err) @@ -204,8 +204,8 @@ func DefaultNetwork(t *testing.T, blockTime time.Duration) (accounts []string, c require.NoError(t, err) appConf := DefaultAppConfig() - appConf.GRPC.Address = fmt.Sprintf("127.0.0.1:%d", getFreePort()) - appConf.API.Address = fmt.Sprintf("tcp://127.0.0.1:%d", getFreePort()) + appConf.GRPC.Address = fmt.Sprintf("127.0.0.1:%d", GetFreePort()) + appConf.API.Address = fmt.Sprintf("tcp://127.0.0.1:%d", GetFreePort()) cctx, cleanupGRPC, err := StartGRPCServer(app, appConf, cctx) require.NoError(t, err) @@ -219,7 +219,7 @@ func DefaultNetwork(t *testing.T, blockTime time.Duration) (accounts []string, c return accounts, cctx } -func getFreePort() int { +func GetFreePort() int { a, err := net.ResolveTCPAddr("tcp", "localhost:0") if err == nil { var l *net.TCPListener diff --git a/testutil/testnode/read.go b/testutil/testnode/read.go new file mode 100644 index 0000000000..bc6189dfd1 --- /dev/null +++ b/testutil/testnode/read.go @@ -0,0 +1,54 @@ +package testnode + +import ( + "context" + "fmt" + + "github.com/celestiaorg/celestia-app/app" + "github.com/celestiaorg/celestia-app/app/encoding" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/tendermint/tendermint/rpc/client/http" + "github.com/tendermint/tendermint/types" +) + +func ReadBlockchain(ctx context.Context, rpcAddress string) ([]*types.Block, error) { + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return nil, err + } + status, err := client.Status(ctx) + if err != nil { + return nil, err + } + return ReadBlockHeights(ctx, rpcAddress, 1, status.SyncInfo.LatestBlockHeight) +} + +func ReadBlockHeights(ctx context.Context, rpcAddress string, fromHeight, toHeight int64) ([]*types.Block, error) { + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return nil, err + } + blocks := make([]*types.Block, toHeight-fromHeight+1) + for i := fromHeight; i <= toHeight; i++ { + resp, err := client.Block(ctx, &i) + if err != nil { + return nil, err + } + blocks[i-fromHeight] = resp.Block + } + return blocks, nil +} + +func DecodeBlockData(data types.Data) ([]sdk.Msg, error) { + encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) + decoder := encoding.IndexWrapperDecoder(encCfg.TxConfig.TxDecoder()) + msgs := make([]sdk.Msg, 0) + for _, txBytes := range data.Txs { + tx, err := decoder(txBytes) + if err != nil { + return nil, fmt.Errorf("decoding tx: %s: %w", string(txBytes), err) + } + msgs = append(msgs, tx.GetMsgs()...) + } + return msgs, nil +} From 9f9ef7610e539d72b2aa58a37152f4528ec0e751 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 11 Apr 2023 22:24:39 +0200 Subject: [PATCH 02/10] clean up logs --- testing/txsim/account.go | 1 - testing/txsim/run_test.go | 5 ++++- testing/txsim/send.go | 2 -- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/testing/txsim/account.go b/testing/txsim/account.go index 06aab2eee9..d3d84e77e8 100644 --- a/testing/txsim/account.go +++ b/testing/txsim/account.go @@ -330,7 +330,6 @@ func (am *AccountManager) signTransaction(builder client.TxBuilder) error { if err != nil { return fmt.Errorf("error creating signature: %w", err) } - fmt.Printf("signing msg with pubkey %s\n", acc.PubKey.String()) sigV2[index] = signing.SignatureV2{ PubKey: acc.PubKey, Data: &signing.SingleSignatureData{ diff --git a/testing/txsim/run_test.go b/testing/txsim/run_test.go index 83f0e0b87b..0953195ede 100644 --- a/testing/txsim/run_test.go +++ b/testing/txsim/run_test.go @@ -72,7 +72,10 @@ func TestTxSimulator(t *testing.T) { }, }, } - for _, tc := range testCases { + for idx, tc := range testCases { + if idx != 4 { + continue + } t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/testing/txsim/send.go b/testing/txsim/send.go index 758d4d122d..f90b8dd743 100644 --- a/testing/txsim/send.go +++ b/testing/txsim/send.go @@ -8,7 +8,6 @@ import ( "github.com/cosmos/cosmos-sdk/types" bank "github.com/cosmos/cosmos-sdk/x/bank/types" "github.com/gogo/protobuf/grpc" - "github.com/rs/zerolog/log" ) var _ Sequence = &SendSequence{} @@ -51,7 +50,6 @@ func (s *SendSequence) Next(_ context.Context, _ grpc.ClientConn, rand *rand.Ran }, Delay: rand.Int63n(int64(s.maxHeightDelay)), } - log.Info().Msg("next send sequence") s.sequence++ return op, nil } From 7ef5695efcbeb187ed23e87db9e381b6637e238b Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Wed, 12 Apr 2023 11:12:45 +0200 Subject: [PATCH 03/10] fix up the data race (I think) --- testing/txsim/client.go | 12 +++++++----- testing/txsim/run.go | 8 +++++--- testing/txsim/run_test.go | 5 +---- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/testing/txsim/client.go b/testing/txsim/client.go index d177dde97a..66de6ee2e8 100644 --- a/testing/txsim/client.go +++ b/testing/txsim/client.go @@ -229,7 +229,9 @@ func (tc *TxClient) next() { type QueryClient struct { connections []*grpc.ClientConn - sequence int + + mtx sync.Mutex + sequence int } func NewQueryClient(grpcEndpoints []string) (*QueryClient, error) { @@ -252,18 +254,18 @@ func (qc *QueryClient) next() { } func (qc *QueryClient) Conn() protogrpc.ClientConn { + qc.mtx.Lock() + defer qc.mtx.Unlock() defer qc.next() return qc.connections[qc.sequence] } func (qc *QueryClient) Bank() bank.QueryClient { - defer qc.next() - return bank.NewQueryClient(qc.connections[qc.sequence]) + return bank.NewQueryClient(qc.Conn()) } func (qc *QueryClient) Auth() auth.QueryClient { - defer qc.next() - return auth.NewQueryClient(qc.connections[qc.sequence]) + return auth.NewQueryClient(qc.Conn()) } func (qc *QueryClient) Close() error { diff --git a/testing/txsim/run.go b/testing/txsim/run.go index 73911ad374..a6a486fb88 100644 --- a/testing/txsim/run.go +++ b/testing/txsim/run.go @@ -30,7 +30,7 @@ func Run( pollTime time.Duration, sequences ...Sequence, ) error { - rand := rand.New(rand.NewSource(seed)) + r := rand.New(rand.NewSource(seed)) txClient, err := NewTxClient(ctx, encoding.MakeConfig(app.ModuleEncodingRegisters...), pollTime, rpcEndpoints) if err != nil { @@ -41,6 +41,7 @@ func Run( if err != nil { return err } + defer queryClient.Close() // Create the account manager to handle account transactions. manager, err := NewAccountManager(ctx, keys, txClient, queryClient) @@ -50,7 +51,7 @@ func Run( // Initiaize each of the sequences by allowing them to allocate accounts. for _, sequence := range sequences { - sequence.Init(ctx, manager.query.Conn(), manager.AllocateAccounts, rand) + sequence.Init(ctx, manager.query.Conn(), manager.AllocateAccounts, r) } // Generate the allotted accounts on chain by sending them sufficient funds @@ -64,10 +65,11 @@ func Run( for idx, sequence := range sequences { go func(seqID int, sequence Sequence, errCh chan<- error) { opNum := 0 + r := rand.New(rand.NewSource(seed)) // each sequence loops through the next set of operations, the new messages are then // submitted on chain for { - ops, err := sequence.Next(ctx, manager.query.Conn(), rand) + ops, err := sequence.Next(ctx, manager.query.Conn(), r) if err != nil { errCh <- fmt.Errorf("sequence %d: %w", seqID, err) return diff --git a/testing/txsim/run_test.go b/testing/txsim/run_test.go index 0953195ede..83f0e0b87b 100644 --- a/testing/txsim/run_test.go +++ b/testing/txsim/run_test.go @@ -72,10 +72,7 @@ func TestTxSimulator(t *testing.T) { }, }, } - for idx, tc := range testCases { - if idx != 4 { - continue - } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() From 54824011774c72ed7a6b305992d09e08e2a167f9 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Wed, 12 Apr 2023 15:25:35 +0200 Subject: [PATCH 04/10] result to ignoring race failure --- testing/txsim/run_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/testing/txsim/run_test.go b/testing/txsim/run_test.go index 83f0e0b87b..c47c7f0b0c 100644 --- a/testing/txsim/run_test.go +++ b/testing/txsim/run_test.go @@ -1,3 +1,7 @@ +//go:build !race + +// known race in testnode +// ref: https://github.com/celestiaorg/celestia-app/issues/1369 package txsim_test import ( From 60e53caebf3e95c6f05e7a288130c842f9de8a75 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Thu, 13 Apr 2023 11:12:28 +0200 Subject: [PATCH 05/10] address PR comments --- testing/txsim/account.go | 4 -- testing/txsim/client.go | 24 +++++++---- testing/txsim/run_test.go | 8 ++-- testutil/testnode/full_node.go | 2 +- testutil/testnode/node_init.go | 2 +- testutil/testnode/read.go | 79 ++++++++++++++++++++++++++++++++++ 6 files changed, 100 insertions(+), 19 deletions(-) diff --git a/testing/txsim/account.go b/testing/txsim/account.go index d3d84e77e8..e44bfff805 100644 --- a/testing/txsim/account.go +++ b/testing/txsim/account.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "sync" "github.com/celestiaorg/celestia-app/app" "github.com/cosmos/cosmos-sdk/client" @@ -26,7 +25,6 @@ const ( ) type AccountManager struct { - mtx sync.Mutex keys keyring.Keyring masterAccount *Account accounts map[string]*Account @@ -281,8 +279,6 @@ func (am *AccountManager) setupAccountMsgs(account *Account) ([]types.Msg, error } func (am *AccountManager) signTransaction(builder client.TxBuilder) error { - am.mtx.Lock() - defer am.mtx.Unlock() signers := builder.GetTx().GetSigners() for _, signer := range signers { _, ok := am.accounts[signer.String()] diff --git a/testing/txsim/client.go b/testing/txsim/client.go index 66de6ee2e8..ed87e07a06 100644 --- a/testing/txsim/client.go +++ b/testing/txsim/client.go @@ -29,15 +29,17 @@ const ( var errTimedOutWaitingForTx = errors.New("timed out waiting for tx to be committed (1 minute)") -// TxClient is a client for submitting transactions to one of several nodes. +// TxClient is a client for submitting transactions to one of several nodes. It uses a round-robin +// algorithm for multiplexing requests across multiple clients. type TxClient struct { rpcClients []*http.HTTP encCfg encoding.Config chainID string pollTime time.Duration - mtx sync.Mutex - sequence int + mtx sync.Mutex + // index indicates which client to use next + index int height int64 lastUpdated time.Time } @@ -190,7 +192,7 @@ func (tc *TxClient) Client() *http.HTTP { tc.mtx.Lock() defer tc.mtx.Unlock() defer tc.next() - return tc.rpcClients[tc.sequence] + return tc.rpcClients[tc.index] } // Broadcast encodes and broadcasts a transaction to the network. If CheckTx fails, @@ -223,15 +225,18 @@ func (tc *TxClient) Broadcast(ctx context.Context, txBuilder sdkclient.TxBuilder return tc.WaitForTx(ctx, resp.Hash) } +// next iterates the index of the RPC clients. It is not thread safe and should be called within a mutex. func (tc *TxClient) next() { - tc.sequence = (tc.sequence + 1) % len(tc.rpcClients) + tc.index = (tc.index + 1) % len(tc.rpcClients) } +// QueryClient multiplexes requests across multiple running gRPC connections. It does this in a round-robin fashion. type QueryClient struct { connections []*grpc.ClientConn - mtx sync.Mutex - sequence int + mtx sync.Mutex + // index indicates which client to be used next + index int } func NewQueryClient(grpcEndpoints []string) (*QueryClient, error) { @@ -249,15 +254,16 @@ func NewQueryClient(grpcEndpoints []string) (*QueryClient, error) { }, nil } +// next iterates the index of the RPC clients. It is not thread safe and should be called within a mutex. func (qc *QueryClient) next() { - qc.sequence = (qc.sequence + 1) % len(qc.connections) + qc.index = (qc.index + 1) % len(qc.connections) } func (qc *QueryClient) Conn() protogrpc.ClientConn { qc.mtx.Lock() defer qc.mtx.Unlock() defer qc.next() - return qc.connections[qc.sequence] + return qc.connections[qc.index] } func (qc *QueryClient) Bank() bank.QueryClient { diff --git a/testing/txsim/run_test.go b/testing/txsim/run_test.go index c47c7f0b0c..1eacd098ec 100644 --- a/testing/txsim/run_test.go +++ b/testing/txsim/run_test.go @@ -57,8 +57,8 @@ func TestTxSimulator(t *testing.T) { { name: "multi blob sequence", sequences: txsim.NewBlobSequence( - txsim.NewRange(100, 1000), - txsim.NewRange(1, 3), + txsim.NewRange(1000, 1000), + txsim.NewRange(1, 1), ).Clone(4), expMessages: map[string]int64{sdk.MsgTypeURL(&blob.MsgPayForBlobs{}): 20}, }, @@ -67,7 +67,7 @@ func TestTxSimulator(t *testing.T) { sequences: append(append( txsim.NewSendSequence(2, 1000).Clone(3), txsim.NewStakeSequence(1000).Clone(3)...), - txsim.NewBlobSequence(txsim.NewRange(100, 400), txsim.NewRange(1, 4)).Clone(2)...), + txsim.NewBlobSequence(txsim.NewRange(1000, 1000), txsim.NewRange(1, 1)).Clone(3)...), expMessages: map[string]int64{ sdk.MsgTypeURL(&bank.MsgSend{}): 15, sdk.MsgTypeURL(&staking.MsgDelegate{}): 2, @@ -115,7 +115,7 @@ func TestTxSimulator(t *testing.T) { } } -func Setup(t *testing.T) (keyring.Keyring, string, string) { +func Setup(t testing.TB) (keyring.Keyring, string, string) { t.Helper() genesis, keyring, err := testnode.DefaultGenesisState() require.NoError(t, err) diff --git a/testutil/testnode/full_node.go b/testutil/testnode/full_node.go index 315cbd30a4..3886b63f7c 100644 --- a/testutil/testnode/full_node.go +++ b/testutil/testnode/full_node.go @@ -41,7 +41,7 @@ import ( // NOTE: the forced delay between blocks, TimeIotaMs in the consensus // parameters, is set to the lowest possible value (1ms). func New( - t *testing.T, + t testing.TB, cparams *tmproto.ConsensusParams, tmCfg *config.Config, supressLog bool, diff --git a/testutil/testnode/node_init.go b/testutil/testnode/node_init.go index f79197da86..e28813b470 100644 --- a/testutil/testnode/node_init.go +++ b/testutil/testnode/node_init.go @@ -188,7 +188,7 @@ func writeFile(name string, dir string, contents []byte) error { return nil } -func initFileStructure(t *testing.T, tmCfg *config.Config) (string, error) { +func initFileStructure(t testing.TB, tmCfg *config.Config) (string, error) { basePath := filepath.Join(t.TempDir(), ".celestia-app") tmCfg.SetRoot(basePath) configPath := filepath.Join(basePath, "config") diff --git a/testutil/testnode/read.go b/testutil/testnode/read.go index bc6189dfd1..3181124d4d 100644 --- a/testutil/testnode/read.go +++ b/testutil/testnode/read.go @@ -11,6 +11,21 @@ import ( "github.com/tendermint/tendermint/types" ) +func ReadRecentBlocks(ctx context.Context, rpcAddress string, blocks int64) ([]*types.Block, error) { + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return nil, err + } + status, err := client.Status(ctx) + if err != nil { + return nil, err + } + if status.SyncInfo.LatestBlockHeight < blocks { + return nil, fmt.Errorf("latest block height %d is less than requested blocks %d", status.SyncInfo.LatestBlockHeight, blocks) + } + return ReadBlockHeights(ctx, rpcAddress, status.SyncInfo.LatestBlockHeight-blocks+1, status.SyncInfo.LatestBlockHeight) +} + func ReadBlockchain(ctx context.Context, rpcAddress string) ([]*types.Block, error) { client, err := http.New(rpcAddress, "/websocket") if err != nil { @@ -52,3 +67,67 @@ func DecodeBlockData(data types.Data) ([]sdk.Msg, error) { } return msgs, nil } + +func CalculateMeanGasFromRecentBlocks(ctx context.Context, rpcAddress, msgType string, blocks int64) (float64, int64, error) { + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return 0.0, 0, err + } + status, err := client.Status(ctx) + if err != nil { + return 0.0, 0, err + } + if status.SyncInfo.LatestBlockHeight <= blocks { + return 0.0, 0, fmt.Errorf("latest block height %d is less than %d", status.SyncInfo.LatestBlockHeight, blocks) + } + return CalculateMeanGas(ctx, rpcAddress, msgType, status.SyncInfo.LatestBlockHeight-blocks+1, status.SyncInfo.LatestBlockHeight) +} + +func CalculateMeanGas(ctx context.Context, rpcAddress, msgType string, fromHeight int64, toHeight int64) (float64, int64, error) { + var ( + encCfg = encoding.MakeConfig(app.ModuleEncodingRegisters...) + decoder = encoding.IndexWrapperDecoder(encCfg.TxConfig.TxDecoder()) + totalGas int64 = 0 + count int64 = 0 + average = func() float64 { + if count == 0 { + return 0 + } + return float64(totalGas) / float64(count) + } + ) + client, err := http.New(rpcAddress, "/websocket") + if err != nil { + return 0.0, 0, err + } + + for height := fromHeight; height <= toHeight; height++ { + resp, err := client.Block(ctx, &height) + if err != nil { + return average(), count, err + } + indices := make([]int, 0, len(resp.Block.Data.Txs)) + for i, rawTx := range resp.Block.Data.Txs { + tx, err := decoder(rawTx) + if err != nil { + return average(), count, fmt.Errorf("decoding tx (height: %d): %w", height, err) + } + msgs := tx.GetMsgs() + // multi message transactions are not included + if len(msgs) == 1 && sdk.MsgTypeURL(msgs[0]) == msgType { + indices = append(indices, i) + } + } + if len(indices) > 0 { + results, err := client.BlockResults(ctx, &height) + if err != nil { + return average(), count, fmt.Errorf("getting block results (height %d): %w", height, err) + } + for _, i := range indices { + totalGas += results.TxsResults[i].GasUsed + count++ + } + } + } + return average(), count, nil +} From 5cf229f233bfe672252403cfff3b274454c313bc Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Thu, 13 Apr 2023 11:55:05 +0200 Subject: [PATCH 06/10] add some concurrency protection to the map --- testing/txsim/account.go | 46 +++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/testing/txsim/account.go b/testing/txsim/account.go index e44bfff805..685cf3ec17 100644 --- a/testing/txsim/account.go +++ b/testing/txsim/account.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "github.com/celestiaorg/celestia-app/app" "github.com/cosmos/cosmos-sdk/client" @@ -20,17 +21,23 @@ import ( ) const ( + // set default gas limit to cover the costs of most transactions + // At 0.001 utia per gas, this equates to 1000utia per transaction + // In the future we may want to give some access to the sequence itself gasLimit = 1000000 feeAmount = 1000 ) type AccountManager struct { - keys keyring.Keyring + keys keyring.Keyring + tx *TxClient + query *QueryClient + pending []*Account + + // to protect from concurrent writes to the map + mtx sync.Mutex masterAccount *Account accounts map[string]*Account - pending []*Account - tx *TxClient - query *QueryClient } type Account struct { @@ -76,6 +83,9 @@ func NewAccountManager(ctx context.Context, keys keyring.Keyring, txClient *TxCl // the highest balance as the master account. Accounts that don't yet exist on chain are // ignored. func (am *AccountManager) setupMasterAccount(ctx context.Context) error { + am.mtx.Lock() + defer am.mtx.Unlock() + records, err := am.keys.List() if err != nil { return err @@ -158,6 +168,7 @@ func (am *AccountManager) AllocateAccounts(n, balance int) []types.AccAddress { return addresses } +// Submit executes on an operation. This is thread safe. func (am *AccountManager) Submit(ctx context.Context, op Operation) error { for _, msg := range op.Msgs { if err := msg.ValidateBasic(); err != nil { @@ -196,17 +207,15 @@ func (am *AccountManager) Submit(ctx context.Context, op Operation) error { signers := builder.GetTx().GetSigners() + // increment the sequence number for all the signers + am.incrementSignerSequences(signers) + log.Info(). Int64("height", resp.Height). Str("signers", addrsToString(signers)). Str("msgs", msgsToString(op.Msgs)). Msg("tx committed") - // increment the sequence number for all the signers - for _, signer := range signers { - am.accounts[signer.String()].Sequence++ - } - return nil } @@ -372,15 +381,20 @@ func (am *AccountManager) createSignature(account *Account, builder client.TxBui } func (am *AccountManager) updateAccount(ctx context.Context, account *Account) error { - var err error - account.Balance, err = am.getBalance(ctx, account.Address) + newBalance, err := am.getBalance(ctx, account.Address) if err != nil { return fmt.Errorf("getting account balance: %w", err) } - account.AccountNumber, account.Sequence, err = am.getAccountDetails(ctx, account.Address) + newAccountNumber, newSequence, err := am.getAccountDetails(ctx, account.Address) if err != nil { return fmt.Errorf("getting account details: %w", err) } + + am.mtx.Lock() + defer am.mtx.Unlock() + account.Balance = newBalance + account.AccountNumber = newAccountNumber + account.Sequence = newSequence return nil } @@ -414,6 +428,14 @@ func (am *AccountManager) getAccountDetails(ctx context.Context, address types.A return acc.GetAccountNumber(), acc.GetSequence(), nil } +func (am *AccountManager) incrementSignerSequences(signers []types.AccAddress) { + am.mtx.Lock() + defer am.mtx.Unlock() + for _, signer := range signers { + am.accounts[signer.String()].Sequence++ + } +} + func (am *AccountManager) nextAccountName() string { return accountName(len(am.pending) + len(am.accounts)) } From 846d4b14a7337e74ffbc0c53ef5af309d267795e Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 18 Apr 2023 10:20:17 +0200 Subject: [PATCH 07/10] allow sequences to dictate gas --- app/estimate_square_size.go | 2 +- testing/txsim/account.go | 48 +++++++++++++------------------------ testing/txsim/blob.go | 30 +++++++++++++++++++---- testing/txsim/client.go | 45 ++++++++++++++++++++++++---------- testing/txsim/run.go | 3 ++- testing/txsim/run_test.go | 8 +++---- testing/txsim/send.go | 28 ++++++++++++++++------ testing/txsim/sequence.go | 16 ++++++++++--- testing/txsim/stake.go | 2 +- 9 files changed, 117 insertions(+), 65 deletions(-) diff --git a/app/estimate_square_size.go b/app/estimate_square_size.go index 1d4eb96c55..b5a07926aa 100644 --- a/app/estimate_square_size.go +++ b/app/estimate_square_size.go @@ -34,7 +34,7 @@ func estimateSquareSize(normalTxs [][]byte, blobTxs []core.BlobTx) (squareSize u // estimate as much totalSharesUsed := uint64(txSharesUsed + pfbTxSharesUsed + blobSharesUsed) totalSharesUsed *= 2 - minSize := uint64(math.Sqrt(float64(totalSharesUsed))) + minSize := uint64(math.Sqrt(float64(totalSharesUsed))) + 1 squareSize = shares.RoundUpPowerOfTwo(minSize) if squareSize >= appconsts.DefaultMaxSquareSize { squareSize = appconsts.DefaultMaxSquareSize diff --git a/testing/txsim/account.go b/testing/txsim/account.go index 685cf3ec17..97528a769f 100644 --- a/testing/txsim/account.go +++ b/testing/txsim/account.go @@ -16,17 +16,10 @@ import ( authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" auth "github.com/cosmos/cosmos-sdk/x/auth/types" bank "github.com/cosmos/cosmos-sdk/x/bank/types" - "github.com/cosmos/cosmos-sdk/x/feegrant" "github.com/rs/zerolog/log" ) -const ( - // set default gas limit to cover the costs of most transactions - // At 0.001 utia per gas, this equates to 1000utia per transaction - // In the future we may want to give some access to the sequence itself - gasLimit = 1000000 - feeAmount = 1000 -) +const defaultFee = DefaultGasLimit * DefaultGasPrice type AccountManager struct { keys keyring.Keyring @@ -183,10 +176,17 @@ func (am *AccountManager) Submit(ctx context.Context, op Operation) error { return fmt.Errorf("error setting messages: %w", err) } - builder.SetFeeAmount(types.NewCoins(types.NewInt64Coin(app.BondDenom, feeAmount))) - // the master account is responsible for paying the fees - builder.SetFeeGranter(am.masterAccount.Address) - builder.SetGasLimit(gasLimit) + if op.GasLimit == 0 { + builder.SetGasLimit(DefaultGasLimit) + builder.SetFeeAmount(types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(defaultFee)))) + } else { + builder.SetGasLimit(op.GasLimit) + if op.GasPrice > 0 { + builder.SetFeeAmount(types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(float64(op.GasLimit)*op.GasPrice)))) + } else { + builder.SetFeeAmount(types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(float64(op.GasLimit)*DefaultGasPrice)))) + } + } if err := am.signTransaction(builder); err != nil { return err @@ -229,12 +229,12 @@ func (am *AccountManager) GenerateAccounts(ctx context.Context) error { msgs := make([]types.Msg, 0) // batch together all the messages needed to create all the accounts for _, acc := range am.pending { - accMsgs, err := am.setupAccountMsgs(acc) - if err != nil { - return fmt.Errorf("generating account %s: %w", acc.Address, err) + if am.masterAccount.Balance < acc.Balance { + return fmt.Errorf("master account has insufficient funds") } - msgs = append(msgs, accMsgs...) + bankMsg := bank.NewMsgSend(am.masterAccount.Address, acc.Address, types.NewCoins(types.NewInt64Coin(app.BondDenom, acc.Balance))) + msgs = append(msgs, bankMsg) } err := am.Submit(ctx, Operation{Msgs: msgs}) @@ -271,22 +271,6 @@ func (am *AccountManager) GenerateAccounts(ctx context.Context) error { return nil } -// setupAccount initializes the account on chain with the given balance. It also sets up -// a grant such that the master account covers the fees of any message sent. -func (am *AccountManager) setupAccountMsgs(account *Account) ([]types.Msg, error) { - if am.masterAccount.Balance < account.Balance { - return nil, fmt.Errorf("master account has insufficient funds") - } - - // create a feegrant message so that the master account pays for all the fees of the sub accounts - feegrantMsg, err := feegrant.NewMsgGrantAllowance(&feegrant.BasicAllowance{}, am.masterAccount.Address, account.Address) - if err != nil { - return nil, fmt.Errorf("error creating feegrant message: %w", err) - } - bankMsg := bank.NewMsgSend(am.masterAccount.Address, account.Address, types.NewCoins(types.NewInt64Coin(app.BondDenom, account.Balance))) - return []types.Msg{feegrantMsg, bankMsg}, nil -} - func (am *AccountManager) signTransaction(builder client.TxBuilder) error { signers := builder.GetTx().GetSigners() for _, signer := range signers { diff --git a/testing/txsim/blob.go b/testing/txsim/blob.go index 8a09cd60e8..7b0a58053b 100644 --- a/testing/txsim/blob.go +++ b/testing/txsim/blob.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" + "github.com/celestiaorg/celestia-app/pkg/appconsts" ns "github.com/celestiaorg/celestia-app/pkg/namespace" "github.com/celestiaorg/celestia-app/testutil/blobfactory" blob "github.com/celestiaorg/celestia-app/x/blob/types" @@ -14,6 +15,9 @@ import ( var _ Sequence = &BlobSequence{} +// As napkin math, this would cover the cost of 8267 4KB blobs +const fundsForGas = 1e9 // 1000 TIA + // BlobSequence defines a pattern whereby a single user repeatedly sends a pay for blob // message roughly every height. The PFB may consist of several blobs type BlobSequence struct { @@ -24,7 +28,7 @@ type BlobSequence struct { account types.AccAddress } -func NewBlobSequence(sizes Range, blobsPerPFB Range) *BlobSequence { +func NewBlobSequence(sizes, blobsPerPFB Range) *BlobSequence { return &BlobSequence{ sizes: sizes, blobsPerPFB: blobsPerPFB, @@ -51,7 +55,7 @@ func (s *BlobSequence) Clone(n int) []Sequence { } func (s *BlobSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { - s.account = allocateAccounts(1, 1)[0] + s.account = allocateAccounts(1, fundsForGas)[0] } func (s *BlobSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) { @@ -80,8 +84,9 @@ func (s *BlobSequence) Next(ctx context.Context, querier grpc.ClientConn, rand * return Operation{}, err } return Operation{ - Msgs: []types.Msg{msg}, - Blobs: blobs, + Msgs: []types.Msg{msg}, + Blobs: blobs, + GasLimit: EstimateGas(sizes), }, nil } @@ -94,9 +99,26 @@ func NewRange(min, max int) Range { return Range{Min: min, Max: max} } +// Rand returns a random number between min (inclusive) and max (exclusive). func (r Range) Rand(rand *rand.Rand) int { if r.Max <= r.Min { return r.Min } return rand.Intn(r.Max-r.Min) + r.Min } + +const ( + perByteGasTolerance = 2 + pfbGasFixedCost = 80000 +) + +// EstimateGas estimates the gas required to pay for a set of blobs in a PFB. +func EstimateGas(blobSizes []int) uint64 { + totalByteCount := 0 + for _, size := range blobSizes { + totalByteCount += size + } + variableGasAmount := (appconsts.DefaultGasPerBlobByte + perByteGasTolerance) * totalByteCount + + return uint64(variableGasAmount + pfbGasFixedCost) +} diff --git a/testing/txsim/client.go b/testing/txsim/client.go index ed87e07a06..0e71a95677 100644 --- a/testing/txsim/client.go +++ b/testing/txsim/client.go @@ -21,13 +21,18 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -// how often to poll the network for the latest height const ( + // how often to poll the network for the latest height DefaultPollTime = 3 * time.Second - maxRetries = 20 + + // how many times to wait for a transaction to be committed before + // concluding that it has failed + maxRetries = 10 + + rpcContextTimeout = 10 * time.Second ) -var errTimedOutWaitingForTx = errors.New("timed out waiting for tx to be committed (1 minute)") +var errTimedOutWaitingForTx = errors.New("timed out waiting for tx to be committed") // TxClient is a client for submitting transactions to one of several nodes. It uses a round-robin // algorithm for multiplexing requests across multiple clients. @@ -168,8 +173,16 @@ func (tc *TxClient) WaitForHeight(ctx context.Context, height int64) error { func (tc *TxClient) WaitForTx(ctx context.Context, txID []byte) (*coretypes.ResultTx, error) { for i := 0; i < maxRetries; i++ { - resp, err := tc.Client().Tx(ctx, txID, false) + subctx, cancel := context.WithTimeout(ctx, rpcContextTimeout) + defer cancel() + + resp, err := tc.Client().Tx(subctx, txID, false) if err != nil { + // sub context timed out but the parent hasn't (we retry) + if subctx.Err() != nil && ctx.Err() == nil { + continue + } + // tx still no longer exists if strings.Contains(err.Error(), "not found") { time.Sleep(tc.pollTime) @@ -213,16 +226,24 @@ func (tc *TxClient) Broadcast(ctx context.Context, txBuilder sdkclient.TxBuilder tx = txWithBlobs } - resp, err := tc.Client().BroadcastTxSync(ctx, tx) - if err != nil { - return nil, fmt.Errorf("broadcast commit: %w", err) - } + for { + subctx, cancel := context.WithTimeout(ctx, rpcContextTimeout) + defer cancel() - if resp.Code != 0 { - return nil, fmt.Errorf("non zero code checking tx (%d): %s", resp.Code, resp.Log) - } + resp, err := tc.Client().BroadcastTxSync(subctx, tx) + if err != nil { + if subctx.Err() != nil { + continue + } + return nil, err + } + + if resp.Code != 0 { + return nil, fmt.Errorf("non zero code checking tx (%d): %s", resp.Code, resp.Log) + } - return tc.WaitForTx(ctx, resp.Hash) + return tc.WaitForTx(ctx, resp.Hash) + } } // next iterates the index of the RPC clients. It is not thread safe and should be called within a mutex. diff --git a/testing/txsim/run.go b/testing/txsim/run.go index a6a486fb88..ac6efb2f3b 100644 --- a/testing/txsim/run.go +++ b/testing/txsim/run.go @@ -16,7 +16,8 @@ import ( // Run is the entrypoint function for starting the txsim client. The lifecycle of the client is managed // through the context. At least one grpc and rpc endpoint must be provided. The client relies on a // single funded master account present in the keyring. The client allocates subaccounts for sequences -// at runtime. A seed can be provided for deterministic randomness. +// at runtime. A seed can be provided for deterministic randomness. The pollTime dictates the frequency +// that the client should check for updates from state and that transactions have been committed. // // This should be used for testing purposes only. // diff --git a/testing/txsim/run_test.go b/testing/txsim/run_test.go index 1eacd098ec..3dacf1c705 100644 --- a/testing/txsim/run_test.go +++ b/testing/txsim/run_test.go @@ -31,7 +31,7 @@ func TestTxSimulator(t *testing.T) { }{ { name: "send sequence", - sequences: []txsim.Sequence{txsim.NewSendSequence(2, 1000)}, + sequences: []txsim.Sequence{txsim.NewSendSequence(2, 1000, 100)}, // we expect at least 5 bank send messages within 30 seconds expMessages: map[string]int64{sdk.MsgTypeURL(&bank.MsgSend{}): 5}, }, @@ -58,16 +58,16 @@ func TestTxSimulator(t *testing.T) { name: "multi blob sequence", sequences: txsim.NewBlobSequence( txsim.NewRange(1000, 1000), - txsim.NewRange(1, 1), + txsim.NewRange(3, 3), ).Clone(4), expMessages: map[string]int64{sdk.MsgTypeURL(&blob.MsgPayForBlobs{}): 20}, }, { name: "multi mixed sequence", sequences: append(append( - txsim.NewSendSequence(2, 1000).Clone(3), + txsim.NewSendSequence(2, 1000, 100).Clone(3), txsim.NewStakeSequence(1000).Clone(3)...), - txsim.NewBlobSequence(txsim.NewRange(1000, 1000), txsim.NewRange(1, 1)).Clone(3)...), + txsim.NewBlobSequence(txsim.NewRange(1000, 1000), txsim.NewRange(1, 3)).Clone(3)...), expMessages: map[string]int64{ sdk.MsgTypeURL(&bank.MsgSend{}): 15, sdk.MsgTypeURL(&staking.MsgDelegate{}): 2, diff --git a/testing/txsim/send.go b/testing/txsim/send.go index f90b8dd743..04c7477941 100644 --- a/testing/txsim/send.go +++ b/testing/txsim/send.go @@ -12,6 +12,11 @@ import ( var _ Sequence = &SendSequence{} +const ( + sendGasLimit = 100000 + sendFee = sendGasLimit * DefaultGasPrice +) + // SendSequence sets up an endless sequence of send transactions, moving tokens // between a set of accounts type SendSequence struct { @@ -19,37 +24,46 @@ type SendSequence struct { sendAmount int maxHeightDelay int accounts []types.AccAddress - sequence int + index int + numIterations int } -func NewSendSequence(numAccounts int, sendAmount int) *SendSequence { +func NewSendSequence(numAccounts, sendAmount, numIterations int) *SendSequence { return &SendSequence{ numAccounts: numAccounts, sendAmount: sendAmount, maxHeightDelay: 5, + numIterations: numIterations, } } func (s *SendSequence) Clone(n int) []Sequence { sequenceGroup := make([]Sequence, n) for i := 0; i < n; i++ { - sequenceGroup[i] = NewSendSequence(s.numAccounts, s.sendAmount) + sequenceGroup[i] = NewSendSequence(s.numAccounts, s.sendAmount, s.numIterations) } return sequenceGroup } +// Init sets up the accounts involved in the sequence. It calculates the necessary balance as the fees per transaction +// multiplied by the number of expected iterations plus the amount to be sent from one account to another func (s *SendSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { - s.accounts = allocateAccounts(s.numAccounts, s.sendAmount) + amount := s.sendAmount + (s.numIterations * int(sendFee)) + s.accounts = allocateAccounts(s.numAccounts, amount) } // Next sumbits a transaction to remove funds from one account to the next func (s *SendSequence) Next(_ context.Context, _ grpc.ClientConn, rand *rand.Rand) (Operation, error) { + if s.index >= s.numIterations { + return Operation{}, EndOfSequence + } op := Operation{ Msgs: []types.Msg{ - bank.NewMsgSend(s.accounts[s.sequence%s.numAccounts], s.accounts[(s.sequence+1)%s.numAccounts], types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(s.sendAmount)))), + bank.NewMsgSend(s.accounts[s.index%s.numAccounts], s.accounts[(s.index+1)%s.numAccounts], types.NewCoins(types.NewInt64Coin(app.BondDenom, int64(s.sendAmount)))), }, - Delay: rand.Int63n(int64(s.maxHeightDelay)), + Delay: rand.Int63n(int64(s.maxHeightDelay)), + GasLimit: sendGasLimit, } - s.sequence++ + s.index++ return op, nil } diff --git a/testing/txsim/sequence.go b/testing/txsim/sequence.go index 0e9beacf20..3f088fb00e 100644 --- a/testing/txsim/sequence.go +++ b/testing/txsim/sequence.go @@ -32,12 +32,22 @@ type Sequence interface { // An operation represents a series of messages and blobs that are to be bundled in a // single transaction. A delay (in heights) may also be set before the transaction is sent. +// The gas limit and price can also be set. If left at 0, the DefaultGasLimit will be used. type Operation struct { - Msgs []types.Msg - Blobs []*blob.Blob - Delay int64 + Msgs []types.Msg + Blobs []*blob.Blob + Delay int64 + GasLimit uint64 + GasPrice float64 } +const ( + // set default gas limit to cover the costs of most transactions + // At 0.001 utia per gas, this equates to 1000utia per transaction + DefaultGasLimit = 1000000 + DefaultGasPrice = 0.001 +) + // EndOfSequence is a special error which indicates that the sequence has been terminated // nolint: revive var EndOfSequence = errors.New("end of sequence") diff --git a/testing/txsim/stake.go b/testing/txsim/stake.go index d2da16fa23..c2db0439e3 100644 --- a/testing/txsim/stake.go +++ b/testing/txsim/stake.go @@ -39,7 +39,7 @@ func (s *StakeSequence) Clone(n int) []Sequence { } func (s *StakeSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { - s.account = allocateAccounts(1, s.initialStake)[0] + s.account = allocateAccounts(1, s.initialStake+fundsForGas)[0] } func (s *StakeSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) { From 9bc19b386bca188fca14fa34aaf116b6c3dd04cc Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 18 Apr 2023 13:38:32 +0200 Subject: [PATCH 08/10] lint --- testutil/testnode/read.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/testutil/testnode/read.go b/testutil/testnode/read.go index 3181124d4d..c4d873342b 100644 --- a/testutil/testnode/read.go +++ b/testutil/testnode/read.go @@ -85,11 +85,11 @@ func CalculateMeanGasFromRecentBlocks(ctx context.Context, rpcAddress, msgType s func CalculateMeanGas(ctx context.Context, rpcAddress, msgType string, fromHeight int64, toHeight int64) (float64, int64, error) { var ( - encCfg = encoding.MakeConfig(app.ModuleEncodingRegisters...) - decoder = encoding.IndexWrapperDecoder(encCfg.TxConfig.TxDecoder()) - totalGas int64 = 0 - count int64 = 0 - average = func() float64 { + encCfg = encoding.MakeConfig(app.ModuleEncodingRegisters...) + decoder = encoding.IndexWrapperDecoder(encCfg.TxConfig.TxDecoder()) + totalGas int64 + count int64 + average = func() float64 { if count == 0 { return 0 } From 304cf78dd2734b5b6eb930f79d139a347e34900a Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 18 Apr 2023 13:49:59 +0200 Subject: [PATCH 09/10] fix estimation tests --- app/estimate_square_size.go | 2 +- app/estimate_square_size_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/app/estimate_square_size.go b/app/estimate_square_size.go index b5a07926aa..f3f648788c 100644 --- a/app/estimate_square_size.go +++ b/app/estimate_square_size.go @@ -34,7 +34,7 @@ func estimateSquareSize(normalTxs [][]byte, blobTxs []core.BlobTx) (squareSize u // estimate as much totalSharesUsed := uint64(txSharesUsed + pfbTxSharesUsed + blobSharesUsed) totalSharesUsed *= 2 - minSize := uint64(math.Sqrt(float64(totalSharesUsed))) + 1 + minSize := uint64(math.Ceil(math.Sqrt(float64(totalSharesUsed)))) squareSize = shares.RoundUpPowerOfTwo(minSize) if squareSize >= appconsts.DefaultMaxSquareSize { squareSize = appconsts.DefaultMaxSquareSize diff --git a/app/estimate_square_size_test.go b/app/estimate_square_size_test.go index 2f43921a4a..2556d02f6a 100644 --- a/app/estimate_square_size_test.go +++ b/app/estimate_square_size_test.go @@ -25,12 +25,12 @@ func Test_estimateSquareSize(t *testing.T) { } tests := []test{ {"empty block", 0, 0, 0, appconsts.DefaultMinSquareSize}, - {"one normal tx", 1, 0, 0, appconsts.DefaultMinSquareSize}, + {"one normal tx", 1, 0, 0, 2}, {"one small pfb small block", 0, 1, 100, 2}, {"mixed small block", 10, 12, 500, 16}, {"small block 2", 0, 12, 1000, 16}, {"mixed medium block 2", 10, 20, 10000, 32}, - {"one large pfb large block", 0, 1, 1000000, 64}, + {"one large pfb large block", 0, 1, 1000000, appconsts.DefaultMaxSquareSize}, {"one hundred large pfb large block", 0, 100, 100000, appconsts.DefaultMaxSquareSize}, {"one hundred large pfb medium block", 100, 100, 100000, appconsts.DefaultMaxSquareSize}, {"mixed transactions large block", 100, 100, 100000, appconsts.DefaultMaxSquareSize}, @@ -75,7 +75,7 @@ func Test_estimateSquareSize_MultiBlob(t *testing.T) { func() [][]int { return blobfactory.Repeat([]int{1000}, 10) }, - 8, 8, + 16, 8, }, { "10 multiblob 4 share transactions", From 34f12c90d533154668603e9ebafcec3980acd74c Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 18 Apr 2023 14:38:40 +0200 Subject: [PATCH 10/10] fix node test --- testutil/testnode/full_node_test.go | 2 +- testutil/testnode/node_interaction_api.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/testutil/testnode/full_node_test.go b/testutil/testnode/full_node_test.go index ede2b2708b..40f4980131 100644 --- a/testutil/testnode/full_node_test.go +++ b/testutil/testnode/full_node_test.go @@ -67,7 +67,7 @@ func TestIntegrationTestSuite(t *testing.T) { func (s *IntegrationTestSuite) Test_FillBlock() { require := s.Require() - for squareSize := 2; squareSize < appconsts.DefaultMaxSquareSize; squareSize *= 2 { + for squareSize := 2; squareSize <= appconsts.DefaultMaxSquareSize; squareSize *= 2 { resp, err := s.cctx.FillBlock(squareSize, s.accounts, flags.BroadcastAsync) require.NoError(err) diff --git a/testutil/testnode/node_interaction_api.go b/testutil/testnode/node_interaction_api.go index 2a451fafa3..660e041c09 100644 --- a/testutil/testnode/node_interaction_api.go +++ b/testutil/testnode/node_interaction_api.go @@ -177,7 +177,7 @@ func (c *Context) FillBlock(squareSize int, accounts []string, broadcastMode str // in order to get the square size that we want, we need to fill half the // square minus a few for the tx (see the square estimation logic in // app/estimate_square_size.go) - shareCount := (squareSize * squareSize / 2) - 1 + shareCount := (squareSize * squareSize / 2) - 2 // we use a formula to guarantee that the tx is the exact size needed to force a specific square size. blobSize := shareCount * appconsts.ContinuationSparseShareContentSize // this last patch allows for the formula above to work on a square size of