diff --git a/peerconn/conn_manager.go b/peerconn/conn_manager.go index 109c9a739b..3e3945202f 100644 --- a/peerconn/conn_manager.go +++ b/peerconn/conn_manager.go @@ -117,7 +117,7 @@ func (e *ErrPeerAlreadyConnected) Error() string { } // PeerConnManagerConfig holds config info for the peer conn manager. -type PeerConnManagerConfig struct { +type ManagerConfig struct { // PartialPeerConfig holds a peer config that's not yet completed and // will be finished by the peer conn manager when making a new peer. PartialPeerConfig peer.Config @@ -168,8 +168,8 @@ type PeerConnManagerConfig struct { } // PeerConnManager is responsible for managing peer connections. -type PeerConnManager struct { - Config *PeerConnManagerConfig +type Manager struct { + Config *ManagerConfig // IdentityECDH is an ECDH capable wrapper for the private key used // to authenticate any incoming connections. @@ -242,7 +242,7 @@ type PeerConnManager struct { } // Start will start the peer conn manager. -func (p *PeerConnManager) Start() error { +func (p *Manager) Start() error { // Create the connection manager which will be responsible for // maintaining persistent outbound connections and also accepting new // incoming connections @@ -280,7 +280,7 @@ func (p *PeerConnManager) Start() error { } // Stop will stop the peer conn manager. -func (p *PeerConnManager) Stop() error { +func (p *Manager) Stop() error { // Before we shutdown the manager, disconnect from each active peers to // ensure that peerTerminationWatchers signal completion to each peer. for _, peer := range p.Peers() { @@ -316,15 +316,15 @@ func (p *PeerConnManager) Stop() error { // Stopped returns true if the peer conn manager has been instructed to // shutdown. -func (p *PeerConnManager) Stopped() bool { +func (p *Manager) Stopped() bool { return atomic.LoadInt32(&p.stopping) != 0 } // NewPeerConnManager creates and returns a new peer conn manager. func NewPeerConnManager(nodeKey keychain.SingleKeyECDH, - tc *tor.Controller) *PeerConnManager { + tc *tor.Controller) *Manager { - return &PeerConnManager{ + return &Manager{ IdentityECDH: nodeKey, // Assemble a peer notifier which will provide clients with @@ -357,7 +357,7 @@ func NewPeerConnManager(nodeKey keychain.SingleKeyECDH, // advertised addresses for any NodeAnnouncements from our persisted peers. // //nolint:lll -func (p *PeerConnManager) UpdatePersistentPeerAddrs() error { +func (p *Manager) UpdatePersistentPeerAddrs() error { graphSub, err := p.Config.SubscribeTopology() if err != nil { return err @@ -445,7 +445,7 @@ type IgnoredPeers map[autopilot.NodeID]struct{} // to itself. // - the peers that already have connections with, as in s.peersByPub. // - the peers that we are attempting to connect, as in s.persistentPeers. -func (p *PeerConnManager) CreateBootstrapIgnorePeers() IgnoredPeers { +func (p *Manager) CreateBootstrapIgnorePeers() IgnoredPeers { p.mu.RLock() defer p.mu.RUnlock() @@ -477,7 +477,7 @@ func (p *PeerConnManager) CreateBootstrapIgnorePeers() IgnoredPeers { // invariant, we ensure that our node is connected to a diverse set of peers // and that nodes newly joining the network receive an up to date network view // as soon as possible. -func (p *PeerConnManager) PeerBootstrapper(numTargetPeers uint32, +func (p *Manager) PeerBootstrapper(numTargetPeers uint32, bootstrappers []discovery.NetworkPeerBootstrapper) { defer p.wg.Done() @@ -611,7 +611,7 @@ func (p *PeerConnManager) PeerBootstrapper(numTargetPeers uint32, // initialPeerBootstrap attempts to continuously connect to peers on startup // until the target number of peers has been reached. This ensures that nodes // receive an up to date network view as soon as possible. -func (p *PeerConnManager) initialPeerBootstrap( +func (p *Manager) initialPeerBootstrap( ignore map[autopilot.NodeID]struct{}, numTargetPeers uint32, bootstrappers []discovery.NetworkPeerBootstrapper) { @@ -728,7 +728,7 @@ type nodeAddresses struct { // to all our direct channel collaborators. In order to promote liveness of our // active channels, we instruct the connection manager to attempt to establish // and maintain persistent connections to all our direct channel counterparties. -func (p *PeerConnManager) EstablishPersistentConnections() error { +func (p *Manager) EstablishPersistentConnections() error { // nodeAddrsMap stores the combination of node public keys and addresses // that we'll attempt to reconnect to. PubKey strings are used as keys // since other PubKey forms can't be compared. @@ -907,7 +907,7 @@ func (p *PeerConnManager) EstablishPersistentConnections() error { // sampling a value for the delay between 0s and the maxInitReconnectDelay. // // NOTE: This method MUST be run as a goroutine. -func (p *PeerConnManager) delayInitialReconnect(pubStr string) { +func (p *Manager) delayInitialReconnect(pubStr string) { delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second select { case <-time.After(delay): @@ -919,7 +919,7 @@ func (p *PeerConnManager) delayInitialReconnect(pubStr string) { // PrunePersistentPeerConnection removes all internal state related to // persistent connections to a peer within the server. This is used to avoid // persistent connection retries to peers we do not have any open channels with. -func (p *PeerConnManager) PrunePersistentPeerConnection( +func (p *Manager) PrunePersistentPeerConnection( compressedPubKey [33]byte) { pubKeyStr := string(compressedPubKey[:]) @@ -946,7 +946,7 @@ func (p *PeerConnManager) PrunePersistentPeerConnection( // the target peers. // // NOTE: This function is safe for concurrent access. -func (p *PeerConnManager) BroadcastMessage(skips map[route.Vertex]struct{}, +func (p *Manager) BroadcastMessage(skips map[route.Vertex]struct{}, msgs ...lnwire.Message) error { connLog.Debugf("Broadcasting %v messages", len(msgs)) @@ -998,7 +998,7 @@ func (p *PeerConnManager) BroadcastMessage(skips map[route.Vertex]struct{}, // particular peer comes online. The peer itself is sent across the peerChan. // // NOTE: This function is safe for concurrent access. -func (p *PeerConnManager) NotifyWhenOnline(peerKey [33]byte, +func (p *Manager) NotifyWhenOnline(peerKey [33]byte, peerChan chan<- lnpeer.Peer) { p.mu.Lock() @@ -1051,7 +1051,7 @@ func (p *PeerConnManager) NotifyWhenOnline(peerKey [33]byte, // NotifyWhenOffline delivers a notification to the caller of when the peer with // the given public key has been disconnected. The notification is signaled by // closing the channel returned. -func (p *PeerConnManager) NotifyWhenOffline( +func (p *Manager) NotifyWhenOffline( peerPubKey [33]byte) <-chan struct{} { p.mu.Lock() @@ -1083,7 +1083,7 @@ func (p *PeerConnManager) NotifyWhenOffline( // daemon's local representation of the remote peer. // // NOTE: This function is safe for concurrent access. -func (p *PeerConnManager) FindPeer( +func (p *Manager) FindPeer( peerKey *btcec.PublicKey) (*peer.Brontide, error) { p.mu.RLock() @@ -1099,7 +1099,7 @@ func (p *PeerConnManager) FindPeer( // public key. // // NOTE: This function is safe for concurrent access. -func (p *PeerConnManager) FindPeerByPubStr( +func (p *Manager) FindPeerByPubStr( pubStr string) (*peer.Brontide, error) { p.mu.RLock() @@ -1110,7 +1110,7 @@ func (p *PeerConnManager) FindPeerByPubStr( // findPeerByPubStr is an internal method that retrieves the specified peer from // the server's internal state using. -func (p *PeerConnManager) findPeerByPubStr( +func (p *Manager) findPeerByPubStr( pubStr string) (*peer.Brontide, error) { peer, ok := p.peersByPub[pubStr] @@ -1124,7 +1124,7 @@ func (p *PeerConnManager) findPeerByPubStr( // nextPeerBackoff computes the next backoff duration for a peer's pubkey using // exponential backoff. If no previous backoff was known, the default is // returned. -func (p *PeerConnManager) nextPeerBackoff(pubStr string, +func (p *Manager) nextPeerBackoff(pubStr string, startTime time.Time) time.Duration { // Now, determine the appropriate backoff to use for the retry. @@ -1187,7 +1187,7 @@ func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool { // connection. // // NOTE: This function is safe for concurrent access. -func (p *PeerConnManager) InboundPeerConnected(conn net.Conn) { +func (p *Manager) InboundPeerConnected(conn net.Conn) { // Exit early if we have already been instructed to shutdown, this // prevents any delayed callbacks from accidentally registering peers. if p.Stopped() { @@ -1284,7 +1284,7 @@ func (p *PeerConnManager) InboundPeerConnected(conn net.Conn) { // OutboundPeerConnected initializes a new peer in response to a new outbound // connection. // NOTE: This function is safe for concurrent access. -func (p *PeerConnManager) OutboundPeerConnected(connReq *connmgr.ConnReq, +func (p *Manager) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) { // Exit early if we have already been instructed to shutdown, this @@ -1418,7 +1418,7 @@ const UnassignedConnID uint64 = 0 // optionally specify a connection ID to ignore, which prevents us from // canceling a successful request. All persistent connreqs for the provided // pubkey are discarded after the operationjw. -func (p *PeerConnManager) cancelConnReqs(pubStr string, skip *uint64) { +func (p *Manager) cancelConnReqs(pubStr string, skip *uint64) { // First, cancel any lingering persistent retry attempts, which will // prevent retries for any with backoffs that are still maturing. if cancelChan, ok := p.persistentRetryCancels[pubStr]; ok { @@ -1461,7 +1461,7 @@ func (p *PeerConnManager) cancelConnReqs(pubStr string, skip *uint64) { // peer by adding it to the server's global list of all active peers, and // starting all the goroutines the peer needs to function properly. The inbound // boolean should be true if the peer initiated the connection to us. -func (p *PeerConnManager) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, +func (p *Manager) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound bool) { brontideConn, ok := conn.(*brontide.Conn) @@ -1544,7 +1544,7 @@ func (p *PeerConnManager) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, // addPeer adds the passed peer to the server's global state of all active // peers. -func (p *PeerConnManager) addPeer(peer *peer.Brontide) { +func (p *Manager) addPeer(peer *peer.Brontide) { if peer == nil { return } @@ -1588,7 +1588,7 @@ func (p *PeerConnManager) addPeer(peer *peer.Brontide) { // be signaled of the new peer once the method returns. // // NOTE: This MUST be launched as a goroutine. -func (p *PeerConnManager) peerInitializer(peer *peer.Brontide) { +func (p *Manager) peerInitializer(peer *peer.Brontide) { defer p.wg.Done() // Avoid initializing peers while the server is exiting. @@ -1649,7 +1649,7 @@ func (p *PeerConnManager) peerInitializer(peer *peer.Brontide) { // successfully, otherwise the peer should be disconnected instead. // // NOTE: This MUST be launched as a goroutine. -func (p *PeerConnManager) peerTerminationWatcher(peer *peer.Brontide, +func (p *Manager) peerTerminationWatcher(peer *peer.Brontide, ready chan struct{}) { defer p.wg.Done() @@ -1847,7 +1847,7 @@ func (p *PeerConnManager) peerTerminationWatcher(peer *peer.Brontide, // currently none for a given address and it removes old connection requests // if the associated address is no longer in the latest address list for the // peer. -func (p *PeerConnManager) connectToPersistentPeer(pubKeyStr string) { +func (p *Manager) connectToPersistentPeer(pubKeyStr string) { p.mu.Lock() defer p.mu.Unlock() @@ -1947,7 +1947,7 @@ func (p *PeerConnManager) connectToPersistentPeer(pubKeyStr string) { // removePeer removes the passed peer from the server's state of all active // peers. -func (p *PeerConnManager) removePeer(peer *peer.Brontide) { +func (p *Manager) removePeer(peer *peer.Brontide) { if peer == nil { return } @@ -1999,7 +1999,7 @@ func (p *PeerConnManager) removePeer(peer *peer.Brontide) { // connection is established, or the initial handshake process fails. // // NOTE: This function is safe for concurrent access. -func (p *PeerConnManager) ConnectToPeer(addr *lnwire.NetAddress, +func (p *Manager) ConnectToPeer(addr *lnwire.NetAddress, perm bool, timeout time.Duration) error { targetPub := string(addr.IdentityKey.SerializeCompressed()) @@ -2075,7 +2075,7 @@ func (p *PeerConnManager) ConnectToPeer(addr *lnwire.NetAddress, // connectToPeer establishes a connection to a remote peer. errChan is used to // notify the caller if the connection attempt has failed. Otherwise, it will be // closed. -func (p *PeerConnManager) connectToPeer(addr *lnwire.NetAddress, +func (p *Manager) connectToPeer(addr *lnwire.NetAddress, errChan chan<- error, timeout time.Duration) { conn, err := brontide.Dial( @@ -2103,7 +2103,7 @@ func (p *PeerConnManager) connectToPeer(addr *lnwire.NetAddress, // identified by public key. // // NOTE: This function is safe for concurrent access. -func (p *PeerConnManager) DisconnectPeer(pubKey *btcec.PublicKey) error { +func (p *Manager) DisconnectPeer(pubKey *btcec.PublicKey) error { pubBytes := pubKey.SerializeCompressed() pubStr := string(pubBytes) @@ -2137,7 +2137,7 @@ func (p *PeerConnManager) DisconnectPeer(pubKey *btcec.PublicKey) error { // SendCustomMessage sends a custom message to the peer with the specified // pubkey. -func (p *PeerConnManager) SendCustomMessage(peerPub [33]byte, +func (p *Manager) SendCustomMessage(peerPub [33]byte, msgType lnwire.MessageType, data []byte) error { peer, err := p.FindPeerByPubStr(string(peerPub[:])) @@ -2165,7 +2165,7 @@ func (p *PeerConnManager) SendCustomMessage(peerPub [33]byte, } // AddPersistentPeer adds a peer's public key to the persistentPeers map. -func (p *PeerConnManager) AddPersistentPeer(peerKey *btcec.PublicKey) { +func (p *Manager) AddPersistentPeer(peerKey *btcec.PublicKey) { p.mu.Lock() defer p.mu.Unlock() @@ -2178,7 +2178,7 @@ func (p *PeerConnManager) AddPersistentPeer(peerKey *btcec.PublicKey) { // Peers returns a slice of all active peers. // // NOTE: This function is safe for concurrent access. -func (p *PeerConnManager) Peers() []*peer.Brontide { +func (p *Manager) Peers() []*peer.Brontide { p.mu.RLock() defer p.mu.RUnlock() @@ -2224,7 +2224,7 @@ var errNoAdvertisedAddr = errors.New("no advertised address found") // fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a // node. -func (p *PeerConnManager) fetchNodeAdvertisedAddrs( +func (p *Manager) fetchNodeAdvertisedAddrs( pub *btcec.PublicKey) ([]net.Addr, error) { vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed()) diff --git a/server.go b/server.go index db6cd15239..2893a74b2e 100644 --- a/server.go +++ b/server.go @@ -241,7 +241,7 @@ type server struct { wg sync.WaitGroup - pcm *peerconn.PeerConnManager + pcm *peerconn.Manager } // CustomMessage is a custom message that is received from a peer. @@ -1441,7 +1441,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, // Create liveness monitor. s.createLivenessMonitor(cfg, cc) - s.pcm.Config = &peerconn.PeerConnManagerConfig{ + s.pcm.Config = &peerconn.ManagerConfig{ PartialPeerConfig: s.createPartialPeerConfig(), FeatureMgr: s.featureMgr, Net: s.cfg.net,