From df9c302b6d975fa072d2182d5eaf79b91dff9b49 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Fri, 30 Aug 2024 11:08:54 +0200 Subject: [PATCH] Small naming and trait bound adjustments --- quickwit/quickwit-common/src/pubsub.rs | 2 +- .../quickwit-ingest/src/ingest_v2/workbench.rs | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs index 29fab1101cb..70683167169 100644 --- a/quickwit/quickwit-common/src/pubsub.rs +++ b/quickwit/quickwit-common/src/pubsub.rs @@ -42,7 +42,7 @@ pub trait EventSubscriber: Send + Sync + 'static { impl EventSubscriber for F where E: Event, - F: Fn(E) + Send + Sync + 'static, + F: FnMut(E) + Send + Sync + 'static, { async fn handle_event(&mut self, event: E) { (self)(event); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs index 388a990086b..a628ac1feff 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs @@ -94,7 +94,7 @@ impl PublishTracker { } } - fn shard_persisted(&self, shard_id: ShardId, new_position: Position) { + fn track_persisted_position(&self, shard_id: ShardId, new_position: Position) { let mut state_handle = self.state.lock().unwrap(); match state_handle.already_published.get(&shard_id) { Some(already_published_position) if new_position <= *already_published_position => { @@ -249,7 +249,7 @@ impl IngestWorkbench { if let Some(publish_tracker) = &mut self.publish_tracker { if let Some(position) = &persist_success.replication_position_inclusive { publish_tracker - .shard_persisted(persist_success.shard_id().clone(), position.clone()); + .track_persisted_position(persist_success.shard_id().clone(), position.clone()); } } self.num_successes += 1; @@ -493,9 +493,9 @@ mod tests { let shard_id_3 = ShardId::from("test-shard-3"); let shard_id_4 = ShardId::from("test-shard-3"); - tracker.shard_persisted(shard_id_1.clone(), Position::offset(42usize)); - tracker.shard_persisted(shard_id_2.clone(), Position::offset(42usize)); - tracker.shard_persisted(shard_id_3.clone(), Position::offset(42usize)); + tracker.track_persisted_position(shard_id_1.clone(), Position::offset(42usize)); + tracker.track_persisted_position(shard_id_2.clone(), Position::offset(42usize)); + tracker.track_persisted_position(shard_id_3.clone(), Position::offset(42usize)); event_broker.publish(ShardPositionsUpdate { source_uid: SourceUid { @@ -524,7 +524,7 @@ mod tests { }); // persist response received after the publish event - tracker.shard_persisted(shard_id_4.clone(), Position::offset(42usize)); + tracker.track_persisted_position(shard_id_4.clone(), Position::offset(42usize)); tokio::time::timeout(Duration::from_millis(200), tracker.wait_publish_complete()) .await @@ -538,8 +538,8 @@ mod tests { let tracker = PublishTracker::new(event_broker.clone()); let shard_id_1 = ShardId::from("test-shard-1"); let position = Position::offset(42usize); - tracker.shard_persisted(shard_id_1.clone(), position.clone()); - tracker.shard_persisted(ShardId::from("test-shard-2"), position.clone()); + tracker.track_persisted_position(shard_id_1.clone(), position.clone()); + tracker.track_persisted_position(ShardId::from("test-shard-2"), position.clone()); event_broker.publish(ShardPositionsUpdate { source_uid: SourceUid {