Skip to content

Commit

Permalink
enhance visibility by upgrading log levels and adding new info logs…
Browse files Browse the repository at this point in the history
… for key events in recovery.rs and fragment.rs
  • Loading branch information
shanicky committed Jan 23, 2025
1 parent e49f440 commit ddc9600
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
12 changes: 8 additions & 4 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ impl GlobalBarrierManagerContext {
.into_iter()
.collect();

tracing::info!("all inused worker slots {:?}", all_inuse_worker_slots);

let active_worker_slots: HashSet<_> = active_nodes
.current()
.values()
Expand All @@ -435,9 +437,11 @@ impl GlobalBarrierManagerContext {
return self.resolve_graph_info().await;
}

debug!("start migrate actors.");
info!("expired worker slots {:?}", expired_worker_slots);

info!("start migrating actors.");
let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots);
info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
.intersection(&active_worker_slots)
Expand Down Expand Up @@ -497,10 +501,10 @@ impl GlobalBarrierManagerContext {
}

if !new_worker_slots.is_empty() {
debug!("new worker slots found: {:#?}", new_worker_slots);
info!("new worker slots found: {:#?}", new_worker_slots);
for target_worker_slot in new_worker_slots {
if let Some(from) = to_migrate_worker_slots.pop() {
debug!(
info!(
"plan to migrate from worker slot {} to {}",
from, target_worker_slot
);
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use sea_orm::{
ColumnTrait, DbErr, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter,
QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
};

use tracing::info;
use crate::controller::catalog::{CatalogController, CatalogControllerInner};
use crate::controller::utils::{
get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors,
Expand Down Expand Up @@ -1033,6 +1033,7 @@ impl CatalogController {
let mut actor_migration_plan = HashMap::new();
for (worker, fragment) in actor_locations {
if expired_workers.contains(&worker) {
info!("worker {} expired, migrating actors {:?}", worker, fragment);
for (_, actors) in fragment {
let worker_slot_to_actor: HashMap<_, _> = actors
.iter()
Expand Down

0 comments on commit ddc9600

Please sign in to comment.