From 133f3b1239a87f790631a5a075ad878951862467 Mon Sep 17 00:00:00 2001 From: Victor Sushko Date: Sun, 30 Jul 2023 21:38:23 +0500 Subject: [PATCH] Update supported protocol revision to 54456 --- .../ClickHouseColumnWriter.cs | 51 +++++++++++-------- .../ClickHouseCommand.cs | 35 +++++++++---- .../Protocol/ClickHouseProtocolRevisions.cs | 2 +- 3 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs b/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs index 33a5c65..8eae17f 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2021 Octonica +/* Copyright 2019-2021, 2023 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -451,28 +451,39 @@ private async ValueTask EndWrite(bool disposing, bool async, CancellationToken c else await _session.SendTable(ClickHouseEmptyTableWriter.Instance, async, cancellationToken); - var message = await _session.ReadMessage(async, CancellationToken.None); - switch (message.MessageCode) + bool isProfileEvents; + do { - case ServerMessageCode.EndOfStream: - await _session.Dispose(async); - break; - - case ServerMessageCode.Error: - // Connection state can't be resotred if the server raised an exception. - // This error is probably caused by the wrong formatted data. - var exception = ((ServerErrorMessage)message).Exception; - if (disposing) - { - await _session.SetFailed(exception, false, async); + isProfileEvents = false; + var message = await _session.ReadMessage(async, CancellationToken.None); + switch (message.MessageCode) + { + case ServerMessageCode.EndOfStream: + await _session.Dispose(async); break; - } - - throw exception; - default: - throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message: \"{message.MessageCode}\"."); - } + case ServerMessageCode.Error: + // Connection state can't be resotred if the server raised an exception. + // This error is probably caused by the wrong formatted data. + var exception = ((ServerErrorMessage)message).Exception; + if (disposing) + { + await _session.SetFailed(exception, false, async); + break; + } + + throw exception; + + case ServerMessageCode.ProfileEvents: + isProfileEvents = true; + var profileEventsMessage = (ServerDataMessage)message; + await _session.SkipTable(profileEventsMessage, async, cancellationToken); + break; + + default: + throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message: \"{message.MessageCode}\"."); + } + } while (isProfileEvents); } catch (ClickHouseHandledException ex) { diff --git a/src/Octonica.ClickHouseClient/ClickHouseCommand.cs b/src/Octonica.ClickHouseClient/ClickHouseCommand.cs index 4bef407..5d849c2 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseCommand.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseCommand.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2022 Octonica +/* Copyright 2019-2023 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -684,21 +684,34 @@ private async ValueTask ExecuteDbDataReader(CommandBehavio var query = await SendQuery(session, behavior, async, cancellationToken); cancelOnFailure = true; - var message = await session.ReadMessage(async, cancellationToken); - switch (message.MessageCode) + bool isProfileEvents; + IServerMessage message; + do { - case ServerMessageCode.Data: - break; + isProfileEvents = false; + message = await session.ReadMessage(async, cancellationToken); + switch (message.MessageCode) + { + case ServerMessageCode.Data: + break; - case ServerMessageCode.Error: - throw ((ServerErrorMessage) message).Exception.CopyWithQuery(query); + case ServerMessageCode.Error: + throw ((ServerErrorMessage)message).Exception.CopyWithQuery(query); - case ServerMessageCode.EndOfStream: - throw ClickHouseHandledException.Wrap(new ClickHouseException(ClickHouseErrorCodes.QueryTypeMismatch, "There is no table in the server's response.")); + case ServerMessageCode.ProfileEvents: + isProfileEvents = true; + var dataMessage = (ServerDataMessage)message; + await session.SkipTable(dataMessage, async, CancellationToken.None); + break; - default: - throw new ClickHouseException(ClickHouseErrorCodes.QueryTypeMismatch, "There is no table in the server's response."); + case ServerMessageCode.EndOfStream: + throw ClickHouseHandledException.Wrap(new ClickHouseException(ClickHouseErrorCodes.QueryTypeMismatch, "There is no table in the server's response.")); + + default: + throw new ClickHouseException(ClickHouseErrorCodes.QueryTypeMismatch, "There is no table in the server's response."); + } } + while (isProfileEvents); var firstTable = await session.ReadTable((ServerDataMessage) message, null, async, cancellationToken); if (rowLimit == ClickHouseDataReaderRowLimit.Zero) diff --git a/src/Octonica.ClickHouseClient/Protocol/ClickHouseProtocolRevisions.cs b/src/Octonica.ClickHouseClient/Protocol/ClickHouseProtocolRevisions.cs index 4913b36..94fe935 100644 --- a/src/Octonica.ClickHouseClient/Protocol/ClickHouseProtocolRevisions.cs +++ b/src/Octonica.ClickHouseClient/Protocol/ClickHouseProtocolRevisions.cs @@ -25,7 +25,7 @@ public static class ClickHouseProtocolRevisions /// /// The number of the current revision. It is the latest revision supported by the client. /// - public const int CurrentRevision = MinRevisionWithCustomSerialization; + public const int CurrentRevision = 54456; /// /// The number of protocol's revision with support of custom serialization.