diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 1d0689c4032f..0bbc613260d7 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::ops::Range; use std::sync::Arc; +use arrow::array::new_null_array; use common_telemetry::trace; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; @@ -398,20 +399,54 @@ fn reduce_batch_subgraph( } } - // TODO: here reduce numbers of eq to minimal by keeping slicing key/val batch + let key_data_types = output_type + .column_types + .iter() + .map(|t| t.scalar_type.clone()) + .collect_vec(); + + // TODO(discord9): here reduce numbers of eq to minimal by keeping slicing key/val batch for key_row in distinct_keys { let key_scalar_value = { let mut key_scalar_value = Vec::with_capacity(key_row.len()); - for key in key_row.iter() { + for (key_idx, key) in key_row.iter().enumerate() { let v = key.try_to_scalar_value(&key.data_type()) .context(DataTypeSnafu { msg: "can't convert key values to datafusion value", })?; - let arrow_value = + + let key_data_type = key_data_types.get(key_idx).context(InternalSnafu { + reason: format!( + "Key index out of bound, expected at most {} but got {}", + output_type.column_types.len(), + key_idx + ), + })?; + + // if incoming value's datatype is null, it need to be handled specially, see below + if key_data_type.as_arrow_type() != v.data_type() + && !v.data_type().is_null() + { + crate::expr::error::InternalSnafu { + reason: format!( + "Key data type mismatch, expected {:?} but got {:?}", + key_data_type.as_arrow_type(), + v.data_type() + ), + } + .fail()? + } + + // handle single null key + let arrow_value = if v.data_type().is_null() { + let ret = new_null_array(&arrow::datatypes::DataType::Null, 1); + arrow::array::Scalar::new(ret) + } else { v.to_scalar().context(crate::expr::error::DatafusionSnafu { context: "can't convert key values to arrow value", - })?; + })? + }; key_scalar_value.push(arrow_value); } key_scalar_value @@ -423,7 +458,19 @@ fn reduce_batch_subgraph( .zip(key_batch.batch().iter()) .map(|(key, col)| { // TODO(discord9): this takes half of the cpu! And this is redundant amount of `eq`! - arrow::compute::kernels::cmp::eq(&key, &col.to_arrow_array().as_ref() as _) + + // note that if lhs is a null, we still need to get all rows that are null! But can't use `eq` since + // it will return null if input have null, so we need to use `is_null` instead + if arrow::array::Datum::get(&key).0.data_type().is_null() { + arrow::compute::kernels::boolean::is_null( + col.to_arrow_array().as_ref() as _ + ) + } else { + arrow::compute::kernels::cmp::eq( + &key, + &col.to_arrow_array().as_ref() as _, + ) + } }) .try_collect::<_, Vec<_>, _>() .context(ArrowSnafu { diff --git a/tests/cases/standalone/common/flow/flow_null.result b/tests/cases/standalone/common/flow/flow_null.result new file mode 100644 index 000000000000..298d4025da07 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_null.result @@ -0,0 +1,251 @@ +-- test null handling in flow +-- test null handling in value part of key-value pair +CREATE TABLE requests ( + service_name STRING, + service_ip STRING, + val INT, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE TABLE sum_val_in_reqs ( + sum_val INT64, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE FLOW requests_long_term SINK TO sum_val_in_reqs AS +SELECT + sum(val) as sum_val, + date_bin(INTERVAL '30 seconds', ts) as time_window, +FROM + requests +GROUP BY + time_window; + +Affected Rows: 0 + +INSERT INTO + requests +VALUES + (NULL, "10.0.0.1", NULL, "2024-10-18 19:00:00"), + ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"), + (NULL, "10.0.0.1", NULL, "2024-10-18 19:00:30"), + ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"), + (NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"), + (NULL, "10.0.0.2", NULL, "2024-10-18 19:01:01"), + ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"), + ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31"); + +Affected Rows: 8 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('requests_long_term'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('requests_long_term') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT + * +FROM + sum_val_in_reqs; + ++---------+---------------------+ +| sum_val | ts | ++---------+---------------------+ +| 100 | 2024-10-18T19:00:00 | +| 200 | 2024-10-18T19:00:30 | +| 300 | 2024-10-18T19:01:00 | +| 600 | 2024-10-18T19:01:30 | ++---------+---------------------+ + +-- Test if FLOWS table works, but don't care about the result since it vary from runs +SELECT + count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows, +FROM + INFORMATION_SCHEMA.FLOWS; + ++--------------+ +| active_flows | ++--------------+ +| 1 | ++--------------+ + +DROP FLOW requests_long_term; + +Affected Rows: 0 + +DROP TABLE sum_val_in_reqs; + +Affected Rows: 0 + +DROP TABLE requests; + +Affected Rows: 0 + +-- test null handling in key part of key-value pair +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + client, + country as 'country', + count(1) as country_count, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + client, + country, + time_window; + +Affected Rows: 0 + +INSERT INTO + ngx_access_log +VALUES + ("cli1", null, 0), + ("cli1", null, 0), + ("cli2", null, 0), + ("cli2", null, 1), + ("cli1", "b", 0), + ("cli1", "c", 0); + +Affected Rows: 6 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SHOW CREATE TABLE ngx_country; + ++-------------+----------------------------------------------------------------------------------+ +| Table | Create Table | ++-------------+----------------------------------------------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.client" STRING NULL, | +| | "ngx_access_log.country" STRING NULL, | +| | "country_count" BIGINT NULL, | +| | "time_window" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("ngx_access_log.client", "ngx_access_log.country", "time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+----------------------------------------------------------------------------------+ + +SELECT + "ngx_access_log.client", + "ngx_access_log.country", + country_count, + time_window +FROM + ngx_country; + ++-----------------------+------------------------+---------------+---------------------+ +| ngx_access_log.client | ngx_access_log.country | country_count | time_window | ++-----------------------+------------------------+---------------+---------------------+ +| cli1 | | 2 | 1970-01-01T00:00:00 | +| cli1 | b | 1 | 1970-01-01T00:00:00 | +| cli1 | c | 1 | 1970-01-01T00:00:00 | +| cli2 | | 2 | 1970-01-01T00:00:00 | ++-----------------------+------------------------+---------------+---------------------+ + +-- making sure distinct is working +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 1); + +Affected Rows: 1 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SELECT + "ngx_access_log.client", + "ngx_access_log.country", + country_count, + time_window +FROM + ngx_country; + ++-----------------------+------------------------+---------------+---------------------+ +| ngx_access_log.client | ngx_access_log.country | country_count | time_window | ++-----------------------+------------------------+---------------+---------------------+ +| cli1 | | 2 | 1970-01-01T00:00:00 | +| cli1 | b | 2 | 1970-01-01T00:00:00 | +| cli1 | c | 1 | 1970-01-01T00:00:00 | +| cli2 | | 2 | 1970-01-01T00:00:00 | ++-----------------------+------------------------+---------------+---------------------+ + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "c", 2); + +Affected Rows: 1 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SELECT + "ngx_access_log.client", + "ngx_access_log.country", + country_count, + time_window +FROM + ngx_country; + ++-----------------------+------------------------+---------------+---------------------+ +| ngx_access_log.client | ngx_access_log.country | country_count | time_window | ++-----------------------+------------------------+---------------+---------------------+ +| cli1 | | 2 | 1970-01-01T00:00:00 | +| cli1 | b | 2 | 1970-01-01T00:00:00 | +| cli1 | c | 2 | 1970-01-01T00:00:00 | +| cli2 | | 2 | 1970-01-01T00:00:00 | ++-----------------------+------------------------+---------------+---------------------+ + +DROP FLOW calc_ngx_country; + +Affected Rows: 0 + +DROP TABLE ngx_access_log; + +Affected Rows: 0 + +DROP TABLE ngx_country; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_null.sql b/tests/cases/standalone/common/flow/flow_null.sql new file mode 100644 index 000000000000..70719acb8d58 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_null.sql @@ -0,0 +1,137 @@ +-- test null handling in flow + +-- test null handling in value part of key-value pair +CREATE TABLE requests ( + service_name STRING, + service_ip STRING, + val INT, + ts TIMESTAMP TIME INDEX +); + +CREATE TABLE sum_val_in_reqs ( + sum_val INT64, + ts TIMESTAMP TIME INDEX +); + +CREATE FLOW requests_long_term SINK TO sum_val_in_reqs AS +SELECT + sum(val) as sum_val, + date_bin(INTERVAL '30 seconds', ts) as time_window, +FROM + requests +GROUP BY + time_window; + +INSERT INTO + requests +VALUES + (NULL, "10.0.0.1", NULL, "2024-10-18 19:00:00"), + ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"), + (NULL, "10.0.0.1", NULL, "2024-10-18 19:00:30"), + ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"), + (NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"), + (NULL, "10.0.0.2", NULL, "2024-10-18 19:01:01"), + ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"), + ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('requests_long_term'); + +SELECT + * +FROM + sum_val_in_reqs; + +-- Test if FLOWS table works, but don't care about the result since it vary from runs +SELECT + count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows, +FROM + INFORMATION_SCHEMA.FLOWS; + +DROP FLOW requests_long_term; + +DROP TABLE sum_val_in_reqs; + +DROP TABLE requests; + +-- test null handling in key part of key-value pair +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + client, + country as 'country', + count(1) as country_count, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + client, + country, + time_window; + +INSERT INTO + ngx_access_log +VALUES + ("cli1", null, 0), + ("cli1", null, 0), + ("cli2", null, 0), + ("cli2", null, 1), + ("cli1", "b", 0), + ("cli1", "c", 0); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SHOW CREATE TABLE ngx_country; + +SELECT + "ngx_access_log.client", + "ngx_access_log.country", + country_count, + time_window +FROM + ngx_country; + +-- making sure distinct is working +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 1); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + "ngx_access_log.client", + "ngx_access_log.country", + country_count, + time_window +FROM + ngx_country; + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "c", 2); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + "ngx_access_log.client", + "ngx_access_log.country", + country_count, + time_window +FROM + ngx_country; + +DROP FLOW calc_ngx_country; + +DROP TABLE ngx_access_log; + +DROP TABLE ngx_country;