feat(log_store): introduce the CollectionTask (#4530)
* feat: introduce the `CollectionTask`

* feat: add config of index collector

* chore: remove unused code

* feat: truncate indexes

* chore: apply suggestions from CR

* chore: update config examples

* refactor: retrieve latest offset while dumping indexes

* chore: print warn
WenyXu authored Aug 19, 2024
1 parent c8de8b8 commit 7f6fd54
Showing 24 changed files with 578 additions and 102 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions config/config.md
@@ -374,6 +374,8 @@
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
8 changes: 8 additions & 0 deletions config/datanode.example.toml
@@ -187,6 +187,14 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"

## Whether to enable WAL index creation.
## **It's only used when the provider is `kafka`**.
create_index = true

## The interval for dumping WAL indexes.
## **It's only used when the provider is `kafka`**.
dump_index_interval = "60s"

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
2 changes: 2 additions & 0 deletions src/common/wal/src/config.rs
@@ -30,6 +30,7 @@ pub enum MetasrvWalConfig {
Kafka(MetasrvKafkaConfig),
}

#[allow(clippy::large_enum_variant)]
/// Wal configurations for datanode.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
@@ -223,6 +224,7 @@ mod tests {
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
..Default::default()
};
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
}
5 changes: 5 additions & 0 deletions src/common/wal/src/config/kafka/datanode.rs
@@ -40,6 +40,9 @@ pub struct DatanodeKafkaConfig {
/// The kafka topic config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
/// Whether to enable WAL index creation.
pub create_index: bool,
/// The interval for dumping WAL indexes.
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
}

impl Default for DatanodeKafkaConfig {
@@ -51,6 +54,8 @@ impl Default for DatanodeKafkaConfig {
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
kafka_topic: KafkaTopicConfig::default(),
create_index: true,
dump_index_interval: Duration::from_secs(60),
}
}
}
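For reference, here is a minimal sketch (not code from this commit) of how the two new fields deserialize from the `[wal]` section of a datanode TOML file, assuming the `toml`, `serde`, and `humantime_serde` crates; the struct name `WalSketch` is hypothetical:

```rust
use std::time::Duration;

use serde::Deserialize;

// Hypothetical stand-in for the Kafka WAL fields added by this commit.
#[derive(Debug, Deserialize)]
struct WalSketch {
    create_index: bool,
    #[serde(with = "humantime_serde")]
    dump_index_interval: Duration,
}

fn main() {
    let cfg: WalSketch = toml::from_str(
        r#"
            create_index = true
            dump_index_interval = "60s"
        "#,
    )
    .expect("valid TOML");
    assert!(cfg.create_index);
    assert_eq!(cfg.dump_index_interval, Duration::from_secs(60));
}
```

With `humantime_serde`, a string like `"60s"` maps directly onto a `Duration`, matching the `Duration::from_secs(60)` default set in `DatanodeKafkaConfig::default()`.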
62 changes: 49 additions & 13 deletions src/datanode/src/datanode.rs
@@ -16,6 +16,7 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
@@ -32,6 +33,7 @@ use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
@@ -64,7 +66,7 @@ use crate::event_listener::{
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store;
use crate::store::{self, new_object_store_without_cache};

/// Datanode service.
pub struct Datanode {
@@ -398,15 +400,37 @@ impl DatanodeBuilder {
)
.await
.context(BuildMitoEngineSnafu)?,
DatanodeWalConfig::Kafka(kafka_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?,
DatanodeWalConfig::Kafka(kafka_config) => {
if kafka_config.create_index && opts.node_id.is_none() {
warn!("The WAL index creation only available in distributed mode.")
}
let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
{
let operator = new_object_store_without_cache(
&opts.storage.store,
&opts.storage.data_home,
)
.await?;
let path = default_index_file(opts.node_id.unwrap());
Some(Self::build_global_index_collector(
kafka_config.dump_index_interval,
operator,
path,
))
} else {
None
};

MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
object_store_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?
}
};
Ok(mito_engine)
}
@@ -438,14 +462,26 @@
Ok(Arc::new(logstore))
}

/// Builds [KafkaLogStore].
async fn build_kafka_log_store(config: &DatanodeKafkaConfig) -> Result<Arc<KafkaLogStore>> {
KafkaLogStore::try_new(config)
/// Builds [`KafkaLogStore`].
async fn build_kafka_log_store(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Arc<KafkaLogStore>> {
KafkaLogStore::try_new(config, global_index_collector)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)
.map(Arc::new)
}

/// Builds [`GlobalIndexCollector`]
fn build_global_index_collector(
dump_index_interval: Duration,
operator: object_store::ObjectStore,
path: String,
) -> GlobalIndexCollector {
GlobalIndexCollector::new(dump_index_interval, operator, path)
}
}

/// Open all regions belong to this datanode.
69 changes: 51 additions & 18 deletions src/datanode/src/store.rs
@@ -29,18 +29,18 @@ use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};

pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
let object_store = match &store {
let object_store = match store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
@@ -51,27 +51,61 @@ pub(crate) async fn new_object_store(
}
ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
}?;
Ok(object_store)
}

fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
object_store.layer(
RetryLayer::new()
.with_jitter()
.with_notify(PrintDetailedError),
)
}

pub(crate) async fn new_object_store_without_cache(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = create_object_store_with_cache(object_store, &store).await?;
object_store.layer(
RetryLayer::new()
.with_jitter()
.with_notify(PrintDetailedError),
)
// Adds retry layer
with_retry_layers(object_store)
} else {
object_store
};

let store = with_instrument_layers(object_store, true);
Ok(store)
let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}

async fn create_object_store_with_cache(
object_store: ObjectStore,
store_config: &ObjectStoreConfig,
pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = if let Some(cache_layer) = build_cache_layer(&store).await? {
// Adds cache layer
object_store.layer(cache_layer)
} else {
object_store
};

// Adds retry layer
with_retry_layers(object_store)
} else {
object_store
};

let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}

async fn build_cache_layer(
store_config: &ObjectStoreConfig,
) -> Result<Option<LruCacheLayer<impl Access>>> {
let (cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache.cache_path.as_ref();
@@ -127,9 +161,9 @@ async fn create_object_store_with_cache(
path, cache_capacity
);

Ok(object_store.layer(cache_layer))
Ok(Some(cache_layer))
} else {
Ok(object_store)
Ok(None)
}
}

@@ -175,7 +209,6 @@ pub(crate) fn build_http_client() -> Result<HttpClient> {

HttpClient::build(http_builder).context(error::InitBackendSnafu)
}

struct PrintDetailedError;

// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
2 changes: 2 additions & 0 deletions src/log-store/Cargo.toml
@@ -25,10 +25,12 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
delta-encoding = "0.4"
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true
lazy_static.workspace = true
object-store.workspace = true
pin-project.workspace = true
prometheus.workspace = true
protobuf = { version = "2", features = ["bytes"] }
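The new `delta-encoding` dependency suggests that the index module persists monotonically increasing WAL entry ids as successive differences, which compress well. Below is a conceptual, hand-rolled sketch of that idea; this is not the crate's API, and the function names are hypothetical:

```rust
/// Encode a sorted sequence of WAL entry ids as successive differences.
fn delta_encode(ids: &[u64]) -> Vec<u64> {
    let mut prev = 0u64;
    ids.iter()
        .map(|&id| {
            let delta = id - prev;
            prev = id;
            delta
        })
        .collect()
}

/// Recover the original ids by accumulating the deltas.
fn delta_decode(deltas: &[u64]) -> Vec<u64> {
    let mut acc = 0u64;
    deltas.iter()
        .map(|&d| {
            acc += d;
            acc
        })
        .collect()
}

fn main() {
    let ids = [100u64, 101, 102, 110, 111];
    let deltas = delta_encode(&ids);
    assert_eq!(deltas, vec![100, 1, 1, 8, 1]);
    assert_eq!(delta_decode(&deltas), ids);
}
```

Since consecutive entry ids tracked for a region are usually close together, the deltas stay small and serialize compactly.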
26 changes: 25 additions & 1 deletion src/log-store/src/error.rs
@@ -272,14 +272,38 @@
location: Location,
},

#[snafu(display("Failed to send produce request"))]
#[snafu(display("Failed to wait for ProduceResultReceiver"))]
WaitProduceResultReceiver {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::oneshot::error::RecvError,
},

#[snafu(display("Failed to wait for result of DumpIndex"))]
WaitDumpIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::oneshot::error::RecvError,
},

#[snafu(display("Failed to create writer"))]
CreateWriter {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
},

#[snafu(display("Failed to write index"))]
WriteIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
},

#[snafu(display(
"The length of meta if exceeded the limit: {}, actual: {}",
limit,
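The two `RecvError`-sourced variants pair with `tokio` oneshot channels. A hedged sketch of how such a variant is typically consumed inside the crate (the `wait_dump_index` helper and its receiver are assumptions for illustration, not code from this commit):

```rust
use snafu::ResultExt;
use tokio::sync::oneshot;

use crate::error::{Result, WaitDumpIndexSnafu};

/// Hypothetical helper: wait for the index-dump task to signal completion.
async fn wait_dump_index(rx: oneshot::Receiver<()>) -> Result<()> {
    // If the sender is dropped, the resulting `RecvError` becomes the
    // variant's `source`, and the `#[snafu(implicit)]` location is
    // captured automatically by the generated context selector.
    rx.await.context(WaitDumpIndexSnafu)
}
```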
9 changes: 5 additions & 4 deletions src/log-store/src/kafka.rs
@@ -13,18 +13,19 @@
// limitations under the License.

pub(crate) mod client_manager;
// TODO(weny): remove it
#[allow(dead_code)]
pub(crate) mod consumer;
#[allow(unused)]
/// TODO(weny): remove it.
#[allow(dead_code)]
#[allow(unused_imports)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
// TODO(weny): remove it
/// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod worker;

pub use index::{default_index_file, GlobalIndexCollector};
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::Id as EntryId;

Expand Down
(Remaining file diffs not shown.)