Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pre-download the ingested sst #4636

Merged
merged 4 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 77 additions & 1 deletion src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use snafu::ResultExt;
use crate::access_layer::{new_fs_cache_store, SstWriteRequest};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::metrics::{
FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
WRITE_CACHE_DOWNLOAD_ELAPSED_TOTAL,
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
Expand Down Expand Up @@ -166,6 +169,79 @@ impl WriteCache {
Ok(Some(sst_info))
}

/// Downloads a file in `remote_path` from the remote object store to the local cache
/// (specified by `index_key`).
pub(crate) async fn download(
&self,
index_key: IndexKey,
remote_path: &str,
remote_store: &ObjectStore,
) -> Result<()> {
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
const DOWNLOAD_READER_CONCURRENCY: usize = 8;
const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

let file_type = index_key.file_type;
let timer = WRITE_CACHE_DOWNLOAD_ELAPSED_TOTAL
.with_label_values(&[match file_type {
FileType::Parquet => "download_parquet",
FileType::Puffin => "download_puffin",
}])
.start_timer();

let remote_metadata = remote_store
.stat(remote_path)
.await
.context(error::OpenDalSnafu)?;
let reader = remote_store
.reader_with(remote_path)
.concurrent(DOWNLOAD_READER_CONCURRENCY)
.chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_read(0..remote_metadata.content_length())
Comment on lines +191 to +201
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already stored the file size in the FileMeta so we don't need to call stat here. We can pass the file size (or the IndexValue) to the download method to avoid one stat call.

.await
.context(error::OpenDalSnafu)?;

let cache_path = self.file_cache.cache_file_path(index_key);
let mut writer = self
.file_cache
.local_store()
.writer(&cache_path)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_write();

let region_id = index_key.region_id;
let file_id = index_key.file_id;
let bytes_written =
futures::io::copy(reader, &mut writer)
.await
.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;
writer.close().await.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;

WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

let elapsed = timer.stop_and_record();
debug!(
"Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
remote_path, cache_path, bytes_written, region_id, elapsed,
);

let index_value = IndexValue {
file_size: bytes_written as _,
};
self.file_cache.put(index_key, index_value).await;
Ok(())
}

/// Uploads a Parquet file or a Puffin file to the remote object store.
async fn upload(
&self,
Expand Down
79 changes: 77 additions & 2 deletions src/mito2/src/engine/edit_region_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,96 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use object_store::ObjectStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::sync::Barrier;
use tokio::sync::{oneshot, Barrier};

use crate::config::MitoConfig;
use crate::engine::listener::EventListener;
use crate::engine::MitoEngine;
use crate::manifest::action::RegionEdit;
use crate::region::MitoRegionRef;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::{CreateRequestBuilder, TestEnv};

#[tokio::test]
async fn test_edit_region_fill_cache() {
let mut env = TestEnv::new();

struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<FileId>>>,
}

impl EventListener for EditRegionListener {
fn on_file_cache_filled(&self, file_id: FileId) {
let mut tx = self.tx.lock().unwrap();
tx.take().unwrap().send(file_id).unwrap();
}
}

let (tx, rx) = oneshot::channel();
let engine = env
.create_engine_with(
MitoConfig {
// Write cache must be enabled to download the ingested SST file.
enable_experimental_write_cache: true,
..Default::default()
},
None,
Some(Arc::new(EditRegionListener {
tx: Mutex::new(Some(tx)),
})),
)
.await;

let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();

let file_id = FileId::random();
// Simulating the ingestion of an SST file.
env.get_object_store()
.unwrap()
.write(
&format!("{}/{}.parquet", region.region_dir(), file_id),
b"x".as_slice(),
)
.await
.unwrap();

let edit = RegionEdit {
files_to_add: vec![FileMeta {
region_id: region.region_id,
file_id,
level: 0,
..Default::default()
}],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
};
engine.edit_region(region.region_id, edit).await.unwrap();

// Asserts that the background downloading of the SST is succeeded.
let actual = tokio::time::timeout(Duration::from_secs(9), rx)
.await
.unwrap()
.unwrap();
assert_eq!(file_id, actual);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_edit_region_concurrently() {
const EDITS_PER_TASK: usize = 10;
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/engine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use common_telemetry::info;
use store_api::storage::RegionId;
use tokio::sync::Notify;

use crate::sst::file::FileId;

/// Mito engine background event listener.
#[async_trait]
pub trait EventListener: Send + Sync {
Expand Down Expand Up @@ -61,6 +63,9 @@ pub trait EventListener: Send + Sync {
fn on_recv_requests(&self, request_num: usize) {
let _ = request_num;
}

/// Notifies the listener that the file cache is filled when, for example, editing region.
fn on_file_cache_filled(&self, _file_id: FileId) {}
}

pub type EventListenerRef = Arc<dyn EventListener>;
Expand Down
18 changes: 17 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,22 @@ pub enum Error {
location: Location,
},

#[snafu(display(
"Failed to download file, region_id: {}, file_id: {}, file_type: {:?}",
region_id,
file_id,
file_type,
))]
Download {
region_id: RegionId,
file_id: FileId,
file_type: FileType,
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Failed to upload file, region_id: {}, file_id: {}, file_type: {:?}",
region_id,
Expand Down Expand Up @@ -965,7 +981,7 @@ impl ErrorExt for Error {

FilterRecordBatch { source, .. } => source.status_code(),

Upload { .. } => StatusCode::StorageUnavailable,
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
ChecksumMismatch { .. } => StatusCode::Unexpected,
RegionStopped { .. } => StatusCode::RegionNotReady,
TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
Expand Down
11 changes: 11 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
/// Download bytes counter in the write cache.
pub static ref WRITE_CACHE_DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_write_cache_download_bytes_total",
"mito write cache download bytes total",
).unwrap();
/// Timer of the downloading task in the write cache.
pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED_TOTAL: HistogramVec = register_histogram_vec!(
"mito_write_cache_download_elapsed_total",
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
"mito write cache download elapsed total",
&[TYPE_LABEL],
).unwrap();
/// Upload bytes counter.
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_upload_bytes_total",
Expand Down
8 changes: 8 additions & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
Expand Down Expand Up @@ -950,6 +951,13 @@ impl WorkerListener {
// Avoid compiler warning.
let _ = request_num;
}

pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_file_cache_filled(_file_id);
}
}
}

#[cfg(test)]
Expand Down
43 changes: 40 additions & 3 deletions src/mito2/src/worker/handle_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::collections::{HashMap, VecDeque};
use common_telemetry::{info, warn};
use store_api::storage::RegionId;

use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::CacheManagerRef;
use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
Expand All @@ -30,7 +32,8 @@ use crate::request::{
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
TruncateResult, WorkerRequest,
};
use crate::worker::RegionWorkerLoop;
use crate::sst::location;
use crate::worker::{RegionWorkerLoop, WorkerListener};

pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;

Expand Down Expand Up @@ -105,10 +108,12 @@ impl<S> RegionWorkerLoop<S> {
}

let request_sender = self.sender.clone();
let cache_manager = self.cache_manager.clone();
let listener = self.listener.clone();
// Now the region is in editing state.
// Updates manifest in background.
common_runtime::spawn_global(async move {
let result = edit_region(&region, edit.clone()).await;
let result = edit_region(&region, edit.clone(), cache_manager, listener).await;
let notify = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
Expand Down Expand Up @@ -286,8 +291,40 @@ impl<S> RegionWorkerLoop<S> {
}

/// Checks the edit, writes and applies it.
async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> {
async fn edit_region(
region: &MitoRegionRef,
edit: RegionEdit,
cache_manager: CacheManagerRef,
listener: WorkerListener,
) -> Result<()> {
let region_id = region.region_id;
if let Some(write_cache) = cache_manager.write_cache() {
for file_meta in &edit.files_to_add {
let write_cache = write_cache.clone();
let layer = region.access_layer.clone();
let listener = listener.clone();

let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id);
common_runtime::spawn_global(async move {
if write_cache
.download(index_key, &remote_path, layer.object_store())
.await
.is_ok()
{
// Triggers the filling of the parquet metadata cache.
// The parquet file is already downloaded.
let _ = write_cache
.file_cache()
.get_parquet_meta_data(index_key)
.await;

listener.on_file_cache_filled(index_key.file_id);
}
});
}
}

info!("Applying {edit:?} to region {}", region_id);

let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
Expand Down
Loading