Skip to content

Commit

Permalink
fix: bring back inactive_region_ids
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 21, 2023
1 parent 5f87b1f commit 3242457
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 15 deletions.
3 changes: 2 additions & 1 deletion src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_telemetry::{debug, info, warn};
use dashmap::DashMap;
use futures::future::join_all;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, Notify, RwLock};

Expand Down Expand Up @@ -75,7 +76,7 @@ pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
pub instructions: Vec<Instruction>,
pub stat: Option<Stat>,
pub inactive_region_ids: HashSet<u64>,
pub inactive_region_ids: HashSet<RegionId>,
pub region_lease: Option<RegionLease>,
}

Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;
use common_catalog::consts::default_engine;
use common_meta::RegionIdent;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
Expand Down Expand Up @@ -86,7 +85,7 @@ impl HeartbeatHandler for RegionFailureHandler {
.region_stats
.iter()
.map(|x| {
let region_id = RegionId::from(x.id);
let region_id = x.id;
RegionIdent {
cluster_id: stat.cluster_id,
datanode_id: stat.id,
Expand All @@ -108,6 +107,7 @@ impl HeartbeatHandler for RegionFailureHandler {
#[cfg(test)]
mod tests {
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
Expand All @@ -133,7 +133,7 @@ mod tests {
let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
id: RegionId::from_u64(region_id),
rcus: 0,
wcus: 0,
approximate_bytes: 0,
Expand Down
11 changes: 4 additions & 7 deletions src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct Stat {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
/// The region_id.
pub id: u64,
pub id: RegionId,
/// The read capacity units during this period
pub rcus: i64,
/// The write capacity units during this period
Expand Down Expand Up @@ -75,13 +75,10 @@ impl Stat {

/// Returns a tuple array containing [RegionId] and [RegionRole].
pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
self.region_stats
.iter()
.map(|s| (RegionId::from(s.id), s.role))
.collect()
self.region_stats.iter().map(|s| (s.id, s.role)).collect()
}

pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<u64>) {
pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
if inactive_region_ids.is_empty() {
return;
}
Expand Down Expand Up @@ -140,7 +137,7 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {

fn try_from(value: api::v1::meta::RegionStat) -> Result<Self, Self::Error> {
Ok(Self {
id: value.region_id,
id: RegionId::from_u64(value.region_id),
rcus: value.rcus,
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
let cluster_id = stat.cluster_id;
let datanode_id = stat.id;
let mut granted_regions = Vec::with_capacity(regions.len());
let mut inactive_regions = HashSet::new();

let (leaders, followers): (Vec<_>, Vec<_>) = regions
.into_iter()
Expand All @@ -122,6 +123,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
&leaders,
RegionRole::Leader,
);
inactive_regions.extend(closable);

let followers = followers.into_iter().flatten().collect::<Vec<_>>();

Expand All @@ -142,7 +144,9 @@ impl HeartbeatHandler for RegionLeaseHandler {
&followers,
RegionRole::Follower,
);
inactive_regions.extend(closable);

acc.inactive_region_ids = inactive_regions;
acc.region_lease = Some(RegionLease {
regions: granted_regions
.into_iter()
Expand Down Expand Up @@ -184,7 +188,7 @@ mod test {

fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
RegionStat {
id: region_id.as_u64(),
id: region_id,
role,
rcus: 0,
wcus: 0,
Expand Down Expand Up @@ -253,6 +257,7 @@ mod test {
handler.handle(&req, ctx, acc).await.unwrap();

assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));

let acc = &mut HeartbeatAccumulator::default();

Expand All @@ -277,6 +282,7 @@ mod test {
acc,
vec![GrantedRegion::new(region_id, RegionRole::Follower)],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));

let opening_region_id = RegionId::new(table_id, region_number + 2);
let _guard = opening_region_keeper
Expand Down Expand Up @@ -310,6 +316,7 @@ mod test {
GrantedRegion::new(opening_region_id, RegionRole::Follower),
],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
}

#[tokio::test]
Expand Down Expand Up @@ -385,6 +392,7 @@ mod test {
GrantedRegion::new(another_region_id, RegionRole::Leader),
],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
}

fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
Expand Down
7 changes: 4 additions & 3 deletions src/meta-srv/src/selector/weight_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ mod tests {

use api::v1::meta::Peer;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;

use super::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::handler::node_stat::{RegionStat, Stat};
Expand Down Expand Up @@ -189,7 +190,7 @@ mod tests {
addr: "127.0.0.1:3001".to_string(),
region_num: 11,
region_stats: vec![RegionStat {
id: 111,
id: RegionId::from_u64(111),
rcus: 1,
wcus: 1,
approximate_bytes: 1,
Expand All @@ -206,7 +207,7 @@ mod tests {
addr: "127.0.0.1:3002".to_string(),
region_num: 12,
region_stats: vec![RegionStat {
id: 112,
id: RegionId::from_u64(112),
rcus: 1,
wcus: 1,
approximate_bytes: 1,
Expand All @@ -223,7 +224,7 @@ mod tests {
addr: "127.0.0.1:3003".to_string(),
region_num: 13,
region_stats: vec![RegionStat {
id: 113,
id: RegionId::from_u64(113),
rcus: 1,
wcus: 1,
approximate_bytes: 1,
Expand Down

0 comments on commit 3242457

Please sign in to comment.