diff --git a/bench-vortex/benches/clickbench.rs b/bench-vortex/benches/clickbench.rs
index e89b7d739..88022ef88 100644
--- a/bench-vortex/benches/clickbench.rs
+++ b/bench-vortex/benches/clickbench.rs
@@ -45,7 +45,7 @@ fn benchmark(c: &mut Criterion) {
     let context = session_context.clone();
 
     runtime.block_on(async move {
-        clickbench::register_vortex_files(&context, "hits", basepath.as_path(), &HITS_SCHEMA)
+        clickbench::register_vortex_files(context, "hits", basepath.as_path(), &HITS_SCHEMA)
             .await
             .unwrap();
     });
diff --git a/bench-vortex/src/bin/clickbench.rs b/bench-vortex/src/bin/clickbench.rs
index 8abc2dd6b..9bfe74248 100644
--- a/bench-vortex/src/bin/clickbench.rs
+++ b/bench-vortex/src/bin/clickbench.rs
@@ -62,7 +62,7 @@ fn main() {
     // The clickbench-provided file is missing some higher-level type info, so we reprocess it
    // to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
     (0_u32..100).into_par_iter().for_each(|idx| {
-        let output_path = basepath.join(format!("hits_{idx}.parquet"));
+        let output_path = basepath.join("parquet").join(format!("hits_{idx}.parquet"));
 
         idempotent(&output_path, |output_path| {
             eprintln!("Fixing parquet file {idx}");
@@ -137,7 +137,7 @@ fn main() {
         } => {
             runtime.block_on(async {
                 clickbench::register_vortex_files(
-                    &context,
+                    context.clone(),
                     "hits",
                     basepath.as_path(),
                     &HITS_SCHEMA,
diff --git a/bench-vortex/src/clickbench.rs b/bench-vortex/src/clickbench.rs
index 268338e71..e93595f08 100644
--- a/bench-vortex/src/clickbench.rs
+++ b/bench-vortex/src/clickbench.rs
@@ -1,4 +1,4 @@
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::sync::{Arc, LazyLock};
 
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
@@ -7,6 +7,7 @@ use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use futures::{stream, StreamExt, TryStreamExt};
 use tokio::fs::{create_dir_all, OpenOptions};
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::{ChunkedArray, StructArray};
@@ -140,99 +141,112 @@ pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
 });
 
 pub async fn register_vortex_files(
-    session: &SessionContext,
+    session: SessionContext,
     table_name: &str,
     input_path: &Path,
     schema: &Schema,
 ) -> anyhow::Result<()> {
-    let vortex_dir = input_path.parent().unwrap().join("vortex_compressed");
+    let session2 = session.clone();
+    let vortex_dir = input_path.join("vortex");
     create_dir_all(&vortex_dir).await?;
 
-    for idx in 0..100 {
-        let parquet_file_path = input_path.join(format!("hits_{idx}.parquet"));
-        let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}"));
-        idempotent_async(&output_path, |vtx_file| async move {
-            eprintln!("Processing file {idx}");
-            let record_batches = session
-                .read_parquet(
-                    parquet_file_path.to_str().unwrap(),
-                    ParquetReadOptions::default(),
-                )
-                .await?
-                .collect()
-                .await?;
+    let format = Arc::new(VortexFormat::new(&CTX));
+    let table_path = vortex_dir
+        .to_str()
+        .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
+    let table_path = format!("file://{table_path}/");
+    let table_url = ListingTableUrl::parse(table_path)?;
 
-            // Create a ChunkedArray from the set of chunks.
-            let sts = record_batches
-                .into_iter()
-                .map(ArrayData::try_from)
-                .map(|a| a.unwrap().into_struct().unwrap())
-                .collect::<Vec<_>>();
+    let config = ListingTableConfig::new(table_url)
+        .with_listing_options(ListingOptions::new(format as _))
+        .with_schema(schema.clone().into());
 
-            let mut arrays_map: HashMap<Arc<str>, Vec<ArrayData>> = HashMap::default();
-            let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
+    let listing_table = Arc::new(ListingTable::try_new(config)?);
+    session2.register_table(table_name, listing_table as _)?;
 
-            for st in sts.into_iter() {
-                let struct_dtype = st.dtype().as_struct().unwrap();
-                let names = struct_dtype.names().iter();
-                let types = struct_dtype.dtypes().iter();
+    let _paths: Vec<PathBuf> = stream::iter(0..100)
+        .map(|idx| {
+            let parquet_file_path = input_path
+                .join("parquet")
+                .join(format!("hits_{idx}.parquet"));
+            let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}"));
+            let session = session.clone();
+            let schema = schema.clone();
 
-                for (field_name, field_type) in names.zip(types) {
-                    let val = arrays_map.entry(field_name.clone()).or_default();
-                    val.push(st.field_by_name(field_name.as_ref()).unwrap());
+            tokio::spawn(async move {
+                let output_path = output_path.clone();
+                idempotent_async(&output_path, move |vtx_file| async move {
+                    eprintln!("Processing file {idx}");
+                    let record_batches = session
+                        .read_parquet(
+                            parquet_file_path.to_str().unwrap(),
+                            ParquetReadOptions::default(),
+                        )
+                        .await?
+                        .collect()
+                        .await?;
 
-                    types_map.insert(field_name.clone(), field_type.clone());
-                }
-            }
+                    // Create a ChunkedArray from the set of chunks.
+                    let sts = record_batches
+                        .into_iter()
+                        .map(ArrayData::try_from)
+                        .map(|a| a.unwrap().into_struct().unwrap())
+                        .collect::<Vec<_>>();
 
-            let fields = schema
-                .fields()
-                .iter()
-                .map(|field| {
-                    let name: Arc<str> = field.name().as_str().into();
-                    let dtype = types_map[&name].clone();
-                    let chunks = arrays_map.remove(&name).unwrap();
-                    let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();
+                    let mut arrays_map: HashMap<Arc<str>, Vec<ArrayData>> = HashMap::default();
+                    let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
 
-                    (name, chunked_child.into_array())
-                })
-                .collect::<Vec<_>>();
+                    for st in sts.into_iter() {
+                        let struct_dtype = st.dtype().as_struct().unwrap();
+                        let names = struct_dtype.names().iter();
+                        let types = struct_dtype.dtypes().iter();
 
-            let data = StructArray::from_fields(&fields)?.into_array();
+                        for (field_name, field_type) in names.zip(types) {
+                            let val = arrays_map.entry(field_name.clone()).or_default();
+                            val.push(st.field_by_name(field_name.as_ref()).unwrap());
 
-            let compressor = SamplingCompressor::default();
-            let data = compressor.compress(&data, None)?.into_array();
+                            types_map.insert(field_name.clone(), field_type.clone());
+                        }
+                    }
 
-            let f = OpenOptions::new()
-                .write(true)
-                .truncate(true)
-                .create(true)
-                .open(&vtx_file)
-                .await?;
+                    let fields = schema
+                        .fields()
+                        .iter()
+                        .map(|field| {
+                            let name: Arc<str> = field.name().as_str().into();
+                            let dtype = types_map[&name].clone();
+                            let chunks = arrays_map.remove(&name).unwrap();
+                            let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();
 
-            let mut writer = VortexFileWriter::new(f);
-            writer = writer.write_array_columns(data).await?;
-            writer.finalize().await?;
+                            (name, chunked_child.into_array())
+                        })
+                        .collect::<Vec<_>>();
 
-            anyhow::Ok(())
-        })
-        .await?;
-    }
+                    let data = StructArray::from_fields(&fields)?.into_array();
 
-    let format = Arc::new(VortexFormat::new(&CTX));
-    let table_path = vortex_dir
-        .to_str()
-        .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
-    let table_path = format!("file://{table_path}/");
-    let table_url = ListingTableUrl::parse(table_path)?;
+                    let compressor = SamplingCompressor::default();
+                    let data = compressor.compress(&data, None)?.into_array();
 
-    let config = ListingTableConfig::new(table_url)
-        .with_listing_options(ListingOptions::new(format as _))
-        .with_schema(schema.clone().into());
+                    let f = OpenOptions::new()
+                        .write(true)
+                        .truncate(true)
+                        .create(true)
+                        .open(&vtx_file)
+                        .await?;
 
-    let listing_table = Arc::new(ListingTable::try_new(config)?);
+                    let mut writer = VortexFileWriter::new(f);
+                    writer = writer.write_array_columns(data).await?;
+                    writer.finalize().await?;
 
-    session.register_table(table_name, listing_table as _)?;
+                    anyhow::Ok(())
+                })
+                .await
+                .expect("Failed to write Vortex file")
+            })
+        })
+        .buffered(16)
+        .try_collect::<Vec<_>>()
+        .await?;
 
     Ok(())
 }
@@ -244,10 +258,13 @@ pub async fn register_parquet_files(
     schema: &Schema,
 ) -> anyhow::Result<()> {
     let format = Arc::new(ParquetFormat::new());
-    let table_path = input_path
-        .to_str()
-        .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
-    let table_path = format!("file://{table_path}/");
+    let table_path = input_path.join("parquet");
+    let table_path = format!(
+        "file://{}/",
+        table_path
+            .to_str()
+            .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?
+    );
     let table_url = ListingTableUrl::parse(table_path)?;
 
     let config = ListingTableConfig::new(table_url)
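
Note on the concurrency pattern in register_vortex_files: the diff replaces the sequential `for idx in 0..100` loop with `stream::iter(0..100)` mapped through `tokio::spawn` and driven by `buffered(16)`, so the hundred parquet-to-vortex conversions run with at most sixteen in flight. Because the spawn happens inside the lazy `map`, `buffered(16)` bounds how many tasks exist at once, not merely how many join handles are awaited. Below is a minimal, self-contained sketch of the same pattern, assuming tokio (with `macros`, `rt-multi-thread`, and `time` features), futures, and anyhow as dependencies; the `results` binding and the sleep standing in for the per-file work are illustrative, not from the diff:

use futures::{stream, StreamExt, TryStreamExt};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Run 100 independent jobs with at most 16 in flight at any moment.
    let results: Vec<u32> = stream::iter(0..100u32)
        .map(|idx| {
            // Each job gets its own task, so the work is spread across the
            // runtime's worker threads instead of serialized on this task.
            tokio::spawn(async move {
                // Stand-in for the real per-file conversion work.
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                idx
            })
        })
        // Poll up to 16 JoinHandles concurrently, yielding results in input order.
        .buffered(16)
        // A JoinHandle resolves to Result<T, JoinError>, so a panicked task
        // surfaces here as an error instead of being silently dropped.
        .try_collect()
        .await?;

    assert_eq!(results.len(), 100);
    Ok(())
}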
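
The conversion stays restartable through bench-vortex's `idempotent_async` helper, which this diff calls but does not define. Judging from the call sites (it takes the target path plus an async closure, skips the work when the output already exists, and resolves to the final `PathBuf` that `.expect("Failed to write Vortex file")` unwraps), a hypothetical reconstruction might look like the following; the actual helper in the repository may differ:

use std::future::Future;
use std::path::{Path, PathBuf};

// Hypothetical reconstruction of bench-vortex's `idempotent_async`, inferred
// from its call sites in this diff; the real implementation may differ.
async fn idempotent_async<F, Fut>(path: &Path, f: F) -> anyhow::Result<PathBuf>
where
    F: FnOnce(PathBuf) -> Fut,
    Fut: Future<Output = anyhow::Result<()>>,
{
    if !path.exists() {
        // Write through a scratch path so an interrupted run never leaves a
        // truncated file at the final location.
        let temp = path.with_extension("incomplete");
        f(temp.clone()).await?;
        // A same-directory rename publishes the finished file atomically on
        // POSIX filesystems.
        tokio::fs::rename(&temp, path).await?;
    }
    Ok(path.to_path_buf())
}

With this shape, the `.expect("Failed to write Vortex file")` in each spawned task turns a conversion failure into a task panic, which the `JoinError` path of `try_collect` then reports as an error from `register_vortex_files`.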