From a87c720b7a0c56fe2f7b35072d7f7341ae84c60b Mon Sep 17 00:00:00 2001
From: Dan King
Date: Fri, 20 Sep 2024 10:54:01 -0400
Subject: [PATCH] feat: track compressed size & compare to parquet(zstd)? &
 canonical (#882)

We now track these six values:

1. Compression time (s).
2. Compression throughput (bytes/s).
3. Compressed size (bytes).
4. Compressed size as a fraction of a Vortex Canonical array.
5. Compressed Layout size as a fraction of Parquet without block compression.
6. Compressed Layout size as a fraction of Parquet with Zstd.

It's a bit janky: I just unconditionally compute these values for several
datasets. I couldn't figure out how to ask Criterion which benchmark regex is
currently in use, so, for example, `cargo bench taxi` will still run all the
size benchmarks for every other dataset. I also had to do some janky jq
parsing to convert Criterion's JSON output into the format expected by the
benchmark-action GitHub Action that we use.

Nevertheless, for each commit to `develop`, we should now get all six numbers
for the Taxi, Airline Sentiment, Arade, Bimbo, CMSprovider, Euro2016, Food,
HashTags, and TPC-H l_comment datasets. They'll be displayed under
[Vortex Compression](https://spiraldb.github.io/vortex/dev/bench/#Vortex_Compression)
on the benchmarks site.

I might need to delete some old data from the `gh-pages-bench` branch since I
changed some benchmark names, but after a few commits, those plots should
become useful measures of our compression performance in space and time.
---
 .github/workflows/bench-pr.yml             |  25 +-
 .github/workflows/bench.yml                |  27 +-
 Cargo.lock                                 |   1 +
 bench-vortex/.gitignore                    |   2 +-
 bench-vortex/Cargo.toml                    |   1 +
 bench-vortex/benches/compress_benchmark.rs | 287 +++++++++++++++++----
 bench-vortex/src/data_downloads.rs         |   1 +
 bench-vortex/src/lib.rs                    |  59 ++---
 bench-vortex/src/public_bi_data.rs         |  23 +-
 bench-vortex/src/reader.rs                 |  28 +-
 vortex-serde/src/io/write.rs               |  19 +-
 11 files changed, 364 insertions(+), 109 deletions(-)

diff --git a/.github/workflows/bench-pr.yml b/.github/workflows/bench-pr.yml
index 56a1fea6df..44b0828930 100644
--- a/.github/workflows/bench-pr.yml
+++ b/.github/workflows/bench-pr.yml
@@ -49,17 +49,36 @@ jobs:
       - name: Run benchmark
         shell: bash
-        run: cargo bench --bench ${{ matrix.benchmark.id }} -- --output-format bencher | tee ${{ matrix.benchmark.id }}.txt
+        run: |
+          cargo install cargo-criterion
+          cargo criterion --bench ${{ matrix.benchmark.id }} --message-format=json 2>&1 | tee out.json
+
+          cat out.json
+
+          sudo apt-get update && sudo apt-get install -y jq
+
+          jq --raw-input --compact-output '
+              fromjson?
+              | [ (if .mean != null then {name: .id, value: .mean.estimate, unit: .unit, range: ((.mean.upper_bound - .mean.lower_bound) / 2) } else {} end),
+                  (if .throughput != null then {name: (.id + " throughput"), value: .throughput[].per_iteration, unit: .throughput[].unit, range: 0} else {} end),
+                  {name, value, unit, range} ]
+              | .[]
+              | select(.value != null)
+          ' \
+            out.json \
+            | jq --slurp --compact-output '.' >${{ matrix.benchmark.id }}.json
+
+          cat ${{ matrix.benchmark.id }}.json

       - name: Store benchmark result
         if: '!cancelled()'
         uses: benchmark-action/github-action-benchmark@v1
         with:
           name: ${{ matrix.benchmark.name }}
-          tool: 'cargo'
+          tool: 'customSmallerIsBetter'
           gh-pages-branch: gh-pages-bench
           github-token: ${{ secrets.GITHUB_TOKEN }}
-          output-file-path: ${{ matrix.benchmark.id }}.txt
+          output-file-path: ${{ matrix.benchmark.id }}.json
           summary-always: true
           comment-always: true
           auto-push: false
diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml
index 1ede371e87..087b4339d4 100644
--- a/.github/workflows/bench.yml
+++ b/.github/workflows/bench.yml
@@ -41,17 +41,36 @@ jobs:
       - name: Run benchmark
         shell: bash
-        run: cargo bench --bench ${{ matrix.version.id }} -- --output-format bencher | tee ${{ matrix.version.id }}.txt
+        run: |
+          cargo install cargo-criterion
+          cargo criterion --bench ${{ matrix.benchmark.id }} --message-format=json 2>&1 | tee out.json
+
+          cat out.json
+
+          sudo apt-get update && sudo apt-get install -y jq
+
+          jq --raw-input --compact-output '
+              fromjson?
+              | [ (if .mean != null then {name: .id, value: .mean.estimate, unit: .unit, range: ((.mean.upper_bound - .mean.lower_bound) / 2) } else {} end),
+                  (if .throughput != null then {name: (.id + " throughput"), value: .throughput[].per_iteration, unit: .throughput[].unit, range: 0} else {} end),
+                  {name, value, unit, range} ]
+              | .[]
+              | select(.value != null)
+          ' \
+            out.json \
+            | jq --slurp --compact-output '.' >${{ matrix.benchmark.id }}.json
+
+          cat ${{ matrix.benchmark.id }}.json

       - name: Store benchmark result
         if: '!cancelled()'
         uses: benchmark-action/github-action-benchmark@v1
         with:
-          name: ${{ matrix.version.name }}
-          tool: 'cargo'
+          name: ${{ matrix.benchmark.name }}
+          tool: 'customSmallerIsBetter'
           gh-pages-branch: gh-pages-bench
           github-token: ${{ secrets.GITHUB_TOKEN }}
-          output-file-path: ${{ matrix.version.id }}.txt
+          output-file-path: ${{ matrix.benchmark.id }}.json
           summary-always: true
           auto-push: true
           fail-on-alert: false
diff --git a/Cargo.lock b/Cargo.lock
index f466edf537..5c1a7dc388 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -501,6 +501,7 @@ dependencies = [
  "rayon",
  "reqwest",
  "serde",
+ "serde_json",
  "simplelog",
  "tar",
  "tokio",
diff --git a/bench-vortex/.gitignore b/bench-vortex/.gitignore
index 6320cd248d..1269488f7f 100644
--- a/bench-vortex/.gitignore
+++ b/bench-vortex/.gitignore
@@ -1 +1 @@
-data
\ No newline at end of file
+data
diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml
index d0d414d21f..a4f26361d1 100644
--- a/bench-vortex/Cargo.toml
+++ b/bench-vortex/Cargo.toml
@@ -47,6 +47,7 @@ rand = { workspace = true }
 rayon = { workspace = true }
 reqwest = { workspace = true }
 serde = { workspace = true }
+serde_json = { workspace = true }
 simplelog = { workspace = true }
 tar = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs
index 62c0508482..69f56d3f6c 100644
--- a/bench-vortex/benches/compress_benchmark.rs
+++ b/bench-vortex/benches/compress_benchmark.rs
@@ -1,50 +1,223 @@
+use std::fs;
+use std::io::Cursor;
+use std::path::Path;
+use std::time::Duration;
+
+use arrow_array::RecordBatch;
 use bench_vortex::data_downloads::BenchmarkDataset;
 use bench_vortex::public_bi_data::BenchmarkDatasets;
 use bench_vortex::public_bi_data::PBIDataset::*;
 use bench_vortex::taxi_data::taxi_data_parquet;
 use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
-use bench_vortex::{compress_taxi_data, tpch};
-use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use vortex::{IntoArray, IntoCanonical};
+use bench_vortex::{fetch_taxi_data, tpch};
+use criterion::{
+    black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion, Throughput,
+};
+use parquet::arrow::ArrowWriter;
+use parquet::basic::{Compression, ZstdLevel};
+use parquet::file::properties::WriterProperties;
+use vortex::array::{ChunkedArray, StructArray};
+use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};
+use vortex_dtype::field::Field;
 use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
 use vortex_sampling_compressor::SamplingCompressor;
+use vortex_serde::layouts::LayoutWriter;
+
+#[derive(serde::Serialize)]
+struct GenericBenchmarkResults<'a> {
+    name: &'a str,
+    value: f64,
+    unit: &'a str,
+    range: f64,
+}
+
+fn ensure_dir_exists(dir: &str) -> std::io::Result<()> {
+    let path = Path::new(dir);
+    if !path.exists() {
+        fs::create_dir_all(path)?;
+    }
+    Ok(())
+}
+
+fn parquet_written_size(array: &Array, compression: Compression) -> usize {
+    let mut buf = Cursor::new(Vec::new());
+    let chunked = ChunkedArray::try_from(array).unwrap();
+    let chunks_vec = chunked.chunks().collect::<Vec<_>>();
+
+    if chunks_vec.is_empty() {
+        panic!("empty chunks");
+    }
+
+    let schema = RecordBatch::try_from(chunks_vec[0].clone())
+        .unwrap()
+        .schema();
+
+    let writer_properties = WriterProperties::builder()
+        .set_compression(compression)
+        .build();
+    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_properties)).unwrap();
+    for chunk in chunks_vec {
+        let record_batch = RecordBatch::try_from(chunk).unwrap();
+        writer.write(&record_batch).unwrap();
+    }
+    writer.flush().unwrap();
+    let n_bytes = writer.bytes_written();
+    writer.close().unwrap();
+    n_bytes
+}
+
+fn vortex_written_size(array: &Array) -> u64 {
+    async fn run(array: &Array) -> u64 {
+        let buf = Cursor::new(Vec::new());
+        let mut writer = LayoutWriter::new(buf);
+
+        writer = writer.write_array_columns(array.clone()).await.unwrap();
+        let buf = writer.finalize().await.unwrap();
+        buf.position()
+    }
+
+    tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(run(array))
+}
+
+fn benchmark_compress<T: criterion::measurement::Measurement, F, U>(
+    compressor: &SamplingCompressor<'_>,
+    make_uncompressed: F,
+    group_name: &str,
+    group: &mut BenchmarkGroup<'_, T>,
+    bench_name: &str,
+) where
+    F: Fn() -> U,
+    U: AsRef<Array>,
+{
+    ensure_dir_exists("benchmarked-files").unwrap();
+    let uncompressed = make_uncompressed();
+    let uncompressed_size = uncompressed.as_ref().nbytes();
+    let mut compressed_size = 0;
+
+    group.throughput(Throughput::Bytes(uncompressed_size as u64));
+    group.bench_function(format!("{} compression", bench_name), |b| {
+        b.iter_with_large_drop(|| {
+            let compressed = black_box(compressor.compress(uncompressed.as_ref(), None)).unwrap();
+            compressed_size = compressed.nbytes();
+        });
+    });
+
+    let vortex_nbytes = vortex_written_size(
+        &compressor
+            .compress(uncompressed.as_ref(), None)
+            .unwrap()
+            .into_array(),
+    );
+
+    let parquet_zstd_nbytes = parquet_written_size(
+        uncompressed.as_ref(),
+        Compression::ZSTD(ZstdLevel::default()),
+    );
+
+    let parquet_uncompressed_nbytes =
+        parquet_written_size(uncompressed.as_ref(), Compression::UNCOMPRESSED);

-fn vortex_compress_taxi(c: &mut Criterion) {
+    println!(
+        "{}",
+        serde_json::to_string(&GenericBenchmarkResults {
+            name: &format!("{} Vortex-to-ParquetZstd Ratio/{}", group_name, bench_name),
+            value: (vortex_nbytes as f64) / (parquet_zstd_nbytes as f64),
+            unit: "ratio",
+            range: 0.0,
+        })
+        .unwrap()
+    );
+
+    println!(
+        "{}",
+        serde_json::to_string(&GenericBenchmarkResults {
+            name: &format!(
+                "{} Vortex-to-ParquetUncompressed Ratio/{}",
+                group_name, bench_name
+            ),
+            value: (vortex_nbytes as f64) / (parquet_uncompressed_nbytes as f64),
+            unit: "ratio",
+            range: 0.0,
+        })
+        .unwrap()
+    );
+
+    println!(
+        "{}",
+        serde_json::to_string(&GenericBenchmarkResults {
+            name: &format!("{} Compression Ratio/{}", group_name, bench_name),
+            value: (compressed_size as f64) / (uncompressed_size as f64),
+            unit: "ratio",
+            range: 0.0,
+        })
+        .unwrap()
+    );
+
+    println!(
+        "{}",
+        serde_json::to_string(&GenericBenchmarkResults {
+            name: &format!("{} Compression Size/{}", group_name, bench_name),
+            value: compressed_size as f64,
+            unit: "bytes",
+            range: 0.0,
+        })
+        .unwrap()
+    );
+}
+
+fn yellow_taxi_trip_data(c: &mut Criterion) {
     taxi_data_parquet();
-    let mut group = c.benchmark_group("Yellow Taxi Trip Data");
+    let group_name = "Yellow Taxi Trip Data";
+    let mut group = c.benchmark_group(format!("{} Compression Time", group_name));
     group.sample_size(10);
-    group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));
+    benchmark_compress(
+        &SamplingCompressor::default(),
+        fetch_taxi_data,
+        group_name,
+        &mut group,
+        "taxi",
+    );
     group.finish()
 }

-fn vortex_compress_medicare1(c: &mut Criterion) {
-    let mut group = c.benchmark_group("Public BI Benchmark");
+fn public_bi_benchmark(c: &mut Criterion) {
+    let group_name = "Public BI";
+    let mut group = c.benchmark_group(format!("{} Compression Time", group_name));
     group.sample_size(10);
+    // group.measurement_time(Duration::new(10, 0));

-    for dataset_name in [
+    for dataset_handle in [
         AirlineSentiment,
         Arade,
-        // Bimbo, // 27s per sample
-        // CMSprovider, // >30s per sample
+        Bimbo,
+        CMSprovider,
         // Corporations, // duckdb thinks ' is a quote character but its used as an apostrophe
         // CityMaxCapita, // 11th column has F, M, and U but is inferred as boolean
         Euro2016,
         Food,
         HashTags,
         // Hatred, // panic in fsst_compress_iter
-        // TableroSistemaPenal, // 20s per sample
+        // TableroSistemaPenal, // thread 'main' panicked at bench-vortex/benches/compress_benchmark.rs:224:42: called `Result::unwrap()` on an `Err` value: expected type: {column00=utf8?, column01=i64?, column02=utf8?, column03=f64?, column04=i64?, column05=utf8?, column06=utf8?, column07=utf8?, column08=utf8?, column09=utf8?, column10=i64?, column11=i64?, column12=utf8?, column13=utf8?, column14=i64?, column15=i64?, column16=utf8?, column17=utf8?, column18=utf8?, column19=utf8?, column20=i64?, column21=utf8?, column22=utf8?, column23=utf8?, column24=utf8?, column25=i64?, column26=utf8?} but instead got {column00=utf8?, column01=i64?, column02=i64?, column03=i64?, column04=i64?, column05=utf8?, column06=i64?, column07=i64?, column08=i64?, column09=utf8?, column10=ext(vortex.date, ExtMetadata([4]))?, column11=ext(vortex.date, ExtMetadata([4]))?, column12=utf8?, column13=utf8?, column14=utf8?, column15=i64?, column16=i64?, column17=utf8?, column18=utf8?, column19=utf8?, column20=utf8?, column21=utf8?}
         // YaleLanguages, // 4th column looks like integer but also contains Y
     ] {
-        group.bench_function(format!("{:?}", dataset_name), |b| {
-            let dataset = BenchmarkDatasets::PBI(dataset_name);
-            dataset.write_as_parquet();
-            b.iter(|| black_box(dataset.compress_to_vortex()))
-        });
+        let dataset = BenchmarkDatasets::PBI(dataset_handle);
+
+        benchmark_compress(
+            &SamplingCompressor::default(),
+            || dataset.to_vortex_array().unwrap(),
+            group_name,
+            &mut group,
+            dataset_handle.dataset_name(),
+        );
     }
     group.finish()
 }

-fn vortex_compress_tpch_l_comment(c: &mut Criterion) {
+fn tpc_h_l_comment(c: &mut Criterion) {
     let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
     let rt = tokio::runtime::Builder::new_current_thread()
         .enable_all()
@@ -57,50 +230,70 @@ fn vortex_compress_tpch_l_comment(c: &mut Criterion) {
     ));

     let compressor = SamplingCompressor::default().excluding(&FSSTCompressor);
-
     let compressor_fsst = SamplingCompressor::default();

-    // l_comment column only
-    let mut group = c.benchmark_group("TPCH l_comment Column");
-    let comments = lineitem_vortex.with_dyn(|a| {
-        a.as_struct_array_unchecked()
-            .field_by_name("l_comment")
-            .unwrap()
-    });
-
+    let group_name = "TPC-H l_comment";
+    let mut group = c.benchmark_group(format!("{} Compression Time", group_name));
     group.sample_size(10);

-    group.bench_function("compress-default", |b| {
-        b.iter_with_large_drop(|| {
-            std::hint::black_box(compressor.compress(&comments, None)).unwrap()
-        });
-    });
+    group.measurement_time(Duration::new(15, 0));

-    group.bench_function("compress-fsst-chunked", |b| {
-        b.iter_with_large_drop(|| {
-            std::hint::black_box(compressor_fsst.compress(&comments, None)).unwrap()
-        });
-    });
+    let comment_chunks = ChunkedArray::try_from(lineitem_vortex)
+        .unwrap()
+        .chunks()
+        .map(|chunk| {
+            StructArray::try_from(chunk)
+                .unwrap()
+                .project(&[Field::Name("l_comment".to_string())])
+                .unwrap()
+                .into_array()
+        })
+        .collect::<Vec<_>>();
+    let comment_dtype = comment_chunks[0].dtype().clone();
+    let comments = ChunkedArray::try_new(comment_chunks, comment_dtype)
+        .unwrap()
+        .into_array();
+
+    benchmark_compress(
+        &compressor,
+        || &comments,
+        group_name,
+        &mut group,
+        "chunked-without-fsst",
+    );
+
+    benchmark_compress(
+        &compressor_fsst,
+        || &comments,
+        group_name,
+        &mut group,
+        "chunked-with-fsst",
+    );

-    // Compare canonicalizing
     let comments_canonical = comments
         .into_canonical()
         .unwrap()
-        .into_varbin()
+        .into_struct()
         .unwrap()
         .into_array();

-    group.bench_function("compress-fsst-canonicalized", |b| {
-        b.iter_with_large_drop(|| {
-            std::hint::black_box(compressor_fsst.compress(&comments_canonical, None)).unwrap()
-        });
-    });
+    let dtype = comments_canonical.dtype().clone();
+    let comments_canonical_chunked =
+        ChunkedArray::try_new(vec![comments_canonical], dtype).unwrap();
+
+    benchmark_compress(
+        &compressor_fsst,
+        || &comments_canonical_chunked,
+        group_name,
+        &mut group,
+        "canonical-with-fsst",
+    );

     group.finish();
 }

 criterion_group!(
     benches,
-    vortex_compress_taxi,
-    vortex_compress_medicare1,
-    vortex_compress_tpch_l_comment,
+    yellow_taxi_trip_data,
+    public_bi_benchmark,
+    tpc_h_l_comment,
 );
 criterion_main!(benches);
diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs
index 88c7ceccb8..ba0405bcd9 100644
--- a/bench-vortex/src/data_downloads.rs
+++ b/bench-vortex/src/data_downloads.rs
@@ -91,6 +91,7 @@ pub fn decompress_bz2(input_path: PathBuf, output_path: PathBuf) -> PathBuf {

 pub trait BenchmarkDataset {
     fn as_uncompressed(&self);
+    fn to_vortex_array(&self) -> VortexResult<Array>;
     fn compress_to_vortex(&self) -> VortexResult<()>;
     fn write_as_parquet(&self);
     fn write_as_vortex(&self) -> impl Future<Output = ()>;
diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs
index f88cc7b149..541e2d8609 100644
--- a/bench-vortex/src/lib.rs
+++ b/bench-vortex/src/lib.rs
@@ -8,12 +8,10 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;

 use arrow_array::RecordBatchReader;
-use humansize::DECIMAL;
 use itertools::Itertools;
 use lazy_static::lazy_static;
-use log::{info, LevelFilter};
+use log::LevelFilter;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use parquet::arrow::ProjectionMask;
 use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
 use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
@@ -167,49 +165,30 @@ pub fn setup_logger(level: LevelFilter) {
         .unwrap();
 }

-pub fn compress_taxi_data() -> Array {
+pub fn fetch_taxi_data() -> Array {
     let file = File::open(taxi_data_parquet()).unwrap();
     let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

-    let _mask = ProjectionMask::roots(builder.parquet_schema(), [6]);
-    let _no_datetime_mask = ProjectionMask::roots(
-        builder.parquet_schema(),
-        [0, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
-    );
-    let reader = builder
-        .with_projection(_mask)
-        //.with_projection(no_datetime_mask)
-        .with_batch_size(BATCH_SIZE)
-        // .with_batch_size(5_000_000)
-        // .with_limit(100_000)
-        .build()
-        .unwrap();
+    let reader = builder.with_batch_size(BATCH_SIZE).build().unwrap();

     let schema = reader.schema();
-    let mut uncompressed_size: usize = 0;
-    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
-    let chunks = reader
-        .into_iter()
-        .map(|batch_result| batch_result.unwrap())
-        .map(Array::try_from)
-        .map(Result::unwrap)
-        .map(|array| {
-            uncompressed_size += array.nbytes();
-            compressor.compress(&array).unwrap()
-        })
-        .collect_vec();
-
-    let compressed = ChunkedArray::try_new(chunks, DType::from_arrow(schema))
-        .unwrap()
-        .into_array();
+    ChunkedArray::try_new(
+        reader
+            .into_iter()
+            .map(|batch_result| batch_result.unwrap())
+            .map(Array::try_from)
+            .map(Result::unwrap)
+            .collect_vec(),
+        DType::from_arrow(schema),
+    )
+    .unwrap()
+    .into_array()
+}

-    info!(
-        "{}, Bytes: {}, Ratio {}",
-        humansize::format_size(compressed.nbytes(), DECIMAL),
-        compressed.nbytes(),
-        compressed.nbytes() as f32 / uncompressed_size as f32
-    );
+pub fn compress_taxi_data() -> Array {
+    let uncompressed = fetch_taxi_data();
+    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());

-    compressed
+    compressor.compress(&uncompressed).unwrap()
 }

 pub struct CompressionRunStats {
diff --git a/bench-vortex/src/public_bi_data.rs b/bench-vortex/src/public_bi_data.rs
index da052f14a6..c54729f48c 100644
--- a/bench-vortex/src/public_bi_data.rs
+++ b/bench-vortex/src/public_bi_data.rs
@@ -10,13 +10,15 @@ use itertools::Itertools;
 use log::info;
 use reqwest::Url;
 use tokio::fs::File;
-use vortex::ArrayTrait;
+use vortex::array::ChunkedArray;
+use vortex::{Array, ArrayDType, ArrayTrait, IntoArray};
 use vortex_error::VortexResult;

 use crate::data_downloads::{decompress_bz2, download_data, BenchmarkDataset, FileType};
 use crate::public_bi_data::PBIDataset::*;
 use crate::reader::{
-    compress_parquet_to_vortex, open_vortex, rewrite_parquet_as_vortex, write_csv_as_parquet,
+    compress_parquet_to_vortex, open_vortex, read_parquet_to_vortex, rewrite_parquet_as_vortex,
+    write_csv_as_parquet,
 };
 use crate::{idempotent, IdempotentPath};
@@ -423,6 +425,23 @@ impl BenchmarkDataset for BenchmarkDatasets {
         }
     }

+    fn to_vortex_array(&self) -> VortexResult<Array> {
+        self.write_as_parquet();
+
+        let arrays = self
+            .list_files(FileType::Parquet)
+            .iter()
+            .map(|f| read_parquet_to_vortex(f.as_path()))
+            .collect::<VortexResult<Vec<_>>>()?;
+        assert!(!arrays.is_empty());
+        let dtype = arrays[0].dtype().clone();
+        ChunkedArray::try_new(
+            arrays.iter().flat_map(|x| x.chunks()).collect::<Vec<_>>(),
+            dtype,
+        )
+        .map(|x| x.into_array())
+    }
+
     fn compress_to_vortex(&self) -> VortexResult<()> {
         self.write_as_parquet();
         for f in self.list_files(FileType::Parquet) {
diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs
index a0a6595148..77e6dcca08 100644
--- a/bench-vortex/src/reader.rs
+++ b/bench-vortex/src/reader.rs
@@ -27,7 +27,7 @@ use vortex::array::{ChunkedArray, PrimitiveArray};
 use vortex::arrow::FromArrowType;
 use vortex::compress::CompressionStrategy;
 use vortex::stream::ArrayStreamExt;
-use vortex::{Array, IntoArray, IntoCanonical};
+use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};
 use vortex_buffer::Buffer;
 use vortex_dtype::DType;
 use vortex_error::{vortex_err, VortexResult};
@@ -89,26 +89,32 @@ pub async fn rewrite_parquet_as_vortex(
     Ok(())
 }

-pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedArray> {
+pub fn read_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedArray> {
     let taxi_pq = File::open(parquet_path)?;
     let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?;
-    // FIXME(ngates): #157 the compressor should handle batch size.
     let reader = builder.with_batch_size(BATCH_SIZE).build()?;
-
     let dtype = DType::from_arrow(reader.schema());
-
-    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
     let chunks = reader
         .map(|batch_result| batch_result.unwrap())
-        .map(|record_batch| {
-            let vortex_array = Array::try_from(record_batch).unwrap();
-            compressor.compress(&vortex_array).unwrap()
-        })
-        .collect_vec();
+        .map(Array::try_from)
+        .collect::<VortexResult<Vec<_>>>()?;
     ChunkedArray::try_new(chunks, dtype)
 }

+pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedArray> {
+    let chunked = read_parquet_to_vortex(parquet_path)?;
+    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
+    let dtype = chunked.dtype().clone();
+    ChunkedArray::try_new(
+        chunked
+            .chunks()
+            .map(|x| compressor.compress(&x))
+            .collect::<VortexResult<Vec<_>>>()?,
+        dtype,
+    )
+}
+
 pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResult<()> {
     info!(
         "Compressing {} to parquet",
diff --git a/vortex-serde/src/io/write.rs b/vortex-serde/src/io/write.rs
index 80b4e86f58..2d71eeb05a 100644
--- a/vortex-serde/src/io/write.rs
+++ b/vortex-serde/src/io/write.rs
@@ -1,5 +1,5 @@
 use std::future::{ready, Future};
-use std::io;
+use std::io::{self, Cursor, Write};

 use vortex_buffer::io_buf::IoBuf;

@@ -24,6 +24,23 @@ impl VortexWrite for Vec<u8> {
     }
 }

+impl<T> VortexWrite for Cursor<T>
+where
+    Cursor<T>: Write,
+{
+    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
+        ready(std::io::Write::write_all(self, buffer.as_slice()).map(|_| buffer))
+    }
+
+    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
+        ready(std::io::Write::flush(self))
+    }
+
+    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
+        ready(Ok(()))
+    }
+}
+
 impl<W: VortexWrite> VortexWrite for &mut W {
     fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
         (*self).write_all(buffer)