Skip to content

Commit

Permalink
fix: comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM committed Sep 6, 2024
1 parent 41e1a20 commit 7c6595c
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 57 deletions.
24 changes: 16 additions & 8 deletions sequencesender/ethtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func (s *SequenceSender) sendTx(ctx context.Context, resend bool, txOldHash *com
log.Errorf("trying to resend a tx with nil hash")
return errors.New("resend tx with nil hash monitor id")
}
paramTo = &s.ethTransactions[*txOldHash].To
paramNonce = &s.ethTransactions[*txOldHash].Nonce
oldEthTx := s.ethTransactions[*txOldHash]
paramTo = &oldEthTx.To
paramNonce = &oldEthTx.Nonce
paramData = s.ethTxData[*txOldHash]
valueFromBatch = s.ethTransactions[*txOldHash].FromBatch
valueToBatch = s.ethTransactions[*txOldHash].ToBatch
valueFromBatch = oldEthTx.FromBatch
valueToBatch = oldEthTx.ToBatch
}
if paramTo != nil {
valueToAddress = *paramTo
Expand Down Expand Up @@ -93,7 +94,10 @@ func (s *SequenceSender) sendTx(ctx context.Context, resend bool, txOldHash *com
s.ethTransactions[txHash] = &txData
txResults := make(map[common.Hash]ethtxmanager.TxResult, 0)
s.copyTxData(txHash, paramData, txResults)
_ = s.getResultAndUpdateEthTx(ctx, txHash)
err = s.getResultAndUpdateEthTx(ctx, txHash)
if err != nil {
log.Errorf("error getting result for tx %v: %v", txHash, err)
}
if !resend {
s.latestSentToL1Batch = valueToBatch
} else {
Expand Down Expand Up @@ -168,7 +172,10 @@ func (s *SequenceSender) syncEthTxResults(ctx context.Context) (uint64, error) {
continue
}

_ = s.getResultAndUpdateEthTx(ctx, hash)
err := s.getResultAndUpdateEthTx(ctx, hash)
if err != nil {
log.Errorf("error getting result for tx %v: %v", hash, err)
}
txSync++
txStatus := s.ethTransactions[hash].Status
// Count if it is not in a final state
Expand Down Expand Up @@ -284,8 +291,8 @@ func (s *SequenceSender) updateEthTxResult(txData *ethTxData, txResult ethtxmana
func (s *SequenceSender) getResultAndUpdateEthTx(ctx context.Context, txHash common.Hash) error {
txData, exists := s.ethTransactions[txHash]
if !exists {
log.Errorf("transaction %v not found in memory", txHash)
return errors.New("transaction not found in memory structure")
log.Infof("transaction %v not found in memory", txHash)
return nil
}

txResult, err := s.ethTxManager.Result(ctx, txHash)
Expand All @@ -297,6 +304,7 @@ func (s *SequenceSender) getResultAndUpdateEthTx(ctx context.Context, txHash com
if errSend == nil {
txData.OnMonitor = false
}
return errSend
} else if err != nil {
log.Errorf("error getting result for tx %v: %v", txHash, err)
return err
Expand Down
14 changes: 7 additions & 7 deletions sequencesender/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (

func (s *SequenceSender) getBatchFromRPC(batchNumber uint64) (*rpcbatch.RPCBatch, error) {
type zkEVMBatch struct {
Blocks []string `mapstructure:"blocks"`
BatchL2Data string `mapstructure:"batchL2Data"`
Coinbase string `mapstructure:"coinbase"`
GlobalExitRoot string `mapstructure:"globalExitRoot"`
Closed bool `mapstructure:"closed"`
Timestamp string `mapstructure:"timestamp"`
Blocks []string `json:"blocks"`
BatchL2Data string `json:"batchL2Data"`
Coinbase string `json:"coinbase"`
GlobalExitRoot string `json:"globalExitRoot"`
Closed bool `json:"closed"`
Timestamp string `json:"timestamp"`
}

zkEVMBatchData := zkEVMBatch{}
Expand Down Expand Up @@ -68,7 +68,7 @@ func (s *SequenceSender) getBatchFromRPC(batchNumber uint64) (*rpcbatch.RPCBatch

func (s *SequenceSender) getL2BlockTimestampFromRPC(blockHash string) (uint64, error) {
type zkeEVML2Block struct {
Timestamp string `mapstructure:"timestamp"`
Timestamp string `json:"timestamp"`
}

l2Block := zkeEVML2Block{}
Expand Down
16 changes: 8 additions & 8 deletions sequencesender/seqsendertypes/rpcbatch/rpcbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
)

type RPCBatch struct {
batchNumber uint64 `mapstructure:"batchNumber"`
blockHashes []string `mapstructure:"blocks"`
batchL2Data []byte `mapstructure:"batchL2Data"`
globalExitRoot common.Hash `mapstructure:"globalExitRoot"`
coinbase common.Address `mapstructure:"coinbase"`
closed bool `mapstructure:"closed"`
lastL2BlockTimestamp uint64 `mapstructure:"lastL2BlockTimestamp"`
l1InfoTreeIndex uint32 `mapstructure:"l1InfoTreeIndex"`
batchNumber uint64 `json:"batchNumber"`
blockHashes []string `json:"blocks"`
batchL2Data []byte `json:"batchL2Data"`
globalExitRoot common.Hash `json:"globalExitRoot"`
coinbase common.Address `json:"coinbase"`
closed bool `json:"closed"`
lastL2BlockTimestamp uint64 `json:"lastL2BlockTimestamp"`
l1InfoTreeIndex uint32 `json:"l1InfoTreeIndex"`
}

func New(batchNumber uint64, blockHashes []string, batchL2Data []byte, globalExitRoot common.Hash, coinbase common.Address, closed bool) (*RPCBatch, error) {
Expand Down
87 changes: 53 additions & 34 deletions sequencesender/sequencesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/0xPolygon/cdk/etherman"
"github.com/0xPolygon/cdk/log"
"github.com/0xPolygon/cdk/sequencesender/seqsendertypes"
"github.com/0xPolygon/cdk/sequencesender/seqsendertypes/rpcbatch"
"github.com/0xPolygon/cdk/sequencesender/txbuilder"
"github.com/0xPolygon/cdk/state"
"github.com/0xPolygonHermez/zkevm-ethtx-manager/ethtxmanager"
Expand Down Expand Up @@ -127,52 +128,70 @@ func (s *SequenceSender) Start(ctx context.Context) {

// batchRetrieval keeps reading batches from the RPC
func (s *SequenceSender) batchRetrieval(ctx context.Context) error {
ticker := time.NewTicker(s.cfg.GetBatchWaitInterval.Duration)
defer ticker.Stop()

currentBatchNumber := s.latestVirtualBatchNumber + 1
for {
rpcBatch, err := s.getBatchFromRPC(currentBatchNumber)
if err != nil {
if err == state.ErrNotFound {
log.Infof("batch %d not found in RPC", currentBatchNumber)
} else {
log.Errorf("error getting batch %d from RPC: %v", currentBatchNumber, err)
select {
case <-ctx.Done():
log.Info("context cancelled, stopping batch retrieval")
return ctx.Err()
default:
// Try to retrieve batch from RPC
rpcBatch, err := s.getBatchFromRPC(currentBatchNumber)
if err != nil {
if err == state.ErrNotFound {
log.Infof("batch %d not found in RPC", currentBatchNumber)
} else {
log.Errorf("error getting batch %d from RPC: %v", currentBatchNumber, err)
}
<-ticker.C
continue
}
time.Sleep(s.cfg.GetBatchWaitInterval.Duration)
continue
}

// Check if the batch is closed
if !rpcBatch.IsClosed() {
log.Infof("batch %d is not closed yet", currentBatchNumber)
time.Sleep(s.cfg.GetBatchWaitInterval.Duration)
continue
}
// Check if the batch is closed
if !rpcBatch.IsClosed() {
log.Infof("batch %d is not closed yet", currentBatchNumber)
<-ticker.C
continue
}

// Create new batch
s.mutexSequence.Lock()
s.sequenceList = append(s.sequenceList, currentBatchNumber)
// Process and decode the batch
if err := s.populateSequenceData(rpcBatch, currentBatchNumber); err != nil {
return err
}

// Decode batch to retrieve the l1 info tree index
batchRaw, err := state.DecodeBatchV2(rpcBatch.L2Data())
if err != nil {
log.Errorf("Failed to decode batch data, err: %v", err)
return err
// Increment the batch number for the next iteration
currentBatchNumber++
}
}
}

if len(batchRaw.Blocks) > 0 {
rpcBatch.SetL1InfoTreeIndex(batchRaw.Blocks[len(batchRaw.Blocks)-1].IndexL1InfoTree)
}
func (s *SequenceSender) populateSequenceData(rpcBatch *rpcbatch.RPCBatch, batchNumber uint64) error {
s.mutexSequence.Lock()
defer s.mutexSequence.Unlock()

data := &sequenceData{
batchClosed: rpcBatch.IsClosed(),
batch: rpcBatch,
batchRaw: batchRaw,
}
s.sequenceList = append(s.sequenceList, batchNumber)

// Decode batch to retrieve the l1 info tree index
batchRaw, err := state.DecodeBatchV2(rpcBatch.L2Data())
if err != nil {
log.Errorf("Failed to decode batch data, err: %v", err)
return err
}

s.sequenceData[currentBatchNumber] = data
s.mutexSequence.Unlock()
if len(batchRaw.Blocks) > 0 {
rpcBatch.SetL1InfoTreeIndex(batchRaw.Blocks[len(batchRaw.Blocks)-1].IndexL1InfoTree)
}

currentBatchNumber++
s.sequenceData[batchNumber] = &sequenceData{
batchClosed: rpcBatch.IsClosed(),
batch: rpcBatch,
batchRaw: batchRaw,
}

return nil
}

// sequenceSending starts loop to check if there are sequences to send and sends them if it's convenient
Expand Down

0 comments on commit 7c6595c

Please sign in to comment.