From 660c4b1d16886229c0761d65d6caf85953368217 Mon Sep 17 00:00:00 2001 From: Daniel Hegberg Date: Wed, 4 Dec 2024 20:13:47 -0800 Subject: [PATCH] Add csv loading benchmarks. (#13544) * Add csv loading benchmarks. * Fix fmt. * Fix clippy. --- .gitignore | 3 ++ datafusion/core/Cargo.toml | 4 ++ datafusion/core/benches/csv_load.rs | 81 ++++++++++++++++++++++++++++ datafusion/core/src/test_util/csv.rs | 69 ++++++++++++++++++++++++ datafusion/core/src/test_util/mod.rs | 2 + test-utils/src/data_gen.rs | 32 ++++++++--- 6 files changed, 184 insertions(+), 7 deletions(-) create mode 100644 datafusion/core/benches/csv_load.rs create mode 100644 datafusion/core/src/test_util/csv.rs diff --git a/.gitignore b/.gitignore index 8195760513f7c..1fa79249ff8e0 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,9 @@ datafusion/sqllogictest/test_files/scratch* # temp file for core datafusion/core/*.parquet +# Generated core benchmark data +datafusion/core/benches/data/* + # rat filtered_rat.txt rat.txt diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 90b8abc622605..48427f7ccdcc5 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -159,6 +159,10 @@ nix = { version = "0.29.0", features = ["fs"] } harness = false name = "aggregate_query_sql" +[[bench]] +harness = false +name = "csv_load" + [[bench]] harness = false name = "distinct_query_sql" diff --git a/datafusion/core/benches/csv_load.rs b/datafusion/core/benches/csv_load.rs new file mode 100644 index 0000000000000..5f707b31a6a93 --- /dev/null +++ b/datafusion/core/benches/csv_load.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; +extern crate arrow; +extern crate datafusion; + +mod data_utils; +use crate::criterion::Criterion; +use datafusion::error::Result; +use datafusion::execution::context::SessionContext; +use datafusion::prelude::CsvReadOptions; +use datafusion::test_util::csv::TestCsvFile; +use parking_lot::Mutex; +use std::sync::Arc; +use std::time::Duration; +use test_utils::AccessLogGenerator; +use tokio::runtime::Runtime; + +fn load_csv(ctx: Arc>, path: &str, options: CsvReadOptions) { + let rt = Runtime::new().unwrap(); + let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap(); + criterion::black_box(rt.block_on(df.collect()).unwrap()); +} + +fn create_context() -> Result>> { + let ctx = SessionContext::new(); + Ok(Arc::new(Mutex::new(ctx))) +} + +fn generate_test_file() -> TestCsvFile { + let write_location = std::env::current_dir() + .unwrap() + .join("benches") + .join("data"); + + // Make sure the write directory exists. + std::fs::create_dir_all(&write_location).unwrap(); + let file_path = write_location.join("logs.csv"); + + let generator = AccessLogGenerator::new().with_include_nulls(true); + let num_batches = 2; + TestCsvFile::try_new(file_path.clone(), generator.take(num_batches as usize)) + .expect("Failed to create test file.") +} + +fn criterion_benchmark(c: &mut Criterion) { + let ctx = create_context().unwrap(); + let test_file = generate_test_file(); + + let mut group = c.benchmark_group("load csv testing"); + group.measurement_time(Duration::from_secs(20)); + + group.bench_function("default csv read options", |b| { + b.iter(|| { + load_csv( + ctx.clone(), + test_file.path().to_str().unwrap(), + CsvReadOptions::default(), + ) + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/core/src/test_util/csv.rs b/datafusion/core/src/test_util/csv.rs new file mode 100644 index 0000000000000..94c7efb954022 --- /dev/null +++ b/datafusion/core/src/test_util/csv.rs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Helpers for writing csv files and reading them back + +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; + +use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; +use crate::error::Result; + +use arrow::csv::WriterBuilder; + +/// a CSV file that has been created for testing. +pub struct TestCsvFile { + path: PathBuf, + schema: SchemaRef, +} + +impl TestCsvFile { + /// Creates a new csv file at the specified location + pub fn try_new( + path: PathBuf, + batches: impl IntoIterator, + ) -> Result { + let file = File::create(&path).unwrap(); + let builder = WriterBuilder::new().with_header(true); + let mut writer = builder.build(file); + + let mut batches = batches.into_iter(); + let first_batch = batches.next().expect("need at least one record batch"); + let schema = first_batch.schema(); + + let mut num_rows = 0; + for batch in batches { + writer.write(&batch)?; + num_rows += batch.num_rows(); + } + + println!("Generated test dataset with {num_rows} rows"); + + Ok(Self { path, schema }) + } + + /// The schema of this csv file + pub fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + /// The path to the csv file + pub fn path(&self) -> &std::path::Path { + self.path.as_path() + } +} diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index c4c84d667a068..09608887c0f1d 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -20,6 +20,8 @@ #[cfg(feature = "parquet")] pub mod parquet; +pub mod csv; + use std::any::Any; use std::collections::HashMap; use std::fs::File; diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs index 45ad51bb44d66..4227f2d9a737b 100644 --- a/test-utils/src/data_gen.rs +++ b/test-utils/src/data_gen.rs @@ -33,6 +33,7 @@ struct GeneratorOptions { pods_per_host: Range, containers_per_pod: Range, entries_per_container: Range, + include_nulls: bool, } impl Default for GeneratorOptions { @@ -42,6 +43,7 @@ impl Default for GeneratorOptions { pods_per_host: 1..15, containers_per_pod: 1..3, entries_per_container: 1024..8192, + include_nulls: false, } } } @@ -149,13 +151,23 @@ impl BatchBuilder { self.image.append(image).unwrap(); self.time.append_value(time); - self.client_addr.append_value(format!( - "{}.{}.{}.{}", - rng.gen::(), - rng.gen::(), - rng.gen::(), - rng.gen::() - )); + if self.options.include_nulls { + // Append a null value if the option is set + // Use both "NULL" as a string and a null value + if rng.gen_bool(0.5) { + self.client_addr.append_null(); + } else { + self.client_addr.append_value("NULL"); + } + } else { + self.client_addr.append_value(format!( + "{}.{}.{}.{}", + rng.gen::(), + rng.gen::(), + rng.gen::(), + rng.gen::() + )); + } self.request_duration.append_value(rng.gen()); self.request_user_agent .append_value(random_string(rng, 20..100)); @@ -317,6 +329,12 @@ impl AccessLogGenerator { self.options.entries_per_container = range; self } + + // Set the condition for null values in the generated data + pub fn with_include_nulls(mut self, include_nulls: bool) -> Self { + self.options.include_nulls = include_nulls; + self + } } impl Iterator for AccessLogGenerator {