Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: log additional information for local force closures #8072

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
216 changes: 216 additions & 0 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3269,6 +3269,13 @@ const (
// localForceCloseInitiatorType is used to serialize/deserialize
// localForceCloseInitiator.
localForceCloseInitiatorType tlv.Type = 0

// chainActionType is used to serialize/deserialize chainAction.
chainActionType tlv.Type = 1

// ActionKeyType is used to serialize/deserialize the action key in the
// chain actions map.
ActionKeyType tlv.Type = 2
)

// String returns the human-readable format of the LocalForceCloseInitiator.
Expand Down Expand Up @@ -3342,6 +3349,215 @@ func DeserializeLocalForceCloseInitiator(r io.Reader) (LocalForceCloseInitiator,
return LocalForceCloseInitiator(lc), nil
}

// HtlcActionRecordSize returns the amount of bytes this TLV record will occupy
// when encoded.
func HtlcActionRecordSize(a *map[string][]HTLC) func() uint64 {
var (
b bytes.Buffer
buf [8]byte
)

// We know that encoding works since the tests pass in the build this
// file is checked into, so we'll simplify things and simply encode it
// ourselves then report the total amount of bytes used.
if err := HtlcActionEncoder(&b, a, &buf); err != nil {
// This should never error out, but we log it just in case it
// does.
log.Errorf("encoding the chain action map failed: %v",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return an error here.

err)
}

return func() uint64 {
return uint64(len(b.Bytes()))
}
}

// HtlcActionEncoder is a custom TLV encoder for the htlcAction record.
func HtlcActionEncoder(w io.Writer, val interface{}, buf *[8]byte) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also add this customized encode decoders to the FuzzSuite as well, or what do you think @morehouse ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be great, though we currently have no fuzz tests for channeldb. Writing those tests is probably a bigger project to tackle separately, rather than holding up this PR.

See #8045 for some progress on channeldb fuzzing.

if v, ok := val.(*map[string][]HTLC); ok {
numRecords := uint64(len(*v))

// First, we'll write out the number of records as a var int.
if err := tlv.WriteVarInt(w, numRecords, buf); err != nil {
return err
}

// With that written out, we'll now encode the entries
// themselves as a sub-TLV record, which includes its _own_
// inner length prefix.
for action, htlcs := range *v {
action := []byte(action)

tlvstream, err := tlv.NewStream(
tlv.MakePrimitiveRecord(
ActionKeyType, &action,
),
)
if err != nil {
return err
}

var actionBytes bytes.Buffer

err = tlvstream.Encode(&actionBytes)
if err != nil {
return err
}
if err != nil {
return err
}

// We encode the record with a varint length followed by
// the _raw_ TLV bytes.
tlvLen := uint64(len(actionBytes.Bytes()))
if err := tlv.WriteVarInt(w, tlvLen, buf); err != nil {
return err
}

_, err = w.Write(actionBytes.Bytes())
if err != nil {
return err
}
err = SerializeHtlcs(w, htlcs...)
if err != nil {
return err
}
}

return nil
}

return tlv.NewTypeForEncodingErr(val, "map[string][]HTLC")
}

// HtlcActionDecoder is a custom TLV decoder for the htlcAction record.
func HtlcActionDecoder(r io.Reader, val interface{}, buf *[8]byte,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use interface value here? I think these encoder/decoder methods are only used for HTLC?

Copy link
Contributor Author

@Chinwendu20 Chinwendu20 Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where that function is used: https://github.com/lightningnetwork/lnd/blob/b22d72d44d5c71f732f15e5a796f5de7fb39a451/channeldb/channel.go#L3498-L3503C5
Also take a look at the function parameters for that function:

lnd/tlv/record.go

Lines 186 to 187 in 0df507e

func MakeDynamicRecord(typ Type, val interface{}, sizeFunc SizeFunc,
encoder Encoder, decoder Decoder) Record {

This is the function signature for Decoder type:
type Decoder func(r io.Reader, val interface{}, buf *[8]byte, l uint64) error

l uint64) error {

if v, ok := val.(*map[string][]HTLC); ok {
// First, we'll decode the varint that encodes how many actions
// are encoded within the map.
numRecords, err := tlv.ReadVarInt(r, buf)
if err != nil {
return err
}
// Now that we know how many records we'll need to read, we can
// iterate and read them all out in series.
for i := uint64(0); i < numRecords; i++ {
// Read out the varint that encodes the size of this
// inner TLV record.
actionRecordSize, err := tlv.ReadVarInt(r, buf)
if err != nil {
return err
}

// Using this information, we'll create a new limited
// reader that'll return an EOF once the end has been
// reached so the stream stops consuming bytes.
actionTlvReader := io.LimitedReader{
R: r,
N: int64(actionRecordSize),
}

var action []byte

tlvStream, err := tlv.NewStream(
tlv.MakePrimitiveRecord(
ActionKeyType, &action,
),
)

if err != nil {
return err
}

err = tlvStream.Decode(&actionTlvReader)
if err != nil {
return err
}

htlcs, err := DeserializeHtlcs(r)
if err != nil {
return err
}
(*v)[string(action)] = htlcs
}

return nil
}

return tlv.NewTypeForDecodingErr(
val, "map[string][]HTLC", l, l,
)
}

// SerializeChainActions writes out the passed chain actions map to the passed
// writer.
func SerializeChainActions(w io.Writer, m *map[string][]HTLC) error {
tlvStream, err := tlv.NewStream(
tlv.MakeDynamicRecord(
chainActionType, m,
HtlcActionRecordSize(m),
HtlcActionEncoder,
HtlcActionDecoder,
),
)
if err != nil {
return err
}

var b bytes.Buffer
err = tlvStream.Encode(&b)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a set of writeTlvStream and readTlvStream methods that can be used here.

Copy link
Contributor Author

@Chinwendu20 Chinwendu20 Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I did not know this. The function I was following its pattern did not use this.

if err != nil {
return err
}

err = binary.Write(w, byteOrder, uint64(b.Len()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use varint here as well ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I just followed the serialization pattern I saw for other structures such as this:

lnd/channeldb/invoices.go

Lines 821 to 915 in 56dba2d

func serializeInvoice(w io.Writer, i *invpkg.Invoice) error {
creationDateBytes, err := i.CreationDate.MarshalBinary()
if err != nil {
return err
}
settleDateBytes, err := i.SettleDate.MarshalBinary()
if err != nil {
return err
}
var fb bytes.Buffer
err = i.Terms.Features.EncodeBase256(&fb)
if err != nil {
return err
}
featureBytes := fb.Bytes()
preimage := [32]byte(invpkg.UnknownPreimage)
if i.Terms.PaymentPreimage != nil {
preimage = *i.Terms.PaymentPreimage
if preimage == invpkg.UnknownPreimage {
return errors.New("cannot use all-zeroes preimage")
}
}
value := uint64(i.Terms.Value)
cltvDelta := uint32(i.Terms.FinalCltvDelta)
expiry := uint64(i.Terms.Expiry)
amtPaid := uint64(i.AmtPaid)
state := uint8(i.State)
var hodlInvoice uint8
if i.HodlInvoice {
hodlInvoice = 1
}
tlvStream, err := tlv.NewStream(
// Memo and payreq.
tlv.MakePrimitiveRecord(memoType, &i.Memo),
tlv.MakePrimitiveRecord(payReqType, &i.PaymentRequest),
// Add/settle metadata.
tlv.MakePrimitiveRecord(createTimeType, &creationDateBytes),
tlv.MakePrimitiveRecord(settleTimeType, &settleDateBytes),
tlv.MakePrimitiveRecord(addIndexType, &i.AddIndex),
tlv.MakePrimitiveRecord(settleIndexType, &i.SettleIndex),
// Terms.
tlv.MakePrimitiveRecord(preimageType, &preimage),
tlv.MakePrimitiveRecord(valueType, &value),
tlv.MakePrimitiveRecord(cltvDeltaType, &cltvDelta),
tlv.MakePrimitiveRecord(expiryType, &expiry),
tlv.MakePrimitiveRecord(paymentAddrType, &i.Terms.PaymentAddr),
tlv.MakePrimitiveRecord(featuresType, &featureBytes),
// Invoice state.
tlv.MakePrimitiveRecord(invStateType, &state),
tlv.MakePrimitiveRecord(amtPaidType, &amtPaid),
tlv.MakePrimitiveRecord(hodlInvoiceType, &hodlInvoice),
// Invoice AMP state.
tlv.MakeDynamicRecord(
invoiceAmpStateType, &i.AMPState,
ampRecordSize(&i.AMPState),
ampStateEncoder, ampStateDecoder,
),
)
if err != nil {
return err
}
var b bytes.Buffer
if err = tlvStream.Encode(&b); err != nil {
return err
}
err = binary.Write(w, byteOrder, uint64(b.Len()))
if err != nil {
return err
}
if _, err = w.Write(b.Bytes()); err != nil {
return err
}
// Only if this is a _non_ AMP invoice do we serialize the HTLCs
// in-line with the rest of the invoice.
if i.IsAMP() {
return nil
}
return serializeHtlcs(w, i.Htlcs)
}

Hopefully @cfromknecht can explain

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main reason for varints is in the communication of data (between peers over the wire where every byte counts). So I guess because we are only storing this data and then presenting it to the user via rpc calls we can do it both ways here. No strong opinions here ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly like due to old practice - we've switched to use varint as it's more space-efficient.

if err != nil {
return err
}

if _, err = w.Write(b.Bytes()); err != nil {
return err
}

return nil
}

// DeserializeChainActions reads out the chainAction map from the passed
// reader.
func DeserializeChainActions(r io.Reader) (map[string][]HTLC,
error) {

chainAction := make(map[string][]HTLC)

tlvStream, err := tlv.NewStream(
tlv.MakeDynamicRecord(
chainActionType, &chainAction,
HtlcActionRecordSize(&chainAction),
HtlcActionEncoder,
HtlcActionDecoder,
),
)

if err != nil {
return chainAction, err
}

var bodyLen int64
err = binary.Read(r, byteOrder, &bodyLen)
if err != nil {
return chainAction, err
}

lr := io.LimitReader(r, bodyLen)
if err = tlvStream.Decode(lr); err != nil {
return chainAction, err
}

return chainAction, nil
}

// ChannelCloseSummary contains the final state of a channel at the point it
// was closed. Once a channel is closed, all the information pertaining to that
// channel within the openChannelBucket is deleted, and a compact summary is
Expand Down
82 changes: 82 additions & 0 deletions contractcourt/briefcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ type ArbitratorLog interface {
FetchLocalForceCloseInitiator() (channeldb.LocalForceCloseInitiator,
error)

// LogLocalFCChainActions records the passed-in chainActionMap
// for the force closed channel into the ArbitratorLog for deferred
// consumption.
LogLocalFCChainActions(ChainActionMap)

// FetchLocalFCChainActions attempts to fetch the chainActionMap
// for the force closed channel that was previously logged in the
// ArbitratorLog.
FetchLocalFCChainActions() (map[string][]channeldb.HTLC, error)

// WipeHistory is to be called ONLY once *all* contracts have been
// fully resolved, and the channel closure if finalized. This method
// will delete all on-disk state within the persistent log.
Expand Down Expand Up @@ -384,6 +394,10 @@ var (
// localForceCloseInitiatorKey is the key that we use to store the
// localForceCloseInitiator object within the log, if any.
localForceCloseInitiatorKey = []byte("local-force-close-info")

// fcChainActionKey is the key that we use to store the chain actions
// during a force close.
fcChainActionKey = []byte("htlc-map-key")
)

var (
Expand Down Expand Up @@ -1124,6 +1138,74 @@ func (b *boltArbitratorLog) FetchLocalForceCloseInitiator() (
return init, nil
}

// LogLocalFCChainActions records the passed-in chainActionMap
// for the force closed channel into the ArbitratorLog for deferred
// consumption.
func (b *boltArbitratorLog) LogLocalFCChainActions(
chainActions ChainActionMap) {

htlcMap := make(map[string][]channeldb.HTLC)
for action, htlcs := range chainActions {
htlcMap[action.String()] = htlcs
}

err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
scopeBucket, err := tx.CreateTopLevelBucket(b.scopeKey[:])
if err != nil {
return err
}
var buf bytes.Buffer
if err := channeldb.SerializeChainActions(&buf,
&htlcMap); err != nil {
return err
}

return scopeBucket.Put(fcChainActionKey, buf.Bytes())
}, func() {})

if err != nil {
log.Error(err)
}
}

// FetchLocalFCChainActions attempts to fetch the chainActionMap
// for the force closed channel that was previously logged in the
// ArbitratorLog.
func (b *boltArbitratorLog) FetchLocalFCChainActions() (
map[string][]channeldb.HTLC,
error) {

var chainAction map[string][]channeldb.HTLC
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}

chainActionBytes := scopeBucket.Get(fcChainActionKey)
if len(chainActionBytes) == 0 {
return nil
}

chainActionReader := bytes.NewReader(chainActionBytes)
chMap, err := channeldb.DeserializeChainActions(
chainActionReader)
if err != nil {
return err
}

chainAction = chMap

return nil
}, func() {})

if err != nil {
return nil, err
}

return chainAction, nil
}

// InsertConfirmedCommitSet stores the known set of active HTLCs at the time
// channel closure. We'll use this to reconstruct our set of chain actions anew
// based on the confirmed and pending commitment state.
Expand Down
5 changes: 1 addition & 4 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,10 +1100,6 @@ func ChainTriggerForceClose(
chainActions ChainActionMap) ForceCloseContractOption {

return func(ca *ChannelArbitrator) {
htlcMap := make(map[string][]channeldb.HTLC)
for action, htlcs := range chainActions {
htlcMap[action.String()] = htlcs
}
// Ignore errors since logging force close info is not a
// critical part of the force close flow.
err := ca.log.LogLocalForceCloseInitiator(
Expand All @@ -1114,6 +1110,7 @@ func ChainTriggerForceClose(
"close initiator: %v",
err.Error())
}
ca.log.LogLocalFCChainActions(chainActions)

log.Info("Logged chain trigger initiated force close.")
}
Expand Down
4 changes: 2 additions & 2 deletions contractcourt/channel_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,10 +965,10 @@ func (c *ChannelArbitrator) stateStep(
// Ignore errors since logging force close info is not a
// critical part of the force close flow.
ChainTriggerForceClose(chainActions)(c)
fallthrough
nextState = StateBroadcastCommit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we do not want this function called for https://github.com/lightningnetwork/lnd/pull/8072/files/c60ea993c2a31fe62c3e481cd40618455bf60afd..b22d72d44d5c71f732f15e5a796f5de7fb39a451#diff-a0b8064876b1b1d6085fa7ffdbfd38c81cb06c1ca3f34a08dbaacba203cda3ebL969-L970 on line 970 called for chain trigger force close. Chain action would be recorded twice in that case.

case userTrigger:
c.log.LogLocalFCChainActions(chainActions)
nextState = StateBroadcastCommit

// If the trigger is a cooperative close being confirmed, then
// we can go straight to StateFullyResolved, as there won't be
// any contracts to resolve.
Expand Down
19 changes: 19 additions & 0 deletions contractcourt/channel_arbitrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type mockArbitratorLog struct {
resolutions *ContractResolutions
resolvers map[ContractResolver]struct{}
localFCInfo channeldb.LocalForceCloseInitiator
fcChainActions map[string][]channeldb.HTLC

commitSet *CommitSet

Expand Down Expand Up @@ -162,6 +163,24 @@ func (b *mockArbitratorLog) WipeHistory() error {
return nil
}

func (b *mockArbitratorLog) LogLocalFCChainActions(
chainActions ChainActionMap) {

htlcMap := make(map[string][]channeldb.HTLC)
for action, htlcs := range chainActions {
htlcMap[action.String()] = htlcs
}

b.fcChainActions = htlcMap
}

func (b *mockArbitratorLog) FetchLocalFCChainActions() (
map[string][]channeldb.HTLC,
error) {

return b.fcChainActions, nil
}

// testArbLog is a wrapper around an existing (ideally fully concrete
// ArbitratorLog) that lets us intercept certain calls like transitioning to a
// new state.
Expand Down