Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: log additional information for local force closures #8072

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
514 changes: 513 additions & 1 deletion channeldb/channel.go

Large diffs are not rendered by default.

48 changes: 46 additions & 2 deletions channeldb/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,8 @@ func TestFetchWaitingCloseChannels(t *testing.T) {
},
)

if err := channel.MarkCommitmentBroadcasted(closeTx, true); err != nil {
if err := channel.MarkCommitmentBroadcasted(closeTx,
&LocalForceCloseInsights{}); err != nil {
t.Fatalf("unable to mark commitment broadcast: %v", err)
}

Expand Down Expand Up @@ -1291,7 +1292,8 @@ func TestCloseInitiator(t *testing.T) {
// local initiator.
updateChannel: func(c *OpenChannel) error {
return c.MarkCommitmentBroadcasted(
&wire.MsgTx{}, true,
&wire.MsgTx{},
&LocalForceCloseInsights{},
)
},
expectedStatuses: []ChannelStatus{
Expand Down Expand Up @@ -1633,3 +1635,45 @@ func TestOnionBlobIncorrectLength(t *testing.T) {
_, err := DeserializeHtlcs(&b)
require.ErrorIs(t, err, ErrOnionBlobLength)
}

// TestLocalForceCloseInitiatorEncoding tests serializing and deserializing
// LocalForceCloseInitiator.
func TestLocalForceCloseInitiatorEncoding(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
data LocalForceCloseInitiator
}{
{
name: "chain action initiated",
data: ChainActionsInitiated,
},
{
name: "user initiated",
data: ChainActionsInitiated,
},
{
name: "custom initiator",
data: LocalForceCloseInitiator("invalid update"),
},
}

for _, testCase := range testCases {
testCase := testCase

t.Run(testCase.name, func(t *testing.T) {
t.Parallel()

var b bytes.Buffer
err := SerializeLocalForceCloseInitiator(&b,
&testCase.data)
require.NoError(t, err)

r := bytes.NewReader(b.Bytes())
data, err := DeserializeLocalForceCloseInitiator(r)
require.NoError(t, err)
require.Equal(t, testCase.data, data)
})
}
}
159 changes: 159 additions & 0 deletions contractcourt/briefcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,30 @@ type ArbitratorLog interface {
// introduced.
FetchChainActions() (ChainActionMap, error)

// LogLocalForceCloseInitiator records the passed-in
// localForceCloseInitiator object within the ArbitratorLog for deferred
// consumption. This is typically invoked when the channel force closure
// is first initiated either by the user or automatically (
// link error, HTLC actions, etc.).
LogLocalForceCloseInitiator(
initiator channeldb.LocalForceCloseInitiator) error

// FetchLocalForceCloseInitiator attempts to fetch the
// localForceCloseInitiator object that was previously logged in the
// ArbitratorLog.
FetchLocalForceCloseInitiator() (channeldb.LocalForceCloseInitiator,
error)

// LogLocalFCChainActions records the passed-in chainActionMap
// for the force closed channel into the ArbitratorLog for deferred
// consumption.
LogLocalFCChainActions(ChainActionMap)

// FetchLocalFCChainActions attempts to fetch the chainActionMap
// for the force closed channel that was previously logged in the
// ArbitratorLog.
FetchLocalFCChainActions() (map[string][]channeldb.HTLC, error)

// WipeHistory is to be called ONLY once *all* contracts have been
// fully resolved, and the channel closure if finalized. This method
// will delete all on-disk state within the persistent log.
Expand Down Expand Up @@ -366,6 +390,14 @@ var (
// taprootDataKey is the key we'll use to store taproot specific data
// for the set of channels we'll need to sweep/claim.
taprootDataKey = []byte("taproot-data")

// localForceCloseInitiatorKey is the key that we use to store the
// localForceCloseInitiator object within the log, if any.
localForceCloseInitiatorKey = []byte("local-force-close-info")

// fcChainActionKey is the key that we use to store the chain actions
// during a force close.
fcChainActionKey = []byte("htlc-map-key")
)

var (
Expand Down Expand Up @@ -1047,6 +1079,133 @@ func (b *boltArbitratorLog) FetchChainActions() (ChainActionMap, error) {
return actionsMap, nil
}

// LogLocalForceCloseInitiator records the passed-in localForceCloseInitiator
// object within the ArbitratorLog for deferred consumption. This is
// typically invoked when the channel force closure is first initiated
// either by the user or automatically (link error, HTLC actions, etc.).
func (b *boltArbitratorLog) LogLocalForceCloseInitiator(
info channeldb.LocalForceCloseInitiator) error {

return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:])
if err != nil {
return err
}
var buf bytes.Buffer
if err := channeldb.SerializeLocalForceCloseInitiator(&buf,
&info); err != nil {
return err
}

return scopeBucket.Put(localForceCloseInitiatorKey, buf.Bytes())
}, func() {})
}

// FetchLocalForceCloseInitiator attempts to fetch the localForceCloseInitiator
// object that was previously logged in the ArbitratorLog.
func (b *boltArbitratorLog) FetchLocalForceCloseInitiator() (
channeldb.LocalForceCloseInitiator,
error) {

var init channeldb.LocalForceCloseInitiator
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
hieblmi marked this conversation as resolved.
Show resolved Hide resolved
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}

infoBytes := scopeBucket.Get(localForceCloseInitiatorKey)
if infoBytes == nil {
return nil
}

infoReader := bytes.NewReader(infoBytes)
lcInfo, err := channeldb.DeserializeLocalForceCloseInitiator(
infoReader)
if err != nil {
return err
}

init = lcInfo

return nil
}, func() {})

if err != nil {
return "", err
}

return init, nil
}

// LogLocalFCChainActions records the passed-in chainActionMap
// for the force closed channel into the ArbitratorLog for deferred
// consumption.
func (b *boltArbitratorLog) LogLocalFCChainActions(
chainActions ChainActionMap) {

htlcMap := make(map[string][]channeldb.HTLC)
for action, htlcs := range chainActions {
htlcMap[action.String()] = htlcs
}

err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:])
if err != nil {
return err
}
var buf bytes.Buffer
if err := channeldb.SerializeChainActions(&buf,
&htlcMap); err != nil {
return err
}

return scopeBucket.Put(fcChainActionKey, buf.Bytes())
}, func() {})

if err != nil {
log.Error(err)
}
}

// FetchLocalFCChainActions attempts to fetch the chainActionMap
// for the force closed channel that was previously logged in the
// ArbitratorLog.
func (b *boltArbitratorLog) FetchLocalFCChainActions() (
map[string][]channeldb.HTLC,
error) {

var chainAction map[string][]channeldb.HTLC
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}

chainActionBytes := scopeBucket.Get(fcChainActionKey)
if len(chainActionBytes) == 0 {
return nil
}

chainActionReader := bytes.NewReader(chainActionBytes)
chMap, err := channeldb.DeserializeChainActions(
chainActionReader)
if err != nil {
return err
}

chainAction = chMap

return nil
}, func() {})

if err != nil {
return nil, err
}

return chainAction, nil
}

// InsertConfirmedCommitSet stores the known set of active HTLCs at the time
// channel closure. We'll use this to reconstruct our set of chain actions anew
// based on the confirmed and pending commitment state.
Expand Down
64 changes: 63 additions & 1 deletion contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,13 +1057,74 @@ type forceCloseReq struct {
closeTx chan *wire.MsgTx
}

// ForceCloseContractOption is used for providing functional options to callers
// of ForceCloseContract().
type ForceCloseContractOption func(*ChannelArbitrator)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the right strategy to go for here. Using continuations to implement this is "too powerful" and we probably just want to explicitly enumerate the cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay could you please explain why this is so? I got that it is "too powerful" but that was not backed up by anything really.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is too powerful because you are using a full continuation here. Someone could supply a function that does anything and it is delegating the process of calling the "log" functions to the argument. If you look at ChainArbitrator.ForceCloseContract it is simply calling out to the supplied opt do do the log action. However, all of what you need to do with this is finitely enumerable and we have no reason to believe it ever won't be. So you should just take a regular data parameter and then call the appropriate log action depending on that value.

Continuation passing has its uses but I don't think this is a good instance of it.

That said, I wouldn't waste time taking a look at these smaller comments before zooming out and reworking the approach. See my other comment. I think @ziggie1984 has the right idea of what to do here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't think it's the right approach - 1) it's not really an option, but an extra step in ForceCloseContract where we save the info 2) this can be expanded indefinitely, instead we should limit the function scope here. I think you could define a ForceCloseReason struct here and let other callers use it in ForceCloseContract. Just a rough thought, need to think about it more

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get this @yyforyongyu. Care explaining more about this?


// UserInitiatedForceClose is a functional option for ForceCloseContract() that
// signifies that the force close is purely user-initiated.
func UserInitiatedForceClose(ca *ChannelArbitrator) {
// Just log errors since this is not a critical part of the force close
// flow.
err := ca.log.LogLocalForceCloseInitiator(channeldb.UserInitiated)
if err != nil {
log.Errorf("Error logging user initiated force "+
"close initiator: %v", err.Error())
}

log.Info("Logged user initiated force close.")
}

// LinkFailureErrorForceClose is a functional option for ForceCloseContract()
// that signifies that the force close is due to a link failure. Requires the
// error message of the link failure to be specified.
func LinkFailureErrorForceClose(errorMsg string) ForceCloseContractOption {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should accept a LinkFailureError as opposed to a string. We should move the type to a separate package if necessary to accomplish this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain why this should be stored as a LinkFailureError as opposed to string? Note that why this errorMsg is converted to channeldb's LocalForceCloseInitiator type and stored in this function. The string contains the reason for the Link failure which is the necessary information that we need any way.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not that it needs to be stored as a LinkFailureError. But this function should accept a LinkFailureError for type safety reasons. It enforces stricter discipline at potential callsites

return func(ca *ChannelArbitrator) {
// Just log errors since this is not a critical part of the
// force close flow.
err := ca.log.LogLocalForceCloseInitiator(
channeldb.LocalForceCloseInitiator(errorMsg))

if err != nil {
log.Errorf("Error logging link failure error force "+
"close initiator: %v",
err.Error())
}
log.Infof("Logged link failure error force close: %v", errorMsg)
}
}

// ChainTriggerForceClose is a functional option for ForceCloseContract() that
// signifies that the force close is due to a chain trigger.
func ChainTriggerForceClose(
chainActions ChainActionMap) ForceCloseContractOption {

return func(ca *ChannelArbitrator) {
// Ignore errors since logging force close info is not a
// critical part of the force close flow.
err := ca.log.LogLocalForceCloseInitiator(
channeldb.ChainActionsInitiated)

if err != nil {
log.Errorf("Error logging chain action force "+
"close initiator: %v",
err.Error())
}
ca.log.LogLocalFCChainActions(chainActions)

log.Info("Logged chain trigger initiated force close.")
}
}

// ForceCloseContract attempts to force close the channel infield by the passed
// channel point. A force close will immediately terminate the contract,
// causing it to enter the resolution phase. If the force close was successful,
// then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint,
opt ForceCloseContractOption) (*wire.MsgTx, error) {

c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
Expand All @@ -1072,6 +1133,7 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg
}

log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)
opt(arbitrator)

// Before closing, we'll attempt to send a disable update for the
// channel. We do so before closing the channel as otherwise the current
Expand Down
3 changes: 2 additions & 1 deletion contractcourt/chain_arbitrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func TestChainArbitratorRepublishCloses(t *testing.T) {
for i := 0; i < numChans/2; i++ {
closeTx := channels[i].FundingTxn.Copy()
closeTx.TxIn[0].PreviousOutPoint = channels[i].FundingOutpoint
err := channels[i].MarkCommitmentBroadcasted(closeTx, true)
err := channels[i].MarkCommitmentBroadcasted(closeTx,
&channeldb.LocalForceCloseInsights{})
if err != nil {
t.Fatal(err)
}
Expand Down
Loading
Loading