Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-968741: Arrow performance optimizations #831

Merged
merged 13 commits into from
Jan 3, 2024
9 changes: 4 additions & 5 deletions Snowflake.Data.Tests/IntegrationTests/SFDbDataReaderIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@

namespace Snowflake.Data.Tests.IntegrationTests
{
// TODO: enable tests for Arrow
//[TestFixture(ResultFormat.ARROW)]
[TestFixture(ResultFormat.ARROW)]
[TestFixture(ResultFormat.JSON)]
class SFDbDataReaderIT : SFBaseTest
{
Expand Down Expand Up @@ -571,7 +570,7 @@ public void TestGetTimestampLTZ()
}

[Test]
public void TestGetBoolean()
public void TestGetBoolean([Values]bool value)
{
using (var conn = CreateAndOpenConnection())
{
Expand All @@ -585,7 +584,7 @@ public void TestGetBoolean()
var p1 = cmd.CreateParameter();
p1.ParameterName = "1";
p1.DbType = DbType.Boolean;
p1.Value = true;
p1.Value = value;
cmd.Parameters.Add(p1);

var count = cmd.ExecuteNonQuery();
Expand All @@ -597,7 +596,7 @@ public void TestGetBoolean()
ValidateResultFormat(reader);

Assert.IsTrue(reader.Read());
Assert.IsTrue(reader.GetBoolean(0));
Assert.AreEqual(value, reader.GetBoolean(0));
reader.Close();

CloseConnection(conn);
Expand Down
2 changes: 1 addition & 1 deletion Snowflake.Data.Tests/UnitTests/ArrowResultChunkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void TestGetChunkIndexReturnsFirstChunk()
{
var chunk = new ArrowResultChunk(_recordBatchOne);

Assert.AreEqual(0, chunk.ChunkIndex);
Assert.AreEqual(-1, chunk.ChunkIndex);
}

[Test]
Expand Down
219 changes: 160 additions & 59 deletions Snowflake.Data/Core/ArrowResultChunk.cs

Large diffs are not rendered by default.

88 changes: 61 additions & 27 deletions Snowflake.Data/Core/ArrowResultSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,11 @@
private BaseResultChunk _currentChunk;
private readonly IChunkDownloader _chunkDownloader;

public ArrowResultSet(QueryExecResponseData responseData, SFStatement sfStatement, CancellationToken cancellationToken) : base()
public ArrowResultSet(QueryExecResponseData responseData, SFStatement sfStatement, CancellationToken cancellationToken)
{
columnCount = responseData.rowType.Count;
try
{
if (responseData.rowsetBase64.Length > 0)
{
using (var stream = new MemoryStream(Convert.FromBase64String(responseData.rowsetBase64)))
{
using (var reader = new ArrowStreamReader(stream))
{
var recordBatch = reader.ReadNextRecordBatch();
_currentChunk = new ArrowResultChunk(recordBatch);
}
}
}
else
{
_currentChunk = new ArrowResultChunk(columnCount);
}

this.sfStatement = sfStatement;
UpdateSessionStatus(responseData);

Expand All @@ -60,8 +44,28 @@
isClosed = false;

queryId = responseData.queryId;

if (responseData.rowsetBase64.Length > 0)
sfc-gh-knozderko marked this conversation as resolved.
Show resolved Hide resolved
{
using (var stream = new MemoryStream(Convert.FromBase64String(responseData.rowsetBase64)))
{
using (var reader = new ArrowStreamReader(stream))
{
var recordBatch = reader.ReadNextRecordBatch();
_currentChunk = new ArrowResultChunk(recordBatch);
while ((recordBatch = reader.ReadNextRecordBatch()) != null)
{
((ArrowResultChunk)_currentChunk).AddRecordBatch(recordBatch);
}

Check warning on line 59 in Snowflake.Data/Core/ArrowResultSet.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/ArrowResultSet.cs#L57-L59

Added lines #L57 - L59 were not covered by tests
}
}
}
else
{
_currentChunk = new ArrowResultChunk(columnCount);
}
}
catch(System.Exception ex)
catch(Exception ex)

Check warning on line 68 in Snowflake.Data/Core/ArrowResultSet.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/ArrowResultSet.cs#L68

Added line #L68 was not covered by tests
{
s_logger.Error("Result set error queryId="+responseData.queryId, ex);
throw;
Expand All @@ -77,14 +81,16 @@

if (_totalChunkCount > 0)
{
s_logger.Debug("Get next chunk from chunk downloader");
s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" +
$" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," +
$" size uncompressed: {_currentChunk.UncompressedSize}");

Check warning on line 86 in Snowflake.Data/Core/ArrowResultSet.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/ArrowResultSet.cs#L84-L86

Added lines #L84 - L86 were not covered by tests
_currentChunk = await _chunkDownloader.GetNextChunkAsync().ConfigureAwait(false);
return _currentChunk?.Next() ?? false;
}

return false;
}

internal override bool Next()
{
ThrowIfClosed();
Expand All @@ -94,8 +100,11 @@

if (_totalChunkCount > 0)
{
s_logger.Debug("Get next chunk from chunk downloader");
s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" +
$" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," +
$" size uncompressed: {_currentChunk.UncompressedSize}");

Check warning on line 105 in Snowflake.Data/Core/ArrowResultSet.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/ArrowResultSet.cs#L103-L105

Added lines #L103 - L105 were not covered by tests
_currentChunk = Task.Run(async() => await (_chunkDownloader.GetNextChunkAsync()).ConfigureAwait(false)).Result;

return _currentChunk?.Next() ?? false;
}

Expand Down Expand Up @@ -140,7 +149,7 @@

return false;
}

private object GetObjectInternal(int ordinal)
{
ThrowIfClosed();
Expand All @@ -159,14 +168,39 @@
{
var value = GetObjectInternal(ordinal);
if (value == DBNull.Value)
{
return value;
}

if (value is decimal ret)
return ret;

var dstType = sfResultSetMetaData.GetCSharpTypeByIndex(ordinal);
object obj;
checked
{
switch (value)
{
case decimal ret: obj = ret;
break;
case long ret: obj = ret;
break;
case int ret: obj = (long)ret;
break;
case short ret: obj = (long)ret;
break;
case sbyte ret: obj = (long)ret;
break;
case string ret: obj = ret;
break;
case bool ret: obj = ret;
break;
default:
{
var dstType = sfResultSetMetaData.GetCSharpTypeByIndex(ordinal);
obj = Convert.ChangeType(value, dstType);
break;
}
}
}

return Convert.ChangeType(value, dstType);
return obj;
}

internal override bool IsDBNull(int ordinal)
Expand Down
10 changes: 9 additions & 1 deletion Snowflake.Data/Core/BaseResultChunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ public abstract class BaseResultChunk : IResultChunk

public int ChunkIndex { get; protected set; }

internal int CompressedSize;

internal int UncompressedSize;

internal string Url { get; set; }

internal string[,] RowSet { get; set; }
Expand All @@ -32,12 +36,16 @@ public abstract class BaseResultChunk : IResultChunk
internal abstract bool Next();

internal abstract bool Rewind();

internal virtual void Reset(ExecResponseChunk chunkInfo, int chunkIndex)
{
RowCount = chunkInfo.rowCount;
Url = chunkInfo.url;
ChunkIndex = chunkIndex;

sfc-gh-knozderko marked this conversation as resolved.
Show resolved Hide resolved
CompressedSize = chunkInfo.compressedSize;
UncompressedSize = chunkInfo.uncompressedSize;

sfc-gh-knozderko marked this conversation as resolved.
Show resolved Hide resolved
}

internal virtual void ResetForRetry()
Expand Down
3 changes: 3 additions & 0 deletions Snowflake.Data/Core/RestResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ internal class ExecResponseChunk

[JsonProperty(PropertyName = "uncompressedSize")]
internal int uncompressedSize { get; set; }

[JsonProperty(PropertyName = "compressedSize")]
internal int compressedSize { get; set; }
}

internal class CloseResponse : BaseRestResponse
Expand Down
1 change: 1 addition & 0 deletions Snowflake.Data/Core/SFResultChunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public SFResultChunk(string[,] rowSet)
RowSet = rowSet;
RowCount = rowSet.GetLength(0);
ColumnCount = rowSet.GetLength(1);
ChunkIndex = -1;
}

public SFResultChunk(string url, int rowCount, int columnCount, int index)
Expand Down
11 changes: 8 additions & 3 deletions Snowflake.Data/Core/SFResultSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Snowflake.Data.Log;
using Snowflake.Data.Client;
using System.Collections.Generic;
using System.Diagnostics;

namespace Snowflake.Data.Core
{
Expand Down Expand Up @@ -115,7 +116,9 @@
{
// GetNextChunk could be blocked if download result is not done yet.
// So put this piece of code in a seperate task
s_logger.Debug("Get next chunk from chunk downloader");
s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" +
$" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," +
$" size uncompressed: {_currentChunk.UncompressedSize}");

Check warning on line 121 in Snowflake.Data/Core/SFResultSet.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/SFResultSet.cs#L119-L121

Added lines #L119 - L121 were not covered by tests
BaseResultChunk nextChunk = await _chunkDownloader.GetNextChunkAsync().ConfigureAwait(false);
if (nextChunk != null)
{
Expand All @@ -136,7 +139,9 @@

if (_chunkDownloader != null)
{
s_logger.Debug("Get next chunk from chunk downloader");
s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" +
$" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," +
$" size uncompressed: {_currentChunk.UncompressedSize}");
BaseResultChunk nextChunk = Task.Run(async() => await (_chunkDownloader.GetNextChunkAsync()).ConfigureAwait(false)).Result;
if (nextChunk != null)
{
Expand Down Expand Up @@ -284,7 +289,7 @@
return GetObjectInternal(ordinal).SafeToString();
}
}

internal override object GetValue(int ordinal)
{
UTF8Buffer val = GetObjectInternal(ordinal);
Expand Down
2 changes: 1 addition & 1 deletion Snowflake.Data/Snowflake.Data.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Apache.Arrow" Version="12.0.1" />
<PackageReference Include="Apache.Arrow" Version="14.0.2" />
<PackageReference Include="AWSSDK.S3" Version="3.7.0.4" />
<PackageReference Include="Google.Cloud.Storage.V1" Version="4.6.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.13.0" />
Expand Down
Loading