Skip to content

Commit

Permalink
perf(flow): Map&Reduce Operator use batch to reduce alloc (#4567)
Browse files Browse the repository at this point in the history
* feat: partial impl mfp

* feat: eval batch inner

* chore: fmt

* feat: mfp eval_batch

* WIP

* feat: Collection generic over row&Batch

* feat: render source batch

* chore: chore

* feat: render mfp batch

* feat: render reduce batch(WIP)

* feat(WIP): render reduce

* feat: reduce batch

* feat: render sink batch

* feat: render constant batch

* chore: error handling& mfp batch test

* test: mfp batch

* chore: rm import

* test: render reduce batch

* chore: add TODO

* chore: per bot review

* refactor: per review

* chore: cmt

* chore: rename

* docs: update no panic
  • Loading branch information
discord9 authored Aug 29, 2024
1 parent 79f40a7 commit 8c8499c
Show file tree
Hide file tree
Showing 12 changed files with 1,015 additions and 78 deletions.
91 changes: 82 additions & 9 deletions src/flow/src/compute/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::state::Scheduler;
use crate::compute::state::DataflowState;
use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
use crate::expr::{self, GlobalId, LocalId};
use crate::expr::{self, Batch, GlobalId, LocalId};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow};

Expand Down Expand Up @@ -87,9 +87,38 @@ impl<'referred, 'df> Context<'referred, 'df> {
}

impl<'referred, 'df> Context<'referred, 'df> {
/// Interpret and execute plan
/// Like `render_plan` but in Batch Mode
pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant_batch(rows)),
Plan::Get { .. } => NotImplementedSnafu {
reason: "Get is still WIP in batchmode",
}
.fail(),
Plan::Let { .. } => NotImplementedSnafu {
reason: "Let is still WIP in batchmode",
}
.fail(),
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp),
Plan::Reduce {
input,
key_val_plan,
reduce_plan,
} => self.render_reduce_batch(input, &key_val_plan, &reduce_plan, &plan.schema.typ),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP",
}
.fail(),
Plan::Union { .. } => NotImplementedSnafu {
reason: "Union is still WIP",
}
.fail(),
}
}

/// Interpret plan to dataflow and prepare them for execution
///
/// return the output of this plan
/// return the output handler of this plan
pub fn render_plan(&mut self, plan: TypedPlan) -> Result<CollectionBundle, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant(rows)),
Expand All @@ -112,17 +141,61 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}

/// render Constant, take all rows that have a timestamp not greater than the current time
/// This function is primarily used for testing
/// Always assume input is sorted by timestamp
pub fn render_constant_batch(&mut self, rows: Vec<DiffRow>) -> CollectionBundle<Batch> {
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}

let now = self.compute_state.current_time_ref();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let err_collector = self.err_collector.clone();

let subgraph_id =
self.df
.add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
// find the first timestamp that is greater than now
// use filter_map

let mut after = per_time.split_off(&(*now.borrow() + 1));
// swap
std::mem::swap(&mut per_time, &mut after);
let not_great_than_now = after;

not_great_than_now.into_iter().for_each(|(_ts, rows)| {
err_collector.run(|| {
let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
let batch = Batch::try_from_rows(rows)?;
send_port.give(vec![batch]);
Ok(())
});
});
// schedule the next run
if let Some(next_run_time) = per_time.keys().next().copied() {
scheduler_inner.schedule_at(next_run_time);
}
});
scheduler.set_cur_subgraph(subgraph_id);

CollectionBundle::from_collection(Collection::from_port(recv_port))
}

/// render Constant, take all rows that have a timestamp not greater than the current time
///
/// Always assume input is sorted by timestamp
pub fn render_constant(&mut self, rows: Vec<DiffRow>) -> CollectionBundle {
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = rows
.into_iter()
.group_by(|(_row, ts, _diff)| *ts)
.into_iter()
.map(|(k, v)| (k, v.into_iter().collect_vec()))
.collect();
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}

let now = self.compute_state.current_time_ref();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
Expand Down
49 changes: 48 additions & 1 deletion src/flow/src/compute/render/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,59 @@ use crate::compute::render::Context;
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::ArrangeHandler;

impl<'referred, 'df> Context<'referred, 'df> {
/// Like `render_mfp` but in batch mode
pub fn render_mfp_batch(
&mut self,
input: Box<TypedPlan>,
mfp: MapFilterProject,
) -> Result<CollectionBundle<Batch>, Error> {
let input = self.render_plan_batch(*input)?;

let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff<Batch>>("mfp_batch");

// This closure capture following variables:
let mfp_plan = MfpPlan::create_from(mfp)?;

let err_collector = self.err_collector.clone();

// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();

let subgraph = self.df.add_subgraph_in_out(
"mfp_batch",
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
// mfp only need to passively receive updates from recvs
let src_data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());

let output_batches = src_data
.filter_map(|mut input_batch| {
err_collector.run(|| {
let res_batch = mfp_plan.mfp.eval_batch_into(&mut input_batch)?;
Ok(res_batch)
})
})
.collect_vec();

send.give(output_batches);
},
);

// register current subgraph in scheduler for future scheduling
scheduler.set_cur_subgraph(subgraph);

let bundle =
CollectionBundle::from_collection(Collection::<Batch>::from_port(out_recv_port));
Ok(bundle)
}

/// render MapFilterProject, will only emit the `rows` once. Assume all incoming row's sys time being `now`` and ignore the row's stated sys time
/// TODO(discord9): schedule mfp operator to run when temporal filter need
///
Expand Down
Loading

0 comments on commit 8c8499c

Please sign in to comment.