diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
index 61232a76fed..6e577159a7e 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
-use std::collections::BTreeSet;
+use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;
@@ -344,33 +344,30 @@ impl ControlPlane {
})
.unwrap_or_default();
- let shard_table: Vec<JsonValue> = self
- .model
- .all_shards_with_source()
- .map(|(source, shards)| {
- let shards: Vec<JsonValue> = shards
- .map(|shard| {
- json!({
- "shard_id": shard.shard_id.clone(),
- "shard_state": shard.shard_state().as_json_str_name(),
- "leader_id": shard.leader_id.clone(),
- "follower_id": shard.follower_id.clone(),
- "publish_position_inclusive": shard.publish_position_inclusive(),
- })
- })
- .collect();
+ let mut per_index_shards_json: HashMap<IndexUid, Vec<JsonValue>> = HashMap::new();
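+ // Group the shard entries of every source under their parent index UID.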
+ for (source_uid, shard_entries) in self.model.all_shards_with_source() {
+ let index_uid = source_uid.index_uid.clone();
+ let source_id = source_uid.source_id.clone();
+ let shards_json = shard_entries.map(|shard_entry| {
json!({
- "index_uid": source.index_uid.clone(),
- "source_id": source.source_id.clone(),
- "shards": shards,
+ "index_uid": index_uid,
+ "source_id": source_id,
+ "shard_id": shard_entry.shard_id.clone(),
+ "shard_state": shard_entry.shard_state().as_json_str_name(),
+ "leader_id": shard_entry.leader_id.clone(),
+ "follower_id": shard_entry.follower_id.clone(),
+ "publish_position_inclusive": shard_entry.publish_position_inclusive(),
})
- })
- .collect();
-
+ });
+ per_index_shards_json
+ .entry(index_uid.clone())
+ .or_default()
+ .extend(shards_json);
+ }
json!({
"physical_indexing_plan": physical_indexing_plan,
- "shard_table": shard_table,
+ "shard_table": per_index_shards_json,
})
}
@@ -1659,7 +1656,8 @@ mod tests {
assert_eq!(indexing_tasks[0].shard_ids, [ShardId::from(17)]);
let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap();
- let shard = &control_plane_debug_info["shard_table"][0]["shards"][0];
+ let shard =
+ &control_plane_debug_info["shard_table"]["test-index-0:00000000000000000000000000"][0];
assert_eq!(shard["shard_id"], "00000000000000000017");
assert_eq!(shard["publish_position_inclusive"], "00000000000000001000");
@@ -1714,23 +1712,23 @@ mod tests {
let ingester_pool = IngesterPool::default();
let mut mock_metastore = MockMetastoreService::new();
- let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
- let mut source = SourceConfig::ingest_v2();
- source.enabled = true;
- index_0.add_source(source.clone()).unwrap();
+ let mut index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index");
+ let mut source_config = SourceConfig::ingest_v2();
+ source_config.enabled = true;
+ index_metadata.add_source(source_config.clone()).unwrap();
- let index_0_clone = index_0.clone();
+ let index_metadata_clone = index_metadata.clone();
mock_metastore.expect_list_indexes_metadata().return_once(
move |list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::for_test(vec![
- index_0_clone.clone()
+ index_metadata_clone,
]))
},
);
let mut shard = Shard {
- index_uid: Some(index_0.index_uid.clone()),
+ index_uid: Some(index_metadata.index_uid.clone()),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: Some(ShardId::from(17)),
leader_id: "test_node".to_string(),
@@ -1739,7 +1737,7 @@ mod tests {
};
shard.set_shard_state(ShardState::Open);
- let index_uid_clone = index_0.index_uid.clone();
+ let index_uid_clone = index_metadata.index_uid.clone();
mock_metastore.expect_list_shards().return_once(
move |_list_shards_request: ListShardsRequest| {
let list_shards_resp = ListShardsResponse {
@@ -1765,7 +1763,8 @@ mod tests {
MetastoreServiceClient::from_mock(mock_metastore),
);
let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap();
- let shard = &control_plane_debug_info["shard_table"][0]["shards"][0];
+ let shard =
+ &control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0];
assert_eq!(shard["shard_id"], "00000000000000000017");
assert_eq!(shard["publish_position_inclusive"], "00000000000000001234");
@@ -2393,7 +2392,9 @@ mod tests {
control_plane_mailbox.ask(callback).await.unwrap();
let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap();
- let shard = &control_plane_debug_info["shard_table"][0]["shards"][0];
+ let shard =
+ &control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0];
assert_eq!(shard["shard_id"], "00000000000000000000");
assert_eq!(shard["shard_state"], "closed");
@@ -2529,14 +2530,10 @@ mod tests {
control_plane_debug_info["physical_indexing_plan"][0]["node_id"],
"test-ingester"
);
- let shard_table_entry = &control_plane_debug_info["shard_table"][0];
- assert_eq!(
- shard_table_entry["index_uid"],
- "test-index:00000000000000000000000000"
- );
- assert_eq!(shard_table_entry["source_id"], INGEST_V2_SOURCE_ID);
-
- let shard = &shard_table_entry["shards"][0];
+ let shard =
+ &control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0];
+ assert_eq!(shard["index_uid"], "test-index:00000000000000000000000000");
+ assert_eq!(shard["source_id"], INGEST_V2_SOURCE_ID);
assert_eq!(shard["shard_id"], "00000000000000000000");
assert_eq!(shard["shard_state"], "open");
assert_eq!(shard["leader_id"], "test-ingester");
diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
index 21407cbf47c..40bd2f5eefb 100644
--- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
+++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -245,26 +245,6 @@ impl IngestController {
}
}
- fn handle_unavailable_leaders(
- &self,
- unavailable_leaders: &FnvHashSet<NodeId>,
- model: &mut ControlPlaneModel,
- ) {
- let mut confirmed_unavailable_leaders = FnvHashSet::default();
-
- for leader_id in unavailable_leaders {
- if !self.ingester_pool.contains_key(leader_id) {
- confirmed_unavailable_leaders.insert(leader_id.clone());
- } else {
- // TODO: If a majority of ingesters consistently reports a leader as unavailable, we
- // should probably mark it as unavailable too.
- }
- }
- if !confirmed_unavailable_leaders.is_empty() {
- model.set_shards_as_unavailable(&confirmed_unavailable_leaders);
- }
- }
-
/// Finds the open shards that satisfy the [`GetOrCreateOpenShardsRequest`] request sent by an
/// ingest router. First, the control plane checks its internal shard table to find
/// candidates. If it does not contain any, the control plane will ask
@@ -283,8 +263,6 @@ impl IngestController {
.map(|ingester_id| ingester_id.into())
.collect();
- self.handle_unavailable_leaders(&unavailable_leaders, model);
-
let num_subrequests = get_open_shards_request.subrequests.len();
let mut get_or_create_open_shards_successes = Vec::with_capacity(num_subrequests);
let mut get_or_create_open_shards_failures = Vec::new();
@@ -1297,82 +1275,6 @@ mod tests {
assert!(shard_1.is_closed());
}
- #[tokio::test]
- async fn test_ingest_controller_get_open_shards_handles_unavailable_leaders() {
- let metastore = MetastoreServiceClient::mocked();
-
- let ingester_pool = IngesterPool::default();
- let ingester_1 = IngesterServiceClient::mocked();
- ingester_pool.insert("test-ingester-1".into(), ingester_1);
-
- let replication_factor = 2;
-
- let mut ingest_controller =
- IngestController::new(metastore, ingester_pool.clone(), replication_factor);
- let mut model = ControlPlaneModel::default();
-
- let index_uid = IndexUid::for_test("test-index-0", 0);
- let source_id: SourceId = "test-source".into();
-
- let shards = vec![
- Shard {
- index_uid: Some(index_uid.clone()),
- source_id: source_id.clone(),
- shard_id: Some(ShardId::from(1)),
- leader_id: "test-ingester-0".to_string(),
- shard_state: ShardState::Open as i32,
- ..Default::default()
- },
- Shard {
- index_uid: Some(index_uid.clone()),
- source_id: source_id.clone(),
- shard_id: Some(ShardId::from(2)),
- leader_id: "test-ingester-0".to_string(),
- shard_state: ShardState::Closed as i32,
- ..Default::default()
- },
- Shard {
- index_uid: Some(index_uid.clone()),
- source_id: source_id.clone(),
- shard_id: Some(ShardId::from(3)),
- leader_id: "test-ingester-1".to_string(),
- shard_state: ShardState::Open as i32,
- ..Default::default()
- },
- ];
- model.insert_shards(&index_uid, &source_id, shards);
-
- let request = GetOrCreateOpenShardsRequest {
- subrequests: Vec::new(),
- closed_shards: Vec::new(),
- unavailable_leaders: vec!["test-ingester-0".to_string()],
- };
- let progress = Progress::default();
-
- ingest_controller
- .get_or_create_open_shards(request, &mut model, &progress)
- .await
- .unwrap();
-
- let shard_1 = model
- .all_shards()
- .find(|shard| shard.shard_id() == ShardId::from(1))
- .unwrap();
- assert!(shard_1.is_unavailable());
-
- let shard_2 = model
- .all_shards()
- .find(|shard| shard.shard_id() == ShardId::from(2))
- .unwrap();
- assert!(shard_2.is_closed());
-
- let shard_3 = model
- .all_shards()
- .find(|shard| shard.shard_id() == ShardId::from(3))
- .unwrap();
- assert!(shard_3.is_open());
- }
-
#[test]
fn test_ingest_controller_allocate_shards() {
let metastore = MetastoreServiceClient::mocked();
diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs
index 4bf3d3ddd8c..229f9275558 100644
--- a/quickwit/quickwit-control-plane/src/model/mod.rs
+++ b/quickwit/quickwit-control-plane/src/model/mod.rs
@@ -259,11 +259,6 @@ impl ControlPlaneModel {
Ok(has_changed)
}
- pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
- self.shard_table
- .set_shards_as_unavailable(unavailable_leaders);
- }
-
pub(crate) fn all_shards(&self) -> impl Iterator<Item = &ShardEntry> + '_ {
self.shard_table.all_shards()
}
diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs
index 4f61f68cbc6..65395f63cac 100644
--- a/quickwit/quickwit-control-plane/src/model/shard_table.rs
+++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs
@@ -323,25 +323,6 @@ impl ShardTable {
.map(|(source, shard_table)| (source, shard_table.shard_entries.values()))
}
- pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
- for (source_uid, shard_table_entry) in &mut self.table_entries {
- let mut modified = false;
- for shard_entry in shard_table_entry.shard_entries.values_mut() {
- if shard_entry.is_open() && unavailable_leaders.contains(&shard_entry.leader_id) {
- shard_entry.set_shard_state(ShardState::Unavailable);
- modified = true;
- }
- }
- if modified {
- let num_open_shards = shard_table_entry.num_open_shards();
- crate::metrics::CONTROL_PLANE_METRICS
- .open_shards_total
- .with_label_values([source_uid.index_uid.index_id.as_str()])
- .set(num_open_shards as i64);
- };
- }
- }
-
/// Lists the shards of a given source. Returns `None` if the source does not exist.
pub fn get_shards(&self, source_uid: &SourceUid) -> Option<&FnvHashMap<ShardId, ShardEntry>> {
self.table_entries
diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
index 3f29c5d6c6a..75808e849c3 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -55,9 +55,6 @@ use crate::split_store::IndexingSplitStore;
/// concurrently.
static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);
-#[derive(Debug)]
-struct ObserveLoop;
-
struct MergePipelineHandles {
merge_planner: ActorHandle<MergePlanner>,
merge_split_downloader: ActorHandle<MergeSplitDownloader>,
diff --git a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs
index cd005d235e5..6c76898574d 100644
--- a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs
+++ b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs
@@ -30,19 +30,6 @@ pub struct SpawnPipeline {
pub pipeline_uid: PipelineUid,
}
-#[derive(Clone, Debug)]
-pub struct ShutdownPipelines {
- pub index_id: String,
- pub source_id: Option<String>,
- // TODO
- // pub pipeline_ord: Option<usize>,
-}
-
-#[derive(Clone, Debug)]
-pub struct ShutdownPipeline {
- pub pipeline_id: IndexingPipelineId,
-}
-
/// Detaches a pipeline from the indexing service. The pipeline is no longer managed by the
/// server. This is mostly useful for ad-hoc indexing pipelines launched with `quickwit index
/// ingest ..` and testing.
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
index 19264cf3de3..8feaaa7aa3d 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
@@ -185,11 +185,12 @@ impl BroadcastLocalShardsTask {
for (queue_id, shard_state) in queue_ids {
let Some((_rate_limiter, rate_meter)) = state_guard.rate_trackers.get_mut(&queue_id)
else {
- warn!("rate limiter `{queue_id}` not found",);
+ warn!(
+ "rate limiter `{queue_id}` not found: this should never happen, please report"
+ );
continue;
};
let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else {
- warn!("failed to parse queue ID `{queue_id}`");
continue;
};
let source_uid = SourceUid {
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 2d26add62cc..bf4dbb7a146 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -79,6 +79,7 @@ use super::replication::{
use super::state::{IngesterState, InnerIngesterState, WeakIngesterState};
use super::IngesterPool;
use crate::ingest_v2::metrics::report_wal_usage;
+use crate::ingest_v2::models::IngesterShardType;
use crate::metrics::INGEST_METRICS;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::{estimate_size, with_lock_metrics, FollowerId};
@@ -287,7 +288,6 @@ impl Ingester {
for queue_id in state_guard.mrecordlog.list_queues() {
let Some((index_uid, source_id, shard_id)) = split_queue_id(queue_id) else {
- warn!("failed to parse queue ID `{queue_id}`");
continue;
};
per_source_shard_ids
@@ -1012,9 +1012,44 @@ impl Ingester {
})
}
};
+ let mut per_index_shards_json: HashMap<IndexUid, Vec<JsonValue>> = HashMap::new();
+
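+ // Group the shards hosted by this ingester under their parent index UID.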
+ for (queue_id, shard) in &state_guard.shards {
+ let Some((index_uid, source_id, shard_id)) = split_queue_id(queue_id) else {
+ continue;
+ };
+ let mut shard_json = json!({
+ "index_uid": index_uid,
+ "source_id": source_id,
+ "shard_id": shard_id,
+ "state": shard.shard_state.as_json_str_name(),
+ "replication_position_inclusive": shard.replication_position_inclusive,
+ "truncation_position_inclusive": shard.truncation_position_inclusive,
+ });
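+ // Record the shard's replication role: a primary knows its follower, a replica its leader, and a solo shard has neither.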
+ match &shard.shard_type {
+ IngesterShardType::Primary { follower_id } => {
+ shard_json["type"] = json!("primary");
+ shard_json["leader_id"] = json!(self.self_node_id.to_string());
+ shard_json["follower_id"] = json!(follower_id.to_string());
+ }
+ IngesterShardType::Replica { leader_id } => {
+ shard_json["type"] = json!("replica");
+ shard_json["leader_id"] = json!(leader_id.to_string());
+ shard_json["follower_id"] = json!(self.self_node_id.to_string());
+ }
+ IngesterShardType::Solo => {
+ shard_json["type"] = json!("solo");
+ shard_json["leader_id"] = json!(self.self_node_id.to_string());
+ }
+ };
+ per_index_shards_json
+ .entry(index_uid.clone())
+ .or_default()
+ .push(shard_json);
+ }
json!({
"status": state_guard.status().as_json_str_name(),
- "shards": state_guard.shards.keys().collect::>(), // TODO: add more info
+ "shards": per_index_shards_json,
"mrecordlog": state_guard.mrecordlog.summary(),
})
}
@@ -3188,4 +3223,86 @@ mod tests {
.assert_is_open();
drop(state_guard);
}
+
+ #[tokio::test]
+ async fn test_ingester_debug_info() {
+ let (_ingester_ctx, ingester) = IngesterForTest::default().build().await;
+
+ let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0);
+ let shard_01 = Shard {
+ index_uid: Some(index_uid_0.clone()),
+ source_id: "test-source".to_string(),
+ shard_id: Some(ShardId::from(1)),
+ shard_state: ShardState::Open as i32,
+ ..Default::default()
+ };
+ let shard_02 = Shard {
+ index_uid: Some(index_uid_0.clone()),
+ source_id: "test-source".to_string(),
+ shard_id: Some(ShardId::from(2)),
+ shard_state: ShardState::Closed as i32,
+ ..Default::default()
+ };
+ let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0);
+ let shard_03 = Shard {
+ index_uid: Some(index_uid_1.clone()),
+ source_id: "test-source".to_string(),
+ shard_id: Some(ShardId::from(3)),
+ shard_state: ShardState::Closed as i32,
+ ..Default::default()
+ };
+
+ let mut state_guard = ingester.state.lock_fully().await.unwrap();
+ let now = Instant::now();
+
+ ingester
+ .init_primary_shard(
+ &mut state_guard.inner,
+ &mut state_guard.mrecordlog,
+ shard_01,
+ now,
+ )
+ .await
+ .unwrap();
+ ingester
+ .init_primary_shard(
+ &mut state_guard.inner,
+ &mut state_guard.mrecordlog,
+ shard_02,
+ now,
+ )
+ .await
+ .unwrap();
+ ingester
+ .init_primary_shard(
+ &mut state_guard.inner,
+ &mut state_guard.mrecordlog,
+ shard_03,
+ now,
+ )
+ .await
+ .unwrap();
+ drop(state_guard);
+
+ let debug_info = ingester.debug_info().await;
+ assert_eq!(debug_info["status"], "ready");
+
+ let shards = &debug_info["shards"];
+ assert_eq!(shards.as_object().unwrap().len(), 2);
+
+ assert_eq!(
+ shards["test-index-0:00000000000000000000000000"]
+ .as_array()
+ .unwrap()
+ .len(),
+ 2
+ );
+ assert_eq!(
+ shards["test-index-1:00000000000000000000000000"]
+ .as_array()
+ .unwrap()
+ .len(),
+ 1
+ );
+ }
}
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index ba78f4585db..fde9419c1e3 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -39,6 +39,7 @@ use quickwit_proto::ingest::ingester::{
use quickwit_proto::ingest::router::{IngestRequestV2, IngestResponseV2, IngestRouterService};
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId};
+use serde_json::{json, Value as JsonValue};
use tokio::sync::{Mutex, Semaphore};
use tracing::info;
@@ -451,6 +452,15 @@ impl IngestRouter {
IngestV2Error::Timeout(message)
})?
}
+
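+ /// Returns a JSON snapshot of the router's routing table, exposed via the developer API.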
+ pub async fn debug_info(&self) -> JsonValue {
+ let state_guard = self.state.lock().await;
+ let routing_table_json = state_guard.routing_table.debug_info();
+
+ json!({
+ "routing_table": routing_table_json,
+ })
+ }
}
#[async_trait]
@@ -1682,4 +1692,52 @@ mod tests {
assert_eq!(shards[0].shard_id, ShardId::from(2));
drop(state_guard);
}
+
+ #[tokio::test]
+ async fn test_router_debug_info() {
+ let self_node_id = "test-router".into();
+ let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
+ let ingester_pool = IngesterPool::default();
+ let replication_factor = 1;
+ let router = IngestRouter::new(
+ self_node_id,
+ control_plane,
+ ingester_pool.clone(),
+ replication_factor,
+ );
+ let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0);
+ let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0);
+
+ let mut state_guard = router.state.lock().await;
+ state_guard.routing_table.replace_shards(
+ index_uid_0.clone(),
+ "test-source",
+ vec![Shard {
+ index_uid: Some(index_uid_0.clone()),
+ shard_id: Some(ShardId::from(1)),
+ shard_state: ShardState::Open as i32,
+ leader_id: "test-ingester".to_string(),
+ ..Default::default()
+ }],
+ );
+ state_guard.routing_table.replace_shards(
+ index_uid_1.clone(),
+ "test-source",
+ vec![Shard {
+ index_uid: Some(index_uid_1.clone()),
+ shard_id: Some(ShardId::from(2)),
+ shard_state: ShardState::Open as i32,
+ leader_id: "test-ingester".to_string(),
+ ..Default::default()
+ }],
+ );
+ drop(state_guard);
+
+ let debug_info = router.debug_info().await;
+ let routing_table = &debug_info["routing_table"];
+ assert_eq!(routing_table.as_object().unwrap().len(), 2);
+
+ assert_eq!(routing_table["test-index-0"].as_array().unwrap().len(), 1);
+ assert_eq!(routing_table["test-index-1"].as_array().unwrap().len(), 1);
+ }
}
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
index 2ac9d12c75d..b2095b6647f 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use quickwit_proto::ingest::{Shard, ShardIds, ShardState};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId};
+use serde_json::{json, Value as JsonValue};
use tracing::{info, warn};
use crate::IngesterPool;
@@ -452,6 +453,30 @@ impl RoutingTable {
}
}
+ pub fn debug_info(&self) -> HashMap<IndexId, Vec<JsonValue>> {
+ let mut per_index_shards_json: HashMap<IndexId, Vec<JsonValue>> = HashMap::new();
+
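+ // Emit one JSON object per shard, covering both the local and remote shard sets.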
+ for ((index_id, source_id), entry) in &self.table {
+ for (shards, is_local) in &[(&entry.local_shards, true), (&entry.remote_shards, false)]
+ {
+ let shards_json = shards.iter().map(|shard| {
+ json!({
+ "index_uid": shard.index_uid,
+ "source_id": source_id,
+ "shard_id": shard.shard_id,
+ "shard_state": shard.shard_state.as_json_str_name(),
+ "is_local": is_local,
+ })
+ });
+ per_index_shards_json
+ .entry(index_id.clone())
+ .or_default()
+ .extend(shards_json);
+ }
+ }
+ per_index_shards_json
+ }
+
#[cfg(test)]
pub fn len(&self) -> usize {
self.table.len()
diff --git a/quickwit/quickwit-proto/src/types/mod.rs b/quickwit/quickwit-proto/src/types/mod.rs
index 18e034e6285..f0d79d94939 100644
--- a/quickwit/quickwit-proto/src/types/mod.rs
+++ b/quickwit/quickwit-proto/src/types/mod.rs
@@ -25,6 +25,7 @@ use std::ops::Deref;
use std::str::FromStr;
use serde::{Deserialize, Serialize};
+use tracing::warn;
pub use ulid::Ulid;
mod index_uid;
@@ -56,6 +57,15 @@ pub fn queue_id(index_uid: &IndexUid, source_id: &str, shard_id: &ShardId) -> Qu
}
pub fn split_queue_id(queue_id: &str) -> Option<(IndexUid, SourceId, ShardId)> {
+ let parts_opt = split_queue_id_inner(queue_id);
+
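+ // Log the parse failure once here instead of at every call site.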
+ if parts_opt.is_none() {
+ warn!("failed to parse queue ID `{queue_id}`: this should never happen, please report");
+ }
+ parts_opt
+}
+
+fn split_queue_id_inner(queue_id: &str) -> Option<(IndexUid, SourceId, ShardId)> {
let mut parts = queue_id.split('/');
let index_uid = parts.next()?;
let source_id = parts.next()?;
diff --git a/quickwit/quickwit-serve/src/developer_api/server.rs b/quickwit/quickwit-serve/src/developer_api/server.rs
index c6bd0de9d83..03791b1b4f9 100644
--- a/quickwit/quickwit-serve/src/developer_api/server.rs
+++ b/quickwit/quickwit-serve/src/developer_api/server.rs
@@ -29,7 +29,7 @@ use quickwit_cluster::Cluster;
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_control_plane::control_plane::{ControlPlane, GetDebugInfo};
-use quickwit_ingest::Ingester;
+use quickwit_ingest::{IngestRouter, Ingester};
use quickwit_proto::developer::{
DeveloperError, DeveloperResult, DeveloperService, GetDebugInfoRequest, GetDebugInfoResponse,
};
@@ -42,6 +42,7 @@ pub(crate) struct DeveloperApiServer {
node_config: Arc<NodeConfig>,
cluster: Cluster,
control_plane_mailbox_opt: Option<Mailbox<ControlPlane>>,
+ ingest_router_opt: Option<IngestRouter>,
ingester_opt: Option<Ingester>,
}
@@ -59,6 +60,7 @@ impl DeveloperApiServer {
node_config: services.node_config.clone(),
cluster: services.cluster.clone(),
control_plane_mailbox_opt: services.control_plane_server_opt.clone(),
+ ingest_router_opt: services.ingest_router_opt.clone(),
ingester_opt: services.ingester_opt.clone(),
}
}
@@ -100,6 +102,9 @@ impl DeveloperService for DeveloperApiServer {
};
}
}
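+ // The ingest router is instantiated on every node, so no role check is needed here.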
+ if let Some(ingest_router) = &self.ingest_router_opt {
+ debug_info["ingest_router"] = ingest_router.debug_info().await;
+ }
if let Some(ingester) = &self.ingester_opt {
if roles.is_empty() || roles.contains(&QuickwitService::Indexer) {
debug_info["ingester"] = ingester.debug_info().await;
@@ -143,6 +148,7 @@ mod tests {
node_config,
cluster,
control_plane_mailbox_opt: None,
+ ingest_router_opt: None,
ingester_opt: None,
};
let request = GetDebugInfoRequest { roles: Vec::new() };
diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs
index 12634a0b006..7a1c24691ee 100644
--- a/quickwit/quickwit-serve/src/grpc.rs
+++ b/quickwit/quickwit-serve/src/grpc.rs
@@ -91,11 +91,11 @@ pub(crate) async fn start_grpc_server(
.is_service_enabled(QuickwitService::Indexer)
{
enabled_grpc_services.insert("ingest-router");
- Some(
- services
- .ingest_router_service
- .as_grpc_service(max_message_size),
- )
+
+ let ingest_router_service = services
+ .ingest_router_service
+ .as_grpc_service(max_message_size);
+ Some(ingest_router_service)
} else {
None
};
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 5e92984328c..f5235db275c 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -188,6 +188,7 @@ struct QuickwitServices {
// Ingest v1
pub ingest_service: IngestServiceClient,
// Ingest v2
+ pub ingest_router_opt: Option<IngestRouter>,
pub ingest_router_service: IngestRouterServiceClient,
ingester_opt: Option<Ingester>,
@@ -538,7 +539,7 @@ pub async fn serve_quickwit(
);
// Setup ingest service v2.
- let (ingest_router_service, ingester_opt) = setup_ingest_v2(
+ let (ingest_router, ingest_router_service, ingester_opt) = setup_ingest_v2(
&node_config,
&cluster,
&event_broker,
@@ -700,6 +701,7 @@ pub async fn serve_quickwit(
_report_splits_subscription_handle_opt: report_splits_subscription_handle_opt,
index_manager,
indexing_service_opt,
+ ingest_router_opt: Some(ingest_router),
ingest_router_service,
ingest_service,
ingester_opt: ingester_opt.clone(),
@@ -822,7 +824,7 @@ async fn setup_ingest_v2(
event_broker: &EventBroker,
control_plane: ControlPlaneServiceClient,
ingester_pool: IngesterPool,
-) -> anyhow::Result<(IngestRouterServiceClient, Option<Ingester>)> {
+) -> anyhow::Result<(IngestRouter, IngestRouterServiceClient, Option<Ingester>)> {
// Instantiate ingest router.
let self_node_id: NodeId = cluster.self_node_id().into();
let content_length_limit = node_config.ingest_api_config.content_length_limit;
@@ -831,6 +833,9 @@ async fn setup_ingest_v2(
.replication_factor()
.expect("replication factor should have been validated")
.get();
+
+ // Any node can serve ingest requests, so we always instantiate an ingest router.
+ // TODO: I'm not sure that's such a good idea.
let ingest_router = IngestRouter::new(
self_node_id.clone(),
control_plane.clone(),
@@ -839,11 +844,9 @@ async fn setup_ingest_v2(
);
ingest_router.subscribe(event_broker);
- // Any node can serve ingest requests, so we always instantiate an ingest router.
- // TODO: I'm not sure that's such a good idea.
let ingest_router_service = IngestRouterServiceClient::tower()
.stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone())
- .build(ingest_router);
+ .build(ingest_router.clone());
// We compute the burst limit as something a bit larger than the content length limit, because
// we actually rewrite the `\n`-delimited format into a tiny bit larger buffer, where the
@@ -937,7 +940,7 @@ async fn setup_ingest_v2(
})
});
ingester_pool.listen_for_changes(ingester_change_stream);
- Ok((ingest_router_service, ingester_opt))
+ Ok((ingest_router, ingest_router_service, ingester_opt))
}
async fn setup_searcher(
diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs
index a4f122383c8..6b61cba495c 100644
--- a/quickwit/quickwit-serve/src/rest.rs
+++ b/quickwit/quickwit-serve/src/rest.rs
@@ -628,8 +628,9 @@ mod tests {
indexing_service_opt: None,
index_manager: index_service,
ingest_service: ingest_service_client(),
- ingester_opt: None,
+ ingest_router_opt: None,
ingest_router_service: IngestRouterServiceClient::mocked(),
+ ingester_opt: None,
janitor_service_opt: None,
otlp_logs_service_opt: None,
otlp_traces_service_opt: None,