diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index 0476c8a6e5ac..437efc8c46f6 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -111,7 +111,7 @@ impl<'referred, 'df> Context<'referred, 'df> { input, key_val_plan, reduce_plan, - } => self.render_reduce(input, key_val_plan, reduce_plan), + } => self.render_reduce(input, key_val_plan, reduce_plan, plan.typ), Plan::Join { .. } => NotImplementedSnafu { reason: "Join is still WIP", } diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 46b2dc196f00..43bc54d2911b 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -29,8 +29,8 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector use crate::expr::error::{DataTypeSnafu, InternalSnafu}; use crate::expr::{AggregateExpr, EvalError, ScalarExpr}; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; -use crate::repr::{self, DiffRow, KeyValDiffRow, Row}; -use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter}; +use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row}; +use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager}; impl<'referred, 'df> Context<'referred, 'df> { const REDUCE: &'static str = "reduce"; @@ -42,6 +42,7 @@ impl<'referred, 'df> Context<'referred, 'df> { input: Box, key_val_plan: KeyValPlan, reduce_plan: ReducePlan, + output_type: RelationType, ) -> Result { let input = self.render_plan(*input)?; // first assembly key&val that's ((Row, Row), tick, diff) @@ -52,6 +53,15 @@ impl<'referred, 'df> Context<'referred, 'df> { // TODO(discord9): config global expire time from self let arrange_handler = self.compute_state.new_arrange(None); + + if let (Some(time_index), Some(expire_after)) = + (output_type.time_index, self.compute_state.expire_after()) + { + let expire_man = + KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index))); + arrange_handler.write().set_expire_state(expire_man); + } + // reduce need full arrangement to be able to query all keys let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu { reason: "No write is expected at this point", @@ -776,6 +786,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); @@ -850,6 +861,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); @@ -930,6 +942,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); @@ -1006,6 +1019,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); @@ -1097,6 +1111,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs index a9a431de97ef..13aa586bfdc7 100644 --- a/src/flow/src/compute/state.rs +++ b/src/flow/src/compute/state.rs @@ -42,6 +42,8 @@ pub struct DataflowState { /// save all used arrange in this dataflow, since usually there is no delete operation /// we can just keep track of all used arrange and schedule subgraph when they need to be updated arrange_used: Vec, + /// the time arrangement need to be expired after a certain time in milliseconds + expire_after: Option, } impl DataflowState { @@ -99,6 +101,10 @@ impl DataflowState { pub fn get_err_collector(&self) -> ErrCollector { self.err_collector.clone() } + + pub fn expire_after(&self) -> Option { + self.expire_after + } } #[derive(Debug, Clone)] diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index a6887a753b7e..93edf176e77a 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -31,6 +31,7 @@ use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow, pub type Batch = BTreeMap>; /// A spine of batches, arranged by timestamp +/// TODO(discord9): consider internally index by key, value, and timestamp for faster lookup pub type Spine = BTreeMap; /// Determine when should a key expire according to it's event timestamp in key. @@ -51,6 +52,17 @@ pub struct KeyExpiryManager { } impl KeyExpiryManager { + pub fn new( + key_expiration_duration: Option, + event_timestamp_from_row: Option, + ) -> Self { + Self { + event_ts_to_key: Default::default(), + key_expiration_duration, + event_timestamp_from_row, + } + } + /// Extract event timestamp from key row. /// /// If no expire state is set, return None. @@ -177,6 +189,10 @@ impl Arrangement { } } + pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) { + self.expire_state = Some(expire_state); + } + /// Apply updates into spine, with no respect of whether the updates are in futures, past, or now. /// /// Return the maximum expire time (already expire by how much time) of all updates if any keys is already expired.