Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

769 implement beefy networking #794

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 68 additions & 29 deletions src/main/java/com/limechain/network/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.limechain.network.dto.ProtocolStreamType;
import com.limechain.network.dto.ProtocolStreams;
import com.limechain.network.protocol.blockannounce.NodeRole;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake;
import com.limechain.network.protocol.blockannounce.messages.Handshake;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import io.libp2p.core.PeerId;
Expand All @@ -25,6 +25,7 @@
@Log
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ConnectionManager {

private static ConnectionManager instance;
protected final Map<PeerId, PeerInfo> peers = new HashMap<>();

Expand All @@ -45,6 +46,14 @@ public PeerInfo getPeerInfo(PeerId peerId) {
return peers.get(peerId);
}

/**
* Adds a Transaction stream to the peer info. Peer id is retrieved from the stream.
* @param stream stream to be added
*/
public void addTransactionsStream(Stream stream) {
addStream(stream, ProtocolStreamType.TRANSACTIONS);
}

/**
* Adds a Block Announce stream to the peer info. Peer id is retrieved from the stream.
*
Expand All @@ -63,8 +72,12 @@ public void addGrandpaStream(Stream stream) {
addStream(stream, ProtocolStreamType.GRANDPA);
}

public void addTransactionsStream(Stream stream) {
addStream(stream, ProtocolStreamType.TRANSACTIONS);
/**
* Adds a BEEFY stream to the peer info. Peer id is retrieved from the stream.
* @param stream stream to be added
*/
public void addBeefyStream(Stream stream) {
addStream(stream, ProtocolStreamType.BEEFY);
}

private void addStream(Stream stream, ProtocolStreamType type) {
Expand Down Expand Up @@ -94,15 +107,6 @@ public PeerInfo addNewPeer(PeerId peerId) {
return peerInfo;
}

/**
* Removes a GRANDPA stream from the peer info. Peer id is retrieved from the stream.
*
* @param stream stream to be closed
*/
public void closeGrandpaStream(Stream stream) {
closeStream(stream, ProtocolStreamType.GRANDPA);
}

/**
* Removes a Transactions stream from the peer info. Peer id is retrieved from the stream.
*
Expand All @@ -121,6 +125,23 @@ public void closeBlockAnnounceStream(Stream stream) {
closeStream(stream, ProtocolStreamType.BLOCK_ANNOUNCE);
}

/**
* Removes a GRANDPA stream from the peer info. Peer id is retrieved from the stream.
*
* @param stream stream to be closed
*/
public void closeGrandpaStream(Stream stream) {
closeStream(stream, ProtocolStreamType.GRANDPA);
}

/**
* Removes a BEEFY stream from the peer info. Peer id is retrieved from the stream
* @param stream stream to be closed
*/
public void closeBeefyStream(Stream stream) {
closeStream(stream, ProtocolStreamType.BEEFY);
}

private void closeStream(Stream stream, ProtocolStreamType type) {
PeerInfo peerInfo = peers.get(stream.remotePeerId());
if (peerInfo == null) {
Expand All @@ -142,18 +163,18 @@ private void closeStream(Stream stream, ProtocolStreamType type) {
* based on a Block Announce Handshake.
*
* @param peerId peer to be updated
* @param blockAnnounceHandshake handshake
* @param handshake handshake
*/
public void updatePeer(PeerId peerId, BlockAnnounceHandshake blockAnnounceHandshake) {
public void updatePeer(PeerId peerId, Handshake handshake) {
PeerInfo peerInfo = peers.get(peerId);
if (peerInfo == null) {
log.log(Level.WARNING, "Trying to update missing peer " + peerId);
return;
}
peerInfo.setNodeRole(blockAnnounceHandshake.getNodeRole());
peerInfo.setGenesisBlockHash(blockAnnounceHandshake.getGenesisBlockHash());
peerInfo.setBestBlock(blockAnnounceHandshake.getBestBlock());
peerInfo.setBestBlockHash(blockAnnounceHandshake.getBestBlockHash());
peerInfo.setNodeRole(handshake.getNodeRole());
peerInfo.setGenesisBlockHash(handshake.getGenesisBlockHash());
peerInfo.setBestBlock(handshake.getBestBlock());
peerInfo.setBestBlockHash(handshake.getBestBlockHash());
}

/**
Expand All @@ -179,16 +200,6 @@ public void updatePeer(PeerId peerId, BlockAnnounceMessage blockAnnounceMessage)
}
}

/**
* Checks if we have an open GRANDPA responder stream with a peer.
*
* @param peerId peer to check
* @return do peer info and GRANDPA responder stream exist
*/
public boolean isGrandpaConnected(PeerId peerId) {
return peers.containsKey(peerId) && peers.get(peerId).getGrandpaStreams().getResponder() != null;
}

/**
* Checks if we have an open Transactions responder stream with a peer.
*
Expand All @@ -209,6 +220,25 @@ public boolean isBlockAnnounceConnected(PeerId peerId) {
return peers.containsKey(peerId) && peers.get(peerId).getBlockAnnounceStreams().getResponder() != null;
}

/**
* Checks if we have an open GRANDPA responder stream with a peer.
*
* @param peerId peer to check
* @return do peer info and GRANDPA responder stream exist
*/
public boolean isGrandpaConnected(PeerId peerId) {
return peers.containsKey(peerId) && peers.get(peerId).getGrandpaStreams().getResponder() != null;
}

/**
* Checks if we have an open BEEFY responder steam with a peer
* @param peerId peer to check
* @return do peer info and BEEFY responder steam exist
*/
public boolean isBeefyConnected(PeerId peerId) {
return peers.containsKey(peerId) && peers.get(peerId).getBeefyStreams().getResponder() != null;
}

/**
* Gets the ids of all peers with open connections.
* Open connection means either Grandpa or Block Announce stream has been opened.
Expand All @@ -223,10 +253,14 @@ public Set<PeerId> getPeerIds() {
* Closes conneciton to all the connected peers and removes them from the peersList.
*/
public void removeAllPeers() {
peers.forEach((peerId, peerInfo) -> {

peers.values().forEach(peerInfo -> {
closeProtocolStream(peerInfo.getTransactionsStreams());
closeProtocolStream(peerInfo.getBlockAnnounceStreams());
closeProtocolStream(peerInfo.getGrandpaStreams());
closeProtocolStream(peerInfo.getBeefyStreams());
});

peers.clear();
}

Expand All @@ -236,10 +270,15 @@ public void removeAllPeers() {
* @param peerId peerId of the peer to be removed
*/
public void removePeer(PeerId peerId) {

if (peers.containsKey(peerId)) {

PeerInfo peerInfo = peers.get(peerId);
closeProtocolStream(peerInfo.getTransactionsStreams());
closeProtocolStream(peerInfo.getBlockAnnounceStreams());
closeProtocolStream(peerInfo.getGrandpaStreams());
closeProtocolStream(peerInfo.getBeefyStreams());

peers.remove(peerId);
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/com/limechain/network/NetworkService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.limechain.config.HostConfig;
import com.limechain.constants.GenesisBlockHash;
import com.limechain.network.kad.KademliaService;
import com.limechain.network.protocol.beefy.BeefyService;
import com.limechain.network.protocol.blockannounce.BlockAnnounceService;
import com.limechain.network.protocol.blockannounce.NodeRole;
import com.limechain.network.protocol.grandpa.GrandpaService;
Expand Down Expand Up @@ -70,9 +71,10 @@ public class NetworkService implements NodeService {
private WarpSyncService warpSyncService;
private LightMessagesService lightMessagesService;

private TransactionsService transactionsService;
private BlockAnnounceService blockAnnounceService;
private GrandpaService grandpaService;
private TransactionsService transactionsService;
private BeefyService beefyService;

private Ping ping;

Expand Down Expand Up @@ -104,6 +106,7 @@ public NetworkService(ChainService chainService, HostConfig hostConfig, KVReposi
private void initializeProtocols(ChainService chainService, GenesisBlockHash genesisBlockHash,
HostConfig hostConfig,
KVRepository<String, Object> repository, CliArguments cliArgs) {

boolean isLocalEnabled = hostConfig.getChain() == Chain.LOCAL;
boolean clientMode = true;

Expand All @@ -118,26 +121,31 @@ private void initializeProtocols(ChainService chainService, GenesisBlockHash gen
String pingProtocol = ProtocolUtils.PING_PROTOCOL;
String chainId = chainService.getChainSpec().getProtocolId();
boolean legacyProtocol = !cliArgs.noLegacyProtocols();
String protocolId = legacyProtocol ? chainId :
StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString());
String genesisBlockHashWithoutPrefix = StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString());
String protocolId = legacyProtocol ?
chainId :
genesisBlockHashWithoutPrefix;

String kadProtocolId = ProtocolUtils.getKadProtocol(chainId);
String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(protocolId);
String lightProtocolId = ProtocolUtils.getLightMessageProtocol(protocolId);
String syncProtocolId = ProtocolUtils.getSyncProtocol(protocolId);
String stateProtocolId = ProtocolUtils.getStateProtocol(protocolId);
String transactionsProtocolId = ProtocolUtils.getTransactionsProtocol(protocolId);
String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(protocolId);
String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(protocolId, legacyProtocol);
String transactionsProtocolId = ProtocolUtils.getTransactionsProtocol(protocolId);
String beefyProtocolId = ProtocolUtils.getBeefyProtocol(genesisBlockHashWithoutPrefix);

kademliaService = new KademliaService(kadProtocolId, hostId, isLocalEnabled, clientMode);
lightMessagesService = new LightMessagesService(lightProtocolId);
warpSyncService = new WarpSyncService(warpProtocolId);
syncService = new SyncService(syncProtocolId);
stateService = new StateService(stateProtocolId);
transactionsService = new TransactionsService(transactionsProtocolId);
blockAnnounceService = new BlockAnnounceService(blockAnnounceProtocolId);
grandpaService = new GrandpaService(grandpaProtocolId);
beefyService = new BeefyService(beefyProtocolId);
ping = new Ping(pingProtocol, new PingProtocol());
transactionsService = new TransactionsService(transactionsProtocolId);

hostBuilder.addProtocols(
List.of(
Expand All @@ -155,7 +163,8 @@ private void initializeProtocols(ChainService chainService, GenesisBlockHash gen
if (nodeRole == NodeRole.AUTHORING) {
hostBuilder.addProtocols(
List.of(
transactionsService.getProtocol()
transactionsService.getProtocol(),
beefyService.getProtocol()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public void handshakePeers() {
if (network.getNodeRole().equals(NodeRole.AUTHORING)) {
asyncExecutor.executeAndForget(() ->
network.getTransactionsService().sendHandshake(network.getHost(), peerId));

asyncExecutor.executeAndForget(() ->
network.getBeefyService().sendHandshake(network.getHost(), peerId));
}
});
}
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/com/limechain/network/ProtocolUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,26 @@ public static String getStateProtocol(String chainId) {
return String.format("/%s/state/2", chainId);
}

public static String getBlockAnnounceProtocol(String chainId) {
return String.format("/%s/block-announces/1", chainId);
}

public static String getKadProtocol(String chainId) {
return String.format("/%s/kad", chainId);
}

public static String getTransactionsProtocol(String chainId) {
return String.format("/%s/transactions/1", chainId);
}

public static String getBlockAnnounceProtocol(String chainId) {
return String.format("/%s/block-announces/1", chainId);
}

public static String getGrandpaProtocol(String chainId, boolean legacyProtocol) {
return String.format("/%s/grandpa/1", legacyProtocol ? "paritytech" : chainId);
}

public static String getTransactionsProtocol(String chainId) {
return String.format("/%s/transactions/1", chainId);
// NOTE: Beefy was likely not part of the original protocols and therefore
// only operates with the genesis hash. As a result, it does not support
// the {chainId}/beefy/2 format.
public static String getBeefyProtocol(String genesisBlockHash) {
return String.format("/%s/beefy/2", genesisBlockHash);
}
}
3 changes: 3 additions & 0 deletions src/main/java/com/limechain/network/dto/PeerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
@Data
@NoArgsConstructor
public class PeerInfo {

private PeerId peerId;
private int nodeRole;
private BigInteger bestBlock;
Expand All @@ -21,6 +22,7 @@ public class PeerInfo {
private final ProtocolStreams blockAnnounceStreams = new ProtocolStreams();
private final ProtocolStreams grandpaStreams = new ProtocolStreams();
private final ProtocolStreams transactionsStreams = new ProtocolStreams();
private final ProtocolStreams beefyStreams = new ProtocolStreams();

public String getNodeRoleName(){
return Arrays
Expand All @@ -36,6 +38,7 @@ public ProtocolStreams getProtocolStreams(ProtocolStreamType type) {
case GRANDPA -> grandpaStreams;
case BLOCK_ANNOUNCE -> blockAnnounceStreams;
case TRANSACTIONS -> transactionsStreams;
case BEEFY -> beefyStreams;
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.limechain.network.dto;

public enum ProtocolStreamType {
GRANDPA, BLOCK_ANNOUNCE, TRANSACTIONS
TRANSACTIONS, BLOCK_ANNOUNCE, GRANDPA, BEEFY
}
12 changes: 12 additions & 0 deletions src/main/java/com/limechain/network/protocol/beefy/Beefy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.limechain.network.protocol.beefy;

import com.limechain.network.StrictProtocolBinding;

/**
* BEEFY protocol binding
*/
public class Beefy extends StrictProtocolBinding<BeefyController> {
public Beefy(String protocolId, BeefyProtocol protocol) {
super(protocolId, protocol);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.limechain.network.protocol.beefy;

import io.libp2p.core.Stream;

/**
* A controller for sending message on a BEEFY stream
*/
public class BeefyController {

protected final Stream stream;
protected BeefyEngine engine = new BeefyEngine();

public BeefyController(Stream stream) {
this.stream = stream;
}

/**
* Sends a handshake message over the controller stream.
*/
public void sendHandshake() {
engine.writeHandshakeToStream(stream, stream.remotePeerId());
}

public void sendVoteMessage() {
//TODO
}
}
Loading
Loading