Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: add three usecase for flow #1165

Merged
merged 23 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/user-guide/continuous-aggregation/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ SELECT
min(size) as min_size,
max(size) as max_size,
avg(size) as avg_size,
sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count,
sum(case when `size` > 550 then 1 else 0 end) as high_size_count,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
GROUP BY
Expand Down Expand Up @@ -133,11 +133,15 @@ Here is the explanation of the columns in the `ngx_statistics` table:
- `time_window`: The time window of the aggregation.
- `update_at`: The time when the aggregation is updated.

<!-- TODO(discord9): improve auto create table then add back this feature explain, i.e. for now everything in group by is put to primary key, and time index is always a placeholder -->
discord9 marked this conversation as resolved.
Show resolved Hide resolved
<!-- if you don't manually create sink table, the Flow engine will automatically create it for you based on the query(i.e. using columns in `GROUP BY` as primary tags and time index), however, sometimes you may want to create the sink table manually to have more control over the schema. -->

## Next Steps

Congratulations you already have a preliminary understanding of the continuous aggregation feature.
Please refer to the following sections to learn more:

- [Usecase Examples](./usecase-example.md) provides more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboard.
- [Manage Flows](./manage-flow.md) describes how to create, update, and delete a flow. Each of your continuous aggregation query is a flow.
- [Write a Query](./query.md) describes how to write a continuous aggregation query.
- [Define Time Window](./define-time-window.md) describes how to define the time window for the continuous aggregation. Time window is an important attribute of your continuous aggregation query. It defines the time interval for the aggregation.
Expand Down
89 changes: 5 additions & 84 deletions docs/user-guide/continuous-aggregation/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,91 +15,12 @@ Only two kinds of expression are allowed after `SELECT` keyword:

The query should have a `FROM` clause to identify the source table. As the join clause is currently not supported, the query can only aggregate columns from a single table.

`GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions.

Others things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query.
discord9 marked this conversation as resolved.
Show resolved Hide resolved

## Rewrite an existing query to a continuous aggregation query

Some of simple existing aggregation queries can be directly used as continuous aggregation queries. For example, the example in [overview](./overview.md) can be used to query both in standard SQL and continuous aggregation query, since it's also a valid SQL query without any flow-specific syntax or functions:
`WHERE` and `HAVING` clauses are supported in the continuous aggregation query. They work as in a normal query. The `WHERE` clause filters the data before aggregation, and the `HAVING` clause filters the data after aggregation.

```sql
SELECT
status,
count(client) AS total_logs,
min(size) as min_size,
max(size) as max_size,
avg(size) as avg_size,
sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
GROUP BY
status,
time_window;
```
`DISTINCT` currently only work with `SELECT DISTINCT column1 ..` syntax. It is used to remove duplicate rows from the result set. Support for `SELECT count(DISTINCT column1) ...` is not available yet, but it will be added in the future.

However, there are other types of queries that cannot be directly used as continuous aggregation queries.
For example, a query that needs to compute percentiles would be unwise to repeatedly calculate the percentile for each time window everytime a new batch of data arrive. In this case, you can pre-aggregate the data into buckets of the desired size, and then calculate the percentile in the sink table using standard SQL when needed. The original SQL might be:
```sql
SELECT
status,
percentile_approx(size, 0.5) as median_size,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
GROUP BY
status,
time_window;
```
The above query can be rewritten to first aggregate the data into buckets of size 10, and then calculate the percentile in the sink table.
The flow query would be:
```sql
CREATE FLOW calc_ngx_distribution
SINK TO ngx_distribution
AS
SELECT
status,
trunc(size, -1) as bucket,
count(client) AS total_logs,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
GROUP BY
status,
time_window,
bucket;
```

And then you can calculate the percentile in the sink table using standard SQL:
```sql
SELECT
outer.status,
outer.time_window,
outer.bucket,
SUM(case when in1.bucket <= outer.bucket then in1.total_logs else 0 end) * 100 / SUM(in1.total_logs) AS percentile
FROM ngx_distribution AS outer
JOIN ngx_distribution AS in1
ON in1.status = outer.status
AND in1.time_window = outer.time_window
GROUP BY
status,
time_window,
bucket
ORDER BY status, time_window, bucket;
```

The SQL query groups the data by status, time_window, and bucket. The percentile column calculates the percentage within each group by taking the sum of all buckets not greater than the current bucket and dividing it by the total count of all logs. The result would be something like this:
`GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions.

```sql
status | time_window | bucket | percentile
--------+----------------------------+--------+------------
404 | 1970-01-01 00:00:00.000000 | 0 | 22
404 | 1970-01-01 00:00:00.000000 | 1 | 55
404 | 1970-01-01 00:00:00.000000 | 2 | 66
404 | 1970-01-01 00:00:00.000000 | 3 | 100
(4 rows)
```
Others things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query.
discord9 marked this conversation as resolved.
Show resolved Hide resolved

<!--
TODO(discord9): add example for percentile query
TODO(discord9): add example for tumble and hop once we support window table function
Another example that require rewrite is for query that needs overlapping timewindow, hence `hop()` function is needed.
-->
See [Usecase Examples](./usecase-example.md) for more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboard.
223 changes: 223 additions & 0 deletions docs/user-guide/continuous-aggregation/usecase-example.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
# Usecase Examples
Following are three major usecase examples for continuous aggregation:

1. **Real-time Analytics**: A real-time analytics platform that continuously aggregates data from a stream of events, delivering immediate insights while optionally downsampling the data to a lower resolution. For instance, this system can compile data from a high-frequency stream of log events (e.g., occurring every millisecond) to provide up-to-the-minute insights such as the number of requests per minute, average response times, and error rates per minute.

2. **Real-time Monitoring**: A real-time monitoring system that continuously aggregates data from a stream of events and provides real-time alerts based on the aggregated data. For example, a system that aggregates data from a stream of sensor events and provides real-time alerts when the temperature exceeds a certain threshold.

3. **Real-time Dashboard**: A real-time dashboard that shows the number of requests per minute, the average response time, and the number of errors per minute. This dashboard can be used to monitor the health of the system and to detect any anomalies in the system.

In all these usecases, the continuous aggregation system continuously aggregates data from a stream of events and provides real-time insights and alerts based on the aggregated data. The system can also downsample the data to a lower resolution to reduce the amount of data stored and processed. This allows the system to provide real-time insights and alerts while keeping the data storage and processing costs low.

## Real-time analytics example

See [Overview](/user-guide/continuous-aggregation/overview.md#quick-start-with-an-example) for an example of real-time analytics. Which is to calculate the total number of logs, the minimum size, the maximum size, the average size, and the number of packets with the size greater than 550 for each status code in a 1-minute fixed window for access logs.

Another example of real-time analytics is to get all distinct country from the `ngx_access_log` table. The query for continuous aggregation would be:

```sql
/* input table */
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);

/* sink table */
CREATE TABLE ngx_country (
country STRING,
update_at TIMESTAMP,
__ts_placeholder TIMESTAMP TIME INDEX,
PRIMARY KEY(country)
);

/* create flow task to calculate the distinct country */
CREATE FLOW calc_ngx_country
SINK TO ngx_country
AS
SELECT
DISTINCT country,
FROM ngx_access_log;
```

now that we have created the flow task, we can insert some data into the source table `ngx_access_log`:

```sql
/* insert some data */
INSERT INTO ngx_access_log VALUES
("client1", "US", "2022-01-01 00:00:00"),
("client2", "US", "2022-01-01 00:00:01"),
("client3", "UK", "2022-01-01 00:00:02"),
("client4", "UK", "2022-01-01 00:00:03"),
("client5", "CN", "2022-01-01 00:00:04"),
("client6", "CN", "2022-01-01 00:00:05"),
("client7", "JP", "2022-01-01 00:00:06"),
("client8", "JP", "2022-01-01 00:00:07"),
("client9", "KR", "2022-01-01 00:00:08"),
("client10", "KR", "2022-01-01 00:00:09");

```

<!-- TODO(discord9): explain what `ADMIN FLUSH_FLOW('calc_ngx_country')` do -->
Wait for one second for the Flow to write the result to the sink table and then query:


```sql
select * from ngx_country;
```

Or if you want to group the data by time window, you can use the following query:

```sql
/* input table create same as above */
/* sink table */
CREATE TABLE ngx_country (
country STRING,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP,
PRIMARY KEY(country)
);
CREATE FLOW calc_ngx_country
SINK TO ngx_country
AS
SELECT
DISTINCT country,
date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM ngx_access_log
GROUP BY
country,
time_window;
/* insert data using the same data as above */
```
discord9 marked this conversation as resolved.
Show resolved Hide resolved

discord9 marked this conversation as resolved.
Show resolved Hide resolved
The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system.

Note that there is currently no persistent storage for the internal state of the flow. The internal state refers to the intermediate state used in computing incremental query results, such as the accumulator's value for an aggregation query (e.g., `count(col)`'s accumulator records the current count number). However, there is persistent storage for the data in the sink table.
Therefore, it is recommended to use an appropriate time window (e.g., hourly) to minimize data loss. This is because if the internal state is lost, the related data within that time window will also be lost.

## Real-time monitoring example

Consider a usecase where you have a stream of sensor events from a network of temperature sensors that you want to monitor in real-time. The sensor events contain information such as the sensor ID, the temperature reading, the timestamp of the reading, and the location of the sensor. You want to continuously aggregate this data to provide real-time alerts when the temperature exceeds a certain threshold. Then the query for continuous aggregation would be:

```sql
/* create input table */
CREATE TABLE temp_sensor_data (
sensor_id INT,
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX
);

/* create sink table */
CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
update_at TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc)
);

CREATE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
sensor_id,
loc,
max(temperature) as max_temp,
FROM temp_sensor_data
GROUP BY
sensor_id,
loc
HAVING max_temp > 100;
```

Now that we have created the flow task, we can insert some data into the source table `temp_sensor_data`:

```sql

INSERT INTO temp_sensor_data VALUES
(1, "room1", 98.5, "2022-01-01 00:00:00"),
(2, "room2", 99.5, "2022-01-01 00:00:01");
```
table should be empty now, but still wait at least one second for flow to update results to sink table:

```sql
SELECT * FROM temp_alerts;
```

```sql

INSERT INTO temp_sensor_data VALUES
(1, "room1", 101.5, "2022-01-01 00:00:02"),
(2, "room2", 102.5, "2022-01-01 00:00:03");
```
wait at least one second for flow to update results to sink table:
```sql
/* wait at least one second for flow to update results to sink table */
SELECT * FROM temp_alerts;
```

The above query continuously aggregates the data from the `temp_sensor_data` table into the `temp_alerts` table. It calculates the maximum temperature reading for each sensor and location and filters out the data where the maximum temperature exceeds 100 degrees. The `temp_alerts` table will be continuously updated with the aggregated data, providing real-time alerts (Which is a new row in the `temp_alerts` table) when the temperature exceeds the threshold.


## Real-time dashboard

Consider a usecase in which you need a bar graph that show the distribution of packet sizes for each status code to monitor the health of the system. The query for continuous aggregation would be:

```sql
/* create input table */
CREATE TABLE ngx_access_log (
client STRING,
stat INT,
size INT,
access_time TIMESTAMP TIME INDEX
);
/* create sink table */
CREATE TABLE ngx_distribution (
stat INT,
bucket_size INT,
total_logs BIGINT,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP, /* auto generated column to store the last update time */
PRIMARY KEY(stat, bucket_size)
);
/* create flow task to calculate the distribution of packet sizes for each status code */
CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
SELECT
stat,
trunc(size, -1)::INT as bucket_size,
count(client) AS total_logs,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
stat,
time_window,
bucket_size;
```

Now that we have created the flow task, we can insert some data into the source table `ngx_access_log`:

```sql
INSERT INTO ngx_access_log VALUES
("cli1", 200, 100, "2022-01-01 00:00:00"),
("cli2", 200, 104, "2022-01-01 00:00:01"),
("cli3", 200, 120, "2022-01-01 00:00:02"),
("cli4", 200, 124, "2022-01-01 00:00:03"),
("cli5", 200, 140, "2022-01-01 00:00:04"),
("cli6", 404, 144, "2022-01-01 00:00:05"),
("cli7", 404, 160, "2022-01-01 00:00:06"),
("cli8", 404, 164, "2022-01-01 00:00:07"),
("cli9", 404, 180, "2022-01-01 00:00:08"),
("cli10", 404, 184, "2022-01-01 00:00:09");
```
wait at least one second for flow to update results to sink table:
```sql
SELECT * FROM ngx_distribution;
```

The above query puts the data from the `ngx_access_log` table into the `ngx_distribution` table. It calculates the total number of logs for each status code and packet size bucket (in this case, since `trunc`'s second argument is -1, meaning a bucket size of 10) for each time window. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_distribution` table will be continuously updated with the aggregated data, providing real-time insights into the distribution of packet sizes for each status code.

## Conclusion

Continuous aggregation is a powerful tool for real-time analytics, monitoring, and dashboarding. It allows you to continuously aggregate data from a stream of events and provide real-time insights and alerts based on the aggregated data. By downsampling the data to a lower resolution, you can reduce the amount of data stored and processed, making it easier to provide real-time insights and alerts while keeping the data storage and processing costs low. Continuous aggregation is a key component of any real-time data processing system and can be used in a wide range of usecases to provide real-time insights and alerts based on streaming data.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ SELECT
min(size) as min_size,
max(size) as max_size,
avg(size) as avg_size,
sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count,
sum(case when `size` > 550 then 1 else 0 end) as high_size_count,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
GROUP BY
Expand Down Expand Up @@ -141,6 +141,8 @@ SELECT * FROM ngx_statistics;
恭喜你已经初步了解了持续聚合功能。
请参考以下章节了解更多:


- [用例](./usecase-example.md) 提供了更多关于如何在实时分析、监控和仪表板中使用持续聚合的示例。
- [管理 Flow](./manage-flow.md) 描述了如何创建、更新和删除 flow。你的每个持续聚合查询都是一个 flow。
- [编写查询语句](./query.md) 描述了如何编写持续聚合查询。
- [定义时间窗口](./define-time-window.md) 描述了如何为持续聚合定义时间窗口。时间窗口是持续聚合查询的一个重要属性,它定义了聚合的时间间隔。
Expand Down
Loading
Loading