Skip to content

Commit

Permalink
Only emitting an event for shards that were actually updated.
Browse files Browse the repository at this point in the history
Closes #4500
  • Loading branch information
fulmicoton committed Feb 3, 2024
1 parent 17c0600 commit f145d58
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 70 deletions.
10 changes: 5 additions & 5 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use quickwit_metastore::IndexMetadata;
use quickwit_proto::control_plane::{
ControlPlaneError, ControlPlaneResult, GetDebugStateRequest, GetDebugStateResponse,
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest,
PhysicalIndexingPlanEntry, ShardTableEntry,
PhysicalIndexingPlanEntry, ShardTableEntry
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::metastore::{
Expand Down Expand Up @@ -343,7 +343,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
.collect();
// let's identify the shard that have reached EOF but have not yet been removed.
let shard_ids_to_close: Vec<ShardId> = shard_positions_update
.shard_positions
.updated_shard_positions
.into_iter()
.filter(|(shard_id, position)| position.is_eof() && known_shard_ids.contains(shard_id))
.map(|(shard_id, _position)| shard_id)
Expand Down Expand Up @@ -1306,7 +1306,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid: source_uid.clone(),
shard_positions: vec![(ShardId::from(17), Position::offset(1_000u64))],
updated_shard_positions: vec![(ShardId::from(17), Position::offset(1_000u64))],
})
.await
.unwrap();
Expand All @@ -1330,7 +1330,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid,
shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
updated_shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
})
.await
.unwrap();
Expand Down Expand Up @@ -1432,7 +1432,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid: source_uid.clone(),
shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
updated_shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
})
.await
.unwrap();
Expand Down
103 changes: 44 additions & 59 deletions quickwit/quickwit-indexing/src/models/shard_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;

Expand Down Expand Up @@ -176,9 +175,9 @@ impl Handler<ClusterShardPositionsUpdate> for ShardPositionsService {
source_uid,
shard_positions,
} = update;
let was_updated = self.apply_update(&source_uid, shard_positions);
if was_updated {
self.publish_shard_updates_to_event_broker(source_uid);
let updated_shard_positions = self.apply_update(&source_uid, shard_positions);
if !updated_shard_positions.is_empty() {
self.publish_shard_updates_to_event_broker(source_uid, updated_shard_positions);
}
Ok(())
}
Expand All @@ -192,16 +191,20 @@ impl Handler<LocalShardPositionsUpdate> for ShardPositionsService {
&mut self,
update: LocalShardPositionsUpdate,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
) -> Result<(), ActorExitStatus> {
let LocalShardPositionsUpdate {
source_uid,
shard_positions,
} = update;
let was_updated = self.apply_update(&source_uid, shard_positions);
if was_updated {
self.publish_positions_into_chitchat(source_uid.clone())
.await;
self.publish_shard_updates_to_event_broker(source_uid);
let updated_shard_positions: Vec<(ShardId, Position)> =
self.apply_update(&source_uid, shard_positions);
if updated_shard_positions.is_empty() {
return Ok(());
}
self.publish_positions_into_chitchat(source_uid.clone())
.await;
if !updated_shard_positions.is_empty() {
self.publish_shard_updates_to_event_broker(source_uid, updated_shard_positions);
}
Ok(())
}
Expand All @@ -223,51 +226,48 @@ impl ShardPositionsService {
.await;
}

fn publish_shard_updates_to_event_broker(&self, source_uid: SourceUid) {
let Some(shard_positions_map) = self.shard_positions_per_source.get(&source_uid) else {
return;
};
let shard_positions: Vec<(ShardId, Position)> = shard_positions_map
.iter()
.map(|(shard_id, position)| (shard_id.clone(), position.clone()))
.collect();
fn publish_shard_updates_to_event_broker(
&self,
source_uid: SourceUid,
shard_positions: Vec<(ShardId, Position)>,
) {
self.event_broker.publish(ShardPositionsUpdate {
source_uid,
shard_positions,
updated_shard_positions: shard_positions,
});
}

/// Updates the internal model holding the last position per shard, and
/// returns true if at least one of the publish position was updated.
/// returns the list of shards that were updated.
fn apply_update(
&mut self,
source_uid: &SourceUid,
published_positions_per_shard: Vec<(ShardId, Position)>,
) -> bool {
) -> Vec<(ShardId, Position)> {
if published_positions_per_shard.is_empty() {
warn!("received an empty publish shard positions update");
return false;
return Vec::new();
}
let mut was_modified = false;
let current_shard_positions = self
.shard_positions_per_source
.entry(source_uid.clone())
.or_default();
for (shard, new_position) in published_positions_per_shard {
match current_shard_positions.entry(shard) {
Entry::Occupied(mut occupied) => {
if *occupied.get() < new_position {
occupied.insert(new_position);
was_modified = true;
}
}
Entry::Vacant(vacant) => {
was_modified = true;
vacant.insert(new_position.clone());
}
}

let updated_positions_per_shard = published_positions_per_shard
.into_iter()
.filter(|(shard, new_position)| {
let Some(position) = current_shard_positions.get(&shard) else {
return true;
};
new_position > position
})
.collect::<Vec<_>>();

for (shard, position) in updated_positions_per_shard.iter() {
current_shard_positions.insert(shard.clone(), position.clone());
}
was_modified

updated_positions_per_shard
}
}

Expand Down Expand Up @@ -380,26 +380,17 @@ mod tests {
for _ in 0..4 {
let update = rx1.recv().await.unwrap();
assert_eq!(update.source_uid, source_uid);
updates1.push(update.shard_positions);
updates1.push(update.updated_shard_positions);
}

// The updates as seen from the first node.
assert_eq!(
updates1,
vec![
vec![(ShardId::from(1), Position::Beginning)],
vec![
(ShardId::from(1), Position::Beginning),
(ShardId::from(2), Position::offset(10u64))
],
vec![
(ShardId::from(1), Position::offset(10u64)),
(ShardId::from(2), Position::offset(10u64)),
],
vec![
(ShardId::from(1), Position::offset(10u64)),
(ShardId::from(2), Position::offset(12u64)),
],
vec![(ShardId::from(2), Position::offset(10u64))],
vec![(ShardId::from(1), Position::offset(10u64)),],
vec![(ShardId::from(2), Position::offset(12u64)),],
]
);

Expand All @@ -408,21 +399,15 @@ mod tests {
for _ in 0..4 {
let update = rx2.recv().await.unwrap();
assert_eq!(update.source_uid, source_uid);
updates2.push(update.shard_positions);
updates2.push(update.updated_shard_positions);
}
assert_eq!(
updates2,
vec![
vec![(ShardId::from(2), Position::offset(10u64))],
vec![(ShardId::from(2), Position::offset(12u64))],
vec![
(ShardId::from(1), Position::Beginning),
(ShardId::from(2), Position::offset(12u64))
],
vec![
(ShardId::from(1), Position::offset(10u64)),
(ShardId::from(2), Position::offset(12u64))
],
vec![(ShardId::from(1), Position::Beginning),],
vec![(ShardId::from(1), Position::offset(10u64)),],
]
);

Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
let index_uid = shard_positions_update.source_uid.index_uid;
let source_id = shard_positions_update.source_uid.source_id;

for (shard_id, shard_position) in shard_positions_update.shard_positions {
for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
let queue_id = queue_id(index_uid.as_str(), &source_id, &shard_id);

if shard_position.is_eof() {
Expand Down Expand Up @@ -2786,7 +2786,7 @@ mod tests {
index_uid: "test-index:0".into(),
source_id: "test-source".to_string(),
},
shard_positions: vec![
updated_shard_positions: vec![
(ShardId::from(1), Position::offset(0u64)),
(ShardId::from(2), Position::eof(0u64)),
(ShardId::from(1337), Position::offset(1337u64)),
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakRouterState {
};
let mut deleted_shard_ids: Vec<ShardId> = Vec::new();

for (shard_id, shard_position) in shard_positions_update.shard_positions {
for (shard_id, shard_position) in shard_positions_update.updated_shard_positions {
if shard_position.is_eof() {
deleted_shard_ids.push(shard_id);
}
Expand Down Expand Up @@ -1514,7 +1514,7 @@ mod tests {
index_uid: "test-index-0:0".into(),
source_id: "test-source".to_string(),
},
shard_positions: vec![(ShardId::from(1), Position::eof(0u64))],
updated_shard_positions: vec![(ShardId::from(1), Position::eof(0u64))],
};
event_broker.publish(shard_positions_update);

Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ impl FullyLockedIngesterState<'_> {
/// Deletes the shard identified by `queue_id` from the ingester state. It removes the
/// mrecordlog queue first and then removes the associated in-memory shard and rate trackers.
pub async fn delete_shard(&mut self, queue_id: &QueueId) {
// This if-statement is here to avoid needless log.
if self.inner.shards.contains_key(queue_id) {
// No need to do anything. This queue is not on this ingester.
return;
}
match self.mrecordlog.delete_queue(queue_id).await {
Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => {
self.shards.remove(queue_id);
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-proto/src/indexing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ impl From<CpuCapacity> for CpuCapacityForSerialization {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardPositionsUpdate {
pub source_uid: SourceUid,
// All of the shards known are listed here, regardless of whether they were updated or not.
pub shard_positions: Vec<(ShardId, Position)>,
// Only shards that received an update are listed here.
pub updated_shard_positions: Vec<(ShardId, Position)>,
}

impl Event for ShardPositionsUpdate {}
Expand Down

0 comments on commit f145d58

Please sign in to comment.