From a98a6212c41d1b859b8575f1c6d832fada89f898 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 6 Aug 2024 19:39:38 +0000 Subject: [PATCH 1/8] feat: introduce the `CollectionTask` --- Cargo.lock | 13 ++ src/log-store/Cargo.toml | 4 + src/log-store/src/error.rs | 26 ++- src/log-store/src/kafka/index.rs | 4 +- src/log-store/src/kafka/index/collector.rs | 146 ++++++++++++++-- src/log-store/src/kafka/index/encoder.rs | 193 +++++++++++++++++++++ 6 files changed, 373 insertions(+), 13 deletions(-) create mode 100644 src/log-store/src/kafka/index/encoder.rs diff --git a/Cargo.lock b/Cargo.lock index 77112f61ada5..8243c9efa735 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3168,6 +3168,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "delta-encoding" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f8513a5eeb3d7b9149563409dc4ab6fd9de5767fd285af5b4d0ee1b778fbce0" +dependencies = [ + "num-traits", +] + [[package]] name = "der" version = "0.5.1" @@ -5787,6 +5796,7 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" name = "log-store" version = "0.9.1" dependencies = [ + "arrow", "async-stream", "async-trait", "bytes", @@ -5800,10 +5810,13 @@ dependencies = [ "common-test-util", "common-time", "common-wal", + "delta-encoding", "futures", "futures-util", "itertools 0.10.5", "lazy_static", + "object-store", + "parquet", "pin-project", "prometheus", "protobuf", diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 6a84965974eb..fd3df5cda6fe 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -13,6 +13,7 @@ protobuf-build = { version = "0.15", default-features = false, features = [ workspace = true [dependencies] +arrow.workspace = true async-stream.workspace = true async-trait.workspace = true bytes.workspace = true @@ -25,10 +26,13 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true +delta-encoding = "0.4" futures.workspace = true futures-util.workspace = true itertools.workspace = true lazy_static.workspace = true +object-store.workspace = true +parquet.workspace = true pin-project.workspace = true prometheus.workspace = true protobuf = { version = "2", features = ["bytes"] } diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 5572a05dddfa..14648a8e9288 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -272,7 +272,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to send produce request"))] + #[snafu(display("Failed to wait for ProduceResultReceiver"))] WaitProduceResultReceiver { #[snafu(implicit)] location: Location, @@ -280,6 +280,30 @@ pub enum Error { error: tokio::sync::oneshot::error::RecvError, }, + #[snafu(display("Failed to wait for result of DumpIndex"))] + WaitDumpIndex { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: tokio::sync::oneshot::error::RecvError, + }, + + #[snafu(display("Failed to create writer"))] + CreateWriter { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: object_store::Error, + }, + + #[snafu(display("Failed to write index"))] + WriteIndex { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: object_store::Error, + }, + #[snafu(display( "The length of meta if exceeded the limit: {}, actual: {}", limit, diff --git a/src/log-store/src/kafka/index.rs b/src/log-store/src/kafka/index.rs index 1c646376165b..567f34f3cb9e 100644 --- 
a/src/log-store/src/kafka/index.rs +++ b/src/log-store/src/kafka/index.rs @@ -13,11 +13,13 @@ // limitations under the License. mod collector; +mod encoder; mod iterator; pub(crate) use collector::{ - GlobalIndexCollector, IndexCollector, IndexEncoder, NoopCollector, ProviderLevelIndexCollector, + GlobalIndexCollector, IndexCollector, NoopCollector, ProviderLevelIndexCollector, }; +pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder}; pub(crate) use iterator::{ MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange, RegionWalVecIndex, diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs index a8cb2546b6c6..b9c9e1b8c596 100644 --- a/src/log-store/src/kafka/index/collector.rs +++ b/src/log-store/src/kafka/index/collector.rs @@ -14,13 +14,14 @@ use std::collections::{BTreeSet, HashMap}; use std::io::Write; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use bytes::buf::Writer; use bytes::{BufMut, Bytes, BytesMut}; -use common_telemetry::tracing::error; +use common_telemetry::{error, info}; use futures::future::try_join_all; +use object_store::Writer; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::logstore::provider::KafkaProvider; @@ -31,14 +32,10 @@ use tokio::sync::mpsc::Sender; use tokio::sync::Mutex as TokioMutex; use crate::error::{self, Result}; +use crate::kafka::index::encoder::IndexEncoder; +use crate::kafka::index::JsonIndexEncoder; use crate::kafka::worker::{DumpIndexRequest, WorkerRequest}; -pub trait IndexEncoder: Send + Sync { - fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes); - - fn finish(&self) -> Result>; -} - /// The [`IndexCollector`] trait defines the operations for managing and collecting index entries. pub trait IndexCollector: Send + Sync { /// Appends an [`EntryId`] for a specific region. @@ -58,9 +55,136 @@ pub trait IndexCollector: Send + Sync { /// The [`GlobalIndexCollector`] struct is responsible for managing index entries /// across multiple providers. 
-#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct GlobalIndexCollector { providers: Arc, Sender>>>, + task: CollectionTask, +} + +#[derive(Debug, Clone)] +pub struct CollectionTask { + providers: Arc, Sender>>>, + dump_index_interval: Duration, + checkpoint_interval: Duration, + operator: object_store::ObjectStore, + path: String, + running: Arc, +} + +impl CollectionTask { + async fn dump_index( + providers: &Arc, Sender>>>, + operator: &object_store::ObjectStore, + path: &str, + ) -> Result<()> { + let encoder = Arc::new(JsonIndexEncoder::default()); + let receivers = { + let providers = providers.lock().await; + let mut receivers = Vec::with_capacity(providers.len()); + for (provider, sender) in providers.iter() { + let (req, rx) = DumpIndexRequest::new(encoder.clone()); + receivers.push(rx); + if sender.send(WorkerRequest::DumpIndex(req)).await.is_err() { + error!( + "BackgroundProducerWorker is stopped, topic: {}", + provider.topic + ) + } + } + receivers + }; + try_join_all(receivers) + .await + .context(error::WaitDumpIndexSnafu)?; + let bytes = encoder.finish()?; + let mut writer = operator + .writer(path) + .await + .context(error::CreateWriterSnafu)?; + writer.write(bytes).await.context(error::WriteIndexSnafu)?; + writer.close().await.context(error::WriteIndexSnafu)?; + + Ok(()) + } + + async fn checkpoint( + providers: &Arc, Sender>>>, + ) { + for (provider, sender) in providers.lock().await.iter() { + if sender.send(WorkerRequest::Checkpoint).await.is_err() { + error!( + "BackgroundProducerWorker is stopped, topic: {}", + provider.topic + ) + } + } + } + + /// The background task performs two main operations: + /// - Persists the WAL index to the specified `path` at every `dump_index_interval`. + /// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`. + fn run(&mut self) { + let mut dump_index_interval = tokio::time::interval(self.dump_index_interval); + let mut checkpoint_interval = tokio::time::interval(self.checkpoint_interval); + let providers = self.providers.clone(); + let path = self.path.to_string(); + let operator = self.operator.clone(); + let running = self.running.clone(); + common_runtime::spawn_global(async move { + loop { + if !running.load(Ordering::Relaxed) { + info!("shutdown the index collection task"); + break; + } + select! { + _ = dump_index_interval.tick() => { + if let Err(err) = CollectionTask::dump_index(&providers, &operator, &path).await { + error!(err; "Failed to persist the WAL index"); + } + }, + _ = checkpoint_interval.tick() => { + CollectionTask::checkpoint(&providers).await; + } + } + } + }); + } +} + +impl Drop for CollectionTask { + fn drop(&mut self) { + self.running.store(false, Ordering::Relaxed); + } +} + +impl GlobalIndexCollector { + /// Constructs a [`GlobalIndexCollector`]. + /// + /// This method initializes a `GlobalIndexCollector` instance and starts a background task + /// for managing WAL (Write-Ahead Logging) indexes. + /// + /// The background task performs two main operations: + /// - Persists the WAL index to the specified `path` at every `dump_index_interval`. + /// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`. 
+ pub fn new( + dump_index_interval: Duration, + checkpoint_interval: Duration, + operator: object_store::ObjectStore, + path: String, + ) -> Self { + let providers: Arc, Sender>>> = + Arc::new(Default::default()); + let mut task = CollectionTask { + providers: providers.clone(), + dump_index_interval, + checkpoint_interval, + operator, + path, + running: Arc::new(AtomicBool::new(true)), + }; + task.run(); + Self { providers, task } + } } impl GlobalIndexCollector { @@ -83,8 +207,8 @@ impl GlobalIndexCollector { /// latest [`EntryId`] across all regions. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct RegionIndexes { - regions: HashMap>, - latest_entry_id: EntryId, + pub(crate) regions: HashMap>, + pub(crate) latest_entry_id: EntryId, } impl RegionIndexes { diff --git a/src/log-store/src/kafka/index/encoder.rs b/src/log-store/src/kafka/index/encoder.rs new file mode 100644 index 000000000000..f2124862530c --- /dev/null +++ b/src/log-store/src/kafka/index/encoder.rs @@ -0,0 +1,193 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeSet, HashMap}; +use std::fs::File; +use std::sync::{Arc, Mutex}; + +use arrow::array::{ + Array, ArrayBuilder, ArrayData, ArrayRef, ListArray, ListBuilder, PrimitiveArray, RecordBatch, + StringArray, StructArray, StructBuilder, UInt64Array, UInt64Builder, +}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field, Fields, Schema, UInt64Type}; +use arrow::util::pretty::pretty_format_batches; +use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt}; +use parquet::arrow::ArrowWriter; +use parquet::file::page_index::index_reader; +use parquet::schema::types::{Type, TypePtr}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::logstore::provider::KafkaProvider; +use store_api::storage::RegionId; + +use crate::error::{self, Result}; +use crate::kafka::index::collector::RegionIndexes; + +/// Converts a [`RegionIndexes`] instance into a [`DeltaEncodedRegionIndexes`]. +/// +/// This conversion encodes the index values using delta encoding to reduce storage space. +impl From<&RegionIndexes> for DeltaEncodedRegionIndexes { + fn from(value: &RegionIndexes) -> Self { + let mut regions = HashMap::with_capacity(value.regions.len()); + for (region_id, indexes) in value.regions.iter() { + let indexes = indexes.iter().copied().deltas().collect(); + regions.insert(*region_id, indexes); + } + Self { + regions, + last_index: value.latest_entry_id, + } + } +} + +/// Represents the delta-encoded version of region indexes for efficient storage. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct DeltaEncodedRegionIndexes { + regions: HashMap>, + last_index: u64, +} + +impl DeltaEncodedRegionIndexes { + /// Retrieves the original (decoded) index values for a given region. 
+ fn region(&self, region_id: RegionId) -> Option> { + let decoded = self + .regions + .get(®ion_id) + .map(|delta| delta.iter().copied().original().collect::>()); + + decoded + } + + /// Retrieves the last index. + fn last_index(&self) -> u64 { + self.last_index + } +} + +pub trait IndexEncoder: Send + Sync { + fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes); + + fn finish(&self) -> Result>; +} + +/// [`DatanodeWalIndexes`] structure holds the WAL indexes for a datanode. +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct DatanodeWalIndexes(HashMap); + +impl DatanodeWalIndexes { + fn insert(&mut self, topic: String, region_index: &RegionIndexes) { + self.0.insert(topic, region_index.into()); + } + + fn encode(&mut self) -> Result> { + let value = serde_json::to_vec(&self.0).context(error::EncodeJsonSnafu); + self.0.clear(); + value + } + + fn decode(byte: &[u8]) -> Result { + serde_json::from_slice(byte).context(error::DecodeJsonSnafu) + } + + /// Retrieves the delta encoded region indexes for a given `provider`. + pub(crate) fn provider(&self, provider: &KafkaProvider) -> Option<&DeltaEncodedRegionIndexes> { + self.0.get(&provider.topic) + } +} + +/// [`JsonIndexEncoder`] encodes the [`RegionIndexes`]s into JSON format. +#[derive(Debug, Default)] +pub(crate) struct JsonIndexEncoder { + buf: Mutex, +} + +impl IndexEncoder for JsonIndexEncoder { + fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes) { + self.buf + .lock() + .unwrap() + .insert(provider.topic.to_string(), region_index); + } + + fn finish(&self) -> Result> { + let mut buf = self.buf.lock().unwrap(); + buf.encode() + } +} + +#[cfg(test)] +mod tests { + use std::collections::{BTreeSet, HashMap, HashSet}; + + use store_api::logstore::provider::KafkaProvider; + use store_api::storage::RegionId; + + use super::{DatanodeWalIndexes, IndexEncoder, JsonIndexEncoder}; + use crate::kafka::index::collector::RegionIndexes; + + #[test] + fn test_json_index_encoder() { + let encoder = JsonIndexEncoder::default(); + let topic_1 = KafkaProvider::new("my_topic_1".to_string()); + let region_1_indexes = BTreeSet::from([1u64, 2, 4, 5, 20]); + let region_2_indexes = BTreeSet::from([4u64, 12, 43, 54, 75]); + encoder.encode( + &topic_1, + &RegionIndexes { + regions: HashMap::from([ + (RegionId::new(1, 1), region_1_indexes.clone()), + (RegionId::new(1, 2), region_2_indexes.clone()), + ]), + latest_entry_id: 1024, + }, + ); + let topic_2 = KafkaProvider::new("my_topic_2".to_string()); + encoder.encode( + &topic_2, + &RegionIndexes { + regions: HashMap::from([ + ( + RegionId::new(1, 1), + BTreeSet::from([1024u64, 1025, 1026, 1028, 2048]), + ), + (RegionId::new(1, 2), BTreeSet::from([1512])), + ]), + latest_entry_id: 2048, + }, + ); + + let bytes = encoder.finish().unwrap(); + let datanode_index = DatanodeWalIndexes::decode(&bytes).unwrap(); + assert_eq!( + datanode_index + .provider(&topic_1) + .unwrap() + .region(RegionId::new(1, 1)) + .unwrap(), + region_1_indexes, + ); + assert_eq!( + datanode_index + .provider(&topic_1) + .unwrap() + .region(RegionId::new(1, 2)) + .unwrap(), + region_2_indexes, + ); + assert!(datanode_index + .provider(&KafkaProvider::new("my_topic_3".to_string())) + .is_none()); + } +} From 5a574b0129ec98f99b51e16cf793f3f4da80f773 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 8 Aug 2024 12:22:46 +0000 Subject: [PATCH 2/8] feat: add config of index collector --- src/common/wal/src/config.rs | 1 + src/common/wal/src/config/kafka/datanode.rs | 8 +++ 
src/datanode/src/datanode.rs | 61 ++++++++++++++---- src/datanode/src/store.rs | 69 +++++++++++++++------ src/log-store/src/kafka.rs | 1 + src/log-store/src/kafka/client_manager.rs | 9 ++- src/log-store/src/kafka/index.rs | 9 ++- src/log-store/src/kafka/index/collector.rs | 2 +- src/log-store/src/kafka/log_store.rs | 13 ++-- src/object-store/src/lib.rs | 2 +- 10 files changed, 132 insertions(+), 43 deletions(-) diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 9bf3280c5a29..fa24a9970cfc 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -223,6 +223,7 @@ mod tests { replication_factor: 1, create_topic_timeout: Duration::from_secs(30), }, + ..Default::default() }; assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); } diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index a1260c05effd..5ab0e1f64a98 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -40,6 +40,11 @@ pub struct DatanodeKafkaConfig { /// The kafka topic config. #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, + pub create_index: bool, + #[serde(with = "humantime_serde")] + pub create_index_interval: Duration, + #[serde(with = "humantime_serde")] + pub index_checkpoint_interval: Duration, } impl Default for DatanodeKafkaConfig { @@ -51,6 +56,9 @@ impl Default for DatanodeKafkaConfig { consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig::default(), kafka_topic: KafkaTopicConfig::default(), + create_index: true, + create_index_interval: Duration::from_secs(60), + index_checkpoint_interval: Duration::from_secs(5 * 60), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index ceb40081d13e..9e5ce2f64837 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -16,6 +16,7 @@ use std::path::Path; use std::sync::Arc; +use std::time::Duration; use catalog::memory::MemoryCatalogManager; use common_base::Plugins; @@ -32,6 +33,7 @@ use common_wal::config::DatanodeWalConfig; use file_engine::engine::FileRegionEngine; use futures_util::TryStreamExt; use log_store::kafka::log_store::KafkaLogStore; +use log_store::kafka::{default_index_file, GlobalIndexCollector}; use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::MetaClientRef; use metric_engine::engine::MetricEngine; @@ -64,7 +66,7 @@ use crate::event_listener::{ use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::HeartbeatTask; use crate::region_server::{DummyTableProviderFactory, RegionServer}; -use crate::store; +use crate::store::{self, new_object_store_without_cache}; /// Datanode service. 
pub struct Datanode { @@ -398,15 +400,35 @@ impl DatanodeBuilder { ) .await .context(BuildMitoEngineSnafu)?, - DatanodeWalConfig::Kafka(kafka_config) => MitoEngine::new( - &opts.storage.data_home, - config, - Self::build_kafka_log_store(kafka_config).await?, - object_store_manager, - plugins, - ) - .await - .context(BuildMitoEngineSnafu)?, + DatanodeWalConfig::Kafka(kafka_config) => { + let global_index_collector = if kafka_config.create_index && opts.node_id.is_some() + { + let operator = new_object_store_without_cache( + &opts.storage.store, + &opts.storage.data_home, + ) + .await?; + let path = default_index_file(opts.node_id.unwrap()); + Some(Self::build_global_index_collector( + kafka_config.create_index_interval, + kafka_config.index_checkpoint_interval, + operator, + path, + )) + } else { + None + }; + + MitoEngine::new( + &opts.storage.data_home, + config, + Self::build_kafka_log_store(kafka_config, global_index_collector).await?, + object_store_manager, + plugins, + ) + .await + .context(BuildMitoEngineSnafu)? + } }; Ok(mito_engine) } @@ -438,14 +460,27 @@ impl DatanodeBuilder { Ok(Arc::new(logstore)) } - /// Builds [KafkaLogStore]. - async fn build_kafka_log_store(config: &DatanodeKafkaConfig) -> Result> { - KafkaLogStore::try_new(config) + /// Builds [`KafkaLogStore`]. + async fn build_kafka_log_store( + config: &DatanodeKafkaConfig, + global_index_collector: Option, + ) -> Result> { + KafkaLogStore::try_new(config, global_index_collector) .await .map_err(Box::new) .context(OpenLogStoreSnafu) .map(Arc::new) } + + /// Builds [`GlobalIndexCollector`] + fn build_global_index_collector( + dump_index_interval: Duration, + checkpoint_interval: Duration, + operator: object_store::ObjectStore, + path: String, + ) -> GlobalIndexCollector { + GlobalIndexCollector::new(dump_index_interval, checkpoint_interval, operator, path) + } } /// Open all regions belong to this datanode. 
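A minimal sketch (not part of the patch) of how the pieces introduced above fit together, using the APIs as they exist at this point in the series: `GlobalIndexCollector::new` still takes a checkpoint interval here (a later patch in the series drops it), and `default_index_file`, `KafkaLogStore::try_new` are the items added or changed by this PR. The interval values and the error type below are illustrative only:

```rust
use std::sync::Arc;
use std::time::Duration;

use common_wal::config::kafka::DatanodeKafkaConfig;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};

/// Sketch: build a Kafka log store with WAL index collection enabled.
async fn build_log_store_with_index(
    node_id: u64,
    operator: object_store::ObjectStore,
    config: &DatanodeKafkaConfig,
) -> Result<Arc<KafkaLogStore>, Box<dyn std::error::Error>> {
    // The collector spawns a background task that periodically asks each
    // topic's producer worker to dump its per-region entry-id index and
    // persists the encoded result to `__datanode/{node_id}/index.json`.
    let collector = GlobalIndexCollector::new(
        Duration::from_secs(60),     // dump_index_interval
        Duration::from_secs(5 * 60), // checkpoint_interval
        operator,
        default_index_file(node_id),
    );
    // Passing `None` instead of `Some(collector)` keeps the previous
    // behaviour: no WAL index is collected or persisted.
    let log_store = KafkaLogStore::try_new(config, Some(collector)).await?;
    Ok(Arc::new(log_store))
}
```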
diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 877f044974bb..16b6e0bc8be5 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -29,18 +29,18 @@ use common_telemetry::{info, warn}; use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer}; use object_store::services::Fs; use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; -use object_store::{Error, HttpClient, ObjectStore, ObjectStoreBuilder}; +use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; use crate::error::{self, Result}; -pub(crate) async fn new_object_store( - store: ObjectStoreConfig, +pub(crate) async fn new_raw_object_store( + store: &ObjectStoreConfig, data_home: &str, ) -> Result { let data_home = normalize_dir(data_home); - let object_store = match &store { + let object_store = match store { ObjectStoreConfig::File(file_config) => { fs::new_fs_object_store(&data_home, file_config).await } @@ -51,27 +51,61 @@ pub(crate) async fn new_object_store( } ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await, }?; + Ok(object_store) +} +fn with_retry_layers(object_store: ObjectStore) -> ObjectStore { + object_store.layer( + RetryLayer::new() + .with_jitter() + .with_notify(PrintDetailedError), + ) +} + +pub(crate) async fn new_object_store_without_cache( + store: &ObjectStoreConfig, + data_home: &str, +) -> Result { + let object_store = new_raw_object_store(store, data_home).await?; // Enable retry layer and cache layer for non-fs object storages let object_store = if !matches!(store, ObjectStoreConfig::File(..)) { - let object_store = create_object_store_with_cache(object_store, &store).await?; - object_store.layer( - RetryLayer::new() - .with_jitter() - .with_notify(PrintDetailedError), - ) + // Adds retry layer + with_retry_layers(object_store) } else { object_store }; - let store = with_instrument_layers(object_store, true); - Ok(store) + let object_store = with_instrument_layers(object_store, true); + Ok(object_store) } -async fn create_object_store_with_cache( - object_store: ObjectStore, - store_config: &ObjectStoreConfig, +pub(crate) async fn new_object_store( + store: ObjectStoreConfig, + data_home: &str, ) -> Result { + let object_store = new_raw_object_store(&store, data_home).await?; + // Enable retry layer and cache layer for non-fs object storages + let object_store = if !matches!(store, ObjectStoreConfig::File(..)) { + let object_store = if let Some(cache_layer) = build_cache_layer(&store).await? 
{ + // Adds cache layer + object_store.layer(cache_layer) + } else { + object_store + }; + + // Adds retry layer + with_retry_layers(object_store) + } else { + object_store + }; + + let object_store = with_instrument_layers(object_store, true); + Ok(object_store) +} + +async fn build_cache_layer( + store_config: &ObjectStoreConfig, +) -> Result>> { let (cache_path, cache_capacity) = match store_config { ObjectStoreConfig::S3(s3_config) => { let path = s3_config.cache.cache_path.as_ref(); @@ -127,9 +161,9 @@ async fn create_object_store_with_cache( path, cache_capacity ); - Ok(object_store.layer(cache_layer)) + Ok(Some(cache_layer)) } else { - Ok(object_store) + Ok(None) } } @@ -175,7 +209,6 @@ pub(crate) fn build_http_client() -> Result { HttpClient::build(http_builder).context(error::InitBackendSnafu) } - struct PrintDetailedError; // PrintDetailedError is a retry interceptor that prints error in Debug format in retrying. diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index dfbf2f36d2e8..551e3cc4c518 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -25,6 +25,7 @@ pub(crate) mod util; #[allow(dead_code)] pub(crate) mod worker; +pub use index::{default_index_file, GlobalIndexCollector}; use serde::{Deserialize, Serialize}; use store_api::logstore::entry::Id as EntryId; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 3f4b4aecf75a..d965edfe5569 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -68,7 +68,10 @@ pub(crate) struct ClientManager { impl ClientManager { /// Tries to create a ClientManager. - pub(crate) async fn try_new(config: &DatanodeKafkaConfig) -> Result { + pub(crate) async fn try_new( + config: &DatanodeKafkaConfig, + global_index_collector: Option, + ) -> Result { // Sets backoff config for the top-level kafka client and all clients constructed by it. 
let backoff_config = BackoffConfig { init_backoff: config.backoff.init, @@ -97,7 +100,7 @@ impl ClientManager { instances: RwLock::new(HashMap::new()), flush_batch_size: config.max_batch_bytes.as_bytes() as usize, compression: Compression::Lz4, - global_index_collector: None, + global_index_collector, }) } @@ -219,7 +222,7 @@ mod tests { }, ..Default::default() }; - let manager = ClientManager::try_new(&config).await.unwrap(); + let manager = ClientManager::try_new(&config, None).await.unwrap(); (manager, topics) } diff --git a/src/log-store/src/kafka/index.rs b/src/log-store/src/kafka/index.rs index 567f34f3cb9e..7e38f14d9a61 100644 --- a/src/log-store/src/kafka/index.rs +++ b/src/log-store/src/kafka/index.rs @@ -16,11 +16,14 @@ mod collector; mod encoder; mod iterator; -pub(crate) use collector::{ - GlobalIndexCollector, IndexCollector, NoopCollector, ProviderLevelIndexCollector, -}; +pub use collector::GlobalIndexCollector; +pub(crate) use collector::{IndexCollector, NoopCollector, ProviderLevelIndexCollector}; pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder}; pub(crate) use iterator::{ MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange, RegionWalVecIndex, }; + +pub fn default_index_file(datanode_id: u64) -> String { + format!("__datanode/{datanode_id}/index.json") +} diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs index b9c9e1b8c596..76f74d52f59b 100644 --- a/src/log-store/src/kafka/index/collector.rs +++ b/src/log-store/src/kafka/index/collector.rs @@ -189,7 +189,7 @@ impl GlobalIndexCollector { impl GlobalIndexCollector { /// Creates a new [`ProviderLevelIndexCollector`] for a specified provider. - pub fn provider_level_index_collector( + pub(crate) fn provider_level_index_collector( &self, provider: Arc, sender: Sender, diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 67bf4019f611..283378cbb511 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -32,6 +32,7 @@ use store_api::storage::RegionId; use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result}; use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; +use crate::kafka::index::GlobalIndexCollector; use crate::kafka::producer::OrderedBatchProducerRef; use crate::kafka::util::record::{ convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE, @@ -51,8 +52,12 @@ pub struct KafkaLogStore { impl KafkaLogStore { /// Tries to create a Kafka log store. 
- pub async fn try_new(config: &DatanodeKafkaConfig) -> Result { - let client_manager = Arc::new(ClientManager::try_new(config).await?); + pub async fn try_new( + config: &DatanodeKafkaConfig, + global_index_collector: Option, + ) -> Result { + let client_manager = + Arc::new(ClientManager::try_new(config, global_index_collector).await?); Ok(Self { client_manager, @@ -470,7 +475,7 @@ mod tests { max_batch_bytes: ReadableSize::kb(32), ..Default::default() }; - let logstore = KafkaLogStore::try_new(&config).await.unwrap(); + let logstore = KafkaLogStore::try_new(&config, None).await.unwrap(); let topic_name = uuid::Uuid::new_v4().to_string(); let provider = Provider::kafka_provider(topic_name); let region_entries = (0..5) @@ -542,7 +547,7 @@ mod tests { max_batch_bytes: ReadableSize::kb(8), ..Default::default() }; - let logstore = KafkaLogStore::try_new(&config).await.unwrap(); + let logstore = KafkaLogStore::try_new(&config, None).await.unwrap(); let topic_name = uuid::Uuid::new_v4().to_string(); let provider = Provider::kafka_provider(topic_name); let region_entries = (0..5) diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index ae561d4bbcd6..d94194ffdb79 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient}; +pub use opendal::raw::{normalize_path as raw_normalize_path, Access, HttpClient}; pub use opendal::{ services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader, From 023cf1d4ea7e1eaac478b6896f3be5d1cc9bb3b0 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 13 Aug 2024 08:02:07 +0000 Subject: [PATCH 3/8] chore: remove unused code --- Cargo.lock | 2 -- src/common/wal/src/config.rs | 1 + src/log-store/Cargo.toml | 2 -- src/log-store/src/kafka.rs | 8 ++++---- src/log-store/src/kafka/client_manager.rs | 4 +++- src/log-store/src/kafka/index.rs | 2 +- src/log-store/src/kafka/index/collector.rs | 10 ++++------ src/log-store/src/kafka/index/encoder.rs | 15 ++------------- src/log-store/src/kafka/index/iterator.rs | 10 +++------- src/log-store/src/test_util/log_store_util.rs | 13 ++++++++----- 10 files changed, 26 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8243c9efa735..7a3e48e6456a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5796,7 +5796,6 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" name = "log-store" version = "0.9.1" dependencies = [ - "arrow", "async-stream", "async-trait", "bytes", @@ -5816,7 +5815,6 @@ dependencies = [ "itertools 0.10.5", "lazy_static", "object-store", - "parquet", "pin-project", "prometheus", "protobuf", diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index fa24a9970cfc..90f3e44f9c4a 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -30,6 +30,7 @@ pub enum MetasrvWalConfig { Kafka(MetasrvKafkaConfig), } +#[allow(clippy::large_enum_variant)] /// Wal configurations for datanode. 
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] #[serde(tag = "provider", rename_all = "snake_case")] diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index fd3df5cda6fe..b841bb3505ac 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -13,7 +13,6 @@ protobuf-build = { version = "0.15", default-features = false, features = [ workspace = true [dependencies] -arrow.workspace = true async-stream.workspace = true async-trait.workspace = true bytes.workspace = true @@ -32,7 +31,6 @@ futures-util.workspace = true itertools.workspace = true lazy_static.workspace = true object-store.workspace = true -parquet.workspace = true pin-project.workspace = true prometheus.workspace = true protobuf = { version = "2", features = ["bytes"] } diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index 551e3cc4c518..ad73b770398c 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -13,15 +13,15 @@ // limitations under the License. pub(crate) mod client_manager; -// TODO(weny): remove it -#[allow(dead_code)] pub(crate) mod consumer; -#[allow(unused)] +/// TODO(weny): remove it. +#[allow(dead_code)] +#[allow(unused_imports)] pub(crate) mod index; pub mod log_store; pub(crate) mod producer; pub(crate) mod util; -// TODO(weny): remove it +/// TODO(weny): remove it. #[allow(dead_code)] pub(crate) mod worker; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index d965edfe5569..a2feb2201134 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -151,7 +151,9 @@ impl ClientManager { let (tx, rx) = OrderedBatchProducer::channel(); let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() { - global_collector.provider_level_index_collector(provider.clone(), tx.clone()) + global_collector + .provider_level_index_collector(provider.clone(), tx.clone()) + .await } else { Box::new(NoopCollector) }; diff --git a/src/log-store/src/kafka/index.rs b/src/log-store/src/kafka/index.rs index 7e38f14d9a61..1bd0f3e621e8 100644 --- a/src/log-store/src/kafka/index.rs +++ b/src/log-store/src/kafka/index.rs @@ -17,7 +17,7 @@ mod encoder; mod iterator; pub use collector::GlobalIndexCollector; -pub(crate) use collector::{IndexCollector, NoopCollector, ProviderLevelIndexCollector}; +pub(crate) use collector::{IndexCollector, NoopCollector}; pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder}; pub(crate) use iterator::{ MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange, diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs index 76f74d52f59b..228362e7934b 100644 --- a/src/log-store/src/kafka/index/collector.rs +++ b/src/log-store/src/kafka/index/collector.rs @@ -13,15 +13,12 @@ // limitations under the License. use std::collections::{BTreeSet, HashMap}; -use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; -use bytes::{BufMut, Bytes, BytesMut}; use common_telemetry::{error, info}; use futures::future::try_join_all; -use object_store::Writer; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::logstore::provider::KafkaProvider; @@ -189,11 +186,12 @@ impl GlobalIndexCollector { impl GlobalIndexCollector { /// Creates a new [`ProviderLevelIndexCollector`] for a specified provider. 
- pub(crate) fn provider_level_index_collector( + pub(crate) async fn provider_level_index_collector( &self, provider: Arc, sender: Sender, ) -> Box { + self.providers.lock().await.insert(provider.clone(), sender); Box::new(ProviderLevelIndexCollector { indexes: Default::default(), provider, @@ -269,5 +267,5 @@ impl IndexCollector for NoopCollector { fn set_latest_entry_id(&mut self, _entry_id: EntryId) {} - fn dump(&mut self, encoder: &dyn IndexEncoder) {} + fn dump(&mut self, _encoder: &dyn IndexEncoder) {} } diff --git a/src/log-store/src/kafka/index/encoder.rs b/src/log-store/src/kafka/index/encoder.rs index f2124862530c..bfd11a982c14 100644 --- a/src/log-store/src/kafka/index/encoder.rs +++ b/src/log-store/src/kafka/index/encoder.rs @@ -13,20 +13,9 @@ // limitations under the License. use std::collections::{BTreeSet, HashMap}; -use std::fs::File; -use std::sync::{Arc, Mutex}; - -use arrow::array::{ - Array, ArrayBuilder, ArrayData, ArrayRef, ListArray, ListBuilder, PrimitiveArray, RecordBatch, - StringArray, StructArray, StructBuilder, UInt64Array, UInt64Builder, -}; -use arrow::buffer::OffsetBuffer; -use arrow::datatypes::{DataType, Field, Fields, Schema, UInt64Type}; -use arrow::util::pretty::pretty_format_batches; +use std::sync::Mutex; + use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt}; -use parquet::arrow::ArrowWriter; -use parquet::file::page_index::index_reader; -use parquet::schema::types::{Type, TypePtr}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::logstore::provider::KafkaProvider; diff --git a/src/log-store/src/kafka/index/iterator.rs b/src/log-store/src/kafka/index/iterator.rs index 8a33cf1d9a3d..7df2518752bd 100644 --- a/src/log-store/src/kafka/index/iterator.rs +++ b/src/log-store/src/kafka/index/iterator.rs @@ -12,14 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cmp::{max, min}; +use std::cmp::min; use std::collections::VecDeque; -use std::iter::Peekable; -use std::marker::PhantomData; -use std::ops::{Add, Mul, Range, Sub}; +use std::ops::Range; -use chrono::format::Item; -use itertools::Itertools; use store_api::logstore::EntryId; use crate::kafka::util::range::{ConvertIndexToRange, MergeRange}; @@ -197,7 +193,7 @@ mod tests { #[test] fn test_region_wal_range() { - let mut range = RegionWalRange::new(0..1024, 1024); + let range = RegionWalRange::new(0..1024, 1024); assert_eq!( range.next_batch_hint(10), Some(NextBatchHint { diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index f78b5a965d0c..b1fd183fbacc 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -34,13 +34,16 @@ pub async fn create_tmp_local_file_log_store>(path: P) -> RaftEng /// Create a [KafkaLogStore]. 
pub async fn create_kafka_log_store(broker_endpoints: Vec) -> KafkaLogStore { - KafkaLogStore::try_new(&DatanodeKafkaConfig { - connection: KafkaConnectionConfig { - broker_endpoints, + KafkaLogStore::try_new( + &DatanodeKafkaConfig { + connection: KafkaConnectionConfig { + broker_endpoints, + ..Default::default() + }, ..Default::default() }, - ..Default::default() - }) + None, + ) .await .unwrap() } From efc8a9790e06d9fd92e627fe872862dcc2d3b915 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 13 Aug 2024 08:25:33 +0000 Subject: [PATCH 4/8] feat: truncate indexes --- src/log-store/src/kafka/client_manager.rs | 4 ++++ src/log-store/src/kafka/index/collector.rs | 26 +++++++++++++++++++++- src/log-store/src/kafka/log_store.rs | 16 ++++++++++++- src/log-store/src/kafka/worker.rs | 9 ++++++++ src/log-store/src/raft_engine/log_store.rs | 20 ++++++++++++----- src/mito2/src/wal.rs | 4 ++-- src/mito2/src/wal/raw_entry_reader.rs | 1 + src/store-api/src/logstore.rs | 7 +++++- 8 files changed, 77 insertions(+), 10 deletions(-) diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index a2feb2201134..6337683c9392 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -168,6 +168,10 @@ impl ClientManager { Ok(Client { client, producer }) } + + pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> { + self.global_index_collector.as_ref() + } } #[cfg(test)] diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs index 228362e7934b..8901f5b4ec2b 100644 --- a/src/log-store/src/kafka/index/collector.rs +++ b/src/log-store/src/kafka/index/collector.rs @@ -31,7 +31,7 @@ use tokio::sync::Mutex as TokioMutex; use crate::error::{self, Result}; use crate::kafka::index::encoder::IndexEncoder; use crate::kafka::index::JsonIndexEncoder; -use crate::kafka::worker::{DumpIndexRequest, WorkerRequest}; +use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest}; /// The [`IndexCollector`] trait defines the operations for managing and collecting index entries. pub trait IndexCollector: Send + Sync { @@ -197,6 +197,30 @@ impl GlobalIndexCollector { provider, }) } + + /// Truncates the index for a specific region up to a given [`EntryId`]. + /// + /// It removes all [`EntryId`]s smaller than `entry_id`. + pub(crate) async fn truncate( + &self, + provider: &Arc, + region_id: RegionId, + entry_id: EntryId, + ) -> Result<()> { + if let Some(sender) = self.providers.lock().await.get(provider).cloned() { + if sender + .send(WorkerRequest::TruncateIndex(TruncateIndexRequest::new( + region_id, entry_id, + ))) + .await + .is_err() + { + return error::OrderedBatchProducerStoppedSnafu {}.fail(); + } + } + + Ok(()) + } } /// The [`RegionIndexes`] struct maintains indexes for a collection of regions. diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 283378cbb511..909d2d9e333d 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -336,7 +336,21 @@ impl LogStore for KafkaLogStore { /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete, /// so that the log store can safely delete those entries. This method does not guarantee /// that the obsolete entries are deleted immediately. 
- async fn obsolete(&self, _provider: &Provider, _entry_id: EntryId) -> Result<()> { + async fn obsolete( + &self, + provider: &Provider, + region_id: RegionId, + entry_id: EntryId, + ) -> Result<()> { + if let Some(collector) = self.client_manager.global_index_collector() { + let provider = provider + .as_kafka_provider() + .with_context(|| InvalidProviderSnafu { + expected: KafkaProvider::type_name(), + actual: provider.type_name(), + })?; + collector.truncate(provider, region_id, entry_id).await?; + } Ok(()) } diff --git a/src/log-store/src/kafka/worker.rs b/src/log-store/src/kafka/worker.rs index 972d56d6f1c4..318ac1c8a587 100644 --- a/src/log-store/src/kafka/worker.rs +++ b/src/log-store/src/kafka/worker.rs @@ -82,6 +82,15 @@ pub(crate) struct TruncateIndexRequest { entry_id: EntryId, } +impl TruncateIndexRequest { + pub fn new(region_id: RegionId, entry_id: EntryId) -> Self { + Self { + region_id, + entry_id, + } + } +} + pub(crate) struct ProduceRequest { region_id: RegionId, batch: Vec, diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index d2a210fb4203..b4b46966f498 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -419,7 +419,12 @@ impl LogStore for RaftEngineLogStore { })) } - async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<()> { + async fn obsolete( + &self, + provider: &Provider, + _region_id: RegionId, + entry_id: EntryId, + ) -> Result<()> { let ns = provider .as_raft_engine_provider() .with_context(|| InvalidProviderSnafu { @@ -639,7 +644,8 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = new_test_log_store(&dir).await; - let namespace_id = 42; + let region_id = RegionId::new(1, 1); + let namespace_id = region_id.as_u64(); let namespace = Provider::raft_engine_provider(namespace_id); for id in 0..4096 { let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into(); @@ -647,7 +653,10 @@ mod tests { } let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await; - logstore.obsolete(&namespace, 4000).await.unwrap(); + logstore + .obsolete(&namespace, region_id, 4000) + .await + .unwrap(); tokio::time::sleep(Duration::from_secs(6)).await; let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await; @@ -664,14 +673,15 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = new_test_log_store(&dir).await; - let namespace_id = 42; + let region_id = RegionId::new(1, 1); + let namespace_id = region_id.as_u64(); let namespace = Provider::raft_engine_provider(namespace_id); for id in 0..1024 { let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into(); let _ = logstore.append(entry).await.unwrap(); } - logstore.obsolete(&namespace, 100).await.unwrap(); + logstore.obsolete(&namespace, region_id, 100).await.unwrap(); assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap()); let res = logstore.read(&namespace, 100).await.unwrap(); diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index de6ad67b3208..7413f52b2c05 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -89,7 +89,7 @@ impl Wal { move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> { Box::pin(async move { store - .obsolete(provider, last_entry_id) + .obsolete(provider, region_id, last_entry_id) .await .map_err(BoxedError::new) .context(DeleteWalSnafu { region_id }) @@ -142,7 +142,7 @@ impl Wal { provider: &Provider, ) -> Result<()> 
{ self.store - .obsolete(provider, last_id) + .obsolete(provider, region_id, last_id) .await .map_err(BoxedError::new) .context(DeleteWalSnafu { region_id }) diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index 6dd11c2c8f64..7436dec06a56 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -168,6 +168,7 @@ mod tests { async fn obsolete( &self, _provider: &Provider, + _region_id: RegionId, _entry_id: EntryId, ) -> Result<(), Self::Error> { unreachable!() diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 347643982716..32ab95f5c8d6 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -62,7 +62,12 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete, /// so that the log store can safely delete those entries. This method does not guarantee /// that the obsolete entries are deleted immediately. - async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<(), Self::Error>; + async fn obsolete( + &self, + provider: &Provider, + region_id: RegionId, + entry_id: EntryId, + ) -> Result<(), Self::Error>; /// Makes an entry instance of the associated Entry type fn entry( From 392c3148fcb73539af6dde81fc127f902a3a1326 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 14 Aug 2024 03:09:51 +0000 Subject: [PATCH 5/8] chore: apply suggestions from CR --- src/common/wal/src/config/kafka/datanode.rs | 4 +-- src/datanode/src/datanode.rs | 2 +- src/log-store/src/kafka/index/collector.rs | 31 ++++++++------------- 3 files changed, 15 insertions(+), 22 deletions(-) diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 5ab0e1f64a98..fe9800e8af04 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -42,7 +42,7 @@ pub struct DatanodeKafkaConfig { pub kafka_topic: KafkaTopicConfig, pub create_index: bool, #[serde(with = "humantime_serde")] - pub create_index_interval: Duration, + pub dump_index_interval: Duration, #[serde(with = "humantime_serde")] pub index_checkpoint_interval: Duration, } @@ -57,7 +57,7 @@ impl Default for DatanodeKafkaConfig { backoff: BackoffConfig::default(), kafka_topic: KafkaTopicConfig::default(), create_index: true, - create_index_interval: Duration::from_secs(60), + dump_index_interval: Duration::from_secs(60), index_checkpoint_interval: Duration::from_secs(5 * 60), } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 9e5ce2f64837..4f347a1f8a2a 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -410,7 +410,7 @@ impl DatanodeBuilder { .await?; let path = default_index_file(opts.node_id.unwrap()); Some(Self::build_global_index_collector( - kafka_config.create_index_interval, + kafka_config.dump_index_interval, kafka_config.index_checkpoint_interval, operator, path, diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs index 8901f5b4ec2b..547414ae7b78 100644 --- a/src/log-store/src/kafka/index/collector.rs +++ b/src/log-store/src/kafka/index/collector.rs @@ -69,14 +69,10 @@ pub struct CollectionTask { } impl CollectionTask { - async fn dump_index( - providers: &Arc, Sender>>>, - operator: &object_store::ObjectStore, - path: &str, - ) -> Result<()> { + async fn dump_index(&self) -> Result<()> { let encoder = 
Arc::new(JsonIndexEncoder::default()); let receivers = { - let providers = providers.lock().await; + let providers = self.providers.lock().await; let mut receivers = Vec::with_capacity(providers.len()); for (provider, sender) in providers.iter() { let (req, rx) = DumpIndexRequest::new(encoder.clone()); @@ -94,8 +90,9 @@ impl CollectionTask { .await .context(error::WaitDumpIndexSnafu)?; let bytes = encoder.finish()?; - let mut writer = operator - .writer(path) + let mut writer = self + .operator + .writer(&self.path) .await .context(error::CreateWriterSnafu)?; writer.write(bytes).await.context(error::WriteIndexSnafu)?; @@ -104,10 +101,8 @@ impl CollectionTask { Ok(()) } - async fn checkpoint( - providers: &Arc, Sender>>>, - ) { - for (provider, sender) in providers.lock().await.iter() { + async fn checkpoint(&self) { + for (provider, sender) in self.providers.lock().await.iter() { if sender.send(WorkerRequest::Checkpoint).await.is_err() { error!( "BackgroundProducerWorker is stopped, topic: {}", @@ -120,13 +115,11 @@ impl CollectionTask { /// The background task performs two main operations: /// - Persists the WAL index to the specified `path` at every `dump_index_interval`. /// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`. - fn run(&mut self) { + fn run(&self) { let mut dump_index_interval = tokio::time::interval(self.dump_index_interval); let mut checkpoint_interval = tokio::time::interval(self.checkpoint_interval); - let providers = self.providers.clone(); - let path = self.path.to_string(); - let operator = self.operator.clone(); let running = self.running.clone(); + let moved_self = self.clone(); common_runtime::spawn_global(async move { loop { if !running.load(Ordering::Relaxed) { @@ -135,12 +128,12 @@ impl CollectionTask { } select! { _ = dump_index_interval.tick() => { - if let Err(err) = CollectionTask::dump_index(&providers, &operator, &path).await { + if let Err(err) = moved_self.dump_index().await { error!(err; "Failed to persist the WAL index"); } }, _ = checkpoint_interval.tick() => { - CollectionTask::checkpoint(&providers).await; + moved_self.checkpoint().await; } } } @@ -171,7 +164,7 @@ impl GlobalIndexCollector { ) -> Self { let providers: Arc, Sender>>> = Arc::new(Default::default()); - let mut task = CollectionTask { + let task = CollectionTask { providers: providers.clone(), dump_index_interval, checkpoint_interval, From b5defba6eb42981b4743535f6e8747e3592b9d24 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 14 Aug 2024 03:18:52 +0000 Subject: [PATCH 6/8] chore: update config examples --- config/config.md | 3 +++ config/datanode.example.toml | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/config/config.md b/config/config.md index dfd2ab889c6d..99635c9e66f0 100644 --- a/config/config.md +++ b/config/config.md @@ -373,6 +373,9 @@ | `wal.backoff_max` | String | `10s` | The maximum backoff delay.
**It's only used when the provider is `kafka`**. |
 | `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.index_checkpoint_interval` | String | `300s` | The interval for doing WAL index checkpoints.<br/>**It's only used when the provider is `kafka`**. |
 | `storage` | -- | -- | The data storage options. |
 | `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
 | `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>
- `Oss`: the data is stored in the Aliyun OSS. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 81cbc4703c4d..31e0795ddec2 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -187,6 +187,18 @@ backoff_base = 2 ## **It's only used when the provider is `kafka`**. backoff_deadline = "5mins" +## Whether to enable WAL index creation. +## **It's only used when the provider is `kafka`**. +create_index = true + +## The interval for dumping WAL indexes. +## **It's only used when the provider is `kafka`**. +dump_index_interval = "60s" + +## The interval for doing WAL index checkpoints. +## **It's only used when the provider is `kafka`**. +index_checkpoint_interval = "300s" + # The Kafka SASL configuration. # **It's only used when the provider is `kafka`**. # Available SASL mechanisms: From 1f4769821b42673c15e9fb6c02d8ab351ce92d6c Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 14 Aug 2024 06:25:45 +0000 Subject: [PATCH 7/8] refactor: retrieve latest offset while dumping indexes --- config/config.md | 1 - config/datanode.example.toml | 4 ---- src/common/wal/src/config/kafka/datanode.rs | 3 --- src/datanode/src/datanode.rs | 4 +--- src/log-store/src/kafka/index/collector.rs | 22 +------------------ src/log-store/src/kafka/worker.rs | 17 +++----------- .../worker/{checkpoint.rs => dump_index.rs} | 9 ++++++-- 7 files changed, 12 insertions(+), 48 deletions(-) rename src/log-store/src/kafka/worker/{checkpoint.rs => dump_index.rs} (76%) diff --git a/config/config.md b/config/config.md index 99635c9e66f0..f9d49e0dba76 100644 --- a/config/config.md +++ b/config/config.md @@ -375,7 +375,6 @@ | `wal.backoff_deadline` | String | `5mins` | The deadline of retries.
**It's only used when the provider is `kafka`**. |
 | `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.index_checkpoint_interval` | String | `300s` | The interval for doing WAL index checkpoints.<br/>**It's only used when the provider is `kafka`**. |
 | `storage` | -- | -- | The data storage options. |
 | `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
 | `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>
- `Oss`: the data is stored in the Aliyun OSS. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 31e0795ddec2..f1a9c63abb3f 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -195,10 +195,6 @@ create_index = true ## **It's only used when the provider is `kafka`**. dump_index_interval = "60s" -## The interval for doing WAL index checkpoints. -## **It's only used when the provider is `kafka`**. -index_checkpoint_interval = "300s" - # The Kafka SASL configuration. # **It's only used when the provider is `kafka`**. # Available SASL mechanisms: diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index fe9800e8af04..84e9da6bccfa 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -43,8 +43,6 @@ pub struct DatanodeKafkaConfig { pub create_index: bool, #[serde(with = "humantime_serde")] pub dump_index_interval: Duration, - #[serde(with = "humantime_serde")] - pub index_checkpoint_interval: Duration, } impl Default for DatanodeKafkaConfig { @@ -58,7 +56,6 @@ impl Default for DatanodeKafkaConfig { kafka_topic: KafkaTopicConfig::default(), create_index: true, dump_index_interval: Duration::from_secs(60), - index_checkpoint_interval: Duration::from_secs(5 * 60), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 4f347a1f8a2a..bfb08aaa338f 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -411,7 +411,6 @@ impl DatanodeBuilder { let path = default_index_file(opts.node_id.unwrap()); Some(Self::build_global_index_collector( kafka_config.dump_index_interval, - kafka_config.index_checkpoint_interval, operator, path, )) @@ -475,11 +474,10 @@ impl DatanodeBuilder { /// Builds [`GlobalIndexCollector`] fn build_global_index_collector( dump_index_interval: Duration, - checkpoint_interval: Duration, operator: object_store::ObjectStore, path: String, ) -> GlobalIndexCollector { - GlobalIndexCollector::new(dump_index_interval, checkpoint_interval, operator, path) + GlobalIndexCollector::new(dump_index_interval, operator, path) } } diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs index 547414ae7b78..181222304629 100644 --- a/src/log-store/src/kafka/index/collector.rs +++ b/src/log-store/src/kafka/index/collector.rs @@ -62,7 +62,6 @@ pub struct GlobalIndexCollector { pub struct CollectionTask { providers: Arc, Sender>>>, dump_index_interval: Duration, - checkpoint_interval: Duration, operator: object_store::ObjectStore, path: String, running: Arc, @@ -101,23 +100,11 @@ impl CollectionTask { Ok(()) } - async fn checkpoint(&self) { - for (provider, sender) in self.providers.lock().await.iter() { - if sender.send(WorkerRequest::Checkpoint).await.is_err() { - error!( - "BackgroundProducerWorker is stopped, topic: {}", - provider.topic - ) - } - } - } - /// The background task performs two main operations: /// - Persists the WAL index to the specified `path` at every `dump_index_interval`. /// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`. 
fn run(&self) { let mut dump_index_interval = tokio::time::interval(self.dump_index_interval); - let mut checkpoint_interval = tokio::time::interval(self.checkpoint_interval); let running = self.running.clone(); let moved_self = self.clone(); common_runtime::spawn_global(async move { @@ -132,9 +119,6 @@ impl CollectionTask { error!(err; "Failed to persist the WAL index"); } }, - _ = checkpoint_interval.tick() => { - moved_self.checkpoint().await; - } } } }); @@ -153,12 +137,9 @@ impl GlobalIndexCollector { /// This method initializes a `GlobalIndexCollector` instance and starts a background task /// for managing WAL (Write-Ahead Logging) indexes. /// - /// The background task performs two main operations: - /// - Persists the WAL index to the specified `path` at every `dump_index_interval`. - /// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`. + /// The background task persists the WAL index to the specified `path` at every `dump_index_interval`. pub fn new( dump_index_interval: Duration, - checkpoint_interval: Duration, operator: object_store::ObjectStore, path: String, ) -> Self { @@ -167,7 +148,6 @@ impl GlobalIndexCollector { let task = CollectionTask { providers: providers.clone(), dump_index_interval, - checkpoint_interval, operator, path, running: Arc::new(AtomicBool::new(true)), diff --git a/src/log-store/src/kafka/worker.rs b/src/log-store/src/kafka/worker.rs index 318ac1c8a587..b05351d17245 100644 --- a/src/log-store/src/kafka/worker.rs +++ b/src/log-store/src/kafka/worker.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod checkpoint; +pub(crate) mod dump_index; pub(crate) mod flush; pub(crate) mod produce; @@ -29,14 +29,12 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::Receiver; use tokio::sync::oneshot::{self}; -use super::index::IndexEncoder; use crate::error::{self, NoMaxValueSnafu, Result}; -use crate::kafka::index::IndexCollector; +use crate::kafka::index::{IndexCollector, IndexEncoder}; use crate::kafka::producer::ProducerClient; pub(crate) enum WorkerRequest { Produce(ProduceRequest), - Checkpoint, TruncateIndex(TruncateIndexRequest), DumpIndex(DumpIndexRequest), } @@ -188,27 +186,18 @@ impl BackgroundProducerWorker { async fn handle_requests(&mut self, buffer: &mut Vec) { let mut produce_requests = Vec::with_capacity(buffer.len()); - let mut do_checkpoint = false; for req in buffer.drain(..) 
{ match req { WorkerRequest::Produce(req) => produce_requests.push(req), - WorkerRequest::Checkpoint => do_checkpoint = true, WorkerRequest::TruncateIndex(TruncateIndexRequest { region_id, entry_id, }) => self.index_collector.truncate(region_id, entry_id), - WorkerRequest::DumpIndex(req) => { - self.index_collector.dump(req.encoder.as_ref()); - let _ = req.sender.send(()); - } + WorkerRequest::DumpIndex(req) => self.dump_index(req).await, } } let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes); self.try_flush_pending_requests(pending_requests).await; - - if do_checkpoint { - self.do_checkpoint().await; - } } } diff --git a/src/log-store/src/kafka/worker/checkpoint.rs b/src/log-store/src/kafka/worker/dump_index.rs similarity index 76% rename from src/log-store/src/kafka/worker/checkpoint.rs rename to src/log-store/src/kafka/worker/dump_index.rs index 0ef6cb8627a7..92b1d3465098 100644 --- a/src/log-store/src/kafka/worker/checkpoint.rs +++ b/src/log-store/src/kafka/worker/dump_index.rs @@ -16,11 +16,12 @@ use common_telemetry::error; use rskafka::client::partition::OffsetAt; use snafu::ResultExt; +use super::DumpIndexRequest; use crate::error; use crate::kafka::worker::BackgroundProducerWorker; impl BackgroundProducerWorker { - pub(crate) async fn do_checkpoint(&mut self) { + pub(crate) async fn dump_index(&mut self, req: DumpIndexRequest) { match self .client .get_offset(OffsetAt::Latest) @@ -28,7 +29,11 @@ impl BackgroundProducerWorker { .context(error::GetOffsetSnafu { topic: &self.provider.topic, }) { - Ok(offset) => self.index_collector.set_latest_entry_id(offset as u64), + Ok(offset) => { + self.index_collector.set_latest_entry_id(offset as u64); + self.index_collector.dump(req.encoder.as_ref()); + let _ = req.sender.send(()); + } Err(err) => error!(err; "Failed to do checkpoint"), } } From c190fef93f7a497c8220aaa41542bfa279d84d56 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 14 Aug 2024 07:54:40 +0000 Subject: [PATCH 8/8] chore: print warn --- src/datanode/src/datanode.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index bfb08aaa338f..eca551a4a0d4 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -401,6 +401,9 @@ impl DatanodeBuilder { .await .context(BuildMitoEngineSnafu)?, DatanodeWalConfig::Kafka(kafka_config) => { + if kafka_config.create_index && opts.node_id.is_none() { + warn!("The WAL index creation only available in distributed mode.") + } let global_index_collector = if kafka_config.create_index && opts.node_id.is_some() { let operator = new_object_store_without_cache(