From 5acbe00dc643b1f069ef7fb51d07a31aa9fc7ba6 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 3 Feb 2024 10:13:02 +0100 Subject: [PATCH] Change of semantics of `ShardPositionsUpdate` It now only contains the shards that are received an update, and the event itself is only emitted if at least a shard received an update. Closes #4500 --- .../src/control_plane.rs | 8 +- .../src/models/shard_positions.rs | 101 ++++++++---------- .../quickwit-ingest/src/ingest_v2/ingester.rs | 4 +- .../quickwit-ingest/src/ingest_v2/router.rs | 4 +- .../quickwit-ingest/src/ingest_v2/state.rs | 5 + quickwit/quickwit-proto/src/indexing/mod.rs | 4 +- 6 files changed, 57 insertions(+), 69 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 8c65ba46bce..4fba374fabb 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -343,7 +343,7 @@ impl Handler for ControlPlane { .collect(); // let's identify the shard that have reached EOF but have not yet been removed. let shard_ids_to_close: Vec = shard_positions_update - .shard_positions + .updated_shard_positions .into_iter() .filter(|(shard_id, position)| position.is_eof() && known_shard_ids.contains(shard_id)) .map(|(shard_id, _position)| shard_id) @@ -1306,7 +1306,7 @@ mod tests { control_plane_mailbox .ask(ShardPositionsUpdate { source_uid: source_uid.clone(), - shard_positions: vec![(ShardId::from(17), Position::offset(1_000u64))], + updated_shard_positions: vec![(ShardId::from(17), Position::offset(1_000u64))], }) .await .unwrap(); @@ -1330,7 +1330,7 @@ mod tests { control_plane_mailbox .ask(ShardPositionsUpdate { source_uid, - shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))], + updated_shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))], }) .await .unwrap(); @@ -1432,7 +1432,7 @@ mod tests { control_plane_mailbox .ask(ShardPositionsUpdate { source_uid: source_uid.clone(), - shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))], + updated_shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))], }) .await .unwrap(); diff --git a/quickwit/quickwit-indexing/src/models/shard_positions.rs b/quickwit/quickwit-indexing/src/models/shard_positions.rs index 4e76b7e10da..e062063f6bf 100644 --- a/quickwit/quickwit-indexing/src/models/shard_positions.rs +++ b/quickwit/quickwit-indexing/src/models/shard_positions.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::btree_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; @@ -176,9 +175,9 @@ impl Handler for ShardPositionsService { source_uid, shard_positions, } = update; - let was_updated = self.apply_update(&source_uid, shard_positions); - if was_updated { - self.publish_shard_updates_to_event_broker(source_uid); + let updated_shard_positions = self.apply_update(&source_uid, shard_positions); + if !updated_shard_positions.is_empty() { + self.publish_shard_updates_to_event_broker(source_uid, updated_shard_positions); } Ok(()) } @@ -192,17 +191,19 @@ impl Handler for ShardPositionsService { &mut self, update: LocalShardPositionsUpdate, _ctx: &ActorContext, - ) -> Result { + ) -> Result<(), ActorExitStatus> { let LocalShardPositionsUpdate { source_uid, shard_positions, } = update; - let was_updated = self.apply_update(&source_uid, shard_positions); - if was_updated { - self.publish_positions_into_chitchat(source_uid.clone()) - .await; - self.publish_shard_updates_to_event_broker(source_uid); + let updated_shard_positions: Vec<(ShardId, Position)> = + self.apply_update(&source_uid, shard_positions); + if updated_shard_positions.is_empty() { + return Ok(()); } + self.publish_positions_into_chitchat(source_uid.clone()) + .await; + self.publish_shard_updates_to_event_broker(source_uid, updated_shard_positions); Ok(()) } } @@ -223,51 +224,48 @@ impl ShardPositionsService { .await; } - fn publish_shard_updates_to_event_broker(&self, source_uid: SourceUid) { - let Some(shard_positions_map) = self.shard_positions_per_source.get(&source_uid) else { - return; - }; - let shard_positions: Vec<(ShardId, Position)> = shard_positions_map - .iter() - .map(|(shard_id, position)| (shard_id.clone(), position.clone())) - .collect(); + fn publish_shard_updates_to_event_broker( + &self, + source_uid: SourceUid, + shard_positions: Vec<(ShardId, Position)>, + ) { self.event_broker.publish(ShardPositionsUpdate { source_uid, - shard_positions, + updated_shard_positions: shard_positions, }); } /// Updates the internal model holding the last position per shard, and - /// returns true if at least one of the publish position was updated. + /// returns the list of shards that were updated. fn apply_update( &mut self, source_uid: &SourceUid, published_positions_per_shard: Vec<(ShardId, Position)>, - ) -> bool { + ) -> Vec<(ShardId, Position)> { if published_positions_per_shard.is_empty() { warn!("received an empty publish shard positions update"); - return false; + return Vec::new(); } - let mut was_modified = false; let current_shard_positions = self .shard_positions_per_source .entry(source_uid.clone()) .or_default(); - for (shard, new_position) in published_positions_per_shard { - match current_shard_positions.entry(shard) { - Entry::Occupied(mut occupied) => { - if *occupied.get() < new_position { - occupied.insert(new_position); - was_modified = true; - } - } - Entry::Vacant(vacant) => { - was_modified = true; - vacant.insert(new_position.clone()); - } - } + + let updated_positions_per_shard = published_positions_per_shard + .into_iter() + .filter(|(shard, new_position)| { + let Some(position) = current_shard_positions.get(shard) else { + return true; + }; + new_position > position + }) + .collect::>(); + + for (shard, position) in updated_positions_per_shard.iter() { + current_shard_positions.insert(shard.clone(), position.clone()); } - was_modified + + updated_positions_per_shard } } @@ -380,7 +378,7 @@ mod tests { for _ in 0..4 { let update = rx1.recv().await.unwrap(); assert_eq!(update.source_uid, source_uid); - updates1.push(update.shard_positions); + updates1.push(update.updated_shard_positions); } // The updates as seen from the first node. @@ -388,18 +386,9 @@ mod tests { updates1, vec![ vec![(ShardId::from(1), Position::Beginning)], - vec![ - (ShardId::from(1), Position::Beginning), - (ShardId::from(2), Position::offset(10u64)) - ], - vec![ - (ShardId::from(1), Position::offset(10u64)), - (ShardId::from(2), Position::offset(10u64)), - ], - vec![ - (ShardId::from(1), Position::offset(10u64)), - (ShardId::from(2), Position::offset(12u64)), - ], + vec![(ShardId::from(2), Position::offset(10u64))], + vec![(ShardId::from(1), Position::offset(10u64)),], + vec![(ShardId::from(2), Position::offset(12u64)),], ] ); @@ -408,21 +397,15 @@ mod tests { for _ in 0..4 { let update = rx2.recv().await.unwrap(); assert_eq!(update.source_uid, source_uid); - updates2.push(update.shard_positions); + updates2.push(update.updated_shard_positions); } assert_eq!( updates2, vec![ vec![(ShardId::from(2), Position::offset(10u64))], vec![(ShardId::from(2), Position::offset(12u64))], - vec![ - (ShardId::from(1), Position::Beginning), - (ShardId::from(2), Position::offset(12u64)) - ], - vec![ - (ShardId::from(1), Position::offset(10u64)), - (ShardId::from(2), Position::offset(12u64)) - ], + vec![(ShardId::from(1), Position::Beginning),], + vec![(ShardId::from(1), Position::offset(10u64)),], ] ); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 58633c61df6..cb75999081e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -1113,7 +1113,7 @@ impl EventSubscriber for WeakIngesterState { let index_uid = shard_positions_update.source_uid.index_uid; let source_id = shard_positions_update.source_uid.source_id; - for (shard_id, shard_position) in shard_positions_update.shard_positions { + for (shard_id, shard_position) in shard_positions_update.updated_shard_positions { let queue_id = queue_id(index_uid.as_str(), &source_id, &shard_id); if shard_position.is_eof() { @@ -2786,7 +2786,7 @@ mod tests { index_uid: "test-index:0".into(), source_id: "test-source".to_string(), }, - shard_positions: vec![ + updated_shard_positions: vec![ (ShardId::from(1), Position::offset(0u64)), (ShardId::from(2), Position::eof(0u64)), (ShardId::from(1337), Position::offset(1337u64)), diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index c7d5f647ddc..b2439f92696 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -488,7 +488,7 @@ impl EventSubscriber for WeakRouterState { }; let mut deleted_shard_ids: Vec = Vec::new(); - for (shard_id, shard_position) in shard_positions_update.shard_positions { + for (shard_id, shard_position) in shard_positions_update.updated_shard_positions { if shard_position.is_eof() { deleted_shard_ids.push(shard_id); } @@ -1514,7 +1514,7 @@ mod tests { index_uid: "test-index-0:0".into(), source_id: "test-source".to_string(), }, - shard_positions: vec![(ShardId::from(1), Position::eof(0u64))], + updated_shard_positions: vec![(ShardId::from(1), Position::eof(0u64))], }; event_broker.publish(shard_positions_update); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 8515c56d81f..80b6baa46f9 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -185,6 +185,11 @@ impl FullyLockedIngesterState<'_> { /// Deletes the shard identified by `queue_id` from the ingester state. It removes the /// mrecordlog queue first and then removes the associated in-memory shard and rate trackers. pub async fn delete_shard(&mut self, queue_id: &QueueId) { + // This if-statement is here to avoid needless log. + if self.inner.shards.contains_key(queue_id) { + // No need to do anything. This queue is not on this ingester. + return; + } match self.mrecordlog.delete_queue(queue_id).await { Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => { self.shards.remove(queue_id); diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs index 8efd0b9376f..a6853e5ff14 100644 --- a/quickwit/quickwit-proto/src/indexing/mod.rs +++ b/quickwit/quickwit-proto/src/indexing/mod.rs @@ -331,8 +331,8 @@ impl From for CpuCapacityForSerialization { #[derive(Debug, Clone, PartialEq, Eq)] pub struct ShardPositionsUpdate { pub source_uid: SourceUid, - // All of the shards known are listed here, regardless of whether they were updated or not. - pub shard_positions: Vec<(ShardId, Position)>, + // Only shards that received an update are listed here. + pub updated_shard_positions: Vec<(ShardId, Position)>, } impl Event for ShardPositionsUpdate {}