feat: add config of index collector
WenyXu committed Aug 8, 2024
1 parent d8ab395 commit 9a86dff
Showing 11 changed files with 139 additions and 47 deletions.
1 change: 1 addition & 0 deletions src/common/wal/src/config.rs
@@ -187,6 +187,7 @@ mod tests {
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
..Default::default()
};
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
}
8 changes: 8 additions & 0 deletions src/common/wal/src/config/kafka/datanode.rs
@@ -39,6 +39,11 @@ pub struct DatanodeKafkaConfig {
/// The kafka topic config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
pub create_index: bool,
#[serde(with = "humantime_serde")]
pub create_index_interval: Duration,
#[serde(with = "humantime_serde")]
pub index_checkpoint_interval: Duration,
}

impl Default for DatanodeKafkaConfig {
@@ -50,6 +55,9 @@ impl Default for DatanodeKafkaConfig {
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
kafka_topic: KafkaTopicConfig::default(),
create_index: true,
create_index_interval: Duration::from_secs(60),
index_checkpoint_interval: Duration::from_secs(5 * 60),
}
}
}
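
The three new knobs default to index collection enabled, a 60-second index dump interval, and a 5-minute checkpoint interval. As a minimal sketch (not part of this commit), overriding them when constructing the config in code could look like the following; the import path is assumed from the file layout:

use std::time::Duration;

// Assumed import path, based on src/common/wal/src/config/kafka/datanode.rs.
use common_wal::config::kafka::DatanodeKafkaConfig;

fn example_kafka_config() -> DatanodeKafkaConfig {
    // Keep index collection on, but dump the index every 30s and
    // checkpoint every 10 minutes; all other fields keep their defaults.
    DatanodeKafkaConfig {
        create_index: true,
        create_index_interval: Duration::from_secs(30),
        index_checkpoint_interval: Duration::from_secs(10 * 60),
        ..Default::default()
    }
}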
61 changes: 48 additions & 13 deletions src/datanode/src/datanode.rs
@@ -16,6 +16,7 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
@@ -32,6 +33,7 @@ use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
@@ -64,7 +66,7 @@ use crate::event_listener::{
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store;
use crate::store::{self, new_object_store_without_cache};

/// Datanode service.
pub struct Datanode {
@@ -398,15 +400,35 @@ impl DatanodeBuilder {
)
.await
.context(BuildMitoEngineSnafu)?,
DatanodeWalConfig::Kafka(kafka_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?,
DatanodeWalConfig::Kafka(kafka_config) => {
let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
{
let operator = new_object_store_without_cache(
&opts.storage.store,
&opts.storage.data_home,
)
.await?;
let path = default_index_file(opts.node_id.unwrap());
Some(Self::build_global_index_collector(
kafka_config.create_index_interval,
kafka_config.index_checkpoint_interval,
operator,
path,
))
} else {
None
};

MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
object_store_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?
}
};
Ok(mito_engine)
}
@@ -438,14 +460,27 @@ impl DatanodeBuilder {
Ok(Arc::new(logstore))
}

/// Builds [KafkaLogStore].
async fn build_kafka_log_store(config: &DatanodeKafkaConfig) -> Result<Arc<KafkaLogStore>> {
KafkaLogStore::try_new(config)
/// Builds [`KafkaLogStore`].
async fn build_kafka_log_store(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Arc<KafkaLogStore>> {
KafkaLogStore::try_new(config, global_index_collector)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)
.map(Arc::new)
}

/// Builds [`GlobalIndexCollector`]
fn build_global_index_collector(
dump_index_interval: Duration,
checkpoint_interval: Duration,
operator: object_store::ObjectStore,
path: String,
) -> GlobalIndexCollector {
GlobalIndexCollector::new(dump_index_interval, checkpoint_interval, operator, path)
}
}

/// Open all regions belong to this datanode.
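
Read together, the changes in this file gate index collection on both the new config flag and the presence of a datanode id, and they build the collector's object store without the local cache layer. A free-standing sketch of that gating (the helper name and parameter list are illustrative, not part of the commit):

use std::time::Duration;

use log_store::kafka::{default_index_file, GlobalIndexCollector};
use object_store::ObjectStore;

// Mirrors the builder logic: a collector is only created when indexing is
// enabled and the datanode id is known; otherwise the Kafka log store is
// built with `None`.
fn maybe_index_collector(
    create_index: bool,
    node_id: Option<u64>,
    create_index_interval: Duration,
    index_checkpoint_interval: Duration,
    operator: ObjectStore,
) -> Option<GlobalIndexCollector> {
    match node_id {
        Some(id) if create_index => Some(GlobalIndexCollector::new(
            create_index_interval,
            index_checkpoint_interval,
            operator,
            default_index_file(id),
        )),
        _ => None,
    }
}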
69 changes: 51 additions & 18 deletions src/datanode/src/store.rs
@@ -29,18 +29,18 @@ use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};

pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
let object_store = match &store {
let object_store = match store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
@@ -51,27 +51,61 @@ pub(crate) async fn new_object_store(
}
ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
}?;
Ok(object_store)
}

fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
object_store.layer(
RetryLayer::new()
.with_jitter()
.with_notify(PrintDetailedError),
)
}

pub(crate) async fn new_object_store_without_cache(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = create_object_store_with_cache(object_store, &store).await?;
object_store.layer(
RetryLayer::new()
.with_jitter()
.with_notify(PrintDetailedError),
)
// Adds retry layer
with_retry_layers(object_store)
} else {
object_store
};

let store = with_instrument_layers(object_store, true);
Ok(store)
let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}

async fn create_object_store_with_cache(
object_store: ObjectStore,
store_config: &ObjectStoreConfig,
pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = if let Some(cache_layer) = build_cache_layer(&store).await? {
// Adds cache layer
object_store.layer(cache_layer)
} else {
object_store
};

// Adds retry layer
with_retry_layers(object_store)
} else {
object_store
};

let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}

async fn build_cache_layer(
store_config: &ObjectStoreConfig,
) -> Result<Option<LruCacheLayer<impl Access>>> {
let (cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache.cache_path.as_ref();
@@ -127,9 +161,9 @@ async fn create_object_store_with_cache(
path, cache_capacity
);

Ok(object_store.layer(cache_layer))
Ok(Some(cache_layer))
} else {
Ok(object_store)
Ok(None)
}
}

@@ -175,7 +209,6 @@ pub(crate) fn build_http_client() -> Result<HttpClient> {

HttpClient::build(http_builder).context(error::InitBackendSnafu)
}

struct PrintDetailedError;

// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
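
The refactor splits store construction into reusable pieces (new_raw_object_store, build_cache_layer, with_retry_layers) so that the new new_object_store_without_cache entry point, used in datanode.rs above for the WAL index collector, can skip the LRU cache layer while keeping retry and instrumentation. A crate-internal sketch contrasting the two entry points (assuming ObjectStoreConfig implements Clone, which is not shown in this diff):

use object_store::ObjectStore;

use crate::config::ObjectStoreConfig;
use crate::error::Result;
use crate::store::{new_object_store, new_object_store_without_cache};

// Both constructors start from the same raw store; only `new_object_store`
// adds the LRU cache layer (for non-fs backends with a cache path configured).
async fn build_stores(
    config: &ObjectStoreConfig,
    data_home: &str,
) -> Result<(ObjectStore, ObjectStore)> {
    // For the WAL index file: raw -> retry (non-fs only) -> instrument.
    let index_store = new_object_store_without_cache(config, data_home).await?;
    // For region data: raw -> cache (if configured) -> retry (non-fs only) -> instrument.
    let data_store = new_object_store(config.clone(), data_home).await?;
    Ok((index_store, data_store))
}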
1 change: 1 addition & 0 deletions src/log-store/src/kafka.rs
@@ -25,6 +25,7 @@ pub(crate) mod util;
#[allow(dead_code)]
pub(crate) mod worker;

pub use index::{default_index_file, GlobalIndexCollector};
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::Id as EntryId;

9 changes: 6 additions & 3 deletions src/log-store/src/kafka/client_manager.rs
@@ -68,7 +68,10 @@ pub(crate) struct ClientManager {

impl ClientManager {
/// Tries to create a ClientManager.
pub(crate) async fn try_new(config: &DatanodeKafkaConfig) -> Result<Self> {
pub(crate) async fn try_new(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let backoff_config = BackoffConfig {
init_backoff: config.backoff.init,
@@ -93,7 +96,7 @@ impl ClientManager {
instances: RwLock::new(HashMap::new()),
flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
compression: Compression::Lz4,
global_index_collector: None,
global_index_collector,
})
}

@@ -211,7 +214,7 @@ mod tests {
broker_endpoints,
..Default::default()
};
let manager = ClientManager::try_new(&config).await.unwrap();
let manager = ClientManager::try_new(&config, None).await.unwrap();

(manager, topics)
}
9 changes: 6 additions & 3 deletions src/log-store/src/kafka/index.rs
@@ -16,11 +16,14 @@ mod collector;
mod encoder;
mod iterator;

pub(crate) use collector::{
GlobalIndexCollector, IndexCollector, NoopCollector, ProviderLevelIndexCollector,
};
pub use collector::GlobalIndexCollector;
pub(crate) use collector::{IndexCollector, NoopCollector, ProviderLevelIndexCollector};
pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
pub(crate) use iterator::{
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
RegionWalVecIndex,
};

pub fn default_index_file(datanode_id: u64) -> String {
format!("__datanode/{datanode_id}/index.json")
}
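
The new default_index_file helper derives a per-datanode path for the WAL index file from the format string above, e.g.:

#[test]
fn default_index_file_per_datanode() {
    // Grounded in the format string: one index file per datanode id.
    assert_eq!(default_index_file(42), "__datanode/42/index.json");
}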
2 changes: 1 addition & 1 deletion src/log-store/src/kafka/index/collector.rs
@@ -189,7 +189,7 @@ impl GlobalIndexCollector {

impl GlobalIndexCollector {
/// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
pub fn provider_level_index_collector(
pub(crate) fn provider_level_index_collector(
&self,
provider: Arc<KafkaProvider>,
sender: Sender<WorkerRequest>,
13 changes: 9 additions & 4 deletions src/log-store/src/kafka/log_store.rs
@@ -32,6 +32,7 @@ use store_api::storage::RegionId;

use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::index::GlobalIndexCollector;
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::util::record::{
convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
@@ -51,8 +52,12 @@ pub struct KafkaLogStore {

impl KafkaLogStore {
/// Tries to create a Kafka log store.
pub async fn try_new(config: &DatanodeKafkaConfig) -> Result<Self> {
let client_manager = Arc::new(ClientManager::try_new(config).await?);
pub async fn try_new(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Self> {
let client_manager =
Arc::new(ClientManager::try_new(config, global_index_collector).await?);

Ok(Self {
client_manager,
@@ -466,7 +471,7 @@ mod tests {
max_batch_bytes: ReadableSize::kb(32),
..Default::default()
};
let logstore = KafkaLogStore::try_new(&config).await.unwrap();
let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
let topic_name = uuid::Uuid::new_v4().to_string();
let provider = Provider::kafka_provider(topic_name);
let region_entries = (0..5)
@@ -535,7 +540,7 @@ mod tests {
max_batch_bytes: ReadableSize::kb(8),
..Default::default()
};
let logstore = KafkaLogStore::try_new(&config).await.unwrap();
let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
let topic_name = uuid::Uuid::new_v4().to_string();
let provider = Provider::kafka_provider(topic_name);
let region_entries = (0..5)
11 changes: 7 additions & 4 deletions src/log-store/src/test_util/log_store_util.rs
@@ -33,10 +33,13 @@ pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEng

/// Create a [KafkaLogStore].
pub async fn create_kafka_log_store(broker_endpoints: Vec<String>) -> KafkaLogStore {
KafkaLogStore::try_new(&DatanodeKafkaConfig {
broker_endpoints,
..Default::default()
})
KafkaLogStore::try_new(
&DatanodeKafkaConfig {
broker_endpoints,
..Default::default()
},
None,
)
.await
.unwrap()
}
2 changes: 1 addition & 1 deletion src/object-store/src/lib.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient};
pub use opendal::raw::{normalize_path as raw_normalize_path, Access, HttpClient};
pub use opendal::{
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader,