Skip to content

Commit

Permalink
SepReaderExtensions: Add EnumerateAsync (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
nietras authored Jan 29, 2025
1 parent 3c6b0f9 commit 0091351
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,8 @@ namespace nietras.SeparatedValues
{
public static System.Collections.Generic.IEnumerable<T> Enumerate<T>(this nietras.SeparatedValues.SepReader reader, nietras.SeparatedValues.SepReader.RowFunc<T> select) { }
public static System.Collections.Generic.IEnumerable<T> Enumerate<T>(this nietras.SeparatedValues.SepReader reader, nietras.SeparatedValues.SepReader.RowTryFunc<T> trySelect) { }
public static System.Collections.Generic.IAsyncEnumerable<T> EnumerateAsync<T>(this nietras.SeparatedValues.SepReader reader, nietras.SeparatedValues.SepReader.RowFunc<T> select) { }
public static System.Collections.Generic.IAsyncEnumerable<T> EnumerateAsync<T>(this nietras.SeparatedValues.SepReader reader, nietras.SeparatedValues.SepReader.RowTryFunc<T> trySelect) { }
public static nietras.SeparatedValues.SepReader From(in this nietras.SeparatedValues.SepReaderOptions options, byte[] buffer) { }
public static nietras.SeparatedValues.SepReader From(in this nietras.SeparatedValues.SepReaderOptions options, System.IO.Stream stream) { }
public static nietras.SeparatedValues.SepReader From(in this nietras.SeparatedValues.SepReaderOptions options, System.IO.TextReader reader) { }
Expand Down
52 changes: 47 additions & 5 deletions src/Sep.Test/SepReaderExtensionsEnumerationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace nietras.SeparatedValues.Test;
Expand Down Expand Up @@ -38,6 +39,23 @@ public void SepReaderExtensionsEnumerationTest_Enumerate_RowTryFunc()
CollectionAssert.AreEqual(expected, actual);
}

// Verifies that EnumerateAsync with a RowFunc (Parse) produces the same
// sequence as the precomputed s_expected list.
[TestMethod]
public async ValueTask SepReaderExtensionsEnumerationTest_EnumerateAsync_RowFunc()
{
    using var reader = await CreateReaderAsync();

    var parsedRows = reader.EnumerateAsync(Parse);
    var actual = await ToListAsync(parsedRows);

    CollectionAssert.AreEqual(s_expected, actual);
}

// Verifies that EnumerateAsync with a RowTryFunc (TryParseEven) produces the
// entries of s_expected whose Inc value is even.
[TestMethod]
public async ValueTask SepReaderExtensionsEnumerationTest_EnumerateAsync_RowTryFunc()
{
    using var reader = await CreateReaderAsync();

    var selectedRows = reader.EnumerateAsync<Seq>(TryParseEven);
    var actual = await ToListAsync(selectedRows);

    var expected = s_expected.Where(seq => seq.Inc % 2 == 0).ToList();
    CollectionAssert.AreEqual(expected, actual);
}

[TestMethod]
public void SepReaderExtensionsEnumerationTest_ParallelEnumerate_RowFunc()
{
Expand All @@ -56,6 +74,24 @@ public void SepReaderExtensionsEnumerationTest_ParallelEnumerate_RowTryFunc()
}

// Creates a synchronous reader over the generated CSV text.
static SepReader CreateReader()
{
    var text = CreateCsv();
    // A deliberately small initial buffer (also in Release builds) forces
    // reader state swapping and array swapping as row length increases,
    // which is what ParallelEnumerate needs exercised.
    var options = Sep.Reader(o => o with { InitialBufferLength = 128 });
    return options.FromText(text);
}

// Creates a reader over the generated CSV text via the asynchronous
// FromTextAsync entry point, using the same options as CreateReader.
static ValueTask<SepReader> CreateReaderAsync()
{
    var text = CreateCsv();
    // A deliberately small initial buffer (also in Release builds) forces
    // reader state swapping and array swapping as row length increases,
    // which is what ParallelEnumerate needs exercised.
    var options = Sep.Reader(o => o with { InitialBufferLength = 128 });
    return options.FromTextAsync(text);
}

static string CreateCsv()
{
var sb = new StringBuilder(1024 * 1024);
using var stringWriter = new StringWriter(sb);
Expand All @@ -70,11 +106,7 @@ static SepReader CreateReader()
row[ColNameDec].Format(dec);
}
}
var csv = sb.ToString();
// Force small initial buffer length even for Release, to force reader
// state swapping and array swapping with increasing row length for
// ParallelEnumerate.
return Sep.Reader(o => o with { InitialBufferLength = 128 }).FromText(csv);
return sb.ToString();
}

static bool TryParseEven(SepReader.Row row, out Seq seq)
Expand All @@ -85,4 +117,14 @@ static bool TryParseEven(SepReader.Row row, out Seq seq)

// Builds a Seq from the ColNameInc and ColNameDec columns of the row, each
// parsed as int.
static Seq Parse(SepReader.Row row)
{
    var inc = row[ColNameInc].Parse<int>();
    var dec = row[ColNameDec].Parse<int>();
    return new Seq(inc, dec);
}

// Drains the async sequence into a new list, preserving enumeration order.
static async ValueTask<List<T>> ToListAsync<T>(IAsyncEnumerable<T> e)
{
    List<T> collected = new();
    // Explicit enumerator loop; the enumerator is disposed asynchronously
    // when the method exits, whether normally or through an exception.
    await using var cursor = e.GetAsyncEnumerator();
    while (await cursor.MoveNextAsync())
    {
        collected.Add(cursor.Current);
    }
    return collected;
}
}
2 changes: 1 addition & 1 deletion src/Sep/SepReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public sealed partial class SepReader : SepReaderState
readonly Info _info;
char _separator;
readonly bool _disableQuotesParsing;
readonly bool _continueOnCapturedContext;
internal readonly bool _continueOnCapturedContext;
readonly TextReader _reader;
ISepParser? _parser;

Expand Down
45 changes: 45 additions & 0 deletions src/Sep/SepReaderExtensions.Enumeration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;

namespace nietras.SeparatedValues;

Expand Down Expand Up @@ -36,6 +38,49 @@ public static IEnumerable<T> Enumerate<T>(this SepReader reader, SepReader.RowTr
}
}

/// <summary>
/// Asynchronously enumerates the rows of <paramref name="reader"/> and
/// yields the result of applying <paramref name="select"/> to each row.
/// </summary>
/// <typeparam name="T">Type of the values produced by <paramref name="select"/>.</typeparam>
/// <param name="reader">Reader whose rows are enumerated.</param>
/// <param name="select">Function invoked once per row to produce a value.</param>
/// <returns>A lazily evaluated asynchronous sequence of the selected values.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown immediately (before enumeration starts) when
/// <paramref name="reader"/> or <paramref name="select"/> is null.
/// </exception>
public static IAsyncEnumerable<T> EnumerateAsync<T>(this SepReader reader, SepReader.RowFunc<T> select)
{
    // Arguments are validated here, outside the iterator, so that failures
    // surface at the call site rather than on first MoveNextAsync.
    ArgumentNullException.ThrowIfNull(reader);
    ArgumentNullException.ThrowIfNull(select);
    return EnumerateRowsAsync(reader, select, CancellationToken.None);

    // Follow pattern seen in https://github.com/dotnet/runtime/pull/111685/files
    static async IAsyncEnumerable<T> EnumerateRowsAsync(SepReader source, SepReader.RowFunc<T> projection,
        [EnumeratorCancellation] CancellationToken token)
    {
        // The field is readonly, so reading it once up front is equivalent
        // to reading it on every iteration.
        var continueOnCapturedContext = source._continueOnCapturedContext;
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
        await using var rows = source.GetAsyncEnumerator(token);
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
        while (await rows.MoveNextAsync().ConfigureAwait(continueOnCapturedContext))
        {
            // Current is passed straight to the projection; it is not held
            // in a local across the yield.
            yield return projection(rows.Current);
        }
    }
}

/// <summary>
/// Asynchronously enumerates the rows of <paramref name="reader"/> and
/// yields the value produced by <paramref name="trySelect"/> for each row
/// where it returns <see langword="true"/>; other rows are skipped.
/// </summary>
/// <typeparam name="T">Type of the values produced by <paramref name="trySelect"/>.</typeparam>
/// <param name="reader">Reader whose rows are enumerated.</param>
/// <param name="trySelect">
/// Function invoked once per row; its out value is yielded only when it
/// returns <see langword="true"/>.
/// </param>
/// <returns>A lazily evaluated asynchronous sequence of the selected values.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown immediately (before enumeration starts) when
/// <paramref name="reader"/> or <paramref name="trySelect"/> is null.
/// </exception>
public static IAsyncEnumerable<T> EnumerateAsync<T>(this SepReader reader, SepReader.RowTryFunc<T> trySelect)
{
    // Arguments are validated here, outside the iterator, so that failures
    // surface at the call site rather than on first MoveNextAsync.
    ArgumentNullException.ThrowIfNull(reader);
    ArgumentNullException.ThrowIfNull(trySelect);
    return EnumerateRowsAsync(reader, trySelect, CancellationToken.None);

    // Follow pattern seen in https://github.com/dotnet/runtime/pull/111685/files
    static async IAsyncEnumerable<T> EnumerateRowsAsync(SepReader source, SepReader.RowTryFunc<T> tryProjection,
        [EnumeratorCancellation] CancellationToken token)
    {
        // The field is readonly, so reading it once up front is equivalent
        // to reading it on every iteration.
        var continueOnCapturedContext = source._continueOnCapturedContext;
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
        await using var rows = source.GetAsyncEnumerator(token);
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
        while (await rows.MoveNextAsync().ConfigureAwait(continueOnCapturedContext))
        {
            // Rows rejected by the try-function contribute nothing.
            if (!tryProjection(rows.Current, out var selected))
            {
                continue;
            }
            yield return selected;
        }
    }
}

public static IEnumerable<T> ParallelEnumerate<T>(this SepReader reader, SepReader.RowFunc<T> select)
{
ArgumentNullException.ThrowIfNull(reader);
Expand Down

0 comments on commit 0091351

Please sign in to comment.