-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: tx simulator #1613
feat: tx simulator #1613
Changes from 6 commits
b64a530
e6d43ac
9f9ef76
fb36328
7ef5695
5482401
60e53ca
5cf229f
e193524
3407ec2
846d4b1
b04537f
9bc19b3
304cf78
f44c569
34f12c9
6373b93
f3d229f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package txsim | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
|
||
ns "github.com/celestiaorg/celestia-app/pkg/namespace" | ||
"github.com/celestiaorg/celestia-app/testutil/blobfactory" | ||
blob "github.com/celestiaorg/celestia-app/x/blob/types" | ||
"github.com/cosmos/cosmos-sdk/types" | ||
"github.com/gogo/protobuf/grpc" | ||
) | ||
|
||
var _ Sequence = &BlobSequence{} | ||
|
||
// BlobSequence defines a pattern whereby a single user repeatedly sends a pay for blob | ||
// message roughly every height. The PFB may consist of several blobs | ||
type BlobSequence struct { | ||
namespace ns.Namespace | ||
sizes Range | ||
blobsPerPFB Range | ||
|
||
account types.AccAddress | ||
} | ||
|
||
func NewBlobSequence(sizes Range, blobsPerPFB Range) *BlobSequence { | ||
return &BlobSequence{ | ||
sizes: sizes, | ||
blobsPerPFB: blobsPerPFB, | ||
} | ||
} | ||
|
||
// WithNamespace provides the option of fixing a predefined namespace for | ||
// all blobs. | ||
func (s *BlobSequence) WithNamespace(namespace ns.Namespace) *BlobSequence { | ||
s.namespace = namespace | ||
return s | ||
} | ||
|
||
func (s *BlobSequence) Clone(n int) []Sequence { | ||
sequenceGroup := make([]Sequence, n) | ||
for i := 0; i < n; i++ { | ||
sequenceGroup[i] = &BlobSequence{ | ||
namespace: s.namespace, | ||
sizes: s.sizes, | ||
blobsPerPFB: s.blobsPerPFB, | ||
} | ||
} | ||
return sequenceGroup | ||
} | ||
|
||
func (s *BlobSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { | ||
s.account = allocateAccounts(1, 1)[0] | ||
} | ||
|
||
func (s *BlobSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) { | ||
numBlobs := s.blobsPerPFB.Rand(rand) | ||
sizes := make([]int, numBlobs) | ||
namespaces := make([]ns.Namespace, numBlobs) | ||
for i := range sizes { | ||
if s.namespace.ID != nil { | ||
namespaces[i] = s.namespace | ||
} else { | ||
// generate a random namespace for the blob | ||
namespace := make([]byte, ns.NamespaceVersionZeroIDSize) | ||
_, err := rand.Read(namespace) | ||
if err != nil { | ||
return Operation{}, fmt.Errorf("generating random namespace: %w", err) | ||
} | ||
namespaces[i] = ns.MustNewV0(namespace) | ||
} | ||
sizes[i] = s.sizes.Rand(rand) | ||
} | ||
// generate the blobs | ||
blobs := blobfactory.RandBlobsWithNamespace(namespaces, sizes) | ||
// derive the pay for blob message | ||
msg, err := blob.NewMsgPayForBlobs(s.account.String(), blobs...) | ||
if err != nil { | ||
return Operation{}, err | ||
} | ||
return Operation{ | ||
Msgs: []types.Msg{msg}, | ||
Blobs: blobs, | ||
}, nil | ||
} | ||
|
||
type Range struct { | ||
Min int | ||
Max int | ||
} | ||
Comment on lines
+93
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
func NewRange(min, max int) Range { | ||
return Range{Min: min, Max: max} | ||
} | ||
|
||
func (r Range) Rand(rand *rand.Rand) int { | ||
cmwaters marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if r.Max <= r.Min { | ||
return r.Min | ||
} | ||
return rand.Intn(r.Max-r.Min) + r.Min | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,277 @@ | ||
package txsim | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/celestiaorg/celestia-app/app/encoding" | ||
blob "github.com/celestiaorg/celestia-app/x/blob/types" | ||
sdkclient "github.com/cosmos/cosmos-sdk/client" | ||
auth "github.com/cosmos/cosmos-sdk/x/auth/types" | ||
bank "github.com/cosmos/cosmos-sdk/x/bank/types" | ||
protogrpc "github.com/gogo/protobuf/grpc" | ||
"github.com/tendermint/tendermint/rpc/client/http" | ||
coretypes "github.com/tendermint/tendermint/rpc/core/types" | ||
"github.com/tendermint/tendermint/types" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/credentials/insecure" | ||
) | ||
|
||
// how often to poll the network for the latest height | ||
const ( | ||
DefaultPollTime = 3 * time.Second | ||
maxRetries = 20 | ||
) | ||
cmwaters marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
var errTimedOutWaitingForTx = errors.New("timed out waiting for tx to be committed (1 minute)") | ||
|
||
// TxClient is a client for submitting transactions to one of several nodes. | ||
type TxClient struct { | ||
rpcClients []*http.HTTP | ||
encCfg encoding.Config | ||
chainID string | ||
pollTime time.Duration | ||
|
||
mtx sync.Mutex | ||
sequence int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we somehow clarify what type of sequence this is referring to? is this a testing sequence? a cosmos-sdk sequence (nonce)? a local count of successfully submitted txs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, sorry for the misunderstanding. This is simply used to multiplex queries across multiple RPC clients (via round robin) |
||
height int64 | ||
lastUpdated time.Time | ||
} | ||
|
||
func NewTxClient(ctx context.Context, encCfg encoding.Config, pollTime time.Duration, rpcEndpoints []string) (*TxClient, error) { | ||
if len(rpcEndpoints) == 0 { | ||
return nil, errors.New("must have at least one endpoint specified") | ||
} | ||
|
||
// setup all the rpc clients to communicate with full nodes | ||
rpcClients := make([]*http.HTTP, len(rpcEndpoints)) | ||
var ( | ||
err error | ||
chainID string | ||
height int64 | ||
) | ||
for i, endpoint := range rpcEndpoints { | ||
rpcClients[i], err = http.New(endpoint, "/websocket") | ||
if err != nil { | ||
return nil, fmt.Errorf("error creating rpc client with endpoint %s: %w", endpoint, err) | ||
} | ||
|
||
// check that the node is up | ||
status, err := rpcClients[i].Status(ctx) | ||
if err != nil { | ||
return nil, fmt.Errorf("error getting status from rpc server %s: %w", endpoint, err) | ||
} | ||
|
||
// set the chainID | ||
if chainID == "" { | ||
chainID = status.NodeInfo.Network | ||
} | ||
|
||
// set the latest height | ||
if status.SyncInfo.EarliestBlockHeight > height { | ||
height = status.SyncInfo.EarliestBlockHeight | ||
} | ||
} | ||
return &TxClient{ | ||
rpcClients: rpcClients, | ||
encCfg: encCfg, | ||
chainID: chainID, | ||
pollTime: pollTime, | ||
height: height, | ||
lastUpdated: time.Now(), | ||
}, nil | ||
} | ||
|
||
func (tc *TxClient) Tx() sdkclient.TxBuilder { | ||
builder := tc.encCfg.TxConfig.NewTxBuilder() | ||
return builder | ||
} | ||
|
||
func (tc *TxClient) ChainID() string { | ||
return tc.chainID | ||
} | ||
|
||
func (tc *TxClient) Height() int64 { | ||
tc.mtx.Lock() | ||
defer tc.mtx.Unlock() | ||
return tc.height | ||
} | ||
|
||
func (tc *TxClient) updateHeight(newHeight int64) int64 { | ||
tc.mtx.Lock() | ||
defer tc.mtx.Unlock() | ||
if newHeight > tc.height { | ||
tc.height = newHeight | ||
tc.lastUpdated = time.Now() | ||
return newHeight | ||
} | ||
return tc.height | ||
} | ||
|
||
func (tc *TxClient) LastUpdated() time.Time { | ||
tc.mtx.Lock() | ||
defer tc.mtx.Unlock() | ||
return tc.lastUpdated | ||
} | ||
|
||
// WaitForNBlocks uses WaitForHeight to wait for the given number of blocks to | ||
// be produced. | ||
func (tc *TxClient) WaitForNBlocks(ctx context.Context, blocks int64) error { | ||
return tc.WaitForHeight(ctx, tc.Height()+blocks) | ||
} | ||
|
||
// WaitForHeight continually polls the network for the latest height. It is | ||
// concurrently safe. | ||
func (tc *TxClient) WaitForHeight(ctx context.Context, height int64) error { | ||
// check if we can immediately return | ||
if height <= tc.Height() { | ||
return nil | ||
} | ||
|
||
ticker := time.NewTicker(tc.pollTime) | ||
for { | ||
select { | ||
case <-ticker.C: | ||
// check if we've reached the target height | ||
if height <= tc.Height() { | ||
return nil | ||
} | ||
// check when the last time we polled to avoid concurrent processes | ||
// from polling the network too often | ||
if time.Since(tc.LastUpdated()) < tc.pollTime { | ||
continue | ||
} | ||
|
||
// ping a node for their latest height | ||
status, err := tc.Client().Status(ctx) | ||
if err != nil { | ||
return fmt.Errorf("error getting status from rpc server: %w", err) | ||
} | ||
|
||
latestHeight := tc.updateHeight(status.SyncInfo.LatestBlockHeight) | ||
// check if the new latest height is greater or equal than the target height | ||
if latestHeight >= height { | ||
return nil | ||
} | ||
|
||
case <-ctx.Done(): | ||
return ctx.Err() | ||
} | ||
} | ||
} | ||
|
||
func (tc *TxClient) WaitForTx(ctx context.Context, txID []byte) (*coretypes.ResultTx, error) { | ||
for i := 0; i < maxRetries; i++ { | ||
resp, err := tc.Client().Tx(ctx, txID, false) | ||
if err != nil { | ||
// tx still no longer exists | ||
if strings.Contains(err.Error(), "not found") { | ||
time.Sleep(tc.pollTime) | ||
continue | ||
} | ||
return nil, err | ||
} | ||
|
||
if resp.TxResult.Code != 0 { | ||
return nil, fmt.Errorf("non zero code delivering tx (%d): %s", resp.TxResult.Code, resp.TxResult.Log) | ||
} | ||
|
||
return resp, nil | ||
} | ||
return nil, errTimedOutWaitingForTx | ||
} | ||
|
||
// Client multiplexes the RPC clients | ||
func (tc *TxClient) Client() *http.HTTP { | ||
tc.mtx.Lock() | ||
defer tc.mtx.Unlock() | ||
evan-forbes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
defer tc.next() | ||
evan-forbes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return tc.rpcClients[tc.sequence] | ||
} | ||
|
||
// Broadcast encodes and broadcasts a transaction to the network. If CheckTx fails, | ||
// the error will be returned. The method does not wait for the transaction to be | ||
// included in a block. | ||
func (tc *TxClient) Broadcast(ctx context.Context, txBuilder sdkclient.TxBuilder, blobs []*blob.Blob) (*coretypes.ResultTx, error) { | ||
tx, err := tc.encCfg.TxConfig.TxEncoder()(txBuilder.GetTx()) | ||
if err != nil { | ||
return nil, fmt.Errorf("error encoding tx: %w", err) | ||
} | ||
|
||
// If blobs exist, these are bundled into the existing tx. | ||
if len(blobs) > 0 { | ||
txWithBlobs, err := types.MarshalBlobTx(tx, blobs...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tx = txWithBlobs | ||
} | ||
|
||
resp, err := tc.Client().BroadcastTxSync(ctx, tx) | ||
if err != nil { | ||
return nil, fmt.Errorf("broadcast commit: %w", err) | ||
} | ||
|
||
if resp.Code != 0 { | ||
return nil, fmt.Errorf("non zero code checking tx (%d): %s", resp.Code, resp.Log) | ||
} | ||
|
||
return tc.WaitForTx(ctx, resp.Hash) | ||
} | ||
|
||
func (tc *TxClient) next() { | ||
tc.sequence = (tc.sequence + 1) % len(tc.rpcClients) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add a comment describing the purpose of this logic. also, in other places we use the mutex before mutating local txclient state (such as the height), do we need to do that here? if not, can we include that in the comment? |
||
|
||
type QueryClient struct { | ||
connections []*grpc.ClientConn | ||
|
||
mtx sync.Mutex | ||
sequence int | ||
} | ||
|
||
func NewQueryClient(grpcEndpoints []string) (*QueryClient, error) { | ||
connections := make([]*grpc.ClientConn, len(grpcEndpoints)) | ||
for idx, endpoint := range grpcEndpoints { | ||
conn, err := grpc.Dial(grpcEndpoints[0], grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
if err != nil { | ||
return nil, fmt.Errorf("dialing %s: %w", endpoint, err) | ||
} | ||
connections[idx] = conn | ||
} | ||
|
||
return &QueryClient{ | ||
connections: connections, | ||
}, nil | ||
} | ||
|
||
func (qc *QueryClient) next() { | ||
qc.sequence = (qc.sequence + 1) % len(qc.connections) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we document with these that they are relying on the callers to handle mutexes since these are mutating state? |
||
|
||
func (qc *QueryClient) Conn() protogrpc.ClientConn { | ||
qc.mtx.Lock() | ||
defer qc.mtx.Unlock() | ||
defer qc.next() | ||
return qc.connections[qc.sequence] | ||
} | ||
|
||
func (qc *QueryClient) Bank() bank.QueryClient { | ||
return bank.NewQueryClient(qc.Conn()) | ||
} | ||
|
||
func (qc *QueryClient) Auth() auth.QueryClient { | ||
return auth.NewQueryClient(qc.Conn()) | ||
} | ||
|
||
func (qc *QueryClient) Close() error { | ||
var err error | ||
for _, conn := range qc.connections { | ||
err = conn.Close() | ||
} | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[optional] can use
celestia-app/pkg/namespace/random_blob.go
Line 11 in dd19531
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks but I want to use the random source provided in the method so that by having a specific seed it should always reproduce the same sequence