Skip to content

Commit

Permalink
Improve QUIC and tcp/ip IP and port resolution (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
flcl42 authored Oct 18, 2023
1 parent b0da90a commit 8aed3e2
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 57 deletions.
3 changes: 3 additions & 0 deletions src/libp2p/Libp2p.Core/IPeerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ public interface IPeerContext
IChannelRequest? SpecificProtocolRequest { get; set; }

event RemotePeerConnected OnRemotePeerConnection;
event ListenerReady OnListenerReady;

void Connected(IPeer peer);
void ListenerReady();
#endregion
}

public delegate void RemotePeerConnected(IRemotePeer peer);
public delegate void ListenerReady();
11 changes: 9 additions & 2 deletions src/libp2p/Libp2p.Core/PeerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ public class PeerContext : IPeerContext
public Multiaddr RemoteEndpoint { get; set; }
public Multiaddr LocalEndpoint { get; set; }
public BlockingCollection<IChannelRequest> SubDialRequests { get; set; } = new();

public event RemotePeerConnected? OnRemotePeerConnection;
public IChannelRequest? SpecificProtocolRequest { get; set; }

public IPeerContext Fork()
Expand All @@ -24,8 +22,17 @@ public IPeerContext Fork()
return result;
}



public event RemotePeerConnected? OnRemotePeerConnection;
public void Connected(IPeer peer)
{
OnRemotePeerConnection?.Invoke((IRemotePeer)peer);
}

public event ListenerReady? OnListenerReady;
public void ListenerReady()
{
OnListenerReady?.Invoke();
}
}
27 changes: 20 additions & 7 deletions src/libp2p/Libp2p.Core/PeerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void Setup(IProtocol protocol, IChannelFactory upChannelFactory)
_upChannelFactory = upChannelFactory;
}

private Task<IListener> ListenAsync(LocalPeer peer, Multiaddr addr, CancellationToken token)
private async Task<IListener> ListenAsync(LocalPeer peer, Multiaddr addr, CancellationToken token)
{
peer.Address = addr;
if (!peer.Address.Has(Enums.Multiaddr.P2p))
Expand All @@ -51,16 +51,28 @@ private Task<IListener> ListenAsync(LocalPeer peer, Multiaddr addr, Cancellation
token.Register(() => chan.CloseAsync());
}

PeerContext peerCtx = new()
TaskCompletionSource ts = new();


PeerContext peerContext = new()
{
Id = $"ctx-{++CtxId}",
LocalPeer = peer,
};
RemotePeer remotePeer = new(this, peer, peerCtx);
peerCtx.RemotePeer = remotePeer;

peerContext.OnListenerReady += OnListenerReady;

void OnListenerReady()
{
ts.SetResult();
peerContext.OnListenerReady -= OnListenerReady;
}

RemotePeer remotePeer = new(this, peer, peerContext);
peerContext.RemotePeer = remotePeer;

PeerListener result = new(chan, peer);
peerCtx.OnRemotePeerConnection += remotePeer =>
peerContext.OnRemotePeerConnection += remotePeer =>
{
if (((RemotePeer)remotePeer).LocalPeer != peer)
{
Expand All @@ -70,9 +82,10 @@ private Task<IListener> ListenAsync(LocalPeer peer, Multiaddr addr, Cancellation
ConnectedTo(remotePeer, false)
.ContinueWith(t => { result.RaiseOnConnection(remotePeer); }, token);
};
_ = _protocol.ListenAsync(chan, _upChannelFactory, peerCtx);
_ = _protocol.ListenAsync(chan, _upChannelFactory, peerContext);

return Task.FromResult((IListener)result);
await ts.Task;
return result;
}

protected virtual Task ConnectedTo(IRemotePeer peer, bool isDialer)
Expand Down
72 changes: 31 additions & 41 deletions src/libp2p/Libp2p.Protocols.IpTcp/IpTcpProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Net.Sockets;
using Nethermind.Libp2p.Core;
using Microsoft.Extensions.Logging;
using MultiaddrEnum = Nethermind.Libp2p.Core.Enums.Multiaddr;

namespace Nethermind.Libp2p.Protocols;

Expand All @@ -25,34 +26,36 @@ public async Task ListenAsync(IChannel channel, IChannelFactory? channelFactory,
{
_logger?.LogInformation("ListenAsync({contextId})", context.Id);

Socket srv = new(SocketType.Stream, ProtocolType.Tcp);
Multiaddr addr = context.LocalPeer.Address;
Core.Enums.Multiaddr ipProtocol = addr.Has(Core.Enums.Multiaddr.Ip4) ? Core.Enums.Multiaddr.Ip4 : Core.Enums.Multiaddr.Ip6;
MultiaddrEnum ipProtocol = addr.Has(MultiaddrEnum.Ip4) ? MultiaddrEnum.Ip4 : MultiaddrEnum.Ip6;
IPAddress ipAddress = IPAddress.Parse(addr.At(ipProtocol)!);
int tcpPort = int.Parse(addr.At(Core.Enums.Multiaddr.Tcp)!);
int tcpPort = int.Parse(addr.At(MultiaddrEnum.Tcp)!);

Socket srv = new(SocketType.Stream, ProtocolType.Tcp);
srv.Bind(new IPEndPoint(ipAddress, tcpPort));
srv.Listen(32767);
srv.Listen(tcpPort);

IPEndPoint localIpEndpoint = (IPEndPoint)srv.LocalEndPoint!;
channel.OnClose(() =>
{
srv.Close();
return Task.CompletedTask;
});
Core.Enums.Multiaddr newIpProtocol = localIpEndpoint.AddressFamily == AddressFamily.InterNetwork
? Core.Enums.Multiaddr.Ip4
: Core.Enums.Multiaddr.Ip6;

context.LocalEndpoint = Core.Multiaddr.From(newIpProtocol, localIpEndpoint.Address.ToString(),
Core.Enums.Multiaddr.Tcp,
localIpEndpoint.Port);
context.LocalEndpoint = Multiaddr.From(
ipProtocol, ipProtocol == MultiaddrEnum.Ip4 ?
localIpEndpoint.Address.MapToIPv4().ToString() :
localIpEndpoint.Address.MapToIPv6().ToString(),
MultiaddrEnum.Tcp, localIpEndpoint.Port);

if (tcpPort == 0)
{
context.LocalPeer.Address = context.LocalPeer.Address
.Replace(MultiaddrEnum.Tcp, localIpEndpoint.Port.ToString());
}

context.LocalPeer.Address = context.LocalPeer.Address.Replace(
context.LocalEndpoint.Has(Core.Enums.Multiaddr.Ip4) ? Core.Enums.Multiaddr.Ip4 : Core.Enums.Multiaddr.Ip6, newIpProtocol,
localIpEndpoint.Address.ToString())
.Replace(
Core.Enums.Multiaddr.Tcp,
localIpEndpoint.Port.ToString());
_logger?.LogDebug("Ready to handle connections");
context.ListenerReady();

await Task.Run(async () =>
{
Expand All @@ -62,22 +65,9 @@ await Task.Run(async () =>
IPeerContext clientContext = context.Fork();
IPEndPoint remoteIpEndpoint = (IPEndPoint)client.RemoteEndPoint!;

clientContext.RemoteEndpoint = Core.Multiaddr.From(
remoteIpEndpoint.AddressFamily == AddressFamily.InterNetwork
? Core.Enums.Multiaddr.Ip4
: Core.Enums.Multiaddr.Ip6, remoteIpEndpoint.Address.ToString(), Core.Enums.Multiaddr.Tcp,
remoteIpEndpoint.Port);
clientContext.LocalPeer.Address = context.LocalPeer.Address.Replace(
context.LocalEndpoint.Has(Core.Enums.Multiaddr.Ip4) ? Core.Enums.Multiaddr.Ip4 : Core.Enums.Multiaddr.Ip6, newIpProtocol,
localIpEndpoint.Address.ToString())
.Replace(
Core.Enums.Multiaddr.Tcp,
remoteIpEndpoint.Port.ToString());
clientContext.RemotePeer.Address = new Multiaddr()
.Append(remoteIpEndpoint.AddressFamily == AddressFamily.InterNetwork
? Core.Enums.Multiaddr.Ip4
: Core.Enums.Multiaddr.Ip6, remoteIpEndpoint.Address.ToString())
.Append(Core.Enums.Multiaddr.Tcp, remoteIpEndpoint.Port.ToString());
clientContext.RemoteEndpoint = clientContext.RemotePeer.Address = Multiaddr.From(
ipProtocol, remoteIpEndpoint.Address.ToString(),
MultiaddrEnum.Tcp, remoteIpEndpoint.Port);

IChannel chan = channelFactory.SubListen(clientContext);

Expand Down Expand Up @@ -131,9 +121,9 @@ public async Task DialAsync(IChannel channel, IChannelFactory channelFactory, IP
TaskCompletionSource<bool?> waitForStop = new(TaskCreationOptions.RunContinuationsAsynchronously);
Socket client = new(SocketType.Stream, ProtocolType.Tcp);
Multiaddr addr = context.RemotePeer.Address;
Core.Enums.Multiaddr ipProtocol = addr.Has(Core.Enums.Multiaddr.Ip4) ? Core.Enums.Multiaddr.Ip4 : Core.Enums.Multiaddr.Ip6;
MultiaddrEnum ipProtocol = addr.Has(MultiaddrEnum.Ip4) ? MultiaddrEnum.Ip4 : MultiaddrEnum.Ip6;
IPAddress ipAddress = IPAddress.Parse(addr.At(ipProtocol)!);
int tcpPort = int.Parse(addr.At(Core.Enums.Multiaddr.Tcp)!);
int tcpPort = int.Parse(addr.At(MultiaddrEnum.Tcp)!);
try
{
await client.ConnectAsync(new IPEndPoint(ipAddress, tcpPort), channel.Token);
Expand All @@ -148,15 +138,15 @@ public async Task DialAsync(IChannel channel, IChannelFactory channelFactory, IP
IPEndPoint localEndpoint = (IPEndPoint)client.LocalEndPoint!;
IPEndPoint remoteEndpoint = (IPEndPoint)client.RemoteEndPoint!;

context.RemoteEndpoint = Core.Multiaddr.From(
context.RemoteEndpoint = Multiaddr.From(
ipProtocol,
ipProtocol == Core.Enums.Multiaddr.Ip4 ? remoteEndpoint.Address.MapToIPv4() : remoteEndpoint.Address.MapToIPv6(),
Core.Enums.Multiaddr.Tcp, remoteEndpoint.Port);
context.LocalEndpoint = Core.Multiaddr.From(
ipProtocol == MultiaddrEnum.Ip4 ? remoteEndpoint.Address.MapToIPv4() : remoteEndpoint.Address.MapToIPv6(),
MultiaddrEnum.Tcp, remoteEndpoint.Port);
context.LocalEndpoint = Multiaddr.From(
ipProtocol,
ipProtocol == Core.Enums.Multiaddr.Ip4 ? localEndpoint.Address.MapToIPv4() : localEndpoint.Address.MapToIPv6(),
Core.Enums.Multiaddr.Tcp, localEndpoint.Port);
context.LocalPeer.Address = context.LocalEndpoint.Append(Core.Enums.Multiaddr.P2p, context.LocalPeer.Identity.PeerId.ToString());
ipProtocol == MultiaddrEnum.Ip4 ? localEndpoint.Address.MapToIPv4() : localEndpoint.Address.MapToIPv6(),
MultiaddrEnum.Tcp, localEndpoint.Port);
context.LocalPeer.Address = context.LocalEndpoint.Append(MultiaddrEnum.P2p, context.LocalPeer.Identity.PeerId.ToString());

IChannel upChannel = channelFactory.SubDial(context);
channel.Token.Register(() => upChannel.CloseAsync());
Expand Down
24 changes: 19 additions & 5 deletions src/libp2p/Libp2p.Protocols.Quic/QuicProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,24 @@ public async Task ListenAsync(IChannel channel, IChannelFactory? channelFactory,
ConnectionOptionsCallback = (_, _, _) => ValueTask.FromResult(serverConnectionOptions)
});

context.LocalEndpoint = Multiaddr.From(
ipProtocol, listener.LocalEndPoint.Address.ToString(),
MultiaddrEnum.Udp, listener.LocalEndPoint.Port);

if (udpPort == 0)
{
context.LocalPeer.Address = context.LocalPeer.Address
.Replace(MultiaddrEnum.Udp, listener.LocalEndPoint.Port.ToString());
}

channel.OnClose(async () =>
{
await listener.DisposeAsync();
});

_logger?.LogDebug("Ready to handle connections");
context.ListenerReady();

while (!channel.IsClosed)
{
QuicConnection connection = await listener.AcceptConnectionAsync(channel.Token);
Expand Down Expand Up @@ -123,6 +136,7 @@ public async Task DialAsync(IChannel channel, IChannelFactory? channelFactory, I
MaxInboundBidirectionalStreams = 100,
ClientAuthenticationOptions = new SslClientAuthenticationOptions
{
TargetHost = null,
ApplicationProtocols = protocols,
RemoteCertificateValidationCallback = (_, c, _, _) => VerifyRemoteCertificate(context.RemotePeer, c),
ClientCertificates = new X509CertificateCollection { CertificateHelper.CertificateFromIdentity(_sessionKey, context.LocalPeer.Identity) },
Expand Down Expand Up @@ -159,11 +173,11 @@ private async Task ProcessStreams(QuicConnection connection, IPeerContext contex
connection.LocalEndPoint.Port);

context.LocalPeer.Address = context.LocalPeer.Address.Replace(
context.LocalEndpoint.Has(MultiaddrEnum.Ip4) ? MultiaddrEnum.Ip4 : MultiaddrEnum.Ip6, newIpProtocol,
connection.LocalEndPoint.Address.ToString())
.Replace(
MultiaddrEnum.Udp,
connection.LocalEndPoint.Port.ToString());
context.LocalEndpoint.Has(MultiaddrEnum.Ip4) ?
MultiaddrEnum.Ip4 :
MultiaddrEnum.Ip6,
newIpProtocol,
connection.LocalEndPoint.Address.ToString());

IPEndPoint remoteIpEndpoint = connection.RemoteEndPoint!;
newIpProtocol = remoteIpEndpoint.AddressFamily == AddressFamily.InterNetwork
Expand Down
13 changes: 11 additions & 2 deletions src/samples/chat/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
if (args.Length > 0 && args[0] == "-d")
{
Multiaddr remoteAddr = args[1];
ILocalPeer localPeer = peerFactory.Create();

string addrTemplate = remoteAddr.Has(Nethermind.Libp2p.Core.Enums.Multiaddr.QuicV1) ?
"/ip4/0.0.0.0/udp/0/quic-v1" :
"/ip4/0.0.0.0/tcp/0";

ILocalPeer localPeer = peerFactory.Create(localAddr: addrTemplate);

logger.LogInformation("Dialing {0}", remoteAddr);
IRemotePeer remotePeer = await localPeer.DialAsync(remoteAddr, ts.Token);
Expand All @@ -38,8 +43,12 @@
Identity optionalFixedIdentity = new(Enumerable.Repeat((byte)42, 32).ToArray());
ILocalPeer peer = peerFactory.Create(optionalFixedIdentity);

string addrTemplate = args.Contains("-quic") ?
"/ip4/0.0.0.0/udp/{0}/quic-v1/p2p/{1}" :
"/ip4/0.0.0.0/tcp/{0}/p2p/{1}";

IListener listener = await peer.ListenAsync(
$"/ip4/0.0.0.0/udp/{(args.Length > 0 && args[0] == "-sp" ? args[1] : "0")}/quic-v1/p2p/{peer.Identity.PeerId}",
string.Format(addrTemplate, args.Length > 0 && args[0] == "-sp" ? args[1] : "0", peer.Identity.PeerId),
ts.Token);
logger.LogInformation($"Listener started at {listener.Address}");
listener.OnConnection += async remotePeer => logger.LogInformation($"A peer connected {remotePeer.Address}");
Expand Down

0 comments on commit 8aed3e2

Please sign in to comment.