fix: make nyc-taxi bench work again (#2599)
* fix: invalid requests created by nyc-taxi

* feat: add timestamp to table name

* style: fix clippy

* chore: re-export deps for client

* fix: wait result

* chore: no need to define a prefix constant
evenyag authored Oct 13, 2023
1 parent f859932 commit 8a0054a
Showing 4 changed files with 51 additions and 25 deletions.
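The "invalid requests" in the first bullet trace to the insert path: the benchmark used to leave each gRPC `Column`'s `semantic_type` at its protobuf default, so no column was marked as tag or time index and the server rejected the inserts. The fix (visible in the nyc-taxi.rs diff below) derives the semantic type from the column name. A minimal sketch of that mapping, with the crate path assumed from the imports in the diff:

```rust
use client::api::v1::SemanticType;

// Mirrors the fix below: every inserted column must carry an explicit
// semantic type. Leaving the protobuf default marks nothing as the time
// index, which is presumably what made the requests invalid.
fn semantic_type_for(column_name: &str) -> SemanticType {
    match column_name {
        "VendorID" => SemanticType::Tag,                   // primary key column
        "tpep_pickup_datetime" => SemanticType::Timestamp, // time index
        _ => SemanticType::Field,
    }
}
```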
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions benchmarks/Cargo.toml
```diff
@@ -6,8 +6,10 @@ license.workspace = true
 
 [dependencies]
 arrow.workspace = true
+chrono.workspace = true
 clap = { version = "4.0", features = ["derive"] }
 client = { workspace = true }
+futures-util.workspace = true
 indicatif = "0.17.1"
 itertools.workspace = true
 parquet.workspace = true
```
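The two new dependencies serve the fixes above: `chrono` supplies the epoch timestamp baked into the per-run table name, and `futures-util` provides `TryStreamExt::try_collect` for draining streamed query results. A self-contained sketch of both idioms (assuming `tokio`, `chrono`, and `futures-util` as dependencies; the stream here is a stand-in for the real result stream):

```rust
use futures_util::{stream, TryStreamExt};

#[tokio::main]
async fn main() {
    // Unique-per-run table name, built the same way as in the benchmark.
    let table_name = format!("nyc_taxi_{}", chrono::Utc::now().timestamp());
    println!("writing to {table_name}");

    // Draining a fallible stream with try_collect, the same adapter the
    // benchmark applies to a streamed query result.
    let rows: Vec<u32> = stream::iter(vec![Ok::<_, std::io::Error>(1), Ok(2)])
        .try_collect()
        .await
        .unwrap();
    assert_eq!(rows, vec![1, 2]);
}
```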
70 changes: 45 additions & 25 deletions benchmarks/src/bin/nyc-taxi.rs
```diff
@@ -29,14 +29,14 @@ use client::api::v1::column::Values;
 use client::api::v1::{
     Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, SemanticType,
 };
-use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use client::{Client, Database, Output, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use futures_util::TryStreamExt;
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use tokio::task::JoinSet;
 
 const CATALOG_NAME: &str = "greptime";
 const SCHEMA_NAME: &str = "public";
-const TABLE_NAME: &str = "nyc_taxi";
 
 #[derive(Parser)]
 #[command(name = "NYC benchmark runner")]
@@ -74,7 +74,12 @@ fn get_file_list<P: AsRef<Path>>(path: P) -> Vec<PathBuf> {
         .collect()
 }
 
+fn new_table_name() -> String {
+    format!("nyc_taxi_{}", chrono::Utc::now().timestamp())
+}
+
 async fn write_data(
+    table_name: &str,
     batch_size: usize,
     db: &Database,
     path: PathBuf,
@@ -104,7 +109,7 @@ async fn write_data(
             }
             let (columns, row_count) = convert_record_batch(record_batch);
             let request = InsertRequest {
-                table_name: TABLE_NAME.to_string(),
+                table_name: table_name.to_string(),
                 columns,
                 row_count,
             };
@@ -113,7 +118,7 @@ async fn write_data(
         };
 
         let now = Instant::now();
-        let _ = db.insert(requests).await.unwrap();
+        db.insert(requests).await.unwrap();
         let elapsed = now.elapsed();
         total_rpc_elapsed_ms += elapsed.as_millis();
         progress_bar.inc(row_count as _);
@@ -131,6 +136,11 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) {
 
     for (array, field) in record_batch.columns().iter().zip(fields.iter()) {
         let (values, datatype) = build_values(array);
+        let semantic_type = match field.name().as_str() {
+            "VendorID" => SemanticType::Tag,
+            "tpep_pickup_datetime" => SemanticType::Timestamp,
+            _ => SemanticType::Field,
+        };
 
         let column = Column {
             column_name: field.name().clone(),
@@ -141,8 +151,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) {
                 .map(|bitmap| bitmap.buffer().as_slice().to_vec())
                 .unwrap_or_default(),
             datatype: datatype.into(),
-            // datatype and semantic_type are set to default
-            ..Default::default()
+            semantic_type: semantic_type as i32,
         };
         columns.push(column);
     }
@@ -243,11 +252,11 @@ fn is_record_batch_full(batch: &RecordBatch) -> bool {
     batch.columns().iter().all(|col| col.null_count() == 0)
 }
 
-fn create_table_expr() -> CreateTableExpr {
+fn create_table_expr(table_name: &str) -> CreateTableExpr {
     CreateTableExpr {
         catalog_name: CATALOG_NAME.to_string(),
         schema_name: SCHEMA_NAME.to_string(),
-        table_name: TABLE_NAME.to_string(),
+        table_name: table_name.to_string(),
         desc: "".to_string(),
         column_defs: vec![
             ColumnDef {
@@ -261,7 +270,7 @@ fn create_table_expr() -> CreateTableExpr {
             ColumnDef {
                 name: "tpep_pickup_datetime".to_string(),
                 data_type: ColumnDataType::TimestampMicrosecond as i32,
-                is_nullable: true,
+                is_nullable: false,
                 default_constraint: vec![],
                 semantic_type: SemanticType::Timestamp as i32,
                 comment: String::new(),
@@ -405,31 +414,31 @@ fn create_table_expr() -> CreateTableExpr {
         ],
         time_index: "tpep_pickup_datetime".to_string(),
         primary_keys: vec!["VendorID".to_string()],
-        create_if_not_exists: false,
+        create_if_not_exists: true,
         table_options: Default::default(),
         table_id: None,
         engine: "mito".to_string(),
     }
 }
 
-fn query_set() -> HashMap<String, String> {
+fn query_set(table_name: &str) -> HashMap<String, String> {
     HashMap::from([
         (
             "count_all".to_string(),
-            format!("SELECT COUNT(*) FROM {TABLE_NAME};"),
+            format!("SELECT COUNT(*) FROM {table_name};"),
         ),
         (
             "fare_amt_by_passenger".to_string(),
-            format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {TABLE_NAME} GROUP BY passenger_count"),
+            format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {table_name} GROUP BY passenger_count"),
         )
     ])
 }
 
-async fn do_write(args: &Args, db: &Database) {
+async fn do_write(args: &Args, db: &Database, table_name: &str) {
     let mut file_list = get_file_list(args.path.clone().expect("Specify data path in argument"));
     let mut write_jobs = JoinSet::new();
 
-    let create_table_result = db.create(create_table_expr()).await;
+    let create_table_result = db.create(create_table_expr(table_name)).await;
     println!("Create table result: {create_table_result:?}");
 
     let progress_bar_style = ProgressStyle::with_template(
@@ -447,8 +456,10 @@ async fn do_write(args: &Args, db: &Database) {
             let db = db.clone();
             let mpb = multi_progress_bar.clone();
             let pb_style = progress_bar_style.clone();
-            let _ = write_jobs
-                .spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
+            let table_name = table_name.to_string();
+            let _ = write_jobs.spawn(async move {
+                write_data(&table_name, batch_size, &db, path, mpb, pb_style).await
+            });
         }
     }
     while write_jobs.join_next().await.is_some() {
@@ -457,24 +468,32 @@ async fn do_write(args: &Args, db: &Database) {
             let db = db.clone();
             let mpb = multi_progress_bar.clone();
             let pb_style = progress_bar_style.clone();
-            let _ = write_jobs
-                .spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
+            let table_name = table_name.to_string();
+            let _ = write_jobs.spawn(async move {
+                write_data(&table_name, batch_size, &db, path, mpb, pb_style).await
+            });
         }
     }
 }
 
-async fn do_query(num_iter: usize, db: &Database) {
-    for (query_name, query) in query_set() {
+async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
+    for (query_name, query) in query_set(table_name) {
         println!("Running query: {query}");
         for i in 0..num_iter {
             let now = Instant::now();
-            let _res = db.sql(&query).await.unwrap();
+            let res = db.sql(&query).await.unwrap();
+            match res {
+                Output::AffectedRows(_) | Output::RecordBatches(_) => (),
+                Output::Stream(stream) => {
+                    stream.try_collect::<Vec<_>>().await.unwrap();
+                }
+            }
             let elapsed = now.elapsed();
             println!(
                 "query {}, iteration {}: {}ms",
                 query_name,
                 i,
-                elapsed.as_millis()
+                elapsed.as_millis(),
             );
         }
     }
@@ -491,13 +510,14 @@ fn main() {
         .block_on(async {
             let client = Client::with_urls(vec![&args.endpoint]);
            let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+            let table_name = new_table_name();
 
             if !args.skip_write {
-                do_write(&args, &db).await;
+                do_write(&args, &db, &table_name).await;
             }
 
             if !args.skip_read {
-                do_query(args.iter_num, &db).await;
+                do_query(args.iter_num, &db, &table_name).await;
             }
         })
 }
```
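The "wait result" bullet is the subtle part of this diff: `db.sql` can return `Output::Stream`, which resolves lazily, so stopping the clock right after the call would measure only dispatch, not execution. The benchmark now drains the stream before reading the timer. A condensed sketch of that timing pattern (`time_query` is an illustrative helper, not part of the commit):

```rust
use std::time::Instant;

use client::{Database, Output};
use futures_util::TryStreamExt;

// Times a query end-to-end. The clock keeps running until every record
// batch has been pulled from the stream; otherwise a streaming query
// would look nearly free regardless of how much data it scans.
async fn time_query(db: &Database, sql: &str) -> u128 {
    let now = Instant::now();
    match db.sql(sql).await.unwrap() {
        // Fully materialized results: nothing left to wait for.
        Output::AffectedRows(_) | Output::RecordBatches(_) => (),
        // Streaming results: drain all batches before reading the clock.
        Output::Stream(stream) => {
            stream.try_collect::<Vec<_>>().await.unwrap();
        }
    }
    now.elapsed().as_millis()
}
```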
2 changes: 2 additions & 0 deletions src/client/src/lib.rs
```diff
@@ -26,6 +26,8 @@ use api::v1::greptime_response::Response;
 use api::v1::{AffectedRows, GreptimeResponse};
 pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::status_code::StatusCode;
+pub use common_query::Output;
+pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
 use snafu::OptionExt;
 
 pub use self::client::Client;
```
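Re-exporting `Output`, `RecordBatches`, and `SendableRecordBatchStream` lets a consumer depend on the `client` crate alone instead of also pulling in `common-query` and `common-recordbatch`, which is exactly what the benchmark binary above relies on. A hypothetical downstream use:

```rust
// Sketch of a downstream consumer, assuming only `client` in [dependencies].
// RecordBatches and SendableRecordBatchStream are re-exported the same way.
use client::Output;

fn describe(output: &Output) -> &'static str {
    match output {
        Output::AffectedRows(_) => "affected rows",
        Output::RecordBatches(_) => "materialized batches",
        Output::Stream(_) => "lazy stream",
    }
}
```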
