Skip to content

Commit

Permalink
Fix compression recursion and add result type
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn committed Mar 6, 2024
1 parent e6a7477 commit 4da0308
Show file tree
Hide file tree
Showing 24 changed files with 258 additions and 325 deletions.
4 changes: 3 additions & 1 deletion bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ mod test {
HashSet::default(),
);
println!("Compression config {cfg:?}");
let compressed = CompressCtx::new(&cfg).compress(array.as_ref(), None);
let compressed = CompressCtx::new(&cfg)
.compress(array.as_ref(), None)
.unwrap();
println!("Compressed array {compressed}");
println!(
"NBytes {}, Ratio {}",
Expand Down
23 changes: 15 additions & 8 deletions vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vortex::array::primitive::PrimitiveArray;
use vortex::array::sparse::SparseArray;
use vortex::array::{Array, ArrayRef, CloneOptionalArray};
use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression};
use vortex::error::VortexResult;
use vortex::ptype::{NativePType, PType};

use crate::alp::{ALPArray, ALPEncoding};
Expand Down Expand Up @@ -34,24 +35,30 @@ impl EncodingCompression for ALPEncoding {
}
}

fn alp_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef {
fn alp_compressor(
array: &dyn Array,
like: Option<&dyn Array>,
ctx: CompressCtx,
) -> VortexResult<ArrayRef> {
let like_alp = like.map(|like_array| like_array.as_alp());

let parray = array.as_primitive();
let (encoded, exponents, patches) = like_alp
.map(|alp_like| alp_encode_like_parts(parray, alp_like))
.unwrap_or_else(|| alp_encode_parts(parray));

ALPArray::new(
Ok(ALPArray::new(
ctx.next_level()
.compress(encoded.as_ref(), like_alp.map(|a| a.encoded())),
.compress(encoded.as_ref(), like_alp.map(|a| a.encoded()))?,
exponents,
patches.map(|p| {
ctx.next_level()
.compress(p.as_ref(), like_alp.and_then(|a| a.patches()))
}),
patches
.map(|p| {
ctx.next_level()
.compress(p.as_ref(), like_alp.and_then(|a| a.patches()))
})
.transpose()?,
)
.boxed()
.boxed())
}

pub fn alp_encode(parray: &PrimitiveArray) -> ALPArray {
Expand Down
23 changes: 0 additions & 23 deletions vortex-array/src/array/bool/compress.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use arrow::buffer::{BooleanBuffer, NullBuffer};
use linkme::distributed_slice;

use crate::arrow::CombineChunks;
use crate::compress::EncodingCompression;
use crate::compute::scalar_at::scalar_at;
use crate::dtype::{DType, Nullability};
use crate::error::VortexResult;
Expand All @@ -20,7 +19,6 @@ use super::{
EncodingId, EncodingRef, ENCODINGS,
};

mod compress;
mod compute;
mod serde;
mod stats;
Expand Down Expand Up @@ -166,10 +164,6 @@ impl Encoding for BoolEncoding {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
Expand Down
43 changes: 20 additions & 23 deletions vortex-array/src/array/chunked/compress.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,41 @@
use rayon::prelude::*;
use itertools::Itertools;
use std::ops::Deref;

use crate::array::chunked::{ChunkedArray, ChunkedEncoding};
use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::{Array, ArrayRef};
use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression};
use crate::error::VortexResult;

impl EncodingCompression for ChunkedEncoding {
fn compressor(
&self,
array: &dyn Array,
_config: &CompressConfig,
) -> Option<&'static Compressor> {
if array.encoding().id() == &Self::ID {
Some(&(chunked_compressor as Compressor))
} else {
None
}
(array.encoding().id() == &Self::ID).then_some(&(chunked_compressor as Compressor))
}
}

fn chunked_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef {
fn chunked_compressor(
array: &dyn Array,
like: Option<&dyn Array>,
ctx: CompressCtx,
) -> VortexResult<ArrayRef> {
let chunked_array = array.as_chunked();
let chunked_like = like.map(|like_array| like_array.as_chunked());

let compressed_chunks = chunked_like
.map(|c_like| {
chunked_array
.chunks()
.par_iter()
.zip_eq(c_like.chunks())
.map(|(chunk, chunk_like)| ctx.compress(chunk.as_ref(), Some(chunk_like.as_ref())))
.collect()
let compressed_chunks = chunked_array
.chunks()
.iter()
.enumerate()
.map(|(i, chunk)| {
let like_chunk = chunked_like
.and_then(|c_like| c_like.chunks().get(i))
.map(Deref::deref);
ctx.compress(chunk.deref(), like_chunk)
})
.unwrap_or_else(|| {
chunked_array
.chunks()
.par_iter()
.map(|chunk| ctx.compress(chunk.as_ref(), None))
.collect()
});
.try_collect()?;

ChunkedArray::new(compressed_chunks, array.dtype().clone()).boxed()
Ok(ChunkedArray::new(compressed_chunks, array.dtype().clone()).boxed())
}
27 changes: 0 additions & 27 deletions vortex-array/src/array/constant/compress.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/constant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ use crate::array::{
ENCODINGS,
};
use crate::arrow::compute::repeat;
use crate::compress::EncodingCompression;
use crate::dtype::DType;
use crate::error::VortexResult;
use crate::formatter::{ArrayDisplay, ArrayFormatter};
use crate::scalar::{Scalar, ScalarRef};
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsSet};

mod compress;
mod compute;
mod serde;
mod stats;
Expand Down Expand Up @@ -132,10 +130,6 @@ impl Encoding for ConstantEncoding {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
Expand Down
8 changes: 6 additions & 2 deletions vortex-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,14 @@ pub trait Encoding: Debug + Send + Sync + 'static {
fn id(&self) -> &EncodingId;

/// Implementation of the array compression trait
fn compression(&self) -> Option<&dyn EncodingCompression>;
fn compression(&self) -> Option<&dyn EncodingCompression> {
None
}

/// Array serialization
fn serde(&self) -> Option<&dyn EncodingSerde>;
fn serde(&self) -> Option<&dyn EncodingSerde> {
None
}
}

pub type EncodingRef = &'static dyn Encoding;
Expand Down
44 changes: 0 additions & 44 deletions vortex-array/src/array/primitive/compress.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::array::{
EncodingId, EncodingRef, ENCODINGS,
};
use crate::arrow::CombineChunks;
use crate::compress::EncodingCompression;
use crate::compute::scalar_at::scalar_at;
use crate::dtype::DType;
use crate::error::VortexResult;
Expand All @@ -28,7 +27,6 @@ use crate::ptype::{match_each_native_ptype, NativePType, PType};
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsSet};

mod compress;
mod compute;
mod serde;
mod stats;
Expand Down Expand Up @@ -253,10 +251,6 @@ impl Encoding for PrimitiveEncoding {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
Expand Down
21 changes: 11 additions & 10 deletions vortex-array/src/array/sparse/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@ use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::sparse::{SparseArray, SparseEncoding};
use crate::array::{Array, ArrayRef};
use crate::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression};
use crate::error::VortexResult;

impl EncodingCompression for SparseEncoding {
fn compressor(
&self,
array: &dyn Array,
_config: &CompressConfig,
) -> Option<&'static Compressor> {
if array.encoding().id() == &Self::ID {
Some(&(sparse_compressor as Compressor))
} else {
None
}
(array.encoding().id() == &Self::ID).then_some(&(sparse_compressor as Compressor))
}
}

fn sparse_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef {
fn sparse_compressor(
array: &dyn Array,
like: Option<&dyn Array>,
ctx: CompressCtx,
) -> VortexResult<ArrayRef> {
let sparse_array = array.as_sparse();
let sparse_like = like.map(|la| la.as_sparse());
SparseArray::new(
ctx.compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices())),
ctx.compress(sparse_array.values(), sparse_like.map(|sa| sa.values())),
Ok(SparseArray::new(
ctx.compress(sparse_array.indices(), sparse_like.map(|sa| sa.indices()))?,
ctx.compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?,
sparse_array.len(),
)
.boxed()
.boxed())
}
Loading

0 comments on commit 4da0308

Please sign in to comment.