Skip to content

Commit

Permalink
[ksqlDb.RestApi.Client]: added cloning of QueryStreamParameters tests #…
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfabian committed Apr 9, 2024
1 parent 0052e16 commit cf2fa3f
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,38 @@ public void SelectConstant_BuildKSql_PrintsConstant()
ksql.Should().BeEquivalentTo("SELECT 'Hello world' FROM Locations EMIT CHANGES;");
}

[Test]
public void RunInParallel_BuildKSql_PrintsConstant()
{
//Arrange
var context = new TestableDbProvider(TestParameters.KsqlDbUrl);

var query1 = context.CreateQueryStream<Location>();
var query2 = context.CreateQueryStream<Location>("Place");

var providedParameters = new IKSqlDbParameters[2];
int i = 0;

context.KSqlDbProviderMock.Setup(c => c.Run<Location>(It.IsAny<object>(), It.IsAny<CancellationToken>()))
.Callback<object, CancellationToken>((par, ct) => { providedParameters[i++] = (IKSqlDbParameters)par; })
.Returns(new List<Location>().ToAsyncEnumerable());

//Act
var task1 = Task.Run(() =>
{
query1.ToAsyncEnumerable().ToListAsync();
});
var task2 = Task.Run(() =>
{
query2.ToAsyncEnumerable().ToListAsync();
});

//Assert
Task.WaitAll(task1, task2);
providedParameters[0].Sql.Should().NotBeEquivalentTo(providedParameters[1].Sql);
providedParameters[0].Sql.Should().ContainAny("Locations", "Places");
}

#region OperatorPrecedence

[Test]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using FluentAssertions;
using ksqlDB.RestApi.Client.KSql.Query;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDB.RestApi.Client.KSql.RestApi.Parameters;
using NUnit.Framework;

namespace ksqlDb.RestApi.Client.Tests.KSql.Query
{
public class KStreamSetDependenciesTests
{
[Test]
public void QueryStreamParameters_CloneIsReturned()
{
//Arrange
var queryStreamParameters = new QueryStreamParameters
{
[QueryStreamParameters.AutoOffsetResetPropertyName] = AutoOffsetReset.Latest.ToKSqlValue()
};
var kStreamSetDependencies = new KStreamSetDependencies(null!, null!, null!, queryStreamParameters);

//Act
var queryStreamParameters1 = kStreamSetDependencies.QueryStreamParameters;
var queryStreamParameters2 = kStreamSetDependencies.QueryStreamParameters;

//Assert
queryStreamParameters1.Should().BeEquivalentTo(queryStreamParameters2);
}

[Test]
public void QueryStreamParameters_CloneIsReturned_SqlIsChanged()
{
//Arrange
var queryStreamParameters = new QueryStreamParameters
{
[QueryStreamParameters.AutoOffsetResetPropertyName] = AutoOffsetReset.Latest.ToKSqlValue()
};
var kStreamSetDependencies = new KStreamSetDependencies(null!, null!, null!, queryStreamParameters);

//Act
var queryStreamParameters1 = kStreamSetDependencies.QueryStreamParameters;
queryStreamParameters1.Sql = "sql1";
var queryStreamParameters2 = kStreamSetDependencies.QueryStreamParameters;
queryStreamParameters1.Sql = "sql2";

//Assert
queryStreamParameters1.Should().NotBeEquivalentTo(queryStreamParameters2);
}
}
}

0 comments on commit cf2fa3f

Please sign in to comment.