diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 04f7fd80b3f2..3a8acd60e75a 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -50,11 +50,9 @@ use crate::adapter::util::column_schemas_to_proto; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; -use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; -use crate::expr::GlobalId; -use crate::metrics::{ - METRIC_FLOW_INPUT_BUF_SIZE, METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS, -}; +use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; +use crate::expr::{Batch, GlobalId}; +use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS}; use crate::repr::{self, DiffRow, Row, BATCH_SIZE}; mod flownode_impl; @@ -227,11 +225,24 @@ pub fn diff_row_to_request(rows: Vec) -> Vec { reqs } +pub fn batches_to_rows_req(batches: Vec) -> Result, Error> { + let mut reqs = Vec::new(); + for batch in batches { + let mut rows = Vec::with_capacity(batch.row_count()); + for i in 0..batch.row_count() { + let row = batch.get_row(i).context(EvalSnafu)?; + rows.push((Row::new(row), 0)); + } + reqs.push(DiffRequest::Insert(rows)); + } + Ok(reqs) +} + /// This impl block contains methods to send writeback requests to frontend impl FlowWorkerManager { /// Return the number of requests it made pub async fn send_writeback_requests(&self) -> Result { - let all_reqs = self.generate_writeback_request().await; + let all_reqs = self.generate_writeback_request().await?; if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) { return Ok(0); } @@ -242,122 +253,16 @@ impl FlowWorkerManager { } let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); let ctx = Arc::new(QueryContext::with(&catalog, &schema)); - // TODO(discord9): instead of auto build table from request schema, actually build table - // before `create flow` to be able to assign pk and ts etc. - let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self - .table_info_source - .get_table_id_from_name(&table_name) - .await? - { - let table_info = self - .table_info_source - .get_table_info_value(&table_id) - .await? - .unwrap(); - let meta = table_info.table_info.meta; - let primary_keys = meta - .primary_key_indices - .into_iter() - .map(|i| meta.schema.column_schemas[i].name.clone()) - .collect_vec(); - let schema = meta.schema.column_schemas; - // check if the last column is the auto created timestamp column, hence the table is auto created from - // flow's plan type - let is_auto_create = { - let correct_name = schema - .last() - .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) - .unwrap_or(false); - let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1); - correct_name && correct_time_index - }; - (primary_keys, schema, is_auto_create) - } else { - // TODO(discord9): condiser remove buggy auto create by schema - - let node_ctx = self.node_context.read().await; - let gid: GlobalId = node_ctx - .table_repr - .get_by_name(&table_name) - .map(|x| x.1) - .unwrap(); - let schema = node_ctx - .schema - .get(&gid) - .with_context(|| TableNotFoundSnafu { - name: format!("Table name = {:?}", table_name), - })? 
- .clone(); - // TODO(discord9): use default key from schema - let primary_keys = schema - .typ() - .keys - .first() - .map(|v| { - v.column_indices - .iter() - .map(|i| { - schema - .get_name(*i) - .clone() - .unwrap_or_else(|| format!("col_{i}")) - }) - .collect_vec() - }) - .unwrap_or_default(); - let update_at = ColumnSchema::new( - UPDATE_AT_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ); - let original_schema = schema - .typ() - .column_types - .clone() - .into_iter() - .enumerate() - .map(|(idx, typ)| { - let name = schema - .names - .get(idx) - .cloned() - .flatten() - .unwrap_or(format!("col_{}", idx)); - let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable); - if schema.typ().time_index == Some(idx) { - ret.with_time_index(true) - } else { - ret - } - }) - .collect_vec(); - - let mut with_auto_added_col = original_schema.clone(); - with_auto_added_col.push(update_at); - - // if no time index, add one as placeholder - let no_time_index = schema.typ().time_index.is_none(); - if no_time_index { - let ts_col = ColumnSchema::new( - AUTO_CREATED_PLACEHOLDER_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true); - with_auto_added_col.push(ts_col); - } + let (is_ts_placeholder, proto_schema) = + self.try_fetch_or_create_table(&table_name).await?; + let schema_len = proto_schema.len(); - (primary_keys, with_auto_added_col, no_time_index) - }; - let schema_len = schema.len(); - let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; - - debug!( - "Sending {} writeback requests to table {}, reqs={:?}", + trace!( + "Sending {} writeback requests to table {}, reqs total rows={}", reqs.len(), table_name.join("."), - reqs + reqs.iter().map(|r| r.len()).sum::() ); let now = self.tick_manager.tick(); for req in reqs { @@ -450,8 +355,12 @@ impl FlowWorkerManager { } /// Generate writeback request for all sink table - pub async fn generate_writeback_request(&self) -> BTreeMap> { + pub async fn generate_writeback_request( + &self, + ) -> Result>, Error> { + trace!("Start to generate writeback request"); let mut output = BTreeMap::new(); + let mut total_row_count = 0; for (name, sink_recv) in self .node_context .write() @@ -460,14 +369,133 @@ impl FlowWorkerManager { .iter_mut() .map(|(n, (_s, r))| (n, r)) { - let mut rows = Vec::new(); - while let Ok(row) = sink_recv.try_recv() { - rows.push(row); + let mut batches = Vec::new(); + while let Ok(batch) = sink_recv.try_recv() { + total_row_count += batch.row_count(); + batches.push(batch); } - let reqs = diff_row_to_request(rows); + let reqs = batches_to_rows_req(batches)?; output.insert(name.clone(), reqs); } - output + trace!("Prepare writeback req: total row count={}", total_row_count); + Ok(output) + } + + /// Fetch table info or create table from flow's schema if not exist + async fn try_fetch_or_create_table( + &self, + table_name: &TableName, + ) -> Result<(bool, Vec), Error> { + // TODO(discord9): instead of auto build table from request schema, actually build table + // before `create flow` to be able to assign pk and ts etc. + let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self + .table_info_source + .get_table_id_from_name(table_name) + .await? + { + let table_info = self + .table_info_source + .get_table_info_value(&table_id) + .await? 
+ .unwrap(); + let meta = table_info.table_info.meta; + let primary_keys = meta + .primary_key_indices + .into_iter() + .map(|i| meta.schema.column_schemas[i].name.clone()) + .collect_vec(); + let schema = meta.schema.column_schemas; + // check if the last column is the auto created timestamp column, hence the table is auto created from + // flow's plan type + let is_auto_create = { + let correct_name = schema + .last() + .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) + .unwrap_or(false); + let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1); + correct_name && correct_time_index + }; + (primary_keys, schema, is_auto_create) + } else { + // TODO(discord9): condiser remove buggy auto create by schema + + let node_ctx = self.node_context.read().await; + let gid: GlobalId = node_ctx + .table_repr + .get_by_name(table_name) + .map(|x| x.1) + .unwrap(); + let schema = node_ctx + .schema + .get(&gid) + .with_context(|| TableNotFoundSnafu { + name: format!("Table name = {:?}", table_name), + })? + .clone(); + // TODO(discord9): use default key from schema + let primary_keys = schema + .typ() + .keys + .first() + .map(|v| { + v.column_indices + .iter() + .map(|i| { + schema + .get_name(*i) + .clone() + .unwrap_or_else(|| format!("col_{i}")) + }) + .collect_vec() + }) + .unwrap_or_default(); + let update_at = ColumnSchema::new( + UPDATE_AT_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ); + + let original_schema = schema + .typ() + .column_types + .clone() + .into_iter() + .enumerate() + .map(|(idx, typ)| { + let name = schema + .names + .get(idx) + .cloned() + .flatten() + .unwrap_or(format!("col_{}", idx)); + let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable); + if schema.typ().time_index == Some(idx) { + ret.with_time_index(true) + } else { + ret + } + }) + .collect_vec(); + + let mut with_auto_added_col = original_schema.clone(); + with_auto_added_col.push(update_at); + + // if no time index, add one as placeholder + let no_time_index = schema.typ().time_index.is_none(); + if no_time_index { + let ts_col = ColumnSchema::new( + AUTO_CREATED_PLACEHOLDER_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true); + with_auto_added_col.push(ts_col); + } + + (primary_keys, with_auto_added_col, no_time_index) + }; + let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; + Ok((is_ts_placeholder, proto_schema)) } } @@ -498,10 +526,6 @@ impl FlowWorkerManager { } } - async fn get_buf_size(&self) -> usize { - self.node_context.read().await.get_send_buf_size().await - } - /// Trigger dataflow running, and then send writeback request to the source sender /// /// note that this method didn't handle input mirror request, as this should be handled by grpc server @@ -575,43 +599,37 @@ impl FlowWorkerManager { /// TODO(discord9): add flag for subgraph that have input since last run pub async fn run_available(&self, blocking: bool) -> Result { let mut row_cnt = 0; - loop { - let now = self.tick_manager.tick(); - for worker in self.worker_handles.iter() { - // TODO(discord9): consider how to handle error in individual worker - if blocking { - worker.lock().await.run_available(now, blocking).await?; - } else if let Ok(worker) = worker.try_lock() { - worker.run_available(now, blocking).await?; - } else { - return Ok(row_cnt); - } - } - // check row send and rows remain in send buf - let (flush_res, _buf_len) = if blocking { - let ctx = self.node_context.read().await; - 
(ctx.flush_all_sender().await, ctx.get_send_buf_size().await) + + let now = self.tick_manager.tick(); + for worker in self.worker_handles.iter() { + // TODO(discord9): consider how to handle error in individual worker + if blocking { + worker.lock().await.run_available(now, blocking).await?; + } else if let Ok(worker) = worker.try_lock() { + worker.run_available(now, blocking).await?; } else { - match self.node_context.try_read() { - Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await), - Err(_) => return Ok(row_cnt), - } - }; - match flush_res { - Ok(r) => { - common_telemetry::trace!("Flushed {} rows", r); - row_cnt += r; - // send buf is likely to be somewhere empty now, wait - if r < BATCH_SIZE / 2 { - break; - } - } - Err(err) => { - common_telemetry::error!("Flush send buf errors: {:?}", err); - break; - } - }; + return Ok(row_cnt); + } } + // check row send and rows remain in send buf + let flush_res = if blocking { + let ctx = self.node_context.read().await; + ctx.flush_all_sender().await + } else { + match self.node_context.try_read() { + Ok(ctx) => ctx.flush_all_sender().await, + Err(_) => return Ok(row_cnt), + } + }; + match flush_res { + Ok(r) => { + common_telemetry::trace!("Total flushed {} rows", r); + row_cnt += r; + } + Err(err) => { + common_telemetry::error!("Flush send buf errors: {:?}", err); + } + }; Ok(row_cnt) } @@ -624,14 +642,14 @@ impl FlowWorkerManager { ) -> Result<(), Error> { let rows_len = rows.len(); let table_id = region_id.table_id(); - METRIC_FLOW_INPUT_BUF_SIZE.add(rows_len as _); let _timer = METRIC_FLOW_INSERT_ELAPSED .with_label_values(&[table_id.to_string().as_str()]) .start_timer(); self.node_context.read().await.send(table_id, rows).await?; - debug!( + trace!( "Handling write request for table_id={} with {} rows", - table_id, rows_len + table_id, + rows_len ); Ok(()) } diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index c2da7af95c7d..739804be930f 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -23,7 +23,7 @@ use api::v1::region::InsertRequests; use common_error::ext::BoxedError; use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; use common_meta::node_manager::Flownode; -use common_telemetry::debug; +use common_telemetry::{debug, trace}; use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -189,7 +189,7 @@ impl Flownode for FlowWorkerManager { }) .try_collect()?; if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) { - debug!("Reordering columns: {:?}", fetch_order) + trace!("Reordering columns: {:?}", fetch_order) } fetch_order }; diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 99bd9f97e96e..26e1a6483ab8 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -15,10 +15,10 @@ //! 
Node context, prone to change with every incoming requests use std::collections::{BTreeMap, BTreeSet, HashMap}; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use common_telemetry::debug; +use common_telemetry::trace; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; @@ -27,9 +27,9 @@ use tokio::sync::{broadcast, mpsc, RwLock}; use crate::adapter::{FlowId, TableName, TableSource}; use crate::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::expr::error::InternalSnafu; -use crate::expr::GlobalId; +use crate::expr::{Batch, GlobalId}; use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE; -use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP, SEND_BUF_CAP}; +use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP}; /// A context that holds the information of the dataflow #[derive(Default, Debug)] @@ -47,13 +47,8 @@ pub struct FlownodeContext { /// /// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key /// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here - pub sink_receiver: BTreeMap< - TableName, - ( - mpsc::UnboundedSender, - mpsc::UnboundedReceiver, - ), - >, + pub sink_receiver: + BTreeMap, mpsc::UnboundedReceiver)>, /// the schema of the table, query from metasrv or inferred from TypedPlan pub schema: HashMap, /// All the tables that have been registered in the worker @@ -61,25 +56,27 @@ pub struct FlownodeContext { pub query_context: Option>, } -/// a simple broadcast sender with backpressure and unbound capacity +/// a simple broadcast sender with backpressure, bounded capacity and blocking on send when send buf is full +/// note that it wouldn't evict old data, so it's possible to block forever if the receiver is slow /// /// receiver still use tokio broadcast channel, since only sender side need to know /// backpressure and adjust dataflow running duration to avoid blocking #[derive(Debug)] pub struct SourceSender { // TODO(discord9): make it all Vec? 
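
The reworked `SourceSender` below switches from per-row sends to whole `Batch` sends and applies backpressure by yielding while too many rows are pending (its `send_rows`, further down in this patch, spins while `send_buf_row_cnt >= BATCH_SIZE * 4`). A minimal sketch of that yield-based backpressure pattern, with hypothetical names (`BoundedBatchSender`, `MAX_PENDING_ROWS`) standing in for the real types:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use tokio::sync::mpsc;

/// Stand-in for a batch of rows; only the row count matters for backpressure.
struct Batch {
    row_count: usize,
}

/// Sketch of a sender that cooperatively blocks while too many rows are in flight.
struct BoundedBatchSender {
    tx: mpsc::Sender<Batch>,
    pending_rows: Arc<AtomicUsize>,
}

/// Assumed threshold; the patch uses `BATCH_SIZE * 4`.
const MAX_PENDING_ROWS: usize = 4096;

impl BoundedBatchSender {
    async fn send(&self, batch: Batch) -> Result<(), mpsc::error::SendError<Batch>> {
        // Yield to the runtime until the flush side drains the buffer,
        // instead of dropping rows or queueing them without bound.
        while self.pending_rows.load(Ordering::SeqCst) >= MAX_PENDING_ROWS {
            tokio::task::yield_now().await;
        }
        self.pending_rows.fetch_add(batch.row_count, Ordering::SeqCst);
        self.tx.send(batch).await
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(16);
    let pending_rows = Arc::new(AtomicUsize::new(0));
    let sender = BoundedBatchSender {
        tx,
        pending_rows: pending_rows.clone(),
    };

    sender.send(Batch { row_count: 100 }).await.unwrap();

    // The flush side releases the row budget after forwarding a batch,
    // mirroring how `try_flush` decrements `send_buf_row_cnt` in the patch.
    if let Some(batch) = rx.recv().await {
        pending_rows.fetch_sub(batch.row_count, Ordering::SeqCst);
    }
}
```
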
- sender: broadcast::Sender, - send_buf_tx: mpsc::Sender>, - send_buf_rx: RwLock>>, + sender: broadcast::Sender, + send_buf_tx: mpsc::Sender, + send_buf_rx: RwLock>, send_buf_row_cnt: AtomicUsize, } impl Default for SourceSender { fn default() -> Self { + // TODO(discord9): the capacity is arbitrary, we can adjust it later, might also want to limit the max number of rows in send buf let (send_buf_tx, send_buf_rx) = mpsc::channel(SEND_BUF_CAP); Self { // TODO(discord9): found a better way then increase this to prevent lagging and hence missing input data - sender: broadcast::Sender::new(BROADCAST_CAP * 2), + sender: broadcast::Sender::new(SEND_BUF_CAP), send_buf_tx, send_buf_rx: RwLock::new(send_buf_rx), send_buf_row_cnt: AtomicUsize::new(0), @@ -90,7 +87,7 @@ impl Default for SourceSender { impl SourceSender { /// max number of iterations to try flush send buf const MAX_ITERATIONS: usize = 16; - pub fn get_receiver(&self) -> broadcast::Receiver { + pub fn get_receiver(&self) -> broadcast::Receiver { self.sender.subscribe() } @@ -106,30 +103,27 @@ impl SourceSender { break; } // TODO(discord9): send rows instead so it's just moving a point - if let Some(rows) = send_buf.recv().await { - let len = rows.len(); - self.send_buf_row_cnt - .fetch_sub(len, std::sync::atomic::Ordering::SeqCst); - for row in rows { - self.sender - .send(row) - .map_err(|err| { - InternalSnafu { - reason: format!("Failed to send row, error = {:?}", err), - } - .build() - }) - .with_context(|_| EvalSnafu)?; - row_cnt += 1; - } + if let Some(batch) = send_buf.recv().await { + let len = batch.row_count(); + self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst); + row_cnt += len; + self.sender + .send(batch) + .map_err(|err| { + InternalSnafu { + reason: format!("Failed to send row, error = {:?}", err), + } + .build() + }) + .with_context(|_| EvalSnafu)?; } } if row_cnt > 0 { - debug!("Send {} rows", row_cnt); + trace!("Source Flushed {} rows", row_cnt); METRIC_FLOW_INPUT_BUF_SIZE.sub(row_cnt as _); - debug!( - "Remaining Send buf.len() = {}", - self.send_buf_rx.read().await.len() + trace!( + "Remaining Source Send buf.len() = {}", + METRIC_FLOW_INPUT_BUF_SIZE.get() ); } @@ -138,12 +132,23 @@ impl SourceSender { /// return number of rows it actual send(including what's in the buffer) pub async fn send_rows(&self, rows: Vec) -> Result { - self.send_buf_tx.send(rows).await.map_err(|e| { + METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _); + while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 { + tokio::task::yield_now().await; + } + // row count metrics is approx so relaxed order is ok + self.send_buf_row_cnt + .fetch_add(rows.len(), Ordering::SeqCst); + let batch = Batch::try_from_rows(rows.into_iter().map(|(row, _, _)| row).collect()) + .context(EvalSnafu)?; + common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count()); + self.send_buf_tx.send(batch).await.map_err(|e| { crate::error::InternalSnafu { reason: format!("Failed to send row, error = {:?}", e), } .build() })?; + Ok(0) } } @@ -159,8 +164,6 @@ impl FlownodeContext { .with_context(|| TableNotFoundSnafu { name: table_id.to_string(), })?; - - debug!("FlownodeContext::send: trying to send {} rows", rows.len()); sender.send_rows(rows).await } @@ -174,16 +177,6 @@ impl FlownodeContext { } Ok(sum) } - - /// Return the sum number of rows in all send buf - /// TODO(discord9): remove this since we can't get correct row cnt anyway - pub async fn get_send_buf_size(&self) -> usize { - let mut sum = 0; - for sender in 
self.source_sender.values() { - sum += sender.send_buf_rx.read().await.len(); - } - sum - } } impl FlownodeContext { @@ -230,7 +223,7 @@ impl FlownodeContext { pub fn add_sink_receiver(&mut self, table_name: TableName) { self.sink_receiver .entry(table_name) - .or_insert_with(mpsc::unbounded_channel::); + .or_insert_with(mpsc::unbounded_channel); } pub fn get_source_by_global_id(&self, id: &GlobalId) -> Result<&SourceSender, Error> { @@ -254,7 +247,7 @@ impl FlownodeContext { pub fn get_sink_by_global_id( &self, id: &GlobalId, - ) -> Result, Error> { + ) -> Result, Error> { let table_name = self .table_repr .get_by_global_id(id) diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 72ac9df127c5..978d3c608cec 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -27,7 +27,7 @@ use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use crate::adapter::FlowId; use crate::compute::{Context, DataflowState, ErrCollector}; use crate::error::{Error, FlowAlreadyExistSnafu, InternalSnafu, UnexpectedSnafu}; -use crate::expr::GlobalId; +use crate::expr::{Batch, GlobalId}; use crate::plan::TypedPlan; use crate::repr::{self, DiffRow}; @@ -89,6 +89,8 @@ impl<'subgraph> ActiveDataflowState<'subgraph> { err_collector: self.err_collector.clone(), input_collection: Default::default(), local_scope: Default::default(), + input_collection_batch: Default::default(), + local_scope_batch: Default::default(), } } @@ -156,13 +158,13 @@ impl WorkerHandle { /// /// the returned error is unrecoverable, and the worker should be shutdown/rebooted pub async fn run_available(&self, now: repr::Timestamp, blocking: bool) -> Result<(), Error> { - common_telemetry::debug!("Running available with blocking={}", blocking); + common_telemetry::trace!("Running available with blocking={}", blocking); if blocking { let resp = self .itc_client .call_with_resp(Request::RunAvail { now, blocking }) .await?; - common_telemetry::debug!("Running available with response={:?}", resp); + common_telemetry::trace!("Running available with response={:?}", resp); Ok(()) } else { self.itc_client @@ -225,9 +227,9 @@ impl<'s> Worker<'s> { flow_id: FlowId, plan: TypedPlan, sink_id: GlobalId, - sink_sender: mpsc::UnboundedSender, + sink_sender: mpsc::UnboundedSender, source_ids: &[GlobalId], - src_recvs: Vec>, + src_recvs: Vec>, // TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead expire_after: Option, create_if_not_exists: bool, @@ -249,12 +251,12 @@ impl<'s> Worker<'s> { { let mut ctx = cur_task_state.new_ctx(sink_id); for (source_id, src_recv) in source_ids.iter().zip(src_recvs) { - let bundle = ctx.render_source(src_recv)?; - ctx.insert_global(*source_id, bundle); + let bundle = ctx.render_source_batch(src_recv)?; + ctx.insert_global_batch(*source_id, bundle); } - let rendered = ctx.render_plan(plan)?; - ctx.render_unbounded_sink(rendered, sink_sender); + let rendered = ctx.render_plan_batch(plan)?; + ctx.render_unbounded_sink_batch(rendered, sink_sender); } self.task_states.insert(flow_id, cur_task_state); Ok(Some(flow_id)) @@ -370,9 +372,9 @@ pub enum Request { flow_id: FlowId, plan: TypedPlan, sink_id: GlobalId, - sink_sender: mpsc::UnboundedSender, + sink_sender: mpsc::UnboundedSender, source_ids: Vec, - src_recvs: Vec>, + src_recvs: Vec>, expire_after: Option, create_if_not_exists: bool, err_collector: ErrCollector, @@ -472,7 +474,7 @@ mod test { use super::*; use crate::expr::Id; use crate::plan::Plan; - use crate::repr::{RelationType, Row}; + 
use crate::repr::RelationType; #[test] fn drop_handle() { @@ -497,8 +499,8 @@ mod test { }); let handle = rx.await.unwrap(); let src_ids = vec![GlobalId::User(1)]; - let (tx, rx) = broadcast::channel::(1024); - let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::(); + let (tx, rx) = broadcast::channel::(1024); + let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::(); let (flow_id, plan) = ( 1, TypedPlan { @@ -523,9 +525,9 @@ mod test { handle.create_flow(create_reqs).await.unwrap(), Some(flow_id) ); - tx.send((Row::empty(), 0, 0)).unwrap(); + tx.send(Batch::empty()).unwrap(); handle.run_available(0, true).await.unwrap(); - assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty()); + assert_eq!(sink_rx.recv().await.unwrap(), Batch::empty()); drop(handle); worker_thread_handle.join().unwrap(); } diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index dcb2194a1d79..90f0d86c73af 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -49,6 +49,14 @@ pub struct Context<'referred, 'df> { /// /// TODO(discord9): consider if use Vec<(LocalId, CollectionBundle)> instead pub local_scope: Vec>, + /// a list of all collections being used in the operator + /// + /// TODO(discord9): remove extra clone by counting usage and remove it on last usage? + pub input_collection_batch: BTreeMap>, + /// used by `Get`/`Let` Plan for getting/setting local variables + /// + /// TODO(discord9): consider if use Vec<(LocalId, CollectionBundle)> instead + pub local_scope_batch: Vec>>, // Collect all errors in this operator's evaluation pub err_collector: ErrCollector, } @@ -67,6 +75,19 @@ impl<'referred, 'df> Drop for Context<'referred, 'df> { bundle.collection.into_inner().drop(self.df); drop(bundle.arranged); } + + for bundle in std::mem::take(&mut self.input_collection_batch) + .into_values() + .chain( + std::mem::take(&mut self.local_scope_batch) + .into_iter() + .flat_map(|v| v.into_iter()) + .map(|(_k, v)| v), + ) + { + bundle.collection.into_inner().drop(self.df); + drop(bundle.arranged); + } // The automatically generated "drop glue" which recursively calls the destructors of all the fields (including the now empty `input_collection`) } } @@ -84,6 +105,19 @@ impl<'referred, 'df> Context<'referred, 'df> { self.local_scope.push(first); } } + + pub fn insert_global_batch(&mut self, id: GlobalId, collection: CollectionBundle) { + self.input_collection_batch.insert(id, collection); + } + + pub fn insert_local_batch(&mut self, id: LocalId, collection: CollectionBundle) { + if let Some(last) = self.local_scope_batch.last_mut() { + last.insert(id, collection); + } else { + let first = BTreeMap::from([(id, collection)]); + self.local_scope_batch.push(first); + } + } } impl<'referred, 'df> Context<'referred, 'df> { @@ -91,14 +125,8 @@ impl<'referred, 'df> Context<'referred, 'df> { pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result, Error> { match plan.plan { Plan::Constant { rows } => Ok(self.render_constant_batch(rows)), - Plan::Get { .. } => NotImplementedSnafu { - reason: "Get is still WIP in batchmode", - } - .fail(), - Plan::Let { .. 
} => NotImplementedSnafu { - reason: "Let is still WIP in batchmode", - } - .fail(), + Plan::Get { id } => self.get_batch_by_id(id), + Plan::Let { id, value, body } => self.eval_batch_let(id, value, body), Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp), Plan::Reduce { input, @@ -225,6 +253,32 @@ impl<'referred, 'df> Context<'referred, 'df> { CollectionBundle::from_collection(Collection::from_port(recv_port)) } + pub fn get_batch_by_id(&mut self, id: expr::Id) -> Result, Error> { + let ret = match id { + expr::Id::Local(local) => { + let bundle = self + .local_scope_batch + .iter() + .rev() + .find_map(|scope| scope.get(&local)) + .with_context(|| InvalidQuerySnafu { + reason: format!("Local variable {:?} not found", local), + })?; + bundle.clone(self.df) + } + expr::Id::Global(id) => { + let bundle = + self.input_collection_batch + .get(&id) + .with_context(|| InvalidQuerySnafu { + reason: format!("Collection {:?} not found", id), + })?; + bundle.clone(self.df) + } + }; + Ok(ret) + } + pub fn get_by_id(&mut self, id: expr::Id) -> Result { let ret = match id { expr::Id::Local(local) => { @@ -251,6 +305,21 @@ impl<'referred, 'df> Context<'referred, 'df> { Ok(ret) } + /// Eval `Let` operator, useful for assigning a value to a local variable + pub fn eval_batch_let( + &mut self, + id: LocalId, + value: Box, + body: Box, + ) -> Result, Error> { + let value = self.render_plan_batch(*value)?; + + self.local_scope_batch.push(Default::default()); + self.insert_local_batch(id, value); + let ret = self.render_plan_batch(*body)?; + Ok(ret) + } + /// Eval `Let` operator, useful for assigning a value to a local variable pub fn eval_let( &mut self, @@ -268,11 +337,11 @@ impl<'referred, 'df> Context<'referred, 'df> { } /// The Common argument for all `Subgraph` in the render process -struct SubgraphArg<'a> { +struct SubgraphArg<'a, T = Toff> { now: repr::Timestamp, err_collector: &'a ErrCollector, scheduler: &'a Scheduler, - send: &'a PortCtx, + send: &'a PortCtx, } #[cfg(test)] @@ -345,6 +414,8 @@ mod test { compute_state: state, input_collection: BTreeMap::new(), local_scope: Default::default(), + input_collection_batch: BTreeMap::new(), + local_scope_batch: Default::default(), err_collector, } } diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index e43d3d22a4e0..8de6b7a17a4b 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
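
Batch mode now supports `Get` and `Let`: `eval_batch_let` pushes a fresh scope onto `local_scope_batch` and binds the rendered value, while `get_batch_by_id` resolves a local id by walking the scopes innermost-first. A small self-contained sketch of that scoped-binding pattern; the names `LocalScopes` and `Bundle` are illustrative rather than the flow crate's API, and this sketch pops the scope after the body for clarity, which the patch does not need to do:

```rust
use std::collections::BTreeMap;

/// Illustrative stand-ins for `LocalId` and `CollectionBundle<Batch>`.
type LocalId = u64;
type Bundle = String;

#[derive(Default)]
struct LocalScopes {
    /// The innermost scope is last, mirroring `local_scope_batch` in the patch.
    scopes: Vec<BTreeMap<LocalId, Bundle>>,
}

impl LocalScopes {
    /// `Let`: push a fresh scope, bind the value, then evaluate the body.
    fn with_binding<R>(
        &mut self,
        id: LocalId,
        value: Bundle,
        body: impl FnOnce(&mut Self) -> R,
    ) -> R {
        self.scopes.push(BTreeMap::new());
        self.scopes.last_mut().unwrap().insert(id, value);
        let ret = body(self);
        self.scopes.pop();
        ret
    }

    /// `Get`: search scopes innermost-first, like `get_batch_by_id` does with `.iter().rev()`.
    fn get(&self, id: LocalId) -> Option<&Bundle> {
        self.scopes.iter().rev().find_map(|scope| scope.get(&id))
    }
}

fn main() {
    let mut scopes = LocalScopes::default();
    let got = scopes.with_binding(1, "source".to_string(), |s| s.get(1).cloned());
    assert_eq!(got.as_deref(), Some("source"));
}
```
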
-use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::ops::Range; use std::sync::Arc; +use common_telemetry::trace; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; use datatypes::value::{ListValue, Value}; -use datatypes::vectors::NullVector; +use datatypes::vectors::{BooleanVector, NullVector}; use hydroflow::scheduled::graph_ext::GraphExt; use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; @@ -27,8 +28,8 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::compute::render::{Context, SubgraphArg}; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; use crate::error::{Error, NotImplementedSnafu, PlanSnafu}; -use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu}; -use crate::expr::{Batch, EvalError, ScalarExpr}; +use crate::expr::error::{ArrowSnafu, DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu}; +use crate::expr::{Accum, Accumulator, Batch, EvalError, ScalarExpr, VectorDiff}; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row}; use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager}; @@ -93,152 +94,39 @@ impl<'referred, 'df> Context<'referred, 'df> { // TODO(discord9): better way to schedule future run let scheduler = self.compute_state.get_scheduler(); + let scheduler_inner = scheduler.clone(); + let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>(Self::REDUCE_BATCH); - let subgraph = - self.df.add_subgraph_in_out( - Self::REDUCE_BATCH, - input.collection.into_inner(), - out_send_port, - move |_ctx, recv, send| { - let now = *(now.borrow()); - let arrange = arrange_handler_inner.clone(); - // mfp only need to passively receive updates from recvs - let src_data = recv - .take_inner() - .into_iter() - .flat_map(|v| v.into_iter()) - .collect_vec(); - - let mut key_to_many_vals = BTreeMap::::new(); - for batch in src_data { - err_collector.run(|| { - let (key_batch, val_batch) = - batch_split_by_key_val(&batch, &key_val_plan, &err_collector); - ensure!( - key_batch.row_count() == val_batch.row_count(), - InternalSnafu { - reason: format!( - "Key and val batch should have the same row count, found {} and {}", - key_batch.row_count(), - val_batch.row_count() - ) - } - ); - - for row_idx in 0..key_batch.row_count() { - let key_row = key_batch.get_row(row_idx).unwrap(); - let val_row = val_batch.slice(row_idx, 1)?; - let val_batch = - key_to_many_vals.entry(Row::new(key_row)).or_default(); - val_batch.append_batch(val_row)?; - } - - Ok(()) - }); - } - - // write lock the arrange for the rest of the function body - // to prevent wired race condition - let mut arrange = arrange.write(); - let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len()); - let mut all_output_rows = Vec::with_capacity(key_to_many_vals.len()); - - for (key, val_batch) in key_to_many_vals { - err_collector.run(|| -> Result<(), _> { - let (accums, _, _) = arrange.get(now, &key).unwrap_or_default(); - let accum_list = from_accum_values_to_live_accums( - accums.unpack(), - accum_plan.simple_aggrs.len(), - )?; - - let mut accum_output = AccumOutput::new(); - for AggrWithIndex { - expr, - input_idx, - output_idx, - } in accum_plan.simple_aggrs.iter() - { - let cur_old_accum = accum_list.get(*output_idx).cloned().unwrap_or_default(); - // if batch is empty, input null instead - let cur_input = 
val_batch.batch().get(*input_idx).cloned().unwrap_or_else(||Arc::new(NullVector::new(val_batch.row_count()))); - - let (output, new_accum) = - expr.func.eval_batch(cur_old_accum, cur_input, None)?; - - accum_output.insert_accum(*output_idx, new_accum); - accum_output.insert_output(*output_idx, output); - } - - let (new_accums, res_val_row) = accum_output.into_accum_output()?; - - let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1); - all_arrange_updates.push(arrange_update); - - let mut key_val = key; - key_val.extend(res_val_row); - all_output_rows.push((key_val, now, 1)); - - Ok(()) - }); - } - - err_collector.run(|| { - arrange.apply_updates(now, all_arrange_updates)?; - arrange.compact_to(now) - }); - - // this output part is not supposed to be resource intensive - // (because for every batch there wouldn't usually be as many output row?), - // so we can do some costly operation here - let output_types = all_output_rows.first().map(|(row, _, _)| { - row.iter() - .map(|v| v.data_type()) - .collect::>() - }); - - if let Some(output_types) = output_types { - err_collector.run(|| { - let column_cnt = output_types.len(); - let row_cnt = all_output_rows.len(); - - let mut output_builder = output_types - .into_iter() - .map(|t| t.create_mutable_vector(row_cnt)) - .collect_vec(); - - for (row, _, _) in all_output_rows { - for (i, v) in row.into_iter().enumerate() { - output_builder - .get_mut(i) - .context(InternalSnafu{ - reason: format!( - "Output builder should have the same length as the row, expected at most {} but got {}", - column_cnt-1, - i - ) - })? - .try_push_value_ref(v.as_value_ref()) - .context(DataTypeSnafu { - msg: "Failed to push value", - })?; - } - } - - let output_columns = output_builder - .into_iter() - .map(|mut b| b.to_vector()) - .collect_vec(); - - let output_batch = Batch::try_new(output_columns, row_cnt)?; - send.give(vec![output_batch]); + let subgraph = self.df.add_subgraph_in_out( + Self::REDUCE_BATCH, + input.collection.into_inner(), + out_send_port, + move |_ctx, recv, send| { + let now = *(now.borrow()); + let arrange = arrange_handler_inner.clone(); + // mfp only need to passively receive updates from recvs + let src_data = recv + .take_inner() + .into_iter() + .flat_map(|v| v.into_iter()) + .collect_vec(); - Ok(()) - }); - } - }, - ); + reduce_batch_subgraph( + &arrange, + src_data, + &key_val_plan, + &accum_plan, + SubgraphArg { + now, + err_collector: &err_collector, + scheduler: &scheduler_inner, + send, + }, + ) + }, + ); scheduler.set_cur_subgraph(subgraph); @@ -461,6 +349,245 @@ fn split_rows_to_key_val( ) } +fn reduce_batch_subgraph( + arrange: &ArrangeHandler, + src_data: impl IntoIterator, + key_val_plan: &KeyValPlan, + accum_plan: &AccumulablePlan, + SubgraphArg { + now, + err_collector, + scheduler: _, + send, + }: SubgraphArg>, +) { + let mut key_to_many_vals = BTreeMap::>::new(); + let mut input_row_count = 0; + let mut input_batch_count = 0; + + for batch in src_data { + input_batch_count += 1; + input_row_count += batch.row_count(); + err_collector.run(|| { + let (key_batch, val_batch) = + batch_split_by_key_val(&batch, key_val_plan, err_collector); + ensure!( + key_batch.row_count() == val_batch.row_count(), + InternalSnafu { + reason: format!( + "Key and val batch should have the same row count, found {} and {}", + key_batch.row_count(), + val_batch.row_count() + ) + } + ); + + let mut distinct_keys = BTreeSet::new(); + for row_idx in 0..key_batch.row_count() { + let key_row = key_batch.get_row(row_idx)?; + let key_row = 
Row::new(key_row); + + if distinct_keys.contains(&key_row) { + continue; + } else { + distinct_keys.insert(key_row.clone()); + } + } + + // TODO: here reduce numbers of eq to minimal by keeping slicing key/val batch + for key_row in distinct_keys { + let key_scalar_value = { + let mut key_scalar_value = Vec::with_capacity(key_row.len()); + for key in key_row.iter() { + let v = + key.try_to_scalar_value(&key.data_type()) + .context(DataTypeSnafu { + msg: "can't convert key values to datafusion value", + })?; + let arrow_value = + v.to_scalar().context(crate::expr::error::DatafusionSnafu { + context: "can't convert key values to arrow value", + })?; + key_scalar_value.push(arrow_value); + } + key_scalar_value + }; + + // first compute equal from separate columns + let eq_results = key_scalar_value + .into_iter() + .zip(key_batch.batch().iter()) + .map(|(key, col)| { + // TODO(discord9): this takes half of the cpu! And this is redundant amount of `eq`! + arrow::compute::kernels::cmp::eq(&key, &col.to_arrow_array().as_ref() as _) + }) + .try_collect::<_, Vec<_>, _>() + .context(ArrowSnafu { + context: "Failed to compare key values", + })?; + + // then combine all equal results to finally found equal key rows + let opt_eq_mask = eq_results + .into_iter() + .fold(None, |acc, v| match acc { + Some(Ok(acc)) => Some(arrow::compute::kernels::boolean::and(&acc, &v)), + Some(Err(_)) => acc, + None => Some(Ok(v)), + }) + .transpose() + .context(ArrowSnafu { + context: "Failed to combine key comparison results", + })?; + + let key_eq_mask = if let Some(eq_mask) = opt_eq_mask { + BooleanVector::from(eq_mask) + } else { + // if None, meaning key_batch's column number is zero, which means + // the key is empty, so we just return a mask of all true + // meaning taking all values + BooleanVector::from(vec![true; key_batch.row_count()]) + }; + // TODO: both slice and mutate remaining batch + + let cur_val_batch = val_batch.filter(&key_eq_mask)?; + + key_to_many_vals + .entry(key_row) + .or_default() + .push(cur_val_batch); + } + + Ok(()) + }); + } + + trace!( + "Reduce take {} batches, {} rows", + input_batch_count, + input_row_count + ); + + // write lock the arrange for the rest of the function body + // to prevent wired race condition + let mut arrange = arrange.write(); + let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len()); + + let mut all_output_dict = BTreeMap::new(); + + for (key, val_batches) in key_to_many_vals { + err_collector.run(|| -> Result<(), _> { + let (accums, _, _) = arrange.get(now, &key).unwrap_or_default(); + let accum_list = + from_accum_values_to_live_accums(accums.unpack(), accum_plan.simple_aggrs.len())?; + + let mut accum_output = AccumOutput::new(); + for AggrWithIndex { + expr, + input_idx, + output_idx, + } in accum_plan.simple_aggrs.iter() + { + let cur_accum_value = accum_list.get(*output_idx).cloned().unwrap_or_default(); + let mut cur_accum = if cur_accum_value.is_empty() { + Accum::new_accum(&expr.func.clone())? + } else { + Accum::try_into_accum(&expr.func, cur_accum_value)? 
+ }; + + for val_batch in val_batches.iter() { + // if batch is empty, input null instead + let cur_input = val_batch + .batch() + .get(*input_idx) + .cloned() + .unwrap_or_else(|| Arc::new(NullVector::new(val_batch.row_count()))); + let len = cur_input.len(); + cur_accum.update_batch(&expr.func, VectorDiff::from(cur_input))?; + + trace!("Reduce accum after take {} rows: {:?}", len, cur_accum); + } + let final_output = cur_accum.eval(&expr.func)?; + trace!("Reduce accum final output: {:?}", final_output); + accum_output.insert_output(*output_idx, final_output); + + let cur_accum_value = cur_accum.into_state(); + accum_output.insert_accum(*output_idx, cur_accum_value); + } + + let (new_accums, res_val_row) = accum_output.into_accum_output()?; + + let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1); + all_arrange_updates.push(arrange_update); + + all_output_dict.insert(key, Row::from(res_val_row)); + + Ok(()) + }); + } + + err_collector.run(|| { + arrange.apply_updates(now, all_arrange_updates)?; + arrange.compact_to(now) + }); + // release the lock + drop(arrange); + + // this output part is not supposed to be resource intensive + // (because for every batch there wouldn't usually be as many output row?), + // so we can do some costly operation here + let output_types = all_output_dict.first_entry().map(|entry| { + entry + .key() + .iter() + .chain(entry.get().iter()) + .map(|v| v.data_type()) + .collect::>() + }); + + if let Some(output_types) = output_types { + err_collector.run(|| { + let column_cnt = output_types.len(); + let row_cnt = all_output_dict.len(); + + let mut output_builder = output_types + .into_iter() + .map(|t| t.create_mutable_vector(row_cnt)) + .collect_vec(); + + for (key, val) in all_output_dict { + for (i, v) in key.into_iter().chain(val.into_iter()).enumerate() { + output_builder + .get_mut(i) + .context(InternalSnafu{ + reason: format!( + "Output builder should have the same length as the row, expected at most {} but got {}", + column_cnt - 1, + i + ) + })? 
+ .try_push_value_ref(v.as_value_ref()) + .context(DataTypeSnafu { + msg: "Failed to push value", + })?; + } + } + + let output_columns = output_builder + .into_iter() + .map(|mut b| b.to_vector()) + .collect_vec(); + + let output_batch = Batch::try_new(output_columns, row_cnt)?; + + trace!("Reduce output batch: {:?}", output_batch); + + send.give(vec![output_batch]); + + Ok(()) + }); + } +} + /// reduce subgraph, reduce the input data into a single row /// output is concat from key and val fn reduce_subgraph( @@ -856,6 +983,9 @@ impl AccumOutput { /// return (accums, output) fn into_accum_output(self) -> Result<(Vec, Vec), EvalError> { + if self.accum.is_empty() && self.output.is_empty() { + return Ok((vec![], vec![])); + } ensure!( !self.accum.is_empty() && self.accum.len() == self.output.len(), InternalSnafu { diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index 53141b8cc0c1..3f5bfaf34574 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, VecDeque}; -use common_telemetry::debug; +use common_telemetry::{debug, trace}; use hydroflow::scheduled::graph_ext::GraphExt; use itertools::Itertools; use snafu::OptionExt; @@ -48,10 +48,13 @@ impl<'referred, 'df> Context<'referred, 'df> { let sub = self .df .add_subgraph_source("source_batch", send_port, move |_ctx, send| { + let mut total_batches = vec![]; + let mut total_row_count = 0; loop { match src_recv.try_recv() { Ok(batch) => { - send.give(vec![batch]); + total_row_count += batch.row_count(); + total_batches.push(batch); } Err(TryRecvError::Empty) => { break; @@ -78,6 +81,13 @@ impl<'referred, 'df> Context<'referred, 'df> { } } + trace!( + "Send {} rows in {} batches", + total_row_count, + total_batches.len() + ); + send.give(total_batches); + let now = *now.borrow(); // always schedule source to run at now so we can // repeatedly run source if needed @@ -185,13 +195,18 @@ impl<'referred, 'df> Context<'referred, 'df> { collection.into_inner(), move |_ctx, recv| { let data = recv.take_inner(); + let mut row_count = 0; + let mut batch_count = 0; for batch in data.into_iter().flat_map(|i| i.into_iter()) { + row_count += batch.row_count(); + batch_count += 1; // if the sender is closed unexpectedly, stop sending if sender.is_closed() || sender.send(batch).is_err() { common_telemetry::error!("UnboundedSinkBatch is closed"); break; } } + trace!("sink send {} rows in {} batches", row_count, batch_count); }, ); } diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs index f7cbebae271a..00ed660a6ef0 100644 --- a/src/flow/src/compute/types.rs +++ b/src/flow/src/compute/types.rs @@ -24,7 +24,7 @@ use hydroflow::scheduled::SubgraphId; use itertools::Itertools; use tokio::sync::Mutex; -use crate::expr::{EvalError, ScalarExpr}; +use crate::expr::{Batch, EvalError, ScalarExpr}; use crate::repr::DiffRow; use crate::utils::ArrangeHandler; @@ -123,6 +123,38 @@ pub struct CollectionBundle { pub arranged: BTreeMap, Arranged>, } +pub trait GenericBundle { + fn is_batch(&self) -> bool; + + fn try_as_batch(&self) -> Option<&CollectionBundle> { + None + } + + fn try_as_row(&self) -> Option<&CollectionBundle> { + None + } +} + +impl GenericBundle for CollectionBundle { + fn is_batch(&self) -> bool { + true + } + + fn try_as_batch(&self) -> Option<&CollectionBundle> { + Some(self) + } +} + +impl GenericBundle for CollectionBundle { + fn is_batch(&self) -> bool { + false + } + + fn try_as_row(&self) -> 
Option<&CollectionBundle> { + Some(self) + } +} + impl CollectionBundle { pub fn from_collection(collection: Collection) -> Self { Self { diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index 1f8160716a55..2e6019ba4ca7 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -16,27 +16,29 @@ mod df_func; pub(crate) mod error; -mod func; +pub(crate) mod func; mod id; mod linear; -mod relation; +pub(crate) mod relation; mod scalar; mod signature; +use arrow::compute::FilterBuilder; use datatypes::prelude::DataType; use datatypes::value::Value; -use datatypes::vectors::VectorRef; +use datatypes::vectors::{BooleanVector, Helper, VectorRef}; pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn}; pub(crate) use error::{EvalError, InvalidArgumentSnafu}; pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc}; pub(crate) use id::{GlobalId, Id, LocalId}; use itertools::Itertools; pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan}; -pub(crate) use relation::{AggregateExpr, AggregateFunc}; +pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc}; pub(crate) use scalar::{ScalarExpr, TypedExpr}; use snafu::{ensure, ResultExt}; -use crate::expr::error::DataTypeSnafu; +use crate::expr::error::{ArrowSnafu, DataTypeSnafu}; +use crate::repr::Diff; pub const TUMBLE_START: &str = "tumble_start"; pub const TUMBLE_END: &str = "tumble_end"; @@ -179,7 +181,9 @@ impl Batch { ) } ); - Ok(self.batch.iter().map(|v| v.get(idx)).collect_vec()) + let mut ret = Vec::with_capacity(self.column_count()); + ret.extend(self.batch.iter().map(|v| v.get(idx))); + Ok(ret) } /// Slices the `Batch`, returning a new `Batch`. @@ -248,4 +252,97 @@ impl Batch { self.row_count = self_row_count + other_row_count; Ok(()) } + + /// filter the batch with given predicate + pub fn filter(&self, predicate: &BooleanVector) -> Result { + let len = predicate.as_boolean_array().true_count(); + let filter_builder = FilterBuilder::new(predicate.as_boolean_array()).optimize(); + let filter_pred = filter_builder.build(); + let filtered = self + .batch() + .iter() + .map(|col| filter_pred.filter(col.to_arrow_array().as_ref())) + .try_collect::<_, Vec<_>, _>() + .context(ArrowSnafu { + context: "Failed to filter val batches", + })?; + let res_vector = Helper::try_into_vectors(&filtered).context(DataTypeSnafu { + msg: "can't convert arrow array to vector", + })?; + Self::try_new(res_vector, len) + } +} + +/// Vector with diff to note the insert and delete +pub(crate) struct VectorDiff { + vector: VectorRef, + diff: Option, +} + +impl From for VectorDiff { + fn from(vector: VectorRef) -> Self { + Self { vector, diff: None } + } +} + +impl VectorDiff { + fn len(&self) -> usize { + self.vector.len() + } + + fn try_new(vector: VectorRef, diff: Option) -> Result { + ensure!( + diff.as_ref() + .map_or(true, |diff| diff.len() == vector.len()), + InvalidArgumentSnafu { + reason: "Length of vector and diff should be the same" + } + ); + Ok(Self { vector, diff }) + } +} + +impl IntoIterator for VectorDiff { + type Item = (Value, Diff); + type IntoIter = VectorDiffIter; + + fn into_iter(self) -> Self::IntoIter { + VectorDiffIter { + vector: self.vector, + diff: self.diff, + idx: 0, + } + } +} + +/// iterator for VectorDiff +pub(crate) struct VectorDiffIter { + vector: VectorRef, + diff: Option, + idx: usize, +} + +impl std::iter::Iterator for VectorDiffIter { + type Item = (Value, Diff); + + fn next(&mut self) -> Option { + if self.idx >= self.vector.len() { + return None; + } + let value 
= self.vector.get(self.idx); + // +1 means insert, -1 means delete, and default to +1 insert when diff is not provided + let diff = if let Some(diff) = self.diff.as_ref() { + if let Ok(diff_at) = diff.get(self.idx).try_into() { + diff_at + } else { + common_telemetry::warn!("Invalid diff value at index {}", self.idx); + return None; + } + } else { + 1 + }; + + self.idx += 1; + Some((value, diff)) + } } diff --git a/src/flow/src/expr/df_func.rs b/src/flow/src/expr/df_func.rs index b0a2648dd15e..620615c9eb48 100644 --- a/src/flow/src/expr/df_func.rs +++ b/src/flow/src/expr/df_func.rs @@ -92,12 +92,8 @@ impl DfScalarFunction { let len = rb.num_rows(); - let res = self.fn_impl.evaluate(&rb).map_err(|err| { - EvalDatafusionSnafu { - raw: err, - context: "Failed to evaluate datafusion scalar function", - } - .build() + let res = self.fn_impl.evaluate(&rb).context(EvalDatafusionSnafu { + context: "Failed to evaluate datafusion scalar function", })?; let res = common_query::columnar_value::ColumnarValue::try_from(&res) .map_err(BoxedError::new) @@ -157,12 +153,8 @@ impl DfScalarFunction { .into_error(err) })?; - let res = self.fn_impl.evaluate(&rb).map_err(|err| { - EvalDatafusionSnafu { - raw: err, - context: "Failed to evaluate datafusion scalar function", - } - .build() + let res = self.fn_impl.evaluate(&rb).context(EvalDatafusionSnafu { + context: "Failed to evaluate datafusion scalar function", })?; let res = common_query::columnar_value::ColumnarValue::try_from(&res) .map_err(BoxedError::new) diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 6703ce240471..4b69b3df235e 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -106,18 +106,19 @@ pub enum EvalError { location: Location, }, - #[snafu(display("Arrow error: {raw:?}, context: {context}"))] + #[snafu(display("Arrow error: {error:?}, context: {context}"))] Arrow { #[snafu(source)] - raw: ArrowError, + error: ArrowError, context: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("DataFusion error: {raw:?}, context: {context}"))] + #[snafu(display("DataFusion error: {error:?}, context: {context}"))] Datafusion { - raw: DataFusionError, + #[snafu(source)] + error: DataFusionError, context: String, #[snafu(implicit)] location: Location, diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 65da763e27d6..36a102972536 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -967,7 +967,7 @@ impl BinaryFunc { | Self::DivUInt32 | Self::DivUInt64 | Self::DivFloat32 - | Self::DivFloat64 => arrow::compute::kernels::numeric::mul(&left, &right) + | Self::DivFloat64 => arrow::compute::kernels::numeric::div(&left, &right) .context(ArrowSnafu { context: "div" })?, Self::ModInt16 @@ -1280,119 +1280,183 @@ where Ok(Value::from(left % right)) } -#[test] -fn test_num_ops() { - let left = Value::from(10); - let right = Value::from(3); - let res = add::(left.clone(), right.clone()).unwrap(); - assert_eq!(res, Value::from(13)); - let res = sub::(left.clone(), right.clone()).unwrap(); - assert_eq!(res, Value::from(7)); - let res = mul::(left.clone(), right.clone()).unwrap(); - assert_eq!(res, Value::from(30)); - let res = div::(left.clone(), right.clone()).unwrap(); - assert_eq!(res, Value::from(3)); - let res = rem::(left, right).unwrap(); - assert_eq!(res, Value::from(1)); - - let values = vec![Value::from(true), Value::from(false)]; - let exprs = vec![ScalarExpr::Column(0), ScalarExpr::Column(1)]; - let res = and(&values, &exprs).unwrap(); - assert_eq!(res, 
Value::from(false)); - let res = or(&values, &exprs).unwrap(); - assert_eq!(res, Value::from(true)); -} +#[cfg(test)] +mod test { + use std::sync::Arc; -/// test if the binary function specialization works -/// whether from direct type or from the expression that is literal -#[test] -fn test_binary_func_spec() { - assert_eq!( - BinaryFunc::from_str_expr_and_type( - "add", - &[ScalarExpr::Column(0), ScalarExpr::Column(0)], - &[ - Some(ConcreteDataType::int32_datatype()), - Some(ConcreteDataType::int32_datatype()) - ] - ) - .unwrap(), - (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) - ); - - assert_eq!( - BinaryFunc::from_str_expr_and_type( - "add", - &[ScalarExpr::Column(0), ScalarExpr::Column(0)], - &[Some(ConcreteDataType::int32_datatype()), None] - ) - .unwrap(), - (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) - ); - - assert_eq!( - BinaryFunc::from_str_expr_and_type( - "add", - &[ScalarExpr::Column(0), ScalarExpr::Column(0)], - &[Some(ConcreteDataType::int32_datatype()), None] - ) - .unwrap(), - (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) - ); - - assert_eq!( - BinaryFunc::from_str_expr_and_type( - "add", - &[ScalarExpr::Column(0), ScalarExpr::Column(0)], - &[Some(ConcreteDataType::int32_datatype()), None] - ) - .unwrap(), - (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) - ); - - assert_eq!( - BinaryFunc::from_str_expr_and_type( - "add", - &[ - ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()), - ScalarExpr::Column(0) - ], - &[None, None] - ) - .unwrap(), - (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) - ); - - // this testcase make sure the specialization can find actual type from expression and fill in signature - assert_eq!( - BinaryFunc::from_str_expr_and_type( - "equal", - &[ - ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()), - ScalarExpr::Column(0) - ], - &[None, None] - ) - .unwrap(), - ( - BinaryFunc::Eq, - Signature { - input: smallvec![ - ConcreteDataType::int32_datatype(), - ConcreteDataType::int32_datatype() + use common_time::Interval; + use datatypes::vectors::Vector; + use pretty_assertions::assert_eq; + + use super::*; + + #[test] + fn test_tumble_batch() { + let datetime_vector = DateTimeVector::from_vec(vec![1, 2, 10, 13, 14, 20, 25]); + let tumble_start = UnaryFunc::TumbleWindowFloor { + window_size: Interval::from_day_time(0, 10), + start_time: None, + }; + let tumble_end = UnaryFunc::TumbleWindowCeiling { + window_size: Interval::from_day_time(0, 10), + start_time: None, + }; + + let len = datetime_vector.len(); + let batch = Batch::try_new(vec![Arc::new(datetime_vector)], len).unwrap(); + let arg = ScalarExpr::Column(0); + + let start = tumble_start.eval_batch(&batch, &arg).unwrap(); + let end = tumble_end.eval_batch(&batch, &arg).unwrap(); + assert_eq!( + start.to_arrow_array().as_ref(), + TimestampMillisecondVector::from_vec(vec![0, 0, 10, 10, 10, 20, 20]) + .to_arrow_array() + .as_ref() + ); + + assert_eq!( + end.to_arrow_array().as_ref(), + TimestampMillisecondVector::from_vec(vec![10, 10, 20, 20, 20, 30, 30]) + .to_arrow_array() + .as_ref() + ); + + let ts_ms_vector = TimestampMillisecondVector::from_vec(vec![1, 2, 10, 13, 14, 20, 25]); + let batch = Batch::try_new(vec![Arc::new(ts_ms_vector)], len).unwrap(); + + let start = tumble_start.eval_batch(&batch, &arg).unwrap(); + let end = tumble_end.eval_batch(&batch, &arg).unwrap(); + + assert_eq!( + start.to_arrow_array().as_ref(), + TimestampMillisecondVector::from_vec(vec![0, 0, 10, 10, 10, 20, 20]) + 
.to_arrow_array() + .as_ref() + ); + + assert_eq!( + end.to_arrow_array().as_ref(), + TimestampMillisecondVector::from_vec(vec![10, 10, 20, 20, 20, 30, 30]) + .to_arrow_array() + .as_ref() + ); + } + + #[test] + fn test_num_ops() { + let left = Value::from(10); + let right = Value::from(3); + let res = add::(left.clone(), right.clone()).unwrap(); + assert_eq!(res, Value::from(13)); + let res = sub::(left.clone(), right.clone()).unwrap(); + assert_eq!(res, Value::from(7)); + let res = mul::(left.clone(), right.clone()).unwrap(); + assert_eq!(res, Value::from(30)); + let res = div::(left.clone(), right.clone()).unwrap(); + assert_eq!(res, Value::from(3)); + let res = rem::(left, right).unwrap(); + assert_eq!(res, Value::from(1)); + + let values = vec![Value::from(true), Value::from(false)]; + let exprs = vec![ScalarExpr::Column(0), ScalarExpr::Column(1)]; + let res = and(&values, &exprs).unwrap(); + assert_eq!(res, Value::from(false)); + let res = or(&values, &exprs).unwrap(); + assert_eq!(res, Value::from(true)); + } + + /// test if the binary function specialization works + /// whether from direct type or from the expression that is literal + #[test] + fn test_binary_func_spec() { + assert_eq!( + BinaryFunc::from_str_expr_and_type( + "add", + &[ScalarExpr::Column(0), ScalarExpr::Column(0)], + &[ + Some(ConcreteDataType::int32_datatype()), + Some(ConcreteDataType::int32_datatype()) + ] + ) + .unwrap(), + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) + ); + + assert_eq!( + BinaryFunc::from_str_expr_and_type( + "add", + &[ScalarExpr::Column(0), ScalarExpr::Column(0)], + &[Some(ConcreteDataType::int32_datatype()), None] + ) + .unwrap(), + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) + ); + + assert_eq!( + BinaryFunc::from_str_expr_and_type( + "add", + &[ScalarExpr::Column(0), ScalarExpr::Column(0)], + &[Some(ConcreteDataType::int32_datatype()), None] + ) + .unwrap(), + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) + ); + + assert_eq!( + BinaryFunc::from_str_expr_and_type( + "add", + &[ScalarExpr::Column(0), ScalarExpr::Column(0)], + &[Some(ConcreteDataType::int32_datatype()), None] + ) + .unwrap(), + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) + ); + + assert_eq!( + BinaryFunc::from_str_expr_and_type( + "add", + &[ + ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()), + ScalarExpr::Column(0) ], - output: ConcreteDataType::boolean_datatype(), - generic_fn: GenericFn::Eq - } - ) - ); - - matches!( - BinaryFunc::from_str_expr_and_type( - "add", - &[ScalarExpr::Column(0), ScalarExpr::Column(0)], - &[None, None] - ), - Err(Error::InvalidQuery { .. }) - ); + &[None, None] + ) + .unwrap(), + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) + ); + + // this testcase make sure the specialization can find actual type from expression and fill in signature + assert_eq!( + BinaryFunc::from_str_expr_and_type( + "equal", + &[ + ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()), + ScalarExpr::Column(0) + ], + &[None, None] + ) + .unwrap(), + ( + BinaryFunc::Eq, + Signature { + input: smallvec![ + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype() + ], + output: ConcreteDataType::boolean_datatype(), + generic_fn: GenericFn::Eq + } + ) + ); + + matches!( + BinaryFunc::from_str_expr_and_type( + "add", + &[ScalarExpr::Column(0), ScalarExpr::Column(0)], + &[None, None] + ), + Err(Error::InvalidQuery { .. 
}) + ); + } } diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 3185bdffcf5b..8e220f7d86a2 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -17,8 +17,9 @@ use std::collections::{BTreeMap, BTreeSet}; use arrow::array::BooleanArray; +use arrow::buffer::BooleanBuffer; use arrow::compute::FilterBuilder; -use common_telemetry::debug; +use common_telemetry::trace; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; use datatypes::vectors::{BooleanVector, Helper}; @@ -500,7 +501,7 @@ impl SafeMfpPlan { for col in batch.batch() { let filtered = pred .filter(col.to_arrow_array().as_ref()) - .context(ArrowSnafu { + .with_context(|_| ArrowSnafu { context: format!("failed to filter column for mfp operator {:?}", self), })?; result.push(Helper::try_into_vector(filtered).context(DataTypeSnafu { @@ -523,7 +524,9 @@ impl SafeMfpPlan { // mark the columns that have been evaluated and appended to the `batch` let mut expression = 0; // preds default to true and will be updated as we evaluate each predicate - let mut all_preds = BooleanVector::from(vec![Some(true); batch.row_count()]); + let buf = BooleanBuffer::new_set(batch.row_count()); + let arr = BooleanArray::new(buf, None); + let mut all_preds = BooleanVector::from(arr); // to compute predicate, need to first compute all expressions used in predicates for (support, predicate) in self.mfp.predicates.iter() { @@ -793,7 +796,7 @@ impl MfpPlan { if Some(lower_bound) != upper_bound && !null_eval { if self.mfp.mfp.projection.iter().any(|c| values.len() <= *c) { - debug!("values={:?}, mfp={:?}", &values, &self.mfp.mfp); + trace!("values={:?}, mfp={:?}", &values, &self.mfp.mfp); let err = InternalSnafu { reason: format!( "Index out of bound for mfp={:?} and values={:?}", diff --git a/src/flow/src/expr/relation.rs b/src/flow/src/expr/relation.rs index 3661db4ff0f2..b5d7e4ef2078 100644 --- a/src/flow/src/expr/relation.rs +++ b/src/flow/src/expr/relation.rs @@ -14,6 +14,7 @@ //! Describes an aggregation function and it's input expression. +pub(crate) use accum::{Accum, Accumulator}; pub(crate) use func::AggregateFunc; use crate::expr::ScalarExpr; diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs index afcdb7ddd152..fd72b58bb12e 100644 --- a/src/flow/src/expr/relation/func.rs +++ b/src/flow/src/expr/relation/func.rs @@ -21,14 +21,14 @@ use datatypes::value::Value; use datatypes::vectors::VectorRef; use serde::{Deserialize, Serialize}; use smallvec::smallvec; -use snafu::{ensure, IntoError, OptionExt}; +use snafu::{IntoError, OptionExt}; use strum::{EnumIter, IntoEnumIterator}; use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu}; use crate::expr::error::EvalError; use crate::expr::relation::accum::{Accum, Accumulator}; use crate::expr::signature::{GenericFn, Signature}; -use crate::expr::InvalidArgumentSnafu; +use crate::expr::VectorDiff; use crate::repr::Diff; /// Aggregate functions that can be applied to a group of rows. 
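
For context on the `test_tumble_batch` expectations above: with `start_time: None` and a 10 ms window, `TumbleWindowFloor`/`TumbleWindowCeiling` bucket each timestamp into epoch-aligned fixed windows. Below is a minimal sketch of that arithmetic, inferred only from the test's expected outputs; the function names and the plain-integer-millisecond signature are illustrative, not the flow engine's API.

```rust
/// Lower bound of the tumbling window containing `ts_ms`, for epoch-aligned
/// windows of `window` milliseconds (i.e. when no start_time is given).
fn tumble_floor(ts_ms: i64, window: i64) -> i64 {
    ts_ms.div_euclid(window) * window
}

/// Exclusive upper bound of the same window.
fn tumble_ceiling(ts_ms: i64, window: i64) -> i64 {
    tumble_floor(ts_ms, window) + window
}

fn main() {
    let input = [1i64, 2, 10, 13, 14, 20, 25];
    let floors: Vec<i64> = input.iter().map(|ts| tumble_floor(*ts, 10)).collect();
    let ceilings: Vec<i64> = input.iter().map(|ts| tumble_ceiling(*ts, 10)).collect();
    // Matches the expected vectors asserted in `test_tumble_batch`.
    assert_eq!(floors, vec![0, 0, 10, 10, 10, 20, 20]);
    assert_eq!(ceilings, vec![10, 10, 20, 20, 20, 30, 30]);
}
```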
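
The `linear.rs` hunk above seeds the predicate result with a packed all-true `BooleanBuffer` instead of allocating `vec![Some(true); row_count]`, so no per-row `Option<bool>` is materialized and no null bitmap is attached. A rough standalone sketch of that construction against the arrow crate (the wrapping into GreptimeDB's `BooleanVector` is elided here):

```rust
use arrow::array::{Array, BooleanArray};
use arrow::buffer::BooleanBuffer;

/// Build an all-true boolean array of `row_count` bits with no null bitmap,
/// suitable as the initial value before AND-ing in each predicate result.
fn all_true(row_count: usize) -> BooleanArray {
    let buf = BooleanBuffer::new_set(row_count);
    BooleanArray::new(buf, None)
}

fn main() {
    let preds = all_true(4);
    assert_eq!(preds.len(), 4);
    assert!(preds.iter().all(|v| v == Some(true)));
}
```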
@@ -161,72 +161,6 @@ impl AggregateFunc { } } -struct VectorDiff { - vector: VectorRef, - diff: Option, -} - -impl VectorDiff { - fn len(&self) -> usize { - self.vector.len() - } - - fn try_new(vector: VectorRef, diff: Option) -> Result { - ensure!( - diff.as_ref() - .map_or(true, |diff| diff.len() == vector.len()), - InvalidArgumentSnafu { - reason: "Length of vector and diff should be the same" - } - ); - Ok(Self { vector, diff }) - } -} - -impl IntoIterator for VectorDiff { - type Item = (Value, Diff); - type IntoIter = VectorDiffIter; - - fn into_iter(self) -> Self::IntoIter { - VectorDiffIter { - vector: self.vector, - diff: self.diff, - idx: 0, - } - } -} - -struct VectorDiffIter { - vector: VectorRef, - diff: Option, - idx: usize, -} - -impl std::iter::Iterator for VectorDiffIter { - type Item = (Value, Diff); - - fn next(&mut self) -> Option { - if self.idx >= self.vector.len() { - return None; - } - let value = self.vector.get(self.idx); - // +1 means insert, -1 means delete, and default to +1 insert when diff is not provided - let diff = if let Some(diff) = self.diff.as_ref() { - if let Ok(diff_at) = diff.get(self.idx).try_into() { - diff_at - } else { - common_telemetry::warn!("Invalid diff value at index {}", self.idx); - return None; - } - } else { - 1 - }; - - self.idx += 1; - Some((value, diff)) - } -} - /// Generate signature for each aggregate function macro_rules! generate_signature { ($value:ident, diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 6a9cf30d950c..a6e00cce5bdd 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -16,17 +16,20 @@ use std::collections::{BTreeMap, BTreeSet}; +use arrow::array::{make_array, ArrayData, ArrayRef}; use common_error::ext::BoxedError; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::value::Value; -use datatypes::vectors::{BooleanVector, Helper, NullVector, Vector, VectorRef}; +use datatypes::vectors::{BooleanVector, Helper, VectorRef}; +use hydroflow::lattices::cc_traits::Iter; +use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ DatafusionSnafu, Error, InvalidQuerySnafu, UnexpectedSnafu, UnsupportedTemporalFilterSnafu, }; use crate::expr::error::{ - DataTypeSnafu, EvalError, InternalSnafu, InvalidArgumentSnafu, OptimizeSnafu, TypeMismatchSnafu, + ArrowSnafu, DataTypeSnafu, EvalError, InvalidArgumentSnafu, OptimizeSnafu, TypeMismatchSnafu, }; use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc}; use crate::expr::{Batch, DfScalarFunction}; @@ -222,6 +225,8 @@ impl ScalarExpr { } } + /// NOTE: this if then eval impl assume all given expr are pure, and will not change the state of the world + /// since it will evaluate both then and else branch and filter the result fn eval_if_then( batch: &Batch, cond: &ScalarExpr, @@ -240,130 +245,69 @@ impl ScalarExpr { })? 
.as_boolean_array(); - let mut then_input_batch = None; - let mut else_input_batch = None; - let mut null_input_batch = None; - - // instructions for how to reassembly result vector, - // iterate over (type of vec, offset, length) and append to resulting vec - let mut assembly_idx = vec![]; - - // append batch, returning appended batch's slice in (offset, length) - fn append_batch( - batch: &mut Option, - to_be_append: Batch, - ) -> Result<(usize, usize), EvalError> { - let len = to_be_append.row_count(); - if let Some(batch) = batch { - let offset = batch.row_count(); - batch.append_batch(to_be_append)?; - Ok((offset, len)) - } else { - *batch = Some(to_be_append); - Ok((0, len)) + let indices = bool_conds + .into_iter() + .enumerate() + .map(|(idx, b)| { + ( + match b { + Some(true) => 0, // then branch vector + Some(false) => 1, // else branch vector + None => 2, // null vector + }, + idx, + ) + }) + .collect_vec(); + + let then_input_vec = then.eval_batch(batch)?; + let else_input_vec = els.eval_batch(batch)?; + + ensure!( + then_input_vec.data_type() == else_input_vec.data_type(), + TypeMismatchSnafu { + expected: then_input_vec.data_type(), + actual: else_input_vec.data_type(), } - } + ); - let mut prev_cond: Option> = None; - let mut prev_start_idx: Option = None; - // first put different conds' vector into different batches - for (idx, cond) in bool_conds.iter().enumerate() { - // if belong to same slice and not last one continue - if prev_cond == Some(cond) { - continue; - } else if let Some(prev_cond_idx) = prev_start_idx { - let prev_cond = prev_cond.unwrap(); - - // put a slice to corresponding batch - let slice_offset = prev_cond_idx; - let slice_length = idx - prev_cond_idx; - let to_be_append = batch.slice(slice_offset, slice_length)?; - - let to_put_back = match prev_cond { - Some(true) => ( - Some(true), - append_batch(&mut then_input_batch, to_be_append)?, - ), - Some(false) => ( - Some(false), - append_batch(&mut else_input_batch, to_be_append)?, - ), - None => (None, append_batch(&mut null_input_batch, to_be_append)?), - }; - assembly_idx.push(to_put_back); + ensure!( + then_input_vec.len() == else_input_vec.len() && then_input_vec.len() == batch.row_count(), + InvalidArgumentSnafu { + reason: format!( + "then and else branch must have the same length(found {} and {}) which equals input batch's row count(which is {})", + then_input_vec.len(), + else_input_vec.len(), + batch.row_count() + ) } - prev_cond = Some(cond); - prev_start_idx = Some(idx); - } + ); - // deal with empty and last slice case - if let Some(slice_offset) = prev_start_idx { - let prev_cond = prev_cond.unwrap(); - let slice_length = bool_conds.len() - slice_offset; - let to_be_append = batch.slice(slice_offset, slice_length)?; - let to_put_back = match prev_cond { - Some(true) => ( - Some(true), - append_batch(&mut then_input_batch, to_be_append)?, - ), - Some(false) => ( - Some(false), - append_batch(&mut else_input_batch, to_be_append)?, - ), - None => (None, append_batch(&mut null_input_batch, to_be_append)?), - }; - assembly_idx.push(to_put_back); + fn new_nulls(dt: &arrow_schema::DataType, len: usize) -> ArrayRef { + let data = ArrayData::new_null(dt, len); + make_array(data) } - let then_output_vec = then_input_batch - .map(|batch| then.eval_batch(&batch)) - .transpose()?; - let else_output_vec = else_input_batch - .map(|batch| els.eval_batch(&batch)) - .transpose()?; - let null_output_vec = null_input_batch - .map(|null| NullVector::new(null.row_count()).slice(0, null.row_count())); - - let dt = 
then_output_vec - .as_ref() - .map(|v| v.data_type()) - .or(else_output_vec.as_ref().map(|v| v.data_type())) - .unwrap_or(ConcreteDataType::null_datatype()); - let mut builder = dt.create_mutable_vector(conds.len()); - for (cond, (offset, length)) in assembly_idx { - let slice = match cond { - Some(true) => then_output_vec.as_ref(), - Some(false) => else_output_vec.as_ref(), - None => null_output_vec.as_ref(), - } - .context(InternalSnafu { - reason: "Expect corresponding output vector to exist", - })?; - // TODO(discord9): seems `extend_slice_of` doesn't support NullVector or ConstantVector - // consider adding it maybe? - if slice.data_type().is_null() { - builder.push_nulls(length); - } else if slice.is_const() { - let arr = slice.slice(offset, length).to_arrow_array(); - let vector = Helper::try_into_vector(arr).context(DataTypeSnafu { - msg: "Failed to convert arrow array to vector", - })?; - builder - .extend_slice_of(vector.as_ref(), 0, vector.len()) - .context(DataTypeSnafu { - msg: "Failed to build result vector for if-then expression", - })?; - } else { - builder - .extend_slice_of(slice.as_ref(), offset, length) - .context(DataTypeSnafu { - msg: "Failed to build result vector for if-then expression", - })?; - } - } - let result_vec = builder.to_vector(); + let null_input_vec = new_nulls( + &then_input_vec.data_type().as_arrow_type(), + batch.row_count(), + ); + + let interleave_values = vec![ + then_input_vec.to_arrow_array(), + else_input_vec.to_arrow_array(), + null_input_vec, + ]; + let int_ref: Vec<_> = interleave_values.iter().map(|x| x.as_ref()).collect(); - Ok(result_vec) + let interleave_res_arr = + arrow::compute::interleave(&int_ref, &indices).context(ArrowSnafu { + context: "Failed to interleave output arrays", + })?; + let res_vec = Helper::try_into_vector(interleave_res_arr).context(DataTypeSnafu { + msg: "Failed to convert arrow array to vector", + })?; + Ok(res_vec) } /// Eval this expression with the given values. 
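
The rewritten `eval_if_then` above drops the run-slicing and reassembly path: it evaluates the `then` and `else` branches once over the whole batch, then selects per row with `arrow::compute::interleave`, routing NULL conditions to an all-null array of the same type. A condensed standalone sketch of that selection step using plain arrow arrays (values are illustrative; the real code goes through the flow engine's `Batch` and vector wrappers):

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, Array, ArrayRef, BooleanArray, Int32Array};
use arrow::compute::interleave;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Condition plus both branch results, all evaluated over the same 4-row batch.
    let cond = BooleanArray::from(vec![Some(true), Some(false), None, Some(true)]);
    let then_vals: ArrayRef = Arc::new(Int32Array::from(vec![10, 11, 12, 13]));
    let else_vals: ArrayRef = Arc::new(Int32Array::from(vec![20, 21, 22, 23]));
    // Rows whose condition is NULL take their value from an all-null array of the same type.
    let null_vals: ArrayRef = new_null_array(then_vals.data_type(), cond.len());

    // For each row, pick (source array index, row index): 0 = then, 1 = else, 2 = null.
    let indices: Vec<(usize, usize)> = cond
        .iter()
        .enumerate()
        .map(|(i, c)| match c {
            Some(true) => (0, i),
            Some(false) => (1, i),
            None => (2, i),
        })
        .collect();

    let sources: Vec<&dyn Array> = vec![then_vals.as_ref(), else_vals.as_ref(), null_vals.as_ref()];
    let result = interleave(&sources, &indices)?;

    // Row 0 and 3 come from `then`, row 1 from `else`, row 2 is NULL.
    let result: Vec<Option<i32>> = result
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap()
        .iter()
        .collect();
    assert_eq!(result, vec![Some(10), Some(21), None, Some(13)]);
    Ok(())
}
```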
@@ -685,7 +629,7 @@ impl ScalarExpr { #[cfg(test)] mod test { - use datatypes::vectors::Int32Vector; + use datatypes::vectors::{Int32Vector, Vector}; use pretty_assertions::assert_eq; use super::*; @@ -781,7 +725,7 @@ mod test { } #[test] - fn test_eval_batch() { + fn test_eval_batch_if_then() { // TODO(discord9): add more tests { let expr = ScalarExpr::If { @@ -840,7 +784,7 @@ mod test { let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)]; let batch = Batch::try_new(vectors, raw_len).unwrap(); - let expected = NullVector::new(raw_len).slice(0, raw_len); + let expected = Int32Vector::from(vec![]).slice(0, raw_len); assert_eq!(expr.eval_batch(&batch).unwrap(), expected); } } diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index 7b57fc3ed22a..acec71ebcfd7 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -50,12 +50,13 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff); /// broadcast channel capacity, can be important to memory consumption, since this influence how many /// updates can be buffered in memory in the entire dataflow /// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this -pub const BROADCAST_CAP: usize = 65535; +pub const BROADCAST_CAP: usize = 1024; /// The maximum capacity of the send buffer, to prevent the buffer from growing too large pub const SEND_BUF_CAP: usize = BROADCAST_CAP * 2; -pub const BATCH_SIZE: usize = BROADCAST_CAP / 2; +/// Flow worker will try to at least accumulate this many rows before processing them(if one second havn't passed) +pub const BATCH_SIZE: usize = 32 * 16384; /// Convert a value that is or can be converted to Datetime to internal timestamp /// diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 269c53fa84aa..1cd5b3ba5c1c 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -18,7 +18,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::ops::Bound; use std::sync::Arc; -use common_telemetry::debug; +use common_telemetry::trace; use smallvec::{smallvec, SmallVec}; use tokio::sync::RwLock; @@ -235,9 +235,11 @@ impl Arrangement { if let Some(s) = &mut self.expire_state { if let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)? 
{ max_expired_by = max_expired_by.max(Some(expired_by)); - debug!( + trace!( "Expired key: {:?}, expired by: {:?} with time being now={}", - key, expired_by, now + key, + expired_by, + now ); continue; } diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index db3d3c8c3b3d..6aaa2c74be5b 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -7,10 +7,13 @@ CREATE TABLE numbers_input_basic ( Affected Rows: 0 -CREATE FLOW test_numbers_basic -SINK TO out_num_cnt_basic -AS -SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) +FROM + numbers_input_basic +GROUP BY + tumble(ts, '1 second', '2021-07-01 00:00:00'); Affected Rows: 0 @@ -25,7 +28,8 @@ admin flush_flow('test_numbers_basic'); +----------------------------------------+ -- SQLNESS ARG restart=true -INSERT INTO numbers_input_basic +INSERT INTO + numbers_input_basic VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); @@ -40,7 +44,12 @@ admin flush_flow('test_numbers_basic'); | 1 | +----------------------------------------+ -SELECT "SUM(numbers_input_basic.number)", window_start, window_end FROM out_num_cnt_basic; +SELECT + "SUM(numbers_input_basic.number)", + window_start, + window_end +FROM + out_num_cnt_basic; +---------------------------------+---------------------+---------------------+ | SUM(numbers_input_basic.number) | window_start | window_end | @@ -56,10 +65,11 @@ admin flush_flow('test_numbers_basic'); | 0 | +----------------------------------------+ -INSERT INTO numbers_input_basic +INSERT INTO + numbers_input_basic VALUES - (23,"2021-07-01 00:00:01.000"), - (24,"2021-07-01 00:00:01.500"); + (23, "2021-07-01 00:00:01.000"), + (24, "2021-07-01 00:00:01.500"); Affected Rows: 2 @@ -72,7 +82,12 @@ admin flush_flow('test_numbers_basic'); +----------------------------------------+ -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion -SELECT "SUM(numbers_input_basic.number)", window_start, window_end FROM out_num_cnt_basic; +SELECT + "SUM(numbers_input_basic.number)", + window_start, + window_end +FROM + out_num_cnt_basic; +---------------------------------+---------------------+---------------------+ | SUM(numbers_input_basic.number) | window_start | window_end | @@ -93,6 +108,114 @@ DROP TABLE out_num_cnt_basic; Affected Rows: 0 +-- test distinct +CREATE TABLE distinct_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +SELECT + DISTINCT number as dis +FROM + distinct_basic; + +Affected Rows: 0 + +-- TODO(discord9): confirm if it's necessary to flush flow here? 
+-- because flush_flow result is at most 1 +admin flush_flow('test_distinct_basic'); + ++-----------------------------------------+ +| ADMIN flush_flow('test_distinct_basic') | ++-----------------------------------------+ +| 0 | ++-----------------------------------------+ + +-- SQLNESS ARG restart=true +INSERT INTO + distinct_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +admin flush_flow('test_distinct_basic'); + ++-----------------------------------------+ +| ADMIN flush_flow('test_distinct_basic') | ++-----------------------------------------+ +| 1 | ++-----------------------------------------+ + +SELECT + dis +FROM + out_distinct_basic; + ++-----+ +| dis | ++-----+ +| 20 | +| 22 | ++-----+ + +admin flush_flow('test_distinct_basic'); + ++-----------------------------------------+ +| ADMIN flush_flow('test_distinct_basic') | ++-----------------------------------------+ +| 0 | ++-----------------------------------------+ + +INSERT INTO + distinct_basic +VALUES + (23, "2021-07-01 00:00:01.000"), + (24, "2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +admin flush_flow('test_distinct_basic'); + ++-----------------------------------------+ +| ADMIN flush_flow('test_distinct_basic') | ++-----------------------------------------+ +| 1 | ++-----------------------------------------+ + +-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion +SELECT + dis +FROM + out_distinct_basic; + ++-----+ +| dis | ++-----+ +| 20 | +| 22 | +| 23 | +| 24 | ++-----+ + +DROP FLOW test_distinct_basic; + +Affected Rows: 0 + +DROP TABLE distinct_basic; + +Affected Rows: 0 + +DROP TABLE out_distinct_basic; + +Affected Rows: 0 + -- test interprete interval CREATE TABLE numbers_input_basic ( number INT, @@ -105,11 +228,20 @@ Affected Rows: 0 create table out_num_cnt_basic ( number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX +); Affected Rows: 0 -CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10; +CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + INTERVAL '1 day 1 second', + INTERVAL '1 month 1 day 1 second', + INTERVAL '1 year 1 month' +FROM + numbers_input_basic +where + number > 10; Affected Rows: 0 @@ -137,7 +269,8 @@ Affected Rows: 0 CREATE TABLE bytes_log ( byte INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time TIME INDEX(ts) ); @@ -153,16 +286,22 @@ CREATE TABLE approx_rate ( Affected Rows: 0 -CREATE FLOW find_approx_rate -SINK TO approx_rate -AS -SELECT (max(byte) - min(byte))/30.0 as rate, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window; +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window +from + bytes_log +GROUP BY + time_window; Affected Rows: 0 -INSERT INTO bytes_log VALUES -(101, '2025-01-01 00:00:01'), -(300, '2025-01-01 00:00:29'); +INSERT INTO + bytes_log +VALUES + (101, '2025-01-01 00:00:01'), + (300, '2025-01-01 00:00:29'); Affected Rows: 2 @@ -174,7 +313,11 @@ admin flush_flow('find_approx_rate'); | 1 | +--------------------------------------+ -SELECT rate, time_window FROM approx_rate; +SELECT + rate, + time_window +FROM + approx_rate; 
+-------------------+---------------------+ | rate | time_window | @@ -182,9 +325,11 @@ SELECT rate, time_window FROM approx_rate; | 6.633333333333334 | 2025-01-01T00:00:00 | +-------------------+---------------------+ -INSERT INTO bytes_log VALUES -(450, '2025-01-01 00:00:32'), -(500, '2025-01-01 00:00:37'); +INSERT INTO + bytes_log +VALUES + (450, '2025-01-01 00:00:32'), + (500, '2025-01-01 00:00:37'); Affected Rows: 2 @@ -196,7 +341,11 @@ admin flush_flow('find_approx_rate'); | 1 | +--------------------------------------+ -SELECT rate, time_window FROM approx_rate; +SELECT + rate, + time_window +FROM + approx_rate; +--------------------+---------------------+ | rate | time_window | @@ -217,3 +366,474 @@ DROP TABLE approx_rate; Affected Rows: 0 +-- input table +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +-- create flow task to calculate the distinct country +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + DISTINCT country, +FROM + ngx_access_log; + +Affected Rows: 0 + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 0); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| 1 | ++--------------------------------------+ + +SELECT + "ngx_access_log.country" +FROM + ngx_country; + ++------------------------+ +| ngx_access_log.country | ++------------------------+ +| b | ++------------------------+ + +-- making sure distinct is working +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 1); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| 1 | ++--------------------------------------+ + +SELECT + "ngx_access_log.country" +FROM + ngx_country; + ++------------------------+ +| ngx_access_log.country | ++------------------------+ +| b | ++------------------------+ + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "c", 2); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| 1 | ++--------------------------------------+ + +SELECT + "ngx_access_log.country" +FROM + ngx_country; + ++------------------------+ +| ngx_access_log.country | ++------------------------+ +| b | +| c | ++------------------------+ + +DROP FLOW calc_ngx_country; + +Affected Rows: 0 + +DROP TABLE ngx_access_log; + +Affected Rows: 0 + +DROP TABLE ngx_country; + +Affected Rows: 0 + +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + DISTINCT country, + -- this distinct is not necessary, but it's a good test to see if it works + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + country, + time_window; + +Affected Rows: 0 + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 0); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| 1 | ++--------------------------------------+ + +SELECT + "ngx_access_log.country", + time_window +FROM + ngx_country; + ++------------------------+---------------------+ +| 
ngx_access_log.country | time_window | ++------------------------+---------------------+ +| b | 1970-01-01T00:00:00 | ++------------------------+---------------------+ + +-- making sure distinct is working +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 1); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| 1 | ++--------------------------------------+ + +SELECT + "ngx_access_log.country", + time_window +FROM + ngx_country; + ++------------------------+---------------------+ +| ngx_access_log.country | time_window | ++------------------------+---------------------+ +| b | 1970-01-01T00:00:00 | ++------------------------+---------------------+ + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "c", 2); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| 1 | ++--------------------------------------+ + +SELECT + "ngx_access_log.country", + time_window +FROM + ngx_country; + ++------------------------+---------------------+ +| ngx_access_log.country | time_window | ++------------------------+---------------------+ +| b | 1970-01-01T00:00:00 | +| c | 1970-01-01T00:00:00 | ++------------------------+---------------------+ + +DROP FLOW calc_ngx_country; + +Affected Rows: 0 + +DROP TABLE ngx_access_log; + +Affected Rows: 0 + +DROP TABLE ngx_country; + +Affected Rows: 0 + +CREATE TABLE temp_sensor_data ( + sensor_id INT, + loc STRING, + temperature DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE FLOW temp_monitoring SINK TO temp_alerts AS +SELECT + sensor_id, + loc, + max(temperature) as max_temp, +FROM + temp_sensor_data +GROUP BY + sensor_id, + loc +HAVING + max_temp > 100; + +Affected Rows: 0 + +INSERT INTO + temp_sensor_data +VALUES + (1, "room1", 50, 0); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('temp_monitoring'); + ++-------------------------------------+ +| ADMIN FLUSH_FLOW('temp_monitoring') | ++-------------------------------------+ +| 1 | ++-------------------------------------+ + +-- This table should not exist yet +SHOW TABLES LIKE 'temp_alerts'; + ++-------------+ +| Tables | ++-------------+ +| temp_alerts | ++-------------+ + +INSERT INTO + temp_sensor_data +VALUES + (1, "room1", 150, 1); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('temp_monitoring'); + ++-------------------------------------+ +| ADMIN FLUSH_FLOW('temp_monitoring') | ++-------------------------------------+ +| 1 | ++-------------------------------------+ + +SHOW TABLES LIKE 'temp_alerts'; + ++-------------+ +| Tables | ++-------------+ +| temp_alerts | ++-------------+ + +SELECT + sensor_id, + loc, + max_temp +FROM + temp_alerts; + ++-----------+-------+----------+ +| sensor_id | loc | max_temp | ++-----------+-------+----------+ +| 1 | room1 | 150.0 | ++-----------+-------+----------+ + +INSERT INTO + temp_sensor_data +VALUES + (2, "room1", 0, 2); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('temp_monitoring'); + ++-------------------------------------+ +| ADMIN FLUSH_FLOW('temp_monitoring') | ++-------------------------------------+ +| 1 | ++-------------------------------------+ + +SELECT + sensor_id, + loc, + max_temp +FROM + temp_alerts; + ++-----------+-------+----------+ +| sensor_id | loc | 
max_temp | ++-----------+-------+----------+ +| 1 | room1 | 150.0 | ++-----------+-------+----------+ + +DROP FLOW temp_monitoring; + +Affected Rows: 0 + +DROP TABLE temp_sensor_data; + +Affected Rows: 0 + +DROP TABLE temp_alerts; + +Affected Rows: 0 + +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, -- auto generated column by flow engine + PRIMARY KEY(stat, bucket_size) +); + +Affected Rows: 0 + +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS +SELECT + stat, + trunc(size, -1)::INT as bucket_size, + count(client) AS total_logs, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + stat, + time_window, + bucket_size; + +Affected Rows: 0 + +INSERT INTO + ngx_access_log +VALUES + ("cli1", 200, 100, 0); + +Affected Rows: 1 + +ADMIN FLUSH_FLOW('calc_ngx_distribution'); + ++-------------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_distribution') | ++-------------------------------------------+ +| 1 | ++-------------------------------------------+ + +SELECT + stat, + bucket_size, + total_logs, + time_window +FROM + ngx_distribution; + ++------+-------------+------------+---------------------+ +| stat | bucket_size | total_logs | time_window | ++------+-------------+------------+---------------------+ +| 200 | 100 | 1 | 1970-01-01T00:00:00 | ++------+-------------+------------+---------------------+ + +INSERT INTO + ngx_access_log +VALUES + ("cli1", 200, 200, 1), + ("cli1", 200, 205, 1), + ("cli1", 200, 209, 1), + ("cli1", 200, 210, 1), + ("cli2", 200, 300, 1); + +Affected Rows: 5 + +ADMIN FLUSH_FLOW('calc_ngx_distribution'); + ++-------------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_distribution') | ++-------------------------------------------+ +| 1 | ++-------------------------------------------+ + +SELECT + stat, + bucket_size, + total_logs, + time_window +FROM + ngx_distribution; + ++------+-------------+------------+---------------------+ +| stat | bucket_size | total_logs | time_window | ++------+-------------+------------+---------------------+ +| 200 | 100 | 1 | 1970-01-01T00:00:00 | +| 200 | 200 | 1 | 1970-01-01T00:00:00 | +| 200 | 210 | 3 | 1970-01-01T00:00:00 | +| 200 | 300 | 1 | 1970-01-01T00:00:00 | ++------+-------------+------------+---------------------+ + +DROP FLOW calc_ngx_distribution; + +Affected Rows: 0 + +DROP TABLE ngx_access_log; + +Affected Rows: 0 + +DROP TABLE ngx_distribution; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index b9ccc810585c..9356508aba4f 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -5,54 +5,134 @@ CREATE TABLE numbers_input_basic ( TIME INDEX(ts) ); -CREATE FLOW test_numbers_basic -SINK TO out_num_cnt_basic -AS -SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); +CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + sum(number) +FROM + numbers_input_basic +GROUP BY + tumble(ts, '1 second', '2021-07-01 00:00:00'); -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 admin flush_flow('test_numbers_basic'); -- SQLNESS ARG restart=true -INSERT INTO numbers_input_basic +INSERT INTO + numbers_input_basic VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); admin flush_flow('test_numbers_basic'); -SELECT "SUM(numbers_input_basic.number)", window_start, window_end FROM out_num_cnt_basic; +SELECT + "SUM(numbers_input_basic.number)", + window_start, + window_end +FROM + out_num_cnt_basic; admin flush_flow('test_numbers_basic'); -INSERT INTO numbers_input_basic +INSERT INTO + numbers_input_basic VALUES - (23,"2021-07-01 00:00:01.000"), - (24,"2021-07-01 00:00:01.500"); + (23, "2021-07-01 00:00:01.000"), + (24, "2021-07-01 00:00:01.500"); admin flush_flow('test_numbers_basic'); -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion -SELECT "SUM(numbers_input_basic.number)", window_start, window_end FROM out_num_cnt_basic; +SELECT + "SUM(numbers_input_basic.number)", + window_start, + window_end +FROM + out_num_cnt_basic; DROP FLOW test_numbers_basic; + DROP TABLE numbers_input_basic; + DROP TABLE out_num_cnt_basic; --- test interprete interval +-- test distinct +CREATE TABLE distinct_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +SELECT + DISTINCT number as dis +FROM + distinct_basic; + +-- TODO(discord9): confirm if it's necessary to flush flow here? +-- because flush_flow result is at most 1 +admin flush_flow('test_distinct_basic'); + +-- SQLNESS ARG restart=true +INSERT INTO + distinct_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +admin flush_flow('test_distinct_basic'); + +SELECT + dis +FROM + out_distinct_basic; + +admin flush_flow('test_distinct_basic'); + +INSERT INTO + distinct_basic +VALUES + (23, "2021-07-01 00:00:01.000"), + (24, "2021-07-01 00:00:01.500"); + +admin flush_flow('test_distinct_basic'); + +-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion +SELECT + dis +FROM + out_distinct_basic; + +DROP FLOW test_distinct_basic; + +DROP TABLE distinct_basic; + +DROP TABLE out_distinct_basic; +-- test interprete interval CREATE TABLE numbers_input_basic ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), TIME INDEX(ts) ); + create table out_num_cnt_basic ( number INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX); + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX +); -CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic where number > 10; +CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS +SELECT + INTERVAL '1 day 1 second', + INTERVAL '1 month 1 day 1 second', + INTERVAL '1 year 1 month' +FROM + numbers_input_basic +where + number > 10; SHOW CREATE FLOW filter_numbers_basic; @@ -64,7 +144,8 @@ drop table numbers_input_basic; CREATE TABLE bytes_log ( byte INT, - ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time TIME INDEX(ts) ); @@ -76,27 +157,307 @@ CREATE TABLE approx_rate ( TIME INDEX(time_window) ); -CREATE FLOW find_approx_rate -SINK TO approx_rate -AS -SELECT (max(byte) - min(byte))/30.0 as rate, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window; 
+CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window +from + bytes_log +GROUP BY + time_window; -INSERT INTO bytes_log VALUES -(101, '2025-01-01 00:00:01'), -(300, '2025-01-01 00:00:29'); +INSERT INTO + bytes_log +VALUES + (101, '2025-01-01 00:00:01'), + (300, '2025-01-01 00:00:29'); admin flush_flow('find_approx_rate'); -SELECT rate, time_window FROM approx_rate; +SELECT + rate, + time_window +FROM + approx_rate; -INSERT INTO bytes_log VALUES -(450, '2025-01-01 00:00:32'), -(500, '2025-01-01 00:00:37'); +INSERT INTO + bytes_log +VALUES + (450, '2025-01-01 00:00:32'), + (500, '2025-01-01 00:00:37'); admin flush_flow('find_approx_rate'); -SELECT rate, time_window FROM approx_rate; +SELECT + rate, + time_window +FROM + approx_rate; DROP TABLE bytes_log; + DROP FLOW find_approx_rate; + DROP TABLE approx_rate; + +-- input table +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +-- create flow task to calculate the distinct country +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + DISTINCT country, +FROM + ngx_access_log; + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 0); + +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + "ngx_access_log.country" +FROM + ngx_country; + +-- making sure distinct is working +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 1); + +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + "ngx_access_log.country" +FROM + ngx_country; + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "c", 2); + +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + "ngx_access_log.country" +FROM + ngx_country; + +DROP FLOW calc_ngx_country; + +DROP TABLE ngx_access_log; + +DROP TABLE ngx_country; + +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + DISTINCT country, + -- this distinct is not necessary, but it's a good test to see if it works + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + country, + time_window; + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 0); + +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + "ngx_access_log.country", + time_window +FROM + ngx_country; + +-- making sure distinct is working +INSERT INTO + ngx_access_log +VALUES + ("cli1", "b", 1); + +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + "ngx_access_log.country", + time_window +FROM + ngx_country; + +INSERT INTO + ngx_access_log +VALUES + ("cli1", "c", 2); + +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + "ngx_access_log.country", + time_window +FROM + ngx_country; + +DROP FLOW calc_ngx_country; + +DROP TABLE ngx_access_log; + +DROP TABLE ngx_country; + +CREATE TABLE temp_sensor_data ( + sensor_id INT, + loc STRING, + temperature DOUBLE, + ts TIMESTAMP TIME INDEX +); + +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + ts TIMESTAMP TIME INDEX +); + +CREATE FLOW temp_monitoring SINK TO temp_alerts AS +SELECT + sensor_id, + loc, + max(temperature) as max_temp, +FROM + temp_sensor_data +GROUP BY + sensor_id, + loc +HAVING + max_temp > 100; + +INSERT INTO + temp_sensor_data +VALUES + (1, "room1", 50, 0); + +ADMIN FLUSH_FLOW('temp_monitoring'); + +-- This table should not exist yet +SHOW TABLES LIKE 'temp_alerts'; + +INSERT INTO + temp_sensor_data +VALUES + (1, "room1", 150, 1); + +ADMIN FLUSH_FLOW('temp_monitoring'); + +SHOW 
TABLES LIKE 'temp_alerts'; + +SELECT + sensor_id, + loc, + max_temp +FROM + temp_alerts; + +INSERT INTO + temp_sensor_data +VALUES + (2, "room1", 0, 2); + +ADMIN FLUSH_FLOW('temp_monitoring'); + +SELECT + sensor_id, + loc, + max_temp +FROM + temp_alerts; + +DROP FLOW temp_monitoring; + +DROP TABLE temp_sensor_data; + +DROP TABLE temp_alerts; + +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); + +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, -- auto generated column by flow engine + PRIMARY KEY(stat, bucket_size) +); + +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS +SELECT + stat, + trunc(size, -1)::INT as bucket_size, + count(client) AS total_logs, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + stat, + time_window, + bucket_size; + +INSERT INTO + ngx_access_log +VALUES + ("cli1", 200, 100, 0); + +ADMIN FLUSH_FLOW('calc_ngx_distribution'); + +SELECT + stat, + bucket_size, + total_logs, + time_window +FROM + ngx_distribution; + +INSERT INTO + ngx_access_log +VALUES + ("cli1", 200, 200, 1), + ("cli1", 200, 205, 1), + ("cli1", 200, 209, 1), + ("cli1", 200, 210, 1), + ("cli2", 200, 300, 1); + +ADMIN FLUSH_FLOW('calc_ngx_distribution'); + +SELECT + stat, + bucket_size, + total_logs, + time_window +FROM + ngx_distribution; + +DROP FLOW calc_ngx_distribution; + +DROP TABLE ngx_access_log; + +DROP TABLE ngx_distribution;