Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove duplicated code #5538

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 2 additions & 191 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,23 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::Display;
use std::io::{stdout, Stdout, Write};
use std::num::NonZeroUsize;
use std::ops::Div;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::{Duration, Instant};
use std::{fmt, io};

use anyhow::{anyhow, bail, Context};
use bytesize::ByteSize;
use clap::{arg, Arg, ArgAction, ArgMatches, Command};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
use colored::Colorize;
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use numfmt::{Formatter, Scales};
use quickwit_actors::ActorHandle;
use quickwit_common::tower::{Rate, RateEstimator, SmaRateEstimator};
use quickwit_common::uri::Uri;
use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
use quickwit_indexing::IndexingPipeline;
use quickwit_metastore::{IndexMetadata, Split, SplitState};
use quickwit_proto::search::{CountHits, SortField, SortOrder};
use quickwit_proto::types::IndexId;
Expand All @@ -54,12 +47,11 @@ use tabled::settings::object::{FirstRow, Rows, Segment};
use tabled::settings::panel::Footer;
use tabled::settings::{Alignment, Disable, Format, Modify, Panel, Rotate, Style};
use tabled::{Table, Tabled};
use thousands::Separable;
use tracing::{debug, Level};

use crate::checklist::GREEN_COLOR;
use crate::stats::{mean, percentile, std_deviation};
use crate::{client_args, make_table, prompt_confirmation, ClientArgs, THROUGHPUT_WINDOW_SIZE};
use crate::{client_args, make_table, prompt_confirmation, ClientArgs};

pub fn build_index_command() -> Command {
Command::new("index")
Expand Down Expand Up @@ -1143,187 +1135,6 @@ pub async fn delete_index_cli(args: DeleteIndexArgs) -> anyhow::Result<()> {
Ok(())
}

/// Starts a tokio task that displays the indexing statistics
/// every once in awhile.
pub async fn start_statistics_reporting_loop(
pipeline_handle: ActorHandle<IndexingPipeline>,
is_stdin: bool,
) -> anyhow::Result<IndexingStatistics> {
let mut stdout_handle = stdout();
let start_time = Instant::now();
let mut throughput_calculator = ThroughputCalculator::new(start_time);
let mut report_interval = tokio::time::interval(Duration::from_secs(1));

loop {
// TODO fixme. The way we wait today is a bit lame: if the indexing pipeline exits, we will
// still wait up to an entire heartbeat... Ideally we should select between two
// futures.
report_interval.tick().await;
// Try to receive with a timeout of 1 second.
// 1 second is also the frequency at which we update statistic in the console
pipeline_handle.refresh_observe();

let observation = pipeline_handle.last_observation();

// Let's not display live statistics to allow screen to scroll.
if observation.num_docs > 0 {
display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?;
}

if pipeline_handle.state().is_exit() {
break;
}
}
let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.join().await;
if !pipeline_exit_status.is_success() {
bail!(pipeline_exit_status);
}
// If we have received zero docs at this point,
// there is no point in displaying report.
if pipeline_statistics.num_docs == 0 {
return Ok(pipeline_statistics);
}

if is_stdin {
display_statistics(
&mut stdout_handle,
&mut throughput_calculator,
&pipeline_statistics,
)?;
}
// display end of task report
println!();
let secs = Duration::from_secs(start_time.elapsed().as_secs());
if pipeline_statistics.num_invalid_docs == 0 {
println!(
"Indexed {} documents in {}.",
pipeline_statistics.num_docs.separate_with_commas(),
format_duration(secs)
);
} else {
let num_indexed_docs = (pipeline_statistics.num_docs
- pipeline_statistics.num_invalid_docs)
.separate_with_commas();

let error_rate = (pipeline_statistics.num_invalid_docs as f64
/ pipeline_statistics.num_docs as f64)
* 100.0;

println!(
"Indexed {} out of {} documents in {}. Failed to index {} document(s). {}\n",
num_indexed_docs,
pipeline_statistics.num_docs.separate_with_commas(),
format_duration(secs),
pipeline_statistics.num_invalid_docs.separate_with_commas(),
colorize_error_rate(error_rate),
);
}

Ok(pipeline_statistics)
}

fn colorize_error_rate(error_rate: f64) -> ColoredString {
let error_rate_message = format!("({error_rate:.1}% error rate)");
if error_rate < 1.0 {
error_rate_message.yellow()
} else if error_rate < 5.0 {
error_rate_message.truecolor(255, 181, 46) //< Orange
} else {
error_rate_message.red()
}
}

/// A struct to print data on the standard output.
struct Printer<'a> {
pub stdout: &'a mut Stdout,
}

impl Printer<'_> {
pub fn print_header(&mut self, header: &str) -> io::Result<()> {
write!(&mut self.stdout, " {}", header.bright_blue())?;
Ok(())
}

pub fn print_value(&mut self, fmt_args: fmt::Arguments) -> io::Result<()> {
write!(&mut self.stdout, " {fmt_args}")
}

pub fn flush(&mut self) -> io::Result<()> {
self.stdout.flush()
}
}

fn display_statistics(
stdout: &mut Stdout,
throughput_calculator: &mut ThroughputCalculator,
statistics: &IndexingStatistics,
) -> anyhow::Result<()> {
let elapsed_duration = time::Duration::try_from(throughput_calculator.elapsed_time())?;
let elapsed_time = format!(
"{:02}:{:02}:{:02}",
elapsed_duration.whole_hours(),
elapsed_duration.whole_minutes() % 60,
elapsed_duration.whole_seconds() % 60
);
let throughput_mb_s = throughput_calculator.calculate(statistics.total_bytes_processed);
let mut printer = Printer { stdout };
printer.print_header("Num docs")?;
printer.print_value(format_args!("{:>7}", statistics.num_docs))?;
printer.print_header("Parse errs")?;
printer.print_value(format_args!("{:>5}", statistics.num_invalid_docs))?;
printer.print_header("PublSplits")?;
printer.print_value(format_args!("{:>3}", statistics.num_published_splits))?;
printer.print_header("Input size")?;
printer.print_value(format_args!(
"{:>5}MB",
statistics.total_bytes_processed / 1_000_000
))?;
printer.print_header("Thrghput")?;
printer.print_value(format_args!("{throughput_mb_s:>5.2}MB/s"))?;
printer.print_header("Time")?;
printer.print_value(format_args!("{elapsed_time}\n"))?;
printer.flush()?;
Ok(())
}

/// ThroughputCalculator is used to calculate throughput.
struct ThroughputCalculator {
/// Stores the time series of processed bytes value.
processed_bytes_values: VecDeque<(Instant, u64)>,
/// Store the time this calculator started
start_time: Instant,
}

impl ThroughputCalculator {
/// Creates new instance.
pub fn new(start_time: Instant) -> Self {
let processed_bytes_values: VecDeque<(Instant, u64)> = (0..THROUGHPUT_WINDOW_SIZE)
.map(|_| (start_time, 0u64))
.collect();
Self {
processed_bytes_values,
start_time,
}
}

/// Calculates the throughput.
pub fn calculate(&mut self, current_processed_bytes: u64) -> f64 {
self.processed_bytes_values.pop_front();
let current_instant = Instant::now();
let (first_instant, first_processed_bytes) = *self.processed_bytes_values.front().unwrap();
let elapsed_time = (current_instant - first_instant).as_millis() as f64 / 1_000f64;
self.processed_bytes_values
.push_back((current_instant, current_processed_bytes));
(current_processed_bytes - first_processed_bytes) as f64
/ 1_000_000f64
/ elapsed_time.max(1f64)
}

pub fn elapsed_time(&self) -> Duration {
self.start_time.elapsed()
}
}

#[cfg(test)]
mod test {

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading