diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 6ea9288debb5..461186a8ada7 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -371,6 +371,7 @@ impl DatanodeBuilder { engine: engine.clone(), region_dir, options, + skip_wal_replay: false, }), ) .await?; diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 6f0609f689fa..fd1c7eb9603d 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -64,6 +64,7 @@ impl RegionHeartbeatResponseHandler { engine: region_ident.engine, region_dir: region_dir(®ion_storage_path, region_id), options: region_options, + skip_wal_replay: false, }); let result = region_server.handle_request(region_id, request).await; diff --git a/src/file-engine/src/region.rs b/src/file-engine/src/region.rs index 218a5e49d5ed..a5af6822285e 100644 --- a/src/file-engine/src/region.rs +++ b/src/file-engine/src/region.rs @@ -163,6 +163,7 @@ mod tests { engine: "file".to_string(), region_dir, options: HashMap::default(), + skip_wal_replay: false, }; let region = FileRegion::open(region_id, request, &object_store) @@ -211,6 +212,7 @@ mod tests { engine: "file".to_string(), region_dir, options: HashMap::default(), + skip_wal_replay: false, }; let err = FileRegion::open(region_id, request, &object_store) .await diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index a7c3d19caeb5..e99571cb4498 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -119,6 +119,7 @@ async fn test_alter_region() { engine: String::new(), region_dir, options: HashMap::default(), + skip_wal_replay: false, }), ) .await @@ -201,6 +202,7 @@ async fn test_put_after_alter() { engine: String::new(), region_dir, options: HashMap::default(), + skip_wal_replay: false, }), ) .await diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index a072a90b5cbe..2e3dc4de0d53 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -117,6 +117,7 @@ async fn test_region_replay() { engine: String::new(), region_dir, options: HashMap::default(), + skip_wal_replay: false, }), ) .await diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 39c703c5c7a5..1e0d79af6742 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -18,15 +18,16 @@ use std::time::Duration; use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; +use common_recordbatch::RecordBatches; use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{ RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, }; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::test_util::{ - build_rows, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv, + build_rows, flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv, }; #[tokio::test] @@ -42,6 +43,7 @@ async fn test_engine_open_empty() { engine: String::new(), region_dir: "empty".to_string(), options: HashMap::default(), + skip_wal_replay: false, }), ) .await @@ -73,6 +75,7 @@ async fn test_engine_open_existing() { engine: String::new(), region_dir, options: HashMap::default(), + skip_wal_replay: false, }), ) .await @@ -161,6 +164,7 @@ async fn test_engine_region_open_with_options() { engine: String::new(), region_dir, options: HashMap::from([("ttl".to_string(), "4d".to_string())]), + skip_wal_replay: false, }), ) .await @@ -205,6 +209,7 @@ async fn test_engine_region_open_with_custom_store() { engine: String::new(), region_dir, options: HashMap::from([("storage".to_string(), "Gcs".to_string())]), + skip_wal_replay: false, }), ) .await @@ -225,3 +230,92 @@ async fn test_engine_region_open_with_custom_store() { .await .unwrap()); } + +#[tokio::test] +async fn test_open_region_skip_wal_replay() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + flush_region(&engine, region_id, None).await; + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(3, 5), + }; + put_rows(&engine, region_id, rows).await; + + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + // Skip the WAL replay . + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir: region_dir.to_string(), + options: Default::default(), + skip_wal_replay: true, + }), + ) + .await + .unwrap(); + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); + + // Replay the WAL. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + // Open the region again with options. + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: Default::default(), + skip_wal_replay: false, + }), + ) + .await + .unwrap(); + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index 3bed94f98a47..c6c97f68d269 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -50,6 +50,7 @@ async fn scan_in_parallel( engine: String::new(), region_dir: region_dir.to_string(), options: HashMap::default(), + skip_wal_replay: false, }), ) .await diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 79b782e8766f..b9336afad3d1 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -250,6 +250,7 @@ async fn test_engine_truncate_reopen() { engine: String::new(), region_dir, options: HashMap::default(), + skip_wal_replay: false, }), ) .await @@ -353,6 +354,7 @@ async fn test_engine_truncate_during_flush() { engine: String::new(), region_dir, options: HashMap::default(), + skip_wal_replay: false, }), ) .await diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 4c3a1ad577bd..ea387ef2c552 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -54,6 +54,7 @@ pub(crate) struct RegionOpener { scheduler: SchedulerRef, options: HashMap, cache_manager: Option, + skip_wal_replay: bool, } impl RegionOpener { @@ -74,6 +75,7 @@ impl RegionOpener { scheduler, options: HashMap::new(), cache_manager: None, + skip_wal_replay: false, } } @@ -95,6 +97,12 @@ impl RegionOpener { self } + /// Sets the `skip_wal_replay`. + pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self { + self.skip_wal_replay = skip; + self + } + /// Writes region manifest and creates a new region if it does not exist. /// Opens the region if it already exists. /// @@ -235,7 +243,11 @@ impl RegionOpener { .build(); let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); - replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?; + if !self.skip_wal_replay { + replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?; + } else { + info!("Skip the WAL replay for region: {}", region_id); + } let region = MitoRegion { region_id: self.region_id, diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index ed20ab1a9dc6..63416a640f3b 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -30,7 +30,6 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; use api::v1::{OpType, Row, Rows, SemanticType}; use common_datasource::compression::CompressionType; -use common_query::Output; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::prelude::ConcreteDataType; @@ -697,6 +696,7 @@ pub async fn reopen_region( engine: String::new(), region_dir, options: HashMap::default(), + skip_wal_replay: false, }), ) .await diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 095da683a8f5..49100bee5199 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -70,6 +70,7 @@ impl RegionWorkerLoop { self.scheduler.clone(), ) .options(request.options) + .skip_wal_replay(request.skip_wal_replay) .cache(Some(self.cache_manager.clone())) .open(&self.config, &self.wal) .await?; diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index e04382c64f74..daa2ae3753a5 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -116,6 +116,7 @@ impl RegionRequest { engine: open.engine, region_dir, options: open.options, + skip_wal_replay: false, }), )]) } @@ -197,6 +198,8 @@ pub struct RegionOpenRequest { pub region_dir: String, /// Options of the opened region. pub options: HashMap, + /// To skip replaying the WAL. + pub skip_wal_replay: bool, } /// Close region request.