From fcd0ceea94dab8f70be52c7f4b53e176f9b1e216 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 7 Nov 2024 21:25:05 +0800 Subject: [PATCH] fix: column already exists (#4961) * fix: merge fetched logical metadata with existing cache Signed-off-by: Ruihang Xia * fix log acquire Signed-off-by: Ruihang Xia * Update src/metric-engine/src/engine/region_metadata.rs Co-authored-by: Yingwen --------- Signed-off-by: Ruihang Xia Co-authored-by: Yingwen --- .../src/engine/region_metadata.rs | 33 ++++++++++++++----- src/metric-engine/src/engine/state.rs | 14 +++----- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs index 171480c58978..9f00235e9682 100644 --- a/src/metric-engine/src/engine/region_metadata.rs +++ b/src/metric-engine/src/engine/region_metadata.rs @@ -14,6 +14,8 @@ //! Implementation of retrieving logical region's region metadata. +use std::collections::HashMap; + use store_api::metadata::ColumnMetadata; use store_api::storage::RegionId; @@ -46,23 +48,36 @@ impl MetricEngineInner { .read_lock_logical_region(logical_region_id) .await; // Load logical and physical columns, and intersect them to get logical column metadata. - let mut logical_column_metadata = self + let logical_column_metadata = self .metadata_region .logical_columns(physical_region_id, logical_region_id) .await? .into_iter() .map(|(_, column_metadata)| column_metadata) .collect::>(); - // Sort columns on column name to ensure the order - logical_column_metadata - .sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name)); + // Update cache - self.state - .write() - .unwrap() - .add_logical_columns(logical_region_id, logical_column_metadata.clone()); + let mut mutable_state = self.state.write().unwrap(); + // Merge with existing cached columns. + let existing_columns = mutable_state + .logical_columns() + .get(&logical_region_id) + .cloned() + .unwrap_or_default() + .into_iter(); + let mut dedup_columns = logical_column_metadata + .into_iter() + .chain(existing_columns) + .map(|c| (c.column_id, c)) + .collect::>() + .values() + .cloned() + .collect::>(); + // Sort columns on column name to ensure the order + dedup_columns.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name)); + mutable_state.set_logical_columns(logical_region_id, dedup_columns.clone()); - Ok(logical_column_metadata) + Ok(dedup_columns) } /// Load logical column names of a logical region. diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 24ab5a31bfd1..197e3f9da730 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -85,19 +85,13 @@ impl MetricEngineState { .insert(logical_region_id, physical_region_id); } - /// Add and reorder logical columns. - /// - /// Caller should make sure: - /// 1. there is no duplicate columns - /// 2. the column order is the same with the order in the metadata, which is - /// alphabetically ordered on column name. - pub fn add_logical_columns( + /// Replace the logical columns of the logical region with given columns. + pub fn set_logical_columns( &mut self, logical_region_id: RegionId, - new_columns: impl IntoIterator, + columns: Vec, ) { - let columns = self.logical_columns.entry(logical_region_id).or_default(); - columns.extend(new_columns); + self.logical_columns.insert(logical_region_id, columns); } pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option {