From 3a282819177984f00c19a103507694c0dcfffa29 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 14 Jan 2025 19:31:09 +0800 Subject: [PATCH] refactor: optimize delta log deletion --- src/common/src/config.rs | 7 --- src/config/example.toml | 1 - src/meta/node/src/lib.rs | 4 -- src/meta/src/hummock/manager/gc.rs | 76 ++++++++------------------- src/meta/src/hummock/manager/tests.rs | 15 ++---- src/meta/src/hummock/mod.rs | 2 +- src/meta/src/manager/env.rs | 2 - 7 files changed, 27 insertions(+), 80 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index ed5b5835dd327..dec4a024a6f6b 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -533,9 +533,6 @@ pub struct MetaDeveloperConfig { #[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")] pub hummock_time_travel_sst_info_insert_batch_size: usize, - #[serde(default = "default::developer::hummock_delta_log_delete_batch_size")] - pub hummock_delta_log_delete_batch_size: usize, - #[serde(default = "default::developer::time_travel_vacuum_interval_sec")] pub time_travel_vacuum_interval_sec: u64, @@ -2070,10 +2067,6 @@ pub mod default { 100 } - pub fn hummock_delta_log_delete_batch_size() -> usize { - 512 - } - pub fn time_travel_vacuum_interval_sec() -> u64 { 30 } diff --git a/src/config/example.toml b/src/config/example.toml index 24f78154ed00e..56c7cd1525734 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -93,7 +93,6 @@ meta_actor_cnt_per_worker_parallelism_soft_limit = 100 meta_actor_cnt_per_worker_parallelism_hard_limit = 400 meta_hummock_time_travel_sst_info_fetch_batch_size = 10000 meta_hummock_time_travel_sst_info_insert_batch_size = 100 -meta_hummock_delta_log_delete_batch_size = 512 meta_time_travel_vacuum_interval_sec = 30 meta_hummock_time_travel_epoch_version_insert_batch_size = 1000 meta_hummock_gc_history_insert_batch_size = 1000 diff --git a/src/meta/node/src/lib.rs 
b/src/meta/node/src/lib.rs index ac1e200c9367c..efbf36b0e386c 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -374,10 +374,6 @@ pub fn start( .meta .developer .hummock_time_travel_sst_info_insert_batch_size, - hummock_delta_log_delete_batch_size: config - .meta - .developer - .hummock_delta_log_delete_batch_size, hummock_time_travel_epoch_version_insert_batch_size: config .meta .developer diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 334fd6fceead9..8649729a533a3 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -14,7 +14,6 @@ use std::cmp; use std::collections::HashSet; -use std::ops::DerefMut; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, SystemTime}; @@ -28,7 +27,7 @@ use risingwave_hummock_sdk::{ get_object_id_from_path, get_sst_data_path, HummockSstableObjectId, OBJECT_SUFFIX, }; use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW; -use risingwave_meta_model::{hummock_gc_history, hummock_sequence}; +use risingwave_meta_model::{hummock_gc_history, hummock_sequence, hummock_version_delta}; use risingwave_meta_model_migration::OnConflict; use risingwave_object_store::object::{ObjectMetadataIter, ObjectStoreRef}; use risingwave_pb::stream_service::GetMinUncommittedSstIdRequest; @@ -37,10 +36,8 @@ use sea_orm::{ActiveValue, ColumnTrait, EntityTrait, QueryFilter, Set}; use crate::backup_restore::BackupManagerRef; use crate::hummock::error::{Error, Result}; -use crate::hummock::manager::commit_multi_var; use crate::hummock::HummockManager; use crate::manager::MetadataManager; -use crate::model::BTreeMapTransaction; use crate::MetaResult; pub(crate) struct GcManager { @@ -175,44 +172,37 @@ impl GcManager { } impl HummockManager { - /// Deletes at most `batch_size` deltas. + /// Deletes version deltas. /// - /// Returns (number of deleted deltas, number of remain `deltas_to_delete`). 
- pub async fn delete_version_deltas(&self, batch_size: usize) -> Result<(usize, usize)> { - let mut versioning_guard = self.versioning.write().await; - let versioning = versioning_guard.deref_mut(); - let context_info = self.context_info.read().await; - let deltas_to_delete_count = versioning - .hummock_version_deltas - .range(..=versioning.checkpoint.version.id) - .count(); + /// Returns number of deleted deltas + pub async fn delete_version_deltas(&self) -> Result<usize> { // If there is any safe point, skip this to ensure meta backup has required delta logs to // replay version. - if !context_info.version_safe_points.is_empty() { - return Ok((0, deltas_to_delete_count)); + if !self + .context_info + .read() + .await + .version_safe_points + .is_empty() + { + return Ok(0); } - let batch = versioning + let version_id = self.versioning.read().await.checkpoint.version.id; + let res = hummock_version_delta::Entity::delete_many() + .filter(hummock_version_delta::Column::Id.lte(version_id.to_u64())) + .exec(&self.env.meta_store_ref().conn) + .await?; + tracing::debug!(rows_affected = res.rows_affected, "Deleted version deltas"); + self.versioning + .write() + .await .hummock_version_deltas - .range(..=versioning.checkpoint.version.id) - .map(|(k, _)| *k) - .take(batch_size) - .collect_vec(); - let mut hummock_version_deltas = - BTreeMapTransaction::new(&mut versioning.hummock_version_deltas); - if batch.is_empty() { - return Ok((0, 0)); - } - for delta_id in &batch { - hummock_version_deltas.remove(*delta_id); - } - commit_multi_var!(self.meta_store_ref(), hummock_version_deltas)?; + .retain(|id, _| *id > version_id); #[cfg(test)] { - drop(context_info); - drop(versioning_guard); self.check_state_consistency().await; } - Ok((batch.len(), deltas_to_delete_count - batch.len())) + Ok(res.rows_affected as usize) } /// Filters by Hummock version and Writes GC history. @@ -464,26 +454,6 @@ impl HummockManager { Ok(()) } - /// Deletes stale Hummock metadata. 
- /// - /// Returns number of deleted deltas - pub async fn delete_metadata(&self) -> MetaResult<usize> { - let batch_size = self.env.opts.hummock_delta_log_delete_batch_size; - let mut total_deleted = 0; - loop { - if total_deleted != 0 && self.env.opts.vacuum_spin_interval_ms != 0 { - tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms)) - .await; - } - let (deleted, remain) = self.delete_version_deltas(batch_size).await?; - total_deleted += deleted; - if total_deleted == 0 || remain < batch_size { - break; - } - } - Ok(total_deleted) - } - pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> { let current_epoch_time = Epoch::now().physical_time(); let epoch_watermark = Epoch::from_physical_time( diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 046f0228012f0..9138e3fd1c062 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -566,23 +566,14 @@ async fn test_hummock_manager_basic() { init_version_id + commit_log_count + register_log_count, ); } - assert_eq!( - hummock_manager - .delete_version_deltas(usize::MAX) - .await - .unwrap(), - (0, 0) - ); + assert_eq!(hummock_manager.delete_version_deltas().await.unwrap(), 0); assert_eq!( hummock_manager.create_version_checkpoint(1).await.unwrap(), commit_log_count + register_log_count ); assert_eq!( - hummock_manager - .delete_version_deltas(usize::MAX) - .await - .unwrap(), - ((commit_log_count + register_log_count) as usize, 0) + hummock_manager.delete_version_deltas().await.unwrap(), + (commit_log_count + register_log_count) as usize ); hummock_manager .unpin_version_before(context_id_1, HummockVersionId::MAX) diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs index db6c9e87d38e4..5c07ce76c911a 100644 --- a/src/meta/src/hummock/mod.rs +++ b/src/meta/src/hummock/mod.rs @@ -83,7 +83,7 @@ pub fn start_vacuum_metadata_loop( return; } } - if let Err(err) = 
hummock_manager.delete_metadata().await { + if let Err(err) = hummock_manager.delete_version_deltas().await { tracing::warn!(error = %err.as_report(), "Vacuum metadata error"); } } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index eab30bcce222a..cac22a550e7e3 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -116,7 +116,6 @@ pub struct MetaOpts { pub hummock_time_travel_snapshot_interval: u64, pub hummock_time_travel_sst_info_fetch_batch_size: usize, pub hummock_time_travel_sst_info_insert_batch_size: usize, - pub hummock_delta_log_delete_batch_size: usize, pub hummock_time_travel_epoch_version_insert_batch_size: usize, pub hummock_gc_history_insert_batch_size: usize, pub hummock_time_travel_filter_out_objects_batch_size: usize, @@ -280,7 +279,6 @@ impl MetaOpts { hummock_time_travel_snapshot_interval: 0, hummock_time_travel_sst_info_fetch_batch_size: 10_000, hummock_time_travel_sst_info_insert_batch_size: 10, - hummock_delta_log_delete_batch_size: 1000, hummock_time_travel_epoch_version_insert_batch_size: 1000, hummock_gc_history_insert_batch_size: 1000, hummock_time_travel_filter_out_objects_batch_size: 1000,