diff --git a/submitter/relayer/relayer.go b/submitter/relayer/relayer.go index 1e23ebb..4d16793 100644 --- a/submitter/relayer/relayer.go +++ b/submitter/relayer/relayer.go @@ -55,14 +55,15 @@ func New( ) *Relayer { metrics.ResendIntervalSecondsGauge.Set(float64(config.ResendIntervalSeconds)) return &Relayer{ - Estimator: est, - BTCWallet: wallet, - tag: tag, - version: version, - submitterAddress: submitterAddress, - metrics: metrics, - config: config, - logger: parentLogger.With(zap.String("module", "relayer")).Sugar(), + Estimator: est, + BTCWallet: wallet, + tag: tag, + version: version, + submitterAddress: submitterAddress, + metrics: metrics, + config: config, + lastSubmittedCheckpoint: &types.CheckpointInfo{}, + logger: parentLogger.With(zap.String("module", "relayer")).Sugar(), } } @@ -80,8 +81,8 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp return nil } - if rl.lastSubmittedCheckpoint == nil || rl.lastSubmittedCheckpoint.Epoch < ckptEpoch { - rl.logger.Infof("Submitting a raw checkpoint for epoch %v for the first time", ckptEpoch) + if rl.shouldSendCompleteCkpt(ckptEpoch) { + rl.logger.Infof("Submitting a raw checkpoint for epoch %v", ckptEpoch) submittedCheckpoint, err := rl.convertCkptToTwoTxAndSubmit(ckpt.Ckpt) if err != nil { @@ -90,6 +91,16 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp rl.lastSubmittedCheckpoint = submittedCheckpoint + return nil + } else if rl.shouldSendTx2(ckptEpoch) { + rl.logger.Infof("Retrying to send tx2 for epoch %v, tx1 %s", ckptEpoch, rl.lastSubmittedCheckpoint.Tx1.TxId) + submittedCheckpoint, err := rl.retrySendTx2(ckpt.Ckpt) + if err != nil { + return err + } + + rl.lastSubmittedCheckpoint = submittedCheckpoint + return nil } @@ -144,6 +155,16 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp return nil } +func (rl *Relayer) shouldSendCompleteCkpt(ckptEpoch uint64) bool { + return rl.lastSubmittedCheckpoint.Tx1 == nil || rl.lastSubmittedCheckpoint.Epoch < ckptEpoch +} + +// shouldSendTx2 - we want to avoid resending tx1 if only tx2 submission has failed +func (rl *Relayer) shouldSendTx2(ckptEpoch uint64) bool { + return (rl.lastSubmittedCheckpoint.Tx1 != nil || rl.lastSubmittedCheckpoint.Epoch < ckptEpoch) && + rl.lastSubmittedCheckpoint.Tx2 == nil +} + // shouldResendCheckpoint checks whether the bumpedFee is effective for replacement func (rl *Relayer) shouldResendCheckpoint(ckptInfo *types.CheckpointInfo, bumpedFee btcutil.Amount) bool { // if the bumped fee is less than the fee of the previous second tx plus the minimum required bumping fee @@ -238,49 +259,67 @@ func (rl *Relayer) signTx(tx *wire.MsgTx) (*wire.MsgTx, error) { return signedTx, nil } -func (rl *Relayer) convertCkptToTwoTxAndSubmit(ckpt *ckpttypes.RawCheckpointResponse) (*types.CheckpointInfo, error) { +func (rl *Relayer) encodeCheckpointData(ckpt *ckpttypes.RawCheckpointResponse) ([]byte, []byte, error) { + // Convert to raw checkpoint rawCkpt, err := ckpt.ToRawCheckpoint() if err != nil { - return nil, err + return nil, nil, err } + + // Convert raw checkpoint to BTC checkpoint btcCkpt, err := ckpttypes.FromRawCkptToBTCCkpt(rawCkpt, rl.submitterAddress) if err != nil { - return nil, err + return nil, nil, err } + + // Encode checkpoint data data1, data2, err := btctxformatter.EncodeCheckpointData( rl.tag, rl.version, btcCkpt, ) if err != nil { - return nil, err - } - - tx1, tx2, err := rl.ChainTwoTxAndSend(data1, data2) - if err != nil { - return nil, err + return nil, nil, err } - // this is to wait for btcwallet to update utxo database so that - // the tx that tx1 consumes will not appear in the next unspent txs lit - time.Sleep(1 * time.Second) + // Return the encoded data + return data1, data2, nil +} +func (rl *Relayer) logAndRecordCheckpointMetrics(tx1, tx2 *types.BtcTxInfo, epochNum uint64) { + // Log the transactions sent for checkpointing rl.logger.Infof("Sent two txs to BTC for checkpointing epoch %v, first txid: %s, second txid: %s", - ckpt.EpochNum, tx1.Tx.TxHash().String(), tx2.Tx.TxHash().String()) + epochNum, tx1.Tx.TxHash().String(), tx2.Tx.TxHash().String()) - // record metrics of the two transactions + // Record metrics for the first transaction rl.metrics.NewSubmittedCheckpointSegmentGaugeVec.WithLabelValues( - strconv.Itoa(int(ckpt.EpochNum)), + strconv.Itoa(int(epochNum)), "0", tx1.Tx.TxHash().String(), strconv.Itoa(int(tx1.Fee)), ).SetToCurrentTime() + + // Record metrics for the second transaction rl.metrics.NewSubmittedCheckpointSegmentGaugeVec.WithLabelValues( - strconv.Itoa(int(ckpt.EpochNum)), + strconv.Itoa(int(epochNum)), "1", tx2.Tx.TxHash().String(), strconv.Itoa(int(tx2.Fee)), ).SetToCurrentTime() +} + +func (rl *Relayer) convertCkptToTwoTxAndSubmit(ckpt *ckpttypes.RawCheckpointResponse) (*types.CheckpointInfo, error) { + data1, data2, err := rl.encodeCheckpointData(ckpt) + if err != nil { + return nil, err + } + + tx1, tx2, err := rl.ChainTwoTxAndSend(data1, data2) + if err != nil { + return nil, err + } + + rl.logAndRecordCheckpointMetrics(tx1, tx2, ckpt.EpochNum) return &types.CheckpointInfo{ Epoch: ckpt.EpochNum, @@ -290,34 +329,68 @@ func (rl *Relayer) convertCkptToTwoTxAndSubmit(ckpt *ckpttypes.RawCheckpointResp }, nil } -// ChainTwoTxAndSend builds two chaining txs with the given data: -// the second tx consumes the output of the first tx -func (rl *Relayer) ChainTwoTxAndSend(data1 []byte, data2 []byte) (*types.BtcTxInfo, *types.BtcTxInfo, error) { - // recipient is a change address that all the - // remaining balance of the utxo is sent to - tx1, err := rl.buildTxWithData(data1, nil) +// retrySendTx2 - rebuilds the tx2 and sends it, expects that tx1 has been sent and +// lastSubmittedCheckpoint.Tx1 is not nil +func (rl *Relayer) retrySendTx2(ckpt *ckpttypes.RawCheckpointResponse) (*types.CheckpointInfo, error) { + _, data2, err := rl.encodeCheckpointData(ckpt) if err != nil { - return nil, nil, fmt.Errorf("failed to add data to tx1: %w", err) + return nil, err } - tx1.TxId, err = rl.sendTxToBTC(tx1.Tx) + tx1 := rl.lastSubmittedCheckpoint.Tx1 + if tx1 == nil { + return nil, fmt.Errorf("tx1 is nil") // shouldn't happen, sanity check + } + + tx2, err := rl.buildAndSendTx(data2, tx1.Tx) + if err != nil { + return nil, err + } + + rl.logAndRecordCheckpointMetrics(tx1, tx2, ckpt.EpochNum) + + return &types.CheckpointInfo{ + Epoch: ckpt.EpochNum, + Ts: time.Now(), + Tx1: tx1, + Tx2: tx2, + }, nil +} + +// buildAndSendTx helper function to build and send a transaction +func (rl *Relayer) buildAndSendTx(data []byte, parentTx *wire.MsgTx) (*types.BtcTxInfo, error) { + tx, err := rl.buildTxWithData(data, parentTx) if err != nil { - return nil, nil, fmt.Errorf("failed to send tx1 to BTC: %w", err) + return nil, fmt.Errorf("failed to add data to tx: %w", err) } - // the second tx consumes the second output (index 1) - // of the first tx, as the output at index 0 is OP_RETURN - tx2, err := rl.buildTxWithData(data2, tx1.Tx) + tx.TxId, err = rl.sendTxToBTC(tx.Tx) if err != nil { - return nil, nil, fmt.Errorf("failed to add data to tx2: %w", err) + return nil, fmt.Errorf("failed to send tx to BTC: %w", err) } - tx2.TxId, err = rl.sendTxToBTC(tx2.Tx) + return tx, nil +} + +// ChainTwoTxAndSend builds two chaining txs with the given data: +// the second tx consumes the output of the first tx +func (rl *Relayer) ChainTwoTxAndSend(data1 []byte, data2 []byte) (*types.BtcTxInfo, *types.BtcTxInfo, error) { + // recipient is a change address that all the + // remaining balance of the utxo is sent to + + tx1, err := rl.buildAndSendTx(data1, nil) if err != nil { - return nil, nil, fmt.Errorf("failed to send tx2 to BTC: %w", err) + return nil, nil, err } - // TODO: if tx1 succeeds but tx2 fails, we should not resent tx1 + // cache the success of tx1, we need it if we fail with tx2 send + rl.lastSubmittedCheckpoint.Tx1 = tx1 + + // Build and send tx2, using tx1 as the parent + tx2, err := rl.buildAndSendTx(data2, tx1.Tx) + if err != nil { + return nil, nil, err + } return tx1, tx2, nil }