From c28d758b6fb17744926e4bfe5ccf92c4622c48e2 Mon Sep 17 00:00:00 2001 From: Tomas Fabian Date: Thu, 1 Feb 2024 06:31:34 +0100 Subject: [PATCH] [ksqlDB.RestApi.Client]: added Exploded table function support integration test --- .../KSql/Linq/FunctionsTests.cs | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/FunctionsTests.cs diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/FunctionsTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/FunctionsTests.cs new file mode 100644 index 00000000..564fc430 --- /dev/null +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/FunctionsTests.cs @@ -0,0 +1,64 @@ +using FluentAssertions; +using ksqlDb.RestApi.Client.IntegrationTests.KSql.RestApi; +using ksqlDb.RestApi.Client.IntegrationTests.Models; +using ksqlDB.RestApi.Client.KSql.Linq; +using ksqlDB.RestApi.Client.KSql.Query.Functions; +using NUnit.Framework; + +namespace ksqlDb.RestApi.Client.IntegrationTests.KSql.Linq +{ + public class FunctionsTests : Infrastructure.IntegrationTests + { + private static TweetsProvider? tweetsProvider; + + private static readonly string TweetsTopicName = "tweetsFunctionTestTopic"; + protected static string TweetsStreamName = "tweetsFunctionTest"; + + [OneTimeSetUp] + public static async Task ClassInitialize() + { + RestApiProvider = KSqlDbRestApiProvider.Create(); + + tweetsProvider = new TweetsProvider(RestApiProvider); + + await tweetsProvider.CreateTweetsStream(TweetsStreamName, TweetsTopicName); + + await tweetsProvider.InsertTweetAsync(TweetsProvider.Tweet1, TweetsStreamName); + await tweetsProvider.InsertTweetAsync(TweetsProvider.Tweet2, TweetsStreamName); + } + + [OneTimeTearDown] + public static async Task ClassCleanup() + { + await RestApiProvider.DropStreamAndTopic(TweetsStreamName); + + tweetsProvider = null; + } + + [Test] + public async Task Explode() + { + await Explode(Context.CreateQueryStream(TweetsStreamName)); + } + + private async Task Explode(IQbservable querySource) + { + //Arrange + int expectedItemsCount = 3; + + string[] array1 = ["a", "b"]; + int[] array2 = [1, 2, 3]; + + var source = querySource + .Select(l => new { Result = array1.Explode(), Result1 = array2.Explode() }) + .ToAsyncEnumerable(); + + //Act + var actualValues = await CollectActualValues(source, expectedItemsCount); + + //Assert + var results = actualValues.FirstOrDefault(c => c.Result == array1[0] && c.Result1 == array2[0]); + results.Should().NotBeNull(); + } + } +}