Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3/3] Graph RIP: multi: RPC GraphSource #9265

Open
wants to merge 16 commits into
base: elle-graphSourceAbstraction
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ run:
- autopilotrpc
- chainrpc
- dev
- graphrpc
- invoicesrpc
- neutrinorpc
- peersrpc
Expand Down
17 changes: 12 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ const (
defaultLogFilename = "lnd.log"
defaultRPCPort = 10009
defaultRESTPort = 8080
defaultPeerPort = 9735
defaultRPCHost = "localhost"

defaultNoSeedBackup = false
Expand Down Expand Up @@ -486,6 +485,8 @@ type Config struct {

RemoteSigner *lncfg.RemoteSigner `group:"remotesigner" namespace:"remotesigner"`

RemoteGraph *lncfg.RemoteGraph `group:"remotegraph" namespace:"remotegraph"`

Sweeper *lncfg.Sweeper `group:"sweeper" namespace:"sweeper"`

Htlcswitch *lncfg.Htlcswitch `group:"htlcswitch" namespace:"htlcswitch"`
Expand Down Expand Up @@ -723,6 +724,9 @@ func DefaultConfig() Config {
RemoteSigner: &lncfg.RemoteSigner{
Timeout: lncfg.DefaultRemoteSignerRPCTimeout,
},
RemoteGraph: &lncfg.RemoteGraph{
Timeout: lncfg.DefaultRemoteGraphRPCTimeout,
},
Sweeper: lncfg.DefaultSweeperConfig(),
Htlcswitch: &lncfg.Htlcswitch{
MailboxDeliveryTimeout: htlcswitch.DefaultMailboxDeliveryTimeout,
Expand Down Expand Up @@ -1476,9 +1480,11 @@ func ValidateConfig(cfg Config, interceptor signal.Interceptor, fileParser,
// default to only listening on localhost for hidden service
// connections.
if len(cfg.RawListeners) == 0 {
addr := fmt.Sprintf(":%d", defaultPeerPort)
addr := fmt.Sprintf(":%d", lncfg.DefaultPeerPort)
if cfg.Tor.Active && !cfg.Tor.SkipProxyForClearNetTargets {
addr = fmt.Sprintf("localhost:%d", defaultPeerPort)
addr = fmt.Sprintf(
"localhost:%d", lncfg.DefaultPeerPort,
)
}
cfg.RawListeners = append(cfg.RawListeners, addr)
}
Expand Down Expand Up @@ -1557,7 +1563,7 @@ func ValidateConfig(cfg Config, interceptor signal.Interceptor, fileParser,
// Add default port to all listener addresses if needed and remove
// duplicate addresses.
cfg.Listeners, err = lncfg.NormalizeAddresses(
cfg.RawListeners, strconv.Itoa(defaultPeerPort),
cfg.RawListeners, strconv.Itoa(lncfg.DefaultPeerPort),
cfg.net.ResolveTCPAddr,
)
if err != nil {
Expand All @@ -1568,7 +1574,7 @@ func ValidateConfig(cfg Config, interceptor signal.Interceptor, fileParser,
// Add default port to all external IP addresses if needed and remove
// duplicate addresses.
cfg.ExternalIPs, err = lncfg.NormalizeAddresses(
cfg.RawExternalIPs, strconv.Itoa(defaultPeerPort),
cfg.RawExternalIPs, strconv.Itoa(lncfg.DefaultPeerPort),
cfg.net.ResolveTCPAddr,
)
if err != nil {
Expand Down Expand Up @@ -1749,6 +1755,7 @@ func ValidateConfig(cfg Config, interceptor signal.Interceptor, fileParser,
cfg.HealthChecks,
cfg.RPCMiddleware,
cfg.RemoteSigner,
cfg.RemoteGraph,
cfg.Sweeper,
cfg.Htlcswitch,
cfg.Invoices,
Expand Down
86 changes: 85 additions & 1 deletion config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lnd
import (
"bytes"
"context"
"crypto/x509"
"database/sql"
"errors"
"fmt"
Expand Down Expand Up @@ -42,13 +43,15 @@ import (
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/graphrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
"github.com/lightningnetwork/lnd/macaroons"
"github.com/lightningnetwork/lnd/msgmux"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/rpcperms"
"github.com/lightningnetwork/lnd/signal"
"github.com/lightningnetwork/lnd/sqldb"
Expand All @@ -58,7 +61,9 @@ import (
"github.com/lightningnetwork/lnd/watchtower/wtclient"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"gopkg.in/macaroon-bakery.v2/bakery"
"gopkg.in/macaroon.v2"
)

// GrpcRegistrar is an interface that must be satisfied by an external subserver
Expand Down Expand Up @@ -270,7 +275,86 @@ func (d *DefaultWalletImpl) RegisterGrpcSubserver(s *grpc.Server) error {
func (d *DefaultWalletImpl) Graph(_ context.Context,
dbs *DatabaseInstances) (sources.GraphSource, error) {

return sources.NewDBGSource(dbs.GraphDB), nil
localSource := sources.NewDBGSource(dbs.GraphDB)

if d.cfg.RemoteGraph == nil || !d.cfg.RemoteGraph.Enable {
return localSource, nil
}

cfg := d.cfg.RemoteGraph
conn, err := connectRPC(
cfg.RPCHost, cfg.TLSCertPath, cfg.MacaroonPath, cfg.Timeout,
)
if err != nil {
return nil, err
}

remoteSourceClient := graphrpc.NewRemoteClient(
conn, d.cfg.net.ResolveTCPAddr,
)

getLocalPub := func() (route.Vertex, error) {
node, err := dbs.GraphDB.SourceNode()
if err != nil {
return route.Vertex{}, err
}

return route.NewVertexFromBytes(node.PubKeyBytes[:])
}

return sources.NewMux(
localSource, remoteSourceClient, getLocalPub,
), nil
}

// connectRPC tries to establish an RPC connection to the given host:port with
// the supplied certificate and macaroon.
func connectRPC(hostPort, tlsCertPath, macaroonPath string,
timeout time.Duration) (*grpc.ClientConn, error) {

certBytes, err := os.ReadFile(tlsCertPath)
if err != nil {
return nil, fmt.Errorf("error reading TLS cert file %v: %w",
tlsCertPath, err)
}

cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(certBytes) {
return nil, fmt.Errorf("credentials: failed to append " +
"certificate")
}

macBytes, err := os.ReadFile(macaroonPath)
if err != nil {
return nil, fmt.Errorf("error reading macaroon file %v: %w",
macaroonPath, err)
}
mac := &macaroon.Macaroon{}
if err := mac.UnmarshalBinary(macBytes); err != nil {
return nil, fmt.Errorf("error decoding macaroon: %w", err)
}

macCred, err := macaroons.NewMacaroonCredential(mac)
if err != nil {
return nil, fmt.Errorf("error creating creds: %w", err)
}

opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(
cp, "",
)),
grpc.WithPerRPCCredentials(macCred),
grpc.WithBlock(),
}
ctxt, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
conn, err := grpc.DialContext(ctxt, hostPort, opts...)
if err != nil {
return nil, fmt.Errorf("unable to connect to RPC server: %w",
err)
}

return conn, nil
}

// ValidateMacaroon extracts the macaroon from the context's gRPC metadata,
Expand Down
31 changes: 31 additions & 0 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ var (
// the remote peer.
ErrGossipSyncerNotFound = errors.New("gossip syncer not found")

// ErrUnexpectedGossipQueries is an error that is returned if we receive
// gossip queries from a peer when we have disabled gossip syncing and
// in other words have not advertised that we support gossip queries.
ErrUnexpectedGossipQueries = errors.New(
"unexpected gossip queries received",
)

// emptyPubkey is used to compare compressed pubkeys against an empty
// byte array.
emptyPubkey [33]byte
Expand Down Expand Up @@ -359,6 +366,11 @@ type Config struct {
// updates for a channel and returns true if the channel should be
// considered a zombie based on these timestamps.
IsStillZombieChannel func(time.Time, time.Time) bool

// NoGossipSync is true if gossip syncing has been disabled. It
// indicates that we have not advertised the gossip queries feature bit,
// and so we should not receive any gossip queries from our peers.
NoGossipSync bool
}

// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
Expand Down Expand Up @@ -810,6 +822,25 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,

errChan := make(chan error, 1)

// If gossip syncing has been disabled, we expect not to receive any
// gossip queries from our peer.
if d.cfg.NoGossipSync {
switch m := msg.(type) {
case *lnwire.QueryShortChanIDs,
*lnwire.QueryChannelRange,
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd,
*lnwire.GossipTimestampRange:

log.Warnf("Gossip syncing was disabled, "+
"skipping message: %v", m)
errChan <- ErrUnexpectedGossipQueries

return errChan
default:
}
}

// For messages in the known set of channel series queries, we'll
// dispatch the message directly to the GossipSyncer, and skip the main
// processing loop.
Expand Down
10 changes: 10 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ The underlying functionality between those two options remain the same.
it with an external graph source rather than requiring it to first sync its
own graph.

* [Add an RPC implementation of the `GraphSource`
interface](https://github.com/lightningnetwork/lnd/pull/9265). With this PR,
users can now provide an external graph source to LND using the new
`--remotegraph` config options. These can be combined with the new
`--gossip.no-sync` option to run a node that does not sync gossip and uses a
remote LND node to provide it with network gossip data. If this configuration
is being used, it is recommended to set the
`--caches.rpc-graph-cache-duration` to a non-zero duration (zero is the
default) so that certain calls to the remote node are cached for some time.

## Tooling and Documentation

* [Improved `lncli create` command help text](https://github.com/lightningnetwork/lnd/pull/9077)
Expand Down
7 changes: 7 additions & 0 deletions feature/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type Config struct {
// forwarding experimental endorsement.
NoExperimentalEndorsement bool

// NoGossipQueries unsets the gossip queries feature bit.
NoGossipQueries bool

// CustomFeatures is a set of custom features to advertise in each
// set.
CustomFeatures map[Set][]lnwire.FeatureBit
Expand Down Expand Up @@ -214,6 +217,10 @@ func newManager(cfg Config, desc setDesc) (*Manager, error) {
raw.Unset(lnwire.ExperimentalEndorsementOptional)
raw.Unset(lnwire.ExperimentalEndorsementRequired)
}
if cfg.NoGossipQueries {
raw.Unset(lnwire.GossipQueriesOptional)
raw.Unset(lnwire.GossipQueriesRequired)
}

for _, custom := range cfg.CustomFeatures[set] {
if custom > set.Maximum() {
Expand Down
13 changes: 8 additions & 5 deletions graph/db/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) {
// ForEachNode iterates through all the stored vertices/nodes in the graph,
// executing the passed callback with each node encountered. If the callback
// returns an error, then the transaction is aborted and the iteration stops
// early.
// early. No error is returned if no nodes are found.
//
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
// traversal when graph gets mega
Expand Down Expand Up @@ -936,6 +936,7 @@ func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
}

// LookupAlias attempts to return the alias as advertised by the target node.
// ErrNodeAliasNotFound is returned if no alias is found.
// TODO(roasbeef): currently assumes that aliases are unique...
func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
var alias string
Expand Down Expand Up @@ -3090,7 +3091,7 @@ var _ GraphCacheNode = (*graphCacheNode)(nil)
// target node identity public key. If the node exists in the database, a
// timestamp of when the data for the node was lasted updated is returned along
// with a true boolean. Otherwise, an empty time.Time is returned with a false
// boolean.
// boolean and a nil error.
func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool,
error) {

Expand Down Expand Up @@ -3226,7 +3227,8 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
// of each end of the channel. The first edge policy is the outgoing edge *to*
// the connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
// halted with the error propagated back up to the caller. No error is returned
// if the node is not found.
//
// Unknown policies are passed into the callback as nil values.
func (c *ChannelGraph) ForEachNodeChannel(nodePub route.Vertex,
Expand Down Expand Up @@ -3414,7 +3416,7 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
// for the channel itself is returned as well as two structs that contain the
// routing policies for the channel in either direction.
//
// ErrZombieEdge an be returned if the edge is currently marked as a zombie
// ErrZombieEdge can be returned if the edge is currently marked as a zombie
// within the database. In this case, the ChannelEdgePolicy's will be nil, and
// the ChannelEdgeInfo will only include the public keys of each node.
func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (
Expand Down Expand Up @@ -3518,7 +3520,8 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (

// IsPublicNode is a helper method that determines whether the node with the
// given public key is seen as a public node in the graph from the graph's
// source node's point of view.
// source node's point of view. If this node is unknown, then
// ErrGraphNodeNotFound is returned.
func (c *ChannelGraph) IsPublicNode(pubKey [33]byte) (bool, error) {
var nodeIsPublic bool
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
Expand Down
2 changes: 1 addition & 1 deletion graph/db/log.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package graphdb

import (
"github.com/btcsuite/btclog"
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/build"
)

Expand Down
5 changes: 3 additions & 2 deletions graph/session/graph_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type graph interface {
// node, executing the passed callback on the directed edge representing
// the channel and its incoming policy. If the callback returns an
// error, then the iteration is halted with the error propagated back
// up to the caller.
// up to the caller. No error is returned if the node is not found.
//
// Unknown policies are passed into the callback as nil values.
//
Expand All @@ -135,7 +135,8 @@ type graph interface {
cb func(channel *graphdb.DirectedChannel) error) error

// FetchNodeFeatures returns the features of a given node. If no
// features are known for the node, an empty feature vector is returned.
// features are known for the node or if the node itself is unknown,
// an empty feature vector is returned.
//
// NOTE: if a nil tx is provided, then it is expected that the
// implementation create a read only tx.
Expand Down
Loading
Loading