From 922b1a9b66a8f09ed25111e42e876917acc4cffb Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 27 Mar 2024 11:21:22 +0800 Subject: [PATCH] feat: Implement append mode for a region (#3558) * feat: add dedup option to merge reader * test: test merger * feat: append mode option * feat: implement append mode for regions * feat: only allow put under append mode * feat: always create builder * test: test append mode * style: fix clippy * test: trigger compaction * chore: fix compiler errors --- src/mito2/src/compaction/twcs.rs | 16 ++- src/mito2/src/engine.rs | 2 + src/mito2/src/engine/append_mode_test.rs | 153 +++++++++++++++++++++++ src/mito2/src/memtable.rs | 30 ++++- src/mito2/src/memtable/time_series.rs | 6 +- src/mito2/src/read/merge.rs | 136 +++++++++++++++----- src/mito2/src/read/scan_region.rs | 8 +- src/mito2/src/read/seq_scan.rs | 21 +++- src/mito2/src/region/opener.rs | 9 +- src/mito2/src/region/options.rs | 9 ++ src/mito2/src/worker.rs | 18 +-- src/mito2/src/worker/handle_write.rs | 29 ++++- 12 files changed, 372 insertions(+), 65 deletions(-) create mode 100644 src/mito2/src/engine/append_mode_test.rs diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index fbb07b71e1c7..82bceabd56a8 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -188,6 +188,7 @@ impl Picker for TwcsPicker { cache_manager, storage: current_version.options.storage.clone(), index_options: current_version.options.index_options.clone(), + append_mode: current_version.options.append_mode, }; Some(Box::new(task)) } @@ -255,6 +256,8 @@ pub(crate) struct TwcsCompactionTask { pub(crate) storage: Option, /// Index options of the region. pub(crate) index_options: IndexOptions, + /// The region is using append mode. 
+ pub(crate) append_mode: bool, } impl Debug for TwcsCompactionTask { @@ -264,6 +267,7 @@ impl Debug for TwcsCompactionTask { .field("outputs", &self.outputs) .field("expired_ssts", &self.expired_ssts) .field("compaction_time_window", &self.compaction_time_window) + .field("append_mode", &self.append_mode) .finish() } } @@ -332,9 +336,15 @@ impl TwcsCompactionTask { let cache_manager = self.cache_manager.clone(); let storage = self.storage.clone(); let index_options = self.index_options.clone(); + let append_mode = self.append_mode; futs.push(async move { - let reader = - build_sst_reader(metadata.clone(), sst_layer.clone(), &output.inputs).await?; + let reader = build_sst_reader( + metadata.clone(), + sst_layer.clone(), + &output.inputs, + append_mode, + ) + .await?; let file_meta_opt = sst_layer .write_sst( SstWriteRequest { @@ -565,9 +575,11 @@ async fn build_sst_reader( metadata: RegionMetadataRef, sst_layer: AccessLayerRef, inputs: &[FileHandle], + append_mode: bool, ) -> error::Result { SeqScan::new(sst_layer, ProjectionMapper::all(&metadata)?) .with_files(inputs.to_vec()) + .with_append_mode(append_mode) // We ignore file not found error during compaction. .with_ignore_file_not_found(true) .build_reader() diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 449f87dcf44a..f244a1edc9e6 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -17,6 +17,8 @@ #[cfg(test)] mod alter_test; #[cfg(test)] +mod append_mode_test; +#[cfg(test)] mod basic_test; #[cfg(test)] mod catchup_test; diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs new file mode 100644 index 000000000000..bb2a4e017fa5 --- /dev/null +++ b/src/mito2/src/engine/append_mode_test.rs @@ -0,0 +1,153 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for append mode. + +use api::v1::Rows; +use common_recordbatch::RecordBatches; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{RegionCompactRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::test_util::{ + build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, + TestEnv, +}; + +#[tokio::test] +async fn test_append_mode_write_query() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("append_mode", "true") + .build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // rows 1, 2 + let rows = build_rows(1, 3); + let rows = Rows { + schema: column_schemas.clone(), + rows, + }; + put_rows(&engine, region_id, rows).await; + + let mut rows = build_rows(0, 2); + rows.append(&mut build_rows(1, 2)); + // rows 0, 1, 1 + let rows = Rows { + schema: column_schemas, + rows, + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ 
+| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_append_mode_compaction() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.max_active_window_files", "2") + .insert_option("compaction.twcs.max_inactive_window_files", "2") + .insert_option("append_mode", "true") + .build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Flush 2 SSTs for compaction. + // a, field 1, 2 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 1, 3, 1), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + // a, field 0, 1 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + // b, field 0, 1 + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + flush_region(&engine, region_id, None).await; + + let output = engine + .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .await + .unwrap(); + assert_eq!(output.affected_rows, 0); + + // a, field 2, 3 + let rows = Rows { + schema: column_schemas, + rows: build_rows_for_key("a", 2, 4, 2), + }; + put_rows(&engine, region_id, rows).await; + + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!(1, scanner.num_files()); + 
let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 0.0 | 1970-01-01T00:00:00 | +| a | 1.0 | 1970-01-01T00:00:01 | +| a | 1.0 | 1970-01-01T00:00:01 | +| a | 2.0 | 1970-01-01T00:00:02 | +| a | 2.0 | 1970-01-01T00:00:02 | +| a | 3.0 | 1970-01-01T00:00:03 | +| b | 0.0 | 1970-01-01T00:00:00 | +| b | 1.0 | 1970-01-01T00:00:01 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index d92857dc97ff..6df63f0e9973 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -24,6 +24,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; +use crate::config::MitoConfig; use crate::error::Result; use crate::flush::WriteBufferManagerRef; use crate::memtable::key_values::KeyValue; @@ -212,27 +213,29 @@ impl Drop for AllocTracker { #[derive(Clone)] pub(crate) struct MemtableBuilderProvider { write_buffer_manager: Option, - default_memtable_builder: MemtableBuilderRef, + config: Arc, } impl MemtableBuilderProvider { pub(crate) fn new( write_buffer_manager: Option, - default_memtable_builder: MemtableBuilderRef, + config: Arc, ) -> Self { Self { write_buffer_manager, - default_memtable_builder, + config, } } pub(crate) fn builder_for_options( &self, options: Option<&MemtableOptions>, + dedup: bool, ) -> MemtableBuilderRef { match options { Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new( self.write_buffer_manager.clone(), + dedup, )), Some(MemtableOptions::PartitionTree(opts)) => { Arc::new(PartitionTreeMemtableBuilder::new( @@ -240,12 +243,29 @@ impl MemtableBuilderProvider { index_max_keys_per_shard: opts.index_max_keys_per_shard, data_freeze_threshold: 
opts.data_freeze_threshold, fork_dictionary_bytes: opts.fork_dictionary_bytes, - ..Default::default() + dedup, }, self.write_buffer_manager.clone(), )) } - None => self.default_memtable_builder.clone(), + None => self.default_memtable_builder(dedup), + } + } + + fn default_memtable_builder(&self, dedup: bool) -> MemtableBuilderRef { + match &self.config.memtable { + MemtableConfig::PartitionTree(config) => { + let mut config = config.clone(); + config.dedup = dedup; + Arc::new(PartitionTreeMemtableBuilder::new( + config, + self.write_buffer_manager.clone(), + )) + } + MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new( + self.write_buffer_manager.clone(), + dedup, + )), } } } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 5403a8fea225..3991504e5182 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -54,13 +54,15 @@ const INITIAL_BUILDER_CAPACITY: usize = 0; #[derive(Debug, Default)] pub struct TimeSeriesMemtableBuilder { write_buffer_manager: Option, + dedup: bool, } impl TimeSeriesMemtableBuilder { /// Creates a new builder with specific `write_buffer_manager`. - pub fn new(write_buffer_manager: Option) -> Self { + pub fn new(write_buffer_manager: Option, dedup: bool) -> Self { Self { write_buffer_manager, + dedup, } } } @@ -71,7 +73,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder { metadata.clone(), id, self.write_buffer_manager.clone(), - true, // todo(hl): set according to region option + self.dedup, )) } } diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index ae9dca224c91..ca758d28253b 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -33,7 +33,8 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, Source}; /// The merge reader merges [Batch]es from multiple sources that yield sorted batches. /// 1. 
Batch is ordered by primary key, time index, sequence desc, op type desc (we can /// ignore op type as sequence is already unique). -/// 2. Batch doesn't have duplicate elements (elements with the same primary key and time index). +/// 2. Batch doesn't have duplicate elements (elements with the same primary key and time index) if +/// dedup is true. /// 3. Batches from sources **must** not be empty. pub struct MergeReader { /// Holds [Node]s whose key range of current batch **is** overlapped with the merge window. @@ -48,6 +49,8 @@ pub struct MergeReader { cold: BinaryHeap, /// Batch to output. output_batch: Option, + /// Remove duplicate timestamps. + dedup: bool, /// Local metrics. metrics: Metrics, } @@ -98,7 +101,7 @@ impl Drop for MergeReader { impl MergeReader { /// Creates and initializes a new [MergeReader]. - pub async fn new(sources: Vec) -> Result { + pub async fn new(sources: Vec, dedup: bool) -> Result { let start = Instant::now(); let mut metrics = Metrics::default(); @@ -116,6 +119,7 @@ impl MergeReader { hot, cold, output_batch: None, + dedup, metrics, }; // Initializes the reader. @@ -150,12 +154,13 @@ impl MergeReader { let mut hottest = self.hot.pop().unwrap(); let batch = hottest.fetch_batch(&mut self.metrics).await?; - Self::maybe_output_batch(batch, &mut self.output_batch, &mut self.metrics)?; + Self::maybe_output_batch(batch, &mut self.output_batch, self.dedup, &mut self.metrics)?; self.reheap(hottest) } - /// Fetches non-duplicated rows from the hottest node and skips the timestamp duplicated - /// with the first timestamp in the next node. + /// Fetches non-duplicated rows from the hottest node. + /// + /// If `dedup` is true, it skips the timestamp duplicated with the first timestamp in the next node. async fn fetch_rows_from_hottest(&mut self) -> Result<()> { // Safety: `fetch_batches_to_output()` ensures the hot heap has more than 1 element. // Pop hottest node. 
@@ -176,36 +181,58 @@ impl MergeReader { // Binary searches the timestamp in the top batch. // Safety: Batches should have the same timestamp resolution so we can compare the native // value directly. - match timestamps.binary_search(&next_min_ts.value()) { - Ok(pos) => { - // They have duplicate timestamps. Outputs timestamps before the duplicated timestamp. - // Batch itself doesn't contain duplicate timestamps so timestamps before `pos` - // must be less than `next_min_ts`. - Self::maybe_output_batch( - top.slice(0, pos), - &mut self.output_batch, - &mut self.metrics, - )?; - // This keep the duplicate timestamp in the node. - top_node.skip_rows(pos, &mut self.metrics).await?; - // The merge window should contain this timestamp so only nodes in the hot heap - // have this timestamp. - self.filter_first_duplicate_timestamp_in_hot(top_node, next_min_ts) - .await?; - } + let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) { + Ok(pos) => pos, Err(pos) => { // No duplicate timestamp. Outputs timestamp before `pos`. Self::maybe_output_batch( top.slice(0, pos), &mut self.output_batch, + self.dedup, &mut self.metrics, )?; top_node.skip_rows(pos, &mut self.metrics).await?; - self.reheap(top_node)?; + return self.reheap(top_node); } + }; + + if self.dedup { + // They have duplicate timestamps. Outputs timestamps before the duplicated timestamp. + // Batch itself doesn't contain duplicate timestamps so timestamps before `duplicate_pos` + // must be less than `next_min_ts`. + Self::maybe_output_batch( + top.slice(0, duplicate_pos), + &mut self.output_batch, + self.dedup, + &mut self.metrics, + )?; + // This keep the duplicate timestamp in the node. + top_node.skip_rows(duplicate_pos, &mut self.metrics).await?; + // The merge window should contain this timestamp so only nodes in the hot heap + // have this timestamp. 
+ return self + .filter_first_duplicate_timestamp_in_hot(top_node, next_min_ts) + .await; } - Ok(()) + // No need to remove duplicate timestamps. + let output_end = if duplicate_pos == 0 { + // If the first timestamp of the top node is duplicate. We can simply return the first row + // as the heap ensure it is the one with largest sequence. + 1 + } else { + // We don't know which one has the larger sequence so we use the range before + // the duplicate pos. + duplicate_pos + }; + Self::maybe_output_batch( + top.slice(0, output_end), + &mut self.output_batch, + self.dedup, + &mut self.metrics, + )?; + top_node.skip_rows(output_end, &mut self.metrics).await?; + self.reheap(top_node) } /// Filters the first duplicate `timestamp` in `top_node` and `hot` heap. Only keeps the timestamp @@ -297,12 +324,17 @@ impl MergeReader { fn maybe_output_batch( mut batch: Batch, output_batch: &mut Option, + dedup: bool, metrics: &mut Metrics, ) -> Result<()> { debug_assert!(output_batch.is_none()); let num_rows = batch.num_rows(); - batch.filter_deleted()?; + // If dedup is false, we don't expect delete happens and we skip checking whether there + // is any deleted entry. + if dedup { + batch.filter_deleted()?; + } // Update deleted rows metrics. metrics.num_deleted_rows += num_rows - batch.num_rows(); if batch.is_empty() { @@ -315,12 +347,13 @@ impl MergeReader { } /// Builder to build and initialize a [MergeReader]. -#[derive(Default)] pub struct MergeReaderBuilder { /// Input sources. /// /// All source must yield batches with the same schema. sources: Vec, + /// Remove duplicate timestamps. Default is true. + dedup: bool, } impl MergeReaderBuilder { @@ -330,8 +363,8 @@ impl MergeReaderBuilder { } /// Creates a builder from sources. - pub fn from_sources(sources: Vec) -> MergeReaderBuilder { - MergeReaderBuilder { sources } + pub fn from_sources(sources: Vec, dedup: bool) -> MergeReaderBuilder { + MergeReaderBuilder { sources, dedup } } /// Pushes a batch reader to sources. 
@@ -349,7 +382,16 @@ impl MergeReaderBuilder { /// Builds and initializes the reader, then resets the builder. pub async fn build(&mut self) -> Result { let sources = mem::take(&mut self.sources); - MergeReader::new(sources).await + MergeReader::new(sources, self.dedup).await + } +} + +impl Default for MergeReaderBuilder { + fn default() -> Self { + MergeReaderBuilder { + sources: Vec::new(), + dedup: true, + } } } @@ -901,4 +943,40 @@ mod tests { .collect(); check_reader_result(&mut reader, &expect).await; } + + #[tokio::test] + async fn test_merge_keep_duplicate() { + let reader1 = VecBatchReader::new(&[new_batch( + b"k1", + &[1, 2], + &[10, 10], + &[OpType::Put, OpType::Put], + &[21, 22], + )]); + let reader2 = VecBatchReader::new(&[new_batch( + b"k1", + &[2, 3], + &[11, 11], + &[OpType::Put, OpType::Put], + &[32, 33], + )]); + let sources = vec![ + Source::Reader(Box::new(reader1)), + Source::Iter(Box::new(reader2)), + ]; + let mut reader = MergeReaderBuilder::from_sources(sources, false) + .build() + .await + .unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]), + new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]), + new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]), + new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]), + ], + ) + .await; + } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 882903672564..07de897d3132 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -210,12 +210,13 @@ impl ScanRegion { .collect(); debug!( - "Seq scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, total_ssts: {}", + "Seq scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, total_ssts: {}, append_mode: {}", self.version.metadata.region_id, self.request, memtables.len(), files.len(), - total_ssts + total_ssts, + self.version.options.append_mode, ); let index_applier = self.build_index_applier(); @@ -234,7 +235,8 @@ 
impl ScanRegion { .with_cache(self.cache_manager) .with_index_applier(index_applier) .with_parallelism(self.parallelism) - .with_start_time(self.start_time); + .with_start_time(self.start_time) + .with_append_mode(self.version.options.append_mode); Ok(seq_scan) } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 151210422baf..4f955adace4a 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -67,6 +67,8 @@ pub struct SeqScan { index_applier: Option, /// Start time of the query. query_start: Option, + /// The region is using append mode. + append_mode: bool, } impl SeqScan { @@ -85,6 +87,7 @@ impl SeqScan { parallelism: ScanParallism::default(), index_applier: None, query_start: None, + append_mode: false, } } @@ -151,6 +154,12 @@ impl SeqScan { self } + #[must_use] + pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self { + self.append_mode = is_append_mode; + self + } + /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { let mut metrics = Metrics::default(); @@ -210,8 +219,10 @@ impl SeqScan { pub async fn build_reader(&self) -> Result { // Scans all memtables and SSTs. Builds a merge reader to merge results. let sources = self.build_sources().await?; - let mut builder = MergeReaderBuilder::from_sources(sources); - Ok(Box::new(builder.build().await?)) + let dedup = !self.append_mode; + let mut builder = MergeReaderBuilder::from_sources(sources, dedup); + let reader = builder.build().await?; + Ok(Box::new(reader)) } /// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel. 
@@ -228,8 +239,10 @@ impl SeqScan { Source::Stream(stream) }) .collect(); - let mut builder = MergeReaderBuilder::from_sources(sources); - Ok(Box::new(builder.build().await?)) + let dedup = !self.append_mode; + let mut builder = MergeReaderBuilder::from_sources(sources, dedup); + let reader = builder.build().await?; + Ok(Box::new(reader)) } /// Builds and returns sources to read. diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index f99ab4e5d04c..d0fa4ea708ac 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -173,7 +173,7 @@ impl RegionOpener { let memtable_builder = self .memtable_builder_provider - .builder_for_options(options.memtable.as_ref()); + .builder_for_options(options.memtable.as_ref(), !options.append_mode); // Initial memtable id is 0. let part_duration = options.compaction.time_window(); let mutable = Arc::new(TimePartitions::new( @@ -281,9 +281,10 @@ impl RegionOpener { access_layer.clone(), self.cache_manager.clone(), )); - let memtable_builder = self - .memtable_builder_provider - .builder_for_options(region_options.memtable.as_ref()); + let memtable_builder = self.memtable_builder_provider.builder_for_options( + region_options.memtable.as_ref(), + !region_options.append_mode, + ); // Initial memtable id is 0. let part_duration = region_options.compaction.time_window(); let mutable = Arc::new(TimePartitions::new( diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index e890207e874b..a56976874bc0 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -46,6 +46,8 @@ pub struct RegionOptions { pub compaction: CompactionOptions, /// Custom storage. Uses default storage if it is `None`. pub storage: Option, + /// If append mode is enabled, the region keeps duplicate rows. + pub append_mode: bool, /// Wal options. pub wal_options: WalOptions, /// Index options. 
@@ -91,6 +93,7 @@ impl TryFrom<&HashMap> for RegionOptions { ttl: options.ttl, compaction, storage: options.storage, + append_mode: options.append_mode, wal_options, index_options, memtable, @@ -166,6 +169,7 @@ impl Default for TwcsOptions { /// We need to define a new struct without enum fields as `#[serde(default)]` does not /// support external tagging. +#[serde_as] #[derive(Debug, Deserialize)] #[serde(default)] struct RegionOptionsWithoutEnum { @@ -173,6 +177,8 @@ struct RegionOptionsWithoutEnum { #[serde(with = "humantime_serde")] ttl: Option, storage: Option, + #[serde_as(as = "DisplayFromStr")] + append_mode: bool, } impl Default for RegionOptionsWithoutEnum { @@ -181,6 +187,7 @@ impl Default for RegionOptionsWithoutEnum { RegionOptionsWithoutEnum { ttl: options.ttl, storage: options.storage, + append_mode: options.append_mode, } } } @@ -482,6 +489,7 @@ mod tests { ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), ("storage", "S3"), + ("append_mode", "true"), ("index.inverted_index.ignore_column_ids", "1,2,3"), ("index.inverted_index.segment_row_count", "512"), ( @@ -502,6 +510,7 @@ mod tests { time_window: Some(Duration::from_secs(3600 * 2)), }), storage: Some("S3".to_string()), + append_mode: true, wal_options, index_options: IndexOptions { inverted_index: InvertedIndexOptions { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index d007a0c0dc65..a9aa652d8673 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -48,9 +48,7 @@ use crate::config::MitoConfig; use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; use crate::manifest::action::RegionEdit; -use crate::memtable::partition_tree::PartitionTreeMemtableBuilder; -use crate::memtable::time_series::TimeSeriesMemtableBuilder; -use crate::memtable::{MemtableBuilderProvider, MemtableConfig}; +use crate::memtable::MemtableBuilderProvider; use 
crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -338,20 +336,10 @@ impl WorkerStarter { let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); let running = Arc::new(AtomicBool::new(true)); - - let default_memtable_builder = match &self.config.memtable { - MemtableConfig::PartitionTree(config) => Arc::new(PartitionTreeMemtableBuilder::new( - config.clone(), - Some(self.write_buffer_manager.clone()), - )) as _, - MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(Some( - self.write_buffer_manager.clone(), - ))) as _, - }; let now = self.time_provider.current_time_millis(); let mut worker_thread = RegionWorkerLoop { id: self.id, - config: self.config, + config: self.config.clone(), regions: regions.clone(), dropping_regions: Arc::new(RegionMap::default()), sender: sender.clone(), @@ -361,7 +349,7 @@ impl WorkerStarter { running: running.clone(), memtable_builder_provider: MemtableBuilderProvider::new( Some(self.write_buffer_manager.clone()), - default_memtable_builder, + self.config, ), scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 495af6cb05b1..9b7693710b32 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -17,11 +17,13 @@ use std::collections::{hash_map, HashMap}; use std::sync::Arc; +use api::v1::OpType; +use snafu::ensure; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; -use crate::error::{RejectWriteSnafu, Result}; +use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result}; use crate::metrics::{ WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL, }; @@ -162,6 +164,16 @@ impl RegionWorkerLoop { // Safety: Now we ensure the region exists. 
let region_ctx = region_ctxs.get_mut(&region_id).unwrap(); + if let Err(e) = check_op_type( + region_ctx.version().options.append_mode, + &sender_req.request, + ) { + // Do not allow non-put op under append mode. + sender_req.sender.send(Err(e)); + + continue; + } + // Checks whether request schema is compatible with region schema. if let Err(e) = maybe_fill_missing_columns(&mut sender_req.request, &region_ctx.version().metadata) @@ -219,3 +231,18 @@ fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetad Ok(()) } + +/// Rejects delete request under append mode. +fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> { + if append_mode { + ensure!( + request.op_type == OpType::Put, + InvalidRequestSnafu { + region_id: request.region_id, + reason: "Only put is allowed under append mode", + } + ); + } + + Ok(()) +}