Skip to content

Commit

Permalink
feat: render_reduce's arrangement expire after time passed
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed May 16, 2024
1 parent fe34ebf commit f48c831
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/flow/src/compute/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
input,
key_val_plan,
reduce_plan,
} => self.render_reduce(input, key_val_plan, reduce_plan),
} => self.render_reduce(input, key_val_plan, reduce_plan, plan.typ),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP",
}
Expand Down
19 changes: 17 additions & 2 deletions src/flow/src/compute/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};

impl<'referred, 'df> Context<'referred, 'df> {
const REDUCE: &'static str = "reduce";
Expand All @@ -42,6 +42,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
input: Box<TypedPlan>,
key_val_plan: KeyValPlan,
reduce_plan: ReducePlan,
output_type: RelationType,
) -> Result<CollectionBundle, Error> {
let input = self.render_plan(*input)?;
// first assembly key&val that's ((Row, Row), tick, diff)
Expand All @@ -52,6 +53,15 @@ impl<'referred, 'df> Context<'referred, 'df> {

// TODO(discord9): config global expire time from self
let arrange_handler = self.compute_state.new_arrange(None);

if let (Some(time_index), Some(expire_after)) =
(output_type.time_index, self.compute_state.expire_after())
{
let expire_man =
KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
arrange_handler.write().set_expire_state(expire_man);
}

// reduce need full arrangement to be able to query all keys
let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
reason: "No write is expected at this point",
Expand Down Expand Up @@ -776,6 +786,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();

Expand Down Expand Up @@ -850,6 +861,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();

Expand Down Expand Up @@ -930,6 +942,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();

Expand Down Expand Up @@ -1006,6 +1019,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();

Expand Down Expand Up @@ -1097,6 +1111,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();

Expand Down
6 changes: 6 additions & 0 deletions src/flow/src/compute/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct DataflowState {
/// save all used arrange in this dataflow, since usually there is no delete operation
/// we can just keep track of all used arrange and schedule subgraph when they need to be updated
arrange_used: Vec<ArrangeHandler>,
/// the time arrangement need to be expired after a certain time in milliseconds
expire_after: Option<Timestamp>,
}

impl DataflowState {
Expand Down Expand Up @@ -99,6 +101,10 @@ impl DataflowState {
pub fn get_err_collector(&self) -> ErrCollector {
self.err_collector.clone()
}

pub fn expire_after(&self) -> Option<Timestamp> {
self.expire_after
}
}

#[derive(Debug, Clone)]
Expand Down
16 changes: 16 additions & 0 deletions src/flow/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow,
pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;

/// A spine of batches, arranged by timestamp
/// TODO(discord9): consider internally index by key, value, and timestamp for faster lookup
pub type Spine = BTreeMap<Timestamp, Batch>;

/// Determine when should a key expire according to it's event timestamp in key.
Expand All @@ -51,6 +52,17 @@ pub struct KeyExpiryManager {
}

impl KeyExpiryManager {
pub fn new(
key_expiration_duration: Option<Duration>,
event_timestamp_from_row: Option<ScalarExpr>,
) -> Self {
Self {
event_ts_to_key: Default::default(),
key_expiration_duration,
event_timestamp_from_row,
}
}

/// Extract event timestamp from key row.
///
/// If no expire state is set, return None.
Expand Down Expand Up @@ -177,6 +189,10 @@ impl Arrangement {
}
}

pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) {
self.expire_state = Some(expire_state);
}

/// Apply updates into spine, with no respect of whether the updates are in futures, past, or now.
///
/// Return the maximum expire time (already expire by how much time) of all updates if any keys is already expired.
Expand Down

0 comments on commit f48c831

Please sign in to comment.