Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve yamux; add test-plans interop app #73

Merged
merged 16 commits into from
Apr 24, 2024
35 changes: 35 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
.git
launchSettings.json

*DS_Store
_ReSharper.*
*.csproj.user
*[Rr]e[Ss]harper.user
_ReSharper.*/
.vs/

**/Obj/
**/obj/
**/bin/
**/Bin/

*.xap
*.user
/TestResults
*.vspscc
*.vssscc
*.suo
*.cache
packages/*
artifacts/*
msbuild.log
PublishProfiles/
*.psess
*.vsp
*.pidb
*.userprefs
*.ncrunchsolution
*.log
*.vspx
/.symbols
*.sln.ide
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ root = true
[*]
charset = utf-8
end_of_line = lf
file_header_template = SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited\nSPDX-License-Identifier: MIT
file_header_template = SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited\nSPDX-License-Identifier: MIT
indent_size = 2
indent_style = space
insert_final_newline = true
Expand Down
2 changes: 2 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
[submodule "src/cs-multihash"]
path = src/cs-multihash
url = https://github.com/NethermindEth/cs-multihash.git
branch = master
[submodule "src/cs-multiaddress"]
path = src/cs-multiaddress
url = https://github.com/NethermindEth/cs-multiaddress.git
branch = master
2 changes: 1 addition & 1 deletion src/libp2p/Libp2p.Core.Benchmarks/Benchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ await Task.Run(async () =>
long i = 0;
while (i < TotalSize)
{
i += (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny)).Length;
i += (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow()).Length;
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions src/libp2p/Libp2p.Core.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using System.Diagnostics;

Channel chan = new();
IChannel revChan = ((Channel)chan).Reverse;
IChannel revChan = chan.Reverse;

const long GiB = 1024 * 1024 * 1024;
long PacketSize = 1 * 1024;
Expand Down Expand Up @@ -42,7 +42,7 @@ await Task.Run(async () =>
{
try
{
d = (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny));
d = (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow());
i += d.Length;
}
catch
Expand Down
19 changes: 0 additions & 19 deletions src/libp2p/Libp2p.Core.Tests/ChannelsBindingTests.cs

This file was deleted.

148 changes: 125 additions & 23 deletions src/libp2p/Libp2p.Core.Tests/ReaderWriterTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

using Nethermind.Libp2p.Core.Exceptions;
using System.Buffers;

namespace Nethermind.Libp2p.Core.Tests;
Expand All @@ -14,18 +15,24 @@ public async Task Test_ChannelWrites_WhenReadIsRequested()
bool isWritten = false;
Task wrote = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 1, 2, 3, 4 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2, 3, 4]));
isWritten = true;
});
await Task.Delay(100);
Assert.That(isWritten, Is.False);
ReadOnlySequence<byte> chunk1 = await readerWriter.ReadAsync(1);
Assert.That(chunk1.ToArray(), Is.EquivalentTo(new byte[] { 1 }));
Assert.That(isWritten, Is.False);
ReadOnlySequence<byte> chunk2 = await readerWriter.ReadAsync(2);
Assert.That(chunk2.ToArray(), Is.EquivalentTo(new byte[] { 2, 3 }));
Assert.That(isWritten, Is.False);
ReadOnlySequence<byte> chunk3 = await readerWriter.ReadAsync(1);
ReadOnlySequence<byte> chunk1 = await readerWriter.ReadAsync(1).OrThrow();
Assert.Multiple(() =>
{
Assert.That(chunk1.ToArray(), Is.EquivalentTo(new byte[] { 1 }));
Assert.That(isWritten, Is.False);
});
ReadOnlySequence<byte> chunk2 = await readerWriter.ReadAsync(2).OrThrow();
Assert.Multiple(() =>
{
Assert.That(chunk2.ToArray(), Is.EquivalentTo(new byte[] { 2, 3 }));
Assert.That(isWritten, Is.False);
});
ReadOnlySequence<byte> chunk3 = await readerWriter.ReadAsync(1).OrThrow();
Assert.That(chunk3.ToArray(), Is.EquivalentTo(new byte[] { 4 }));
await wrote;
Assert.That(isWritten, Is.True);
Expand All @@ -37,23 +44,23 @@ public async Task Test_ChannelReads_MultipleWrites()
Channel.ReaderWriter readerWriter = new();
_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 1 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 2 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([2]));
});
ReadOnlySequence<byte> allTheData = await readerWriter.ReadAsync(2);
ReadOnlySequence<byte> allTheData = await readerWriter.ReadAsync(2).OrThrow();
Assert.That(allTheData.ToArray(), Is.EquivalentTo(new byte[] { 1, 2 }));
}

[Test]
public async Task Test_ChannelReads_SequentialChunks()
{
Channel.ReaderWriter readerWriter = new();
ValueTask<ReadOnlySequence<byte>> t1 = readerWriter.ReadAsync(2);
ValueTask<ReadOnlySequence<byte>> t2 = readerWriter.ReadAsync(2);
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 1 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 2 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 3 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 4 }));
ValueTask<ReadOnlySequence<byte>> t1 = readerWriter.ReadAsync(2).OrThrow();
ValueTask<ReadOnlySequence<byte>> t2 = readerWriter.ReadAsync(2).OrThrow();
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([2]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([3]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([4]));
ReadOnlySequence<byte> chunk1 = await t1;
ReadOnlySequence<byte> chunk2 = await t2;
Assert.That(chunk1.ToArray(), Is.EquivalentTo(new byte[] { 1, 2 }));
Expand All @@ -64,28 +71,123 @@ public async Task Test_ChannelReads_SequentialChunks()
public async Task Test_ChannelWrites_WhenReadIsRequested2()
{
Channel.ReaderWriter readerWriter = new();
_ = Task.Run(async () => await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 1, 2 })));
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAny);
_ = Task.Run(async () => await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2])));
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAny).OrThrow();
Assert.That(res1.ToArray().Length, Is.EqualTo(2));
}

[Test]
public async Task Test_ChannelReadsNithing_WhenItIsDontWaitAndEmpty()
{
Channel.ReaderWriter readerWriter = new();
ReadOnlySequence<byte> anyData = await readerWriter.ReadAsync(0, ReadBlockingMode.DontWait);
ReadOnlySequence<byte> anyData = await readerWriter.ReadAsync(0, ReadBlockingMode.DontWait).OrThrow();
Assert.That(anyData.ToArray(), Is.Empty);
anyData = await readerWriter.ReadAsync(1, ReadBlockingMode.DontWait);
anyData = await readerWriter.ReadAsync(1, ReadBlockingMode.DontWait).OrThrow();
Assert.That(anyData.ToArray(), Is.Empty);
anyData = await readerWriter.ReadAsync(10, ReadBlockingMode.DontWait);
anyData = await readerWriter.ReadAsync(10, ReadBlockingMode.DontWait).OrThrow();
Assert.That(anyData.ToArray(), Is.Empty);
}

[Test]
public async Task Test_ChannelWrites_WhenReadIsRequested3()
{
Channel.ReaderWriter readerWriter = new();
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.DontWait);
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.DontWait).OrThrow();
Assert.That(res1.ToArray().Length, Is.EqualTo(0));
}

[Test]
public async Task Test_ChannelWrites_Eof()
{
Channel.ReaderWriter readerWriter = new();

_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2, 3]));
await readerWriter.WriteEofAsync();
});

Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ok));
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAll).OrThrow();

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.DontWait).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAny).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAll).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

}

[TestCase(new byte[0])]
[TestCase(new byte[] { 1, 2, 3 })]
public async Task Test_ChannelWrites_CannotWriteAfterEof(byte[] toWrite)
{
Channel.ReaderWriter readerWriter = new();

await readerWriter.WriteEofAsync();
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.WriteAsync(new ReadOnlySequence<byte>(toWrite)).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));
}

[Test]
public async Task Test_ChannelWrites_CanReadAny()
{
Channel.ReaderWriter readerWriter = new();

_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2, 3]));
await readerWriter.WriteEofAsync();
});

ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAll).OrThrow();

Assert.That(res1, Has.Length.EqualTo(3));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.DontWait).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAny).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAll).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));
}

[Test]
public async Task Test_ChannelWrites_CannotReadAll_OnePacket()
{
Channel.ReaderWriter readerWriter = new();

_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2, 3]));
await readerWriter.WriteEofAsync();
});

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(5, ReadBlockingMode.WaitAll).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));
}

[Test]
public async Task Test_ChannelWrites_CannotReadAll_Fragmented()
{
Channel.ReaderWriter readerWriter = new();

_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([2, 3]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([4]));
await readerWriter.WriteEofAsync();
});

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(5, ReadBlockingMode.WaitAll).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));
}
}
49 changes: 49 additions & 0 deletions src/libp2p/Libp2p.Core.TestsBase/DebugLoggerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

using Microsoft.Extensions.Logging;
using NUnit.Framework;

namespace Nethermind.Libp2p.Core.TestsBase;

public class DebugLoggerFactory : ILoggerFactory
{
class DebugLogger(string categoryName) : ILogger, IDisposable
{
private readonly string _categoryName = categoryName;

public IDisposable? BeginScope<TState>(TState state) where TState : notnull
{
return this;
}

public void Dispose()
{
}

public bool IsEnabled(LogLevel logLevel)
{
return true;
}

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
{
TestContext.Out.WriteLine($"{logLevel} {_categoryName}:{eventId}: {(exception is null ? state?.ToString() : formatter(state, exception))}");
}
}

public void AddProvider(ILoggerProvider provider)
{

}

public ILogger CreateLogger(string categoryName)
{
return new DebugLogger(categoryName);
}

public void Dispose()
{

}
}
Loading
Loading