diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 7976adbc1adb5..9bb4505fb808c 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -58,6 +58,16 @@ impl TableChangeLogCommon { .flat_map(|epoch_change_log| epoch_change_log.epochs.iter()) .cloned() } + + pub(crate) fn change_log_into_iter(self) -> impl Iterator> { + self.0.into_iter() + } + + pub(crate) fn change_log_iter_mut( + &mut self, + ) -> impl Iterator> { + self.0.iter_mut() + } } pub type TableChangeLog = TableChangeLogCommon; diff --git a/src/storage/hummock_sdk/src/compact_task.rs b/src/storage/hummock_sdk/src/compact_task.rs index 3dd5187936540..d173f6d252725 100644 --- a/src/storage/hummock_sdk/src/compact_task.rs +++ b/src/storage/hummock_sdk/src/compact_task.rs @@ -115,8 +115,8 @@ impl CompactTask { } impl From for CompactTask { - #[expect(deprecated)] fn from(pb_compact_task: PbCompactTask) -> Self { + #[expect(deprecated)] Self { input_ssts: pb_compact_task .input_ssts @@ -168,8 +168,8 @@ impl From for CompactTask { } impl From<&PbCompactTask> for CompactTask { - #[expect(deprecated)] fn from(pb_compact_task: &PbCompactTask) -> Self { + #[expect(deprecated)] Self { input_ssts: pb_compact_task .input_ssts @@ -221,8 +221,8 @@ impl From<&PbCompactTask> for CompactTask { } impl From for PbCompactTask { - #[expect(deprecated)] fn from(compact_task: CompactTask) -> Self { + #[expect(deprecated)] Self { input_ssts: compact_task .input_ssts @@ -272,8 +272,8 @@ impl From for PbCompactTask { } impl From<&CompactTask> for PbCompactTask { - #[expect(deprecated)] fn from(compact_task: &CompactTask) -> Self { + #[expect(deprecated)] Self { input_ssts: compact_task .input_ssts diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index eb4bb30e69dc3..8ea85ec57dbe3 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -36,7 +36,7 @@ use crate::level::{Level, LevelCommon, Levels, OverlappingLevel}; use crate::sstable_info::SstableInfo; use crate::table_watermark::{ReadTableWatermark, TableWatermarks}; use crate::version::{ - GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDelta, + GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon, HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader, }; @@ -50,7 +50,7 @@ pub struct SstDeltaInfo { pub type BranchedSstInfo = HashMap>; -impl HummockVersion { +impl HummockVersionCommon { pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels { self.levels .get(&compaction_group_id) @@ -187,7 +187,7 @@ pub fn safe_epoch_read_table_watermarks_impl( .collect() } -impl HummockVersion { +impl HummockVersionCommon { pub fn count_new_ssts_in_group_split( &self, parent_group_id: CompactionGroupId, @@ -356,7 +356,10 @@ impl HummockVersion { .all(|level| !level.table_infos.is_empty())); } - pub fn build_sst_delta_infos(&self, version_delta: &HummockVersionDelta) -> Vec { + pub fn build_sst_delta_infos( + &self, + version_delta: &HummockVersionDeltaCommon, + ) -> Vec { let mut infos = vec![]; // Skip trivial move delta for refiller @@ -459,7 +462,10 @@ impl HummockVersion { infos } - pub fn apply_version_delta(&mut self, version_delta: &HummockVersionDelta) { + pub fn apply_version_delta( + &mut self, + version_delta: &HummockVersionDeltaCommon, + ) { assert_eq!(self.id, version_delta.prev_id); let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta( @@ -934,12 +940,6 @@ impl HummockVersionCommon where T: SstableIdReader + ObjectIdReader, { - pub fn get_combined_levels(&self) -> impl Iterator> + '_ { - self.levels - .values() - .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) - } - pub fn get_object_ids(&self) -> HashSet { self.get_sst_infos().map(|s| s.object_id()).collect() } @@ -1094,6 +1094,14 @@ impl Levels { } } +impl HummockVersionCommon { + pub fn get_combined_levels(&self) -> impl Iterator> + '_ { + self.levels + .values() + .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) + } +} + pub fn build_initial_compaction_group_levels( group_id: CompactionGroupId, compaction_config: &CompactionConfig, diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 4c90e6cae47f1..84cba3804a96c 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -30,7 +30,9 @@ use risingwave_pb::hummock::{ }; use tracing::warn; -use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon}; +use crate::change_log::{ + ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon, +}; use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels; use crate::compaction_group::StaticCompactionGroupId; use crate::level::LevelsCommon; @@ -217,18 +219,20 @@ impl HummockVersionStateTableInfo { } #[derive(Debug, Clone, PartialEq)] -pub struct HummockVersionCommon { +pub struct HummockVersionCommon { pub id: HummockVersionId, pub levels: HashMap>, #[deprecated] pub(crate) max_committed_epoch: u64, pub table_watermarks: HashMap>, - pub table_change_log: HashMap>, + pub table_change_log: HashMap>, pub state_table_info: HummockVersionStateTableInfo, } pub type HummockVersion = HummockVersionCommon; +pub type LocalHummockVersion = HummockVersionCommon; + impl Default for HummockVersion { fn default() -> Self { HummockVersion::from(&PbHummockVersion::default()) @@ -433,13 +437,6 @@ impl HummockVersion { } } - pub fn table_committed_epoch(&self, table_id: TableId) -> Option { - self.state_table_info - .info() - .get(&table_id) - .map(|info| info.committed_epoch) - } - pub fn create_init_version(default_compaction_config: Arc) -> HummockVersion { #[expect(deprecated)] let mut init_version = HummockVersion { @@ -476,10 +473,41 @@ impl HummockVersion { state_table_info_delta: Default::default(), } } + + pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap) { + let table_change_log = { + let mut table_change_log = HashMap::new(); + for (table_id, log) in &mut self.table_change_log { + let change_log_iter = + log.change_log_iter_mut() + .map(|item| EpochNewChangeLogCommon { + new_value: std::mem::take(&mut item.new_value), + old_value: std::mem::take(&mut item.old_value), + epochs: item.epochs.clone(), + }); + table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter)); + } + + table_change_log + }; + + let local_version = LocalHummockVersion::from(self); + + (local_version, table_change_log) + } +} + +impl HummockVersionCommon { + pub fn table_committed_epoch(&self, table_id: TableId) -> Option { + self.state_table_info + .info() + .get(&table_id) + .map(|info| info.committed_epoch) + } } #[derive(Debug, PartialEq, Clone)] -pub struct HummockVersionDeltaCommon { +pub struct HummockVersionDeltaCommon { pub id: HummockVersionId, pub prev_id: HummockVersionId, pub group_deltas: HashMap>, @@ -488,12 +516,14 @@ pub struct HummockVersionDeltaCommon { pub trivial_move: bool, pub new_table_watermarks: HashMap, pub removed_table_ids: HashSet, - pub change_log_delta: HashMap>, + pub change_log_delta: HashMap>, pub state_table_info_delta: HashMap, } pub type HummockVersionDelta = HummockVersionDeltaCommon; +pub type LocalHummockVersionDelta = HummockVersionDeltaCommon; + impl Default for HummockVersionDelta { fn default() -> Self { HummockVersionDelta::from(&PbHummockVersionDelta::default()) @@ -1095,3 +1125,64 @@ where self.into() } } + +impl From for LocalHummockVersionDelta { + #[expect(deprecated)] + fn from(delta: HummockVersionDelta) -> Self { + Self { + id: delta.id, + prev_id: delta.prev_id, + group_deltas: delta.group_deltas, + max_committed_epoch: delta.max_committed_epoch, + trivial_move: delta.trivial_move, + new_table_watermarks: delta.new_table_watermarks, + removed_table_ids: delta.removed_table_ids, + change_log_delta: delta + .change_log_delta + .into_iter() + .map(|(k, v)| { + ( + k, + ChangeLogDeltaCommon { + truncate_epoch: v.truncate_epoch, + new_log: EpochNewChangeLogCommon { + epochs: v.new_log.epochs, + new_value: Vec::new(), + old_value: Vec::new(), + }, + }, + ) + }) + .collect(), + state_table_info_delta: delta.state_table_info_delta, + } + } +} + +impl From for LocalHummockVersion { + #[expect(deprecated)] + fn from(version: HummockVersion) -> Self { + Self { + id: version.id, + levels: version.levels, + max_committed_epoch: version.max_committed_epoch, + table_watermarks: version.table_watermarks, + table_change_log: version + .table_change_log + .into_iter() + .map(|(k, v)| { + let epoch_new_change_logs: Vec> = v + .change_log_into_iter() + .map(|epoch_new_change_log| EpochNewChangeLogCommon { + epochs: epoch_new_change_log.epochs, + new_value: Vec::new(), + old_value: Vec::new(), + }) + .collect(); + (k, TableChangeLogCommon::new(epoch_new_change_logs)) + }) + .collect(), + state_table_info: version.state_table_info, + } + } +} diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index fb914b226d351..b584b1f55e5be 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -28,6 +28,8 @@ use prometheus::{Histogram, IntGauge}; use risingwave_common::catalog::TableId; use risingwave_common::metrics::UintGauge; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; +use risingwave_hummock_sdk::sstable_info::SstableInfo; +use risingwave_hummock_sdk::version::{HummockVersionCommon, LocalHummockVersionDelta}; use risingwave_hummock_sdk::{HummockEpoch, SyncResult}; use tokio::spawn; use tokio::sync::mpsc::error::SendError; @@ -508,7 +510,7 @@ impl HummockEventHandler { let mut sst_delta_infos = vec![]; if let Some(new_pinned_version) = Self::resolve_version_update_info( - pinned_version.clone(), + &pinned_version, version_payload, Some(&mut sst_delta_infos), ) { @@ -518,31 +520,57 @@ impl HummockEventHandler { } fn resolve_version_update_info( - pinned_version: PinnedVersion, + pinned_version: &PinnedVersion, version_payload: HummockVersionUpdate, mut sst_delta_infos: Option<&mut Vec>, ) -> Option { - let newly_pinned_version = match version_payload { + match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { - let mut version_to_apply = (*pinned_version).clone(); - for version_delta in &version_deltas { - assert_eq!(version_to_apply.id, version_delta.prev_id); - if let Some(sst_delta_infos) = &mut sst_delta_infos { - sst_delta_infos.extend( - version_to_apply - .build_sst_delta_infos(version_delta) - .into_iter(), - ); + let mut version_to_apply = (**pinned_version).clone(); + { + let mut table_change_log_to_apply_guard = + pinned_version.table_change_log_write_lock(); + for version_delta in version_deltas { + assert_eq!(version_to_apply.id, version_delta.prev_id); + + // apply change-log-delta + { + let mut state_table_info = version_to_apply.state_table_info.clone(); + let (changed_table_info, _is_commit_epoch) = state_table_info + .apply_delta( + &version_delta.state_table_info_delta, + &version_delta.removed_table_ids, + ); + + HummockVersionCommon::::apply_change_log_delta( + &mut *table_change_log_to_apply_guard, + &version_delta.change_log_delta, + &version_delta.removed_table_ids, + &version_delta.state_table_info_delta, + &changed_table_info, + ); + } + + let local_hummock_version_delta = + LocalHummockVersionDelta::from(version_delta); + if let Some(sst_delta_infos) = &mut sst_delta_infos { + sst_delta_infos.extend( + version_to_apply + .build_sst_delta_infos(&local_hummock_version_delta) + .into_iter(), + ); + } + + version_to_apply.apply_version_delta(&local_hummock_version_delta); } - version_to_apply.apply_version_delta(version_delta); } - version_to_apply + pinned_version.new_with_local_version(version_to_apply) } - HummockVersionUpdate::PinnedVersion(version) => *version, - }; - - pinned_version.new_pin_version(newly_pinned_version) + HummockVersionUpdate::PinnedVersion(version) => { + pinned_version.new_pin_version(*version) + } + } } fn apply_version_update( diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 25e25938398f3..082879be992ee 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -12,16 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::iter::empty; use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use auto_enums::auto_enum; +use parking_lot::RwLock; use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::change_log::TableChangeLogCommon; use risingwave_hummock_sdk::level::{Level, Levels}; -use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::sstable_info::SstableInfo; +use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion}; use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID}; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; @@ -74,12 +77,13 @@ impl Drop for PinnedVersionGuard { #[derive(Clone)] pub struct PinnedVersion { - version: Arc, + version: Arc, guard: Arc, + table_change_log: Arc>>>, } impl Deref for PinnedVersion { - type Target = HummockVersion; + type Target = LocalHummockVersion; fn deref(&self) -> &Self::Target { &self.version @@ -92,12 +96,14 @@ impl PinnedVersion { pinned_version_manager_tx: UnboundedSender, ) -> Self { let version_id = version.id; + let (local_version, table_id_to_change_logs) = version.split_change_log(); PinnedVersion { - version: Arc::new(version), guard: Arc::new(PinnedVersionGuard::new( version_id, pinned_version_manager_tx, )), + table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)), + version: Arc::new(local_version), } } @@ -111,14 +117,39 @@ impl PinnedVersion { if version.id == self.version.id { return None; } + let version_id = version.id; + let (local_version, table_id_to_change_logs) = version.split_change_log(); + Some(PinnedVersion { + guard: Arc::new(PinnedVersionGuard::new( + version_id, + self.guard.pinned_version_manager_tx.clone(), + )), + table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)), + version: Arc::new(local_version), + }) + } + + /// Create a new `PinnedVersion` with the given `LocalHummockVersion`. Referring to the usage in the `hummock_event_handler`. + pub fn new_with_local_version(&self, version: LocalHummockVersion) -> Option { + assert!( + version.id >= self.version.id, + "pinning a older version {}. Current is {}", + version.id, + self.version.id + ); + if version.id == self.version.id { + return None; + } + let version_id = version.id; Some(PinnedVersion { - version: Arc::new(version), guard: Arc::new(PinnedVersionGuard::new( version_id, self.guard.pinned_version_manager_tx.clone(), )), + table_change_log: self.table_change_log.clone(), + version: Arc::new(version), }) } @@ -159,6 +190,19 @@ impl PinnedVersion { None => empty(), } } + + pub fn table_change_log_read_lock( + &self, + ) -> parking_lot::RwLockReadGuard<'_, HashMap>> { + self.table_change_log.read() + } + + pub fn table_change_log_write_lock( + &self, + ) -> parking_lot::RwLockWriteGuard<'_, HashMap>> + { + self.table_change_log.write() + } } pub(crate) async fn start_pinned_version_worker( diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 88133f45a56b0..6b231f045e56a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -28,7 +28,7 @@ use risingwave_hummock_sdk::key::{ }; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; -use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion}; use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, SyncResult}; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; @@ -641,7 +641,7 @@ impl StateStoreReadLog for HummockStorage { async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult { fn next_epoch( - version: &HummockVersion, + version: &LocalHummockVersion, epoch: u64, table_id: TableId, ) -> HummockResult> { diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 8e0c7a589b203..acf7c6503fb7f 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -992,11 +992,15 @@ impl HummockVersionReader { key_range: TableKeyRange, options: ReadLogOptions, ) -> HummockResult { - let change_log = if let Some(change_log) = version.table_change_log.get(&options.table_id) { - change_log.filter_epoch(epoch_range).collect_vec() - } else { - Vec::new() + let change_log = { + let table_change_logs = version.table_change_log_read_lock(); + if let Some(change_log) = table_change_logs.get(&options.table_id) { + change_log.filter_epoch(epoch_range).cloned().collect_vec() + } else { + Vec::new() + } }; + if let Some(max_epoch_change_log) = change_log.last() { let (_, max_epoch) = epoch_range; if !max_epoch_change_log.epochs.contains(&max_epoch) {