From 23c32329276a7c38188873ef45c840d82a830f5a Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 8 Oct 2023 09:10:38 +0000 Subject: [PATCH 1/4] refactor: compare with origin bytes during the transaction --- Cargo.lock | 1 + src/catalog/src/kvbackend/manager.rs | 1 + src/common/meta/Cargo.toml | 1 + src/common/meta/src/ddl/alter_table.rs | 16 +- src/common/meta/src/ddl/drop_table.rs | 17 +- src/common/meta/src/ddl/truncate_table.rs | 9 +- src/common/meta/src/ddl_manager.rs | 12 +- src/common/meta/src/key.rs | 267 ++++++++++++++---- src/common/meta/src/key/table_info.rs | 35 ++- src/common/meta/src/key/table_route.rs | 36 ++- .../region_failover/activate_region.rs | 1 + .../region_failover/update_metadata.rs | 9 +- src/meta-srv/src/procedure/tests.rs | 9 +- src/meta-srv/src/selector/load_based.rs | 2 +- src/meta-srv/src/table_routes.rs | 6 +- src/partition/src/manager.rs | 12 +- tests-integration/src/grpc.rs | 3 +- tests-integration/src/instance.rs | 3 +- 18 files changed, 319 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2238b680e6bb..ec71fb7a3ec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1854,6 +1854,7 @@ dependencies = [ "async-stream", "async-trait", "base64 0.21.3", + "bytes", "chrono", "common-catalog", "common-error", diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index c434f1c91841..02887006e6cc 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -229,6 +229,7 @@ impl CatalogManager for KvBackendCatalogManager { .get(table_id) .await .context(TableMetadataManagerSnafu)? + .map(|v| v.into_inner()) else { return Ok(None); }; diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index f2ba42293209..c8fdd92180ac 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -13,6 +13,7 @@ arrow-flight.workspace = true async-stream.workspace = true async-trait.workspace = true base64 = "0.21" +bytes = "1.4" common-catalog = { workspace = true } common-error = { workspace = true } common-grpc-expr.workspace = true diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index b1bca028ecac..4aa37c39802d 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -45,6 +45,7 @@ use crate::error::{ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; +use crate::key::DeserializedValueWithBytes; use crate::metrics; use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::{find_leader_regions, find_leaders}; @@ -63,7 +64,7 @@ impl AlterTableProcedure { pub fn new( cluster_id: u64, task: AlterTableTask, - table_info_value: TableInfoValue, + table_info_value: DeserializedValueWithBytes, context: DdlContext, ) -> Result { let alter_kind = task @@ -74,7 +75,7 @@ impl AlterTableProcedure { err_msg: "'kind' is absent", })?; let (kind, next_column_id) = - create_proto_alter_kind(&table_info_value.table_info, alter_kind)?; + create_proto_alter_kind(&table_info_value.inner.table_info, alter_kind)?; debug!( "New AlterTableProcedure, kind: {:?}, next_column_id: {:?}", @@ -100,7 +101,7 @@ impl AlterTableProcedure { }) .map_err(ProcedureError::external)?; let (kind, next_column_id) = - create_proto_alter_kind(&data.table_info_value.table_info, alter_kind) + create_proto_alter_kind(&data.table_info_value.inner.table_info, alter_kind) .map_err(ProcedureError::external)?; assert_eq!(data.next_column_id, next_column_id); @@ -191,7 +192,8 @@ impl AlterTableProcedure { .await? .with_context(|| TableRouteNotFoundSnafu { table_name: table_ref.to_string(), - })?; + })? + .into_inner(); let leaders = find_leaders(®ion_routes); let mut alter_region_tasks = Vec::with_capacity(leaders.len()); @@ -413,7 +415,7 @@ pub struct AlterTableData { state: AlterTableState, task: AlterTableTask, /// Table info value before alteration. - table_info_value: TableInfoValue, + table_info_value: DeserializedValueWithBytes, cluster_id: u64, /// Next column id of the table if the task adds columns to the table. next_column_id: Option, @@ -422,7 +424,7 @@ pub struct AlterTableData { impl AlterTableData { pub fn new( task: AlterTableTask, - table_info_value: TableInfoValue, + table_info_value: DeserializedValueWithBytes, cluster_id: u64, next_column_id: Option, ) -> Self { @@ -444,7 +446,7 @@ impl AlterTableData { } fn table_info(&self) -> &RawTableInfo { - &self.table_info_value.table_info + &self.table_info_value.inner.table_info } } diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 9da7759cd006..cd7a9febbb7a 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -39,6 +39,7 @@ use crate::error::{self, Result}; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; +use crate::key::DeserializedValueWithBytes; use crate::metrics; use crate::rpc::ddl::DropTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; @@ -55,8 +56,8 @@ impl DropTableProcedure { pub fn new( cluster_id: u64, task: DropTableTask, - table_route_value: TableRouteValue, - table_info_value: TableInfoValue, + table_route_value: DeserializedValueWithBytes, + table_info_value: DeserializedValueWithBytes, context: DdlContext, ) -> Self { Self { @@ -231,16 +232,16 @@ pub struct DropTableData { pub state: DropTableState, pub cluster_id: u64, pub task: DropTableTask, - pub table_route_value: TableRouteValue, - pub table_info_value: TableInfoValue, + pub table_route_value: DeserializedValueWithBytes, + pub table_info_value: DeserializedValueWithBytes, } impl DropTableData { pub fn new( cluster_id: u64, task: DropTableTask, - table_route_value: TableRouteValue, - table_info_value: TableInfoValue, + table_route_value: DeserializedValueWithBytes, + table_info_value: DeserializedValueWithBytes, ) -> Self { Self { state: DropTableState::Prepare, @@ -256,11 +257,11 @@ impl DropTableData { } fn region_routes(&self) -> &Vec { - &self.table_route_value.region_routes + &self.table_route_value.inner.region_routes } fn table_info(&self) -> &RawTableInfo { - &self.table_info_value.table_info + &self.table_info_value.inner.table_info } fn table_id(&self) -> TableId { diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 831b41a631f0..d23fb3c72693 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -35,6 +35,7 @@ use crate::ddl::DdlContext; use crate::error::{Result, TableNotFoundSnafu}; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; +use crate::key::DeserializedValueWithBytes; use crate::metrics; use crate::rpc::ddl::TruncateTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; @@ -90,7 +91,7 @@ impl TruncateTableProcedure { pub(crate) fn new( cluster_id: u64, task: TruncateTableTask, - table_info_value: TableInfoValue, + table_info_value: DeserializedValueWithBytes, region_routes: Vec, context: DdlContext, ) -> Self { @@ -188,7 +189,7 @@ pub struct TruncateTableData { state: TruncateTableState, cluster_id: u64, task: TruncateTableTask, - table_info_value: TableInfoValue, + table_info_value: DeserializedValueWithBytes, region_routes: Vec, } @@ -196,7 +197,7 @@ impl TruncateTableData { pub fn new( cluster_id: u64, task: TruncateTableTask, - table_info_value: TableInfoValue, + table_info_value: DeserializedValueWithBytes, region_routes: Vec, ) -> Self { Self { @@ -217,7 +218,7 @@ impl TruncateTableData { } fn table_info(&self) -> &RawTableInfo { - &self.table_info_value.table_info + &self.table_info_value.inner.table_info } fn table_id(&self) -> TableId { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 62af640a3c57..7e0f8f169424 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -35,7 +35,7 @@ use crate::error::{ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; -use crate::key::TableMetadataManagerRef; +use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable}; use crate::rpc::ddl::{ AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, @@ -144,7 +144,7 @@ impl DdlManager { &self, cluster_id: u64, alter_table_task: AlterTableTask, - table_info_value: TableInfoValue, + table_info_value: DeserializedValueWithBytes, ) -> Result { let context = self.create_context(); @@ -176,8 +176,8 @@ impl DdlManager { &self, cluster_id: u64, drop_table_task: DropTableTask, - table_info_value: TableInfoValue, - table_route_value: TableRouteValue, + table_info_value: DeserializedValueWithBytes, + table_route_value: DeserializedValueWithBytes, ) -> Result { let context = self.create_context(); @@ -198,7 +198,7 @@ impl DdlManager { &self, cluster_id: u64, truncate_table_task: TruncateTableTask, - table_info_value: TableInfoValue, + table_info_value: DeserializedValueWithBytes, region_routes: Vec, ) -> Result { let context = self.create_context(); @@ -252,7 +252,7 @@ async fn handle_truncate_table_task( table_name: table_ref.to_string(), })?; - let table_route = table_route_value.region_routes; + let table_route = table_route_value.inner.region_routes; let id = ddl_manager .submit_truncate_table_task( diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index a37d2e16f117..e9c3038f16f1 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -56,12 +56,16 @@ pub mod table_region; pub mod table_route; use std::collections::{BTreeMap, HashMap}; +use std::fmt::Debug; use std::sync::Arc; +use bytes::Bytes; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue}; use lazy_static::lazy_static; use regex::Regex; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::RegionNumber; use table::metadata::{RawTableInfo, TableId}; @@ -155,6 +159,104 @@ macro_rules! ensure_values { }; } +/// A struct containing a deserialized value(`inner`) and an original bytes. +/// +/// - Serialize behaviors: +/// +/// The `inner` field will be ignored. +/// +/// - Deserialize behaviors: +/// +/// The `inner` field will be deserialized from the `bytes` field. +pub struct DeserializedValueWithBytes { + // The original bytes of the inner. + pub bytes: Bytes, + // The value was deserialized from the original bytes. + pub inner: T, +} + +impl Debug for DeserializedValueWithBytes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DeserializedValueWithBytes(inner: {:?}, bytes: {:?})", + self.inner, self.bytes + ) + } +} + +impl Serialize for DeserializedValueWithBytes { + /// - Serialize behaviors: + /// + /// The `inner` field will be ignored. + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + self.bytes.serialize(serializer) + } +} + +impl<'de, T: DeserializeOwned + Serialize> Deserialize<'de> for DeserializedValueWithBytes { + /// - Deserialize behaviors: + /// + /// The `inner` field will be deserialized from the `bytes` field. + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + let buf = Bytes::deserialize(deserializer)?; + + let value = DeserializedValueWithBytes::from_inner_bytes(buf) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; + + Ok(value) + } +} + +impl Clone for DeserializedValueWithBytes { + fn clone(&self) -> Self { + Self { + bytes: self.bytes.clone(), + inner: self.inner.clone(), + } + } +} + +impl DeserializedValueWithBytes { + /// Returns a struct containing a deserialized value and an original `bytes`. + /// It accepts original bytes of inner. + pub fn from_inner_bytes(bytes: Bytes) -> Result { + let inner = serde_json::from_slice(&bytes).context(error::SerdeJsonSnafu)?; + Ok(Self { bytes, inner }) + } + + /// Returns a struct containing a deserialized value and an original `bytes`. + /// It accepts original bytes of inner. + pub fn from_inner_slice(bytes: &[u8]) -> Result { + Self::from_inner_bytes(Bytes::copy_from_slice(bytes)) + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_bytes(self) -> Bytes { + self.bytes + } + + /// Notes: used for test purpose. + /// Due to it was used in other crates, without wrapping with a #[cfg(test)]. + pub fn from_inner(inner: T) -> Self { + let bytes = serde_json::to_vec(&inner).unwrap(); + + Self { + bytes: Bytes::from(bytes), + inner, + } + } +} + impl TableMetadataManager { pub fn new(kv_backend: KvBackendRef) -> Self { TableMetadataManager { @@ -212,7 +314,10 @@ impl TableMetadataManager { pub async fn get_full_table_info( &self, table_id: TableId, - ) -> Result<(Option, Option)> { + ) -> Result<( + Option>, + Option>, + )> { let (get_table_route_txn, table_route_decoder) = self.table_route_manager.build_get_txn(table_id); @@ -291,15 +396,17 @@ impl TableMetadataManager { // Checks whether metadata was already created. if !r.succeeded { - let remote_table_info = - on_create_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu { + let remote_table_info = on_create_table_info_failure(&r.responses)? + .context(error::UnexpectedSnafu { err_msg: "Reads the empty table info during the create table metadata", - })?; + })? + .into_inner(); - let remote_table_route = - on_create_table_route_failure(&r.responses)?.context(error::UnexpectedSnafu { + let remote_table_route = on_create_table_route_failure(&r.responses)? + .context(error::UnexpectedSnafu { err_msg: "Reads the empty table route during the create table metadata", - })?; + })? + .into_inner(); let op_name = "the creating table metadata"; ensure_values!(remote_table_info, table_info_value, op_name); @@ -313,10 +420,10 @@ impl TableMetadataManager { /// The caller MUST ensure it has the exclusive access to `TableNameKey`. pub async fn delete_table_metadata( &self, - table_info_value: &TableInfoValue, - table_route_value: &TableRouteValue, + table_info_value: &DeserializedValueWithBytes, + table_route_value: &DeserializedValueWithBytes, ) -> Result<()> { - let table_info = &table_info_value.table_info; + let table_info = &table_info_value.inner.table_info; let table_id = table_info.ident.table_id; // Deletes table name. @@ -336,7 +443,7 @@ impl TableMetadataManager { .build_delete_txn(table_id, table_info_value)?; // Deletes datanode table key value pairs. - let distribution = region_distribution(&table_route_value.region_routes)?; + let distribution = region_distribution(&table_route_value.inner.region_routes)?; let delete_datanode_txn = self .datanode_table_manager() .build_delete_txn(table_id, distribution)?; @@ -364,10 +471,10 @@ impl TableMetadataManager { /// and the new `TableNameKey` MUST be empty. pub async fn rename_table( &self, - current_table_info_value: TableInfoValue, + current_table_info_value: DeserializedValueWithBytes, new_table_name: String, ) -> Result<()> { - let current_table_info = ¤t_table_info_value.table_info; + let current_table_info = ¤t_table_info_value.inner.table_info; let table_id = current_table_info.ident.table_id; let table_name_key = TableNameKey::new( @@ -389,9 +496,11 @@ impl TableMetadataManager { table_id, )?; - let new_table_info_value = current_table_info_value.with_update(move |table_info| { - table_info.name = new_table_name; - }); + let new_table_info_value = current_table_info_value + .inner + .with_update(move |table_info| { + table_info.name = new_table_name; + }); // Updates table info. let (update_table_info_txn, on_update_table_info_failure) = self @@ -404,10 +513,11 @@ impl TableMetadataManager { // Checks whether metadata was already updated. if !r.succeeded { - let remote_table_info = - on_update_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu { + let remote_table_info = on_update_table_info_failure(&r.responses)? + .context(error::UnexpectedSnafu { err_msg: "Reads the empty table info during the rename table metadata", - })?; + })? + .into_inner(); let op_name = "the renaming table metadata"; ensure_values!(remote_table_info, new_table_info_value, op_name); @@ -419,12 +529,12 @@ impl TableMetadataManager { /// Updates table info and returns an error if different metadata exists. pub async fn update_table_info( &self, - current_table_info_value: TableInfoValue, + current_table_info_value: DeserializedValueWithBytes, new_table_info: RawTableInfo, ) -> Result<()> { - let table_id = current_table_info_value.table_info.ident.table_id; + let table_id = current_table_info_value.inner.table_info.ident.table_id; - let new_table_info_value = current_table_info_value.update(new_table_info); + let new_table_info_value = current_table_info_value.inner.update(new_table_info); // Updates table info. let (update_table_info_txn, on_update_table_info_failure) = self @@ -435,10 +545,11 @@ impl TableMetadataManager { // Checks whether metadata was already updated. if !r.succeeded { - let remote_table_info = - on_update_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu { + let remote_table_info = on_update_table_info_failure(&r.responses)? + .context(error::UnexpectedSnafu { err_msg: "Reads the empty table info during the updating table info", - })?; + })? + .into_inner(); let op_name = "the updating table info"; ensure_values!(remote_table_info, new_table_info_value, op_name); @@ -450,13 +561,13 @@ impl TableMetadataManager { &self, table_id: TableId, region_info: RegionInfo, - current_table_route_value: TableRouteValue, + current_table_route_value: DeserializedValueWithBytes, new_region_routes: Vec, new_region_options: &HashMap, ) -> Result<()> { // Updates the datanode table key value pairs. let current_region_distribution = - region_distribution(¤t_table_route_value.region_routes)?; + region_distribution(¤t_table_route_value.inner.region_routes)?; let new_region_distribution = region_distribution(&new_region_routes)?; let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( @@ -468,7 +579,7 @@ impl TableMetadataManager { )?; // Updates the table_route. - let new_table_route_value = current_table_route_value.update(new_region_routes); + let new_table_route_value = current_table_route_value.inner.update(new_region_routes); let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() @@ -480,10 +591,11 @@ impl TableMetadataManager { // Checks whether metadata was already updated. if !r.succeeded { - let remote_table_route = - on_update_table_route_failure(&r.responses)?.context(error::UnexpectedSnafu { + let remote_table_route = on_update_table_route_failure(&r.responses)? + .context(error::UnexpectedSnafu { err_msg: "Reads the empty table route during the updating table route", - })?; + })? + .into_inner(); let op_name = "the updating table route"; ensure_values!(remote_table_route, new_table_route_value, op_name); @@ -559,6 +671,7 @@ mod tests { use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; + use bytes::Bytes; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use futures::TryStreamExt; @@ -570,11 +683,39 @@ mod tests { use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; - use crate::key::{to_removed_key, TableMetadataManager}; + use crate::key::{to_removed_key, DeserializedValueWithBytes, TableMetadataManager}; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; use crate::rpc::router::{region_distribution, Region, RegionRoute}; + #[test] + fn test_deserialized_value_with_bytes() { + let region_route = new_test_region_route(); + let region_routes = vec![region_route.clone()]; + + let expected_region_routes = + TableRouteValue::new(vec![region_route.clone(), region_route.clone()]); + let expected = serde_json::to_vec(&expected_region_routes).unwrap(); + + //Serialize behaviors: + // The inner field will be ignored. + let value = DeserializedValueWithBytes { + // ignored + inner: TableRouteValue::new(region_routes.clone()), + bytes: Bytes::from(expected.clone()), + }; + + let encoded = serde_json::to_vec(&value).unwrap(); + + // Deserialize behaviors: + // The inner field will be deserialized from the bytes field. + let decoded: DeserializedValueWithBytes = + serde_json::from_slice(&encoded).unwrap(); + + assert_eq!(decoded.inner, expected_region_routes); + assert_eq!(decoded.bytes, expected); + } + #[test] fn test_to_removed_key() { let key = "test_key"; @@ -664,8 +805,14 @@ mod tests { .await .unwrap(); - assert_eq!(remote_table_info.unwrap().table_info, table_info); - assert_eq!(remote_table_route.unwrap().region_routes, region_routes); + assert_eq!( + remote_table_info.unwrap().into_inner().table_info, + table_info + ); + assert_eq!( + remote_table_route.unwrap().into_inner().region_routes, + region_routes + ); } #[tokio::test] @@ -678,7 +825,8 @@ mod tests { new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); let table_id = table_info.ident.table_id; let datanode_id = 2; - let table_route_value = TableRouteValue::new(region_routes.clone()); + let table_route_value = + DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone())); // creates metadata. table_metadata_manager @@ -686,7 +834,8 @@ mod tests { .await .unwrap(); - let table_info_value = TableInfoValue::new(table_info.clone()); + let table_info_value = + DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone())); // deletes metadata. table_metadata_manager @@ -727,7 +876,8 @@ mod tests { .get_removed(table_id) .await .unwrap() - .unwrap(); + .unwrap() + .into_inner(); assert_eq!(removed_table_info.table_info, table_info); let removed_table_route = table_metadata_manager @@ -735,7 +885,8 @@ mod tests { .get_removed(table_id) .await .unwrap() - .unwrap(); + .unwrap() + .into_inner(); assert_eq!(removed_table_route.region_routes, region_routes); } @@ -754,7 +905,9 @@ mod tests { .await .unwrap(); let new_table_name = "another_name".to_string(); - let table_info_value = TableInfoValue::new(table_info.clone()); + let table_info_value = + DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone())); + table_metadata_manager .rename_table(table_info_value.clone(), new_table_name.clone()) .await @@ -766,7 +919,9 @@ mod tests { .unwrap(); let mut modified_table_info = table_info.clone(); modified_table_info.name = "hi".to_string(); - let modified_table_info_value = table_info_value.update(modified_table_info); + let modified_table_info_value = DeserializedValueWithBytes::from_inner( + table_info_value.inner.update(modified_table_info), + ); // if the table_info_value is wrong, it should return an error. // The ABA problem. assert!(table_metadata_manager @@ -820,7 +975,8 @@ mod tests { .unwrap(); let mut new_table_info = table_info.clone(); new_table_info.name = "hi".to_string(); - let current_table_info_value = TableInfoValue::new(table_info.clone()); + let current_table_info_value = + DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone())); // should be ok. table_metadata_manager .update_table_info(current_table_info_value.clone(), new_table_info.clone()) @@ -838,12 +994,15 @@ mod tests { .get(table_id) .await .unwrap() - .unwrap(); + .unwrap() + .into_inner(); assert_eq!(updated_table_info.table_info, new_table_info); let mut wrong_table_info = table_info.clone(); wrong_table_info.name = "wrong".to_string(); - let wrong_table_info_value = current_table_info_value.update(wrong_table_info); + let wrong_table_info_value = DeserializedValueWithBytes::from_inner( + current_table_info_value.inner.update(wrong_table_info), + ); // if the current_table_info_value is wrong, it should return an error. // The ABA problem. assert!(table_metadata_manager @@ -882,7 +1041,8 @@ mod tests { let engine = table_info.meta.engine.as_str(); let region_storage_path = region_storage_path(&table_info.catalog_name, &table_info.schema_name); - let current_table_route_value = TableRouteValue::new(region_routes.clone()); + let current_table_route_value = + DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone())); // creates metadata. table_metadata_manager .create_table_metadata(table_info.clone(), region_routes.clone()) @@ -927,7 +1087,11 @@ mod tests { .await .unwrap(); - let current_table_route_value = current_table_route_value.update(new_region_routes.clone()); + let current_table_route_value = DeserializedValueWithBytes::from_inner( + current_table_route_value + .inner + .update(new_region_routes.clone()), + ); let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)]; // it should be ok. table_metadata_manager @@ -948,12 +1112,13 @@ mod tests { // if the current_table_route_value is wrong, it should return an error. // The ABA problem. - let wrong_table_route_value = current_table_route_value.update(vec![ - new_region_route(1, 1), - new_region_route(2, 2), - new_region_route(3, 3), - new_region_route(4, 4), - ]); + let wrong_table_route_value = + DeserializedValueWithBytes::from_inner(current_table_route_value.inner.update(vec![ + new_region_route(1, 1), + new_region_route(2, 2), + new_region_route(3, 3), + new_region_route(4, 4), + ])); assert!(table_metadata_manager .update_table_route( table_id, diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 50d7e56d5164..04ee455fa916 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize}; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; -use super::TABLE_INFO_KEY_PREFIX; +use super::{DeserializedValueWithBytes, TABLE_INFO_KEY_PREFIX}; use crate::error::Result; use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; @@ -103,7 +103,7 @@ impl TableInfoManager { table_id: TableId, ) -> ( Txn, - impl FnOnce(&Vec) -> Result>, + impl FnOnce(&Vec) -> Result>>, ) { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); @@ -119,7 +119,7 @@ impl TableInfoManager { table_info_value: &TableInfoValue, ) -> Result<( Txn, - impl FnOnce(&Vec) -> Result>, + impl FnOnce(&Vec) -> Result>>, )> { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); @@ -143,15 +143,15 @@ impl TableInfoManager { pub(crate) fn build_update_txn( &self, table_id: TableId, - current_table_info_value: &TableInfoValue, + current_table_info_value: &DeserializedValueWithBytes, new_table_info_value: &TableInfoValue, ) -> Result<( Txn, - impl FnOnce(&Vec) -> Result>, + impl FnOnce(&Vec) -> Result>>, )> { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); - let raw_value = current_table_info_value.try_as_raw_value()?; + let raw_value = current_table_info_value.bytes.to_vec(); let txn = Txn::new() .when(vec![Compare::with_value( @@ -172,11 +172,11 @@ impl TableInfoManager { pub(crate) fn build_delete_txn( &self, table_id: TableId, - table_info_value: &TableInfoValue, + table_info_value: &DeserializedValueWithBytes, ) -> Result { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); - let raw_value = table_info_value.try_as_raw_value()?; + let raw_value = table_info_value.bytes.to_vec(); let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); let txn = Txn::new().and_then(vec![ @@ -189,7 +189,8 @@ impl TableInfoManager { fn build_decode_fn( raw_key: Vec, - ) -> impl FnOnce(&Vec) -> Result> { + ) -> impl FnOnce(&Vec) -> Result>> + { move |kvs: &Vec| { kvs.iter() .filter_map(|resp| { @@ -201,29 +202,35 @@ impl TableInfoManager { }) .flat_map(|r| &r.kvs) .find(|kv| kv.key == raw_key) - .map(|kv| TableInfoValue::try_from_raw_value(&kv.value)) + .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value)) .transpose() } } #[cfg(test)] - pub async fn get_removed(&self, table_id: TableId) -> Result> { + pub async fn get_removed( + &self, + table_id: TableId, + ) -> Result>> { let key = TableInfoKey::new(table_id).to_string(); let removed_key = to_removed_key(&key).into_bytes(); self.kv_backend .get(&removed_key) .await? - .map(|x| TableInfoValue::try_from_raw_value(&x.value)) + .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value)) .transpose() } - pub async fn get(&self, table_id: TableId) -> Result> { + pub async fn get( + &self, + table_id: TableId, + ) -> Result>> { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); self.kv_backend .get(&raw_key) .await? - .map(|x| TableInfoValue::try_from_raw_value(&x.value)) + .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value)) .transpose() } } diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 4da390496735..002643a23f02 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -17,6 +17,7 @@ use std::fmt::Display; use serde::{Deserialize, Serialize}; use table::metadata::TableId; +use super::DeserializedValueWithBytes; use crate::error::Result; use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX}; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; @@ -81,7 +82,7 @@ impl TableRouteManager { table_id: TableId, ) -> ( Txn, - impl FnOnce(&Vec) -> Result>, + impl FnOnce(&Vec) -> Result>>, ) { let key = TableRouteKey::new(table_id); let raw_key = key.as_raw_key(); @@ -97,7 +98,7 @@ impl TableRouteManager { table_route_value: &TableRouteValue, ) -> Result<( Txn, - impl FnOnce(&Vec) -> Result>, + impl FnOnce(&Vec) -> Result>>, )> { let key = TableRouteKey::new(table_id); let raw_key = key.as_raw_key(); @@ -121,15 +122,15 @@ impl TableRouteManager { pub(crate) fn build_update_txn( &self, table_id: TableId, - current_table_route_value: &TableRouteValue, + current_table_route_value: &DeserializedValueWithBytes, new_table_route_value: &TableRouteValue, ) -> Result<( Txn, - impl FnOnce(&Vec) -> Result>, + impl FnOnce(&Vec) -> Result>>, )> { let key = TableRouteKey::new(table_id); let raw_key = key.as_raw_key(); - let raw_value = current_table_route_value.try_as_raw_value()?; + let raw_value = current_table_route_value.bytes.to_vec(); let new_raw_value: Vec = new_table_route_value.try_as_raw_value()?; let txn = Txn::new() @@ -148,11 +149,11 @@ impl TableRouteManager { pub(crate) fn build_delete_txn( &self, table_id: TableId, - table_route_value: &TableRouteValue, + table_route_value: &DeserializedValueWithBytes, ) -> Result { let key = TableRouteKey::new(table_id); let raw_key = key.as_raw_key(); - let raw_value = table_route_value.try_as_raw_value()?; + let raw_value = table_route_value.bytes.to_vec(); let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); let txn = Txn::new().and_then(vec![ @@ -165,7 +166,8 @@ impl TableRouteManager { fn build_decode_fn( raw_key: Vec, - ) -> impl FnOnce(&Vec) -> Result> { + ) -> impl FnOnce(&Vec) -> Result>> + { move |response: &Vec| { response .iter() @@ -178,28 +180,34 @@ impl TableRouteManager { }) .flat_map(|r| &r.kvs) .find(|kv| kv.key == raw_key) - .map(|kv| TableRouteValue::try_from_raw_value(&kv.value)) + .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value)) .transpose() } } - pub async fn get(&self, table_id: TableId) -> Result> { + pub async fn get( + &self, + table_id: TableId, + ) -> Result>> { let key = TableRouteKey::new(table_id); self.kv_backend .get(&key.as_raw_key()) .await? - .map(|kv| TableRouteValue::try_from_raw_value(&kv.value)) + .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value)) .transpose() } #[cfg(test)] - pub async fn get_removed(&self, table_id: TableId) -> Result> { + pub async fn get_removed( + &self, + table_id: TableId, + ) -> Result>> { let key = TableRouteKey::new(table_id).to_string(); let removed_key = to_removed_key(&key).into_bytes(); self.kv_backend .get(&removed_key) .await? - .map(|x| TableRouteValue::try_from_raw_value(&x.value)) + .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value)) .transpose() } @@ -209,7 +217,7 @@ impl TableRouteManager { ) -> Result> { self.get(table_id) .await? - .map(|table_route| region_distribution(&table_route.region_routes)) + .map(|table_route| region_distribution(&table_route.into_inner().region_routes)) .transpose() } } diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index 69158c2476a5..69dc51334358 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -70,6 +70,7 @@ impl ActivateRegion { .await .context(error::TableMetadataManagerSnafu)? .context(error::TableInfoNotFoundSnafu { table_id })? + .into_inner() .table_info; let region_storage_path = diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 98fb6ac557ac..28fbdf9ff214 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -81,7 +81,7 @@ impl UpdateRegionMetadata { .context(error::TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; - let mut new_region_routes = table_route_value.region_routes.clone(); + let mut new_region_routes = table_route_value.inner.region_routes.clone(); for region_route in new_region_routes.iter_mut() { if region_route.region.id.region_number() == failed_region.region_number { @@ -220,6 +220,7 @@ mod tests { .await .unwrap() .unwrap() + .into_inner() .region_routes } @@ -373,7 +374,8 @@ mod tests { .get(table_id) .await .unwrap() - .unwrap(); + .unwrap() + .into_inner(); let peers = &extract_all_peers(&table_route_value.region_routes); let actual = &table_route_value.region_routes; @@ -392,7 +394,8 @@ mod tests { .get(table_id) .await .unwrap() - .unwrap(); + .unwrap() + .into_inner(); let map = region_distribution(&table_route_value.region_routes).unwrap(); assert_eq!(map.len(), 2); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 785d0f8edacb..d9ca7a7d31bc 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -31,6 +31,7 @@ use common_meta::ddl::create_table::*; use common_meta::ddl::drop_table::DropTableProcedure; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; +use common_meta::key::DeserializedValueWithBytes; use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DropTableTask}; use common_meta::rpc::router::{find_leaders, RegionRoute}; use common_procedure::Status; @@ -235,8 +236,8 @@ async fn test_on_datanode_drop_regions() { let procedure = DropTableProcedure::new( 1, drop_table_task, - TableRouteValue::new(region_routes), - TableInfoValue::new(test_data::new_table_info()), + DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes)), + DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())), test_data::new_ddl_context(datanode_manager), ); @@ -299,7 +300,7 @@ fn test_create_alter_region_request() { let procedure = AlterTableProcedure::new( 1, alter_table_task, - TableInfoValue::new(test_data::new_table_info()), + DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())), test_data::new_ddl_context(Arc::new(DatanodeClients::default())), ) .unwrap(); @@ -364,7 +365,7 @@ async fn test_submit_alter_region_requests() { let mut procedure = AlterTableProcedure::new( 1, alter_table_task, - TableInfoValue::new(table_info), + DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)), context, ) .unwrap(); diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index ecafa97d1a5e..77a533ef7f41 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -41,7 +41,7 @@ async fn get_leader_peer_ids( .context(error::TableMetadataManagerSnafu) .map(|route| { route.map_or_else(Vec::new, |route| { - find_leaders(&route.region_routes) + find_leaders(&route.inner.region_routes) .into_iter() .map(|peer| peer.id) .collect() diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 170082aae5ce..8419503354b1 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -36,15 +36,15 @@ pub(crate) async fn fetch_table( let table = Table { id: table_id as u64, - table_name: table_info.table_name(), + table_name: table_info.inner.table_name(), table_schema: vec![], }; - let table_route = TableRoute::new(table, table_route.region_routes); + let table_route = TableRoute::new(table, table_route.inner.region_routes); let table_route_value = table_route .try_into() .context(error::TableRouteConversionSnafu)?; - Ok(Some((table_info, table_route_value))) + Ok(Some((table_info.into_inner(), table_route_value))) } else { Ok(None) } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index cb02db0d22ee..08e38a640180 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -71,7 +71,8 @@ impl PartitionRuleManager { .get(table_id) .await .context(error::TableRouteManagerSnafu)? - .context(error::FindTableRoutesSnafu { table_id })?; + .context(error::FindTableRoutesSnafu { table_id })? + .into_inner(); Ok(RegionRoutes(route.region_routes)) } @@ -87,7 +88,8 @@ impl PartitionRuleManager { .get(table_id) .await .context(error::TableRouteManagerSnafu)? - .context(error::FindTableRoutesSnafu { table_id })?; + .context(error::FindTableRoutesSnafu { table_id })? + .into_inner(); let mut datanodes = HashMap::with_capacity(regions.len()); let region_map = convert_to_region_map(&route.region_routes); for region in regions.iter() { @@ -110,7 +112,8 @@ impl PartitionRuleManager { .get(table_id) .await .context(error::TableRouteManagerSnafu)? - .context(error::FindTableRoutesSnafu { table_id })?; + .context(error::FindTableRoutesSnafu { table_id })? + .into_inner(); let mut peers = Vec::with_capacity(route.region_routes.len()); for peer in &route.region_routes { @@ -129,7 +132,8 @@ impl PartitionRuleManager { .get(table_id) .await .context(error::TableRouteManagerSnafu)? - .context(error::FindTableRoutesSnafu { table_id })?; + .context(error::FindTableRoutesSnafu { table_id })? + .into_inner(); ensure!( !route.region_routes.is_empty(), error::FindTableRoutesSnafu { table_id } diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 90ef779d982b..127598468c2c 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -516,7 +516,8 @@ CREATE TABLE {table_name} ( .get(table_id) .await .unwrap() - .unwrap(); + .unwrap() + .into_inner(); let region_to_dn_map = region_distribution(&table_route_value.region_routes) .unwrap() diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 473f919e9406..3bff7bf6b984 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -213,7 +213,8 @@ mod tests { .get(table_id) .await .unwrap() - .unwrap(); + .unwrap() + .into_inner(); let region_to_dn_map = region_distribution(&table_route_value.region_routes) .unwrap() From 95fb4aa1fa558b63672426ac5249efb14205cb02 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 8 Oct 2023 12:04:57 +0000 Subject: [PATCH 2/4] refactor: use serialize_str instead --- src/common/meta/src/key.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index e9c3038f16f1..e749a7530428 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -193,7 +193,9 @@ impl Serialize for DeserializedValueWithBytes Deserialize<'de> for DeserializedValu where D: serde::Deserializer<'de>, { - let buf = Bytes::deserialize(deserializer)?; + let buf = String::deserialize(deserializer)?; + let bytes = Bytes::from(buf); - let value = DeserializedValueWithBytes::from_inner_bytes(buf) + let value = DeserializedValueWithBytes::from_inner_bytes(bytes) .map_err(|err| serde::de::Error::custom(err.to_string()))?; Ok(value) From 349428288486b141af86b9fdfd0fa74f9ac96107 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Sun, 8 Oct 2023 21:58:11 +0900 Subject: [PATCH 3/4] Update src/common/meta/src/key.rs Co-authored-by: JeremyHi --- src/common/meta/src/key.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index e749a7530428..6a75974b0565 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -700,7 +700,7 @@ mod tests { TableRouteValue::new(vec![region_route.clone(), region_route.clone()]); let expected = serde_json::to_vec(&expected_region_routes).unwrap(); - //Serialize behaviors: + // Serialize behaviors: // The inner field will be ignored. let value = DeserializedValueWithBytes { // ignored From f6a02fe92829c0214398016759a6ee6627a50ea6 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 9 Oct 2023 05:08:45 +0000 Subject: [PATCH 4/4] chore: apply suggestions from CR --- src/common/meta/src/ddl/alter_table.rs | 6 +-- src/common/meta/src/ddl/drop_table.rs | 4 +- src/common/meta/src/ddl/truncate_table.rs | 2 +- src/common/meta/src/ddl_manager.rs | 2 +- src/common/meta/src/key.rs | 43 +++++++++++-------- src/common/meta/src/key/table_info.rs | 4 +- src/common/meta/src/key/table_route.rs | 4 +- .../region_failover/update_metadata.rs | 2 +- src/meta-srv/src/selector/load_based.rs | 2 +- src/meta-srv/src/table_routes.rs | 8 ++-- 10 files changed, 44 insertions(+), 33 deletions(-) diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 4aa37c39802d..0288555a34fb 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -75,7 +75,7 @@ impl AlterTableProcedure { err_msg: "'kind' is absent", })?; let (kind, next_column_id) = - create_proto_alter_kind(&table_info_value.inner.table_info, alter_kind)?; + create_proto_alter_kind(&table_info_value.table_info, alter_kind)?; debug!( "New AlterTableProcedure, kind: {:?}, next_column_id: {:?}", @@ -101,7 +101,7 @@ impl AlterTableProcedure { }) .map_err(ProcedureError::external)?; let (kind, next_column_id) = - create_proto_alter_kind(&data.table_info_value.inner.table_info, alter_kind) + create_proto_alter_kind(&data.table_info_value.table_info, alter_kind) .map_err(ProcedureError::external)?; assert_eq!(data.next_column_id, next_column_id); @@ -446,7 +446,7 @@ impl AlterTableData { } fn table_info(&self) -> &RawTableInfo { - &self.table_info_value.inner.table_info + &self.table_info_value.table_info } } diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index cd7a9febbb7a..5a06270174f4 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -257,11 +257,11 @@ impl DropTableData { } fn region_routes(&self) -> &Vec { - &self.table_route_value.inner.region_routes + &self.table_route_value.region_routes } fn table_info(&self) -> &RawTableInfo { - &self.table_info_value.inner.table_info + &self.table_info_value.table_info } fn table_id(&self) -> TableId { diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index d23fb3c72693..ed71c4e9fa55 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -218,7 +218,7 @@ impl TruncateTableData { } fn table_info(&self) -> &RawTableInfo { - &self.table_info_value.inner.table_info + &self.table_info_value.table_info } fn table_id(&self) -> TableId { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 7e0f8f169424..48fb6519c907 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -252,7 +252,7 @@ async fn handle_truncate_table_task( table_name: table_ref.to_string(), })?; - let table_route = table_route_value.inner.region_routes; + let table_route = table_route_value.into_inner().region_routes; let id = ddl_manager .submit_truncate_table_task( diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 6a75974b0565..46252c729232 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -57,6 +57,7 @@ pub mod table_route; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; +use std::ops::Deref; use std::sync::Arc; use bytes::Bytes; @@ -170,9 +171,17 @@ macro_rules! ensure_values { /// The `inner` field will be deserialized from the `bytes` field. pub struct DeserializedValueWithBytes { // The original bytes of the inner. - pub bytes: Bytes, + bytes: Bytes, // The value was deserialized from the original bytes. - pub inner: T, + inner: T, +} + +impl Deref for DeserializedValueWithBytes { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.inner + } } impl Debug for DeserializedValueWithBytes { @@ -244,12 +253,13 @@ impl DeserializedValueWithBytes { self.inner } - pub fn into_bytes(self) -> Bytes { - self.bytes + /// Returns original `bytes` + pub fn into_bytes(&self) -> Vec { + self.bytes.to_vec() } + #[cfg(feature = "testing")] /// Notes: used for test purpose. - /// Due to it was used in other crates, without wrapping with a #[cfg(test)]. pub fn from_inner(inner: T) -> Self { let bytes = serde_json::to_vec(&inner).unwrap(); @@ -426,7 +436,7 @@ impl TableMetadataManager { table_info_value: &DeserializedValueWithBytes, table_route_value: &DeserializedValueWithBytes, ) -> Result<()> { - let table_info = &table_info_value.inner.table_info; + let table_info = &table_info_value.table_info; let table_id = table_info.ident.table_id; // Deletes table name. @@ -446,7 +456,7 @@ impl TableMetadataManager { .build_delete_txn(table_id, table_info_value)?; // Deletes datanode table key value pairs. - let distribution = region_distribution(&table_route_value.inner.region_routes)?; + let distribution = region_distribution(&table_route_value.region_routes)?; let delete_datanode_txn = self .datanode_table_manager() .build_delete_txn(table_id, distribution)?; @@ -477,7 +487,7 @@ impl TableMetadataManager { current_table_info_value: DeserializedValueWithBytes, new_table_name: String, ) -> Result<()> { - let current_table_info = ¤t_table_info_value.inner.table_info; + let current_table_info = ¤t_table_info_value.table_info; let table_id = current_table_info.ident.table_id; let table_name_key = TableNameKey::new( @@ -535,9 +545,9 @@ impl TableMetadataManager { current_table_info_value: DeserializedValueWithBytes, new_table_info: RawTableInfo, ) -> Result<()> { - let table_id = current_table_info_value.inner.table_info.ident.table_id; + let table_id = current_table_info_value.table_info.ident.table_id; - let new_table_info_value = current_table_info_value.inner.update(new_table_info); + let new_table_info_value = current_table_info_value.update(new_table_info); // Updates table info. let (update_table_info_txn, on_update_table_info_failure) = self @@ -570,7 +580,7 @@ impl TableMetadataManager { ) -> Result<()> { // Updates the datanode table key value pairs. let current_region_distribution = - region_distribution(¤t_table_route_value.inner.region_routes)?; + region_distribution(¤t_table_route_value.region_routes)?; let new_region_distribution = region_distribution(&new_region_routes)?; let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( @@ -582,7 +592,7 @@ impl TableMetadataManager { )?; // Updates the table_route. - let new_table_route_value = current_table_route_value.inner.update(new_region_routes); + let new_table_route_value = current_table_route_value.update(new_region_routes); let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() @@ -922,9 +932,8 @@ mod tests { .unwrap(); let mut modified_table_info = table_info.clone(); modified_table_info.name = "hi".to_string(); - let modified_table_info_value = DeserializedValueWithBytes::from_inner( - table_info_value.inner.update(modified_table_info), - ); + let modified_table_info_value = + DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info)); // if the table_info_value is wrong, it should return an error. // The ABA problem. assert!(table_metadata_manager @@ -1004,7 +1013,7 @@ mod tests { let mut wrong_table_info = table_info.clone(); wrong_table_info.name = "wrong".to_string(); let wrong_table_info_value = DeserializedValueWithBytes::from_inner( - current_table_info_value.inner.update(wrong_table_info), + current_table_info_value.update(wrong_table_info), ); // if the current_table_info_value is wrong, it should return an error. // The ABA problem. @@ -1116,7 +1125,7 @@ mod tests { // if the current_table_route_value is wrong, it should return an error. // The ABA problem. let wrong_table_route_value = - DeserializedValueWithBytes::from_inner(current_table_route_value.inner.update(vec![ + DeserializedValueWithBytes::from_inner(current_table_route_value.update(vec![ new_region_route(1, 1), new_region_route(2, 2), new_region_route(3, 3), diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 04ee455fa916..3c5c982a1e6b 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -151,7 +151,7 @@ impl TableInfoManager { )> { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); - let raw_value = current_table_info_value.bytes.to_vec(); + let raw_value = current_table_info_value.into_bytes(); let txn = Txn::new() .when(vec![Compare::with_value( @@ -176,7 +176,7 @@ impl TableInfoManager { ) -> Result { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); - let raw_value = table_info_value.bytes.to_vec(); + let raw_value = table_info_value.into_bytes(); let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); let txn = Txn::new().and_then(vec![ diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 002643a23f02..57abcf103362 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -130,7 +130,7 @@ impl TableRouteManager { )> { let key = TableRouteKey::new(table_id); let raw_key = key.as_raw_key(); - let raw_value = current_table_route_value.bytes.to_vec(); + let raw_value = current_table_route_value.into_bytes(); let new_raw_value: Vec = new_table_route_value.try_as_raw_value()?; let txn = Txn::new() @@ -153,7 +153,7 @@ impl TableRouteManager { ) -> Result { let key = TableRouteKey::new(table_id); let raw_key = key.as_raw_key(); - let raw_value = table_route_value.bytes.to_vec(); + let raw_value = table_route_value.into_bytes(); let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); let txn = Txn::new().and_then(vec![ diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 28fbdf9ff214..9d1abc6d64e2 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -81,7 +81,7 @@ impl UpdateRegionMetadata { .context(error::TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; - let mut new_region_routes = table_route_value.inner.region_routes.clone(); + let mut new_region_routes = table_route_value.region_routes.clone(); for region_route in new_region_routes.iter_mut() { if region_route.region.id.region_number() == failed_region.region_number { diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 77a533ef7f41..ecafa97d1a5e 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -41,7 +41,7 @@ async fn get_leader_peer_ids( .context(error::TableMetadataManagerSnafu) .map(|route| { route.map_or_else(Vec::new, |route| { - find_leaders(&route.inner.region_routes) + find_leaders(&route.region_routes) .into_iter() .map(|peer| peer.id) .collect() diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 8419503354b1..593b6c384610 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -32,14 +32,16 @@ pub(crate) async fn fetch_table( .context(TableMetadataManagerSnafu)?; if let Some(table_info) = table_info { - let table_route = table_route.context(TableRouteNotFoundSnafu { table_id })?; + let table_route = table_route + .context(TableRouteNotFoundSnafu { table_id })? + .into_inner(); let table = Table { id: table_id as u64, - table_name: table_info.inner.table_name(), + table_name: table_info.table_name(), table_schema: vec![], }; - let table_route = TableRoute::new(table, table_route.inner.region_routes); + let table_route = TableRoute::new(table, table_route.region_routes); let table_route_value = table_route .try_into() .context(error::TableRouteConversionSnafu)?;