Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tx simulator #1613

Merged
merged 18 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
441 changes: 441 additions & 0 deletions testing/txsim/account.go

Large diffs are not rendered by default.

102 changes: 102 additions & 0 deletions testing/txsim/blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package txsim

import (
"context"
"fmt"
"math/rand"

ns "github.com/celestiaorg/celestia-app/pkg/namespace"
"github.com/celestiaorg/celestia-app/testutil/blobfactory"
blob "github.com/celestiaorg/celestia-app/x/blob/types"
"github.com/cosmos/cosmos-sdk/types"
"github.com/gogo/protobuf/grpc"
)

var _ Sequence = &BlobSequence{}

// BlobSequence defines a pattern whereby a single user repeatedly sends a pay for blob
// message roughly every height. The PFB may consist of several blobs
type BlobSequence struct {
namespace ns.Namespace
sizes Range
blobsPerPFB Range

account types.AccAddress
}

func NewBlobSequence(sizes Range, blobsPerPFB Range) *BlobSequence {
return &BlobSequence{
sizes: sizes,
blobsPerPFB: blobsPerPFB,
}
}

// WithNamespace provides the option of fixing a predefined namespace for
// all blobs.
func (s *BlobSequence) WithNamespace(namespace ns.Namespace) *BlobSequence {
s.namespace = namespace
return s
}

func (s *BlobSequence) Clone(n int) []Sequence {
sequenceGroup := make([]Sequence, n)
for i := 0; i < n; i++ {
sequenceGroup[i] = &BlobSequence{
namespace: s.namespace,
sizes: s.sizes,
blobsPerPFB: s.blobsPerPFB,
}
}
return sequenceGroup
}

func (s *BlobSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) {
s.account = allocateAccounts(1, 1)[0]
}

func (s *BlobSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) {
numBlobs := s.blobsPerPFB.Rand(rand)
sizes := make([]int, numBlobs)
namespaces := make([]ns.Namespace, numBlobs)
for i := range sizes {
if s.namespace.ID != nil {
namespaces[i] = s.namespace
} else {
// generate a random namespace for the blob
namespace := make([]byte, ns.NamespaceVersionZeroIDSize)
_, err := rand.Read(namespace)
if err != nil {
return Operation{}, fmt.Errorf("generating random namespace: %w", err)
}
namespaces[i] = ns.MustNewV0(namespace)
Comment on lines +69 to +75
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] can use

func RandomBlobNamespace() Namespace {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks but I want to use the random source provided in the method so that by having a specific seed it should always reproduce the same sequence

}
sizes[i] = s.sizes.Rand(rand)
}
// generate the blobs
blobs := blobfactory.RandBlobsWithNamespace(namespaces, sizes)
// derive the pay for blob message
msg, err := blob.NewMsgPayForBlobs(s.account.String(), blobs...)
if err != nil {
return Operation{}, err
}
return Operation{
Msgs: []types.Msg{msg},
Blobs: blobs,
}, nil
}

type Range struct {
Min int
Max int
}
Comment on lines +93 to +96
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


func NewRange(min, max int) Range {
return Range{Min: min, Max: max}
}

func (r Range) Rand(rand *rand.Rand) int {
cmwaters marked this conversation as resolved.
Show resolved Hide resolved
if r.Max <= r.Min {
return r.Min
}
return rand.Intn(r.Max-r.Min) + r.Min
}
277 changes: 277 additions & 0 deletions testing/txsim/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package txsim

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/celestiaorg/celestia-app/app/encoding"
blob "github.com/celestiaorg/celestia-app/x/blob/types"
sdkclient "github.com/cosmos/cosmos-sdk/client"
auth "github.com/cosmos/cosmos-sdk/x/auth/types"
bank "github.com/cosmos/cosmos-sdk/x/bank/types"
protogrpc "github.com/gogo/protobuf/grpc"
"github.com/tendermint/tendermint/rpc/client/http"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// how often to poll the network for the latest height
const (
DefaultPollTime = 3 * time.Second
maxRetries = 20
)
cmwaters marked this conversation as resolved.
Show resolved Hide resolved

var errTimedOutWaitingForTx = errors.New("timed out waiting for tx to be committed (1 minute)")

// TxClient is a client for submitting transactions to one of several nodes.
type TxClient struct {
rpcClients []*http.HTTP
encCfg encoding.Config
chainID string
pollTime time.Duration

mtx sync.Mutex
sequence int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we somehow clarify what type of sequence this is referring to? is this a testing sequence? a cosmos-sdk sequence (nonce)? a local count of successfully submitted txs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, sorry for the misunderstanding. This is simply used to multiplex queries across multiple RPC clients (via round robin)

height int64
lastUpdated time.Time
}

func NewTxClient(ctx context.Context, encCfg encoding.Config, pollTime time.Duration, rpcEndpoints []string) (*TxClient, error) {
if len(rpcEndpoints) == 0 {
return nil, errors.New("must have at least one endpoint specified")
}

// setup all the rpc clients to communicate with full nodes
rpcClients := make([]*http.HTTP, len(rpcEndpoints))
var (
err error
chainID string
height int64
)
for i, endpoint := range rpcEndpoints {
rpcClients[i], err = http.New(endpoint, "/websocket")
if err != nil {
return nil, fmt.Errorf("error creating rpc client with endpoint %s: %w", endpoint, err)
}

// check that the node is up
status, err := rpcClients[i].Status(ctx)
if err != nil {
return nil, fmt.Errorf("error getting status from rpc server %s: %w", endpoint, err)
}

// set the chainID
if chainID == "" {
chainID = status.NodeInfo.Network
}

// set the latest height
if status.SyncInfo.EarliestBlockHeight > height {
height = status.SyncInfo.EarliestBlockHeight
}
}
return &TxClient{
rpcClients: rpcClients,
encCfg: encCfg,
chainID: chainID,
pollTime: pollTime,
height: height,
lastUpdated: time.Now(),
}, nil
}

func (tc *TxClient) Tx() sdkclient.TxBuilder {
builder := tc.encCfg.TxConfig.NewTxBuilder()
return builder
}

func (tc *TxClient) ChainID() string {
return tc.chainID
}

func (tc *TxClient) Height() int64 {
tc.mtx.Lock()
defer tc.mtx.Unlock()
return tc.height
}

func (tc *TxClient) updateHeight(newHeight int64) int64 {
tc.mtx.Lock()
defer tc.mtx.Unlock()
if newHeight > tc.height {
tc.height = newHeight
tc.lastUpdated = time.Now()
return newHeight
}
return tc.height
}

func (tc *TxClient) LastUpdated() time.Time {
tc.mtx.Lock()
defer tc.mtx.Unlock()
return tc.lastUpdated
}

// WaitForNBlocks uses WaitForHeight to wait for the given number of blocks to
// be produced.
func (tc *TxClient) WaitForNBlocks(ctx context.Context, blocks int64) error {
return tc.WaitForHeight(ctx, tc.Height()+blocks)
}

// WaitForHeight continually polls the network for the latest height. It is
// concurrently safe.
func (tc *TxClient) WaitForHeight(ctx context.Context, height int64) error {
// check if we can immediately return
if height <= tc.Height() {
return nil
}

ticker := time.NewTicker(tc.pollTime)
for {
select {
case <-ticker.C:
// check if we've reached the target height
if height <= tc.Height() {
return nil
}
// check when the last time we polled to avoid concurrent processes
// from polling the network too often
if time.Since(tc.LastUpdated()) < tc.pollTime {
continue
}

// ping a node for their latest height
status, err := tc.Client().Status(ctx)
if err != nil {
return fmt.Errorf("error getting status from rpc server: %w", err)
}

latestHeight := tc.updateHeight(status.SyncInfo.LatestBlockHeight)
// check if the new latest height is greater or equal than the target height
if latestHeight >= height {
return nil
}

case <-ctx.Done():
return ctx.Err()
}
}
}

func (tc *TxClient) WaitForTx(ctx context.Context, txID []byte) (*coretypes.ResultTx, error) {
for i := 0; i < maxRetries; i++ {
resp, err := tc.Client().Tx(ctx, txID, false)
if err != nil {
// tx still no longer exists
if strings.Contains(err.Error(), "not found") {
time.Sleep(tc.pollTime)
continue
}
return nil, err
}

if resp.TxResult.Code != 0 {
return nil, fmt.Errorf("non zero code delivering tx (%d): %s", resp.TxResult.Code, resp.TxResult.Log)
}

return resp, nil
}
return nil, errTimedOutWaitingForTx
}

// Client multiplexes the RPC clients
func (tc *TxClient) Client() *http.HTTP {
tc.mtx.Lock()
defer tc.mtx.Unlock()
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved
defer tc.next()
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved
return tc.rpcClients[tc.sequence]
}

// Broadcast encodes and broadcasts a transaction to the network. If CheckTx fails,
// the error will be returned. The method does not wait for the transaction to be
// included in a block.
func (tc *TxClient) Broadcast(ctx context.Context, txBuilder sdkclient.TxBuilder, blobs []*blob.Blob) (*coretypes.ResultTx, error) {
tx, err := tc.encCfg.TxConfig.TxEncoder()(txBuilder.GetTx())
if err != nil {
return nil, fmt.Errorf("error encoding tx: %w", err)
}

// If blobs exist, these are bundled into the existing tx.
if len(blobs) > 0 {
txWithBlobs, err := types.MarshalBlobTx(tx, blobs...)
if err != nil {
return nil, err
}
tx = txWithBlobs
}

resp, err := tc.Client().BroadcastTxSync(ctx, tx)
if err != nil {
return nil, fmt.Errorf("broadcast commit: %w", err)
}

if resp.Code != 0 {
return nil, fmt.Errorf("non zero code checking tx (%d): %s", resp.Code, resp.Log)
}

return tc.WaitForTx(ctx, resp.Hash)
}

func (tc *TxClient) next() {
tc.sequence = (tc.sequence + 1) % len(tc.rpcClients)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment describing the purpose of this logic. also, in other places we use the mutex before mutating local txclient state (such as the height), do we need to do that here? if not, can we include that in the comment?


type QueryClient struct {
connections []*grpc.ClientConn

mtx sync.Mutex
sequence int
}

func NewQueryClient(grpcEndpoints []string) (*QueryClient, error) {
connections := make([]*grpc.ClientConn, len(grpcEndpoints))
for idx, endpoint := range grpcEndpoints {
conn, err := grpc.Dial(grpcEndpoints[0], grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("dialing %s: %w", endpoint, err)
}
connections[idx] = conn
}

return &QueryClient{
connections: connections,
}, nil
}

func (qc *QueryClient) next() {
qc.sequence = (qc.sequence + 1) % len(qc.connections)
}
Copy link
Member

@evan-forbes evan-forbes Apr 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we document with these that they are relying on the callers to handle mutexes since these are mutating state?


func (qc *QueryClient) Conn() protogrpc.ClientConn {
qc.mtx.Lock()
defer qc.mtx.Unlock()
defer qc.next()
return qc.connections[qc.sequence]
}

func (qc *QueryClient) Bank() bank.QueryClient {
return bank.NewQueryClient(qc.Conn())
}

func (qc *QueryClient) Auth() auth.QueryClient {
return auth.NewQueryClient(qc.Conn())
}

func (qc *QueryClient) Close() error {
var err error
for _, conn := range qc.connections {
err = conn.Close()
}
return err
}
Loading