diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 5352e82e..413ad491 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -36,7 +36,11 @@ jobs:
TEST_OPTS: -c ${{ env.BUILD_CONFIG }} --no-restore
run: |
dotnet test Libp2p.Core.Tests ${{ env.PACK_OPTS }}
- #dotnet test Libp2p.Protocols.Multistream.Tests ${{ env.PACK_OPTS }}
+ dotnet test Libp2p.Protocols.Multistream.Tests ${{ env.PACK_OPTS }}
dotnet test Libp2p.Protocols.Noise.Tests ${{ env.PACK_OPTS }}
dotnet test Libp2p.Protocols.Pubsub.Tests ${{ env.PACK_OPTS }}
dotnet test Libp2p.Protocols.Quic.Tests ${{ env.PACK_OPTS }}
+ dotnet test Libp2p.Protocols.Yamux.Tests ${{ env.PACK_OPTS }}
+ dotnet test Libp2p.E2eTests ${{ env.PACK_OPTS }}
+ dotnet test Libp2p.Protocols.Pubsub.E2eTests ${{ env.PACK_OPTS }}
+ dotnet test Libp2p.Protocols.PubsubPeerDiscovery.E2eTests ${{ env.PACK_OPTS }}
diff --git a/README.md b/README.md
index cac0d299..e8d2b180 100644
--- a/README.md
+++ b/README.md
@@ -40,14 +40,14 @@ The target is to provide a performant well-tested implementation of a wide range
| Protocol | Version | Status |
|--------------------|--------------------|-----------------|
| TCP | tcp | ✅ |
-| QUIC | quic-v1 | ✅ |
+| QUIC | quic-v1 | 🚧 |
| multistream-select | /multistream/1.0.0 | ✅ |
| plaintext | /plaintext/2.0.0 | ✅ |
| noise | /noise | ✅ |
| tls | /tls/1.0.0 | 🚧 |
| WebTransport | | ⬜ help wanted |
| yamux | /yamux/1.0.0 | ✅ |
-| Circuit Relay | /libp2p/circuit/relay/0.2.0/* | ⬜ help wanted |
+| Circuit Relay | /libp2p/circuit/relay/0.2.0/* | 🚧 |
| hole punching | | ⬜ help wanted |
| **Application layer**
| Identify | /ipfs/id/1.0.0 | ✅ |
@@ -55,11 +55,11 @@ The target is to provide a performant well-tested implementation of a wide range
| pubsub | /floodsub/1.0.0 | ✅ |
| | /meshsub/1.0.0 | ✅ |
| | /meshsub/1.1.0 | 🚧 |
-| | /meshsub/1.2.0 | ⬜ |
+| | /meshsub/1.2.0 | 🚧 |
| **Discovery**
| mDns | basic | ✅ |
| | DNS-SD | 🚧 |
-| [discv5](https://github.com/Pier-Two/Lantern.Discv5) | 5.1 | 🚧 help wanted |
+| [discv5](https://github.com/Pier-Two/Lantern.Discv5) (wrapper) | 5.1 | 🚧 help wanted |
⬜ - not yet implemented
🚧 - work in progress
diff --git a/src/libp2p/Directory.Packages.props b/src/libp2p/Directory.Packages.props
index 9e92e867..40d639fe 100644
--- a/src/libp2p/Directory.Packages.props
+++ b/src/libp2p/Directory.Packages.props
@@ -23,7 +23,7 @@
-
+
@@ -34,4 +34,4 @@
-
\ No newline at end of file
+
diff --git a/src/libp2p/Libp2p.Core.Tests/TaskHelperTests.cs b/src/libp2p/Libp2p.Core.Tests/TaskHelperTests.cs
new file mode 100644
index 00000000..772740ff
--- /dev/null
+++ b/src/libp2p/Libp2p.Core.Tests/TaskHelperTests.cs
@@ -0,0 +1,52 @@
+// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
+// SPDX-License-Identifier: MIT
+
+using Nethermind.Libp2p.Core.Extensions;
+
+namespace Nethermind.Libp2p.Core.Tests;
+internal class TaskHelperTests
+{
+
+ [Test]
+ public async Task Test_AllExceptions_RaiseAggregateException()
+ {
+ TaskCompletionSource tcs1 = new();
+ TaskCompletionSource tcs2 = new();
+ TaskCompletionSource tcs3 = new();
+
+ Task t = TaskHelper.FirstSuccess(tcs1.Task, tcs2.Task, tcs3.Task);
+
+ tcs1.SetException(new Exception());
+ tcs2.SetException(new Exception());
+ tcs3.SetException(new Exception());
+
+ await t.ContinueWith((t) =>
+ {
+ Assert.Multiple(() =>
+ {
+ Assert.That(t.IsFaulted, Is.True);
+ Assert.That(t.Exception?.InnerException, Is.TypeOf());
+ Assert.That((t.Exception?.InnerException as AggregateException)?.InnerExceptions, Has.Count.EqualTo(3));
+ });
+ });
+ }
+
+ [Test]
+ public async Task Test_SingleSuccess_ReturnsCompletedTask()
+ {
+ TaskCompletionSource tcs1 = new();
+ TaskCompletionSource tcs2 = new();
+ TaskCompletionSource tcs3 = new();
+
+ Task t = TaskHelper.FirstSuccess(tcs1.Task, tcs2.Task, tcs3.Task);
+
+ tcs1.SetException(new Exception());
+ tcs2.SetException(new Exception());
+ _ = Task.Delay(100).ContinueWith(t => tcs3.SetResult(true));
+
+ Task result = await t;
+
+ Assert.That(result, Is.EqualTo(tcs3.Task));
+ Assert.That((result as Task)!.Result, Is.EqualTo(true));
+ }
+}
diff --git a/src/libp2p/Libp2p.Core.TestsBase/E2e/ChannelBus.cs b/src/libp2p/Libp2p.Core.TestsBase/E2e/ChannelBus.cs
index e7db3c7d..ef083406 100644
--- a/src/libp2p/Libp2p.Core.TestsBase/E2e/ChannelBus.cs
+++ b/src/libp2p/Libp2p.Core.TestsBase/E2e/ChannelBus.cs
@@ -2,7 +2,6 @@
// SPDX-License-Identifier: MIT
using Microsoft.Extensions.Logging;
-using Newtonsoft.Json;
using System.Threading.Channels;
namespace Nethermind.Libp2p.Core.TestsBase.E2e;
@@ -30,7 +29,7 @@ public async IAsyncEnumerable GetIncomingRequests(PeerId serverId)
logger?.LogDebug($"Listen {serverId}");
- await foreach (var item in col.Reader.ReadAllAsync())
+ await foreach (ClientChannel item in col.Reader.ReadAllAsync())
{
logger?.LogDebug($"New request from {item.Client} to {serverId}");
yield return item.Channel;
diff --git a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestBuilder.cs b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestBuilder.cs
index 91f51f44..7912768d 100644
--- a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestBuilder.cs
+++ b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestBuilder.cs
@@ -1,13 +1,45 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT
+using Microsoft.Extensions.Logging;
+using Nethermind.Libp2p.Core.Discovery;
+using Nethermind.Libp2p.Protocols;
+using System.Collections.Concurrent;
+
namespace Nethermind.Libp2p.Core.TestsBase.E2e;
-public class TestBuilder(ChannelBus? commmonBus = null, IServiceProvider? serviceProvider = null) : PeerFactoryBuilderBase(serviceProvider)
+public class TestBuilder(IServiceProvider? serviceProvider = null) : PeerFactoryBuilderBase(serviceProvider)
+{
+ protected override ProtocolRef[] BuildStack(IEnumerable additionalProtocols)
+ {
+ ProtocolRef root = Get();
+
+ Connect([root],
+ [
+ Get(),
+ Get(),
+ .. additionalProtocols
+ ]);
+
+ return [root];
+ }
+}
+
+public class TestPeerFactory(IProtocolStackSettings protocolStackSettings, PeerStore peerStore, ILoggerFactory? loggerFactory = null) : PeerFactory(protocolStackSettings, peerStore)
+{
+ ConcurrentDictionary peers = new();
+
+ public override ILocalPeer Create(Identity? identity = default)
+ {
+ ArgumentNullException.ThrowIfNull(identity);
+ return peers.GetOrAdd(identity.PeerId, (p) => new TestLocalPeer(identity, protocolStackSettings, peerStore, loggerFactory));
+ }
+}
+
+internal class TestLocalPeer(Identity id, IProtocolStackSettings protocolStackSettings, PeerStore peerStore, ILoggerFactory? loggerFactory = null) : LocalPeer(id, peerStore, protocolStackSettings, loggerFactory)
{
- protected override ProtocolStack BuildStack()
+ protected override async Task ConnectedTo(ISession session, bool isDialer)
{
- return Over(new TestMuxerProtocol(commmonBus ?? new ChannelBus(), new TestContextLoggerFactory()))
- .AddAppLayerProtocol();
+ await session.DialAsync();
}
}
diff --git a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestLocalPeer.cs b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestLocalPeer.cs
deleted file mode 100644
index d09db191..00000000
--- a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestLocalPeer.cs
+++ /dev/null
@@ -1,22 +0,0 @@
-// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
-// SPDX-License-Identifier: MIT
-
-using Multiformats.Address;
-
-namespace Nethermind.Libp2p.Core.TestsBase.E2e;
-
-internal class TestLocalPeer(Identity id) : ILocalPeer
-{
- public Identity Identity { get => id; set => throw new NotImplementedException(); }
- public Multiaddress Address { get => $"/p2p/{id.PeerId}"; set => throw new NotImplementedException(); }
-
- public Task DialAsync(Multiaddress addr, CancellationToken token = default)
- {
- throw new NotImplementedException();
- }
-
- public Task ListenAsync(Multiaddress addr, CancellationToken token = default)
- {
- throw new NotImplementedException();
- }
-}
diff --git a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestMuxerProtocol.cs b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestMuxerProtocol.cs
index 4ead3772..10fd051d 100644
--- a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestMuxerProtocol.cs
+++ b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestMuxerProtocol.cs
@@ -1,84 +1,115 @@
using Google.Protobuf;
using Microsoft.Extensions.Logging;
+using Multiformats.Address;
using Nethermind.Libp2p.Core;
+using Nethermind.Libp2p.Core.Dto;
+using Nethermind.Libp2p.Core.Exceptions;
using Nethermind.Libp2p.Core.TestsBase.Dto;
using Nethermind.Libp2p.Core.TestsBase.E2e;
using Org.BouncyCastle.Utilities.Encoders;
using System.Buffers;
-class TestMuxerProtocol(ChannelBus bus, ILoggerFactory? loggerFactory = null) : IProtocol
+class TestMuxerProtocol(ChannelBus bus, ILoggerFactory? loggerFactory = null) : ITransportProtocol
{
private const string id = "test-muxer";
private readonly ILogger? logger = loggerFactory?.CreateLogger(id);
public string Id => id;
+ public static Multiaddress[] GetDefaultAddresses(PeerId peerId) => [$"/p2p/{peerId}"];
+ public static bool IsAddressMatch(Multiaddress addr) => true;
- public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context)
+ public async Task DialAsync(ITransportContext context, Multiaddress remoteAddr, CancellationToken token)
{
- logger?.LogDebug($"{context.LocalPeer.Identity.PeerId}: Dial async");
- context.Connected(context.RemotePeer);
- await Task.Run(() => HandleRemote(bus.Dial(context.LocalPeer.Identity.PeerId, context.RemotePeer.Address.GetPeerId()!), upChannelFactory!, context));
+ logger?.LogDebug($"{context.Peer.Identity.PeerId}: Dial async");
+
+ //await Task.Run(async () =>
+ //{
+ IChannel chan = bus.Dial(context.Peer.Identity.PeerId, remoteAddr.GetPeerId()!);
+ using INewConnectionContext connection = context.CreateConnection();
+ connection.State.RemoteAddress = remoteAddr;
+
+ await HandleRemote(chan, connection, context);
+ //});
}
- public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context)
+ public async Task ListenAsync(ITransportContext context, Multiaddress listenAddr, CancellationToken token)
{
- context.ListenerReady();
- logger?.LogDebug($"{context.LocalPeer.Identity.PeerId}: Listen async");
- await foreach (var item in bus.GetIncomingRequests(context.LocalPeer.Identity.PeerId))
+ context.ListenerReady(listenAddr);
+ logger?.LogDebug($"{context.Peer.Identity.PeerId}: Listen async");
+ await foreach (IChannel item in bus.GetIncomingRequests(context.Peer.Identity.PeerId))
{
- logger?.LogDebug($"{context.LocalPeer.Identity.PeerId}: Listener handles new con");
- _ = HandleRemote(item, upChannelFactory!, context, true);
+ _ = Task.Run(async () =>
+ {
+ INewConnectionContext connection = context.CreateConnection();
+ logger?.LogDebug($"{context.Peer.Identity.PeerId}: Listener handles new con");
+ try
+ {
+ await HandleRemote(item, connection, context, true);
+ }
+ catch (SessionExistsException)
+ {
+ logger?.LogDebug($"{context.Peer.Identity.PeerId}: Listener rejected inititation of a redundant session");
+ }
+ catch (Exception e)
+ {
+ logger?.LogError(e, $"{context.Peer.Identity.PeerId}: Listener exception");
+ }
+ }, token);
}
}
- private async Task HandleRemote(IChannel downChannel, IChannelFactory upChannelFactory, IPeerContext context, bool isListen = false)
+ private async Task HandleRemote(IChannel downChannel, INewConnectionContext connection, ITransportContext context, bool isListen = false)
{
uint counter = isListen ? 1u : 0u;
Dictionary chans = [];
- string peer = "";
- context = context.Fork();
+ PublicKey? remotePublicKey;
+ PeerId? remotePeerId;
if (isListen)
{
- peer = await downChannel.ReadLineAsync();
- await downChannel.WriteLineAsync(context.LocalPeer.Identity.PeerId!.ToString());
- logger?.LogDebug($"{context.LocalPeer.Identity.PeerId}: Listener handles remote {peer}");
+ remotePublicKey = await downChannel.ReadPrefixedProtobufAsync(PublicKey.Parser);
+ remotePeerId = new PeerId(remotePublicKey);
+ await downChannel.WriteSizeAndProtobufAsync(context.Peer.Identity.PublicKey);
+ logger?.LogDebug($"{context.Peer.Identity.PeerId}: Listener handles remote {remotePeerId}");
}
else
{
- await downChannel.WriteLineAsync(context.LocalPeer.Identity.PeerId!.ToString());
- peer = await downChannel.ReadLineAsync();
- logger?.LogDebug($"{context.LocalPeer.Identity.PeerId}: Dialer handles remote {peer}");
+ await downChannel.WriteSizeAndProtobufAsync(context.Peer.Identity.PublicKey);
+ remotePublicKey = await downChannel.ReadPrefixedProtobufAsync(PublicKey.Parser);
+ remotePeerId = new PeerId(remotePublicKey);
+ logger?.LogDebug($"{context.Peer.Identity.PeerId}: Dialer handles remote {remotePeerId}");
}
- context.RemotePeer.Address = $"/p2p/{peer}";
+ connection.State.RemotePublicKey = remotePublicKey;
+ connection.State.RemoteAddress = $"/p2p/{remotePeerId}";
+ using INewSessionContext? session = connection.UpgradeToSession();
- string logPrefix = $"{context.LocalPeer.Identity.PeerId}<>{peer}";
+ string logPrefix = $"{context.Peer.Identity.PeerId}<>{remotePeerId}";
- _ = Task.Run(async () =>
+ _ = Task.Run(() =>
{
- foreach (var item in context.SubDialRequests.GetConsumingEnumerable())
+ foreach (UpgradeOptions item in session.DialRequests)
{
uint chanId = Interlocked.Add(ref counter, 2);
- logger?.LogDebug($"{context.LocalPeer.Identity.PeerId}({chanId}): Sub-request {item.SubProtocol} {item.CompletionSource is not null} from {context.RemotePeer.Address.GetPeerId()}");
+ logger?.LogDebug($"{context.Peer.Identity.PeerId}({chanId}): Sub-request {item.SelectedProtocol} {item.CompletionSource is not null} to call {connection.State.RemoteAddress.GetPeerId()}");
- chans[chanId] = new MuxerChannel { Tcs = item.CompletionSource };
- var response = new MuxerPacket()
+ chans[chanId] = new MuxerChannel { Tcs = item.CompletionSource, Argument = item.Argument };
+ MuxerPacket response = new()
{
ChannelId = chanId,
Type = MuxerPacketType.NewStreamRequest,
- Protocols = { item.SubProtocol!.Id }
+ Protocols = { item.SelectedProtocol!.Id }
};
logger?.LogDebug($"{logPrefix}({response.ChannelId}): > Packet {response.Type} {string.Join(",", response.Protocols)} {response.Data?.Length ?? 0}");
_ = downChannel.WriteSizeAndProtobufAsync(response);
}
- logger?.LogDebug($"{context.LocalPeer.Identity.PeerId}: SubDialRequests End");
-
+ logger?.LogDebug($"{context.Peer.Identity.PeerId}: SubDialRequests End");
+ return Task.CompletedTask;
});
while (true)
@@ -87,7 +118,7 @@ private async Task HandleRemote(IChannel downChannel, IChannelFactory upChannelF
{
logger?.LogDebug($"{logPrefix}: < READY({(isListen ? "list" : "dial")})");
- var packet = await downChannel.ReadPrefixedProtobufAsync(MuxerPacket.Parser);
+ MuxerPacket packet = await downChannel.ReadPrefixedProtobufAsync(MuxerPacket.Parser);
logger?.LogDebug($"{logPrefix}({packet.ChannelId}): < Packet {packet.Type} {string.Join(",", packet.Protocols)} {packet.Data?.Length ?? 0}");
@@ -95,15 +126,15 @@ private async Task HandleRemote(IChannel downChannel, IChannelFactory upChannelF
{
case MuxerPacketType.NewStreamRequest:
IProtocol? selected = null;
- foreach (var proto in packet.Protocols)
+ foreach (string? proto in packet.Protocols)
{
- selected = upChannelFactory.SubProtocols.FirstOrDefault(x => x.Id == proto);
+ selected = session.SubProtocols.FirstOrDefault(x => x.Id == proto);
if (selected is not null) break;
}
if (selected is not null)
{
logger?.LogDebug($"{logPrefix}({packet.ChannelId}): Matched {selected}");
- var response = new MuxerPacket()
+ MuxerPacket response = new()
{
ChannelId = packet.ChannelId,
Type = MuxerPacketType.NewStreamResponse,
@@ -113,9 +144,9 @@ private async Task HandleRemote(IChannel downChannel, IChannelFactory upChannelF
}
};
- var req = new ChannelRequest { SubProtocol = selected };
+ UpgradeOptions req = new() { SelectedProtocol = selected, ModeOverride = UpgradeModeOverride.Listen };
- IChannel upChannel = upChannelFactory.SubListen(context, req);
+ IChannel upChannel = session.Upgrade(selected, req);
chans[packet.ChannelId] = new MuxerChannel { UpChannel = upChannel };
_ = HandleUpchannelData(downChannel, chans, packet.ChannelId, upChannel, logPrefix);
@@ -127,7 +158,7 @@ private async Task HandleRemote(IChannel downChannel, IChannelFactory upChannelF
{
logger?.LogDebug($"{logPrefix}({packet.ChannelId}): No match {packet.Type} {string.Join(",", packet.Protocols)} {packet.Data?.Length ?? 0}");
- var response = new MuxerPacket()
+ MuxerPacket response = new()
{
ChannelId = packet.ChannelId,
Type = MuxerPacketType.NewStreamResponse,
@@ -141,10 +172,10 @@ private async Task HandleRemote(IChannel downChannel, IChannelFactory upChannelF
case MuxerPacketType.NewStreamResponse:
if (packet.Protocols.Any())
{
- var req = new ChannelRequest { SubProtocol = upChannelFactory.SubProtocols.FirstOrDefault(x => x.Id == packet.Protocols.First()) };
- IChannel upChannel = upChannelFactory.SubDial(context, req);
+ UpgradeOptions req = new() { SelectedProtocol = session.SubProtocols.FirstOrDefault(x => x.Id == packet.Protocols.First()), CompletionSource = chans[packet.ChannelId].Tcs, Argument = chans[packet.ChannelId].Argument, ModeOverride = UpgradeModeOverride.Dial };
+ IChannel upChannel = session.Upgrade(session.SubProtocols.FirstOrDefault(x => x.Id == packet.Protocols.First())!, req);
chans[packet.ChannelId].UpChannel = upChannel;
- logger?.LogDebug($"{logPrefix}({packet.ChannelId}): Start upchanel with {req.SubProtocol}");
+ logger?.LogDebug($"{logPrefix}({packet.ChannelId}): Start upchanel with {req.SelectedProtocol}");
_ = HandleUpchannelData(downChannel, chans, packet.ChannelId, upChannel, logPrefix);
}
else
@@ -153,23 +184,38 @@ private async Task HandleRemote(IChannel downChannel, IChannelFactory upChannelF
}
break;
case MuxerPacketType.Data:
- logger?.LogDebug($"{logPrefix}({packet.ChannelId}): Data to upchanel {packet.Data?.Length ?? 0} {Hex.ToHexString(packet.Data?.ToByteArray() ?? [])}");
- _ = chans[packet.ChannelId].UpChannel!.WriteAsync(new ReadOnlySequence(packet.Data.ToByteArray()));
+ if (packet.Data is null or [])
+ {
+ logger?.LogWarning($"{logPrefix}({packet.ChannelId}): Empty data received");
+ break;
+ }
+ logger?.LogDebug($"{logPrefix}({packet.ChannelId}): Data to upchanel {packet.Data.Length} {Hex.ToHexString(packet.Data.ToByteArray())}");
+ _ = chans.GetValueOrDefault(packet.ChannelId)?.UpChannel?.WriteAsync(new ReadOnlySequence(packet.Data.ToByteArray()));
break;
case MuxerPacketType.CloseWrite:
logger?.LogDebug($"{logPrefix}({packet.ChannelId}): Remote EOF");
- chans[packet.ChannelId].RemoteClosedWrites = true;
- _ = chans[packet.ChannelId].UpChannel!.WriteEofAsync();
+ lock (chans[packet.ChannelId])
+ {
+ chans[packet.ChannelId].RemoteClosedWrites = true;
+
+ _ = chans[packet.ChannelId].UpChannel?.WriteEofAsync();
+
+ if (chans[packet.ChannelId].LocalClosedWrites)
+ {
+ //chans[packet.ChannelId].Tcs?.SetResult(null);
+ _ = chans[packet.ChannelId].UpChannel?.CloseAsync();
+ chans.Remove(packet.ChannelId);
+ }
+ }
break;
default:
break;
}
}
- catch
+ catch (Exception e)
{
-
-
+ logger?.LogError(e, $"{logPrefix}: Muxer listener exception");
}
}
@@ -181,12 +227,12 @@ private Task HandleUpchannelData(IChannel downChannel, Dictionary item in upChannel.ReadAllAsync())
{
- var data = item.ToArray();
+ byte[] data = item.ToArray();
logger?.LogDebug($"{logPrefix}({channelId}): Upchannel data {data.Length} {Hex.ToHexString(data, false)}");
- var packet = new MuxerPacket()
+ MuxerPacket packet = new()
{
ChannelId = channelId,
Type = MuxerPacketType.Data,
@@ -197,23 +243,31 @@ private Task HandleUpchannelData(IChannel downChannel, Dictionary Packet {packet.Type} {string.Join(",", packet.Protocols)} {packet.Data?.Length ?? 0}");
+ logger?.LogDebug($"{logPrefix}({channelId}): Upchannel write close");
- _ = downChannel.WriteSizeAndProtobufAsync(packet);
+ {
+ MuxerPacket packet = new()
+ {
+ ChannelId = channelId,
+ Type = MuxerPacketType.CloseWrite,
+ };
+
+ logger?.LogDebug($"{logPrefix}({packet.ChannelId}): > Packet {packet.Type} {string.Join(",", packet.Protocols)} {packet.Data?.Length ?? 0}");
+
+ _ = downChannel.WriteSizeAndProtobufAsync(packet);
+ }
}
}
catch
@@ -226,7 +280,9 @@ private Task HandleUpchannelData(IChannel downChannel, Dictionary? Tcs { get; set; }
public bool RemoteClosedWrites { get; set; }
+ public bool LocalClosedWrites { get; set; }
+ public object? Argument { get; internal set; }
}
}
diff --git a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestMuxerTests.cs b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestMuxerTests.cs
index b5593514..496fe013 100644
--- a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestMuxerTests.cs
+++ b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestMuxerTests.cs
@@ -11,20 +11,21 @@ internal class TestMuxerTests
[Test]
public async Task Test_ConnectionEstablished_AfterHandshake()
{
- ServiceProvider sp = new ServiceCollection()
- .AddSingleton(sp => new TestBuilder(null, sp))
+ ChannelBus channelBus = new();
+ ServiceProvider MakeServiceProvider() => new ServiceCollection()
+ .AddSingleton(sp => new TestBuilder(sp))
+ .AddSingleton()
.AddSingleton()
+ .AddSingleton(channelBus)
.AddSingleton(sp => sp.GetService()!.Build())
.BuildServiceProvider();
- IPeerFactory peerFactory = sp.GetService()!;
+ ILocalPeer peerA = MakeServiceProvider().GetRequiredService().Create(TestPeers.Identity(1));
+ await peerA.StartListenAsync();
+ ILocalPeer peerB = MakeServiceProvider().GetRequiredService().Create(TestPeers.Identity(2));
+ await peerB.StartListenAsync();
- ILocalPeer peerA = peerFactory.Create(TestPeers.Identity(1));
- await peerA.ListenAsync(TestPeers.Multiaddr(1));
- ILocalPeer peerB = peerFactory.Create(TestPeers.Identity(2));
- await peerB.ListenAsync(TestPeers.Multiaddr(2));
-
- IRemotePeer remotePeerB = await peerA.DialAsync(peerB.Address);
+ ISession remotePeerB = await peerA.DialAsync(TestPeers.Multiaddr(2));
await remotePeerB.DialAsync();
}
}
diff --git a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestPeerFactory.cs b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestPeerFactory.cs
deleted file mode 100644
index dcfcaebb..00000000
--- a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestPeerFactory.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
-// SPDX-License-Identifier: MIT
-
-using Multiformats.Address;
-using System.Collections.Concurrent;
-
-namespace Nethermind.Libp2p.Core.TestsBase.E2e;
-
-internal class TestPeerFactory(IServiceProvider serviceProvider) : PeerFactory(serviceProvider)
-{
- ConcurrentDictionary peers = new();
-
- public override ILocalPeer Create(Identity? identity = null, Multiaddress? localAddr = null)
- {
- ArgumentNullException.ThrowIfNull(identity);
- return peers.GetOrAdd(identity.PeerId, (p) => new TestLocalPeer(identity));
- }
-}
diff --git a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestPingProtocol.cs b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestPingProtocol.cs
index 0a735853..6d4438d6 100644
--- a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestPingProtocol.cs
+++ b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestPingProtocol.cs
@@ -4,11 +4,11 @@
using NUnit.Framework;
namespace Nethermind.Libp2p.Core.TestsBase.E2e;
-class TestPingProtocol : IProtocol
+class TestPingProtocol : ISessionProtocol
{
public string Id => "test-ping";
- public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context)
+ public async Task DialAsync(IChannel downChannel, ISessionContext context)
{
string str = "hello";
await downChannel.WriteLineAsync(str);
@@ -16,7 +16,7 @@ public async Task DialAsync(IChannel downChannel, IChannelFactory? upChannelFact
Assert.That(res, Is.EqualTo(str + " there"));
}
- public async Task ListenAsync(IChannel downChannel, IChannelFactory? upChannelFactory, IPeerContext context)
+ public async Task ListenAsync(IChannel downChannel, ISessionContext context)
{
string str = await downChannel.ReadLineAsync();
await downChannel.WriteLineAsync(str + " there");
diff --git a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestSuite.cs b/src/libp2p/Libp2p.Core.TestsBase/E2e/TestSuite.cs
deleted file mode 100644
index a355e05b..00000000
--- a/src/libp2p/Libp2p.Core.TestsBase/E2e/TestSuite.cs
+++ /dev/null
@@ -1,12 +0,0 @@
-// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
-// SPDX-License-Identifier: MIT
-
-namespace Nethermind.Libp2p.Core.TestsBase.E2e;
-
-public class TestSuite
-{
- public static IPeerFactory CreateLibp2p(params Type[] appProcols)
- {
- return new TestBuilder().Build();
- }
-}
diff --git a/src/libp2p/Libp2p.Core.TestsBase/Libp2p.Core.TestsBase.csproj b/src/libp2p/Libp2p.Core.TestsBase/Libp2p.Core.TestsBase.csproj
index 01c7ec59..bec831a2 100644
--- a/src/libp2p/Libp2p.Core.TestsBase/Libp2p.Core.TestsBase.csproj
+++ b/src/libp2p/Libp2p.Core.TestsBase/Libp2p.Core.TestsBase.csproj
@@ -33,6 +33,7 @@
+
diff --git a/src/libp2p/Libp2p.Core.TestsBase/LocalPeerStub.cs b/src/libp2p/Libp2p.Core.TestsBase/LocalPeerStub.cs
index 86045f3c..78fbf036 100644
--- a/src/libp2p/Libp2p.Core.TestsBase/LocalPeerStub.cs
+++ b/src/libp2p/Libp2p.Core.TestsBase/LocalPeerStub.cs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: MIT
using Multiformats.Address;
+using System.Collections.ObjectModel;
namespace Nethermind.Libp2p.Core.TestsBase;
@@ -16,18 +17,37 @@ public LocalPeerStub()
public Identity Identity { get; set; }
public Multiaddress Address { get; set; }
- public Task DialAsync(Multiaddress addr, CancellationToken token = default)
+ public ObservableCollection ListenAddresses => throw new NotImplementedException();
+
+ public event Connected? OnConnected;
+
+ public Task DialAsync(Multiaddress addr, CancellationToken token = default)
+ {
+ return Task.FromResult(new TestRemotePeer(addr));
+ }
+
+ public Task DialAsync(Multiaddress[] samePeerAddrs, CancellationToken token = default)
{
- return Task.FromResult(new TestRemotePeer(addr));
+ return Task.FromResult(new TestRemotePeer(samePeerAddrs.First()));
}
- public Task ListenAsync(Multiaddress addr, CancellationToken token = default)
+ public Task DialAsync(PeerId peerId, CancellationToken token = default)
{
- return Task.FromResult(null);
+ throw new NotImplementedException();
+ }
+
+ public Task DisconnectAsync()
+ {
+ return Task.CompletedTask;
+ }
+
+ public Task StartListenAsync(Multiaddress[] addrs, CancellationToken token = default)
+ {
+ throw new NotImplementedException();
}
}
-public class TestRemotePeer : IRemotePeer
+public class TestRemotePeer : ISession
{
public TestRemotePeer(Multiaddress addr)
{
@@ -38,11 +58,18 @@ public TestRemotePeer(Multiaddress addr)
public Identity Identity { get; set; }
public Multiaddress Address { get; set; }
- public Task DialAsync(CancellationToken token = default) where TProtocol : IProtocol
+ public Multiaddress RemoteAddress => $"/p2p/{Identity.PeerId}";
+
+ public Task DialAsync(CancellationToken token = default) where TProtocol : ISessionProtocol
{
return Task.CompletedTask;
}
+ public Task DialAsync(TRequest request, CancellationToken token = default) where TProtocol : ISessionProtocol
+ {
+ throw new NotImplementedException();
+ }
+
public Task DisconnectAsync()
{
return Task.CompletedTask;
diff --git a/src/libp2p/Libp2p.Core.TestsBase/TestContextLoggerFactory.cs b/src/libp2p/Libp2p.Core.TestsBase/TestContextLoggerFactory.cs
index cd337265..9c1520bd 100644
--- a/src/libp2p/Libp2p.Core.TestsBase/TestContextLoggerFactory.cs
+++ b/src/libp2p/Libp2p.Core.TestsBase/TestContextLoggerFactory.cs
@@ -13,39 +13,32 @@ class TestContextLogger(string categoryName) : ILogger, IDisposable
{
private readonly string _categoryName = categoryName;
- public IDisposable? BeginScope(TState state) where TState : notnull
- {
- return this;
- }
+ public IDisposable? BeginScope(TState state) where TState : notnull => this;
- public void Dispose()
- {
- }
+ public void Dispose() { }
+ public bool IsEnabled(LogLevel logLevel) => true;
- public bool IsEnabled(LogLevel logLevel)
+ private static string ToString(LogLevel level) => level switch
{
- return true;
- }
+ LogLevel.Trace => "TRAC",
+ LogLevel.Debug => "DEBG",
+ LogLevel.Information => "INFO",
+ LogLevel.Warning => "WARN",
+ LogLevel.Error => "EROR",
+ LogLevel.Critical => "CRIT",
+ LogLevel.None => "NONE",
+ _ => throw new NotImplementedException()
+ };
public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter)
{
- TestContext.Out.WriteLine($"{logLevel} {_categoryName}:{eventId}: {(exception is null ? state?.ToString() : formatter(state, exception))}");
- Debug.WriteLine($"{logLevel} {_categoryName}:{eventId}: {(exception is null ? state?.ToString() : formatter(state, exception))}");
+ string log = $"{ToString(logLevel)} {_categoryName}: {(exception is null ? state?.ToString() : formatter(state, exception))}";
+ TestContext.Out.WriteLine(log);
+ Debug.WriteLine(log);
}
}
- public void AddProvider(ILoggerProvider provider)
- {
-
- }
-
- public ILogger CreateLogger(string categoryName)
- {
- return new TestContextLogger(categoryName);
- }
-
- public void Dispose()
- {
-
- }
+ public void AddProvider(ILoggerProvider provider) { }
+ public ILogger CreateLogger(string categoryName) => new TestContextLogger(categoryName);
+ public void Dispose() { }
}
diff --git a/src/libp2p/Libp2p.Core.TestsBase/TestDiscoveryProtocol.cs b/src/libp2p/Libp2p.Core.TestsBase/TestDiscoveryProtocol.cs
index 3aae5d9c..476e496d 100644
--- a/src/libp2p/Libp2p.Core.TestsBase/TestDiscoveryProtocol.cs
+++ b/src/libp2p/Libp2p.Core.TestsBase/TestDiscoveryProtocol.cs
@@ -8,7 +8,7 @@ namespace Nethermind.Libp2p.Core.TestsBase;
// public Func? OnAddPeer { get; set; }
// public Func? OnRemovePeer { get; set; }
-// public Task DiscoverAsync(Multiaddress localPeerAddr, CancellationToken token = default)
+// public Task DiscoverAsync(IPeer peer, CancellationToken token = default)
// {
// TaskCompletionSource task = new();
// token.Register(task.SetResult);
diff --git a/src/libp2p/Libp2p.Core/Channel.cs b/src/libp2p/Libp2p.Core/Channel.cs
index 1e3f5707..e19596ef 100644
--- a/src/libp2p/Libp2p.Core/Channel.cs
+++ b/src/libp2p/Libp2p.Core/Channel.cs
@@ -11,7 +11,7 @@
namespace Nethermind.Libp2p.Core;
-internal class Channel : IChannel
+public class Channel : IChannel
{
private IChannel? _reversedChannel;
private ReaderWriter _reader;
diff --git a/src/libp2p/Libp2p.Core/ChannelFactory.cs b/src/libp2p/Libp2p.Core/ChannelFactory.cs
deleted file mode 100644
index f63821c8..00000000
--- a/src/libp2p/Libp2p.Core/ChannelFactory.cs
+++ /dev/null
@@ -1,108 +0,0 @@
-// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
-// SPDX-License-Identifier: MIT
-
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using Nethermind.Libp2p.Core.Extensions;
-
-namespace Nethermind.Libp2p.Core;
-
-public class ChannelFactory : IChannelFactory
-{
- private readonly IServiceProvider _serviceProvider;
- private readonly ILoggerFactory? _loggerFactory;
- private IDictionary _factories;
- private readonly ILogger? _logger;
-
- public ChannelFactory(IServiceProvider serviceProvider)
- {
- _serviceProvider = serviceProvider;
- _loggerFactory = _serviceProvider.GetService();
- _logger = _loggerFactory?.CreateLogger();
- }
-
- public IEnumerable SubProtocols => _factories.Keys;
-
- public IChannel SubDial(IPeerContext context, IChannelRequest? req = null)
- {
- IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
- Channel channel = new();
- ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;
-
-
- _ = subProtocol.DialAsync(channel.Reverse, channelFactory, context)
- .ContinueWith(async task =>
- {
- if (!task.IsCompletedSuccessfully)
- {
- _logger?.DialFailed(subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
- }
- await channel.CloseAsync();
-
- req?.CompletionSource?.SetResult();
- });
-
- return channel;
- }
-
- public IChannel SubListen(IPeerContext context, IChannelRequest? req = null)
- {
- IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
- Channel channel = new();
- ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;
-
-
- _ = subProtocol.ListenAsync(channel.Reverse, channelFactory, context)
- .ContinueWith(async task =>
- {
- if (!task.IsCompletedSuccessfully)
- {
- _logger?.ListenFailed(subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
- }
- await channel.CloseAsync();
-
- req?.CompletionSource?.SetResult();
- });
-
- return channel;
- }
-
- public Task SubDialAndBind(IChannel parent, IPeerContext context,
- IChannelRequest? req = null)
- {
- IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
- ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;
-
- return subProtocol.DialAsync(((Channel)parent), channelFactory, context)
- .ContinueWith(async task =>
- {
- if (!task.IsCompletedSuccessfully)
- {
- _logger?.DialAndBindFailed(subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
- }
- await parent.CloseAsync();
-
- req?.CompletionSource?.SetResult();
- });
- }
-
- public Task SubListenAndBind(IChannel parent, IPeerContext context,
- IChannelRequest? req = null)
- {
- IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
- ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;
-
- return subProtocol.ListenAsync(((Channel)parent), channelFactory, context)
- .ContinueWith(async task =>
- {
- await parent.CloseAsync();
- req?.CompletionSource?.SetResult();
- });
- }
-
- public ChannelFactory Setup(IDictionary factories)
- {
- _factories = factories;
- return this;
- }
-}
diff --git a/src/libp2p/Libp2p.Core/ChannelRequest.cs b/src/libp2p/Libp2p.Core/ChannelRequest.cs
deleted file mode 100644
index 3f909665..00000000
--- a/src/libp2p/Libp2p.Core/ChannelRequest.cs
+++ /dev/null
@@ -1,15 +0,0 @@
-// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
-// SPDX-License-Identifier: MIT
-
-namespace Nethermind.Libp2p.Core;
-
-public class ChannelRequest : IChannelRequest
-{
- public IProtocol? SubProtocol { get; init; }
- public TaskCompletionSource? CompletionSource { get; init; }
-
- public override string ToString()
- {
- return $"Request for {SubProtocol?.Id ?? "unknown protocol"}";
- }
-}
diff --git a/src/libp2p/Libp2p.Core/Context/ConnectionContext.cs b/src/libp2p/Libp2p.Core/Context/ConnectionContext.cs
new file mode 100644
index 00000000..ae912a68
--- /dev/null
+++ b/src/libp2p/Libp2p.Core/Context/ConnectionContext.cs
@@ -0,0 +1,14 @@
+// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
+// SPDX-License-Identifier: MIT
+
+namespace Nethermind.Libp2p.Core.Context;
+
+public class ConnectionContext(LocalPeer localPeer, LocalPeer.Session session, ProtocolRef protocol, bool isListener, UpgradeOptions? upgradeOptions) : ContextBase(localPeer, session, protocol, isListener, upgradeOptions), IConnectionContext
+{
+ public UpgradeOptions? UpgradeOptions => upgradeOptions;
+
+ public Task DisconnectAsync()
+ {
+ return session.DisconnectAsync();
+ }
+}
diff --git a/src/libp2p/Libp2p.Core/Context/ContextBase.cs b/src/libp2p/Libp2p.Core/Context/ContextBase.cs
new file mode 100644
index 00000000..3c70d6ab
--- /dev/null
+++ b/src/libp2p/Libp2p.Core/Context/ContextBase.cs
@@ -0,0 +1,57 @@
+// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
+// SPDX-License-Identifier: MIT
+
+using Multiformats.Address;
+
+namespace Nethermind.Libp2p.Core.Context;
+
+public class ContextBase(LocalPeer localPeer, LocalPeer.Session session, ProtocolRef protocol, bool isListener, UpgradeOptions? upgradeOptions) : IChannelFactory
+{
+ protected bool isListener = isListener;
+ public ILocalPeer Peer => localPeer;
+ public State State => session.State;
+
+ public IEnumerable SubProtocols => localPeer.GetProtocolsFor(protocol);
+
+ public string Id { get; } = session.Id;
+
+ protected LocalPeer localPeer = localPeer;
+ protected LocalPeer.Session session = session;
+ protected ProtocolRef protocol = protocol;
+ protected UpgradeOptions? upgradeOptions = upgradeOptions;
+
+ public IChannel Upgrade(UpgradeOptions? upgradeOptions = null)
+ {
+ return localPeer.Upgrade(session, protocol, null, upgradeOptions ?? this.upgradeOptions, isListener);
+ }
+
+ public IChannel Upgrade(IProtocol specificProtocol, UpgradeOptions? upgradeOptions = null)
+ {
+ return localPeer.Upgrade(session, protocol, specificProtocol, upgradeOptions ?? this.upgradeOptions, isListener);
+ }
+
+ public Task Upgrade(IChannel parentChannel, UpgradeOptions? upgradeOptions = null)
+ {
+ return localPeer.Upgrade(session, parentChannel, protocol, null, upgradeOptions ?? this.upgradeOptions, isListener);
+ }
+
+ public Task Upgrade(IChannel parentChannel, IProtocol specificProtocol, UpgradeOptions? upgradeOptions = null)
+ {
+ return localPeer.Upgrade(session, parentChannel, protocol, specificProtocol, upgradeOptions ?? this.upgradeOptions, isListener);
+ }
+
+ public INewConnectionContext CreateConnection()
+ {
+ return localPeer.CreateConnection(protocol, null, isListener);
+ }
+
+ public INewSessionContext UpgradeToSession()
+ {
+ return localPeer.UpgradeToSession(session, protocol, isListener);
+ }
+
+ public void ListenerReady(Multiaddress addr)
+ {
+ localPeer.ListenerReady(this, addr);
+ }
+}
diff --git a/src/libp2p/Libp2p.Core/Context/NewConnectionContext.cs b/src/libp2p/Libp2p.Core/Context/NewConnectionContext.cs
new file mode 100644
index 00000000..e6744d48
--- /dev/null
+++ b/src/libp2p/Libp2p.Core/Context/NewConnectionContext.cs
@@ -0,0 +1,14 @@
+// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
+// SPDX-License-Identifier: MIT
+
+namespace Nethermind.Libp2p.Core.Context;
+
+public class NewConnectionContext(LocalPeer localPeer, LocalPeer.Session session, ProtocolRef protocol, bool isListener, UpgradeOptions? upgradeOptions) : ContextBase(localPeer, session, protocol, isListener, upgradeOptions), INewConnectionContext
+{
+ public CancellationToken Token => session.ConnectionToken;
+
+ public void Dispose()
+ {
+
+ }
+}
diff --git a/src/libp2p/Libp2p.Core/Context/NewSessionContext.cs b/src/libp2p/Libp2p.Core/Context/NewSessionContext.cs
new file mode 100644
index 00000000..21a6bbcf
--- /dev/null
+++ b/src/libp2p/Libp2p.Core/Context/NewSessionContext.cs
@@ -0,0 +1,16 @@
+// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
+// SPDX-License-Identifier: MIT
+
+namespace Nethermind.Libp2p.Core.Context;
+
+public class NewSessionContext(LocalPeer localPeer, LocalPeer.Session session, ProtocolRef protocol, bool isListener, UpgradeOptions? upgradeOptions) : ContextBase(localPeer, session, protocol, isListener, upgradeOptions), INewSessionContext
+{
+ public IEnumerable DialRequests => session.GetRequestQueue();
+
+ public CancellationToken Token => session.ConnectionToken;
+
+ public void Dispose()
+ {
+
+ }
+}
diff --git a/src/libp2p/Libp2p.Core/Context/SessionContext.cs b/src/libp2p/Libp2p.Core/Context/SessionContext.cs
new file mode 100644
index 00000000..73cdff53
--- /dev/null
+++ b/src/libp2p/Libp2p.Core/Context/SessionContext.cs
@@ -0,0 +1,24 @@
+// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
+// SPDX-License-Identifier: MIT
+
+namespace Nethermind.Libp2p.Core.Context;
+
+public class SessionContext(LocalPeer localPeer, LocalPeer.Session session, ProtocolRef protocol, bool isListener, UpgradeOptions? upgradeOptions) : ContextBase(localPeer, session, protocol, isListener, upgradeOptions), ISessionContext
+{
+ public UpgradeOptions? UpgradeOptions => upgradeOptions;
+
+ public async Task DialAsync() where TProtocol : ISessionProtocol
+ {
+ await session.DialAsync();
+ }
+
+ public async Task DialAsync(ISessionProtocol protocol)
+ {
+ await session.DialAsync(protocol);
+ }
+
+ public Task DisconnectAsync()
+ {
+ return session.DisconnectAsync();
+ }
+}
diff --git a/src/libp2p/Libp2p.Core/Discovery/IDiscoveryProtocol.cs b/src/libp2p/Libp2p.Core/Discovery/IDiscoveryProtocol.cs
index abd933ec..fd4029a5 100644
--- a/src/libp2p/Libp2p.Core/Discovery/IDiscoveryProtocol.cs
+++ b/src/libp2p/Libp2p.Core/Discovery/IDiscoveryProtocol.cs
@@ -7,5 +7,5 @@ namespace Nethermind.Libp2p.Core.Discovery;
public interface IDiscoveryProtocol
{
- Task DiscoverAsync(Multiaddress localPeerAddr, CancellationToken token = default);
+ Task StartDiscoveryAsync(IReadOnlyList localPeerAddr, CancellationToken token = default);
}
diff --git a/src/libp2p/Libp2p.Core/Discovery/PeerStore.cs b/src/libp2p/Libp2p.Core/Discovery/PeerStore.cs
index 7008217c..c026dc5c 100644
--- a/src/libp2p/Libp2p.Core/Discovery/PeerStore.cs
+++ b/src/libp2p/Libp2p.Core/Discovery/PeerStore.cs
@@ -4,13 +4,38 @@
using Google.Protobuf;
using Multiformats.Address;
using Nethermind.Libp2p.Core.Dto;
+using Nethermind.Libp2p.Core.Extensions;
using System.Collections.Concurrent;
namespace Nethermind.Libp2p.Core.Discovery;
public class PeerStore
{
- ConcurrentDictionary store = [];
+ private readonly ConcurrentDictionary _store = [];
+
+ public void Discover(ByteString signedPeerRecord)
+ {
+ SignedEnvelope signedEnvelope = SignedEnvelope.Parser.ParseFrom(signedPeerRecord);
+ PublicKey publicKey = PublicKey.Parser.ParseFrom(signedEnvelope.PublicKey);
+ PeerId peerId = new Identity(publicKey).PeerId;
+
+ if (!SigningHelper.VerifyPeerRecord(signedEnvelope, publicKey))
+ {
+ return;
+ }
+
+ Multiaddress[] addresses = PeerRecord.Parser.ParseFrom(signedEnvelope.Payload).Addresses
+ .Select(ai => Multiaddress.Decode(ai.Multiaddr.ToByteArray()))
+ .Where(a => a.GetPeerId() == peerId)
+ .ToArray();
+
+ if (addresses.Length == 0)
+ {
+ return;
+ }
+
+ Discover(addresses);
+ }
public void Discover(Multiaddress[] addrs)
{
@@ -24,8 +49,8 @@ public void Discover(Multiaddress[] addrs)
if (peerId is not null)
{
PeerInfo? newOne = null;
- PeerInfo peerInfo = store.GetOrAdd(peerId, (id) => newOne = new PeerInfo { Addrs = [.. addrs] });
- if (peerInfo != newOne && peerInfo.Addrs is not null && peerInfo.Addrs.Count == addrs.Length && addrs.All(peerInfo.Addrs.Contains))
+ PeerInfo peerInfo = _store.GetOrAdd(peerId, (id) => newOne = new PeerInfo { Addrs = [.. addrs] });
+ if (peerInfo != newOne && peerInfo.Addrs is not null && addrs.UnorderedSequenceEqual(peerInfo.Addrs))
{
return;
}
@@ -45,7 +70,7 @@ public event Action? OnNewPeer
}
onNewPeer += value;
- foreach (var item in store.Select(x => x.Value).ToArray())
+ foreach (PeerInfo? item in _store.Select(x => x.Value).ToArray())
{
if (item.Addrs is not null) value.Invoke(item.Addrs.ToArray());
}
@@ -56,19 +81,17 @@ public event Action? OnNewPeer
}
}
- public override string ToString()
- {
- return $"peerStore({store.Count}):{string.Join(",", store.Select(x => x.Key.ToString() ?? "null"))}";
- }
+ public override string ToString() => $"peerStore({_store.Count}):{string.Join(",", _store.Select(x => x.Key.ToString() ?? "null"))}";
public PeerInfo GetPeerInfo(PeerId peerId)
{
- return store.GetOrAdd(peerId, id => new PeerInfo());
+ return _store.GetOrAdd(peerId, id => new PeerInfo());
}
public class PeerInfo
{
public ByteString? SignedPeerRecord { get; set; }
+ public string[]? SupportedProtocols { get; set; }
public HashSet? Addrs { get; set; }
}
}
diff --git a/src/libp2p/Libp2p.Protocols.Identify/Dto/PeerRecord.cs b/src/libp2p/Libp2p.Core/Dto/PeerRecord.cs
similarity index 94%
rename from src/libp2p/Libp2p.Protocols.Identify/Dto/PeerRecord.cs
rename to src/libp2p/Libp2p.Core/Dto/PeerRecord.cs
index 2e2eef5f..b7099a92 100644
--- a/src/libp2p/Libp2p.Protocols.Identify/Dto/PeerRecord.cs
+++ b/src/libp2p/Libp2p.Core/Dto/PeerRecord.cs
@@ -9,7 +9,7 @@
using pbc = global::Google.Protobuf.Collections;
using pbr = global::Google.Protobuf.Reflection;
using scg = global::System.Collections.Generic;
-namespace Nethermind.Libp2p.Protocols.Identify.Dto {
+namespace Nethermind.Libp2p.Core.Dto {
/// Holder for reflection information generated from PeerRecord.proto
public static partial class PeerRecordReflection {
@@ -26,13 +26,13 @@ static PeerRecordReflection() {
string.Concat(
"ChBQZWVyUmVjb3JkLnByb3RvIiAKC0FkZHJlc3NJbmZvEhEKCW11bHRpYWRk",
"chgBIAIoDCJLCgpQZWVyUmVjb3JkEg8KB3BlZXJfaWQYASACKAwSCwoDc2Vx",
- "GAIgAigEEh8KCWFkZHJlc3NlcxgDIAMoCzIMLkFkZHJlc3NJbmZvQiuqAihO",
- "ZXRoZXJtaW5kLkxpYnAycC5Qcm90b2NvbHMuSWRlbnRpZnkuRHRv"));
+ "GAIgAigEEh8KCWFkZHJlc3NlcxgDIAMoCzIMLkFkZHJlc3NJbmZvQh2qAhpO",
+ "ZXRoZXJtaW5kLkxpYnAycC5Db3JlLkR0bw=="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { },
new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] {
- new pbr::GeneratedClrTypeInfo(typeof(global::Nethermind.Libp2p.Protocols.Identify.Dto.AddressInfo), global::Nethermind.Libp2p.Protocols.Identify.Dto.AddressInfo.Parser, new[]{ "Multiaddr" }, null, null, null, null),
- new pbr::GeneratedClrTypeInfo(typeof(global::Nethermind.Libp2p.Protocols.Identify.Dto.PeerRecord), global::Nethermind.Libp2p.Protocols.Identify.Dto.PeerRecord.Parser, new[]{ "PeerId", "Seq", "Addresses" }, null, null, null, null)
+ new pbr::GeneratedClrTypeInfo(typeof(global::Nethermind.Libp2p.Core.Dto.AddressInfo), global::Nethermind.Libp2p.Core.Dto.AddressInfo.Parser, new[]{ "Multiaddr" }, null, null, null, null),
+ new pbr::GeneratedClrTypeInfo(typeof(global::Nethermind.Libp2p.Core.Dto.PeerRecord), global::Nethermind.Libp2p.Core.Dto.PeerRecord.Parser, new[]{ "PeerId", "Seq", "Addresses" }, null, null, null, null)
}));
}
#endregion
@@ -54,7 +54,7 @@ public sealed partial class AddressInfo : pb::IMessage
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Nethermind.Libp2p.Protocols.Identify.Dto.PeerRecordReflection.Descriptor.MessageTypes[0]; }
+ get { return global::Nethermind.Libp2p.Core.Dto.PeerRecordReflection.Descriptor.MessageTypes[0]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -267,7 +267,7 @@ public sealed partial class PeerRecord : pb::IMessage
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public static pbr::MessageDescriptor Descriptor {
- get { return global::Nethermind.Libp2p.Protocols.Identify.Dto.PeerRecordReflection.Descriptor.MessageTypes[1]; }
+ get { return global::Nethermind.Libp2p.Core.Dto.PeerRecordReflection.Descriptor.MessageTypes[1]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
@@ -361,15 +361,15 @@ public void ClearSeq() {
/// Field number for the "addresses" field.
public const int AddressesFieldNumber = 3;
- private static readonly pb::FieldCodec _repeated_addresses_codec
- = pb::FieldCodec.ForMessage(26, global::Nethermind.Libp2p.Protocols.Identify.Dto.AddressInfo.Parser);
- private readonly pbc::RepeatedField addresses_ = new pbc::RepeatedField();
+ private static readonly pb::FieldCodec _repeated_addresses_codec
+ = pb::FieldCodec.ForMessage(26, global::Nethermind.Libp2p.Core.Dto.AddressInfo.Parser);
+ private readonly pbc::RepeatedField addresses_ = new pbc::RepeatedField();
///
/// addresses is a list of public listen addresses for the peer.
///
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
- public pbc::RepeatedField Addresses {
+ public pbc::RepeatedField Addresses {
get { return addresses_; }
}
diff --git a/src/libp2p/Libp2p.Protocols.Identify/Dto/PeerRecord.proto b/src/libp2p/Libp2p.Core/Dto/PeerRecord.proto
similarity index 89%
rename from src/libp2p/Libp2p.Protocols.Identify/Dto/PeerRecord.proto
rename to src/libp2p/Libp2p.Core/Dto/PeerRecord.proto
index 186cd3b2..db1628e0 100644
--- a/src/libp2p/Libp2p.Protocols.Identify/Dto/PeerRecord.proto
+++ b/src/libp2p/Libp2p.Core/Dto/PeerRecord.proto
@@ -1,6 +1,6 @@
syntax = "proto2";
-option csharp_namespace = "Nethermind.Libp2p.Protocols.Identify.Dto";
+option csharp_namespace = "Nethermind.Libp2p.Core.Dto";
message AddressInfo {
required bytes multiaddr = 1;
diff --git a/src/libp2p/Libp2p.Core/Dto/SigningHelper.cs b/src/libp2p/Libp2p.Core/Dto/SigningHelper.cs
new file mode 100644
index 00000000..e11c6b0b
--- /dev/null
+++ b/src/libp2p/Libp2p.Core/Dto/SigningHelper.cs
@@ -0,0 +1,103 @@
+// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
+// SPDX-License-Identifier: MIT
+
+using Google.Protobuf;
+using Multiformats.Address;
+
+namespace Nethermind.Libp2p.Core.Dto;
+
+public static class SigningHelper
+{
+ private static readonly byte[] PayloadType = [((ushort)Enums.Libp2p.Libp2pPeerRecord >> 8) & 0xFF, (ushort)Enums.Libp2p.Libp2pPeerRecord & 0xFF];
+ private static readonly byte[] Domain = "libp2p-peer-record"u8.ToArray().ToArray();
+ public static bool VerifyPeerRecord(ByteString signedEnvelopeBytes, PublicKey publicKey)
+ {
+ SignedEnvelope signedEnvelope = SignedEnvelope.Parser.ParseFrom(signedEnvelopeBytes);
+ return VerifyPeerRecord(signedEnvelope, publicKey);
+ }
+
+ public static bool VerifyPeerRecord(SignedEnvelope signedEnvelope, PublicKey publicKey)
+ {
+ Identity identity = new(publicKey);
+
+ if (signedEnvelope.PayloadType?.Take(2).SequenceEqual(PayloadType) is not true)
+ {
+ return false;
+ }
+
+ PeerRecord pr = PeerRecord.Parser.ParseFrom(signedEnvelope.Payload);
+
+ if (identity.PeerId != new PeerId(pr.PeerId.ToByteArray()))
+ {
+ return false;
+ }
+
+ byte[] signedData = new byte[
+ VarInt.GetSizeInBytes(Domain.Length) + Domain.Length +
+ VarInt.GetSizeInBytes(PayloadType.Length) + PayloadType.Length +
+ VarInt.GetSizeInBytes(signedEnvelope.Payload.Length) + signedEnvelope.Payload.Length];
+
+ int offset = 0;
+
+ VarInt.Encode(Domain.Length, signedData.AsSpan(), ref offset);
+ Array.Copy(Domain, 0, signedData, offset, Domain.Length);
+ offset += Domain.Length;
+
+ VarInt.Encode(PayloadType.Length, signedData.AsSpan(), ref offset);
+ Array.Copy(PayloadType, 0, signedData, offset, PayloadType.Length);
+ offset += PayloadType.Length;
+
+ VarInt.Encode(signedEnvelope.Payload.Length, signedData.AsSpan(), ref offset);
+ Array.Copy(signedEnvelope.Payload.ToByteArray(), 0, signedData, offset, signedEnvelope.Payload.Length);
+
+ return identity.VerifySignature(signedData, signedEnvelope.Signature.ToByteArray());
+ }
+
+ public static ByteString CreateSignedEnvelope(Identity identity, Multiaddress[] addresses, ulong seq)
+ {
+ PeerRecord payload = new()
+ {
+ PeerId = ByteString.CopyFrom(identity.PeerId.Bytes),
+ Seq = seq
+ };
+
+ foreach (Multiaddress address in addresses)
+ {
+ payload.Addresses.Add(new AddressInfo
+ {
+ Multiaddr = ByteString.CopyFrom(address.ToBytes())
+ });
+ }
+
+ SignedEnvelope envelope = new()
+ {
+ PayloadType = ByteString.CopyFrom(PayloadType),
+ Payload = payload.ToByteString(),
+ PublicKey = identity.PublicKey.ToByteString(),
+ };
+
+ int payloadLength = payload.CalculateSize();
+
+ byte[] signingData = new byte[
+ VarInt.GetSizeInBytes(Domain.Length) + Domain.Length +
+ VarInt.GetSizeInBytes(PayloadType.Length) + PayloadType.Length +
+ VarInt.GetSizeInBytes(payloadLength) + payloadLength];
+
+ int offset = 0;
+
+ VarInt.Encode(Domain.Length, signingData.AsSpan(), ref offset);
+ Array.Copy(Domain, 0, signingData, offset, Domain.Length);
+ offset += Domain.Length;
+
+ VarInt.Encode(PayloadType.Length, signingData.AsSpan(), ref offset);
+ Array.Copy(PayloadType, 0, signingData, offset, PayloadType.Length);
+ offset += PayloadType.Length;
+
+ VarInt.Encode(payloadLength, signingData.AsSpan(), ref offset);
+ Array.Copy(payload.ToByteArray(), 0, signingData, offset, payloadLength);
+
+ envelope.Signature = ByteString.CopyFrom(identity.Sign(signingData).ToArray());
+
+ return envelope.ToByteString();
+ }
+}
diff --git a/src/libp2p/Libp2p.Core/Exceptions/Libp2pException.cs b/src/libp2p/Libp2p.Core/Exceptions/Libp2pException.cs
index d36b4528..afe39951 100644
--- a/src/libp2p/Libp2p.Core/Exceptions/Libp2pException.cs
+++ b/src/libp2p/Libp2p.Core/Exceptions/Libp2pException.cs
@@ -5,17 +5,33 @@ namespace Nethermind.Libp2p.Core.Exceptions;
public class Libp2pException : Exception
{
- public Libp2pException(string? message) : base(message)
- {
+ public Libp2pException(string? message) : base(message) { }
+ public Libp2pException() : base() { }
+}
- }
- public Libp2pException() : base()
- {
+///
+/// Exception instead of IOResult to signal a channel cannot send or receive data anymore
+///
+public class ChannelClosedException() : Libp2pException("Channel closed");
- }
-}
+///
+/// Appears when libp2p is not set up properly in part of protocol tack, IoC, etc.
+///
+///
+public class Libp2pSetupException(string? message = null) : Libp2pException(message);
+
+///
+/// Appears when there is already active session for the given peer
+///
+public class SessionExistsException(PeerId remotePeerId) : Libp2pException($"Session is already established with {remotePeerId}");
+
+
+///
+/// Appears if connection to peer failed or declined
+///
+public class PeerConnectionException(string? message = null) : Libp2pException(message);
-public class ChannelClosedException : Libp2pException
+public class DLibp2pException : Libp2pException
{
}
diff --git a/src/libp2p/Libp2p.Core/Extensions/ChannelFactoryExtensions.cs b/src/libp2p/Libp2p.Core/Extensions/ChannelFactoryExtensions.cs
deleted file mode 100644
index 3c5c70ed..00000000
--- a/src/libp2p/Libp2p.Core/Extensions/ChannelFactoryExtensions.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
-// SPDX-License-Identifier: MIT
-
-namespace Nethermind.Libp2p.Core.Extensions;
-
-internal static class ChannelFactoryExtensions
-{
- public static IEnumerable GetSubProtocols(this ChannelFactory? channelFactory)
- => channelFactory?.SubProtocols.Select(protocol => protocol.Id) ?? Enumerable.Empty();
-}
diff --git a/src/libp2p/Libp2p.Core/Extensions/EnumerableExtensions.cs b/src/libp2p/Libp2p.Core/Extensions/EnumerableExtensions.cs
new file mode 100644
index 00000000..a34ab6ba
--- /dev/null
+++ b/src/libp2p/Libp2p.Core/Extensions/EnumerableExtensions.cs
@@ -0,0 +1,9 @@
+// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
+// SPDX-License-Identifier: MIT
+
+namespace Nethermind.Libp2p.Core.Extensions;
+
+public static class EnumerableExtensions
+{
+ public static bool UnorderedSequenceEqual(this IEnumerable left, IEnumerable right) => left.OrderBy(x => x).SequenceEqual(right.OrderBy(x => x));
+}
diff --git a/src/libp2p/Libp2p.Core/Extensions/TaskHelper.cs b/src/libp2p/Libp2p.Core/Extensions/TaskHelper.cs
new file mode 100644
index 00000000..249a68f9
--- /dev/null
+++ b/src/libp2p/Libp2p.Core/Extensions/TaskHelper.cs
@@ -0,0 +1,33 @@
+// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
+// SPDX-License-Identifier: MIT
+
+using Nethermind.Libp2p.Core.Exceptions;
+
+namespace Nethermind.Libp2p.Core.Extensions;
+
+internal static class TaskHelper
+{
+ public static async Task FirstSuccess(params Task[] tasks)
+ {
+ TaskCompletionSource tcs = new();
+
+ Task all = Task.WhenAll(tasks.Select(t => t.ContinueWith(t =>
+ {
+ if (t.IsCompletedSuccessfully)
+ {
+ tcs.TrySetResult(t);
+ }
+ if (t.IsFaulted && t.Exception.InnerException is SessionExistsException)
+ {
+ tcs.TrySetResult(t);
+ }
+ })));
+
+ Task result = await Task.WhenAny(tcs.Task, all);
+ if (result == all)
+ {
+ throw new AggregateException(tasks.Select(t => t.Exception?.InnerException).Where(ex => ex is not null)!);
+ }
+ return tcs.Task.Result;
+ }
+}
diff --git a/src/libp2p/Libp2p.Core/IChannel.cs b/src/libp2p/Libp2p.Core/IChannel.cs
index ce24c06d..c933bb2d 100644
--- a/src/libp2p/Libp2p.Core/IChannel.cs
+++ b/src/libp2p/Libp2p.Core/IChannel.cs
@@ -14,9 +14,9 @@ CancellationToken CancellationToken
{
get
{
- var token = new CancellationTokenSource();
- GetAwaiter().OnCompleted(token.Cancel);
- return token.Token;
+ CancellationTokenSource cts = new();
+ GetAwaiter().OnCompleted(cts.Cancel);
+ return cts.Token;
}
}
}
diff --git a/src/libp2p/Libp2p.Core/IChannelFactory.cs b/src/libp2p/Libp2p.Core/IChannelFactory.cs
index d8ff230e..698cab80 100644
--- a/src/libp2p/Libp2p.Core/IChannelFactory.cs
+++ b/src/libp2p/Libp2p.Core/IChannelFactory.cs
@@ -6,33 +6,25 @@ namespace Nethermind.Libp2p.Core;
public interface IChannelFactory
{
IEnumerable SubProtocols { get; }
- IChannel SubDial(IPeerContext context, IChannelRequest? request = null);
- IChannel SubListen(IPeerContext context, IChannelRequest? request = null);
+ IChannel Upgrade(UpgradeOptions? options = null);
+ IChannel Upgrade(IProtocol specificProtocol, UpgradeOptions? options = null);
- Task SubDialAndBind(IChannel parentChannel, IPeerContext context, IChannelRequest? request = null);
-
- Task SubListenAndBind(IChannel parentChannel, IPeerContext context, IChannelRequest? request = null);
-
-
-
- IChannel SubDial(IPeerContext context, IProtocol protocol)
- {
- return SubDial(context, new ChannelRequest { SubProtocol = protocol });
- }
-
- IChannel SubListen(IPeerContext context, IProtocol protocol)
- {
- return SubListen(context, new ChannelRequest { SubProtocol = protocol });
- }
+ Task Upgrade(IChannel parentChannel, UpgradeOptions? options = null);
+ Task Upgrade(IChannel parentChannel, IProtocol specificProtocol, UpgradeOptions? options = null);
+}
- Task SubDialAndBind(IChannel parentChannel, IPeerContext context, IProtocol protocol)
- {
- return SubDialAndBind(parentChannel, context, new ChannelRequest { SubProtocol = protocol });
- }
+public record UpgradeOptions
+{
+ public IProtocol? SelectedProtocol { get; init; }
+ public UpgradeModeOverride ModeOverride { get; init; }
+ public TaskCompletionSource