Skip to content

Commit

Permalink
Update supported protocol revision to 54456
Browse files Browse the repository at this point in the history
  • Loading branch information
victor-sushko committed Jul 30, 2023
1 parent 7c2d8d2 commit 133f3b1
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 32 deletions.
51 changes: 31 additions & 20 deletions src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#region License Apache 2.0
/* Copyright 2019-2021 Octonica
/* Copyright 2019-2021, 2023 Octonica
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -451,28 +451,39 @@ private async ValueTask EndWrite(bool disposing, bool async, CancellationToken c
else
await _session.SendTable(ClickHouseEmptyTableWriter.Instance, async, cancellationToken);

var message = await _session.ReadMessage(async, CancellationToken.None);
switch (message.MessageCode)
bool isProfileEvents;
do
{
case ServerMessageCode.EndOfStream:
await _session.Dispose(async);
break;

case ServerMessageCode.Error:
// Connection state can't be resotred if the server raised an exception.
// This error is probably caused by the wrong formatted data.
var exception = ((ServerErrorMessage)message).Exception;
if (disposing)
{
await _session.SetFailed(exception, false, async);
isProfileEvents = false;
var message = await _session.ReadMessage(async, CancellationToken.None);
switch (message.MessageCode)
{
case ServerMessageCode.EndOfStream:
await _session.Dispose(async);
break;
}

throw exception;

default:
throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message: \"{message.MessageCode}\".");
}
case ServerMessageCode.Error:
// Connection state can't be resotred if the server raised an exception.
// This error is probably caused by the wrong formatted data.
var exception = ((ServerErrorMessage)message).Exception;
if (disposing)
{
await _session.SetFailed(exception, false, async);
break;
}

throw exception;

case ServerMessageCode.ProfileEvents:
isProfileEvents = true;
var profileEventsMessage = (ServerDataMessage)message;
await _session.SkipTable(profileEventsMessage, async, cancellationToken);
break;

default:
throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message: \"{message.MessageCode}\".");
}
} while (isProfileEvents);
}
catch (ClickHouseHandledException ex)
{
Expand Down
35 changes: 24 additions & 11 deletions src/Octonica.ClickHouseClient/ClickHouseCommand.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#region License Apache 2.0
/* Copyright 2019-2022 Octonica
/* Copyright 2019-2023 Octonica
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -684,21 +684,34 @@ private async ValueTask<ClickHouseDataReader> ExecuteDbDataReader(CommandBehavio
var query = await SendQuery(session, behavior, async, cancellationToken);

cancelOnFailure = true;
var message = await session.ReadMessage(async, cancellationToken);
switch (message.MessageCode)
bool isProfileEvents;
IServerMessage message;
do
{
case ServerMessageCode.Data:
break;
isProfileEvents = false;
message = await session.ReadMessage(async, cancellationToken);
switch (message.MessageCode)
{
case ServerMessageCode.Data:
break;

case ServerMessageCode.Error:
throw ((ServerErrorMessage) message).Exception.CopyWithQuery(query);
case ServerMessageCode.Error:
throw ((ServerErrorMessage)message).Exception.CopyWithQuery(query);

case ServerMessageCode.EndOfStream:
throw ClickHouseHandledException.Wrap(new ClickHouseException(ClickHouseErrorCodes.QueryTypeMismatch, "There is no table in the server's response."));
case ServerMessageCode.ProfileEvents:
isProfileEvents = true;
var dataMessage = (ServerDataMessage)message;
await session.SkipTable(dataMessage, async, CancellationToken.None);
break;

default:
throw new ClickHouseException(ClickHouseErrorCodes.QueryTypeMismatch, "There is no table in the server's response.");
case ServerMessageCode.EndOfStream:
throw ClickHouseHandledException.Wrap(new ClickHouseException(ClickHouseErrorCodes.QueryTypeMismatch, "There is no table in the server's response."));

default:
throw new ClickHouseException(ClickHouseErrorCodes.QueryTypeMismatch, "There is no table in the server's response.");
}
}
while (isProfileEvents);

var firstTable = await session.ReadTable((ServerDataMessage) message, null, async, cancellationToken);
if (rowLimit == ClickHouseDataReaderRowLimit.Zero)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static class ClickHouseProtocolRevisions
/// <summary>
/// The number of the current revision. It is the latest revision supported by the client.
/// </summary>
public const int CurrentRevision = MinRevisionWithCustomSerialization;
public const int CurrentRevision = 54456;

/// <summary>
/// The number of protocol's revision with support of custom serialization.
Expand Down

0 comments on commit 133f3b1

Please sign in to comment.