From ee10be76fe92dc210b052a7599dbd3b21aa1a01e Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 13 Feb 2025 19:46:00 +0800 Subject: [PATCH] feat(storage): wait committed epoch when update vnode bitmap --- .../hummock_test/src/hummock_storage_tests.rs | 10 ++++- .../hummock/store/local_hummock_storage.rs | 31 ++++++++----- src/storage/src/memory.rs | 4 +- src/storage/src/monitor/monitored_store.rs | 4 +- src/storage/src/monitor/traced_store.rs | 4 +- src/storage/src/panic_store.rs | 2 +- src/storage/src/store.rs | 2 +- src/storage/src/store_impl.rs | 18 ++++---- .../log_store_impl/kv_log_store/state.rs | 11 +++-- .../log_store_impl/kv_log_store/writer.rs | 4 +- src/stream/src/common/table/state_table.rs | 44 ++++++------------- src/stream/src/executor/sync_kv_log_store.rs | 3 +- 12 files changed, 72 insertions(+), 65 deletions(-) diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index a2982dd7efcd1..2f7a827cf6d74 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2164,12 +2164,18 @@ async fn test_table_watermark() { .init_for_test_with_prev_epoch(epoch1, prev_epoch) .await .unwrap(); - local1.update_vnode_bitmap(vnode_bitmap1.clone()); + local1 + .update_vnode_bitmap(vnode_bitmap1.clone()) + .await + .unwrap(); local2 .init_for_test_with_prev_epoch(epoch1, prev_epoch) .await .unwrap(); - local2.update_vnode_bitmap(vnode_bitmap2.clone()); + local2 + .update_vnode_bitmap(vnode_bitmap2.clone()) + .await + .unwrap(); fn gen_inner_key(index: usize) -> Bytes { Bytes::copy_from_slice(format!("key_{:05}", index).as_bytes()) diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 6df89c4937674..2871a22057f83 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -22,7 +22,7 @@ use bytes::Bytes; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::MAX_SPILL_TIMES; +use risingwave_common::util::epoch::{EpochPair, MAX_SPILL_TIMES}; use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType; @@ -62,7 +62,7 @@ pub struct LocalHummockStorage { mem_table: MemTable, spill_offset: u16, - epoch: Option, + epoch: Option, table_id: TableId, op_consistency_level: OpConsistencyLevel, @@ -477,7 +477,7 @@ impl LocalStateStore for LocalHummockStorage { } fn epoch(&self) -> u64 { - self.epoch.expect("should have set the epoch") + self.epoch.expect("should have set the epoch").curr } fn is_dirty(&self) -> bool { @@ -487,8 +487,9 @@ impl LocalStateStore for LocalHummockStorage { async fn init(&mut self, options: InitOptions) -> StorageResult<()> { let epoch = options.epoch; wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?; - assert!( - self.epoch.replace(epoch.curr).is_none(), + assert_eq!( + self.epoch.replace(epoch), + None, "local state store of table id {:?} is init for more than once", self.table_id ); @@ -511,10 +512,13 @@ impl LocalStateStore for LocalHummockStorage { self.mem_table.op_consistency_level.update(new_level); self.op_consistency_level.update(new_level); } - let prev_epoch = self + let epoch = self .epoch - .replace(next_epoch) + .as_mut() .expect("should have init epoch before seal the first epoch"); + let prev_epoch = epoch.curr; + epoch.prev = prev_epoch; + epoch.curr = next_epoch; self.spill_offset = 0; assert!( next_epoch > prev_epoch, @@ -553,12 +557,19 @@ impl LocalStateStore for LocalHummockStorage { } } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + wait_for_epoch( + &self.version_update_notifier_tx, + self.epoch.expect("should have init").prev, + self.table_id, + ) + .await?; + assert!(self.mem_table.buffer.is_empty()); let mut read_version = self.read_version.write(); assert!(read_version.staging().is_empty(), "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update", - self.table_id(), self.instance_id() + self.table_id(), self.instance_id() ); - read_version.update_vnode_bitmap(vnodes) + Ok(read_version.update_vnode_bitmap(vnodes)) } } diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index d879024606d19..c947ad04b3c97 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -1052,8 +1052,8 @@ impl LocalStateStore for RangeKvLocalStateStore { Ok(()) } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - std::mem::replace(&mut self.vnodes, vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + Ok(std::mem::replace(&mut self.vnodes, vnodes)) } fn get_table_watermark(&self, _vnode: VirtualNode) -> Option { diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 3ed30d77b37a8..e8c34337bae99 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -323,8 +323,8 @@ impl LocalStateStore for MonitoredStateStore { .verbose_instrument_await("store_try_flush") } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - self.inner.update_vnode_bitmap(vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + self.inner.update_vnode_bitmap(vnodes).await } fn get_table_watermark(&self, vnode: VirtualNode) -> Option { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index dff1cba16bb1c..5f856bd2cbaed 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -258,8 +258,8 @@ impl LocalStateStore for TracedStateStore { } // TODO: add trace span - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - self.inner.update_vnode_bitmap(vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + self.inner.update_vnode_bitmap(vnodes).await } fn get_table_watermark(&self, vnode: VirtualNode) -> Option { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index bb23384309cd5..ea726dc0f7c62 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -153,7 +153,7 @@ impl LocalStateStore for PanicStateStore { panic!("should not operate on the panic state store!"); } - fn update_vnode_bitmap(&mut self, _vnodes: Arc) -> Arc { + async fn update_vnode_bitmap(&mut self, _vnodes: Arc) -> StorageResult> { panic!("should not operate on the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index a59b3be9e9de5..ed7df88ee14c3 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -462,7 +462,7 @@ pub trait LocalStateStore: StaticSendSync { // Updates the vnode bitmap corresponding to the local state store // Returns the previous vnode bitmap - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc; + fn update_vnode_bitmap(&mut self, vnodes: Arc) -> impl StorageFuture<'_, Arc>; } /// If `prefetch` is true, prefetch will be enabled. Prefetching may increase the memory diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index c675f7ee4d126..5f46c6700af13 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -539,12 +539,12 @@ pub mod verify { ret } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - let ret = self.actual.update_vnode_bitmap(vnodes.clone()); + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?; if let Some(expected) = &mut self.expected { - assert_eq!(ret, expected.update_vnode_bitmap(vnodes)); + assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?); } - ret + Ok(ret) } fn get_table_watermark(&self, vnode: VirtualNode) -> Option { @@ -997,7 +997,7 @@ mod dyn_state_store { fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions); - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc; + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult>; fn get_table_watermark(&self, vnode: VirtualNode) -> Option; } @@ -1071,8 +1071,8 @@ mod dyn_state_store { self.seal_current_epoch(next_epoch, opts) } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - self.update_vnode_bitmap(vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + self.update_vnode_bitmap(vnodes).await } fn get_table_watermark(&self, vnode: VirtualNode) -> Option { @@ -1159,8 +1159,8 @@ mod dyn_state_store { (*self.0).seal_current_epoch(next_epoch, opts) } - fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - (*self.0).update_vnode_bitmap(vnodes) + async fn update_vnode_bitmap(&mut self, vnodes: Arc) -> StorageResult> { + (*self.0).update_vnode_bitmap(vnodes).await } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/state.rs b/src/stream/src/common/log_store_impl/kv_log_store/state.rs index 942554fd12612..f5d530ba23c13 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/state.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/state.rs @@ -146,15 +146,20 @@ pub(crate) struct LogStorePostSealCurrentEpoch<'a, S: LocalStateStore> { inner: &'a mut LogStoreWriteState, } -impl LogStorePostSealCurrentEpoch<'_, S> { - pub(crate) fn post_yield_barrier(self, new_vnodes: Option>) { +impl<'a, S: LocalStateStore> LogStorePostSealCurrentEpoch<'a, S> { + pub(crate) async fn post_yield_barrier( + self, + new_vnodes: Option>, + ) -> LogStoreResult<()> { if let Some(new_vnodes) = new_vnodes { self.inner.serde.update_vnode_bitmap(new_vnodes.clone()); self.inner .state_store - .update_vnode_bitmap(new_vnodes.clone()); + .update_vnode_bitmap(new_vnodes.clone()) + .await?; } self.inner.on_post_seal = false; + Ok(()) } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index b17bcf1f2410f..6aff569f93278 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -152,7 +152,9 @@ impl LogWriter for KvLogStoreWriter { Ok(LogWriterPostFlushCurrentEpoch::new( move |new_vnodes: Option>| { async move { - post_seal_epoch.post_yield_barrier(new_vnodes.clone()); + post_seal_epoch + .post_yield_barrier(new_vnodes.clone()) + .await?; if let Some(new_vnodes) = new_vnodes { tx.update_vnode(next_epoch, new_vnodes); } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 58a61eca87cbf..1f2f547e258f1 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -45,9 +45,8 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::table_watermark::{ VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; -use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::catalog::Table; -use risingwave_storage::error::{ErrorKind, StorageError, StorageResult}; +use risingwave_storage::error::{ErrorKind, StorageError}; use risingwave_storage::hummock::CachePolicy; use risingwave_storage::mem_table::MemTableError; use risingwave_storage::row_serde::find_columns_by_ids; @@ -55,11 +54,7 @@ use risingwave_storage::row_serde::row_serde_util::{ deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, }; use risingwave_storage::row_serde::value_serde::ValueRowSerde; -use risingwave_storage::store::{ - InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, PrefetchOptions, - ReadLogOptions, ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreIterExt, - TryWaitEpochOptions, -}; +use risingwave_storage::store::*; use risingwave_storage::table::merge_sort::merge_sort; use risingwave_storage::table::{ deserialize_log_stream, ChangeLogRow, KeyedRow, TableDistribution, @@ -193,17 +188,6 @@ where Ok(()) } - async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> { - self.store - .try_wait_epoch( - HummockReadEpoch::Committed(prev_epoch), - TryWaitEpochOptions { - table_id: self.table_id, - }, - ) - .await - } - pub fn state_store(&self) -> &S { &self.store } @@ -715,7 +699,6 @@ pub struct StateTablePostCommit< SD: ValueRowSerde, { inner: &'a mut StateTableInner, - barrier_epoch: EpochPair, } impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool> @@ -739,10 +722,8 @@ where > { self.inner.on_post_commit = false; Ok(if let Some(new_vnodes) = new_vnodes { - self.inner - .try_wait_committed_epoch(self.barrier_epoch.prev) - .await?; - let (old_vnodes, cache_may_stale) = self.update_vnode_bitmap(new_vnodes.clone()); + let (old_vnodes, cache_may_stale) = + self.update_vnode_bitmap(new_vnodes.clone()).await?; Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale)) } else { None @@ -754,7 +735,10 @@ where } /// Update the vnode bitmap of the state table, returns the previous vnode bitmap. - fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> (Arc, bool) { + async fn update_vnode_bitmap( + &mut self, + new_vnodes: Arc, + ) -> StreamExecutorResult<(Arc, bool)> { assert!( !self.inner.is_dirty(), "vnode bitmap should only be updated when state table is clean" @@ -762,7 +746,8 @@ where let prev_vnodes = self .inner .local_store - .update_vnode_bitmap(new_vnodes.clone()); + .update_vnode_bitmap(new_vnodes.clone()) + .await?; assert_eq!( &prev_vnodes, self.inner.vnodes(), @@ -787,10 +772,10 @@ where } } - ( + Ok(( self.inner.distribution.update_vnode_bitmap(new_vnodes), cache_may_stale, - ) + )) } } @@ -1147,10 +1132,7 @@ where } self.on_post_commit = true; - Ok(StateTablePostCommit { - inner: self, - barrier_epoch: new_epoch, - }) + Ok(StateTablePostCommit { inner: self }) } /// Commit pending watermark and return vnode bitmap-watermark pairs to seal. diff --git a/src/stream/src/executor/sync_kv_log_store.rs b/src/stream/src/executor/sync_kv_log_store.rs index 584fd93ad430a..ee192ac4eb24b 100644 --- a/src/stream/src/executor/sync_kv_log_store.rs +++ b/src/stream/src/executor/sync_kv_log_store.rs @@ -316,7 +316,8 @@ impl SyncedKvLogStoreExecutor { let barrier_epoch = barrier.epoch; yield Message::Barrier(barrier); write_state_post_write_barrier - .post_yield_barrier(update_vnode_bitmap.clone()); + .post_yield_barrier(update_vnode_bitmap.clone()) + .await?; if let Some(vnode_bitmap) = update_vnode_bitmap { // Apply Vnode Update read_state.update_vnode_bitmap(vnode_bitmap);