diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 5236e0d616dc..7fdd32aa2721 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -44,7 +44,7 @@ use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; -use crate::compaction::compactor::{CompactionRegion, DefaultCompactor}; +use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor}; use crate::compaction::picker::{new_picker, CompactionTask}; use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; @@ -59,7 +59,7 @@ use crate::read::scan_region::ScanInput; use crate::read::seq_scan::SeqScan; use crate::read::BoxedBatchReader; use crate::region::options::MergeMode; -use crate::region::version::{VersionControlRef, VersionRef}; +use crate::region::version::VersionControlRef; use crate::region::ManifestContextRef; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::remote_job_scheduler::{ @@ -73,7 +73,7 @@ use crate::worker::WorkerListener; /// Region compaction request. pub struct CompactionRequest { pub(crate) engine_config: Arc, - pub(crate) current_version: VersionRef, + pub(crate) current_version: CompactionVersion, pub(crate) access_layer: AccessLayerRef, /// Sender to send notification to the region worker. pub(crate) request_sender: mpsc::Sender, @@ -522,7 +522,7 @@ impl CompactionStatus { listener: WorkerListener, schema_metadata_manager: SchemaMetadataManagerRef, ) -> CompactionRequest { - let current_version = self.version_control.current().version; + let current_version = CompactionVersion::from(self.version_control.current().version); let start_time = Instant::now(); let mut req = CompactionRequest { engine_config, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 91ab34c961cf..e2499140fd61 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -35,12 +35,10 @@ use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Res use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::manifest::storage::manifest_compress_type; -use crate::memtable::time_partition::TimePartitions; -use crate::memtable::MemtableBuilderProvider; use crate::read::Source; use crate::region::opener::new_manifest_dir; use crate::region::options::RegionOptions; -use crate::region::version::{VersionBuilder, VersionRef}; +use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; use crate::schedule::scheduler::LocalScheduler; use crate::sst::file::{FileMeta, IndexType}; @@ -48,6 +46,34 @@ use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::parquet::WriteOptions; +use crate::sst::version::{SstVersion, SstVersionRef}; + +/// Region version for compaction that does not hold memtables. +#[derive(Clone)] +pub struct CompactionVersion { + /// Metadata of the region. + /// + /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing + /// metadata and reuse metadata when creating a new `Version`. + pub(crate) metadata: RegionMetadataRef, + /// Options of the region. + pub(crate) options: RegionOptions, + /// SSTs of the region. + pub(crate) ssts: SstVersionRef, + /// Inferred compaction time window. + pub(crate) compaction_time_window: Option, +} + +impl From for CompactionVersion { + fn from(value: VersionRef) -> Self { + Self { + metadata: value.metadata.clone(), + options: value.options.clone(), + ssts: value.ssts.clone(), + compaction_time_window: value.compaction_time_window, + } + } +} /// CompactionRegion represents a region that needs to be compacted. /// It's the subset of MitoRegion. @@ -62,7 +88,7 @@ pub struct CompactionRegion { pub(crate) cache_manager: CacheManagerRef, pub(crate) access_layer: AccessLayerRef, pub(crate) manifest_ctx: Arc, - pub(crate) current_version: VersionRef, + pub(crate) current_version: CompactionVersion, pub(crate) file_purger: Option>, pub(crate) ttl: Option, } @@ -147,30 +173,14 @@ pub async fn open_compaction_region( }; let current_version = { - let memtable_builder = MemtableBuilderProvider::new(None, Arc::new(mito_config.clone())) - .builder_for_options( - req.region_options.memtable.as_ref(), - req.region_options.need_dedup(), - req.region_options.merge_mode(), - ); - - // Initial memtable id is 0. - let mutable = Arc::new(TimePartitions::new( - region_metadata.clone(), - memtable_builder.clone(), - 0, - req.region_options.compaction.time_window(), - )); - - let version = VersionBuilder::new(region_metadata.clone(), mutable) - .add_files(file_purger.clone(), manifest.files.values().cloned()) - .flushed_entry_id(manifest.flushed_entry_id) - .flushed_sequence(manifest.flushed_sequence) - .truncated_entry_id(manifest.truncated_entry_id) - .compaction_time_window(manifest.compaction_time_window) - .options(req.region_options.clone()) - .build(); - Arc::new(version) + let mut ssts = SstVersion::new(); + ssts.add_files(file_purger.clone(), manifest.files.values().cloned()); + CompactionVersion { + metadata: region_metadata.clone(), + options: req.region_options.clone(), + ssts: Arc::new(ssts), + compaction_time_window: manifest.compaction_time_window, + } }; let ttl = find_ttl( diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index f16b8e4c95d3..10bdb47297d5 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -23,10 +23,9 @@ use common_time::Timestamp; use store_api::storage::RegionId; use crate::compaction::buckets::infer_time_bucket; -use crate::compaction::compactor::CompactionRegion; +use crate::compaction::compactor::{CompactionRegion, CompactionVersion}; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::region::version::VersionRef; use crate::sst::file::{FileHandle, FileId}; /// Compaction picker that splits the time range of all involved files to windows, and merges @@ -48,7 +47,11 @@ impl WindowedCompactionPicker { // use persisted window. If persist window is not present, we check the time window // provided while creating table. If all of those are absent, we infer the window // from files in level0. - fn calculate_time_window(&self, region_id: RegionId, current_version: &VersionRef) -> i64 { + fn calculate_time_window( + &self, + region_id: RegionId, + current_version: &CompactionVersion, + ) -> i64 { self.compaction_time_window_seconds .or(current_version .compaction_time_window @@ -67,7 +70,7 @@ impl WindowedCompactionPicker { fn pick_inner( &self, region_id: RegionId, - current_version: &VersionRef, + current_version: &CompactionVersion, current_time: Timestamp, ) -> (Vec, Vec, i64) { let time_window = self.calculate_time_window(region_id, current_version); @@ -205,28 +208,19 @@ mod tests { use common_time::Timestamp; use store_api::storage::RegionId; + use crate::compaction::compactor::CompactionVersion; use crate::compaction::window::{file_time_bucket_span, WindowedCompactionPicker}; - use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder}; - use crate::memtable::time_partition::TimePartitions; - use crate::memtable::version::MemtableVersion; use crate::region::options::RegionOptions; - use crate::region::version::{Version, VersionRef}; use crate::sst::file::{FileId, FileMeta, Level}; use crate::sst::version::SstVersion; use crate::test_util::memtable_util::metadata_for_test; use crate::test_util::NoopFilePurger; - fn build_version(files: &[(FileId, i64, i64, Level)], ttl: Option) -> VersionRef { + fn build_version( + files: &[(FileId, i64, i64, Level)], + ttl: Option, + ) -> CompactionVersion { let metadata = metadata_for_test(); - let memtables = Arc::new(MemtableVersion::new(Arc::new(TimePartitions::new( - metadata.clone(), - Arc::new(PartitionTreeMemtableBuilder::new( - PartitionTreeConfig::default(), - None, - )), - 0, - None, - )))); let file_purger_ref = Arc::new(NoopFilePurger); let mut ssts = SstVersion::new(); @@ -244,14 +238,9 @@ mod tests { }), ); - Arc::new(Version { + CompactionVersion { metadata, - memtables, ssts: Arc::new(ssts), - flushed_entry_id: 0, - flushed_sequence: 0, - truncated_entry_id: None, - compaction_time_window: None, options: RegionOptions { ttl: ttl.map(|t| t.into()), compaction: Default::default(), @@ -262,7 +251,8 @@ mod tests { memtable: None, merge_mode: None, }, - }) + compaction_time_window: None, + } } #[test] diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 09f45ca4f724..b522f225f9f0 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, trace}; use smallvec::SmallVec; use snafu::ResultExt; use store_api::storage::RegionId; @@ -141,17 +141,22 @@ impl WriteBufferManager for WriteBufferManagerImpl { // If the memory exceeds the buffer size, we trigger more aggressive // flush. But if already more than half memory is being flushed, // triggering more flush may not help. We will hold it instead. - if memory_usage >= self.global_write_buffer_size - && mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 - { - debug!( + if memory_usage >= self.global_write_buffer_size { + if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 { + debug!( "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \ mutable_usage: {}.", memory_usage, self.global_write_buffer_size, - mutable_memtable_memory_usage, - ); - return true; + mutable_memtable_memory_usage); + return true; + } else { + trace!( + "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.", + memory_usage, + self.global_write_buffer_size, + mutable_memtable_memory_usage); + } } false