Skip to content

Commit

Permalink
refactor: unify the injection of WAL option (#3066)
Browse files Browse the repository at this point in the history
* feat: add prepare_wal_option

* refactor: use integer hashmap

* feat: unify the injection of WAL option

* fix: fix procedure_flow_upgrade_candidate_with_retry

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Jan 2, 2024
1 parent d87ab06 commit 2b181e9
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 103 deletions.
10 changes: 2 additions & 8 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use api::v1::region::{
};
use api::v1::{ColumnDef, SemanticType};
use async_trait::async_trait;
use common_config::WAL_OPTIONS_KEY;
use common_error::ext::BoxedError;
use common_procedure::error::{
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
Expand Down Expand Up @@ -48,6 +47,7 @@ use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
use crate::wal::prepare_wal_option;

pub struct CreateTableProcedure {
pub context: DdlContext,
Expand Down Expand Up @@ -455,13 +455,7 @@ impl CreateRequestBuilder {
request.region_id = region_id.as_u64();
request.path = storage_path;
// Stores the encoded wal options into the request options.
region_wal_options
.get(&region_id.region_number())
.and_then(|wal_options| {
request
.options
.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});
prepare_wal_option(&mut request.options, region_id, region_wal_options);

if let Some(physical_table_id) = self.physical_table_id {
// Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
Expand Down
6 changes: 4 additions & 2 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ impl Display for OpenRegion {
}
}

#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
pub region_ident: RegionIdent,
pub region_storage_path: String,
pub region_options: HashMap<String, String>,
#[serde(default)]
pub region_wal_options: HashMap<String, String>,
#[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
pub region_wal_options: HashMap<RegionNumber, String>,
#[serde(default)]
pub skip_wal_replay: bool,
}
Expand All @@ -108,7 +110,7 @@ impl OpenRegion {
region_ident: RegionIdent,
path: &str,
region_options: HashMap<String, String>,
region_wal_options: HashMap<String, String>,
region_wal_options: HashMap<RegionNumber, String>,
skip_wal_replay: bool,
) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ impl TableMetadataManager {
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
new_region_routes: Vec<RegionRoute>,
new_region_options: &HashMap<String, String>,
new_region_wal_options: &HashMap<String, String>,
new_region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
Expand Down
75 changes: 66 additions & 9 deletions src/common/meta/src/key/datanode_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
use crate::DatanodeId;

#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
/// RegionInfo
/// For compatible reason, DON'T modify the field name.
Expand All @@ -48,14 +49,15 @@ pub struct RegionInfo {
#[serde(default)]
pub region_options: HashMap<String, String>,
/// The per-region wal options.
/// Key: region number (in string representation). Value: the encoded wal options of the region.
/// Key: region number. Value: the encoded wal options of the region.
#[serde(default)]
pub region_wal_options: HashMap<String, String>,
#[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
pub region_wal_options: HashMap<RegionNumber, String>,
}

pub struct DatanodeTableKey {
datanode_id: DatanodeId,
table_id: TableId,
pub datanode_id: DatanodeId,
pub table_id: TableId,
}

impl DatanodeTableKey {
Expand Down Expand Up @@ -181,7 +183,7 @@ impl DatanodeTableManager {
.filter_map(|region_number| {
region_wal_options
.get(region_number)
.map(|wal_options| (region_number.to_string(), wal_options.clone()))
.map(|wal_options| (*region_number, wal_options.clone()))
})
.collect();

Expand Down Expand Up @@ -214,7 +216,7 @@ impl DatanodeTableManager {
current_region_distribution: RegionDistribution,
new_region_distribution: RegionDistribution,
new_region_options: &HashMap<String, String>,
new_region_wal_options: &HashMap<String, String>,
new_region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<Txn> {
let mut opts = Vec::new();

Expand Down Expand Up @@ -306,6 +308,61 @@ mod tests {
assert!(parsed.is_ok());
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct StringHashMap {
inner: HashMap<String, String>,
}

#[serde_with::serde_as]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct IntegerHashMap {
#[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
inner: HashMap<u32, String>,
}

#[test]
fn test_serde_with_integer_hash_map() {
let map = StringHashMap {
inner: HashMap::from([
("1".to_string(), "aaa".to_string()),
("2".to_string(), "bbb".to_string()),
("3".to_string(), "ccc".to_string()),
]),
};
let encoded = serde_json::to_string(&map).unwrap();
let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap();
assert_eq!(
IntegerHashMap {
inner: HashMap::from([
(1, "aaa".to_string()),
(2, "bbb".to_string()),
(3, "ccc".to_string()),
]),
},
decoded
);

let map = IntegerHashMap {
inner: HashMap::from([
(1, "aaa".to_string()),
(2, "bbb".to_string()),
(3, "ccc".to_string()),
]),
};
let encoded = serde_json::to_string(&map).unwrap();
let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap();
assert_eq!(
StringHashMap {
inner: HashMap::from([
("1".to_string(), "aaa".to_string()),
("2".to_string(), "bbb".to_string()),
("3".to_string(), "ccc".to_string()),
]),
},
decoded
);
}

// This test intends to ensure both the `serde_json::to_string` + `serde_json::from_str`
// and `serde_json::to_vec` + `serde_json::from_slice` work for `DatanodeTableValue`.
// Warning: if the key of `region_wal_options` is of type non-String, this test would fail.
Expand All @@ -320,9 +377,9 @@ mod tests {
("c".to_string(), "cc".to_string()),
]),
region_wal_options: HashMap::from([
("1".to_string(), "aaa".to_string()),
("2".to_string(), "bbb".to_string()),
("3".to_string(), "ccc".to_string()),
(1, "aaa".to_string()),
(2, "bbb".to_string()),
(3, "ccc".to_string()),
]),
};
let table_value = DatanodeTableValue {
Expand Down
13 changes: 13 additions & 0 deletions src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ pub mod options_allocator;
use std::collections::HashMap;

use common_config::wal::StandaloneWalConfig;
use common_config::WAL_OPTIONS_KEY;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
use store_api::storage::{RegionId, RegionNumber};

use crate::error::Result;
use crate::wal::kafka::KafkaConfig;
Expand Down Expand Up @@ -55,6 +58,16 @@ impl From<StandaloneWalConfig> for WalConfig {
}
}

pub fn prepare_wal_option(
options: &mut HashMap<String, String>,
region_id: RegionId,
region_wal_options: &HashMap<RegionNumber, String>,
) {
if let Some(wal_options) = region_wal_options.get(&region_id.region_number()) {
options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
15 changes: 7 additions & 8 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ use std::sync::Arc;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_config::wal::{KafkaConfig, RaftEngineConfig};
use common_config::{WalConfig, WAL_OPTIONS_KEY};
use common_config::WalConfig;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal::prepare_wal_option;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::{error, info, warn};
Expand Down Expand Up @@ -538,13 +539,11 @@ async fn open_all_regions(
for region_number in table_value.regions {
// Augments region options with wal options if a wal options is provided.
let mut region_options = table_value.region_info.region_options.clone();
table_value
.region_info
.region_wal_options
.get(&region_number.to_string())
.and_then(|wal_options| {
region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});
prepare_wal_option(
&mut region_options,
RegionId::new(table_value.table_id, region_number),
&table_value.region_info.region_wal_options,
);

regions.push((
RegionId::new(table_value.table_id, region_number),
Expand Down
8 changes: 8 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
DatanodeTableNotFound {
table_id: TableId,
datanode_id: DatanodeId,
location: Location,
},

#[snafu(display("Table route corrupted, key: {}, reason: {}", key, reason))]
CorruptedTableRoute {
key: String,
Expand Down Expand Up @@ -683,6 +690,7 @@ impl ErrorExt for Error {
| Error::InvalidRegionKeyFromUtf8 { .. }
| Error::TableRouteNotFound { .. }
| Error::TableInfoNotFound { .. }
| Error::DatanodeTableNotFound { .. }
| Error::CorruptedTableRoute { .. }
| Error::MoveValue { .. }
| Error::InvalidUtf8Value { .. }
Expand Down
33 changes: 19 additions & 14 deletions src/meta-srv/src/procedure/region_failover/activate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use std::time::Duration;

use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::ddl::utils::region_storage_path;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::{DatanodeTableKey, RegionInfo};
use common_meta::peer::Peer;
use common_meta::RegionIdent;
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;

use super::update_metadata::UpdateRegionMetadata;
use super::{RegionFailoverContext, State};
Expand All @@ -44,7 +45,7 @@ pub(super) struct ActivateRegion {
// An `None` option stands for uninitialized.
region_storage_path: Option<String>,
region_options: Option<HashMap<String, String>>,
region_wal_options: Option<HashMap<String, String>>,
region_wal_options: Option<HashMap<RegionNumber, String>>,
}

impl ActivateRegion {
Expand All @@ -65,27 +66,31 @@ impl ActivateRegion {
timeout: Duration,
) -> Result<MailboxReceiver> {
let table_id = failed_region.table_id;
let table_info = ctx
// Retrieves the wal options from failed datanode table value.
let datanode_table_value = ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.datanode_table_manager()
.get(&DatanodeTableKey::new(failed_region.datanode_id, table_id))
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::TableInfoNotFoundSnafu { table_id })?
.into_inner()
.table_info;

let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
.context(error::DatanodeTableNotFoundSnafu {
table_id,
datanode_id: failed_region.datanode_id,
})?;

let candidate_ident = RegionIdent {
datanode_id: self.candidate.id,
..failed_region.clone()
};
info!("Activating region: {candidate_ident:?}");
let region_options: HashMap<String, String> = (&table_info.meta.options).into();
// TODO(niebayes): properly fetch or construct region wal options.
let region_wal_options = HashMap::new();

let RegionInfo {
region_storage_path,
region_options,
region_wal_options,
..
} = datanode_table_value.region_info;

let instruction = Instruction::OpenRegion(OpenRegion::new(
candidate_ident.clone(),
&region_storage_path,
Expand Down
5 changes: 3 additions & 2 deletions src/meta-srv/src/procedure/region_failover/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_meta::RegionIdent;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;

use super::invalidate_cache::InvalidateCache;
use super::{RegionFailoverContext, State};
Expand All @@ -36,15 +37,15 @@ pub(super) struct UpdateRegionMetadata {
region_storage_path: String,
region_options: HashMap<String, String>,
#[serde(default)]
region_wal_options: HashMap<String, String>,
region_wal_options: HashMap<RegionNumber, String>,
}

impl UpdateRegionMetadata {
pub(super) fn new(
candidate: Peer,
region_storage_path: String,
region_options: HashMap<String, String>,
region_wal_options: HashMap<String, String>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
Self {
candidate,
Expand Down
Loading

0 comments on commit 2b181e9

Please sign in to comment.