Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): enable inverted index #3158

Merged
merged 11 commits into from
Jan 15, 2024
19 changes: 19 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query.
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

# Log options, see `standalone.example.toml`
# [logging]
# dir = "/tmp/greptimedb/logs"
Expand Down
19 changes: 19 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query.
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

# Log options
# [logging]
# Specify logs directory.
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl DatanodeOptions {
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum RegionEngineConfig {
#[serde(rename = "mito")]
Expand Down
16 changes: 5 additions & 11 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::{join_dir, normalize_dir};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::grpc::builder::GrpcServerBuilder;
Expand Down Expand Up @@ -457,27 +457,21 @@ impl DatanodeBuilder {
async fn build_mito_engine(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
mut config: MitoConfig,
config: MitoConfig,
) -> Result<MitoEngine> {
// Sets write cache path if it is empty.
if config.experimental_write_cache_path.is_empty() {
config.experimental_write_cache_path = join_dir(&opts.storage.data_home, "write_cache");
info!(
"Sets write cache path to {}",
config.experimental_write_cache_path
);
}

let mito_engine = match &opts.wal {
WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
object_store_manager,
)
.await
.context(BuildMitoEngineSnafu)?,

WalConfig::Kafka(kafka_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::inverted_index::BytesRef;

/// `InvertedIndexCreator` provides functionality to construct an inverted index
#[async_trait]
pub trait InvertedIndexCreator {
pub trait InvertedIndexCreator: Send {
/// Adds a value to the named index. A `None` value represents an absence of data (null)
///
/// - `index_name`: Identifier for the index being built
Expand Down
2 changes: 0 additions & 2 deletions src/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,5 @@
// limitations under the License.

#![feature(iter_partition_in_place)]
// TODO(zhongzc): remove once further code is added
#![allow(dead_code)]

pub mod inverted_index;
47 changes: 34 additions & 13 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::cache::CacheManagerRef;
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::Indexer;
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
Expand All @@ -37,6 +39,8 @@ pub struct AccessLayer {
region_dir: String,
/// Target object store.
object_store: ObjectStore,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
}

impl std::fmt::Debug for AccessLayer {
Expand All @@ -49,10 +53,15 @@ impl std::fmt::Debug for AccessLayer {

impl AccessLayer {
/// Returns a new [AccessLayer] for specific `region_dir`.
pub fn new(region_dir: impl Into<String>, object_store: ObjectStore) -> AccessLayer {
pub fn new(
region_dir: impl Into<String>,
object_store: ObjectStore,
intermediate_manager: IntermediateManager,
) -> AccessLayer {
AccessLayer {
region_dir: region_dir.into(),
object_store,
intermediate_manager,
}
}

Expand Down Expand Up @@ -105,38 +114,45 @@ impl AccessLayer {
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let cache_manager = request.cache_manager.clone();

let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
// Write to the write cache.
write_cache
.write_and_upload_sst(
request,
SstUploadRequest {
file_id: request.file_id,
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: file_path,
index_upload_path: index_file_path,
remote_store: self.object_store.clone(),
},
write_opts,
self.intermediate_manager.clone(),
)
.await?
} else {
// Write cache is disabled.
let mut writer =
ParquetWriter::new(file_path, request.metadata, self.object_store.clone());
let indexer = Indexer::new(
&request,
write_opts,
index_file_path,
self.intermediate_manager.clone(),
self.object_store.clone(),
);
let mut writer = ParquetWriter::new(
file_path,
request.metadata,
self.object_store.clone(),
indexer,
);
writer.write_all(request.source, write_opts).await?
};

// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
request.cache_manager.put_parquet_meta_data(
region_id,
request.file_id,
parquet_metadata.clone(),
)
cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone())
}
}

Expand All @@ -150,7 +166,12 @@ pub(crate) struct SstWriteRequest {
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
/// Whether to create inverted index.
pub(crate) create_inverted_index: bool,
/// The threshold of memory size to create inverted index.
pub(crate) mem_threshold_index_create: Option<usize>,
}

/// Creates a fs object store with atomic write dir.
Expand Down
65 changes: 48 additions & 17 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;

use crate::access_layer::new_fs_object_store;
use crate::access_layer::{new_fs_object_store, SstWriteRequest};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::read::Source;
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::Indexer;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
Expand Down Expand Up @@ -84,27 +86,39 @@ impl WriteCache {
}

/// Writes SST to the cache and then uploads it to the remote object store.
pub async fn write_and_upload_sst(
pub(crate) async fn write_and_upload_sst(
&self,
request: SstUploadRequest,
write_request: SstWriteRequest,
upload_request: SstUploadRequest,
write_opts: &WriteOptions,
intermediate_manager: IntermediateManager,
) -> Result<Option<SstInfo>> {
let timer = FLUSH_ELAPSED
.with_label_values(&["write_sst"])
.start_timer();

let region_id = request.metadata.region_id;
let file_id = request.file_id;
let region_id = write_request.metadata.region_id;
let file_id = write_request.file_id;
let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);

let indexer = Indexer::new(
&write_request,
write_opts,
self.file_cache.cache_file_path(puffin_key),
intermediate_manager,
self.file_cache.local_store(),
);

// Write to FileCache.
let mut writer = ParquetWriter::new(
self.file_cache.cache_file_path(parquet_key),
request.metadata,
write_request.metadata,
self.file_cache.local_store(),
indexer,
);

let sst_info = writer.write_all(request.source, write_opts).await?;
let sst_info = writer.write_all(write_request.source, write_opts).await?;

timer.stop_and_record();

Expand All @@ -114,13 +128,13 @@ impl WriteCache {
return Ok(None);
};

let parquet_path = &request.upload_path;
let remote_store = &request.remote_store;
let parquet_path = &upload_request.upload_path;
let remote_store = &upload_request.remote_store;
self.upload(parquet_key, parquet_path, remote_store).await?;

if sst_info.inverted_index_available {
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let puffin_path = &request.index_upload_path;
let puffin_path = &upload_request.index_upload_path;
self.upload(puffin_key, puffin_path, remote_store).await?;
}

Expand Down Expand Up @@ -193,10 +207,6 @@ impl WriteCache {

/// Request to write and upload a SST.
pub struct SstUploadRequest {
pub file_id: FileId,
pub metadata: RegionMetadataRef,
pub source: Source,
pub storage: Option<String>,
/// Path to upload the file.
pub upload_path: String,
/// Path to upload the index file.
Expand Down Expand Up @@ -230,10 +240,14 @@ mod tests {
// TODO(QuenKar): maybe find a way to create some object server for testing,
// and now just use local file system to mock.
let mut env = TestEnv::new();
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();
let file_id = FileId::random();
let upload_path = sst_file_path("test", file_id);
let index_upload_path = index_file_path("test", file_id);
let intm_mgr = IntermediateManager::init_fs(format!("{data_home}/intermediate"))
.await
.unwrap();

// Create WriteCache
let local_dir = create_temp_dir("");
Expand All @@ -256,13 +270,19 @@ mod tests {
new_batch_by_range(&["b", "h"], 100, 200),
]);

let request = SstUploadRequest {
let write_request = SstWriteRequest {
file_id,
metadata,
source,
storage: None,
create_inverted_index: true,
mem_threshold_index_create: None,
cache_manager: Default::default(),
};

let request = SstUploadRequest {
upload_path: upload_path.clone(),
index_upload_path,
index_upload_path: index_upload_path.clone(),
remote_store: mock_store.clone(),
};

Expand All @@ -273,7 +293,7 @@ mod tests {

// Write to cache and upload sst to mock remote store
let sst_info = write_cache
.write_and_upload_sst(request, &write_opts)
.write_and_upload_sst(write_request, request, &write_opts, intm_mgr)
.await
.unwrap()
.unwrap();
Expand All @@ -289,5 +309,16 @@ mod tests {
.await
.unwrap();
assert_eq!(remote_data, cache_data);

// Check write cache contains the index key
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
assert!(write_cache.file_cache.contains_key(&index_key));

let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
let cache_index_data = local_store
.read(&write_cache.file_cache.cache_file_path(index_key))
.await
.unwrap();
assert_eq!(remote_index_data, cache_index_data);
}
}
Loading
Loading