Skip to content

Commit

Permalink
Added initialize time.
Browse files Browse the repository at this point in the history
Also, this PR increased NUDGE_TIME from 2s to 5s.

Closes #5082
  • Loading branch information
fulmicoton committed Jun 11, 2024
1 parent 43ae6c2 commit 20b32d3
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
50 changes: 48 additions & 2 deletions quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::time::Instant;

/// We allow ourselves to adjust the sleep time by at most `NUDGE_TOLERANCE`
/// in order to steer a pipeline to its phase.
const NUDGE_TOLERANCE: Duration = Duration::from_secs(2);
const NUDGE_TOLERANCE: Duration = Duration::from_secs(5);

// Origin of time. It is used to compute the phase of the pipeline.
static ORIGIN_OF_TIME: Lazy<Instant> = Lazy::new(Instant::now);
Expand Down Expand Up @@ -107,13 +107,31 @@ impl CooperativeIndexingCycle {
commit_timeout: Duration,
indexing_permits: Arc<Semaphore>,
) -> CooperativeIndexingCycle {
// Force the initial of the origin of time.
let _t0 = *ORIGIN_OF_TIME;
CooperativeIndexingCycle {
target_phase,
commit_timeout,
indexing_permits,
}
}

pub fn initial_sleep_duration(&self) -> Duration {
let t0 = *ORIGIN_OF_TIME;
let commit_timeout_millis = self.commit_timeout.as_millis() as u64;
let current_phase_millis: u64 = t0.elapsed().as_millis() as u64 % commit_timeout_millis;
let target_phase_millis: u64 = self.target_phase.as_millis() as u64 % commit_timeout_millis;
let initial_sleep_millis: u64 = (commit_timeout_millis + target_phase_millis
- current_phase_millis)
% commit_timeout_millis;
if initial_sleep_millis + 2 * NUDGE_TOLERANCE.as_millis() as u64 > commit_timeout_millis {
// We are reasonably close to the target phase. No need to sleep. The nudge
// will be enough.
return Duration::default();
}
Duration::from_millis(initial_sleep_millis)
}

pub async fn cooperative_indexing_period(&self) -> CooperativeIndexingPeriod {
let t_wake = Instant::now();
let permit = Semaphore::acquire_owned(self.indexing_permits.clone())
Expand Down Expand Up @@ -161,7 +179,8 @@ impl CooperativeIndexingPeriod {

fn compute_sleep_duration(&self, t_work_end: Instant) -> Duration {
let commit_timeout_millis = self.commit_timeout.as_millis() as u64;
let phase_millis: u64 = ((t_work_end - *ORIGIN_OF_TIME).as_millis() as u64) % commit_timeout_millis;
let phase_millis: u64 =
((t_work_end - *ORIGIN_OF_TIME).as_millis() as u64) % commit_timeout_millis;
let delta_phase: i64 = phase_millis as i64 - self.target_phase.as_millis() as i64;
// delta phase is within (-commit_timeout_millis, commit_timeout_millis)
// We fold it back to [-commit_timeout_millis/2, commit_timeout_millis/2)
Expand Down Expand Up @@ -227,6 +246,33 @@ mod tests {
);
}

#[tokio::test]
async fn test_initial_sleep_time() {
tokio::time::pause();
let t0 = *ORIGIN_OF_TIME;
for target_phase_secs in [0, 1, 2, 5, 10, 15, 20, 25, 29, 30, 1_000] {
for start_time_secs in [0, 1, 2, 5, 10, 15, 20, 25, 29, 30] {
let target_phase = Duration::from_secs(target_phase_secs);
let semaphore = Arc::new(Semaphore::new(1));
tokio::time::sleep(Duration::from_secs(start_time_secs)).await;
let cooperative_indexing = CooperativeIndexingCycle::new_with_phase(
target_phase,
Duration::from_secs(30),
semaphore.clone(),
);
let initial_sleep_duration: Duration =
cooperative_indexing.initial_sleep_duration();
tokio::time::sleep(initial_sleep_duration).await;
let target_phase_millis = cooperative_indexing.target_phase.as_millis() as i64;
let commit_timeout_ms = cooperative_indexing.commit_timeout.as_millis() as i64;
let phase_millis =
(t0.elapsed().as_millis() as i64 - target_phase_millis) % commit_timeout_ms;
assert!(phase_millis >= -100, "{phase_millis}");
assert!(phase_millis <= (NUDGE_TOLERANCE.as_millis() as i64) * 2 + 100);
}
}
}

#[tokio::test]
async fn test_cooperative_indexing_simple() {
tokio::time::pause();
Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,15 @@ impl Actor for Indexer {
false
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
if let Some(cooperative_indexing_cycle) = &self.indexer_state.cooperative_indexing_opt {
let initial_sleep_duration = cooperative_indexing_cycle.initial_sleep_duration();
ctx.pause();
ctx.schedule_self_msg(initial_sleep_duration, Command::Resume);
}
Ok(())
}

async fn on_drained_messages(
&mut self,
ctx: &ActorContext<Self>,
Expand Down

0 comments on commit 20b32d3

Please sign in to comment.