Commit

WIP

guilload committed May 12, 2024
1 parent c5bfe5c commit 129ea7d
Showing 14 changed files with 130 additions and 182 deletions.
43 changes: 20 additions & 23 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;
@@ -344,33 +344,30 @@ impl ControlPlane {
})
.unwrap_or_default();

let shard_table: Vec<JsonValue> = self
.model
.all_shards_with_source()
.map(|(source, shards)| {
let shards: Vec<JsonValue> = shards
.map(|shard| {
json!({
"shard_id": shard.shard_id.clone(),
"shard_state": shard.shard_state().as_json_str_name(),
"leader_id": shard.leader_id.clone(),
"follower_id": shard.follower_id.clone(),
"publish_position_inclusive": shard.publish_position_inclusive(),
})
})
.collect();
let mut per_index_shards_json: HashMap<IndexUid, Vec<JsonValue>> = HashMap::new();

for (source_uid, shard_entries) in self.model.all_shards_with_source() {
let index_uid = source_uid.index_uid.clone();
let source_id = source_uid.source_id.clone();
let shards_json = shard_entries.map(|shard_entry| {
json!({
"index_uid": source.index_uid.clone(),
"source_id": source.source_id.clone(),
"shards": shards,
"index_uid": index_uid,
"source_id": source_id,
"shard_id": shard_entry.shard_id.clone(),
"shard_state": shard_entry.shard_state().as_json_str_name(),
"leader_id": shard_entry.leader_id.clone(),
"follower_id": shard_entry.follower_id.clone(),
"publish_position_inclusive": shard_entry.publish_position_inclusive(),
})
})
.collect();

});
per_index_shards_json
.entry(index_uid.clone())
.or_default()
.extend(shards_json);
}
json!({
"physical_indexing_plan": physical_indexing_plan,
"shard_table": shard_table,
"shard_table": per_index_shards_json,
})
}
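
The hunk above replaces a flat `Vec<JsonValue>` shard table with a map keyed by index UID, so the debug output groups shards per index. Below is a minimal, self-contained sketch of that grouping pattern; the `ShardEntry` stand-in and its fields are illustrative, not Quickwit's actual type.

```rust
use std::collections::HashMap;

use serde_json::{json, Value as JsonValue};

// Hypothetical stand-in for Quickwit's `ShardEntry`; fields are illustrative.
struct ShardEntry {
    index_uid: String,
    shard_id: u64,
    shard_state: &'static str,
}

// Group shard JSON objects under their index UID, as the new
// `per_index_shards_json` map in the hunk above does.
fn group_shards_by_index(shards: &[ShardEntry]) -> HashMap<String, Vec<JsonValue>> {
    let mut per_index: HashMap<String, Vec<JsonValue>> = HashMap::new();

    for shard in shards {
        per_index
            .entry(shard.index_uid.clone())
            .or_default()
            .push(json!({
                "shard_id": shard.shard_id,
                "shard_state": shard.shard_state,
            }));
    }
    per_index
}
```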

98 changes: 0 additions & 98 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -245,26 +245,6 @@ impl IngestController {
}
}

fn handle_unavailable_leaders(
&self,
unavailable_leaders: &FnvHashSet<NodeId>,
model: &mut ControlPlaneModel,
) {
let mut confirmed_unavailable_leaders = FnvHashSet::default();

for leader_id in unavailable_leaders {
if !self.ingester_pool.contains_key(leader_id) {
confirmed_unavailable_leaders.insert(leader_id.clone());
} else {
// TODO: If a majority of ingesters consistently reports a leader as unavailable, we
// should probably mark it as unavailable too.
}
}
if !confirmed_unavailable_leaders.is_empty() {
model.set_shards_as_unavailable(&confirmed_unavailable_leaders);
}
}

Finds the open shards that satisfy the [`GetOrCreateOpenShardsRequest`] request sent by an
/// ingest router. First, the control plane checks its internal shard table to find
/// candidates. If it does not contain any, the control plane will ask
@@ -283,8 +263,6 @@
.map(|ingester_id| ingester_id.into())
.collect();

self.handle_unavailable_leaders(&unavailable_leaders, model);

let num_subrequests = get_open_shards_request.subrequests.len();
let mut get_or_create_open_shards_successes = Vec::with_capacity(num_subrequests);
let mut get_or_create_open_shards_failures = Vec::new();
@@ -1297,82 +1275,6 @@ mod tests {
assert!(shard_1.is_closed());
}

#[tokio::test]
async fn test_ingest_controller_get_open_shards_handles_unavailable_leaders() {
let metastore = MetastoreServiceClient::mocked();

let ingester_pool = IngesterPool::default();
let ingester_1 = IngesterServiceClient::mocked();
ingester_pool.insert("test-ingester-1".into(), ingester_1);

let replication_factor = 2;

let mut ingest_controller =
IngestController::new(metastore, ingester_pool.clone(), replication_factor);
let mut model = ControlPlaneModel::default();

let index_uid = IndexUid::for_test("test-index-0", 0);
let source_id: SourceId = "test-source".into();

let shards = vec![
Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(1)),
leader_id: "test-ingester-0".to_string(),
shard_state: ShardState::Open as i32,
..Default::default()
},
Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(2)),
leader_id: "test-ingester-0".to_string(),
shard_state: ShardState::Closed as i32,
..Default::default()
},
Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(3)),
leader_id: "test-ingester-1".to_string(),
shard_state: ShardState::Open as i32,
..Default::default()
},
];
model.insert_shards(&index_uid, &source_id, shards);

let request = GetOrCreateOpenShardsRequest {
subrequests: Vec::new(),
closed_shards: Vec::new(),
unavailable_leaders: vec!["test-ingester-0".to_string()],
};
let progress = Progress::default();

ingest_controller
.get_or_create_open_shards(request, &mut model, &progress)
.await
.unwrap();

let shard_1 = model
.all_shards()
.find(|shard| shard.shard_id() == ShardId::from(1))
.unwrap();
assert!(shard_1.is_unavailable());

let shard_2 = model
.all_shards()
.find(|shard| shard.shard_id() == ShardId::from(2))
.unwrap();
assert!(shard_2.is_closed());

let shard_3 = model
.all_shards()
.find(|shard| shard.shard_id() == ShardId::from(3))
.unwrap();
assert!(shard_3.is_open());
}

#[test]
fn test_ingest_controller_allocate_shards() {
let metastore = MetastoreServiceClient::mocked();
5 changes: 0 additions & 5 deletions quickwit/quickwit-control-plane/src/model/mod.rs
@@ -259,11 +259,6 @@ impl ControlPlaneModel {
Ok(has_changed)
}

pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
self.shard_table
.set_shards_as_unavailable(unavailable_leaders);
}

pub(crate) fn all_shards(&self) -> impl Iterator<Item = &ShardEntry> + '_ {
self.shard_table.all_shards()
}
19 changes: 0 additions & 19 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
@@ -323,25 +323,6 @@ impl ShardTable {
.map(|(source, shard_table)| (source, shard_table.shard_entries.values()))
}

pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
for (source_uid, shard_table_entry) in &mut self.table_entries {
let mut modified = false;
for shard_entry in shard_table_entry.shard_entries.values_mut() {
if shard_entry.is_open() && unavailable_leaders.contains(&shard_entry.leader_id) {
shard_entry.set_shard_state(ShardState::Unavailable);
modified = true;
}
}
if modified {
let num_open_shards = shard_table_entry.num_open_shards();
crate::metrics::CONTROL_PLANE_METRICS
.open_shards_total
.with_label_values([source_uid.index_uid.index_id.as_str()])
.set(num_open_shards as i64);
};
}
}

/// Lists the shards of a given source. Returns `None` if the source does not exist.
pub fn get_shards(&self, source_uid: &SourceUid) -> Option<&FnvHashMap<ShardId, ShardEntry>> {
self.table_entries
3 changes: 0 additions & 3 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -55,9 +55,6 @@ use crate::split_store::IndexingSplitStore;
/// concurrently.
static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);

#[derive(Debug)]
struct ObserveLoop;

struct MergePipelineHandles {
merge_planner: ActorHandle<MergePlanner>,
merge_split_downloader: ActorHandle<MergeSplitDownloader>,
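
The `SPAWN_PIPELINE_SEMAPHORE` constant above caps how many pipelines may spawn concurrently. A short sketch of how a static `tokio::sync::Semaphore` like this is typically used; the `spawn_pipeline_bounded` wrapper is hypothetical, not code from this diff.

```rust
use tokio::sync::Semaphore;

// At most 10 concurrent holders, matching the constant above.
static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);

async fn spawn_pipeline_bounded() {
    // Waits until one of the 10 permits frees up; the permit is released
    // when `_permit` is dropped at the end of the scope.
    let _permit = SPAWN_PIPELINE_SEMAPHORE
        .acquire()
        .await
        .expect("semaphore is never closed");
    // ... spawn and initialize the pipeline here ...
}
```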
13 changes: 0 additions & 13 deletions quickwit/quickwit-indexing/src/models/indexing_service_message.rs
@@ -30,19 +30,6 @@ pub struct SpawnPipeline {
pub pipeline_uid: PipelineUid,
}

#[derive(Clone, Debug)]
pub struct ShutdownPipelines {
pub index_id: String,
pub source_id: Option<String>,
// TODO
// pub pipeline_ord: Option<usize>,
}

#[derive(Clone, Debug)]
pub struct ShutdownPipeline {
pub pipeline_id: IndexingPipelineId,
}

/// Detaches a pipeline from the indexing service. The pipeline is no longer managed by the
/// server. This is mostly useful for ad-hoc indexing pipelines launched with `quickwit index
/// ingest ..` and testing.
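
The doc comment above describes detaching a pipeline so the indexing service stops managing it. A sketch of the message shape that comment implies; the struct name and import path are assumptions, since the actual definition sits below the fold of this diff.

```rust
use quickwit_proto::indexing::IndexingPipelineId;

// Assumed shape of the detach message implied by the doc comment above;
// the real struct may differ. `IndexingPipelineId` mirrors the field type
// used by the removed `ShutdownPipeline` struct in this same file.
#[derive(Clone, Debug)]
pub struct DetachIndexingPipeline {
    pub pipeline_id: IndexingPipelineId,
}
```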
5 changes: 3 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
@@ -185,11 +185,12 @@ impl BroadcastLocalShardsTask {
for (queue_id, shard_state) in queue_ids {
let Some((_rate_limiter, rate_meter)) = state_guard.rate_trackers.get_mut(&queue_id)
else {
warn!("rate limiter `{queue_id}` not found",);
warn!(
"rate limiter `{queue_id}` not found: this should never happen, please report"
);
continue;
};
let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else {
warn!("failed to parse queue ID `{queue_id}`");
continue;
};
let source_uid = SourceUid {
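
Both `warn!` sites in this hunk rely on Rust's `let ... else` to skip entries that fail to parse instead of aborting the loop. The pattern in isolation, with a hypothetical queue-ID format rather than Quickwit's actual `split_queue_id`:

```rust
// Skip entries that fail to parse instead of returning early.
fn process_queue_ids(queue_ids: &[String]) {
    for queue_id in queue_ids {
        // `let ... else` binds on success; the `else` arm must diverge
        // (here, `continue`), so the loop moves on to the next entry.
        let Some((index_uid, shard_id)) = queue_id.split_once('/') else {
            eprintln!("failed to parse queue ID `{queue_id}`");
            continue;
        };
        println!("index_uid={index_uid} shard_id={shard_id}");
    }
}
```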
39 changes: 37 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -79,6 +79,7 @@ use super::replication::{
use super::state::{IngesterState, InnerIngesterState, WeakIngesterState};
use super::IngesterPool;
use crate::ingest_v2::metrics::report_wal_usage;
use crate::ingest_v2::models::IngesterShardType;
use crate::metrics::INGEST_METRICS;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::{estimate_size, with_lock_metrics, FollowerId};
@@ -287,7 +288,6 @@ impl Ingester {

for queue_id in state_guard.mrecordlog.list_queues() {
let Some((index_uid, source_id, shard_id)) = split_queue_id(queue_id) else {
warn!("failed to parse queue ID `{queue_id}`");
continue;
};
per_source_shard_ids
@@ -1012,9 +1012,44 @@ impl Ingester {
})
}
};
let mut per_index_shards_json: HashMap<IndexUid, Vec<JsonValue>> = HashMap::new();

for (queue_id, shard) in &state_guard.shards {
let Some((index_uid, source_id, shard_id)) = split_queue_id(queue_id) else {
continue;
};
let mut shard_json = json!({
"index_uid": index_uid,
"source_id": source_id,
"shard_id": shard_id,
"state": shard.shard_state.as_json_str_name(),
"replication_position_inclusive": shard.replication_position_inclusive,
"truncation_position_inclusive": shard.truncation_position_inclusive,
});
match &shard.shard_type {
IngesterShardType::Primary { follower_id } => {
shard_json["type"] = json!("primary".to_string());
shard_json["leader_id"] = json!(self.self_node_id.to_string());
shard_json["follower_id"] = json!(follower_id.to_string());
}
IngesterShardType::Replica { leader_id } => {
shard_json["type"] = json!("replica".to_string());
shard_json["leader_id"] = json!(leader_id.to_string());
shard_json["follower_id"] = json!(self.self_node_id.to_string());
}
IngesterShardType::Solo => {
shard_json["type"] = json!("solo".to_string());
shard_json["leader_id"] = json!(self.self_node_id.to_string());
}
};
per_index_shards_json
.entry(index_uid.clone())
.or_default()
.push(shard_json);
}
json!({
"status": state_guard.status().as_json_str_name(),
"shards": state_guard.shards.keys().collect::<Vec<_>>(), // TODO: add more info
"shards": per_index_shards_json,
"mrecordlog": state_guard.mrecordlog.summary(),
})
}
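
The match on `shard.shard_type` above fills in per-variant fields by index-assigning into a `serde_json::Value`. A minimal sketch of that mechanism: assigning to a missing key on a JSON object inserts it.

```rust
use serde_json::json;

fn main() {
    let mut shard_json = json!({
        "shard_id": "00000000000000000001",
        "state": "open",
    });
    // Indexing a JSON object with a new key inserts it on assignment.
    shard_json["type"] = json!("solo");
    shard_json["leader_id"] = json!("node-1");

    assert_eq!(shard_json["type"], "solo");
    assert_eq!(shard_json["leader_id"], "node-1");
}
```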
10 changes: 10 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -39,6 +39,7 @@ use quickwit_proto::ingest::ingester::{
use quickwit_proto::ingest::router::{IngestRequestV2, IngestResponseV2, IngestRouterService};
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId};
use serde_json::{json, Value as JsonValue};
use tokio::sync::{Mutex, Semaphore};
use tracing::info;

@@ -441,6 +442,15 @@ impl IngestRouter {
IngestV2Error::Timeout(message)
})?
}

pub async fn debug_info(&self) -> JsonValue {
let state_guard = self.state.lock().await;
let routing_table_json = state_guard.routing_table.debug_info();

json!({
"routing_table": routing_table_json,
})
}
}

#[async_trait]
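
The new `debug_info` above snapshots router state from behind an async mutex. A minimal sketch of the same lock, serialize, release pattern; the `Router` struct and its `Vec<String>` state are stand-ins, not the real router fields.

```rust
use serde_json::{json, Value as JsonValue};
use tokio::sync::Mutex;

struct Router {
    // Stand-in for the real router state guarded by the async mutex.
    state: Mutex<Vec<String>>,
}

impl Router {
    async fn debug_info(&self) -> JsonValue {
        // The guard is held only while serializing, then dropped on return.
        let state_guard = self.state.lock().await;
        json!({ "routing_table": &*state_guard })
    }
}
```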
25 changes: 25 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};

use quickwit_proto::ingest::{Shard, ShardIds, ShardState};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId};
use serde_json::{json, Value as JsonValue};
use tracing::{info, warn};

use crate::IngesterPool;
@@ -452,6 +453,30 @@ impl RoutingTable {
}
}

pub fn debug_info(&self) -> HashMap<IndexId, Vec<JsonValue>> {
let mut per_index_shards_json: HashMap<IndexId, Vec<JsonValue>> = HashMap::new();

for ((index_id, source_id), entry) in &self.table {
for (shards, is_local) in &[(&entry.local_shards, true), (&entry.remote_shards, false)]
{
let shards_json = shards.iter().map(|shard| {
json!({
"index_uid": shard.index_uid,
"source_id": source_id,
"shard_id": shard.shard_id,
"shard_state": shard.shard_state.as_json_str_name(),
"is_local": is_local,
})
});
per_index_shards_json
.entry(index_id.clone())
.or_default()
.extend(shards_json);
}
}
per_index_shards_json
}

#[cfg(test)]
pub fn len(&self) -> usize {
self.table.len()
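
`RoutingTable::debug_info` above walks the local and remote shard groups with a single loop body by pairing each group with an `is_local` flag. The same trick in isolation:

```rust
fn main() {
    let local_shards = vec!["shard-1"];
    let remote_shards = vec!["shard-2", "shard-3"];

    // Pair each shard group with a flag so one loop body handles both.
    for (shards, is_local) in &[(&local_shards, true), (&remote_shards, false)] {
        for shard in shards.iter() {
            println!("{shard}: is_local={is_local}");
        }
    }
}
```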