Skip to content

Commit

Permalink
[ksqlDB.RestApi.Client]: KSqlDbContext CreateQueryStream was renamed …
Browse files Browse the repository at this point in the history
…to CreatePushQuery adjusted docs
  • Loading branch information
tomasfabian committed Apr 29, 2024
1 parent f63ff76 commit 54dcb06
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 73 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var contextOptions = new KSqlDBContextOptions(ksqlDbUrl)

await using var context = new KSqlDBContext(contextOptions);

using var subscription = context.CreateQueryStream<Tweet>()
using var subscription = context.CreatePushQuery<Tweet>()
.WithOffsetResetPolicy(AutoOffsetReset.Latest)
.Where(p => p.Message != "Hello world" || p.Id == 1)
.Select(l => new { l.Message, l.Id })
Expand Down Expand Up @@ -263,7 +263,7 @@ Stream names are generated based on the generic record types. They are pluralize
**By default the generated from item names such as stream and table names are pluralized**. This behavior could be switched off with the following `ShouldPluralizeStreamName` configuration.

```C#
context.CreateQueryStream<Person>();
context.CreatePushQuery<Person>();
```
```SQL
FROM People
Expand All @@ -275,15 +275,15 @@ var contextOptions = new KSqlDBContextOptions(@"http://localhost:8088")
ShouldPluralizeFromItemName = false
};

new KSqlDBContext(contextOptions).CreateQueryStream<Person>();
new KSqlDBContext(contextOptions).CreatePushQuery<Person>();
```
```SQL
FROM Person
```

Setting an arbitrary stream name (from_item name):
```C#
context.CreateQueryStream<Tweet>("custom_topic_name");
context.CreatePushQuery<Tweet>("custom_topic_name");
```
```SQL
FROM custom_topic_name
Expand Down
28 changes: 14 additions & 14 deletions docs/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var ksqlDbUrl = @"http://localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);
var context = new KSqlDBContext(contextOptions);

context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => new { Id = g.Key, Count = g.Count() })
.Subscribe(count =>
Expand All @@ -42,7 +42,7 @@ The Key should be mapped back to the respective column too Id = g.Key. See IKSql

Or without the new expression:
```C#
context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => g.Count());
```
Expand All @@ -59,7 +59,7 @@ SELECT COUNT(*)
Extract records from an aggregation that fulfill a specified condition.

```C#
var query = context.CreateQueryStream<Tweet>()
var query = context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Having(c => c.Count() > 2)
.Select(g => new { Id = g.Key, Count = g.Count()});
Expand All @@ -83,7 +83,7 @@ public class Click
public string TIMESTAMP { get; set; }
}

var query = context.CreateQueryStream<Click>()
var query = context.CreatePushQuery<Click>()
.GroupBy(c => new { c.IP_ADDRESS, c.URL, c.TIMESTAMP })
.WindowedBy(new TimeWindows(Duration.OfMinutes(2)))
.Having(c => c.Count(g => c.Key.IP_ADDRESS) == 1)
Expand All @@ -105,7 +105,7 @@ HAVING COUNT(IP_ADDRESS) = 1
Sums the column values.

```C#
context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
//.Select(g => g.Sum(c => c.Amount))
.Select(g => new { Id = g.Key, Agg = g.Sum(c => c.Amount)})
Expand Down Expand Up @@ -157,7 +157,7 @@ MAX(col1)
- **COLLECT_SET** - returns an array containing the distinct values of column from each input row (for the specified grouping and time window, if any).

```C#
var subscription = context.CreateQueryStream<Tweet>()
var subscription = context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => new { Id = g.Key, Array = g.CollectSet(c => c.Message) })
//.Select(g => new { Id = g.Key, Array = g.CollectList(c => c.Message) })
Expand Down Expand Up @@ -193,7 +193,7 @@ var dict = new Dictionary<string, int>()
["Thomas"] = 42,
};

var source = Context.CreateQueryStream<Tweet>(TweetsStreamName)
var source = Context.CreatePushQuery<Tweet>(TweetsStreamName)
.GroupBy(c => c.Id)
.Select(l => new { Id = l.Key, Maps = l.CollectList(c => dict) })
```
Expand All @@ -215,7 +215,7 @@ COUNT(Amount) Count
```

```C#
new KSqlDBContext(@"http://localhost:8088").CreateQueryStream<Tweet>()
new KSqlDBContext(@"http://localhost:8088").CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => new { Id = g.Key, TopK = g.TopKDistinct(c => c.Amount, 4) })
.Subscribe(onNext: tweetMessage =>
Expand Down Expand Up @@ -254,7 +254,7 @@ in the partition have the lowest offsets.
```C#
await using var context = new KSqlDBContext(@"http://localhost:8088");

context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.Select(g => new { Id = g.Key, EarliestByOffset = g.EarliestByOffset(c => c.Amount, 2) })
.Subscribe(earliest =>
Expand All @@ -279,7 +279,7 @@ SELECT Id, EARLIEST_BY_OFFSET(Amount, 2, True) EarliestByOffset
CountDistinct, LongCountDistinct

```C#
var subscription = context.CreateQueryStream<Tweet>()
var subscription = context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
// .Select(g => new { Id = g.Key, Count = g.CountDistinct(c => c.Message) })
.Select(g => new { Id = g.Key, Count = g.LongCountDistinct(c => c.Message) })
Expand Down Expand Up @@ -310,7 +310,7 @@ using ksqlDB.RestApi.Client.KSql.Query.Windows;
var tumblingWindow =
new TimeWindows(Duration.OfSeconds(2), OutputRefinement.Final).WithGracePeriod(Duration.OfSeconds(2));

var query = Context.CreateQueryStream<Tweet>()
var query = Context.CreatePushQuery<Tweet>()
.WithOffsetResetPolicy(AutoOffsetReset.Earliest)
.GroupBy(c => c.Id)
.WindowedBy(tumblingWindow)
Expand All @@ -336,7 +336,7 @@ Creation of windowed aggregation
```C#
var context = new TransactionsDbProvider(ksqlDbUrl);

var windowedQuery = context.CreateQueryStream<Transaction>()
var windowedQuery = context.CreatePushQuery<Transaction>()
.WindowedBy(new TimeWindows(Duration.OfSeconds(5)).WithGracePeriod(Duration.OfHours(2)))
.GroupBy(c => c.CardNumber)
.Select(g => new { CardNumber = g.Key, Count = g.Count() });
Expand All @@ -354,7 +354,7 @@ WINDOW TUMBLING (SIZE 5 SECONDS, GRACE PERIOD 2 HOURS)

[Hopping window](https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/#hopping-window) is a time-based windowing mechanism used for aggregating and processing streaming data within **overlapping** time intervals.
```C#
var subscription = context.CreateQueryStream<Tweet>()
var subscription = context.CreatePushQuery<Tweet>()
.GroupBy(c => c.Id)
.WindowedBy(new HoppingWindows(Duration.OfSeconds(5)).WithAdvanceBy(Duration.OfSeconds(4)).WithRetention(Duration.OfDays(7)))
.Select(g => new { g.WindowStart, g.WindowEnd, Id = g.Key, Count = g.Count() })
Expand All @@ -375,7 +375,7 @@ Window advancement interval should be more than zero and less than window durati

A [session window](https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/#session-window) aggregates records into a session, which represents a period of activity separated by a specified gap of inactivity, or "idleness".
```C#
var query = context.CreateQueryStream<Transaction>()
var query = context.CreatePushQuery<Transaction>()
.GroupBy(c => c.CardNumber)
.WindowedBy(new SessionWindow(Duration.OfSeconds(5)))
.Select(g => new { CardNumber = g.Key, Count = g.Count() });
Expand Down
2 changes: 1 addition & 1 deletion docs/breaking_changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# v6.0.0
- `QueryType` enum was renamed to `EndpointType`
- removed `CreateQuery` from `IKSqlDBContext` interface, it was merged with `CreateQueryStream`.
- removed `CreateQuery` from `IKSqlDBContext` interface, it was merged with `CreateQueryStream`. Subsequently `CreateQueryStream` was renamed to `CreatePushQuery` to align with the nomenclature of `CreatePullQuery`.
- removed `SetupQuery` from `KSqlDbContextOptionsBuilder`, it was merged with `SetupQueryStream`. Subsequently `SetupQueryStream` was renamed to `SetupPushQuery` to align with the nomenclature of `SetupPullQuery`.
- introduced distinct parameters designed specifically for pull queries. Before this update, the parameters sent to both the 'query' and 'query-stream' endpoints were shared between pull and push queries.

Expand Down
2 changes: 1 addition & 1 deletion docs/cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Program

var semaphoreSlim = new SemaphoreSlim(0, 1);

var cdcSubscription = context.CreateQueryStream<IoTSensorChange>("sqlserversensors")
var cdcSubscription = context.CreatePushQuery<IoTSensorChange>("sqlserversensors")
.WithOffsetResetPolicy(AutoOffsetReset.Latest)
.Where(c => c.Op != "r" && (c.After == null || c.After.SensorId != "d542a2b3-c"))
.Take(5)
Expand Down
4 changes: 2 additions & 2 deletions docs/data_types.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ MAP('c' := 2, 'd' := 4)['d']
```
Deeply nested types:
```C#
context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.Select(c => new
{
Map = new Dictionary<string, int[]>
Expand Down Expand Up @@ -186,7 +186,7 @@ INSERT INTO MyClasses (Dt, Ts, DtOffset) VALUES ('2021-04-01', '01:02:03', '2021
```

```C#
using var subscription = context.CreateQueryStream<MyClass>()
using var subscription = context.CreatePushQuery<MyClass>()
.Subscribe(onNext: m =>
{
Console.WriteLine($"Time types: {m.Dt} : {m.Ts} : {m.DtOffset}");
Expand Down
14 changes: 7 additions & 7 deletions docs/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ DATETOSTRING(18672, 'yyyy-MM-dd')

#### TIMESTAMPTOSTRING
```C#
new KSqlDBContext(ksqlDbUrl).CreateQueryStream<Movie>()
new KSqlDBContext(ksqlDbUrl).CreatePushQuery<Movie>()
.Select(c => K.Functions.TimestampToString(c.RowTime, "yyyy-MM-dd''T''HH:mm:ssX"))
```

Expand All @@ -165,7 +165,7 @@ FROM tweets EMIT CHANGES;
bool sorted = true;

var subscription = new KSqlDBContext(@"http://localhost:8088")
.CreateQueryStream<Movie>()
.CreatePushQuery<Movie>()
.Select(c => new
{
Entries = KSqlFunctions.Instance.Entries(new Dictionary<string, string>()
Expand Down Expand Up @@ -194,7 +194,7 @@ FROM movies_test EMIT CHANGES;
Converts any type to its string representation.

```C#
var query = context.CreateQueryStream<Movie>()
var query = context.CreatePushQuery<Movie>()
.GroupBy(c => c.Title)
.Select(c => new { Title = c.Key, Concatenated = K.Functions.Concat(c.Count().ToString(), "_Hello") });
```
Expand Down Expand Up @@ -276,7 +276,7 @@ Subscribe to the unbounded stream of events:
```C#
public IDisposable Invoke(IKSqlDBContext ksqlDbContext)
{
var subscription = ksqlDbContext.CreateQueryStream<Lambda>(fromItemName: "stream2")
var subscription = ksqlDbContext.CreatePushQuery<Lambda>(fromItemName: "stream2")
.WithOffsetResetPolicy(AutoOffsetReset.Earliest)
.Select(c => new
{
Expand Down Expand Up @@ -424,7 +424,7 @@ REDUCE(DictionaryInValues, 2, (s, k, v) => CEIL(s / v))
**v1.5.0**

```C#
var ksql = ksqlDbContext.CreateQueryStream<Lambda>()
var ksql = ksqlDbContext.CreatePushQuery<Lambda>()
.Select(c => new
{
Transformed = c.Lambda_Arr.Transform(x => x + 1),
Expand Down Expand Up @@ -453,7 +453,7 @@ By constructing the appropriate function call and providing the necessary parame
```C#
using ksqlDB.RestApi.Client.KSql.Query.Functions;

context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.Select(c => new { Col = KSql.Functions.Dynamic("IFNULL(Message, 'n/a')") as string, c.Id, c.Amount, c.Message });
```
The interesting part from the above query is:
Expand All @@ -477,7 +477,7 @@ You can achieve a dynamic function call in C# that returns an array by using the
```C#
using K = ksqlDB.RestApi.Client.KSql.Query.Functions.KSql;

context.CreateQueryStream<Tweet>()
context.CreatePushQuery<Tweet>()
.Select(c => K.F.Dynamic("ARRAY_DISTINCT(ARRAY[1, 1, 2, 3, 1, 2])") as int[])
.Subscribe(
message => Console.WriteLine($"{message[0]} - {message[^1]}"),
Expand Down
14 changes: 7 additions & 7 deletions docs/joins.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var ksqlDbUrl = @"http://localhost:8088";

var context = new KSqlDBContext(ksqlDbUrl);

var query = (from o in context.CreateQueryStream<Order>()
var query = (from o in context.CreatePushQuery<Order>()
join p1 in Source.Of<Payment>() on o.PaymentId equals p1.Id
join s1 in Source.Of<Shipment>() on o.ShipmentId equals s1.Id into gj
from sa in gj.DefaultIfEmpty()
Expand Down Expand Up @@ -112,7 +112,7 @@ response = await restApiClient.InsertIntoAsync(shipment);
Left joins can be also constructed in the following (less readable) way:

```C#
var query2 = KSqlDBContext.CreateQueryStream<Order>()
var query2 = KSqlDBContext.CreatePushQuery<Order>()
.GroupJoin(Source.Of<Payment>(), c => c.OrderId, c => c.Id, (order, gj) => new
{
order,
Expand Down Expand Up @@ -141,7 +141,7 @@ SELECT o.OrderId OrderId, p.Id AS shipmentId
- specifies a time window for stream-stream joins

```C#
var query = from o in KSqlDBContext.CreateQueryStream<Order>()
var query = from o in KSqlDBContext.CreatePushQuery<Order>()
join p in Source.Of<Payment>().Within(Duration.OfHours(1), Duration.OfDays(5)) on o.OrderId equals p.Id
select new
{
Expand All @@ -166,7 +166,7 @@ Define nullable primitive value types in POCOs:

```C#
var source = new KSqlDBContext(@"http://localhost:8088")
.CreateQueryStream<Movie>()
.CreatePushQuery<Movie>()
.FullOuterJoin(
Source.Of<Lead_Actor>("Actors"),
movie => movie.Title,
Expand Down Expand Up @@ -211,7 +211,7 @@ SELECT m.Id Id, m.Title Title, m.Release_Year Release_Year, l.Title ActorTitle

**LEFT OUTER** joins will contain leftRecord-NULL records in the result stream, which means that the join contains NULL values for fields selected from the right-hand stream where no match is made.
```C#
var query = new KSqlDBContext(@"http://localhost:8088").CreateQueryStream<Movie>()
var query = new KSqlDBContext(@"http://localhost:8088").CreatePushQuery<Movie>()
.LeftJoin(
Source.Of<Lead_Actor>(),
movie => movie.Title,
Expand Down Expand Up @@ -242,7 +242,7 @@ Select all records for the right side of the join and the matching records from
```C#
using ksqlDB.RestApi.Client.KSql.Linq;

var query = KSqlDBContext.CreateQueryStream<Lead_Actor>(ActorsTableName)
var query = KSqlDBContext.CreatePushQuery<Lead_Actor>(ActorsTableName)
.RightJoin(
Source.Of<Movie>(MoviesTableName),
actor => actor.Title,
Expand Down Expand Up @@ -275,7 +275,7 @@ How to [join table and table](https://kafka-tutorials.confluent.io/join-a-table-
```C#
using ksqlDB.RestApi.Client.KSql.Linq;

var query = context.CreateQueryStream<Movie>()
var query = context.CreatePushQuery<Movie>()
.Join(
Source.Of<Lead_Actor>(nameof(Lead_Actor)),
movie => movie.Title,
Expand Down
16 changes: 8 additions & 8 deletions docs/ksqldbcontext.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);

await using var context = new KSqlDBContext(contextOptions);

using var disposable = context.CreateQueryStream<Movie>()
using var disposable = context.CreatePushQuery<Movie>()
.Subscribe(onNext: movie =>
{
Console.WriteLine($"{nameof(Movie)}: {movie.Id} - {movie.Title} - {movie.RowTime}");
Expand Down Expand Up @@ -80,10 +80,10 @@ using var disposable = context.CreateQuery<Movie>()
}, onError: error => { Console.WriteLine($"Exception: {error.Message}"); }, onCompleted: () => Console.WriteLine("Completed"));
```

⚠️ In version 6.0.0, CreateQuery has been merged into CreateQueryStream.
⚠️ In version 6.0.0, CreateQuery has been merged into CreatePushQuery.

# TFM netstandard 2.0 (.Net Framework, NetCoreApp 2.0 etc.)
The lack of support for **HTTP 2.0** in netstandard 2.0 prevents the exposure of `IKSqlDBContext.CreateQueryStream<TEntity>` in the current version. To address this limitation, `IKSqlDBContext.CreateQuery<TEntity>` was introduced as an alternative solution utilizing **HTTP 1.1** to provide the same functionality.
The lack of support for **HTTP 2.0** in netstandard 2.0 prevents the exposure of `IKSqlDBContext.CreatePushQuery<TEntity>` in the current version. To address this limitation, `IKSqlDBContext.CreateQuery<TEntity>` was introduced as an alternative solution utilizing **HTTP 1.1** to provide the same functionality.

## Basic auth
**v1.0.0**
Expand Down Expand Up @@ -144,7 +144,7 @@ internal class ApplicationKSqlDbContext : KSqlDBContext, Program.IApplicationKSq
{
}

public ksqlDB.RestApi.Client.KSql.Linq.IQbservable<Movie> Movies => CreateQueryStream<Movie>();
public ksqlDB.RestApi.Client.KSql.Linq.IQbservable<Movie> Movies => CreatePushQuery<Movie>();
}

public interface IApplicationKSqlDbContext : IKSqlDBContext
Expand Down Expand Up @@ -290,7 +290,7 @@ public class Worker : IHostedService, IDisposable
{
logger.LogInformation("Starting");

subscription = await context.CreateQueryStream<Movie>()
subscription = await context.CreatePushQuery<Movie>()
.WithOffsetResetPolicy(AutoOffsetReset.Earliest)
.SubscribeAsync(
onNext: movie => { },
Expand Down Expand Up @@ -374,7 +374,7 @@ var responseMessage = await context.SaveChangesAsync();
> ⚠When creating `KSqlDBContextOptions` using a constructor or through `KSqlDbContextOptionsBuilder`, the default behavior is to set the `auto.offset.reset` property to "earliest."
```C#
public static KSqlDBContextOptions CreateQueryStreamOptions(string ksqlDbUrl)
public static KSqlDBContextOptions CreatePushQueryOptions(string ksqlDbUrl)
{
var contextOptions = new KSqlDbContextOptionsBuilder()
.UseKSqlDb(ksqlDbUrl)
Expand Down Expand Up @@ -482,7 +482,7 @@ INSERT INTO Message (Message, Id, `Values`) VALUES ('Hello', 42, '123, abc');
Stream the result of the SELECT query into an existing stream and its underlying topic.

```C#
var response = await context.CreateQueryStream<Movie>("from_stream")
var response = await context.CreatePushQuery<Movie>("from_stream")
.Where(c => c.Title != "Apocalypse now")
.InsertIntoAsync("stream_name");

Expand All @@ -503,7 +503,7 @@ Alternatively an optional query ID can be used for INSERT INTO queries:
```C#
string queryId = "insert_query_123";

var response = await context.CreateQueryStream<Movie>("from_stream")
var response = await context.CreatePushQuery<Movie>("from_stream")
.Where(c => c.Title != "Apocalypse now")
.InsertIntoAsync("stream_name", queryId);
```
Expand Down
Loading

0 comments on commit 54dcb06

Please sign in to comment.