feat(storage): wait for committed epoch when updating vnode bitmap
wenym1 committed Feb 13, 2025
1 parent 7207714 commit ee10be7
Showing 12 changed files with 72 additions and 65 deletions.
10 changes: 8 additions & 2 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
@@ -2164,12 +2164,18 @@ async fn test_table_watermark() {
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
local1.update_vnode_bitmap(vnode_bitmap1.clone());
local1
.update_vnode_bitmap(vnode_bitmap1.clone())
.await
.unwrap();
local2
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
local2.update_vnode_bitmap(vnode_bitmap2.clone());
local2
.update_vnode_bitmap(vnode_bitmap2.clone())
.await
.unwrap();

fn gen_inner_key(index: usize) -> Bytes {
Bytes::copy_from_slice(format!("key_{:05}", index).as_bytes())
31 changes: 21 additions & 10 deletions src/storage/src/hummock/store/local_hummock_storage.rs
@@ -22,7 +22,7 @@ use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_common::util::epoch::{EpochPair, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
@@ -62,7 +62,7 @@ pub struct LocalHummockStorage {
mem_table: MemTable,

spill_offset: u16,
epoch: Option<u64>,
epoch: Option<EpochPair>,

table_id: TableId,
op_consistency_level: OpConsistencyLevel,
@@ -477,7 +477,7 @@ impl LocalStateStore for LocalHummockStorage {
}

fn epoch(&self) -> u64 {
self.epoch.expect("should have set the epoch")
self.epoch.expect("should have set the epoch").curr
}

fn is_dirty(&self) -> bool {
@@ -487,8 +487,9 @@
async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
let epoch = options.epoch;
wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
assert!(
self.epoch.replace(epoch.curr).is_none(),
assert_eq!(
self.epoch.replace(epoch),
None,
"local state store of table id {:?} is init for more than once",
self.table_id
);
@@ -511,10 +512,13 @@
self.mem_table.op_consistency_level.update(new_level);
self.op_consistency_level.update(new_level);
}
let prev_epoch = self
let epoch = self
.epoch
.replace(next_epoch)
.as_mut()
.expect("should have init epoch before seal the first epoch");
let prev_epoch = epoch.curr;
epoch.prev = prev_epoch;
epoch.curr = next_epoch;
self.spill_offset = 0;
assert!(
next_epoch > prev_epoch,
@@ -553,12 +557,19 @@
}
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
wait_for_epoch(
&self.version_update_notifier_tx,
self.epoch.expect("should have init").prev,
self.table_id,
)
.await?;
assert!(self.mem_table.buffer.is_empty());
let mut read_version = self.read_version.write();
assert!(read_version.staging().is_empty(), "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
self.table_id(), self.instance_id()
);
read_version.update_vnode_bitmap(vnodes)
Ok(read_version.update_vnode_bitmap(vnodes))
}
}

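Taken together, the hunks above are the heart of this commit: LocalHummockStorage now tracks an EpochPair (prev and curr) instead of a bare u64, and update_vnode_bitmap first waits on epoch.prev via wait_for_epoch before swapping the bitmap, so a new vnode set only takes effect once the previously sealed epoch is committed. Below is a minimal, self-contained sketch of that epoch bookkeeping; EpochState is a hypothetical stand-in for the real store, and this EpochPair only mimics risingwave_common::util::epoch::EpochPair rather than being the real type.

// Simplified sketch, not code from this commit.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct EpochPair {
    curr: u64,
    prev: u64,
}

#[derive(Default)]
struct EpochState {
    epoch: Option<EpochPair>,
}

impl EpochState {
    fn init(&mut self, epoch: EpochPair) {
        // Mirrors the diff's `assert_eq!(self.epoch.replace(epoch), None, ..)`.
        assert_eq!(self.epoch.replace(epoch), None, "init called more than once");
    }

    fn seal(&mut self, next_epoch: u64) {
        // Sealing promotes the old `curr` to `prev`, as in the new
        // `seal_current_epoch` body above.
        let epoch = self.epoch.as_mut().expect("init before seal");
        let prev_epoch = epoch.curr;
        assert!(next_epoch > prev_epoch, "epochs must be increasing");
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
    }

    /// The epoch that a vnode-bitmap update waits to become committed.
    fn wait_epoch(&self) -> u64 {
        self.epoch.expect("should have init").prev
    }
}

In the real code, the value such an accessor returns is what gets passed to wait_for_epoch together with the version update notifier and the table id, before read_version.update_vnode_bitmap(vnodes) runs.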
4 changes: 2 additions & 2 deletions src/storage/src/memory.rs
@@ -1052,8 +1052,8 @@ impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
Ok(())
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
std::mem::replace(&mut self.vnodes, vnodes)
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
Ok(std::mem::replace(&mut self.vnodes, vnodes))
}

fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
4 changes: 2 additions & 2 deletions src/storage/src/monitor/monitored_store.rs
@@ -323,8 +323,8 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
.verbose_instrument_await("store_try_flush")
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.inner.update_vnode_bitmap(vnodes)
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
self.inner.update_vnode_bitmap(vnodes).await
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
4 changes: 2 additions & 2 deletions src/storage/src/monitor/traced_store.rs
@@ -258,8 +258,8 @@ impl<S: LocalStateStore> LocalStateStore for TracedStateStore<S> {
}

// TODO: add trace span
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.inner.update_vnode_bitmap(vnodes)
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
self.inner.update_vnode_bitmap(vnodes).await
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
2 changes: 1 addition & 1 deletion src/storage/src/panic_store.rs
@@ -153,7 +153,7 @@ impl LocalStateStore for PanicStateStore {
panic!("should not operate on the panic state store!");
}

fn update_vnode_bitmap(&mut self, _vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
async fn update_vnode_bitmap(&mut self, _vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
panic!("should not operate on the panic state store!");
}

2 changes: 1 addition & 1 deletion src/storage/src/store.rs
@@ -462,7 +462,7 @@ pub trait LocalStateStore: StaticSendSync {

// Updates the vnode bitmap corresponding to the local state store
// Returns the previous vnode bitmap
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap>;
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}

/// If `prefetch` is true, prefetch will be enabled. Prefetching may increase the memory
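Because the trait method now returns a StorageFuture, every implementor becomes an async fn and every call site has to await the result and propagate the StorageResult, which is what the remaining files in this commit do. A hedged caller-side sketch follows; apply_new_vnodes, store, and new_vnodes are illustrative names only, and the sketch assumes the risingwave_storage and risingwave_common dependencies already shown elsewhere in this diff.

use std::sync::Arc;

use risingwave_common::bitmap::Bitmap;
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::LocalStateStore;

// Illustrative only: how a generic caller adapts to the new async signature.
async fn apply_new_vnodes<S: LocalStateStore>(
    store: &mut S,
    new_vnodes: Arc<Bitmap>,
) -> StorageResult<Arc<Bitmap>> {
    // For the Hummock implementation this suspends until the previous epoch
    // is committed; for the in-memory store it completes immediately.
    let old_vnodes = store.update_vnode_bitmap(new_vnodes).await?;
    Ok(old_vnodes)
}

The in-memory, monitored, traced, and panic stores in the surrounding hunks only need mechanical changes (adding async and .await, or wrapping the return value in Ok), since the actual waiting logic lives in LocalHummockStorage.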
18 changes: 9 additions & 9 deletions src/storage/src/store_impl.rs
@@ -539,12 +539,12 @@ pub mod verify {
ret
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
let ret = self.actual.update_vnode_bitmap(vnodes.clone());
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
if let Some(expected) = &mut self.expected {
assert_eq!(ret, expected.update_vnode_bitmap(vnodes));
assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
}
ret
Ok(ret)
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
@@ -997,7 +997,7 @@ mod dyn_state_store {

fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap>;
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
}
@@ -1071,8 +1071,8 @@
self.seal_current_epoch(next_epoch, opts)
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.update_vnode_bitmap(vnodes)
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
self.update_vnode_bitmap(vnodes).await
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
@@ -1159,8 +1159,8 @@
(*self.0).seal_current_epoch(next_epoch, opts)
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
(*self.0).update_vnode_bitmap(vnodes)
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
(*self.0).update_vnode_bitmap(vnodes).await
}
}

11 changes: 8 additions & 3 deletions src/stream/src/common/log_store_impl/kv_log_store/state.rs
@@ -146,15 +146,20 @@ pub(crate) struct LogStorePostSealCurrentEpoch<'a, S: LocalStateStore> {
inner: &'a mut LogStoreWriteState<S>,
}

impl<S: LocalStateStore> LogStorePostSealCurrentEpoch<'_, S> {
pub(crate) fn post_yield_barrier(self, new_vnodes: Option<Arc<Bitmap>>) {
impl<'a, S: LocalStateStore> LogStorePostSealCurrentEpoch<'a, S> {
pub(crate) async fn post_yield_barrier(
self,
new_vnodes: Option<Arc<Bitmap>>,
) -> LogStoreResult<()> {
if let Some(new_vnodes) = new_vnodes {
self.inner.serde.update_vnode_bitmap(new_vnodes.clone());
self.inner
.state_store
.update_vnode_bitmap(new_vnodes.clone());
.update_vnode_bitmap(new_vnodes.clone())
.await?;
}
self.inner.on_post_seal = false;
Ok(())
}
}

4 changes: 3 additions & 1 deletion src/stream/src/common/log_store_impl/kv_log_store/writer.rs
@@ -152,7 +152,9 @@ impl<LS: LocalStateStore> LogWriter for KvLogStoreWriter<LS> {
Ok(LogWriterPostFlushCurrentEpoch::new(
move |new_vnodes: Option<Arc<Bitmap>>| {
async move {
post_seal_epoch.post_yield_barrier(new_vnodes.clone());
post_seal_epoch
.post_yield_barrier(new_vnodes.clone())
.await?;
if let Some(new_vnodes) = new_vnodes {
tx.update_vnode(next_epoch, new_vnodes);
}
44 changes: 13 additions & 31 deletions src/stream/src/common/table/state_table.rs
@@ -45,21 +45,16 @@ use risingwave_hummock_sdk::key::{
use risingwave_hummock_sdk::table_watermark::{
VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::catalog::Table;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::error::{ErrorKind, StorageError};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::{
InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, PrefetchOptions,
ReadLogOptions, ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreIterExt,
TryWaitEpochOptions,
};
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
deserialize_log_stream, ChangeLogRow, KeyedRow, TableDistribution,
@@ -193,17 +188,6 @@
Ok(())
}

async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
self.store
.try_wait_epoch(
HummockReadEpoch::Committed(prev_epoch),
TryWaitEpochOptions {
table_id: self.table_id,
},
)
.await
}

pub fn state_store(&self) -> &S {
&self.store
}
@@ -715,7 +699,6 @@ pub struct StateTablePostCommit<
SD: ValueRowSerde,
{
inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
barrier_epoch: EpochPair,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
@@ -739,10 +722,8 @@
> {
self.inner.on_post_commit = false;
Ok(if let Some(new_vnodes) = new_vnodes {
self.inner
.try_wait_committed_epoch(self.barrier_epoch.prev)
.await?;
let (old_vnodes, cache_may_stale) = self.update_vnode_bitmap(new_vnodes.clone());
let (old_vnodes, cache_may_stale) =
self.update_vnode_bitmap(new_vnodes.clone()).await?;
Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
} else {
None
@@ -754,15 +735,19 @@
}

/// Update the vnode bitmap of the state table, returns the previous vnode bitmap.
fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> (Arc<Bitmap>, bool) {
async fn update_vnode_bitmap(
&mut self,
new_vnodes: Arc<Bitmap>,
) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
assert!(
!self.inner.is_dirty(),
"vnode bitmap should only be updated when state table is clean"
);
let prev_vnodes = self
.inner
.local_store
.update_vnode_bitmap(new_vnodes.clone());
.update_vnode_bitmap(new_vnodes.clone())
.await?;
assert_eq!(
&prev_vnodes,
self.inner.vnodes(),
@@ -787,10 +772,10 @@
}
}

(
Ok((
self.inner.distribution.update_vnode_bitmap(new_vnodes),
cache_may_stale,
)
))
}
}

@@ -1147,10 +1132,7 @@
}

self.on_post_commit = true;
Ok(StateTablePostCommit {
inner: self,
barrier_epoch: new_epoch,
})
Ok(StateTablePostCommit { inner: self })
}

/// Commit pending watermark and return vnode bitmap-watermark pairs to seal.
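In state_table.rs the dedicated try_wait_committed_epoch helper and the barrier_epoch field of StateTablePostCommit are removed because the wait has moved into the local store's update_vnode_bitmap, and the post-commit path simply awaits that call. The ordering this relies on is sketched below as a self-contained toy model; LocalStore, PostCommit, and wait_committed are hypothetical names, not the real types.

use std::sync::Arc;

// Toy bitmap stand-in; the real code uses risingwave_common::bitmap::Bitmap.
struct Bitmap;

struct LocalStore {
    prev_epoch: u64,
    vnodes: Arc<Bitmap>,
}

impl LocalStore {
    async fn wait_committed(&self, _epoch: u64) {
        // Placeholder for wait_for_epoch(..): resolves once the given epoch
        // is committed in the storage version.
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        // The committed-epoch wait lives here now, not in the state table.
        self.wait_committed(self.prev_epoch).await;
        std::mem::replace(&mut self.vnodes, vnodes)
    }
}

struct PostCommit<'a> {
    store: &'a mut LocalStore,
}

impl PostCommit<'_> {
    // Called after the barrier has been yielded downstream; only then may the
    // vnode bitmap change, and only once the previous epoch is committed.
    async fn post_yield_barrier(self, new_vnodes: Option<Arc<Bitmap>>) -> Option<Arc<Bitmap>> {
        match new_vnodes {
            Some(vnodes) => Some(self.store.update_vnode_bitmap(vnodes).await),
            None => None,
        }
    }
}

This is why StateTablePostCommit can now be constructed as just { inner: self } without remembering the barrier epoch.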
3 changes: 2 additions & 1 deletion src/stream/src/executor/sync_kv_log_store.rs
@@ -316,7 +316,8 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
let barrier_epoch = barrier.epoch;
yield Message::Barrier(barrier);
write_state_post_write_barrier
.post_yield_barrier(update_vnode_bitmap.clone());
.post_yield_barrier(update_vnode_bitmap.clone())
.await?;
if let Some(vnode_bitmap) = update_vnode_bitmap {
// Apply Vnode Update
read_state.update_vnode_bitmap(vnode_bitmap);
