From 0ad2000a149ac7e9d0a812a0f4be8e389b9eb751 Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Tue, 5 Nov 2024 17:43:25 +0200 Subject: [PATCH] Fix formatting --- src/report/mod.rs | 2 +- src/scripting/bind.rs | 3 +- src/scripting/context.rs | 201 +++++++++++++++++++++++-------------- src/scripting/functions.rs | 12 ++- src/stats/session.rs | 8 +- 5 files changed, 136 insertions(+), 90 deletions(-) diff --git a/src/report/mod.rs b/src/report/mod.rs index 7f32c6d..8ddb1f3 100644 --- a/src/report/mod.rs +++ b/src/report/mod.rs @@ -1,4 +1,4 @@ -use crate::config::{RunCommand, PRINT_RETRY_ERROR_LIMIT, WeightedFunction}; +use crate::config::{RunCommand, WeightedFunction, PRINT_RETRY_ERROR_LIMIT}; use crate::stats::percentiles::Percentile; use crate::stats::{BenchmarkCmp, BenchmarkStats, Mean, Sample, Significance}; use chrono::{DateTime, Local, TimeZone}; diff --git a/src/scripting/bind.rs b/src/scripting/bind.rs index 9164d6e..9c31816 100644 --- a/src/scripting/bind.rs +++ b/src/scripting/bind.rs @@ -68,7 +68,8 @@ fn to_scylla_value(v: &Value, typ: &ColumnType) -> Result { (Value::Bytes(v), ColumnType::Blob) => Ok(CqlValue::Blob(v.borrow_ref().unwrap().to_vec())), (Value::Vec(v), ColumnType::Blob) => { let v: Vec = v.borrow_ref().unwrap().to_vec(); - let byte_vec: Vec = v.into_iter() + let byte_vec: Vec = v + .into_iter() .map(|value| value.as_byte().unwrap()) .collect(); Ok(CqlValue::Blob(byte_vec)) diff --git a/src/scripting/context.rs b/src/scripting/context.rs index e028bca..aed7aa5 100644 --- a/src/scripting/context.rs +++ b/src/scripting/context.rs @@ -1,11 +1,11 @@ -use bytes::Bytes; -use chrono::Utc; use crate::config::RetryInterval; use crate::error::LatteError; use crate::scripting::bind; use crate::scripting::cass_error::{CassError, CassErrorKind}; use crate::scripting::connect::ClusterInfo; use crate::stats::session::SessionStats; +use bytes::Bytes; +use chrono::Utc; use itertools::enumerate; use rand::prelude::ThreadRng; use rand::random; @@ -39,13 +39,13 @@ pub struct RowDistribution { pub struct RowDistributionPreset { pub total_rows: u64, pub partition_groups: Vec, - pub row_distributions: Vec<(RowDistribution, RowDistribution)> + pub row_distributions: Vec<(RowDistribution, RowDistribution)>, } impl RowDistributionPreset { pub fn new(partition_groups: Vec) -> RowDistributionPreset { let total_rows: u64 = partition_groups.iter().map(|pg| pg.n_rows_per_group).sum(); - RowDistributionPreset{ + RowDistributionPreset { total_rows, partition_groups, row_distributions: vec![], @@ -60,26 +60,31 @@ impl RowDistributionPreset { // This "greatest common divisioner" will be used as a number of distribution cycles // based on the partition group proportions. other_rows -= partition_group.n_rows_per_group; - let (cycles_num, (mult_n1, tail_n1), (mult_n2, tail_n2)) = max_gcd_with_tail( - partition_group.n_rows_per_group, other_rows, + let (cycles_num, (mult_n1, tail_n1), (mult_n2, tail_n2)) = + max_gcd_with_tail(partition_group.n_rows_per_group, other_rows); + let cycle_type_1 = ( + tail_n1 + tail_n2, + mult_n1 + (tail_n1 > 0) as u64, + mult_n2 + (tail_n2 > 0) as u64, ); - let cycle_type_1 = (tail_n1 + tail_n2, (mult_n1 + (tail_n1 > 0) as u64), mult_n2 + (tail_n2 > 0) as u64); let cycle_type_2 = ((cycles_num - tail_n1 - tail_n2), mult_n1, mult_n2); self.row_distributions.push(( - RowDistribution{ + RowDistribution { n_cycles: cycle_type_1.0, n_rows_for_left: cycle_type_1.1, n_rows_for_right: cycle_type_1.2, n_rows_for_left_and_right: cycle_type_1.1 + cycle_type_1.2, - n_rows_for_all_cycles: cycle_type_1.0 * cycle_type_1.1 + cycle_type_1.0 * cycle_type_1.2, + n_rows_for_all_cycles: cycle_type_1.0 * cycle_type_1.1 + + cycle_type_1.0 * cycle_type_1.2, }, - RowDistribution{ + RowDistribution { n_cycles: cycle_type_2.0, n_rows_for_left: cycle_type_2.1, n_rows_for_right: cycle_type_2.2, n_rows_for_left_and_right: cycle_type_2.1 + cycle_type_2.2, - n_rows_for_all_cycles: cycle_type_2.0 * cycle_type_2.1 + cycle_type_2.0 * cycle_type_2.2, - } + n_rows_for_all_cycles: cycle_type_2.0 * cycle_type_2.1 + + cycle_type_2.0 * cycle_type_2.2, + }, )); } } @@ -90,7 +95,8 @@ impl RowDistributionPreset { 0, self.partition_groups.clone(), self.row_distributions.clone(), - ).await + ) + .await } async fn _get_partition_idx( @@ -126,12 +132,14 @@ impl RowDistributionPreset { // be calculated correctly on the recursive call step. done_cycle_type_1_num = (idx + cycle_type_1.n_rows_for_right) / cycle_type_1_size; done_cycle_type_1_rows = done_cycle_type_1_num * cycle_type_1_size; - if done_cycle_type_1_rows <= idx && idx < cycle_type_1.n_rows_for_left + done_cycle_type_1_rows { - let ret = partn_offset + (idx - - done_cycle_type_1_rows - + done_cycle_type_1_num * cycle_type_1.n_rows_for_left - ) % current_partn_count; - return ret + if done_cycle_type_1_rows <= idx + && idx < cycle_type_1.n_rows_for_left + done_cycle_type_1_rows + { + let ret = partn_offset + + (idx - done_cycle_type_1_rows + + done_cycle_type_1_num * cycle_type_1.n_rows_for_left) + % current_partn_count; + return ret; } } else { done_cycle_type_1_num = cycle_type_1.n_cycles; @@ -142,17 +150,20 @@ impl RowDistributionPreset { // gets considered separately in other parts. // Also, we must add shift equal to the size of the right group to make it's idx // be calculated correctly on the recursive call step. - done_cycle_type_2_num = (idx - done_cycle_type_1_rows + cycle_type_2.n_rows_for_right) / cycle_type_2_size; + done_cycle_type_2_num = (idx - done_cycle_type_1_rows + + cycle_type_2.n_rows_for_right) + / cycle_type_2_size; done_cycle_type_2_rows = done_cycle_type_2_num * cycle_type_2_size; let total_done_rows = done_cycle_type_1_rows + done_cycle_type_2_rows; if total_done_rows <= idx && idx < total_done_rows + cycle_type_2.n_rows_for_left { - let ret = partn_offset + (idx - - done_cycle_type_1_num * cycle_type_1.n_rows_for_right - - done_cycle_type_2_rows - + done_cycle_type_2_num * cycle_type_2.n_rows_for_left - ) % current_partn_count; - return ret + let ret = partn_offset + + (idx + - done_cycle_type_1_num * cycle_type_1.n_rows_for_right + - done_cycle_type_2_rows + + done_cycle_type_2_num * cycle_type_2.n_rows_for_left) + % current_partn_count; + return ret; } } idx = idx @@ -163,7 +174,8 @@ impl RowDistributionPreset { panic!( "Failed to match idx and partition idx! \ Most probably row distribution values were incorrectly calculated \ - according to the partition groups data."); + according to the partition groups data." + ); } } @@ -271,12 +283,14 @@ impl Context { Ok(None) } Err(e) => { - eprintln!("WARNING: {e}", e=e); + eprintln!("WARNING: {e}", e = e); Ok(None) } } - }, - None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))) + } + None => Err(CassError(CassErrorKind::Error( + "'session' is not defined".to_string(), + ))), } } @@ -287,23 +301,24 @@ impl Context { preset_name: &str, row_count: u64, rows_per_partitions_base: u64, - mut rows_per_partitions_groups: &str, // "percent:base_multiplier, ..." -> "80:1,15:2,5:4" + mut rows_per_partitions_groups: &str, // "percent:base_multiplier, ..." -> "80:1,15:2,5:4" ) -> Result<(), CassError> { // Validate input data if preset_name.is_empty() { return Err(CassError(CassErrorKind::Error( - "init_partition_row_distribution_preset: 'preset_name' cannot be empty".to_string() - ))) + "init_partition_row_distribution_preset: 'preset_name' cannot be empty".to_string(), + ))); } if row_count < 1 { return Err(CassError(CassErrorKind::Error( - "init_partition_row_distribution_preset: 'row_count' cannot be less than 1".to_string() - ))) + "init_partition_row_distribution_preset: 'row_count' cannot be less than 1" + .to_string(), + ))); } if rows_per_partitions_base < 1 { return Err(CassError(CassErrorKind::Error( "init_partition_row_distribution_preset: 'rows_per_partitions_base' cannot be less than 1".to_string() - ))) + ))); } // Parse the 'rows_per_partitions_groups' string parameter into a HashMap @@ -318,7 +333,7 @@ impl Context { if duplicates_dump.contains(processed_pair) { return Err(CassError(CassErrorKind::Error(format!( "init_partition_row_distribution_preset: found duplicates pairs - '{processed_pair}'") - ))) + ))); } let parts: Vec<&str> = processed_pair.split(':').collect(); if let (Some(key), Some(value)) = (parts.first(), parts.get(1)) { @@ -332,15 +347,15 @@ impl Context { "init_partition_row_distribution_preset: \ Wrong sub-value provided in the 'rows_per_partitions_groups' parameter: '{processed_pair}'. \ It must be set of integer pairs separated with a ':' symbol. Example: '49.1:1,49:2,1.9:2.5'") - ))) + ))); } } } if (summary_percentage - 100.0).abs() > 0.01 { return Err(CassError(CassErrorKind::Error(format!( "init_partition_row_distribution_preset: \ - summary of partition percentage must be '100'. Got '{summary_percentage}' instead") - ))) + summary of partition percentage must be '100'. Got '{summary_percentage}' instead" + )))); } // Calculate values @@ -350,9 +365,13 @@ impl Context { for (key, (partn_percent, partn_multiplier)) in &partn_multipliers { partn_sizes.insert( key.to_string(), - (*partn_percent, ((rows_per_partitions_base as f64) * partn_multiplier) as u64) + ( + *partn_percent, + ((rows_per_partitions_base as f64) * partn_multiplier) as u64, + ), ); - let partition_type_size: f64 = rows_per_partitions_base as f64 * partn_multiplier * partn_percent / 100.0; + let partition_type_size: f64 = + rows_per_partitions_base as f64 * partn_multiplier * partn_percent / 100.0; partn_cycle_size += partition_type_size; } let mut partn_count: u64 = (row_count as f64 / partn_cycle_size) as u64; @@ -409,11 +428,16 @@ impl Context { if partition.2 == row_count_diff { partitions[i].1 += 1; same_size_exists = true; - break + break; } } if !same_size_exists { - partitions.push(((100000.0 / (partn_count as f64)).round() / 1000.0, 1, row_count_diff, 1.0)); + partitions.push(( + (100000.0 / (partn_count as f64)).round() / 1000.0, + 1, + row_count_diff, + 1.0, + )); } actual_row_count += row_count_diff; } @@ -430,7 +454,8 @@ impl Context { let int_part = parts[0]; let mut frac_part: String = "".to_string(); if parts[1].matches("0").count() != parts[1].len() { - frac_part = parts[1].chars() + frac_part = parts[1] + .chars() .take_while(|&ch| ch == '0') .chain(parts[1].chars().filter(|&ch| ch != '0').take(2)) .collect::(); @@ -449,19 +474,20 @@ impl Context { "info: init_partition_row_distribution_preset: \ preset_name={preset_name}, total_partitions={partn_count}, total_rows={total_rows}\ , partitions/rows -> {partitions}", - preset_name=preset_name, - partn_count=partn_count, - total_rows=actual_row_count, - partitions=partitions_str, + preset_name = preset_name, + partn_count = partn_count, + total_rows = actual_row_count, + partitions = partitions_str, ); // Save data for further usage let mut partition_groups = vec![]; for partition in partitions { if partition.1 > 0 { - partition_groups.push( - PartitionGroup{ n_rows_per_group: partition.1 * partition.2, n_partitions: partition.1} - ); + partition_groups.push(PartitionGroup { + n_rows_per_group: partition.1 * partition.2, + n_partitions: partition.1, + }); } } // NOTE: sort partition groups in the size descending order to minimize the cumulative @@ -470,16 +496,19 @@ impl Context { let mut row_distribution_preset = RowDistributionPreset::new(partition_groups); // NOTE: generate row distributions only after the partition groups are finished with changes row_distribution_preset.generate_row_distributions(); - self.partition_row_presets.insert(preset_name.to_string(), row_distribution_preset); + self.partition_row_presets + .insert(preset_name.to_string(), row_distribution_preset); Ok(()) } /// Returns a partition index based on the stress operation index and a preset of values pub async fn get_partition_idx(&self, preset_name: &str, idx: u64) -> Result { - let preset = self.partition_row_presets - .get(preset_name) - .ok_or_else(|| CassError(CassErrorKind::PartitionRowPresetNotFound(preset_name.to_string())))?; + let preset = self.partition_row_presets.get(preset_name).ok_or_else(|| { + CassError(CassErrorKind::PartitionRowPresetNotFound( + preset_name.to_string(), + )) + })?; Ok(preset.get_partition_idx(idx).await) } @@ -491,8 +520,10 @@ impl Context { let mut datacenters: Vec = dc_info.keys().cloned().collect(); datacenters.sort(); Ok(datacenters) - }, - None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))), + } + None => Err(CassError(CassErrorKind::Error( + "'session' is not defined".to_string(), + ))), } } @@ -507,8 +538,10 @@ impl Context { .map_err(|e| CassError::prepare_error(cql, e))?; self.statements.insert(key.to_string(), Arc::new(statement)); Ok(()) - }, - None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))), + } + None => Err(CassError(CassErrorKind::Error( + "'session' is not defined".to_string(), + ))), } } @@ -524,7 +557,9 @@ impl Context { let mut current_attempt_num = 0; while current_attempt_num <= self.retry_number { let start_time = self.stats.try_lock().unwrap().start_request(); - let rs = session.query_paged(query.clone(), (), paging_state.clone()).await; + let rs = session + .query_paged(query.clone(), (), paging_state.clone()) + .await; let current_duration = Instant::now() - start_time; match rs { Ok(ref page) => { @@ -533,7 +568,8 @@ impl Context { all_pages_duration += current_duration; } Err(e) => { - let current_error = CassError::query_execution_error(cql, &[], e.clone()); + let current_error = + CassError::query_execution_error(cql, &[], e.clone()); handle_retry_error(self, current_attempt_num, current_error).await; current_attempt_num += 1; continue; @@ -541,18 +577,21 @@ impl Context { } if paging_state.is_some() { current_attempt_num = 0; - continue // get next page + continue; // get next page } - self.stats - .try_lock() - .unwrap() - .complete_request(all_pages_duration, Some(rows_cnt), &rs); + self.stats.try_lock().unwrap().complete_request( + all_pages_duration, + Some(rows_cnt), + &rs, + ); rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?; return Ok(()); } Err(CassError::query_retries_exceeded(self.retry_number)) - }, - None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))), + } + None => Err(CassError(CassErrorKind::Error( + "'session' is not defined".to_string(), + ))), } } @@ -573,7 +612,9 @@ impl Context { let mut current_attempt_num = 0; while current_attempt_num <= self.retry_number { let start_time = self.stats.try_lock().unwrap().start_request(); - let rs = session.execute_paged(statement, params.clone(), paging_state.clone()).await; + let rs = session + .execute_paged(statement, params.clone(), paging_state.clone()) + .await; let current_duration = Instant::now() - start_time; match rs { Ok(ref page) => { @@ -594,24 +635,26 @@ impl Context { } if paging_state.is_some() { current_attempt_num = 0; - continue // get next page + continue; // get next page } - self.stats - .try_lock() - .unwrap() - .complete_request(all_pages_duration, Some(rows_cnt), &rs); + self.stats.try_lock().unwrap().complete_request( + all_pages_duration, + Some(rows_cnt), + &rs, + ); rs.map_err(|e| { CassError::query_execution_error(statement.get_statement(), ¶ms, e) })?; return Ok(()); } Err(CassError::query_retries_exceeded(self.retry_number)) - }, - None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))), + } + None => Err(CassError(CassErrorKind::Error( + "'session' is not defined".to_string(), + ))), } } - pub fn elapsed_secs(&self) -> f64 { self.start_time.try_lock().unwrap().elapsed().as_secs_f64() } @@ -691,6 +734,7 @@ fn gcd(n1: u64, n2: u64) -> u64 { /// Takes numbers of rows for 2 DB partition groups and calculates the best approach /// for getting the most dispered and the least clustered, by partition sizes, distribution. +#[rustfmt::skip] fn max_gcd_with_tail(n1: u64, n2: u64) -> ( u64, // greatest common divisor (u64, u64), // (multiplier_based_on_n1, tail_n1) @@ -730,6 +774,7 @@ fn max_gcd_with_tail(n1: u64, n2: u64) -> ( (max_gcd, best_split_n1, best_split_n2) } +#[rustfmt::skip] #[cfg(test)] mod tests { use super::*; diff --git a/src/scripting/functions.rs b/src/scripting/functions.rs index ad05afe..0ca6d95 100644 --- a/src/scripting/functions.rs +++ b/src/scripting/functions.rs @@ -117,9 +117,8 @@ pub fn blob(seed: i64, len: usize) -> Vec { /// the RNG. #[rune::function] pub fn text(seed: i64, len: usize) -> String { - let charset: Vec = ( - "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".to_owned() + - "0123456789!@#$%^&*()_+-=[]{}|;:',.<>?/") + let charset: Vec = ("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".to_owned() + + "0123456789!@#$%^&*()_+-=[]{}|;:',.<>?/") .chars() .collect(); let mut rng = SmallRng::seed_from_u64(seed as u64); @@ -273,12 +272,15 @@ pub async fn init_partition_row_distribution_preset( row_count, rows_per_partitions_base, &rows_per_partitions_groups, - ).await + ) + .await } #[rune::function(instance)] pub async fn get_partition_idx(ctx: Ref, preset_name: Ref, idx: u64) -> u64 { - ctx.get_partition_idx(&preset_name, idx).await.expect("REASON") + ctx.get_partition_idx(&preset_name, idx) + .await + .expect("REASON") } #[rune::function(instance)] diff --git a/src/stats/session.rs b/src/stats/session.rs index 1c3f3de..82da104 100644 --- a/src/stats/session.rs +++ b/src/stats/session.rs @@ -43,11 +43,9 @@ impl SessionStats { self.resp_times_ns.record(duration); self.req_count += 1; match rs { - Ok(rs) => { - match total_rows { - Some(n) => self.row_count += n, - None => self.row_count += rs.rows.as_ref().map(|r| r.len()).unwrap_or(0) as u64, - } + Ok(rs) => match total_rows { + Some(n) => self.row_count += n, + None => self.row_count += rs.rows.as_ref().map(|r| r.len()).unwrap_or(0) as u64, }, Err(e) => { self.req_error_count += 1;