Skip to content

Commit

Permalink
Add basic support for transactions (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
victor-sushko committed Jul 31, 2023
1 parent 0558724 commit f801f6d
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 20 deletions.
186 changes: 166 additions & 20 deletions src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,28 +190,61 @@ public int GetOrdinal(string name)
/// Writes a single row to the table.
/// </summary>
/// <param name="values">The list of column values.</param>
/// <remarks>Please note that the method always commits a transaction. No subsequent call of <see cref="Commit"/> is required.</remarks>
public void WriteRow(params object?[] values)
{
TaskHelper.WaitNonAsyncTask(WriteRow(values, false, CancellationToken.None));
TaskHelper.WaitNonAsyncTask(WriteRow(values, commit: false, async: false, CancellationToken.None));
}

/// <summary>
/// Writes a single row to the table.
/// </summary>
/// <param name="values">The list of column values.</param>
/// <param name="values">The list of column values.</param>
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
/// <remarks>Please note that the method is always commits a transaction. No subsequent call of <see cref="Commit"/> is required.</remarks>
public void WriteRow(IReadOnlyCollection<object?> values)
{
TaskHelper.WaitNonAsyncTask(WriteRow(values, false, CancellationToken.None));
TaskHelper.WaitNonAsyncTask(WriteRow(values, commit: false, async: false, CancellationToken.None));
}

/// <summary>
/// Writes a single row to the table.
/// </summary>
/// <param name="values">The list of column values.</param>
/// <param name="commit">
/// If <see langword="true"/>, commits the transaction immediately after writing a row (the same mode as <see cref="ClickHouseTransactionMode.Block"/>).
/// If <see langword="false"/>, leaves the transaction open (the same mode as <see cref="ClickHouseTransactionMode.Manual"/>).
/// </param>
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
public void WriteRow(IReadOnlyCollection<object?> values, bool commit)
{
TaskHelper.WaitNonAsyncTask(WriteRow(values, commit, async: false, CancellationToken.None));
}

/// <summary>
/// Asyncronously writes a single row to the table.
/// </summary>
/// <param name="values">The list of column values.</param>
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
/// <remarks>Please note that the method is always commits a transaction. No subsequent call of <see cref="CommitAsync(CancellationToken)"/> is required.</remarks>
public async Task WriteRowAsync(IReadOnlyCollection<object?> values)
{
await WriteRow(values, true, CancellationToken.None);
await WriteRow(values, commit: false, async: true, CancellationToken.None);
}


/// <summary>
/// Asyncronously writes a single row to the table.
/// </summary>
/// <param name="values">The list of column values.</param>
/// <param name="commit">
/// If <see langword="true"/>, commits the transaction immediately after writing a row (the same mode as <see cref="ClickHouseTransactionMode.Block"/>).
/// If <see langword="false"/>, leaves the transaction open (the same mode as <see cref="ClickHouseTransactionMode.Manual"/>).
/// </param>
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
public async Task WriteRowAsync(IReadOnlyCollection<object?> values, bool commit)
{
await WriteRow(values, commit, async: true, CancellationToken.None);
}

/// <summary>
Expand All @@ -220,12 +253,28 @@ public async Task WriteRowAsync(IReadOnlyCollection<object?> values)
/// <param name="values">The list of column values.</param>
/// <param name="cancellationToken">The cancellation instruction.</param>
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
/// <remarks>Please note that the method is always commits a transaction. No subsequent call of <see cref="CommitAsync(CancellationToken)"/> is required.</remarks>
public async Task WriteRowAsync(IReadOnlyCollection<object?> values, CancellationToken cancellationToken)
{
await WriteRow(values, true, cancellationToken);
await WriteRow(values, commit: false, async: true, cancellationToken);
}

private async ValueTask WriteRow(IReadOnlyCollection<object?> values, bool async, CancellationToken cancellationToken)
/// <summary>
/// Asyncronously writes a single row to the table.
/// </summary>
/// <param name="values">The list of column values.</param>
/// <param name="commit">
/// If <see langword="true"/>, commits the transaction immediately after writing a row (the same mode as <see cref="ClickHouseTransactionMode.Block"/>).
/// If <see langword="false"/>, leaves the transaction open (the same mode as <see cref="ClickHouseTransactionMode.Manual"/>).
/// </param>
/// <param name="cancellationToken">The cancellation instruction.</param>
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
public async Task WriteRowAsync(IReadOnlyCollection<object?> values, bool commit, CancellationToken cancellationToken)
{
await WriteRow(values, commit, async: true, cancellationToken);
}

private async ValueTask WriteRow(IReadOnlyCollection<object?> values, bool commit, bool async, CancellationToken cancellationToken)
{
if (values == null)
throw new ArgumentNullException(nameof(values));
Expand Down Expand Up @@ -312,7 +361,7 @@ private async ValueTask WriteRow(IReadOnlyCollection<object?> values, bool async
}

var table = new ClickHouseTableWriter(string.Empty, 1, columnWriters);
await SendTable(table, true, async, cancellationToken);
await SendTable(table, commit, async, cancellationToken);
}

/// <summary>
Expand All @@ -328,7 +377,24 @@ private async ValueTask WriteRow(IReadOnlyCollection<object?> values, bool async
/// <param name="rowCount">The number of rows in columns.</param>
public void WriteTable(IReadOnlyDictionary<string, object?> columns, int rowCount)
{
TaskHelper.WaitNonAsyncTask(WriteTable(columns, rowCount, false, CancellationToken.None));
TaskHelper.WaitNonAsyncTask(WriteTable(columns, rowCount, ClickHouseTransactionMode.Default, false, CancellationToken.None));
}

/// <summary>
/// Writes the specified columns to the table.
/// <br/>
/// Each column must be an object implementing one of the interfaces:
/// <see cref="IReadOnlyList{T}"/>,
/// <see cref="IList{T}"/>,
/// <see cref="IEnumerable{T}"/> or
/// <see cref="IEnumerable"/>.
/// </summary>
/// <param name="columns">The <see cref="IReadOnlyDictionary{TKey, TValue}"/> object that provides access to columns by their names.</param>
/// <param name="rowCount">The number of rows in columns.</param>
/// <param name="transactionMode">The mode of sending write confirmations to the server.See <see cref="ClickHouseTransactionMode"/> for details.</param>
public void WriteTable(IReadOnlyDictionary<string, object?> columns, int rowCount, ClickHouseTransactionMode transactionMode)
{
TaskHelper.WaitNonAsyncTask(WriteTable(columns, rowCount, transactionMode, false, CancellationToken.None));
}

/// <summary>
Expand All @@ -344,7 +410,24 @@ public void WriteTable(IReadOnlyDictionary<string, object?> columns, int rowCoun
/// <param name="rowCount">The number of rows in columns.</param>
public void WriteTable(IReadOnlyList<object?> columns, int rowCount)
{
TaskHelper.WaitNonAsyncTask(WriteTable(columns, rowCount, false, CancellationToken.None));
TaskHelper.WaitNonAsyncTask(WriteTable(columns, rowCount, ClickHouseTransactionMode.Default, false, CancellationToken.None));
}

/// <summary>
/// Writes the specified columns to the table.
/// <br/>
/// Each column must be an object implementing one of the interfaces:
/// <see cref="IReadOnlyList{T}"/>,
/// <see cref="IList{T}"/>,
/// <see cref="IEnumerable{T}"/> or
/// <see cref="IEnumerable"/>.
/// </summary>
/// <param name="columns">The list of columns.</param>
/// <param name="rowCount">The number of rows in columns.</param>
/// <param name="transactionMode">The mode of sending write confirmations to the server.See <see cref="ClickHouseTransactionMode"/> for details.</param>
public void WriteTable(IReadOnlyList<object?> columns, int rowCount, ClickHouseTransactionMode transactionMode)
{
TaskHelper.WaitNonAsyncTask(WriteTable(columns, rowCount, transactionMode, false, CancellationToken.None));
}

/// <summary>
Expand All @@ -363,7 +446,27 @@ public void WriteTable(IReadOnlyList<object?> columns, int rowCount)
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
public async Task WriteTableAsync(IReadOnlyDictionary<string, object?> columns, int rowCount, CancellationToken cancellationToken)
{
await WriteTable(columns, rowCount, true, cancellationToken);
await WriteTable(columns, rowCount, ClickHouseTransactionMode.Default, true, cancellationToken);
}

/// <summary>
/// Asyncronously writes the specified columns to the table.
/// <br/>
/// Each column must be an object implementing one of the interfaces:
/// <see cref="IReadOnlyList{T}"/>,
/// <see cref="IList{T}"/>,
/// <see cref="IAsyncEnumerable{T}"/>,
/// <see cref="IEnumerable{T}"/> or
/// <see cref="IEnumerable"/>.
/// </summary>
/// <param name="columns">The <see cref="IReadOnlyDictionary{TKey, TValue}"/> object that provides access to columns by their names.</param>
/// <param name="rowCount">The number of rows in columns.</param>
/// <param name="transactionMode">The mode of sending write confirmations to the server.See <see cref="ClickHouseTransactionMode"/> for details.</param>
/// <param name="cancellationToken">The cancellation instruction.</param>
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
public async Task WriteTableAsync(IReadOnlyDictionary<string, object?> columns, int rowCount, ClickHouseTransactionMode transactionMode, CancellationToken cancellationToken)
{
await WriteTable(columns, rowCount, transactionMode, true, cancellationToken);
}

/// <summary>
Expand All @@ -382,10 +485,30 @@ public async Task WriteTableAsync(IReadOnlyDictionary<string, object?> columns,
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
public async Task WriteTableAsync(IReadOnlyList<object?> columns, int rowCount, CancellationToken cancellationToken)
{
await WriteTable(columns, rowCount, true, cancellationToken);
await WriteTable(columns, rowCount, ClickHouseTransactionMode.Default, true, cancellationToken);
}

private async ValueTask WriteTable(IReadOnlyDictionary<string, object?> columns, int rowCount, bool async, CancellationToken cancellationToken)
/// <summary>
/// Asyncronously writes the specified columns to the table.
/// <br/>
/// Each column must be an object implementing one of the interfaces:
/// <see cref="IReadOnlyList{T}"/>,
/// <see cref="IList{T}"/>,
/// <see cref="IAsyncEnumerable{T}"/>,
/// <see cref="IEnumerable{T}"/> or
/// <see cref="IEnumerable"/>.
/// </summary>
/// <param name="columns">The list of columns.</param>
/// <param name="rowCount">The number of rows in columns.</param>
/// <param name="transactionMode">The mode of sending write confirmations to the server.See <see cref="ClickHouseTransactionMode"/> for details.</param>
/// <param name="cancellationToken">The cancellation instruction.</param>
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
public async Task WriteTableAsync(IReadOnlyList<object?> columns, int rowCount, ClickHouseTransactionMode transactionMode, CancellationToken cancellationToken)
{
await WriteTable(columns, rowCount, transactionMode, true, cancellationToken);
}

private async ValueTask WriteTable(IReadOnlyDictionary<string, object?> columns, int rowCount, ClickHouseTransactionMode mode, bool async, CancellationToken cancellationToken)
{
if (columns == null)
throw new ArgumentNullException(nameof(columns));
Expand All @@ -399,10 +522,10 @@ private async ValueTask WriteTable(IReadOnlyDictionary<string, object?> columns,
list.Add(null);
}

await WriteTable(list, rowCount, async, cancellationToken);
await WriteTable(list, rowCount, mode, async, cancellationToken);
}

private async ValueTask WriteTable(IReadOnlyList<object?> columns, int rowCount, bool async, CancellationToken cancellationToken)
private async ValueTask WriteTable(IReadOnlyList<object?> columns, int rowCount, ClickHouseTransactionMode mode, bool async, CancellationToken cancellationToken)
{
if (columns == null)
throw new ArgumentNullException(nameof(columns));
Expand All @@ -425,18 +548,20 @@ private async ValueTask WriteTable(IReadOnlyList<object?> columns, int rowCount,

int offset;
var blockSize = MaxBlockSize ?? rowCount;
bool commitBlock = mode == ClickHouseTransactionMode.Block;
for (offset = 0; offset + blockSize < rowCount; offset += blockSize)
{
var table = new ClickHouseTableWriter(string.Empty, blockSize, writerFactories.Select(w => w.Create(offset, blockSize)));
await SendTable(table, false, async, cancellationToken);
await SendTable(table, commitBlock, async, cancellationToken);
}

var finalBlockSize = rowCount - offset;
var finalTable = new ClickHouseTableWriter(string.Empty, finalBlockSize, writerFactories.Select(w => w.Create(offset, finalBlockSize)));
await SendTable(finalTable, true, async, cancellationToken);
bool commit = commitBlock || mode == ClickHouseTransactionMode.Default || mode == ClickHouseTransactionMode.Auto;
await SendTable(finalTable, commit, async, cancellationToken);
}

private async ValueTask SendTable(ClickHouseTableWriter table, bool confirm, bool async, CancellationToken cancellationToken)
private async ValueTask SendTable(ClickHouseTableWriter table, bool commit, bool async, CancellationToken cancellationToken)
{
if (_endOfStream)
await RepeatQuery(async, cancellationToken);
Expand All @@ -445,7 +570,7 @@ private async ValueTask SendTable(ClickHouseTableWriter table, bool confirm, boo
{
await _session.SendTable(table, async, cancellationToken);

if (confirm)
if (commit)
await EndWrite(disposing: false, closeSession: false, async, cancellationToken);
}
catch (ClickHouseHandledException)
Expand Down Expand Up @@ -528,6 +653,28 @@ private async ValueTask RepeatQuery(bool async, CancellationToken cancellationTo
}
}

/// <summary>
/// Notifies the server that the transaction should be commited.
/// This method acts similar to <see cref="EndWrite()"/>, but it doesn't close the writer.
/// </summary>
/// <remarks>A subsequent writing operation will send a new INSERT query to the server.</remarks>
public void Commit()
{
TaskHelper.WaitNonAsyncTask(EndWrite(disposing: false, closeSession: false, async: false, CancellationToken.None));
}

/// <summary>
/// Asyncronously notifies the server that the transaction should be commited.
/// This method acts similar to <see cref="EndWriteAsync(CancellationToken)"/>, but it doesn't close the writer.
/// </summary>
/// <param name="cancellationToken">The cancellation instruction.</param>
/// <returns>A <see cref="Task"/> representing asyncronous operation.</returns>
/// <remarks>A subsequent writing operation will send a new INSERT query to the server.</remarks>
public async Task CommitAsync(CancellationToken cancellationToken)
{
await EndWrite(disposing: false, closeSession: false, async: true, cancellationToken);
}

/// <summary>
/// Closes the writer and releases all resources associated with it.
/// </summary>
Expand Down Expand Up @@ -579,9 +726,8 @@ private async ValueTask EndWrite(bool disposing, bool closeSession, bool async,
case ServerMessageCode.EndOfStream:
if (closeSession)
await _session.Dispose(async);
else
_endOfStream = true;

_endOfStream = true;
break;

case ServerMessageCode.Error:
Expand Down
50 changes: 50 additions & 0 deletions src/Octonica.ClickHouseClient/ClickHouseFlushMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#region License Apache 2.0
/* Copyright 2023 Octonica
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#endregion

using System.Threading;

namespace Octonica.ClickHouseClient
{
/// <summary>
/// Describes a set of strategies used by <see cref="ClickHouseColumnWriter"/> for commiting a transaction.
/// </summary>
public enum ClickHouseTransactionMode
{
/// <summary>
/// The default strategy. This strategy is used if no strategy is specified. The same as <see cref="Auto"/>.
/// </summary>
Default = 0,

/// <summary>
/// <see cref="ClickHouseColumnWriter"/> will send a table and finish the query (commits transaction).
/// Next table (if any) will be sent with the new INSERT query (in a new transaction).
/// </summary>
Auto = 1,

/// <summary>
/// <see cref="ClickHouseColumnWriter"/> will <b>not</b> send confirmation after writing a table.
/// Next table (if any) will be sent with the current INSERT query (in the same transaction).
/// </summary>
/// <remarks>Call the method <see cref="ClickHouseColumnWriter.Commit"/> (or <see cref="ClickHouseColumnWriter.CommitAsync(CancellationToken)"/>) after writing tables.</remarks>
Manual = 2,

/// <summary>
/// <see cref="ClickHouseColumnWriter"/> will send each block of data in a separate query (one transaction per block).
/// </summary>
Block = 3
}
}

0 comments on commit f801f6d

Please sign in to comment.