-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: tx simulator #1613
Merged
feat: tx simulator #1613
Changes from 8 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
b64a530
tx simulator prototype
cmwaters e6d43ac
Merge branch 'main' into cal/fuzz
cmwaters 9f9ef76
clean up logs
cmwaters fb36328
Merge branch 'cal/fuzz' of github.com:celestiaorg/celestia-app into c…
cmwaters 7ef5695
fix up the data race (I think)
cmwaters 5482401
result to ignoring race failure
cmwaters 60e53ca
address PR comments
cmwaters 5cf229f
add some concurrency protection to the map
cmwaters e193524
Merge branch 'main' into cal/fuzz
cmwaters 3407ec2
Merge branch 'main' into cal/fuzz
cmwaters 846d4b1
allow sequences to dictate gas
cmwaters b04537f
Merge branch 'main' into cal/fuzz
cmwaters 9bc19b3
lint
cmwaters 304cf78
fix estimation tests
cmwaters f44c569
Merge branch 'cal/fuzz' of github.com:celestiaorg/celestia-app into c…
cmwaters 34f12c9
fix node test
cmwaters 6373b93
Merge branch 'main' into cal/fuzz
cmwaters f3d229f
Merge branch 'main' into cal/fuzz
cmwaters File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package txsim | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
|
||
ns "github.com/celestiaorg/celestia-app/pkg/namespace" | ||
"github.com/celestiaorg/celestia-app/testutil/blobfactory" | ||
blob "github.com/celestiaorg/celestia-app/x/blob/types" | ||
"github.com/cosmos/cosmos-sdk/types" | ||
"github.com/gogo/protobuf/grpc" | ||
) | ||
|
||
var _ Sequence = &BlobSequence{} | ||
|
||
// BlobSequence defines a pattern whereby a single user repeatedly sends a pay for blob | ||
// message roughly every height. The PFB may consist of several blobs | ||
type BlobSequence struct { | ||
namespace ns.Namespace | ||
sizes Range | ||
blobsPerPFB Range | ||
|
||
account types.AccAddress | ||
} | ||
|
||
func NewBlobSequence(sizes Range, blobsPerPFB Range) *BlobSequence { | ||
return &BlobSequence{ | ||
sizes: sizes, | ||
blobsPerPFB: blobsPerPFB, | ||
} | ||
} | ||
|
||
// WithNamespace provides the option of fixing a predefined namespace for | ||
// all blobs. | ||
func (s *BlobSequence) WithNamespace(namespace ns.Namespace) *BlobSequence { | ||
s.namespace = namespace | ||
return s | ||
} | ||
|
||
func (s *BlobSequence) Clone(n int) []Sequence { | ||
sequenceGroup := make([]Sequence, n) | ||
for i := 0; i < n; i++ { | ||
sequenceGroup[i] = &BlobSequence{ | ||
namespace: s.namespace, | ||
sizes: s.sizes, | ||
blobsPerPFB: s.blobsPerPFB, | ||
} | ||
} | ||
return sequenceGroup | ||
} | ||
|
||
func (s *BlobSequence) Init(_ context.Context, _ grpc.ClientConn, allocateAccounts AccountAllocator, _ *rand.Rand) { | ||
s.account = allocateAccounts(1, 1)[0] | ||
} | ||
|
||
func (s *BlobSequence) Next(ctx context.Context, querier grpc.ClientConn, rand *rand.Rand) (Operation, error) { | ||
numBlobs := s.blobsPerPFB.Rand(rand) | ||
sizes := make([]int, numBlobs) | ||
namespaces := make([]ns.Namespace, numBlobs) | ||
for i := range sizes { | ||
if s.namespace.ID != nil { | ||
namespaces[i] = s.namespace | ||
} else { | ||
// generate a random namespace for the blob | ||
namespace := make([]byte, ns.NamespaceVersionZeroIDSize) | ||
_, err := rand.Read(namespace) | ||
if err != nil { | ||
return Operation{}, fmt.Errorf("generating random namespace: %w", err) | ||
} | ||
namespaces[i] = ns.MustNewV0(namespace) | ||
} | ||
sizes[i] = s.sizes.Rand(rand) | ||
} | ||
// generate the blobs | ||
blobs := blobfactory.RandBlobsWithNamespace(namespaces, sizes) | ||
// derive the pay for blob message | ||
msg, err := blob.NewMsgPayForBlobs(s.account.String(), blobs...) | ||
if err != nil { | ||
return Operation{}, err | ||
} | ||
return Operation{ | ||
Msgs: []types.Msg{msg}, | ||
Blobs: blobs, | ||
}, nil | ||
} | ||
|
||
type Range struct { | ||
Min int | ||
Max int | ||
} | ||
Comment on lines
+93
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
func NewRange(min, max int) Range { | ||
return Range{Min: min, Max: max} | ||
} | ||
|
||
func (r Range) Rand(rand *rand.Rand) int { | ||
cmwaters marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if r.Max <= r.Min { | ||
return r.Min | ||
} | ||
return rand.Intn(r.Max-r.Min) + r.Min | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,283 @@ | ||
package txsim | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/celestiaorg/celestia-app/app/encoding" | ||
blob "github.com/celestiaorg/celestia-app/x/blob/types" | ||
sdkclient "github.com/cosmos/cosmos-sdk/client" | ||
auth "github.com/cosmos/cosmos-sdk/x/auth/types" | ||
bank "github.com/cosmos/cosmos-sdk/x/bank/types" | ||
protogrpc "github.com/gogo/protobuf/grpc" | ||
"github.com/tendermint/tendermint/rpc/client/http" | ||
coretypes "github.com/tendermint/tendermint/rpc/core/types" | ||
"github.com/tendermint/tendermint/types" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/credentials/insecure" | ||
) | ||
|
||
// how often to poll the network for the latest height | ||
const ( | ||
DefaultPollTime = 3 * time.Second | ||
maxRetries = 20 | ||
) | ||
cmwaters marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
var errTimedOutWaitingForTx = errors.New("timed out waiting for tx to be committed (1 minute)") | ||
|
||
// TxClient is a client for submitting transactions to one of several nodes. It uses a round-robin | ||
// algorithm for multiplexing requests across multiple clients. | ||
type TxClient struct { | ||
rpcClients []*http.HTTP | ||
encCfg encoding.Config | ||
chainID string | ||
pollTime time.Duration | ||
|
||
mtx sync.Mutex | ||
// index indicates which client to use next | ||
index int | ||
height int64 | ||
lastUpdated time.Time | ||
} | ||
|
||
func NewTxClient(ctx context.Context, encCfg encoding.Config, pollTime time.Duration, rpcEndpoints []string) (*TxClient, error) { | ||
if len(rpcEndpoints) == 0 { | ||
return nil, errors.New("must have at least one endpoint specified") | ||
} | ||
|
||
// setup all the rpc clients to communicate with full nodes | ||
rpcClients := make([]*http.HTTP, len(rpcEndpoints)) | ||
var ( | ||
err error | ||
chainID string | ||
height int64 | ||
) | ||
for i, endpoint := range rpcEndpoints { | ||
rpcClients[i], err = http.New(endpoint, "/websocket") | ||
if err != nil { | ||
return nil, fmt.Errorf("error creating rpc client with endpoint %s: %w", endpoint, err) | ||
} | ||
|
||
// check that the node is up | ||
status, err := rpcClients[i].Status(ctx) | ||
if err != nil { | ||
return nil, fmt.Errorf("error getting status from rpc server %s: %w", endpoint, err) | ||
} | ||
|
||
// set the chainID | ||
if chainID == "" { | ||
chainID = status.NodeInfo.Network | ||
} | ||
|
||
// set the latest height | ||
if status.SyncInfo.EarliestBlockHeight > height { | ||
height = status.SyncInfo.EarliestBlockHeight | ||
} | ||
} | ||
return &TxClient{ | ||
rpcClients: rpcClients, | ||
encCfg: encCfg, | ||
chainID: chainID, | ||
pollTime: pollTime, | ||
height: height, | ||
lastUpdated: time.Now(), | ||
}, nil | ||
} | ||
|
||
func (tc *TxClient) Tx() sdkclient.TxBuilder { | ||
builder := tc.encCfg.TxConfig.NewTxBuilder() | ||
return builder | ||
} | ||
|
||
func (tc *TxClient) ChainID() string { | ||
return tc.chainID | ||
} | ||
|
||
func (tc *TxClient) Height() int64 { | ||
tc.mtx.Lock() | ||
defer tc.mtx.Unlock() | ||
return tc.height | ||
} | ||
|
||
func (tc *TxClient) updateHeight(newHeight int64) int64 { | ||
tc.mtx.Lock() | ||
defer tc.mtx.Unlock() | ||
if newHeight > tc.height { | ||
tc.height = newHeight | ||
tc.lastUpdated = time.Now() | ||
return newHeight | ||
} | ||
return tc.height | ||
} | ||
|
||
func (tc *TxClient) LastUpdated() time.Time { | ||
tc.mtx.Lock() | ||
defer tc.mtx.Unlock() | ||
return tc.lastUpdated | ||
} | ||
|
||
// WaitForNBlocks uses WaitForHeight to wait for the given number of blocks to | ||
// be produced. | ||
func (tc *TxClient) WaitForNBlocks(ctx context.Context, blocks int64) error { | ||
return tc.WaitForHeight(ctx, tc.Height()+blocks) | ||
} | ||
|
||
// WaitForHeight continually polls the network for the latest height. It is | ||
// concurrently safe. | ||
func (tc *TxClient) WaitForHeight(ctx context.Context, height int64) error { | ||
// check if we can immediately return | ||
if height <= tc.Height() { | ||
return nil | ||
} | ||
|
||
ticker := time.NewTicker(tc.pollTime) | ||
for { | ||
select { | ||
case <-ticker.C: | ||
// check if we've reached the target height | ||
if height <= tc.Height() { | ||
return nil | ||
} | ||
// check when the last time we polled to avoid concurrent processes | ||
// from polling the network too often | ||
if time.Since(tc.LastUpdated()) < tc.pollTime { | ||
continue | ||
} | ||
|
||
// ping a node for their latest height | ||
status, err := tc.Client().Status(ctx) | ||
if err != nil { | ||
return fmt.Errorf("error getting status from rpc server: %w", err) | ||
} | ||
|
||
latestHeight := tc.updateHeight(status.SyncInfo.LatestBlockHeight) | ||
// check if the new latest height is greater or equal than the target height | ||
if latestHeight >= height { | ||
return nil | ||
} | ||
|
||
case <-ctx.Done(): | ||
return ctx.Err() | ||
} | ||
} | ||
} | ||
|
||
func (tc *TxClient) WaitForTx(ctx context.Context, txID []byte) (*coretypes.ResultTx, error) { | ||
for i := 0; i < maxRetries; i++ { | ||
resp, err := tc.Client().Tx(ctx, txID, false) | ||
if err != nil { | ||
// tx still no longer exists | ||
if strings.Contains(err.Error(), "not found") { | ||
time.Sleep(tc.pollTime) | ||
continue | ||
} | ||
return nil, err | ||
} | ||
|
||
if resp.TxResult.Code != 0 { | ||
return nil, fmt.Errorf("non zero code delivering tx (%d): %s", resp.TxResult.Code, resp.TxResult.Log) | ||
} | ||
|
||
return resp, nil | ||
} | ||
return nil, errTimedOutWaitingForTx | ||
} | ||
|
||
// Client multiplexes the RPC clients | ||
func (tc *TxClient) Client() *http.HTTP { | ||
tc.mtx.Lock() | ||
defer tc.mtx.Unlock() | ||
evan-forbes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
defer tc.next() | ||
evan-forbes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return tc.rpcClients[tc.index] | ||
} | ||
|
||
// Broadcast encodes and broadcasts a transaction to the network. If CheckTx fails, | ||
// the error will be returned. The method does not wait for the transaction to be | ||
// included in a block. | ||
func (tc *TxClient) Broadcast(ctx context.Context, txBuilder sdkclient.TxBuilder, blobs []*blob.Blob) (*coretypes.ResultTx, error) { | ||
tx, err := tc.encCfg.TxConfig.TxEncoder()(txBuilder.GetTx()) | ||
if err != nil { | ||
return nil, fmt.Errorf("error encoding tx: %w", err) | ||
} | ||
|
||
// If blobs exist, these are bundled into the existing tx. | ||
if len(blobs) > 0 { | ||
txWithBlobs, err := types.MarshalBlobTx(tx, blobs...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tx = txWithBlobs | ||
} | ||
|
||
resp, err := tc.Client().BroadcastTxSync(ctx, tx) | ||
if err != nil { | ||
return nil, fmt.Errorf("broadcast commit: %w", err) | ||
} | ||
|
||
if resp.Code != 0 { | ||
return nil, fmt.Errorf("non zero code checking tx (%d): %s", resp.Code, resp.Log) | ||
} | ||
|
||
return tc.WaitForTx(ctx, resp.Hash) | ||
} | ||
|
||
// next iterates the index of the RPC clients. It is not thread safe and should be called within a mutex. | ||
func (tc *TxClient) next() { | ||
tc.index = (tc.index + 1) % len(tc.rpcClients) | ||
} | ||
|
||
// QueryClient multiplexes requests across multiple running gRPC connections. It does this in a round-robin fashion. | ||
type QueryClient struct { | ||
connections []*grpc.ClientConn | ||
|
||
mtx sync.Mutex | ||
// index indicates which client to be used next | ||
index int | ||
} | ||
|
||
func NewQueryClient(grpcEndpoints []string) (*QueryClient, error) { | ||
connections := make([]*grpc.ClientConn, len(grpcEndpoints)) | ||
for idx, endpoint := range grpcEndpoints { | ||
conn, err := grpc.Dial(grpcEndpoints[0], grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
if err != nil { | ||
return nil, fmt.Errorf("dialing %s: %w", endpoint, err) | ||
} | ||
connections[idx] = conn | ||
} | ||
|
||
return &QueryClient{ | ||
connections: connections, | ||
}, nil | ||
} | ||
|
||
// next iterates the index of the RPC clients. It is not thread safe and should be called within a mutex. | ||
func (qc *QueryClient) next() { | ||
qc.index = (qc.index + 1) % len(qc.connections) | ||
} | ||
|
||
func (qc *QueryClient) Conn() protogrpc.ClientConn { | ||
qc.mtx.Lock() | ||
defer qc.mtx.Unlock() | ||
defer qc.next() | ||
return qc.connections[qc.index] | ||
} | ||
|
||
func (qc *QueryClient) Bank() bank.QueryClient { | ||
return bank.NewQueryClient(qc.Conn()) | ||
} | ||
|
||
func (qc *QueryClient) Auth() auth.QueryClient { | ||
return auth.NewQueryClient(qc.Conn()) | ||
} | ||
|
||
func (qc *QueryClient) Close() error { | ||
var err error | ||
for _, conn := range qc.connections { | ||
err = conn.Close() | ||
} | ||
return err | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[optional] can use
celestia-app/pkg/namespace/random_blob.go
Line 11 in dd19531
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks but I want to use the random source provided in the method so that by having a specific seed it should always reproduce the same sequence