Parallel clickbench files (#1663)
gatesn authored Dec 12, 2024
1 parent 8245062 commit ecd891b
Showing 3 changed files with 96 additions and 79 deletions.
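
The core of this commit is in bench-vortex/src/clickbench.rs: the sequential per-file loop in register_vortex_files is replaced by a bounded-concurrency pipeline that spawns one Tokio task per Parquet file and keeps at most 16 conversions in flight via buffered(16). Below is a minimal, self-contained sketch of that pattern, not the commit's code: the convert function, file names, and the tokio/futures/anyhow dependencies are illustrative assumptions.

use std::path::PathBuf;

use futures::{stream, StreamExt, TryStreamExt};

// Illustrative stand-in for the per-file Parquet -> Vortex conversion.
async fn convert(idx: u32) -> anyhow::Result<PathBuf> {
    eprintln!("Processing file {idx}");
    Ok(PathBuf::from(format!("hits_{idx}.vortex")))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // One spawned task per file; buffered(16) bounds how many run at once.
    // Errors inside a task are surfaced with expect (as in the commit), while
    // try_collect propagates a JoinError if a task panics or is cancelled.
    let paths: Vec<PathBuf> = stream::iter(0_u32..100)
        .map(|idx| tokio::spawn(async move { convert(idx).await.expect("conversion failed") }))
        .buffered(16)
        .try_collect()
        .await?;
    println!("wrote {} files", paths.len());
    Ok(())
}
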
2 changes: 1 addition & 1 deletion bench-vortex/benches/clickbench.rs
@@ -45,7 +45,7 @@ fn benchmark(c: &mut Criterion) {
let context = session_context.clone();

runtime.block_on(async move {
clickbench::register_vortex_files(&context, "hits", basepath.as_path(), &HITS_SCHEMA)
clickbench::register_vortex_files(context, "hits", basepath.as_path(), &HITS_SCHEMA)
.await
.unwrap();
});
4 changes: 2 additions & 2 deletions bench-vortex/src/bin/clickbench.rs
@@ -62,7 +62,7 @@ fn main() {
// The clickbench-provided file is missing some higher-level type info, so we reprocess it
// to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
(0_u32..100).into_par_iter().for_each(|idx| {
let output_path = basepath.join(format!("hits_{idx}.parquet"));
let output_path = basepath.join("parquet").join(format!("hits_{idx}.parquet"));
idempotent(&output_path, |output_path| {
eprintln!("Fixing parquet file {idx}");

@@ -137,7 +137,7 @@ fn main() {
} => {
runtime.block_on(async {
clickbench::register_vortex_files(
&context,
context.clone(),
"hits",
basepath.as_path(),
&HITS_SCHEMA,
169 changes: 93 additions & 76 deletions bench-vortex/src/clickbench.rs
@@ -1,4 +1,4 @@
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock};

use arrow_schema::{DataType, Field, Schema, TimeUnit};
@@ -7,6 +7,7 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures::{stream, StreamExt, TryStreamExt};
use tokio::fs::{create_dir_all, OpenOptions};
use vortex::aliases::hash_map::HashMap;
use vortex::array::{ChunkedArray, StructArray};
@@ -140,99 +141,112 @@ pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
});

pub async fn register_vortex_files(
session: &SessionContext,
session: SessionContext,
table_name: &str,
input_path: &Path,
schema: &Schema,
) -> anyhow::Result<()> {
let vortex_dir = input_path.parent().unwrap().join("vortex_compressed");
let session2 = session.clone();
let vortex_dir = input_path.join("vortex");
create_dir_all(&vortex_dir).await?;

for idx in 0..100 {
let parquet_file_path = input_path.join(format!("hits_{idx}.parquet"));
let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}"));
idempotent_async(&output_path, |vtx_file| async move {
eprintln!("Processing file {idx}");
let record_batches = session
.read_parquet(
parquet_file_path.to_str().unwrap(),
ParquetReadOptions::default(),
)
.await?
.collect()
.await?;
let format = Arc::new(VortexFormat::new(&CTX));
let table_path = vortex_dir
.to_str()
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
let table_path = format!("file://{table_path}/");
let table_url = ListingTableUrl::parse(table_path)?;

// Create a ChunkedArray from the set of chunks.
let sts = record_batches
.into_iter()
.map(ArrayData::try_from)
.map(|a| a.unwrap().into_struct().unwrap())
.collect::<Vec<_>>();
let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format as _))
.with_schema(schema.clone().into());

let mut arrays_map: HashMap<Arc<str>, Vec<ArrayData>> = HashMap::default();
let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
let listing_table = Arc::new(ListingTable::try_new(config)?);
session2.register_table(table_name, listing_table as _)?;

for st in sts.into_iter() {
let struct_dtype = st.dtype().as_struct().unwrap();
let names = struct_dtype.names().iter();
let types = struct_dtype.dtypes().iter();
let _paths: Vec<PathBuf> = stream::iter(0..100)
.map(|idx| {
let parquet_file_path = input_path
.join("parquet")
.join(format!("hits_{idx}.parquet"));
let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}"));
let session = session.clone();
let schema = schema.clone();

for (field_name, field_type) in names.zip(types) {
let val = arrays_map.entry(field_name.clone()).or_default();
val.push(st.field_by_name(field_name.as_ref()).unwrap());
tokio::spawn(async move {
let output_path = output_path.clone();
idempotent_async(&output_path, move |vtx_file| async move {
eprintln!("Processing file {idx}");
let record_batches = session
.read_parquet(
parquet_file_path.to_str().unwrap(),
ParquetReadOptions::default(),
)
.await?
.collect()
.await?;

types_map.insert(field_name.clone(), field_type.clone());
}
}
// Create a ChunkedArray from the set of chunks.
let sts = record_batches
.into_iter()
.map(ArrayData::try_from)
.map(|a| a.unwrap().into_struct().unwrap())
.collect::<Vec<_>>();

let fields = schema
.fields()
.iter()
.map(|field| {
let name: Arc<str> = field.name().as_str().into();
let dtype = types_map[&name].clone();
let chunks = arrays_map.remove(&name).unwrap();
let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();
let mut arrays_map: HashMap<Arc<str>, Vec<ArrayData>> = HashMap::default();
let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();

(name, chunked_child.into_array())
})
.collect::<Vec<_>>();
for st in sts.into_iter() {
let struct_dtype = st.dtype().as_struct().unwrap();
let names = struct_dtype.names().iter();
let types = struct_dtype.dtypes().iter();

let data = StructArray::from_fields(&fields)?.into_array();
for (field_name, field_type) in names.zip(types) {
let val = arrays_map.entry(field_name.clone()).or_default();
val.push(st.field_by_name(field_name.as_ref()).unwrap());

let compressor = SamplingCompressor::default();
let data = compressor.compress(&data, None)?.into_array();
types_map.insert(field_name.clone(), field_type.clone());
}
}

let f = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&vtx_file)
.await?;
let fields = schema
.fields()
.iter()
.map(|field| {
let name: Arc<str> = field.name().as_str().into();
let dtype = types_map[&name].clone();
let chunks = arrays_map.remove(&name).unwrap();
let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();

let mut writer = VortexFileWriter::new(f);
writer = writer.write_array_columns(data).await?;
writer.finalize().await?;
(name, chunked_child.into_array())
})
.collect::<Vec<_>>();

anyhow::Ok(())
})
.await?;
}
let data = StructArray::from_fields(&fields)?.into_array();

let format = Arc::new(VortexFormat::new(&CTX));
let table_path = vortex_dir
.to_str()
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
let table_path = format!("file://{table_path}/");
let table_url = ListingTableUrl::parse(table_path)?;
let compressor = SamplingCompressor::default();
let data = compressor.compress(&data, None)?.into_array();

let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format as _))
.with_schema(schema.clone().into());
let f = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&vtx_file)
.await?;

let listing_table = Arc::new(ListingTable::try_new(config)?);
let mut writer = VortexFileWriter::new(f);
writer = writer.write_array_columns(data).await?;
writer.finalize().await?;

session.register_table(table_name, listing_table as _)?;
anyhow::Ok(())
})
.await
.expect("Failed to write Vortex file")
})
})
.buffered(16)
.try_collect::<Vec<_>>()
.await?;

Ok(())
}
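
A consequence of spawning tasks is the signature change visible in the first two files: tokio::spawn requires 'static data, so register_vortex_files now takes the SessionContext by value and clones it (along with the schema) into each conversion task, and callers pass an owned or cloned handle instead of a borrow. A hedged call-site sketch; the data directory is a placeholder and the bench_vortex crate/module paths are assumed from the file names above:

use std::path::Path;

use bench_vortex::clickbench::{self, HITS_SCHEMA};
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let context = SessionContext::new();
    // Pass an owned clone: clones of the context are moved into the spawned
    // conversion tasks, and a Vortex listing table is registered for querying.
    clickbench::register_vortex_files(
        context.clone(),
        "hits",
        Path::new("/path/to/clickbench"), // placeholder data directory
        &HITS_SCHEMA,
    )
    .await?;
    Ok(())
}
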
@@ -244,10 +258,13 @@ pub async fn register_parquet_files(
schema: &Schema,
) -> anyhow::Result<()> {
let format = Arc::new(ParquetFormat::new());
let table_path = input_path
.to_str()
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
let table_path = format!("file://{table_path}/");
let table_path = input_path.join("parquet");
let table_path = format!(
"file://{}/",
table_path
.to_str()
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?
);
let table_url = ListingTableUrl::parse(table_path)?;

let config = ListingTableConfig::new(table_url)