From 8a0054aa898f890df1cb92619a0b0a18caa03560 Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Fri, 13 Oct 2023 16:16:26 +0800
Subject: [PATCH] fix: make nyc-taxi bench work again (#2599)

* fix: invalid requests created by nyc-taxi

* feat: add timestamp to table name

* style: fix clippy

* chore: re-export deps for client

* fix: wait result

* chore: no need to define a prefix constant
---
 Cargo.lock                     |  2 +
 benchmarks/Cargo.toml          |  2 +
 benchmarks/src/bin/nyc-taxi.rs | 70 ++++++++++++++++++++++------------
 src/client/src/lib.rs          |  2 +
 4 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2730af59070d..95a0d66cc619 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -842,8 +842,10 @@ name = "benchmarks"
 version = "0.4.0"
 dependencies = [
  "arrow",
+ "chrono",
  "clap 4.4.1",
  "client",
+ "futures-util",
  "indicatif",
  "itertools 0.10.5",
  "parquet",
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 3bbe8dc86140..dca955714470 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -6,8 +6,10 @@ license.workspace = true
 
 [dependencies]
 arrow.workspace = true
+chrono.workspace = true
 clap = { version = "4.0", features = ["derive"] }
 client = { workspace = true }
+futures-util.workspace = true
 indicatif = "0.17.1"
 itertools.workspace = true
 parquet.workspace = true
diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs
index 5f14c655512b..b30989625f73 100644
--- a/benchmarks/src/bin/nyc-taxi.rs
+++ b/benchmarks/src/bin/nyc-taxi.rs
@@ -29,14 +29,14 @@ use client::api::v1::column::Values;
 use client::api::v1::{
     Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, SemanticType,
 };
-use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use client::{Client, Database, Output, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use futures_util::TryStreamExt;
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use tokio::task::JoinSet;
 
 const CATALOG_NAME: &str = "greptime";
 const SCHEMA_NAME: &str = "public";
-const TABLE_NAME: &str = "nyc_taxi";
 
 #[derive(Parser)]
 #[command(name = "NYC benchmark runner")]
@@ -74,7 +74,12 @@ fn get_file_list<P: AsRef<Path>>(path: P) -> Vec<PathBuf> {
         .collect()
 }
 
+fn new_table_name() -> String {
+    format!("nyc_taxi_{}", chrono::Utc::now().timestamp())
+}
+
 async fn write_data(
+    table_name: &str,
     batch_size: usize,
     db: &Database,
     path: PathBuf,
@@ -104,7 +109,7 @@
             }
             let (columns, row_count) = convert_record_batch(record_batch);
             let request = InsertRequest {
-                table_name: TABLE_NAME.to_string(),
+                table_name: table_name.to_string(),
                 columns,
                 row_count,
             };
@@ -113,7 +118,7 @@
         };
 
         let now = Instant::now();
-        let _ = db.insert(requests).await.unwrap();
+        db.insert(requests).await.unwrap();
         let elapsed = now.elapsed();
         total_rpc_elapsed_ms += elapsed.as_millis();
         progress_bar.inc(row_count as _);
@@ -131,6 +136,11 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) {
 
     for (array, field) in record_batch.columns().iter().zip(fields.iter()) {
         let (values, datatype) = build_values(array);
+        let semantic_type = match field.name().as_str() {
+            "VendorID" => SemanticType::Tag,
+            "tpep_pickup_datetime" => SemanticType::Timestamp,
+            _ => SemanticType::Field,
+        };
 
         let column = Column {
             column_name: field.name().clone(),
@@ -141,8 +151,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) {
                 .map(|bitmap| bitmap.buffer().as_slice().to_vec())
                 .unwrap_or_default(),
             datatype: datatype.into(),
-            // datatype and semantic_type are set to default
-            ..Default::default()
+            semantic_type: semantic_type as i32,
         };
         columns.push(column);
     }
@@ -243,11 +252,11 @@ fn is_record_batch_full(batch: &RecordBatch) -> bool {
     batch.columns().iter().all(|col| col.null_count() == 0)
 }
 
-fn create_table_expr() -> CreateTableExpr {
+fn create_table_expr(table_name: &str) -> CreateTableExpr {
     CreateTableExpr {
         catalog_name: CATALOG_NAME.to_string(),
         schema_name: SCHEMA_NAME.to_string(),
-        table_name: TABLE_NAME.to_string(),
+        table_name: table_name.to_string(),
         desc: "".to_string(),
         column_defs: vec![
             ColumnDef {
@@ -261,7 +270,7 @@
             ColumnDef {
                 name: "tpep_pickup_datetime".to_string(),
                 data_type: ColumnDataType::TimestampMicrosecond as i32,
-                is_nullable: true,
+                is_nullable: false,
                 default_constraint: vec![],
                 semantic_type: SemanticType::Timestamp as i32,
                 comment: String::new(),
@@ -405,31 +414,31 @@ fn create_table_expr() -> CreateTableExpr {
         ],
         time_index: "tpep_pickup_datetime".to_string(),
         primary_keys: vec!["VendorID".to_string()],
-        create_if_not_exists: false,
+        create_if_not_exists: true,
         table_options: Default::default(),
         table_id: None,
         engine: "mito".to_string(),
     }
 }
 
-fn query_set() -> HashMap<String, String> {
+fn query_set(table_name: &str) -> HashMap<String, String> {
     HashMap::from([
         (
             "count_all".to_string(),
-            format!("SELECT COUNT(*) FROM {TABLE_NAME};"),
+            format!("SELECT COUNT(*) FROM {table_name};"),
         ),
         (
             "fare_amt_by_passenger".to_string(),
-            format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {TABLE_NAME} GROUP BY passenger_count"),
+            format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {table_name} GROUP BY passenger_count"),
         )
     ])
 }
 
-async fn do_write(args: &Args, db: &Database) {
+async fn do_write(args: &Args, db: &Database, table_name: &str) {
     let mut file_list = get_file_list(args.path.clone().expect("Specify data path in argument"));
     let mut write_jobs = JoinSet::new();
 
-    let create_table_result = db.create(create_table_expr()).await;
+    let create_table_result = db.create(create_table_expr(table_name)).await;
     println!("Create table result: {create_table_result:?}");
 
     let progress_bar_style = ProgressStyle::with_template(
@@ -447,8 +456,10 @@
             let db = db.clone();
             let mpb = multi_progress_bar.clone();
             let pb_style = progress_bar_style.clone();
-            let _ = write_jobs
-                .spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
+            let table_name = table_name.to_string();
+            let _ = write_jobs.spawn(async move {
+                write_data(&table_name, batch_size, &db, path, mpb, pb_style).await
+            });
         }
     }
     while write_jobs.join_next().await.is_some() {
@@ -457,24 +468,32 @@
             let db = db.clone();
             let mpb = multi_progress_bar.clone();
             let pb_style = progress_bar_style.clone();
-            let _ = write_jobs
-                .spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
+            let table_name = table_name.to_string();
+            let _ = write_jobs.spawn(async move {
+                write_data(&table_name, batch_size, &db, path, mpb, pb_style).await
+            });
         }
     }
 }
 
-async fn do_query(num_iter: usize, db: &Database) {
-    for (query_name, query) in query_set() {
+async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
+    for (query_name, query) in query_set(table_name) {
         println!("Running query: {query}");
         for i in 0..num_iter {
             let now = Instant::now();
-            let _res = db.sql(&query).await.unwrap();
+            let res = db.sql(&query).await.unwrap();
+            match res {
+                Output::AffectedRows(_) | Output::RecordBatches(_) => (),
+                Output::Stream(stream) => {
+                    stream.try_collect::<Vec<_>>().await.unwrap();
+                }
+            }
             let elapsed = now.elapsed();
             println!(
                 "query {}, iteration {}: {}ms",
                 query_name,
                 i,
-                elapsed.as_millis()
+                elapsed.as_millis(),
             );
         }
     }
@@ -491,13 +510,14 @@ fn main() {
         .block_on(async {
             let client = Client::with_urls(vec![&args.endpoint]);
             let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+            let table_name = new_table_name();
 
             if !args.skip_write {
-                do_write(&args, &db).await;
+                do_write(&args, &db, &table_name).await;
             }
 
             if !args.skip_read {
-                do_query(args.iter_num, &db).await;
+                do_query(args.iter_num, &db, &table_name).await;
             }
         })
 }
diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs
index 23a67ebae1bd..7f8330f68902 100644
--- a/src/client/src/lib.rs
+++ b/src/client/src/lib.rs
@@ -26,6 +26,8 @@ use api::v1::greptime_response::Response;
 use api::v1::{AffectedRows, GreptimeResponse};
 pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::status_code::StatusCode;
+pub use common_query::Output;
+pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
 use snafu::OptionExt;
 
 pub use self::client::Client;
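
Note on the do_query change above: db.sql can return a lazy Output::Stream,
so the old benchmark timed only the initial response and never consumed the
result set. Draining the stream makes the reported latency cover the whole
query. A minimal sketch of the pattern, mirroring the patched do_query (the
timed_query helper name is hypothetical; it assumes the client re-exports
and the futures-util dependency added by this patch):

    use std::time::Instant;

    use client::{Database, Output};
    use futures_util::TryStreamExt;

    // Time one query end to end, including draining a streaming result.
    async fn timed_query(db: &Database, sql: &str) -> u128 {
        let start = Instant::now();
        match db.sql(sql).await.unwrap() {
            // Row counts and materialized batches are already complete.
            Output::AffectedRows(_) | Output::RecordBatches(_) => (),
            // A stream is lazy: collect it so the measurement covers the
            // full result set, as the patched do_query does.
            Output::Stream(stream) => {
                let _batches: Vec<_> = stream.try_collect().await.unwrap();
            }
        }
        start.elapsed().as_millis()
    }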