diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs
index 444ef7a4ac8c..dcb2194a1d79 100644
--- a/src/flow/src/compute/render.rs
+++ b/src/flow/src/compute/render.rs
@@ -28,7 +28,7 @@ use super::state::Scheduler;
 use crate::compute::state::DataflowState;
 use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
 use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
-use crate::expr::{self, GlobalId, LocalId};
+use crate::expr::{self, Batch, GlobalId, LocalId};
 use crate::plan::{Plan, TypedPlan};
 use crate::repr::{self, DiffRow};
 
@@ -87,9 +87,38 @@ impl<'referred, 'df> Context<'referred, 'df> {
 }
 
 impl<'referred, 'df> Context<'referred, 'df> {
-    /// Interpret and execute plan
+    /// Like `render_plan` but in batch mode
+    pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
+        match plan.plan {
+            Plan::Constant { rows } => Ok(self.render_constant_batch(rows)),
+            Plan::Get { .. } => NotImplementedSnafu {
+                reason: "Get is still WIP in batch mode",
+            }
+            .fail(),
+            Plan::Let { .. } => NotImplementedSnafu {
+                reason: "Let is still WIP in batch mode",
+            }
+            .fail(),
+            Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp),
+            Plan::Reduce {
+                input,
+                key_val_plan,
+                reduce_plan,
+            } => self.render_reduce_batch(input, &key_val_plan, &reduce_plan, &plan.schema.typ),
+            Plan::Join { .. } => NotImplementedSnafu {
+                reason: "Join is still WIP",
+            }
+            .fail(),
+            Plan::Union { .. } => NotImplementedSnafu {
+                reason: "Union is still WIP",
+            }
+            .fail(),
+        }
+    }
+
+    /// Interpret the plan into a dataflow and prepare it for execution
     ///
-    /// return the output of this plan
+    /// return the output handle of this plan
     pub fn render_plan(&mut self, plan: TypedPlan) -> Result<CollectionBundle, Error> {
         match plan.plan {
             Plan::Constant { rows } => Ok(self.render_constant(rows)),
@@ -112,17 +141,61 @@ impl<'referred, 'df> Context<'referred, 'df> {
         }
     }
 
+    /// render `Plan::Constant`, taking all rows that have a timestamp not greater than the current time.
+    /// This function is primarily used for testing.
+    /// Always assumes the input is sorted by timestamp.
+    pub fn render_constant_batch(&mut self, rows: Vec<DiffRow>) -> CollectionBundle<Batch> {
+        let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
+        let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
+        for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
+            per_time.entry(key).or_default().extend(group);
+        }
+
+        let now = self.compute_state.current_time_ref();
+        // TODO(discord9): better way to schedule future run
+        let scheduler = self.compute_state.get_scheduler();
+        let scheduler_inner = scheduler.clone();
+        let err_collector = self.err_collector.clone();
+
+        let subgraph_id =
+            self.df
+                .add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
+                    // split off everything strictly after `now`; what is left in `after`
+                    // is exactly the rows with timestamps not greater than `now`
+                    let mut after = per_time.split_off(&(*now.borrow() + 1));
+                    // swap so that `per_time` keeps buffering the future rows
+                    std::mem::swap(&mut per_time, &mut after);
+                    let not_greater_than_now = after;
+
+                    not_greater_than_now.into_iter().for_each(|(_ts, rows)| {
+                        err_collector.run(|| {
+                            let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
+                            let batch = Batch::try_from_rows(rows)?;
+                            send_port.give(vec![batch]);
+                            Ok(())
+                        });
+                    });
+                    // schedule the next run
+                    if let Some(next_run_time) = per_time.keys().next().copied() {
+                        scheduler_inner.schedule_at(next_run_time);
+                    }
+                });
+        scheduler.set_cur_subgraph(subgraph_id);
+
+        CollectionBundle::from_collection(Collection::from_port(recv_port))
+    }
+
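For readers skimming the diff: the `split_off`/`swap` dance above is just a way to drain every buffered row with timestamp `<= now` while keeping strictly-future rows in the map. A minimal, self-contained sketch of the idiom (plain `std`, hypothetical timestamps and payloads):

```rust
use std::collections::BTreeMap;

fn main() {
    // rows buffered per timestamp; 10 and 20 are due, 30 is in the future
    let mut per_time: BTreeMap<i64, Vec<&str>> =
        BTreeMap::from([(10, vec!["a"]), (20, vec!["b"]), (30, vec!["c"])]);
    let now = 20;

    // `split_off(&(now + 1))` keeps keys >= now + 1, i.e. strictly-future rows
    let mut after = per_time.split_off(&(now + 1));
    // swap so `per_time` holds the future rows again and `after` holds the due ones
    std::mem::swap(&mut per_time, &mut after);
    let not_greater_than_now = after;

    assert_eq!(not_greater_than_now.len(), 2); // timestamps 10 and 20 are emitted
    assert_eq!(per_time.len(), 1); // timestamp 30 stays buffered
}
```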
     /// render Constant, take all rows that have a timestamp not greater than the current time
     ///
     /// Always assume input is sorted by timestamp
     pub fn render_constant(&mut self, rows: Vec<DiffRow>) -> CollectionBundle {
         let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
-        let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = rows
-            .into_iter()
-            .group_by(|(_row, ts, _diff)| *ts)
-            .into_iter()
-            .map(|(k, v)| (k, v.into_iter().collect_vec()))
-            .collect();
+        let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
+        for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
+            per_time.entry(key).or_default().extend(group);
+        }
+
         let now = self.compute_state.current_time_ref();
         // TODO(discord9): better way to schedule future run
         let scheduler = self.compute_state.get_scheduler();
diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs
index c940b34ed144..d84d1778af88 100644
--- a/src/flow/src/compute/render/map.rs
+++ b/src/flow/src/compute/render/map.rs
@@ -23,12 +23,59 @@ use crate::compute::render::Context;
 use crate::compute::state::Scheduler;
 use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
 use crate::error::{Error, PlanSnafu};
-use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
+use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
 use crate::plan::TypedPlan;
 use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
 use crate::utils::ArrangeHandler;
 
 impl<'referred, 'df> Context<'referred, 'df> {
+    /// Like `render_mfp` but in batch mode
+    pub fn render_mfp_batch(
+        &mut self,
+        input: Box<TypedPlan>,
+        mfp: MapFilterProject,
+    ) -> Result<CollectionBundle<Batch>, Error> {
+        let input = self.render_plan_batch(*input)?;
+
+        let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff<Batch>>("mfp_batch");
+
+        // This closure captures the following variables:
+        let mfp_plan = MfpPlan::create_from(mfp)?;
+
+        let err_collector = self.err_collector.clone();
+
+        // TODO(discord9): better way to schedule future run
+        let scheduler = self.compute_state.get_scheduler();
+
+        let subgraph = self.df.add_subgraph_in_out(
+            "mfp_batch",
+            input.collection.into_inner(),
+            out_send_port,
+            move |_ctx, recv, send| {
+                // mfp only needs to passively receive updates from the receivers
+                let src_data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
+
+                let output_batches = src_data
+                    .filter_map(|mut input_batch| {
+                        err_collector.run(|| {
+                            let res_batch = mfp_plan.mfp.eval_batch_into(&mut input_batch)?;
+                            Ok(res_batch)
+                        })
+                    })
+                    .collect_vec();
+
+                send.give(output_batches);
+            },
+        );
+
+        // register current subgraph in scheduler for future scheduling
+        scheduler.set_cur_subgraph(subgraph);
+
+        let bundle =
+            CollectionBundle::from_collection(Collection::<Batch>::from_port(out_recv_port));
+        Ok(bundle)
+    }
+
     /// render MapFilterProject, will only emit the `rows` once. Assumes all incoming rows' system time is `now` and ignores each row's stated system time
     /// TODO(discord9): schedule mfp operator to run when temporal filter need
     ///
diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs
index b41364ec4435..e43d3d22a4e0 100644
--- a/src/flow/src/compute/render/reduce.rs
+++ b/src/flow/src/compute/render/reduce.rs
@@ -14,23 +14,247 @@
 use std::collections::BTreeMap;
 use std::ops::Range;
+use std::sync::Arc;
 
 use datatypes::data_type::ConcreteDataType;
+use datatypes::prelude::DataType;
 use datatypes::value::{ListValue, Value};
+use datatypes::vectors::NullVector;
 use hydroflow::scheduled::graph_ext::GraphExt;
 use itertools::Itertools;
 use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::compute::render::{Context, SubgraphArg};
 use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
-use crate::error::{Error, PlanSnafu};
+use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
 use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
-use crate::expr::{EvalError, ScalarExpr};
+use crate::expr::{Batch, EvalError, ScalarExpr};
 use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
 use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
 use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
 
 impl<'referred, 'df> Context<'referred, 'df> {
+    const REDUCE_BATCH: &'static str = "reduce_batch";
+    /// Like `render_reduce`, but in batch mode; only a barebones implementation,
+    /// with no support for distinct aggregation for now
+    // There is a false positive in using `Vec<Value>` as key due to `Value` having a `bytes` variant
+    #[allow(clippy::mutable_key_type)]
+    pub fn render_reduce_batch(
+        &mut self,
+        input: Box<TypedPlan>,
+        key_val_plan: &KeyValPlan,
+        reduce_plan: &ReducePlan,
+        output_type: &RelationType,
+    ) -> Result<CollectionBundle<Batch>, Error> {
+        let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
+            if !accum_plan.distinct_aggrs.is_empty() {
+                NotImplementedSnafu {
+                    reason: "Distinct aggregation is not supported in batch mode",
+                }
+                .fail()?
+            }
+            accum_plan.clone()
+        } else {
+            NotImplementedSnafu {
+                reason: "Only accumulable reduce plan is supported in batch mode",
+            }
+            .fail()?
+        };
+
+        let input = self.render_plan_batch(*input)?;
+
+        // first assemble key & val into separate key and val columns (since this is batch mode),
+        // then stream the kvs through a reduce operator
+
+        // the output is the concatenation of key and val
+        let output_key_arity = key_val_plan.key_plan.output_arity();
+
+        // TODO(discord9): config global expire time from self
+        let arrange_handler = self.compute_state.new_arrange(None);
+
+        if let (Some(time_index), Some(expire_after)) =
+            (output_type.time_index, self.compute_state.expire_after())
+        {
+            let expire_man =
+                KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
+            arrange_handler.write().set_expire_state(expire_man);
+        }
+
+        // reduce needs a full arrangement to be able to query all keys
+        let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
+            reason: "No write is expected at this point",
+        })?;
+        let key_val_plan = key_val_plan.clone();
+
+        let now = self.compute_state.current_time_ref();
+
+        let err_collector = self.err_collector.clone();
+
+        // TODO(discord9): better way to schedule future run
+        let scheduler = self.compute_state.get_scheduler();
+
+        let (out_send_port, out_recv_port) =
+            self.df.make_edge::<_, Toff<Batch>>(Self::REDUCE_BATCH);
+
+        let subgraph =
+            self.df.add_subgraph_in_out(
+                Self::REDUCE_BATCH,
+                input.collection.into_inner(),
+                out_send_port,
+                move |_ctx, recv, send| {
+                    let now = *(now.borrow());
+                    let arrange = arrange_handler_inner.clone();
+                    // the reduce operator only needs to passively receive updates from the receivers
+                    let src_data = recv
+                        .take_inner()
+                        .into_iter()
+                        .flat_map(|v| v.into_iter())
+                        .collect_vec();
+
+                    let mut key_to_many_vals = BTreeMap::<Row, Batch>::new();
+                    for batch in src_data {
+                        err_collector.run(|| {
+                            let (key_batch, val_batch) =
+                                batch_split_by_key_val(&batch, &key_val_plan, &err_collector);
+                            ensure!(
+                                key_batch.row_count() == val_batch.row_count(),
+                                InternalSnafu {
+                                    reason: format!(
+                                        "Key and val batch should have the same row count, found {} and {}",
+                                        key_batch.row_count(),
+                                        val_batch.row_count()
+                                    )
+                                }
+                            );
+
+                            for row_idx in 0..key_batch.row_count() {
+                                let key_row = key_batch.get_row(row_idx).unwrap();
+                                let val_row = val_batch.slice(row_idx, 1)?;
+                                let val_batch =
+                                    key_to_many_vals.entry(Row::new(key_row)).or_default();
+                                val_batch.append_batch(val_row)?;
+                            }
+
+                            Ok(())
+                        });
+                    }
+
+                    // write-lock the arrange for the rest of the function body
+                    // to prevent weird race conditions
+                    let mut arrange = arrange.write();
+                    let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len());
+                    let mut all_output_rows = Vec::with_capacity(key_to_many_vals.len());
+
+                    for (key, val_batch) in key_to_many_vals {
+                        err_collector.run(|| -> Result<(), _> {
+                            let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
+                            let accum_list = from_accum_values_to_live_accums(
+                                accums.unpack(),
+                                accum_plan.simple_aggrs.len(),
+                            )?;
+
+                            let mut accum_output = AccumOutput::new();
+                            for AggrWithIndex {
+                                expr,
+                                input_idx,
+                                output_idx,
+                            } in accum_plan.simple_aggrs.iter()
+                            {
+                                let cur_old_accum = accum_list.get(*output_idx).cloned().unwrap_or_default();
+                                // if the batch is empty, feed a null vector instead
+                                let cur_input = val_batch.batch().get(*input_idx).cloned().unwrap_or_else(|| Arc::new(NullVector::new(val_batch.row_count())));
+
+                                let (output, new_accum) =
+                                    expr.func.eval_batch(cur_old_accum, cur_input, None)?;
+
+                                accum_output.insert_accum(*output_idx, new_accum);
+                                accum_output.insert_output(*output_idx, output);
+                            }
+
+                            let (new_accums, res_val_row) = accum_output.into_accum_output()?;
+
+                            let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1);
+                            all_arrange_updates.push(arrange_update);
+
+                            let mut key_val = key;
+                            key_val.extend(res_val_row);
+                            all_output_rows.push((key_val, now, 1));
+
+                            Ok(())
+                        });
+                    }
+
+                    err_collector.run(|| {
+                        arrange.apply_updates(now, all_arrange_updates)?;
+                        arrange.compact_to(now)
+                    });
+
+                    // this output part is not supposed to be resource intensive
+                    // (since a batch usually produces far fewer output rows than input rows),
+                    // so we can afford some costly operations here
+                    let output_types = all_output_rows.first().map(|(row, _, _)| {
+                        row.iter()
+                            .map(|v| v.data_type())
+                            .collect::<Vec<ConcreteDataType>>()
+                    });
+
+                    if let Some(output_types) = output_types {
+                        err_collector.run(|| {
+                            let column_cnt = output_types.len();
+                            let row_cnt = all_output_rows.len();
+
+                            let mut output_builder = output_types
+                                .into_iter()
+                                .map(|t| t.create_mutable_vector(row_cnt))
+                                .collect_vec();
+
+                            for (row, _, _) in all_output_rows {
+                                for (i, v) in row.into_iter().enumerate() {
+                                    output_builder
+                                        .get_mut(i)
+                                        .context(InternalSnafu {
+                                            reason: format!(
+                                                "Output builder should have the same length as the row, expected at most {} but got {}",
+                                                column_cnt - 1,
+                                                i
+                                            )
+                                        })?
+                                        .try_push_value_ref(v.as_value_ref())
+                                        .context(DataTypeSnafu {
+                                            msg: "Failed to push value",
+                                        })?;
+                                }
+                            }
+
+                            let output_columns = output_builder
+                                .into_iter()
+                                .map(|mut b| b.to_vector())
+                                .collect_vec();
+
+                            let output_batch = Batch::try_new(output_columns, row_cnt)?;
+                            send.give(vec![output_batch]);
+
+                            Ok(())
+                        });
+                    }
+                },
+            );
+
+        scheduler.set_cur_subgraph(subgraph);
+
+        // by default the key of the output arrangement is all the key columns
+        let arranged = BTreeMap::from([(
+            (0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
+            Arranged::new(arrange_handler),
+        )]);
+
+        let bundle = CollectionBundle {
+            collection: Collection::from_port(out_recv_port),
+            arranged,
+        };
+        Ok(bundle)
+    }
+
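The subgraph body above does two passes: group each incoming batch by key, then fold every key's values into the accumulator state persisted in the arrangement. A stripped-down sketch of that control flow, with a plain `i64` sum standing in for the real `Accum` state and a `BTreeMap` standing in for the arrangement (all names hypothetical):

```rust
use std::collections::BTreeMap;

fn main() {
    // stand-in for the arrangement: key -> accumulator state (here just a sum)
    let mut arranged: BTreeMap<&str, i64> = BTreeMap::new();

    // two "batches" of (key, value) rows arriving over time
    let batches = vec![
        vec![("a", 1i64), ("b", 2), ("a", 3)],
        vec![("b", 4), ("a", 5)],
    ];

    for batch in batches {
        // 1. group the batch's rows by key, like `key_to_many_vals`
        let mut key_to_vals: BTreeMap<&str, Vec<i64>> = BTreeMap::new();
        for (k, v) in batch {
            key_to_vals.entry(k).or_default().push(v);
        }
        // 2. for each key, read the old accumulator, fold in the new values,
        //    and write the updated state back
        for (key, vals) in key_to_vals {
            let accum = arranged.entry(key).or_insert(0);
            *accum += vals.into_iter().sum::<i64>();
        }
    }

    assert_eq!(arranged[&"a"], 9);
    assert_eq!(arranged[&"b"], 6);
}
```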
     const REDUCE: &'static str = "reduce";
     /// render `Plan::Reduce` into executable dataflow
     // There is a false positive in using `Vec<Value>` as key due to `Value` having a `bytes` variant
@@ -151,6 +375,18 @@ impl<'referred, 'df> Context<'referred, 'df> {
     }
 }
 
+fn from_accum_values_to_live_accums(
+    accums: Vec<Value>,
+    len: usize,
+) -> Result<Vec<Vec<Value>>, EvalError> {
+    let accum_ranges = from_val_to_slice_idx(accums.first().cloned(), len)?;
+    let mut accum_list = vec![];
+    for range in accum_ranges.iter() {
+        accum_list.push(accums.get(range.clone()).unwrap_or_default().to_vec());
+    }
+    Ok(accum_list)
+}
+
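`from_accum_values_to_live_accums` relies on `from_val_to_slice_idx` (defined elsewhere in this file) to decode, from the first value, the ranges at which each aggregate's state lives inside the flat `accums` vector. A small sketch of just the flat-layout decoding, with hard-coded ranges in place of the real decoder:

```rust
use std::ops::Range;

fn main() {
    // several variable-length accumulator states packed into one flat Vec,
    // with a side table of ranges saying where each state lives
    let flat: Vec<i64> = vec![7, 1, 2, 3, 42];
    let ranges: Vec<Range<usize>> = vec![0..1, 1..4, 4..5];

    let accum_list: Vec<Vec<i64>> = ranges
        .iter()
        // out-of-bounds ranges fall back to an empty state, like the code above
        .map(|r| flat.get(r.clone()).unwrap_or_default().to_vec())
        .collect();

    assert_eq!(accum_list, vec![vec![7], vec![1, 2, 3], vec![42]]);
}
```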
 /// All arrange(aka state) used in reduce operator
 pub struct ReduceArrange {
     /// The output arrange of reduce operator
@@ -160,33 +396,40 @@ pub struct ReduceArrange {
     distinct_input: Option<Vec<ArrangeHandler>>,
 }
 
-/// split a row into key and val by evaluating the key and val plan
-fn split_row_to_key_val(
-    row: Row,
-    sys_time: repr::Timestamp,
-    diff: repr::Diff,
+fn batch_split_by_key_val(
+    batch: &Batch,
     key_val_plan: &KeyValPlan,
-    row_buf: &mut Row,
-) -> Result<Option<KeyValDiffRow>, EvalError> {
-    if let Some(key) = key_val_plan
-        .key_plan
-        .evaluate_into(&mut row.inner.clone(), row_buf)?
-    {
-        // val_plan is not supported to carry any filter predicate,
-        let val = key_val_plan
-            .val_plan
-            .evaluate_into(&mut row.inner.clone(), row_buf)?
-            .context(InternalSnafu {
-                reason: "val_plan should not contain any filter predicate",
-            })?;
-        Ok(Some(((key, val), sys_time, diff)))
-    } else {
-        Ok(None)
+    err_collector: &ErrCollector,
+) -> (Batch, Batch) {
+    let row_count = batch.row_count();
+    let mut key_batch = Batch::empty();
+    let mut val_batch = Batch::empty();
+
+    err_collector.run(|| {
+        if key_val_plan.key_plan.output_arity() != 0 {
+            key_batch = key_val_plan.key_plan.eval_batch_into(&mut batch.clone())?;
+        }
+
+        if key_val_plan.val_plan.output_arity() != 0 {
+            val_batch = key_val_plan.val_plan.eval_batch_into(&mut batch.clone())?;
+        }
+        Ok(())
+    });
+
+    // deal with empty key or val
+    if key_batch.row_count() == 0 && key_batch.column_count() == 0 {
+        key_batch.set_row_count(row_count);
+    }
+
+    if val_batch.row_count() == 0 && val_batch.column_count() == 0 {
+        val_batch.set_row_count(row_count);
     }
+
+    (key_batch, val_batch)
 }
 
 /// split a row into key and val by evaluating the key and val plan
-fn batch_split_rows_to_key_val(
+fn split_rows_to_key_val(
     rows: impl IntoIterator<Item = DiffRow>,
     key_val_plan: KeyValPlan,
     err_collector: ErrCollector,
@@ -235,7 +478,7 @@ fn reduce_subgraph(
         send,
     }: SubgraphArg,
 ) {
-    let key_val = batch_split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
+    let key_val = split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
     // from here for distinct reduce and accum reduce, things are drastically different
     // for distinct reduce the arrange store the output,
     // but for accum reduce the arrange store the accum state, and output is
@@ -1127,6 +1370,105 @@ mod test {
         run_and_check(&mut state, &mut df, 6..7, expected, output);
     }
 
+    /// Batch Mode Reduce Evaluation
+    /// SELECT SUM(col) FROM table
+    ///
+    /// table schema:
+    /// | name | type  |
+    /// |------|-------|
+    /// | col  | Int64 |
+    #[test]
+    fn test_basic_batch_reduce_accum() {
+        let mut df = Hydroflow::new();
+        let mut state = DataflowState::default();
+        let now = state.current_time_ref();
+        let mut ctx = harness_test_ctx(&mut df, &mut state);
+
+        let rows = vec![
+            (Row::new(vec![1i64.into()]), 1, 1),
+            (Row::new(vec![2i64.into()]), 2, 1),
+            (Row::new(vec![3i64.into()]), 3, 1),
+            (Row::new(vec![1i64.into()]), 4, 1),
+            (Row::new(vec![2i64.into()]), 5, 1),
+            (Row::new(vec![3i64.into()]), 6, 1),
+        ];
+        let input_plan = Plan::Constant { rows: rows.clone() };
+
+        let typ = RelationType::new(vec![ColumnType::new_nullable(
+            ConcreteDataType::int64_datatype(),
+        )]);
+        let key_val_plan = KeyValPlan {
+            key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
+            val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
+        };
+
+        let simple_aggrs = vec![AggrWithIndex::new(
+            AggregateExpr {
+                func: AggregateFunc::SumInt64,
+                expr: ScalarExpr::Column(0),
+                distinct: false,
+            },
+            0,
+            0,
+        )];
+        let accum_plan = AccumulablePlan {
+            full_aggrs: vec![AggregateExpr {
+                func: AggregateFunc::SumInt64,
+                expr: ScalarExpr::Column(0),
+                distinct: false,
+            }],
+            simple_aggrs,
+            distinct_aggrs: vec![],
+        };
+
+        let reduce_plan = ReducePlan::Accumulable(accum_plan);
+        let bundle = ctx
+            .render_reduce_batch(
+                Box::new(input_plan.with_types(typ.into_unnamed())),
+                &key_val_plan,
+                &reduce_plan,
+                &RelationType::empty(),
+            )
+            .unwrap();
+
+        {
+            let now_inner = now.clone();
+            let expected = BTreeMap::<i64, Vec<i64>>::from([
+                (1, vec![1i64]),
+                (2, vec![3i64]),
+                (3, vec![6i64]),
+                (4, vec![7i64]),
+                (5, vec![9i64]),
+                (6, vec![12i64]),
+            ]);
+            let collection = bundle.collection;
+            ctx.df
+                .add_subgraph_sink("test_sink", collection.into_inner(), move |_ctx, recv| {
+                    let now = *now_inner.borrow();
+                    let data = recv.take_inner();
+                    let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();
+
+                    if let Some(expected) = expected.get(&now) {
+                        let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
+                        let batch = Batch::try_from_rows(vec![batch.into()]).unwrap();
+                        assert_eq!(res.first(), Some(&batch));
+                    }
+                });
+            drop(ctx);
+
+            for now in 1..7 {
+                state.set_current_ts(now);
+                state.run_available_with_schedule(&mut df);
+                if !state.get_err_collector().is_empty() {
+                    panic!(
+                        "Errors occur: {:?}",
+                        state.get_err_collector().get_all_blocking()
+                    )
+                }
+            }
+        }
+    }
+
     /// SELECT SUM(col) FROM table
     ///
     /// table schema:
diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index d984f4831191..53141b8cc0c1 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -27,11 +27,67 @@ use crate::compute::render::Context;
 use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
 use crate::error::{Error, PlanSnafu};
 use crate::expr::error::InternalSnafu;
-use crate::expr::EvalError;
+use crate::expr::{Batch, EvalError};
 use crate::repr::{DiffRow, Row, BROADCAST_CAP};
 
 #[allow(clippy::mutable_key_type)]
 impl<'referred, 'df> Context<'referred, 'df> {
+    /// Simply sends batches downstream, without fancy features like buffering
+    pub fn render_source_batch(
+        &mut self,
+        mut src_recv: broadcast::Receiver<Batch>,
+    ) -> Result<CollectionBundle<Batch>, Error> {
+        debug!("Rendering Source Batch");
+        let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("source_batch");
+
+        let schd = self.compute_state.get_scheduler();
+        let inner_schd = schd.clone();
+        let now = self.compute_state.current_time_ref();
+        let err_collector = self.err_collector.clone();
+
+        let sub = self
+            .df
+            .add_subgraph_source("source_batch", send_port, move |_ctx, send| {
+                loop {
+                    match src_recv.try_recv() {
+                        Ok(batch) => {
+                            send.give(vec![batch]);
+                        }
+                        Err(TryRecvError::Empty) => {
+                            break;
+                        }
+                        Err(TryRecvError::Lagged(lag_offset)) => {
+                            // use `err_collector` instead of `error!` to locate which operator caused the error
+                            err_collector.run(|| -> Result<(), EvalError> {
+                                InternalSnafu {
+                                    reason: format!("Flow is lagging behind by {} rows", lag_offset),
+                                }
+                                .fail()
+                            });
+                            break;
+                        }
+                        Err(TryRecvError::Closed) => {
+                            err_collector.run(|| -> Result<(), EvalError> {
+                                InternalSnafu {
+                                    reason: "Source Batch Channel is closed".to_string(),
+                                }
+                                .fail()
+                            });
+                            break;
+                        }
+                    }
+                }
+
+                let now = *now.borrow();
+                // always schedule the source to run at `now` so we can
+                // repeatedly run the source if needed
+                inner_schd.schedule_at(now);
+            });
+        schd.set_cur_subgraph(sub);
+        let bundle = CollectionBundle::from_collection(Collection::<Batch>::from_port(recv_port));
+        Ok(bundle)
+    }
+
     /// Render a source which comes from a broadcast channel into the dataflow
     /// will immediately send updates not greater than `now` and buffer the rest in arrangement
     pub fn render_source(
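The source loop above is the standard non-blocking drain pattern for `tokio::sync::broadcast`: consume until `Empty`, surface `Lagged`/`Closed` as errors, then reschedule. A self-contained sketch using tokio's actual `try_recv` API (channel contents hypothetical):

```rust
use tokio::sync::broadcast::{self, error::TryRecvError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<i32>(16);
    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // drain everything currently available without ever blocking,
    // mirroring the subgraph body above
    loop {
        match rx.try_recv() {
            Ok(v) => println!("got {v}"),
            Err(TryRecvError::Empty) => break, // nothing more right now, try again next tick
            Err(TryRecvError::Lagged(n)) => {
                eprintln!("receiver lagged behind by {n} messages");
                // keep looping: the next try_recv returns the oldest retained message
            }
            Err(TryRecvError::Closed) => {
                eprintln!("channel closed");
                break;
            }
        }
    }
}
```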
@@ -114,6 +170,32 @@ impl<'referred, 'df> Context<'referred, 'df> {
         })
     }
 
+    pub fn render_unbounded_sink_batch(
+        &mut self,
+        bundle: CollectionBundle<Batch>,
+        sender: mpsc::UnboundedSender<Batch>,
+    ) {
+        let CollectionBundle {
+            collection,
+            arranged: _,
+        } = bundle;
+
+        let _sink = self.df.add_subgraph_sink(
+            "UnboundedSinkBatch",
+            collection.into_inner(),
+            move |_ctx, recv| {
+                let data = recv.take_inner();
+                for batch in data.into_iter().flat_map(|i| i.into_iter()) {
+                    // if the sender is closed unexpectedly, stop sending
+                    if sender.is_closed() || sender.send(batch).is_err() {
+                        common_telemetry::error!("UnboundedSinkBatch is closed");
+                        break;
+                    }
+                }
+            },
+        );
+    }
+
     pub fn render_unbounded_sink(
         &mut self,
         bundle: CollectionBundle,
diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs
index 9674163c2686..f7cbebae271a 100644
--- a/src/flow/src/compute/types.rs
+++ b/src/flow/src/compute/types.rs
@@ -105,11 +105,13 @@ impl Arranged {
 /// This type maintains the invariant that it does contain at least one(or both) valid
 /// source of data, either a collection or at least one arrangement. This is for convenience
 /// of reading the data from the collection.
-pub struct CollectionBundle {
+///
+// TODO(discord9): make T default to Batch and obsolete the Row Mode
+pub struct CollectionBundle<T: 'static = DiffRow> {
     /// This is useful for passively reading the new updates from the collection
     ///
     /// Invariant: the timestamp of the updates should always not greater than now, since future updates should be stored in the arrangement
-    pub collection: Collection<DiffRow>,
+    pub collection: Collection<T>,
     /// the key [`ScalarExpr`] indicate how the keys(also a [`Row`]) used in Arranged is extract from collection's [`Row`]
     /// So it is the "index" of the arrangement
     ///
@@ -121,13 +123,16 @@ pub struct CollectionBundle {
     pub arranged: BTreeMap<Vec<ScalarExpr>, Arranged>,
 }
 
-impl CollectionBundle {
-    pub fn from_collection(collection: Collection<DiffRow>) -> Self {
+impl<T: 'static> CollectionBundle<T> {
+    pub fn from_collection(collection: Collection<T>) -> Self {
         Self {
             collection,
             arranged: BTreeMap::default(),
         }
     }
+}
+
+impl CollectionBundle<DiffRow> {
     pub fn clone(&self, df: &mut Hydroflow) -> Self {
         Self {
             collection: self.collection.clone(df),
diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs
index 871b23c25dbc..1f8160716a55 100644
--- a/src/flow/src/expr.rs
+++ b/src/flow/src/expr.rs
@@ -24,6 +24,7 @@ mod scalar;
 mod signature;
 
 use datatypes::prelude::DataType;
+use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
 pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
 pub(crate) use error::{EvalError, InvalidArgumentSnafu};
@@ -41,41 +42,164 @@ pub const TUMBLE_START: &str = "tumble_start";
 pub const TUMBLE_END: &str = "tumble_end";
 
 /// A batch of vectors with the same length but without schema, only useful in dataflow
+///
+/// Cheap to clone, since it just contains a list of `VectorRef` (each of which is an `Arc<dyn Vector>`).
+#[derive(Debug, Clone)]
 pub struct Batch {
     batch: Vec<VectorRef>,
     row_count: usize,
+    /// describes whether the corresponding rows in batch are inserts or deletes; None means all rows are inserts
+    diffs: Option<VectorRef>,
+}
+
+impl PartialEq for Batch {
+    fn eq(&self, other: &Self) -> bool {
+        let mut batch_eq = true;
+        if self.batch.len() != other.batch.len() {
+            return false;
+        }
+        for (left, right) in self.batch.iter().zip(other.batch.iter()) {
+            batch_eq = batch_eq
+                && <dyn Array as PartialEq>::eq(&left.to_arrow_array(), &right.to_arrow_array());
+        }
+
+        let diff_eq = match (&self.diffs, &other.diffs) {
+            (Some(left), Some(right)) => {
+                <dyn Array as PartialEq>::eq(&left.to_arrow_array(), &right.to_arrow_array())
+            }
+            (None, None) => true,
+            _ => false,
+        };
+        batch_eq && diff_eq && self.row_count == other.row_count
+    }
+}
+
+impl Eq for Batch {}
+
+impl Default for Batch {
+    fn default() -> Self {
+        Self::empty()
+    }
 }
 
 impl Batch {
-    pub fn new(batch: Vec<VectorRef>, row_count: usize) -> Self {
-        Self { batch, row_count }
+    pub fn try_from_rows(rows: Vec<Row>) -> Result<Self, EvalError> {
+        if rows.is_empty() {
+            return Ok(Self::empty());
+        }
+        let len = rows.len();
+        let mut builder = rows
+            .first()
+            .unwrap()
+            .iter()
+            .map(|v| v.data_type().create_mutable_vector(len))
+            .collect_vec();
+        for row in rows {
+            ensure!(
+                row.len() == builder.len(),
+                InvalidArgumentSnafu {
+                    reason: format!(
+                        "row length not match, expect {}, found {}",
+                        builder.len(),
+                        row.len()
+                    )
+                }
+            );
+            for (idx, value) in row.iter().enumerate() {
+                builder[idx]
+                    .try_push_value_ref(value.as_value_ref())
+                    .context(DataTypeSnafu {
+                        msg: "Failed to convert rows to columns",
+                    })?;
+            }
+        }
+
+        let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec();
+        let batch = Self::try_new(columns, len)?;
+        Ok(batch)
+    }
+
+    pub fn empty() -> Self {
+        Self {
+            batch: vec![],
+            row_count: 0,
+            diffs: None,
+        }
+    }
+
+    pub fn try_new(batch: Vec<VectorRef>, row_count: usize) -> Result<Self, EvalError> {
+        ensure!(
+            batch.iter().map(|v| v.len()).all_equal()
+                && batch.first().map(|v| v.len() == row_count).unwrap_or(true),
+            InvalidArgumentSnafu {
+                reason: "All columns should have same length".to_string()
+            }
+        );
+        Ok(Self {
+            batch,
+            row_count,
+            diffs: None,
+        })
+    }
+
+    pub fn new_unchecked(batch: Vec<VectorRef>, row_count: usize) -> Self {
+        Self {
+            batch,
+            row_count,
+            diffs: None,
+        }
     }
 
     pub fn batch(&self) -> &[VectorRef] {
         &self.batch
     }
 
+    pub fn batch_mut(&mut self) -> &mut Vec<VectorRef> {
+        &mut self.batch
+    }
+
     pub fn row_count(&self) -> usize {
         self.row_count
     }
 
+    pub fn set_row_count(&mut self, row_count: usize) {
+        self.row_count = row_count;
+    }
+
+    pub fn column_count(&self) -> usize {
+        self.batch.len()
+    }
+
+    pub fn get_row(&self, idx: usize) -> Result<Vec<Value>, EvalError> {
+        ensure!(
+            idx < self.row_count,
+            InvalidArgumentSnafu {
+                reason: format!(
+                    "Expect row index to be less than {}, found {}",
+                    self.row_count, idx
+                )
+            }
+        );
+        Ok(self.batch.iter().map(|v| v.get(idx)).collect_vec())
+    }
+
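A quick sketch of how these constructors and accessors fit together, in the style of this crate's tests; it assumes the surrounding `flow` crate context (`Batch`, `Row`, `Value`, `EvalError`) and is not a test from this patch:

```rust
// Sketch only: assumes `crate::expr::{Batch, EvalError}`, `crate::repr::Row`,
// and `datatypes::value::Value` are in scope.
fn batch_round_trip() -> Result<(), EvalError> {
    let rows = vec![
        Row::new(vec![Value::from(1i64), Value::from("a")]),
        Row::new(vec![Value::from(2i64), Value::from("b")]),
    ];
    // rows -> columns; every row must have the same arity
    let batch = Batch::try_from_rows(rows)?;
    assert_eq!(batch.row_count(), 2);
    assert_eq!(batch.column_count(), 2);
    // columns -> a single row again
    assert_eq!(batch.get_row(1)?, vec![Value::from(2i64), Value::from("b")]);
    Ok(())
}
```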
     /// Slices the `Batch`, returning a new `Batch`.
-    ///
-    /// # Panics
-    /// This function panics if `offset + length > self.row_count()`.
-    pub fn slice(&self, offset: usize, length: usize) -> Batch {
+    pub fn slice(&self, offset: usize, length: usize) -> Result<Batch, EvalError> {
         let batch = self
             .batch()
             .iter()
             .map(|v| v.slice(offset, length))
             .collect_vec();
-        Batch::new(batch, length)
+        Batch::try_new(batch, length)
     }
 
     /// append another batch to self
+    ///
+    /// NOTE: This is expensive since it will create new vectors for each column
     pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
         ensure!(
-            self.batch.len() == other.batch.len(),
+            self.batch.len() == other.batch.len()
+                || self.batch.is_empty()
+                || other.batch.is_empty(),
             InvalidArgumentSnafu {
                 reason: format!(
                     "Expect two batch to have same numbers of column, found {} and {} columns",
@@ -85,21 +209,31 @@ impl Batch {
             }
         );
 
-        let batch_builders = self
-            .batch
+        if self.batch.is_empty() {
+            self.batch = other.batch;
+            self.row_count = other.row_count;
+            return Ok(());
+        } else if other.batch.is_empty() {
+            return Ok(());
+        }
+
+        let dts = if self.batch.is_empty() {
+            other.batch.iter().map(|v| v.data_type()).collect_vec()
+        } else {
+            self.batch.iter().map(|v| v.data_type()).collect_vec()
+        };
+
+        let batch_builders = dts
             .iter()
-            .map(|v| {
-                v.data_type()
-                    .create_mutable_vector(self.row_count() + other.row_count())
-            })
+            .map(|dt| dt.create_mutable_vector(self.row_count() + other.row_count()))
             .collect_vec();
 
         let mut result = vec![];
-        let zelf_row_count = self.row_count();
+        let self_row_count = self.row_count();
         let other_row_count = other.row_count();
         for (idx, mut builder) in batch_builders.into_iter().enumerate() {
             builder
-                .extend_slice_of(self.batch()[idx].as_ref(), 0, zelf_row_count)
+                .extend_slice_of(self.batch()[idx].as_ref(), 0, self_row_count)
                 .context(DataTypeSnafu {
                     msg: "Failed to extend vector",
                 })?;
@@ -111,7 +245,7 @@ impl Batch {
             result.push(builder.to_vector());
         }
         self.batch = result;
-        self.row_count = zelf_row_count + other_row_count;
+        self.row_count = self_row_count + other_row_count;
         Ok(())
     }
 }
diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs
index 234ae12cef14..3185bdffcf5b 100644
--- a/src/flow/src/expr/linear.rs
+++ b/src/flow/src/expr/linear.rs
@@ -16,13 +16,18 @@
 
 use std::collections::{BTreeMap, BTreeSet};
 
+use arrow::array::BooleanArray;
+use arrow::compute::FilterBuilder;
 use common_telemetry::debug;
+use datatypes::prelude::ConcreteDataType;
 use datatypes::value::Value;
-use snafu::ensure;
+use datatypes::vectors::{BooleanVector, Helper};
+use itertools::Itertools;
+use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{Error, InvalidQuerySnafu};
-use crate::expr::error::{EvalError, InternalSnafu};
-use crate::expr::{InvalidArgumentSnafu, ScalarExpr};
+use crate::expr::error::{ArrowSnafu, DataTypeSnafu, EvalError, InternalSnafu, TypeMismatchSnafu};
+use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr};
 use crate::repr::{self, value_to_internal_ts, Diff, Row};
 
 /// A compound operator that can be applied row-by-row.
@@ -473,6 +478,85 @@ impl SafeMfpPlan {
         self.mfp.permute(map, new_arity)
     }
 
+    /// Similar to [`MapFilterProject::evaluate_into`], but in batch: rows that don't pass the predicates are not included in the output,
+    ///
+    /// so the output is not guaranteed to have the same number of rows as the input.
+    pub fn eval_batch_into(&self, batch: &mut Batch) -> Result<Batch, EvalError> {
+        ensure!(
+            batch.column_count() == self.mfp.input_arity,
+            InvalidArgumentSnafu {
+                reason: format!(
+                    "batch column length {} is not equal to input_arity {}",
+                    batch.column_count(),
+                    self.mfp.input_arity
+                ),
+            }
+        );
+
+        let passed_predicates = self.eval_batch_inner(batch)?;
+        let filter = FilterBuilder::new(passed_predicates.as_boolean_array());
+        let pred = filter.build();
+        let mut result = vec![];
+        for col in batch.batch() {
+            let filtered = pred
+                .filter(col.to_arrow_array().as_ref())
+                .context(ArrowSnafu {
+                    context: format!("failed to filter column for mfp operator {:?}", self),
+                })?;
+            result.push(Helper::try_into_vector(filtered).context(DataTypeSnafu {
+                msg: "Failed to convert arrow array to vector",
+            })?);
+        }
+        let projected = self
+            .mfp
+            .projection
+            .iter()
+            .map(|c| result[*c].clone())
+            .collect_vec();
+        let row_count = pred.count();
+
+        Batch::try_new(projected, row_count)
+    }
+
+    /// Evaluates all expressions needed by the predicates in batch, and returns a boolean mask of the rows that pass every predicate.
+    pub fn eval_batch_inner(&self, batch: &mut Batch) -> Result<BooleanVector, EvalError> {
+        // mark the columns that have been evaluated and appended to the `batch`
+        let mut expression = 0;
+        // preds default to true and will be updated as we evaluate each predicate
+        let mut all_preds = BooleanVector::from(vec![Some(true); batch.row_count()]);
+
+        // to compute a predicate, need to first compute all expressions used in predicates
+        for (support, predicate) in self.mfp.predicates.iter() {
+            while self.mfp.input_arity + expression < *support {
+                let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
+                batch.batch_mut().push(expr_eval);
+                expression += 1;
+            }
+            let pred_vec = predicate.eval_batch(batch)?;
+            let pred_arr = pred_vec.to_arrow_array();
+            let pred_arr = pred_arr.as_any().downcast_ref::<BooleanArray>().context({
+                TypeMismatchSnafu {
+                    expected: ConcreteDataType::boolean_datatype(),
+                    actual: pred_vec.data_type(),
+                }
+            })?;
+            let all_arr = all_preds.as_boolean_array();
+            let res_arr = arrow::compute::and(all_arr, pred_arr).context(ArrowSnafu {
+                context: format!("failed to compute predicate for mfp operator {:?}", self),
+            })?;
+            all_preds = BooleanVector::from(res_arr);
+        }
+
+        // while evaluated expressions are less than total expressions, keep evaluating
+        while expression < self.mfp.expressions.len() {
+            let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
+            batch.batch_mut().push(expr_eval);
+            expression += 1;
+        }
+
+        Ok(all_preds)
+    }
+
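The two methods above lean on two arrow-rs kernels: `arrow::compute::and` to fold per-predicate masks into `all_preds`, and `FilterBuilder` to build one `FilterPredicate` that is applied to every column. A standalone arrow-rs sketch of that mechanism (toy data):

```rust
use arrow::array::{BooleanArray, Int32Array};
use arrow::compute::{and, FilterBuilder};

fn main() {
    // two predicate results, combined like `all_preds` above
    let p1 = BooleanArray::from(vec![true, true, false, true]);
    let p2 = BooleanArray::from(vec![true, false, false, true]);
    let mask = and(&p1, &p2).unwrap();

    // build the filter once, then apply it to every column of the batch
    let pred = FilterBuilder::new(&mask).build();
    let col = Int32Array::from(vec![1, 2, 3, 4]);
    let filtered = pred.filter(&col).unwrap();

    assert_eq!(pred.count(), 2); // number of rows kept
    assert_eq!(filtered.len(), 2);
}
```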
     /// Evaluates the linear operator on a supplied list of datums.
     ///
     /// The arguments are the initial datums associated with the row,
@@ -735,10 +819,15 @@ impl MfpPlan {
 
 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
     use datatypes::data_type::ConcreteDataType;
+    use datatypes::vectors::{Int32Vector, Int64Vector};
+    use pretty_assertions::assert_eq;
 
     use super::*;
     use crate::expr::{BinaryFunc, UnaryFunc, UnmaterializableFunc};
+
     #[test]
     fn test_mfp_with_time() {
         use crate::expr::func::BinaryFunc;
@@ -844,6 +933,21 @@ mod test {
             .unwrap()
             .unwrap();
         assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)]));
+
+        // batch mode
+        let mut batch = Batch::try_from_rows(vec![Row::from(vec![
+            Value::from(4),
+            Value::from(2),
+            Value::from(3),
+        ])])
+        .unwrap();
+        let ret = safe_mfp.eval_batch_into(&mut batch).unwrap();
+
+        assert_eq!(
+            ret,
+            Batch::try_from_rows(vec![Row::from(vec![Value::from(false), Value::from(true)])])
+                .unwrap()
+        );
     }
 
     #[test]
@@ -865,7 +969,7 @@ mod test {
             BinaryFunc::Gt,
         )])
         .unwrap();
-        let mut input1 = vec![
+        let input1 = vec![
             Value::from(4),
             Value::from(2),
             Value::from(3),
@@ -873,19 +977,34 @@ mod test {
         ];
         let safe_mfp = SafeMfpPlan { mfp };
         let ret = safe_mfp
-            .evaluate_into(&mut input1, &mut Row::empty())
+            .evaluate_into(&mut input1.clone(), &mut Row::empty())
             .unwrap();
         assert_eq!(ret, None);
-        let mut input2 = vec![
+
+        let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap();
+        let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();
+        assert_eq!(
+            ret_batch,
+            Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![]))], 0).unwrap()
+        );
+
+        let input2 = vec![
             Value::from(5),
             Value::from(2),
             Value::from(4),
             Value::from("abc"),
         ];
         let ret = safe_mfp
-            .evaluate_into(&mut input2, &mut Row::empty())
+            .evaluate_into(&mut input2.clone(), &mut Row::empty())
             .unwrap();
         assert_eq!(ret, Some(Row::pack(vec![Value::from(11)])));
+
+        let mut input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap();
+        let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch).unwrap();
+        assert_eq!(
+            ret_batch,
+            Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![11]))], 1).unwrap()
+        );
     }
 
     #[test]
@@ -923,27 +1042,50 @@ mod test {
         .unwrap()
         .project([0, 1, 2])
         .unwrap();
-        let mut input1 = vec![
+        let input1 = vec![
             Value::from(4i64),
             Value::from(2),
             Value::from(3),
             Value::from(53),
         ];
         let safe_mfp = SafeMfpPlan { mfp };
-        let ret = safe_mfp.evaluate_into(&mut input1, &mut Row::empty());
+        let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
         assert!(matches!(ret, Err(EvalError::InvalidArgument { .. })));
+        let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap();
+        let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch);
+        assert!(matches!(ret_batch, Err(EvalError::InvalidArgument { .. })));
+
         let input2 = vec![Value::from(4i64), Value::from(2), Value::from(3)];
         let ret = safe_mfp
             .evaluate_into(&mut input2.clone(), &mut Row::empty())
             .unwrap();
-        assert_eq!(ret, Some(Row::new(input2)));
+        assert_eq!(ret, Some(Row::new(input2.clone())));
+
+        let input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap();
+        let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch.clone()).unwrap();
+        assert_eq!(ret_batch, input2_batch);
 
-        let mut input3 = vec![Value::from(4i64), Value::from(5), Value::from(2)];
+        let input3 = vec![Value::from(4i64), Value::from(5), Value::from(2)];
         let ret = safe_mfp
-            .evaluate_into(&mut input3, &mut Row::empty())
+            .evaluate_into(&mut input3.clone(), &mut Row::empty())
             .unwrap();
         assert_eq!(ret, None);
+
+        let input3_batch = Batch::try_from_rows(vec![Row::new(input3)]).unwrap();
+        let ret_batch = safe_mfp.eval_batch_into(&mut input3_batch.clone()).unwrap();
+        assert_eq!(
+            ret_batch,
+            Batch::try_new(
+                vec![
+                    Arc::new(Int64Vector::from_vec(Default::default())),
+                    Arc::new(Int32Vector::from_vec(Default::default())),
+                    Arc::new(Int32Vector::from_vec(Default::default()))
+                ],
+                0
+            )
+            .unwrap()
+        );
     }
 
     #[test]
@@ -961,10 +1103,18 @@ mod test {
         .unwrap()
         .project(vec![3])
         .unwrap();
-        let mut input1 = vec![Value::from(2), Value::from(3), Value::from(4)];
+        let input1 = vec![Value::from(2), Value::from(3), Value::from(4)];
         let safe_mfp = SafeMfpPlan { mfp };
-        let ret = safe_mfp.evaluate_into(&mut input1, &mut Row::empty());
+        let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
         assert_eq!(ret.unwrap(), Some(Row::new(vec![Value::from(false)])));
+
+        let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap();
+        let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();
+
+        assert_eq!(
+            ret_batch,
+            Batch::try_new(vec![Arc::new(BooleanVector::from(vec![false]))], 1).unwrap()
+        );
     }
     #[test]
     fn test_mfp_chore() {
diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs
index 868d83b43f02..afcdb7ddd152 100644
--- a/src/flow/src/expr/relation/func.rs
+++ b/src/flow/src/expr/relation/func.rs
@@ -18,15 +18,17 @@ use std::sync::OnceLock;
 
 use datatypes::prelude::ConcreteDataType;
 use datatypes::value::Value;
+use datatypes::vectors::VectorRef;
 use serde::{Deserialize, Serialize};
 use smallvec::smallvec;
-use snafu::{IntoError, OptionExt};
+use snafu::{ensure, IntoError, OptionExt};
 use strum::{EnumIter, IntoEnumIterator};
 
 use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu};
 use crate::expr::error::EvalError;
 use crate::expr::relation::accum::{Accum, Accumulator};
 use crate::expr::signature::{GenericFn, Signature};
+use crate::expr::InvalidArgumentSnafu;
 use crate::repr::Diff;
 
 /// Aggregate functions that can be applied to a group of rows.
@@ -131,6 +133,98 @@ impl AggregateFunc {
         let res = accum.eval(self)?;
         Ok((res, accum.into_state()))
     }
+
+    /// returns the output value and the new accumulator state
+    pub fn eval_batch<A>(
+        &self,
+        accum: A,
+        vector: VectorRef,
+        diff: Option<VectorRef>,
+    ) -> Result<(Value, Vec<Value>), EvalError>
+    where
+        A: IntoIterator<Item = Value>,
+    {
+        let mut accum = accum.into_iter().peekable();
+
+        let mut accum = if accum.peek().is_none() {
+            Accum::new_accum(self)?
+        } else {
+            Accum::try_from_iter(self, &mut accum)?
+        };
+
+        let vector_diff = VectorDiff::try_new(vector, diff)?;
+
+        accum.update_batch(self, vector_diff)?;
+
+        let res = accum.eval(self)?;
+        Ok((res, accum.into_state()))
+    }
+}
+
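The `VectorDiff` helper defined just below pairs each value with a +1/-1 diff and falls back to +1 (insert) when no diff column exists. The pairing logic alone, as a std-only sketch with plain `Vec`s standing in for vectors:

```rust
fn main() {
    let values = vec!["a", "b", "c"];
    let diffs: Option<Vec<i64>> = None; // None means "all inserts"

    let paired: Vec<(&str, i64)> = values
        .iter()
        .enumerate()
        .map(|(idx, v)| {
            // default to +1 (insert) when no diff column is provided
            let diff = diffs.as_ref().map_or(1, |d| d[idx]);
            (*v, diff)
        })
        .collect();

    assert_eq!(paired, vec![("a", 1), ("b", 1), ("c", 1)]);
}
```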
+struct VectorDiff {
+    vector: VectorRef,
+    diff: Option<VectorRef>,
+}
+
+impl VectorDiff {
+    fn len(&self) -> usize {
+        self.vector.len()
+    }
+
+    fn try_new(vector: VectorRef, diff: Option<VectorRef>) -> Result<Self, EvalError> {
+        ensure!(
+            diff.as_ref()
+                .map_or(true, |diff| diff.len() == vector.len()),
+            InvalidArgumentSnafu {
+                reason: "Length of vector and diff should be the same"
+            }
+        );
+        Ok(Self { vector, diff })
+    }
+}
+
+impl IntoIterator for VectorDiff {
+    type Item = (Value, Diff);
+    type IntoIter = VectorDiffIter;
+
+    fn into_iter(self) -> Self::IntoIter {
+        VectorDiffIter {
+            vector: self.vector,
+            diff: self.diff,
+            idx: 0,
+        }
+    }
+}
+
+struct VectorDiffIter {
+    vector: VectorRef,
+    diff: Option<VectorRef>,
+    idx: usize,
+}
+
+impl std::iter::Iterator for VectorDiffIter {
+    type Item = (Value, Diff);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.idx >= self.vector.len() {
+            return None;
+        }
+        let value = self.vector.get(self.idx);
+        // +1 means insert, -1 means delete; defaults to +1 (insert) when no diff is provided
+        let diff = if let Some(diff) = self.diff.as_ref() {
+            if let Ok(diff_at) = diff.get(self.idx).try_into() {
+                diff_at
+            } else {
+                common_telemetry::warn!("Invalid diff value at index {}", self.idx);
+                return None;
+            }
+        } else {
+            1
+        };
+
+        self.idx += 1;
+        Some((value, diff))
+    }
 }
 
 /// Generate signature for each aggregate function
diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs
index c4d698529878..6a9cf30d950c 100644
--- a/src/flow/src/expr/scalar.rs
+++ b/src/flow/src/expr/scalar.rs
@@ -277,7 +277,7 @@ impl ScalarExpr {
                     // put a slice to corresponding batch
                     let slice_offset = prev_cond_idx;
                     let slice_length = idx - prev_cond_idx;
-                    let to_be_append = batch.slice(slice_offset, slice_length);
+                    let to_be_append = batch.slice(slice_offset, slice_length)?;
 
                     let to_put_back = match prev_cond {
                         Some(true) => (
@@ -300,7 +300,7 @@ impl ScalarExpr {
         if let Some(slice_offset) = prev_start_idx {
             let prev_cond = prev_cond.unwrap();
             let slice_length = bool_conds.len() - slice_offset;
-            let to_be_append = batch.slice(slice_offset, slice_length);
+            let to_be_append = batch.slice(slice_offset, slice_length)?;
             let to_put_back = match prev_cond {
                 Some(true) => (
                     Some(true),
@@ -812,7 +812,7 @@ mod test {
         let raw_len = raw.len();
         let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
-        let batch = Batch::new(vectors, raw_len);
+        let batch = Batch::try_new(vectors, raw_len).unwrap();
         let expected = Int32Vector::from(vec![
             None,
             Some(42),
@@ -831,7 +831,7 @@ mod test {
         let raw_len = raw.len();
         let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
 
-        let batch = Batch::new(vectors, raw_len);
+        let batch = Batch::try_new(vectors, raw_len).unwrap();
         let expected = Int32Vector::from(vec![Some(42)]).slice(0, raw_len);
         assert_eq!(expr.eval_batch(&batch).unwrap(), expected);
 
@@ -839,7 +839,7 @@ mod test {
         let raw_len = raw.len();
         let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
 
-        let batch = Batch::new(vectors, raw_len);
+        let batch = Batch::try_new(vectors, raw_len).unwrap();
         let expected = NullVector::new(raw_len).slice(0, raw_len);
         assert_eq!(expr.eval_batch(&batch).unwrap(), expected);
     }
diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs
index ec93d5812870..8d6a881fa066 100644
--- a/src/flow/src/lib.rs
+++ b/src/flow/src/lib.rs
@@ -20,6 +20,10 @@
 #![allow(dead_code)]
 #![warn(clippy::missing_docs_in_private_items)]
 #![warn(clippy::too_many_lines)]
+
+// TODO(discord9): enable this lint to handle out-of-bounds access
+// #![cfg_attr(not(test), warn(clippy::indexing_slicing))]
+
 // allow unused for now because it should be used later
 mod adapter;
 mod compute;
diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs
index 9ce1efa0a04f..7b57fc3ed22a 100644
--- a/src/flow/src/repr.rs
+++ b/src/flow/src/repr.rs
@@ -177,6 +177,12 @@ impl Row {
     }
 }
 
+impl From<Vec<Value>> for Row {
+    fn from(row: Vec<Value>) -> Self {
+        Row::new(row)
+    }
+}
+
 impl From<ProtoRow> for Row {
     fn from(row: ProtoRow) -> Self {
         Row::pack(
diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs
index 778fde49c9a3..269c53fa84aa 100644
--- a/src/flow/src/utils.rs
+++ b/src/flow/src/utils.rs
@@ -513,7 +513,7 @@ pub type ArrangeReader<'a> = tokio::sync::RwLockReadGuard<'a, Arrangement>;
 pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>;
 
 /// A handler to the inner Arrangement; can be cloned and shared, useful for querying its inner state
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ArrangeHandler {
     inner: Arc<RwLock<Arrangement>>,
 }