From c447a0ff4d8518aaedd5edfeed655214216c9c05 Mon Sep 17 00:00:00 2001 From: scottwey Date: Sun, 15 Sep 2024 00:59:37 +0000 Subject: [PATCH 1/5] see if sleeping for a little decreases cpu usage at rest --- mistralrs-core/src/engine/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mistralrs-core/src/engine/mod.rs b/mistralrs-core/src/engine/mod.rs index bff888fe2..ecd175273 100644 --- a/mistralrs-core/src/engine/mod.rs +++ b/mistralrs-core/src/engine/mod.rs @@ -103,6 +103,8 @@ impl Engine { let rng = Arc::new(std::sync::Mutex::new(Isaac64Rng::seed_from_u64(SEED))); let mut last_completion_ids: Vec = vec![]; 'lp: loop { + tokio::time::sleep(std::time::Duration::from_micros(500)).await; + if matches!( ENGINE_INSTRUCTIONS .lock() From 38f84f9b636999de75e456c573a95eb7a255d640 Mon Sep 17 00:00:00 2001 From: scottwey Date: Sun, 15 Sep 2024 08:29:41 +0000 Subject: [PATCH 2/5] attempt to refactor --- mistralrs-core/src/engine/mod.rs | 619 ++++++++++++++++--------------- 1 file changed, 312 insertions(+), 307 deletions(-) diff --git a/mistralrs-core/src/engine/mod.rs b/mistralrs-core/src/engine/mod.rs index ecd175273..a4b6bfaa5 100644 --- a/mistralrs-core/src/engine/mod.rs +++ b/mistralrs-core/src/engine/mod.rs @@ -103,8 +103,6 @@ impl Engine { let rng = Arc::new(std::sync::Mutex::new(Isaac64Rng::seed_from_u64(SEED))); let mut last_completion_ids: Vec = vec![]; 'lp: loop { - tokio::time::sleep(std::time::Duration::from_micros(500)).await; - if matches!( ENGINE_INSTRUCTIONS .lock() @@ -115,331 +113,338 @@ impl Engine { break 'lp; } - while let Ok(request) = self.rx.try_recv() { - if matches!(request, Request::Terminate) { - break 'lp; - } - self.handle_request(request).await; - } - let run_start = Instant::now(); - let scheduled = self.scheduler.schedule(); - - match scheduled { - SchedulerOutput::DefaultScheduler { - output: mut scheduled, - } => { - let mut prompt_ts = None; - let mut completion_ts = None; - if scheduled.completion.len() > 0 { - let throughput_start = Instant::now(); - let current_completion_ids: Vec = - scheduled.completion.iter().map(|seq| *seq.id()).collect(); - let res = { - let mut pipeline = get_mut_arcmutex!(self.pipeline); - let pre_op = if !self.no_kv_cache - && last_completion_ids != current_completion_ids - { - CacheInstruction::In( - scheduled.completion[0] - .get_adapters() - .map(AdapterInstruction::Activate) - .unwrap_or(AdapterInstruction::None), - ) - } else { - CacheInstruction::Nothing( - scheduled.completion[0] - .get_adapters() - .map(AdapterInstruction::Activate) - .unwrap_or(AdapterInstruction::None), - ) - }; - let post_op = if !self.no_kv_cache { - CacheInstruction::Out - } else { - CacheInstruction::Reset { - reset_non_granular: false, - adapter_inst: AdapterInstruction::None, - } - }; - - pipeline - .step( - &mut scheduled.completion, - false, - &mut self.prefix_cacher, - self.disable_eos_stop, - rng.clone(), - CacheBackendMetadata::DefaultInstructions { pre_op, post_op }, - ) - .await - }; - - handle_pipeline_forward_error!( - "completion step", - res, - &mut scheduled.completion, - self.pipeline, - 'lp, - self.prefix_cacher - ); - - let throughput_end = Instant::now(); - #[allow(clippy::cast_precision_loss)] - if self.throughput_logging_enabled { - completion_ts = Some( - scheduled.completion.len() as f64 - / throughput_end - .duration_since(throughput_start) - .as_secs_f64(), - ); - } - - last_completion_ids = current_completion_ids; + tokio::select! 
{ + Some(request) = self.rx.recv() => { + if matches!(request, Request::Terminate) { + break; } + self.handle_request(request).await; + } - if scheduled.prompt.len() > 0 { - let throughput_start = Instant::now(); - let logits = { - let mut pipeline = get_mut_arcmutex!(self.pipeline); + else => { + + let scheduled = self.scheduler.schedule(); + + match scheduled { + SchedulerOutput::DefaultScheduler { + output: mut scheduled, + } => { + let has_scheduled_sequences = + !scheduled.prompt.is_empty() || !scheduled.completion.is_empty(); + + if has_scheduled_sequences { + let run_start = Instant::now(); + + let mut prompt_ts = None; + let mut completion_ts = None; + if scheduled.completion.len() > 0 { + let throughput_start = Instant::now(); + let current_completion_ids: Vec = + scheduled.completion.iter().map(|seq| *seq.id()).collect(); + let res = { + let mut pipeline = get_mut_arcmutex!(self.pipeline); + let pre_op = if !self.no_kv_cache + && last_completion_ids != current_completion_ids + { + CacheInstruction::In( + scheduled.completion[0] + .get_adapters() + .map(AdapterInstruction::Activate) + .unwrap_or(AdapterInstruction::None), + ) + } else { + CacheInstruction::Nothing( + scheduled.completion[0] + .get_adapters() + .map(AdapterInstruction::Activate) + .unwrap_or(AdapterInstruction::None), + ) + }; + let post_op = if !self.no_kv_cache { + CacheInstruction::Out + } else { + CacheInstruction::Reset { + reset_non_granular: false, + adapter_inst: AdapterInstruction::None, + } + }; + + pipeline + .step( + &mut scheduled.completion, + false, + &mut self.prefix_cacher, + self.disable_eos_stop, + rng.clone(), + CacheBackendMetadata::DefaultInstructions { pre_op, post_op }, + ) + .await + }; + + handle_pipeline_forward_error!( + "completion step", + res, + &mut scheduled.completion, + self.pipeline, + 'lp, + self.prefix_cacher + ); + + let throughput_end = Instant::now(); + #[allow(clippy::cast_precision_loss)] + if self.throughput_logging_enabled { + completion_ts = Some( + scheduled.completion.len() as f64 + / throughput_end + .duration_since(throughput_start) + .as_secs_f64(), + ); + } + + last_completion_ids = current_completion_ids; + } - // Run the prompt seqs - let post_op = if !self.no_kv_cache { - CacheInstruction::Out - } else { - CacheInstruction::Reset { - reset_non_granular: false, - adapter_inst: AdapterInstruction::None, + if scheduled.prompt.len() > 0 { + let throughput_start = Instant::now(); + let logits = { + let mut pipeline = get_mut_arcmutex!(self.pipeline); + + // Run the prompt seqs + let post_op = if !self.no_kv_cache { + CacheInstruction::Out + } else { + CacheInstruction::Reset { + reset_non_granular: false, + adapter_inst: AdapterInstruction::None, + } + }; + let adapter_inst = scheduled.prompt[0] + .get_adapters() + .map(AdapterInstruction::Activate) + .unwrap_or(AdapterInstruction::None); + + // Reset non granular state because the old sequence must be dead. + // Technically we don't need to do this but it is better to be safe. 
+ pipeline + .step( + &mut scheduled.prompt, + true, + &mut self.prefix_cacher, + self.disable_eos_stop, + rng.clone(), + CacheBackendMetadata::DefaultInstructions { + pre_op: CacheInstruction::Reset { + reset_non_granular: false, + adapter_inst, + }, + post_op, + }, + ) + .await + }; + + handle_pipeline_forward_error!( + "prompt step", + logits, + &mut scheduled.prompt, + self.pipeline, + 'lp, + self.prefix_cacher + ); + + let throughput_end = Instant::now(); + #[allow(clippy::cast_precision_loss)] + if self.throughput_logging_enabled { + prompt_ts = Some( + scheduled + .prompt + .iter() + .map(|seq| seq.get_toks().len()) + .sum::() as f64 + / throughput_end + .duration_since(throughput_start) + .as_secs_f64(), + ); + } + + for seq in scheduled.prompt.iter_mut() { + seq.set_state(SequenceState::RunningCompletion); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time travel has occurred!") + .as_millis(); + #[allow(clippy::cast_precision_loss)] + let prompt_tok_per_sec = + seq.len() as f32 / (now - seq.timestamp()) as f32; + seq.prompt_tok_per_sec = prompt_tok_per_sec * 1000.; + seq.prompt_timestamp = Some(now); + } + last_completion_ids = vec![]; } - }; - let adapter_inst = scheduled.prompt[0] - .get_adapters() - .map(AdapterInstruction::Activate) - .unwrap_or(AdapterInstruction::None); - - // Reset non granular state because the old sequence must be dead. - // Technically we don't need to do this but it is better to be safe. - pipeline - .step( - &mut scheduled.prompt, - true, - &mut self.prefix_cacher, - self.disable_eos_stop, - rng.clone(), - CacheBackendMetadata::DefaultInstructions { - pre_op: CacheInstruction::Reset { - reset_non_granular: false, - adapter_inst, - }, - post_op, - }, - ) - .await - }; - - handle_pipeline_forward_error!( - "prompt step", - logits, - &mut scheduled.prompt, - self.pipeline, - 'lp, - self.prefix_cacher - ); - - let throughput_end = Instant::now(); - #[allow(clippy::cast_precision_loss)] - if self.throughput_logging_enabled { - prompt_ts = Some( - scheduled - .prompt - .iter() - .map(|seq| seq.get_toks().len()) - .sum::() as f64 - / throughput_end - .duration_since(throughput_start) - .as_secs_f64(), - ); - } - for seq in scheduled.prompt.iter_mut() { - seq.set_state(SequenceState::RunningCompletion); - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time travel has occurred!") - .as_millis(); - #[allow(clippy::cast_precision_loss)] - let prompt_tok_per_sec = - seq.len() as f32 / (now - seq.timestamp()) as f32; - seq.prompt_tok_per_sec = prompt_tok_per_sec * 1000.; - seq.prompt_timestamp = Some(now); - } - last_completion_ids = vec![]; - } + if self.is_debug { + let ms_from_last_run = run_start.elapsed().as_secs_f64(); + let total_len = scheduled.prompt.len() + scheduled.completion.len(); + if total_len > 0 { + let prompt_lengths = scheduled + .prompt + .iter() + .map(|seq| seq.len().to_string()) + .collect::>() + .join(", "); + + let completion_lengths = scheduled + .completion + .iter() + .map(|seq| seq.len().to_string()) + .collect::>() + .join(", "); + + tracing::info!( + "Prompt[{}] Completion[{}] - {}ms", + prompt_lengths, + completion_lengths, + ms_from_last_run * 1000., + ); + } + } - if self.is_debug { - let ms_from_last_run = run_start.elapsed().as_secs_f64(); - let total_len = scheduled.prompt.len() + scheduled.completion.len(); - if total_len > 0 { - let prompt_lengths = scheduled - .prompt - .iter() - .map(|seq| seq.len().to_string()) - .collect::>() - .join(", "); - - let completion_lengths = 
scheduled - .completion - .iter() - .map(|seq| seq.len().to_string()) - .collect::>() - .join(", "); - - tracing::info!( - "Prompt[{}] Completion[{}] - {}ms", - prompt_lengths, - completion_lengths, - ms_from_last_run * 1000., - ); - } - } + if self.throughput_logging_enabled { + match (prompt_ts, completion_ts) { + (Some(prompt), Some(completion)) => { + info!("Throughput (scheduler V1): Prompt: {prompt} T/s Completion {completion} T/s"); + } + (None, Some(completion)) => { + info!("Throughput (scheduler V1): Completion {completion} T/s"); + } + (Some(prompt), None) => { + info!("Throughput (scheduler V1): Prompt: {prompt} T/s"); + } + (None, None) => (), + } + } - if self.throughput_logging_enabled { - match (prompt_ts, completion_ts) { - (Some(prompt), Some(completion)) => { - info!("Throughput (scheduler V1): Prompt: {prompt} T/s Completion {completion} T/s"); - } - (None, Some(completion)) => { - info!("Throughput (scheduler V1): Completion {completion} T/s"); - } - (Some(prompt), None) => { - info!("Throughput (scheduler V1): Prompt: {prompt} T/s"); + } else { + tokio::task::yield_now().await; } - (None, None) => (), - } - } - if scheduled.prompt.len() == 0 - && scheduled.completion.len() == 0 - && self.scheduler.waiting_len() == 0 - { - // If there is nothing to do, sleep until a request comes in - if let Some(request) = self.rx.recv().await { - if matches!(request, Request::Terminate) { - break 'lp; - } - self.handle_request(request).await; } - } - } - SchedulerOutput::PagedAttention { mut output } => { - if !output.scheduled.is_empty() { - let throughput_start = Instant::now(); - - let is_prompt = get_mut_arcmutex!(output.scheduled[0]).is_prompt(); - - let mut guards = output - .scheduled - .iter_mut() - .map(|seq| seq.lock().unwrap()) - .collect::>(); - - let mut guards_mut = - guards.iter_mut().map(|seq| &mut **seq).collect::>(); - - let res = { - let mut pipeline = get_mut_arcmutex!(self.pipeline); - - let block_size = self.scheduler.block_size().unwrap(); - - let metadata = PagedAttentionMeta { - block_size, - sliding_window: pipeline.get_metadata().sliding_window, - block_engine: self.scheduler.block_engine().unwrap(), - }; - - pipeline - .step( - &mut guards_mut, - is_prompt, - &mut self.prefix_cacher, - self.disable_eos_stop, - rng.clone(), - CacheBackendMetadata::PagedAttention { - metadata, - blocks_to_copy: output.blocks_to_copy, - blocks_to_swap_in: output.blocks_to_swap_in, - blocks_to_swap_out: output.blocks_to_swap_out, - }, - ) - .await - }; - - handle_pipeline_forward_error!( - "step", - res, - &mut guards_mut, - self.pipeline, - 'lp, - self.prefix_cacher - ); - - if self.is_debug { - let ms_from_last_run = run_start.elapsed().as_secs_f64(); - let total_len = guards.len(); - if total_len > 0 { - let lengths = guards - .iter() - .map(|seq| seq.len().to_string()) - .collect::>() - .join(", "); - - let (prompt_lengths, completion_lengths) = if is_prompt { - (lengths, "".to_string()) - } else { - ("".to_string(), lengths) + SchedulerOutput::PagedAttention { mut output } => { + if !output.scheduled.is_empty() { + let run_start = Instant::now(); + + let throughput_start = Instant::now(); + + let is_prompt = get_mut_arcmutex!(output.scheduled[0]).is_prompt(); + + let mut guards = output + .scheduled + .iter_mut() + .map(|seq| seq.lock().unwrap()) + .collect::>(); + + let mut guards_mut = + guards.iter_mut().map(|seq| &mut **seq).collect::>(); + + let res = { + let mut pipeline = get_mut_arcmutex!(self.pipeline); + + let block_size = self.scheduler.block_size().unwrap(); + 
+ let metadata = PagedAttentionMeta { + block_size, + sliding_window: pipeline.get_metadata().sliding_window, + block_engine: self.scheduler.block_engine().unwrap(), + }; + + pipeline + .step( + &mut guards_mut, + is_prompt, + &mut self.prefix_cacher, + self.disable_eos_stop, + rng.clone(), + CacheBackendMetadata::PagedAttention { + metadata, + blocks_to_copy: output.blocks_to_copy, + blocks_to_swap_in: output.blocks_to_swap_in, + blocks_to_swap_out: output.blocks_to_swap_out, + }, + ) + .await }; - tracing::info!( - "Prompt[{}] Completion[{}] - {}ms", - prompt_lengths, - completion_lengths, - ms_from_last_run * 1000., + handle_pipeline_forward_error!( + "step", + res, + &mut guards_mut, + self.pipeline, + 'lp, + self.prefix_cacher ); - } - } - let throughput_end = Instant::now(); - #[allow(clippy::cast_precision_loss)] - if self.throughput_logging_enabled { - let n_toks = if is_prompt { - guards.iter().map(|seq| seq.get_toks().len()).sum::() - } else { - guards.len() - }; - let ts = n_toks as f64 - / throughput_end - .duration_since(throughput_start) - .as_secs_f64(); - info!("Throughput (scheduler V2): {ts} T/s"); - } + if self.is_debug { + let ms_from_last_run = run_start.elapsed().as_secs_f64(); + let total_len = guards.len(); + if total_len > 0 { + let lengths = guards + .iter() + .map(|seq| seq.len().to_string()) + .collect::>() + .join(", "); + + let (prompt_lengths, completion_lengths) = if is_prompt { + (lengths, "".to_string()) + } else { + ("".to_string(), lengths) + }; + + tracing::info!( + "Prompt[{}] Completion[{}] - {}ms", + prompt_lengths, + completion_lengths, + ms_from_last_run * 1000., + ); + } + } - if is_prompt { - for mut seq in guards { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time travel has occurred!") - .as_millis(); + let throughput_end = Instant::now(); #[allow(clippy::cast_precision_loss)] - let prompt_tok_per_sec = - seq.len() as f32 / (now - seq.timestamp()) as f32; - seq.prompt_tok_per_sec = prompt_tok_per_sec * 1000.; - seq.prompt_timestamp = Some(now); + if self.throughput_logging_enabled { + let n_toks = if is_prompt { + guards.iter().map(|seq| seq.get_toks().len()).sum::() + } else { + guards.len() + }; + let ts = n_toks as f64 + / throughput_end + .duration_since(throughput_start) + .as_secs_f64(); + info!("Throughput (scheduler V2): {ts} T/s"); + } + + if is_prompt { + for mut seq in guards { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time travel has occurred!") + .as_millis(); + #[allow(clippy::cast_precision_loss)] + let prompt_tok_per_sec = + seq.len() as f32 / (now - seq.timestamp()) as f32; + seq.prompt_tok_per_sec = prompt_tok_per_sec * 1000.; + seq.prompt_timestamp = Some(now); + } + } + } else { + tokio::task::yield_now().await; } } } - } - } - self.scheduler.free_finished_sequence_groups(); + self.scheduler.free_finished_sequence_groups(); + } + }; } } From c6f1845ae88b7164db7e73192c36ac2093091fd7 Mon Sep 17 00:00:00 2001 From: scottwey Date: Sun, 15 Sep 2024 08:40:51 +0000 Subject: [PATCH 3/5] only yield when there's no waiting_len --- mistralrs-core/src/engine/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mistralrs-core/src/engine/mod.rs b/mistralrs-core/src/engine/mod.rs index a4b6bfaa5..9eecc28fa 100644 --- a/mistralrs-core/src/engine/mod.rs +++ b/mistralrs-core/src/engine/mod.rs @@ -436,12 +436,14 @@ impl Engine { seq.prompt_timestamp = Some(now); } } - } else { - tokio::task::yield_now().await; } } } + if self.scheduler.waiting_len() == 0 { + 
tokio::task::yield_now().await; + } + self.scheduler.free_finished_sequence_groups(); } }; From f9914fe6a4dfcd6eb5421c76e845811f03f03794 Mon Sep 17 00:00:00 2001 From: scottwey Date: Tue, 17 Sep 2024 00:36:19 +0000 Subject: [PATCH 4/5] add a comment on what yield_now does --- mistralrs-core/src/engine/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mistralrs-core/src/engine/mod.rs b/mistralrs-core/src/engine/mod.rs index b4dc1010c..50008871d 100644 --- a/mistralrs-core/src/engine/mod.rs +++ b/mistralrs-core/src/engine/mod.rs @@ -125,7 +125,6 @@ impl Engine { } else => { - let scheduled = self.scheduler.schedule(); match scheduled { @@ -443,6 +442,8 @@ impl Engine { } } + // if there are no more pending requests in the scheduler, yield the current task + // this will mark the task as `Pending` for one runtime tick, the loop will resume on the next tick if self.scheduler.waiting_len() == 0 { tokio::task::yield_now().await; } From e67011e56f8f6c04eb88796466c52e2b09f4ef91 Mon Sep 17 00:00:00 2001 From: Scott Wey Date: Sat, 12 Oct 2024 03:28:26 -0700 Subject: [PATCH 5/5] Update mistralrs-core/src/engine/mod.rs Co-authored-by: Eric Buehler <65165915+EricLBuehler@users.noreply.github.com> --- mistralrs-core/src/engine/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mistralrs-core/src/engine/mod.rs b/mistralrs-core/src/engine/mod.rs index 50008871d..47d4d8224 100644 --- a/mistralrs-core/src/engine/mod.rs +++ b/mistralrs-core/src/engine/mod.rs @@ -444,7 +444,7 @@ impl Engine { // if there are no more pending requests in the scheduler, yield the current task // this will mark the task as `Pending` for one runtime tick, the loop will resume on the next tick - if self.scheduler.waiting_len() == 0 { + if self.scheduler.waiting_len() == 0 && self.scheduler.running_len() == 0 { tokio::task::yield_now().await; }
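
The comment added in PATCH 4 describes the behaviour the final loop relies on: `tokio::task::yield_now` makes the task return `Pending` once, so the runtime can poll its other ready tasks before this loop resumes. The snippet below is a small, self-contained demonstration of that behaviour; it is not part of the patch series and assumes only the `tokio` crate (macros and runtime features) on a current-thread runtime.

```rust
// Demonstration of `tokio::task::yield_now`: the looping task yields once per
// iteration, so the second task gets polled between iterations. Without the
// yield, the loop has no await point and would run all three iterations before
// the other task is polled at all.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let looper = tokio::spawn(async {
        for i in 0..3 {
            println!("looper iteration {i}");
            // Returns `Poll::Pending` once and immediately re-wakes the task, so it
            // goes to the back of the run queue and other ready tasks run first.
            tokio::task::yield_now().await;
        }
    });

    let other = tokio::spawn(async {
        println!("other task ran");
    });

    let _ = tokio::join!(looper, other);
}
```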
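Taken together, the series moves the engine loop from a fixed 500µs sleep (PATCH 1) to an event-driven loop that only yields when the scheduler is idle (PATCHES 2 through 5). The sketch below is a simplified, hypothetical illustration of that shape rather than the engine's actual code: `Request`, `Scheduler`, and `engine_loop` are stand-ins for the real types, the `tokio` crate with its full feature set is assumed, and the channel is drained with `try_recv` (as the pre-refactor loop did) rather than the `tokio::select!` form the patch adopts.

```rust
use tokio::sync::mpsc;

// Hypothetical stand-ins for the engine's request and scheduler types.
enum Request {
    Work(u64),
    Terminate,
}

#[derive(Default)]
struct Scheduler {
    waiting: Vec<u64>,
    running: Vec<u64>,
}

impl Scheduler {
    fn add(&mut self, id: u64) {
        self.waiting.push(id);
    }
    fn waiting_len(&self) -> usize {
        self.waiting.len()
    }
    fn running_len(&self) -> usize {
        self.running.len()
    }
    // Move one waiting item into the running set and hand it back, if any.
    fn schedule(&mut self) -> Option<u64> {
        let id = self.waiting.pop()?;
        self.running.push(id);
        Some(id)
    }
    fn finish(&mut self, id: u64) {
        self.running.retain(|&r| r != id);
    }
}

async fn engine_loop(mut rx: mpsc::UnboundedReceiver<Request>) {
    let mut scheduler = Scheduler::default();
    'lp: loop {
        // Drain whatever requests are already queued without blocking.
        while let Ok(request) = rx.try_recv() {
            match request {
                Request::Work(id) => scheduler.add(id),
                Request::Terminate => break 'lp,
            }
        }

        // Do one step of scheduled work, if there is any.
        if let Some(id) = scheduler.schedule() {
            // ... run one model step for `id` here ...
            scheduler.finish(id);
        }

        // With nothing waiting and nothing running, return `Pending` for one tick
        // so other tasks on this runtime can make progress before the loop resumes.
        if scheduler.waiting_len() == 0 && scheduler.running_len() == 0 {
            tokio::task::yield_now().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    tx.send(Request::Work(1)).unwrap();
    tx.send(Request::Terminate).unwrap();
    engine_loop(rx).await;
}
```

Unlike the sleep from PATCH 1, `yield_now` does not park the task for a fixed interval; it only gives other tasks on the same runtime a chance to be polled before the loop continues. PATCH 5 further restricts the yield to the case where nothing is waiting and nothing is running, so the loop does not give up its turn while sequences are still being generated.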