feat(log_store): introduce the CollectionTask #4530

Merged · 9 commits · Aug 19, 2024
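In brief, per the diff below: the datanode gains two Kafka WAL options (`create_index`, `dump_index_interval`), a `GlobalIndexCollector` that periodically dumps WAL indexes to object storage through a new cache-free object-store constructor, plus supporting error variants and log-store modules.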
Changes from all commits
11 changes: 11 additions & 0 deletions Cargo.lock


2 changes: 2 additions & 0 deletions config/config.md
@@ -374,6 +374,8 @@
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
8 changes: 8 additions & 0 deletions config/datanode.example.toml
@@ -187,6 +187,14 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"

## Whether to enable WAL index creation.
## **It's only used when the provider is `kafka`**.
create_index = true

## The interval for dumping WAL indexes.
## **It's only used when the provider is `kafka`**.
dump_index_interval = "60s"

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
2 changes: 2 additions & 0 deletions src/common/wal/src/config.rs
@@ -30,6 +30,7 @@ pub enum MetasrvWalConfig {
Kafka(MetasrvKafkaConfig),
}

#[allow(clippy::large_enum_variant)]
/// Wal configurations for datanode.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
@@ -223,6 +224,7 @@ mod tests {
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
..Default::default()
};
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
}
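Aside: the `#[allow(clippy::large_enum_variant)]` added above is presumably needed because the growing `DatanodeKafkaConfig` enlarges the `Kafka` variant relative to `RaftEngine`; the test now fills the new fields with `..Default::default()`.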
5 changes: 5 additions & 0 deletions src/common/wal/src/config/kafka/datanode.rs
@@ -40,6 +40,9 @@ pub struct DatanodeKafkaConfig {
/// The kafka topic config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
/// Whether to enable WAL index creation.
pub create_index: bool,
/// The interval for dumping WAL indexes.
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
}

impl Default for DatanodeKafkaConfig {
@@ -51,6 +54,8 @@ impl Default for DatanodeKafkaConfig {
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
kafka_topic: KafkaTopicConfig::default(),
create_index: true,
dump_index_interval: Duration::from_secs(60),
}
}
}
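The defaults above mirror the documented config (`create_index = true`, a 60s interval), and `humantime_serde` lets the interval be written as a human-readable string such as `"60s"`. A minimal, self-contained sketch of the same serde pattern — the `WalIndexOptions` stand-in struct and the `toml` crate usage are illustrative, not from the PR:

```rust
use std::time::Duration;

use serde::Deserialize;

// Illustrative stand-in for the two new `DatanodeKafkaConfig` fields.
#[derive(Debug, Deserialize)]
#[serde(default)]
struct WalIndexOptions {
    create_index: bool,
    // humantime_serde parses strings like "60s" or "5mins" into a Duration.
    #[serde(with = "humantime_serde")]
    dump_index_interval: Duration,
}

impl Default for WalIndexOptions {
    fn default() -> Self {
        Self {
            create_index: true,
            dump_index_interval: Duration::from_secs(60),
        }
    }
}

fn main() {
    let opts: WalIndexOptions =
        toml::from_str("create_index = true\ndump_index_interval = \"60s\"").unwrap();
    assert_eq!(opts.dump_index_interval, Duration::from_secs(60));
}
```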
62 changes: 49 additions & 13 deletions src/datanode/src/datanode.rs
@@ -16,6 +16,7 @@

use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
@@ -32,6 +33,7 @@ use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
@@ -64,7 +66,7 @@ use crate::event_listener::{
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
- use crate::store;
use crate::store::{self, new_object_store_without_cache};

/// Datanode service.
pub struct Datanode {
@@ -398,15 +400,37 @@ impl DatanodeBuilder {
)
.await
.context(BuildMitoEngineSnafu)?,
- DatanodeWalConfig::Kafka(kafka_config) => MitoEngine::new(
- &opts.storage.data_home,
- config,
- Self::build_kafka_log_store(kafka_config).await?,
- object_store_manager,
- plugins,
- )
- .await
- .context(BuildMitoEngineSnafu)?,
DatanodeWalConfig::Kafka(kafka_config) => {
if kafka_config.create_index && opts.node_id.is_none() {
warn!("WAL index creation is only available in distributed mode.")
}
let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
{
let operator = new_object_store_without_cache(
&opts.storage.store,
&opts.storage.data_home,
)
.await?;
let path = default_index_file(opts.node_id.unwrap());
Some(Self::build_global_index_collector(
kafka_config.dump_index_interval,
operator,
path,
))
} else {
None
};

MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
object_store_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?
}
};
Ok(mito_engine)
}
@@ -438,14 +462,26 @@
Ok(Arc::new(logstore))
}

- /// Builds [KafkaLogStore].
- async fn build_kafka_log_store(config: &DatanodeKafkaConfig) -> Result<Arc<KafkaLogStore>> {
- KafkaLogStore::try_new(config)
/// Builds [`KafkaLogStore`].
async fn build_kafka_log_store(
config: &DatanodeKafkaConfig,
global_index_collector: Option<GlobalIndexCollector>,
) -> Result<Arc<KafkaLogStore>> {
KafkaLogStore::try_new(config, global_index_collector)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)
.map(Arc::new)
}

/// Builds [`GlobalIndexCollector`].
fn build_global_index_collector(
dump_index_interval: Duration,
operator: object_store::ObjectStore,
path: String,
) -> GlobalIndexCollector {
GlobalIndexCollector::new(dump_index_interval, operator, path)
}
}

/// Open all regions belonging to this datanode.
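To summarize the gating above: the collector is only built when `create_index` is enabled and the datanode has a `node_id` (distributed mode); otherwise a warning is logged and indexing is skipped. A condensed, hypothetical restatement of that branch:

```rust
/// Hypothetical condensation of the check in `build_mito_engine` above:
/// without a node_id there is no per-node index file to derive.
fn wants_index_collector(create_index: bool, node_id: Option<u64>) -> bool {
    create_index && node_id.is_some()
}
```

The index file path then comes from `default_index_file(node_id)` and the dump target from `new_object_store_without_cache`, both shown in the diff above.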
69 changes: 51 additions & 18 deletions src/datanode/src/store.rs
@@ -29,18 +29,18 @@ use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
- use object_store::{Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};

- pub(crate) async fn new_object_store(
- store: ObjectStoreConfig,
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
- let object_store = match &store {
let object_store = match store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
@@ -51,27 +51,61 @@
}
ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
}?;
Ok(object_store)
}

fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
object_store.layer(
RetryLayer::new()
.with_jitter()
.with_notify(PrintDetailedError),
)
}

pub(crate) async fn new_object_store_without_cache(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
- let object_store = create_object_store_with_cache(object_store, &store).await?;
- object_store.layer(
- RetryLayer::new()
- .with_jitter()
- .with_notify(PrintDetailedError),
- )
// Adds retry layer
with_retry_layers(object_store)
} else {
object_store
};

- let store = with_instrument_layers(object_store, true);
- Ok(store)
let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}

- async fn create_object_store_with_cache(
- object_store: ObjectStore,
- store_config: &ObjectStoreConfig,
pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = if let Some(cache_layer) = build_cache_layer(&store).await? {
// Adds cache layer
object_store.layer(cache_layer)
} else {
object_store
};

// Adds retry layer
with_retry_layers(object_store)
} else {
object_store
};

let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}

async fn build_cache_layer(
store_config: &ObjectStoreConfig,
) -> Result<Option<LruCacheLayer<impl Access>>> {
let (cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache.cache_path.as_ref();
Expand Down Expand Up @@ -127,9 +161,9 @@ async fn create_object_store_with_cache(
path, cache_capacity
);

- Ok(object_store.layer(cache_layer))
Ok(Some(cache_layer))
} else {
- Ok(object_store)
Ok(None)
}
}

Expand Down Expand Up @@ -175,7 +209,6 @@ pub(crate) fn build_http_client() -> Result<HttpClient> {

HttpClient::build(http_builder).context(error::InitBackendSnafu)
}

struct PrintDetailedError;

// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
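Net effect of the store.rs refactor: `new_raw_object_store` builds the bare backend; `new_object_store_without_cache` applies only the retry layer (for non-fs backends) plus instrumentation; `new_object_store` additionally applies the LRU cache layer when `build_cache_layer` returns one. The cache-free variant appears to exist for the WAL index dumps wired up in datanode.rs above, so index writes bypass the local read cache — that motivation is implied by the call site rather than stated in the PR.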
2 changes: 2 additions & 0 deletions src/log-store/Cargo.toml
@@ -25,10 +25,12 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
delta-encoding = "0.4"
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true
lazy_static.workspace = true
object-store.workspace = true
pin-project.workspace = true
prometheus.workspace = true
protobuf = { version = "2", features = ["bytes"] }
26 changes: 25 additions & 1 deletion src/log-store/src/error.rs
@@ -272,14 +272,38 @@
location: Location,
},

#[snafu(display("Failed to send produce request"))]
#[snafu(display("Failed to wait for ProduceResultReceiver"))]
WaitProduceResultReceiver {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::oneshot::error::RecvError,
},

#[snafu(display("Failed to wait for result of DumpIndex"))]
WaitDumpIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::oneshot::error::RecvError,
},

#[snafu(display("Failed to create writer"))]
CreateWriter {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
},

#[snafu(display("Failed to write index"))]
WriteIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
},

#[snafu(display(
"The length of meta if exceeded the limit: {}, actual: {}",
limit,
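The two variants sourced from `tokio::sync::oneshot::error::RecvError` suggest the index worker replies over oneshot channels. A minimal sketch of that request/reply pattern — the function and channel layout are illustrative, not the PR's actual types:

```rust
use tokio::sync::{mpsc, oneshot};

// Illustrative: ask a background task to dump the index, then await its
// acknowledgement. A dropped reply sender surfaces as RecvError, which the
// error variants above wrap (e.g. WaitDumpIndex).
async fn request_dump(requests: &mpsc::Sender<oneshot::Sender<()>>) -> Result<(), String> {
    let (reply_tx, reply_rx) = oneshot::channel();
    requests
        .send(reply_tx)
        .await
        .map_err(|e| format!("worker stopped: {e}"))?;
    reply_rx
        .await
        .map_err(|e| format!("failed to wait for DumpIndex result: {e}"))
}
```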
9 changes: 5 additions & 4 deletions src/log-store/src/kafka.rs
@@ -13,18 +13,19 @@
// limitations under the License.

pub(crate) mod client_manager;
// TODO(weny): remove it
#[allow(dead_code)]
pub(crate) mod consumer;
- #[allow(unused)]
/// TODO(weny): remove it.
#[allow(dead_code)]
#[allow(unused_imports)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
- // TODO(weny): remove it
/// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod worker;

pub use index::{default_index_file, GlobalIndexCollector};
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::Id as EntryId;
