Skip to content

Commit

Permalink
Log query errors through tracing crate
Browse files Browse the repository at this point in the history
(cherry picked from commit fed0bd4)
  • Loading branch information
pkolaczk authored and vponomaryov committed Oct 29, 2024
1 parent ceb9718 commit 8dfecb5
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 42 deletions.
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ path = "src/main.rs"
anyhow = "1.0"
base64 = "0.22"
bytes = "1.0.1"
rmp = "0.8.10"
rmp-serde = "1.0.0-beta.2"
chrono = { version = "0.4.18", features = ["serde"] }
rmp = "0.8"
rmp-serde = "1"
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4", features = ["derive", "cargo", "env"] }
console = "0.15.0"
cpu-time = "1.0.0"
Expand All @@ -42,8 +42,8 @@ rune = "0.12"
rust-embed = "8"
scylla = { version = "0.13", features = ["ssl"] }
search_path = "0.1"
serde = { version = "1.0.116", features = ["derive"] }
serde_json = "1.0.57"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
statrs = "0.17"
status-line = "0.2.0"
strum = { version = "0.26", features = ["derive"] }
Expand All @@ -53,6 +53,7 @@ thiserror = "1.0.26"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot", "signal"] }
tokio-stream = "0.1"
tracing = "0.1"
tracing-appender = "0.2"
tracing-subscriber = "0.3"
try-lock = "0.2.3"
uuid = { version = "1.1", features = ["v4"] }
Expand Down
40 changes: 15 additions & 25 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ pub struct RunCommand {

#[clap(skip)]
pub cass_version: Option<String>,

#[clap(skip)]
pub id: Option<String>,
}

impl RunCommand {
Expand All @@ -443,31 +446,6 @@ impl RunCommand {
.find(|(k, _)| k == key)
.and_then(|v| v.1.parse().ok())
}

/// Returns benchmark name
pub fn name(&self) -> String {
self.workload
.file_stem()
.unwrap()
.to_string_lossy()
.to_string()
}

/// Suggested file name where to save the results of the run.
pub fn default_output_file_name(&self, extension: &str) -> PathBuf {
let mut components = vec![self.name()];
components.extend(self.cluster_name.iter().map(|x| x.replace(' ', "_")));
components.extend(self.cass_version.iter().cloned());
components.extend(self.tags.iter().cloned());
components.extend(self.rate.map(|r| format!("r{r}")));
components.push(format!("p{}", self.concurrency));
components.push(format!("t{}", self.threads));
components.push(format!("c{}", self.connection.count));
let params = self.params.iter().map(|(k, v)| format!("{k}{v}"));
components.extend(params);
components.push(chrono::Local::now().format("%Y%m%d.%H%M%S").to_string());
PathBuf::from(format!("{}.{extension}", components.join(".")))
}
}

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -600,6 +578,18 @@ author = "Piotr Kołaczkowski <[email protected]>",
version = clap::crate_version ! (),
)]
pub struct AppConfig {
/// Name of the log file.
///
/// If not given, the log file name will be created automatically based on the current timestamp.
/// If relative path given, the file will be placed in the directory determined by `log-dir`.
/// The log file will store detailed information about e.g. query errors.
#[clap(long("log-file"))]
pub log_file: Option<PathBuf>,

/// Directory where log files are stored.
#[clap(long("log-dir"), env("LATTE_LOG_DIR"), default_value = ".")]
pub log_dir: PathBuf,

#[clap(subcommand)]
pub command: Command,
}
Expand Down
2 changes: 2 additions & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use scylla::transport::session::PoolSize;
use scylla::{ExecutionProfile, QueryResult, SessionBuilder};
use statrs::distribution::{Normal, Uniform};
use tokio::time::{Duration, Instant};
use tracing::error;
use try_lock::TryLock;
use uuid::{Variant, Version};

Expand Down Expand Up @@ -442,6 +443,7 @@ pub async fn handle_retry_error(
next_attempt_str,
current_error,
);
error!("{}", err_msg);
if !is_last_attempt {
ctxt.stats.try_lock().unwrap().store_retry_error(err_msg);
tokio::time::sleep(current_retry_interval).await;
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum LatteError {
#[error(display = "Failed to create output file {:?}: {}", _0, _1)]
OutputFileCreate(PathBuf, std::io::Error),

#[error(display = "Failed to create log file {:?}: {}", _0, _1)]
LogFileCreate(PathBuf, std::io::Error),

#[error(display = "Error writing HDR log: {}", _0)]
HdrLogWrite(#[source] IntervalLogWriterError<V2DeflateSerializeError>),

Expand Down
60 changes: 51 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::env;
use std::ffi::OsStr;
use std::fs::File;
use std::io::{stdout, Write};
use std::path::{Path, PathBuf};
use std::process::exit;
use std::time::Duration;
use std::{env, fs};

use clap::Parser;
use config::RunCommand;
Expand All @@ -17,6 +17,8 @@ use rune::Source;
use search_path::SearchPath;
use tokio::runtime::{Builder, Runtime};
use tokio::task::spawn_blocking;
use tracing::info;
use tracing_appender::non_blocking::WorkerGuard;
use walkdir::WalkDir;

use crate::config::{
Expand Down Expand Up @@ -74,8 +76,15 @@ fn load_workload_script(workload: &Path, params: &[(String, String)]) -> Result<
.canonicalize()
.unwrap_or_else(|_| workload.to_path_buf());
eprintln!("info: Loading workload script {}...", workload.display());
let src = Source::from_path(&workload).map_err(|e| LatteError::ScriptRead(workload, e))?;
Program::new(src, params.iter().cloned().collect())
let src =
Source::from_path(&workload).map_err(|e| LatteError::ScriptRead(workload.clone(), e))?;
let program = Program::new(src, params.iter().cloned().collect())?;
info!(
"Loaded workload script {}\n\tParams: {:?}",
workload.display(),
params,
);
Ok(program)
}

/// Locates the workload and returns an absolute path to it.
Expand Down Expand Up @@ -295,7 +304,7 @@ async fn run(conf: RunCommand) -> Result<()> {
let path = conf
.output
.clone()
.unwrap_or_else(|| conf.default_output_file_name("json"));
.unwrap_or_else(|| PathBuf::from(format!("latte-{}.json", conf.id.as_ref().unwrap())));

let report = Report::new(conf, stats);
match report.save(&path) {
Expand Down Expand Up @@ -448,12 +457,15 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> {
Ok(())
}

async fn async_main(command: Command) -> Result<()> {
async fn async_main(run_id: String, command: Command) -> Result<()> {
match command {
Command::Edit(config) => edit(config)?,
Command::Schema(config) => schema(config).await?,
Command::Load(config) => load(config).await?,
Command::Run(config) => run(config).await?,
Command::Run(mut config) => {
config.id = Some(run_id);
run(config).await?
}
Command::List(config) => list(config).await?,
Command::Show(config) => show(config).await?,
Command::Hdr(config) => export_hdr_log(config).await?,
Expand Down Expand Up @@ -493,16 +505,46 @@ fn init_runtime(thread_count: usize) -> std::io::Result<Runtime> {
}
}

fn setup_logging(run_id: &str, config: &AppConfig) -> Result<WorkerGuard> {
let log_file = match &config.log_file {
Some(file) if file.is_absolute() => file.clone(),
Some(file) => config.log_dir.clone().join(file),
None => config.log_dir.join(format!("latte-{}.log", run_id)),
};
fs::create_dir_all(&config.log_dir)
.map_err(|e| LatteError::LogFileCreate(log_file.clone(), e))?;
let log_file = File::create(&log_file).map_err(|e| LatteError::LogFileCreate(log_file, e))?;
let (non_blocking, guard) = tracing_appender::non_blocking(log_file);
tracing_subscriber::fmt()
.with_ansi(false)
.with_writer(non_blocking)
.init();
Ok(guard)
}

fn run_id() -> String {
chrono::Local::now().format("%Y%m%d-%H%M%S-%3f").to_string()
}

fn main() {
tracing_subscriber::fmt::init();
let command = AppConfig::parse().command;
let run_id = run_id();
let config = AppConfig::parse();
let _guard = match setup_logging(run_id.as_str(), &config) {
Ok(guard) => guard,
Err(e) => {
eprintln!("error: {e}");
exit(1);
}
};

let command = config.command;
let thread_count = match &command {
Command::Run(cmd) => cmd.threads.get(),
Command::Load(cmd) => cmd.threads.get(),
_ => 1,
};
let runtime = init_runtime(thread_count);
if let Err(e) = runtime.unwrap().block_on(async_main(command)) {
if let Err(e) = runtime.unwrap().block_on(async_main(run_id, command)) {
eprintln!("error: {e}");
exit(128);
}
Expand Down
8 changes: 5 additions & 3 deletions src/plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use plotters::prelude::*;
use plotters_svg::SVGBackend;
use std::collections::BTreeSet;
use std::ops::Range;
use std::path::PathBuf;
use std::process::exit;

#[derive(Eq, PartialEq, Copy, Clone, Debug, Ord, PartialOrd)]
Expand Down Expand Up @@ -142,9 +143,10 @@ pub async fn plot_graph(conf: PlotCommand) -> Result<()> {
}
};

let output_path = conf
.output
.unwrap_or(reports[0].conf.default_output_file_name("svg"));
let output_path = conf.output.unwrap_or(PathBuf::from(format!(
"latte-{}.svg",
reports[0].conf.id.as_ref().unwrap()
)));
let root = SVGBackend::new(&output_path, (2000, 1000)).into_drawing_area();
root.fill(&WHITE).unwrap();

Expand Down

0 comments on commit 8dfecb5

Please sign in to comment.