Skip to content

Commit

Permalink
fix: column already exists (#4961)
Browse files Browse the repository at this point in the history
* fix: merge fetched logical metadata with existing cache

Signed-off-by: Ruihang Xia <[email protected]>

* fix log acquire

Signed-off-by: Ruihang Xia <[email protected]>

* Update src/metric-engine/src/engine/region_metadata.rs

Co-authored-by: Yingwen <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
waynexia and evenyag authored Nov 7, 2024
1 parent 22f31f5 commit fcd0cee
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 19 deletions.
33 changes: 24 additions & 9 deletions src/metric-engine/src/engine/region_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Implementation of retrieving logical region's region metadata.
use std::collections::HashMap;

use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;

Expand Down Expand Up @@ -46,23 +48,36 @@ impl MetricEngineInner {
.read_lock_logical_region(logical_region_id)
.await;
// Load logical and physical columns, and intersect them to get logical column metadata.
let mut logical_column_metadata = self
let logical_column_metadata = self
.metadata_region
.logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.map(|(_, column_metadata)| column_metadata)
.collect::<Vec<_>>();
// Sort columns on column name to ensure the order
logical_column_metadata
.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));

// Update cache
self.state
.write()
.unwrap()
.add_logical_columns(logical_region_id, logical_column_metadata.clone());
let mut mutable_state = self.state.write().unwrap();
// Merge with existing cached columns.
let existing_columns = mutable_state
.logical_columns()
.get(&logical_region_id)
.cloned()
.unwrap_or_default()
.into_iter();
let mut dedup_columns = logical_column_metadata
.into_iter()
.chain(existing_columns)
.map(|c| (c.column_id, c))
.collect::<HashMap<_, _>>()
.values()
.cloned()
.collect::<Vec<_>>();
// Sort columns on column name to ensure the order
dedup_columns.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
mutable_state.set_logical_columns(logical_region_id, dedup_columns.clone());

Ok(logical_column_metadata)
Ok(dedup_columns)
}

/// Load logical column names of a logical region.
Expand Down
14 changes: 4 additions & 10 deletions src/metric-engine/src/engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,13 @@ impl MetricEngineState {
.insert(logical_region_id, physical_region_id);
}

/// Add and reorder logical columns.
///
/// Caller should make sure:
/// 1. there is no duplicate columns
/// 2. the column order is the same with the order in the metadata, which is
/// alphabetically ordered on column name.
pub fn add_logical_columns(
/// Replace the logical columns of the logical region with given columns.
pub fn set_logical_columns(
&mut self,
logical_region_id: RegionId,
new_columns: impl IntoIterator<Item = ColumnMetadata>,
columns: Vec<ColumnMetadata>,
) {
let columns = self.logical_columns.entry(logical_region_id).or_default();
columns.extend(new_columns);
self.logical_columns.insert(logical_region_id, columns);
}

pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
Expand Down

0 comments on commit fcd0cee

Please sign in to comment.