Skip to content

Commit

Permalink
feat: pre-download the ingested sst (#4636)
Browse files Browse the repository at this point in the history
* refactor: pre-read the ingested sst file in object store to fill the local cache to accelerate first query

* feat: pre-download the ingested SST from remote to accelerate following reads

* resolve PR comments

* resolve PR comments
  • Loading branch information
MichaelScofield authored Aug 29, 2024
1 parent 8c8499c commit d45b041
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 7 deletions.
78 changes: 77 additions & 1 deletion src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use snafu::ResultExt;
use crate::access_layer::{new_fs_cache_store, SstWriteRequest};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::metrics::{
FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
Expand Down Expand Up @@ -166,6 +169,79 @@ impl WriteCache {
Ok(Some(sst_info))
}

/// Downloads a file in `remote_path` from the remote object store to the local cache
/// (specified by `index_key`).
pub(crate) async fn download(
&self,
index_key: IndexKey,
remote_path: &str,
remote_store: &ObjectStore,
) -> Result<()> {
const DOWNLOAD_READER_CONCURRENCY: usize = 8;
const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

let file_type = index_key.file_type;
let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
.with_label_values(&[match file_type {
FileType::Parquet => "download_parquet",
FileType::Puffin => "download_puffin",
}])
.start_timer();

let remote_metadata = remote_store
.stat(remote_path)
.await
.context(error::OpenDalSnafu)?;
let reader = remote_store
.reader_with(remote_path)
.concurrent(DOWNLOAD_READER_CONCURRENCY)
.chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_read(0..remote_metadata.content_length())
.await
.context(error::OpenDalSnafu)?;

let cache_path = self.file_cache.cache_file_path(index_key);
let mut writer = self
.file_cache
.local_store()
.writer(&cache_path)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_write();

let region_id = index_key.region_id;
let file_id = index_key.file_id;
let bytes_written =
futures::io::copy(reader, &mut writer)
.await
.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;
writer.close().await.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;

WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

let elapsed = timer.stop_and_record();
debug!(
"Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
remote_path, cache_path, bytes_written, region_id, elapsed,
);

let index_value = IndexValue {
file_size: bytes_written as _,
};
self.file_cache.put(index_key, index_value).await;
Ok(())
}

/// Uploads a Parquet file or a Puffin file to the remote object store.
async fn upload(
&self,
Expand Down
79 changes: 77 additions & 2 deletions src/mito2/src/engine/edit_region_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,96 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use object_store::ObjectStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::sync::Barrier;
use tokio::sync::{oneshot, Barrier};

use crate::config::MitoConfig;
use crate::engine::listener::EventListener;
use crate::engine::MitoEngine;
use crate::manifest::action::RegionEdit;
use crate::region::MitoRegionRef;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::{CreateRequestBuilder, TestEnv};

/// Checks that editing a region with a newly ingested SST triggers the background
/// download of that SST into the write cache.
#[tokio::test]
async fn test_edit_region_fill_cache() {
    /// Forwards the id of the first cached file to the test through a oneshot channel.
    struct EditRegionListener {
        tx: Mutex<Option<oneshot::Sender<FileId>>>,
    }

    impl EventListener for EditRegionListener {
        fn on_file_cache_filled(&self, file_id: FileId) {
            let mut guard = self.tx.lock().unwrap();
            let sender = guard.take().unwrap();
            sender.send(file_id).unwrap();
        }
    }

    let mut env = TestEnv::new();
    let (tx, rx) = oneshot::channel();

    // The ingested SST file is only downloaded when the write cache is enabled.
    let config = MitoConfig {
        enable_experimental_write_cache: true,
        ..Default::default()
    };
    let engine = env
        .create_engine_with(
            config,
            None,
            Some(Arc::new(EditRegionListener {
                tx: Mutex::new(Some(tx)),
            })),
        )
        .await;

    let region_id = RegionId::new(1, 1);
    let create_request = RegionRequest::Create(CreateRequestBuilder::new().build());
    engine
        .handle_request(region_id, create_request)
        .await
        .unwrap();
    let region = engine.get_region(region_id).unwrap();

    // Simulate the ingestion of an SST file by writing it straight to the object store.
    let file_id = FileId::random();
    let object_store = env.get_object_store().unwrap();
    let sst_path = format!("{}/{}.parquet", region.region_dir(), file_id);
    object_store
        .write(&sst_path, b"x".as_slice())
        .await
        .unwrap();

    let ingested_file = FileMeta {
        region_id: region.region_id,
        file_id,
        level: 0,
        ..Default::default()
    };
    let edit = RegionEdit {
        files_to_add: vec![ingested_file],
        files_to_remove: vec![],
        compaction_time_window: None,
        flushed_entry_id: None,
        flushed_sequence: None,
    };
    engine.edit_region(region.region_id, edit).await.unwrap();

    // The background download must succeed and report the ingested file before the timeout.
    let reported = tokio::time::timeout(Duration::from_secs(9), rx)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(file_id, reported);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_edit_region_concurrently() {
const EDITS_PER_TASK: usize = 10;
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use common_telemetry::info;
use store_api::storage::RegionId;
use tokio::sync::Notify;

use crate::sst::file::FileId;

/// Mito engine background event listener.
#[async_trait]
pub trait EventListener: Send + Sync {
Expand Down Expand Up @@ -61,6 +63,9 @@ pub trait EventListener: Send + Sync {
fn on_recv_requests(&self, request_num: usize) {
let _ = request_num;
}

/// Notifies the listener that the file identified by `_file_id` has been filled into
/// the file cache, for example while editing a region.
///
/// The default implementation does nothing.
fn on_file_cache_filled(&self, _file_id: FileId) {}
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand Down
18 changes: 17 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,22 @@ pub enum Error {
location: Location,
},

/// An I/O failure while downloading a file; carries the region, file id and file type
/// of the file being downloaded together with the underlying `std::io::Error`.
#[snafu(display(
"Failed to download file, region_id: {}, file_id: {}, file_type: {:?}",
region_id,
file_id,
file_type,
))]
Download {
region_id: RegionId,
file_id: FileId,
file_type: FileType,
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Failed to upload file, region_id: {}, file_id: {}, file_type: {:?}",
region_id,
Expand Down Expand Up @@ -965,7 +981,7 @@ impl ErrorExt for Error {

FilterRecordBatch { source, .. } => source.status_code(),

Upload { .. } => StatusCode::StorageUnavailable,
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
ChecksumMismatch { .. } => StatusCode::Unexpected,
RegionStopped { .. } => StatusCode::RegionNotReady,
TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
Expand Down
11 changes: 11 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
/// Counter of the total number of bytes downloaded by the write cache.
pub static ref WRITE_CACHE_DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_write_cache_download_bytes_total",
"mito write cache download bytes total",
).unwrap();
/// Histogram of the elapsed time of download tasks in the write cache, labeled by
/// `TYPE_LABEL`.
pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED: HistogramVec = register_histogram_vec!(
"mito_write_cache_download_elapsed",
"mito write cache download elapsed",
&[TYPE_LABEL],
).unwrap();
/// Upload bytes counter.
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_upload_bytes_total",
Expand Down
8 changes: 8 additions & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
Expand Down Expand Up @@ -950,6 +951,13 @@ impl WorkerListener {
// Avoid compiler warning.
let _ = request_num;
}

/// Forwards the "file cache filled" event for `_file_id` to the registered listener.
///
/// The forwarding is only compiled in test builds or with the `test` feature; otherwise
/// this method does nothing.
pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
    #[cfg(any(test, feature = "test"))]
    if let Some(event_listener) = self.listener.as_ref() {
        event_listener.on_file_cache_filled(_file_id);
    }
}
}

#[cfg(test)]
Expand Down
43 changes: 40 additions & 3 deletions src/mito2/src/worker/handle_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::collections::{HashMap, VecDeque};
use common_telemetry::{info, warn};
use store_api::storage::RegionId;

use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::CacheManagerRef;
use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
Expand All @@ -30,7 +32,8 @@ use crate::request::{
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
TruncateResult, WorkerRequest,
};
use crate::worker::RegionWorkerLoop;
use crate::sst::location;
use crate::worker::{RegionWorkerLoop, WorkerListener};

pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;

Expand Down Expand Up @@ -105,10 +108,12 @@ impl<S> RegionWorkerLoop<S> {
}

let request_sender = self.sender.clone();
let cache_manager = self.cache_manager.clone();
let listener = self.listener.clone();
// Now the region is in editing state.
// Updates manifest in background.
common_runtime::spawn_global(async move {
let result = edit_region(&region, edit.clone()).await;
let result = edit_region(&region, edit.clone(), cache_manager, listener).await;
let notify = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
Expand Down Expand Up @@ -286,8 +291,40 @@ impl<S> RegionWorkerLoop<S> {
}

/// Checks the edit, writes and applies it.
async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> {
async fn edit_region(
region: &MitoRegionRef,
edit: RegionEdit,
cache_manager: CacheManagerRef,
listener: WorkerListener,
) -> Result<()> {
let region_id = region.region_id;
if let Some(write_cache) = cache_manager.write_cache() {
for file_meta in &edit.files_to_add {
let write_cache = write_cache.clone();
let layer = region.access_layer.clone();
let listener = listener.clone();

let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id);
common_runtime::spawn_global(async move {
if write_cache
.download(index_key, &remote_path, layer.object_store())
.await
.is_ok()
{
// Triggers the filling of the parquet metadata cache.
// The parquet file is already downloaded.
let _ = write_cache
.file_cache()
.get_parquet_meta_data(index_key)
.await;

listener.on_file_cache_filled(index_key.file_id);
}
});
}
}

info!("Applying {edit:?} to region {}", region_id);

let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
Expand Down

0 comments on commit d45b041

Please sign in to comment.