Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(relayer): prevent tx1 resubmission #28

Merged
merged 26 commits into from
Sep 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 115 additions & 42 deletions submitter/relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ func New(
) *Relayer {
metrics.ResendIntervalSecondsGauge.Set(float64(config.ResendIntervalSeconds))
return &Relayer{
Estimator: est,
BTCWallet: wallet,
tag: tag,
version: version,
submitterAddress: submitterAddress,
metrics: metrics,
config: config,
logger: parentLogger.With(zap.String("module", "relayer")).Sugar(),
Estimator: est,
BTCWallet: wallet,
tag: tag,
version: version,
submitterAddress: submitterAddress,
metrics: metrics,
config: config,
lastSubmittedCheckpoint: &types.CheckpointInfo{},
logger: parentLogger.With(zap.String("module", "relayer")).Sugar(),
}
}

Expand All @@ -80,8 +81,8 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp
return nil
}

if rl.lastSubmittedCheckpoint == nil || rl.lastSubmittedCheckpoint.Epoch < ckptEpoch {
rl.logger.Infof("Submitting a raw checkpoint for epoch %v for the first time", ckptEpoch)
if rl.shouldSendCompleteCkpt(ckptEpoch) {
rl.logger.Infof("Submitting a raw checkpoint for epoch %v", ckptEpoch)

submittedCheckpoint, err := rl.convertCkptToTwoTxAndSubmit(ckpt.Ckpt)
if err != nil {
Expand All @@ -90,6 +91,16 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp

rl.lastSubmittedCheckpoint = submittedCheckpoint

return nil
} else if rl.shouldSendTx2(ckptEpoch) {
rl.logger.Infof("Retrying to send tx2 for epoch %v, tx1 %s", ckptEpoch, rl.lastSubmittedCheckpoint.Tx1.TxId)
submittedCheckpoint, err := rl.retrySendTx2(ckpt.Ckpt)
if err != nil {
return err
}

rl.lastSubmittedCheckpoint = submittedCheckpoint

return nil
}

Expand Down Expand Up @@ -144,6 +155,16 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp
return nil
}

func (rl *Relayer) shouldSendCompleteCkpt(ckptEpoch uint64) bool {
return rl.lastSubmittedCheckpoint.Tx1 == nil || rl.lastSubmittedCheckpoint.Epoch < ckptEpoch
}

// shouldSendTx2 - we want to avoid resending tx1 if only tx2 submission has failed
func (rl *Relayer) shouldSendTx2(ckptEpoch uint64) bool {
return (rl.lastSubmittedCheckpoint.Tx1 != nil || rl.lastSubmittedCheckpoint.Epoch < ckptEpoch) &&
rl.lastSubmittedCheckpoint.Tx2 == nil
}

// shouldResendCheckpoint checks whether the bumpedFee is effective for replacement
func (rl *Relayer) shouldResendCheckpoint(ckptInfo *types.CheckpointInfo, bumpedFee btcutil.Amount) bool {
// if the bumped fee is less than the fee of the previous second tx plus the minimum required bumping fee
Expand Down Expand Up @@ -238,49 +259,67 @@ func (rl *Relayer) signTx(tx *wire.MsgTx) (*wire.MsgTx, error) {
return signedTx, nil
}

func (rl *Relayer) convertCkptToTwoTxAndSubmit(ckpt *ckpttypes.RawCheckpointResponse) (*types.CheckpointInfo, error) {
func (rl *Relayer) encodeCheckpointData(ckpt *ckpttypes.RawCheckpointResponse) ([]byte, []byte, error) {
// Convert to raw checkpoint
rawCkpt, err := ckpt.ToRawCheckpoint()
if err != nil {
return nil, err
return nil, nil, err
}

// Convert raw checkpoint to BTC checkpoint
btcCkpt, err := ckpttypes.FromRawCkptToBTCCkpt(rawCkpt, rl.submitterAddress)
if err != nil {
return nil, err
return nil, nil, err
}

// Encode checkpoint data
data1, data2, err := btctxformatter.EncodeCheckpointData(
rl.tag,
rl.version,
btcCkpt,
)
if err != nil {
return nil, err
}

tx1, tx2, err := rl.ChainTwoTxAndSend(data1, data2)
if err != nil {
return nil, err
return nil, nil, err
}

// this is to wait for btcwallet to update utxo database so that
// the tx that tx1 consumes will not appear in the next unspent txs lit
time.Sleep(1 * time.Second)
// Return the encoded data
return data1, data2, nil
}

func (rl *Relayer) logAndRecordCheckpointMetrics(tx1, tx2 *types.BtcTxInfo, epochNum uint64) {
// Log the transactions sent for checkpointing
rl.logger.Infof("Sent two txs to BTC for checkpointing epoch %v, first txid: %s, second txid: %s",
ckpt.EpochNum, tx1.Tx.TxHash().String(), tx2.Tx.TxHash().String())
epochNum, tx1.Tx.TxHash().String(), tx2.Tx.TxHash().String())

// record metrics of the two transactions
// Record metrics for the first transaction
rl.metrics.NewSubmittedCheckpointSegmentGaugeVec.WithLabelValues(
strconv.Itoa(int(ckpt.EpochNum)),
strconv.Itoa(int(epochNum)),
"0",
tx1.Tx.TxHash().String(),
strconv.Itoa(int(tx1.Fee)),
).SetToCurrentTime()

// Record metrics for the second transaction
rl.metrics.NewSubmittedCheckpointSegmentGaugeVec.WithLabelValues(
strconv.Itoa(int(ckpt.EpochNum)),
strconv.Itoa(int(epochNum)),
"1",
tx2.Tx.TxHash().String(),
strconv.Itoa(int(tx2.Fee)),
).SetToCurrentTime()
}

func (rl *Relayer) convertCkptToTwoTxAndSubmit(ckpt *ckpttypes.RawCheckpointResponse) (*types.CheckpointInfo, error) {
data1, data2, err := rl.encodeCheckpointData(ckpt)
if err != nil {
return nil, err
}

tx1, tx2, err := rl.ChainTwoTxAndSend(data1, data2)
if err != nil {
return nil, err
}

rl.logAndRecordCheckpointMetrics(tx1, tx2, ckpt.EpochNum)

return &types.CheckpointInfo{
Epoch: ckpt.EpochNum,
Expand All @@ -290,34 +329,68 @@ func (rl *Relayer) convertCkptToTwoTxAndSubmit(ckpt *ckpttypes.RawCheckpointResp
}, nil
}

// ChainTwoTxAndSend builds two chaining txs with the given data:
// the second tx consumes the output of the first tx
func (rl *Relayer) ChainTwoTxAndSend(data1 []byte, data2 []byte) (*types.BtcTxInfo, *types.BtcTxInfo, error) {
// recipient is a change address that all the
// remaining balance of the utxo is sent to
tx1, err := rl.buildTxWithData(data1, nil)
// retrySendTx2 - rebuilds the tx2 and sends it, expects that tx1 has been sent and
// lastSubmittedCheckpoint.Tx1 is not nil
func (rl *Relayer) retrySendTx2(ckpt *ckpttypes.RawCheckpointResponse) (*types.CheckpointInfo, error) {
_, data2, err := rl.encodeCheckpointData(ckpt)
if err != nil {
return nil, nil, fmt.Errorf("failed to add data to tx1: %w", err)
return nil, err
}

tx1.TxId, err = rl.sendTxToBTC(tx1.Tx)
tx1 := rl.lastSubmittedCheckpoint.Tx1
if tx1 == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to have a doc string for this function saying that the tx1 should not be nil

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this function uses tx1 implicitly through state, I'll add the doc string above, but I would live the check in

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep yep, keeping the check is good here

return nil, fmt.Errorf("tx1 is nil") // shouldn't happen, sanity check
}

tx2, err := rl.buildAndSendTx(data2, tx1.Tx)
if err != nil {
return nil, err
}

rl.logAndRecordCheckpointMetrics(tx1, tx2, ckpt.EpochNum)

return &types.CheckpointInfo{
Epoch: ckpt.EpochNum,
Ts: time.Now(),
Tx1: tx1,
Tx2: tx2,
}, nil
}

// buildAndSendTx helper function to build and send a transaction
func (rl *Relayer) buildAndSendTx(data []byte, parentTx *wire.MsgTx) (*types.BtcTxInfo, error) {
tx, err := rl.buildTxWithData(data, parentTx)
if err != nil {
return nil, nil, fmt.Errorf("failed to send tx1 to BTC: %w", err)
return nil, fmt.Errorf("failed to add data to tx: %w", err)
}

// the second tx consumes the second output (index 1)
// of the first tx, as the output at index 0 is OP_RETURN
tx2, err := rl.buildTxWithData(data2, tx1.Tx)
tx.TxId, err = rl.sendTxToBTC(tx.Tx)
if err != nil {
return nil, nil, fmt.Errorf("failed to add data to tx2: %w", err)
return nil, fmt.Errorf("failed to send tx to BTC: %w", err)
}

tx2.TxId, err = rl.sendTxToBTC(tx2.Tx)
return tx, nil
}

// ChainTwoTxAndSend builds two chaining txs with the given data:
// the second tx consumes the output of the first tx
func (rl *Relayer) ChainTwoTxAndSend(data1 []byte, data2 []byte) (*types.BtcTxInfo, *types.BtcTxInfo, error) {
// recipient is a change address that all the
// remaining balance of the utxo is sent to

tx1, err := rl.buildAndSendTx(data1, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to send tx2 to BTC: %w", err)
return nil, nil, err
}

// TODO: if tx1 succeeds but tx2 fails, we should not resent tx1
// cache the success of tx1, we need it if we fail with tx2 send
rl.lastSubmittedCheckpoint.Tx1 = tx1

// Build and send tx2, using tx1 as the parent
tx2, err := rl.buildAndSendTx(data2, tx1.Tx)
if err != nil {
return nil, nil, err
}

return tx1, tx2, nil
}
Expand Down
Loading