Skip to content

Commit

Permalink
routerrpc+routing: persist & reset MC data on disk
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Awnallah <[email protected]>
  • Loading branch information
mohamedawnallah committed Jul 7, 2024
1 parent d7e0f69 commit b567967
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 3 deletions.
5 changes: 3 additions & 2 deletions lnrpc/routerrpc/router_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ type MissionControl interface {
GetHistorySnapshot() *routing.MissionControlSnapshot

// ImportHistory imports the mission control snapshot to our internal
// state. This import will only be applied in-memory, and will not be
// persisted across restarts.
// state. This import will be applied both in-memory and persisted to
// disk, ensuring that the mission control data is available across
// restarts.
ImportHistory(snapshot *routing.MissionControlSnapshot, force bool) error

// GetPairHistorySnapshot returns the stored history for a given node
Expand Down
27 changes: 26 additions & 1 deletion routing/missioncontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,23 @@ func (m *MissionControl) init() error {
m.applyPaymentResult(result)
}

mcSnapshots, err := m.store.fetchMCData()
if err != nil {
return err
}

mcPairsPersisted := 0
for _, mcSnapshot := range mcSnapshots {
err := m.ImportHistory(mcSnapshot, false)
if err != nil {
return err
}
mcPairsPersisted += len(mcSnapshot.Pairs)
}

log.Debugf("Mission control state reconstruction finished: "+
"n=%v, time=%v", len(results), time.Since(start))
"raw_payment_results_count=%v, persisted_mc_pairs_count=%v, "+
"time=%v", len(results), mcPairsPersisted, time.Since(start))

return nil
}
Expand Down Expand Up @@ -323,6 +338,11 @@ func (m *MissionControl) ResetHistory() error {

m.state.resetHistory()

err := m.store.resetMCData()
if err != nil {
return err
}

log.Debugf("Mission control history cleared")

return nil
Expand Down Expand Up @@ -378,6 +398,11 @@ func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,

imported := m.state.importSnapshot(history, force)

err := m.store.persistMCData(history)
if err != nil {
return err
}

log.Infof("Imported %v results to mission control", imported)

return nil
Expand Down
206 changes: 206 additions & 0 deletions routing/missioncontrol_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"container/list"
"encoding/binary"
"encoding/json"
"fmt"
"math"
"sync"
Expand All @@ -13,13 +14,22 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)

var (
// resultsKey is the fixed key under which the attempt results are
// stored.
resultsKey = []byte("missioncontrol-results")

// mcKey is the fixed key under which the mission control data is
// stored.
mcKey = []byte("missioncontrol")

// FetchMissionControlBatchSize is the number of mission control
// entries to be fetched in a single batch.
FetchMissionControlBatchSize = 1000

// Big endian is the preferred byte order, due to cursor scans over
// integer keys iterating in order.
byteOrder = binary.BigEndian
Expand All @@ -29,6 +39,15 @@ const (
// unknownFailureSourceIdx is the database encoding of an unknown error
// source.
unknownFailureSourceIdx = -1

// PubKeyCompressedSize is the size of a single compressed sec pub key
// in bytes.
PubKeyCompressedSize = 33

// PubKeyCompressedSizeDouble is the size of compressed sec pub keys
// for both the source and destination nodes in the mission control
// data pair.
PubKeyCompressedSizeDouble = PubKeyCompressedSize * 2
)

// missionControlStore is a bolt db based implementation of a mission control
Expand Down Expand Up @@ -494,3 +513,190 @@ func getResultKey(rp *paymentResult) []byte {

return keyBytes[:]
}

// persistMCData stores the provided mission control snapshot in the database.
// It creates a top-level bucket if it doesn't exist and inserts key-value pairs
// for each pair of nodes in the snapshot. The keys are formed by concatenating
// the 'From' and 'To' node public keys, and the values are the serialized
// TimedPairResult data.
//
// Params:
// - mc: The mission control snapshot to be persisted.
//
// Returns:
// - error: An error if persisting data to the database fails.
func (b *missionControlStore) persistMCData(mc *MissionControlSnapshot) error {
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
mcBucket, err := tx.CreateTopLevelBucket(mcKey)
if err != nil {
return fmt.Errorf("cannot create mission control "+
"bucket: %w", err)
}

for _, pair := range mc.Pairs {
// Create the key by concatenating From and To bytes.
key := append([]byte{}, pair.Pair.From[:]...)
key = append(key, pair.Pair.To[:]...)

// Serialize TimedPairResult data.
data, err := json.Marshal(pair.TimedPairResult)
if err != nil {
return err
}

// Put the key-value pair into the bucket
// (will override if key exists).
err = mcBucket.Put(key, data)
if err != nil {
return err
}
}

return nil
}, func() {})
if err != nil {
return fmt.Errorf(
"Error persisting mission control to disk: %w", err,
)
}

return nil
}

// fetchMCData retrieves all mission control snapshots stored in the database.
// It reads from the mission control bucket and deserializes each key-value pair
// into MissionControlPairSnapshot objects. Snapshots are batched based on the
// FetchMissionControlBatchSize constant.
//
// Returns:
// - []*MissionControlSnapshot: A slice of pointers to MissionControlSnapshot
// objects containing the retrieved data.
// - error: An error if fetching data from the database fails.
func (b *missionControlStore) fetchMCData() ([]*MissionControlSnapshot, error) {
var (
mcSnapshots []*MissionControlSnapshot
mcData []MissionControlPairSnapshot
)

err := kvdb.View(b.db, func(tx kvdb.RTx) error {
mcBucket := tx.ReadBucket(mcKey)
if mcBucket == nil {
return nil
}

mcSnapshots = make([]*MissionControlSnapshot, 0)
mcData = make([]MissionControlPairSnapshot, 0)

err := mcBucket.ForEach(func(k, v []byte) error {
mc, err := deserializeMCData(k, v)
if err != nil {
return err
}

mcData = append(mcData, mc)

// Check if we have reached the configured batch size.
if len(mcData) == FetchMissionControlBatchSize {
mcSnapshots = append(
mcSnapshots,
&MissionControlSnapshot{Pairs: mcData},
)
mcData = make([]MissionControlPairSnapshot, 0)
}

return nil
})
if err != nil {
return err
}

// Add the remaining data if any.
if len(mcData) > 0 {
mcSnapshots = append(
mcSnapshots,
&MissionControlSnapshot{Pairs: mcData},
)
}

return nil
}, func() {
mcSnapshots, mcData = nil, nil
})
if err != nil {
return nil, err
}

return mcSnapshots, nil
}

// resetMCData clears all mission control data from the database.
// It deletes all key-value pairs in the mission control bucket.
//
// Returns:
// - error: An error if resetting the mission control data fails.
func (b *missionControlStore) resetMCData() error {
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
mcBucket := tx.ReadWriteBucket(mcKey)
if mcBucket == nil {
return nil
}

// Delete all key-value pairs in the mission control bucket.
err := mcBucket.ForEach(func(k, v []byte) error {
return mcBucket.Delete(k)
})
if err != nil {
return fmt.Errorf("failed to delete mission control "+
"data: %w", err)
}

return nil
}, func() {})
if err != nil {
return fmt.Errorf("error resetting mission control "+
"data: %v", err)
}

return nil
}

// deserializeMCData deserializes the provided key and value bytes into a
// MissionControlPairSnapshot object. The key is expected to be a concatenation
// of two 33-byte public keys representing the 'From' and 'To' nodes, and the
// value is the serialized TimedPairResult data.
//
// Params:
// - k: The key bytes representing the 'From' and 'To' nodes.
// - v: The value bytes representing the serialized TimedPairResult data.
//
// Returns:
// - MissionControlPairSnapshot: The deserialized mission control pair snapshot.
// - error: An error if deserialization fails or the key length is invalid.
func deserializeMCData(k, v []byte) (MissionControlPairSnapshot, error) {
// Assuming the From and To are each 33 bytes.
if len(k) != PubKeyCompressedSizeDouble {
return MissionControlPairSnapshot{}, fmt.Errorf("invalid key "+
"length: expected %d, got %d",
PubKeyCompressedSizeDouble, len(k))
}

// Split the key into From and To.
from := route.Vertex(k[:PubKeyCompressedSize])
to := route.Vertex(k[PubKeyCompressedSize:])

// Deserialize the value into TimedPairResult.
var timedPairResult TimedPairResult
err := json.Unmarshal(v, &timedPairResult)
if err != nil {
return MissionControlPairSnapshot{}, fmt.Errorf("error "+
"deserializing TimedPairResult: %w", err)
}

return MissionControlPairSnapshot{
Pair: DirectedNodePair{
From: from,
To: to,
},
TimedPairResult: timedPairResult,
}, nil
}

0 comments on commit b567967

Please sign in to comment.