Skip to content

Commit

Permalink
Merge pull request #4630 from nozim/refactor-libp2p-node
Browse files Browse the repository at this point in the history
[Networking] Tiny refactoring of p2pNode
  • Loading branch information
yhassanzadeh13 authored Sep 6, 2023
2 parents 74fc6c6 + 63c63a6 commit af0b381
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 48 deletions.
124 changes: 78 additions & 46 deletions network/p2p/libp2pNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,81 +15,113 @@ import (
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/network"
flownet "github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/network/p2p/unicast/protocols"
)

// LibP2PNode represents a flow libp2p node. It provides the network layer with the necessary interface to
// control the underlying libp2p node. It is essentially the flow wrapper around the libp2p node, and allows
// us to define different types of libp2p nodes that can operate in different ways by overriding these methods.
// TODO: this interface is highly coupled with the current implementation of the libp2p node. We should
//
// consider refactoring it to be more generic and less coupled with the current implementation.
// https://github.com/dapperlabs/flow-go/issues/6575
type LibP2PNode interface {
module.ReadyDoneAware
Subscriptions
// PeerConnections connection status information per peer.
PeerConnections
// PeerScore exposes the peer score API.
PeerScore
// DisallowListNotificationConsumer exposes the disallow list notification consumer API for the node so that
// it will be notified when a new disallow list update is distributed.
DisallowListNotificationConsumer
// CollectionClusterChangesConsumer is the interface for consuming the events of changes in the collection cluster.
// This is used to notify the node of changes in the collection cluster.
// LibP2PNode implements this interface and consumes the events to be notified of changes in the clustering channels.
// The clustering channels are used by the collection nodes of a cluster to communicate with each other.
// As the cluster (and hence their cluster channels) of collection nodes changes over time (per epoch) the node needs to be notified of these changes.
CollectionClusterChangesConsumer
// DisallowListOracle exposes the disallow list oracle API for external consumers to query about the disallow list.
DisallowListOracle
// CoreP2P service management capabilities
type CoreP2P interface {
// Start the libp2p node.
Start(ctx irrecoverable.SignalerContext)
// Stop terminates the libp2p node.
Stop() error
// GetIPPort returns the IP and Port the libp2p node is listening on.
GetIPPort() (string, string, error)
// Host returns pointer to host object of node.
Host() host.Host
// SetComponentManager sets the component manager for the node.
// SetComponentManager may be called at most once.
SetComponentManager(cm *component.ComponentManager)
}

// PeerManagement set of node traits related to its lifecycle and metadata retrieval
type PeerManagement interface {
// AddPeer adds a peer to this node by adding it to this node's peerstore and connecting to it.
AddPeer(ctx context.Context, peerInfo peer.AddrInfo) error
// RemovePeer closes the connection with the peer.
RemovePeer(peerID peer.ID) error
// GetPeersForProtocol returns slice peer IDs for the specified protocol ID.
GetPeersForProtocol(pid protocol.ID) peer.IDSlice
// CreateStream returns an existing stream connected to the peer if it exists, or creates a new stream with it.
CreateStream(ctx context.Context, peerID peer.ID) (libp2pnet.Stream, error)
// GetIPPort returns the IP and Port the libp2p node is listening on.
GetIPPort() (string, string, error)
// RoutingTable returns the node routing table
RoutingTable() *kbucket.RoutingTable
// ListPeers returns list of peer IDs for peers subscribed to the topic.
ListPeers(topic string) []peer.ID
// Subscribe subscribes the node to the given topic and returns the subscription
Subscribe(topic channels.Topic, topicValidator TopicValidatorFunc) (Subscription, error)
// Unsubscribe cancels the subscriber and closes the topic corresponding to the given channel.
Unsubscribe(topic channels.Topic) error
// Publish publishes the given payload on the topic.
Publish(ctx context.Context, messageScope network.OutgoingMessageScope) error
// Host returns pointer to host object of node.
Host() host.Host
// WithDefaultUnicastProtocol overrides the default handler of the unicast manager and registers all preferred protocols.
WithDefaultUnicastProtocol(defaultHandler libp2pnet.StreamHandler, preferred []protocols.ProtocolName) error
// GetPeersForProtocol returns slice peer IDs for the specified protocol ID.
GetPeersForProtocol(pid protocol.ID) peer.IDSlice
// WithPeersProvider sets the PeersProvider for the peer manager.
// If a peer manager factory is set, this method will set the peer manager's PeersProvider.
WithPeersProvider(peersProvider PeersProvider)
// PeerManagerComponent returns the component interface of the peer manager.
PeerManagerComponent() component.Component
// RequestPeerUpdate requests an update to the peer connections of this node using the peer manager.
RequestPeerUpdate()
}

// Routable set of node routing capabilities
type Routable interface {
// RoutingTable returns the node routing table
RoutingTable() *kbucket.RoutingTable
// SetRouting sets the node's routing implementation.
// SetRouting may be called at most once.
SetRouting(r routing.Routing)
// Routing returns node routing object.
Routing() routing.Routing
}

// StreamManagement peer to peer stream management functions
type UnicastManagement interface {
// CreateStream returns an existing stream connected to the peer if it exists, or creates a new stream with it.
CreateStream(ctx context.Context, peerID peer.ID) (libp2pnet.Stream, error)
// WithDefaultUnicastProtocol overrides the default handler of the unicast manager and registers all preferred protocols.
WithDefaultUnicastProtocol(defaultHandler libp2pnet.StreamHandler, preferred []protocols.ProtocolName) error
}

// PubSub publish subscribe features for node
type PubSub interface {
// Subscribe subscribes the node to the given topic and returns the subscription
Subscribe(topic channels.Topic, topicValidator TopicValidatorFunc) (Subscription, error)
// UnSubscribe cancels the subscriber and closes the topic.
Unsubscribe(topic channels.Topic) error
// Publish publishes the given payload on the topic.
Publish(ctx context.Context, messageScope flownet.OutgoingMessageScope) error
// SetPubSub sets the node's pubsub implementation.
// SetPubSub may be called at most once.
SetPubSub(ps PubSubAdapter)
// SetComponentManager sets the component manager for the node.
// SetComponentManager may be called at most once.
SetComponentManager(cm *component.ComponentManager)
}

// LibP2PNode represents a Flow libp2p node. It provides the network layer with the necessary interface to
// control the underlying libp2p node. It is essentially the Flow wrapper around the libp2p node, and allows
// us to define different types of libp2p nodes that can operate in different ways by overriding these methods.
type LibP2PNode interface {
module.ReadyDoneAware
Subscriptions
// PeerConnections connection status information per peer.
PeerConnections
// PeerScore exposes the peer score API.
PeerScore
// DisallowListNotificationConsumer exposes the disallow list notification consumer API for the node so that
// it will be notified when a new disallow list update is distributed.
DisallowListNotificationConsumer
// CollectionClusterChangesConsumer is the interface for consuming the events of changes in the collection cluster.
// This is used to notify the node of changes in the collection cluster.
// LibP2PNode implements this interface and consumes the events to be notified of changes in the clustering channels.
// The clustering channels are used by the collection nodes of a cluster to communicate with each other.
// As the cluster (and hence their cluster channels) of collection nodes changes over time (per epoch) the node needs to be notified of these changes.
CollectionClusterChangesConsumer
// DisallowListOracle exposes the disallow list oracle API for external consumers to query about the disallow list.
DisallowListOracle

// CoreP2P service management capabilities
CoreP2P

// PeerManagement current peer management functions
PeerManagement

// Routable routing related features
Routable

// PubSub publish subscribe features for node
PubSub

// UnicastManagement node stream management
UnicastManagement
}

// Subscriptions set of funcs related to current subscription info of a node.
Expand Down
2 changes: 0 additions & 2 deletions network/p2p/p2pnode/libp2pNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ func NewNode(
}
}

var _ p2p.LibP2PNode = (*Node)(nil)

func (n *Node) Start(ctx irrecoverable.SignalerContext) {
n.Component.Start(ctx)
}
Expand Down

0 comments on commit af0b381

Please sign in to comment.