From 6eec6a74f6ce17aa832be517c3e4ee6fd63e05dc Mon Sep 17 00:00:00 2001 From: Tomas Fabian Date: Thu, 7 Mar 2024 11:02:28 +0100 Subject: [PATCH] [ksqlDb.RestApi.Client]: integration tests improvements --- .../KSql/Linq/QbservableExtensionsTests.cs | 12 ++--- .../CreateStatementExtensionsTests.cs | 2 +- .../KSql/Query/Context/KSqlDbContextTests.cs | 21 ++++---- .../Functions/KSqlFunctionsExtensionsTests.cs | 12 +---- .../Functions/KSqlInvocationFunctionsTests.cs | 52 ++++++++++++------- .../KSql/Query/KSqlLexicalPrecedenceTests.cs | 14 ++--- .../KSql/Query/KSqlNestedTypesTests.cs | 14 ++--- 7 files changed, 63 insertions(+), 64 deletions(-) diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/QbservableExtensionsTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/QbservableExtensionsTests.cs index dfd5f072..dd71250f 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/QbservableExtensionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/QbservableExtensionsTests.cs @@ -60,7 +60,7 @@ protected static async Task InitializeDatabase() [OneTimeTearDown] public static async Task ClassCleanup() { - var result = await RestApiProvider.DropStreamAndTopic(StreamName); + await RestApiProvider.DropStreamAndTopic(StreamName); } protected virtual ksqlDB.RestApi.Client.KSql.Linq.IQbservable QuerySource => @@ -240,7 +240,6 @@ public async Task SubscribeAsync_Canceled() .SubscribeAsync(c => actualValues.Add(c), e => semaphore.Release(), () => semaphore.Release(), cancellationToken: cts.Token); await cts.CancelAsync(); - await semaphore.WaitAsync(timeOut); //Assert Assert.AreEqual(0, actualValues.Count); @@ -275,13 +274,13 @@ public async Task ObserveOn_TaskPoolScheduler_ReceivesValuesOnNewThread() var semaphore = new SemaphoreSlim(initialCount: 0, 1); var source = QuerySource; - var currentThread = Thread.CurrentThread.ManagedThreadId; + var currentThread = Environment.CurrentManagedThreadId; int? observeOnThread = null; //Act var subscription = await source.Take(1) .ObserveOn(TaskPoolScheduler.Default) - .SubscribeAsync(_ => observeOnThread = Thread.CurrentThread.ManagedThreadId, e => semaphore.Release(), () => semaphore.Release()); + .SubscribeAsync(_ => observeOnThread = Environment.CurrentManagedThreadId, e => semaphore.Release(), () => semaphore.Release()); await semaphore.WaitAsync(timeOut); @@ -509,9 +508,6 @@ public async Task ListContainsProjection() var orderTypes = new List { 1, 3 }; - var c = QuerySource - .Select(c => orderTypes.Contains(c.Id)).ToQueryString(); - var source = QuerySource .Select(c => orderTypes.Contains(c.Id)) .ToAsyncEnumerable(); @@ -711,7 +707,7 @@ public async Task SelectAsStruct() var ksql = $"SELECT STRUCT(NAME := 'E.T.') FROM {StreamName} EMIT CHANGES LIMIT 1;"; - QueryStreamParameters queryStreamParameters = new QueryStreamParameters + QueryStreamParameters queryStreamParameters = new() { Sql = ksql, [QueryParameters.AutoOffsetResetPropertyName] = "earliest", diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/Statements/CreateStatementExtensionsTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/Statements/CreateStatementExtensionsTests.cs index 3cbe5625..769ed22c 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/Statements/CreateStatementExtensionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/Statements/CreateStatementExtensionsTests.cs @@ -75,7 +75,7 @@ public async Task CreateOrReplaceStreamStatement_ToStatementString_ComplexQueryW //Assert ksql.Should().BeEquivalentTo(@$"CREATE OR REPLACE STREAM {StreamName} WITH ( KAFKA_TOPIC='moviesByTitle', KEY_FORMAT='Json', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='1' ) -AS SELECT Id, Title, ReleaseYear AS ReleaseYear FROM {StreamEntityName} +AS SELECT Id, Title, Release_Year AS ReleaseYear FROM {StreamEntityName} WHERE Id < 3 PARTITION BY Id EMIT CHANGES;"); var responses = await httpResponseMessage.ToStatementResponsesAsync(); diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs index 0d7ae62a..2c0ff0e8 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs @@ -103,12 +103,12 @@ public async Task WithValue_RendersFromProvidedValue() private record TimeTypes { - public DateTime Dt { get; set; } - public TimeSpan Ts { get; set; } + public DateTime Dt { get; init; } + public TimeSpan Ts { get; init; } public DateTimeOffset DtOffset { get; set; } } - private readonly EntityCreationMetadata metadata = new EntityCreationMetadata + private readonly EntityCreationMetadata metadata = new() { KafkaTopic = nameof(TimeTypes), Partitions = 1, @@ -126,7 +126,7 @@ public async Task TimeTypes_InsertValues_ValuesReceived() var buildServiceProvider = serviceCollection.BuildServiceProvider(); var httpResponseMessage = await buildServiceProvider.GetRequiredService().CreateStreamAsync(metadata); - var statementResponses = await httpResponseMessage.ToStatementResponsesAsync().ConfigureAwait(false); + await httpResponseMessage.ToStatementResponsesAsync().ConfigureAwait(false); await using var context = buildServiceProvider.GetRequiredService(); @@ -138,12 +138,12 @@ public async Task TimeTypes_InsertValues_ValuesReceived() using var subscription = context.CreateQueryStream() .Take(1) .Subscribe(value => - { - receivedValues.Add(value); - }, error => - { - semaphoreSlim.Release(); - }, + { + receivedValues.Add(value); + }, error => + { + semaphoreSlim.Release(); + }, () => { semaphoreSlim.Release(); @@ -154,7 +154,6 @@ public async Task TimeTypes_InsertValues_ValuesReceived() Dt = new DateTime(2021, 4, 1), Ts = new TimeSpan(1, 2, 3), DtOffset = new DateTimeOffset(2021, 7, 4, 13, 29, 45, 447, TimeSpan.Zero) - //DtOffset = new DateTimeOffset(2021, 7, 4, 13, 29, 45, 447, TimeSpan.FromHours(4)) }; context.Add(value); diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Functions/KSqlFunctionsExtensionsTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Functions/KSqlFunctionsExtensionsTests.cs index e09b0020..2ebbf777 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Functions/KSqlFunctionsExtensionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Functions/KSqlFunctionsExtensionsTests.cs @@ -139,7 +139,7 @@ public async Task ArrayJoin() //Assert Assert.AreEqual(expectedItemsCount, actualValues.Count); actualValues.Count.Should().Be(1); - actualValues[0].Col.Should().Be(@"1;2"); + actualValues[0].Col.Should().Be("1;2"); } [Test] @@ -308,7 +308,7 @@ public async Task ToBytes() //Assert Assert.AreEqual(expectedItemsCount, actualValues.Count); - string result = System.Text.Encoding.UTF8.GetString(actualValues[0].Col); + string result = Encoding.UTF8.GetString(actualValues[0].Col); result.Should().Be(MoviesProvider.Movie1.Title); } @@ -340,9 +340,6 @@ public async Task FromBytes_CapturedVariable() byte[] bytes = Encoding.UTF8.GetBytes(MoviesProvider.Movie1.Title); //QWxpZW4= - var s = Context.CreateQuery(MoviesTableName) - .Select(c => new { Col = K.Functions.FromBytes(bytes, "utf8") }).ToQueryString(); - //Act var source = Context.CreateQuery(MoviesTableName) .Select(c => new { Col = K.Functions.FromBytes(bytes, "utf8") }) @@ -397,11 +394,6 @@ public async Task MapKeys() { //Arrange int expectedItemsCount = 1; - var map = new Dictionary - { - {"apple", 10}, - {"banana", 20} - }; //Act var source = Context.CreateQuery(MoviesTableName) diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Functions/KSqlInvocationFunctionsTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Functions/KSqlInvocationFunctionsTests.cs index 2710f08d..f7dbf4b4 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Functions/KSqlInvocationFunctionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Functions/KSqlInvocationFunctionsTests.cs @@ -1,3 +1,4 @@ +using System.Net; using FluentAssertions; using ksqlDb.RestApi.Client.IntegrationTests.KSql.RestApi; using ksqlDB.RestApi.Client.KSql.Linq; @@ -14,35 +15,46 @@ public class KSqlInvocationFunctionsTests : Infrastructure.IntegrationTests public static async Task ClassInitialize() { RestApiProvider = KSqlDbRestApiProvider.Create(); - + var statement = new KSqlDbStatement( - @"CREATE STREAM stream2 (id INT, lambda_arr ARRAY) WITH (kafka_topic = 'stream2', partitions = 1, value_format = 'json');"); + $"CREATE STREAM {StreamName} (id INT, arr ARRAY) WITH (kafka_topic = '{StreamName}', partitions = 1, value_format = 'json');"); var response = await RestApiProvider.ExecuteStatementAsync(statement); + response.StatusCode.Should().Be(HttpStatusCode.OK); var statement2 = new KSqlDbStatement( - @"CREATE OR REPLACE STREAM stream4 (id INT, lambda_map MAP>) WITH (kafka_topic = 'stream4', partitions = 1, value_format = 'json');"); + $"CREATE OR REPLACE STREAM {StreamName4} (id INT, map MAP>) WITH (kafka_topic = '{StreamName4}', partitions = 1, value_format = 'json');"); response = await RestApiProvider.ExecuteStatementAsync(statement2); + response.StatusCode.Should().Be(HttpStatusCode.OK); - string insertIntoStream3 = "insert into stream4 (id, lambda_map) values (1, MAP('hello':= ARRAY [1,2,3], 'goodbye':= ARRAY [-1,-2,-3]) );"; + string insertIntoStream3 = $"insert into {StreamName4} (id, map) values (1, MAP('hello':= ARRAY [1,2,3], 'goodbye':= ARRAY [-1,-2,-3]) );"; response = await RestApiProvider.ExecuteStatementAsync( - new KSqlDbStatement("insert into stream2 (id, lambda_arr) values (1, ARRAY [1,2,3]);")); + new KSqlDbStatement($"insert into {StreamName} (id, arr) values (1, ARRAY [1,2,3]);")); + response.StatusCode.Should().Be(HttpStatusCode.OK); response = await RestApiProvider.ExecuteStatementAsync( new KSqlDbStatement(insertIntoStream3)); + response.StatusCode.Should().Be(HttpStatusCode.OK); + } + + [OneTimeTearDown] + public static async Task ClassCleanup() + { + await RestApiProvider.DropStreamAndTopic(StreamName); + await RestApiProvider.DropStreamAndTopic(StreamName4); } - record Lambda + private record Lambda { public int Id { get; set; } - public int[] Lambda_Arr { get; set; } = null!; - // public IEnumerable Lambda_Arr { get; set; } + public int[] Arr { get; set; } = null!; } - private readonly string streamName = "stream2"; + private const string StreamName = "stream2"; + private const string StreamName4 = "stream4"; [Test] public async Task TransformArray() @@ -51,8 +63,8 @@ public async Task TransformArray() int expectedItemsCount = 1; //Act - var source = Context.CreateQuery(streamName) - .Select(c => new { Col = KSqlFunctions.Instance.Transform(c.Lambda_Arr, x => x + 1) }) + var source = Context.CreateQuery(StreamName) + .Select(c => new { Col = KSqlFunctions.Instance.Transform(c.Arr, x => x + 1) }) .ToAsyncEnumerable(); var actualValues = await CollectActualValues(source, expectedItemsCount); @@ -62,10 +74,10 @@ public async Task TransformArray() actualValues[0].Col.Should().BeEquivalentTo(new[] {2,3,4}); } - class LambdaMap + private class LambdaMap { public int Id { get; set; } - public IDictionary Lambda_Map { get; set; } = null!; + public IDictionary Map { get; set; } = null!; public IDictionary Dictionary2 { get; set; } = null!; } @@ -79,7 +91,7 @@ public async Task TransformMap() //Act var source = Context.CreateQuery(streamNameWithMap) - .Select(c => new { Col = K.Functions.Transform(c.Lambda_Map, (k, v) => K.Functions.Concat(k, "_new"), (k, v) => K.Functions.Transform(v, x => x * x)) }) + .Select(c => new { Col = K.Functions.Transform(c.Map, (k, v) => K.Functions.Concat(k, "_new"), (k, v) => K.Functions.Transform(v, x => x * x)) }) .ToAsyncEnumerable(); var actualValues = await CollectActualValues(source, expectedItemsCount); @@ -98,8 +110,8 @@ public async Task FilterArray() int expectedItemsCount = 1; //Act - var source = Context.CreateQuery(streamName) - .Select(c => new { Col = KSqlFunctions.Instance.Filter(c.Lambda_Arr, x => x > 1) }) + var source = Context.CreateQuery(StreamName) + .Select(c => new { Col = KSqlFunctions.Instance.Filter(c.Arr, x => x > 1) }) .ToAsyncEnumerable(); var actualValues = await CollectActualValues(source, expectedItemsCount); @@ -117,7 +129,7 @@ public async Task FilterMap() //Act var source = Context.CreateQuery(streamNameWithMap) - .Select(c => new { Col = K.Functions.Filter(c.Lambda_Map, (k, v) => k != "E.T" && v[1] > 0) }) + .Select(c => new { Col = K.Functions.Filter(c.Map, (k, v) => k != "E.T" && v[1] > 0) }) .ToAsyncEnumerable(); var actualValues = await CollectActualValues(source, expectedItemsCount); @@ -135,8 +147,8 @@ public async Task ReduceArray() int expectedItemsCount = 1; //Act - var source = Context.CreateQuery(streamName) - .Select(c => new { Acc = K.Functions.Reduce(c.Lambda_Arr, 0, (x,y) => x + y) }) + var source = Context.CreateQuery(StreamName) + .Select(c => new { Acc = K.Functions.Reduce(c.Arr, 0, (x,y) => x + y) }) .ToAsyncEnumerable(); var actualValues = await CollectActualValues(source, expectedItemsCount); @@ -154,7 +166,7 @@ public async Task ReduceMap() //Act var source = Context.CreateQuery(streamNameWithMap) - .Select(c => new { Col = K.Functions.Reduce(c.Lambda_Map, 2, (s, k, v) => K.Functions.Ceil(s / v[1])) }) + .Select(c => new { Col = K.Functions.Reduce(c.Map, 2, (s, k, v) => K.Functions.Ceil(s / v[1])) }) .ToAsyncEnumerable(); var actualValues = await CollectActualValues(source, expectedItemsCount); diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/KSqlLexicalPrecedenceTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/KSqlLexicalPrecedenceTests.cs index 6ccbb156..dc6aecfd 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/KSqlLexicalPrecedenceTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/KSqlLexicalPrecedenceTests.cs @@ -9,7 +9,7 @@ namespace ksqlDb.RestApi.Client.IntegrationTests.KSql.Query; public class KSqlLexicalPrecedenceTests : Infrastructure.IntegrationTests { - protected static MoviesProvider MoviesProvider = null!; + private static MoviesProvider moviesProvider = null!; [OneTimeSetUp] public static async Task ClassInitialize() @@ -21,20 +21,20 @@ protected static async Task InitializeDatabase() { RestApiProvider = KSqlDbRestApiProvider.Create(); - MoviesProvider = new MoviesProvider(RestApiProvider); - await MoviesProvider.CreateTablesAsync(); + moviesProvider = new MoviesProvider(RestApiProvider); + await moviesProvider.CreateTablesAsync(); - await MoviesProvider.InsertMovieAsync(MoviesProvider.Movie1); - await MoviesProvider.InsertMovieAsync(MoviesProvider.Movie2); + await moviesProvider.InsertMovieAsync(MoviesProvider.Movie1); + await moviesProvider.InsertMovieAsync(MoviesProvider.Movie2); } [OneTimeTearDown] public static async Task ClassCleanup() { - await MoviesProvider.DropTablesAsync(); + await moviesProvider.DropTablesAsync(); } - protected string MoviesTableName => MoviesProvider.MoviesTableName; + protected static string MoviesTableName => MoviesProvider.MoviesTableName; protected virtual IQbservable MoviesStream => Context.CreateQueryStream(MoviesTableName); diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/KSqlNestedTypesTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/KSqlNestedTypesTests.cs index 85b8ec74..df019398 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/KSqlNestedTypesTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/KSqlNestedTypesTests.cs @@ -33,7 +33,7 @@ public static async Task ClassCleanup() await MoviesProvider.DropTablesAsync(); } - protected string MoviesTableName => MoviesProvider.MoviesTableName; + protected static string MoviesTableName => MoviesProvider.MoviesTableName; protected virtual IQbservable MoviesStream => Context.CreateQueryStream(MoviesTableName); @@ -44,7 +44,7 @@ public async Task ArrayInArray() int expectedItemsCount = 1; var expected = new[] { - new[] {1, 2}, + [1, 2], new[] {3, 4}, }; @@ -76,8 +76,8 @@ public async Task ArrayInMap() int expectedItemsCount = 1; var expected = new Dictionary { - { "a", new[] { 1, 2 } }, - { "b", new[] { 3, 4 } }, + { "a", [1, 2] }, + { "b", [3, 4] }, }; //Act @@ -172,11 +172,11 @@ public async Task MapInMap() actual["y"]["d"].Should().Be(expected["y"]["d"]); } - private struct MovieStruct + private readonly struct MovieStruct { - public string Title { get; set; } + public string Title { get; init; } - public int Id { get; set; } + public int Id { get; init; } } [Test]