Skip to content

Commit

Permalink
group all decompress benchmarks together
Browse files Browse the repository at this point in the history
  • Loading branch information
danking committed Oct 10, 2024
1 parent 05066b4 commit 88d910d
Showing 1 changed file with 49 additions and 39 deletions.
88 changes: 49 additions & 39 deletions bench-vortex/benches/compress_noci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ use bench_vortex::public_bi_data::PBIDataset::*;
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::{fetch_taxi_data, tpch};
use criterion::{
black_box, criterion_group, criterion_main, BatchSize, BenchmarkGroup, Criterion, Throughput,
};
use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use regex::Regex;
use vortex::array::{ChunkedArray, StructArray};
use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};
use vortex_dtype::field::Field;
use vortex_error::vortex_bail;
use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex_sampling_compressor::SamplingCompressor;
use vortex_serde::layouts::LayoutWriter;
Expand Down Expand Up @@ -84,10 +83,12 @@ fn vortex_written_size(array: &Array) -> u64 {
.block_on(run(array))
}

fn benchmark_compress<T: criterion::measurement::Measurement, F, U>(
fn benchmark_compress<F, U>(
c: &mut Criterion,
compressor: &SamplingCompressor<'_>,
make_uncompressed: F,
group: &mut BenchmarkGroup<'_, T>,
sample_size: usize,
measurement_time: Option<Duration>,
bench_name: &str,
) where
F: Fn() -> U,
Expand All @@ -98,13 +99,20 @@ fn benchmark_compress<T: criterion::measurement::Measurement, F, U>(
let uncompressed_size = uncompressed.as_ref().nbytes();
let mut compressed_size = 0;

group.throughput(Throughput::Bytes(uncompressed_size as u64));
group.bench_function(bench_name, |b| {
b.iter_with_large_drop(|| {
let compressed = black_box(compressor.compress(uncompressed.as_ref(), None).unwrap());
compressed_size = compressed.nbytes();
{
let mut group = c.benchmark_group("compress time");
group.sample_size(sample_size);
group.throughput(Throughput::Bytes(uncompressed_size as u64));
measurement_time.map(|t| group.measurement_time(t));
group.bench_function(bench_name, |b| {
b.iter_with_large_drop(|| {
let compressed =
black_box(compressor.compress(uncompressed.as_ref(), None).unwrap());
compressed_size = compressed.nbytes();
});
});
});
group.finish();
}

let (compressed_array, path) = compressor
.compress(uncompressed.as_ref(), None)
Expand All @@ -116,15 +124,21 @@ fn benchmark_compress<T: criterion::measurement::Measurement, F, U>(
compressed_size as f64 / uncompressed_size as f64
);

group.bench_function(format!("{} decompression", bench_name), |b| {
b.iter_batched(
|| compressed_array.clone(),
|compressed| {
black_box(compressed.into_canonical().unwrap().into_arrow().unwrap());
},
BatchSize::PerIteration,
);
});
{
let mut group = c.benchmark_group("decompress time");
group.sample_size(sample_size);
measurement_time.map(|t| group.measurement_time(t));
group.bench_function(bench_name, |b| {
b.iter_batched(
|| compressed_array.clone(),
|compressed| {
black_box(compressed.into_canonical().unwrap().into_arrow().unwrap());
},
BatchSize::PerIteration,
);
});
group.finish();
}

if env::var("BENCH_VORTEX_RATIOS")
.ok()
Expand Down Expand Up @@ -180,22 +194,17 @@ fn benchmark_compress<T: criterion::measurement::Measurement, F, U>(

fn yellow_taxi_trip_data(c: &mut Criterion) {
taxi_data_parquet();
let mut group = c.benchmark_group("compress time");
group.sample_size(10);
benchmark_compress(
c,
&SamplingCompressor::default(),
fetch_taxi_data,
&mut group,
10,
None,
"taxi",
);
group.finish()
}

fn public_bi_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("compress time");
group.sample_size(10);
// group.measurement_time(Duration::new(10, 0));

for dataset_handle in [
AirlineSentiment,
Arade,
Expand All @@ -213,13 +222,14 @@ fn public_bi_benchmark(c: &mut Criterion) {
let dataset = BenchmarkDatasets::PBI(dataset_handle);

benchmark_compress(
c,
&SamplingCompressor::default(),
|| dataset.to_vortex_array().unwrap(),
&mut group,
10,
None,
dataset_handle.dataset_name(),
);
}
group.finish()
}

fn tpc_h_l_comment(c: &mut Criterion) {
Expand All @@ -237,10 +247,6 @@ fn tpc_h_l_comment(c: &mut Criterion) {
let compressor = SamplingCompressor::default().excluding(&FSSTCompressor);
let compressor_fsst = SamplingCompressor::default();

let mut group = c.benchmark_group("compress time");
group.sample_size(10);
group.measurement_time(Duration::new(15, 0));

let comment_chunks = ChunkedArray::try_from(lineitem_vortex)
.unwrap()
.chunks()
Expand All @@ -258,16 +264,20 @@ fn tpc_h_l_comment(c: &mut Criterion) {
.into_array();

benchmark_compress(
c,
&compressor,
|| &comments,
&mut group,
10,
None,
"TPC-H l_comment chunked without fsst",
);

benchmark_compress(
c,
&compressor_fsst,
|| &comments,
&mut group,
10,
None,
"TPC-H l_comment chunked",
);

Expand All @@ -282,13 +292,13 @@ fn tpc_h_l_comment(c: &mut Criterion) {
ChunkedArray::try_new(vec![comments_canonical], dtype).unwrap();

benchmark_compress(
c,
&compressor_fsst,
|| &comments_canonical_chunked,
&mut group,
10,
Some(Duration::new(15, 0)),
"TPC-H l_comment canonical",
);

group.finish();
}

criterion_group!(
Expand Down

0 comments on commit 88d910d

Please sign in to comment.