Skip to content

Commit

Permalink
[ksqlDb.RestApi.Client]: integration tests improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfabian committed Mar 7, 2024
1 parent fe81c38 commit 6eec6a7
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected static async Task InitializeDatabase()
[OneTimeTearDown]
public static async Task ClassCleanup()
{
var result = await RestApiProvider.DropStreamAndTopic(StreamName);
await RestApiProvider.DropStreamAndTopic(StreamName);
}

protected virtual ksqlDB.RestApi.Client.KSql.Linq.IQbservable<Tweet> QuerySource =>
Expand Down Expand Up @@ -240,7 +240,6 @@ public async Task SubscribeAsync_Canceled()
.SubscribeAsync(c => actualValues.Add(c), e => semaphore.Release(), () => semaphore.Release(), cancellationToken: cts.Token);

await cts.CancelAsync();
await semaphore.WaitAsync(timeOut);

//Assert
Assert.AreEqual(0, actualValues.Count);
Expand Down Expand Up @@ -275,13 +274,13 @@ public async Task ObserveOn_TaskPoolScheduler_ReceivesValuesOnNewThread()
var semaphore = new SemaphoreSlim(initialCount: 0, 1);
var source = QuerySource;

var currentThread = Thread.CurrentThread.ManagedThreadId;
var currentThread = Environment.CurrentManagedThreadId;
int? observeOnThread = null;

//Act
var subscription = await source.Take(1)
.ObserveOn(TaskPoolScheduler.Default)
.SubscribeAsync(_ => observeOnThread = Thread.CurrentThread.ManagedThreadId, e => semaphore.Release(), () => semaphore.Release());
.SubscribeAsync(_ => observeOnThread = Environment.CurrentManagedThreadId, e => semaphore.Release(), () => semaphore.Release());

await semaphore.WaitAsync(timeOut);

Expand Down Expand Up @@ -509,9 +508,6 @@ public async Task ListContainsProjection()

var orderTypes = new List<int> { 1, 3 };

var c = QuerySource
.Select(c => orderTypes.Contains(c.Id)).ToQueryString();

var source = QuerySource
.Select(c => orderTypes.Contains(c.Id))
.ToAsyncEnumerable();
Expand Down Expand Up @@ -711,7 +707,7 @@ public async Task SelectAsStruct()
var ksql =
$"SELECT STRUCT(NAME := 'E.T.') FROM {StreamName} EMIT CHANGES LIMIT 1;";

QueryStreamParameters queryStreamParameters = new QueryStreamParameters
QueryStreamParameters queryStreamParameters = new()
{
Sql = ksql,
[QueryParameters.AutoOffsetResetPropertyName] = "earliest",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public async Task CreateOrReplaceStreamStatement_ToStatementString_ComplexQueryW
//Assert
ksql.Should().BeEquivalentTo(@$"CREATE OR REPLACE STREAM {StreamName}
WITH ( KAFKA_TOPIC='moviesByTitle', KEY_FORMAT='Json', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='1' )
AS SELECT Id, Title, ReleaseYear AS ReleaseYear FROM {StreamEntityName}
AS SELECT Id, Title, Release_Year AS ReleaseYear FROM {StreamEntityName}
WHERE Id < 3 PARTITION BY Id EMIT CHANGES;");

var responses = await httpResponseMessage.ToStatementResponsesAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ public async Task WithValue_RendersFromProvidedValue()

private record TimeTypes
{
public DateTime Dt { get; set; }
public TimeSpan Ts { get; set; }
public DateTime Dt { get; init; }
public TimeSpan Ts { get; init; }
public DateTimeOffset DtOffset { get; set; }
}

private readonly EntityCreationMetadata metadata = new EntityCreationMetadata
private readonly EntityCreationMetadata metadata = new()
{
KafkaTopic = nameof(TimeTypes),
Partitions = 1,
Expand All @@ -126,7 +126,7 @@ public async Task TimeTypes_InsertValues_ValuesReceived()

var buildServiceProvider = serviceCollection.BuildServiceProvider();
var httpResponseMessage = await buildServiceProvider.GetRequiredService<IKSqlDbRestApiClient>().CreateStreamAsync<TimeTypes>(metadata);
var statementResponses = await httpResponseMessage.ToStatementResponsesAsync().ConfigureAwait(false);
await httpResponseMessage.ToStatementResponsesAsync().ConfigureAwait(false);

await using var context = buildServiceProvider.GetRequiredService<IKSqlDBContext>();

Expand All @@ -138,12 +138,12 @@ public async Task TimeTypes_InsertValues_ValuesReceived()
using var subscription = context.CreateQueryStream<TimeTypes>()
.Take(1)
.Subscribe(value =>
{
receivedValues.Add(value);
}, error =>
{
semaphoreSlim.Release();
},
{
receivedValues.Add(value);
}, error =>
{
semaphoreSlim.Release();
},
() =>
{
semaphoreSlim.Release();
Expand All @@ -154,7 +154,6 @@ public async Task TimeTypes_InsertValues_ValuesReceived()
Dt = new DateTime(2021, 4, 1),
Ts = new TimeSpan(1, 2, 3),
DtOffset = new DateTimeOffset(2021, 7, 4, 13, 29, 45, 447, TimeSpan.Zero)
//DtOffset = new DateTimeOffset(2021, 7, 4, 13, 29, 45, 447, TimeSpan.FromHours(4))
};

context.Add(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public async Task ArrayJoin()
//Assert
Assert.AreEqual(expectedItemsCount, actualValues.Count);
actualValues.Count.Should().Be(1);
actualValues[0].Col.Should().Be(@"1;2");
actualValues[0].Col.Should().Be("1;2");
}

[Test]
Expand Down Expand Up @@ -308,7 +308,7 @@ public async Task ToBytes()
//Assert
Assert.AreEqual(expectedItemsCount, actualValues.Count);

string result = System.Text.Encoding.UTF8.GetString(actualValues[0].Col);
string result = Encoding.UTF8.GetString(actualValues[0].Col);
result.Should().Be(MoviesProvider.Movie1.Title);
}

Expand Down Expand Up @@ -340,9 +340,6 @@ public async Task FromBytes_CapturedVariable()
byte[] bytes = Encoding.UTF8.GetBytes(MoviesProvider.Movie1.Title);
//QWxpZW4=

var s = Context.CreateQuery<Movie>(MoviesTableName)
.Select(c => new { Col = K.Functions.FromBytes(bytes, "utf8") }).ToQueryString();

//Act
var source = Context.CreateQuery<Movie>(MoviesTableName)
.Select(c => new { Col = K.Functions.FromBytes(bytes, "utf8") })
Expand Down Expand Up @@ -397,11 +394,6 @@ public async Task MapKeys()
{
//Arrange
int expectedItemsCount = 1;
var map = new Dictionary<string, int>
{
{"apple", 10},
{"banana", 20}
};

//Act
var source = Context.CreateQuery<Movie>(MoviesTableName)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Net;
using FluentAssertions;
using ksqlDb.RestApi.Client.IntegrationTests.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.Linq;
Expand All @@ -14,35 +15,46 @@ public class KSqlInvocationFunctionsTests : Infrastructure.IntegrationTests
public static async Task ClassInitialize()
{
RestApiProvider = KSqlDbRestApiProvider.Create();

var statement =
new KSqlDbStatement(
@"CREATE STREAM stream2 (id INT, lambda_arr ARRAY<INTEGER>) WITH (kafka_topic = 'stream2', partitions = 1, value_format = 'json');");
$"CREATE STREAM {StreamName} (id INT, arr ARRAY<INTEGER>) WITH (kafka_topic = '{StreamName}', partitions = 1, value_format = 'json');");

var response = await RestApiProvider.ExecuteStatementAsync(statement);
response.StatusCode.Should().Be(HttpStatusCode.OK);

var statement2 =
new KSqlDbStatement(
@"CREATE OR REPLACE STREAM stream4 (id INT, lambda_map MAP<STRING,ARRAY<INTEGER>>) WITH (kafka_topic = 'stream4', partitions = 1, value_format = 'json');");
$"CREATE OR REPLACE STREAM {StreamName4} (id INT, map MAP<STRING,ARRAY<INTEGER>>) WITH (kafka_topic = '{StreamName4}', partitions = 1, value_format = 'json');");

response = await RestApiProvider.ExecuteStatementAsync(statement2);
response.StatusCode.Should().Be(HttpStatusCode.OK);

string insertIntoStream3 = "insert into stream4 (id, lambda_map) values (1, MAP('hello':= ARRAY [1,2,3], 'goodbye':= ARRAY [-1,-2,-3]) );";
string insertIntoStream3 = $"insert into {StreamName4} (id, map) values (1, MAP('hello':= ARRAY [1,2,3], 'goodbye':= ARRAY [-1,-2,-3]) );";

response = await RestApiProvider.ExecuteStatementAsync(
new KSqlDbStatement("insert into stream2 (id, lambda_arr) values (1, ARRAY [1,2,3]);"));
new KSqlDbStatement($"insert into {StreamName} (id, arr) values (1, ARRAY [1,2,3]);"));
response.StatusCode.Should().Be(HttpStatusCode.OK);
response = await RestApiProvider.ExecuteStatementAsync(
new KSqlDbStatement(insertIntoStream3));
response.StatusCode.Should().Be(HttpStatusCode.OK);
}

[OneTimeTearDown]
public static async Task ClassCleanup()
{
await RestApiProvider.DropStreamAndTopic(StreamName);
await RestApiProvider.DropStreamAndTopic(StreamName4);
}

record Lambda
private record Lambda
{
public int Id { get; set; }
public int[] Lambda_Arr { get; set; } = null!;
// public IEnumerable<int> Lambda_Arr { get; set; }
public int[] Arr { get; set; } = null!;
}

private readonly string streamName = "stream2";
private const string StreamName = "stream2";
private const string StreamName4 = "stream4";

[Test]
public async Task TransformArray()
Expand All @@ -51,8 +63,8 @@ public async Task TransformArray()
int expectedItemsCount = 1;

//Act
var source = Context.CreateQuery<Lambda>(streamName)
.Select(c => new { Col = KSqlFunctions.Instance.Transform(c.Lambda_Arr, x => x + 1) })
var source = Context.CreateQuery<Lambda>(StreamName)
.Select(c => new { Col = KSqlFunctions.Instance.Transform(c.Arr, x => x + 1) })
.ToAsyncEnumerable();

var actualValues = await CollectActualValues(source, expectedItemsCount);
Expand All @@ -62,10 +74,10 @@ public async Task TransformArray()
actualValues[0].Col.Should().BeEquivalentTo(new[] {2,3,4});
}

class LambdaMap
private class LambdaMap
{
public int Id { get; set; }
public IDictionary<string, int[]> Lambda_Map { get; set; } = null!;
public IDictionary<string, int[]> Map { get; set; } = null!;
public IDictionary<string, int> Dictionary2 { get; set; } = null!;
}

Expand All @@ -79,7 +91,7 @@ public async Task TransformMap()

//Act
var source = Context.CreateQuery<LambdaMap>(streamNameWithMap)
.Select(c => new { Col = K.Functions.Transform(c.Lambda_Map, (k, v) => K.Functions.Concat(k, "_new"), (k, v) => K.Functions.Transform(v, x => x * x)) })
.Select(c => new { Col = K.Functions.Transform(c.Map, (k, v) => K.Functions.Concat(k, "_new"), (k, v) => K.Functions.Transform(v, x => x * x)) })
.ToAsyncEnumerable();

var actualValues = await CollectActualValues(source, expectedItemsCount);
Expand All @@ -98,8 +110,8 @@ public async Task FilterArray()
int expectedItemsCount = 1;

//Act
var source = Context.CreateQuery<Lambda>(streamName)
.Select(c => new { Col = KSqlFunctions.Instance.Filter(c.Lambda_Arr, x => x > 1) })
var source = Context.CreateQuery<Lambda>(StreamName)
.Select(c => new { Col = KSqlFunctions.Instance.Filter(c.Arr, x => x > 1) })
.ToAsyncEnumerable();

var actualValues = await CollectActualValues(source, expectedItemsCount);
Expand All @@ -117,7 +129,7 @@ public async Task FilterMap()

//Act
var source = Context.CreateQuery<LambdaMap>(streamNameWithMap)
.Select(c => new { Col = K.Functions.Filter(c.Lambda_Map, (k, v) => k != "E.T" && v[1] > 0) })
.Select(c => new { Col = K.Functions.Filter(c.Map, (k, v) => k != "E.T" && v[1] > 0) })
.ToAsyncEnumerable();

var actualValues = await CollectActualValues(source, expectedItemsCount);
Expand All @@ -135,8 +147,8 @@ public async Task ReduceArray()
int expectedItemsCount = 1;

//Act
var source = Context.CreateQuery<Lambda>(streamName)
.Select(c => new { Acc = K.Functions.Reduce(c.Lambda_Arr, 0, (x,y) => x + y) })
var source = Context.CreateQuery<Lambda>(StreamName)
.Select(c => new { Acc = K.Functions.Reduce(c.Arr, 0, (x,y) => x + y) })
.ToAsyncEnumerable();

var actualValues = await CollectActualValues(source, expectedItemsCount);
Expand All @@ -154,7 +166,7 @@ public async Task ReduceMap()

//Act
var source = Context.CreateQuery<LambdaMap>(streamNameWithMap)
.Select(c => new { Col = K.Functions.Reduce(c.Lambda_Map, 2, (s, k, v) => K.Functions.Ceil(s / v[1])) })
.Select(c => new { Col = K.Functions.Reduce(c.Map, 2, (s, k, v) => K.Functions.Ceil(s / v[1])) })
.ToAsyncEnumerable();

var actualValues = await CollectActualValues(source, expectedItemsCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace ksqlDb.RestApi.Client.IntegrationTests.KSql.Query;

public class KSqlLexicalPrecedenceTests : Infrastructure.IntegrationTests
{
protected static MoviesProvider MoviesProvider = null!;
private static MoviesProvider moviesProvider = null!;

[OneTimeSetUp]
public static async Task ClassInitialize()
Expand All @@ -21,20 +21,20 @@ protected static async Task InitializeDatabase()
{
RestApiProvider = KSqlDbRestApiProvider.Create();

MoviesProvider = new MoviesProvider(RestApiProvider);
await MoviesProvider.CreateTablesAsync();
moviesProvider = new MoviesProvider(RestApiProvider);
await moviesProvider.CreateTablesAsync();

await MoviesProvider.InsertMovieAsync(MoviesProvider.Movie1);
await MoviesProvider.InsertMovieAsync(MoviesProvider.Movie2);
await moviesProvider.InsertMovieAsync(MoviesProvider.Movie1);
await moviesProvider.InsertMovieAsync(MoviesProvider.Movie2);
}

[OneTimeTearDown]
public static async Task ClassCleanup()
{
await MoviesProvider.DropTablesAsync();
await moviesProvider.DropTablesAsync();
}

protected string MoviesTableName => MoviesProvider.MoviesTableName;
protected static string MoviesTableName => MoviesProvider.MoviesTableName;

protected virtual IQbservable<Movie> MoviesStream => Context.CreateQueryStream<Movie>(MoviesTableName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static async Task ClassCleanup()
await MoviesProvider.DropTablesAsync();
}

protected string MoviesTableName => MoviesProvider.MoviesTableName;
protected static string MoviesTableName => MoviesProvider.MoviesTableName;

protected virtual IQbservable<Movie> MoviesStream => Context.CreateQueryStream<Movie>(MoviesTableName);

Expand All @@ -44,7 +44,7 @@ public async Task ArrayInArray()
int expectedItemsCount = 1;
var expected = new[]
{
new[] {1, 2},
[1, 2],
new[] {3, 4},
};

Expand Down Expand Up @@ -76,8 +76,8 @@ public async Task ArrayInMap()
int expectedItemsCount = 1;
var expected = new Dictionary<string, int[]>
{
{ "a", new[] { 1, 2 } },
{ "b", new[] { 3, 4 } },
{ "a", [1, 2] },
{ "b", [3, 4] },
};

//Act
Expand Down Expand Up @@ -172,11 +172,11 @@ public async Task MapInMap()
actual["y"]["d"].Should().Be(expected["y"]["d"]);
}

private struct MovieStruct
private readonly struct MovieStruct
{
public string Title { get; set; }
public string Title { get; init; }

public int Id { get; set; }
public int Id { get; init; }
}

[Test]
Expand Down

0 comments on commit 6eec6a7

Please sign in to comment.