From cc200b8542cf328fd5f61aeec79a65d84bbfd0cf Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 2 Sep 2024 14:55:47 +0800 Subject: [PATCH 01/23] docs: 3 usecase --- .../continuous-aggregation/usecase-example.md | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 docs/user-guide/continuous-aggregation/usecase-example.md diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md new file mode 100644 index 000000000..089848e04 --- /dev/null +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -0,0 +1,88 @@ +# Usecase Example +Following are three major usecase examples for continuous aggregation: + +1. **Real-time Analytics**: A real-time analytics platform that continuously aggregates data from a stream of events, delivering immediate insights while optionally downsampling the data to a lower resolution. For instance, this system can compile data from a high-frequency stream of log events (e.g., occurring every millisecond) to provide up-to-the-minute insights such as the number of requests per minute, average response times, and error rates per minute. + +2. **Real-time Monitoring**: A real-time monitoring system that continuously aggregates data from a stream of events and provides real-time alerts based on the aggregated data. For example, a system that aggregates data from a stream of sensor events and provides real-time alerts when the temperature exceeds a certain threshold. + +3. **Real-time Dashboard**: A real-time dashboard that shows the number of requests per minute, the average response time, and the number of errors per minute. This dashboard can be used to monitor the health of the system and to detect any anomalies in the system. + +In all these usecases, the continuous aggregation system continuously aggregates data from a stream of events and provides real-time insights and alerts based on the aggregated data. 
The system can also downsample the data to a lower resolution to reduce the amount of data stored and processed. This allows the system to provide real-time insights and alerts while keeping the data storage and processing costs low. + +## Real-time Analytics Example + +Consider a usecase where you have a stream of log events from a web server that you want to analyze in real-time. The log events contain information such as the status of the request, the size of the response, the client IP address, and the timestamp of the request. You want to continuously aggregate this data to provide real-time analytics on the number of requests per minute, the min/max/average packet size, and the error rate per minute. Then the query for continuous aggregation would be: + +```sql +CREATE FLOW ngx_aggregation +SINK TO ngx_statistics +AS +SELECT + status, + count(client) AS total_logs, + sum(case when status >= 400 then 1 end) as error_logs, + min(size) as min_size, + max(size) as max_size, + avg(size) as avg_size +FROM ngx_access_log +GROUP BY + status, + date_bin(INTERVAL '1 minutes', access_time, '2024-01-01 00:00:00'::Timestamp); +``` + +The above query continuously aggregates the data from the `ngx_access_log` table into the `ngx_statistics` table. It calculates the total number of logs, the number of error logs, the min/max/average packet size, and the error rate per minute. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_statistics` table will be continuously updated with the aggregated data, providing real-time insights into the web server's performance. + +## Real-time Monitoring Example + +Consider a usecase where you have a stream of sensor events from a network of temperature sensors that you want to monitor in real-time. The sensor events contain information such as the sensor ID, the temperature reading, the timestamp of the reading, and the location of the sensor. 
You want to continuously aggregate this data to provide real-time alerts when the temperature exceeds a certain threshold. Then the query for continuous aggregation would be:

```sql
CREATE TABLE temp_sensor_data (
    sensor_id INT,
    loc STRING,
    temperature DOUBLE,
    ts TIMESTAMP TIME INDEX
);

CREATE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
    sensor_id,
    loc,
    max(temperature) as max_temp,
FROM temp_sensor_data
GROUP BY
    sensor_id,
    loc
HAVING max_temp > 100;
```

The above query continuously aggregates the data from the `temp_sensor_data` table into the `temp_alerts` table. It calculates the maximum temperature reading for each sensor and location and keeps only the data where the maximum temperature exceeds 100 degrees. The `temp_alerts` table will be continuously updated with the aggregated data, providing real-time alerts (which is a new row in the `temp_alerts` table) when the temperature exceeds the threshold.


## Real-time Dashboard

Consider a usecase in which you need a bar graph that shows the distribution of packet sizes for each status code to monitor the health of the system. The query for continuous aggregation would be:

```sql
CREATE FLOW calc_ngx_distribution
SINK TO ngx_distribution
AS
SELECT
    status,
    trunc(size, -1) as bucket_size,
    count(client) AS total_logs,
    date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
GROUP BY
    status,
    time_window,
    bucket;
```

The above query put the data from the `ngx_access_log` table into the `ngx_distribution` table. It calculates the total number of logs for each status code and packet size bucket for each time window. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_distribution` table will be continuously updated with the aggregated data, providing real-time insights into the distribution of packet sizes for each status code.
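To actually render the bar graph, the dashboard can read the sink table back with an ordinary query. The following is only a sketch (it assumes the flow above has been created and some data has already arrived):

```sql
-- distribution for the most recent one-minute window
SELECT status, bucket_size, total_logs
FROM ngx_distribution
WHERE time_window = (SELECT max(time_window) FROM ngx_distribution)
ORDER BY status, bucket_size;
```

Filtering on the latest `time_window` keeps the chart focused on the current minute.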
+ +# Conclusion + +Continuous aggregation is a powerful tool for real-time analytics, monitoring, and dashboarding. It allows you to continuously aggregate data from a stream of events and provide real-time insights and alerts based on the aggregated data. By downsampling the data to a lower resolution, you can reduce the amount of data stored and processed, making it easier to provide real-time insights and alerts while keeping the data storage and processing costs low. Continuous aggregation is a key component of any real-time data processing system and can be used in a wide range of usecases to provide real-time insights and alerts based on streaming data. \ No newline at end of file From 7e7e3aec7b58e67f7f53a68e3a4ce12bbf7b2c3b Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 2 Sep 2024 14:58:44 +0800 Subject: [PATCH 02/23] note for auto create table --- docs/user-guide/continuous-aggregation/overview.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/user-guide/continuous-aggregation/overview.md b/docs/user-guide/continuous-aggregation/overview.md index 313c0fe68..1826fde87 100644 --- a/docs/user-guide/continuous-aggregation/overview.md +++ b/docs/user-guide/continuous-aggregation/overview.md @@ -133,6 +133,8 @@ Here is the explanation of the columns in the `ngx_statistics` table: - `time_window`: The time window of the aggregation. - `update_at`: The time when the aggregation is updated. +NOTE: if you don't manually create sink table, the Flow engine will automatically create it for you based on the query(i.e. using columns in `GROUP BY` as primary tags and time index), however, sometimes you may want to create the sink table manually to have more control over the schema. + ## Next Steps Congratulations you already have a preliminary understanding of the continuous aggregation feature. 
From 97d89968e88a0a4c3064f019c67b9d7174fe8e07 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 2 Sep 2024 15:01:50 +0800 Subject: [PATCH 03/23] explain bar graph --- docs/user-guide/continuous-aggregation/usecase-example.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index 089848e04..2b28512eb 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -81,7 +81,7 @@ GROUP BY bucket; ``` -The above query put the data from the `ngx_access_log` table into the `ngx_distribution` table. It calculates the total number of logs for each status code and packet size bucket for each time window. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_distribution` table will be continuously updated with the aggregated data, providing real-time insights into the distribution of packet sizes for each status code. +The above query puts the data from the `ngx_access_log` table into the `ngx_distribution` table. It calculates the total number of logs for each status code and packet size bucket (in this case, since `trunc`'s second argument is -1, meaning a bucket size of 10) for each time window. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_distribution` table will be continuously updated with the aggregated data, providing real-time insights into the distribution of packet sizes for each status code. 
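As a quick illustration of the bucketing, `trunc` with a second argument of `-1` truncates to the tens place, so all sizes from 530 to 539 fall into the same bucket:

```sql
SELECT trunc(537, -1);  -- 530: the bucket shared by sizes 530-539
```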
# Conclusion From a15ef29ba8a91e804e6b1ec44b8019b229887265 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 2 Sep 2024 17:40:57 +0800 Subject: [PATCH 04/23] refactor: per review --- .../continuous-aggregation/overview.md | 7 +- .../continuous-aggregation/query.md | 105 ------------------ .../continuous-aggregation/usecase-example.md | 31 +----- sidebars.ts | 2 +- 4 files changed, 11 insertions(+), 134 deletions(-) delete mode 100644 docs/user-guide/continuous-aggregation/query.md diff --git a/docs/user-guide/continuous-aggregation/overview.md b/docs/user-guide/continuous-aggregation/overview.md index 1826fde87..3bc3fc11b 100644 --- a/docs/user-guide/continuous-aggregation/overview.md +++ b/docs/user-guide/continuous-aggregation/overview.md @@ -64,7 +64,7 @@ SELECT min(size) as min_size, max(size) as max_size, avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, + sum(case when `size` > 550 then 1 else 0 end) as high_size_count, date_bin(INTERVAL '1 minutes', access_time) as time_window, FROM ngx_access_log GROUP BY @@ -133,14 +133,15 @@ Here is the explanation of the columns in the `ngx_statistics` table: - `time_window`: The time window of the aggregation. - `update_at`: The time when the aggregation is updated. -NOTE: if you don't manually create sink table, the Flow engine will automatically create it for you based on the query(i.e. using columns in `GROUP BY` as primary tags and time index), however, sometimes you may want to create the sink table manually to have more control over the schema. + + ## Next Steps Congratulations you already have a preliminary understanding of the continuous aggregation feature. Please refer to the following sections to learn more: +- [Usecase Example](./usecase-example.md) provides more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboard. - [Manage Flows](./manage-flow.md) describes how to create, update, and delete a flow. 
Each of your continuous aggregation query is a flow. -- [Write a Query](./query.md) describes how to write a continuous aggregation query. - [Define Time Window](./define-time-window.md) describes how to define the time window for the continuous aggregation. Time window is an important attribute of your continuous aggregation query. It defines the time interval for the aggregation. - [Expression](./expression.md) is a reference of available expressions in the continuous aggregation query. diff --git a/docs/user-guide/continuous-aggregation/query.md b/docs/user-guide/continuous-aggregation/query.md deleted file mode 100644 index b9e574f0b..000000000 --- a/docs/user-guide/continuous-aggregation/query.md +++ /dev/null @@ -1,105 +0,0 @@ -# Write a Query - -This chapter describes how to write a continuous aggregation query in GreptimeDB. Query here should be a `SELECT` statement with either aggregating functions or non-aggregating functions (i.e., scalar function). - -Generally speaking, the `SQL` part in the flow is just like a normal `SELECT` clause with a few difference. -The grammar of the query is like the following: - -```sql -SELECT AGGR_FUNCTION(column1, column2,..) FROM GROUP BY TIME_WINDOW_FUNCTION(); -``` - -Only two kinds of expression are allowed after `SELECT` keyword: -- Aggregate functions: see the reference in [Expression](./expression.md) for detail. -- Scalar functions: like `col`, `to_lowercase(col)`, `col + 1`, etc. This part is the same as the normal `SELECT` clause in GreptimeDB. - -The query should have a `FROM` clause to identify the source table. As the join clause is currently not supported, the query can only aggregate columns from a single table. - -`GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. 
They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions. - -Others things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query. - -## Rewrite an existing query to a continuous aggregation query - -Some of simple existing aggregation queries can be directly used as continuous aggregation queries. For example, the example in [overview](./overview.md) can be used to query both in standard SQL and continuous aggregation query, since it's also a valid SQL query without any flow-specific syntax or functions: - -```sql -SELECT - status, - count(client) AS total_logs, - min(size) as min_size, - max(size) as max_size, - avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` - -However, there are other types of queries that cannot be directly used as continuous aggregation queries. -For example, a query that needs to compute percentiles would be unwise to repeatedly calculate the percentile for each time window everytime a new batch of data arrive. In this case, you can pre-aggregate the data into buckets of the desired size, and then calculate the percentile in the sink table using standard SQL when needed. The original SQL might be: -```sql -SELECT - status, - percentile_approx(size, 0.5) as median_size, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` -The above query can be rewritten to first aggregate the data into buckets of size 10, and then calculate the percentile in the sink table. 
-The flow query would be: -```sql -CREATE FLOW calc_ngx_distribution -SINK TO ngx_distribution -AS -SELECT - status, - trunc(size, -1) as bucket, - count(client) AS total_logs, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window, - bucket; -``` - -And then you can calculate the percentile in the sink table using standard SQL: -```sql -SELECT - outer.status, - outer.time_window, - outer.bucket, - SUM(case when in1.bucket <= outer.bucket then in1.total_logs else 0 end) * 100 / SUM(in1.total_logs) AS percentile -FROM ngx_distribution AS outer -JOIN ngx_distribution AS in1 -ON in1.status = outer.status -AND in1.time_window = outer.time_window -GROUP BY - status, - time_window, - bucket -ORDER BY status, time_window, bucket; -``` - -The SQL query groups the data by status, time_window, and bucket. The percentile column calculates the percentage within each group by taking the sum of all buckets not greater than the current bucket and dividing it by the total count of all logs. The result would be something like this: - -```sql - status | time_window | bucket | percentile ---------+----------------------------+--------+------------ - 404 | 1970-01-01 00:00:00.000000 | 0 | 22 - 404 | 1970-01-01 00:00:00.000000 | 1 | 55 - 404 | 1970-01-01 00:00:00.000000 | 2 | 66 - 404 | 1970-01-01 00:00:00.000000 | 3 | 100 -(4 rows) -``` - - \ No newline at end of file diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index 2b28512eb..52f1ea345 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -1,4 +1,4 @@ -# Usecase Example +# Usecase Examples Following are three major usecase examples for continuous aggregation: 1. 
**Real-time Analytics**: A real-time analytics platform that continuously aggregates data from a stream of events, delivering immediate insights while optionally downsampling the data to a lower resolution. For instance, this system can compile data from a high-frequency stream of log events (e.g., occurring every millisecond) to provide up-to-the-minute insights such as the number of requests per minute, average response times, and error rates per minute. @@ -9,30 +9,11 @@ Following are three major usecase examples for continuous aggregation: In all these usecases, the continuous aggregation system continuously aggregates data from a stream of events and provides real-time insights and alerts based on the aggregated data. The system can also downsample the data to a lower resolution to reduce the amount of data stored and processed. This allows the system to provide real-time insights and alerts while keeping the data storage and processing costs low. -## Real-time Analytics Example +## Real-time analytics example -Consider a usecase where you have a stream of log events from a web server that you want to analyze in real-time. The log events contain information such as the status of the request, the size of the response, the client IP address, and the timestamp of the request. You want to continuously aggregate this data to provide real-time analytics on the number of requests per minute, the min/max/average packet size, and the error rate per minute. Then the query for continuous aggregation would be: +See [Overview](overview.md) for an example of real-time analytics. Which is to calculate the total number of logs, the minimum size, the maximum size, the average size, and the number of packets with the size greater than 550 for each status code in a 1-minute fixed window for access logs. 
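Once that flow is running, the per-minute statistics can be read back from the sink table with plain SQL. The following is a sketch (column names follow the `ngx_statistics` schema shown in the overview):

```sql
-- the ten most recent windows of statistics for error responses
SELECT time_window, status, total_logs, avg_size, high_size_count
FROM ngx_statistics
WHERE status >= 400
ORDER BY time_window DESC
LIMIT 10;
```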
-```sql -CREATE FLOW ngx_aggregation -SINK TO ngx_statistics -AS -SELECT - status, - count(client) AS total_logs, - sum(case when status >= 400 then 1 end) as error_logs, - min(size) as min_size, - max(size) as max_size, - avg(size) as avg_size -FROM ngx_access_log -GROUP BY - status, - date_bin(INTERVAL '1 minutes', access_time, '2024-01-01 00:00:00'::Timestamp); -``` - -The above query continuously aggregates the data from the `ngx_access_log` table into the `ngx_statistics` table. It calculates the total number of logs, the number of error logs, the min/max/average packet size, and the error rate per minute. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_statistics` table will be continuously updated with the aggregated data, providing real-time insights into the web server's performance. - -## Real-time Monitoring Example +## Real-time monitoring example Consider a usecase where you have a stream of sensor events from a network of temperature sensors that you want to monitor in real-time. The sensor events contain information such as the sensor ID, the temperature reading, the timestamp of the reading, and the location of the sensor. You want to continuously aggregate this data to provide real-time alerts when the temperature exceeds a certain threshold. Then the query for continuous aggregation would be: @@ -61,7 +42,7 @@ HAVING max_temp > 100; The above query continuously aggregates the data from the `temp_sensor_data` table into the `temp_alerts` table. It calculates the maximum temperature reading for each sensor and location and filters out the data where the maximum temperature exceeds 100 degrees. The `temp_alerts` table will be continuously updated with the aggregated data, providing real-time alerts (Which is a new row in the `temp_alerts` table) when the temperature exceeds the threshold. 
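A simple alerting job could then poll the sink table for fresh rows. This is a sketch only; it assumes the sink table carries the automatically added `update_at` column described in the overview:

```sql
-- alert rows written within the last minute
SELECT sensor_id, loc, max_temp
FROM temp_alerts
WHERE update_at > now() - INTERVAL '1 minute';
```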
-## Real-time Dashboard +## Real-time dashboard Consider a usecase in which you need a bar graph that show the distribution of packet sizes for each status code to monitor the health of the system. The query for continuous aggregation would be: @@ -83,6 +64,6 @@ GROUP BY The above query puts the data from the `ngx_access_log` table into the `ngx_distribution` table. It calculates the total number of logs for each status code and packet size bucket (in this case, since `trunc`'s second argument is -1, meaning a bucket size of 10) for each time window. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_distribution` table will be continuously updated with the aggregated data, providing real-time insights into the distribution of packet sizes for each status code. -# Conclusion +## Conclusion Continuous aggregation is a powerful tool for real-time analytics, monitoring, and dashboarding. It allows you to continuously aggregate data from a stream of events and provide real-time insights and alerts based on the aggregated data. By downsampling the data to a lower resolution, you can reduce the amount of data stored and processed, making it easier to provide real-time insights and alerts while keeping the data storage and processing costs low. Continuous aggregation is a key component of any real-time data processing system and can be used in a wide range of usecases to provide real-time insights and alerts based on streaming data. 
\ No newline at end of file diff --git a/sidebars.ts b/sidebars.ts index ee78b5c5c..dde53a3c6 100644 --- a/sidebars.ts +++ b/sidebars.ts @@ -123,8 +123,8 @@ const sidebars: SidebarsConfig = { label: 'Continuous Aggregation', items: [ 'user-guide/continuous-aggregation/overview', + 'user-guide/continuous-aggregation/usecase-example', 'user-guide/continuous-aggregation/manage-flow', - 'user-guide/continuous-aggregation/query', 'user-guide/continuous-aggregation/define-time-window', 'user-guide/continuous-aggregation/expression', ], From 043cec0da6fe6b784318b8754efb7d443d4e8d13 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 3 Sep 2024 17:17:31 +0800 Subject: [PATCH 05/23] chore: per review --- .../continuous-aggregation/overview.md | 2 +- .../continuous-aggregation/query.md | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 docs/user-guide/continuous-aggregation/query.md diff --git a/docs/user-guide/continuous-aggregation/overview.md b/docs/user-guide/continuous-aggregation/overview.md index 3bc3fc11b..9ca8e11bf 100644 --- a/docs/user-guide/continuous-aggregation/overview.md +++ b/docs/user-guide/continuous-aggregation/overview.md @@ -141,7 +141,7 @@ Here is the explanation of the columns in the `ngx_statistics` table: Congratulations you already have a preliminary understanding of the continuous aggregation feature. Please refer to the following sections to learn more: -- [Usecase Example](./usecase-example.md) provides more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboard. +- [Usecase Examples](./usecase-example.md) provides more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboard. - [Manage Flows](./manage-flow.md) describes how to create, update, and delete a flow. Each of your continuous aggregation query is a flow. 
- [Define Time Window](./define-time-window.md) describes how to define the time window for the continuous aggregation. Time window is an important attribute of your continuous aggregation query. It defines the time interval for the aggregation.
- [Expression](./expression.md) is a reference of available expressions in the continuous aggregation query.
diff --git a/docs/user-guide/continuous-aggregation/query.md b/docs/user-guide/continuous-aggregation/query.md
new file mode 100644
index 000000000..490e20f50
--- /dev/null
+++ b/docs/user-guide/continuous-aggregation/query.md
@@ -0,0 +1,20 @@
+# Write a Query
+
+This chapter describes how to write a continuous aggregation query in GreptimeDB. The query here should be a `SELECT` statement with either aggregating functions or non-aggregating functions (i.e., scalar functions).
+
+Generally speaking, the `SQL` part in the flow is just like a normal `SELECT` clause with a few differences.
+The grammar of the query is like the following:
+
+```sql
+SELECT AGGR_FUNCTION(column1, column2,..) FROM <source_table> GROUP BY TIME_WINDOW_FUNCTION();
+```
+
+Only two kinds of expressions are allowed after the `SELECT` keyword:
+- Aggregate functions: see the reference in [Expression](./expression.md) for details.
+- Scalar functions: like `col`, `to_lowercase(col)`, `col + 1`, etc. This part is the same as the normal `SELECT` clause in GreptimeDB.
+
+The query should have a `FROM` clause to identify the source table. As the join clause is currently not supported, the query can only aggregate columns from a single table.
+
+The `GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in the [Define Time Window](./define-time-window.md) section. They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions.
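Putting the grammar together, a minimal query of this shape might look like the following (a sketch; `my_sensor_data` and its columns are hypothetical):

```sql
SELECT
    max(temperature) as max_temp,
    date_bin(INTERVAL '1 minutes', ts) as time_window
FROM my_sensor_data
GROUP BY
    time_window;
```

Here `date_bin` plays the role of the time window function, and `max` is the aggregate.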
+
+Other things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query.

From 05b100aa7180541ba8011af10b7b3e767e68de7d Mon Sep 17 00:00:00 2001
From: discord9
Date: Tue, 3 Sep 2024 19:49:37 +0800
Subject: [PATCH 06/23] docs: add one more example for analytics

---
 .../continuous-aggregation/usecase-example.md | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md
index 52f1ea345..dcab7bae4 100644
--- a/docs/user-guide/continuous-aggregation/usecase-example.md
+++ b/docs/user-guide/continuous-aggregation/usecase-example.md
@@ -13,6 +13,19 @@ In all these usecases, the continuous aggregation system continuously aggregates
 See [Overview](overview.md) for an example of real-time analytics. Which is to calculate the total number of logs, the minimum size, the maximum size, the average size, and the number of packets with the size greater than 550 for each status code in a 1-minute fixed window for access logs.
 
+Another exmaple of real-time analytics is to get all distinct values from a column. The query for continuous aggregation would be:
+
+```sql
+CREATE FLOW get_distinct_values
+SINK TO distinct_values
+AS
+SELECT DISTINCT
+    column_name,
+FROM source_table;
+```
+
+The above query continuously aggregates the data from the `source_table` table into the `distinct_values` table. It calculates the distinct values for the `column_name` column and stores them in the `distinct_values` table. The `distinct_values` table will be continuously updated with the distinct values from the `column_name` column, providing real-time insights into the data. Note that due to not have a persistent storage, the `distinct_values` table will be reset when the server restarts.
+ ## Real-time monitoring example Consider a usecase where you have a stream of sensor events from a network of temperature sensors that you want to monitor in real-time. The sensor events contain information such as the sensor ID, the temperature reading, the timestamp of the reading, and the location of the sensor. You want to continuously aggregate this data to provide real-time alerts when the temperature exceeds a certain threshold. Then the query for continuous aggregation would be: From bc0366fa26310055ad747597e7a42a3bc8226d67 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 3 Sep 2024 19:59:46 +0800 Subject: [PATCH 07/23] more explain --- .../continuous-aggregation/usecase-example.md | 17 ++++++++++------- sidebars.ts | 1 + 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index dcab7bae4..9c9ded694 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -13,18 +13,21 @@ In all these usecases, the continuous aggregation system continuously aggregates See [Overview](overview.md) for an example of real-time analytics. Which is to calculate the total number of logs, the minimum size, the maximum size, the average size, and the number of packets with the size greater than 550 for each status code in a 1-minute fixed window for access logs. -Another exmaple of real-time analytics is to get all distinct values from a column. The query for continuous aggregation would be: +Another example of real-time analytics is to get all distinct country from the `ngx_access_log` table. 
The query for continuous aggregation would be: ```sql -CREATE FLOW get_distinct_values -SINK TO distinct_values +CREATE FLOW calc_ngx_country +SINK TO ngx_country AS -SELECT DISTINCT - column_name, -FROM source_table; +SELECT + DISTINCT country, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM ngx_access_log +GROUP BY + time_window; ``` -The above query continuously aggregates the data from the `source_table` table into the `distinct_values` table. It calculates the distinct values for the `column_name` column and stores them in the `distinct_values` table. The `distinct_values` table will be continuously updated with the distinct values from the `column_name` column, providing real-time insights into the data. Note that due to not have a persistent storage, the `distinct_values` table will be reset when the server restarts. +The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. Note that there is currently no persistent storage for flow's internal state(There is persistent storage for the table data however), so it's recommended to use appropriate time window to miniminize data loss, because if the internal state is lost, related time window data will be lost as well. 
## Real-time monitoring example diff --git a/sidebars.ts b/sidebars.ts index dde53a3c6..c47984750 100644 --- a/sidebars.ts +++ b/sidebars.ts @@ -125,6 +125,7 @@ const sidebars: SidebarsConfig = { 'user-guide/continuous-aggregation/overview', 'user-guide/continuous-aggregation/usecase-example', 'user-guide/continuous-aggregation/manage-flow', + 'user-guide/continuous-aggregation/query', 'user-guide/continuous-aggregation/define-time-window', 'user-guide/continuous-aggregation/expression', ], From fb16ad22116378a8a64e3f726e63578de0db1552 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 4 Sep 2024 11:59:22 +0800 Subject: [PATCH 08/23] fix: make examples work --- .../continuous-aggregation/usecase-example.md | 55 ++++++++++++++++--- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index 9c9ded694..3a6fd772c 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -15,6 +15,24 @@ See [Overview](overview.md) for an example of real-time analytics. Which is to c Another example of real-time analytics is to get all distinct country from the `ngx_access_log` table. 
The query for continuous aggregation would be: +```sql +-- input table +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); +-- create flow task to calculate the distinct country +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, +FROM ngx_access_log; +``` + +or if you want to group the data by time window, you can use the following query: + ```sql CREATE FLOW calc_ngx_country SINK TO ngx_country @@ -24,10 +42,13 @@ SELECT date_bin(INTERVAL '1 hour', access_time) as time_window, FROM ngx_access_log GROUP BY + country, time_window; ``` -The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. Note that there is currently no persistent storage for flow's internal state(There is persistent storage for the table data however), so it's recommended to use appropriate time window to miniminize data loss, because if the internal state is lost, related time window data will be lost as well. +The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. 
+ +Note that there is currently no persistent storage for the flow's internal state (there is persistent storage for the table data, however), so it's recommended to use an appropriate time window to minimize data loss, because if the internal state is lost, the related time window data will be lost as well. ## Real-time monitoring example @@ -63,19 +84,35 @@ The above query continuously aggregates the data from the `temp_sensor_data` tab Consider a usecase in which you need a bar graph that shows the distribution of packet sizes for each status code to monitor the health of the system. The query for continuous aggregation would be: ```sql -CREATE FLOW calc_ngx_distribution -SINK TO ngx_distribution -AS +-- create input table +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); +-- create output table +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, -- auto generated column to store the last update time + PRIMARY KEY(stat, bucket_size) +); +-- create flow task to calculate the distribution of packet sizes for each status code +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS SELECT - status, - trunc(size, -1) as bucket_size, + stat, + trunc(size, -1)::INT as bucket_size, count(client) AS total_logs, date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log +FROM + ngx_access_log GROUP BY - status, + stat, time_window, - bucket; + bucket_size; ``` The above query puts the data from the `ngx_access_log` table into the `ngx_distribution` table. It calculates the total number of logs for each status code and packet size bucket (in this case, since `trunc`'s second argument is -1, the bucket size is 10) for each time window. The `date_bin` function is used to group the data into one-minute intervals.
The `ngx_distribution` table will be continuously updated with the aggregated data, providing real-time insights into the distribution of packet sizes for each status code. From 3cc51a9f10f104ac8a3b6b18cb06659b46d85823 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 4 Sep 2024 12:00:19 +0800 Subject: [PATCH 09/23] docs: explain --- docs/user-guide/continuous-aggregation/query.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/user-guide/continuous-aggregation/query.md b/docs/user-guide/continuous-aggregation/query.md index 490e20f50..cc19e486b 100644 --- a/docs/user-guide/continuous-aggregation/query.md +++ b/docs/user-guide/continuous-aggregation/query.md @@ -15,6 +15,10 @@ Only two kinds of expression are allowed after `SELECT` keyword: The query should have a `FROM` clause to identify the source table. As the join clause is currently not supported, the query can only aggregate columns from a single table. +`WHERE` and `HAVING` clauses are supported in the continuous aggregation query. They work as in a normal query. The `WHERE` clause filters the data before aggregation, and the `HAVING` clause filters the data after aggregation. + +`DISTINCT` currently only works with `SELECT DISTINCT column1, column2,..` syntax. It is used to remove duplicate rows from the result set. Support for `SELECT count(DISTINCT column1, column2,..)` is not available yet, but it will be added in the future. + +`GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions. Other things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query.
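As a concrete illustration of the `WHERE`/`HAVING` distinction described above, here is a hedged sketch of a flow. The flow name `ngx_error_rate`, the sink table `ngx_error_stats`, and the threshold of 10 are hypothetical; the source columns reuse the `ngx_access_log` schema from the earlier examples:

```sql
-- Illustrative sketch: WHERE filters each incoming row before aggregation,
-- HAVING filters each aggregated group after aggregation.
CREATE FLOW ngx_error_rate
SINK TO ngx_error_stats
AS
SELECT
    status,
    count(client) AS error_logs,
    date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
WHERE status >= 400        -- row-level filter: only error responses are aggregated
GROUP BY
    status,
    time_window
HAVING error_logs > 10;    -- group-level filter: only noisy windows reach the sink
```

Rows with `status < 400` never enter the accumulator, while one-minute windows with 10 or fewer errors are computed but not emitted.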
From 5002f26970e3a845f34e66baccf05cb8c65e9555 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 4 Sep 2024 12:23:42 +0800 Subject: [PATCH 10/23] chore: query --- docs/user-guide/continuous-aggregation/query.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user-guide/continuous-aggregation/query.md b/docs/user-guide/continuous-aggregation/query.md index cc19e486b..e2fc14bbd 100644 --- a/docs/user-guide/continuous-aggregation/query.md +++ b/docs/user-guide/continuous-aggregation/query.md @@ -17,7 +17,7 @@ The query should have a `FROM` clause to identify the source table. As the join `WHERE` and `HAVING` clauses are supported in the continuous aggregation query. They work as in a normal query. The `WHERE` clause filters the data before aggregation, and the `HAVING` clause filters the data after aggregation. -`DISTINCT` currently only works with `SELECT DISTINCT column1, column2,..` syntax. It is used to remove duplicate rows from the result set. Support for `SELECT count(DISTINCT column1, column2,..)` is not available yet, but it will be added in the future. +`DISTINCT` currently only works with `SELECT DISTINCT column1 ..` syntax. It is used to remove duplicate rows from the result set. Support for `SELECT count(DISTINCT column1) ...` is not available yet, but it will be added in the future. `GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. They are used in the `GROUP BY` clause to define the time window for the aggregation. Other expressions in `GROUP BY` can be either literal, column or scalar expressions.
From aefb9e04a2f69e4b0990ee4d9f7eea4bf0fa071b Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 5 Sep 2024 17:15:49 +0800 Subject: [PATCH 11/23] fix: link --- docs/user-guide/continuous-aggregation/overview.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/user-guide/continuous-aggregation/overview.md b/docs/user-guide/continuous-aggregation/overview.md index 9ca8e11bf..a6b801f4b 100644 --- a/docs/user-guide/continuous-aggregation/overview.md +++ b/docs/user-guide/continuous-aggregation/overview.md @@ -143,5 +143,6 @@ Please refer to the following sections to learn more: - [Usecase Examples](./usecase-example.md) provides more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboards. - [Manage Flows](./manage-flow.md) describes how to create, update, and delete a flow. Each of your continuous aggregation queries is a flow. +- [Write a Query](./query.md) describes how to write a continuous aggregation query. - [Define Time Window](./define-time-window.md) describes how to define the time window for the continuous aggregation. Time window is an important attribute of your continuous aggregation query. It defines the time interval for the aggregation. - [Expression](./expression.md) is a reference of available expressions in the continuous aggregation query.
From b4a76555f00d8ed88356cd9c000bbc88abb87a79 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 5 Sep 2024 17:16:09 +0800 Subject: [PATCH 12/23] =?UTF-8?q?copilot=20=E6=9C=BA=E7=BF=BB=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../continuous-aggregation/overview.md | 4 +- .../continuous-aggregation/query.md | 98 +------------- .../continuous-aggregation/usecase-example.md | 121 ++++++++++++++++++ 3 files changed, 130 insertions(+), 93 deletions(-) create mode 100644 i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md index 03794d1eb..282535fe1 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/overview.md @@ -67,7 +67,7 @@ SELECT min(size) as min_size, max(size) as max_size, avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, + sum(case when `size` > 550 then 1 else 0 end) as high_size_count, date_bin(INTERVAL '1 minutes', access_time) as time_window, FROM ngx_access_log GROUP BY @@ -141,6 +141,8 @@ SELECT * FROM ngx_statistics; 恭喜你已经初步了解了持续聚合功能。 请参考以下章节了解更多: + +- [用例](./usecase-example.md) 提供了更多关于如何在实时分析、监控和仪表板中使用持续聚合的示例。 - [管理 Flow](./manage-flow.md) 描述了如何创建、更新和删除 flow。你的每个持续聚合查询都是一个 flow。 - [编写查询语句](./query.md) 描述了如何编写持续聚合查询。 - [定义时间窗口](./define-time-window.md) 描述了如何为持续聚合定义时间窗口。时间窗口是持续聚合查询的一个重要属性,它定义了聚合的时间间隔。 diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md index 77c05ac92..f3123df8e 100644 
--- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md @@ -16,98 +16,12 @@ SELECT AGGR_FUNCTION(column1, column2,..) FROM GROUP BY TIME_WIND 查询应该有一个 `FROM` 子句来标识 source 表。由于不支持 join 子句,目前只能从单个表中聚合列。 -`GROUP BY` 子句与普通查询中的工作方式相同。 -它根据指定的列对数据进行分组。 -`GROUP BY` 子句中使用的时间窗口函数 `hop()` 和 `tumble()` 在 [定义时间窗口](./define-time-window.md) 部分中有描述。 -它们用于在聚合中定义时间窗口。 -`GROUP BY` 中的其他表达式可以是 literal、列名或 scalar 表达式。 - -持续聚合查询不支持 `ORDER BY`、`LIMIT`、`OFFSET` 等其他操作。 - - -## 将现有查询重写为持续聚合查询 - -一些简单的现有聚合查询可以直接用作持续聚合查询。例如,[概述](./overview.md) 部分中的示例可以用于标准 SQL 查询和持续聚合查询,因为它也是一个有效的 SQL 查询,没有任何特定于流的语法或函数: - -```sql -SELECT - status, - count(client) AS total_logs, - min(size) as min_size, - max(size) as max_size, - avg(size) as avg_size, - sum(case when `size` > 550::double then 1::double else 0::double end) as high_size_count, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` - -然而,还有其他类型的查询不能直接用作持续聚合查询。 -例如,需要计算百分位数的查询不应该在每次新数据到达时重复计算每个时间窗口的百分位数。 -在这种情况下,您可以将数据预聚合到所需大小的桶中,然后在需要时使用标准 SQL 在 sink 表中计算百分位数。原始 SQL 可能是: - -```sql -SELECT - status, - percentile_approx(size, 0.5) as median_size, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window; -``` -上述查询可以重写为首先将数据聚合到大小为 10 的桶中,然后在 sink 表中计算百分位数。 -流查询将是: +`WHERE` 和 `HAVING` 子句在持续聚合查询中是支持的。它们的工作方式与普通查询中的相同。`WHERE` 子句在聚合之前过滤数据,而 `HAVING` 子句在聚合之后过滤数据。 -```sql -CREATE FLOW calc_ngx_distribution -SINK TO ngx_distribution -AS -SELECT - status, - trunc(size, -1) as bucket, - count(client) AS total_logs, - date_bin(INTERVAL '1 minutes', access_time) as time_window, -FROM ngx_access_log -GROUP BY - status, - time_window, - bucket; -``` -接下来,您可以使用标准 SQL 在 sink 表中计算百分位数: -```sql -SELECT - outer.status, - outer.time_window, - outer.bucket, - SUM(case when in1.bucket <= outer.bucket then 
in1.total_logs else 0 end) * 100 / SUM(in1.total_logs) AS percentile -FROM ngx_distribution AS outer -JOIN ngx_distribution AS in1 -ON in1.status = outer.status -AND in1.time_window = outer.time_window -GROUP BY - status, - time_window, - bucket -ORDER BY status, time_window, bucket; -``` +`DISTINCT` 目前只能使用 `SELECT DISTINCT column1 ..` 语法。它用于从结果集中删除重复行。目前还不支持 `SELECT count(DISTINCT column1) ...`,但将来会添加支持。 -上述 SQL 查询按 status、time_window 和 bucket 对数据进行分组。percentile 列通过计算小于或等于当前 bucket 的所有 bucket 的总和,并将其除以所有日志的总数来计算每个组内的百分比。结果可能如下所示: - -```sql - status | time_window | bucket | percentile ---------+----------------------------+--------+------------ - 404 | 1970-01-01 00:00:00.000000 | 0 | 22 - 404 | 1970-01-01 00:00:00.000000 | 1 | 55 - 404 | 1970-01-01 00:00:00.000000 | 2 | 66 - 404 | 1970-01-01 00:00:00.000000 | 3 | 100 -(4 rows) -``` +`GROUP BY` 子句与普通查询中的工作方式相同。它根据指定的列对数据进行分组。`GROUP BY` 子句中使用的时间窗口函数 `hop()` 和 `tumble()` 在 [定义时间窗口](./define-time-window.md) 部分中有描述。 +它们用于在聚合中定义时间窗口。 +`GROUP BY` 中的其他表达式可以是 literal、列名或 scalar 表达式。 - \ No newline at end of file +持续聚合查询不支持 `ORDER BY`、`LIMIT`、`OFFSET` 等其他操作。 \ No newline at end of file diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md new file mode 100644 index 000000000..ecc080cbf --- /dev/null +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -0,0 +1,121 @@ +# 用例 +持续聚合的三个主要用例示例如下: + +1. **实时分析**:一个实时分析平台,不断聚合来自事件流的数据,提供即时洞察,同时可选择将数据降采样到较低分辨率。例如,此系统可以编译来自高频日志事件流(例如,每毫秒发生一次)的数据,以提供每分钟的请求数、平均响应时间和每分钟的错误率等最新洞察。 + +2. **实时监控**:一个实时监控系统,不断聚合来自事件流的数据,根据聚合数据提供实时警报。例如,此系统可以处理来自传感器事件流的数据,以提供当温度超过某个阈值时的实时警报。 + +3. 
**实时仪表盘**:一个实时仪表盘,显示每分钟的请求数、平均响应时间和每分钟的错误数。此仪表板可用于监控系统的健康状况,并检测系统中的任何异常。 + +在所有这些用例中,持续聚合系统不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。系统还可以将数据降采样到较低分辨率,以减少存储和处理的数据量。这使得系统能够提供实时洞察和警报,同时保持数据存储和处理成本低廉。 + +## 实时分析示例 + +请参阅[概述](overview.md)中的实时分析示例。该示例用于计算日志的总数、包大小的最小、最大和平均值,以及大小大于 550 的数据包数量按照每个状态码在 1 分钟固定窗口中的实时分析。 + +另外,您还可以使用持续聚合来计算其他类型的实时分析。例如,要从 `ngx_access_log` 表中获取所有不同的国家。持续聚合的查询如下: + +```sql +-- input table +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); +-- create flow task to calculate the distinct country +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, +FROM ngx_access_log; +``` + +或者,如果您想要按时间窗口对数据进行分组,可以使用以下查询: + +```sql +CREATE FLOW calc_ngx_country +SINK TO ngx_country +AS +SELECT + DISTINCT country, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM ngx_access_log +GROUP BY + country, + time_window; +``` + +上述的查询将 `ngx_access_log` 表中的数据放入 `ngx_country` 表中。它计算每个时间窗口的不同国家。`date_bin` 函数用于将数据分组为一小时的间隔。`ngx_country` 表将不断更新聚合数据,提供实时洞察,显示正在访问系统的不同国家。 + +请注意,目前没有持久存储用于 flow 的内部状态(但是表数据有持久存储),因此建议使用适当的时间窗口来最小化数据丢失,因为如果内部状态丢失,相关时间窗口数据也将丢失。 + +## 实时监控示例 + +考虑一个使用情况,您有一个来自温度传感器网络的传感器事件流,您希望实时监控。传感器事件包含传感器 ID、温度读数、读数的时间戳和传感器的位置等信息。您希望不断聚合这些数据,以在温度超过某个阈值时提供实时警报。那么持续聚合的查询将是: + +```sql +CREATE TABLE temp_sensor_data ( + sensor_id INT, + loc STRING, + temperature DOUBLE, + ts TIMESTAMP TIME INDEX +); + +CREATE FLOW temp_monitoring +SINK TO temp_alerts +AS +SELECT + sensor_id, + loc, + max(temperature) as max_temp, +FROM temp_sensor_data +GROUP BY + sensor_id, + loc +HAVING max_temp > 100; +``` + +上述的查询将从 `temp_sensor_data` 表中不断聚合数据到 `temp_alerts` 表中。它计算每个传感器和位置的最高温度读数,并过滤出最高温度超过 100 度的数据。`temp_alerts` 表将不断更新聚合数据,并且当温度超过阈值时提供实时警报(即 `temp_alerts` 表中的新行)。 + +## 实时仪表盘示例 + +考虑一个使用情况,您需要一个柱状图,显示每个状态码的数据包大小分布,以监控系统的健康状况。持续聚合的查询将是: + +```sql +-- create input table +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); +-- create 
output table +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, -- auto generated column to store the last update time + PRIMARY KEY(stat, bucket_size) +); +-- create flow task to calculate the distribution of packet sizes for each status code +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS +SELECT + stat, + trunc(size, -1)::INT as bucket_size, + count(client) AS total_logs, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + stat, + time_window, + bucket_size; +``` + +上述查询将从 `ngx_access_log` 表中的数据放入 `ngx_distribution` 表中。它计算每个状态码的数据包大小分布,并将数据分组到每个时间窗口中。`ngx_distribution` 表将不断更新聚合数据,提供实时洞察,显示每个状态码的数据包大小分布。 + +## 结论 + +持续聚合是实时分析、监控和仪表盘的强大工具。它允许您不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。通过将数据降采样到较低分辨率,您可以减少存储和处理的数据量,从而更容易提供实时洞察和警报,同时保持数据存储和处理成本低廉。持续聚合是任何实时数据处理系统的关键组件,可以在各种用例中使用,以提供基于流数据的实时洞察和警报。 \ No newline at end of file From 40003f98de02ec414d64aab3ab25d5af7f0c37c6 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 9 Sep 2024 15:44:56 +0800 Subject: [PATCH 13/23] chore: en review --- .../continuous-aggregation/query.md | 2 + .../continuous-aggregation/usecase-example.md | 81 ++++++++++++++++++- 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/docs/user-guide/continuous-aggregation/query.md b/docs/user-guide/continuous-aggregation/query.md index e2fc14bbd..f21ee2af0 100644 --- a/docs/user-guide/continuous-aggregation/query.md +++ b/docs/user-guide/continuous-aggregation/query.md @@ -22,3 +22,5 @@ The query should have a `FROM` clause to identify the source table. As the join `GROUP BY` clause works as in a normal query. It groups the data by the specified columns. One special thing is the time window functions `hop()` and `tumble()` described in [Define Time Window](./define-time-window.md) part. They are used in the `GROUP BY` clause to define the time window for the aggregation. 
Other expressions in `GROUP BY` can be either literal, column or scalar expressions. Other things like `ORDER BY`, `LIMIT`, `OFFSET` are not supported in the continuous aggregation query. + +See [Usecase Examples](./usecase-example.md) for more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboards. \ No newline at end of file diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index 3a6fd772c..422d93130 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -22,6 +22,15 @@ CREATE TABLE ngx_access_log ( country STRING, access_time TIMESTAMP TIME INDEX ); + +-- output table +CREATE TABLE ngx_country ( + country STRING, + update_at TIMESTAMP, + __ts_placeholder TIMESTAMP TIME INDEX, -- placeholder column for time index + PRIMARY KEY(country) +); + -- create flow task to calculate the distinct country CREATE FLOW calc_ngx_country SINK TO ngx_country @@ -29,11 +38,35 @@ AS SELECT DISTINCT country, FROM ngx_access_log; + +-- insert some data +INSERT INTO ngx_access_log VALUES + ("client1", "US", "2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); + +-- check the result +select * from ngx_country; ``` or if you want to group the data by time window, you can use the following query: ```sql +-- input table is the same as above +-- output table +CREATE TABLE ngx_country ( + country STRING, + time_window TIMESTAMP TIME INDEX, -- no need to use __ts_placeholder here since we have a time window column as time index
+ update_at TIMESTAMP, + PRIMARY KEY(country) +); CREATE FLOW calc_ngx_country SINK TO ngx_country AS @@ -44,17 +77,20 @@ FROM ngx_access_log GROUP BY country, time_window; +-- insert data using the same data as above ``` The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct countries for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. -Note that there is currently no persistent storage for flow's internal state(There is persistent storage for the table data however), so it's recommended to use appropriate time window to miniminize data loss, because if the internal state is lost, related time window data will be lost as well. +Note that there is currently no persistent storage for the flow's internal state. Internal state refers to the intermediate state used in computing incremental query results, such as the accumulator's value for an aggregation query; there is persistent storage for the sink table data, however. +So it's recommended to use an appropriate time window (i.e. hourly, if you can tolerate losing one hour of data when rebooting) to minimize data loss, because if the internal state is lost, the related time window data will be lost as well. ## Real-time monitoring example Consider a usecase where you have a stream of sensor events from a network of temperature sensors that you want to monitor in real-time. The sensor events contain information such as the sensor ID, the temperature reading, the timestamp of the reading, and the location of the sensor. You want to continuously aggregate this data to provide real-time alerts when the temperature exceeds a certain threshold.
Then the query for continuous aggregation would be: ```sql +-- input table CREATE TABLE temp_sensor_data ( sensor_id INT, loc STRING, @@ -62,6 +98,15 @@ CREATE TABLE temp_sensor_data ( ts TIMESTAMP TIME INDEX ); +-- output table +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + update_at TIMESTAMP TIME INDEX, + PRIMARY KEY(sensor_id, loc) +); + CREATE FLOW temp_monitoring SINK TO temp_alerts AS @@ -74,6 +119,26 @@ GROUP BY sensor_id, loc HAVING max_temp > 100; + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 98.5, "2022-01-01 00:00:00"), + (2, "room2", 99.5, "2022-01-01 00:00:01"); + +-- You may want to flush the flow task to see the result +ADMIN FLUSH_FLOW('temp_monitoring'); + +-- for now the sink table will be empty +SELECT * FROM temp_alerts; + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 101.5, "2022-01-01 00:00:02"), + (2, "room2", 102.5, "2022-01-01 00:00:03"); + +-- You may want to flush the flow task to see the result +ADMIN FLUSH_FLOW('temp_monitoring'); + +-- now the sink table will have the max temperature data +SELECT * FROM temp_alerts; ``` The above query continuously aggregates the data from the `temp_sensor_data` table into the `temp_alerts` table. It calculates the maximum temperature reading for each sensor and location and filters the data, keeping only rows where the maximum temperature exceeds 100 degrees. The `temp_alerts` table will be continuously updated with the aggregated data, providing real-time alerts (which is a new row in the `temp_alerts` table) when the temperature exceeds the threshold.
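Building on the example above, downstream alerting typically amounts to polling the sink table for fresh rows. A hedged sketch of such a polling query — the cutoff timestamp below is a placeholder that an alerting job would track itself between runs:

```sql
-- Illustrative polling query an external alerting job might run periodically,
-- picking up only alerts written since the previous poll.
SELECT sensor_id, loc, max_temp, update_at
FROM temp_alerts
WHERE update_at > '2022-01-01 00:00:00'  -- placeholder: timestamp of the previous poll
ORDER BY update_at;
```

Because the flow only emits rows whose `max_temp` exceeds the threshold, every row returned here represents an alert condition.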
@@ -113,6 +178,20 @@ GROUP BY stat, time_window, bucket_size; + +INSERT INTO ngx_access_log VALUES + ("cli1", 200, 100, "2022-01-01 00:00:00"), + ("cli2", 200, 110, "2022-01-01 00:00:01"), + ("cli3", 200, 120, "2022-01-01 00:00:02"), + ("cli4", 200, 130, "2022-01-01 00:00:03"), + ("cli5", 200, 140, "2022-01-01 00:00:04"), + ("cli6", 404, 150, "2022-01-01 00:00:05"), + ("cli7", 404, 160, "2022-01-01 00:00:06"), + ("cli8", 404, 170, "2022-01-01 00:00:07"), + ("cli9", 404, 180, "2022-01-01 00:00:08"), + ("cli10", 404, 190, "2022-01-01 00:00:09"); + +SELECT * FROM ngx_distribution; ``` The above query puts the data from the `ngx_access_log` table into the `ngx_distribution` table. It calculates the total number of logs for each status code and packet size bucket (in this case, since `trunc`'s second argument is -1, meaning a bucket size of 10) for each time window. The `date_bin` function is used to group the data into one-minute intervals. The `ngx_distribution` table will be continuously updated with the aggregated data, providing real-time insights into the distribution of packet sizes for each status code. From 22a52a5cb2bd20923e130c0c4a650ac64a0c5bfb Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 9 Sep 2024 15:47:07 +0800 Subject: [PATCH 14/23] chore: zh review TODO zh new paragraph --- .../current/user-guide/continuous-aggregation/query.md | 2 +- .../user-guide/continuous-aggregation/usecase-example.md | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md index f3123df8e..fb47e99f3 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md @@ -18,7 +18,7 @@ SELECT AGGR_FUNCTION(column1, column2,..) 
FROM GROUP BY TIME_WIND `WHERE` 和 `HAVING` 子句在持续聚合查询中是支持的。它们的工作方式与普通查询中的相同。`WHERE` 子句在聚合之前过滤数据,而 `HAVING` 子句在聚合之后过滤数据。 -`DISTINCT` 目前只能使用 `SELECT DISTINCT column1 ..` 语法。它用于从结果集中删除重复行。目前还不支持 `SELECT count(DISTINCT column1) ...`,但将来会添加支持。 +`DISTINCT` 用于从结果集中删除重复行,目前仅支持 `SELECT DISTINCT column1 ..` 语法,不支持 `SELECT count(DISTINCT column1) ...`,但将来会添加支持。 `GROUP BY` 子句与普通查询中的工作方式相同。它根据指定的列对数据进行分组。`GROUP BY` 子句中使用的时间窗口函数 `hop()` 和 `tumble()` 在 [定义时间窗口](./define-time-window.md) 部分中有描述。 它们用于在聚合中定义时间窗口。 diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md index ecc080cbf..5147c3524 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -7,7 +7,7 @@ 3. **实时仪表盘**:一个实时仪表盘,显示每分钟的请求数、平均响应时间和每分钟的错误数。此仪表板可用于监控系统的健康状况,并检测系统中的任何异常。 -在所有这些用例中,持续聚合系统不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。系统还可以将数据降采样到较低分辨率,以减少存储和处理的数据量。这使得系统能够提供实时洞察和警报,同时保持数据存储和处理成本低廉。 +在所有这些用例中,持续聚合系统不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。系统还可以将数据降采样到较低分辨率,以减少存储和处理的数据量。这使得系统能够提供实时洞察和警报,同时保持较低的数据存储和处理成本。 ## 实时分析示例 @@ -52,7 +52,7 @@ GROUP BY ## 实时监控示例 -考虑一个使用情况,您有一个来自温度传感器网络的传感器事件流,您希望实时监控。传感器事件包含传感器 ID、温度读数、读数的时间戳和传感器的位置等信息。您希望不断聚合这些数据,以在温度超过某个阈值时提供实时警报。那么持续聚合的查询将是: +假设您希望实时监控一个来自温度传感器网络的传感器事件流。传感器事件包含传感器 ID、温度读数、读数的时间戳和传感器的位置等信息。您希望不断聚合这些数据,以在温度超过某个阈值时提供实时警报。那么持续聚合的查询将是: ```sql CREATE TABLE temp_sensor_data ( @@ -80,7 +80,7 @@ HAVING max_temp > 100; ## 实时仪表盘示例 -考虑一个使用情况,您需要一个柱状图,显示每个状态码的数据包大小分布,以监控系统的健康状况。持续聚合的查询将是: +假设您需要一个柱状图显示每个状态码的数据包大小分布,以监控系统的健康状况。持续聚合的查询将是: ```sql -- create input table @@ -118,4 +118,4 @@ GROUP BY ## 结论 -持续聚合是实时分析、监控和仪表盘的强大工具。它允许您不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。通过将数据降采样到较低分辨率,您可以减少存储和处理的数据量,从而更容易提供实时洞察和警报,同时保持数据存储和处理成本低廉。持续聚合是任何实时数据处理系统的关键组件,可以在各种用例中使用,以提供基于流数据的实时洞察和警报。 \ No newline at end of 
file +持续聚合是实时分析、监控和仪表盘的强大工具。它允许您不断聚合来自事件流的数据,并根据聚合数据提供实时洞察和警报。通过将数据降采样到较低分辨率,您可以减少存储和处理的数据量,从而更容易提供实时洞察和警报,同时保持较低的数据存储和处理成本。持续聚合是任何实时数据处理系统的关键组件,可以在各种用例中使用,以提供基于流数据的实时洞察和警报。 \ No newline at end of file From e02d36847da41073edc9b48f165b66b59dc83321 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 9 Sep 2024 15:55:25 +0800 Subject: [PATCH 15/23] update new zh paragraph --- .../continuous-aggregation/usecase-example.md | 2 +- .../continuous-aggregation/query.md | 4 +- .../continuous-aggregation/usecase-example.md | 80 ++++++++++++++++++- 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index 422d93130..ca07fcc49 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -82,7 +82,7 @@ GROUP BY The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. -Note that there is currently no persistent storage for flow's internal state, internal state refer to intermediate state used in computing incremental query result, like accumlator's value for a aggregation query, there is persistent storage for the sink table data however. +Note that there is currently no persistent storage for flow's internal state, internal state refer to intermediate state used in computing incremental query result, like accumlator's value for a aggregation query(i.e. `count(col)`'s accumlator record current count number), there is persistent storage for the sink table data however. so it's recommended to use appropriate time window(i.e. 
hourly if you can tolerate loss one hour of data when rebooting) to miniminize data loss, because if the internal state is lost, related time window data will be lost as well. ## Real-time monitoring example diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md index fb47e99f3..2e98aec98 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/query.md @@ -24,4 +24,6 @@ SELECT AGGR_FUNCTION(column1, column2,..) FROM GROUP BY TIME_WIND 它们用于在聚合中定义时间窗口。 `GROUP BY` 中的其他表达式可以是 literal、列名或 scalar 表达式。 -持续聚合查询不支持 `ORDER BY`、`LIMIT`、`OFFSET` 等其他操作。 \ No newline at end of file +持续聚合查询不支持 `ORDER BY`、`LIMIT`、`OFFSET` 等其他操作。 + +更多关于如何在实时分析、监控和仪表盘中使用持续聚合的示例,请参考 [用例](./usecase-example.md)。 \ No newline at end of file diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md index 5147c3524..838313284 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -22,6 +22,15 @@ CREATE TABLE ngx_access_log ( country STRING, access_time TIMESTAMP TIME INDEX ); + +-- output table +CREATE TABLE ngx_country ( + country STRING, + update_at TIMESTAMP, + __ts_placeholder TIMESTAMP TIME INDEX, -- placeholder column for time index + PRIMARY KEY(country) +); + -- create flow task to calculate the distinct country CREATE FLOW calc_ngx_country SINK TO ngx_country @@ -29,11 +38,35 @@ AS SELECT DISTINCT country, FROM ngx_access_log; + +-- insert some data +INSERT INTO ngx_access_log VALUES + ("client1", "US", 
"2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); + +-- check the result +select * from ngx_country; ``` 或者,如果您想要按时间窗口对数据进行分组,可以使用以下查询: ```sql +-- input table create same as above +-- output table +CREATE TABLE ngx_country ( + country STRING, + time_window TIMESTAMP TIME INDEX,-- no need to use __ts_placeholder here since we have a time window column as time index + update_at TIMESTAMP, + PRIMARY KEY(country) +); CREATE FLOW calc_ngx_country SINK TO ngx_country AS @@ -44,17 +77,19 @@ FROM ngx_access_log GROUP BY country, time_window; +-- insert data using the same data as above ``` 上述的查询将 `ngx_access_log` 表中的数据放入 `ngx_country` 表中。它计算每个时间窗口的不同国家。`date_bin` 函数用于将数据分组为一小时的间隔。`ngx_country` 表将不断更新聚合数据,提供实时洞察,显示正在访问系统的不同国家。 -请注意,目前没有持久存储用于 flow 的内部状态(但是表数据有持久存储),因此建议使用适当的时间窗口来最小化数据丢失,因为如果内部状态丢失,相关时间窗口数据也将丢失。 +请注意,目前 Flow 的内部状态没有持久存储。内部状态指的是用于计算增量查询结果的中间状态,例如聚合查询的累加器值(如count(col)的累加器记录了目前为止的 count 计数)。然而,Sink 表的数据是有持久存储的。因此,建议您使用适当的时间窗口(例如,如果可以接受在重启时丢失一小时的数据,则可以设置为每小时)来最小化数据丢失。因为一旦内部状态丢失,相关时间窗口的数据也将随之丢失。 ## 实时监控示例 假设您希望实时监控一个来自温度传感器网络的传感器事件流。传感器事件包含传感器 ID、温度读数、读数的时间戳和传感器的位置等信息。您希望不断聚合这些数据,以在温度超过某个阈值时提供实时警报。那么持续聚合的查询将是: ```sql +-- input table CREATE TABLE temp_sensor_data ( sensor_id INT, loc STRING, @@ -62,6 +97,15 @@ CREATE TABLE temp_sensor_data ( ts TIMESTAMP TIME INDEX ); +-- output table +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + update_at TIMESTAMP TIME INDEX, + PRIMARY KEY(sensor_id, loc) +); + CREATE FLOW temp_monitoring SINK TO temp_alerts AS @@ -74,6 +118,26 @@ GROUP BY sensor_id, loc HAVING max_temp > 100; + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 
98.5, "2022-01-01 00:00:00"), + (2, "room2", 99.5, "2022-01-01 00:00:01"); + +-- You may want to flush the flow task to see the result +ADMIN FLUSH_FLOW('temp_monitoring'); + +-- for now sink table will be empty +SELECT * FROM temp_alerts; + +INSERT INTO temp_sensor_data VALUES + (1, "room1", 101.5, "2022-01-01 00:00:02"), + (2, "room2", 102.5, "2022-01-01 00:00:03"); + +-- You may want to flush the flow task to see the result +ADMIN FLUSH_FLOW('temp_monitoring'); + +-- now sink table will have the max temperature data +SELECT * FROM temp_alerts; ``` 上述的查询将从 `temp_sensor_data` 表中不断聚合数据到 `temp_alerts` 表中。它计算每个传感器和位置的最高温度读数,并过滤出最高温度超过 100 度的数据。`temp_alerts` 表将不断更新聚合数据,并且当温度超过阈值时提供实时警报(即 `temp_alerts` 表中的新行)。 @@ -112,6 +176,20 @@ GROUP BY stat, time_window, bucket_size; + +INSERT INTO ngx_access_log VALUES + ("cli1", 200, 100, "2022-01-01 00:00:00"), + ("cli2", 200, 110, "2022-01-01 00:00:01"), + ("cli3", 200, 120, "2022-01-01 00:00:02"), + ("cli4", 200, 130, "2022-01-01 00:00:03"), + ("cli5", 200, 140, "2022-01-01 00:00:04"), + ("cli6", 404, 150, "2022-01-01 00:00:05"), + ("cli7", 404, 160, "2022-01-01 00:00:06"), + ("cli8", 404, 170, "2022-01-01 00:00:07"), + ("cli9", 404, 180, "2022-01-01 00:00:08"), + ("cli10", 404, 190, "2022-01-01 00:00:09"); + +SELECT * FROM ngx_distribution; ``` 上述查询将从 `ngx_access_log` 表中的数据放入 `ngx_distribution` 表中。它计算每个状态码的数据包大小分布,并将数据分组到每个时间窗口中。`ngx_distribution` 表将不断更新聚合数据,提供实时洞察,显示每个状态码的数据包大小分布。 From 1d42b40e3cba586f758680fb4cb7ad61a1fb9063 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 9 Sep 2024 15:57:02 +0800 Subject: [PATCH 16/23] chore: typo --- docs/user-guide/continuous-aggregation/usecase-example.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index ca07fcc49..cd0398264 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ 
b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -82,7 +82,7 @@ GROUP BY The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. -Note that there is currently no persistent storage for flow's internal state, internal state refer to intermediate state used in computing incremental query result, like accumlator's value for a aggregation query(i.e. `count(col)`'s accumlator record current count number), there is persistent storage for the sink table data however. +Note that there is currently no persistent storage for flow's internal state, internal state refer to intermediate state used in computing incremental query result, like accumulator's value for a aggregation query(i.e. `count(col)`'s accumulator record current count number), there is persistent storage for the sink table data however. so it's recommended to use appropriate time window(i.e. hourly if you can tolerate loss one hour of data when rebooting) to miniminize data loss, because if the internal state is lost, related time window data will be lost as well. 
## Real-time monitoring example From c77cf453bae2ee5966736e0b0069a95e644289f0 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 9 Sep 2024 16:58:47 +0800 Subject: [PATCH 17/23] refactor: use standard comment --- .../continuous-aggregation/usecase-example.md | 54 ++++++++++++------- .../continuous-aggregation/usecase-example.md | 54 ++++++++++++------- 2 files changed, 68 insertions(+), 40 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index cd0398264..f229cd115 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -16,30 +16,34 @@ See [Overview](overview.md) for an example of real-time analytics. Which is to c Another example of real-time analytics is to get all distinct country from the `ngx_access_log` table. The query for continuous aggregation would be: ```sql --- input table +/* input table */ CREATE TABLE ngx_access_log ( client STRING, country STRING, access_time TIMESTAMP TIME INDEX ); --- output table +/* output table */ CREATE TABLE ngx_country ( country STRING, update_at TIMESTAMP, - __ts_placeholder TIMESTAMP TIME INDEX, -- placeholder column for time index + __ts_placeholder TIMESTAMP TIME INDEX, /* placeholder column for time index */ PRIMARY KEY(country) ); --- create flow task to calculate the distinct country +/* create flow task to calculate the distinct country */ CREATE FLOW calc_ngx_country SINK TO ngx_country AS SELECT DISTINCT country, FROM ngx_access_log; +``` + +now that we have created the flow task, we can insert some data into the source table `ngx_access_log`: --- insert some data +```sql +/* insert some data */ INSERT INTO ngx_access_log VALUES ("client1", "US", "2022-01-01 00:00:00"), ("client2", "US", "2022-01-01 00:00:01"), @@ -52,18 +56,18 @@ INSERT INTO ngx_access_log VALUES ("client9", "KR", "2022-01-01 00:00:08"), ("client10", "KR", "2022-01-01 
00:00:09"); --- check the result +/* check the result */ select * from ngx_country; ``` or if you want to group the data by time window, you can use the following query: ```sql --- input table create same as above --- output table +/* input table create same as above */ +/* output table */ CREATE TABLE ngx_country ( country STRING, - time_window TIMESTAMP TIME INDEX,-- no need to use __ts_placeholder here since we have a time window column as time index + time_window TIMESTAMP TIME INDEX,/* no need to use __ts_placeholder here since we have a time window column as time index */ update_at TIMESTAMP, PRIMARY KEY(country) ); @@ -77,7 +81,7 @@ FROM ngx_access_log GROUP BY country, time_window; --- insert data using the same data as above +/* insert data using the same data as above */ ``` The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. @@ -90,7 +94,7 @@ so it's recommended to use appropriate time window(i.e. hourly if you can tolera Consider a usecase where you have a stream of sensor events from a network of temperature sensors that you want to monitor in real-time. The sensor events contain information such as the sensor ID, the temperature reading, the timestamp of the reading, and the location of the sensor. You want to continuously aggregate this data to provide real-time alerts when the temperature exceeds a certain threshold. 
Then the query for continuous aggregation would be: ```sql --- input table +/* create input table */ CREATE TABLE temp_sensor_data ( sensor_id INT, loc STRING, @@ -98,7 +102,7 @@ CREATE TABLE temp_sensor_data ( ts TIMESTAMP TIME INDEX ); --- output table +/* create output table */ CREATE TABLE temp_alerts ( sensor_id INT, loc STRING, @@ -119,25 +123,30 @@ GROUP BY sensor_id, loc HAVING max_temp > 100; +``` + +Now that we have created the flow task, we can insert some data into the source table `temp_sensor_data`: + +```sql INSERT INTO temp_sensor_data VALUES (1, "room1", 98.5, "2022-01-01 00:00:00"), (2, "room2", 99.5, "2022-01-01 00:00:01"); --- You may want to flush the flow task to see the result +/* You may want to flush the flow task to see the result */ ADMIN FLUSH_FLOW('temp_monitoring'); --- for now sink table will be empty +/* for now sink table will be empty */ SELECT * FROM temp_alerts; INSERT INTO temp_sensor_data VALUES (1, "room1", 101.5, "2022-01-01 00:00:02"), (2, "room2", 102.5, "2022-01-01 00:00:03"); --- You may want to flush the flow task to see the result +/* You may want to flush the flow task to see the result */ ADMIN FLUSH_FLOW('temp_monitoring'); --- now sink table will have the max temperature data +/* now sink table will have the max temperature data */ SELECT * FROM temp_alerts; ``` @@ -149,23 +158,23 @@ The above query continuously aggregates the data from the `temp_sensor_data` tab Consider a usecase in which you need a bar graph that show the distribution of packet sizes for each status code to monitor the health of the system. 
The query for continuous aggregation would be: ```sql --- create input table +/* create input table */ CREATE TABLE ngx_access_log ( client STRING, stat INT, size INT, access_time TIMESTAMP TIME INDEX ); --- create output table +/* create output table */ CREATE TABLE ngx_distribution ( stat INT, bucket_size INT, total_logs BIGINT, time_window TIMESTAMP TIME INDEX, - update_at TIMESTAMP, -- auto generated column to store the last update time + update_at TIMESTAMP, /* auto generated column to store the last update time */ PRIMARY KEY(stat, bucket_size) ); --- create flow task to calculate the distribution of packet sizes for each status code +/* create flow task to calculate the distribution of packet sizes for each status code */ CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS SELECT stat, @@ -178,6 +187,11 @@ GROUP BY stat, time_window, bucket_size; +``` + +Now that we have created the flow task, we can insert some data into the source table `ngx_access_log`: + +```sql INSERT INTO ngx_access_log VALUES ("cli1", 200, 100, "2022-01-01 00:00:00"), diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md index 838313284..792f653c6 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -16,30 +16,34 @@ 另外,您还可以使用持续聚合来计算其他类型的实时分析。例如,要从 `ngx_access_log` 表中获取所有不同的国家。持续聚合的查询如下: ```sql --- input table +/* input table */ CREATE TABLE ngx_access_log ( client STRING, country STRING, access_time TIMESTAMP TIME INDEX ); --- output table +/* output table */ CREATE TABLE ngx_country ( country STRING, update_at TIMESTAMP, - __ts_placeholder TIMESTAMP TIME INDEX, -- placeholder column for time index + __ts_placeholder TIMESTAMP TIME INDEX, /* 
placeholder column for time index */ PRIMARY KEY(country) ); --- create flow task to calculate the distinct country +/* create flow task to calculate the distinct country */ CREATE FLOW calc_ngx_country SINK TO ngx_country AS SELECT DISTINCT country, FROM ngx_access_log; +``` + +创建好 flow 任务后,我们可以将一些数据插入源表 `ngx_access_log` 中: --- insert some data +```sql +/* insert some data */ INSERT INTO ngx_access_log VALUES ("client1", "US", "2022-01-01 00:00:00"), ("client2", "US", "2022-01-01 00:00:01"), @@ -52,18 +56,18 @@ INSERT INTO ngx_access_log VALUES ("client9", "KR", "2022-01-01 00:00:08"), ("client10", "KR", "2022-01-01 00:00:09"); --- check the result +/* check the result */ select * from ngx_country; ``` 或者,如果您想要按时间窗口对数据进行分组,可以使用以下查询: ```sql --- input table create same as above --- output table +/* input table create same as above */ +/* output table */ CREATE TABLE ngx_country ( country STRING, - time_window TIMESTAMP TIME INDEX,-- no need to use __ts_placeholder here since we have a time window column as time index + time_window TIMESTAMP TIME INDEX,/* no need to use __ts_placeholder here since we have a time window column as time index */ update_at TIMESTAMP, PRIMARY KEY(country) ); @@ -77,7 +81,7 @@ FROM ngx_access_log GROUP BY country, time_window; --- insert data using the same data as above +/* insert data using the same data as above */ ``` 上述的查询将 `ngx_access_log` 表中的数据放入 `ngx_country` 表中。它计算每个时间窗口的不同国家。`date_bin` 函数用于将数据分组为一小时的间隔。`ngx_country` 表将不断更新聚合数据,提供实时洞察,显示正在访问系统的不同国家。 @@ -89,7 +93,7 @@ GROUP BY 假设您希望实时监控一个来自温度传感器网络的传感器事件流。传感器事件包含传感器 ID、温度读数、读数的时间戳和传感器的位置等信息。您希望不断聚合这些数据,以在温度超过某个阈值时提供实时警报。那么持续聚合的查询将是: ```sql --- input table +/* create input table */ CREATE TABLE temp_sensor_data ( sensor_id INT, loc STRING, @@ -97,7 +101,7 @@ CREATE TABLE temp_sensor_data ( ts TIMESTAMP TIME INDEX ); --- output table +/* create output table */ CREATE TABLE temp_alerts ( sensor_id INT, loc STRING, @@ -118,25 +122,30 @@ GROUP BY sensor_id, loc HAVING max_temp > 100; 
+``` + +创建好 flow 任务后,我们可以将一些数据插入源表 `temp_sensor_data` 中: + +```sql INSERT INTO temp_sensor_data VALUES (1, "room1", 98.5, "2022-01-01 00:00:00"), (2, "room2", 99.5, "2022-01-01 00:00:01"); --- You may want to flush the flow task to see the result +/* You may want to flush the flow task to see the result */ ADMIN FLUSH_FLOW('temp_monitoring'); --- for now sink table will be empty +/* for now sink table will be empty */ SELECT * FROM temp_alerts; INSERT INTO temp_sensor_data VALUES (1, "room1", 101.5, "2022-01-01 00:00:02"), (2, "room2", 102.5, "2022-01-01 00:00:03"); --- You may want to flush the flow task to see the result +/* You may want to flush the flow task to see the result */ ADMIN FLUSH_FLOW('temp_monitoring'); --- now sink table will have the max temperature data +/* now sink table will have the max temperature data */ SELECT * FROM temp_alerts; ``` @@ -147,23 +156,23 @@ SELECT * FROM temp_alerts; 假设您需要一个柱状图显示每个状态码的数据包大小分布,以监控系统的健康状况。持续聚合的查询将是: ```sql --- create input table +/* create input table */ CREATE TABLE ngx_access_log ( client STRING, stat INT, size INT, access_time TIMESTAMP TIME INDEX ); --- create output table +/* create output table */ CREATE TABLE ngx_distribution ( stat INT, bucket_size INT, total_logs BIGINT, time_window TIMESTAMP TIME INDEX, - update_at TIMESTAMP, -- auto generated column to store the last update time + update_at TIMESTAMP, /* auto generated column to store the last update time */ PRIMARY KEY(stat, bucket_size) ); --- create flow task to calculate the distribution of packet sizes for each status code +/* create flow task to calculate the distribution of packet sizes for each status code */ CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS SELECT stat, @@ -176,6 +185,11 @@ GROUP BY stat, time_window, bucket_size; +``` + +创建好 flow 任务后,我们可以将一些数据插入源表 `ngx_access_log` 中: + +```sql INSERT INTO ngx_access_log VALUES ("cli1", 200, 100, "2022-01-01 00:00:00"), From 995a71e98496f757133f0046042e3663e8373321 Mon Sep 17 
00:00:00 2001 From: discord9 Date: Mon, 9 Sep 2024 17:00:59 +0800 Subject: [PATCH 18/23] sink table --- docs/user-guide/continuous-aggregation/usecase-example.md | 8 ++++---- .../user-guide/continuous-aggregation/usecase-example.md | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index f229cd115..2609a5a1e 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -23,7 +23,7 @@ CREATE TABLE ngx_access_log ( access_time TIMESTAMP TIME INDEX ); -/* output table */ +/* sink table */ CREATE TABLE ngx_country ( country STRING, update_at TIMESTAMP, @@ -64,7 +64,7 @@ or if you want to group the data by time window, you can use the following query ```sql /* input table create same as above */ -/* output table */ +/* sink table */ CREATE TABLE ngx_country ( country STRING, time_window TIMESTAMP TIME INDEX,/* no need to use __ts_placeholder here since we have a time window column as time index */ @@ -102,7 +102,7 @@ CREATE TABLE temp_sensor_data ( ts TIMESTAMP TIME INDEX ); -/* create output table */ +/* create sink table */ CREATE TABLE temp_alerts ( sensor_id INT, loc STRING, @@ -165,7 +165,7 @@ CREATE TABLE ngx_access_log ( size INT, access_time TIMESTAMP TIME INDEX ); -/* create output table */ +/* create sink table */ CREATE TABLE ngx_distribution ( stat INT, bucket_size INT, diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md index 792f653c6..4c8ad814c 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -23,7 +23,7 @@ 
CREATE TABLE ngx_access_log ( access_time TIMESTAMP TIME INDEX ); -/* output table */ +/* sink table */ CREATE TABLE ngx_country ( country STRING, update_at TIMESTAMP, @@ -64,7 +64,7 @@ select * from ngx_country; ```sql /* input table create same as above */ -/* output table */ +/* sink table */ CREATE TABLE ngx_country ( country STRING, time_window TIMESTAMP TIME INDEX,/* no need to use __ts_placeholder here since we have a time window column as time index */ @@ -101,7 +101,7 @@ CREATE TABLE temp_sensor_data ( ts TIMESTAMP TIME INDEX ); -/* create output table */ +/* create sink table */ CREATE TABLE temp_alerts ( sensor_id INT, loc STRING, @@ -163,7 +163,7 @@ CREATE TABLE ngx_access_log ( size INT, access_time TIMESTAMP TIME INDEX ); -/* create output table */ +/* create sink table */ CREATE TABLE ngx_distribution ( stat INT, bucket_size INT, From 4297527331c94a3e54a7aeac5aaba9a653dd1472 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 9 Sep 2024 17:19:40 +0800 Subject: [PATCH 19/23] remove comment --- docs/user-guide/continuous-aggregation/usecase-example.md | 4 ++-- .../user-guide/continuous-aggregation/usecase-example.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index 2609a5a1e..f27f03e7b 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -27,7 +27,7 @@ CREATE TABLE ngx_access_log ( CREATE TABLE ngx_country ( country STRING, update_at TIMESTAMP, - __ts_placeholder TIMESTAMP TIME INDEX, /* placeholder column for time index */ + __ts_placeholder TIMESTAMP TIME INDEX, PRIMARY KEY(country) ); @@ -67,7 +67,7 @@ or if you want to group the data by time window, you can use the following query /* sink table */ CREATE TABLE ngx_country ( country STRING, - time_window TIMESTAMP TIME INDEX,/* no need to use __ts_placeholder here since we 
have a time window column as time index */ + time_window TIMESTAMP TIME INDEX, update_at TIMESTAMP, PRIMARY KEY(country) ); diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md index 4c8ad814c..3b2587e6e 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -27,7 +27,7 @@ CREATE TABLE ngx_access_log ( CREATE TABLE ngx_country ( country STRING, update_at TIMESTAMP, - __ts_placeholder TIMESTAMP TIME INDEX, /* placeholder column for time index */ + __ts_placeholder TIMESTAMP TIME INDEX, PRIMARY KEY(country) ); @@ -67,7 +67,7 @@ select * from ngx_country; /* sink table */ CREATE TABLE ngx_country ( country STRING, - time_window TIMESTAMP TIME INDEX,/* no need to use __ts_placeholder here since we have a time window column as time index */ + time_window TIMESTAMP TIME INDEX, update_at TIMESTAMP, PRIMARY KEY(country) ); From f12a48a8f8a6dfdae36962bb0d6b820ce55a8375 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 9 Sep 2024 17:42:17 +0800 Subject: [PATCH 20/23] remove admin flush flow --- .../continuous-aggregation/usecase-example.md | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index f27f03e7b..9f7cefb10 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -56,7 +56,13 @@ INSERT INTO ngx_access_log VALUES ("client9", "KR", "2022-01-01 00:00:08"), ("client10", "KR", "2022-01-01 00:00:09"); -/* check the result */ +``` + + +Wait for one second for the Flow to write the result to the sink table and then 
query: + + +```sql select * from ngx_country; ``` @@ -133,20 +139,14 @@ INSERT INTO temp_sensor_data VALUES (1, "room1", 98.5, "2022-01-01 00:00:00"), (2, "room2", 99.5, "2022-01-01 00:00:01"); -/* You may want to flush the flow task to see the result */ -ADMIN FLUSH_FLOW('temp_monitoring'); - -/* for now sink table will be empty */ +/* table should be empty now, but still wait at least one second for flow to update results to sink table */ SELECT * FROM temp_alerts; INSERT INTO temp_sensor_data VALUES (1, "room1", 101.5, "2022-01-01 00:00:02"), (2, "room2", 102.5, "2022-01-01 00:00:03"); -/* You may want to flush the flow task to see the result */ -ADMIN FLUSH_FLOW('temp_monitoring'); - -/* now sink table will have the max temperature data */ +/* wait at least one second for flow to update results to sink table */ SELECT * FROM temp_alerts; ``` @@ -205,6 +205,7 @@ INSERT INTO ngx_access_log VALUES ("cli9", 404, 180, "2022-01-01 00:00:08"), ("cli10", 404, 190, "2022-01-01 00:00:09"); +/* wait at least one second for flow to update results to sink table */ SELECT * FROM ngx_distribution; ``` From 1970e1f73bd24b6c23d367a9c28c4ecd9afe328a Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 10 Sep 2024 17:50:34 +0800 Subject: [PATCH 21/23] apply review advices --- .../continuous-aggregation/usecase-example.md | 18 ++++++++++-------- .../continuous-aggregation/usecase-example.md | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index 9f7cefb10..2f8da883e 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -11,7 +11,7 @@ In all these usecases, the continuous aggregation system continuously aggregates ## Real-time analytics example -See [Overview](overview.md) for an example of real-time analytics. 
Which is to calculate the total number of logs, the minimum size, the maximum size, the average size, and the number of packets with the size greater than 550 for each status code in a 1-minute fixed window for access logs. +See [Overview](/user-guide/continuous-aggregation/overview.md#quick-start-with-an-example) for an example of real-time analytics, which calculates the total number of logs, the minimum size, the maximum size, the average size, and the number of packets with a size greater than 550 for each status code in a 1-minute fixed window for access logs. Another example of real-time analytics is to get all distinct country from the `ngx_access_log` table. The query for continuous aggregation would be: @@ -66,7 +66,7 @@ Wait for one second for the Flow to write the result to the sink table and then query: select * from ngx_country; ``` -or if you want to group the data by time window, you can use the following query: +Or if you want to group the data by time window, you can use the following query: ```sql /* input table create same as above */ @@ -92,8 +92,8 @@ GROUP BY The above query puts the data from the `ngx_access_log` table into the `ngx_country` table. It calculates the distinct country for each time window. The `date_bin` function is used to group the data into one-hour intervals. The `ngx_country` table will be continuously updated with the aggregated data, providing real-time insights into the distinct countries that are accessing the system. -Note that there is currently no persistent storage for flow's internal state, internal state refer to intermediate state used in computing incremental query result, like accumulator's value for a aggregation query(i.e. `count(col)`'s accumulator record current count number), there is persistent storage for the sink table data however. -so it's recommended to use appropriate time window(i.e. 
hourly if you can tolerate loss one hour of data when rebooting) to miniminize data loss, because if the internal state is lost, related time window data will be lost as well. +Note that there is currently no persistent storage for the internal state of the flow. The internal state refers to the intermediate state used in computing incremental query results, such as the accumulator's value for an aggregation query (e.g., `count(col)`'s accumulator records the current count number). However, there is persistent storage for the data in the sink table. +Therefore, it is recommended to use an appropriate time window (e.g., hourly) to minimize data loss. This is because if the internal state is lost, the related data within that time window will also be lost. ## Real-time monitoring example @@ -145,7 +145,9 @@ SELECT * FROM temp_alerts; INSERT INTO temp_sensor_data VALUES (1, "room1", 101.5, "2022-01-01 00:00:02"), (2, "room2", 102.5, "2022-01-01 00:00:03"); - +``` +wait at least one second for flow to update results to sink table: +```sql /* wait at least one second for flow to update results to sink table */ SELECT * FROM temp_alerts; ``` @@ -192,7 +194,6 @@ GROUP BY Now that we have created the flow task, we can insert some data into the source table `ngx_access_log`: ```sql - INSERT INTO ngx_access_log VALUES ("cli1", 200, 100, "2022-01-01 00:00:00"), ("cli2", 200, 110, "2022-01-01 00:00:01"), @@ -204,8 +205,9 @@ INSERT INTO ngx_access_log VALUES ("cli8", 404, 170, "2022-01-01 00:00:07"), ("cli9", 404, 180, "2022-01-01 00:00:08"), ("cli10", 404, 190, "2022-01-01 00:00:09"); - -/* wait at least one second for flow to update results to sink table */ +``` +wait at least one second for flow to update results to sink table: +```sql SELECT * FROM ngx_distribution; ``` diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md 
b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md index 3b2587e6e..00ae832ce 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -11,7 +11,7 @@ ## 实时分析示例 -请参阅[概述](overview.md)中的实时分析示例。该示例用于计算日志的总数、包大小的最小、最大和平均值，以及大小大于 550 的数据包数量按照每个状态码在 1 分钟固定窗口中的实时分析。 +请参阅[概述](/user-guide/continuous-aggregation/overview.md#快速开始示例)中的实时分析示例。该示例用于计算日志的总数、包大小的最小、最大和平均值，以及大小大于 550 的数据包数量按照每个状态码在 1 分钟固定窗口中的实时分析。 另外，您还可以使用持续聚合来计算其他类型的实时分析。例如，要从 `ngx_access_log` 表中获取所有不同的国家。持续聚合的查询如下： From 2c88ebabba676d07fb3edd0961ebd26eb42e6bbf Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 10 Sep 2024 18:27:25 +0800 Subject: [PATCH 22/23] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=B8=AD=E6=96=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../continuous-aggregation/usecase-example.md | 7 +++++- .../continuous-aggregation/usecase-example.md | 22 ++++++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index 2f8da883e..41ebc8da0 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -138,9 +138,14 @@ Now that we have created the flow task, we can insert some data into the source INSERT INTO temp_sensor_data VALUES (1, "room1", 98.5, "2022-01-01 00:00:00"), (2, "room2", 99.5, "2022-01-01 00:00:01"); +``` +The table should be empty now, but still wait at least one second for the flow to update results to the sink table: -/* table should be empty now, but still wait at least one second for flow to update results to sink table */ +```sql SELECT * FROM temp_alerts; +``` + +```sql INSERT INTO temp_sensor_data VALUES (1, "room1", 101.5, 
"2022-01-01 00:00:02"), diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md index 00ae832ce..95bcc172e 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -55,7 +55,9 @@ INSERT INTO ngx_access_log VALUES ("client8", "JP", "2022-01-01 00:00:07"), ("client9", "KR", "2022-01-01 00:00:08"), ("client10", "KR", "2022-01-01 00:00:09"); - +``` +等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: +```sql /* check the result */ select * from ngx_country; ``` @@ -86,7 +88,7 @@ GROUP BY 上述的查询将 `ngx_access_log` 表中的数据放入 `ngx_country` 表中。它计算每个时间窗口的不同国家。`date_bin` 函数用于将数据分组为一小时的间隔。`ngx_country` 表将不断更新聚合数据,提供实时洞察,显示正在访问系统的不同国家。 -请注意,目前 Flow 的内部状态没有持久存储。内部状态指的是用于计算增量查询结果的中间状态,例如聚合查询的累加器值(如count(col)的累加器记录了目前为止的 count 计数)。然而,Sink 表的数据是有持久存储的。因此,建议您使用适当的时间窗口(例如,如果可以接受在重启时丢失一小时的数据,则可以设置为每小时)来最小化数据丢失。因为一旦内部状态丢失,相关时间窗口的数据也将随之丢失。 +请注意,目前 Flow 的内部状态没有持久存储。内部状态指的是用于计算增量查询结果的中间状态,例如聚合查询的累加器值(如count(col)的累加器记录了目前为止的 count 计数)。然而,Sink 表的数据是有持久存储的。因此,建议您使用适当的时间窗口(例如设置为每小时)来最小化数据丢失。因为一旦内部状态丢失,相关时间窗口的数据也将随之丢失。 ## 实时监控示例 @@ -134,17 +136,25 @@ INSERT INTO temp_sensor_data VALUES /* You may want to flush the flow task to see the result */ ADMIN FLUSH_FLOW('temp_monitoring'); +``` +当前输出表应该为空。您可以在等待一秒后通过以下查询查看结果: + +```sql /* for now sink table will be empty */ SELECT * FROM temp_alerts; +``` + +```sql INSERT INTO temp_sensor_data VALUES (1, "room1", 101.5, "2022-01-01 00:00:02"), (2, "room2", 102.5, "2022-01-01 00:00:03"); -/* You may want to flush the flow task to see the result */ -ADMIN FLUSH_FLOW('temp_monitoring'); +``` +等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: +```sql /* now sink table will have the max temperature data */ SELECT * FROM temp_alerts; ``` @@ -202,7 +212,9 @@ 
INSERT INTO ngx_access_log VALUES ("cli8", 404, 170, "2022-01-01 00:00:07"), ("cli9", 404, 180, "2022-01-01 00:00:08"), ("cli10", 404, 190, "2022-01-01 00:00:09"); - +``` +等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: +```sql SELECT * FROM ngx_distribution; ``` From 94eee85227d59299cc922e04d2cce06716e3b388 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 11 Sep 2024 14:47:14 +0800 Subject: [PATCH 23/23] chore: better example data --- .../continuous-aggregation/usecase-example.md | 10 +++++----- .../continuous-aggregation/usecase-example.md | 11 +++++------ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/docs/user-guide/continuous-aggregation/usecase-example.md b/docs/user-guide/continuous-aggregation/usecase-example.md index 41ebc8da0..d59140c74 100644 --- a/docs/user-guide/continuous-aggregation/usecase-example.md +++ b/docs/user-guide/continuous-aggregation/usecase-example.md @@ -201,15 +201,15 @@ Now that we have created the flow task, we can insert some data into the source ```sql INSERT INTO ngx_access_log VALUES ("cli1", 200, 100, "2022-01-01 00:00:00"), - ("cli2", 200, 110, "2022-01-01 00:00:01"), + ("cli2", 200, 104, "2022-01-01 00:00:01"), ("cli3", 200, 120, "2022-01-01 00:00:02"), - ("cli4", 200, 130, "2022-01-01 00:00:03"), + ("cli4", 200, 124, "2022-01-01 00:00:03"), ("cli5", 200, 140, "2022-01-01 00:00:04"), - ("cli6", 404, 150, "2022-01-01 00:00:05"), + ("cli6", 404, 144, "2022-01-01 00:00:05"), ("cli7", 404, 160, "2022-01-01 00:00:06"), - ("cli8", 404, 170, "2022-01-01 00:00:07"), + ("cli8", 404, 164, "2022-01-01 00:00:07"), ("cli9", 404, 180, "2022-01-01 00:00:08"), - ("cli10", 404, 190, "2022-01-01 00:00:09"); + ("cli10", 404, 184, "2022-01-01 00:00:09"); ``` wait at least one second for flow to update results to sink table: ```sql diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md 
b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md index 95bcc172e..1f739b9a4 100644 --- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md +++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/continuous-aggregation/usecase-example.md @@ -200,18 +200,17 @@ GROUP BY 创建好 flow 任务后,我们可以将一些数据插入源表 `ngx_access_log` 中: ```sql - INSERT INTO ngx_access_log VALUES ("cli1", 200, 100, "2022-01-01 00:00:00"), - ("cli2", 200, 110, "2022-01-01 00:00:01"), + ("cli2", 200, 104, "2022-01-01 00:00:01"), ("cli3", 200, 120, "2022-01-01 00:00:02"), - ("cli4", 200, 130, "2022-01-01 00:00:03"), + ("cli4", 200, 124, "2022-01-01 00:00:03"), ("cli5", 200, 140, "2022-01-01 00:00:04"), - ("cli6", 404, 150, "2022-01-01 00:00:05"), + ("cli6", 404, 144, "2022-01-01 00:00:05"), ("cli7", 404, 160, "2022-01-01 00:00:06"), - ("cli8", 404, 170, "2022-01-01 00:00:07"), + ("cli8", 404, 164, "2022-01-01 00:00:07"), ("cli9", 404, 180, "2022-01-01 00:00:08"), - ("cli10", 404, 190, "2022-01-01 00:00:09"); + ("cli10", 404, 184, "2022-01-01 00:00:09"); ``` 等待一秒钟确保 Flow 有时间将结果写入 sink 表,然后就可以查询结果了: ```sql