Skip to content

Commit

Permalink
fix: working listener + more control
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Jul 4, 2024
1 parent 98a1ada commit 7c4b57d
Showing 1 changed file with 83 additions and 8 deletions.
91 changes: 83 additions & 8 deletions replay/replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"time"

Expand All @@ -27,6 +28,9 @@ func Follow(
targetBlobstreamContractAddress string,
targetChainGatewayAddress string,
privateKey *ecdsa.PrivateKey,
headerRangeFunctionID [32]byte,
nextHeaderFunctionID [32]byte,
filterRange int64,
) error {
logger.Info("listening for new proofs on the source chain")
sourceBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(sourceBlobstreamContractAddress), sourceEVMClient)
Expand Down Expand Up @@ -69,10 +73,33 @@ func Follow(
continue
} else if event.StartBlock > latestTargetContractBlock {
logger.Info("the target contract needs to catchup", "event_start_block", event.StartBlock, "target_contract_latest_block", latestTargetContractBlock)
err = Catchup(ctx, logger, verify, trpc, sourceEVMClient, targetEVMClient, sourceBlobstreamContractAddress, targetBlobstreamContractAddress, targetChainGatewayAddress, privateKey)
err = Catchup(
ctx,
logger,
verify,
trpc,
sourceEVMClient,
targetEVMClient,
sourceBlobstreamContractAddress,
targetBlobstreamContractAddress,
targetChainGatewayAddress,
privateKey,
headerRangeFunctionID,
nextHeaderFunctionID,
filterRange,
)
if err != nil {
return err
}
latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx})
if err != nil {
return err
}
if event.EndBlock == latestTargetContractBlock {
// the contract is already up to date
logger.Info("contract up to date", "target_contract_latest_block", event.EndBlock)
continue
}
}
logger.Debug("getting transaction containing the proof", "nonce", event.ProofNonce.Int64(), "hash", event.Raw.TxHash.Hex(), "start_block", event.StartBlock)
tx, _, err := sourceEVMClient.TransactionByHash(ctx, event.Raw.TxHash)
Expand All @@ -82,7 +109,8 @@ func Follow(

logger.Debug("decoding the proof")
rawMap := make(map[string]interface{})
err = abi.UnpackIntoMap(rawMap, "fulfillCall", tx.Data()[4:])
inputArgs := abi.Methods["fulfillCall"].Inputs
err = inputArgs.UnpackIntoMap(rawMap, tx.Data()[4:])
if err != nil {
return err
}
Expand All @@ -94,6 +122,13 @@ func Follow(

// update the address to be the target blobstreamX contract for the callback
decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress)
if event.EndBlock-event.StartBlock > 1 {
// this is a header range proof
decodedArgs.FunctionID = headerRangeFunctionID
} else {
// this is a next header proof
decodedArgs.FunctionID = nextHeaderFunctionID
}

logger.Info("replaying the proof", "nonce", event.ProofNonce.Int64())
opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000)
Expand Down Expand Up @@ -130,9 +165,10 @@ func Catchup(
targetBlobstreamContractAddress string,
targetChainGatewayAddress string,
privateKey *ecdsa.PrivateKey,
headerRangeFunctionID [32]byte,
nextHeaderFunctionID [32]byte,
filterRange int64,
) error {
filterRange := int64(5000)

lookupStartHeight, err := sourceEVMClient.BlockNumber(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -165,14 +201,14 @@ func Catchup(
return err
}

// TODO: this could be improved in the future to only get the events needed
dataCommitmentEvents, err := getAllDataCommitmentStoredEvents(
ctx,
logger,
&sourceBlobstreamX.BlobstreamXFilterer,
int64(lookupStartHeight),
filterRange,
latestSourceContractNonce.Int64(),
int64(latestTargetContractBlock),
)
if err != nil {
return err
Expand Down Expand Up @@ -202,7 +238,19 @@ func Catchup(
if bytes.Equal(coreDataCommitment.DataCommitment.Bytes(), event.DataCommitment[:]) {
logger.Info("data commitment verified")
} else {
logger.Error("data commitment mismatch!! quitting", "proof_nonce_in_source_contract", event.ProofNonce, "start_block", event.StartBlock, "end_block", event.EndBlock)
logger.Error(
"data commitment mismatch!! quitting",
"proof_nonce_in_source_contract",
event.ProofNonce,
"start_block",
event.StartBlock,
"end_block",
event.EndBlock,
"expected_data_commitment",
hex.EncodeToString(coreDataCommitment.DataCommitment.Bytes()),
"actual_data_commitment",
hex.EncodeToString(event.DataCommitment[:]),
)
return fmt.Errorf("data commitment mistmatch. start height %d end height %d", event.StartBlock, event.EndBlock)
}
}
Expand Down Expand Up @@ -243,6 +291,14 @@ func Catchup(
// update the address to be the target blobstreamX contract for the callback
decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress)

if event.EndBlock-event.StartBlock > 1 {
// this is a header range proof
decodedArgs.FunctionID = headerRangeFunctionID
} else {
// this is a next header proof
decodedArgs.FunctionID = nextHeaderFunctionID
}

logger.Info("replaying the proof", "startHeight", startHeight)
opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000)
if err != nil {
Expand All @@ -262,15 +318,25 @@ func Catchup(
if err != nil {
return err
}
startHeight = event.EndBlock
// make sure the contract was updated
latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx})
if err != nil {
return err
}
if latestTargetContractBlock == event.EndBlock {
// contract updated successfully, we can advance
startHeight = event.EndBlock
} else {
logger.Error("contract did not update successfully, retrying the same proof", "expected_target_height", event.EndBlock, "actual_target_height", latestTargetContractBlock)
}
}

latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx})
if err != nil {
return err
}

logger.Info("contract up to date", "latest_nonce", latestTargetContractBlock)
logger.Info("contract up to date", "latest_target_contract_block", latestTargetContractBlock)
return nil
}

Expand All @@ -281,6 +347,7 @@ func getAllDataCommitmentStoredEvents(
lookupStartHeight int64,
filterRange int64,
latestSourceContractNonce int64,
latestTargetContractBlock int64,
) (map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored, error) {
logger.Info("querying all the data commitment stored events in the source contract...")
dataCommitmentEvents := make(map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored)
Expand All @@ -302,13 +369,17 @@ func getAllDataCommitmentStoredEvents(
return nil, err
}

gatheredTheNecessaryEvents := false
for {
if events.Event != nil {
_, exists := dataCommitmentEvents[int64(events.Event.StartBlock)]
if exists {
continue
} else {

Check failure on line 378 in replay/replayer.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

superfluous-else: if block ends with a continue statement, so drop this else and outdent its block (revive)
dataCommitmentEvents[int64(events.Event.StartBlock)] = *events.Event
if int64(events.Event.StartBlock) < latestTargetContractBlock {
gatheredTheNecessaryEvents = true
}
}
}
if !events.Next() {
Expand All @@ -320,6 +391,10 @@ func getAllDataCommitmentStoredEvents(
logger.Info("found all events", "count", len(dataCommitmentEvents))
break
}
if gatheredTheNecessaryEvents {
logger.Info("found enough events to cover the needed range", "count", len(dataCommitmentEvents))
break
}
logger.Info("found events", "count", len(dataCommitmentEvents))
}
return dataCommitmentEvents, nil
Expand Down

0 comments on commit 7c4b57d

Please sign in to comment.