Skip to content

Commit

Permalink
sending txs in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Apr 17, 2024
1 parent 6f675df commit 595df44
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 225 deletions.
8 changes: 8 additions & 0 deletions command/loadtest/load_test_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func setFlags(cmd *cobra.Command) {
"waits for tx pool to empty before collecting results",
)

cmd.Flags().IntVar(
&params.batchSize,
batchSizeFlag,
1,
"size of a batch of transactions to send to rpc node",
)

_ = cmd.MarkFlagRequired(mnemonicFlag)
_ = cmd.MarkFlagRequired(loadTestTypeFlag)
}
Expand All @@ -124,6 +131,7 @@ func runCommand(cmd *cobra.Command, _ []string) {
TxPoolTimeout: params.txPoolTimeout,
VUs: params.vus,
TxsPerUser: params.txsPerUser,
BatchSize: params.batchSize,
DynamicTxs: params.dynamicTxs,
ResultsToJSON: params.toJSON,
WaitForTxPoolToEmpty: params.waitForTxPoolToEmpty,
Expand Down
7 changes: 7 additions & 0 deletions command/loadtest/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
vusFlag = "vus"
txsPerUserFlag = "txs-per-user"
dynamicTxsFlag = "dynamic"
batchSizeFlag = "batch-size"

saveToJSONFlag = "to-json"
waitForTxPoolToEmptyFlag = "wait-txpool"
Expand All @@ -29,6 +30,7 @@ var (
errUnsupportedLoadTestType = errors.New("unsupported load test type")
errInvalidVUs = errors.New("vus must be greater than 0")
errInvalidTxsPerUser = errors.New("txs-per-user must be greater than 0")
errInvalidBatchSize = errors.New("batch-size must be greater than 0 and les or equal to txs-per-user")
)

type loadTestParams struct {
Expand All @@ -42,6 +44,7 @@ type loadTestParams struct {

vus int
txsPerUser int
batchSize int

dynamicTxs bool
toJSON bool
Expand Down Expand Up @@ -69,5 +72,9 @@ func (ltp *loadTestParams) validateFlags() error {
return errInvalidTxsPerUser
}

if ltp.batchSize < 1 || ltp.batchSize > ltp.txsPerUser {
return errInvalidBatchSize
}

return nil
}
133 changes: 130 additions & 3 deletions loadtest/runner/base_load_test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runner

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math"
Expand Down Expand Up @@ -48,6 +49,8 @@ type BaseLoadTestRunner struct {

resultsCollectedCh chan *stats
done chan error

batchSender *TransactionBatchSender
}

// NewBaseLoadTestRunner creates a new instance of BaseLoadTestRunner with the provided LoadTestConfig.
Expand Down Expand Up @@ -82,6 +85,7 @@ func NewBaseLoadTestRunner(cfg LoadTestConfig) (*BaseLoadTestRunner, error) {
client: client,
resultsCollectedCh: make(chan *stats),
done: make(chan error),
batchSender: newTransactionBatchSender(cfg.JSONRPCUrl),
}, nil
}

Expand Down Expand Up @@ -667,8 +671,7 @@ func (r *BaseLoadTestRunner) saveResultsToJSONFile(
// The transaction hashes are appended to the allTxnHashes slice.
// Finally, the function prints the time taken to send the transactions
// and returns the transaction hashes and nil error.
func (r *BaseLoadTestRunner) sendTransactions(
sendFn func(*account, *big.Int, *progressbar.ProgressBar) ([]types.Hash, []error, error)) ([]types.Hash, error) {
func (r *BaseLoadTestRunner) sendTransactions(createTxnFn func(*account, *feeData, *big.Int) *types.Transaction) ([]types.Hash, error) {

Check failure on line 674 in loadtest/runner/base_load_test_runner.go

View workflow job for this annotation

GitHub Actions / Lint / Run Lint

line is 136 characters (lll)
fmt.Println("=============================================================")

chainID, err := r.client.ChainID()
Expand All @@ -691,6 +694,11 @@ func (r *BaseLoadTestRunner) sendTransactions(

g, ctx := errgroup.WithContext(context.Background())

sendFn := r.sendTransactionsForUser
if r.cfg.BatchSize > 1 {
sendFn = r.sendTransactionsForUserInBatches
}

for _, vu := range r.vus {
vu := vu

Expand All @@ -700,7 +708,7 @@ func (r *BaseLoadTestRunner) sendTransactions(
return ctx.Err()

default:
txnHashes, sendErrors, err := sendFn(vu, chainID, bar)
txnHashes, sendErrors, err := sendFn(vu, chainID, bar, createTxnFn)
if err != nil {
return err
}
Expand Down Expand Up @@ -728,6 +736,125 @@ func (r *BaseLoadTestRunner) sendTransactions(
return allTxnHashes, nil
}

// sendTransactionsForUser sends ERC20 token transactions for a given user account.
// It takes an account pointer and a chainID as input parameters.
// It returns a slice of transaction hashes and an error if any.
func (r *BaseLoadTestRunner) sendTransactionsForUser(account *account, chainID *big.Int,
bar *progressbar.ProgressBar, createTxnFn func(*account, *feeData, *big.Int) *types.Transaction,
) ([]types.Hash, []error, error) {
txRelayer, err := txrelayer.NewTxRelayer(
txrelayer.WithClient(r.client),
txrelayer.WithChainID(chainID),
txrelayer.WithCollectTxnHashes(),
txrelayer.WithNoWaiting(),
txrelayer.WithEstimateGasFallback(),
txrelayer.WithoutNonceGet(),
)
if err != nil {
return nil, nil, err
}

feeData, err := getFeeData(r.client, r.cfg.DynamicTxs)
if err != nil {
return nil, nil, err
}

sendErrs := make([]error, 0)
checkFeeDataNum := r.cfg.TxsPerUser / 5

for i := 0; i < r.cfg.TxsPerUser; i++ {
if i%checkFeeDataNum == 0 {
feeData, err = getFeeData(r.client, r.cfg.DynamicTxs)
if err != nil {
return nil, nil, err
}
}

_, err = txRelayer.SendTransaction(createTxnFn(account, feeData, chainID), account.key)
if err != nil {
sendErrs = append(sendErrs, err)
}

account.nonce++
_ = bar.Add(1)
}

return txRelayer.GetTxnHashes(), sendErrs, nil
}

// sendTransactionsForUserInBatches sends user transactions in batches to the rpc node
func (r *BaseLoadTestRunner) sendTransactionsForUserInBatches(account *account, chainID *big.Int,
bar *progressbar.ProgressBar, createTxnFn func(*account, *feeData, *big.Int) *types.Transaction,
) ([]types.Hash, []error, error) {
signer := crypto.NewLondonSigner(chainID.Uint64())

numOfBatches := int(math.Ceil(float64(r.cfg.TxsPerUser) / float64(r.cfg.BatchSize)))
txHashes := make([]types.Hash, 0, r.cfg.TxsPerUser)
sendErrs := make([]error, 0)
totalTxs := 0
gas := uint64(0)

Check failure on line 795 in loadtest/runner/base_load_test_runner.go

View workflow job for this annotation

GitHub Actions / Lint / Run Lint

assigned to gas, but reassigned without using the value (wastedassign)

feeData, err := getFeeData(r.client, r.cfg.DynamicTxs)
if err != nil {
return nil, nil, err
}

txnExample := createTxnFn(account, feeData, chainID)
if txnExample.Gas() == 0 {
// estimate gas initially
gasLimit, err := r.client.EstimateGas(txrelayer.ConvertTxnToCallMsg(txnExample))
if err != nil {
gasLimit = txrelayer.DefaultGasLimit
}

gas = gasLimit * 2 // double it just in case
} else {
gas = txnExample.Gas()
}

for i := 0; i < numOfBatches; i++ {
batchTxs := make([]string, 0, r.cfg.BatchSize)

feeData, err := getFeeData(r.client, r.cfg.DynamicTxs)
if err != nil {
return nil, nil, err
}

for j := 0; j < r.cfg.BatchSize; j++ {
if totalTxs >= r.cfg.TxsPerUser {
break
}

txn := createTxnFn(account, feeData, chainID)
txn.SetGas(gas)

signedTxn, err := signer.SignTxWithCallback(txn,
func(hash types.Hash) (sig []byte, err error) {
return account.key.Sign(hash.Bytes())
})
if err != nil {
sendErrs = append(sendErrs, err)

continue
}

batchTxs = append(batchTxs, "0x"+hex.EncodeToString(signedTxn.MarshalRLP()))
account.nonce++
totalTxs++
}

hashes, err := r.batchSender.SendBatch(batchTxs)
if err != nil {
return nil, nil, err
}

txHashes = append(txHashes, hashes...)
bar.Add(len(batchTxs))

Check failure on line 852 in loadtest/runner/base_load_test_runner.go

View workflow job for this annotation

GitHub Actions / Lint / Run Lint

Error return value of `bar.Add` is not checked (errcheck)
}

return txHashes, sendErrs, nil
}

// getFeeData retrieves fee data based on the provided JSON-RPC Ethereum client and dynamicTxs flag.
// If dynamicTxs is true, it calculates the gasTipCap and gasFeeCap based on the MaxPriorityFeePerGas,
// FeeHistory, and BaseFee values obtained from the client. If dynamicTxs is false, it calculates the
Expand Down
97 changes: 24 additions & 73 deletions loadtest/runner/eoa_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"fmt"
"math/big"

"github.com/0xPolygon/polygon-edge/txrelayer"
"github.com/0xPolygon/polygon-edge/types"
"github.com/schollz/progressbar/v3"
"github.com/umbracle/ethgo"
)

Expand Down Expand Up @@ -50,15 +48,15 @@ func (e *EOARunner) Run() error {
go e.waitForReceiptsParallel()
go e.calculateResultsParallel()

_, err := e.sendTransactions()
_, err := e.sendTransactions(e.createEOATransaction)
if err != nil {
return err
}

return <-e.done
}

txHashes, err := e.sendTransactions()
txHashes, err := e.sendTransactions(e.createEOATransaction)
if err != nil {
return err
}
Expand All @@ -70,75 +68,28 @@ func (e *EOARunner) Run() error {
return e.calculateResults(e.waitForReceipts(txHashes))
}

// sendTransactions sends transactions for the load test.
func (e *EOARunner) sendTransactions() ([]types.Hash, error) {
return e.BaseLoadTestRunner.sendTransactions(e.sendTransactionsForUser)
}

// sendTransactionsForUser sends multiple transactions for a user account on a specific chain.
// It uses the provided client and chain ID to send transactions using either dynamic or legacy fee models.
// For each transaction, it increments the account's nonce and returns the transaction hashes.
// If an error occurs during the transaction sending process, it returns the error.
func (e *EOARunner) sendTransactionsForUser(account *account, chainID *big.Int,
bar *progressbar.ProgressBar) ([]types.Hash, []error, error) {
txRelayer, err := txrelayer.NewTxRelayer(
txrelayer.WithClient(e.client),
txrelayer.WithChainID(chainID),
txrelayer.WithCollectTxnHashes(),
txrelayer.WithNoWaiting(),
txrelayer.WithoutNonceGet(),
)
if err != nil {
return nil, nil, err
}

feeData, err := getFeeData(e.client, e.cfg.DynamicTxs)
if err != nil {
return nil, nil, err
}

sendErrs := make([]error, 0)
checkFeeDataNum := e.cfg.TxsPerUser / 5

for i := 0; i < e.cfg.TxsPerUser; i++ {
var err error

if i%checkFeeDataNum == 0 {
feeData, err = getFeeData(e.client, e.cfg.DynamicTxs)
if err != nil {
return nil, nil, err
}
}

if e.cfg.DynamicTxs {
_, err = txRelayer.SendTransaction(types.NewTx(types.NewDynamicFeeTx(
types.WithNonce(account.nonce),
types.WithTo(&receiverAddr),
types.WithValue(ethgo.Gwei(1)),
types.WithGas(21000),
types.WithFrom(account.key.Address()),
types.WithGasFeeCap(feeData.gasFeeCap),
types.WithGasTipCap(feeData.gasTipCap),
types.WithChainID(chainID),
)), account.key)
} else {
_, err = txRelayer.SendTransaction(types.NewTx(types.NewLegacyTx(
types.WithNonce(account.nonce),
types.WithTo(&receiverAddr),
types.WithValue(ethgo.Gwei(1)),
types.WithGas(21000),
types.WithGasPrice(feeData.gasPrice),
types.WithFrom(account.key.Address()),
)), account.key)
}

if err != nil {
sendErrs = append(sendErrs, err)
}

account.nonce++
_ = bar.Add(1)
// createEOATransaction creates an EOA transaction
func (e *EOARunner) createEOATransaction(account *account, feeData *feeData,
chainID *big.Int) *types.Transaction {
if e.cfg.DynamicTxs {
return types.NewTx(types.NewDynamicFeeTx(
types.WithNonce(account.nonce),
types.WithTo(&receiverAddr),
types.WithValue(ethgo.Gwei(1)),
types.WithGas(21000),
types.WithFrom(account.key.Address()),
types.WithGasFeeCap(feeData.gasFeeCap),
types.WithGasTipCap(feeData.gasTipCap),
types.WithChainID(chainID),
))
}

return txRelayer.GetTxnHashes(), sendErrs, nil
return types.NewTx(types.NewLegacyTx(
types.WithNonce(account.nonce),
types.WithTo(&receiverAddr),
types.WithValue(ethgo.Gwei(1)),
types.WithGas(21000),
types.WithGasPrice(feeData.gasPrice),
types.WithFrom(account.key.Address()),
))
}
Loading

0 comments on commit 595df44

Please sign in to comment.