diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
index 8feaaa7aa3d..9fa8d8d930a 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
@@ -171,7 +171,7 @@ impl BroadcastLocalShardsTask {
             .shards
             .iter()
             .filter_map(|(queue_id, shard)| {
-                if !shard.is_replica() {
+                if shard.is_advertisable && !shard.is_replica() {
                     Some((queue_id.clone(), shard.shard_state))
                 } else {
                     None
@@ -479,22 +479,44 @@ mod tests {
         let mut state_guard = state.lock_partially().await.unwrap();
 
         let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
+        let queue_id_00 = queue_id(&index_uid, "test-source", &ShardId::from(0));
+        let shard_00 = IngesterShard::new_solo(
+            ShardState::Open,
+            Position::Beginning,
+            Position::Beginning,
+            Instant::now(),
+        );
+        state_guard.shards.insert(queue_id_00.clone(), shard_00);
+
         let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
-        let shard = IngesterShard::new_solo(
+        let mut shard_01 = IngesterShard::new_solo(
             ShardState::Open,
             Position::Beginning,
             Position::Beginning,
             Instant::now(),
         );
-        state_guard.shards.insert(queue_id_01.clone(), shard);
+        shard_01.is_advertisable = true;
+        state_guard.shards.insert(queue_id_01.clone(), shard_01);
 
-        let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
-        let rate_meter = RateMeter::default();
+        let queue_id_02 = queue_id(&index_uid, "test-source", &ShardId::from(2));
+        let mut shard_02 = IngesterShard::new_replica(
+            NodeId::from("test-leader"),
+            ShardState::Open,
+            Position::Beginning,
+            Position::Beginning,
+            Instant::now(),
+        );
+        shard_02.is_advertisable = true;
+        state_guard.shards.insert(queue_id_02.clone(), shard_02);
 
-        state_guard
-            .rate_trackers
-            .insert(queue_id_01.clone(), (rate_limiter, rate_meter));
+        for queue_id in [queue_id_00, queue_id_01, queue_id_02] {
+            let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
+            let rate_meter = RateMeter::default();
+            state_guard
+                .rate_trackers
+                .insert(queue_id, (rate_limiter, rate_meter));
+        }
 
         drop(state_guard);
 
         let new_snapshot = task.snapshot_local_shards().await.unwrap();
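
The filter above is the heart of the change: a shard only shows up in the gossiped snapshot once it is both advertisable and not a replica. As a minimal, self-contained sketch of that predicate (toy `Shard` and `ShardState` types standing in for the real quickwit-ingest ones, not part of the patch):

    use std::collections::BTreeMap;

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum ShardState {
        Open,
        Closed,
    }

    struct Shard {
        shard_state: ShardState,
        is_replica: bool,
        is_advertisable: bool,
    }

    // Mirrors the `filter_map` in the snapshot task: replicas and shards not yet confirmed
    // by the control plane are skipped.
    fn snapshot(shards: &BTreeMap<String, Shard>) -> BTreeMap<String, ShardState> {
        shards
            .iter()
            .filter_map(|(queue_id, shard)| {
                if shard.is_advertisable && !shard.is_replica {
                    Some((queue_id.clone(), shard.shard_state))
                } else {
                    None
                }
            })
            .collect()
    }

    fn main() {
        let mut shards = BTreeMap::new();
        // Freshly opened, not yet confirmed by the control plane: hidden.
        shards.insert(
            "shard-00".to_string(),
            Shard { shard_state: ShardState::Open, is_replica: false, is_advertisable: false },
        );
        // Confirmed primary: advertised.
        shards.insert(
            "shard-01".to_string(),
            Shard { shard_state: ShardState::Open, is_replica: false, is_advertisable: true },
        );
        // Replica: never advertised.
        shards.insert(
            "shard-02".to_string(),
            Shard { shard_state: ShardState::Open, is_replica: true, is_advertisable: true },
        );
        // Closed solo shard recovered from the WAL: advertised as read-only.
        shards.insert(
            "shard-03".to_string(),
            Shard { shard_state: ShardState::Closed, is_replica: false, is_advertisable: true },
        );
        let advertised = snapshot(&shards);
        assert_eq!(advertised.len(), 2);
        assert!(advertised.contains_key("shard-01"));
        assert_eq!(advertised["shard-03"], ShardState::Closed);
    }
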
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index f6cd7f0cc85..a5ec1cb717e 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -472,7 +472,6 @@ impl Ingester {
             };
             return Ok(persist_response);
         }
-
         // first verify if we would locally accept each subrequest
         {
             let mut total_requested_capacity = bytesize::ByteSize::b(0);
@@ -491,6 +490,11 @@ impl Ingester {
                     persist_failures.push(persist_failure);
                     continue;
                 };
+                // A router can only know about a newly opened shard if it has been informed by the
+                // control plane, which confirms that the shard was correctly opened in the
+                // metastore.
+                shard.is_advertisable = true;
+
                 if shard.is_closed() {
                     let persist_failure = PersistFailure {
                         subrequest_id: subrequest.subrequest_id,
@@ -597,7 +601,6 @@ impl Ingester {
                 }
             }
         }
-
         // replicate to the follower
         {
             let mut replicate_futures = FuturesUnordered::new();
@@ -846,17 +849,22 @@ impl Ingester {
         open_fetch_stream_request: OpenFetchStreamRequest,
    ) -> IngestV2Result<ServiceStream<IngestV2Result<FetchMessage>>> {
         let queue_id = open_fetch_stream_request.queue_id();
-        let shard_status_rx = self
-            .state
-            .lock_partially()
-            .await?
-            .shards
-            .get(&queue_id)
-            .ok_or_else(|| IngestV2Error::ShardNotFound {
-                shard_id: open_fetch_stream_request.shard_id().clone(),
-            })?
-            .shard_status_rx
-            .clone();
+
+        let mut state_guard = self.state.lock_partially().await?;
+
+        let shard =
+            state_guard
+                .shards
+                .get_mut(&queue_id)
+                .ok_or_else(|| IngestV2Error::ShardNotFound {
+                    shard_id: open_fetch_stream_request.shard_id().clone(),
+                })?;
+        // An indexer can only know about a newly opened shard if it has been scheduled by the
+        // control plane, which confirms that the shard was correctly opened in the
+        // metastore.
+        shard.is_advertisable = true;
+
+        let shard_status_rx = shard.shard_status_rx.clone();
         let mrecordlog = self.state.mrecordlog();
         let (service_stream, _fetch_task_handle) = FetchStreamTask::spawn(
             open_fetch_stream_request,
@@ -1478,6 +1486,7 @@ mod tests {
         solo_shard_02.assert_is_closed();
         solo_shard_02.assert_replication_position(Position::offset(1u64));
         solo_shard_02.assert_truncation_position(Position::offset(0u64));
+        assert!(solo_shard_02.is_advertisable);
 
         state_guard
             .mrecordlog
@@ -1495,21 +1504,32 @@ mod tests {
         let mut state_guard = ingester.state.lock_fully().await.unwrap();
 
         let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
-        let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
-        let shard = IngesterShard::new_solo(
+        let queue_id_00 = queue_id(&index_uid, "test-source", &ShardId::from(0));
+        let shard_00 = IngesterShard::new_solo(
             ShardState::Open,
             Position::Beginning,
             Position::Beginning,
             Instant::now(),
         );
-        state_guard.shards.insert(queue_id_01.clone(), shard);
-
-        let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
-        let rate_meter = RateMeter::default();
-
-        state_guard
-            .rate_trackers
-            .insert(queue_id_01.clone(), (rate_limiter, rate_meter));
+        state_guard.shards.insert(queue_id_00.clone(), shard_00);
+
+        let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
+        let mut shard_01 = IngesterShard::new_solo(
+            ShardState::Open,
+            Position::Beginning,
+            Position::Beginning,
+            Instant::now(),
+        );
+        shard_01.is_advertisable = true;
+        state_guard.shards.insert(queue_id_01.clone(), shard_01);
+
+        for queue_id in [&queue_id_00, &queue_id_01] {
+            let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
+            let rate_meter = RateMeter::default();
+            state_guard
+                .rate_trackers
+                .insert(queue_id.clone(), (rate_limiter, rate_meter));
+        }
 
         drop(state_guard);
         tokio::time::sleep(Duration::from_millis(100)).await;
@@ -2548,12 +2568,9 @@ mod tests {
             .await
             .unwrap();
 
-        state_guard
-            .shards
-            .get(&queue_id)
-            .unwrap()
-            .notify_shard_status();
-
+        let shard = state_guard.shards.get(&queue_id).unwrap();
+        assert!(shard.is_advertisable);
+        shard.notify_shard_status();
         drop(state_guard);
 
         let fetch_response = fetch_stream.next().await.unwrap().unwrap();
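
The two call sites patched above, persist and open-fetch-stream, are the only signals the ingester uses to flip the flag: either request can only reference a shard the control plane has already committed to the metastore. A toy model of that promotion logic, with made-up `Ingester`/`Shard` types and `on_persist`/`on_open_fetch_stream` helpers rather than the real API:

    use std::collections::HashMap;

    #[derive(Default)]
    struct Shard {
        is_advertisable: bool,
    }

    #[derive(Default)]
    struct Ingester {
        shards: HashMap<String, Shard>,
    }

    impl Ingester {
        // A persist request proves the router learned about the shard from the control
        // plane, i.e. the shard exists in the metastore: start advertising it.
        fn on_persist(&mut self, queue_id: &str) {
            if let Some(shard) = self.shards.get_mut(queue_id) {
                shard.is_advertisable = true;
            }
        }

        // Same reasoning when an indexer opens a fetch stream for the shard.
        fn on_open_fetch_stream(&mut self, queue_id: &str) {
            if let Some(shard) = self.shards.get_mut(queue_id) {
                shard.is_advertisable = true;
            }
        }
    }

    fn main() {
        let mut ingester = Ingester::default();
        ingester.shards.insert("queue-01".to_string(), Shard::default());

        // Newly created shards stay hidden from gossip...
        assert!(!ingester.shards["queue-01"].is_advertisable);

        // ...until a persist request confirms them.
        ingester.on_persist("queue-01");
        assert!(ingester.shards["queue-01"].is_advertisable);

        // Opening a fetch stream is an equally valid confirmation.
        ingester.on_open_fetch_stream("queue-01");
        assert!(ingester.shards["queue-01"].is_advertisable);
    }
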
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/models.rs b/quickwit/quickwit-ingest/src/ingest_v2/models.rs
index 2b9f203880a..aee61f2130e 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/models.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/models.rs
@@ -44,6 +44,13 @@ pub(super) struct IngesterShard {
     pub replication_position_inclusive: Position,
     /// Position up to which the shard has been truncated.
     pub truncation_position_inclusive: Position,
+    /// Whether the shard should be advertised to other nodes (routers) via gossip.
+    ///
+    /// Because shards are created in multiple steps (e.g., init shard on leader, create shard in
+    /// metastore), we must receive a "signal" from the control plane confirming that a shard
+    /// was successfully opened before advertising it. Currently, this confirmation comes in the
+    /// form of `PersistRequest` or `FetchRequest`.
+    pub is_advertisable: bool,
     pub shard_status_tx: watch::Sender<ShardStatus>,
     pub shard_status_rx: watch::Receiver<ShardStatus>,
     /// Instant at which the shard was last written to.
@@ -65,6 +72,7 @@ impl IngesterShard {
             shard_state,
             replication_position_inclusive,
             truncation_position_inclusive,
+            is_advertisable: false,
             shard_status_tx,
             shard_status_rx,
             last_write_instant: now,
@@ -85,6 +93,9 @@ impl IngesterShard {
             shard_state,
             replication_position_inclusive,
             truncation_position_inclusive,
+            // This is irrelevant for replica shards since they are not advertised via gossip
+            // anyway.
+            is_advertisable: false,
             shard_status_tx,
             shard_status_rx,
             last_write_instant: now,
@@ -104,6 +115,7 @@ impl IngesterShard {
             shard_state,
             replication_position_inclusive,
             truncation_position_inclusive,
+            is_advertisable: false,
             shard_status_tx,
             shard_status_rx,
             last_write_instant: now,
@@ -240,6 +252,7 @@ mod tests {
             primary_shard.truncation_position_inclusive,
             Position::Beginning
         );
+        assert!(!primary_shard.is_advertisable);
     }
 
     #[test]
@@ -265,6 +278,7 @@ mod tests {
             replica_shard.truncation_position_inclusive,
             Position::Beginning
         );
+        assert!(!replica_shard.is_advertisable);
     }
 
     #[test]
@@ -286,5 +300,6 @@ mod tests {
             solo_shard.truncation_position_inclusive,
             Position::Beginning
         );
+        assert!(!solo_shard.is_advertisable);
     }
 }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs
index daab1744ce5..8cbfe81f5f4 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs
@@ -181,12 +181,14 @@ impl IngesterState {
         } else {
             Position::offset(*position_range.start() - 1)
         };
-        let solo_shard = IngesterShard::new_solo(
+        let mut solo_shard = IngesterShard::new_solo(
             ShardState::Closed,
             replication_position_inclusive,
             truncation_position_inclusive,
             now,
         );
+        // We want to advertise the shard as read-only right away.
+        solo_shard.is_advertisable = true;
         inner_guard.shards.insert(queue_id.clone(), solo_shard);
 
         let rate_limiter = RateLimiter::from_settings(rate_limiter_settings);
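
The state.rs hunk is the one place the flag is set without waiting for a control-plane signal: a queue recovered from the WAL at startup necessarily belonged to a shard that already exists in the metastore, so it is re-created as a closed solo shard and advertised right away as read-only. A rough sketch under the same toy-type assumptions (hypothetical `restore_from_wal` helper, not the real `IngesterState` recovery code):

    #[derive(Debug, PartialEq, Eq)]
    enum ShardState {
        Open,
        Closed,
    }

    struct Shard {
        shard_state: ShardState,
        is_advertisable: bool,
    }

    // Freshly opened shards start hidden, like the constructors in models.rs.
    fn new_open_shard() -> Shard {
        Shard { shard_state: ShardState::Open, is_advertisable: false }
    }

    // Toy version of the recovery loop: every queue found in the record log becomes a
    // closed, read-only solo shard that is advertised immediately.
    fn restore_from_wal(queue_ids: &[&str]) -> Vec<(String, Shard)> {
        queue_ids
            .iter()
            .map(|queue_id| {
                let mut solo_shard = Shard {
                    shard_state: ShardState::Closed,
                    is_advertisable: false,
                };
                // The shard survived a restart, so it was already created in the metastore:
                // safe to advertise it as read-only right away.
                solo_shard.is_advertisable = true;
                (queue_id.to_string(), solo_shard)
            })
            .collect()
    }

    fn main() {
        // A brand-new shard stays hidden until the control plane confirms it...
        assert!(!new_open_shard().is_advertisable);

        // ...but shards recovered from the WAL are advertised immediately, as read-only.
        let restored = restore_from_wal(&["queue-00", "queue-01"]);
        assert!(restored
            .iter()
            .all(|(_, shard)| shard.shard_state == ShardState::Closed && shard.is_advertisable));
    }
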