diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index efa98e6c7240..fadffe50e528 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -374,3 +374,90 @@ async fn test_readonly_during_compaction() { let vec = collect_stream_ts(stream).await; assert_eq!((0..20).map(|v| v * 1000).collect::>(), vec); } + +#[tokio::test] +async fn test_compaction_update_time_window() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_runs", "2") + .insert_option("compaction.twcs.max_active_window_files", "2") + .insert_option("compaction.twcs.max_inactive_window_runs", "2") + .insert_option("compaction.twcs.max_inactive_window_files", "2") + .build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + // Flush 3 SSTs for compaction. + put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600 + put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600 + + let result = engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + assert_eq!(result.affected_rows, 0); + + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(0, scanner.num_memtables()); + // We keep at most two files. + assert_eq!( + 2, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + + // Flush a new SST and the time window is applied. + put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600 + + // Puts window 7200. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 3600, 4000, 0), + }; + put_rows(&engine, region_id, rows).await; + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(1, scanner.num_memtables()); + let stream = scanner.scan().await.unwrap(); + let vec = collect_stream_ts(stream).await; + assert_eq!((0..4000).map(|v| v * 1000).collect::>(), vec); + + // Puts window 3600. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 2400, 3600, 0), + }; + put_rows(&engine, region_id, rows).await; + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(2, scanner.num_memtables()); + let stream = scanner.scan().await.unwrap(); + let vec = collect_stream_ts(stream).await; + assert_eq!((0..4000).map(|v| v * 1000).collect::>(), vec); +} diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 6adc6eb96aec..942d77a209d2 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -70,7 +70,7 @@ impl Default for MemtableConfig { pub struct MemtableStats { /// The estimated bytes allocated by this memtable from heap. estimated_bytes: usize, - /// The time range that this memtable contains. It is None if + /// The inclusive time range that this memtable contains. It is None if /// and only if the memtable is empty. time_range: Option<(Timestamp, Timestamp)>, /// Total rows in memtable diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 7fa03ae1bed4..052fdca9bcf8 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -168,8 +168,11 @@ impl TimePartitions { Ok(()) } - /// Forks latest partition. - pub fn fork(&self, metadata: &RegionMetadataRef) -> Self { + /// Forks latest partition and updates the partition duration if `part_duration` is Some. + pub fn fork(&self, metadata: &RegionMetadataRef, part_duration: Option) -> Self { + // Fall back to the existing partition duration. + let part_duration = part_duration.or(self.part_duration); + let mut inner = self.inner.lock().unwrap(); let latest_part = inner .parts @@ -178,24 +181,39 @@ impl TimePartitions { .cloned(); let Some(old_part) = latest_part else { + // If there is no partition, then we create a new partition with the new duration. return Self::new( metadata.clone(), self.builder.clone(), inner.next_memtable_id, - self.part_duration, + part_duration, ); }; + + let old_stats = old_part.memtable.stats(); + // Use the max timestamp to compute the new time range for the memtable. + // If `part_duration` is None, the new range will be None. + let new_time_range = + old_stats + .time_range() + .zip(part_duration) + .and_then(|(range, bucket)| { + partition_start_timestamp(range.1, bucket) + .and_then(|start| PartTimeRange::from_start_duration(start, bucket)) + }); + // Forks the latest partition, but compute the time range based on the new duration. let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata); let new_part = TimePartition { memtable, - time_range: old_part.time_range, + time_range: new_time_range, }; + Self { inner: Mutex::new(PartitionsInner::with_partition( new_part, inner.next_memtable_id, )), - part_duration: self.part_duration, + part_duration, metadata: metadata.clone(), builder: self.builder.clone(), } @@ -238,6 +256,19 @@ impl TimePartitions { inner.next_memtable_id } + /// Creates a new empty partition list from this list and a `part_duration`. + /// It falls back to the old partition duration if `part_duration` is `None`. + pub(crate) fn new_with_part_duration(&self, part_duration: Option) -> Self { + debug_assert!(self.is_empty()); + + Self::new( + self.metadata.clone(), + self.builder.clone(), + self.next_memtable_id(), + part_duration.or(self.part_duration), + ) + } + /// Returns all partitions. fn list_partitions(&self) -> PartitionVec { let inner = self.inner.lock().unwrap(); @@ -447,9 +478,9 @@ mod tests { assert_eq!(1, partitions.num_partitions()); assert!(!partitions.is_empty()); - assert!(!partitions.is_empty()); let mut memtables = Vec::new(); partitions.list_memtables(&mut memtables); + assert_eq!(0, memtables[0].id()); let iter = memtables[0].iter(None, None).unwrap(); let timestamps = collect_iter_timestamps(iter); @@ -503,16 +534,14 @@ mod tests { ); } - #[test] - fn test_write_multi_parts() { - let metadata = memtable_util::metadata_for_test(); + fn new_multi_partitions(metadata: &RegionMetadataRef) -> TimePartitions { let builder = Arc::new(PartitionTreeMemtableBuilder::default()); let partitions = TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5))); assert_eq!(0, partitions.num_partitions()); let kvs = memtable_util::build_key_values( - &metadata, + metadata, "hello".to_string(), 0, &[2000, 0], @@ -524,7 +553,7 @@ mod tests { assert!(!partitions.is_empty()); let kvs = memtable_util::build_key_values( - &metadata, + metadata, "hello".to_string(), 0, &[3000, 7000, 4000, 5000], @@ -534,9 +563,18 @@ mod tests { partitions.write(&kvs).unwrap(); assert_eq!(2, partitions.num_partitions()); + partitions + } + + #[test] + fn test_write_multi_parts() { + let metadata = memtable_util::metadata_for_test(); + let partitions = new_multi_partitions(&metadata); + let parts = partitions.list_partitions(); let iter = parts[0].memtable.iter(None, None).unwrap(); let timestamps = collect_iter_timestamps(iter); + assert_eq!(0, parts[0].memtable.id()); assert_eq!( Timestamp::new_millisecond(0), parts[0].time_range.unwrap().min_timestamp @@ -547,6 +585,7 @@ mod tests { ); assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]); let iter = parts[1].memtable.iter(None, None).unwrap(); + assert_eq!(1, parts[1].memtable.id()); let timestamps = collect_iter_timestamps(iter); assert_eq!(&[5000, 7000], ×tamps[..]); assert_eq!( @@ -558,4 +597,85 @@ mod tests { parts[1].time_range.unwrap().max_timestamp ); } + + #[test] + fn test_new_with_part_duration() { + let metadata = memtable_util::metadata_for_test(); + let builder = Arc::new(PartitionTreeMemtableBuilder::default()); + let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None); + + let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5))); + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + assert_eq!(1, new_parts.next_memtable_id()); + + // Won't update the duration if it's None. + let new_parts = new_parts.new_with_part_duration(None); + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + // Don't need to create new memtables. + assert_eq!(1, new_parts.next_memtable_id()); + + let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10))); + assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap()); + // Don't need to create new memtables. + assert_eq!(1, new_parts.next_memtable_id()); + + let builder = Arc::new(PartitionTreeMemtableBuilder::default()); + let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None); + // Need to build a new memtable as duration is still None. + let new_parts = partitions.new_with_part_duration(None); + assert!(new_parts.part_duration().is_none()); + assert_eq!(2, new_parts.next_memtable_id()); + } + + #[test] + fn test_fork_empty() { + let metadata = memtable_util::metadata_for_test(); + let builder = Arc::new(PartitionTreeMemtableBuilder::default()); + let partitions = TimePartitions::new(metadata.clone(), builder, 0, None); + partitions.freeze().unwrap(); + let new_parts = partitions.fork(&metadata, None); + assert!(new_parts.part_duration().is_none()); + assert_eq!(1, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(2, new_parts.next_memtable_id()); + + new_parts.freeze().unwrap(); + let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(5))); + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + assert_eq!(2, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(3, new_parts.next_memtable_id()); + + new_parts.freeze().unwrap(); + let new_parts = new_parts.fork(&metadata, None); + // Won't update the duration. + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + assert_eq!(3, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(4, new_parts.next_memtable_id()); + + new_parts.freeze().unwrap(); + let new_parts = new_parts.fork(&metadata, Some(Duration::from_secs(10))); + assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap()); + assert_eq!(4, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(5, new_parts.next_memtable_id()); + } + + #[test] + fn test_fork_non_empty_none() { + let metadata = memtable_util::metadata_for_test(); + let partitions = new_multi_partitions(&metadata); + partitions.freeze().unwrap(); + + // Won't update the duration. + let new_parts = partitions.fork(&metadata, None); + assert!(new_parts.is_empty()); + assert_eq!(Duration::from_secs(5), new_parts.part_duration().unwrap()); + assert_eq!(2, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(3, new_parts.next_memtable_id()); + + // Although we don't fork a memtable multiple times, we still add a test for it. + let new_parts = partitions.fork(&metadata, Some(Duration::from_secs(10))); + assert!(new_parts.is_empty()); + assert_eq!(Duration::from_secs(10), new_parts.part_duration().unwrap()); + assert_eq!(3, new_parts.list_partitions()[0].memtable.id()); + assert_eq!(4, new_parts.next_memtable_id()); + } } diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 1c7f2b7d4a25..f443396109d8 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -15,6 +15,7 @@ //! Memtable version. use std::sync::Arc; +use std::time::Duration; use smallvec::SmallVec; use store_api::metadata::RegionMetadataRef; @@ -65,27 +66,53 @@ impl MemtableVersion { /// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable /// memtable. /// + /// It will switch to use the `time_window` provided. + /// /// Returns `None` if the mutable memtable is empty. pub(crate) fn freeze_mutable( &self, metadata: &RegionMetadataRef, + time_window: Option, ) -> Result> { if self.mutable.is_empty() { - // No need to freeze the mutable memtable. - return Ok(None); + // No need to freeze the mutable memtable, but we need to check the time window. + if self.mutable.part_duration() == time_window { + // If the time window is the same, we don't need to update it. + return Ok(None); + } + + // Update the time window. + let mutable = self.mutable.new_with_part_duration(time_window); + common_telemetry::debug!( + "Freeze empty memtable, update partition duration from {:?} to {:?}", + self.mutable.part_duration(), + time_window + ); + return Ok(Some(MemtableVersion { + mutable: Arc::new(mutable), + immutables: self.immutables.clone(), + })); } // Marks the mutable memtable as immutable so it can free the memory usage from our // soft limit. self.mutable.freeze()?; // Fork the memtable. - let mutable = Arc::new(self.mutable.fork(metadata)); + if self.mutable.part_duration() != time_window { + common_telemetry::debug!( + "Fork memtable, update partition duration from {:?}, to {:?}", + self.mutable.part_duration(), + time_window + ); + } + let mutable = Arc::new(self.mutable.fork(metadata, time_window)); - // Pushes the mutable memtable to immutable list. let mut immutables = SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions()); - self.mutable.list_memtables_to_small_vec(&mut immutables); immutables.extend(self.immutables.iter().cloned()); + // Pushes the mutable memtable to immutable list. + self.mutable.list_memtables_to_small_vec(&mut immutables); + Ok(Some(MemtableVersion { mutable, immutables, diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 8e78fa4ab351..188c314837c0 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -80,8 +80,12 @@ impl VersionControl { /// Freezes the mutable memtable if it is not empty. pub(crate) fn freeze_mutable(&self) -> Result<()> { let version = self.current().version; + let time_window = version.compaction_time_window; - let Some(new_memtables) = version.memtables.freeze_mutable(&version.metadata)? else { + let Some(new_memtables) = version + .memtables + .freeze_mutable(&version.metadata, time_window)? + else { return Ok(()); };