diff --git a/lnrpc/routerrpc/router_backend.go b/lnrpc/routerrpc/router_backend.go index 9fde63411ff..f363c4e699f 100644 --- a/lnrpc/routerrpc/router_backend.go +++ b/lnrpc/routerrpc/router_backend.go @@ -122,8 +122,9 @@ type MissionControl interface { GetHistorySnapshot() *routing.MissionControlSnapshot // ImportHistory imports the mission control snapshot to our internal - // state. This import will only be applied in-memory, and will not be - // persisted across restarts. + // state. This import will be applied both in-memory and persisted to + // disk, ensuring that the mission control data is available across + // restarts. ImportHistory(snapshot *routing.MissionControlSnapshot, force bool) error // GetPairHistorySnapshot returns the stored history for a given node diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 1dceace7c08..86be6efbd92 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -266,8 +266,23 @@ func (m *MissionControl) init() error { m.applyPaymentResult(result) } + mcSnapshots, err := m.store.fetchMCData() + if err != nil { + return err + } + + mcPairsPersisted := 0 + for _, mcSnapshot := range mcSnapshots { + err := m.ImportHistory(mcSnapshot, false) + if err != nil { + return err + } + mcPairsPersisted += len(mcSnapshot.Pairs) + } + log.Debugf("Mission control state reconstruction finished: "+ - "n=%v, time=%v", len(results), time.Since(start)) + "raw_payment_results_count=%v, persisted_mc_pairs_count=%v, "+ + "time=%v", len(results), mcPairsPersisted, time.Since(start)) return nil } @@ -323,6 +338,11 @@ func (m *MissionControl) ResetHistory() error { m.state.resetHistory() + err := m.store.resetMCData() + if err != nil { + return err + } + log.Debugf("Mission control history cleared") return nil @@ -378,6 +398,11 @@ func (m *MissionControl) ImportHistory(history *MissionControlSnapshot, imported := m.state.importSnapshot(history, force) + err := m.store.persistMCData(history) + if err != nil { + return err + } + log.Infof("Imported %v results to mission control", imported) return nil diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go index e149e854589..5b064600b34 100644 --- a/routing/missioncontrol_store.go +++ b/routing/missioncontrol_store.go @@ -4,6 +4,7 @@ import ( "bytes" "container/list" "encoding/binary" + "encoding/json" "fmt" "math" "sync" @@ -13,6 +14,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" ) var ( @@ -20,6 +22,10 @@ var ( // stored. resultsKey = []byte("missioncontrol-results") + mcKey = []byte("missioncontrol") + + FetchMissionControlBatchSize = 1000 + // Big endian is the preferred byte order, due to cursor scans over // integer keys iterating in order. byteOrder = binary.BigEndian @@ -29,6 +35,15 @@ const ( // unknownFailureSourceIdx is the database encoding of an unknown error // source. unknownFailureSourceIdx = -1 + + // PubKeyCompressedSize is the size of a single compressed sec pub key + // in bytes. + PubKeyCompressedSize = 33 + + // PubKeyCompressedSizeDouble is the size of compressed sec pub keys + // for both the source and destination nodes in the mission control + // data pair. + PubKeyCompressedSizeDouble = PubKeyCompressedSize * 2 ) // missionControlStore is a bolt db based implementation of a mission control @@ -494,3 +509,190 @@ func getResultKey(rp *paymentResult) []byte { return keyBytes[:] } + +// persistMCData stores the provided mission control snapshot in the database. +// It creates a top-level bucket if it doesn't exist and inserts key-value pairs +// for each pair of nodes in the snapshot. The keys are formed by concatenating +// the 'From' and 'To' node public keys, and the values are the serialized +// TimedPairResult data. +// +// Params: +// - mc: The mission control snapshot to be persisted. +// +// Returns: +// - error: An error if persisting data to the database fails. +func (b *missionControlStore) persistMCData(mc *MissionControlSnapshot) error { + err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { + mcBucket, err := tx.CreateTopLevelBucket(mcKey) + if err != nil { + return fmt.Errorf("cannot create mission control "+ + "bucket: %w", err) + } + + for _, pair := range mc.Pairs { + // Create the key by concatenating From and To bytes. + key := append([]byte{}, pair.Pair.From[:]...) + key = append(key, pair.Pair.To[:]...) + + // Serialize TimedPairResult data. + data, err := json.Marshal(pair.TimedPairResult) + if err != nil { + return err + } + + // Put the key-value pair into the bucket + // (will override if key exists). + err = mcBucket.Put(key, data) + if err != nil { + return err + } + } + + return nil + }, func() {}) + if err != nil { + return fmt.Errorf( + "Error persisting mission control to disk: %w", err, + ) + } + + return nil +} + +// fetchMCData retrieves all mission control snapshots stored in the database. +// It reads from the mission control bucket and deserializes each key-value pair +// into MissionControlPairSnapshot objects. Snapshots are batched based on the +// FetchMissionControlBatchSize constant. +// +// Returns: +// - []*MissionControlSnapshot: A slice of pointers to MissionControlSnapshot +// objects containing the retrieved data. +// - error: An error if fetching data from the database fails. +func (b *missionControlStore) fetchMCData() ([]*MissionControlSnapshot, error) { + var ( + mcSnapshots []*MissionControlSnapshot + mcData []MissionControlPairSnapshot + ) + + err := kvdb.View(b.db, func(tx kvdb.RTx) error { + mcBucket := tx.ReadBucket(mcKey) + if mcBucket == nil { + return nil + } + + mcSnapshots = make([]*MissionControlSnapshot, 0) + mcData = make([]MissionControlPairSnapshot, 0) + + err := mcBucket.ForEach(func(k, v []byte) error { + mc, err := deserializeMCData(k, v) + if err != nil { + return err + } + + mcData = append(mcData, mc) + + // Check if we have reached the configured batch size. + if len(mcData) == FetchMissionControlBatchSize { + mcSnapshots = append( + mcSnapshots, + &MissionControlSnapshot{Pairs: mcData}, + ) + mcData = make([]MissionControlPairSnapshot, 0) + } + + return nil + }) + if err != nil { + return err + } + + // Add the remaining data if any. + if len(mcData) > 0 { + mcSnapshots = append( + mcSnapshots, + &MissionControlSnapshot{Pairs: mcData}, + ) + } + + return nil + }, func() { + mcSnapshots, mcData = nil, nil + }) + if err != nil { + return nil, err + } + + return mcSnapshots, nil +} + +// resetMCData clears all mission control data from the database. +// It deletes all key-value pairs in the mission control bucket. +// +// Returns: +// - error: An error if resetting the mission control data fails. +func (b *missionControlStore) resetMCData() error { + err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { + mcBucket := tx.ReadWriteBucket(mcKey) + if mcBucket == nil { + return nil + } + + // Delete all key-value pairs in the mission control bucket. + err := mcBucket.ForEach(func(k, v []byte) error { + return mcBucket.Delete(k) + }) + if err != nil { + return fmt.Errorf("failed to delete mission control "+ + "data: %w", err) + } + + return nil + }, func() {}) + if err != nil { + return fmt.Errorf("error resetting mission control "+ + "data: %v", err) + } + + return nil +} + +// deserializeMCData deserializes the provided key and value bytes into a +// MissionControlPairSnapshot object. The key is expected to be a concatenation +// of two 33-byte public keys representing the 'From' and 'To' nodes, and the +// value is the serialized TimedPairResult data. +// +// Params: +// - k: The key bytes representing the 'From' and 'To' nodes. +// - v: The value bytes representing the serialized TimedPairResult data. +// +// Returns: +// - MissionControlPairSnapshot: The deserialized mission control pair snapshot. +// - error: An error if deserialization fails or the key length is invalid. +func deserializeMCData(k, v []byte) (MissionControlPairSnapshot, error) { + // Assuming the From and To are each 33 bytes. + if len(k) != PubKeyCompressedSizeDouble { + return MissionControlPairSnapshot{}, fmt.Errorf("invalid key "+ + "length: expected %d, got %d", + PubKeyCompressedSizeDouble, len(k)) + } + + // Split the key into From and To. + from := route.Vertex(k[:PubKeyCompressedSize]) + to := route.Vertex(k[PubKeyCompressedSize:]) + + // Deserialize the value into TimedPairResult. + var timedPairResult TimedPairResult + err := json.Unmarshal(v, &timedPairResult) + if err != nil { + return MissionControlPairSnapshot{}, fmt.Errorf("error "+ + "deserializing TimedPairResult: %w", err) + } + + return MissionControlPairSnapshot{ + Pair: DirectedNodePair{ + From: from, + To: to, + }, + TimedPairResult: timedPairResult, + }, nil +}