Skip to content

Commit

Permalink
Merge pull request lightningnetwork#4857 from cfromknecht/meta-index-…
Browse files Browse the repository at this point in the history
…reinit

channeldb+tlv: add channel meta-index
  • Loading branch information
Roasbeef authored Dec 11, 2020
2 parents 415680a + f4593f9 commit 62a5cdb
Show file tree
Hide file tree
Showing 7 changed files with 470 additions and 16 deletions.
100 changes: 95 additions & 5 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain"
"github.com/lightningnetwork/lnd/tlv"
)

const (
Expand All @@ -45,6 +46,13 @@ var (
// TODO(roasbeef): flesh out comment
openChannelBucket = []byte("open-chan-bucket")

// outpointBucket stores all of our channel outpoints and a tlv
// stream containing channel data.
//
// outpoint -> tlv stream
//
outpointBucket = []byte("outpoint-bucket")

// historicalChannelBucket stores all channels that have seen their
// commitment tx confirm. All information from their previous open state
// is retained.
Expand Down Expand Up @@ -167,12 +175,35 @@ var (
// the height requested in the revocation log.
ErrLogEntryNotFound = fmt.Errorf("log entry not found")

// ErrMissingIndexEntry is returned when a caller attempts to close a
// channel and the outpoint is missing from the index.
ErrMissingIndexEntry = fmt.Errorf("missing outpoint from index")

// errHeightNotFound is returned when a query for channel balances at
// a height that we have not reached yet is made.
errHeightNotReached = fmt.Errorf("height requested greater than " +
"current commit height")
)

const (
// A tlv type definition used to serialize an outpoint's indexStatus
// for use in the outpoint index.
indexStatusType tlv.Type = 0
)

// indexStatus is an enum-like type that describes what state the
// outpoint is in. Currently only two possible values.
type indexStatus uint8

const (
// outpointOpen represents an outpoint that is open in the outpoint index.
outpointOpen indexStatus = 0

// outpointClosed represents an outpoint that is closed in the outpoint
// index.
outpointClosed indexStatus = 1
)

// ChannelType is an enum-like type that describes one of several possible
// channel types. Each open channel is associated with a particular type as the
// channel type may determine how higher level operations are conducted such as
Expand Down Expand Up @@ -827,6 +858,39 @@ func fetchChanBucketRw(tx kvdb.RwTx, nodeKey *btcec.PublicKey, // nolint:interfa
// fullSync syncs the contents of an OpenChannel while re-using an existing
// database transaction.
func (c *OpenChannel) fullSync(tx kvdb.RwTx) error {
// Fetch the outpoint bucket and check if the outpoint already exists.
opBucket := tx.ReadWriteBucket(outpointBucket)

var chanPointBuf bytes.Buffer
if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil {
return err
}

// Now, check if the outpoint exists in our index.
if opBucket.Get(chanPointBuf.Bytes()) != nil {
return ErrChanAlreadyExists
}

status := uint8(outpointOpen)

// Write the status of this outpoint as the first entry in a tlv
// stream.
statusRecord := tlv.MakePrimitiveRecord(indexStatusType, &status)
opStream, err := tlv.NewStream(statusRecord)
if err != nil {
return err
}

var b bytes.Buffer
if err := opStream.Encode(&b); err != nil {
return err
}

// Add the outpoint to our outpoint index with the tlv stream.
if err := opBucket.Put(chanPointBuf.Bytes(), b.Bytes()); err != nil {
return err
}

// First fetch the top level bucket which stores all data related to
// current, active channels.
openChanBucket, err := tx.CreateTopLevelBucket(openChannelBucket)
Expand All @@ -851,10 +915,6 @@ func (c *OpenChannel) fullSync(tx kvdb.RwTx) error {

// With the bucket for the node fetched, we can now go down another
// level, creating the bucket for this channel itself.
var chanPointBuf bytes.Buffer
if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil {
return err
}
chanBucket, err := chainBucket.CreateBucket(
chanPointBuf.Bytes(),
)
Expand Down Expand Up @@ -1258,7 +1318,7 @@ func (c *OpenChannel) clearChanStatus(status ChannelStatus) error {
return nil
}

// putChannel serializes, and stores the current state of the channel in its
// putOpenChannel serializes, and stores the current state of the channel in its
// entirety.
func putOpenChannel(chanBucket kvdb.RwBucket, channel *OpenChannel) error {
// First, we'll write out all the relatively static fields, that are
Expand Down Expand Up @@ -2772,6 +2832,36 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
return err
}

// Fetch the outpoint bucket to see if the outpoint exists or
// not.
opBucket := tx.ReadWriteBucket(outpointBucket)

// Add the closed outpoint to our outpoint index. This should
// replace an open outpoint in the index.
if opBucket.Get(chanPointBuf.Bytes()) == nil {
return ErrMissingIndexEntry
}

status := uint8(outpointClosed)

// Write the IndexStatus of this outpoint as the first entry in a tlv
// stream.
statusRecord := tlv.MakePrimitiveRecord(indexStatusType, &status)
opStream, err := tlv.NewStream(statusRecord)
if err != nil {
return err
}

var b bytes.Buffer
if err := opStream.Encode(&b); err != nil {
return err
}

// Finally add the closed outpoint and tlv stream to the index.
if err := opBucket.Put(chanPointBuf.Bytes(), b.Bytes()); err != nil {
return err
}

// Add channel state to the historical channel bucket.
historicalBucket, err := tx.CreateTopLevelBucket(
historicalChannelBucket,
Expand Down
20 changes: 19 additions & 1 deletion channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration12"
"github.com/lightningnetwork/lnd/channeldb/migration13"
"github.com/lightningnetwork/lnd/channeldb/migration16"
"github.com/lightningnetwork/lnd/channeldb/migration20"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnwire"
Expand Down Expand Up @@ -170,6 +171,17 @@ var (
number: 18,
migration: mig.CreateTLB(peersBucket),
},
{
// Create a top level bucket which holds outpoint
// information.
number: 19,
migration: mig.CreateTLB(outpointBucket),
},
{
// Migrate some data to the outpoint index.
number: 20,
migration: migration20.MigrateOutpointIndex,
},
}

// Big endian is the preferred byte order, due to cursor scans over
Expand Down Expand Up @@ -309,13 +321,14 @@ var topLevelBuckets = [][]byte{
graphMetaBucket,
metaBucket,
closeSummaryBucket,
outpointBucket,
}

// Wipe completely deletes all saved state within all used buckets within the
// database. The deletion is done in a single transaction, therefore this
// operation is fully atomic.
func (d *DB) Wipe() error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
err := kvdb.Update(d, func(tx kvdb.RwTx) error {
for _, tlb := range topLevelBuckets {
err := tx.DeleteTopLevelBucket(tlb)
if err != nil && err != kvdb.ErrBucketNotFound {
Expand All @@ -324,6 +337,11 @@ func (d *DB) Wipe() error {
}
return nil
}, func() {})
if err != nil {
return err
}

return initChannelDB(d.Backend)
}

// createChannelDB creates and initializes a fresh version of channeldb. In
Expand Down
18 changes: 8 additions & 10 deletions channeldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain"
"github.com/stretchr/testify/require"
)

func TestOpenWithCreate(t *testing.T) {
Expand Down Expand Up @@ -96,16 +97,13 @@ func TestWipe(t *testing.T) {
t.Fatalf("unable to wipe channeldb: %v", err)
}
// Check correct errors are returned
_, err = cdb.FetchAllOpenChannels()
if err != ErrNoActiveChannels {
t.Fatalf("fetching open channels: expected '%v' instead got '%v'",
ErrNoActiveChannels, err)
}
_, err = cdb.FetchClosedChannels(false)
if err != ErrNoClosedChannels {
t.Fatalf("fetching closed channels: expected '%v' instead got '%v'",
ErrNoClosedChannels, err)
}
openChannels, err := cdb.FetchAllOpenChannels()
require.NoError(t, err, "fetching open channels")
require.Equal(t, 0, len(openChannels))

closedChannels, err := cdb.FetchClosedChannels(false)
require.NoError(t, err, "fetching closed channels")
require.Equal(t, 0, len(closedChannels))
}

// TestFetchClosedChannelForID tests that we are able to properly retrieve a
Expand Down
36 changes: 36 additions & 0 deletions channeldb/migration20/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package migration20

import (
"encoding/binary"
"io"

"github.com/btcsuite/btcd/wire"
)

var (
byteOrder = binary.BigEndian
)

// writeOutpoint writes an outpoint from the passed writer.
func writeOutpoint(w io.Writer, o *wire.OutPoint) error {
if _, err := w.Write(o.Hash[:]); err != nil {
return err
}
if err := binary.Write(w, byteOrder, o.Index); err != nil {
return err
}

return nil
}

// readOutpoint reads an outpoint from the passed reader.
func readOutpoint(r io.Reader, o *wire.OutPoint) error {
if _, err := io.ReadFull(r, o.Hash[:]); err != nil {
return err
}
if err := binary.Read(r, byteOrder, &o.Index); err != nil {
return err
}

return nil
}
14 changes: 14 additions & 0 deletions channeldb/migration20/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package migration20

import (
"github.com/btcsuite/btclog"
)

// log is a logger that is initialized as disabled. This means the package
// will not perform any logging by default until a logger is set.
var log = btclog.Disabled

// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}
Loading

0 comments on commit 62a5cdb

Please sign in to comment.