perf(flow): Map&Reduce Operator use batch to reduce alloc #4567

Merged (24 commits) on Aug 29, 2024
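The gist of the change, per the title: the map and reduce operators pass data through the dataflow as whole `Batch`es instead of one `DiffRow` at a time, so per-element allocation happens once per batch rather than once per row. Below is a minimal standalone sketch of one way such a batch can be laid out (column-oriented); the `Row` and `Batch` types here are simplified stand-ins, not the crate's actual definitions.

// Simplified illustration (hypothetical types, not the crate's): per-row
// processing allocates a fresh Vec per row, while a Batch allocates one Vec
// per column for the whole group of rows.
struct Row(Vec<i64>);

struct Batch {
    columns: Vec<Vec<i64>>, // column-oriented storage
    row_count: usize,
}

impl Batch {
    // Turn many rows into one batch: one allocation per column instead of a
    // per-row allocation in every downstream operator.
    fn try_from_rows(rows: Vec<Row>) -> Result<Self, String> {
        let width = rows.first().map(|r| r.0.len()).unwrap_or(0);
        let row_count = rows.len();
        let mut columns: Vec<Vec<i64>> =
            (0..width).map(|_| Vec::with_capacity(row_count)).collect();
        for row in rows {
            if row.0.len() != width {
                return Err("rows have different widths".to_string());
            }
            for (col, value) in columns.iter_mut().zip(row.0) {
                col.push(value);
            }
        }
        Ok(Self { columns, row_count })
    }
}

fn main() {
    let rows = vec![Row(vec![1, 10]), Row(vec![2, 20]), Row(vec![3, 30])];
    let batch = Batch::try_from_rows(rows).expect("uniform rows");
    assert_eq!(batch.row_count, 3);
    assert_eq!(batch.columns[1], vec![10, 20, 30]);
}

With this layout, an operator that evaluates an expression over a batch touches whole columns at once instead of building a fresh per-row allocation for every input row.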
91 changes: 82 additions & 9 deletions src/flow/src/compute/render.rs
@@ -28,7 +28,7 @@ use super::state::Scheduler;
use crate::compute::state::DataflowState;
use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
use crate::expr::{self, GlobalId, LocalId};
use crate::expr::{self, Batch, GlobalId, LocalId};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow};

@@ -87,9 +87,38 @@ impl<'referred, 'df> Context<'referred, 'df> {
}

impl<'referred, 'df> Context<'referred, 'df> {
/// Interpret and execute plan
/// Like `render_plan` but in Batch Mode
pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant_batch(rows)),
Plan::Get { .. } => NotImplementedSnafu {
reason: "Get is still WIP in batchmode",
}
.fail(),
Plan::Let { .. } => NotImplementedSnafu {
reason: "Let is still WIP in batchmode",
}
.fail(),
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp),
Plan::Reduce {
input,
key_val_plan,
reduce_plan,
} => self.render_reduce_batch(input, &key_val_plan, &reduce_plan, &plan.schema.typ),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP",
}
.fail(),
Plan::Union { .. } => NotImplementedSnafu {
reason: "Union is still WIP",
}
.fail(),
}
}

/// Interpret plan to dataflow and prepare them for execution
///
/// return the output of this plan
/// return the output handler of this plan
pub fn render_plan(&mut self, plan: TypedPlan) -> Result<CollectionBundle, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant(rows)),
@@ -112,17 +141,61 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}

/// render Constant, take all rows that have a timestamp not greater than the current time
/// This function is primarily used for testing
/// Always assume input is sorted by timestamp
pub fn render_constant_batch(&mut self, rows: Vec<DiffRow>) -> CollectionBundle<Batch> {
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}

let now = self.compute_state.current_time_ref();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let err_collector = self.err_collector.clone();

let subgraph_id =
self.df
.add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
// split the map at `now + 1`: entries with timestamps greater than now stay
// in `per_time` for later runs, the rest are emitted below
let mut after = per_time.split_off(&(*now.borrow() + 1));
std::mem::swap(&mut per_time, &mut after);
let not_greater_than_now = after;

not_greater_than_now.into_iter().for_each(|(_ts, rows)| {
err_collector.run(|| {
let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
let batch = Batch::try_from_rows(rows)?;
send_port.give(vec![batch]);
Ok(())
});
});
// schedule the next run
if let Some(next_run_time) = per_time.keys().next().copied() {
scheduler_inner.schedule_at(next_run_time);
}
});
scheduler.set_cur_subgraph(subgraph_id);

CollectionBundle::from_collection(Collection::from_port(recv_port))
}

/// render Constant, take all rows that have a timestamp not greater than the current time
///
/// Always assume input is sorted by timestamp
pub fn render_constant(&mut self, rows: Vec<DiffRow>) -> CollectionBundle {
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = rows
.into_iter()
.group_by(|(_row, ts, _diff)| *ts)
.into_iter()
.map(|(k, v)| (k, v.into_iter().collect_vec()))
.collect();
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}

let now = self.compute_state.current_time_ref();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
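In `render_constant_batch` above, the constant rows are grouped by timestamp into a `BTreeMap`, and each activation uses `split_off` at `now + 1` plus a swap to peel off every entry whose timestamp is not greater than the current time, emitting those as batches and scheduling the next run at the smallest remaining timestamp. A small standalone sketch of that partitioning step, with simplified key and value types rather than the crate's own:

use std::collections::BTreeMap;

// `split_off(&(now + 1))` returns the entries with keys greater than now;
// the swap keeps those future entries in `per_time` for the next scheduled
// run and hands back everything that is due.
fn take_due(
    per_time: &mut BTreeMap<i64, Vec<&'static str>>,
    now: i64,
) -> BTreeMap<i64, Vec<&'static str>> {
    let mut after = per_time.split_off(&(now + 1)); // keys > now
    std::mem::swap(per_time, &mut after); // per_time now holds keys > now
    after // keys <= now, ready to emit
}

fn main() {
    let mut per_time = BTreeMap::from([(1, vec!["a"]), (2, vec!["b"]), (5, vec!["c"])]);
    let due = take_due(&mut per_time, 2);
    assert_eq!(due.keys().copied().collect::<Vec<_>>(), vec![1, 2]);
    // the smallest remaining timestamp (5) would be used to schedule the next run
    assert_eq!(per_time.keys().next().copied(), Some(5));
}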
49 changes: 48 additions & 1 deletion src/flow/src/compute/render/map.rs
@@ -23,12 +23,59 @@ use crate::compute::render::Context;
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::ArrangeHandler;

impl<'referred, 'df> Context<'referred, 'df> {
/// Like `render_mfp` but in batch mode
pub fn render_mfp_batch(
&mut self,
input: Box<TypedPlan>,
mfp: MapFilterProject,
) -> Result<CollectionBundle<Batch>, Error> {
let input = self.render_plan_batch(*input)?;

let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff<Batch>>("mfp_batch");

// This closure captures the following variables:
let mfp_plan = MfpPlan::create_from(mfp)?;

let err_collector = self.err_collector.clone();

// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();

let subgraph = self.df.add_subgraph_in_out(
"mfp_batch",
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
// mfp only needs to passively receive updates from recvs
let src_data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());

let output_batches = src_data
.filter_map(|mut input_batch| {
err_collector.run(|| {
let res_batch = mfp_plan.mfp.eval_batch_into(&mut input_batch)?;
Ok(res_batch)
})
})
.collect_vec();

send.give(output_batches);
},
);

// register current subgraph in scheduler for future scheduling
scheduler.set_cur_subgraph(subgraph);

let bundle =
CollectionBundle::from_collection(Collection::<Batch>::from_port(out_recv_port));
Ok(bundle)
}

/// render MapFilterProject, will only emit the `rows` once. Assume all incoming rows' sys time is `now` and ignore each row's stated sys time
/// TODO(discord9): schedule mfp operator to run when the temporal filter needs it
///
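In `render_mfp_batch` above, the subgraph drains its pending input batches, evaluates the whole map/filter/project plan once per batch via `eval_batch_into`, and forwards only the batches that evaluate successfully, routing any failure into the `ErrCollector` instead of aborting the dataflow. A standalone sketch of that filter-and-collect-errors pattern; the `ErrCollector` and `eval_batch` below are simplified stand-ins for the crate's types:

use std::cell::RefCell;

// Run a fallible per-batch evaluation, stash errors on the side, and keep
// only the batches that evaluated successfully.
#[derive(Default)]
struct ErrCollector {
    errors: RefCell<Vec<String>>,
}

impl ErrCollector {
    // Run a fallible closure; record the error and return None on failure.
    fn run<T>(&self, f: impl FnOnce() -> Result<T, String>) -> Option<T> {
        match f() {
            Ok(v) => Some(v),
            Err(e) => {
                self.errors.borrow_mut().push(e);
                None
            }
        }
    }
}

// Stand-in for evaluating the MFP plan over one batch.
fn eval_batch(batch: i64) -> Result<i64, String> {
    if batch < 0 {
        Err(format!("bad batch: {batch}"))
    } else {
        Ok(batch * 2)
    }
}

fn main() {
    let err_collector = ErrCollector::default();
    let inputs = vec![1, -1, 3];
    // Mirrors the operator body: failed batches are dropped, errors collected.
    let outputs: Vec<i64> = inputs
        .into_iter()
        .filter_map(|b| err_collector.run(|| eval_batch(b)))
        .collect();
    assert_eq!(outputs, vec![2, 6]);
    assert_eq!(err_collector.errors.borrow().len(), 1);
}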