From b567967458625c1bdd1e2d2dda6f2c3d9a3dd31e Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Jul 2024 17:58:57 +0300 Subject: [PATCH 1/2] routerrpc+routing: persist & reset MC data on disk Signed-off-by: Mohamed Awnallah --- lnrpc/routerrpc/router_backend.go | 5 +- routing/missioncontrol.go | 27 +++- routing/missioncontrol_store.go | 206 ++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+), 3 deletions(-) diff --git a/lnrpc/routerrpc/router_backend.go b/lnrpc/routerrpc/router_backend.go index 9fde63411f..f363c4e699 100644 --- a/lnrpc/routerrpc/router_backend.go +++ b/lnrpc/routerrpc/router_backend.go @@ -122,8 +122,9 @@ type MissionControl interface { GetHistorySnapshot() *routing.MissionControlSnapshot // ImportHistory imports the mission control snapshot to our internal - // state. This import will only be applied in-memory, and will not be - // persisted across restarts. + // state. This import will be applied both in-memory and persisted to + // disk, ensuring that the mission control data is available across + // restarts. ImportHistory(snapshot *routing.MissionControlSnapshot, force bool) error // GetPairHistorySnapshot returns the stored history for a given node diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 1dceace7c0..86be6efbd9 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -266,8 +266,23 @@ func (m *MissionControl) init() error { m.applyPaymentResult(result) } + mcSnapshots, err := m.store.fetchMCData() + if err != nil { + return err + } + + mcPairsPersisted := 0 + for _, mcSnapshot := range mcSnapshots { + err := m.ImportHistory(mcSnapshot, false) + if err != nil { + return err + } + mcPairsPersisted += len(mcSnapshot.Pairs) + } + log.Debugf("Mission control state reconstruction finished: "+ - "n=%v, time=%v", len(results), time.Since(start)) + "raw_payment_results_count=%v, persisted_mc_pairs_count=%v, "+ + "time=%v", len(results), mcPairsPersisted, time.Since(start)) return nil } @@ -323,6 +338,11 @@ func (m *MissionControl) ResetHistory() error { m.state.resetHistory() + err := m.store.resetMCData() + if err != nil { + return err + } + log.Debugf("Mission control history cleared") return nil @@ -378,6 +398,11 @@ func (m *MissionControl) ImportHistory(history *MissionControlSnapshot, imported := m.state.importSnapshot(history, force) + err := m.store.persistMCData(history) + if err != nil { + return err + } + log.Infof("Imported %v results to mission control", imported) return nil diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go index e149e85458..e2d6a6e01e 100644 --- a/routing/missioncontrol_store.go +++ b/routing/missioncontrol_store.go @@ -4,6 +4,7 @@ import ( "bytes" "container/list" "encoding/binary" + "encoding/json" "fmt" "math" "sync" @@ -13,6 +14,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" ) var ( @@ -20,6 +22,14 @@ var ( // stored. resultsKey = []byte("missioncontrol-results") + // mcKey is the fixed key under which the mission control data is + // stored. + mcKey = []byte("missioncontrol") + + // FetchMissionControlBatchSize is the number of mission control + // entries to be fetched in a single batch. + FetchMissionControlBatchSize = 1000 + // Big endian is the preferred byte order, due to cursor scans over // integer keys iterating in order. byteOrder = binary.BigEndian @@ -29,6 +39,15 @@ const ( // unknownFailureSourceIdx is the database encoding of an unknown error // source. unknownFailureSourceIdx = -1 + + // PubKeyCompressedSize is the size of a single compressed sec pub key + // in bytes. + PubKeyCompressedSize = 33 + + // PubKeyCompressedSizeDouble is the size of compressed sec pub keys + // for both the source and destination nodes in the mission control + // data pair. + PubKeyCompressedSizeDouble = PubKeyCompressedSize * 2 ) // missionControlStore is a bolt db based implementation of a mission control @@ -494,3 +513,190 @@ func getResultKey(rp *paymentResult) []byte { return keyBytes[:] } + +// persistMCData stores the provided mission control snapshot in the database. +// It creates a top-level bucket if it doesn't exist and inserts key-value pairs +// for each pair of nodes in the snapshot. The keys are formed by concatenating +// the 'From' and 'To' node public keys, and the values are the serialized +// TimedPairResult data. +// +// Params: +// - mc: The mission control snapshot to be persisted. +// +// Returns: +// - error: An error if persisting data to the database fails. +func (b *missionControlStore) persistMCData(mc *MissionControlSnapshot) error { + err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { + mcBucket, err := tx.CreateTopLevelBucket(mcKey) + if err != nil { + return fmt.Errorf("cannot create mission control "+ + "bucket: %w", err) + } + + for _, pair := range mc.Pairs { + // Create the key by concatenating From and To bytes. + key := append([]byte{}, pair.Pair.From[:]...) + key = append(key, pair.Pair.To[:]...) + + // Serialize TimedPairResult data. + data, err := json.Marshal(pair.TimedPairResult) + if err != nil { + return err + } + + // Put the key-value pair into the bucket + // (will override if key exists). + err = mcBucket.Put(key, data) + if err != nil { + return err + } + } + + return nil + }, func() {}) + if err != nil { + return fmt.Errorf( + "Error persisting mission control to disk: %w", err, + ) + } + + return nil +} + +// fetchMCData retrieves all mission control snapshots stored in the database. +// It reads from the mission control bucket and deserializes each key-value pair +// into MissionControlPairSnapshot objects. Snapshots are batched based on the +// FetchMissionControlBatchSize constant. +// +// Returns: +// - []*MissionControlSnapshot: A slice of pointers to MissionControlSnapshot +// objects containing the retrieved data. +// - error: An error if fetching data from the database fails. +func (b *missionControlStore) fetchMCData() ([]*MissionControlSnapshot, error) { + var ( + mcSnapshots []*MissionControlSnapshot + mcData []MissionControlPairSnapshot + ) + + err := kvdb.View(b.db, func(tx kvdb.RTx) error { + mcBucket := tx.ReadBucket(mcKey) + if mcBucket == nil { + return nil + } + + mcSnapshots = make([]*MissionControlSnapshot, 0) + mcData = make([]MissionControlPairSnapshot, 0) + + err := mcBucket.ForEach(func(k, v []byte) error { + mc, err := deserializeMCData(k, v) + if err != nil { + return err + } + + mcData = append(mcData, mc) + + // Check if we have reached the configured batch size. + if len(mcData) == FetchMissionControlBatchSize { + mcSnapshots = append( + mcSnapshots, + &MissionControlSnapshot{Pairs: mcData}, + ) + mcData = make([]MissionControlPairSnapshot, 0) + } + + return nil + }) + if err != nil { + return err + } + + // Add the remaining data if any. + if len(mcData) > 0 { + mcSnapshots = append( + mcSnapshots, + &MissionControlSnapshot{Pairs: mcData}, + ) + } + + return nil + }, func() { + mcSnapshots, mcData = nil, nil + }) + if err != nil { + return nil, err + } + + return mcSnapshots, nil +} + +// resetMCData clears all mission control data from the database. +// It deletes all key-value pairs in the mission control bucket. +// +// Returns: +// - error: An error if resetting the mission control data fails. +func (b *missionControlStore) resetMCData() error { + err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { + mcBucket := tx.ReadWriteBucket(mcKey) + if mcBucket == nil { + return nil + } + + // Delete all key-value pairs in the mission control bucket. + err := mcBucket.ForEach(func(k, v []byte) error { + return mcBucket.Delete(k) + }) + if err != nil { + return fmt.Errorf("failed to delete mission control "+ + "data: %w", err) + } + + return nil + }, func() {}) + if err != nil { + return fmt.Errorf("error resetting mission control "+ + "data: %v", err) + } + + return nil +} + +// deserializeMCData deserializes the provided key and value bytes into a +// MissionControlPairSnapshot object. The key is expected to be a concatenation +// of two 33-byte public keys representing the 'From' and 'To' nodes, and the +// value is the serialized TimedPairResult data. +// +// Params: +// - k: The key bytes representing the 'From' and 'To' nodes. +// - v: The value bytes representing the serialized TimedPairResult data. +// +// Returns: +// - MissionControlPairSnapshot: The deserialized mission control pair snapshot. +// - error: An error if deserialization fails or the key length is invalid. +func deserializeMCData(k, v []byte) (MissionControlPairSnapshot, error) { + // Assuming the From and To are each 33 bytes. + if len(k) != PubKeyCompressedSizeDouble { + return MissionControlPairSnapshot{}, fmt.Errorf("invalid key "+ + "length: expected %d, got %d", + PubKeyCompressedSizeDouble, len(k)) + } + + // Split the key into From and To. + from := route.Vertex(k[:PubKeyCompressedSize]) + to := route.Vertex(k[PubKeyCompressedSize:]) + + // Deserialize the value into TimedPairResult. + var timedPairResult TimedPairResult + err := json.Unmarshal(v, &timedPairResult) + if err != nil { + return MissionControlPairSnapshot{}, fmt.Errorf("error "+ + "deserializing TimedPairResult: %w", err) + } + + return MissionControlPairSnapshot{ + Pair: DirectedNodePair{ + From: from, + To: to, + }, + TimedPairResult: timedPairResult, + }, nil +} From 4ef07530beddc6ebc20e4d0e17f2a39635c9191e Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Jul 2024 18:01:22 +0300 Subject: [PATCH 2/2] routing: test persist/reset of MC data on disk Signed-off-by: Mohamed Awnallah --- routing/missioncontrol_store_test.go | 172 +++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go index 34b925a3e1..65254c42a3 100644 --- a/routing/missioncontrol_store_test.go +++ b/routing/missioncontrol_store_test.go @@ -1,6 +1,7 @@ package routing import ( + "encoding/json" "fmt" "os" "testing" @@ -293,3 +294,174 @@ func BenchmarkMissionControlStoreFlushing(b *testing.B) { }) } } + +// timeToUnix converts a time.Time value to its Unix time representation. +func timeToUnix(t time.Time) int64 { + return t.Unix() +} + +// timedPairResultsAreEqual compares two TimedPairResult structs for equality. +// It returns true if the FailTime, FailAmt, SuccessTime, and SuccessAmt fields +// of both TimedPairResult structs are equal. The comparison of FailTime and +// SuccessTime is done by converting them to Unix time to avoid issues with +// internal representation differences. +// +// Parameters: +// - tpr1: The first TimedPairResult to compare. +// - tpr2: The second TimedPairResult to compare. +// +// Returns: +// - bool: true if both TimedPairResult structs are equal, false otherwise. +func timedPairResultsAreEqual(tpr1, tpr2 TimedPairResult) bool { + return timeToUnix(tpr1.FailTime) == timeToUnix(tpr2.FailTime) && + tpr1.FailAmt == tpr2.FailAmt && + timeToUnix(tpr1.SuccessTime) == timeToUnix(tpr2.SuccessTime) && + tpr1.SuccessAmt == tpr2.SuccessAmt +} + +// TestPersistMCData verifies that the persistMCData function correctly +// stores a given MissionControlSnapshot into the database and retrieves it +// accurately using fetchMCData. +// +// It performs the following steps: +// 1. Creates a test harness for the mission control store. +// 2. Prepares a sample MissionControlSnapshot with timed pair results. +// 3. Persists the snapshot using persistMCData. +// 4. Fetches the stored data using fetchMCData. +// 5. Verifies that the fetched data matches the original snapshot. +func TestPersistMCData(t *testing.T) { + h := newMCStoreTestHarness(t, testMaxRecords, time.Second) + store := h.store + + // Prepare a sample mission control snapshot. + snapshot := &MissionControlSnapshot{ + Pairs: []MissionControlPairSnapshot{ + { + Pair: DirectedNodePair{ + From: route.Vertex{1}, + To: route.Vertex{2}, + }, + TimedPairResult: TimedPairResult{ + SuccessTime: time.Now().Add(-time.Hour), + SuccessAmt: lnwire.MilliSatoshi(1500), + }, + }, + { + Pair: DirectedNodePair{ + From: route.Vertex{3}, + To: route.Vertex{4}, + }, + TimedPairResult: TimedPairResult{ + FailTime: time.Now().Add(-time.Hour), + FailAmt: lnwire.MilliSatoshi(3000), + }, + }, + }, + } + + // Persist the mission control snapshot. + err := store.persistMCData(snapshot) + require.NoError(t, err) + + // Fetch the data to verify. + mcSnapshots, err := store.fetchMCData() + require.NoError(t, err) + require.Len(t, mcSnapshots, 1) + require.Len(t, mcSnapshots[0].Pairs, 2) + require.Equal(t, snapshot.Pairs[0].Pair, mcSnapshots[0].Pairs[0].Pair) + require.Equal(t, snapshot.Pairs[1].Pair, mcSnapshots[0].Pairs[1].Pair) + require.True( + t, timedPairResultsAreEqual( + snapshot.Pairs[0].TimedPairResult, + mcSnapshots[0].Pairs[0].TimedPairResult, + ), + ) + require.True( + t, timedPairResultsAreEqual( + snapshot.Pairs[1].TimedPairResult, + mcSnapshots[0].Pairs[1].TimedPairResult, + ), + ) +} + +// TestResetMCData verifies that the resetMCData function correctly +// clears all mission control data from the database. +// +// It performs the following steps: +// 1. Creates a test harness for the mission control store. +// 2. Prepares and persists a sample MissionControlSnapshot. +// 3. Calls resetMCData to clear the mission control data. +// 4. Fetches the data using fetchMCData to verify that it has been reset. +func TestResetMCData(t *testing.T) { + h := newMCStoreTestHarness(t, testMaxRecords, time.Second) + store := h.store + + // Prepare a sample mission control snapshot. + snapshot := &MissionControlSnapshot{ + Pairs: []MissionControlPairSnapshot{ + { + Pair: DirectedNodePair{ + From: route.Vertex{1}, + To: route.Vertex{2}, + }, + TimedPairResult: TimedPairResult{ + SuccessTime: time.Now().Add(-time.Hour), + SuccessAmt: lnwire.MilliSatoshi(2000), + }, + }, + }, + } + + // Persist the mission control snapshot. + err := store.persistMCData(snapshot) + require.NoError(t, err) + + // Reset the mission control data. + err = store.resetMCData() + require.NoError(t, err) + + // Fetch the data to verify it has been reset. + mcSnapshots, err := store.fetchMCData() + require.NoError(t, err) + require.Len(t, mcSnapshots, 0) +} + +// TestDeserializeMCData verifies that the deserializeMCData function correctly +// deserializes key and value bytes into a MissionControlPairSnapshot. +// +// It performs the following steps: +// 1. Prepares sample data for serialization, including 'From' and 'To' public +// keys and a TimedPairResult. +// 2. Serializes the TimedPairResult into JSON format. +// 3. Concatenates the 'From' and 'To' public keys to form the key. +// 4. Deserializes the mission control data using deserializeMCData. +// 5. Verifies that the deserialized data matches the original data. +func TestDeserializeMCData(t *testing.T) { + // Prepare sample data for serialization. + from := route.Vertex{1} + to := route.Vertex{2} + timedPairResult := TimedPairResult{ + FailTime: time.Now(), + FailAmt: lnwire.MilliSatoshi(1000), + SuccessTime: time.Now().Add(-time.Hour), + SuccessAmt: lnwire.MilliSatoshi(2000), + } + data, err := json.Marshal(timedPairResult) + require.NoError(t, err) + + // Concatenate the 'From' and 'To' public keys to form the key. + // Create the key by concatenating From and To bytes. + key := append([]byte{}, from[:]...) + key = append(key, to[:]...) + + // Deserialize the mission control data. + mcSnapshot, err := deserializeMCData(key, data) + require.NoError(t, err) + require.Equal(t, from, mcSnapshot.Pair.From) + require.Equal(t, to, mcSnapshot.Pair.To) + require.True( + t, timedPairResultsAreEqual( + timedPairResult, mcSnapshot.TimedPairResult, + ), + ) +}