diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index a2982dd7efcd1..2f7a827cf6d74 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2164,12 +2164,18 @@ async fn test_table_watermark() { .init_for_test_with_prev_epoch(epoch1, prev_epoch) .await .unwrap(); - local1.update_vnode_bitmap(vnode_bitmap1.clone()); + local1 + .update_vnode_bitmap(vnode_bitmap1.clone()) + .await + .unwrap(); local2 .init_for_test_with_prev_epoch(epoch1, prev_epoch) .await .unwrap(); - local2.update_vnode_bitmap(vnode_bitmap2.clone()); + local2 + .update_vnode_bitmap(vnode_bitmap2.clone()) + .await + .unwrap(); fn gen_inner_key(index: usize) -> Bytes { Bytes::copy_from_slice(format!("key_{:05}", index).as_bytes()) diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 6df89c4937674..2871a22057f83 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -22,7 +22,7 @@ use bytes::Bytes; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::MAX_SPILL_TIMES; +use risingwave_common::util::epoch::{EpochPair, MAX_SPILL_TIMES}; use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType; @@ -62,7 +62,7 @@ pub struct LocalHummockStorage { mem_table: MemTable, spill_offset: u16, - epoch: Option, + epoch: Option, table_id: TableId, op_consistency_level: OpConsistencyLevel, @@ -477,7 +477,7 @@ impl LocalStateStore for LocalHummockStorage { } fn epoch(&self) -> u64 { - self.epoch.expect("should have set the epoch") + self.epoch.expect("should have set the epoch").curr } fn is_dirty(&self) -> bool { @@ -487,8 +487,9 @@ impl LocalStateStore for LocalHummockStorage { async fn init(&mut self, options: InitOptions) -> StorageResult<()> { let epoch = options.epoch; wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?; - assert!( - self.epoch.replace(epoch.curr).is_none(), + assert_eq!( + self.epoch.replace(epoch), + None, "local state store of table id {:?} is init for more than once", self.table_id ); @@ -511,10 +512,13 @@ impl LocalStateStore for LocalHummockStorage { self.mem_table.op_consistency_level.update(new_level); self.op_consistency_level.update(new_level); } - let prev_epoch = self + let epoch = self .epoch - .replace(next_epoch) + .as_mut() .expect("should have init epoch before seal the first epoch"); + let prev_epoch = epoch.curr; + epoch.prev = prev_epoch; + epoch.curr = next_epoch; self.spill_offset = 0; assert!( next_epoch > prev_epoch, @@ -553,12 +557,19 @@ impl LocalStateStore for LocalHummockStorage { } } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + wait_for_epoch( + &self.version_update_notifier_tx, + self.epoch.expect("should have init").prev, + self.table_id, + ) + .await?; + assert!(self.mem_table.buffer.is_empty()); let mut read_version = self.read_version.write(); assert!(read_version.staging().is_empty(), "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update", - self.table_id(), self.instance_id() + self.table_id(), self.instance_id() ); - read_version.update_vnode_bitmap(vnodes) + Ok(read_version.update_vnode_bitmap(vnodes)) } } diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index d879024606d19..514fa9432c36d 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -22,9 +22,10 @@ use std::sync::{Arc, LazyLock}; use bytes::Bytes; use itertools::Itertools; use parking_lot::RwLock; -use risingwave_common::bitmap::Bitmap; +use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::util::epoch::EpochPair; use risingwave_hummock_sdk::key::{ prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange, UserKey, }; @@ -120,7 +121,6 @@ impl RangeKv for BTreeMapRangeKv { pub mod sled { use std::fs::create_dir_all; use std::ops::RangeBounds; - use std::sync::Arc; use bytes::Bytes; use risingwave_hummock_sdk::key::FullKey; @@ -256,14 +256,14 @@ pub mod sled { pub fn new(path: impl AsRef) -> Self { RangeKvStateStore { inner: SledRangeKv::new(path), - table_next_epochs: Arc::new(Default::default()), + tables: Default::default(), } } pub fn new_temp() -> Self { RangeKvStateStore { inner: SledRangeKv::new_temp(), - table_next_epochs: Arc::new(Default::default()), + tables: Default::default(), } } } @@ -514,6 +514,47 @@ mod batched_iter { pub type MemoryStateStore = RangeKvStateStore; +struct TableState { + init_epoch: u64, + next_epochs: BTreeMap, + latest_sealed_epoch: Option, + sealing_epochs: BTreeMap, +} + +impl TableState { + fn new(init_epoch: u64) -> Self { + Self { + init_epoch, + next_epochs: Default::default(), + latest_sealed_epoch: None, + sealing_epochs: Default::default(), + } + } + + async fn wait_epoch( + tables: &parking_lot::Mutex>, + table_id: TableId, + epoch: u64, + ) { + loop { + { + let tables = tables.lock(); + let table_state = tables.get(&table_id).expect("should exist"); + assert!(epoch >= table_state.init_epoch); + if epoch == table_state.init_epoch { + return; + } + if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch + && latest_sealed_epoch >= epoch + { + return; + } + } + yield_now().await; + } + } +} + /// An in-memory state store /// /// The in-memory state store is a [`BTreeMap`], which maps [`FullKey`] to value. It @@ -524,7 +565,7 @@ pub struct RangeKvStateStore { /// Stores (key, epoch) -> user value. inner: R, /// `table_id` -> `prev_epoch` -> `curr_epoch` - table_next_epochs: Arc>>>, + tables: Arc>>, } fn to_full_key_range(table_id: TableId, table_key_range: R) -> BytesFullKeyRange @@ -690,15 +731,15 @@ impl StateStoreReadLog for RangeKvStateStore { async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult { loop { { - let table_next_epochs = self.table_next_epochs.lock(); - let Some(table_next_epochs) = table_next_epochs.get(&options.table_id) else { + let tables = self.tables.lock(); + let Some(tables) = tables.get(&options.table_id) else { return Err(HummockError::next_epoch(format!( "table {} not exist", options.table_id )) .into()); }; - if let Some(next_epoch) = table_next_epochs.get(&epoch) { + if let Some(next_epoch) = tables.next_epochs.get(&epoch) { break Ok(*next_epoch); } } @@ -798,7 +839,7 @@ pub struct RangeKvLocalStateStore { mem_table: MemTable, inner: RangeKvStateStore, - epoch: Option, + epoch: Option, table_id: TableId, op_consistency_level: OpConsistencyLevel, @@ -966,7 +1007,7 @@ impl LocalStateStore for RangeKvLocalStateStore { } fn epoch(&self) -> u64 { - self.epoch.expect("should have set the epoch") + self.epoch.expect("should have set the epoch").curr } fn is_dirty(&self) -> bool { @@ -975,17 +1016,20 @@ impl LocalStateStore for RangeKvLocalStateStore { #[allow(clippy::unused_async)] async fn init(&mut self, options: InitOptions) -> StorageResult<()> { - assert!( - self.epoch.replace(options.epoch.curr).is_none(), + assert_eq!( + self.epoch.replace(options.epoch), + None, "epoch in local state store of table id {:?} is init for more than once", self.table_id ); self.inner - .table_next_epochs + .tables .lock() .entry(self.table_id) - .or_default() + .or_insert_with(|| TableState::new(options.epoch.prev)) + .next_epochs .insert(options.epoch.prev, options.epoch.curr); + TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await; Ok(()) } @@ -995,10 +1039,13 @@ impl LocalStateStore for RangeKvLocalStateStore { if let Some(value_checker) = opts.switch_op_consistency_level { self.mem_table.op_consistency_level.update(&value_checker); } - let prev_epoch = self + let epoch = self .epoch - .replace(next_epoch) + .as_mut() .expect("should have init epoch before seal the first epoch"); + let prev_epoch = epoch.curr; + epoch.prev = prev_epoch; + epoch.curr = next_epoch; assert!( next_epoch > prev_epoch, "new epoch {} should be greater than current epoch: {}", @@ -1006,12 +1053,33 @@ impl LocalStateStore for RangeKvLocalStateStore { prev_epoch ); - self.inner - .table_next_epochs - .lock() - .entry(self.table_id) - .or_default() - .insert(prev_epoch, next_epoch); + let mut tables = self.inner.tables.lock(); + let table_state = tables + .get_mut(&self.table_id) + .expect("should be set when init"); + + table_state.next_epochs.insert(prev_epoch, next_epoch); + let sealing_epoch_vnodes = table_state + .sealing_epochs + .entry(prev_epoch) + .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len())); + assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len()); + for vnode in self.vnodes.iter_ones() { + assert!(!sealing_epoch_vnodes.is_set(vnode)); + sealing_epoch_vnodes.set(vnode, true); + } + if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) { + let (all_sealed_epoch, _) = table_state.sealing_epochs.pop_first().expect("non-empty"); + assert_eq!( + all_sealed_epoch, prev_epoch, + "new all_sealed_epoch must be the current prev epoch" + ); + if let Some(prev_latest_sealed_epoch) = + table_state.latest_sealed_epoch.replace(prev_epoch) + { + assert!(prev_epoch > prev_latest_sealed_epoch); + } + } if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks { let delete_ranges = watermarks @@ -1052,8 +1120,14 @@ impl LocalStateStore for RangeKvLocalStateStore { Ok(()) } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - std::mem::replace(&mut self.vnodes, vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + TableState::wait_epoch( + &self.inner.tables, + self.table_id, + self.epoch.expect("should have init").prev, + ) + .await; + Ok(std::mem::replace(&mut self.vnodes, vnodes)) } fn get_table_watermark(&self, _vnode: VirtualNode) -> Option { diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 3ed30d77b37a8..e8c34337bae99 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -323,8 +323,8 @@ impl LocalStateStore for MonitoredStateStore { .verbose_instrument_await("store_try_flush") } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - self.inner.update_vnode_bitmap(vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + self.inner.update_vnode_bitmap(vnodes).await } fn get_table_watermark(&self, vnode: VirtualNode) -> Option { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index dff1cba16bb1c..5f856bd2cbaed 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -258,8 +258,8 @@ impl LocalStateStore for TracedStateStore { } // TODO: add trace span - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - self.inner.update_vnode_bitmap(vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + self.inner.update_vnode_bitmap(vnodes).await } fn get_table_watermark(&self, vnode: VirtualNode) -> Option { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index bb23384309cd5..ea726dc0f7c62 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -153,7 +153,7 @@ impl LocalStateStore for PanicStateStore { panic!("should not operate on the panic state store!"); } - fn update_vnode_bitmap(&mut self, _vnodes: Arc) -> Arc { + async fn update_vnode_bitmap(&mut self, _vnodes: Arc) -> StorageResult> { panic!("should not operate on the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index a59b3be9e9de5..ed7df88ee14c3 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -462,7 +462,7 @@ pub trait LocalStateStore: StaticSendSync { // Updates the vnode bitmap corresponding to the local state store // Returns the previous vnode bitmap - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc; + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> impl StorageFuture<'_, Arc>; } /// If `prefetch` is true, prefetch will be enabled. Prefetching may increase the memory diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index c675f7ee4d126..5f46c6700af13 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -539,12 +539,12 @@ pub mod verify { ret } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - let ret = self.actual.update_vnode_bitmap(vnodes.clone()); + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?; if let Some(expected) = &mut self.expected { - assert_eq!(ret, expected.update_vnode_bitmap(vnodes)); + assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?); } - ret + Ok(ret) } fn get_table_watermark(&self, vnode: VirtualNode) -> Option { @@ -997,7 +997,7 @@ mod dyn_state_store { fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc; + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult>; fn get_table_watermark(&self, vnode: VirtualNode) -> Option; } @@ -1071,8 +1071,8 @@ mod dyn_state_store { self.seal_current_epoch(next_epoch, opts) } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - self.update_vnode_bitmap(vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + self.update_vnode_bitmap(vnodes).await } fn get_table_watermark(&self, vnode: VirtualNode) -> Option { @@ -1159,8 +1159,8 @@ mod dyn_state_store { (*self.0).seal_current_epoch(next_epoch, opts) } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - (*self.0).update_vnode_bitmap(vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + (*self.0).update_vnode_bitmap(vnodes).await } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/state.rs b/src/stream/src/common/log_store_impl/kv_log_store/state.rs index 942554fd12612..f5d530ba23c13 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/state.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/state.rs @@ -146,15 +146,20 @@ pub(crate) struct LogStorePostSealCurrentEpoch<'a, S: LocalStateStore> { inner: &'a mut LogStoreWriteState, } -impl LogStorePostSealCurrentEpoch<'_, S> { - pub(crate) fn post_yield_barrier(self, new_vnodes: Option>) { +impl<'a, S: LocalStateStore> LogStorePostSealCurrentEpoch<'a, S> { + pub(crate) async fn post_yield_barrier( + self, + new_vnodes: Option>, + ) -> LogStoreResult<()> { if let Some(new_vnodes) = new_vnodes { self.inner.serde.update_vnode_bitmap(new_vnodes.clone()); self.inner .state_store - .update_vnode_bitmap(new_vnodes.clone()); + .update_vnode_bitmap(new_vnodes.clone()) + .await?; } self.inner.on_post_seal = false; + Ok(()) } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index b17bcf1f2410f..6aff569f93278 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -152,7 +152,9 @@ impl LogWriter for KvLogStoreWriter { Ok(LogWriterPostFlushCurrentEpoch::new( move |new_vnodes: Option>| { async move { - post_seal_epoch.post_yield_barrier(new_vnodes.clone()); + post_seal_epoch + .post_yield_barrier(new_vnodes.clone()) + .await?; if let Some(new_vnodes) = new_vnodes { tx.update_vnode(next_epoch, new_vnodes); } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 4eb4b130fd185..00e48f23bb6fd 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -55,11 +55,7 @@ use risingwave_storage::row_serde::row_serde_util::{ deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, }; use risingwave_storage::row_serde::value_serde::ValueRowSerde; -use risingwave_storage::store::{ - InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, PrefetchOptions, - ReadLogOptions, ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreIterExt, - TryWaitEpochOptions, -}; +use risingwave_storage::store::*; use risingwave_storage::table::merge_sort::merge_sort; use risingwave_storage::table::{ deserialize_log_stream, ChangeLogRow, KeyedRow, TableDistribution, @@ -715,7 +711,6 @@ pub struct StateTablePostCommit< SD: ValueRowSerde, { inner: &'a mut StateTableInner, - barrier_epoch: EpochPair, } impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool> @@ -739,10 +734,8 @@ where > { self.inner.on_post_commit = false; Ok(if let Some(new_vnodes) = new_vnodes { - self.inner - .try_wait_committed_epoch(self.barrier_epoch.prev) - .await?; - let (old_vnodes, cache_may_stale) = self.update_vnode_bitmap(new_vnodes.clone()); + let (old_vnodes, cache_may_stale) = + self.update_vnode_bitmap(new_vnodes.clone()).await?; Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale)) } else { None @@ -754,7 +747,10 @@ where } /// Update the vnode bitmap of the state table, returns the previous vnode bitmap. - fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> (Arc, bool) { + async fn update_vnode_bitmap( + &mut self, + new_vnodes: Arc, + ) -> StreamExecutorResult<(Arc, bool)> { assert!( !self.inner.is_dirty(), "vnode bitmap should only be updated when state table is clean" @@ -762,7 +758,8 @@ where let prev_vnodes = self .inner .local_store - .update_vnode_bitmap(new_vnodes.clone()); + .update_vnode_bitmap(new_vnodes.clone()) + .await?; assert_eq!( &prev_vnodes, self.inner.vnodes(), @@ -787,10 +784,10 @@ where } } - ( + Ok(( self.inner.distribution.update_vnode_bitmap(new_vnodes), cache_may_stale, - ) + )) } } @@ -1147,10 +1144,7 @@ where } self.on_post_commit = true; - Ok(StateTablePostCommit { - inner: self, - barrier_epoch: new_epoch, - }) + Ok(StateTablePostCommit { inner: self }) } /// Commit pending watermark and return vnode bitmap-watermark pairs to seal. diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 2656af4890783..617e5a6737c36 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -1496,7 +1496,7 @@ mod tests { OrderType::ascending(), ], &[0, 1, 1], - 0, + 1, ) .await; diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index ead79bfad840c..652742b7de04e 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -390,7 +390,7 @@ mod tests { let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; tx.send(Barrier::with_prev_epoch_for_test( test_epoch(3), - test_epoch(2), + test_epoch(1), )) .unwrap(); @@ -425,7 +425,7 @@ mod tests { drop((tx, now)); let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; tx.send( - Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(3)) + Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(1)) .with_mutation(Mutation::Pause), ) .unwrap(); @@ -610,7 +610,7 @@ mod tests { tx.send(Barrier::with_prev_epoch_for_test( test_epoch(4000), - test_epoch(3000), + test_epoch(2000), )) .unwrap(); diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index 5524c235dcb49..a573e8a0da297 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -322,7 +322,7 @@ mod tests { create_executor(sort_column_index, store).await; // Push barrier - recovered_tx.push_barrier(test_epoch(3), false); + recovered_tx.push_barrier(test_epoch(2), false); // Consume the barrier recovered_sort_executor.expect_barrier().await; diff --git a/src/stream/src/executor/sync_kv_log_store.rs b/src/stream/src/executor/sync_kv_log_store.rs index 30e1e31f28cd8..79bc778e9a798 100644 --- a/src/stream/src/executor/sync_kv_log_store.rs +++ b/src/stream/src/executor/sync_kv_log_store.rs @@ -315,7 +315,8 @@ impl SyncedKvLogStoreExecutor { let barrier_epoch = barrier.epoch; yield Message::Barrier(barrier); write_state_post_write_barrier - .post_yield_barrier(update_vnode_bitmap.clone()); + .post_yield_barrier(update_vnode_bitmap.clone()) + .await?; if let Some(vnode_bitmap) = update_vnode_bitmap { // Apply Vnode Update read_state.update_vnode_bitmap(vnode_bitmap); diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index cc4cbaaec66c8..adb79741944eb 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -721,10 +721,10 @@ mod tests { ], }; MockSource::with_messages(vec![ - Message::Barrier(Barrier::new_test_barrier(test_epoch(3))), + Message::Barrier(Barrier::new_test_barrier(test_epoch(2))), Message::Chunk(std::mem::take(&mut chunks[0])), Message::Chunk(std::mem::take(&mut chunks[1])), - Message::Barrier(Barrier::new_test_barrier(test_epoch(4))), + Message::Barrier(Barrier::new_test_barrier(test_epoch(3))), ]) .into_executor(schema, pk_indices()) } @@ -1101,10 +1101,10 @@ mod tests { ], }; MockSource::with_messages(vec![ - Message::Barrier(Barrier::new_test_barrier(test_epoch(3))), + Message::Barrier(Barrier::new_test_barrier(test_epoch(2))), Message::Chunk(std::mem::take(&mut chunks[0])), Message::Chunk(std::mem::take(&mut chunks[1])), - Message::Barrier(Barrier::new_test_barrier(test_epoch(4))), + Message::Barrier(Barrier::new_test_barrier(test_epoch(3))), ]) .into_executor(schema, pk_indices()) } diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 274c3c5b28d26..5eca3a72260e3 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -603,7 +603,7 @@ mod tests { let mut executor = executor.execute(); // push the 1st barrier after failover - tx.push_barrier(test_epoch(4), false); + tx.push_barrier(test_epoch(3), false); executor.next().await.unwrap().unwrap(); // Init watermark after failover diff --git a/src/stream/tests/integration_tests/eowc_over_window.rs b/src/stream/tests/integration_tests/eowc_over_window.rs index 734e16073aef9..ff84ec086812b 100644 --- a/src/stream/tests/integration_tests/eowc_over_window.rs +++ b/src/stream/tests/integration_tests/eowc_over_window.rs @@ -123,13 +123,13 @@ async fn test_over_window() { # NOTE: no watermark message here, since watermark(1) was already received - !barrier 2 - recovery -- !barrier 3 +- !barrier 2 - !chunk |2 I T I i + 10 p1 103 13 + 12 p2 202 28 + 13 p3 301 39 -- !barrier 4 +- !barrier 3 "###, expect![[r#" - input: !barrier 1 @@ -163,9 +163,9 @@ async fn test_over_window() { - !barrier 2 - input: recovery output: [] - - input: !barrier 3 + - input: !barrier 2 output: - - !barrier 3 + - !barrier 2 - input: !chunk |- +---+----+----+-----+----+ | + | 10 | p1 | 103 | 13 | @@ -179,9 +179,9 @@ async fn test_over_window() { | + | 7 | p2 | 201 | 22 | 20 | 28 | | + | 8 | p3 | 300 | 33 | | 39 | +---+---+----+-----+----+----+----+ - - input: !barrier 4 + - input: !barrier 3 output: - - !barrier 4 + - !barrier 3 "#]], SnapshotOptions::default(), ) diff --git a/src/stream/tests/integration_tests/over_window.rs b/src/stream/tests/integration_tests/over_window.rs index b63a0f1649a15..e112f2fc700b5 100644 --- a/src/stream/tests/integration_tests/over_window.rs +++ b/src/stream/tests/integration_tests/over_window.rs @@ -138,11 +138,11 @@ async fn test_over_window_lag_lead_append_only() { + 5 p1 102 18 - !barrier 2 - recovery - - !barrier 3 + - !barrier 2 - !chunk |2 I T I i + 10 p1 103 13 - - !barrier 4 + - !barrier 3 "###, expect![[r#" - input: !barrier 1 @@ -186,9 +186,9 @@ async fn test_over_window_lag_lead_append_only() { - !barrier 2 - input: recovery output: [] - - input: !barrier 3 + - input: !barrier 2 output: - - !barrier 3 + - !barrier 2 - input: !chunk |- +---+----+----+-----+----+ | + | 10 | p1 | 103 | 13 | @@ -207,9 +207,9 @@ async fn test_over_window_lag_lead_append_only() { | 5 | p1 | 102 | 18 | 16 | 13 | | 10 | p1 | 103 | 13 | 18 | | +----+----+-----+----+----+----+ - - input: !barrier 4 + - input: !barrier 3 output: - - !barrier 4 + - !barrier 3 "#]], snapshot_options(), ) @@ -257,19 +257,19 @@ async fn test_over_window_lag_lead_with_updates() { + 6 p2 203 23 - !barrier 2 - recovery - - !barrier 3 + - !barrier 2 - !chunk |2 I T I i - 6 p2 203 23 U- 2 p1 101 16 U+ 2 p2 101 16 // a partition-change update - - !barrier 4 + - !barrier 3 - recovery - - !barrier 5 + - !barrier 3 - !chunk |2 I T I i + 10 p3 300 30 - - !barrier 6 + - !barrier 4 "###, expect![[r#" - input: !barrier 1 @@ -328,9 +328,9 @@ async fn test_over_window_lag_lead_with_updates() { - !barrier 2 - input: recovery output: [] - - input: !barrier 3 + - input: !barrier 2 output: - - !barrier 3 + - !barrier 2 - input: !chunk |- +----+---+----+-----+----+ | - | 6 | p2 | 203 | 23 | @@ -355,14 +355,14 @@ async fn test_over_window_lag_lead_with_updates() { | 3 | p1 | 100 | 13 | | 18 | | 5 | p1 | 105 | 18 | 13 | | +---+----+-----+----+----+----+ - - input: !barrier 4 + - input: !barrier 3 output: - - !barrier 4 + - !barrier 3 - input: recovery output: [] - - input: !barrier 5 + - input: !barrier 3 output: - - !barrier 5 + - !barrier 3 - input: !chunk |- +---+----+----+-----+----+ | + | 10 | p3 | 300 | 30 | @@ -380,9 +380,9 @@ async fn test_over_window_lag_lead_with_updates() { | 5 | p1 | 105 | 18 | 13 | | | 10 | p3 | 300 | 30 | | | +----+----+-----+----+----+----+ - - input: !barrier 6 + - input: !barrier 4 output: - - !barrier 6 + - !barrier 4 "#]], snapshot_options(), ) @@ -429,19 +429,19 @@ async fn test_over_window_sum() { + 6 p2 203 23 - !barrier 2 - recovery - - !barrier 3 + - !barrier 2 - !chunk |2 I T I i - 6 p2 203 23 U- 2 p1 101 16 U+ 2 p2 101 16 // a partition-change update - - !barrier 4 + - !barrier 3 - recovery - - !barrier 5 + - !barrier 3 - !chunk |2 I T I i + 10 p3 300 30 - - !barrier 6 + - !barrier 4 "###, expect![[r#" - input: !barrier 1 @@ -504,9 +504,9 @@ async fn test_over_window_sum() { - !barrier 2 - input: recovery output: [] - - input: !barrier 3 + - input: !barrier 2 output: - - !barrier 3 + - !barrier 2 - input: !chunk |- +----+---+----+-----+----+ | - | 6 | p2 | 203 | 23 | @@ -532,14 +532,14 @@ async fn test_over_window_sum() { | 3 | p1 | 100 | 13 | 35 | | 5 | p1 | 105 | 18 | 13 | +---+----+-----+----+----+ - - input: !barrier 4 + - input: !barrier 3 output: - - !barrier 4 + - !barrier 3 - input: recovery output: [] - - input: !barrier 5 + - input: !barrier 3 output: - - !barrier 5 + - !barrier 3 - input: !chunk |- +---+----+----+-----+----+ | + | 10 | p3 | 300 | 30 | @@ -558,9 +558,9 @@ async fn test_over_window_sum() { | 5 | p1 | 105 | 18 | 13 | | 10 | p3 | 300 | 30 | | +----+----+-----+----+----+ - - input: !barrier 6 + - input: !barrier 4 output: - - !barrier 6 + - !barrier 4 "#]], snapshot_options(), )