add more log for 2.1 specific branch #20283

Closed
1 change: 1 addition & 0 deletions src/ctl/src/lib.rs
@@ -85,6 +85,7 @@ enum ComputeCommands {
ShowConfig { host: String },
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
/// list latest Hummock version on meta node
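The only change in this file is the added `#[allow(clippy::large_enum_variant)]`, which silences the lint that fires when one enum variant is much larger than the rest. A minimal, hypothetical illustration of what the lint targets (the `Command` enum below is not from the codebase):

```rust
// Hypothetical illustration (not from the codebase): `Big` is far larger than
// `Small`, so every `Command` value is sized for the largest variant. The added
// attribute tells clippy this is intentional.
#[allow(clippy::large_enum_variant)]
enum Command {
    Small(u8),
    Big([u8; 1024]),
}

fn main() {
    let cmd = Command::Big([0u8; 1024]);
    match cmd {
        Command::Small(n) => println!("small payload: {n}"),
        Command::Big(buf) => println!("big payload of {} bytes", buf.len()),
    }
    // The usual alternative is boxing the large payload (e.g. `Big(Box<[u8; 1024]>)`);
    // the PR opts to allow the lint on `HummockCommands` instead.
    println!("size_of::<Command>() = {} bytes", std::mem::size_of::<Command>());
}
```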
12 changes: 8 additions & 4 deletions src/meta/src/barrier/recovery.rs
@@ -417,6 +417,8 @@ impl GlobalBarrierManagerContext {
.into_iter()
.collect();

tracing::info!("all inused worker slots {:?}", all_inuse_worker_slots);

let active_worker_slots: HashSet<_> = active_nodes
.current()
.values()
@@ -435,9 +437,11 @@
return self.resolve_graph_info().await;
}

debug!("start migrate actors.");
info!("expired worker slots {:?}", expired_worker_slots);

info!("start migrating actors.");
let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots);
info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
.intersection(&active_worker_slots)
@@ -497,10 +501,10 @@
}

if !new_worker_slots.is_empty() {
debug!("new worker slots found: {:#?}", new_worker_slots);
info!("new worker slots found: {:#?}", new_worker_slots);
for target_worker_slot in new_worker_slots {
if let Some(from) = to_migrate_worker_slots.pop() {
debug!(
info!(
"plan to migrate from worker slot {} to {}",
from, target_worker_slot
);
3 changes: 2 additions & 1 deletion src/meta/src/controller/fragment.rs
@@ -54,7 +54,7 @@ use sea_orm::{
ColumnTrait, DbErr, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter,
QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
};

use tracing::info;
use crate::controller::catalog::{CatalogController, CatalogControllerInner};
use crate::controller::utils::{
get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors,
@@ -1033,6 +1033,7 @@ impl CatalogController {
let mut actor_migration_plan = HashMap::new();
for (worker, fragment) in actor_locations {
if expired_workers.contains(&worker) {
info!("worker {} expired, migrating actors {:?}", worker, fragment);
for (_, actors) in fragment {
let worker_slot_to_actor: HashMap<_, _> = actors
.iter()
99 changes: 94 additions & 5 deletions src/meta/src/hummock/manager/commit_epoch.rs
@@ -15,7 +15,9 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::default::compaction_config;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
@@ -118,7 +120,7 @@ impl HummockManager {
let state_table_info = &version.latest_version().state_table_info;
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
let mut new_table_ids = HashMap::new();
let mut new_compaction_groups = HashMap::new();
let mut new_compaction_groups = Vec::new();
let mut compaction_group_manager_txn = None;
let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

@@ -171,8 +173,11 @@
)
};
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
new_compaction_groups
.insert(new_compaction_group_id, compaction_group_config.clone());
let new_compaction_group = CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config.clone(),
};
new_compaction_groups.push(new_compaction_group);
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
@@ -196,12 +201,35 @@
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
// fill compaction_groups
let mut group_id_to_config = HashMap::new();
if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
for cg_id in &modified_compaction_groups {
let compaction_group = compaction_group_manager
.get(cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
} else {
let compaction_group_manager = self.compaction_group_manager.read().await;
for cg_id in &modified_compaction_groups {
let compaction_group = compaction_group_manager
.try_get_compaction_group_config(*cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
group_id_to_config.insert(*cg_id, compaction_group);
}
}

let group_id_to_sub_levels =
rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);

let time_travel_delta = version.pre_commit_epoch(
&tables_to_commit,
new_compaction_groups,
commit_sstables,
group_id_to_sub_levels,
&new_table_ids,
new_table_watermarks,
change_log_delta,
@@ -340,6 +368,7 @@
) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
let mut new_sst_id_number = 0;
let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
for commit_sst in sstables {
let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
for table_id in &commit_sst.sst_info.table_ids {
@@ -408,6 +437,12 @@
}
}

// Order check: within each compaction group, the committed object ids must preserve the original commit order.
for ssts in commit_sstables.values() {
let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
}

Ok(commit_sstables)
}
}
@@ -432,3 +467,57 @@ fn on_handle_add_new_table(

Ok(())
}

/// Rewrite the commit sstables to sub-levels based on the compaction group config.
/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature, so a plain `HashMap` is passed instead.
fn rewrite_commit_sstables_to_sub_level(
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
for (group_id, inserted_table_infos) in commit_sstables {
let config = group_id_to_config
.get(&group_id)
.expect("compaction group should exist");

let mut accumulated_size = 0;
let mut ssts = vec![];
let sub_level_size_limit = config
.max_overlapping_level_size
.unwrap_or(compaction_config::max_overlapping_level_size());

let level = overlapping_sstables.entry(group_id).or_default();

for sst in inserted_table_infos {
accumulated_size += sst.sst_size;
ssts.push(sst);
if accumulated_size > sub_level_size_limit {
level.push(ssts);

// reset the accumulated size and ssts
accumulated_size = 0;
ssts = vec![];
}
}

if !ssts.is_empty() {
level.push(ssts);
}

// The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top.
level.reverse();
}

overlapping_sstables
}

fn is_ordered_subset(vec_1: &Vec<u64>, vec_2: &Vec<u64>) -> bool {
let mut vec_2_iter = vec_2.iter().peekable();
for item in vec_1 {
if vec_2_iter.peek() == Some(&item) {
vec_2_iter.next();
}
}

vec_2_iter.peek().is_none()
}
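For readers skimming the diff, here is a minimal standalone sketch of the sub-level splitting rule implemented by `rewrite_commit_sstables_to_sub_level`, using plain `u64` sizes in place of `SstableInfo` and an illustrative size limit instead of `max_overlapping_level_size`:

```rust
/// Minimal sketch of the sub-level splitting rule, assuming plain u64 sizes
/// instead of `SstableInfo` and an illustrative size limit in place of
/// `max_overlapping_level_size`.
fn split_into_sub_levels(sst_sizes: Vec<u64>, sub_level_size_limit: u64) -> Vec<Vec<u64>> {
    let mut sub_levels: Vec<Vec<u64>> = Vec::new();
    let mut accumulated_size = 0u64;
    let mut current = Vec::new();

    for size in sst_sizes {
        accumulated_size += size;
        current.push(size);
        // Cut a new sub-level once the accumulated size exceeds the limit.
        if accumulated_size > sub_level_size_limit {
            sub_levels.push(std::mem::take(&mut current));
            accumulated_size = 0;
        }
    }
    if !current.is_empty() {
        sub_levels.push(current);
    }
    // The input arrives in decreasing epoch order, so reverse the cut batches
    // to keep the latest epoch at the top.
    sub_levels.reverse();
    sub_levels
}

fn main() {
    // Three 600-byte SSTs with a 1024-byte limit: the first two exceed the limit
    // and are cut into one sub-level, the third forms another, then the order is reversed.
    let sub_levels = split_into_sub_levels(vec![600, 600, 600], 1024);
    assert_eq!(sub_levels, vec![vec![600], vec![600, 600]]);
    println!("{sub_levels:?}");
}
```

The reversal at the end mirrors the comment in the function above: the uploader emits SSTs in decreasing epoch order, so reversing the cut sub-levels keeps the latest epoch on top.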
54 changes: 26 additions & 28 deletions src/meta/src/hummock/manager/transaction.rs
@@ -14,7 +14,6 @@

use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
@@ -24,11 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas,
HummockVersionStats, StateTableInfoDelta,
CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
StateTableInfoDelta,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use crate::hummock::model::CompactionGroup;
use crate::manager::NotificationManager;
use crate::model::{
InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
@@ -111,8 +111,8 @@ impl<'a> HummockVersionTransaction<'a> {
pub(super) fn pre_commit_epoch(
&mut self,
tables_to_commit: &HashMap<TableId, u64>,
new_compaction_groups: HashMap<CompactionGroupId, Arc<CompactionConfig>>,
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
new_compaction_groups: Vec<CompactionGroup>,
group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
new_table_ids: &HashMap<TableId, CompactionGroupId>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
change_log_delta: HashMap<TableId, ChangeLogDelta>,
@@ -121,38 +121,36 @@
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

for (compaction_group_id, compaction_group_config) in new_compaction_groups {
{
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some((*compaction_group_config).clone()),
group_id: compaction_group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup
as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}
for compaction_group in &new_compaction_groups {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group.group_id())
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some(compaction_group.compaction_config().as_ref().clone()),
group_id: compaction_group.group_id(),
parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
table_ids: vec![],
version: CompatibilityVersion::SplitGroupByTableId as i32,
split_key: None,
}));
}

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos);

group_deltas.push(group_delta);
for sub_level in sub_levels {
group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
}
}

// update state table info
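As a closing note on the order check added in `correct_commit_ssts`, here is a standalone copy of the `is_ordered_subset` helper with a few illustrative inputs (slices are used instead of `&Vec<u64>`, and the values are made up for the example):

```rust
/// Standalone copy of the helper used by the order check in `correct_commit_ssts`,
/// taking slices for convenience; the inputs below are made-up examples.
fn is_ordered_subset(vec_1: &[u64], vec_2: &[u64]) -> bool {
    let mut vec_2_iter = vec_2.iter().peekable();
    for item in vec_1 {
        if vec_2_iter.peek() == Some(&item) {
            vec_2_iter.next();
        }
    }
    // Every element of vec_2 was matched, in order, against vec_1.
    vec_2_iter.peek().is_none()
}

fn main() {
    assert!(is_ordered_subset(&[1, 2, 3, 4], &[2, 4])); // subset, order preserved
    assert!(!is_ordered_subset(&[1, 2, 3, 4], &[4, 2])); // same elements, wrong order
    assert!(is_ordered_subset(&[1, 2, 3], &[])); // the empty set is trivially an ordered subset
}
```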