Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Add WriteCache struct and write SSTs to write cache #2999

Merged
merged 32 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9e4371d
docs: remove todo
evenyag Dec 20, 2023
6c0ab68
feat: add upload cache
evenyag Dec 21, 2023
195ce56
feat: add cache to sst write path
evenyag Dec 21, 2023
48d1c1b
feat: add storage to part
evenyag Dec 22, 2023
d6eee6d
feat: add dir to part
evenyag Dec 22, 2023
0c2efdf
feat: revert storage name
evenyag Dec 22, 2023
47c9fef
feat: flush use upload part writer
evenyag Dec 22, 2023
1280b1e
feat: use upload part writer in compaction task
evenyag Dec 22, 2023
1247d3d
refactor: upload part writer builds parquet writer
evenyag Dec 22, 2023
ed8d700
chore: suppress warnings
evenyag Dec 22, 2023
8b27e8d
refactor: rename UploadCache to WriteCache
evenyag Dec 22, 2023
de00923
refactor: move source to write_all()
evenyag Dec 22, 2023
61ec72d
chore: typos
evenyag Dec 25, 2023
168f05a
chore: remove output mod
evenyag Dec 25, 2023
5294510
feat: changes upload to async method
evenyag Dec 26, 2023
21a8acd
docs: update cache
evenyag Dec 26, 2023
1ede9b3
chore: Merge branch 'develop' into feat/write-cache
evenyag Dec 27, 2023
c5044a3
chore: fix compiler errors
evenyag Dec 27, 2023
0d90ed6
docs: remove comment
evenyag Dec 28, 2023
0b5544c
chore: simplify upload part
evenyag Dec 28, 2023
d517d41
refactor: remove option from cache manager param to access layer
evenyag Dec 28, 2023
44098d7
chore: Merge branch 'main' into feat/write-cache
evenyag Jan 3, 2024
2044f38
feat: remove cache home from file cache
evenyag Jan 3, 2024
6b814e7
feat: write cache holds file cache
evenyag Jan 3, 2024
e7192b8
feat: add recover and pub some methods
evenyag Jan 3, 2024
0278513
feat: remove usages of UploadPartWriter
evenyag Jan 3, 2024
0e8b58f
refactor: move sst_file_path to sst mod
evenyag Jan 3, 2024
9f03e67
refactor: use write cache in access layer
evenyag Jan 3, 2024
7cb11d8
refactor: remove upload
evenyag Jan 3, 2024
6a66893
style: fix clippy
evenyag Jan 3, 2024
75c0759
chore: Merge branch 'main' into feat/write-cache
evenyag Jan 4, 2024
962a9d1
refactor: pub write cache method/structs
evenyag Jan 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,26 @@

use std::sync::Arc;

use object_store::{util, ObjectStore};
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;

use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::error::{DeleteSstSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::sst_file_path;

pub type AccessLayerRef = Arc<AccessLayer>;

/// A layer to access SST files under the same directory.
pub struct AccessLayer {
region_dir: String,
/// Target object store.
object_store: ObjectStore,
}

Expand Down Expand Up @@ -61,7 +66,7 @@ impl AccessLayer {

/// Deletes a SST file with given file id.
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = self.sst_file_path(&file_id.as_parquet());
let path = sst_file_path(&self.region_dir, file_id);
self.object_store
.delete(&path)
.await
Expand All @@ -73,20 +78,44 @@ impl AccessLayer {
ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone())
}

/// Returns a new parquet writer to write the SST for specific `file_id`.
// TODO(hl): maybe rename to [sst_writer].
pub(crate) fn write_sst(
/// Writes a SST with specific `file_id` and `metadata` to the layer.
///
/// Returns the info of the SST. If no data written, returns None.
pub(crate) async fn write_sst(
&self,
file_id: FileId,
metadata: RegionMetadataRef,
source: Source,
) -> ParquetWriter {
let path = self.sst_file_path(&file_id.as_parquet());
ParquetWriter::new(path, metadata, source, self.object_store.clone())
}
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let path = sst_file_path(&self.region_dir, request.file_id);

if let Some(write_cache) = request.cache_manager.write_cache() {
// Write to the write cache.
return write_cache
.write_and_upload_sst(
SstUploadRequest {
file_id: request.file_id,
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: path,
remote_store: self.object_store.clone(),
},
write_opts,
)
.await;
}

/// Returns the `file_path` for the `file_name` in the object store.
fn sst_file_path(&self, file_name: &str) -> String {
util::join_path(&self.region_dir, file_name)
// Write cache is disabled.
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
writer.write_all(request.source, write_opts).await
}
}

/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) file_id: FileId,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
pub(crate) storage: Option<String>,
}
15 changes: 15 additions & 0 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mod cache_size;
pub(crate) mod file_cache;
#[cfg(test)]
pub(crate) mod test_util;
#[allow(unused)]
pub(crate) mod write_cache;

use std::mem;
use std::sync::Arc;
Expand All @@ -32,6 +34,7 @@ use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;

use crate::cache::cache_size::parquet_meta_size;
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

Expand All @@ -44,6 +47,8 @@ const PAGE_TYPE: &str = "page";
// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";

// TODO(yingwen): Builder for cache manager.

/// Manages cached data for the engine.
pub struct CacheManager {
/// Cache for SST metadata.
Expand All @@ -52,6 +57,10 @@ pub struct CacheManager {
vector_cache: Option<VectorCache>,
/// Cache for SST pages.
page_cache: Option<PageCache>,
/// A cache for writing files to object stores.
// TODO(yingwen): Remove this once the cache is ready.
#[allow(unused)]
write_cache: Option<WriteCacheRef>,
}

pub type CacheManagerRef = Arc<CacheManager>;
Expand Down Expand Up @@ -111,6 +120,7 @@ impl CacheManager {
sst_meta_cache,
vector_cache,
page_cache,
write_cache: None,
}
}

Expand Down Expand Up @@ -184,6 +194,11 @@ impl CacheManager {
cache.insert(page_key, pages);
}
}

/// Gets the write cache.
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
self.write_cache.as_ref()
}
}

fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
Expand Down
58 changes: 14 additions & 44 deletions src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

//! A cache for files.

use std::sync::Arc;
use std::time::Instant;

use common_base::readable_size::ReadableSize;
use common_telemetry::{info, warn};
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
use object_store::util::{join_dir, join_path};
use object_store::util::join_path;
use object_store::{ErrorKind, Metakey, ObjectStore, Reader};
use snafu::ResultExt;
use store_api::storage::RegionId;
Expand All @@ -32,33 +33,26 @@ use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

/// Subdirectory of cached files.
const FILE_DIR: &str = "files";
const FILE_DIR: &str = "files/";

/// A file cache manages files on local store and evicts files based
/// on size.
#[derive(Debug)]
pub(crate) struct FileCache {
/// Local store to cache files.
local_store: ObjectStore,
/// Cached file directory under cache home.
file_dir: String,
/// Index to track cached files.
///
/// File id is enough to identify a file uniquely.
memory_index: Cache<IndexKey, IndexValue>,
}

pub(crate) type FileCacheRef = Arc<FileCache>;

impl FileCache {
/// Creates a new file cache.
pub(crate) fn new(
local_store: ObjectStore,
cache_home: String,
capacity: ReadableSize,
) -> FileCache {
// Stores files under `cache_home/{FILE_DIR}`.
let file_dir = cache_file_dir(&cache_home);
pub(crate) fn new(local_store: ObjectStore, capacity: ReadableSize) -> FileCache {
let cache_store = local_store.clone();
let cache_file_dir = file_dir.clone();
let memory_index = Cache::builder()
.weigher(|_key, value: &IndexValue| -> u32 {
// We only measure space on local store.
Expand All @@ -67,7 +61,8 @@ impl FileCache {
.max_capacity(capacity.as_bytes())
.async_eviction_listener(move |key, value, cause| {
let store = cache_store.clone();
let file_path = cache_file_path(&cache_file_dir, *key);
// Stores files under FILE_DIR.
let file_path = cache_file_path(FILE_DIR, *key);
async move {
if let RemovalCause::Replaced = cause {
// The cache is replaced by another file. This is unexpected, we don't remove the same
Expand All @@ -91,7 +86,6 @@ impl FileCache {
.build();
FileCache {
local_store,
file_dir,
memory_index,
}
}
Expand Down Expand Up @@ -145,7 +139,7 @@ impl FileCache {

let mut lister = self
.local_store
.lister_with(&self.file_dir)
.lister_with(FILE_DIR)
.metakey(Metakey::ContentLength)
.await
.context(OpenDalSnafu)?;
Expand Down Expand Up @@ -182,7 +176,7 @@ impl FileCache {

/// Returns the cache file path for the key.
pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
cache_file_path(&self.file_dir, key)
cache_file_path(FILE_DIR, key)
}

/// Returns the local store of the file cache.
Expand All @@ -203,11 +197,6 @@ pub(crate) struct IndexValue {
file_size: u32,
}

/// Returns the directory to store files.
fn cache_file_dir(cache_home: &str) -> String {
join_dir(cache_home, FILE_DIR)
}

/// Generates the path to the cached file.
///
/// The file name format is `{region_id}.{file_id}`
Expand Down Expand Up @@ -245,13 +234,8 @@ mod tests {
async fn test_file_cache_basic() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache_home = "cache".to_string();

let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
Expand Down Expand Up @@ -291,13 +275,8 @@ mod tests {
async fn test_file_cache_file_removed() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache_home = "cache".to_string();

let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
Expand Down Expand Up @@ -326,12 +305,7 @@ mod tests {
async fn test_file_cache_recover() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache_home = "cache".to_string();
let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));

let region_id = RegionId::new(2000, 0);
// Write N files.
Expand All @@ -354,11 +328,7 @@ mod tests {
}

// Recover the cache.
let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
// No entry before recovery.
assert!(cache.reader((region_id, file_ids[0])).await.is_none());
cache.recover().await.unwrap();
Expand Down
86 changes: 86 additions & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A write-through cache for remote object stores.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;

use crate::cache::file_cache::{FileCache, FileCacheRef};
use crate::error::Result;
use crate::read::Source;
use crate::sst::file::FileId;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};

/// A cache for uploading files to remote object stores.
///
/// It keeps files in local disk and then sends files to object stores.
pub struct WriteCache {
    /// Local file cache that tracks cached files on the local store.
    file_cache: FileCacheRef,
    /// Object store manager.
    // NOTE(review): not yet read by any method in this file — presumably
    // reserved for selecting the remote store by name; confirm against callers.
    object_store_manager: ObjectStoreManagerRef,
}

pub type WriteCacheRef = Arc<WriteCache>;

impl WriteCache {
    /// Creates the cache with a `local_store` to cache files and an
    /// `object_store_manager` for all object stores.
    pub fn new(
        local_store: ObjectStore,
        object_store_manager: ObjectStoreManagerRef,
        cache_capacity: ReadableSize,
    ) -> Self {
        Self {
            // The file cache owns the local store and enforces `cache_capacity`.
            file_cache: Arc::new(FileCache::new(local_store, cache_capacity)),
            object_store_manager,
        }
    }

    /// Recovers the write cache from the local store.
    ///
    /// Delegates to [`FileCache::recover`] to rebuild the in-memory index
    /// from files already present on disk.
    pub async fn recover(&self) -> Result<()> {
        self.file_cache.recover().await
    }

    /// Writes an SST to the cache and then uploads it to the remote object store.
    ///
    /// Returns the [`SstInfo`] of the written SST, or `None` if the source
    /// produced no data (pass-through of [`ParquetWriter::write_all`]).
    pub(crate) async fn write_and_upload_sst(
        &self,
        request: SstUploadRequest,
        write_opts: &WriteOptions,
    ) -> Result<Option<SstInfo>> {
        // TODO(yingwen): Write to the local store and then upload.
        // Now we write to the remote and ignore local cache.
        let mut writer =
            ParquetWriter::new(request.upload_path, request.metadata, request.remote_store);
        writer.write_all(request.source, write_opts).await
    }
}

/// Request to write and upload a SST.
pub(crate) struct SstUploadRequest {
    /// Id of the SST file to write.
    pub(crate) file_id: FileId,
    /// Metadata of the region the SST belongs to.
    pub(crate) metadata: RegionMetadataRef,
    /// Source of rows to write into the SST.
    pub(crate) source: Source,
    /// Name of the target storage.
    // NOTE(review): unused by `write_and_upload_sst` in this file — presumably
    // selects a store from the object store manager; `None` likely means the
    // default store. Verify against callers.
    pub(crate) storage: Option<String>,
    /// Path to upload the file.
    pub(crate) upload_path: String,
    /// Remote object store to upload.
    pub(crate) remote_store: ObjectStore,
}
Loading
Loading