diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index aa89b483621..d18813a0886 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -22,8 +22,8 @@ use std::fmt; use std::pin::Pin; use futures::{stream, Stream, TryStreamExt}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; +use tokio::sync::{mpsc, watch}; +use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream}; use tracing::warn; pub type BoxStream = Pin + Send + Unpin + 'static>>; @@ -57,6 +57,15 @@ where T: Send + 'static } } +impl ServiceStream +where T: Clone + Send + Sync + 'static +{ + pub fn new_watch(init: T) -> (watch::Sender, Self) { + let (sender, receiver) = watch::channel(init); + (sender, receiver.into()) + } +} + impl ServiceStream> where T: Send + 'static, @@ -104,6 +113,16 @@ where T: Send + 'static } } +impl From> for ServiceStream +where T: Clone + Send + Sync + 'static +{ + fn from(receiver: watch::Receiver) -> Self { + Self { + inner: Box::pin(WatchStream::new(receiver)), + } + } +} + /// Adapts a server-side tonic::Streaming into a ServiceStream of `Result`. Once /// an error is encountered, the stream will be closed and subsequent calls to `poll_next` will /// return `None`. diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 2d086d6d54a..3f93357d32a 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -30,7 +30,7 @@ use quickwit_proto::control_plane::{ GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess, }; use quickwit_proto::ingest::ingester::{IngesterService, PingRequest}; -use quickwit_proto::ingest::{ClosedShards, IngestV2Error, ShardState}; +use quickwit_proto::ingest::{IngestV2Error, ShardIds, ShardState}; use quickwit_proto::metastore; use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{IndexUid, NodeId}; @@ -172,11 +172,7 @@ impl IngestController { None } - fn handle_closed_shards( - &self, - closed_shards: Vec, - model: &mut ControlPlaneModel, - ) { + fn handle_closed_shards(&self, closed_shards: Vec, model: &mut ControlPlaneModel) { for closed_shard in closed_shards { let index_uid: IndexUid = closed_shard.index_uid.into(); let source_id = closed_shard.source_id; @@ -764,7 +760,7 @@ mod tests { let request = GetOrCreateOpenShardsRequest { subrequests: Vec::new(), - closed_shards: vec![ClosedShards { + closed_shards: vec![ShardIds { index_uid: index_uid.clone().into(), source_id: source_id.clone(), shard_ids: vec![1, 2], diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index e36752f0632..57ab054bf5b 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -26,12 +26,13 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use itertools::Itertools; use quickwit_actors::{ActorExitStatus, Mailbox}; +use quickwit_common::retry::RetryParams; use quickwit_ingest::{ decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream, }; use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; use quickwit_proto::ingest::ingester::{ - FetchResponseV2, IngesterService, TruncateRequest, TruncateSubrequest, + 
FetchResponseV2, IngesterService, TruncateShardsRequest, TruncateShardsSubrequest, }; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsSubrequest, AcquireShardsSubresponse, MetastoreService, @@ -237,7 +238,7 @@ impl IngestSource { } async fn truncate(&self, truncation_point: &[(ShardId, Position)]) { - let mut per_ingester_truncate_subrequests: HashMap<&NodeId, Vec> = + let mut per_ingester_subrequests: HashMap<&NodeId, Vec> = HashMap::new(); for (shard_id, to_position_exclusive) in truncation_point { @@ -251,24 +252,24 @@ impl IngestSource { ); continue; }; - let truncate_subrequest = TruncateSubrequest { + let truncate_shards_subrequest = TruncateShardsSubrequest { index_uid: self.client_id.index_uid.clone().into(), source_id: self.client_id.source_id.clone(), shard_id: *shard_id, to_position_inclusive: Some(to_position_exclusive.clone()), }; if let Some(follower_id) = &shard.follower_id_opt { - per_ingester_truncate_subrequests + per_ingester_subrequests .entry(follower_id) .or_default() - .push(truncate_subrequest.clone()); + .push(truncate_shards_subrequest.clone()); } - per_ingester_truncate_subrequests + per_ingester_subrequests .entry(&shard.leader_id) .or_default() - .push(truncate_subrequest); + .push(truncate_shards_subrequest); } - for (ingester_id, truncate_subrequests) in per_ingester_truncate_subrequests { + for (ingester_id, truncate_subrequests) in per_ingester_subrequests { let Some(mut ingester) = self.ingester_pool.get(ingester_id) else { warn!( "failed to truncate shard: ingester `{}` is unavailable", @@ -276,13 +277,35 @@ impl IngestSource { ); continue; }; - let truncate_request = TruncateRequest { + let truncate_shards_request = TruncateShardsRequest { ingester_id: ingester_id.clone().into(), subrequests: truncate_subrequests, }; let truncate_future = async move { - if let Err(error) = ingester.truncate(truncate_request).await { - warn!("failed to truncate shard(s): {error}"); + let retry_params = RetryParams { + base_delay: Duration::from_secs(1), + max_delay: Duration::from_secs(30), + max_attempts: 5, + }; + let mut num_attempts = 0; + + while num_attempts < retry_params.max_attempts { + let Err(error) = ingester + .truncate_shards(truncate_shards_request.clone()) + .await + else { + return; + }; + num_attempts += 1; + let delay = retry_params.compute_delay(num_attempts); + time::sleep(delay).await; + + if num_attempts == retry_params.max_attempts { + error!( + ingester_id=%truncate_shards_request.ingester_id, + "failed to truncate shard(s): {error}" + ); + } } }; // Truncation is best-effort, so fire and forget. 
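The hunk above replaces the single best-effort `truncate` call with a bounded retry loop driven by `quickwit_common::retry::RetryParams`. Below is a minimal, self-contained sketch of that retry-with-backoff pattern using only std and tokio; `BackoffParams` and `retry_best_effort` are illustrative stand-ins, and the doubling-capped-at-`max_delay` behavior of `compute_delay` is an assumption, not necessarily what `RetryParams::compute_delay` does.

use std::future::Future;
use std::time::Duration;

// Illustrative stand-in for the `RetryParams` fields used in the hunk above.
struct BackoffParams {
    base_delay: Duration,
    max_delay: Duration,
    max_attempts: usize,
}

impl BackoffParams {
    // Assumed backoff: double the base delay per failed attempt, capped at `max_delay`.
    fn compute_delay(&self, num_attempts: usize) -> Duration {
        let exponent = num_attempts.saturating_sub(1).min(31) as u32;
        self.base_delay
            .saturating_mul(2u32.saturating_pow(exponent))
            .min(self.max_delay)
    }
}

// Fire-and-forget retry: stop on the first success, sleep between failed
// attempts, and only report the error of the final attempt.
async fn retry_best_effort<F, Fut, T, E>(params: &BackoffParams, mut call: F)
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    for num_attempts in 1..=params.max_attempts {
        match call().await {
            Ok(_) => return,
            Err(error) if num_attempts == params.max_attempts => {
                eprintln!("giving up after {num_attempts} attempts: {error}");
            }
            Err(_) => tokio::time::sleep(params.compute_delay(num_attempts)).await,
        }
    }
}

The production code inlines this loop directly inside the spawned `truncate_future` so it can log the ingester id alongside the final error, and truncation remains best effort either way.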
@@ -480,7 +503,7 @@ mod tests { use quickwit_common::ServiceStream; use quickwit_config::{SourceConfig, SourceParams}; use quickwit_proto::indexing::IndexingPipelineId; - use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateResponse}; + use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateShardsResponse}; use quickwit_proto::ingest::{IngestV2Error, MRecordBatch, Shard, ShardState}; use quickwit_proto::metastore::{AcquireShardsResponse, AcquireShardsSubresponse}; use quickwit_storage::StorageResolver; @@ -552,7 +575,7 @@ mod tests { Ok(service_stream) }); ingester_mock_0 - .expect_truncate() + .expect_truncate_shards() .once() .returning(|request| { assert_eq!(request.ingester_id, "test-ingester-0"); @@ -564,7 +587,7 @@ mod tests { assert_eq!(subrequest.shard_id, 1); assert_eq!(subrequest.to_position_inclusive, Some(11u64.into())); - let response = TruncateResponse {}; + let response = TruncateShardsResponse {}; Ok(response) }); @@ -792,7 +815,7 @@ mod tests { let mut ingester_mock_0 = IngesterServiceClient::mock(); ingester_mock_0 - .expect_truncate() + .expect_truncate_shards() .once() .returning(|request| { assert_eq!(request.ingester_id, "test-ingester-0"); @@ -810,14 +833,14 @@ mod tests { assert_eq!(subrequest_2.shard_id, 3); assert_eq!(subrequest_2.to_position_inclusive, Some(33u64.into())); - Ok(TruncateResponse {}) + Ok(TruncateShardsResponse {}) }); let ingester_0: IngesterServiceClient = ingester_mock_0.into(); ingester_pool.insert("test-ingester-0".into(), ingester_0.clone()); let mut ingester_mock_1 = IngesterServiceClient::mock(); ingester_mock_1 - .expect_truncate() + .expect_truncate_shards() .once() .returning(|request| { assert_eq!(request.ingester_id, "test-ingester-1"); @@ -831,14 +854,14 @@ mod tests { assert_eq!(subrequest_1.shard_id, 3); assert_eq!(subrequest_1.to_position_inclusive, Some(33u64.into())); - Ok(TruncateResponse {}) + Ok(TruncateShardsResponse {}) }); let ingester_1: IngesterServiceClient = ingester_mock_1.into(); ingester_pool.insert("test-ingester-1".into(), ingester_1.clone()); let mut ingester_mock_3 = IngesterServiceClient::mock(); ingester_mock_3 - .expect_truncate() + .expect_truncate_shards() .once() .returning(|request| { assert_eq!(request.ingester_id, "test-ingester-3"); @@ -848,7 +871,7 @@ mod tests { assert_eq!(subrequest_0.shard_id, 4); assert_eq!(subrequest_0.to_position_inclusive, Some(44u64.into())); - Ok(TruncateResponse {}) + Ok(TruncateShardsResponse {}) }); let ingester_3: IngesterServiceClient = ingester_mock_3.into(); ingester_pool.insert("test-ingester-3".into(), ingester_3.clone()); diff --git a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs index 693c9e46785..5db6aea399d 100644 --- a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs +++ b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs @@ -42,7 +42,7 @@ pub struct IngestResponse { #[prost(uint64, tag = "1")] pub num_docs_for_processing: u64, } -/// Fetch messages that have position strictly after `start_after`. +/// Fetch messages with position strictly after `start_after`. 
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 26653e32ecc..c29c75afebf 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -513,7 +513,9 @@ mod tests { use bytes::Bytes; use mrecordlog::MultiRecordLog; - use quickwit_proto::ingest::ingester::IngesterServiceClient; + use quickwit_proto::ingest::ingester::{ + IngesterServiceClient, IngesterStatus, ObservationMessage, + }; use quickwit_proto::types::queue_id; use tokio::time::timeout; @@ -534,14 +536,17 @@ mod tests { shard_id: 1, from_position_exclusive: None, }; - let (new_records_tx, new_records_rx) = watch::channel(()); + let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default())); let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), + status: IngesterStatus::Ready, + observation_tx, })); + let (new_records_tx, new_records_rx) = watch::channel(()); let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn( open_fetch_stream_request, state.clone(), @@ -700,14 +705,17 @@ mod tests { shard_id: 1, from_position_exclusive: Some(Position::from(0u64)), }; - let (new_records_tx, new_records_rx) = watch::channel(()); + let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default())); let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), + status: IngesterStatus::Ready, + observation_tx, })); + let (new_records_tx, new_records_rx) = watch::channel(()); let (mut fetch_stream, _fetch_task_handle) = FetchStreamTask::spawn( open_fetch_stream_request, state.clone(), @@ -800,14 +808,17 @@ mod tests { shard_id: 1, from_position_exclusive: None, }; - let (_new_records_tx, new_records_rx) = watch::channel(()); + let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default())); let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), + status: IngesterStatus::Ready, + observation_tx, })); + let (_new_records_tx, new_records_rx) = watch::channel(()); let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn( open_fetch_stream_request, state.clone(), @@ -838,12 +849,15 @@ mod tests { shard_id: 1, from_position_exclusive: None, }; + let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default())); let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), + status: IngesterStatus::Ready, + observation_tx, })); let (new_records_tx, new_records_rx) = watch::channel(()); let (mut fetch_stream, _fetch_task_handle) = diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 543e89c2505..c61ad2e6fa7 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -34,20 +34,22 @@ use mrecordlog::MultiRecordLog; use 
quickwit_common::tower::Pool; use quickwit_common::ServiceStream; use quickwit_proto::ingest::ingester::{ - AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, FetchResponseV2, - IngesterService, IngesterServiceClient, IngesterServiceStream, OpenFetchStreamRequest, - OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure, - PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, PingRequest, - PingResponse, ReplicateFailureReason, ReplicateRequest, ReplicateSubrequest, - SynReplicationMessage, TruncateRequest, TruncateResponse, + AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, DecommissionRequest, + DecommissionResponse, FetchResponseV2, IngesterService, IngesterServiceClient, + IngesterServiceStream, IngesterStatus, ObservationMessage, OpenFetchStreamRequest, + OpenObservationStreamRequest, OpenReplicationStreamRequest, OpenReplicationStreamResponse, + PersistFailure, PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, + PingRequest, PingResponse, ReplicateFailureReason, ReplicateRequest, ReplicateSubrequest, + SynReplicationMessage, TruncateShardsRequest, TruncateShardsResponse, }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState}; use quickwit_proto::types::{NodeId, Position, QueueId}; -use tokio::sync::RwLock; +use tokio::sync::{watch, RwLock}; use tracing::{error, info, warn}; use super::fetch::FetchStreamTask; -use super::models::{IngesterShard, PrimaryShard}; +use super::models::IngesterShard; +use super::mrecord::MRecord; use super::mrecordlog_utils::{append_eof_record_if_necessary, check_enough_capacity}; use super::rate_limiter::{RateLimiter, RateLimiterSettings}; use super::replication::{ @@ -55,9 +57,9 @@ use super::replication::{ SYN_REPLICATION_STREAM_CAPACITY, }; use super::IngesterPool; -use crate::ingest_v2::models::SoloShard; +use crate::ingest_v2::mrecordlog_utils::get_truncation_position; use crate::metrics::INGEST_METRICS; -use crate::{estimate_size, FollowerId, LeaderId, MRecord}; +use crate::{estimate_size, FollowerId, LeaderId}; /// Duration after which persist requests time out with /// [`quickwit_proto::ingest::IngestV2Error::Timeout`]. @@ -76,6 +78,7 @@ pub struct Ingester { memory_capacity: ByteSize, rate_limiter_settings: RateLimiterSettings, replication_factor: usize, + observation_rx: watch::Receiver>, } impl fmt::Debug for Ingester { @@ -94,6 +97,8 @@ pub(super) struct IngesterState { pub replication_streams: HashMap, // Replication tasks running for each replication stream opened with leaders. 
pub replication_tasks: HashMap, + pub status: IngesterStatus, + pub observation_tx: watch::Sender>, } impl Ingester { @@ -111,7 +116,18 @@ impl Ingester { mrecordlog::SyncPolicy::OnDelay(Duration::from_secs(5)), ) .await - .map_err(|error| IngestV2Error::Internal(error.to_string()))?; + .map_err(|error| { + let message = format!( + "failed to create or open write-ahead log located at `{}`: {error}", + wal_dir_path.display() + ); + IngestV2Error::Internal(message) + })?; + let observe_message = ObservationMessage { + node_id: self_node_id.clone().into(), + status: IngesterStatus::Ready as i32, + }; + let (observation_tx, observation_rx) = watch::channel(Ok(observe_message)); let inner = IngesterState { mrecordlog, @@ -119,8 +135,10 @@ impl Ingester { rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), + status: IngesterStatus::Ready, + observation_tx, }; - let mut ingester = Self { + let ingester = Self { self_node_id, ingester_pool, state: Arc::new(RwLock::new(inner)), @@ -128,6 +146,7 @@ impl Ingester { memory_capacity, rate_limiter_settings, replication_factor, + observation_rx, }; info!( replication_factor=%replication_factor, @@ -139,7 +158,28 @@ impl Ingester { Ok(ingester) } - async fn init(&mut self) -> IngestV2Result<()> { + /// Checks whether the ingester is fully decommissioned and updates its status accordingly. + fn check_decommissioning_status(&self, state: &mut IngesterState) { + if state.status != IngesterStatus::Decommissioning { + return; + } + if state.shards.values().all(|shard| { + shard.shard_state.is_closed() && shard.truncation_position_inclusive == Position::Eof + }) { + info!("ingester fully decommissioned"); + state.status = IngesterStatus::Decommissioned; + + state.observation_tx.send_if_modified(|observation_result| { + if let Ok(observation) = observation_result { + observation.status = IngesterStatus::Decommissioned as i32; + return true; + } + false + }); + } + } + + async fn init(&self) -> IngestV2Result<()> { let mut state_guard = self.state.write().await; let queue_ids: Vec = state_guard @@ -156,9 +196,11 @@ impl Ingester { for queue_id in queue_ids { append_eof_record_if_necessary(&mut state_guard.mrecordlog, &queue_id).await; - let solo_shard = SoloShard::new(ShardState::Closed, Position::Eof); - let shard = IngesterShard::Solo(solo_shard); - state_guard.shards.insert(queue_id.clone(), shard); + let truncation_position = get_truncation_position(&state_guard.mrecordlog, &queue_id) + .unwrap_or(Position::Eof); + let solo_shard = + IngesterShard::new_solo(ShardState::Closed, Position::Eof, truncation_position); + state_guard.shards.insert(queue_id, solo_shard); } Ok(()) } @@ -190,16 +232,32 @@ impl Ingester { let shard = if let Some(follower_id) = follower_id_opt { self.init_replication_stream(state, leader_id, follower_id) .await?; - let primary_shard = PrimaryShard::new(follower_id.clone()); - IngesterShard::Primary(primary_shard) + + IngesterShard::new_primary( + follower_id.clone(), + ShardState::Open, + Position::Beginning, + Position::Beginning, + ) } else { - let solo_shard = SoloShard::new(ShardState::Open, Position::Beginning); - IngesterShard::Solo(solo_shard) + IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Beginning) }; let entry = state.shards.entry(queue_id.clone()); Ok(entry.or_insert(shard)) } + async fn close_shards_inner(&self, state: &mut IngesterState, queue_ids: &[QueueId]) { + for queue_id in queue_ids { + append_eof_record_if_necessary(&mut state.mrecordlog, 
queue_id).await; + + if let Some(shard) = state.shards.get_mut(queue_id) { + shard.shard_state = ShardState::Closed; + shard.notify_new_records(); + } + } + // TODO: Handle replicated shards. + } + async fn init_replication_stream( &self, state: &mut IngesterState, @@ -271,6 +329,26 @@ impl IngesterService for Ingester { let mut state_guard = self.state.write().await; + if state_guard.status != IngesterStatus::Ready { + persist_failures.reserve_exact(persist_request.subrequests.len()); + + for subrequest in persist_request.subrequests { + let persist_failure = PersistFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: PersistFailureReason::ShardClosed as i32, + }; + persist_failures.push(persist_failure); + } + let persist_response = PersistResponse { + leader_id: leader_id.into(), + successes: Vec::new(), + failures: persist_failures, + }; + return Ok(persist_response); + } for subrequest in persist_request.subrequests { let queue_id = subrequest.queue_id(); let follower_id_opt: Option = subrequest.follower_id.map(Into::into); @@ -286,9 +364,9 @@ impl IngesterService for Ingester { .await .expect("TODO") }; - let from_position_exclusive = shard.replication_position_inclusive(); + let from_position_exclusive = shard.replication_position_inclusive.clone(); - if shard.is_closed() { + if shard.shard_state.is_closed() { let persist_failure = PersistFailure { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, @@ -310,7 +388,7 @@ impl IngesterService for Ingester { source_id: subrequest.source_id, shard_id: subrequest.shard_id, replication_position_inclusive: Some( - shard.replication_position_inclusive(), + shard.replication_position_inclusive.clone(), ), }; persist_successes.push(persist_success); @@ -515,6 +593,9 @@ impl IngesterService for Ingester { let mut state_guard = self.state.write().await; + if state_guard.status != IngesterStatus::Ready { + return Err(IngestV2Error::Internal("node decommissioned".to_string())); + } let Entry::Vacant(entry) = state_guard.replication_tasks.entry(leader_id.clone()) else { return Err(IngestV2Error::Internal(format!( "a replication stream betwen {leader_id} and {follower_id} is already opened" @@ -554,7 +635,8 @@ impl IngesterService for Ingester { .shards .get(&queue_id) .ok_or_else(|| IngestV2Error::Internal("shard not found".to_string()))? 
- .new_records_rx(); + .new_records_rx + .clone(); let (service_stream, _fetch_task_handle) = FetchStreamTask::spawn( open_fetch_stream_request, self.state.clone(), @@ -565,6 +647,11 @@ impl IngesterService for Ingester { } async fn ping(&mut self, ping_request: PingRequest) -> IngestV2Result { + let state_guard = self.state.read().await; + + if state_guard.status != IngesterStatus::Ready { + return Err(IngestV2Error::Internal("node decommissioned".to_string())); + } if ping_request.leader_id != self.self_node_id { let ping_response = PingResponse {}; return Ok(ping_response); @@ -584,19 +671,36 @@ impl IngesterService for Ingester { Ok(ping_response) } - async fn truncate( + async fn close_shards( &mut self, - truncate_request: TruncateRequest, - ) -> IngestV2Result { - if truncate_request.ingester_id != self.self_node_id { + close_shards_request: CloseShardsRequest, + ) -> IngestV2Result { + let mut state_guard = self.state.write().await; + + let queue_ids: Vec = close_shards_request + .shards + .iter() + .flat_map(|shards| shards.queue_ids()) + .collect(); + + self.close_shards_inner(&mut state_guard, &queue_ids).await; + + Ok(CloseShardsResponse {}) + } + + async fn truncate_shards( + &mut self, + truncate_shards_request: TruncateShardsRequest, + ) -> IngestV2Result { + if truncate_shards_request.ingester_id != self.self_node_id { return Err(IngestV2Error::Internal(format!( "routing error: expected ingester `{}`, got `{}`", - self.self_node_id, truncate_request.ingester_id, + self.self_node_id, truncate_shards_request.ingester_id, ))); } let mut state_guard = self.state.write().await; - for subrequest in truncate_request.subrequests { + for subrequest in truncate_shards_request.subrequests { let queue_id = subrequest.queue_id(); let truncate_position_opt = match subrequest.to_position_inclusive() { @@ -617,35 +721,77 @@ impl IngesterService for Ingester { Ok(_) | Err(TruncateError::MissingQueue(_)) => {} Err(error) => { error!("failed to truncate queue `{queue_id}`: {error}"); + continue; + } + } + // If we successfully truncated the queue at EOF, it means that the shard is closed + // and fully indexed. + if subrequest.to_position_inclusive() == Position::Eof { + if let Some(shard) = state_guard.shards.get_mut(&queue_id) { + shard.truncation_position_inclusive = Position::Eof } } } } - let truncate_response = TruncateResponse {}; + self.check_decommissioning_status(&mut state_guard); + let truncate_response = TruncateShardsResponse {}; Ok(truncate_response) } - async fn close_shards( + async fn decommission( &mut self, - close_shards_request: CloseShardsRequest, - ) -> IngestV2Result { + _decommission_request: DecommissionRequest, + ) -> IngestV2Result { + info!("decommissioning ingester"); let mut state_guard = self.state.write().await; - for close_shard in close_shards_request.closed_shards { - for queue_id in close_shard.queue_ids() { - if !state_guard.mrecordlog.queue_exists(&queue_id) { - continue; - } - append_eof_record_if_necessary(&mut state_guard.mrecordlog, &queue_id).await; - let shard = state_guard - .shards - .get_mut(&queue_id) - .expect("shard must exist"); - // Notify fetch task. 
- shard.notify_new_records(); - shard.close(); + + let queue_ids: Vec = state_guard.shards.keys().cloned().collect(); + self.close_shards_inner(&mut state_guard, &queue_ids).await; + + state_guard.status = IngesterStatus::Decommissioning; + self.check_decommissioning_status(&mut state_guard); + + Ok(DecommissionResponse {}) + } + + async fn open_observation_stream( + &mut self, + _open_observation_stream_request: OpenObservationStreamRequest, + ) -> IngestV2Result> { + let observation_stream = self.observation_rx.clone().into(); + Ok(observation_stream) + } +} + +pub async fn wait_for_ingester_decommission(ingester_opt: Option) { + let Some(mut ingester) = ingester_opt else { + return; + }; + if let Err(error) = ingester.decommission(DecommissionRequest {}).await { + error!("failed to initiate ingester decommission: {error}"); + return; + } + let mut observation_stream = match ingester + .open_observation_stream(OpenObservationStreamRequest {}) + .await + { + Ok(observation_stream) => observation_stream, + Err(error) => { + error!("failed to open observation stream: {error}"); + return; + } + }; + while let Some(observation_message_result) = observation_stream.next().await { + let observation_message = match observation_message_result { + Ok(observation_message) => observation_message, + Err(error) => { + error!("observation stream ended unexpectedly: {error}"); + return; } + }; + if observation_message.status() == IngesterStatus::Decommissioned { + return; } - Ok(CloseShardsResponse {}) } } @@ -657,38 +803,99 @@ mod tests { use quickwit_common::tower::ConstantRate; use quickwit_proto::ingest::ingester::{ IngesterServiceGrpcServer, IngesterServiceGrpcServerAdapter, PersistSubrequest, - TruncateSubrequest, + TruncateShardsSubrequest, }; - use quickwit_proto::ingest::{ClosedShards, DocBatchV2}; + use quickwit_proto::ingest::{DocBatchV2, ShardIds}; use quickwit_proto::types::queue_id; use tonic::transport::{Endpoint, Server}; use super::*; use crate::ingest_v2::mrecord::is_eof_mrecord; - use crate::ingest_v2::test_utils::{IngesterShardTestExt, MultiRecordLogTestExt}; + use crate::ingest_v2::test_utils::MultiRecordLogTestExt; + + pub(super) struct IngesterForTest { + node_id: NodeId, + ingester_pool: IngesterPool, + disk_capacity: ByteSize, + memory_capacity: ByteSize, + rate_limiter_settings: RateLimiterSettings, + replication_factor: usize, + } + + impl Default for IngesterForTest { + fn default() -> Self { + Self { + node_id: "test-ingester".into(), + ingester_pool: IngesterPool::default(), + disk_capacity: ByteSize::mb(256), + memory_capacity: ByteSize::mb(1), + rate_limiter_settings: RateLimiterSettings::default(), + replication_factor: 1, + } + } + } + + impl IngesterForTest { + pub fn with_node_id(mut self, node_id: &str) -> Self { + self.node_id = node_id.into(); + self + } + + pub fn with_ingester_pool(mut self, ingester_pool: &IngesterPool) -> Self { + self.ingester_pool = ingester_pool.clone(); + self + } + + pub fn with_disk_capacity(mut self, disk_capacity: ByteSize) -> Self { + self.disk_capacity = disk_capacity; + self + } + + pub fn with_rate_limiter_settings( + mut self, + rate_limiter_settings: RateLimiterSettings, + ) -> Self { + self.rate_limiter_settings = rate_limiter_settings; + self + } + + pub fn with_replication(mut self) -> Self { + self.replication_factor = 2; + self + } + + pub async fn build(self) -> (IngesterContext, Ingester) { + let tempdir = tempfile::tempdir().unwrap(); + let wal_dir_path = tempdir.path(); + let ingester = Ingester::try_new( + 
self.node_id.clone(), + self.ingester_pool.clone(), + wal_dir_path, + self.disk_capacity, + self.memory_capacity, + self.rate_limiter_settings, + self.replication_factor, + ) + .await + .unwrap(); + let ingester_env = IngesterContext { + _tempdir: tempdir, + node_id: self.node_id, + ingester_pool: self.ingester_pool, + }; + (ingester_env, ingester) + } + } + + pub struct IngesterContext { + _tempdir: tempfile::TempDir, + node_id: NodeId, + ingester_pool: IngesterPool, + } #[tokio::test] async fn test_ingester_init() { - let tempdir = tempfile::tempdir().unwrap(); - let self_node_id: NodeId = "test-ingester-0".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mb(256); - let memory_capacity = ByteSize::mb(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 2; - let mut ingester = Ingester::try_new( - self_node_id.clone(), - ingester_pool, - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); - + let (_ingester_ctx, ingester) = IngesterForTest::default().build().await; let mut state_guard = ingester.state.write().await; let queue_id_01 = queue_id("test-index:0", "test-source", 1); @@ -719,7 +926,11 @@ mod tests { .await .unwrap(); - let records = [MRecord::new_doc("test-doc-foo").encode()].into_iter(); + let records = [ + MRecord::new_doc("test-doc-foo").encode(), + MRecord::new_doc("test-doc-bar").encode(), + ] + .into_iter(); state_guard .mrecordlog @@ -727,6 +938,12 @@ mod tests { .await .unwrap(); + state_guard + .mrecordlog + .truncate(&queue_id_02, 0) + .await + .unwrap(); + let queue_id_03 = queue_id("test-index:0", "test-source", 3); state_guard .mrecordlog @@ -748,6 +965,7 @@ mod tests { solo_shard_01.assert_is_solo(); solo_shard_01.assert_is_closed(); solo_shard_01.assert_replication_position(Position::Eof); + solo_shard_01.assert_truncation_position(Position::Eof); state_guard .mrecordlog @@ -757,17 +975,19 @@ mod tests { solo_shard_02.assert_is_solo(); solo_shard_02.assert_is_closed(); solo_shard_02.assert_replication_position(Position::Eof); + solo_shard_02.assert_truncation_position(0u64); state_guard.mrecordlog.assert_records_eq( &queue_id_02, .., - &[(0, "\0\0test-doc-foo"), (1, "\0\x02")], + &[(1, "\0\0test-doc-bar"), (2, "\0\x02")], ); let solo_shard_03 = state_guard.shards.get(&queue_id_03).unwrap(); solo_shard_03.assert_is_solo(); solo_shard_03.assert_is_closed(); solo_shard_03.assert_replication_position(Position::Eof); + solo_shard_03.assert_truncation_position(Position::Eof); state_guard .mrecordlog @@ -776,28 +996,10 @@ mod tests { #[tokio::test] async fn test_ingester_persist() { - let tempdir = tempfile::tempdir().unwrap(); - let self_node_id: NodeId = "test-ingester-0".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mb(256); - let memory_capacity = ByteSize::mb(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 1; - let mut ingester = Ingester::try_new( - self_node_id.clone(), - ingester_pool, - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); + let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; let persist_request = PersistRequest { - leader_id: self_node_id.to_string(), + leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Force as i32, 
subrequests: vec![ PersistSubrequest { @@ -819,7 +1021,7 @@ mod tests { ], }; let persist_response = ingester.persist(persist_request).await.unwrap(); - assert_eq!(persist_response.leader_id, "test-ingester-0"); + assert_eq!(persist_response.leader_id, "test-ingester"); assert_eq!(persist_response.successes.len(), 2); assert_eq!(persist_response.failures.len(), 0); @@ -877,25 +1079,11 @@ mod tests { #[tokio::test] async fn test_ingester_open_replication_stream() { - let tempdir = tempfile::tempdir().unwrap(); - let self_node_id: NodeId = "test-follower".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mb(256); - let memory_capacity = ByteSize::mb(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 1; - let mut ingester = Ingester::try_new( - self_node_id.clone(), - ingester_pool, - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); + let (_ingester_ctx, mut ingester) = IngesterForTest::default() + .with_node_id("test-follower") + .build() + .await; + let (syn_replication_stream_tx, syn_replication_stream) = ServiceStream::new_bounded(5); let open_stream_request = OpenReplicationStreamRequest { leader_id: "test-leader".to_string(), @@ -924,46 +1112,21 @@ mod tests { #[tokio::test] async fn test_ingester_persist_replicate() { - let tempdir = tempfile::tempdir().unwrap(); - let leader_id: NodeId = "test-leader".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - - let disk_capacity = ByteSize::mb(256); - let memory_capacity = ByteSize::mb(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 2; - - let mut leader = Ingester::try_new( - leader_id.clone(), - ingester_pool.clone(), - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); - - let tempdir = tempfile::tempdir().unwrap(); - let follower_id: NodeId = "test-follower".into(); - let wal_dir_path = tempdir.path(); - - let follower = Ingester::try_new( - follower_id.clone(), - ingester_pool.clone(), - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); - - ingester_pool.insert( - follower_id.clone(), + let (leader_ctx, mut leader) = IngesterForTest::default() + .with_node_id("test-leader") + .with_replication() + .build() + .await; + + let (follower_ctx, follower) = IngesterForTest::default() + .with_node_id("test-follower") + .with_ingester_pool(&leader_ctx.ingester_pool) + .with_replication() + .build() + .await; + + leader_ctx.ingester_pool.insert( + follower_ctx.node_id.clone(), IngesterServiceClient::new(follower.clone()), ); @@ -976,7 +1139,7 @@ mod tests { index_uid: "test-index:0".to_string(), source_id: "test-source".to_string(), shard_id: 1, - follower_id: Some(follower_id.to_string()), + follower_id: Some(follower_ctx.node_id.to_string()), doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), }, PersistSubrequest { @@ -984,7 +1147,7 @@ mod tests { index_uid: "test-index:1".to_string(), source_id: "test-source".to_string(), shard_id: 1, - follower_id: Some(follower_id.to_string()), + follower_id: Some(follower_ctx.node_id.to_string()), doc_batch: Some(DocBatchV2::for_test(["test-doc-110", "test-doc-111"])), }, ], @@ -1077,25 +1240,11 @@ mod tests { #[tokio::test] async fn test_ingester_persist_replicate_grpc() { - let tempdir 
= tempfile::tempdir().unwrap(); - let leader_id: NodeId = "test-leader".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mb(256); - let memory_capacity = ByteSize::mb(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 2; - let mut leader = Ingester::try_new( - leader_id.clone(), - ingester_pool.clone(), - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); + let (leader_ctx, mut leader) = IngesterForTest::default() + .with_node_id("test-leader") + .with_replication() + .build() + .await; let leader_grpc_server_adapter = IngesterServiceGrpcServerAdapter::new(leader.clone()); let leader_grpc_server = IngesterServiceGrpcServer::new(leader_grpc_server_adapter); @@ -1111,24 +1260,12 @@ mod tests { } }); - let tempdir = tempfile::tempdir().unwrap(); - let follower_id: NodeId = "test-follower".into(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mb(256); - let memory_capacity = ByteSize::mb(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 2; - let follower = Ingester::try_new( - follower_id.clone(), - ingester_pool.clone(), - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); + let (follower_ctx, follower) = IngesterForTest::default() + .with_node_id("test-follower") + .with_ingester_pool(&leader_ctx.ingester_pool) + .with_replication() + .build() + .await; let follower_grpc_server_adapter = IngesterServiceGrpcServerAdapter::new(follower.clone()); let follower_grpc_server = IngesterServiceGrpcServer::new(follower_grpc_server_adapter); @@ -1149,7 +1286,9 @@ mod tests { follower_channel, ); - ingester_pool.insert(follower_id.clone(), follower_grpc_client); + leader_ctx + .ingester_pool + .insert(follower_ctx.node_id.clone(), follower_grpc_client); let persist_request = PersistRequest { leader_id: "test-leader".to_string(), @@ -1160,7 +1299,7 @@ mod tests { index_uid: "test-index:0".to_string(), source_id: "test-source".to_string(), shard_id: 1, - follower_id: Some(follower_id.to_string()), + follower_id: Some(follower_ctx.node_id.to_string()), doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), }, PersistSubrequest { @@ -1168,7 +1307,7 @@ mod tests { index_uid: "test-index:1".to_string(), source_id: "test-source".to_string(), shard_id: 1, - follower_id: Some(follower_id.to_string()), + follower_id: Some(follower_ctx.node_id.to_string()), doc_batch: Some(DocBatchV2::for_test(["test-doc-110", "test-doc-111"])), }, ], @@ -1253,40 +1392,19 @@ mod tests { #[tokio::test] async fn test_ingester_persist_shard_closed() { - let tempdir = tempfile::tempdir().unwrap(); - let self_node_id: NodeId = "test-ingester-0".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mib(256); - let memory_capacity = ByteSize::mib(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 1; - let mut ingester = Ingester::try_new( - self_node_id.clone(), - ingester_pool, - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); - + let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; let queue_id_01 = queue_id("test-index:0", "test-source", 1); - - let solo_shard = SoloShard::new(ShardState::Closed, 
Position::Beginning); - let shard = IngesterShard::Solo(solo_shard); - + let solo_shard = + IngesterShard::new_solo(ShardState::Closed, Position::Beginning, Position::Beginning); ingester .state .write() .await .shards - .insert(queue_id_01.clone(), shard); + .insert(queue_id_01.clone(), solo_shard); let persist_request = PersistRequest { - leader_id: self_node_id.to_string(), + leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Auto as i32, subrequests: vec![PersistSubrequest { subrequest_id: 0, @@ -1298,7 +1416,7 @@ mod tests { }], }; let persist_response = ingester.persist(persist_request).await.unwrap(); - assert_eq!(persist_response.leader_id, "test-ingester-0"); + assert_eq!(persist_response.leader_id, "test-ingester"); assert_eq!(persist_response.successes.len(), 0); assert_eq!(persist_response.failures.len(), 1); @@ -1320,32 +1438,17 @@ mod tests { #[tokio::test] async fn test_ingester_persist_rate_limited() { - let tempdir = tempfile::tempdir().unwrap(); - let self_node_id: NodeId = "test-ingester-0".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mib(256); - let memory_capacity = ByteSize::mib(1); - let rate_limiter_settings = RateLimiterSettings { - burst_limit: ByteSize(0), - rate_limit: ConstantRate::bytes_per_sec(ByteSize(0)), - refill_period: Duration::from_millis(100), - }; - let replication_factor = 1; - let mut ingester = Ingester::try_new( - self_node_id.clone(), - ingester_pool, - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); + let (ingester_ctx, mut ingester) = IngesterForTest::default() + .with_rate_limiter_settings(RateLimiterSettings { + burst_limit: ByteSize(0), + rate_limit: ConstantRate::bytes_per_sec(ByteSize(0)), + refill_period: Duration::from_millis(100), + }) + .build() + .await; let persist_request = PersistRequest { - leader_id: self_node_id.to_string(), + leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Auto as i32, subrequests: vec![PersistSubrequest { subrequest_id: 0, @@ -1357,7 +1460,7 @@ mod tests { }], }; let persist_response = ingester.persist(persist_request).await.unwrap(); - assert_eq!(persist_response.leader_id, "test-ingester-0"); + assert_eq!(persist_response.leader_id, "test-ingester"); assert_eq!(persist_response.successes.len(), 0); assert_eq!(persist_response.failures.len(), 1); @@ -1385,28 +1488,12 @@ mod tests { #[tokio::test] async fn test_ingester_persist_resource_exhausted() { - let tempdir = tempfile::tempdir().unwrap(); - let self_node_id: NodeId = "test-ingester-0".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize(0); - let memory_capacity = ByteSize(0); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 1; - let mut ingester = Ingester::try_new( - self_node_id.clone(), - ingester_pool, - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); - + let (ingester_ctx, mut ingester) = IngesterForTest::default() + .with_disk_capacity(ByteSize(0)) + .build() + .await; let persist_request = PersistRequest { - leader_id: self_node_id.to_string(), + leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Auto as i32, subrequests: vec![PersistSubrequest { subrequest_id: 0, @@ -1418,7 +1505,7 @@ mod tests { }], }; let persist_response = 
ingester.persist(persist_request).await.unwrap(); - assert_eq!(persist_response.leader_id, "test-ingester-0"); + assert_eq!(persist_response.leader_id, "test-ingester"); assert_eq!(persist_response.successes.len(), 0); assert_eq!(persist_response.failures.len(), 1); @@ -1448,28 +1535,10 @@ mod tests { #[tokio::test] async fn test_ingester_open_fetch_stream() { - let tempdir = tempfile::tempdir().unwrap(); - let self_node_id: NodeId = "test-ingester-0".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mb(256); - let memory_capacity = ByteSize::mb(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 1; - let mut ingester = Ingester::try_new( - self_node_id.clone(), - ingester_pool, - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); + let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; let persist_request = PersistRequest { - leader_id: self_node_id.to_string(), + leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Auto as i32, subrequests: vec![ PersistSubrequest { @@ -1521,7 +1590,7 @@ mod tests { assert_eq!(mrecord_batch.mrecord_lengths, [14]); let persist_request = PersistRequest { - leader_id: self_node_id.to_string(), + leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Auto as i32, subrequests: vec![PersistSubrequest { subrequest_id: 0, @@ -1551,36 +1620,18 @@ mod tests { #[tokio::test] async fn test_ingester_truncate() { - let tempdir = tempfile::tempdir().unwrap(); - let self_node_id: NodeId = "test-ingester-0".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mb(256); - let memory_capacity = ByteSize::mb(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 1; - let mut ingester = Ingester::try_new( - self_node_id.clone(), - ingester_pool, - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); + let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; let queue_id_01 = queue_id("test-index:0", "test-source", 1); let queue_id_02 = queue_id("test-index:0", "test-source", 2); let mut state_guard = ingester.state.write().await; ingester - .create_shard(&mut state_guard, &queue_id_01, &self_node_id, None) + .create_shard(&mut state_guard, &queue_id_01, &ingester_ctx.node_id, None) .await .unwrap(); ingester - .create_shard(&mut state_guard, &queue_id_02, &self_node_id, None) + .create_shard(&mut state_guard, &queue_id_02, &ingester_ctx.node_id, None) .await .unwrap(); @@ -1606,22 +1657,22 @@ mod tests { drop(state_guard); - let truncate_request = TruncateRequest { - ingester_id: self_node_id.to_string(), + let truncate_shards_request = TruncateShardsRequest { + ingester_id: ingester_ctx.node_id.to_string(), subrequests: vec![ - TruncateSubrequest { + TruncateShardsSubrequest { index_uid: "test-index:0".to_string(), source_id: "test-source".to_string(), shard_id: 1, to_position_inclusive: Some(Position::from(0u64)), }, - TruncateSubrequest { + TruncateShardsSubrequest { index_uid: "test-index:0".to_string(), source_id: "test-source".to_string(), shard_id: 2, to_position_inclusive: Some(Position::Eof), }, - TruncateSubrequest { + TruncateShardsSubrequest { index_uid: "test-index:1337".to_string(), source_id: "test-source".to_string(), 
shard_id: 1, @@ -1629,7 +1680,10 @@ mod tests { }, ], }; - ingester.truncate(truncate_request).await.unwrap(); + ingester + .truncate_shards(truncate_shards_request) + .await + .unwrap(); let state_guard = ingester.state.read().await; assert_eq!(state_guard.shards.len(), 2); @@ -1647,25 +1701,7 @@ mod tests { #[tokio::test] async fn test_ingester_close_shards() { - let tempdir = tempfile::tempdir().unwrap(); - let self_node_id: NodeId = "test-ingester-0".into(); - let ingester_pool = IngesterPool::default(); - let wal_dir_path = tempdir.path(); - let disk_capacity = ByteSize::mb(256); - let memory_capacity = ByteSize::mb(1); - let rate_limiter_settings = RateLimiterSettings::default(); - let replication_factor = 1; - let mut ingester = Ingester::try_new( - self_node_id.clone(), - ingester_pool, - wal_dir_path, - disk_capacity, - memory_capacity, - rate_limiter_settings, - replication_factor, - ) - .await - .unwrap(); + let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; let queue_id_01 = queue_id("test-index:0", "test-source:0", 1); let queue_id_02 = queue_id("test-index:0", "test-source:0", 2); @@ -1674,7 +1710,7 @@ mod tests { let mut state_guard = ingester.state.write().await; for queue_id in &[&queue_id_01, &queue_id_02, &queue_id_03] { ingester - .create_shard(&mut state_guard, queue_id, &self_node_id, None) + .create_shard(&mut state_guard, queue_id, &ingester_ctx.node_id, None) .await .unwrap(); let records = [ @@ -1709,17 +1745,17 @@ mod tests { Position::Beginning ); - let close_shard_1 = ClosedShards { + let close_shard_1 = ShardIds { index_uid: "test-index:0".to_string(), source_id: "test-source:0".to_string(), shard_ids: vec![1, 2], }; - let close_shard_2 = ClosedShards { + let close_shard_2 = ShardIds { index_uid: "test-index:1".to_string(), source_id: "test-source:1".to_string(), shard_ids: vec![3], }; - let close_shard_with_no_queue = ClosedShards { + let close_shard_with_no_queue = ShardIds { index_uid: "test-index:2".to_string(), source_id: "test-source:2".to_string(), shard_ids: vec![4], @@ -1730,7 +1766,7 @@ mod tests { close_shard_with_no_queue, ]; let close_shards_request = CloseShardsRequest { - closed_shards: closed_shards.clone(), + shards: closed_shards.clone(), }; ingester.close_shards(close_shards_request).await.unwrap(); @@ -1761,4 +1797,75 @@ mod tests { .unwrap(); assert_eq!(fetch_response.to_position_inclusive(), Position::Eof); } + + #[tokio::test] + async fn test_ingester_open_observation_stream() { + let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + + let mut observation_stream = ingester + .open_observation_stream(OpenObservationStreamRequest {}) + .await + .unwrap(); + let observation = observation_stream.next().await.unwrap().unwrap(); + assert_eq!(observation.node_id, ingester_ctx.node_id); + assert_eq!(observation.status(), IngesterStatus::Ready); + + let state_guard = ingester.state.read().await; + let observe_message = ObservationMessage { + node_id: ingester_ctx.node_id.to_string(), + status: IngesterStatus::Decommissioning as i32, + }; + state_guard + .observation_tx + .send(Ok(observe_message)) + .unwrap(); + drop(state_guard); + + let observation = observation_stream.next().await.unwrap().unwrap(); + assert_eq!(observation.node_id, ingester_ctx.node_id); + assert_eq!(observation.status(), IngesterStatus::Decommissioning); + + drop(ingester); + + let observation_opt = observation_stream.next().await; + assert!(observation_opt.is_none()); + } + + #[tokio::test] + async fn 
test_check_decommissioning_status() { + let (_ingester_ctx, ingester) = IngesterForTest::default().build().await; + let mut state_guard = ingester.state.write().await; + + ingester.check_decommissioning_status(&mut state_guard); + assert_eq!(state_guard.status, IngesterStatus::Ready); + assert_eq!( + ingester.observation_rx.borrow().as_ref().unwrap().status(), + IngesterStatus::Ready + ); + + state_guard.status = IngesterStatus::Decommissioning; + ingester.check_decommissioning_status(&mut state_guard); + assert_eq!(state_guard.status, IngesterStatus::Decommissioned); + + state_guard.status = IngesterStatus::Decommissioning; + + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + + state_guard.shards.insert( + queue_id_01.clone(), + IngesterShard::new_solo(ShardState::Closed, Position::Eof, Position::Beginning), + ); + ingester.check_decommissioning_status(&mut state_guard); + assert_eq!(state_guard.status, IngesterStatus::Decommissioning); + + let shard = state_guard.shards.get_mut(&queue_id_01).unwrap(); + shard.truncation_position_inclusive = Position::Eof; + + ingester.check_decommissioning_status(&mut state_guard); + assert_eq!(state_guard.status, IngesterStatus::Decommissioned); + assert_eq!( + ingester.observation_rx.borrow().as_ref().unwrap().status(), + IngesterStatus::Decommissioned + ); + } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 345de74ca08..d808381a6f2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -37,7 +37,7 @@ use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::types::NodeId; pub use self::fetch::{FetchStreamError, MultiFetchStream}; -pub use self::ingester::Ingester; +pub use self::ingester::{wait_for_ingester_decommission, Ingester}; use self::mrecord::MRECORD_HEADER_LEN; pub use self::mrecord::{decoded_mrecords, MRecord}; pub use self::rate_limiter::RateLimiterSettings; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/models.rs b/quickwit/quickwit-ingest/src/ingest_v2/models.rs index 9cf73a1acb4..4f1d3805a8c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/models.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/models.rs @@ -17,190 +17,197 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::fmt; - use quickwit_proto::ingest::ShardState; use quickwit_proto::types::{NodeId, Position}; use tokio::sync::watch; -/// Shard hosted on a leader node and replicated on a follower node. -pub(super) struct PrimaryShard { - pub follower_id: NodeId, +#[derive(Debug, Clone, Eq, PartialEq)] +pub(super) enum IngesterShardType { + /// A primary shard hosted on a leader and replicated on a follower. + Primary { follower_id: NodeId }, + /// A replica shard hosted on a follower. + Replica { leader_id: NodeId }, + /// A shard hosted on a single node when the replication factor is set to 1. + Solo, +} + +#[derive(Debug)] +pub(super) struct IngesterShard { + pub shard_type: IngesterShardType, pub shard_state: ShardState, /// Position of the last record written in the shard's mrecordlog queue. pub replication_position_inclusive: Position, + /// Position up to which the shard has been truncated. 
+ pub truncation_position_inclusive: Position, pub new_records_tx: watch::Sender<()>, pub new_records_rx: watch::Receiver<()>, } -impl fmt::Debug for PrimaryShard { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("PrimaryShard") - .field("follower_id", &self.follower_id) - .field("shard_state", &self.shard_state) - .finish() - } -} - -impl PrimaryShard { - pub fn new(follower_id: NodeId) -> Self { +impl IngesterShard { + pub fn new_primary( + follower_id: NodeId, + shard_state: ShardState, + replication_position_inclusive: Position, + truncation_position_inclusive: Position, + ) -> Self { let (new_records_tx, new_records_rx) = watch::channel(()); Self { - follower_id, - shard_state: ShardState::Open, - replication_position_inclusive: Position::Beginning, + shard_type: IngesterShardType::Primary { follower_id }, + shard_state, + replication_position_inclusive, + truncation_position_inclusive, new_records_tx, new_records_rx, } } -} - -/// Shard hosted on a follower node and replicated from a leader node. -pub(super) struct ReplicaShard { - pub leader_id: NodeId, - pub shard_state: ShardState, - /// Position of the last record written in the shard's mrecordlog queue. - pub replication_position_inclusive: Position, - pub new_records_tx: watch::Sender<()>, - pub new_records_rx: watch::Receiver<()>, -} - -impl fmt::Debug for ReplicaShard { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ReplicaShard") - .field("leader_id", &self.leader_id) - .field("shard_state", &self.shard_state) - .finish() - } -} -impl ReplicaShard { - pub fn new(leader_id: NodeId) -> Self { + pub fn new_replica( + leader_id: NodeId, + shard_state: ShardState, + replication_position_inclusive: Position, + truncation_position_inclusive: Position, + ) -> Self { let (new_records_tx, new_records_rx) = watch::channel(()); Self { - leader_id, - shard_state: ShardState::Open, - replication_position_inclusive: Position::Beginning, + shard_type: IngesterShardType::Replica { leader_id }, + shard_state, + replication_position_inclusive, + truncation_position_inclusive, new_records_tx, new_records_rx, } } -} -/// A shard hosted on a single node when the replication factor is set to 1. When a shard is -/// recovered after a node failure, it is always recreated as a solo shard in closed state. -pub(super) struct SoloShard { - pub shard_state: ShardState, - /// Position of the last record written in the shard's mrecordlog queue. - pub replication_position_inclusive: Position, - pub new_records_tx: watch::Sender<()>, - pub new_records_rx: watch::Receiver<()>, -} - -impl fmt::Debug for SoloShard { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("SoloShard") - .field("shard_state", &self.shard_state) - .finish() - } -} - -impl SoloShard { - pub fn new(shard_state: ShardState, replication_position_inclusive: Position) -> Self { + pub fn new_solo( + shard_state: ShardState, + replication_position_inclusive: Position, + truncation_position_inclusive: Position, + ) -> Self { let (new_records_tx, new_records_rx) = watch::channel(()); Self { + shard_type: IngesterShardType::Solo, shard_state, replication_position_inclusive, + truncation_position_inclusive, new_records_tx, new_records_rx, } } -} -#[derive(Debug)] -pub(super) enum IngesterShard { - /// A primary shard hosted on a leader and replicated on a follower. - Primary(PrimaryShard), - /// A replica shard hosted on a follower. 
- Replica(ReplicaShard), - /// A shard hosted on a single node when the replication factor is set to 1. - Solo(SoloShard), -} + pub fn notify_new_records(&mut self) { + // `new_records_tx` is guaranteed to be open because `self` also holds a receiver. + self.new_records_tx + .send(()) + .expect("channel should be open"); + } -impl IngesterShard { - pub fn is_closed(&self) -> bool { - match self { - IngesterShard::Primary(primary_shard) => &primary_shard.shard_state, - IngesterShard::Replica(replica_shard) => &replica_shard.shard_state, - IngesterShard::Solo(solo_shard) => &solo_shard.shard_state, + pub fn set_replication_position_inclusive(&mut self, replication_position_inclusive: Position) { + if self.replication_position_inclusive == replication_position_inclusive { + return; } - .is_closed() + self.replication_position_inclusive = replication_position_inclusive; + self.notify_new_records(); } +} - pub fn close(&mut self) { - let shard_state = match self { - IngesterShard::Primary(primary_shard) => &mut primary_shard.shard_state, - IngesterShard::Replica(replica_shard) => &mut replica_shard.shard_state, - IngesterShard::Solo(solo_shard) => &mut solo_shard.shard_state, - }; - *shard_state = ShardState::Closed; - } +#[cfg(test)] +mod tests { + use super::*; - pub fn replication_position_inclusive(&self) -> Position { - match self { - IngesterShard::Primary(primary_shard) => &primary_shard.replication_position_inclusive, - IngesterShard::Replica(replica_shard) => &replica_shard.replication_position_inclusive, - IngesterShard::Solo(solo_shard) => &solo_shard.replication_position_inclusive, + impl IngesterShard { + #[track_caller] + pub fn assert_is_solo(&self) { + assert!(matches!(self.shard_type, IngesterShardType::Solo { .. })) } - .clone() - } - pub fn set_replication_position_inclusive(&mut self, replication_position_inclusive: Position) { - if self.replication_position_inclusive() == replication_position_inclusive { - return; + #[track_caller] + pub fn assert_is_primary(&self) { + assert!(matches!(self.shard_type, IngesterShardType::Primary { .. })) } - match self { - IngesterShard::Primary(primary_shard) => { - primary_shard.replication_position_inclusive = replication_position_inclusive; - } - IngesterShard::Replica(replica_shard) => { - replica_shard.replication_position_inclusive = replication_position_inclusive; - } - IngesterShard::Solo(solo_shard) => { - solo_shard.replication_position_inclusive = replication_position_inclusive; - } - }; - self.notify_new_records(); - } - pub fn new_records_rx(&self) -> watch::Receiver<()> { - match self { - IngesterShard::Primary(primary_shard) => &primary_shard.new_records_rx, - IngesterShard::Replica(replica_shard) => &replica_shard.new_records_rx, - IngesterShard::Solo(solo_shard) => &solo_shard.new_records_rx, + #[track_caller] + pub fn assert_is_replica(&self) { + assert!(matches!(self.shard_type, IngesterShardType::Replica { .. 
})) + } + + #[track_caller] + pub fn assert_is_open(&self) { + assert!(self.shard_state.is_open()) + } + + #[track_caller] + pub fn assert_is_closed(&self) { + assert!(self.shard_state.is_closed()) } - .clone() - } - pub fn notify_new_records(&self) { - match self { - IngesterShard::Primary(primary_shard) => &primary_shard.new_records_tx, - IngesterShard::Replica(replica_shard) => &replica_shard.new_records_tx, - IngesterShard::Solo(solo_shard) => &solo_shard.new_records_tx, + #[track_caller] + pub fn assert_replication_position( + &self, + expected_replication_position: impl Into, + ) { + let expected_replication_position = expected_replication_position.into(); + + assert_eq!( + self.replication_position_inclusive, expected_replication_position, + "expected replication position at `{:?}`, got `{:?}`", + expected_replication_position, self.replication_position_inclusive + ); + } + + #[track_caller] + pub fn assert_truncation_position( + &self, + expected_truncation_position: impl Into, + ) { + let expected_truncation_position = expected_truncation_position.into(); + + assert_eq!( + self.truncation_position_inclusive, expected_truncation_position, + "expected truncation position at `{:?}`, got `{:?}`", + expected_truncation_position, self.truncation_position_inclusive + ); } - .send(()) - .expect("channel should be open"); } -} -#[cfg(test)] -mod tests { - use super::*; + #[test] + fn test_new_primary_shard() { + let primary_shard = IngesterShard::new_primary( + "test-follower".into(), + ShardState::Closed, + Position::from(42u64), + Position::Eof, + ); + assert!(matches!( + primary_shard.shard_type, + IngesterShardType::Primary { .. } + )); + assert_eq!(primary_shard.shard_state, ShardState::Closed); + assert_eq!(primary_shard.truncation_position_inclusive, Position::Eof); + assert_eq!( + primary_shard.replication_position_inclusive, + Position::from(42u64) + ); + } + + #[test] + fn test_new_replica_shard() { + let solo_shard = IngesterShard::new_solo(ShardState::Closed, 42u64.into(), Position::Eof); + assert_eq!(solo_shard.shard_type, IngesterShardType::Solo); + assert_eq!(solo_shard.shard_state, ShardState::Closed); + assert_eq!(solo_shard.truncation_position_inclusive, Position::Eof); + assert_eq!( + solo_shard.replication_position_inclusive, + Position::from(42u64) + ); + } #[test] fn test_new_solo_shard() { - let solo_shard = SoloShard::new(ShardState::Closed, Position::from(42u64)); + let solo_shard = + IngesterShard::new_solo(ShardState::Closed, Position::from(42u64), Position::Eof); + assert_eq!(solo_shard.shard_type, IngesterShardType::Solo); assert_eq!(solo_shard.shard_state, ShardState::Closed); + assert_eq!(solo_shard.truncation_position_inclusive, Position::Eof); assert_eq!( solo_shard.replication_position_inclusive, Position::from(42u64) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs index 15cf05a0465..a7d491080ff 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs @@ -20,7 +20,7 @@ use bytesize::ByteSize; use mrecordlog::error::{AppendError, MissingQueue}; use mrecordlog::MultiRecordLog; -use quickwit_proto::types::QueueId; +use quickwit_proto::types::{Position, QueueId}; use tracing::warn; use super::mrecord::is_eof_mrecord; @@ -104,8 +104,33 @@ pub(super) fn check_enough_capacity( Ok(()) } +/// Returns the position up to which the queue has been truncated (inclusive) taking into account +/// `Eof` 
records. Returns `None` if the queue does not exist. +pub(super) fn get_truncation_position( + mrecordlog: &MultiRecordLog, + queue_id: &QueueId, +) -> Option { + let Ok(mut mrecords) = mrecordlog.range(queue_id, ..) else { + return None; + }; + let position_opt = if let Some((position, mrecord)) = mrecords.next() { + if is_eof_mrecord(&mrecord) { + return Some(Position::Eof); + } + position.checked_sub(1) + } else { + // The queue exists and is empty. The last position should give us the truncation position. + mrecordlog + .last_position(queue_id) + .expect("queue should exist") + }; + Some(position_opt.map(Position::from).unwrap_or_default()) +} + #[cfg(test)] mod tests { + use bytes::Bytes; + use super::*; #[tokio::test] @@ -156,4 +181,44 @@ mod tests { check_enough_capacity(&mrecordlog, ByteSize::mb(256), ByteSize(12), ByteSize(12)).unwrap(); } + + #[tokio::test] + async fn test_get_truncation_position() { + let tempdir = tempfile::tempdir().unwrap(); + let mut mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let queue_id = "test-queue".to_string(); + + let truncation_position = get_truncation_position(&mrecordlog, &queue_id); + assert!(truncation_position.is_none()); + + mrecordlog.create_queue(&queue_id).await.unwrap(); + let truncation_position = get_truncation_position(&mrecordlog, &queue_id).unwrap(); + assert_eq!(truncation_position, Position::Beginning); + + mrecordlog + .append_record(&queue_id, None, Bytes::from_static(b"test-record-foo")) + .await + .unwrap(); + mrecordlog + .append_record(&queue_id, None, Bytes::from_static(b"test-record-bar")) + .await + .unwrap(); + let truncation_position = get_truncation_position(&mrecordlog, &queue_id).unwrap(); + assert_eq!(truncation_position, Position::Beginning); + + mrecordlog.truncate(&queue_id, 0).await.unwrap(); + let truncation_position = get_truncation_position(&mrecordlog, &queue_id).unwrap(); + assert_eq!(truncation_position, Position::from(0u64)); + + mrecordlog.truncate(&queue_id, 1).await.unwrap(); + let truncation_position = get_truncation_position(&mrecordlog, &queue_id).unwrap(); + assert_eq!(truncation_position, Position::from(1u64)); + + mrecordlog + .append_record(&queue_id, None, MRecord::Eof.encode()) + .await + .unwrap(); + let truncation_position = get_truncation_position(&mrecordlog, &queue_id).unwrap(); + assert_eq!(truncation_position, Position::Eof); + } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 4124a6f480d..8fd602352cf 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -26,11 +26,11 @@ use bytesize::ByteSize; use futures::{Future, StreamExt}; use quickwit_common::ServiceStream; use quickwit_proto::ingest::ingester::{ - ack_replication_message, syn_replication_message, AckReplicationMessage, ReplicateFailure, - ReplicateFailureReason, ReplicateRequest, ReplicateResponse, ReplicateSuccess, - SynReplicationMessage, + ack_replication_message, syn_replication_message, AckReplicationMessage, IngesterStatus, + ReplicateFailure, ReplicateFailureReason, ReplicateRequest, ReplicateResponse, + ReplicateSuccess, SynReplicationMessage, }; -use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result}; +use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState}; use quickwit_proto::types::{NodeId, Position}; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::{mpsc, oneshot, RwLock}; @@ 
-38,10 +38,11 @@ use tokio::task::JoinHandle; use tracing::{error, warn}; use super::ingester::IngesterState; -use super::models::{IngesterShard, ReplicaShard}; +use super::models::IngesterShard; use super::mrecord::MRecord; use super::mrecordlog_utils::check_enough_capacity; use crate::estimate_size; +use crate::ingest_v2::models::IngesterShardType; use crate::metrics::INGEST_METRICS; pub(super) const SYN_REPLICATION_STREAM_CAPACITY: usize = 5; @@ -330,6 +331,27 @@ impl ReplicationTask { let mut state_guard = self.state.write().await; + if state_guard.status != IngesterStatus::Ready { + replicate_failures.reserve_exact(replicate_request.subrequests.len()); + + for subrequest in replicate_request.subrequests { + let replicate_failure = ReplicateFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: ReplicateFailureReason::ShardClosed as i32, + }; + replicate_failures.push(replicate_failure); + } + let replicate_response = ReplicateResponse { + follower_id: replicate_request.follower_id, + successes: Vec::new(), + failures: replicate_failures, + replication_seqno: replicate_request.replication_seqno, + }; + return Ok(replicate_response); + } for subrequest in replicate_request.subrequests { let queue_id = subrequest.queue_id(); let from_position_exclusive = subrequest.from_position_exclusive(); @@ -343,8 +365,12 @@ impl ReplicationTask { .await .expect("TODO"); let leader_id: NodeId = replicate_request.leader_id.clone().into(); - let replica_shard = ReplicaShard::new(leader_id); - let shard = IngesterShard::Replica(replica_shard); + let shard = IngesterShard::new_replica( + leader_id, + ShardState::Open, + Position::Beginning, + Position::Beginning, + ); state_guard.shards.entry(queue_id.clone()).or_insert(shard) } else { state_guard @@ -352,7 +378,11 @@ impl ReplicationTask { .get(&queue_id) .expect("replica shard should be initialized") }; - if replica_shard.is_closed() { + assert!(matches!( + replica_shard.shard_type, + IngesterShardType::Replica { .. 
} + )); + if replica_shard.shard_state.is_closed() { let replicate_failure = ReplicateFailure { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, @@ -363,7 +393,7 @@ impl ReplicationTask { replicate_failures.push(replicate_failure); continue; } - if replica_shard.replication_position_inclusive() != from_position_exclusive { + if replica_shard.replication_position_inclusive != from_position_exclusive { // TODO } let doc_batch = match subrequest.doc_batch { @@ -377,7 +407,7 @@ impl ReplicationTask { source_id: subrequest.source_id, shard_id: subrequest.shard_id, replication_position_inclusive: Some( - replica_shard.replication_position_inclusive(), + replica_shard.replication_position_inclusive.clone(), ), }; replicate_successes.push(replicate_success); @@ -455,6 +485,7 @@ impl ReplicationTask { replicate_successes.push(replicate_success); } let follower_id = self.follower_id.clone().into(); + let replicate_response = ReplicateResponse { follower_id, successes: replicate_successes, @@ -516,9 +547,12 @@ mod tests { use std::collections::HashMap; use mrecordlog::MultiRecordLog; - use quickwit_proto::ingest::ingester::{ReplicateSubrequest, ReplicateSuccess}; - use quickwit_proto::ingest::{DocBatchV2, ShardState}; + use quickwit_proto::ingest::ingester::{ + ObservationMessage, ReplicateSubrequest, ReplicateSuccess, + }; + use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::types::queue_id; + use tokio::sync::watch; use super::*; use crate::ingest_v2::test_utils::MultiRecordLogTestExt; @@ -671,12 +705,15 @@ mod tests { let follower_id: NodeId = "test-follower".into(); let tempdir = tempfile::tempdir().unwrap(); let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default())); let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), + status: IngesterStatus::Ready, + observation_tx, })); let (syn_replication_stream_tx, syn_replication_stream) = ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); @@ -835,12 +872,15 @@ mod tests { let follower_id: NodeId = "test-follower".into(); let tempdir = tempfile::tempdir().unwrap(); let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default())); let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), + status: IngesterStatus::Ready, + observation_tx, })); let (syn_replication_stream_tx, syn_replication_stream) = ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); @@ -861,16 +901,17 @@ mod tests { ); let queue_id_01 = queue_id("test-index:0", "test-source", 1); - - let mut replica_shard = ReplicaShard::new(leader_id); - replica_shard.shard_state = ShardState::Closed; - let shard = IngesterShard::Replica(replica_shard); - + let replica_shard = IngesterShard::new_replica( + leader_id, + ShardState::Closed, + Position::Beginning, + Position::Beginning, + ); state .write() .await .shards - .insert(queue_id_01.clone(), shard); + .insert(queue_id_01.clone(), replica_shard); let replicate_request = ReplicateRequest { leader_id: "test-leader".to_string(), @@ -916,12 +957,15 @@ mod tests { let follower_id: NodeId = "test-follower".into(); let tempdir = 
tempfile::tempdir().unwrap(); let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default())); let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), + status: IngesterStatus::Ready, + observation_tx, })); let (syn_replication_stream_tx, syn_replication_stream) = ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 71ec7f481c6..832212ad9f3 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -35,7 +35,7 @@ use quickwit_proto::ingest::ingester::{ use quickwit_proto::ingest::router::{ IngestRequestV2, IngestResponseV2, IngestRouterService, IngestSubrequest, }; -use quickwit_proto::ingest::{ClosedShards, CommitTypeV2, IngestV2Error, IngestV2Result}; +use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardIds}; use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId}; use tokio::sync::RwLock; use tracing::{error, info, warn}; @@ -114,7 +114,7 @@ impl IngestRouter { // `closed_shards` and `unavailable_leaders` are populated by calls to `has_open_shards` // as we're looking for open shards to route the subrequests to. - let mut closed_shards: Vec = Vec::new(); + let mut closed_shards: Vec = Vec::new(); let mut unavailable_leaders: HashSet = HashSet::new(); for subrequest in subrequests { @@ -135,13 +135,13 @@ impl IngestRouter { } if !closed_shards.is_empty() { info!( - "reporting {} closed shard(s) to control-plane", + "reporting {} closed shard(s) to control plane", closed_shards.len() ) } if !unavailable_leaders.is_empty() { info!( - "reporting {} unavailable leader(s) to control-plane", + "reporting {} unavailable leader(s) to control plane", unavailable_leaders.len() ); } @@ -487,7 +487,7 @@ mod tests { assert_eq!(get_or_create_open_shard_request.closed_shards.len(), 1); assert_eq!( get_or_create_open_shard_request.closed_shards[0], - ClosedShards { + ShardIds { index_uid: "test-index-0:0".into(), source_id: "test-source".to_string(), shard_ids: vec![1], diff --git a/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs index 8109ff5388d..b25b67cde10 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicUsize, Ordering}; -use quickwit_proto::ingest::{ClosedShards, Shard, ShardState}; +use quickwit_proto::ingest::{Shard, ShardIds, ShardState}; use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId}; use tracing::warn; @@ -184,7 +184,7 @@ impl ShardTable { &self, index_id: impl Into, source_id: impl Into, - closed_shards: &mut Vec, + closed_shards: &mut Vec, ingester_pool: &IngesterPool, unavailable_leaders: &mut HashSet, ) -> bool { @@ -195,7 +195,7 @@ impl ShardTable { entry.has_open_shards(&mut closed_shard_ids, ingester_pool, unavailable_leaders); if !closed_shard_ids.is_empty() { - closed_shards.push(ClosedShards { + closed_shards.push(ShardIds { index_uid: entry.index_uid.clone().into(), source_id: entry.source_id.clone(), shard_ids: closed_shard_ids, diff --git 
a/quickwit/quickwit-ingest/src/ingest_v2/test_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/test_utils.rs index 6954a71cfac..04afe07b783 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/test_utils.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/test_utils.rs @@ -20,63 +20,6 @@ use std::ops::RangeBounds; use mrecordlog::MultiRecordLog; -use quickwit_proto::types::Position; - -use super::models::IngesterShard; - -pub(super) trait IngesterShardTestExt { - fn assert_is_solo(&self); - - fn assert_is_primary(&self); - - fn assert_is_replica(&self); - - fn assert_is_open(&self); - - fn assert_is_closed(&self); - - fn assert_replication_position(&self, expected_replication_position: impl Into); -} - -impl IngesterShardTestExt for IngesterShard { - #[track_caller] - fn assert_is_solo(&self) { - assert!(matches!(self, IngesterShard::Solo(_))) - } - - #[track_caller] - fn assert_is_primary(&self) { - assert!(matches!(self, IngesterShard::Primary(_))) - } - - #[track_caller] - fn assert_is_replica(&self) { - assert!(matches!(self, IngesterShard::Replica(_))) - } - - #[track_caller] - fn assert_is_open(&self) { - assert!(!self.is_closed()) - } - - #[track_caller] - fn assert_is_closed(&self) { - assert!(self.is_closed()) - } - - #[track_caller] - fn assert_replication_position(&self, expected_replication_position: impl Into) { - let expected_replication_position = expected_replication_position.into(); - - assert_eq!( - self.replication_position_inclusive(), - expected_replication_position, - "expected replication position at `{:?}`, got `{:?}`", - expected_replication_position, - self.replication_position_inclusive() - ); - } -} pub(super) trait MultiRecordLogTestExt { fn assert_records_eq(&self, queue_id: &str, range: R, expected_records: &[(u64, &str)]) diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index ed630375612..6dfd55c0da1 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -65,7 +65,7 @@ service ControlPlaneService { message GetOrCreateOpenShardsRequest { repeated GetOrCreateOpenShardsSubrequest subrequests = 1; - repeated quickwit.ingest.ClosedShards closed_shards = 2; + repeated quickwit.ingest.ShardIds closed_shards = 2; repeated string unavailable_leaders = 3; } diff --git a/quickwit/quickwit-proto/protos/quickwit/ingest.proto b/quickwit/quickwit-proto/protos/quickwit/ingest.proto index e4653bd9b16..633c2d80371 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingest.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingest.proto @@ -79,7 +79,8 @@ message Shard { optional string publish_token = 10; } -message ClosedShards { +// A group of shards belonging to the same index and source. +message ShardIds { string index_uid = 1; string source_id = 2; repeated uint64 shard_ids = 3; diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index 8b3882611ae..cf546107a9c 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -24,24 +24,32 @@ package quickwit.ingest.ingester; import "quickwit/ingest.proto"; service IngesterService { - // Persists batches of documents to primary shards owned by a leader. + // Persists batches of documents to primary shards hosted on a leader. 
rpc Persist(PersistRequest) returns (PersistResponse); // Opens a replication stream from a leader to a follower. rpc OpenReplicationStream(stream SynReplicationMessage) returns (stream AckReplicationMessage); - // Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch. + // Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch, + // otherwise the stream will go on indefinitely or until the shard is closed. rpc OpenFetchStream(OpenFetchStreamRequest) returns (stream FetchResponseV2); - // rpc OpenWatchStream(OpenWatchStreamRequest) returns (stream WatchMessage); + // Streams status updates, called "observations", from an ingester. + rpc OpenObservationStream(OpenObservationStreamRequest) returns (stream ObservationMessage); + // Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers. + rpc TruncateShards(TruncateShardsRequest) returns (TruncateShardsResponse); + + // Closes a set of shards. This RPC is called by the control plane. rpc CloseShards(CloseShardsRequest) returns (CloseShardsResponse); // Pings an ingester to check if it is ready to host shards and serve requests. rpc Ping(PingRequest) returns (PingResponse); - // Truncates the shards at the given positions. Indexers should call this RPC on leaders, which will replicate the request to followers. - rpc Truncate(TruncateRequest) returns (TruncateResponse); + + // Decommissions the ingester. + rpc Decommission(DecommissionRequest) returns (DecommissionResponse); + } message PersistRequest { @@ -160,19 +168,19 @@ message ReplicateFailure { ReplicateFailureReason reason = 5; } -message TruncateRequest { +message TruncateShardsRequest { string ingester_id = 1; - repeated TruncateSubrequest subrequests = 2; + repeated TruncateShardsSubrequest subrequests = 2; } -message TruncateSubrequest { +message TruncateShardsSubrequest { string index_uid = 1; string source_id = 2; uint64 shard_id = 3; quickwit.ingest.Position to_position_inclusive = 4; } -message TruncateResponse { +message TruncateShardsResponse { // TODO } @@ -194,7 +202,7 @@ message FetchResponseV2 { } message CloseShardsRequest { - repeated quickwit.ingest.ClosedShards closed_shards = 1; + repeated quickwit.ingest.ShardIds shards = 1; } message CloseShardsResponse { @@ -207,3 +215,30 @@ message PingRequest { } message PingResponse { } + +message DecommissionRequest { +} + +message DecommissionResponse { +} + +message OpenObservationStreamRequest { +} + +enum IngesterStatus { + INGESTER_STATUS_UNSPECIFIED = 0; + // The ingester is ready and accepts read and write requests. + INGESTER_STATUS_READY = 1; + // The ingester is being decommissioned. It accepts read requests but rejects write requests + // (open shards, persist, and replicate requests). It will transition to `Decommissioned` once + // all shards are fully indexed. + INGESTER_STATUS_DECOMMISSIONING = 2; + // The ingester no longer accepts read and write requests. It does not hold any data and can + // be safely removed from the cluster. 
+ INGESTER_STATUS_DECOMMISSIONED = 3; +} + +message ObservationMessage { + string node_id = 1; + IngesterStatus Status = 2; +} diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index e276eb95832..8715c0436d1 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -5,7 +5,7 @@ pub struct GetOrCreateOpenShardsRequest { #[prost(message, repeated, tag = "1")] pub subrequests: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "2")] - pub closed_shards: ::prost::alloc::vec::Vec, + pub closed_shards: ::prost::alloc::vec::Vec, #[prost(string, repeated, tag = "3")] pub unavailable_leaders: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index f9c9330b018..f65bae225a4 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -202,16 +202,16 @@ pub struct ReplicateFailure { #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct TruncateRequest { +pub struct TruncateShardsRequest { #[prost(string, tag = "1")] pub ingester_id: ::prost::alloc::string::String, #[prost(message, repeated, tag = "2")] - pub subrequests: ::prost::alloc::vec::Vec, + pub subrequests: ::prost::alloc::vec::Vec, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct TruncateSubrequest { +pub struct TruncateShardsSubrequest { #[prost(string, tag = "1")] pub index_uid: ::prost::alloc::string::String, #[prost(string, tag = "2")] @@ -225,7 +225,7 @@ pub struct TruncateSubrequest { #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct TruncateResponse {} +pub struct TruncateShardsResponse {} #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -263,7 +263,7 @@ pub struct FetchResponseV2 { #[derive(Clone, PartialEq, ::prost::Message)] pub struct CloseShardsRequest { #[prost(message, repeated, tag = "1")] - pub closed_shards: ::prost::alloc::vec::Vec, + pub shards: ::prost::alloc::vec::Vec, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -283,6 +283,27 @@ pub struct PingRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct PingResponse {} #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DecommissionRequest {} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DecommissionResponse {} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct 
OpenObservationStreamRequest {} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ObservationMessage { + #[prost(string, tag = "1")] + pub node_id: ::prost::alloc::string::String, + #[prost(enumeration = "IngesterStatus", tag = "2")] + pub status: i32, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[serde(rename_all = "snake_case")] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] @@ -355,6 +376,46 @@ impl ReplicateFailureReason { } } } +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "snake_case")] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum IngesterStatus { + Unspecified = 0, + /// The ingester is ready and accepts read and write requests. + Ready = 1, + /// The ingester is being decommissioned. It accepts read requests but rejects write requests + /// (open shards, persist, and replicate requests). It will transition to `Decommissioned` once + /// all shards are fully indexed. + Decommissioning = 2, + /// The ingester no longer accepts read and write requests. It does not hold any data and can + /// be safely removed from the cluster. + Decommissioned = 3, +} +impl IngesterStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + IngesterStatus::Unspecified => "INGESTER_STATUS_UNSPECIFIED", + IngesterStatus::Ready => "INGESTER_STATUS_READY", + IngesterStatus::Decommissioning => "INGESTER_STATUS_DECOMMISSIONING", + IngesterStatus::Decommissioned => "INGESTER_STATUS_DECOMMISSIONED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "INGESTER_STATUS_UNSPECIFIED" => Some(Self::Unspecified), + "INGESTER_STATUS_READY" => Some(Self::Ready), + "INGESTER_STATUS_DECOMMISSIONING" => Some(Self::Decommissioning), + "INGESTER_STATUS_DECOMMISSIONED" => Some(Self::Decommissioned), + _ => None, + } + } +} /// BEGIN quickwit-codegen #[allow(unused_imports)] use std::str::FromStr; @@ -365,7 +426,7 @@ pub type IngesterServiceStream = quickwit_common::ServiceStream< #[cfg_attr(any(test, feature = "testsuite"), mockall::automock)] #[async_trait::async_trait] pub trait IngesterService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static { - /// Persists batches of documents to primary shards owned by a leader. + /// Persists batches of documents to primary shards hosted on a leader. async fn persist( &mut self, request: PersistRequest, @@ -375,11 +436,23 @@ pub trait IngesterService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + &mut self, request: quickwit_common::ServiceStream, ) -> crate::ingest::IngestV2Result>; - /// Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch. + /// Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch, + /// otherwise the stream will go undefinitely or until the shard is closed. 
async fn open_fetch_stream( &mut self, request: OpenFetchStreamRequest, ) -> crate::ingest::IngestV2Result>; + /// Streams status updates, called "observations", from an ingester. + async fn open_observation_stream( + &mut self, + request: OpenObservationStreamRequest, + ) -> crate::ingest::IngestV2Result>; + /// Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers. + async fn truncate_shards( + &mut self, + request: TruncateShardsRequest, + ) -> crate::ingest::IngestV2Result; + /// Closes a set of shards. This RPC is called by the control plane. async fn close_shards( &mut self, request: CloseShardsRequest, @@ -389,11 +462,11 @@ pub trait IngesterService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + &mut self, request: PingRequest, ) -> crate::ingest::IngestV2Result; - /// Truncates the shards at the given positions. Indexers should call this RPC on leaders, which will replicate the request to followers. - async fn truncate( + /// Decommissions the ingester. + async fn decommission( &mut self, - request: TruncateRequest, - ) -> crate::ingest::IngestV2Result; + request: DecommissionRequest, + ) -> crate::ingest::IngestV2Result; } dyn_clone::clone_trait_object!(IngesterService); #[cfg(any(test, feature = "testsuite"))] @@ -492,6 +565,18 @@ impl IngesterService for IngesterServiceClient { ) -> crate::ingest::IngestV2Result> { self.inner.open_fetch_stream(request).await } + async fn open_observation_stream( + &mut self, + request: OpenObservationStreamRequest, + ) -> crate::ingest::IngestV2Result> { + self.inner.open_observation_stream(request).await + } + async fn truncate_shards( + &mut self, + request: TruncateShardsRequest, + ) -> crate::ingest::IngestV2Result { + self.inner.truncate_shards(request).await + } async fn close_shards( &mut self, request: CloseShardsRequest, @@ -504,11 +589,11 @@ impl IngesterService for IngesterServiceClient { ) -> crate::ingest::IngestV2Result { self.inner.ping(request).await } - async fn truncate( + async fn decommission( &mut self, - request: TruncateRequest, - ) -> crate::ingest::IngestV2Result { - self.inner.truncate(request).await + request: DecommissionRequest, + ) -> crate::ingest::IngestV2Result { + self.inner.decommission(request).await } } #[cfg(any(test, feature = "testsuite"))] @@ -542,6 +627,20 @@ pub mod ingester_service_mock { > { self.inner.lock().await.open_fetch_stream(request).await } + async fn open_observation_stream( + &mut self, + request: super::OpenObservationStreamRequest, + ) -> crate::ingest::IngestV2Result< + IngesterServiceStream, + > { + self.inner.lock().await.open_observation_stream(request).await + } + async fn truncate_shards( + &mut self, + request: super::TruncateShardsRequest, + ) -> crate::ingest::IngestV2Result { + self.inner.lock().await.truncate_shards(request).await + } async fn close_shards( &mut self, request: super::CloseShardsRequest, @@ -554,11 +653,11 @@ pub mod ingester_service_mock { ) -> crate::ingest::IngestV2Result { self.inner.lock().await.ping(request).await } - async fn truncate( + async fn decommission( &mut self, - request: super::TruncateRequest, - ) -> crate::ingest::IngestV2Result { - self.inner.lock().await.truncate(request).await + request: super::DecommissionRequest, + ) -> crate::ingest::IngestV2Result { + self.inner.lock().await.decommission(request).await } } impl From for IngesterServiceClient { @@ -625,6 +724,38 @@ impl tower::Service for Box { Box::pin(fut) } } +impl tower::Service for Box { + type Response = 
IngesterServiceStream; + type Error = crate::ingest::IngestV2Error; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: OpenObservationStreamRequest) -> Self::Future { + let mut svc = self.clone(); + let fut = async move { svc.open_observation_stream(request).await }; + Box::pin(fut) + } +} +impl tower::Service for Box { + type Response = TruncateShardsResponse; + type Error = crate::ingest::IngestV2Error; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: TruncateShardsRequest) -> Self::Future { + let mut svc = self.clone(); + let fut = async move { svc.truncate_shards(request).await }; + Box::pin(fut) + } +} impl tower::Service for Box { type Response = CloseShardsResponse; type Error = crate::ingest::IngestV2Error; @@ -657,8 +788,8 @@ impl tower::Service for Box { Box::pin(fut) } } -impl tower::Service for Box { - type Response = TruncateResponse; +impl tower::Service for Box { + type Response = DecommissionResponse; type Error = crate::ingest::IngestV2Error; type Future = BoxFuture; fn poll_ready( @@ -667,9 +798,9 @@ impl tower::Service for Box { ) -> std::task::Poll> { std::task::Poll::Ready(Ok(())) } - fn call(&mut self, request: TruncateRequest) -> Self::Future { + fn call(&mut self, request: DecommissionRequest) -> Self::Future { let mut svc = self.clone(); - let fut = async move { svc.truncate(request).await }; + let fut = async move { svc.decommission(request).await }; Box::pin(fut) } } @@ -692,6 +823,16 @@ struct IngesterServiceTowerBlock { IngesterServiceStream, crate::ingest::IngestV2Error, >, + open_observation_stream_svc: quickwit_common::tower::BoxService< + OpenObservationStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + truncate_shards_svc: quickwit_common::tower::BoxService< + TruncateShardsRequest, + TruncateShardsResponse, + crate::ingest::IngestV2Error, + >, close_shards_svc: quickwit_common::tower::BoxService< CloseShardsRequest, CloseShardsResponse, @@ -702,9 +843,9 @@ struct IngesterServiceTowerBlock { PingResponse, crate::ingest::IngestV2Error, >, - truncate_svc: quickwit_common::tower::BoxService< - TruncateRequest, - TruncateResponse, + decommission_svc: quickwit_common::tower::BoxService< + DecommissionRequest, + DecommissionResponse, crate::ingest::IngestV2Error, >, } @@ -715,9 +856,11 @@ impl Clone for IngesterServiceTowerBlock { persist_svc: self.persist_svc.clone(), open_replication_stream_svc: self.open_replication_stream_svc.clone(), open_fetch_stream_svc: self.open_fetch_stream_svc.clone(), + open_observation_stream_svc: self.open_observation_stream_svc.clone(), + truncate_shards_svc: self.truncate_shards_svc.clone(), close_shards_svc: self.close_shards_svc.clone(), ping_svc: self.ping_svc.clone(), - truncate_svc: self.truncate_svc.clone(), + decommission_svc: self.decommission_svc.clone(), } } } @@ -741,6 +884,18 @@ impl IngesterService for IngesterServiceTowerBlock { ) -> crate::ingest::IngestV2Result> { self.open_fetch_stream_svc.ready().await?.call(request).await } + async fn open_observation_stream( + &mut self, + request: OpenObservationStreamRequest, + ) -> crate::ingest::IngestV2Result> { + self.open_observation_stream_svc.ready().await?.call(request).await + } + async fn truncate_shards( + &mut self, + request: TruncateShardsRequest, + ) -> 
crate::ingest::IngestV2Result { + self.truncate_shards_svc.ready().await?.call(request).await + } async fn close_shards( &mut self, request: CloseShardsRequest, @@ -753,11 +908,11 @@ impl IngesterService for IngesterServiceTowerBlock { ) -> crate::ingest::IngestV2Result { self.ping_svc.ready().await?.call(request).await } - async fn truncate( + async fn decommission( &mut self, - request: TruncateRequest, - ) -> crate::ingest::IngestV2Result { - self.truncate_svc.ready().await?.call(request).await + request: DecommissionRequest, + ) -> crate::ingest::IngestV2Result { + self.decommission_svc.ready().await?.call(request).await } } #[derive(Debug, Default)] @@ -790,6 +945,24 @@ pub struct IngesterServiceTowerBlockBuilder { >, >, #[allow(clippy::type_complexity)] + open_observation_stream_layer: Option< + quickwit_common::tower::BoxLayer< + Box, + OpenObservationStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + >, + #[allow(clippy::type_complexity)] + truncate_shards_layer: Option< + quickwit_common::tower::BoxLayer< + Box, + TruncateShardsRequest, + TruncateShardsResponse, + crate::ingest::IngestV2Error, + >, + >, + #[allow(clippy::type_complexity)] close_shards_layer: Option< quickwit_common::tower::BoxLayer< Box, @@ -808,11 +981,11 @@ pub struct IngesterServiceTowerBlockBuilder { >, >, #[allow(clippy::type_complexity)] - truncate_layer: Option< + decommission_layer: Option< quickwit_common::tower::BoxLayer< Box, - TruncateRequest, - TruncateResponse, + DecommissionRequest, + DecommissionResponse, crate::ingest::IngestV2Error, >, >, @@ -841,6 +1014,20 @@ impl IngesterServiceTowerBlockBuilder { Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, >::Future: Send + 'static, + L::Service: tower::Service< + OpenObservationStreamRequest, + Response = IngesterServiceStream, + Error = crate::ingest::IngestV2Error, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + L::Service: tower::Service< + TruncateShardsRequest, + Response = TruncateShardsResponse, + Error = crate::ingest::IngestV2Error, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, L::Service: tower::Service< CloseShardsRequest, Response = CloseShardsResponse, @@ -854,11 +1041,11 @@ impl IngesterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, L::Service: tower::Service< - TruncateRequest, - Response = TruncateResponse, + DecommissionRequest, + Response = DecommissionResponse, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, + >::Future: Send + 'static, { self.persist_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone())); self @@ -869,12 +1056,20 @@ impl IngesterServiceTowerBlockBuilder { .open_fetch_stream_layer = Some( quickwit_common::tower::BoxLayer::new(layer.clone()), ); + self + .open_observation_stream_layer = Some( + quickwit_common::tower::BoxLayer::new(layer.clone()), + ); + self + .truncate_shards_layer = Some( + quickwit_common::tower::BoxLayer::new(layer.clone()), + ); self .close_shards_layer = Some( quickwit_common::tower::BoxLayer::new(layer.clone()), ); self.ping_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone())); - self.truncate_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.decommission_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn persist_layer(mut self, layer: L) -> Self @@ -924,6 +1119,37 @@ impl IngesterServiceTowerBlockBuilder { ); self } + pub fn 
open_observation_stream_layer(mut self, layer: L) -> Self + where + L: tower::Layer> + Send + Sync + 'static, + L::Service: tower::Service< + OpenObservationStreamRequest, + Response = IngesterServiceStream, + Error = crate::ingest::IngestV2Error, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self + .open_observation_stream_layer = Some( + quickwit_common::tower::BoxLayer::new(layer), + ); + self + } + pub fn truncate_shards_layer(mut self, layer: L) -> Self + where + L: tower::Layer> + Send + Sync + 'static, + L::Service: tower::Service< + TruncateShardsRequest, + Response = TruncateShardsResponse, + Error = crate::ingest::IngestV2Error, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.truncate_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn close_shards_layer(mut self, layer: L) -> Self where L: tower::Layer> + Send + Sync + 'static, @@ -950,17 +1176,17 @@ impl IngesterServiceTowerBlockBuilder { self.ping_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn truncate_layer(mut self, layer: L) -> Self + pub fn decommission_layer(mut self, layer: L) -> Self where L: tower::Layer> + Send + Sync + 'static, L::Service: tower::Service< - TruncateRequest, - Response = TruncateResponse, + DecommissionRequest, + Response = DecommissionResponse, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, + >::Future: Send + 'static, { - self.truncate_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.decommission_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn build(self, instance: T) -> IngesterServiceClient @@ -1017,6 +1243,18 @@ impl IngesterServiceTowerBlockBuilder { } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) }; + let open_observation_stream_svc = if let Some(layer) + = self.open_observation_stream_layer + { + layer.layer(boxed_instance.clone()) + } else { + quickwit_common::tower::BoxService::new(boxed_instance.clone()) + }; + let truncate_shards_svc = if let Some(layer) = self.truncate_shards_layer { + layer.layer(boxed_instance.clone()) + } else { + quickwit_common::tower::BoxService::new(boxed_instance.clone()) + }; let close_shards_svc = if let Some(layer) = self.close_shards_layer { layer.layer(boxed_instance.clone()) } else { @@ -1027,7 +1265,7 @@ impl IngesterServiceTowerBlockBuilder { } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) }; - let truncate_svc = if let Some(layer) = self.truncate_layer { + let decommission_svc = if let Some(layer) = self.decommission_layer { layer.layer(boxed_instance.clone()) } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) @@ -1037,9 +1275,11 @@ impl IngesterServiceTowerBlockBuilder { persist_svc, open_replication_stream_svc, open_fetch_stream_svc, + open_observation_stream_svc, + truncate_shards_svc, close_shards_svc, ping_svc, - truncate_svc, + decommission_svc, }; IngesterServiceClient::new(tower_block) } @@ -1140,6 +1380,21 @@ where crate::ingest::IngestV2Error, >, > + + tower::Service< + OpenObservationStreamRequest, + Response = IngesterServiceStream, + Error = crate::ingest::IngestV2Error, + Future = BoxFuture< + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + > + + tower::Service< + TruncateShardsRequest, + Response = TruncateShardsResponse, + Error = crate::ingest::IngestV2Error, + Future = BoxFuture, + > + tower::Service< CloseShardsRequest, Response = 
CloseShardsResponse, @@ -1153,10 +1408,10 @@ where Future = BoxFuture, > + tower::Service< - TruncateRequest, - Response = TruncateResponse, + DecommissionRequest, + Response = DecommissionResponse, Error = crate::ingest::IngestV2Error, - Future = BoxFuture, + Future = BoxFuture, >, { async fn persist( @@ -1177,6 +1432,18 @@ where ) -> crate::ingest::IngestV2Result> { self.call(request).await } + async fn open_observation_stream( + &mut self, + request: OpenObservationStreamRequest, + ) -> crate::ingest::IngestV2Result> { + self.call(request).await + } + async fn truncate_shards( + &mut self, + request: TruncateShardsRequest, + ) -> crate::ingest::IngestV2Result { + self.call(request).await + } async fn close_shards( &mut self, request: CloseShardsRequest, @@ -1189,10 +1456,10 @@ where ) -> crate::ingest::IngestV2Result { self.call(request).await } - async fn truncate( + async fn decommission( &mut self, - request: TruncateRequest, - ) -> crate::ingest::IngestV2Result { + request: DecommissionRequest, + ) -> crate::ingest::IngestV2Result { self.call(request).await } } @@ -1268,6 +1535,30 @@ where }) .map_err(|error| error.into()) } + async fn open_observation_stream( + &mut self, + request: OpenObservationStreamRequest, + ) -> crate::ingest::IngestV2Result> { + self.inner + .open_observation_stream(request) + .await + .map(|response| { + let streaming: tonic::Streaming<_> = response.into_inner(); + let stream = quickwit_common::ServiceStream::from(streaming); + stream.map_err(|error| error.into()) + }) + .map_err(|error| error.into()) + } + async fn truncate_shards( + &mut self, + request: TruncateShardsRequest, + ) -> crate::ingest::IngestV2Result { + self.inner + .truncate_shards(request) + .await + .map(|response| response.into_inner()) + .map_err(|error| error.into()) + } async fn close_shards( &mut self, request: CloseShardsRequest, @@ -1288,12 +1579,12 @@ where .map(|response| response.into_inner()) .map_err(|error| error.into()) } - async fn truncate( + async fn decommission( &mut self, - request: TruncateRequest, - ) -> crate::ingest::IngestV2Result { + request: DecommissionRequest, + ) -> crate::ingest::IngestV2Result { self.inner - .truncate(request) + .decommission(request) .await .map(|response| response.into_inner()) .map_err(|error| error.into()) @@ -1356,6 +1647,31 @@ for IngesterServiceGrpcServerAdapter { .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) .map_err(|error| error.into()) } + type OpenObservationStreamStream = quickwit_common::ServiceStream< + tonic::Result, + >; + async fn open_observation_stream( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .clone() + .open_observation_stream(request.into_inner()) + .await + .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) + .map_err(|error| error.into()) + } + async fn truncate_shards( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .clone() + .truncate_shards(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(|error| error.into()) + } async fn close_shards( &self, request: tonic::Request, @@ -1378,13 +1694,13 @@ for IngesterServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(|error| error.into()) } - async fn truncate( + async fn decommission( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { + request: tonic::Request, + ) -> Result, tonic::Status> { self.inner .clone() - .truncate(request.into_inner()) + .decommission(request.into_inner()) .await 
.map(tonic::Response::new) .map_err(|error| error.into()) @@ -1475,7 +1791,7 @@ pub mod ingester_service_grpc_client { self.inner = self.inner.max_encoding_message_size(limit); self } - /// Persists batches of documents to primary shards owned by a leader. + /// Persists batches of documents to primary shards hosted on a leader. pub async fn persist( &mut self, request: impl tonic::IntoRequest, @@ -1539,7 +1855,8 @@ pub mod ingester_service_grpc_client { ); self.inner.streaming(req, path, codec).await } - /// Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch. + /// Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch, + /// otherwise the stream will go undefinitely or until the shard is closed. pub async fn open_fetch_stream( &mut self, request: impl tonic::IntoRequest, @@ -1570,6 +1887,69 @@ pub mod ingester_service_grpc_client { ); self.inner.server_streaming(req, path, codec).await } + /// Streams status updates, called "observations", from an ingester. + pub async fn open_observation_stream( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.ingest.ingester.IngesterService/OpenObservationStream", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.ingest.ingester.IngesterService", + "OpenObservationStream", + ), + ); + self.inner.server_streaming(req, path, codec).await + } + /// Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers. + pub async fn truncate_shards( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.ingest.ingester.IngesterService/TruncateShards", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.ingest.ingester.IngesterService", + "TruncateShards", + ), + ); + self.inner.unary(req, path, codec).await + } + /// Closes a set of shards. This RPC is called by the control plane. pub async fn close_shards( &mut self, request: impl tonic::IntoRequest, @@ -1625,12 +2005,12 @@ pub mod ingester_service_grpc_client { ); self.inner.unary(req, path, codec).await } - /// Truncates the shards at the given positions. Indexers should call this RPC on leaders, which will replicate the request to followers. - pub async fn truncate( + /// Decommissions the ingester. 
+ pub async fn decommission( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, > { self.inner @@ -1644,14 +2024,14 @@ pub mod ingester_service_grpc_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/quickwit.ingest.ingester.IngesterService/Truncate", + "/quickwit.ingest.ingester.IngesterService/Decommission", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( "quickwit.ingest.ingester.IngesterService", - "Truncate", + "Decommission", ), ); self.inner.unary(req, path, codec).await @@ -1665,7 +2045,7 @@ pub mod ingester_service_grpc_server { /// Generated trait containing gRPC methods that should be implemented for use with IngesterServiceGrpcServer. #[async_trait] pub trait IngesterServiceGrpc: Send + Sync + 'static { - /// Persists batches of documents to primary shards owned by a leader. + /// Persists batches of documents to primary shards hosted on a leader. async fn persist( &self, request: tonic::Request, @@ -1690,7 +2070,8 @@ pub mod ingester_service_grpc_server { > + Send + 'static; - /// Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch. + /// Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch, + /// otherwise the stream will go undefinitely or until the shard is closed. async fn open_fetch_stream( &self, request: tonic::Request, @@ -1698,6 +2079,29 @@ pub mod ingester_service_grpc_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the OpenObservationStream method. + type OpenObservationStreamStream: futures_core::Stream< + Item = std::result::Result, + > + + Send + + 'static; + /// Streams status updates, called "observations", from an ingester. + async fn open_observation_stream( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers. + async fn truncate_shards( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Closes a set of shards. This RPC is called by the control plane. async fn close_shards( &self, request: tonic::Request, @@ -1710,12 +2114,12 @@ pub mod ingester_service_grpc_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; - /// Truncates the shards at the given positions. Indexers should call this RPC on leaders, which will replicate the request to followers. - async fn truncate( + /// Decommissions the ingester. 
+ async fn decommission( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; } @@ -1939,6 +2343,100 @@ pub mod ingester_service_grpc_server { }; Box::pin(fut) } + "/quickwit.ingest.ingester.IngesterService/OpenObservationStream" => { + #[allow(non_camel_case_types)] + struct OpenObservationStreamSvc(pub Arc); + impl< + T: IngesterServiceGrpc, + > tonic::server::ServerStreamingService< + super::OpenObservationStreamRequest, + > for OpenObservationStreamSvc { + type Response = super::ObservationMessage; + type ResponseStream = T::OpenObservationStreamStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).open_observation_stream(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = OpenObservationStreamSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/quickwit.ingest.ingester.IngesterService/TruncateShards" => { + #[allow(non_camel_case_types)] + struct TruncateShardsSvc(pub Arc); + impl< + T: IngesterServiceGrpc, + > tonic::server::UnaryService + for TruncateShardsSvc { + type Response = super::TruncateShardsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).truncate_shards(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = TruncateShardsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.ingest.ingester.IngesterService/CloseShards" => { #[allow(non_camel_case_types)] struct CloseShardsSvc(pub Arc); @@ -2028,24 +2526,26 @@ pub mod ingester_service_grpc_server { }; Box::pin(fut) } - "/quickwit.ingest.ingester.IngesterService/Truncate" => { + "/quickwit.ingest.ingester.IngesterService/Decommission" => { #[allow(non_camel_case_types)] - struct TruncateSvc(pub Arc); + struct DecommissionSvc(pub Arc); impl< T: IngesterServiceGrpc, - > tonic::server::UnaryService - for TruncateSvc { - type 
Response = super::TruncateResponse;
+                    > tonic::server::UnaryService<super::DecommissionRequest>
+                    for DecommissionSvc<T> {
+                        type Response = super::DecommissionResponse;
                         type Future = BoxFuture<
                             tonic::Response<Self::Response>,
                             tonic::Status,
                         >;
                         fn call(
                             &mut self,
-                            request: tonic::Request<super::TruncateRequest>,
+                            request: tonic::Request<super::DecommissionRequest>,
                         ) -> Self::Future {
                             let inner = Arc::clone(&self.0);
-                            let fut = async move { (*inner).truncate(request).await };
+                            let fut = async move {
+                                (*inner).decommission(request).await
+                            };
                             Box::pin(fut)
                         }
                     }
@@ -2056,7 +2556,7 @@ pub mod ingester_service_grpc_server {
                     let inner = self.inner.clone();
                     let fut = async move {
                         let inner = inner.0;
-                        let method = TruncateSvc(inner);
+                        let method = DecommissionSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
                         let mut grpc = tonic::server::Grpc::new(codec)
                             .apply_compression_config(
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs
index 6ad8e0e259d..d998449d041 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs
@@ -51,10 +51,11 @@ pub struct Shard {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub publish_token: ::core::option::Option<::prost::alloc::string::String>,
 }
+/// A group of shards belonging to the same index and source.
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ClosedShards {
+pub struct ShardIds {
     #[prost(string, tag = "1")]
     pub index_uid: ::prost::alloc::string::String,
     #[prost(string, tag = "2")]
diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs
index 2168e35c4f5..f24a9de9578 100644
--- a/quickwit/quickwit-proto/src/ingest/ingester.rs
+++ b/quickwit/quickwit-proto/src/ingest/ingester.rs
@@ -140,7 +140,7 @@ impl ReplicateSuccess {
     }
 }
 
-impl TruncateSubrequest {
+impl TruncateShardsSubrequest {
     pub fn queue_id(&self) -> QueueId {
         queue_id(&self.index_uid, &self.source_id, self.shard_id)
     }
diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs
index 2563cba0394..75b3ff0fab4 100644
--- a/quickwit/quickwit-proto/src/ingest/mod.rs
+++ b/quickwit/quickwit-proto/src/ingest/mod.rs
@@ -193,7 +193,7 @@ impl ShardState {
     }
 }
 
-impl ClosedShards {
+impl ShardIds {
     pub fn queue_ids(&self) -> impl Iterator<Item = QueueId> + '_ {
         self.shard_ids
             .iter()
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index f244c12b88d..b3581873045 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -68,8 +68,8 @@ use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}
 use quickwit_indexing::actors::IndexingService;
 use quickwit_indexing::start_indexing_service;
 use quickwit_ingest::{
-    start_ingest_api_service, GetMemoryCapacity, IngestApiService, IngestRequest, IngestRouter,
-    IngestServiceClient, Ingester, IngesterPool, RateLimiterSettings,
+    start_ingest_api_service, wait_for_ingester_decommission, GetMemoryCapacity, IngestApiService,
+    IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool, RateLimiterSettings,
 };
 use quickwit_janitor::{start_janitor_service, JanitorService};
 use quickwit_metastore::{
@@ -464,7 +464,7 @@ pub async fn serve_quickwit(
         indexing_service_opt,
         ingest_router_service,
         ingest_service,
-        ingester_service_opt,
+        ingester_service_opt: ingester_service_opt.clone(),
         janitor_service_opt,
         search_service,
     });
@@ -518,13 +518,18 @@ pub async fn serve_quickwit(
     let shutdown_handle = tokio::spawn(async move {
         shutdown_signal.await;
 
+        // We must decommission the ingester first before terminating the indexing pipelines that
+        // may consume from it. We also need to keep the gRPC server running while doing so.
+        wait_for_ingester_decommission(ingester_service_opt).await;
+        let actor_exit_statuses = universe.quit().await;
+
         if grpc_shutdown_trigger_tx.send(()).is_err() {
             debug!("gRPC server shutdown signal receiver was dropped.");
         }
         if rest_shutdown_trigger_tx.send(()).is_err() {
             debug!("REST server shutdown signal receiver was dropped.");
         }
-        universe.quit().await
+        actor_exit_statuses
     });
     let grpc_join_handle = tokio::spawn(grpc_server);
     let rest_join_handle = tokio::spawn(rest_server);
@@ -768,13 +773,13 @@ async fn node_readiness_reporting_task(
         // the gRPC server failed.
         return;
     };
-    info!("gRPC server is ready.");
+    info!("gRPC server is ready");
 
     if rest_readiness_signal_rx.await.is_err() {
         // the REST server failed.
         return;
     };
-    info!("REST server is ready.");
+    info!("REST server is ready");
 
     let mut interval = tokio::time::interval(READINESS_REPORTING_INTERVAL);
 
@@ -783,11 +788,11 @@
 
         let node_ready = match metastore.check_connectivity().await {
             Ok(()) => {
-                debug!(metastore_endpoints=?metastore.endpoints(), "Metastore service is available.");
+                debug!(metastore_endpoints=?metastore.endpoints(), "metastore service is available.");
                 true
             }
             Err(error) => {
-                warn!(metastore_endpoints=?metastore.endpoints(), error=?error, "Metastore service is unavailable.");
+                warn!(metastore_endpoints=?metastore.endpoints(), error=?error, "metastore service is unavailable.");
                 false
             }
         };
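
Reviewer note: the key ordering constraint introduced in the `serve_quickwit` shutdown path above is that the ingester is decommissioned while the gRPC server is still serving, the actor universe is quit only afterwards (keeping its exit statuses), and the gRPC/REST shutdown triggers fire last. The sketch below is a minimal, self-contained approximation of that ordering using plain tokio primitives; `decommission_ingester` and `quit_actors` are hypothetical stand-ins, not the real Quickwit APIs (`wait_for_ingester_decommission`, `universe.quit()`).

```rust
use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};

// Stand-in for `wait_for_ingester_decommission(...)`: must run while the gRPC
// server is still up so in-flight replication/fetch traffic can drain.
async fn decommission_ingester() {
    sleep(Duration::from_millis(100)).await;
}

// Stand-in for `universe.quit().await`, which returns the actor exit statuses.
async fn quit_actors() -> Vec<&'static str> {
    vec!["indexing_pipeline: Success"]
}

#[tokio::main]
async fn main() {
    let (grpc_shutdown_tx, grpc_shutdown_rx) = oneshot::channel::<()>();
    let (rest_shutdown_tx, rest_shutdown_rx) = oneshot::channel::<()>();

    // The servers keep running until they receive their shutdown signal.
    let grpc_server = tokio::spawn(async move {
        let _ = grpc_shutdown_rx.await;
    });
    let rest_server = tokio::spawn(async move {
        let _ = rest_shutdown_rx.await;
    });

    let shutdown_handle = tokio::spawn(async move {
        // 1. Decommission the ingester first, while the gRPC server is still alive.
        decommission_ingester().await;
        // 2. Then quit the actors (indexing pipelines) and keep their exit statuses.
        let actor_exit_statuses = quit_actors().await;
        // 3. Only now signal both servers to stop.
        let _ = grpc_shutdown_tx.send(());
        let _ = rest_shutdown_tx.send(());
        actor_exit_statuses
    });

    let exit_statuses = shutdown_handle.await.unwrap();
    let _ = tokio::join!(grpc_server, rest_server);
    println!("actor exit statuses: {exit_statuses:?}");
}
```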