Skip to content

Commit

Permalink
Stop marking ingesters as Unavailable in control plane (#4973)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored May 15, 2024
1 parent ffcfa1e commit a297a66
Show file tree
Hide file tree
Showing 15 changed files with 277 additions and 197 deletions.
81 changes: 39 additions & 42 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;
Expand Down Expand Up @@ -344,33 +344,30 @@ impl ControlPlane {
})
.unwrap_or_default();

let shard_table: Vec<JsonValue> = self
.model
.all_shards_with_source()
.map(|(source, shards)| {
let shards: Vec<JsonValue> = shards
.map(|shard| {
json!({
"shard_id": shard.shard_id.clone(),
"shard_state": shard.shard_state().as_json_str_name(),
"leader_id": shard.leader_id.clone(),
"follower_id": shard.follower_id.clone(),
"publish_position_inclusive": shard.publish_position_inclusive(),
})
})
.collect();
let mut per_index_shards_json: HashMap<IndexUid, Vec<JsonValue>> = HashMap::new();

for (source_uid, shard_entries) in self.model.all_shards_with_source() {
let index_uid = source_uid.index_uid.clone();
let source_id = source_uid.source_id.clone();
let shards_json = shard_entries.map(|shard_entry| {
json!({
"index_uid": source.index_uid.clone(),
"source_id": source.source_id.clone(),
"shards": shards,
"index_uid": index_uid,
"source_id": source_id,
"shard_id": shard_entry.shard_id.clone(),
"shard_state": shard_entry.shard_state().as_json_str_name(),
"leader_id": shard_entry.leader_id.clone(),
"follower_id": shard_entry.follower_id.clone(),
"publish_position_inclusive": shard_entry.publish_position_inclusive(),
})
})
.collect();

});
per_index_shards_json
.entry(index_uid.clone())
.or_default()
.extend(shards_json);
}
json!({
"physical_indexing_plan": physical_indexing_plan,
"shard_table": shard_table,
"shard_table": per_index_shards_json,
})
}

Expand Down Expand Up @@ -1659,7 +1656,8 @@ mod tests {
assert_eq!(indexing_tasks[0].shard_ids, [ShardId::from(17)]);

let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap();
let shard = &control_plane_debug_info["shard_table"][0]["shards"][0];
let shard =
&control_plane_debug_info["shard_table"]["test-index-0:00000000000000000000000000"][0];
assert_eq!(shard["shard_id"], "00000000000000000017");
assert_eq!(shard["publish_position_inclusive"], "00000000000000001000");

Expand Down Expand Up @@ -1714,23 +1712,23 @@ mod tests {
let ingester_pool = IngesterPool::default();
let mut mock_metastore = MockMetastoreService::new();

let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
let mut source = SourceConfig::ingest_v2();
source.enabled = true;
index_0.add_source(source.clone()).unwrap();
let mut index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index");
let mut source_config = SourceConfig::ingest_v2();
source_config.enabled = true;
index_metadata.add_source(source_config.clone()).unwrap();

let index_0_clone = index_0.clone();
let index_metadata_clone = index_metadata.clone();
mock_metastore.expect_list_indexes_metadata().return_once(
move |list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::for_test(vec![
index_0_clone.clone()
index_metadata_clone,
]))
},
);

let mut shard = Shard {
index_uid: Some(index_0.index_uid.clone()),
index_uid: Some(index_metadata.index_uid.clone()),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: Some(ShardId::from(17)),
leader_id: "test_node".to_string(),
Expand All @@ -1739,7 +1737,7 @@ mod tests {
};
shard.set_shard_state(ShardState::Open);

let index_uid_clone = index_0.index_uid.clone();
let index_uid_clone = index_metadata.index_uid.clone();
mock_metastore.expect_list_shards().return_once(
move |_list_shards_request: ListShardsRequest| {
let list_shards_resp = ListShardsResponse {
Expand All @@ -1765,7 +1763,8 @@ mod tests {
MetastoreServiceClient::from_mock(mock_metastore),
);
let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap();
let shard = &control_plane_debug_info["shard_table"][0]["shards"][0];
let shard =
&control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0];
assert_eq!(shard["shard_id"], "00000000000000000017");
assert_eq!(shard["publish_position_inclusive"], "00000000000000001234");

Expand Down Expand Up @@ -2393,7 +2392,9 @@ mod tests {
control_plane_mailbox.ask(callback).await.unwrap();

let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap();
let shard = &control_plane_debug_info["shard_table"][0]["shards"][0];
println!("{:?}", control_plane_debug_info);
let shard =
&control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0];
assert_eq!(shard["shard_id"], "00000000000000000000");
assert_eq!(shard["shard_state"], "closed");

Expand Down Expand Up @@ -2529,14 +2530,10 @@ mod tests {
control_plane_debug_info["physical_indexing_plan"][0]["node_id"],
"test-ingester"
);
let shard_table_entry = &control_plane_debug_info["shard_table"][0];
assert_eq!(
shard_table_entry["index_uid"],
"test-index:00000000000000000000000000"
);
assert_eq!(shard_table_entry["source_id"], INGEST_V2_SOURCE_ID);

let shard = &shard_table_entry["shards"][0];
let shard =
&control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0];
assert_eq!(shard["index_uid"], "test-index:00000000000000000000000000");
assert_eq!(shard["source_id"], INGEST_V2_SOURCE_ID);
assert_eq!(shard["shard_id"], "00000000000000000000");
assert_eq!(shard["shard_state"], "open");
assert_eq!(shard["leader_id"], "test-ingester");
Expand Down
98 changes: 0 additions & 98 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,6 @@ impl IngestController {
}
}

fn handle_unavailable_leaders(
&self,
unavailable_leaders: &FnvHashSet<NodeId>,
model: &mut ControlPlaneModel,
) {
let mut confirmed_unavailable_leaders = FnvHashSet::default();

for leader_id in unavailable_leaders {
if !self.ingester_pool.contains_key(leader_id) {
confirmed_unavailable_leaders.insert(leader_id.clone());
} else {
// TODO: If a majority of ingesters consistenly reports a leader as unavailable, we
// should probably mark it as unavailable too.
}
}
if !confirmed_unavailable_leaders.is_empty() {
model.set_shards_as_unavailable(&confirmed_unavailable_leaders);
}
}

/// Finds the open shards that satisfies the [`GetOrCreateOpenShardsRequest`] request sent by an
/// ingest router. First, the control plane checks its internal shard table to find
/// candidates. If it does not contain any, the control plane will ask
Expand All @@ -283,8 +263,6 @@ impl IngestController {
.map(|ingester_id| ingester_id.into())
.collect();

self.handle_unavailable_leaders(&unavailable_leaders, model);

let num_subrequests = get_open_shards_request.subrequests.len();
let mut get_or_create_open_shards_successes = Vec::with_capacity(num_subrequests);
let mut get_or_create_open_shards_failures = Vec::new();
Expand Down Expand Up @@ -1297,82 +1275,6 @@ mod tests {
assert!(shard_1.is_closed());
}

#[tokio::test]
async fn test_ingest_controller_get_open_shards_handles_unavailable_leaders() {
let metastore = MetastoreServiceClient::mocked();

let ingester_pool = IngesterPool::default();
let ingester_1 = IngesterServiceClient::mocked();
ingester_pool.insert("test-ingester-1".into(), ingester_1);

let replication_factor = 2;

let mut ingest_controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let mut model = ControlPlaneModel::default();

let index_uid = IndexUid::for_test("test-index-0", 0);
let source_id: SourceId = "test-source".into();

let shards = vec![
Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(1)),
leader_id: "test-ingester-0".to_string(),
shard_state: ShardState::Open as i32,
..Default::default()
},
Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(2)),
leader_id: "test-ingester-0".to_string(),
shard_state: ShardState::Closed as i32,
..Default::default()
},
Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(3)),
leader_id: "test-ingester-1".to_string(),
shard_state: ShardState::Open as i32,
..Default::default()
},
];
model.insert_shards(&index_uid, &source_id, shards);

let request = GetOrCreateOpenShardsRequest {
subrequests: Vec::new(),
closed_shards: Vec::new(),
unavailable_leaders: vec!["test-ingester-0".to_string()],
};
let progress = Progress::default();

ingest_controller
.get_or_create_open_shards(request, &mut model, &progress)
.await
.unwrap();

let shard_1 = model
.all_shards()
.find(|shard| shard.shard_id() == ShardId::from(1))
.unwrap();
assert!(shard_1.is_unavailable());

let shard_2 = model
.all_shards()
.find(|shard| shard.shard_id() == ShardId::from(2))
.unwrap();
assert!(shard_2.is_closed());

let shard_3 = model
.all_shards()
.find(|shard| shard.shard_id() == ShardId::from(3))
.unwrap();
assert!(shard_3.is_open());
}

#[test]
fn test_ingest_controller_allocate_shards() {
let metastore = MetastoreServiceClient::mocked();
Expand Down
5 changes: 0 additions & 5 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,6 @@ impl ControlPlaneModel {
Ok(has_changed)
}

pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
self.shard_table
.set_shards_as_unavailable(unavailable_leaders);
}

pub(crate) fn all_shards(&self) -> impl Iterator<Item = &ShardEntry> + '_ {
self.shard_table.all_shards()
}
Expand Down
19 changes: 0 additions & 19 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,25 +323,6 @@ impl ShardTable {
.map(|(source, shard_table)| (source, shard_table.shard_entries.values()))
}

pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
for (source_uid, shard_table_entry) in &mut self.table_entries {
let mut modified = false;
for shard_entry in shard_table_entry.shard_entries.values_mut() {
if shard_entry.is_open() && unavailable_leaders.contains(&shard_entry.leader_id) {
shard_entry.set_shard_state(ShardState::Unavailable);
modified = true;
}
}
if modified {
let num_open_shards = shard_table_entry.num_open_shards();
crate::metrics::CONTROL_PLANE_METRICS
.open_shards_total
.with_label_values([source_uid.index_uid.index_id.as_str()])
.set(num_open_shards as i64);
};
}
}

/// Lists the shards of a given source. Returns `None` if the source does not exist.
pub fn get_shards(&self, source_uid: &SourceUid) -> Option<&FnvHashMap<ShardId, ShardEntry>> {
self.table_entries
Expand Down
3 changes: 0 additions & 3 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ use crate::split_store::IndexingSplitStore;
/// concurrently.
static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);

#[derive(Debug)]
struct ObserveLoop;

struct MergePipelineHandles {
merge_planner: ActorHandle<MergePlanner>,
merge_split_downloader: ActorHandle<MergeSplitDownloader>,
Expand Down
13 changes: 0 additions & 13 deletions quickwit/quickwit-indexing/src/models/indexing_service_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,6 @@ pub struct SpawnPipeline {
pub pipeline_uid: PipelineUid,
}

#[derive(Clone, Debug)]
pub struct ShutdownPipelines {
pub index_id: String,
pub source_id: Option<String>,
// TODO
// pub pipeline_ord: Option<usize>,
}

#[derive(Clone, Debug)]
pub struct ShutdownPipeline {
pub pipeline_id: IndexingPipelineId,
}

/// Detaches a pipeline from the indexing service. The pipeline is no longer managed by the
/// server. This is mostly useful for ad-hoc indexing pipelines launched with `quickwit index
/// ingest ..` and testing.
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,12 @@ impl BroadcastLocalShardsTask {
for (queue_id, shard_state) in queue_ids {
let Some((_rate_limiter, rate_meter)) = state_guard.rate_trackers.get_mut(&queue_id)
else {
warn!("rate limiter `{queue_id}` not found",);
warn!(
"rate limiter `{queue_id}` not found: this should never happen, please report"
);
continue;
};
let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else {
warn!("failed to parse queue ID `{queue_id}`");
continue;
};
let source_uid = SourceUid {
Expand Down
Loading

0 comments on commit a297a66

Please sign in to comment.