feat: tx simulator #1613 (Merged)
merged 18 commits · Apr 20, 2023
Changes from 8 commits
459 changes: 459 additions & 0 deletions testing/txsim/account.go

Large diffs are not rendered by default.

102 changes: 102 additions & 0 deletions testing/txsim/blob.go
@@ -0,0 +1,102 @@
package txsim

import (
"context"
"fmt"
"math/rand"

ns "github.com/celestiaorg/celestia-app/pkg/namespace"
"github.com/celestiaorg/celestia-app/testutil/blobfactory"
blob "github.com/celestiaorg/celestia-app/x/blob/types"
"github.com/cosmos/cosmos-sdk/types"
"github.com/gogo/protobuf/grpc"
)

var _ Sequence = &BlobSequence{}

// BlobSequence defines a pattern whereby a single user repeatedly submits a
// PayForBlobs message roughly every height. Each PFB may consist of several blobs.
type BlobSequence struct {
namespace ns.Namespace
sizes Range
blobsPerPFB Range

account types.AccAddress
}

// NewBlobSequence returns a BlobSequence that samples blob sizes and the
// number of blobs per PFB from the given ranges.
func NewBlobSequence(sizes Range, blobsPerPFB Range) *BlobSequence {
return &BlobSequence{
sizes: sizes,
blobsPerPFB: blobsPerPFB,
}
}

// WithNamespace provides the option of fixing a predefined namespace for
// all blobs.
func (s *BlobSequence) WithNamespace(namespace ns.Namespace) *BlobSequence {
s.namespace = namespace
return s
}

// Clone returns n new copies of the sequence; each copy allocates its own
// account in Init.
func (s *BlobSequence) Clone(n int) []Sequence {
sequenceGroup := make([]Sequence, n)
for i := 0; i < n; i++ {
sequenceGroup[i] = &BlobSequence{
namespace: s.namespace,
sizes: s.sizes,
blobsPerPFB: s.blobsPerPFB,
}
}
return sequenceGroup
}

// Init allocates the single account used to sign every PFB in the sequence.
func (s *BlobSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) {
s.account = allocateAccounts(1, 1)[0]
}

// Next builds the next PayForBlobs operation: it randomly chooses the number
// of blobs, their sizes, and their namespaces, then constructs the message.
func (s *BlobSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) {
numBlobs := s.blobsPerPFB.Rand(rand)
sizes := make([]int, numBlobs)
namespaces := make([]ns.Namespace, numBlobs)
for i := range sizes {
if s.namespace.ID != nil {
namespaces[i] = s.namespace
} else {
// generate a random namespace for the blob
namespace := make([]byte, ns.NamespaceVersionZeroIDSize)
_, err := rand.Read(namespace)
if err != nil {
return Operation{}, fmt.Errorf("generating random namespace: %w", err)
}
namespaces[i] = ns.MustNewV0(namespace)
Comment on lines +69 to +75
Collaborator: [optional] can use func RandomBlobNamespace() Namespace

Contributor Author: Thanks, but I want to use the random source provided in the
method so that a given seed always reproduces the same sequence.
}
sizes[i] = s.sizes.Rand(rand)
}
// generate the blobs
blobs := blobfactory.RandBlobsWithNamespace(namespaces, sizes)
// derive the pay for blob message
msg, err := blob.NewMsgPayForBlobs(s.account.String(), blobs...)
if err != nil {
return Operation{}, err
}
return Operation{
Msgs: []types.Msg{msg},
Blobs: blobs,
}, nil
}

type Range struct {
Min int
Max int
}
Comment on lines +93 to +96
Collaborator: 👍

func NewRange(min, max int) Range {
return Range{Min: min, Max: max}
}

// Rand returns a random value in the half-open interval [Min, Max), or Min
// when the range is empty (Max <= Min).
func (r Range) Rand(rand *rand.Rand) int {
if r.Max <= r.Min {
return r.Min
}
return rand.Intn(r.Max-r.Min) + r.Min
}
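For orientation, here is a minimal sketch of how the pieces in blob.go compose. Only the constructors defined above are used; the runner that drives Init/Next lives elsewhere in the package and is not rendered in this diff, and the import path is inferred from the file location. As the author notes in the thread above, the sequence takes the runner's seeded *rand.Rand so that a fixed seed reproduces the same run.

package main

import (
	"bytes"
	"math/rand"

	ns "github.com/celestiaorg/celestia-app/pkg/namespace"
	"github.com/celestiaorg/celestia-app/testing/txsim"
)

func main() {
	// Blobs of 100-1000 bytes and 1-3 blobs per PFB. Range.Rand samples
	// the half-open interval [Min, Max).
	seq := txsim.NewBlobSequence(
		txsim.NewRange(100, 1000),
		txsim.NewRange(1, 3),
	)

	// Optionally pin every blob to one namespace instead of random ones,
	// mirroring how blob.go builds a V0 namespace ID.
	id := bytes.Repeat([]byte{0x1}, ns.NamespaceVersionZeroIDSize)
	seq = seq.WithNamespace(ns.MustNewV0(id))

	// Five independent copies; each allocates its own account in Init.
	sequences := seq.Clone(5)

	// A fixed seed makes the generated operations reproducible.
	r := rand.New(rand.NewSource(42))
	_, _ = sequences, r // handed to the txsim runner (not shown)
}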
283 changes: 283 additions & 0 deletions testing/txsim/client.go
@@ -0,0 +1,283 @@
package txsim

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/celestiaorg/celestia-app/app/encoding"
blob "github.com/celestiaorg/celestia-app/x/blob/types"
sdkclient "github.com/cosmos/cosmos-sdk/client"
auth "github.com/cosmos/cosmos-sdk/x/auth/types"
bank "github.com/cosmos/cosmos-sdk/x/bank/types"
protogrpc "github.com/gogo/protobuf/grpc"
"github.com/tendermint/tendermint/rpc/client/http"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
// how often to poll the network for the latest height
DefaultPollTime = 3 * time.Second
// how many times WaitForTx polls for the tx before giving up
maxRetries = 20
)

var errTimedOutWaitingForTx = errors.New("timed out waiting for tx to be committed (1 minute)")

// TxClient is a client for submitting transactions to one of several nodes. It uses a round-robin
// algorithm for multiplexing requests across multiple clients.
type TxClient struct {
rpcClients []*http.HTTP
encCfg encoding.Config
chainID string
pollTime time.Duration

mtx sync.Mutex
// index indicates which client to use next
index int
height int64
lastUpdated time.Time
}

func NewTxClient(ctx context.Context, encCfg encoding.Config, pollTime time.Duration, rpcEndpoints []string) (*TxClient, error) {
if len(rpcEndpoints) == 0 {
return nil, errors.New("must have at least one endpoint specified")
}

// set up all the rpc clients used to communicate with full nodes
rpcClients := make([]*http.HTTP, len(rpcEndpoints))
var (
err error
chainID string
height int64
)
for i, endpoint := range rpcEndpoints {
rpcClients[i], err = http.New(endpoint, "/websocket")
if err != nil {
return nil, fmt.Errorf("error creating rpc client with endpoint %s: %w", endpoint, err)
}

// check that the node is up
status, err := rpcClients[i].Status(ctx)
if err != nil {
return nil, fmt.Errorf("error getting status from rpc server %s: %w", endpoint, err)
}

// set the chainID
if chainID == "" {
chainID = status.NodeInfo.Network
}

// set the latest height
if status.SyncInfo.LatestBlockHeight > height {
height = status.SyncInfo.LatestBlockHeight
}
}
return &TxClient{
rpcClients: rpcClients,
encCfg: encCfg,
chainID: chainID,
pollTime: pollTime,
height: height,
lastUpdated: time.Now(),
}, nil
}

func (tc *TxClient) Tx() sdkclient.TxBuilder {
builder := tc.encCfg.TxConfig.NewTxBuilder()
return builder
}

func (tc *TxClient) ChainID() string {
return tc.chainID
}

func (tc *TxClient) Height() int64 {
tc.mtx.Lock()
defer tc.mtx.Unlock()
return tc.height
}

func (tc *TxClient) updateHeight(newHeight int64) int64 {
tc.mtx.Lock()
defer tc.mtx.Unlock()
if newHeight > tc.height {
tc.height = newHeight
tc.lastUpdated = time.Now()
return newHeight
}
return tc.height
}

func (tc *TxClient) LastUpdated() time.Time {
tc.mtx.Lock()
defer tc.mtx.Unlock()
return tc.lastUpdated
}

// WaitForNBlocks uses WaitForHeight to wait for the given number of blocks to
// be produced.
func (tc *TxClient) WaitForNBlocks(ctx context.Context, blocks int64) error {
return tc.WaitForHeight(ctx, tc.Height()+blocks)
}

// WaitForHeight continually polls the network for the latest height. It is
// safe for concurrent use.
func (tc *TxClient) WaitForHeight(ctx context.Context, height int64) error {
// check if we can immediately return
if height <= tc.Height() {
return nil
}

ticker := time.NewTicker(tc.pollTime)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// check if we've reached the target height
if height <= tc.Height() {
return nil
}
// check when we last polled to avoid concurrent processes
// polling the network too often
if time.Since(tc.LastUpdated()) < tc.pollTime {
continue
}

// ping a node for their latest height
status, err := tc.Client().Status(ctx)
if err != nil {
return fmt.Errorf("error getting status from rpc server: %w", err)
}

latestHeight := tc.updateHeight(status.SyncInfo.LatestBlockHeight)
// check if the new latest height is greater than or equal to the target height
if latestHeight >= height {
return nil
}

case <-ctx.Done():
return ctx.Err()
}
}
}

func (tc *TxClient) WaitForTx(ctx context.Context, txID []byte) (*coretypes.ResultTx, error) {
for i := 0; i < maxRetries; i++ {
resp, err := tc.Client().Tx(ctx, txID, false)
if err != nil {
// tx does not exist yet; wait and retry
if strings.Contains(err.Error(), "not found") {
time.Sleep(tc.pollTime)
continue
}
return nil, err
}

if resp.TxResult.Code != 0 {
return nil, fmt.Errorf("non zero code delivering tx (%d): %s", resp.TxResult.Code, resp.TxResult.Log)
}

return resp, nil
}
return nil, errTimedOutWaitingForTx
}

// Client returns the next RPC client in the round-robin rotation
func (tc *TxClient) Client() *http.HTTP {
tc.mtx.Lock()
defer tc.mtx.Unlock()
defer tc.next()
return tc.rpcClients[tc.index]
}

// Broadcast encodes and broadcasts a transaction to the network. If CheckTx fails,
// the error is returned. The method then waits for the transaction to be included
// in a block, returning the result or timing out.
func (tc *TxClient) Broadcast(ctx context.Context, txBuilder sdkclient.TxBuilder, blobs []*blob.Blob) (*coretypes.ResultTx, error) {
tx, err := tc.encCfg.TxConfig.TxEncoder()(txBuilder.GetTx())
if err != nil {
return nil, fmt.Errorf("error encoding tx: %w", err)
}

// If blobs exist, these are bundled into the existing tx.
if len(blobs) > 0 {
txWithBlobs, err := types.MarshalBlobTx(tx, blobs...)
if err != nil {
return nil, err
}
tx = txWithBlobs
}

resp, err := tc.Client().BroadcastTxSync(ctx, tx)
if err != nil {
return nil, fmt.Errorf("broadcast commit: %w", err)
}

if resp.Code != 0 {
return nil, fmt.Errorf("non zero code checking tx (%d): %s", resp.Code, resp.Log)
}

return tc.WaitForTx(ctx, resp.Hash)
}

// next advances the index of the RPC clients. It is not thread safe and should be called while holding the mutex.
func (tc *TxClient) next() {
tc.index = (tc.index + 1) % len(tc.rpcClients)
}

// QueryClient multiplexes requests across multiple running gRPC connections. It does this in a round-robin fashion.
type QueryClient struct {
connections []*grpc.ClientConn

mtx sync.Mutex
// index indicates which connection to use next
index int
}

func NewQueryClient(grpcEndpoints []string) (*QueryClient, error) {
connections := make([]*grpc.ClientConn, len(grpcEndpoints))
for idx, endpoint := range grpcEndpoints {
conn, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("dialing %s: %w", endpoint, err)
}
connections[idx] = conn
}

return &QueryClient{
connections: connections,
}, nil
}

// next advances the index of the gRPC connections. It is not thread safe and should be called while holding the mutex.
func (qc *QueryClient) next() {
qc.index = (qc.index + 1) % len(qc.connections)
}

// Conn returns the next gRPC connection in the round-robin rotation.
func (qc *QueryClient) Conn() protogrpc.ClientConn {
qc.mtx.Lock()
defer qc.mtx.Unlock()
defer qc.next()
return qc.connections[qc.index]
}

func (qc *QueryClient) Bank() bank.QueryClient {
return bank.NewQueryClient(qc.Conn())
}

func (qc *QueryClient) Auth() auth.QueryClient {
return auth.NewQueryClient(qc.Conn())
}

// Close closes all underlying connections, returning the last error seen.
func (qc *QueryClient) Close() error {
var err error
for _, conn := range qc.connections {
if closeErr := conn.Close(); closeErr != nil {
err = closeErr
}
}
return err
}
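And a rough end-to-end sketch of the two clients defined in this file, assuming local RPC/gRPC endpoints are running. Only methods from this diff are called; constructing the encoding.Config via encoding.MakeConfig(app.ModuleEncodingRegisters...) follows the repo's usual pattern but is an assumption here, and signing/broadcasting is driven by the account manager in account.go (not rendered above), so it is elided.

package main

import (
	"context"

	"github.com/celestiaorg/celestia-app/app"
	"github.com/celestiaorg/celestia-app/app/encoding"
	"github.com/celestiaorg/celestia-app/testing/txsim"
)

func main() {
	ctx := context.Background()
	// Assumption: the standard way this repo constructs an encoding.Config.
	encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...)

	// Transactions are submitted round-robin across these RPC endpoints.
	txc, err := txsim.NewTxClient(ctx, encCfg, txsim.DefaultPollTime,
		[]string{"http://localhost:26657", "http://localhost:26658"})
	if err != nil {
		panic(err)
	}

	// gRPC queries are multiplexed round-robin in the same way.
	qc, err := txsim.NewQueryClient([]string{"localhost:9090"})
	if err != nil {
		panic(err)
	}
	defer qc.Close()

	// Block until at least one more block is produced, then sequences can
	// query state and build transactions.
	if err := txc.WaitForNBlocks(ctx, 1); err != nil {
		panic(err)
	}
	_ = qc.Auth() // e.g. fetch account numbers and sequences
	_ = txc.Tx()  // fresh TxBuilder; txc.Broadcast submits and waits for inclusion
}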