diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs
index 8c2e2ecd58..bcceeace78 100644
--- a/bench-vortex/src/tpch/mod.rs
+++ b/bench-vortex/src/tpch/mod.rs
@@ -36,6 +36,10 @@ pub const EXPECTED_ROW_COUNTS: [usize; 23] = [
     0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 1, 18314, 1, 57, 1, 186, 411, 7,
 ];
 
+// Sizes match default compressor configuration
+const TARGET_BLOCK_BYTESIZE: usize = 16 * (1 << 20);
+const TARGET_BLOCK_SIZE: usize = 64 * (1 << 10);
+
 #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
 pub enum Format {
     Csv,
@@ -245,8 +249,7 @@ async fn register_vortex_file(
 
     // Create a ChunkedArray from the set of chunks.
     let sts = record_batches
-        .iter()
-        .cloned()
+        .into_iter()
         .map(Array::try_from)
         .map(|a| a.unwrap().into_struct().unwrap())
         .collect::<Vec<_>>();
@@ -274,11 +277,14 @@ async fn register_vortex_file(
             let name: Arc<str> = field.name().as_str().into();
             let dtype = types_map[&name].clone();
             let chunks = arrays_map.remove(&name).unwrap();
+            let mut chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();
+            if !enable_compression {
+                chunked_child = chunked_child
+                    .rechunk(TARGET_BLOCK_BYTESIZE, TARGET_BLOCK_SIZE)
+                    .unwrap()
+            }
 
-            (
-                name.clone(),
-                ChunkedArray::try_new(chunks, dtype).unwrap().into_array(),
-            )
+            (name, chunked_child.into_array())
         })
         .collect::<Vec<_>>();
 
@@ -359,8 +365,7 @@ async fn register_vortex(
 
     // Create a ChunkedArray from the set of chunks.
     let chunks: Vec<Array> = record_batches
-        .iter()
-        .cloned()
+        .into_iter()
         .map(ArrowStructArray::from)
         .map(|struct_array| Array::from_arrow(&struct_array, false))
         .collect();
@@ -403,8 +408,7 @@ pub async fn load_table(data_dir: impl AsRef<Path>, name: &str, schema: &Schema)
         .unwrap();
 
     let chunks: Vec<Array> = record_batches
-        .iter()
-        .cloned()
+        .into_iter()
         .map(ArrowStructArray::from)
         .map(|struct_array| Array::from_arrow(&struct_array, false))
         .collect();