Skip to content

Commit

Permalink
Improve yamux; add test-plans interop app (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
flcl42 authored Apr 24, 2024
1 parent 6432c5d commit 79237fb
Show file tree
Hide file tree
Showing 58 changed files with 2,695 additions and 670 deletions.
35 changes: 35 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
.git
launchSettings.json

*DS_Store
_ReSharper.*
*.csproj.user
*[Rr]e[Ss]harper.user
_ReSharper.*/
.vs/

**/Obj/
**/obj/
**/bin/
**/Bin/

*.xap
*.user
/TestResults
*.vspscc
*.vssscc
*.suo
*.cache
packages/*
artifacts/*
msbuild.log
PublishProfiles/
*.psess
*.vsp
*.pidb
*.userprefs
*.ncrunchsolution
*.log
*.vspx
/.symbols
*.sln.ide
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ root = true
[*]
charset = utf-8
end_of_line = lf
file_header_template = SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited\nSPDX-License-Identifier: MIT
file_header_template = SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited\nSPDX-License-Identifier: MIT
indent_size = 2
indent_style = space
insert_final_newline = true
Expand Down
2 changes: 2 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
[submodule "src/cs-multihash"]
path = src/cs-multihash
url = https://github.com/NethermindEth/cs-multihash.git
branch = master
[submodule "src/cs-multiaddress"]
path = src/cs-multiaddress
url = https://github.com/NethermindEth/cs-multiaddress.git
branch = master
2 changes: 1 addition & 1 deletion src/libp2p/Libp2p.Core.Benchmarks/Benchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ await Task.Run(async () =>
long i = 0;
while (i < TotalSize)
{
i += (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny)).Length;
i += (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow()).Length;
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions src/libp2p/Libp2p.Core.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using System.Diagnostics;

Channel chan = new();
IChannel revChan = ((Channel)chan).Reverse;
IChannel revChan = chan.Reverse;

const long GiB = 1024 * 1024 * 1024;
long PacketSize = 1 * 1024;
Expand Down Expand Up @@ -42,7 +42,7 @@ await Task.Run(async () =>
{
try
{
d = (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny));
d = (await revChan.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow());
i += d.Length;
}
catch
Expand Down
19 changes: 0 additions & 19 deletions src/libp2p/Libp2p.Core.Tests/ChannelsBindingTests.cs

This file was deleted.

148 changes: 125 additions & 23 deletions src/libp2p/Libp2p.Core.Tests/ReaderWriterTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

using Nethermind.Libp2p.Core.Exceptions;
using System.Buffers;

namespace Nethermind.Libp2p.Core.Tests;
Expand All @@ -14,18 +15,24 @@ public async Task Test_ChannelWrites_WhenReadIsRequested()
bool isWritten = false;
Task wrote = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 1, 2, 3, 4 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2, 3, 4]));
isWritten = true;
});
await Task.Delay(100);
Assert.That(isWritten, Is.False);
ReadOnlySequence<byte> chunk1 = await readerWriter.ReadAsync(1);
Assert.That(chunk1.ToArray(), Is.EquivalentTo(new byte[] { 1 }));
Assert.That(isWritten, Is.False);
ReadOnlySequence<byte> chunk2 = await readerWriter.ReadAsync(2);
Assert.That(chunk2.ToArray(), Is.EquivalentTo(new byte[] { 2, 3 }));
Assert.That(isWritten, Is.False);
ReadOnlySequence<byte> chunk3 = await readerWriter.ReadAsync(1);
ReadOnlySequence<byte> chunk1 = await readerWriter.ReadAsync(1).OrThrow();
Assert.Multiple(() =>
{
Assert.That(chunk1.ToArray(), Is.EquivalentTo(new byte[] { 1 }));
Assert.That(isWritten, Is.False);
});
ReadOnlySequence<byte> chunk2 = await readerWriter.ReadAsync(2).OrThrow();
Assert.Multiple(() =>
{
Assert.That(chunk2.ToArray(), Is.EquivalentTo(new byte[] { 2, 3 }));
Assert.That(isWritten, Is.False);
});
ReadOnlySequence<byte> chunk3 = await readerWriter.ReadAsync(1).OrThrow();
Assert.That(chunk3.ToArray(), Is.EquivalentTo(new byte[] { 4 }));
await wrote;
Assert.That(isWritten, Is.True);
Expand All @@ -37,23 +44,23 @@ public async Task Test_ChannelReads_MultipleWrites()
Channel.ReaderWriter readerWriter = new();
_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 1 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 2 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([2]));
});
ReadOnlySequence<byte> allTheData = await readerWriter.ReadAsync(2);
ReadOnlySequence<byte> allTheData = await readerWriter.ReadAsync(2).OrThrow();
Assert.That(allTheData.ToArray(), Is.EquivalentTo(new byte[] { 1, 2 }));
}

[Test]
public async Task Test_ChannelReads_SequentialChunks()
{
Channel.ReaderWriter readerWriter = new();
ValueTask<ReadOnlySequence<byte>> t1 = readerWriter.ReadAsync(2);
ValueTask<ReadOnlySequence<byte>> t2 = readerWriter.ReadAsync(2);
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 1 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 2 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 3 }));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 4 }));
ValueTask<ReadOnlySequence<byte>> t1 = readerWriter.ReadAsync(2).OrThrow();
ValueTask<ReadOnlySequence<byte>> t2 = readerWriter.ReadAsync(2).OrThrow();
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([2]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([3]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([4]));
ReadOnlySequence<byte> chunk1 = await t1;
ReadOnlySequence<byte> chunk2 = await t2;
Assert.That(chunk1.ToArray(), Is.EquivalentTo(new byte[] { 1, 2 }));
Expand All @@ -64,28 +71,123 @@ public async Task Test_ChannelReads_SequentialChunks()
public async Task Test_ChannelWrites_WhenReadIsRequested2()
{
Channel.ReaderWriter readerWriter = new();
_ = Task.Run(async () => await readerWriter.WriteAsync(new ReadOnlySequence<byte>(new byte[] { 1, 2 })));
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAny);
_ = Task.Run(async () => await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2])));
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAny).OrThrow();
Assert.That(res1.ToArray().Length, Is.EqualTo(2));
}

[Test]
public async Task Test_ChannelReadsNithing_WhenItIsDontWaitAndEmpty()
{
Channel.ReaderWriter readerWriter = new();
ReadOnlySequence<byte> anyData = await readerWriter.ReadAsync(0, ReadBlockingMode.DontWait);
ReadOnlySequence<byte> anyData = await readerWriter.ReadAsync(0, ReadBlockingMode.DontWait).OrThrow();
Assert.That(anyData.ToArray(), Is.Empty);
anyData = await readerWriter.ReadAsync(1, ReadBlockingMode.DontWait);
anyData = await readerWriter.ReadAsync(1, ReadBlockingMode.DontWait).OrThrow();
Assert.That(anyData.ToArray(), Is.Empty);
anyData = await readerWriter.ReadAsync(10, ReadBlockingMode.DontWait);
anyData = await readerWriter.ReadAsync(10, ReadBlockingMode.DontWait).OrThrow();
Assert.That(anyData.ToArray(), Is.Empty);
}

[Test]
public async Task Test_ChannelWrites_WhenReadIsRequested3()
{
Channel.ReaderWriter readerWriter = new();
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.DontWait);
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.DontWait).OrThrow();
Assert.That(res1.ToArray().Length, Is.EqualTo(0));
}

[Test]
public async Task Test_ChannelWrites_Eof()
{
Channel.ReaderWriter readerWriter = new();

_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2, 3]));
await readerWriter.WriteEofAsync();
});

Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ok));
ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAll).OrThrow();

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.DontWait).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAny).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAll).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

}

[TestCase(new byte[0])]
[TestCase(new byte[] { 1, 2, 3 })]
public async Task Test_ChannelWrites_CannotWriteAfterEof(byte[] toWrite)
{
Channel.ReaderWriter readerWriter = new();

await readerWriter.WriteEofAsync();
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.WriteAsync(new ReadOnlySequence<byte>(toWrite)).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));
}

[Test]
public async Task Test_ChannelWrites_CanReadAny()
{
Channel.ReaderWriter readerWriter = new();

_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2, 3]));
await readerWriter.WriteEofAsync();
});

ReadOnlySequence<byte> res1 = await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAll).OrThrow();

Assert.That(res1, Has.Length.EqualTo(3));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.DontWait).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAny).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(3, ReadBlockingMode.WaitAll).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));
}

[Test]
public async Task Test_ChannelWrites_CannotReadAll_OnePacket()
{
Channel.ReaderWriter readerWriter = new();

_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1, 2, 3]));
await readerWriter.WriteEofAsync();
});

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(5, ReadBlockingMode.WaitAll).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));
}

[Test]
public async Task Test_ChannelWrites_CannotReadAll_Fragmented()
{
Channel.ReaderWriter readerWriter = new();

_ = Task.Run(async () =>
{
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([1]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([2, 3]));
await readerWriter.WriteAsync(new ReadOnlySequence<byte>([4]));
await readerWriter.WriteEofAsync();
});

Assert.ThrowsAsync<ChannelClosedException>(async () => await readerWriter.ReadAsync(5, ReadBlockingMode.WaitAll).OrThrow());
Assert.That(await readerWriter.CanReadAsync(), Is.EqualTo(IOResult.Ended));
}
}
49 changes: 49 additions & 0 deletions src/libp2p/Libp2p.Core.TestsBase/DebugLoggerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

using Microsoft.Extensions.Logging;
using NUnit.Framework;

namespace Nethermind.Libp2p.Core.TestsBase;

public class DebugLoggerFactory : ILoggerFactory
{
class DebugLogger(string categoryName) : ILogger, IDisposable
{
private readonly string _categoryName = categoryName;

public IDisposable? BeginScope<TState>(TState state) where TState : notnull
{
return this;
}

public void Dispose()
{
}

public bool IsEnabled(LogLevel logLevel)
{
return true;
}

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
{
TestContext.Out.WriteLine($"{logLevel} {_categoryName}:{eventId}: {(exception is null ? state?.ToString() : formatter(state, exception))}");
}
}

public void AddProvider(ILoggerProvider provider)
{

}

public ILogger CreateLogger(string categoryName)
{
return new DebugLogger(categoryName);
}

public void Dispose()
{

}
}
Loading

0 comments on commit 79237fb

Please sign in to comment.