diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 3e58a4307c79..b4a175ed6dff 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -208,7 +208,6 @@ impl<'s> Worker<'s> { create_if_not_exist: bool, err_collector: ErrCollector, ) -> Result, Error> { - let _ = expire_when; let already_exist = self.task_states.contains_key(&flow_id); match (already_exist, create_if_not_exist) { (true, true) => return Ok(None), @@ -220,6 +219,7 @@ impl<'s> Worker<'s> { err_collector, ..Default::default() }; + cur_task_state.state.set_expire_after(expire_when); { let mut ctx = cur_task_state.new_ctx(sink_id); diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs index 13aa586bfdc7..a9356005546c 100644 --- a/src/flow/src/compute/state.rs +++ b/src/flow/src/compute/state.rs @@ -102,6 +102,10 @@ impl DataflowState { self.err_collector.clone() } + pub fn set_expire_after(&mut self, after: Option) { + self.expire_after = after; + } + pub fn expire_after(&self) -> Option { self.expire_after }