diff --git a/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs b/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs index 8eae17f..007fdeb 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs @@ -38,12 +38,13 @@ namespace Octonica.ClickHouseClient public sealed class ClickHouseColumnWriter : IDisposable, IAsyncDisposable { private readonly ClickHouseTcpClient.Session _session; - + private readonly ClientQueryMessage _query; private readonly ReadOnlyCollection _columns; private ClickHouseColumnSettings?[]? _columnSettings; private int? _rowsPerBlock; + private bool _endOfStream; /// /// Gets the number of fields (columns) in the table. @@ -72,9 +73,10 @@ public int? MaxBlockSize } } - internal ClickHouseColumnWriter(ClickHouseTcpClient.Session session, ReadOnlyCollection columns) + internal ClickHouseColumnWriter(ClickHouseTcpClient.Session session, ClientQueryMessage query, ReadOnlyCollection columns) { _session = session ?? throw new ArgumentNullException(nameof(session)); + _query = query ?? throw new ArgumentNullException(nameof(query)); _columns = columns; if (columns.Count <= 100) @@ -85,6 +87,39 @@ internal ClickHouseColumnWriter(ClickHouseTcpClient.Session session, ReadOnlyCol MaxBlockSize = 8800 - 8 * columns.Count; } + internal static async ValueTask ReadTableMetadata(ClickHouseTcpClient.Session session, string queryText, bool async, CancellationToken cancellationToken) + { + var msg = await session.ReadMessage(async, cancellationToken); + switch (msg.MessageCode) + { + case ServerMessageCode.Error: + throw ((ServerErrorMessage)msg).Exception.CopyWithQuery(queryText); + + case ServerMessageCode.TableColumns: + break; + + default: + throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}."); + } + + msg = await session.ReadMessage(async, cancellationToken); + ClickHouseTable data; + switch (msg.MessageCode) + { + case ServerMessageCode.Error: + throw ((ServerErrorMessage)msg).Exception.CopyWithQuery(queryText); + + case ServerMessageCode.Data: + data = await session.ReadTable((ServerDataMessage)msg, null, async, cancellationToken); + break; + + default: + throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}."); + } + + return data; + } + /// public void ConfigureColumn(string name, ClickHouseColumnSettings columnSettings) { @@ -277,7 +312,7 @@ private async ValueTask WriteRow(IReadOnlyCollection values, bool async } var table = new ClickHouseTableWriter(string.Empty, 1, columnWriters); - await SendTable(table, async, cancellationToken); + await SendTable(table, true, async, cancellationToken); } /// @@ -376,7 +411,7 @@ private async ValueTask WriteTable(IReadOnlyList columns, int rowCount, if (rowCount < 0) throw new ArgumentOutOfRangeException(nameof(rowCount)); if (rowCount == 0) - throw new ArgumentException("The number of rows must be grater than zero.", nameof(rowCount)); + throw new ArgumentException("The number of rows must be greater than zero.", nameof(rowCount)); if (IsClosed) throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The writer is closed."); @@ -393,22 +428,57 @@ private async ValueTask WriteTable(IReadOnlyList columns, int rowCount, for (offset = 0; offset + blockSize < rowCount; offset += blockSize) { var table = new ClickHouseTableWriter(string.Empty, blockSize, writerFactories.Select(w => w.Create(offset, blockSize))); - await SendTable(table, async, cancellationToken); + await SendTable(table, false, async, cancellationToken); } var finalBlockSize = rowCount - offset; var finalTable = new ClickHouseTableWriter(string.Empty, finalBlockSize, writerFactories.Select(w => w.Create(offset, finalBlockSize))); - await SendTable(finalTable, async, cancellationToken); + await SendTable(finalTable, true, async, cancellationToken); } - private async ValueTask SendTable(ClickHouseTableWriter table, bool async, CancellationToken cancellationToken) + private async ValueTask SendTable(ClickHouseTableWriter table, bool confirm, bool async, CancellationToken cancellationToken) { + if (_endOfStream) + await RepeatQuery(async, cancellationToken); + try { await _session.SendTable(table, async, cancellationToken); + + if (confirm) + await EndWrite(disposing: false, closeSession: false, async, cancellationToken); + } + catch (ClickHouseHandledException) + { + throw; + } + catch (Exception ex) + { + var aggrEx = await _session.SetFailed(ex, false, async); + if (aggrEx != null) + throw aggrEx; + + throw; + } + } + + private async ValueTask RepeatQuery(bool async, CancellationToken cancellationToken) + { + ClickHouseTable data; + try + { + await _session.SendQuery(_query, async, cancellationToken); + data = await ReadTableMetadata(_session, _query.Query, async, cancellationToken); + _endOfStream = false; + } + catch (ClickHouseServerException) + { + await _session.Dispose(async); + throw; } catch (ClickHouseHandledException) { + await _session.Dispose(async); throw; } catch (Exception ex) @@ -419,6 +489,43 @@ private async ValueTask SendTable(ClickHouseTableWriter table, bool async, Cance throw; } + + try + { + // Repeating the query is almost the same as opening a new independent column writer. So we must check that the structure of the table wasn't changed. + var newColumns = data.Header.Columns; + if (newColumns.Count != _columns.Count) + throw new ClickHouseException(ClickHouseErrorCodes.TableModified, "The number of columns returned by the query has changed."); + + for (int i = 0; i < data.Columns.Count; i++) + { + var newCol = newColumns[i]; + var origCol = _columns[i]; + + if (!string.Equals(origCol.Name, newCol.Name, StringComparison.Ordinal)) + throw new ClickHouseException(ClickHouseErrorCodes.TableModified, $"Unexpected column \"{newCol.Name}\" at the position {i}. Expected \"{origCol.Name}\"."); + + if (!string.Equals(origCol.TypeInfo.ComplexTypeName, newCol.TypeInfo.ComplexTypeName, StringComparison.Ordinal)) + throw new ClickHouseException(ClickHouseErrorCodes.TableModified, $"The type of the column \"{origCol.Name}\" has changed from \"{origCol.TypeInfo.ComplexTypeName}\" to \"{newCol.TypeInfo.ComplexTypeName}\" between queries."); + } + } + catch (Exception ex) + { + try + { + await EndWrite(disposing: true, closeSession: true, async, cancellationToken); + } + catch (Exception disposingEx) + { + throw new AggregateException(ex, disposingEx); + } + + var hEx = ClickHouseHandledException.Wrap(ex); + if (ReferenceEquals(hEx, ex)) + throw; + + throw hEx; + } } /// @@ -426,7 +533,7 @@ private async ValueTask SendTable(ClickHouseTableWriter table, bool async, Cance /// public void EndWrite() { - TaskHelper.WaitNonAsyncTask(EndWrite(false, false, CancellationToken.None)); + TaskHelper.WaitNonAsyncTask(EndWrite(disposing: false, closeSession: true, false, CancellationToken.None)); } /// @@ -436,14 +543,25 @@ public void EndWrite() /// A representing asyncronous operation. public async Task EndWriteAsync(CancellationToken cancellationToken) { - await EndWrite(false, true, cancellationToken); + await EndWrite(disposing: false, closeSession: true, true, cancellationToken); } - private async ValueTask EndWrite(bool disposing, bool async, CancellationToken cancellationToken) + private async ValueTask EndWrite(bool disposing, bool closeSession, bool async, CancellationToken cancellationToken) { + // The session should not be in the open state after disposing + Debug.Assert(closeSession || !disposing); + if (IsClosed) return; + if (_endOfStream) + { + if (closeSession) + await _session.Dispose(async); + + return; + } + try { if (disposing) @@ -459,7 +577,11 @@ private async ValueTask EndWrite(bool disposing, bool async, CancellationToken c switch (message.MessageCode) { case ServerMessageCode.EndOfStream: - await _session.Dispose(async); + if (closeSession) + await _session.Dispose(async); + else + _endOfStream = true; + break; case ServerMessageCode.Error: @@ -522,7 +644,7 @@ public ValueTask DisposeAsync() private async ValueTask Dispose(bool async) { - await EndWrite(true, async, CancellationToken.None); + await EndWrite(disposing: true, closeSession: true, async, CancellationToken.None); } internal static async ValueTask CreateColumnWriterFactory(ColumnInfo columnInfo, object? column, int columnIndex, int rowCount, ClickHouseColumnSettings? settings, bool async, CancellationToken cancellationToken) diff --git a/src/Octonica.ClickHouseClient/ClickHouseConnection.cs b/src/Octonica.ClickHouseClient/ClickHouseConnection.cs index 1da85ab..c708a21 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseConnection.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseConnection.cs @@ -395,38 +395,12 @@ private async ValueTask CreateColumnWriter(string insert session = await connectionState.TcpClient.OpenSession(async, null, CancellationToken.None, cancellationToken); var messageBuilder = new ClientQueryMessage.Builder {QueryKind = QueryKind.InitialQuery, Query = insertFormatCommand}; - await session.SendQuery(messageBuilder, null, async, cancellationToken); + var query = await session.SendQuery(messageBuilder, null, async, cancellationToken); cancelOnFailure = true; - var msg = await session.ReadMessage(async, cancellationToken); - switch (msg.MessageCode) - { - case ServerMessageCode.Error: - throw ((ServerErrorMessage) msg).Exception.CopyWithQuery(insertFormatCommand); - - case ServerMessageCode.TableColumns: - break; - - default: - throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}."); - } - - msg = await session.ReadMessage(async, cancellationToken); - ClickHouseTable data; - switch (msg.MessageCode) - { - case ServerMessageCode.Error: - throw ((ServerErrorMessage) msg).Exception.CopyWithQuery(insertFormatCommand); - - case ServerMessageCode.Data: - data = await session.ReadTable((ServerDataMessage) msg, null, async, cancellationToken); - break; - - default: - throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}."); - } + var data = await ClickHouseColumnWriter.ReadTableMetadata(session, query.Query, async, cancellationToken); - return new ClickHouseColumnWriter(session, data.Header.Columns); + return new ClickHouseColumnWriter(session, query, data.Header.Columns); } catch (ClickHouseServerException) { diff --git a/src/Octonica.ClickHouseClient/ClickHouseTcpClient.cs b/src/Octonica.ClickHouseClient/ClickHouseTcpClient.cs index d69ad6c..4acaed3 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseTcpClient.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseTcpClient.cs @@ -156,7 +156,7 @@ public Session(ClickHouseTcpClient client, IClickHouseSessionExternalResources? _sessionCancellationToken = sessionCancellationToken; } - public async ValueTask SendQuery( + public async ValueTask SendQuery( ClientQueryMessage.Builder messageBuilder, IReadOnlyCollection? tables, bool async, @@ -165,6 +165,7 @@ public async ValueTask SendQuery( CheckDisposed(); var writer = _client._writer; + ClientQueryMessage queryMessage; try { var settings = _client._settings; @@ -176,7 +177,7 @@ public async ValueTask SendQuery( messageBuilder.ProtocolRevision = Math.Min(ClickHouseProtocolRevisions.CurrentRevision, _client.ServerInfo.Revision); messageBuilder.CompressionEnabled = _client._settings.Compress; - var queryMessage = messageBuilder.Build(); + queryMessage = messageBuilder.Build(); if (queryMessage.Settings != null) { if (_client.ServerInfo.Revision < ClickHouseProtocolRevisions.MinRevisionWithSettingsSerializedAsStrings) @@ -204,6 +205,27 @@ public async ValueTask SendQuery( } await WithCancellationToken(cancellationToken, ct => writer.Flush(async, ct)); + + return queryMessage; + } + + public async ValueTask SendQuery(ClientQueryMessage queryMessage, bool async, CancellationToken cancellationToken) + { + CheckDisposed(); + + var writer = _client._writer; + try + { + queryMessage.Write(writer); + WriteTable(ClickHouseEmptyTableWriter.Instance); + } + catch (Exception ex) + { + writer.Discard(); + throw ClickHouseHandledException.Wrap(ex); + } + + await WithCancellationToken(cancellationToken, ct => writer.Flush(async, ct)); } public async ValueTask SendCancel(bool async) diff --git a/src/Octonica.ClickHouseClient/Exceptions/ClickHouseErrorCodes.cs b/src/Octonica.ClickHouseClient/Exceptions/ClickHouseErrorCodes.cs index c4d76d1..d74cae2 100644 --- a/src/Octonica.ClickHouseClient/Exceptions/ClickHouseErrorCodes.cs +++ b/src/Octonica.ClickHouseClient/Exceptions/ClickHouseErrorCodes.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2021 Octonica +/* Copyright 2019-2021, 2023 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -139,5 +139,10 @@ public static class ClickHouseErrorCodes /// The code for an error caused by a violation of the TLS protocol. /// public const int TlsError = 22; + + /// + /// The code for an error caused by unexpected changes in the table's structure. + /// + public const int TableModified = 23; } }