Skip to content

Commit

Permalink
perf: add last insert schema hash cache
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 19, 2024
1 parent 1acda74 commit 56f2500
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
37 changes: 36 additions & 1 deletion src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::hash;

Check failure on line 15 in src/operator/src/insert.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-20.04)

unused import: `core::hash`

Check failure on line 15 in src/operator/src/insert.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `core::hash`
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hasher};
use std::sync::Arc;
use std::time::Duration;

use api::v1::alter_expr::Kind;
use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
Expand All @@ -34,6 +37,7 @@ use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use futures_util::future;
use meter_macros::write_meter;
use moka::sync::{Cache, CacheBuilder};
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
Expand Down Expand Up @@ -62,6 +66,7 @@ pub struct Inserter {
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
table_flownode_set_cache: TableFlownodeSetCacheRef,
last_insert_schema_hash_cache: Cache<TableId, u64>,
}

pub type InserterRef = Arc<Inserter>;
Expand Down Expand Up @@ -90,18 +95,27 @@ impl AutoCreateTableType {
}
}

const DEFAULT_CACHE_MAX_CAPACITY: u64 = 65536;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);

impl Inserter {
pub fn new(
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
table_flownode_set_cache: TableFlownodeSetCacheRef,
) -> Self {
let last_insert_schema_hash_cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_idle(DEFAULT_CACHE_TTI)
.time_to_live(DEFAULT_CACHE_TTL)
.build();
Self {
catalog_manager,
partition_manager,
node_manager,
table_flownode_set_cache,
last_insert_schema_hash_cache,
}
}

Expand Down Expand Up @@ -641,17 +655,38 @@ impl Inserter {
.context(CatalogSnafu)
}

fn request_schema_hash(request_schema: &[ColumnSchema]) -> u64 {
let mut hasher = DefaultHasher::default();
for schema in request_schema {
hasher.write_str(&schema.column_name);
}
hasher.finish()
}

fn has_same_schema(&self, table_id: TableId, request_schema: &[ColumnSchema]) -> bool {
let request_schema_hash = Self::request_schema_hash(request_schema);
let last_insert_schema_hash = self
.last_insert_schema_hash_cache
.get_with(table_id, || request_schema_hash);
request_schema_hash == last_insert_schema_hash
}

fn get_alter_table_expr_on_demand(
&self,
req: &RowInsertRequest,
table: TableRef,
ctx: &QueryContextRef,
) -> Result<Option<AlterExpr>> {
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
// Fast-path
if self.has_same_schema(table.table_info().table_id(), request_schema) {
return Ok(None);
}

let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
let table_name = table.table_info().name.clone();

let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let column_exprs = ColumnExpr::from_column_schemas(request_schema);
let add_columns = extract_new_columns(&table.schema(), column_exprs)
.context(FindNewColumnsOnInsertionSnafu)?;
Expand Down
1 change: 1 addition & 0 deletions src/operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#![feature(assert_matches)]
#![feature(if_let_guard)]
#![feature(hasher_prefixfree_extras)]

pub mod delete;
pub mod error;
Expand Down

0 comments on commit 56f2500

Please sign in to comment.