Skip to content

Commit

Permalink
Add csv loading benchmarks. (#13544)
Browse files Browse the repository at this point in the history
* Add csv loading benchmarks.

* Fix fmt.

* Fix clippy.
  • Loading branch information
dhegberg authored and zhuliquan committed Dec 6, 2024
1 parent bdfbb31 commit 660c4b1
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ datafusion/sqllogictest/test_files/scratch*
# temp file for core
datafusion/core/*.parquet

# Generated core benchmark data
datafusion/core/benches/data/*

# rat
filtered_rat.txt
rat.txt
4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ nix = { version = "0.29.0", features = ["fs"] }
harness = false
name = "aggregate_query_sql"

[[bench]]
harness = false
name = "csv_load"

[[bench]]
harness = false
name = "distinct_query_sql"
Expand Down
81 changes: 81 additions & 0 deletions datafusion/core/benches/csv_load.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
extern crate arrow;
extern crate datafusion;

mod data_utils;
use crate::criterion::Criterion;
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use datafusion::prelude::CsvReadOptions;
use datafusion::test_util::csv::TestCsvFile;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use test_utils::AccessLogGenerator;
use tokio::runtime::Runtime;

fn load_csv(ctx: Arc<Mutex<SessionContext>>, path: &str, options: CsvReadOptions) {
let rt = Runtime::new().unwrap();
let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}

fn create_context() -> Result<Arc<Mutex<SessionContext>>> {
let ctx = SessionContext::new();
Ok(Arc::new(Mutex::new(ctx)))
}

fn generate_test_file() -> TestCsvFile {
let write_location = std::env::current_dir()
.unwrap()
.join("benches")
.join("data");

// Make sure the write directory exists.
std::fs::create_dir_all(&write_location).unwrap();
let file_path = write_location.join("logs.csv");

let generator = AccessLogGenerator::new().with_include_nulls(true);
let num_batches = 2;
TestCsvFile::try_new(file_path.clone(), generator.take(num_batches as usize))
.expect("Failed to create test file.")
}

fn criterion_benchmark(c: &mut Criterion) {
let ctx = create_context().unwrap();
let test_file = generate_test_file();

let mut group = c.benchmark_group("load csv testing");
group.measurement_time(Duration::from_secs(20));

group.bench_function("default csv read options", |b| {
b.iter(|| {
load_csv(
ctx.clone(),
test_file.path().to_str().unwrap(),
CsvReadOptions::default(),
)
})
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
69 changes: 69 additions & 0 deletions datafusion/core/src/test_util/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Helpers for writing csv files and reading them back
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;

use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use crate::error::Result;

use arrow::csv::WriterBuilder;

/// a CSV file that has been created for testing.
pub struct TestCsvFile {
path: PathBuf,
schema: SchemaRef,
}

impl TestCsvFile {
/// Creates a new csv file at the specified location
pub fn try_new(
path: PathBuf,
batches: impl IntoIterator<Item = RecordBatch>,
) -> Result<Self> {
let file = File::create(&path).unwrap();
let builder = WriterBuilder::new().with_header(true);
let mut writer = builder.build(file);

let mut batches = batches.into_iter();
let first_batch = batches.next().expect("need at least one record batch");
let schema = first_batch.schema();

let mut num_rows = 0;
for batch in batches {
writer.write(&batch)?;
num_rows += batch.num_rows();
}

println!("Generated test dataset with {num_rows} rows");

Ok(Self { path, schema })
}

/// The schema of this csv file
pub fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}

/// The path to the csv file
pub fn path(&self) -> &std::path::Path {
self.path.as_path()
}
}
2 changes: 2 additions & 0 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#[cfg(feature = "parquet")]
pub mod parquet;

pub mod csv;

use std::any::Any;
use std::collections::HashMap;
use std::fs::File;
Expand Down
32 changes: 25 additions & 7 deletions test-utils/src/data_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct GeneratorOptions {
pods_per_host: Range<usize>,
containers_per_pod: Range<usize>,
entries_per_container: Range<usize>,
include_nulls: bool,
}

impl Default for GeneratorOptions {
Expand All @@ -42,6 +43,7 @@ impl Default for GeneratorOptions {
pods_per_host: 1..15,
containers_per_pod: 1..3,
entries_per_container: 1024..8192,
include_nulls: false,
}
}
}
Expand Down Expand Up @@ -149,13 +151,23 @@ impl BatchBuilder {
self.image.append(image).unwrap();
self.time.append_value(time);

self.client_addr.append_value(format!(
"{}.{}.{}.{}",
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>()
));
if self.options.include_nulls {
// Append a null value if the option is set
// Use both "NULL" as a string and a null value
if rng.gen_bool(0.5) {
self.client_addr.append_null();
} else {
self.client_addr.append_value("NULL");
}
} else {
self.client_addr.append_value(format!(
"{}.{}.{}.{}",
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>()
));
}
self.request_duration.append_value(rng.gen());
self.request_user_agent
.append_value(random_string(rng, 20..100));
Expand Down Expand Up @@ -317,6 +329,12 @@ impl AccessLogGenerator {
self.options.entries_per_container = range;
self
}

// Set the condition for null values in the generated data
pub fn with_include_nulls(mut self, include_nulls: bool) -> Self {
self.options.include_nulls = include_nulls;
self
}
}

impl Iterator for AccessLogGenerator {
Expand Down

0 comments on commit 660c4b1

Please sign in to comment.