Skip to content

Commit

Permalink
Small naming and trait bound adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 30, 2024
1 parent fbbdc0f commit df9c302
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub trait EventSubscriber<E>: Send + Sync + 'static {
impl<E, F> EventSubscriber<E> for F
where
E: Event,
F: Fn(E) + Send + Sync + 'static,
F: FnMut(E) + Send + Sync + 'static,
{
async fn handle_event(&mut self, event: E) {
(self)(event);
Expand Down
16 changes: 8 additions & 8 deletions quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl PublishTracker {
}
}

fn shard_persisted(&self, shard_id: ShardId, new_position: Position) {
fn track_persisted_position(&self, shard_id: ShardId, new_position: Position) {
let mut state_handle = self.state.lock().unwrap();
match state_handle.already_published.get(&shard_id) {
Some(already_published_position) if new_position <= *already_published_position => {
Expand Down Expand Up @@ -249,7 +249,7 @@ impl IngestWorkbench {
if let Some(publish_tracker) = &mut self.publish_tracker {
if let Some(position) = &persist_success.replication_position_inclusive {
publish_tracker
.shard_persisted(persist_success.shard_id().clone(), position.clone());
.track_persisted_position(persist_success.shard_id().clone(), position.clone());
}
}
self.num_successes += 1;
Expand Down Expand Up @@ -493,9 +493,9 @@ mod tests {
let shard_id_3 = ShardId::from("test-shard-3");
let shard_id_4 = ShardId::from("test-shard-3");

tracker.shard_persisted(shard_id_1.clone(), Position::offset(42usize));
tracker.shard_persisted(shard_id_2.clone(), Position::offset(42usize));
tracker.shard_persisted(shard_id_3.clone(), Position::offset(42usize));
tracker.track_persisted_position(shard_id_1.clone(), Position::offset(42usize));
tracker.track_persisted_position(shard_id_2.clone(), Position::offset(42usize));
tracker.track_persisted_position(shard_id_3.clone(), Position::offset(42usize));

event_broker.publish(ShardPositionsUpdate {
source_uid: SourceUid {
Expand Down Expand Up @@ -524,7 +524,7 @@ mod tests {
});

// persist response received after the publish event
tracker.shard_persisted(shard_id_4.clone(), Position::offset(42usize));
tracker.track_persisted_position(shard_id_4.clone(), Position::offset(42usize));

tokio::time::timeout(Duration::from_millis(200), tracker.wait_publish_complete())
.await
Expand All @@ -538,8 +538,8 @@ mod tests {
let tracker = PublishTracker::new(event_broker.clone());
let shard_id_1 = ShardId::from("test-shard-1");
let position = Position::offset(42usize);
tracker.shard_persisted(shard_id_1.clone(), position.clone());
tracker.shard_persisted(ShardId::from("test-shard-2"), position.clone());
tracker.track_persisted_position(shard_id_1.clone(), position.clone());
tracker.track_persisted_position(ShardId::from("test-shard-2"), position.clone());

event_broker.publish(ShardPositionsUpdate {
source_uid: SourceUid {
Expand Down

0 comments on commit df9c302

Please sign in to comment.