Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

German strings, attempt 3 #1082

Merged
merged 39 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7838f7a
part 1
a10y Oct 17, 2024
09118d5
monster krabbie patty??!
a10y Oct 17, 2024
233d7e8
Merge remote-tracking branch 'origin/develop' into aduffy/german-stri…
a10y Oct 17, 2024
a6335b5
pass all tests
a10y Oct 17, 2024
6f9adfb
rye run ruff raggy rokay
a10y Oct 17, 2024
04d8ee1
fix issue caught by miri
a10y Oct 18, 2024
e3659bb
cargofmt
a10y Oct 18, 2024
abb1ee6
canonicalize test
a10y Oct 18, 2024
1b143da
fix clippy
a10y Oct 18, 2024
548f63c
delete unusedcode for now
a10y Oct 18, 2024
3c1af26
enable FilterFn on VarBin -> fix issue with FSST filter
a10y Oct 18, 2024
a636365
fix a bunch of python doctest
a10y Oct 18, 2024
f524f8f
fix evermore doctests
a10y Oct 18, 2024
c15935e
monkeypatch
a10y Oct 18, 2024
1bb1f00
speed up VarBin -> VarBinView
a10y Oct 18, 2024
8cb7525
ruff format
a10y Oct 18, 2024
977b785
Merge remote-tracking branch 'origin/develop' into aduffy/german-stri…
a10y Oct 18, 2024
1f07380
apply changes on top
a10y Oct 18, 2024
dd9c7d2
stack on top of upstream
a10y Oct 18, 2024
574f815
ruff ruff
a10y Oct 19, 2024
56b510b
faster into_canonical
a10y Oct 21, 2024
af23e16
pydocs
a10y Oct 21, 2024
bb900dc
pushdown compare to FSST using the symbol table
a10y Oct 21, 2024
d0aa345
add test for FSST MaybeCompareFn
a10y Oct 21, 2024
f4456ab
better canonicalize for ConstantArray
a10y Oct 21, 2024
aab6c2b
fallback to slow builder when VarBin >= i32::MAX
a10y Oct 22, 2024
71eb88a
Merge remote-tracking branch 'origin/develop' into aduffy/german-stri…
a10y Oct 22, 2024
c0e85ec
enforce buffer_lens on VarBinView construction
a10y Oct 22, 2024
b8fcac0
move varbin_datum and varbin_to_arrow to separate module
a10y Oct 22, 2024
656fde2
just use arrow_cast for canonicalizing VarBin
a10y Oct 22, 2024
52eebe5
benchmark for take, make vbv take faster
a10y Oct 22, 2024
2f45c46
some comments
a10y Oct 22, 2024
1451b8f
some comments
a10y Oct 22, 2024
f7a42aa
comments
a10y Oct 22, 2024
980fd9e
Merge remote-tracking branch 'origin/develop' into aduffy/german-stri…
a10y Oct 22, 2024
e12d552
fix FSST compare semantic when constant is null
a10y Oct 22, 2024
ed7ba78
use constant
a10y Oct 22, 2024
56bb774
fix up py tests
a10y Oct 22, 2024
8adc8a3
gakgjsedrkghskerjghsejkrghjk
a10y Oct 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ serde = "1.0.197"
serde_json = "1.0.116"
serde_test = "1.0.176"
simplelog = { version = "0.12.2", features = ["paris"] }
static_assertions = "1"
tar = "0.4"
tempfile = "3"
thiserror = "1.0.58"
Expand Down
6 changes: 3 additions & 3 deletions bench-vortex/benches/bytes_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use futures::StreamExt;
use vortex::array::{PrimitiveArray, VarBinArray};
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::validity::Validity;
use vortex::{Context, IntoArray, IntoCanonical};
use vortex_dtype::{DType, Nullability};
Expand All @@ -24,7 +24,7 @@ fn array_data_fixture() -> VarBinArray {
.unwrap()
}

fn array_view_fixture() -> VarBinArray {
fn array_view_fixture() -> VarBinViewArray {
let array_data = array_data_fixture();
let mut buffer = Vec::new();

Expand All @@ -42,7 +42,7 @@ fn array_view_fixture() -> VarBinArray {
.unwrap()
.into_canonical()
.unwrap()
.into_varbin()
.into_varbinview()
.unwrap()
}

Expand Down
58 changes: 29 additions & 29 deletions bench-vortex/src/tpch/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,62 +7,62 @@ use lazy_static::lazy_static;
lazy_static! {
pub static ref NATION: Schema = Schema::new(vec![
Field::new("n_nationkey", DataType::Int64, false),
Field::new("n_name", DataType::Utf8, false),
Field::new("n_name", DataType::Utf8View, false),
Field::new("n_regionkey", DataType::Int64, false),
Field::new("n_comment", DataType::Utf8, true),
Field::new("n_comment", DataType::Utf8View, true),
]);
pub static ref REGION: Schema = Schema::new(vec![
Field::new("r_regionkey", DataType::Int64, false),
Field::new("r_name", DataType::Utf8, false),
Field::new("r_comment", DataType::Utf8, true),
Field::new("r_name", DataType::Utf8View, false),
Field::new("r_comment", DataType::Utf8View, true),
]);
pub static ref PART: Schema = Schema::new(vec![
Field::new("p_partkey", DataType::Int64, false),
Field::new("p_name", DataType::Utf8, false),
Field::new("p_mfgr", DataType::Utf8, false),
Field::new("p_brand", DataType::Utf8, false),
Field::new("p_type", DataType::Utf8, false),
Field::new("p_name", DataType::Utf8View, false),
Field::new("p_mfgr", DataType::Utf8View, false),
Field::new("p_brand", DataType::Utf8View, false),
Field::new("p_type", DataType::Utf8View, false),
Field::new("p_size", DataType::Int32, false),
Field::new("p_container", DataType::Utf8, false),
Field::new("p_container", DataType::Utf8View, false),
Field::new("p_retailprice", DataType::Float64, false),
Field::new("p_comment", DataType::Utf8, false),
Field::new("p_comment", DataType::Utf8View, false),
]);
pub static ref SUPPLIER: Schema = Schema::new(vec![
Field::new("s_suppkey", DataType::Int64, false),
Field::new("s_name", DataType::Utf8, false),
Field::new("s_address", DataType::Utf8, false),
Field::new("s_name", DataType::Utf8View, false),
Field::new("s_address", DataType::Utf8View, false),
Field::new("s_nationkey", DataType::Int32, false),
Field::new("s_phone", DataType::Utf8, false),
Field::new("s_phone", DataType::Utf8View, false),
Field::new("s_acctbal", DataType::Float64, false),
Field::new("s_comment", DataType::Utf8, false),
Field::new("s_comment", DataType::Utf8View, false),
]);
pub static ref PARTSUPP: Schema = Schema::new(vec![
Field::new("ps_partkey", DataType::Int64, false),
Field::new("ps_suppkey", DataType::Int64, false),
Field::new("ps_availqty", DataType::Int64, false),
Field::new("ps_supplycost", DataType::Float64, false),
Field::new("ps_comment", DataType::Utf8, false),
Field::new("ps_comment", DataType::Utf8View, false),
]);
pub static ref CUSTOMER: Schema = Schema::new(vec![
Field::new("c_custkey", DataType::Int64, false),
Field::new("c_name", DataType::Utf8, false),
Field::new("c_address", DataType::Utf8, false),
Field::new("c_name", DataType::Utf8View, false),
Field::new("c_address", DataType::Utf8View, false),
Field::new("c_nationkey", DataType::Int64, false),
Field::new("c_phone", DataType::Utf8, false),
Field::new("c_phone", DataType::Utf8View, false),
Field::new("c_acctbal", DataType::Float64, false),
Field::new("c_mktsegment", DataType::Utf8, false),
Field::new("c_comment", DataType::Utf8, false),
Field::new("c_mktsegment", DataType::Utf8View, false),
Field::new("c_comment", DataType::Utf8View, false),
]);
pub static ref ORDERS: Schema = Schema::new(vec![
Field::new("o_orderkey", DataType::Int64, false),
Field::new("o_custkey", DataType::Int64, false),
Field::new("o_orderstatus", DataType::Utf8, false),
Field::new("o_orderstatus", DataType::Utf8View, false),
Field::new("o_totalprice", DataType::Float64, false),
Field::new("o_orderdate", DataType::Date32, false),
Field::new("o_orderpriority", DataType::Utf8, false),
Field::new("o_clerk", DataType::Utf8, false),
Field::new("o_orderpriority", DataType::Utf8View, false),
Field::new("o_clerk", DataType::Utf8View, false),
Field::new("o_shippriority", DataType::Int32, false),
Field::new("o_comment", DataType::Utf8, false),
Field::new("o_comment", DataType::Utf8View, false),
]);
pub static ref LINEITEM: Schema = Schema::new(vec![
Field::new("l_orderkey", DataType::Int64, false),
Expand All @@ -73,13 +73,13 @@ lazy_static! {
Field::new("l_extendedprice", DataType::Float64, false),
Field::new("l_discount", DataType::Float64, false),
Field::new("l_tax", DataType::Float64, false),
Field::new("l_returnflag", DataType::Utf8, false),
Field::new("l_linestatus", DataType::Utf8, false),
Field::new("l_returnflag", DataType::Utf8View, false),
Field::new("l_linestatus", DataType::Utf8View, false),
Field::new("l_shipdate", DataType::Date32, false),
Field::new("l_commitdate", DataType::Date32, false),
Field::new("l_receiptdate", DataType::Date32, false),
Field::new("l_shipinstruct", DataType::Utf8, false),
Field::new("l_shipmode", DataType::Utf8, false),
Field::new("l_comment", DataType::Utf8, false),
Field::new("l_shipinstruct", DataType::Utf8View, false),
Field::new("l_shipmode", DataType::Utf8View, false),
Field::new("l_comment", DataType::Utf8View, false),
]);
}
1 change: 1 addition & 0 deletions encodings/dict/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = { workspace = true }
readme = { workspace = true }

[dependencies]
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
hashbrown = { workspace = true }
num-traits = { workspace = true }
Expand Down
119 changes: 117 additions & 2 deletions encodings/dict/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use hashbrown::hash_map::{Entry, RawEntryMut};
use hashbrown::{DefaultHashBuilder, HashMap};
use num_traits::AsPrimitive;
use vortex::accessor::ArrayAccessor;
use vortex::array::{PrimitiveArray, VarBinArray};
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::validity::Validity;
use vortex::{ArrayDType, IntoArray};
use vortex::{ArrayDType, IntoArray, IntoCanonical};
use vortex_dtype::{match_each_native_ptype, DType, NativePType, ToBytes};
use vortex_error::VortexExpect as _;

/// Statically assigned code for a null value.
pub const NULL_CODE: u64 = 0;

#[derive(Debug)]
struct Value<T>(T);

Expand Down Expand Up @@ -89,6 +92,21 @@ pub fn dict_encode_varbin(array: &VarBinArray) -> (PrimitiveArray, VarBinArray)
.vortex_expect("Failed to dictionary encode varbin array")
}

/// Dictionary encode a VarbinViewArray.
pub fn dict_encode_varbinview(array: &VarBinViewArray) -> (PrimitiveArray, VarBinViewArray) {
let (codes, values) = array
.with_iterator(|iter| dict_encode_typed_varbin(array.dtype().clone(), iter))
.unwrap();
(
codes,
values
.into_canonical()
.vortex_expect("VarBin to canonical")
.into_varbinview()
.vortex_expect("VarBinView"),
Comment on lines +102 to +106
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

preferably do this in a single-pass

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should be able to "fast-path" build a VarBinView since we know that the strings in values are unique (doesn't need to be in this PR)

)
}

fn lookup_bytes<'a, T: NativePType + AsPrimitive<usize>>(
offsets: &'a [T],
bytes: &'a [u8],
Expand All @@ -99,6 +117,103 @@ fn lookup_bytes<'a, T: NativePType + AsPrimitive<usize>>(
&bytes[begin..end]
}

// Impl our own for different things here.

// fn dict_encode_typed_view<I, U>(dtype: DType, values: I) -> (PrimitiveArray, VarBinViewArray)
// where
// I: Iterator<Item = Option<U>>,
// U: AsRef<[u8]>,
// {
// let (lower, _) = values.size_hint();
// let hasher = DefaultHashBuilder::default();
// let mut lookup_dict: HashMap<u64, (), ()> = HashMap::with_hasher(());
//
// // The codes, which will become the primitive array.
// let mut codes: Vec<u64> = Vec::with_capacity(lower);
//
// let mut views: Vec<BinaryView> = Vec::new();
//
// // Generate a new output buffer once we've overflowed the i32 range.
// let mut buffers: Vec<Vec<u8>> = Vec::with_capacity(1);
// let mut string_heap: Vec<u8> = Vec::with_capacity(1024);
//
// // Accumulate a temporary buffer of code bytes for fast lookups.
//
// for o_val in values {
// match o_val {
// None => codes.push(NULL_CODE),
// Some(val) => {
// let byte_ref = val.as_ref();
// let value_hash = hasher.hash_one(byte_ref);
// let raw_entry = lookup_dict
// .raw_entry_mut()
// .from_hash(value_hash, |idx| todo!());
//
// let code = match raw_entry {
// RawEntryMut::Occupied(o) => *o.into_key(),
// RawEntryMut::Vacant(vac) => {
// let next_code = views.len() as u64;
// let slice = val.as_ref();
// assert!(
// slice.len() < (i32::MAX as usize),
// "cannot append a value of length >= 2^31 to VarBinViewArray"
// );
// if slice.len() >= BinaryView::MAX_INLINED_SIZE {
// // Rollover a new heap.
// if string_heap.len() + slice.len() >= (i32::MAX as usize) {
// buffers.push(string_heap);
// string_heap = Vec::with_capacity(1024);
// }
//
// views.push(BinaryView::new_view(
// slice.len() as u32,
// [slice[0], slice[1], slice[2], slice[3]],
// buffers.len() as u32,
// string_heap.len() as u32,
// ));
//
// string_heap.extend_from_slice(slice);
// } else {
// // Append an inlined view.
// views.push(BinaryView::new_inlined(slice));
// }
//
// vac.insert_with_hasher(value_hash, next_code, (), |code| {
// // Get access to the value...again.
// hasher.hash_one(
//
// )
// });
// next_code
// }
// };
// codes.push(code)
// }
// }
// }
//
// let values_validity = if dtype.is_nullable() {
// let mut validity = BooleanBufferBuilder::new(views.len());
// validity.append(false); // First code is false
// validity.append_n(true, offsets.len() - 2);
//
// validity.into()
// } else {
// Validity::NonNullable
// };
//
// (
// PrimitiveArray::from(codes),
// VarBinArray::try_new(
// PrimitiveArray::from(offsets).into_array(),
// PrimitiveArray::from(bytes).into_array(),
// dtype,
// values_validity,
// )
// .vortex_expect("Failed to create VarBinArray dictionary during encoding"),
// )
// }

fn dict_encode_typed_varbin<I, U>(dtype: DType, values: I) -> (PrimitiveArray, VarBinArray)
where
I: Iterator<Item = Option<U>>,
Expand Down
28 changes: 17 additions & 11 deletions encodings/dict/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ impl SliceFn for DictArray {

#[cfg(test)]
mod test {
use vortex::array::{PrimitiveArray, VarBinArray};
use vortex::accessor::ArrayAccessor;
use vortex::array::{PrimitiveArray, VarBinViewArray};
use vortex::{IntoArray, IntoArrayVariant, ToArray};
use vortex_dtype::{DType, Nullability};

use crate::{dict_encode_typed_primitive, dict_encode_varbin, DictArray};
use crate::{dict_encode_typed_primitive, dict_encode_varbinview, DictArray};

#[test]
fn flatten_nullable_primitive() {
Expand All @@ -79,20 +80,25 @@ mod test {

#[test]
fn flatten_nullable_varbin() {
let reference = VarBinArray::from_iter(
let reference = VarBinViewArray::from_iter(
vec![Some("a"), Some("b"), None, Some("a"), None, Some("b")],
DType::Utf8(Nullability::Nullable),
);
let (codes, values) = dict_encode_varbin(&reference);
assert_eq!(reference.len(), 6);
let (codes, values) = dict_encode_varbinview(&reference);
let dict = DictArray::try_new(codes.into_array(), values.into_array()).unwrap();
let flattened_dict = dict.to_array().into_varbin().unwrap();
let flattened_dict = dict.to_array().into_varbinview().unwrap();
assert_eq!(
flattened_dict.offsets().into_primitive().unwrap().buffer(),
reference.offsets().into_primitive().unwrap().buffer()
);
assert_eq!(
flattened_dict.bytes().into_primitive().unwrap().buffer(),
reference.bytes().into_primitive().unwrap().buffer()
flattened_dict
.with_iterator(|iter| iter
.map(|slice| slice.map(|s| s.to_vec()))
.collect::<Vec<_>>())
.unwrap(),
reference
.with_iterator(|iter| iter
.map(|slice| slice.map(|s| s.to_vec()))
.collect::<Vec<_>>())
.unwrap(),
);
}
}
18 changes: 11 additions & 7 deletions encodings/fsst/src/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use vortex::array::{PrimitiveArray, VarBinArray};
use vortex::{ArrayDType, Canonical, IntoArray, IntoCanonical};
use vortex::{ArrayDType, Canonical, IntoArray, IntoArrayVariant, IntoCanonical};
use vortex_error::VortexResult;

use crate::FSSTArray;
Expand Down Expand Up @@ -44,12 +44,16 @@ impl IntoCanonical for FSSTArray {
let offsets_array = PrimitiveArray::from(offsets).into_array();
let uncompressed_bytes_array = PrimitiveArray::from(uncompressed_bytes).into_array();

Ok(Canonical::VarBin(VarBinArray::try_new(
offsets_array,
uncompressed_bytes_array,
self.dtype().clone(),
self.validity(),
)?))
// TODO(aduffy): do this without the intermediate VarBin
Ok(Canonical::VarBinView(
VarBinArray::try_new(
offsets_array,
uncompressed_bytes_array,
self.dtype().clone(),
self.validity(),
)?
.into_varbinview()?,
))
})
}
}
2 changes: 1 addition & 1 deletion encodings/fsst/tests/fsst_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ fn test_fsst_array_ops() {
.clone()
.into_canonical()
.unwrap()
.into_varbin()
.into_varbinview()
.unwrap()
.into_array();

Expand Down
Loading
Loading