Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): wait committed epoch when update vnode bitmap #20492

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2164,12 +2164,18 @@ async fn test_table_watermark() {
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
local1.update_vnode_bitmap(vnode_bitmap1.clone());
local1
.update_vnode_bitmap(vnode_bitmap1.clone())
.await
.unwrap();
local2
.init_for_test_with_prev_epoch(epoch1, prev_epoch)
.await
.unwrap();
local2.update_vnode_bitmap(vnode_bitmap2.clone());
local2
.update_vnode_bitmap(vnode_bitmap2.clone())
.await
.unwrap();

fn gen_inner_key(index: usize) -> Bytes {
Bytes::copy_from_slice(format!("key_{:05}", index).as_bytes())
Expand Down
31 changes: 21 additions & 10 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_common::util::epoch::{EpochPair, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct LocalHummockStorage {
mem_table: MemTable,

spill_offset: u16,
epoch: Option<u64>,
epoch: Option<EpochPair>,

table_id: TableId,
op_consistency_level: OpConsistencyLevel,
Expand Down Expand Up @@ -477,7 +477,7 @@ impl LocalStateStore for LocalHummockStorage {
}

fn epoch(&self) -> u64 {
self.epoch.expect("should have set the epoch")
self.epoch.expect("should have set the epoch").curr
}

fn is_dirty(&self) -> bool {
Expand All @@ -487,8 +487,9 @@ impl LocalStateStore for LocalHummockStorage {
async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
let epoch = options.epoch;
wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
assert!(
self.epoch.replace(epoch.curr).is_none(),
assert_eq!(
self.epoch.replace(epoch),
None,
"local state store of table id {:?} is init for more than once",
self.table_id
);
Expand All @@ -511,10 +512,13 @@ impl LocalStateStore for LocalHummockStorage {
self.mem_table.op_consistency_level.update(new_level);
self.op_consistency_level.update(new_level);
}
let prev_epoch = self
let epoch = self
.epoch
.replace(next_epoch)
.as_mut()
.expect("should have init epoch before seal the first epoch");
let prev_epoch = epoch.curr;
epoch.prev = prev_epoch;
epoch.curr = next_epoch;
self.spill_offset = 0;
assert!(
next_epoch > prev_epoch,
Expand Down Expand Up @@ -553,12 +557,19 @@ impl LocalStateStore for LocalHummockStorage {
}
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
wait_for_epoch(
&self.version_update_notifier_tx,
self.epoch.expect("should have init").prev,
self.table_id,
)
.await?;
assert!(self.mem_table.buffer.is_empty());
let mut read_version = self.read_version.write();
assert!(read_version.staging().is_empty(), "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
self.table_id(), self.instance_id()
self.table_id(), self.instance_id()
);
read_version.update_vnode_bitmap(vnodes)
Ok(read_version.update_vnode_bitmap(vnodes))
}
}

Expand Down
122 changes: 98 additions & 24 deletions src/storage/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ use std::sync::{Arc, LazyLock};
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::util::epoch::EpochPair;
use risingwave_hummock_sdk::key::{
prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange, UserKey,
};
Expand Down Expand Up @@ -120,7 +121,6 @@ impl RangeKv for BTreeMapRangeKv {
pub mod sled {
use std::fs::create_dir_all;
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::Bytes;
use risingwave_hummock_sdk::key::FullKey;
Expand Down Expand Up @@ -256,14 +256,14 @@ pub mod sled {
pub fn new(path: impl AsRef<std::path::Path>) -> Self {
RangeKvStateStore {
inner: SledRangeKv::new(path),
table_next_epochs: Arc::new(Default::default()),
tables: Default::default(),
}
}

pub fn new_temp() -> Self {
RangeKvStateStore {
inner: SledRangeKv::new_temp(),
table_next_epochs: Arc::new(Default::default()),
tables: Default::default(),
}
}
}
Expand Down Expand Up @@ -514,6 +514,47 @@ mod batched_iter {

pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;

/// Per-table epoch bookkeeping shared by all local state store instances of one table
/// in the in-memory (`RangeKv`) state store.
struct TableState {
/// The `prev` epoch passed to the first `init` on this table. `wait_epoch` treats this
/// epoch as already available and returns immediately for it.
init_epoch: u64,
/// Maps an epoch to the epoch that follows it (`prev_epoch` -> `curr_epoch`).
next_epochs: BTreeMap<u64, u64>,
/// The most recent epoch that has been sealed on every vnode, or `None` if no epoch
/// has been fully sealed yet.
latest_sealed_epoch: Option<u64>,
/// Epochs that are only partially sealed, keyed by epoch; each builder records the
/// vnodes that have sealed that epoch so far.
sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}

impl TableState {
fn new(init_epoch: u64) -> Self {
Self {
init_epoch,
next_epochs: Default::default(),
latest_sealed_epoch: None,
sealing_epochs: Default::default(),
}
}

async fn wait_epoch(
tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
table_id: TableId,
epoch: u64,
) {
loop {
{
let tables = tables.lock();
let table_state = tables.get(&table_id).expect("should exist");
assert!(epoch >= table_state.init_epoch);
if epoch == table_state.init_epoch {
return;
}
if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
&& latest_sealed_epoch >= epoch
{
return;
}
}
yield_now().await;
}
}
}

/// An in-memory state store
///
/// The in-memory state store is a [`BTreeMap`], which maps [`FullKey`] to value. It
Expand All @@ -524,7 +565,7 @@ pub struct RangeKvStateStore<R: RangeKv> {
/// Stores (key, epoch) -> user value.
inner: R,
/// `table_id` -> per-table epoch state (`prev_epoch` -> `curr_epoch` links and sealing progress)
table_next_epochs: Arc<parking_lot::Mutex<HashMap<TableId, BTreeMap<u64, u64>>>>,
tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,
}

fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
Expand Down Expand Up @@ -690,15 +731,15 @@ impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
loop {
{
let table_next_epochs = self.table_next_epochs.lock();
let Some(table_next_epochs) = table_next_epochs.get(&options.table_id) else {
let tables = self.tables.lock();
let Some(tables) = tables.get(&options.table_id) else {
return Err(HummockError::next_epoch(format!(
"table {} not exist",
options.table_id
))
.into());
};
if let Some(next_epoch) = table_next_epochs.get(&epoch) {
if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
break Ok(*next_epoch);
}
}
Expand Down Expand Up @@ -798,7 +839,7 @@ pub struct RangeKvLocalStateStore<R: RangeKv> {
mem_table: MemTable,
inner: RangeKvStateStore<R>,

epoch: Option<u64>,
epoch: Option<EpochPair>,

table_id: TableId,
op_consistency_level: OpConsistencyLevel,
Expand Down Expand Up @@ -966,7 +1007,7 @@ impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
}

fn epoch(&self) -> u64 {
self.epoch.expect("should have set the epoch")
self.epoch.expect("should have set the epoch").curr
}

fn is_dirty(&self) -> bool {
Expand All @@ -975,17 +1016,20 @@ impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {

#[allow(clippy::unused_async)]
async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
assert!(
self.epoch.replace(options.epoch.curr).is_none(),
assert_eq!(
self.epoch.replace(options.epoch),
None,
"epoch in local state store of table id {:?} is init for more than once",
self.table_id
);
self.inner
.table_next_epochs
.tables
.lock()
.entry(self.table_id)
.or_default()
.or_insert_with(|| TableState::new(options.epoch.prev))
.next_epochs
.insert(options.epoch.prev, options.epoch.curr);
TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;

Ok(())
}
Expand All @@ -995,23 +1039,47 @@ impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
if let Some(value_checker) = opts.switch_op_consistency_level {
self.mem_table.op_consistency_level.update(&value_checker);
}
let prev_epoch = self
let epoch = self
.epoch
.replace(next_epoch)
.as_mut()
.expect("should have init epoch before seal the first epoch");
let prev_epoch = epoch.curr;
epoch.prev = prev_epoch;
epoch.curr = next_epoch;
assert!(
next_epoch > prev_epoch,
"new epoch {} should be greater than current epoch: {}",
next_epoch,
prev_epoch
);

self.inner
.table_next_epochs
.lock()
.entry(self.table_id)
.or_default()
.insert(prev_epoch, next_epoch);
let mut tables = self.inner.tables.lock();
let table_state = tables
.get_mut(&self.table_id)
.expect("should be set when init");

table_state.next_epochs.insert(prev_epoch, next_epoch);
let sealing_epoch_vnodes = table_state
.sealing_epochs
.entry(prev_epoch)
.or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
for vnode in self.vnodes.iter_ones() {
assert!(!sealing_epoch_vnodes.is_set(vnode));
sealing_epoch_vnodes.set(vnode, true);
}
if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
let (all_sealed_epoch, _) = table_state.sealing_epochs.pop_first().expect("non-empty");
assert_eq!(
all_sealed_epoch, prev_epoch,
"new all_sealed_epoch must be the current prev epoch"
);
if let Some(prev_latest_sealed_epoch) =
table_state.latest_sealed_epoch.replace(prev_epoch)
{
assert!(prev_epoch > prev_latest_sealed_epoch);
}
}

if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
let delete_ranges = watermarks
Expand Down Expand Up @@ -1052,8 +1120,14 @@ impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
Ok(())
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
std::mem::replace(&mut self.vnodes, vnodes)
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
TableState::wait_epoch(
&self.inner.tables,
self.table_id,
self.epoch.expect("should have init").prev,
)
.await;
Ok(std::mem::replace(&mut self.vnodes, vnodes))
}

fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/monitor/monitored_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
.verbose_instrument_await("store_try_flush")
}

fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.inner.update_vnode_bitmap(vnodes)
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
self.inner.update_vnode_bitmap(vnodes).await
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/monitor/traced_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ impl<S: LocalStateStore> LocalStateStore for TracedStateStore<S> {
}

// TODO: add trace span
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.inner.update_vnode_bitmap(vnodes)
async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
self.inner.update_vnode_bitmap(vnodes).await
}

fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/panic_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl LocalStateStore for PanicStateStore {
panic!("should not operate on the panic state store!");
}

fn update_vnode_bitmap(&mut self, _vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
async fn update_vnode_bitmap(&mut self, _vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
panic!("should not operate on the panic state store!");
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ pub trait LocalStateStore: StaticSendSync {

/// Updates the vnode bitmap corresponding to the local state store.
/// The returned future resolves to the previous vnode bitmap.
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap>;
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}

/// If `prefetch` is true, prefetch will be enabled. Prefetching may increase the memory
Expand Down
Loading
Loading