diff --git a/README.md b/README.md index 3a98718e..42fe1cc2 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ var contextOptions = new KSqlDBContextOptions(ksqlDbUrl) await using var context = new KSqlDBContext(contextOptions); -using var subscription = context.CreateQueryStream() +using var subscription = context.CreatePushQuery() .WithOffsetResetPolicy(AutoOffsetReset.Latest) .Where(p => p.Message != "Hello world" || p.Id == 1) .Select(l => new { l.Message, l.Id }) @@ -263,7 +263,7 @@ Stream names are generated based on the generic record types. They are pluralize **By default the generated from item names such as stream and table names are pluralized**. This behavior could be switched off with the following `ShouldPluralizeStreamName` configuration. ```C# -context.CreateQueryStream(); +context.CreatePushQuery(); ``` ```SQL FROM People @@ -275,7 +275,7 @@ var contextOptions = new KSqlDBContextOptions(@"http://localhost:8088") ShouldPluralizeFromItemName = false }; -new KSqlDBContext(contextOptions).CreateQueryStream(); +new KSqlDBContext(contextOptions).CreatePushQuery(); ``` ```SQL FROM Person @@ -283,7 +283,7 @@ FROM Person Setting an arbitrary stream name (from_item name): ```C# -context.CreateQueryStream("custom_topic_name"); +context.CreatePushQuery("custom_topic_name"); ``` ```SQL FROM custom_topic_name diff --git a/docs/aggregations.md b/docs/aggregations.md index 5099636f..af753a38 100644 --- a/docs/aggregations.md +++ b/docs/aggregations.md @@ -21,7 +21,7 @@ var ksqlDbUrl = @"http://localhost:8088"; var contextOptions = new KSqlDBContextOptions(ksqlDbUrl); var context = new KSqlDBContext(contextOptions); -context.CreateQueryStream() +context.CreatePushQuery() .GroupBy(c => c.Id) .Select(g => new { Id = g.Key, Count = g.Count() }) .Subscribe(count => @@ -42,7 +42,7 @@ The Key should be mapped back to the respective column too Id = g.Key. See IKSql Or without the new expression: ```C# -context.CreateQueryStream() +context.CreatePushQuery() .GroupBy(c => c.Id) .Select(g => g.Count()); ``` @@ -59,7 +59,7 @@ SELECT COUNT(*) Extract records from an aggregation that fulfill a specified condition. ```C# -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .GroupBy(c => c.Id) .Having(c => c.Count() > 2) .Select(g => new { Id = g.Key, Count = g.Count()}); @@ -83,7 +83,7 @@ public class Click public string TIMESTAMP { get; set; } } -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .GroupBy(c => new { c.IP_ADDRESS, c.URL, c.TIMESTAMP }) .WindowedBy(new TimeWindows(Duration.OfMinutes(2))) .Having(c => c.Count(g => c.Key.IP_ADDRESS) == 1) @@ -105,7 +105,7 @@ HAVING COUNT(IP_ADDRESS) = 1 Sums the column values. ```C# -context.CreateQueryStream() +context.CreatePushQuery() .GroupBy(c => c.Id) //.Select(g => g.Sum(c => c.Amount)) .Select(g => new { Id = g.Key, Agg = g.Sum(c => c.Amount)}) @@ -157,7 +157,7 @@ MAX(col1) - **COLLECT_SET** - returns an array containing the distinct values of column from each input row (for the specified grouping and time window, if any). ```C# -var subscription = context.CreateQueryStream() +var subscription = context.CreatePushQuery() .GroupBy(c => c.Id) .Select(g => new { Id = g.Key, Array = g.CollectSet(c => c.Message) }) //.Select(g => new { Id = g.Key, Array = g.CollectList(c => c.Message) }) @@ -193,7 +193,7 @@ var dict = new Dictionary() ["Thomas"] = 42, }; -var source = Context.CreateQueryStream(TweetsStreamName) +var source = Context.CreatePushQuery(TweetsStreamName) .GroupBy(c => c.Id) .Select(l => new { Id = l.Key, Maps = l.CollectList(c => dict) }) ``` @@ -215,7 +215,7 @@ COUNT(Amount) Count ``` ```C# -new KSqlDBContext(@"http://localhost:8088").CreateQueryStream() +new KSqlDBContext(@"http://localhost:8088").CreatePushQuery() .GroupBy(c => c.Id) .Select(g => new { Id = g.Key, TopK = g.TopKDistinct(c => c.Amount, 4) }) .Subscribe(onNext: tweetMessage => @@ -254,7 +254,7 @@ in the partition have the lowest offsets. ```C# await using var context = new KSqlDBContext(@"http://localhost:8088"); -context.CreateQueryStream() +context.CreatePushQuery() .GroupBy(c => c.Id) .Select(g => new { Id = g.Key, EarliestByOffset = g.EarliestByOffset(c => c.Amount, 2) }) .Subscribe(earliest => @@ -279,7 +279,7 @@ SELECT Id, EARLIEST_BY_OFFSET(Amount, 2, True) EarliestByOffset CountDistinct, LongCountDistinct ```C# -var subscription = context.CreateQueryStream() +var subscription = context.CreatePushQuery() .GroupBy(c => c.Id) // .Select(g => new { Id = g.Key, Count = g.CountDistinct(c => c.Message) }) .Select(g => new { Id = g.Key, Count = g.LongCountDistinct(c => c.Message) }) @@ -310,7 +310,7 @@ using ksqlDB.RestApi.Client.KSql.Query.Windows; var tumblingWindow = new TimeWindows(Duration.OfSeconds(2), OutputRefinement.Final).WithGracePeriod(Duration.OfSeconds(2)); -var query = Context.CreateQueryStream() +var query = Context.CreatePushQuery() .WithOffsetResetPolicy(AutoOffsetReset.Earliest) .GroupBy(c => c.Id) .WindowedBy(tumblingWindow) @@ -336,7 +336,7 @@ Creation of windowed aggregation ```C# var context = new TransactionsDbProvider(ksqlDbUrl); -var windowedQuery = context.CreateQueryStream() +var windowedQuery = context.CreatePushQuery() .WindowedBy(new TimeWindows(Duration.OfSeconds(5)).WithGracePeriod(Duration.OfHours(2))) .GroupBy(c => c.CardNumber) .Select(g => new { CardNumber = g.Key, Count = g.Count() }); @@ -354,7 +354,7 @@ WINDOW TUMBLING (SIZE 5 SECONDS, GRACE PERIOD 2 HOURS) [Hopping window](https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/#hopping-window) is a time-based windowing mechanism used for aggregating and processing streaming data within **overlapping** time intervals. ```C# -var subscription = context.CreateQueryStream() +var subscription = context.CreatePushQuery() .GroupBy(c => c.Id) .WindowedBy(new HoppingWindows(Duration.OfSeconds(5)).WithAdvanceBy(Duration.OfSeconds(4)).WithRetention(Duration.OfDays(7))) .Select(g => new { g.WindowStart, g.WindowEnd, Id = g.Key, Count = g.Count() }) @@ -375,7 +375,7 @@ Window advancement interval should be more than zero and less than window durati A [session window](https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/#session-window) aggregates records into a session, which represents a period of activity separated by a specified gap of inactivity, or "idleness". ```C# -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .GroupBy(c => c.CardNumber) .WindowedBy(new SessionWindow(Duration.OfSeconds(5))) .Select(g => new { CardNumber = g.Key, Count = g.Count() }); diff --git a/docs/breaking_changes.md b/docs/breaking_changes.md index d608bdde..4008cd81 100644 --- a/docs/breaking_changes.md +++ b/docs/breaking_changes.md @@ -2,7 +2,7 @@ # v6.0.0 - `QueryType` enum was renamed to `EndpointType` -- removed `CreateQuery` from `IKSqlDBContext` interface, it was merged with `CreateQueryStream`. +- removed `CreateQuery` from `IKSqlDBContext` interface, it was merged with `CreateQueryStream`. Subsequently `CreateQueryStream` was renamed to `CreatePushQuery` to align with the nomenclature of `CreatePullQuery`. - removed `SetupQuery` from `KSqlDbContextOptionsBuilder`, it was merged with `SetupQueryStream`. Subsequently `SetupQueryStream` was renamed to `SetupPushQuery` to align with the nomenclature of `SetupPullQuery`. - introduced distinct parameters designed specifically for pull queries. Before this update, the parameters sent to both the 'query' and 'query-stream' endpoints were shared between pull and push queries. diff --git a/docs/cdc.md b/docs/cdc.md index 4172628e..bf0b19c3 100644 --- a/docs/cdc.md +++ b/docs/cdc.md @@ -55,7 +55,7 @@ class Program var semaphoreSlim = new SemaphoreSlim(0, 1); - var cdcSubscription = context.CreateQueryStream("sqlserversensors") + var cdcSubscription = context.CreatePushQuery("sqlserversensors") .WithOffsetResetPolicy(AutoOffsetReset.Latest) .Where(c => c.Op != "r" && (c.After == null || c.After.SensorId != "d542a2b3-c")) .Take(5) diff --git a/docs/data_types.md b/docs/data_types.md index cafa5822..c7d53a41 100644 --- a/docs/data_types.md +++ b/docs/data_types.md @@ -119,7 +119,7 @@ MAP('c' := 2, 'd' := 4)['d'] ``` Deeply nested types: ```C# -context.CreateQueryStream() +context.CreatePushQuery() .Select(c => new { Map = new Dictionary @@ -186,7 +186,7 @@ INSERT INTO MyClasses (Dt, Ts, DtOffset) VALUES ('2021-04-01', '01:02:03', '2021 ``` ```C# -using var subscription = context.CreateQueryStream() +using var subscription = context.CreatePushQuery() .Subscribe(onNext: m => { Console.WriteLine($"Time types: {m.Dt} : {m.Ts} : {m.DtOffset}"); diff --git a/docs/functions.md b/docs/functions.md index 927509ca..6be77a20 100644 --- a/docs/functions.md +++ b/docs/functions.md @@ -147,7 +147,7 @@ DATETOSTRING(18672, 'yyyy-MM-dd') #### TIMESTAMPTOSTRING ```C# -new KSqlDBContext(ksqlDbUrl).CreateQueryStream() +new KSqlDBContext(ksqlDbUrl).CreatePushQuery() .Select(c => K.Functions.TimestampToString(c.RowTime, "yyyy-MM-dd''T''HH:mm:ssX")) ``` @@ -165,7 +165,7 @@ FROM tweets EMIT CHANGES; bool sorted = true; var subscription = new KSqlDBContext(@"http://localhost:8088") - .CreateQueryStream() + .CreatePushQuery() .Select(c => new { Entries = KSqlFunctions.Instance.Entries(new Dictionary() @@ -194,7 +194,7 @@ FROM movies_test EMIT CHANGES; Converts any type to its string representation. ```C# -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .GroupBy(c => c.Title) .Select(c => new { Title = c.Key, Concatenated = K.Functions.Concat(c.Count().ToString(), "_Hello") }); ``` @@ -276,7 +276,7 @@ Subscribe to the unbounded stream of events: ```C# public IDisposable Invoke(IKSqlDBContext ksqlDbContext) { - var subscription = ksqlDbContext.CreateQueryStream(fromItemName: "stream2") + var subscription = ksqlDbContext.CreatePushQuery(fromItemName: "stream2") .WithOffsetResetPolicy(AutoOffsetReset.Earliest) .Select(c => new { @@ -424,7 +424,7 @@ REDUCE(DictionaryInValues, 2, (s, k, v) => CEIL(s / v)) **v1.5.0** ```C# -var ksql = ksqlDbContext.CreateQueryStream() +var ksql = ksqlDbContext.CreatePushQuery() .Select(c => new { Transformed = c.Lambda_Arr.Transform(x => x + 1), @@ -453,7 +453,7 @@ By constructing the appropriate function call and providing the necessary parame ```C# using ksqlDB.RestApi.Client.KSql.Query.Functions; -context.CreateQueryStream() +context.CreatePushQuery() .Select(c => new { Col = KSql.Functions.Dynamic("IFNULL(Message, 'n/a')") as string, c.Id, c.Amount, c.Message }); ``` The interesting part from the above query is: @@ -477,7 +477,7 @@ You can achieve a dynamic function call in C# that returns an array by using the ```C# using K = ksqlDB.RestApi.Client.KSql.Query.Functions.KSql; -context.CreateQueryStream() +context.CreatePushQuery() .Select(c => K.F.Dynamic("ARRAY_DISTINCT(ARRAY[1, 1, 2, 3, 1, 2])") as int[]) .Subscribe( message => Console.WriteLine($"{message[0]} - {message[^1]}"), diff --git a/docs/joins.md b/docs/joins.md index 43a29702..81a4724c 100644 --- a/docs/joins.md +++ b/docs/joins.md @@ -16,7 +16,7 @@ var ksqlDbUrl = @"http://localhost:8088"; var context = new KSqlDBContext(ksqlDbUrl); -var query = (from o in context.CreateQueryStream() +var query = (from o in context.CreatePushQuery() join p1 in Source.Of() on o.PaymentId equals p1.Id join s1 in Source.Of() on o.ShipmentId equals s1.Id into gj from sa in gj.DefaultIfEmpty() @@ -112,7 +112,7 @@ response = await restApiClient.InsertIntoAsync(shipment); Left joins can be also constructed in the following (less readable) way: ```C# -var query2 = KSqlDBContext.CreateQueryStream() +var query2 = KSqlDBContext.CreatePushQuery() .GroupJoin(Source.Of(), c => c.OrderId, c => c.Id, (order, gj) => new { order, @@ -141,7 +141,7 @@ SELECT o.OrderId OrderId, p.Id AS shipmentId - specifies a time window for stream-stream joins ```C# -var query = from o in KSqlDBContext.CreateQueryStream() +var query = from o in KSqlDBContext.CreatePushQuery() join p in Source.Of().Within(Duration.OfHours(1), Duration.OfDays(5)) on o.OrderId equals p.Id select new { @@ -166,7 +166,7 @@ Define nullable primitive value types in POCOs: ```C# var source = new KSqlDBContext(@"http://localhost:8088") - .CreateQueryStream() + .CreatePushQuery() .FullOuterJoin( Source.Of("Actors"), movie => movie.Title, @@ -211,7 +211,7 @@ SELECT m.Id Id, m.Title Title, m.Release_Year Release_Year, l.Title ActorTitle **LEFT OUTER** joins will contain leftRecord-NULL records in the result stream, which means that the join contains NULL values for fields selected from the right-hand stream where no match is made. ```C# -var query = new KSqlDBContext(@"http://localhost:8088").CreateQueryStream() +var query = new KSqlDBContext(@"http://localhost:8088").CreatePushQuery() .LeftJoin( Source.Of(), movie => movie.Title, @@ -242,7 +242,7 @@ Select all records for the right side of the join and the matching records from ```C# using ksqlDB.RestApi.Client.KSql.Linq; -var query = KSqlDBContext.CreateQueryStream(ActorsTableName) +var query = KSqlDBContext.CreatePushQuery(ActorsTableName) .RightJoin( Source.Of(MoviesTableName), actor => actor.Title, @@ -275,7 +275,7 @@ How to [join table and table](https://kafka-tutorials.confluent.io/join-a-table- ```C# using ksqlDB.RestApi.Client.KSql.Linq; -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .Join( Source.Of(nameof(Lead_Actor)), movie => movie.Title, diff --git a/docs/ksqldbcontext.md b/docs/ksqldbcontext.md index 0a7d04c5..c1a56b2b 100644 --- a/docs/ksqldbcontext.md +++ b/docs/ksqldbcontext.md @@ -34,7 +34,7 @@ var contextOptions = new KSqlDBContextOptions(ksqlDbUrl); await using var context = new KSqlDBContext(contextOptions); -using var disposable = context.CreateQueryStream() +using var disposable = context.CreatePushQuery() .Subscribe(onNext: movie => { Console.WriteLine($"{nameof(Movie)}: {movie.Id} - {movie.Title} - {movie.RowTime}"); @@ -80,10 +80,10 @@ using var disposable = context.CreateQuery() }, onError: error => { Console.WriteLine($"Exception: {error.Message}"); }, onCompleted: () => Console.WriteLine("Completed")); ``` -⚠️ In version 6.0.0, CreateQuery has been merged into CreateQueryStream. +⚠️ In version 6.0.0, CreateQuery has been merged into CreatePushQuery. # TFM netstandard 2.0 (.Net Framework, NetCoreApp 2.0 etc.) -The lack of support for **HTTP 2.0** in netstandard 2.0 prevents the exposure of `IKSqlDBContext.CreateQueryStream` in the current version. To address this limitation, `IKSqlDBContext.CreateQuery` was introduced as an alternative solution utilizing **HTTP 1.1** to provide the same functionality. +The lack of support for **HTTP 2.0** in netstandard 2.0 prevents the exposure of `IKSqlDBContext.CreatePushQuery` in the current version. To address this limitation, `IKSqlDBContext.CreateQuery` was introduced as an alternative solution utilizing **HTTP 1.1** to provide the same functionality. ## Basic auth **v1.0.0** @@ -144,7 +144,7 @@ internal class ApplicationKSqlDbContext : KSqlDBContext, Program.IApplicationKSq { } - public ksqlDB.RestApi.Client.KSql.Linq.IQbservable Movies => CreateQueryStream(); + public ksqlDB.RestApi.Client.KSql.Linq.IQbservable Movies => CreatePushQuery(); } public interface IApplicationKSqlDbContext : IKSqlDBContext @@ -290,7 +290,7 @@ public class Worker : IHostedService, IDisposable { logger.LogInformation("Starting"); - subscription = await context.CreateQueryStream() + subscription = await context.CreatePushQuery() .WithOffsetResetPolicy(AutoOffsetReset.Earliest) .SubscribeAsync( onNext: movie => { }, @@ -374,7 +374,7 @@ var responseMessage = await context.SaveChangesAsync(); > ⚠When creating `KSqlDBContextOptions` using a constructor or through `KSqlDbContextOptionsBuilder`, the default behavior is to set the `auto.offset.reset` property to "earliest." ```C# -public static KSqlDBContextOptions CreateQueryStreamOptions(string ksqlDbUrl) +public static KSqlDBContextOptions CreatePushQueryOptions(string ksqlDbUrl) { var contextOptions = new KSqlDbContextOptionsBuilder() .UseKSqlDb(ksqlDbUrl) @@ -482,7 +482,7 @@ INSERT INTO Message (Message, Id, `Values`) VALUES ('Hello', 42, '123, abc'); Stream the result of the SELECT query into an existing stream and its underlying topic. ```C# -var response = await context.CreateQueryStream("from_stream") +var response = await context.CreatePushQuery("from_stream") .Where(c => c.Title != "Apocalypse now") .InsertIntoAsync("stream_name"); @@ -503,7 +503,7 @@ Alternatively an optional query ID can be used for INSERT INTO queries: ```C# string queryId = "insert_query_123"; -var response = await context.CreateQueryStream("from_stream") +var response = await context.CreatePushQuery("from_stream") .Where(c => c.Title != "Apocalypse now") .InsertIntoAsync("stream_name", queryId); ``` diff --git a/docs/operators.md b/docs/operators.md index e444f4b9..5e84ff20 100644 --- a/docs/operators.md +++ b/docs/operators.md @@ -24,7 +24,7 @@ The **LIKE** operator is used in combination with the % (percent sign) wildcard Match a string with a specified pattern: ```C# -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .Where(c => c.Title.ToLower().Contains("hard".ToLower()); ``` @@ -36,7 +36,7 @@ SELECT * ``` ```C# -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .Where(c => c.Title.StartsWith("Die"); ``` @@ -54,7 +54,7 @@ The **IS NULL** and **IS NOT NULL** operators are used in the **WHERE** clause t ```C# using var subscription = new KSqlDBContext(@"http://localhost:8088") - .CreateQueryStream() + .CreatePushQuery() .Where(c => c.IP_ADDRESS != null || c.URL == null) .Select(c => new { c.IP_ADDRESS, c.URL, c.TIMESTAMP }); ``` @@ -105,7 +105,7 @@ The **BETWEEN** operator provides a concise way to specify range conditions in K ```C# using ksqlDB.RestApi.Client.KSql.Query.Operators; -IQbservable query = context.CreateQueryStream() +IQbservable query = context.CreatePushQuery() .Where(c => c.Id.Between(1, 5)); ``` @@ -137,7 +137,7 @@ Ts BETWEEN '11:00:00' AND '15:00:00' var from = new TimeSpan(11, 0, 0); var to = new TimeSpan(15, 0, 0); -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .Where(c => c.Ts.Between(from, to)) .Select(c => new { c.Ts, to, FromTime = from, DateTime.Now, New = new TimeSpan(1, 0, 0) } .ToQueryString(); @@ -149,7 +149,7 @@ var query = context.CreateQueryStream() - Select a condition from one or more expressions. ```C# var query = new KSqlDBContext(@"http://localhost:8088") - .CreateQueryStream() + .CreatePushQuery() .Select(c => new { @@ -187,7 +187,7 @@ You can use parentheses to change the order of evaluation: ```C# await using var context = new KSqlDBContext(@"http://localhost:8088"); -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .Select(c => (c.Longitude + c.Longitude) * c.Longitude); ``` @@ -201,7 +201,7 @@ In Where clauses: ```C# await using var context = new KSqlDBContext(@"http://localhost:8088"); -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .Where(c => (c.Latitude == "1" || c.Latitude != "2") && c.Latitude == "3"); ``` diff --git a/docs/protobuf.md b/docs/protobuf.md index aad75c15..0bb70b58 100644 --- a/docs/protobuf.md +++ b/docs/protobuf.md @@ -51,7 +51,7 @@ var ksqlDbUrl = @"http://localhost:8088"; await using var context = new ProtoBufKSqlDbContext(ksqlDbUrl); -var query = context.CreateQueryStream("movie") +var query = context.CreatePushQuery("movie") .Where(p => p.Title != "E.T.") .Select(l => new { Id = l.Id, l.Title }) .Take(2); // LIMIT 2 diff --git a/docs/push_queries.md b/docs/push_queries.md index aa3b575d..07ee2d72 100644 --- a/docs/push_queries.md +++ b/docs/push_queries.md @@ -5,13 +5,15 @@ They don't rely on batch processing or waiting for a predefined interval to prod It is important to note that `ksqlDB` does not support the **ORDER BY** clause. `ksqlDB` processes data in a streaming manner, and the order of events is based on their **arrival time** rather than explicit sorting. +⚠️ In version 6.0.0, `CreateQueryStream` was renamed to `CreatePushQuery`. + ### Take (LIMIT) **v1.0.0** Returns a specified number of contiguous elements from the start of a stream. Depends on the 'auto.topic.offset' parameter. ```C# -context.CreateQueryStream() +context.CreatePushQuery() .Take(2); ``` @@ -27,7 +29,7 @@ SELECT * Projects each element of a stream into a new form. ```C# -context.CreateQueryStream() +context.CreatePushQuery() .Select(l => new { l.RowTime, l.Message }); ``` Omitting select is equivalent to SELECT * @@ -37,7 +39,7 @@ Omitting select is equivalent to SELECT * ```C# var value = new FooClass { Property = 42 }; -var query = context.CreateQueryStream() +var query = context.CreatePushQuery() .Select(_ => new { Value = value @@ -56,7 +58,7 @@ SELECT STRUCT(Property := 42) AS Value Filters a stream of values based on a predicate. ```C# -context.CreateQueryStream() +context.CreatePushQuery() .Where(p => p.Message != "Hello world" || p.Id == 1) .Where(p => p.RowTime >= 1510923225000); ``` @@ -81,7 +83,7 @@ Implementing the `IObserver` interface: ```C# using var subscription = new KSqlDBContext(@"http://localhost:8088") - .CreateQueryStream() + .CreatePushQuery() .Subscribe(new TweetsObserver()); public class TweetsObserver : System.IObserver @@ -109,7 +111,7 @@ Observers can **react to changes** in the subject's state and perform actions ac Providing ```Action onNext, Action onError and Action onCompleted```: ```C# using var subscription = new KSqlDBContext(@"http://localhost:8088") - .CreateQueryStream() + .CreatePushQuery() .Subscribe( onNext: tweetMessage => { @@ -128,7 +130,7 @@ Moving to [Rx.NET](https://github.com/dotnet/reactive)... The following code snippet shows how to observe messages on the desired [IScheduler](http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html): ```C# -using var disposable = context.CreateQueryStream() +using var disposable = context.CreatePushQuery() .Take(2) .ToObservable() //client side processing starts here lazily after subscription .ObserveOn(System.Reactive.Concurrency.DispatcherScheduler.Current) @@ -139,7 +141,7 @@ The `IScheduler` interface is part of the Reactive Extensions (Rx) library, whic Be cautious regarding to server side and client side processings: ```C# -KSql.Linq.IQbservable queryStream = context.CreateQueryStream().Take(2); +KSql.Linq.IQbservable queryStream = context.CreatePushQuery().Take(2); System.IObservable observable = queryStream.ToObservable(); @@ -152,7 +154,7 @@ WPF client side batching example: ```C# private static IDisposable ClientSideBatching(KSqlDBContext context) { - var disposable = context.CreateQueryStream() + var disposable = context.CreatePushQuery() //Client side execution .ToObservable() .Buffer(TimeSpan.FromMilliseconds(250), 100) @@ -180,7 +182,7 @@ It also **filters** the buffered events, allowing only those buffers with a coun `ToQueryString` is mainly helpful for debugging purposes. It returns the generated ksql query without executing it. ```C# -var ksql = context.CreateQueryStream().ToQueryString(); +var ksql = context.CreatePushQuery().ToQueryString(); //prints SELECT * FROM tweets EMIT CHANGES; Console.WriteLine(ksql); @@ -193,7 +195,7 @@ However, it's worth noting that query comprehension syntax is just a different w ```C# var grouping = - from city in context.CreateQueryStream() + from city in context.CreatePushQuery() where city.RegionCode != "xy" group city by city.State.Name into g select new @@ -215,7 +217,7 @@ Renaming of stream or table column names with the `JsonPropertyNameAttribute` wa - grouping by nested properies. Can be used in the following way: ```C# -var source = Context.CreateQueryStream() +var source = Context.CreatePushQuery() .WithOffsetResetPolicy(AutoOffsetReset.Earliest) .GroupBy(c => new { c.RegionCode, c.State.Name }) .Select(g => new { g.Source.RegionCode, g.Source.State.Name, Count = g.Count()}) @@ -251,7 +253,7 @@ SELECT RegionCode, State->Name, COUNT(*) Count Creates an [async iterator](https://docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8) from the query: ```C# var cts = new CancellationTokenSource(); -var asyncTweetsEnumerable = context.CreateQueryStream().ToAsyncEnumerable(); +var asyncTweetsEnumerable = context.CreatePushQuery().ToAsyncEnumerable(); await foreach (var tweet in asyncTweetsEnumerable.WithCancellation(cts.Token)) Console.WriteLine(tweet.Message); @@ -289,7 +291,7 @@ private static async Task SubscribeAsync(IKSqlDBContext context) try { - var subscription = await context.CreateQueryStream() + var subscription = await context.CreatePushQuery() .SubscribeOn(ThreadPoolScheduler.Instance) .ObserveOn(TaskPoolScheduler.Default) .SubscribeAsync(onNext: movie => @@ -324,13 +326,13 @@ using ksqlDB.RestApi.Client.Sample.Models.Movies; public static async Task ExplainAsync(IKSqlDBContext context) { - var query = context.CreateQueryStream() + var query = context.CreatePushQuery() .Where(c => c.Title != "E.T."); string explain = await query .ExplainAsStringAsync(); - ExplainResponse[] explainResponses = await context.CreateQueryStream().ExplainAsync(); + ExplainResponse[] explainResponses = await context.CreatePushQuery().ExplainAsync(); Console.WriteLine(explainResponses[0].QueryDescription.ExecutionPlan); } @@ -375,7 +377,7 @@ QueryParameters queryParameters = new QueryParameters await using var context = new KSqlDBContext(@"http://localhost:8088"); -var moviesSource = context.CreateQueryStream(queryParameters) +var moviesSource = context.CreatePushQuery(queryParameters) .ToObservable(); ``` @@ -392,7 +394,7 @@ QueryStreamParameters queryStreamParameters = new QueryStreamParameters await using var context = new KSqlDBContext(@"http://localhost:8088"); -var source = context.CreateQueryStream(queryStreamParameters) +var source = context.CreatePushQuery(queryStreamParameters) .ToObservable(); ``` @@ -411,7 +413,7 @@ Overrides the AutoOffsetReset policy for the current query: ```C# using ksqlDB.RestApi.Client.KSql.Query.Options; -var subscription = context.CreateQueryStream() +var subscription = context.CreatePushQuery() .WithOffsetResetPolicy(AutoOffsetReset.Latest) .Subscribe(movie => { diff --git a/docs/statements.md b/docs/statements.md index 1e55d017..8f5e3e68 100644 --- a/docs/statements.md +++ b/docs/statements.md @@ -607,7 +607,7 @@ Drop table {nameof(Event)}; await using var ksqlDbContext = new KSqlDBContext(new KSqlDBContextOptions(ksqlDbUrl)); - var subscription = ksqlDbContext.CreateQueryStream() + var subscription = ksqlDbContext.CreatePushQuery() .Take(1) .Subscribe(value => {