Fix FOR bug, also fix bench to compile #341

Merged: 16 commits, Jun 11, 2024
22 changes: 11 additions & 11 deletions bench-vortex/src/bin/compress.rs
@@ -4,35 +4,35 @@ use std::path::PathBuf;
use bench_vortex::data_downloads::BenchmarkDataset;
use bench_vortex::public_bi_data::BenchmarkDatasets::PBI;
use bench_vortex::public_bi_data::PBIDataset;
use bench_vortex::reader::{open_vortex, rewrite_parquet_as_vortex};
use bench_vortex::reader::{open_vortex_async, rewrite_parquet_as_vortex};
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::{setup_logger, IdempotentPath};
use futures::executor::block_on;
use log::{info, LevelFilter};
use tokio::fs::File;
use vortex_error::VortexResult;

pub fn main() {
#[tokio::main]
pub async fn main() {
setup_logger(LevelFilter::Info);
// compress_pbi(PBIDataset::Medicare1);
compress_taxi();
compress_taxi().await.unwrap();
}

fn compress_taxi() {
async fn compress_taxi() -> VortexResult<()> {
let path: PathBuf = "taxi_data.vortex".to_data_path();
block_on(async {
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await
})
.unwrap();
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;

let taxi_vortex = open_vortex(&path).unwrap();
let taxi_vortex = open_vortex_async(&path).await?;
info!("{}", taxi_vortex.tree_display());

let pq_size = taxi_data_parquet().metadata().unwrap().size();
let vx_size = taxi_vortex.nbytes();

info!("Parquet size: {}, Vortex size: {}", pq_size, vx_size);
info!("Compression ratio: {}", vx_size as f32 / pq_size as f32);

Ok(())
}

#[allow(dead_code)]
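For context, the bench fix above follows the usual tokio migration pattern: the manual `futures::executor::block_on` wrapper is dropped in favour of an `async fn main` annotated with `#[tokio::main]`, and the error is propagated with `?` instead of being unwrapped inside a closure. A minimal, self-contained sketch of that pattern (`do_work` is a hypothetical stand-in, not part of the benchmark):

```rust
// Hypothetical stand-in for the async benchmark body; only the runtime
// wiring below mirrors the diff.
async fn do_work() -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

// Before: a synchronous main driving the future by hand.
//
// fn main() {
//     futures::executor::block_on(async { do_work().await }).unwrap();
// }

// After: #[tokio::main] sets up the runtime, and the error propagates with `?`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    do_work().await?;
    Ok(())
}
```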
11 changes: 11 additions & 0 deletions bench-vortex/src/reader.rs
@@ -47,6 +47,17 @@ pub fn open_vortex(path: &Path) -> VortexResult<Array> {
.map(|a| a.into_array())
}

pub async fn open_vortex_async(path: &Path) -> VortexResult<Array> {
let file = tokio::fs::File::open(path).await.unwrap();
let mut msgs = MessageReader::try_new(TokioAdapter(file)).await.unwrap();
msgs.array_stream_from_messages(&CTX)
.await
.unwrap()
.collect_chunked()
.await
.map(|a| a.into_array())
}

pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
parquet_path: PathBuf,
write: W,
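A short usage sketch for the new async reader, assuming it is called from inside a tokio runtime; the path is illustrative, and `nbytes()`/`tree_display()` are the same accessors the benchmark above uses:

```rust
use std::path::Path;

use bench_vortex::reader::open_vortex_async;
use vortex_error::VortexResult;

// Illustrative helper: in the benchmark the file is first produced by
// rewrite_parquet_as_vortex, then read back with open_vortex_async.
async fn inspect(path: &Path) -> VortexResult<()> {
    let array = open_vortex_async(path).await?;
    println!("{} bytes\n{}", array.nbytes(), array.tree_display());
    Ok(())
}
```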
3 changes: 2 additions & 1 deletion vortex-fastlanes/src/for/compress.rs
@@ -7,6 +7,7 @@ use vortex::stats::{ArrayStatistics, Stat};
use vortex::{Array, ArrayDType, ArrayTrait, IntoArray};
use vortex_dtype::{match_each_integer_ptype, NativePType, PType};
use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::{FoRArray, FoREncoding};

@@ -53,7 +54,7 @@ impl EncodingCompression for FoREncoding {

let child = match_each_integer_ptype!(parray.ptype(), |$T| {
if shift == <$T>::PTYPE.bit_width() as u8 {
ConstantArray::new($T::default(), parray.len()).into_array()
ConstantArray::new(Scalar::zero::<$T>(parray.dtype().nullability()), parray.len()).into_array()
} else {
compress_primitive::<$T>(parray, shift, $T::try_from(&min)?).into_array()
}
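The FoR change above covers the degenerate frame where the computed shift equals the element type's full bit width, i.e. every value in the chunk equals the frame minimum and all deltas are zero; the fix builds that constant-zero child with the input array's nullability so the resulting dtype matches. A conceptual sketch of that case against plain slices, not the vortex API (names here are illustrative only):

```rust
// Frame-of-reference deltas against plain integers, for illustration only.
fn for_deltas(values: &[u32]) -> (u32, Vec<u32>) {
    let min = values.iter().copied().min().unwrap_or(0);
    (min, values.iter().map(|v| v - min).collect())
}

fn main() {
    // Every value equals the frame minimum, so all deltas are zero and the
    // encoded child degenerates to a constant zero. The PR's fix makes that
    // constant carry the original array's nullability instead of defaulting
    // to a non-nullable zero.
    let (min, deltas) = for_deltas(&[7, 7, 7, 7]);
    assert_eq!(min, 7);
    assert!(deltas.iter().all(|&d| d == 0));
    println!("min = {min}, deltas = {deltas:?}");
}
```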