Skip to content

Commit

Permalink
Completing INSERT query immediately after sending a table (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
victor-sushko committed Jul 31, 2023
1 parent 133f3b1 commit 0558724
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 44 deletions.
146 changes: 134 additions & 12 deletions src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ namespace Octonica.ClickHouseClient
public sealed class ClickHouseColumnWriter : IDisposable, IAsyncDisposable
{
private readonly ClickHouseTcpClient.Session _session;

private readonly ClientQueryMessage _query;
private readonly ReadOnlyCollection<ColumnInfo> _columns;

private ClickHouseColumnSettings?[]? _columnSettings;

private int? _rowsPerBlock;
private bool _endOfStream;

/// <summary>
/// Gets the number of fields (columns) in the table.
Expand Down Expand Up @@ -72,9 +73,10 @@ public int? MaxBlockSize
}
}

internal ClickHouseColumnWriter(ClickHouseTcpClient.Session session, ReadOnlyCollection<ColumnInfo> columns)
internal ClickHouseColumnWriter(ClickHouseTcpClient.Session session, ClientQueryMessage query, ReadOnlyCollection<ColumnInfo> columns)
{
_session = session ?? throw new ArgumentNullException(nameof(session));
_query = query ?? throw new ArgumentNullException(nameof(query));
_columns = columns;

if (columns.Count <= 100)
Expand All @@ -85,6 +87,39 @@ internal ClickHouseColumnWriter(ClickHouseTcpClient.Session session, ReadOnlyCol
MaxBlockSize = 8800 - 8 * columns.Count;
}

internal static async ValueTask<ClickHouseTable> ReadTableMetadata(ClickHouseTcpClient.Session session, string queryText, bool async, CancellationToken cancellationToken)
{
var msg = await session.ReadMessage(async, cancellationToken);
switch (msg.MessageCode)
{
case ServerMessageCode.Error:
throw ((ServerErrorMessage)msg).Exception.CopyWithQuery(queryText);

case ServerMessageCode.TableColumns:
break;

default:
throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}.");
}

msg = await session.ReadMessage(async, cancellationToken);
ClickHouseTable data;
switch (msg.MessageCode)
{
case ServerMessageCode.Error:
throw ((ServerErrorMessage)msg).Exception.CopyWithQuery(queryText);

case ServerMessageCode.Data:
data = await session.ReadTable((ServerDataMessage)msg, null, async, cancellationToken);
break;

default:
throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}.");
}

return data;
}

/// <inheritdoc cref="ClickHouseDataReader.ConfigureColumn(string, ClickHouseColumnSettings)"/>
public void ConfigureColumn(string name, ClickHouseColumnSettings columnSettings)
{
Expand Down Expand Up @@ -277,7 +312,7 @@ private async ValueTask WriteRow(IReadOnlyCollection<object?> values, bool async
}

var table = new ClickHouseTableWriter(string.Empty, 1, columnWriters);
await SendTable(table, async, cancellationToken);
await SendTable(table, true, async, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -376,7 +411,7 @@ private async ValueTask WriteTable(IReadOnlyList<object?> columns, int rowCount,
if (rowCount < 0)
throw new ArgumentOutOfRangeException(nameof(rowCount));
if (rowCount == 0)
throw new ArgumentException("The number of rows must be grater than zero.", nameof(rowCount));
throw new ArgumentException("The number of rows must be greater than zero.", nameof(rowCount));

if (IsClosed)
throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The writer is closed.");
Expand All @@ -393,22 +428,57 @@ private async ValueTask WriteTable(IReadOnlyList<object?> columns, int rowCount,
for (offset = 0; offset + blockSize < rowCount; offset += blockSize)
{
var table = new ClickHouseTableWriter(string.Empty, blockSize, writerFactories.Select(w => w.Create(offset, blockSize)));
await SendTable(table, async, cancellationToken);
await SendTable(table, false, async, cancellationToken);
}

var finalBlockSize = rowCount - offset;
var finalTable = new ClickHouseTableWriter(string.Empty, finalBlockSize, writerFactories.Select(w => w.Create(offset, finalBlockSize)));
await SendTable(finalTable, async, cancellationToken);
await SendTable(finalTable, true, async, cancellationToken);
}

private async ValueTask SendTable(ClickHouseTableWriter table, bool async, CancellationToken cancellationToken)
private async ValueTask SendTable(ClickHouseTableWriter table, bool confirm, bool async, CancellationToken cancellationToken)
{
if (_endOfStream)
await RepeatQuery(async, cancellationToken);

try
{
await _session.SendTable(table, async, cancellationToken);

if (confirm)
await EndWrite(disposing: false, closeSession: false, async, cancellationToken);
}
catch (ClickHouseHandledException)
{
throw;
}
catch (Exception ex)
{
var aggrEx = await _session.SetFailed(ex, false, async);
if (aggrEx != null)
throw aggrEx;

throw;
}
}

private async ValueTask RepeatQuery(bool async, CancellationToken cancellationToken)
{
ClickHouseTable data;
try
{
await _session.SendQuery(_query, async, cancellationToken);
data = await ReadTableMetadata(_session, _query.Query, async, cancellationToken);
_endOfStream = false;
}
catch (ClickHouseServerException)
{
await _session.Dispose(async);
throw;
}
catch (ClickHouseHandledException)
{
await _session.Dispose(async);
throw;
}
catch (Exception ex)
Expand All @@ -419,14 +489,51 @@ private async ValueTask SendTable(ClickHouseTableWriter table, bool async, Cance

throw;
}

try
{
// Repeating the query is almost the same as opening a new independent column writer. So we must check that the structure of the table wasn't changed.
var newColumns = data.Header.Columns;
if (newColumns.Count != _columns.Count)
throw new ClickHouseException(ClickHouseErrorCodes.TableModified, "The number of columns returned by the query has changed.");

for (int i = 0; i < data.Columns.Count; i++)
{
var newCol = newColumns[i];
var origCol = _columns[i];

if (!string.Equals(origCol.Name, newCol.Name, StringComparison.Ordinal))
throw new ClickHouseException(ClickHouseErrorCodes.TableModified, $"Unexpected column \"{newCol.Name}\" at the position {i}. Expected \"{origCol.Name}\".");

if (!string.Equals(origCol.TypeInfo.ComplexTypeName, newCol.TypeInfo.ComplexTypeName, StringComparison.Ordinal))
throw new ClickHouseException(ClickHouseErrorCodes.TableModified, $"The type of the column \"{origCol.Name}\" has changed from \"{origCol.TypeInfo.ComplexTypeName}\" to \"{newCol.TypeInfo.ComplexTypeName}\" between queries.");
}
}
catch (Exception ex)
{
try
{
await EndWrite(disposing: true, closeSession: true, async, cancellationToken);
}
catch (Exception disposingEx)
{
throw new AggregateException(ex, disposingEx);
}

var hEx = ClickHouseHandledException.Wrap(ex);
if (ReferenceEquals(hEx, ex))
throw;

throw hEx;
}
}

/// <summary>
/// Closes the writer and releases all resources associated with it.
/// </summary>
public void EndWrite()
{
TaskHelper.WaitNonAsyncTask(EndWrite(false, false, CancellationToken.None));
TaskHelper.WaitNonAsyncTask(EndWrite(disposing: false, closeSession: true, false, CancellationToken.None));
}

/// <summary>
Expand All @@ -436,14 +543,25 @@ public void EndWrite()
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
public async Task EndWriteAsync(CancellationToken cancellationToken)
{
await EndWrite(false, true, cancellationToken);
await EndWrite(disposing: false, closeSession: true, true, cancellationToken);
}

private async ValueTask EndWrite(bool disposing, bool async, CancellationToken cancellationToken)
private async ValueTask EndWrite(bool disposing, bool closeSession, bool async, CancellationToken cancellationToken)
{
// The session should not be in the open state after disposing
Debug.Assert(closeSession || !disposing);

if (IsClosed)
return;

if (_endOfStream)
{
if (closeSession)
await _session.Dispose(async);

return;
}

try
{
if (disposing)
Expand All @@ -459,7 +577,11 @@ private async ValueTask EndWrite(bool disposing, bool async, CancellationToken c
switch (message.MessageCode)
{
case ServerMessageCode.EndOfStream:
await _session.Dispose(async);
if (closeSession)
await _session.Dispose(async);
else
_endOfStream = true;

break;

case ServerMessageCode.Error:
Expand Down Expand Up @@ -522,7 +644,7 @@ public ValueTask DisposeAsync()

private async ValueTask Dispose(bool async)
{
await EndWrite(true, async, CancellationToken.None);
await EndWrite(disposing: true, closeSession: true, async, CancellationToken.None);
}

internal static async ValueTask<IClickHouseColumnWriterFactory> CreateColumnWriterFactory(ColumnInfo columnInfo, object? column, int columnIndex, int rowCount, ClickHouseColumnSettings? settings, bool async, CancellationToken cancellationToken)
Expand Down
32 changes: 3 additions & 29 deletions src/Octonica.ClickHouseClient/ClickHouseConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -395,38 +395,12 @@ private async ValueTask<ClickHouseColumnWriter> CreateColumnWriter(string insert
session = await connectionState.TcpClient.OpenSession(async, null, CancellationToken.None, cancellationToken);

var messageBuilder = new ClientQueryMessage.Builder {QueryKind = QueryKind.InitialQuery, Query = insertFormatCommand};
await session.SendQuery(messageBuilder, null, async, cancellationToken);
var query = await session.SendQuery(messageBuilder, null, async, cancellationToken);

cancelOnFailure = true;
var msg = await session.ReadMessage(async, cancellationToken);
switch (msg.MessageCode)
{
case ServerMessageCode.Error:
throw ((ServerErrorMessage) msg).Exception.CopyWithQuery(insertFormatCommand);

case ServerMessageCode.TableColumns:
break;

default:
throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}.");
}

msg = await session.ReadMessage(async, cancellationToken);
ClickHouseTable data;
switch (msg.MessageCode)
{
case ServerMessageCode.Error:
throw ((ServerErrorMessage) msg).Exception.CopyWithQuery(insertFormatCommand);

case ServerMessageCode.Data:
data = await session.ReadTable((ServerDataMessage) msg, null, async, cancellationToken);
break;

default:
throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}.");
}
var data = await ClickHouseColumnWriter.ReadTableMetadata(session, query.Query, async, cancellationToken);

return new ClickHouseColumnWriter(session, data.Header.Columns);
return new ClickHouseColumnWriter(session, query, data.Header.Columns);
}
catch (ClickHouseServerException)
{
Expand Down
26 changes: 24 additions & 2 deletions src/Octonica.ClickHouseClient/ClickHouseTcpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public Session(ClickHouseTcpClient client, IClickHouseSessionExternalResources?
_sessionCancellationToken = sessionCancellationToken;
}

public async ValueTask SendQuery(
public async ValueTask<ClientQueryMessage> SendQuery(
ClientQueryMessage.Builder messageBuilder,
IReadOnlyCollection<IClickHouseTableWriter>? tables,
bool async,
Expand All @@ -165,6 +165,7 @@ public async ValueTask SendQuery(
CheckDisposed();

var writer = _client._writer;
ClientQueryMessage queryMessage;
try
{
var settings = _client._settings;
Expand All @@ -176,7 +177,7 @@ public async ValueTask SendQuery(
messageBuilder.ProtocolRevision = Math.Min(ClickHouseProtocolRevisions.CurrentRevision, _client.ServerInfo.Revision);
messageBuilder.CompressionEnabled = _client._settings.Compress;

var queryMessage = messageBuilder.Build();
queryMessage = messageBuilder.Build();
if (queryMessage.Settings != null)
{
if (_client.ServerInfo.Revision < ClickHouseProtocolRevisions.MinRevisionWithSettingsSerializedAsStrings)
Expand Down Expand Up @@ -204,6 +205,27 @@ public async ValueTask SendQuery(
}

await WithCancellationToken(cancellationToken, ct => writer.Flush(async, ct));

return queryMessage;
}

public async ValueTask SendQuery(ClientQueryMessage queryMessage, bool async, CancellationToken cancellationToken)
{
CheckDisposed();

var writer = _client._writer;
try
{
queryMessage.Write(writer);
WriteTable(ClickHouseEmptyTableWriter.Instance);
}
catch (Exception ex)
{
writer.Discard();
throw ClickHouseHandledException.Wrap(ex);
}

await WithCancellationToken(cancellationToken, ct => writer.Flush(async, ct));
}

public async ValueTask SendCancel(bool async)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#region License Apache 2.0
/* Copyright 2019-2021 Octonica
/* Copyright 2019-2021, 2023 Octonica
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -139,5 +139,10 @@ public static class ClickHouseErrorCodes
/// The code for an error caused by a violation of the TLS protocol.
/// </summary>
public const int TlsError = 22;

/// <summary>
/// The code for an error caused by unexpected changes in the table's structure.
/// </summary>
public const int TableModified = 23;
}
}

0 comments on commit 0558724

Please sign in to comment.