Skip to content

Commit

Permalink
fix: flow compare null values (#5234)
Browse files Browse the repository at this point in the history
* fix: flow compare null values

* fix: fix again ck ty before cmp

* chore: rm comment

* fix: handle null

* chore: typo

* docs: update comment

* refactor: per review

* tests: more sqlness
  • Loading branch information
discord9 authored Dec 25, 2024
1 parent abf34b8 commit a6aa6d7
Show file tree
Hide file tree
Showing 3 changed files with 440 additions and 5 deletions.
57 changes: 52 additions & 5 deletions src/flow/src/compute/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::ops::Range;
use std::sync::Arc;

use arrow::array::new_null_array;
use common_telemetry::trace;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
Expand Down Expand Up @@ -398,20 +399,54 @@ fn reduce_batch_subgraph(
}
}

// TODO: here reduce numbers of eq to minimal by keeping slicing key/val batch
let key_data_types = output_type
.column_types
.iter()
.map(|t| t.scalar_type.clone())
.collect_vec();

// TODO(discord9): here reduce numbers of eq to minimal by keeping slicing key/val batch
for key_row in distinct_keys {
let key_scalar_value = {
let mut key_scalar_value = Vec::with_capacity(key_row.len());
for key in key_row.iter() {
for (key_idx, key) in key_row.iter().enumerate() {
let v =
key.try_to_scalar_value(&key.data_type())
.context(DataTypeSnafu {
msg: "can't convert key values to datafusion value",
})?;
let arrow_value =

let key_data_type = key_data_types.get(key_idx).context(InternalSnafu {
reason: format!(
"Key index out of bound, expected at most {} but got {}",
output_type.column_types.len(),
key_idx
),
})?;

// if incoming value's datatype is null, it need to be handled specially, see below
if key_data_type.as_arrow_type() != v.data_type()
&& !v.data_type().is_null()
{
crate::expr::error::InternalSnafu {
reason: format!(
"Key data type mismatch, expected {:?} but got {:?}",
key_data_type.as_arrow_type(),
v.data_type()
),
}
.fail()?
}

// handle single null key
let arrow_value = if v.data_type().is_null() {
let ret = new_null_array(&arrow::datatypes::DataType::Null, 1);
arrow::array::Scalar::new(ret)
} else {
v.to_scalar().context(crate::expr::error::DatafusionSnafu {
context: "can't convert key values to arrow value",
})?;
})?
};
key_scalar_value.push(arrow_value);
}
key_scalar_value
Expand All @@ -423,7 +458,19 @@ fn reduce_batch_subgraph(
.zip(key_batch.batch().iter())
.map(|(key, col)| {
// TODO(discord9): this takes half of the cpu! And this is redundant amount of `eq`!
arrow::compute::kernels::cmp::eq(&key, &col.to_arrow_array().as_ref() as _)

// note that if lhs is a null, we still need to get all rows that are null! But can't use `eq` since
// it will return null if input have null, so we need to use `is_null` instead
if arrow::array::Datum::get(&key).0.data_type().is_null() {
arrow::compute::kernels::boolean::is_null(
col.to_arrow_array().as_ref() as _
)
} else {
arrow::compute::kernels::cmp::eq(
&key,
&col.to_arrow_array().as_ref() as _,
)
}
})
.try_collect::<_, Vec<_>, _>()
.context(ArrowSnafu {
Expand Down
251 changes: 251 additions & 0 deletions tests/cases/standalone/common/flow/flow_null.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
-- test null handling in flow
-- test null handling in value part of key-value pair
CREATE TABLE requests (
service_name STRING,
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE TABLE sum_val_in_reqs (
sum_val INT64,
ts TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE FLOW requests_long_term SINK TO sum_val_in_reqs AS
SELECT
sum(val) as sum_val,
date_bin(INTERVAL '30 seconds', ts) as time_window,
FROM
requests
GROUP BY
time_window;

Affected Rows: 0

INSERT INTO
requests
VALUES
(NULL, "10.0.0.1", NULL, "2024-10-18 19:00:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
(NULL, "10.0.0.1", NULL, "2024-10-18 19:00:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
(NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"),
(NULL, "10.0.0.2", NULL, "2024-10-18 19:01:01"),
("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");

Affected Rows: 8

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('requests_long_term');

+----------------------------------------+
| ADMIN FLUSH_FLOW('requests_long_term') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+

SELECT
*
FROM
sum_val_in_reqs;

+---------+---------------------+
| sum_val | ts |
+---------+---------------------+
| 100 | 2024-10-18T19:00:00 |
| 200 | 2024-10-18T19:00:30 |
| 300 | 2024-10-18T19:01:00 |
| 600 | 2024-10-18T19:01:30 |
+---------+---------------------+

-- Test if FLOWS table works, but don't care about the result since it vary from runs
SELECT
count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
FROM
INFORMATION_SCHEMA.FLOWS;

+--------------+
| active_flows |
+--------------+
| 1 |
+--------------+

DROP FLOW requests_long_term;

Affected Rows: 0

DROP TABLE sum_val_in_reqs;

Affected Rows: 0

DROP TABLE requests;

Affected Rows: 0

-- test null handling in key part of key-value pair
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
client,
country as 'country',
count(1) as country_count,
date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
client,
country,
time_window;

Affected Rows: 0

INSERT INTO
ngx_access_log
VALUES
("cli1", null, 0),
("cli1", null, 0),
("cli2", null, 0),
("cli2", null, 1),
("cli1", "b", 0),
("cli1", "c", 0);

Affected Rows: 6

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+

SHOW CREATE TABLE ngx_country;

+-------------+----------------------------------------------------------------------------------+
| Table | Create Table |
+-------------+----------------------------------------------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "ngx_access_log.client" STRING NULL, |
| | "ngx_access_log.country" STRING NULL, |
| | "country_count" BIGINT NULL, |
| | "time_window" TIMESTAMP(3) NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("ngx_access_log.client", "ngx_access_log.country", "time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+----------------------------------------------------------------------------------+

SELECT
"ngx_access_log.client",
"ngx_access_log.country",
country_count,
time_window
FROM
ngx_country;

+-----------------------+------------------------+---------------+---------------------+
| ngx_access_log.client | ngx_access_log.country | country_count | time_window |
+-----------------------+------------------------+---------------+---------------------+
| cli1 | | 2 | 1970-01-01T00:00:00 |
| cli1 | b | 1 | 1970-01-01T00:00:00 |
| cli1 | c | 1 | 1970-01-01T00:00:00 |
| cli2 | | 2 | 1970-01-01T00:00:00 |
+-----------------------+------------------------+---------------+---------------------+

-- making sure distinct is working
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 1);

Affected Rows: 1

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+

SELECT
"ngx_access_log.client",
"ngx_access_log.country",
country_count,
time_window
FROM
ngx_country;

+-----------------------+------------------------+---------------+---------------------+
| ngx_access_log.client | ngx_access_log.country | country_count | time_window |
+-----------------------+------------------------+---------------+---------------------+
| cli1 | | 2 | 1970-01-01T00:00:00 |
| cli1 | b | 2 | 1970-01-01T00:00:00 |
| cli1 | c | 1 | 1970-01-01T00:00:00 |
| cli2 | | 2 | 1970-01-01T00:00:00 |
+-----------------------+------------------------+---------------+---------------------+

INSERT INTO
ngx_access_log
VALUES
("cli1", "c", 2);

Affected Rows: 1

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+

SELECT
"ngx_access_log.client",
"ngx_access_log.country",
country_count,
time_window
FROM
ngx_country;

+-----------------------+------------------------+---------------+---------------------+
| ngx_access_log.client | ngx_access_log.country | country_count | time_window |
+-----------------------+------------------------+---------------+---------------------+
| cli1 | | 2 | 1970-01-01T00:00:00 |
| cli1 | b | 2 | 1970-01-01T00:00:00 |
| cli1 | c | 2 | 1970-01-01T00:00:00 |
| cli2 | | 2 | 1970-01-01T00:00:00 |
+-----------------------+------------------------+---------------+---------------------+

DROP FLOW calc_ngx_country;

Affected Rows: 0

DROP TABLE ngx_access_log;

Affected Rows: 0

DROP TABLE ngx_country;

Affected Rows: 0

Loading

0 comments on commit a6aa6d7

Please sign in to comment.