Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sweep: remove possible RBF when sweeping new inputs #8091

Merged
merged 11 commits into from
Oct 25, 2023
97 changes: 70 additions & 27 deletions sweep/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,22 +687,64 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
func (s *UtxoSweeper) sweepCluster(cluster inputCluster,
currentHeight int32) error {

// Execute the sweep within a coin select lock. Otherwise the coins that
// we are going to spend may be selected for other transactions like
// funding of a channel.
// Execute the sweep within a coin select lock. Otherwise the coins
// that we are going to spend may be selected for other transactions
// like funding of a channel.
return s.cfg.Wallet.WithCoinSelectLock(func() error {
// Examine pending inputs and try to construct
// lists of inputs.
inputLists, err := s.getInputLists(cluster, currentHeight)
// Examine pending inputs and try to construct lists of inputs.
allSets, newSets, err := s.getInputLists(cluster, currentHeight)
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like allSets only returns sets of previously-published sweeps. Why not reflect that on the name, e.g existingSets ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allSet uses all the inputs to make the sweeps tho.

if err != nil {
return fmt.Errorf("unable to examine pending inputs: %v", err)
return fmt.Errorf("examine pending inputs: %w", err)
}

// Sweep selected inputs.
for _, inputs := range inputLists {
err := s.sweep(inputs, cluster.sweepFeeRate, currentHeight)
// errAllSets records the error from broadcasting the sweeping
// transactions for all input sets.
var errAllSets error

// allSets contains retried inputs and new inputs. To avoid
// creating an RBF for the new inputs, we'd sweep this set
// first.
for _, inputs := range allSets {
errAllSets = s.sweep(
ziggie1984 marked this conversation as resolved.
Show resolved Hide resolved
inputs, cluster.sweepFeeRate, currentHeight,
)
// TODO(yy): we should also find out which set created
// this error. If there are new inputs in this set, we
// should give it a second chance by sweeping them
// below. To enable this, we need to provide richer
// state for each input other than just recording the
// publishAttempts. We'd also need to refactor how we
// create the input sets. Atm, the steps are,
// 1. create a list of input sets.
// 2. sweep each set by creating and publishing the tx.
// We should change the flow as,
// 1. create a list of input sets, and for each set,
// 2. when created, we create and publish the tx.
// 3. if the publish fails, find out which input is
// causing the failure and retry the rest of the
// inputs.
if errAllSets != nil {
log.Errorf("sweep all inputs: %w", err)
break
}
}

// If we have successfully swept all inputs, there's no need to
// sweep the new inputs as it'd create an RBF case.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: ... there's no need to sweep the new inputs separately ...

if allSets != nil && errAllSets == nil {
return nil
}

// We'd end up there if there's no retried inputs. In this
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Comment needs an update as well now

// case, we'd sweep the new input sets. If there's an error
// when sweeping a given set, we'd log the error and sweep the
// next set.
for _, inputs := range newSets {
err := s.sweep(
inputs, cluster.sweepFeeRate, currentHeight,
)
if err != nil {
return fmt.Errorf("unable to sweep inputs: %v", err)
log.Errorf("sweep new inputs: %w", err)
}
}

Expand Down Expand Up @@ -1017,23 +1059,25 @@ func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) {
}

// getInputLists goes through the given inputs and constructs multiple distinct
// sweep lists with the given fee rate, each up to the configured maximum number
// of inputs. Negative yield inputs are skipped. Transactions with an output
// below the dust limit are not published. Those inputs remain pending and will
// be bundled with future inputs if possible.
// sweep lists with the given fee rate, each up to the configured maximum
// number of inputs. Negative yield inputs are skipped. Transactions with an
// output below the dust limit are not published. Those inputs remain pending
// and will be bundled with future inputs if possible. It returns two list -
// one containing all inputs and the other containing only the new inputs. If
// there's no retried inputs, the first set returned will be empty.
func (s *UtxoSweeper) getInputLists(cluster inputCluster,
currentHeight int32) ([]inputSet, error) {
currentHeight int32) ([]inputSet, []inputSet, error) {

// Filter for inputs that need to be swept. Create two lists: all
// sweepable inputs and a list containing only the new, never tried
// inputs.
//
// We want to create as large a tx as possible, so we return a final set
// list that starts with sets created from all inputs. However, there is
// a chance that those txes will not publish, because they already
// contain inputs that failed before. Therefore we also add sets
// consisting of only new inputs to the list, to make sure that new
// inputs are given a good, isolated chance of being published.
// We want to create as large a tx as possible, so we return a final
// set list that starts with sets created from all inputs. However,
// there is a chance that those txes will not publish, because they
// already contain inputs that failed before. Therefore we also add
// sets consisting of only new inputs to the list, to make sure that
// new inputs are given a good, isolated chance of being published.
//
// TODO(yy): this would lead to conflict transactions as the same input
// can be used in two sweeping transactions, and our rebroadcaster will
Expand Down Expand Up @@ -1070,7 +1114,8 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
return nil, nil, fmt.Errorf("input partitionings: %w",
err)
}
}

Expand All @@ -1080,15 +1125,13 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
return nil, nil, fmt.Errorf("input partitionings: %w", err)
}

log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+
"total_num_new=%v", currentHeight, len(allSets), len(newSets))

// Append the new sets at the end of the list, because those tx likely
// have a higher fee per input.
return append(allSets, newSets...), nil
return allSets, newSets, nil
}

// sweep takes a set of preselected inputs, creates a sweep tx and publishes the
Expand Down
117 changes: 110 additions & 7 deletions sweep/sweeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,10 +860,6 @@ func TestRetry(t *testing.T) {
// We expect a sweep to be published.
ctx.receiveTx()

// New block arrives. This should trigger a new sweep attempt timer
// start.
ctx.notifier.NotifyEpoch(1000)

// Offer a fresh input.
resultChan1, err := ctx.sweeper.SweepInput(
spendableInputs[1], defaultFeePref,
Expand All @@ -872,9 +868,7 @@ func TestRetry(t *testing.T) {
t.Fatal(err)
}

// Two txes are expected to be published, because new and retry inputs
// are separated.
ctx.receiveTx()
// A single tx is expected to be published.
ctx.receiveTx()

ctx.backend.mine()
Expand Down Expand Up @@ -2436,3 +2430,112 @@ func TestClusterByLockTime(t *testing.T) {
})
}
}

// TestGetInputLists checks that the expected input sets are returned based on
// whether there are retried inputs or not.
func TestGetInputLists(t *testing.T) {
t.Parallel()

// Create a test param with a dummy fee preference. This is needed so
// `feeRateForPreference` won't throw an error.
param := Params{Fee: FeePreference{ConfTarget: 1}}

// Create a mock input and mock all the methods used in this test.
testInput := &input.MockInput{}
testInput.On("RequiredLockTime").Return(0, false)
testInput.On("WitnessType").Return(input.CommitmentAnchor)
testInput.On("OutPoint").Return(&wire.OutPoint{Index: 1})
testInput.On("RequiredTxOut").Return(nil)
testInput.On("UnconfParent").Return(nil)
testInput.On("SignDesc").Return(&input.SignDescriptor{
Output: &wire.TxOut{Value: 100_000},
})

// Create a new and a retried input.
//
// NOTE: we use the same input.Input for both pending inputs as we only
// test the logic of returning the correct non-nil input sets, and not
// the content the of sets. To validate the content of the sets, we
// should test `generateInputPartitionings` instead.
newInput := &pendingInput{
Input: testInput,
params: param,
}
oldInput := &pendingInput{
Input: testInput,
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
params: param,
publishAttempts: 1,
}

// clusterNew contains only new inputs.
clusterNew := pendingInputs{
wire.OutPoint{Index: 1}: newInput,
}

// clusterMixed contains a mixed of new and retried inputs.
clusterMixed := pendingInputs{
wire.OutPoint{Index: 1}: newInput,
wire.OutPoint{Index: 2}: oldInput,
}

// clusterOld contains only retried inputs.
clusterOld := pendingInputs{
wire.OutPoint{Index: 2}: oldInput,
}

// Create a test sweeper.
s := New(&UtxoSweeperConfig{
MaxInputsPerTx: DefaultMaxInputsPerTx,
})

testCases := []struct {
name string
cluster inputCluster
expectedNilAllSet bool
expectNilNewSet bool
}{
{
// When there are only new inputs, we'd expect the
// first returned set(allSets) to be empty.
name: "new inputs only",
cluster: inputCluster{inputs: clusterNew},
expectedNilAllSet: true,
expectNilNewSet: false,
},
{
// When there are only retried inputs, we'd expect the
// second returned set(newSet) to be empty.
name: "retried inputs only",
cluster: inputCluster{inputs: clusterOld},
expectedNilAllSet: false,
expectNilNewSet: true,
},
{
// When there are mixed inputs, we'd expect two sets
// are returned.
name: "mixed inputs",
cluster: inputCluster{inputs: clusterMixed},
expectedNilAllSet: false,
expectNilNewSet: false,
},
}

for _, tc := range testCases {
tc := tc

t.Run(tc.name, func(t *testing.T) {
t.Parallel()

allSets, newSets, err := s.getInputLists(tc.cluster, 0)
require.NoError(t, err)

if tc.expectNilNewSet {
require.Nil(t, newSets)
}

if tc.expectedNilAllSet {
require.Nil(t, allSets)
}
})
}
}