From fde3c051b1b91afdff09160c41f1a6f096ea44aa Mon Sep 17 00:00:00 2001 From: luofucong Date: Fri, 28 Jun 2024 18:50:55 +0800 Subject: [PATCH 1/4] refactor: pre-read the ingested sst file in object store to fill the local cache to accelerate first query --- src/mito2/src/worker/handle_manifest.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 1f8dfac60816..1834fa834345 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -30,6 +30,7 @@ use crate::request::{ BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult, TruncateResult, WorkerRequest, }; +use crate::sst::location; use crate::worker::RegionWorkerLoop; pub(crate) type RegionEditQueues = HashMap; @@ -288,6 +289,15 @@ impl RegionWorkerLoop { /// Checks the edit, writes and applies it. async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> { let region_id = region.region_id; + for file_meta in &edit.files_to_add { + // TODO(LFC): Fill the file in write cache, too. 
+ let layer = region.access_layer.clone(); + let path = location::sst_file_path(layer.region_dir(), file_meta.file_id); + common_runtime::spawn_bg(async move { + let _ = layer.object_store().read(&path).await; + }); + } + info!("Applying {edit:?} to region {}", region_id); let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit)); From cd4891e811f7db1e3c56ca200fe8770db85eb81b Mon Sep 17 00:00:00 2001 From: luofucong Date: Wed, 28 Aug 2024 22:17:19 +0800 Subject: [PATCH 2/4] feat: pre-download the ingested SST from remote to accelerate following reads --- src/mito2/src/cache/write_cache.rs | 74 +++++++++++++++++++++- src/mito2/src/engine/edit_region_test.rs | 79 +++++++++++++++++++++++- src/mito2/src/engine/listener.rs | 5 ++ src/mito2/src/error.rs | 18 +++++- src/mito2/src/metrics.rs | 6 ++ src/mito2/src/worker.rs | 8 +++ src/mito2/src/worker/handle_manifest.rs | 47 +++++++++++--- 7 files changed, 222 insertions(+), 15 deletions(-) diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index a95bcff15f45..366149f33574 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -15,7 +15,7 @@ //! A write-through cache for remote object stores. 
use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; @@ -27,7 +27,7 @@ use snafu::ResultExt; use crate::access_layer::{new_fs_cache_store, SstWriteRequest}; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; -use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; +use crate::metrics::{DOWNLOAD_BYTES_TOTAL, FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::index::IndexerBuilder; @@ -166,6 +166,76 @@ impl WriteCache { Ok(Some(sst_info)) } + /// Downloads a file in `remote_path` from the remote object store to the local cache + /// (specified by `index_key`). + pub(crate) async fn download( + &self, + index_key: IndexKey, + remote_path: &str, + remote_store: &ObjectStore, + ) -> Result<()> { + const DOWNLOAD_READER_CONCURRENCY: usize = 8; + const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8); + + let start = Instant::now(); + + let remote_metadata = remote_store + .stat(remote_path) + .await + .context(error::OpenDalSnafu)?; + let reader = remote_store + .reader_with(remote_path) + .concurrent(DOWNLOAD_READER_CONCURRENCY) + .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize) + .await + .context(error::OpenDalSnafu)? + .into_futures_async_read(0..remote_metadata.content_length()) + .await + .context(error::OpenDalSnafu)?; + + let cache_path = self.file_cache.cache_file_path(index_key); + let mut writer = self + .file_cache + .local_store() + .writer(&cache_path) + .await + .context(error::OpenDalSnafu)? 
+ .into_futures_async_write(); + + let region_id = index_key.region_id; + let file_id = index_key.file_id; + let file_type = index_key.file_type; + let bytes_written = + futures::io::copy(reader, &mut writer) + .await + .context(error::DownloadSnafu { + region_id, + file_id, + file_type, + })?; + writer.close().await.context(error::DownloadSnafu { + region_id, + file_id, + file_type, + })?; + + DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written); + + debug!( + "Successfully download file '{}' to local '{}', region: {}, cost: {:?}", + remote_path, + cache_path, + region_id, + start.elapsed(), + ); + + let index_value = IndexValue { + file_size: bytes_written as _, + }; + self.file_cache.put(index_key, index_value).await; + Ok(()) + } + /// Uploads a Parquet file or a Puffin file to the remote object store. async fn upload( &self, diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index 8dd682a37269..b13691fb856a 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -12,21 +12,96 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use object_store::ObjectStore; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use store_api::storage::RegionId; -use tokio::sync::Barrier; +use tokio::sync::{oneshot, Barrier}; use crate::config::MitoConfig; +use crate::engine::listener::EventListener; use crate::engine::MitoEngine; use crate::manifest::action::RegionEdit; use crate::region::MitoRegionRef; use crate::sst::file::{FileId, FileMeta}; use crate::test_util::{CreateRequestBuilder, TestEnv}; +#[tokio::test] +async fn test_edit_region_fill_cache() { + let mut env = TestEnv::new(); + + struct EditRegionListener { + tx: Mutex<Option<oneshot::Sender<FileId>>>, + } + + impl EventListener for EditRegionListener { + fn on_file_cache_filled(&self, file_id: FileId) { + let mut tx = self.tx.lock().unwrap(); + tx.take().unwrap().send(file_id).unwrap(); + } + } + + let (tx, rx) = oneshot::channel(); + let engine = env + .create_engine_with( + MitoConfig { + // Write cache must be enabled to download the ingested SST file. + enable_experimental_write_cache: true, + ..Default::default() + }, + None, + Some(Arc::new(EditRegionListener { + tx: Mutex::new(Some(tx)), + })), + ) + .await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + let file_id = FileId::random(); + // Simulating the ingestion of an SST file. 
+ env.get_object_store() + .unwrap() + .write( + &format!("{}/{}.parquet", region.region_dir(), file_id), + b"x".as_slice(), + ) + .await + .unwrap(); + + let edit = RegionEdit { + files_to_add: vec![FileMeta { + region_id: region.region_id, + file_id, + level: 0, + ..Default::default() + }], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }; + engine.edit_region(region.region_id, edit).await.unwrap(); + + // Asserts that the background downloading of the SST has succeeded. + let actual = tokio::time::timeout(Duration::from_secs(9), rx) + .await + .unwrap() + .unwrap(); + assert_eq!(file_id, actual); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_edit_region_concurrently() { const EDITS_PER_TASK: usize = 10; diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index ee6966270147..beea4add1ea8 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -22,6 +22,8 @@ use common_telemetry::info; use store_api::storage::RegionId; use tokio::sync::Notify; +use crate::sst::file::FileId; + /// Mito engine background event listener. #[async_trait] pub trait EventListener: Send + Sync { @@ -61,6 +63,9 @@ pub trait EventListener: Send + Sync { fn on_recv_requests(&self, request_num: usize) { let _ = request_num; } + + /// Notifies the listener that the file cache is filled when, for example, editing a region. 
+ fn on_file_cache_filled(&self, _file_id: FileId) {} } pub type EventListenerRef = Arc<dyn EventListener>; diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1f60eee8831e..2038656a2333 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -639,6 +639,22 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to download file, region_id: {}, file_id: {}, file_type: {:?}", + region_id, + file_id, + file_type, + ))] + Download { + region_id: RegionId, + file_id: FileId, + file_type: FileType, + #[snafu(source)] + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Failed to upload file, region_id: {}, file_id: {}, file_type: {:?}", region_id, @@ -965,7 +981,7 @@ impl ErrorExt for Error { FilterRecordBatch { source, .. } => source.status_code(), - Upload { .. } => StatusCode::StorageUnavailable, + Download { .. } | Upload { .. } => StatusCode::StorageUnavailable, ChecksumMismatch { .. } => StatusCode::Unexpected, RegionStopped { .. } => StatusCode::RegionNotReady, TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments, diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 963330948955..bd82991e652f 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -189,6 +189,12 @@ lazy_static! { &[TYPE_LABEL] ) .unwrap(); + /// Download bytes counter. + pub static ref DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!( + "mito_download_bytes_total", + "mito download bytes total", + ) + .unwrap(); /// Upload bytes counter. 
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!( "mito_upload_bytes_total", diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3aff7764f082..242d48c45f8a 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -57,6 +57,7 @@ use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, }; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; +use crate::sst::file::FileId; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; @@ -950,6 +951,13 @@ impl WorkerListener { // Avoid compiler warning. let _ = request_num; } + + pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) { + #[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + listener.on_file_cache_filled(_file_id); + } + } } #[cfg(test)] diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 1834fa834345..b9b20423d2a4 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -21,6 +21,8 @@ use std::collections::{HashMap, VecDeque}; use common_telemetry::{info, warn}; use store_api::storage::RegionId; +use crate::cache::file_cache::{FileType, IndexKey}; +use crate::cache::CacheManagerRef; use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, @@ -31,7 +33,7 @@ use crate::request::{ TruncateResult, WorkerRequest, }; use crate::sst::location; -use crate::worker::RegionWorkerLoop; +use crate::worker::{RegionWorkerLoop, WorkerListener}; pub(crate) type RegionEditQueues = HashMap; @@ -106,10 +108,12 @@ impl RegionWorkerLoop { } let request_sender = self.sender.clone(); + let cache_manager = self.cache_manager.clone(); + let listener = 
self.listener.clone(); // Now the region is in editing state. // Updates manifest in background. common_runtime::spawn_global(async move { - let result = edit_region(&region, edit.clone()).await; + let result = edit_region(&region, edit.clone(), cache_manager, listener).await; let notify = WorkerRequest::Background { region_id, notify: BackgroundNotify::RegionEdit(RegionEditResult { @@ -287,15 +291,38 @@ impl RegionWorkerLoop { } /// Checks the edit, writes and applies it. -async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> { +async fn edit_region( + region: &MitoRegionRef, + edit: RegionEdit, + cache_manager: CacheManagerRef, + listener: WorkerListener, +) -> Result<()> { let region_id = region.region_id; - for file_meta in &edit.files_to_add { - // TODO(LFC): Fill the file in write cache, too. - let layer = region.access_layer.clone(); - let path = location::sst_file_path(layer.region_dir(), file_meta.file_id); - common_runtime::spawn_bg(async move { - let _ = layer.object_store().read(&path).await; - }); + if let Some(write_cache) = cache_manager.write_cache() { + for file_meta in &edit.files_to_add { + let write_cache = write_cache.clone(); + let layer = region.access_layer.clone(); + let listener = listener.clone(); + + let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet); + let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id); + common_runtime::spawn_global(async move { + if write_cache + .download(index_key, &remote_path, layer.object_store()) + .await + .is_ok() + { + // Triggers the filling of the parquet metadata cache. + // The parquet file is already downloaded. 
+ let _ = write_cache + .file_cache() + .get_parquet_meta_data(index_key) + .await; + + listener.on_file_cache_filled(index_key.file_id); + } + }); + } } info!("Applying {edit:?} to region {}", region_id); From b286ffc47fe4619af6259deb07362d2c2632d2bd Mon Sep 17 00:00:00 2001 From: luofucong Date: Thu, 29 Aug 2024 15:00:16 +0800 Subject: [PATCH 3/4] resolve PR comments --- src/mito2/src/cache/write_cache.rs | 26 ++++++++++++++++---------- src/mito2/src/metrics.rs | 17 +++++++++++------ 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 366149f33574..982e497414b7 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -15,7 +15,7 @@ //! A write-through cache for remote object stores. use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; @@ -27,7 +27,10 @@ use snafu::ResultExt; use crate::access_layer::{new_fs_cache_store, SstWriteRequest}; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; -use crate::metrics::{DOWNLOAD_BYTES_TOTAL, FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; +use crate::metrics::{ + FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, + WRITE_CACHE_DOWNLOAD_ELAPSED_TOTAL, +}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::index::IndexerBuilder; @@ -177,7 +180,13 @@ impl WriteCache { const DOWNLOAD_READER_CONCURRENCY: usize = 8; const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8); - let start = Instant::now(); + let file_type = index_key.file_type; + let timer = WRITE_CACHE_DOWNLOAD_ELAPSED_TOTAL + .with_label_values(&[match file_type { + FileType::Parquet => "download_parquet", + FileType::Puffin => "download_puffin", + }]) + 
.start_timer(); let remote_metadata = remote_store .stat(remote_path) @@ -204,7 +213,6 @@ impl WriteCache { let region_id = index_key.region_id; let file_id = index_key.file_id; - let file_type = index_key.file_type; let bytes_written = futures::io::copy(reader, &mut writer) .await @@ -219,14 +227,12 @@ impl WriteCache { file_type, })?; - DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written); + WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written); + let elapsed = timer.stop_and_record(); debug!( - "Successfully download file '{}' to local '{}', region: {}, cost: {:?}", - remote_path, - cache_path, - region_id, - start.elapsed(), + "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s", + remote_path, cache_path, bytes_written, region_id, elapsed, ); let index_value = IndexValue { diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index bd82991e652f..cf4f6606de40 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -189,12 +189,17 @@ lazy_static! { &[TYPE_LABEL] ) .unwrap(); - /// Download bytes counter. - pub static ref DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!( - "mito_download_bytes_total", - "mito download bytes total", - ) - .unwrap(); + /// Download bytes counter in the write cache. + pub static ref WRITE_CACHE_DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!( + "mito_write_cache_download_bytes_total", + "mito write cache download bytes total", + ).unwrap(); + /// Timer of the downloading task in the write cache. + pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED_TOTAL: HistogramVec = register_histogram_vec!( + "mito_write_cache_download_elapsed_total", + "mito write cache download elapsed total", + &[TYPE_LABEL], + ).unwrap(); /// Upload bytes counter. 
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!( "mito_upload_bytes_total", From 1cca5622050c0c7770c8589f60dd027ee715dac0 Mon Sep 17 00:00:00 2001 From: luofucong Date: Thu, 29 Aug 2024 15:09:30 +0800 Subject: [PATCH 4/4] resolve PR comments --- src/mito2/src/cache/write_cache.rs | 4 ++-- src/mito2/src/metrics.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 982e497414b7..a21cbb5f6b60 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -29,7 +29,7 @@ use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, Inde use crate::error::{self, Result}; use crate::metrics::{ FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, - WRITE_CACHE_DOWNLOAD_ELAPSED_TOTAL, + WRITE_CACHE_DOWNLOAD_ELAPSED, }; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -181,7 +181,7 @@ impl WriteCache { const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8); let file_type = index_key.file_type; - let timer = WRITE_CACHE_DOWNLOAD_ELAPSED_TOTAL + let timer = WRITE_CACHE_DOWNLOAD_ELAPSED .with_label_values(&[match file_type { FileType::Parquet => "download_parquet", FileType::Puffin => "download_puffin", diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index cf4f6606de40..355c0fba4714 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -195,9 +195,9 @@ lazy_static! { "mito write cache download bytes total", ).unwrap(); /// Timer of the downloading task in the write cache. 
- pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED_TOTAL: HistogramVec = register_histogram_vec!( - "mito_write_cache_download_elapsed_total", - "mito write cache download elapsed total", + pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED: HistogramVec = register_histogram_vec!( + "mito_write_cache_download_elapsed", + "mito write cache download elapsed", &[TYPE_LABEL], ).unwrap(); /// Upload bytes counter.