Skip to content

Commit

Permalink
bank send....
Browse files Browse the repository at this point in the history
  • Loading branch information
faddat committed Oct 28, 2024
1 parent 669f150 commit 33abf49
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 65 deletions.
115 changes: 51 additions & 64 deletions broadcast/broadcast.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package broadcast

import (
"context"
"fmt"
"log"
"time"

cometrpc "github.com/cometbft/cometbft/rpc/client/http"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
tmtypes "github.com/cometbft/cometbft/types"
transfertypes "github.com/cosmos/ibc-go/v8/modules/apps/transfer/types"
"github.com/somatic-labs/meteorite/lib"
types "github.com/somatic-labs/meteorite/types"
Expand Down Expand Up @@ -66,33 +62,19 @@ func init() {

// Transaction broadcasts the transaction bytes to the given RPC endpoint.
func Transaction(txBytes []byte, rpcEndpoint string) (*coretypes.ResultBroadcastTx, error) {
cmtCli, err := cometrpc.New(rpcEndpoint, "/websocket")
client, err := GetClient(rpcEndpoint)
if err != nil {
log.Fatal(err)
}

t := tmtypes.Tx(txBytes)

ctx := context.Background()
res, err := cmtCli.BroadcastTxSync(ctx, t)
if err != nil {
fmt.Println("Error at broadcast:", err)
return nil, err
}

if res.Code != 0 {
// Return an error containing the code and log message
return res, fmt.Errorf("broadcast error code %d: %s", res.Code, res.Log)
}

return res, nil
return client.Transaction(txBytes)
}

// broadcastLoop handles the main transaction broadcasting logic
func Loop(
txParams types.TransactionParams,
batchSize int,
position int, // Add position parameter
position int,
) (successfulTxns, failedTxns int, responseCodes map[uint32]int, updatedSequence uint64) {
successfulTxns = 0
failedTxns = 0
Expand All @@ -118,57 +100,62 @@ func Loop(
)
metrics.Complete = time.Now()

if err == nil {
metrics.LogTiming(currentSequence, true, nil)
successfulTxns++
responseCodes[resp.Code]++
sequence++
continue
}

metrics.LogTiming(currentSequence, false, err)
// Handle case where resp is nil but there's an error
if err != nil {

Check failure on line 104 in broadcast/broadcast.go

View workflow job for this annotation

GitHub Actions / lint

`if err != nil` has complex nested blocks (complexity: 6) (nestif)
metrics.LogTiming(currentSequence, false, err)
failedTxns++

if resp.Code == 32 {
// Extract the expected sequence number from the error message
expectedSeq, parseErr := lib.ExtractExpectedSequence(err.Error())
if parseErr != nil {
fmt.Printf("[POS-%d] Failed to parse expected sequence: %v\n",
position, parseErr)
failedTxns++
// Skip sequence number handling if resp is nil
if resp == nil {
continue
}

sequence = expectedSeq
fmt.Printf("[POS-%d] Set sequence to expected value %d due to mismatch\n",
position, sequence)

// Re-send the transaction with the correct sequence
metrics = &TimingMetrics{
PrepStart: time.Now(),
Position: position,
}

metrics.SignStart = time.Now()
metrics.BroadStart = time.Now()
resp, _, err = SendTransactionViaRPC(
txParams,
sequence,
)
metrics.Complete = time.Now()

if err != nil {
metrics.LogTiming(sequence, false, err)
failedTxns++
if resp.Code == 32 {
// Extract the expected sequence number from the error message
expectedSeq, parseErr := lib.ExtractExpectedSequence(err.Error())
if parseErr != nil {
fmt.Printf("[POS-%d] Failed to parse expected sequence: %v\n",
position, parseErr)
continue
}

sequence = expectedSeq
fmt.Printf("[POS-%d] Set sequence to expected value %d due to mismatch\n",
position, sequence)

// Re-send the transaction with the correct sequence
metrics = &TimingMetrics{
PrepStart: time.Now(),
Position: position,
}

metrics.SignStart = time.Now()
metrics.BroadStart = time.Now()
resp, _, err = SendTransactionViaRPC(
txParams,
sequence,
)
metrics.Complete = time.Now()

if err != nil {
metrics.LogTiming(sequence, false, err)
failedTxns++
continue
}

metrics.LogTiming(sequence, true, nil)
successfulTxns++
responseCodes[resp.Code]++
sequence++
continue
}

metrics.LogTiming(sequence, true, nil)
successfulTxns++
responseCodes[resp.Code]++
sequence++
continue
}
failedTxns++

metrics.LogTiming(currentSequence, true, nil)
successfulTxns++
responseCodes[resp.Code]++
sequence++
}
updatedSequence = sequence
return successfulTxns, failedTxns, responseCodes, updatedSequence
Expand Down
68 changes: 68 additions & 0 deletions broadcast/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package broadcast

import (
"context"
"fmt"
"sync"
"time"

cometrpc "github.com/cometbft/cometbft/rpc/client/http"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
tmtypes "github.com/cometbft/cometbft/types"
)

type BroadcastClient struct {

Check failure on line 14 in broadcast/client.go

View workflow job for this annotation

GitHub Actions / lint

exported: type name will be used as broadcast.BroadcastClient by other packages, and that stutters; consider calling this Client (revive)
client *cometrpc.HTTP
}

var (
clients = make(map[string]*BroadcastClient)
clientsMux sync.RWMutex
)

func GetClient(rpcEndpoint string) (*BroadcastClient, error) {
clientsMux.RLock()
if client, exists := clients[rpcEndpoint]; exists {
clientsMux.RUnlock()
return client, nil
}
clientsMux.RUnlock()

// If client doesn't exist, acquire write lock and create it
clientsMux.Lock()
defer clientsMux.Unlock()

// Double-check after acquiring write lock
if client, exists := clients[rpcEndpoint]; exists {
return client, nil
}

// Create new client
cmtCli, err := cometrpc.New(rpcEndpoint, "/websocket")
if err != nil {
return nil, err
}

client := &BroadcastClient{
client: cmtCli,
}
clients[rpcEndpoint] = client
return client, nil
}

func (b *BroadcastClient) Transaction(txBytes []byte) (*coretypes.ResultBroadcastTx, error) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

t := tmtypes.Tx(txBytes)
res, err := b.client.BroadcastTxSync(ctx, t)
if err != nil {
return nil, err
}

if res.Code != 0 {
return res, fmt.Errorf("broadcast error code %d: %s", res.Code, res.Log)
}

return res, nil
}
2 changes: 1 addition & 1 deletion nodes.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ rand_max = 30000
revision_number = 4
timeout_height = 21720608
slip44 = 118
positions = 10 # Number of positions to use from the seed phrase
positions = 50 # Number of positions to use from the seed phrase

broadcast_mode = "grpc" # or "rpc"

Expand Down

0 comments on commit 33abf49

Please sign in to comment.