From 204b6c51cf0e66301b106b5ca52c6c93e5e26ade Mon Sep 17 00:00:00 2001 From: eugene Date: Mon, 12 Oct 2020 11:08:30 -0400 Subject: [PATCH] channeldb: MigrateOutpointIndex, store indexStatus in outpoint index Adds an outpoint index that stores a tlv stream. Currently the stream only contains the outpoint's indexStatus. This should cut down on big bbolt transactions in several places throughout the codebase. --- channeldb/channel.go | 100 ++++++++++++++- channeldb/db.go | 13 ++ channeldb/migration20/codec.go | 36 ++++++ channeldb/migration20/log.go | 14 +++ channeldb/migration20/migration.go | 196 +++++++++++++++++++++++++++++ 5 files changed, 354 insertions(+), 5 deletions(-) create mode 100644 channeldb/migration20/codec.go create mode 100644 channeldb/migration20/log.go create mode 100644 channeldb/migration20/migration.go diff --git a/channeldb/channel.go b/channeldb/channel.go index 48a52648d5..41f513374a 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -21,6 +21,7 @@ import ( "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/shachain" + "github.com/lightningnetwork/lnd/tlv" ) const ( @@ -45,6 +46,13 @@ var ( // TODO(roasbeef): flesh out comment openChannelBucket = []byte("open-chan-bucket") + // outpointBucket stores all of our channel outpoints and a tlv + // stream containing channel data. + // + // outpoint -> tlv stream + // + outpointBucket = []byte("outpoint-bucket") + // historicalChannelBucket stores all channels that have seen their // commitment tx confirm. All information from their previous open state // is retained. @@ -167,12 +175,35 @@ var ( // the height requested in the revocation log. ErrLogEntryNotFound = fmt.Errorf("log entry not found") + // ErrMissingIndexEntry is returned when a caller attempts to close a + // channel and the outpoint is missing from the index. + ErrMissingIndexEntry = fmt.Errorf("missing outpoint from index") + // errHeightNotFound is returned when a query for channel balances at // a height that we have not reached yet is made. errHeightNotReached = fmt.Errorf("height requested greater than " + "current commit height") ) +const ( + // A tlv type definition used to serialize an outpoint's indexStatus + // for use in the outpoint index. + indexStatusType tlv.Type = 0 +) + +// indexStatus is an enum-like type that describes what state the +// outpoint is in. Currently only two possible values. +type indexStatus uint8 + +const ( + // outpointOpen represents an outpoint that is open in the outpoint index. + outpointOpen indexStatus = 0 + + // outpointClosed represents an outpoint that is closed in the outpoint + // index. + outpointClosed indexStatus = 1 +) + // ChannelType is an enum-like type that describes one of several possible // channel types. Each open channel is associated with a particular type as the // channel type may determine how higher level operations are conducted such as @@ -827,6 +858,39 @@ func fetchChanBucketRw(tx kvdb.RwTx, nodeKey *btcec.PublicKey, // nolint:interfa // fullSync syncs the contents of an OpenChannel while re-using an existing // database transaction. func (c *OpenChannel) fullSync(tx kvdb.RwTx) error { + // Fetch the outpoint bucket and check if the outpoint already exists. + opBucket := tx.ReadWriteBucket(outpointBucket) + + var chanPointBuf bytes.Buffer + if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil { + return err + } + + // Now, check if the outpoint exists in our index. + if opBucket.Get(chanPointBuf.Bytes()) != nil { + return ErrChanAlreadyExists + } + + status := uint8(outpointOpen) + + // Write the status of this outpoint as the first entry in a tlv + // stream. + statusRecord := tlv.MakePrimitiveRecord(indexStatusType, &status) + opStream, err := tlv.NewStream(statusRecord) + if err != nil { + return err + } + + var b bytes.Buffer + if err := opStream.Encode(&b); err != nil { + return err + } + + // Add the outpoint to our outpoint index with the tlv stream. + if err := opBucket.Put(chanPointBuf.Bytes(), b.Bytes()); err != nil { + return err + } + // First fetch the top level bucket which stores all data related to // current, active channels. openChanBucket, err := tx.CreateTopLevelBucket(openChannelBucket) @@ -851,10 +915,6 @@ func (c *OpenChannel) fullSync(tx kvdb.RwTx) error { // With the bucket for the node fetched, we can now go down another // level, creating the bucket for this channel itself. - var chanPointBuf bytes.Buffer - if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil { - return err - } chanBucket, err := chainBucket.CreateBucket( chanPointBuf.Bytes(), ) @@ -1258,7 +1318,7 @@ func (c *OpenChannel) clearChanStatus(status ChannelStatus) error { return nil } -// putChannel serializes, and stores the current state of the channel in its +// putOpenChannel serializes, and stores the current state of the channel in its // entirety. func putOpenChannel(chanBucket kvdb.RwBucket, channel *OpenChannel) error { // First, we'll write out all the relatively static fields, that are @@ -2772,6 +2832,36 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary, return err } + // Fetch the outpoint bucket to see if the outpoint exists or + // not. + opBucket := tx.ReadWriteBucket(outpointBucket) + + // Add the closed outpoint to our outpoint index. This should + // replace an open outpoint in the index. + if opBucket.Get(chanPointBuf.Bytes()) == nil { + return ErrMissingIndexEntry + } + + status := uint8(outpointClosed) + + // Write the IndexStatus of this outpoint as the first entry in a tlv + // stream. + statusRecord := tlv.MakePrimitiveRecord(indexStatusType, &status) + opStream, err := tlv.NewStream(statusRecord) + if err != nil { + return err + } + + var b bytes.Buffer + if err := opStream.Encode(&b); err != nil { + return err + } + + // Finally add the closed outpoint and tlv stream to the index. + if err := opBucket.Put(chanPointBuf.Bytes(), b.Bytes()); err != nil { + return err + } + // Add channel state to the historical channel bucket. historicalBucket, err := tx.CreateTopLevelBucket( historicalChannelBucket, diff --git a/channeldb/db.go b/channeldb/db.go index f2e5669159..d2618e10f6 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -17,6 +17,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration12" "github.com/lightningnetwork/lnd/channeldb/migration13" "github.com/lightningnetwork/lnd/channeldb/migration16" + "github.com/lightningnetwork/lnd/channeldb/migration20" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/lnwire" @@ -170,6 +171,17 @@ var ( number: 18, migration: mig.CreateTLB(peersBucket), }, + { + // Create a top level bucket which holds outpoint + // information. + number: 19, + migration: mig.CreateTLB(outpointBucket), + }, + { + // Migrate some data to the outpoint index. + number: 20, + migration: migration20.MigrateOutpointIndex, + }, } // Big endian is the preferred byte order, due to cursor scans over @@ -309,6 +321,7 @@ var topLevelBuckets = [][]byte{ graphMetaBucket, metaBucket, closeSummaryBucket, + outpointBucket, } // Wipe completely deletes all saved state within all used buckets within the diff --git a/channeldb/migration20/codec.go b/channeldb/migration20/codec.go new file mode 100644 index 0000000000..37481c997b --- /dev/null +++ b/channeldb/migration20/codec.go @@ -0,0 +1,36 @@ +package migration20 + +import ( + "encoding/binary" + "io" + + "github.com/btcsuite/btcd/wire" +) + +var ( + byteOrder = binary.BigEndian +) + +// writeOutpoint writes an outpoint from the passed writer. +func writeOutpoint(w io.Writer, o *wire.OutPoint) error { + if _, err := w.Write(o.Hash[:]); err != nil { + return err + } + if err := binary.Write(w, byteOrder, o.Index); err != nil { + return err + } + + return nil +} + +// readOutpoint reads an outpoint from the passed reader. +func readOutpoint(r io.Reader, o *wire.OutPoint) error { + if _, err := io.ReadFull(r, o.Hash[:]); err != nil { + return err + } + if err := binary.Read(r, byteOrder, &o.Index); err != nil { + return err + } + + return nil +} diff --git a/channeldb/migration20/log.go b/channeldb/migration20/log.go new file mode 100644 index 0000000000..da75760d08 --- /dev/null +++ b/channeldb/migration20/log.go @@ -0,0 +1,14 @@ +package migration20 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package +// will not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/channeldb/migration20/migration.go b/channeldb/migration20/migration.go new file mode 100644 index 0000000000..dee53be776 --- /dev/null +++ b/channeldb/migration20/migration.go @@ -0,0 +1,196 @@ +package migration20 + +import ( + "bytes" + "fmt" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb/kvdb" + "github.com/lightningnetwork/lnd/tlv" +) + +var ( + // openChanBucket stores all the open channel information. + openChanBucket = []byte("open-chan-bucket") + + // closedChannelBucket stores all the closed channel information. + closedChannelBucket = []byte("closed-chan-bucket") + + // outpointBucket is an index mapping outpoints to a tlv + // stream of channel data. + outpointBucket = []byte("outpoint-bucket") +) + +const ( + // A tlv type definition used to serialize an outpoint's indexStatus for + // use in the outpoint index. + indexStatusType tlv.Type = 0 +) + +// indexStatus is an enum-like type that describes what state the +// outpoint is in. Currently only two possible values. +type indexStatus uint8 + +const ( + // outpointOpen represents an outpoint that is open in the outpoint index. + outpointOpen indexStatus = 0 + + // outpointClosed represents an outpoint that is closed in the outpoint + // index. + outpointClosed indexStatus = 1 +) + +// MigrateOutpointIndex populates the outpoint index with outpoints that +// the node already has. This takes every outpoint in the open channel +// bucket and every outpoint in the closed channel bucket and stores them +// in this index. +func MigrateOutpointIndex(tx kvdb.RwTx) error { + log.Info("Migrating to the outpoint index") + + // First get the set of open outpoints. + openList, err := getOpenOutpoints(tx) + if err != nil { + return err + } + + // Then get the set of closed outpoints. + closedList, err := getClosedOutpoints(tx) + if err != nil { + return err + } + + // Get the outpoint bucket which was created in migration 19. + bucket := tx.ReadWriteBucket(outpointBucket) + + // Store the set of open outpoints in the outpoint bucket. + if err := putOutpoints(bucket, openList, false); err != nil { + return err + } + + // Store the set of closed outpoints in the outpoint bucket. + return putOutpoints(bucket, closedList, true) +} + +// getOpenOutpoints traverses through the openChanBucket and returns the +// list of these channels' outpoints. +func getOpenOutpoints(tx kvdb.RwTx) ([]*wire.OutPoint, error) { + var ops []*wire.OutPoint + + openBucket := tx.ReadBucket(openChanBucket) + if openBucket == nil { + return ops, nil + } + + // Iterate through every node and chain bucket to get every open + // outpoint. + // + // The bucket tree: + // openChanBucket -> nodePub -> chainHash -> chanPoint + err := openBucket.ForEach(func(k, v []byte) error { + // Ensure that the key is the same size as a pubkey and the + // value is nil. + if len(k) != 33 || v != nil { + return nil + } + + nodeBucket := openBucket.NestedReadBucket(k) + if nodeBucket == nil { + return nil + } + + return nodeBucket.ForEach(func(k, v []byte) error { + // If there's a value it's not a bucket. + if v != nil { + return nil + } + + chainBucket := nodeBucket.NestedReadBucket(k) + if chainBucket == nil { + return fmt.Errorf("unable to read "+ + "bucket for chain: %x", k) + } + + return chainBucket.ForEach(func(k, v []byte) error { + // If there's a value it's not a bucket. + if v != nil { + return nil + } + + var op wire.OutPoint + r := bytes.NewReader(k) + if err := readOutpoint(r, &op); err != nil { + return err + } + + ops = append(ops, &op) + + return nil + }) + }) + }) + if err != nil { + return nil, err + } + return ops, nil +} + +// getClosedOutpoints traverses through the closedChanBucket and returns +// a list of closed outpoints. +func getClosedOutpoints(tx kvdb.RwTx) ([]*wire.OutPoint, error) { + var ops []*wire.OutPoint + closedBucket := tx.ReadBucket(closedChannelBucket) + if closedBucket == nil { + return ops, nil + } + + // Iterate through every key-value pair to gather all outpoints. + err := closedBucket.ForEach(func(k, v []byte) error { + var op wire.OutPoint + r := bytes.NewReader(k) + if err := readOutpoint(r, &op); err != nil { + return err + } + + ops = append(ops, &op) + + return nil + }) + if err != nil { + return nil, err + } + + return ops, nil +} + +// putOutpoints puts the set of outpoints into the outpoint bucket. +func putOutpoints(bucket kvdb.RwBucket, ops []*wire.OutPoint, isClosed bool) error { + status := uint8(outpointOpen) + if isClosed { + status = uint8(outpointClosed) + } + + record := tlv.MakePrimitiveRecord(indexStatusType, &status) + stream, err := tlv.NewStream(record) + if err != nil { + return err + } + + var b bytes.Buffer + if err := stream.Encode(&b); err != nil { + return err + } + + // Store the set of outpoints with the encoded tlv stream. + for _, op := range ops { + var opBuf bytes.Buffer + if err := writeOutpoint(&opBuf, op); err != nil { + return err + } + + if err := bucket.Put(opBuf.Bytes(), b.Bytes()); err != nil { + return err + } + } + + return nil +}