diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index 4449841bebb..13c0a883e79 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -280,15 +280,15 @@ impl IngestSource { } } - async fn truncate(&mut self, truncate_positions: Vec<(ShardId, Position)>) { + async fn truncate(&mut self, truncate_up_to_positions: Vec<(ShardId, Position)>) { let shard_positions_update = LocalShardPositionsUpdate::new( self.client_id.source_uid.clone(), - truncate_positions.clone(), + truncate_up_to_positions.clone(), ); // Let's record all shards that have reached Eof as complete. - for (shard, truncate_position) in &truncate_positions { - if truncate_position.is_eof() { + for (shard, truncate_up_to_position_inclusive) in &truncate_up_to_positions { + if truncate_up_to_position_inclusive.is_eof() { if let Some(assigned_shard) = self.assigned_shards.get_mut(shard) { assigned_shard.status = IndexingStatus::Complete; } @@ -305,8 +305,8 @@ impl IngestSource { Vec, > = FnvHashMap::default(); - for (shard_id, to_position_exclusive) in truncate_positions { - if matches!(to_position_exclusive, Position::Beginning) { + for (shard_id, truncate_up_to_position_inclusive) in truncate_up_to_positions { + if matches!(truncate_up_to_position_inclusive, Position::Beginning) { continue; } let Some(shard) = self.assigned_shards.get(&shard_id) else { @@ -317,7 +317,7 @@ impl IngestSource { index_uid: self.client_id.source_uid.index_uid.clone().into(), source_id: self.client_id.source_uid.source_id.clone(), shard_id, - to_position_inclusive: Some(to_position_exclusive.clone()), + truncate_up_to_position_inclusive: Some(truncate_up_to_position_inclusive), }; if let Some(follower_id) = &shard.follower_id_opt { per_ingester_truncate_subrequests @@ -861,7 +861,10 @@ mod tests { let subrequest = &request.subrequests[0]; assert_eq!(subrequest.index_uid, "test-index:0"); assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.to_position_inclusive(), Position::offset(10u64)); + assert_eq!( + subrequest.truncate_up_to_position_inclusive(), + Position::offset(10u64) + ); let response = TruncateShardsResponse {}; Ok(response) @@ -878,7 +881,10 @@ mod tests { let subrequest = &request.subrequests[0]; assert_eq!(subrequest.index_uid, "test-index:0"); assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.to_position_inclusive(), Position::offset(11u64)); + assert_eq!( + subrequest.truncate_up_to_position_inclusive(), + Position::offset(11u64) + ); Ok(TruncateShardsResponse {}) }); @@ -896,12 +902,18 @@ mod tests { let subrequest = &request.subrequests[0]; assert_eq!(subrequest.index_uid, "test-index:0"); assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.to_position_inclusive(), Position::offset(11u64)); + assert_eq!( + subrequest.truncate_up_to_position_inclusive(), + Position::offset(11u64) + ); let subrequest = &request.subrequests[1]; assert_eq!(subrequest.index_uid, "test-index:0"); assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.to_position_inclusive(), Position::offset(12u64)); + assert_eq!( + subrequest.truncate_up_to_position_inclusive(), + Position::offset(12u64) + ); let response = TruncateShardsResponse {}; Ok(response) @@ -1091,14 +1103,17 @@ mod tests { assert_eq!(subrequest_0.index_uid, "test-index:0"); assert_eq!(subrequest_0.source_id, "test-source"); assert_eq!(subrequest_0.shard_id, 1); - 
assert_eq!(subrequest_0.to_position_inclusive(), Position::eof(11u64)); + assert_eq!( + subrequest_0.truncate_up_to_position_inclusive(), + Position::eof(11u64) + ); let subrequest_1 = &request.subrequests[1]; assert_eq!(subrequest_1.index_uid, "test-index:0"); assert_eq!(subrequest_1.source_id, "test-source"); assert_eq!(subrequest_1.shard_id, 2); assert_eq!( - subrequest_1.to_position_inclusive(), + subrequest_1.truncate_up_to_position_inclusive(), Position::Beginning.as_eof() ); @@ -1250,13 +1265,19 @@ mod tests { assert_eq!(subrequest.index_uid, "test-index:0"); assert_eq!(subrequest.source_id, "test-source"); assert_eq!(subrequest.shard_id, 1); - assert_eq!(subrequest.to_position_inclusive(), Position::offset(11u64)); + assert_eq!( + subrequest.truncate_up_to_position_inclusive(), + Position::offset(11u64) + ); let subrequest = &request.subrequests[1]; assert_eq!(subrequest.index_uid, "test-index:0"); assert_eq!(subrequest.source_id, "test-source"); assert_eq!(subrequest.shard_id, 2); - assert_eq!(subrequest.to_position_inclusive(), Position::eof(22u64)); + assert_eq!( + subrequest.truncate_up_to_position_inclusive(), + Position::eof(22u64) + ); let response = TruncateShardsResponse {}; Ok(response) @@ -1512,20 +1533,23 @@ mod tests { let subrequest_0 = &request.subrequests[0]; assert_eq!(subrequest_0.shard_id, 1); assert_eq!( - subrequest_0.to_position_inclusive(), + subrequest_0.truncate_up_to_position_inclusive(), Position::offset(11u64) ); let subrequest_1 = &request.subrequests[1]; assert_eq!(subrequest_1.shard_id, 2); assert_eq!( - subrequest_1.to_position_inclusive(), + subrequest_1.truncate_up_to_position_inclusive(), Position::offset(22u64) ); let subrequest_2 = &request.subrequests[2]; assert_eq!(subrequest_2.shard_id, 3); - assert_eq!(subrequest_2.to_position_inclusive(), Position::eof(33u64)); + assert_eq!( + subrequest_2.truncate_up_to_position_inclusive(), + Position::eof(33u64) + ); Ok(TruncateShardsResponse {}) }); @@ -1543,13 +1567,16 @@ mod tests { let subrequest_0 = &request.subrequests[0]; assert_eq!(subrequest_0.shard_id, 2); assert_eq!( - subrequest_0.to_position_inclusive(), + subrequest_0.truncate_up_to_position_inclusive(), Position::offset(22u64) ); let subrequest_1 = &request.subrequests[1]; assert_eq!(subrequest_1.shard_id, 3); - assert_eq!(subrequest_1.to_position_inclusive(), Position::eof(33u64)); + assert_eq!( + subrequest_1.truncate_up_to_position_inclusive(), + Position::eof(33u64) + ); Ok(TruncateShardsResponse {}) }); @@ -1567,7 +1594,7 @@ mod tests { let subrequest_0 = &request.subrequests[0]; assert_eq!(subrequest_0.shard_id, 4); assert_eq!( - subrequest_0.to_position_inclusive(), + subrequest_0.truncate_up_to_position_inclusive(), Position::offset(44u64) ); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 479928f6dec..51ac518a074 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -22,19 +22,21 @@ use std::collections::HashMap; use std::fmt; use std::iter::once; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::Duration; use async_trait::async_trait; use bytesize::ByteSize; use futures::stream::FuturesUnordered; use futures::StreamExt; -use mrecordlog::error::{CreateQueueError, TruncateError}; +use mrecordlog::error::{CreateQueueError, DeleteQueueError, TruncateError}; use mrecordlog::MultiRecordLog; use quickwit_cluster::Cluster; +use 
quickwit_common::pubsub::{EventBroker, EventSubscriber}; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; use quickwit_common::tower::Pool; use quickwit_common::ServiceStream; +use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::ingest::ingester::{ AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, DecommissionRequest, DecommissionResponse, FetchMessage, IngesterService, IngesterServiceClient, @@ -46,14 +48,14 @@ use quickwit_proto::ingest::ingester::{ TruncateShardsRequest, TruncateShardsResponse, }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState}; -use quickwit_proto::types::{NodeId, Position, QueueId}; +use quickwit_proto::types::{queue_id, NodeId, Position, QueueId}; use tokio::sync::{watch, RwLock}; use tracing::{debug, error, info, warn}; use super::fetch::FetchStreamTask; use super::models::IngesterShard; use super::mrecord::MRecord; -use super::mrecordlog_utils::check_enough_capacity; +use super::mrecordlog_utils::{check_enough_capacity, force_delete_queue}; use super::rate_meter::RateMeter; use super::replication::{ ReplicationClient, ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, @@ -61,7 +63,7 @@ use super::replication::{ }; use super::IngesterPool; use crate::ingest_v2::broadcast::BroadcastLocalShardsTask; -use crate::ingest_v2::mrecordlog_utils::{delete_queue, queue_position_range}; +use crate::ingest_v2::mrecordlog_utils::queue_position_range; use crate::metrics::INGEST_METRICS; use crate::{estimate_size, FollowerId, LeaderId}; @@ -204,36 +206,36 @@ impl Ingester { let mut num_deleted_shards = 0; for queue_id in queue_ids { - let Some(position_range) = queue_position_range(&state_guard.mrecordlog, &queue_id) - else { - // Delete empty queue. - delete_queue(&mut state_guard.mrecordlog, &queue_id) + if let Some(position_range) = queue_position_range(&state_guard.mrecordlog, &queue_id) { + // The queue is not empty: recover it. + let replication_position_inclusive = Position::offset(*position_range.end()); + let truncation_position_inclusive = if *position_range.start() == 0 { + Position::Beginning + } else { + Position::offset(*position_range.start() - 1) + }; + let solo_shard = IngesterShard::new_solo( + ShardState::Closed, + replication_position_inclusive, + truncation_position_inclusive, + ); + state_guard.shards.insert(queue_id.clone(), solo_shard); + + let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings); + let rate_meter = RateMeter::default(); + state_guard + .rate_trackers + .insert(queue_id, (rate_limiter, rate_meter)); + + num_closed_shards += 1; + } else { + // The queue is empty: delete it. 
+                force_delete_queue(&mut state_guard.mrecordlog, &queue_id)
                     .await
                     .expect("TODO: handle IO error");
                 num_deleted_shards += 1;
-                continue;
-            };
-            let replication_position_inclusive = Position::offset(*position_range.end());
-            let truncation_position_inclusive = if *position_range.start() == 0 {
-                Position::Beginning
-            } else {
-                Position::offset(*position_range.start() - 1)
-            };
-            let solo_shard = IngesterShard::new_solo(
-                ShardState::Closed,
-                replication_position_inclusive,
-                truncation_position_inclusive,
-            );
-            state_guard.shards.insert(queue_id.clone(), solo_shard);
-
-            let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings);
-            let rate_meter = RateMeter::default();
-            state_guard
-                .rate_trackers
-                .insert(queue_id, (rate_limiter, rate_meter));
-
-            num_closed_shards += 1;
+            }
         }
         if num_closed_shards > 0 {
             info!("recovered and closed {num_closed_shards} shard(s)");
@@ -363,6 +365,13 @@ impl Ingester {
         entry.insert(replication_stream_task_handle);
         Ok(replication_client)
     }
+
+    pub fn subscribe(&self, event_broker: &EventBroker) {
+        let weak_ingester_state = WeakIngesterState(Arc::downgrade(&self.state));
+        event_broker
+            .subscribe::<ShardPositionsUpdate>(weak_ingester_state)
+            .forever();
+    }
 }
 
 #[async_trait]
@@ -762,29 +771,14 @@ impl IngesterService for Ingester {
         for subrequest in truncate_shards_request.subrequests {
             let queue_id = subrequest.queue_id();
-            let to_position_inclusive = subrequest.to_position_inclusive();
+            let truncate_up_to_position_inclusive = subrequest.truncate_up_to_position_inclusive();
 
-            if to_position_inclusive.is_eof() {
-                delete_queue(&mut state_guard.mrecordlog, &queue_id)
-                    .await
-                    .expect("TODO: handle IO error");
-                state_guard.shards.remove(&queue_id);
-            } else if let Some(truncate_position) = to_position_inclusive.as_u64() {
-                match state_guard
-                    .mrecordlog
-                    .truncate(&queue_id, truncate_position)
-                    .await
-                {
-                    Ok(_) | Err(TruncateError::MissingQueue(_)) => {}
-                    Err(error) => {
-                        error!("failed to truncate queue `{queue_id}`: {error}");
-                        continue;
-                    }
-                };
-                if let Some(shard) = state_guard.shards.get_mut(&queue_id) {
-                    shard.truncation_position_inclusive = to_position_inclusive;
-                    shard.notify_shard_status();
-                }
+            if truncate_up_to_position_inclusive.is_eof() {
+                state_guard.delete_shard(&queue_id).await;
+            } else {
+                state_guard
+                    .truncate_shard(&queue_id, truncate_up_to_position_inclusive)
+                    .await;
             }
         }
         self.check_decommissioning_status(&mut state_guard);
@@ -835,6 +829,85 @@ impl IngesterService for Ingester {
     }
 }
 
+impl IngesterState {
+    /// Truncates the shard identified by `queue_id` up to `truncate_up_to_position_inclusive` only
+    /// if the current truncation position of the shard is smaller.
+    async fn truncate_shard(
+        &mut self,
+        queue_id: &QueueId,
+        truncate_up_to_position_inclusive: Position,
+    ) {
+        // TODO: Replace with if-let-chains when stabilized.
+        let Some(truncate_up_to_offset_inclusive) = truncate_up_to_position_inclusive.as_u64()
+        else {
+            return;
+        };
+        let Some(shard) = self.shards.get_mut(queue_id) else {
+            return;
+        };
+        if shard.truncation_position_inclusive >= truncate_up_to_position_inclusive {
+            return;
+        }
+        match self
+            .mrecordlog
+            .truncate(queue_id, truncate_up_to_offset_inclusive)
+            .await
+        {
+            Ok(_) => {
+                shard.truncation_position_inclusive = truncate_up_to_position_inclusive;
+            }
+            Err(TruncateError::MissingQueue(_)) => {
+                warn!("failed to truncate WAL queue `{queue_id}`: queue does not exist");
+            }
+            Err(error) => {
+                error!(%error, "failed to truncate WAL queue `{queue_id}`");
+            }
+        };
+    }
+
+    /// Deletes the shard identified by `queue_id` from the ingester state. It removes the
+    /// mrecordlog queue first and then, if the operation is successful, removes the shard.
+    async fn delete_shard(&mut self, queue_id: &QueueId) {
+        match self.mrecordlog.delete_queue(queue_id).await {
+            Ok(_) => {
+                self.shards.remove(queue_id);
+                self.rate_trackers.remove(queue_id);
+            }
+            Err(DeleteQueueError::MissingQueue(_)) => {
+                // The shard has already been deleted.
+            }
+            Err(DeleteQueueError::IoError(_)) => {
+                panic!("TODO: handle IO error")
+            }
+        };
+    }
+}
+
+struct WeakIngesterState(Weak<RwLock<IngesterState>>);
+
+#[async_trait]
+impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
+    async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) {
+        let Some(state) = self.0.upgrade() else {
+            return;
+        };
+        let mut state_guard = state.write().await;
+
+        let index_uid = shard_positions_update.source_uid.index_uid;
+        let source_id = shard_positions_update.source_uid.source_id;
+
+        for (shard_id, shard_position) in shard_positions_update.shard_positions {
+            let queue_id = queue_id(index_uid.as_str(), &source_id, shard_id);
+
+            if shard_position.is_eof() {
+                state_guard.delete_shard(&queue_id).await;
+            } else {
+                state_guard.truncate_shard(&queue_id, shard_position).await;
+            }
+        }
+    }
+}
+
 pub async fn wait_for_ingester_decommission(ingester_opt: Option<IngesterServiceClient>) {
     let Some(mut ingester) = ingester_opt else {
         return;
@@ -883,7 +956,8 @@ mod tests {
         TruncateShardsSubrequest,
     };
     use quickwit_proto::ingest::{DocBatchV2, ShardIds};
-    use quickwit_proto::types::queue_id;
+    use quickwit_proto::types::{queue_id, SourceUid};
+    use tokio::task::yield_now;
     use tonic::transport::{Endpoint, Server};
 
     use super::*;
@@ -1241,40 +1315,6 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn test_ingester_open_replication_stream() {
-        let (_ingester_ctx, mut ingester) = IngesterForTest::default()
-            .with_node_id("test-follower")
-            .build()
-            .await;
-
-        let (syn_replication_stream_tx, syn_replication_stream) = ServiceStream::new_bounded(5);
-        let open_stream_request = OpenReplicationStreamRequest {
-            leader_id: "test-leader".to_string(),
-            follower_id: "test-follower".to_string(),
-            replication_seqno: 0,
-        };
-        let syn_replication_message = SynReplicationMessage::new_open_request(open_stream_request);
-        syn_replication_stream_tx
-            .send(syn_replication_message)
-            .await
-            .unwrap();
-        let mut ack_replication_stream = ingester
-            .open_replication_stream(syn_replication_stream)
-            .await
-            .unwrap();
-        ack_replication_stream
-            .next()
-            .await
-            .unwrap()
-            .unwrap()
-            .into_open_response()
-            .unwrap();
-
-        let state_guard = ingester.state.read().await;
-        assert!(state_guard.replication_tasks.contains_key("test-leader"));
-    }
-
     #[tokio::test]
     async fn test_ingester_persist_replicate() {
         let (leader_ctx, mut leader) = IngesterForTest::default()
@@ -1780,6 +1820,40 @@ mod
tests { .mrecordlog .assert_records_eq(&queue_id_01, .., &[]); } + #[tokio::test] + + async fn test_ingester_open_replication_stream() { + let (_ingester_ctx, mut ingester) = IngesterForTest::default() + .with_node_id("test-follower") + .build() + .await; + + let (syn_replication_stream_tx, syn_replication_stream) = ServiceStream::new_bounded(5); + let open_stream_request = OpenReplicationStreamRequest { + leader_id: "test-leader".to_string(), + follower_id: "test-follower".to_string(), + replication_seqno: 0, + }; + let syn_replication_message = SynReplicationMessage::new_open_request(open_stream_request); + syn_replication_stream_tx + .send(syn_replication_message) + .await + .unwrap(); + let mut ack_replication_stream = ingester + .open_replication_stream(syn_replication_stream) + .await + .unwrap(); + ack_replication_stream + .next() + .await + .unwrap() + .unwrap() + .into_open_response() + .unwrap(); + + let state_guard = ingester.state.read().await; + assert!(state_guard.replication_tasks.contains_key("test-leader")); + } #[tokio::test] async fn test_ingester_open_fetch_stream() { @@ -1939,24 +2013,30 @@ mod tests { index_uid: "test-index:0".to_string(), source_id: "test-source".to_string(), shard_id: 1, - to_position_inclusive: Some(Position::offset(0u64)), + truncate_up_to_position_inclusive: Some(Position::offset(0u64)), }, TruncateShardsSubrequest { index_uid: "test-index:0".to_string(), source_id: "test-source".to_string(), shard_id: 2, - to_position_inclusive: Some(Position::eof(0u64)), + truncate_up_to_position_inclusive: Some(Position::eof(0u64)), }, TruncateShardsSubrequest { index_uid: "test-index:1337".to_string(), source_id: "test-source".to_string(), shard_id: 1337, - to_position_inclusive: Some(Position::offset(1337u64)), + truncate_up_to_position_inclusive: Some(Position::offset(1337u64)), }, ], }; ingester - .truncate_shards(truncate_shards_request) + .truncate_shards(truncate_shards_request.clone()) + .await + .unwrap(); + + // Verify idempotency. + ingester + .truncate_shards(truncate_shards_request.clone()) .await .unwrap(); @@ -1964,6 +2044,7 @@ mod tests { assert_eq!(state_guard.shards.len(), 1); assert!(state_guard.shards.contains_key(&queue_id_01)); + state_guard .mrecordlog .assert_records_eq(&queue_id_01, .., &[(1, "\0\0test-doc-bar")]); @@ -2011,7 +2092,16 @@ mod tests { shard_ids: vec![1, 1337], }], }; - ingester.close_shards(close_shards_request).await.unwrap(); + ingester + .close_shards(close_shards_request.clone()) + .await + .unwrap(); + + // Verify idempotency. 
+ ingester + .close_shards(close_shards_request.clone()) + .await + .unwrap(); let state_guard = ingester.state.read().await; let shard = state_guard.shards.get(&queue_id).unwrap(); @@ -2102,4 +2192,93 @@ mod tests { IngesterStatus::Decommissioned ); } + + #[tokio::test] + async fn test_ingester_truncate_on_shard_positions_update() { + let (_ingester_ctx, ingester) = IngesterForTest::default().build().await; + let event_broker = EventBroker::default(); + ingester.subscribe(&event_broker); + + let shard_01 = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + + let shard_02 = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 2, + shard_state: ShardState::Closed as i32, + ..Default::default() + }; + let queue_id_02 = queue_id("test-index:0", "test-source", 2); + + let mut state_guard = ingester.state.write().await; + + ingester + .init_primary_shard(&mut state_guard, shard_01) + .await + .unwrap(); + ingester + .init_primary_shard(&mut state_guard, shard_02) + .await + .unwrap(); + + let records = [ + MRecord::new_doc("test-doc-foo").encode(), + MRecord::new_doc("test-doc-bar").encode(), + ] + .into_iter(); + + state_guard + .mrecordlog + .append_records(&queue_id_01, None, records) + .await + .unwrap(); + + let records = [MRecord::new_doc("test-doc-baz").encode()].into_iter(); + + state_guard + .mrecordlog + .append_records(&queue_id_02, None, records) + .await + .unwrap(); + + drop(state_guard); + + let shard_position_update = ShardPositionsUpdate { + source_uid: SourceUid { + index_uid: "test-index:0".into(), + source_id: "test-source".to_string(), + }, + shard_positions: vec![ + (1, Position::offset(0u64)), + (2, Position::eof(0u64)), + (1337, Position::offset(1337u64)), + ], + }; + event_broker.publish(shard_position_update.clone()); + + // Verify idempotency. + event_broker.publish(shard_position_update); + + // Yield so that the event is processed. + yield_now().await; + + let state_guard = ingester.state.read().await; + assert_eq!(state_guard.shards.len(), 1); + + assert!(state_guard.shards.contains_key(&queue_id_01)); + + state_guard + .mrecordlog + .assert_records_eq(&queue_id_01, .., &[(1, "\0\0test-doc-bar")]); + + assert!(!state_guard.shards.contains_key(&queue_id_02)); + assert!(!state_guard.mrecordlog.queue_exists(&queue_id_02)); + } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs index f135680c4f6..523ae9b47d4 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs @@ -77,7 +77,10 @@ pub(super) fn check_enough_capacity( } /// Deletes a queue from the WAL. Returns without error if the queue does not exist. 
-pub async fn delete_queue(mrecordlog: &mut MultiRecordLog, queue_id: &QueueId) -> io::Result<()> {
+pub async fn force_delete_queue(
+    mrecordlog: &mut MultiRecordLog,
+    queue_id: &QueueId,
+) -> io::Result<()> {
     match mrecordlog.delete_queue(queue_id).await {
         Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => Ok(()),
         Err(DeleteQueueError::IoError(error)) => Err(error),
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index 27ba0df4b7f..a30a617d6c7 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -19,13 +19,13 @@
 
 use std::collections::{HashMap, HashSet};
 use std::fmt;
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 use std::time::Duration;
 
 use async_trait::async_trait;
 use futures::stream::FuturesUnordered;
 use futures::{Future, StreamExt};
-use quickwit_common::pubsub::{EventBroker, EventSubscriber, EventSubscriptionHandle};
+use quickwit_common::pubsub::{EventBroker, EventSubscriber};
 use quickwit_proto::control_plane::{
     ControlPlaneService, ControlPlaneServiceClient, GetOrCreateOpenShardsRequest,
     GetOrCreateOpenShardsSubrequest,
@@ -67,7 +67,6 @@ pub struct IngestRouter {
     ingester_pool: IngesterPool,
     state: Arc<RwLock<RouterState>>,
     replication_factor: usize,
-    _local_shards_update_subscription_handle: EventSubscriptionHandle,
 }
 
 struct RouterState {
@@ -86,7 +85,6 @@ impl fmt::Debug for IngestRouter {
 impl IngestRouter {
     pub fn new(
         self_node_id: NodeId,
-        event_broker: EventBroker,
         control_plane: ControlPlaneServiceClient,
         ingester_pool: IngesterPool,
         replication_factor: usize,
@@ -97,19 +95,21 @@ impl IngestRouter {
                 table: HashMap::default(),
             },
         }));
-        let _local_shards_update_subscription_handle =
-            event_broker.subscribe::<LocalShardsUpdate>(state.clone());
-
         Self {
             self_node_id,
             control_plane,
             ingester_pool,
             state,
             replication_factor,
-            _local_shards_update_subscription_handle,
         }
     }
 
+    pub fn subscribe(&self, event_broker: &EventBroker) {
+        event_broker
+            .subscribe::<LocalShardsUpdate>(Arc::downgrade(&self.state))
+            .forever();
+    }
+
     /// Inspects the shard table for each subrequest and returns the appropriate
     /// [`GetOrCreateOpenShardsRequest`] request if open shards do not exist for all the them.
     async fn make_get_or_create_open_shard_request(
@@ -391,8 +391,11 @@ impl IngestRouterService for IngestRouter {
 }
 
 #[async_trait]
-impl EventSubscriber<LocalShardsUpdate> for Arc<RwLock<RouterState>> {
+impl EventSubscriber<LocalShardsUpdate> for Weak<RwLock<RouterState>> {
     async fn handle_event(&mut self, local_shards_update: LocalShardsUpdate) {
+        let Some(state) = self.upgrade() else {
+            return;
+        };
         // TODO: Insert the new shards in the shard table when `LocalShardsUpdate` also carries the
         // leader ID.
let index_uid = local_shards_update.source_uid.index_uid; @@ -404,7 +407,8 @@ impl EventSubscriber for Arc> { .map(|shard_info| shard_info.shard_id) .collect(); - self.write() + state + .write() .await .shard_table .close_shards(&index_uid, &source_id, &closed_shard_ids); @@ -442,13 +446,11 @@ mod tests { #[tokio::test] async fn test_router_make_get_or_create_open_shard_request() { let self_node_id = "test-router".into(); - let event_broker = EventBroker::default(); let control_plane: ControlPlaneServiceClient = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; let router = IngestRouter::new( self_node_id, - event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -559,7 +561,6 @@ mod tests { #[tokio::test] async fn test_router_populate_shard_table() { let self_node_id = "test-router".into(); - let event_broker = EventBroker::default(); let mut control_plane_mock = ControlPlaneServiceClient::mock(); control_plane_mock @@ -636,7 +637,6 @@ mod tests { let replication_factor = 1; let mut router = IngestRouter::new( self_node_id, - event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -744,13 +744,11 @@ mod tests { #[tokio::test] async fn test_router_process_persist_results_record_persist_successes() { let self_node_id = "test-router".into(); - let event_broker = EventBroker::default(); let control_plane = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; let mut router = IngestRouter::new( self_node_id, - event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -796,13 +794,11 @@ mod tests { #[tokio::test] async fn test_router_process_persist_results_record_persist_failures() { let self_node_id = "test-router".into(); - let event_broker = EventBroker::default(); let control_plane = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; let mut router = IngestRouter::new( self_node_id, - event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -848,13 +844,11 @@ mod tests { #[tokio::test] async fn test_router_process_persist_results_closes_shards() { let self_node_id = "test-router".into(); - let event_broker = EventBroker::default(); let control_plane = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; let mut router = IngestRouter::new( self_node_id, - event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -914,7 +908,6 @@ mod tests { #[tokio::test] async fn test_router_process_persist_results_removes_unavailable_leaders() { let self_node_id = "test-router".into(); - let event_broker = EventBroker::default(); let control_plane = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); @@ -930,7 +923,6 @@ mod tests { let replication_factor = 1; let mut router = IngestRouter::new( self_node_id, - event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -996,13 +988,11 @@ mod tests { #[tokio::test] async fn test_router_ingest() { let self_node_id = "test-router".into(); - let event_broker = EventBroker::default(); let control_plane = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; let mut router = IngestRouter::new( self_node_id, - event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -1209,13 +1199,11 @@ mod tests { 
#[tokio::test] async fn test_router_ingest_retry() { let self_node_id = "test-router".into(); - let event_broker = EventBroker::default(); let control_plane = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; let mut router = IngestRouter::new( self_node_id, - event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -1317,17 +1305,17 @@ mod tests { #[tokio::test] async fn test_router_closes_shards_on_local_shards_update() { let self_node_id = "test-router".into(); - let event_broker = EventBroker::default(); let control_plane = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; let router = IngestRouter::new( self_node_id, - event_broker.clone(), control_plane, ingester_pool.clone(), replication_factor, ); + let event_broker = EventBroker::default(); + router.subscribe(&event_broker); let mut state_guard = router.state.write().await; state_guard.shard_table.set_shards( diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index fe21cc61c48..ff67c9ea4bf 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -196,7 +196,8 @@ message TruncateShardsSubrequest { string index_uid = 1; string source_id = 2; uint64 shard_id = 3; - quickwit.ingest.Position to_position_inclusive = 4; + // The position up to which the shard should be truncated (inclusive). + quickwit.ingest.Position truncate_up_to_position_inclusive = 4; } message TruncateShardsResponse { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 7570019c256..8309ce26a27 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -245,8 +245,11 @@ pub struct TruncateShardsSubrequest { pub source_id: ::prost::alloc::string::String, #[prost(uint64, tag = "3")] pub shard_id: u64, + /// The position up to which the shard should be truncated (inclusive). 
     #[prost(message, optional, tag = "4")]
-    pub to_position_inclusive: ::core::option::Option<crate::types::Position>,
+    pub truncate_up_to_position_inclusive: ::core::option::Option<
+        crate::types::Position,
+    >,
 }
 /// TODO
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs
index cede71bf984..e14fbf1fdfe 100644
--- a/quickwit/quickwit-proto/src/ingest/ingester.rs
+++ b/quickwit/quickwit-proto/src/ingest/ingester.rs
@@ -190,7 +190,9 @@ impl TruncateShardsSubrequest {
         queue_id(&self.index_uid, &self.source_id, self.shard_id)
     }
 
-    pub fn to_position_inclusive(&self) -> Position {
-        self.to_position_inclusive.clone().unwrap_or_default()
+    pub fn truncate_up_to_position_inclusive(&self) -> Position {
+        self.truncate_up_to_position_inclusive
+            .clone()
+            .unwrap_or_default()
     }
 }
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 93aa2c6ad9f..5043f642894 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -383,7 +383,7 @@ pub async fn serve_quickwit(
     let (ingest_router_service, ingester_service_opt) = setup_ingest_v2(
         &node_config,
         &cluster,
-        event_broker.clone(),
+        &event_broker,
         control_plane_service.clone(),
         ingester_pool,
     )
@@ -583,7 +583,7 @@ pub async fn serve_quickwit(
 async fn setup_ingest_v2(
     config: &NodeConfig,
     cluster: &Cluster,
-    event_broker: EventBroker,
+    event_broker: &EventBroker,
     control_plane: ControlPlaneServiceClient,
     ingester_pool: IngesterPool,
 ) -> anyhow::Result<(IngestRouterServiceClient, Option<IngesterServiceClient>)> {
@@ -596,11 +596,11 @@ async fn setup_ingest_v2(
         .get();
     let ingest_router = IngestRouter::new(
         self_node_id.clone(),
-        event_broker,
         control_plane,
         ingester_pool.clone(),
         replication_factor,
     );
+    ingest_router.subscribe(event_broker);
     let ingest_router_service = IngestRouterServiceClient::new(ingest_router);
 
     // We compute the burst limit as something a bit larger than the content length limit, because
@@ -626,6 +626,7 @@ async fn setup_ingest_v2(
             replication_factor,
         )
         .await?;
+        ingester.subscribe(event_broker);
         let ingester_service = IngesterServiceClient::new(ingester);
         Some(ingester_service)
     } else {
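Note on the subscription pattern introduced in this patch: both `IngestRouter::subscribe` and `Ingester::subscribe` register a `Weak` handle with the `EventBroker` (a `Weak<RwLock<RouterState>>` and a `WeakIngesterState`) and call `.forever()`, so the broker does not keep the component state alive, and events delivered after the owner has been dropped are silently ignored. The sketch below is a minimal, self-contained illustration of that idea under stated assumptions: the `Subscriber` trait, `PositionUpdate` event, and `State` type are hypothetical stand-ins, not Quickwit's actual pubsub API.

use std::sync::{Arc, Mutex, Weak};

// Hypothetical event type, for illustration only.
#[derive(Clone, Debug)]
struct PositionUpdate {
    shard_id: u64,
    position: u64,
}

// Hypothetical stand-in for an event subscriber trait (not quickwit_common::pubsub).
trait Subscriber {
    fn handle_event(&mut self, event: PositionUpdate);
}

// Component state that the broker must not keep alive on its own.
#[derive(Default)]
struct State {
    positions: Vec<(u64, u64)>,
}

// The subscriber holds only a weak reference to the state.
struct WeakStateSubscriber(Weak<Mutex<State>>);

impl Subscriber for WeakStateSubscriber {
    fn handle_event(&mut self, event: PositionUpdate) {
        // If the owning component has been dropped, ignore the event.
        let Some(state) = self.0.upgrade() else {
            return;
        };
        let mut state_guard = state.lock().unwrap();
        state_guard.positions.push((event.shard_id, event.position));
    }
}

fn main() {
    let state = Arc::new(Mutex::new(State::default()));

    // The broker side of the subscription only ever sees a Weak handle.
    let mut subscriber = WeakStateSubscriber(Arc::downgrade(&state));

    subscriber.handle_event(PositionUpdate { shard_id: 1, position: 10 });
    assert_eq!(state.lock().unwrap().positions, vec![(1, 10)]);

    // Once the owner drops its Arc, later events become no-ops instead of
    // keeping the state alive.
    drop(state);
    subscriber.handle_event(PositionUpdate { shard_id: 1, position: 11 });
}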