diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index a95bcff15f45..a21cbb5f6b60 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -27,7 +27,10 @@ use snafu::ResultExt; use crate::access_layer::{new_fs_cache_store, SstWriteRequest}; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; -use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; +use crate::metrics::{ + FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, + WRITE_CACHE_DOWNLOAD_ELAPSED, +}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::index::IndexerBuilder; @@ -166,6 +169,79 @@ impl WriteCache { Ok(Some(sst_info)) } + /// Downloads a file in `remote_path` from the remote object store to the local cache + /// (specified by `index_key`). + pub(crate) async fn download( + &self, + index_key: IndexKey, + remote_path: &str, + remote_store: &ObjectStore, + ) -> Result<()> { + const DOWNLOAD_READER_CONCURRENCY: usize = 8; + const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8); + + let file_type = index_key.file_type; + let timer = WRITE_CACHE_DOWNLOAD_ELAPSED + .with_label_values(&[match file_type { + FileType::Parquet => "download_parquet", + FileType::Puffin => "download_puffin", + }]) + .start_timer(); + + let remote_metadata = remote_store + .stat(remote_path) + .await + .context(error::OpenDalSnafu)?; + let reader = remote_store + .reader_with(remote_path) + .concurrent(DOWNLOAD_READER_CONCURRENCY) + .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize) + .await + .context(error::OpenDalSnafu)? + .into_futures_async_read(0..remote_metadata.content_length()) + .await + .context(error::OpenDalSnafu)?; + + let cache_path = self.file_cache.cache_file_path(index_key); + let mut writer = self + .file_cache + .local_store() + .writer(&cache_path) + .await + .context(error::OpenDalSnafu)? + .into_futures_async_write(); + + let region_id = index_key.region_id; + let file_id = index_key.file_id; + let bytes_written = + futures::io::copy(reader, &mut writer) + .await + .context(error::DownloadSnafu { + region_id, + file_id, + file_type, + })?; + writer.close().await.context(error::DownloadSnafu { + region_id, + file_id, + file_type, + })?; + + WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written); + + let elapsed = timer.stop_and_record(); + debug!( + "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s", + remote_path, cache_path, bytes_written, region_id, elapsed, + ); + + let index_value = IndexValue { + file_size: bytes_written as _, + }; + self.file_cache.put(index_key, index_value).await; + Ok(()) + } + /// Uploads a Parquet file or a Puffin file to the remote object store. async fn upload( &self, diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index 8dd682a37269..b13691fb856a 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -12,21 +12,96 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use object_store::ObjectStore; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use store_api::storage::RegionId; -use tokio::sync::Barrier; +use tokio::sync::{oneshot, Barrier}; use crate::config::MitoConfig; +use crate::engine::listener::EventListener; use crate::engine::MitoEngine; use crate::manifest::action::RegionEdit; use crate::region::MitoRegionRef; use crate::sst::file::{FileId, FileMeta}; use crate::test_util::{CreateRequestBuilder, TestEnv}; +#[tokio::test] +async fn test_edit_region_fill_cache() { + let mut env = TestEnv::new(); + + struct EditRegionListener { + tx: Mutex>>, + } + + impl EventListener for EditRegionListener { + fn on_file_cache_filled(&self, file_id: FileId) { + let mut tx = self.tx.lock().unwrap(); + tx.take().unwrap().send(file_id).unwrap(); + } + } + + let (tx, rx) = oneshot::channel(); + let engine = env + .create_engine_with( + MitoConfig { + // Write cache must be enabled to download the ingested SST file. + enable_experimental_write_cache: true, + ..Default::default() + }, + None, + Some(Arc::new(EditRegionListener { + tx: Mutex::new(Some(tx)), + })), + ) + .await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + let file_id = FileId::random(); + // Simulating the ingestion of an SST file. + env.get_object_store() + .unwrap() + .write( + &format!("{}/{}.parquet", region.region_dir(), file_id), + b"x".as_slice(), + ) + .await + .unwrap(); + + let edit = RegionEdit { + files_to_add: vec![FileMeta { + region_id: region.region_id, + file_id, + level: 0, + ..Default::default() + }], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }; + engine.edit_region(region.region_id, edit).await.unwrap(); + + // Asserts that the background downloading of the SST is succeeded. + let actual = tokio::time::timeout(Duration::from_secs(9), rx) + .await + .unwrap() + .unwrap(); + assert_eq!(file_id, actual); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_edit_region_concurrently() { const EDITS_PER_TASK: usize = 10; diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index ee6966270147..beea4add1ea8 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -22,6 +22,8 @@ use common_telemetry::info; use store_api::storage::RegionId; use tokio::sync::Notify; +use crate::sst::file::FileId; + /// Mito engine background event listener. #[async_trait] pub trait EventListener: Send + Sync { @@ -61,6 +63,9 @@ pub trait EventListener: Send + Sync { fn on_recv_requests(&self, request_num: usize) { let _ = request_num; } + + /// Notifies the listener that the file cache is filled when, for example, editing region. + fn on_file_cache_filled(&self, _file_id: FileId) {} } pub type EventListenerRef = Arc; diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 1f60eee8831e..2038656a2333 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -639,6 +639,22 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to download file, region_id: {}, file_id: {}, file_type: {:?}", + region_id, + file_id, + file_type, + ))] + Download { + region_id: RegionId, + file_id: FileId, + file_type: FileType, + #[snafu(source)] + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Failed to upload file, region_id: {}, file_id: {}, file_type: {:?}", region_id, @@ -965,7 +981,7 @@ impl ErrorExt for Error { FilterRecordBatch { source, .. } => source.status_code(), - Upload { .. } => StatusCode::StorageUnavailable, + Download { .. } | Upload { .. } => StatusCode::StorageUnavailable, ChecksumMismatch { .. } => StatusCode::Unexpected, RegionStopped { .. } => StatusCode::RegionNotReady, TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments, diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 963330948955..355c0fba4714 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -189,6 +189,17 @@ lazy_static! { &[TYPE_LABEL] ) .unwrap(); + /// Download bytes counter in the write cache. + pub static ref WRITE_CACHE_DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!( + "mito_write_cache_download_bytes_total", + "mito write cache download bytes total", + ).unwrap(); + /// Timer of the downloading task in the write cache. + pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED: HistogramVec = register_histogram_vec!( + "mito_write_cache_download_elapsed", + "mito write cache download elapsed", + &[TYPE_LABEL], + ).unwrap(); /// Upload bytes counter. pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!( "mito_upload_bytes_total", diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3aff7764f082..242d48c45f8a 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -57,6 +57,7 @@ use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, }; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; +use crate::sst::file::FileId; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; @@ -950,6 +951,13 @@ impl WorkerListener { // Avoid compiler warning. let _ = request_num; } + + pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) { + #[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + listener.on_file_cache_filled(_file_id); + } + } } #[cfg(test)] diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 1f8dfac60816..b9b20423d2a4 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -21,6 +21,8 @@ use std::collections::{HashMap, VecDeque}; use common_telemetry::{info, warn}; use store_api::storage::RegionId; +use crate::cache::file_cache::{FileType, IndexKey}; +use crate::cache::CacheManagerRef; use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, @@ -30,7 +32,8 @@ use crate::request::{ BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult, TruncateResult, WorkerRequest, }; -use crate::worker::RegionWorkerLoop; +use crate::sst::location; +use crate::worker::{RegionWorkerLoop, WorkerListener}; pub(crate) type RegionEditQueues = HashMap; @@ -105,10 +108,12 @@ impl RegionWorkerLoop { } let request_sender = self.sender.clone(); + let cache_manager = self.cache_manager.clone(); + let listener = self.listener.clone(); // Now the region is in editing state. // Updates manifest in background. common_runtime::spawn_global(async move { - let result = edit_region(®ion, edit.clone()).await; + let result = edit_region(®ion, edit.clone(), cache_manager, listener).await; let notify = WorkerRequest::Background { region_id, notify: BackgroundNotify::RegionEdit(RegionEditResult { @@ -286,8 +291,40 @@ impl RegionWorkerLoop { } /// Checks the edit, writes and applies it. -async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> { +async fn edit_region( + region: &MitoRegionRef, + edit: RegionEdit, + cache_manager: CacheManagerRef, + listener: WorkerListener, +) -> Result<()> { let region_id = region.region_id; + if let Some(write_cache) = cache_manager.write_cache() { + for file_meta in &edit.files_to_add { + let write_cache = write_cache.clone(); + let layer = region.access_layer.clone(); + let listener = listener.clone(); + + let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet); + let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id); + common_runtime::spawn_global(async move { + if write_cache + .download(index_key, &remote_path, layer.object_store()) + .await + .is_ok() + { + // Triggers the filling of the parquet metadata cache. + // The parquet file is already downloaded. + let _ = write_cache + .file_cache() + .get_parquet_meta_data(index_key) + .await; + + listener.on_file_cache_filled(index_key.file_id); + } + }); + } + } + info!("Applying {edit:?} to region {}", region_id); let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));