Skip to content

Commit

Permalink
Add listener readiness handling
Browse files Browse the repository at this point in the history
  • Loading branch information
flcl42 committed Oct 12, 2023
1 parent c7caeac commit 4efe0fc
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 10 deletions.
3 changes: 3 additions & 0 deletions src/libp2p/Libp2p.Core/IPeerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ public interface IPeerContext
IChannelRequest? SpecificProtocolRequest { get; set; }

event RemotePeerConnected OnRemotePeerConnection;
event ListenerReady OnListenerReady;

void Connected(IPeer peer);
void ListenerReady();
#endregion
}

public delegate void RemotePeerConnected(IRemotePeer peer);
public delegate void ListenerReady();
11 changes: 9 additions & 2 deletions src/libp2p/Libp2p.Core/PeerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ public class PeerContext : IPeerContext
public Multiaddr RemoteEndpoint { get; set; }
public Multiaddr LocalEndpoint { get; set; }
public BlockingCollection<IChannelRequest> SubDialRequests { get; set; } = new();

public event RemotePeerConnected? OnRemotePeerConnection;
public IChannelRequest? SpecificProtocolRequest { get; set; }

public IPeerContext Fork()
Expand All @@ -24,8 +22,17 @@ public IPeerContext Fork()
return result;
}



public event RemotePeerConnected? OnRemotePeerConnection;
public void Connected(IPeer peer)
{
OnRemotePeerConnection?.Invoke((IRemotePeer)peer);
}

public event ListenerReady? OnListenerReady;
public void ListenerReady()
{
OnListenerReady?.Invoke();
}
}
27 changes: 20 additions & 7 deletions src/libp2p/Libp2p.Core/PeerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void Setup(IProtocol protocol, IChannelFactory upChannelFactory)
_upChannelFactory = upChannelFactory;
}

private Task<IListener> ListenAsync(LocalPeer peer, Multiaddr addr, CancellationToken token)
private async Task<IListener> ListenAsync(LocalPeer peer, Multiaddr addr, CancellationToken token)
{
peer.Address = addr;
if (!peer.Address.Has(Enums.Multiaddr.P2p))
Expand All @@ -51,16 +51,28 @@ private Task<IListener> ListenAsync(LocalPeer peer, Multiaddr addr, Cancellation
token.Register(() => chan.CloseAsync());
}

PeerContext peerCtx = new()
TaskCompletionSource ts = new();


PeerContext peerContext = new()
{
Id = $"ctx-{++CtxId}",
LocalPeer = peer,
};
RemotePeer remotePeer = new(this, peer, peerCtx);
peerCtx.RemotePeer = remotePeer;

peerContext.OnListenerReady += OnListenerReady;

void OnListenerReady()
{
ts.SetResult();
peerContext.OnListenerReady -= OnListenerReady;
}

RemotePeer remotePeer = new(this, peer, peerContext);
peerContext.RemotePeer = remotePeer;

PeerListener result = new(chan, peer);
peerCtx.OnRemotePeerConnection += remotePeer =>
peerContext.OnRemotePeerConnection += remotePeer =>
{
if (((RemotePeer)remotePeer).LocalPeer != peer)
{
Expand All @@ -70,9 +82,10 @@ private Task<IListener> ListenAsync(LocalPeer peer, Multiaddr addr, Cancellation
ConnectedTo(remotePeer, false)
.ContinueWith(t => { result.RaiseOnConnection(remotePeer); }, token);
};
_ = _protocol.ListenAsync(chan, _upChannelFactory, peerCtx);
_ = _protocol.ListenAsync(chan, _upChannelFactory, peerContext);

return Task.FromResult((IListener)result);
await ts.Task;
return result;
}

protected virtual Task ConnectedTo(IRemotePeer peer, bool isDialer)
Expand Down
3 changes: 3 additions & 0 deletions src/libp2p/Libp2p.Protocols.IpTcp/IpTcpProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public async Task ListenAsync(IChannel channel, IChannelFactory? channelFactory,
srv.Bind(new IPEndPoint(ipAddress, tcpPort));
srv.Listen(tcpPort);

_logger?.LogDebug("Ready to handle connections");
context.ListenerReady();

IPEndPoint localIpEndpoint = (IPEndPoint)srv.LocalEndPoint!;
channel.OnClose(() =>
{
Expand Down
6 changes: 5 additions & 1 deletion src/libp2p/Libp2p.Protocols.Quic/QuicProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public async Task ListenAsync(IChannel channel, IChannelFactory? channelFactory,
await listener.DisposeAsync();
});

_logger?.LogDebug("Ready to handle connections");
context.ListenerReady();

while (!channel.IsClosed)
{
QuicConnection connection = await listener.AcceptConnectionAsync(channel.Token);
Expand Down Expand Up @@ -131,6 +134,7 @@ public async Task DialAsync(IChannel channel, IChannelFactory? channelFactory, I
MaxInboundBidirectionalStreams = 100,
ClientAuthenticationOptions = new SslClientAuthenticationOptions
{
TargetHost = null,
ApplicationProtocols = protocols,
RemoteCertificateValidationCallback = (_, c, _, _) => VerifyRemoteCertificate(context.RemotePeer, c),
ClientCertificates = new X509CertificateCollection { CertificateHelper.CertificateFromIdentity(_sessionKey, context.LocalPeer.Identity) },
Expand All @@ -139,7 +143,7 @@ public async Task DialAsync(IChannel channel, IChannelFactory? channelFactory, I
};

QuicConnection connection = await QuicConnection.ConnectAsync(clientConnectionOptions);

channel.OnClose(async () =>
{
await connection.CloseAsync(0);
Expand Down

0 comments on commit 4efe0fc

Please sign in to comment.