diff --git a/Cargo.lock b/Cargo.lock index 74becf0b1c..a1f05105cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,9 +84,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstyle" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" [[package]] name = "anyhow" @@ -615,9 +615,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" +checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" dependencies = [ "jobserver", "libc", @@ -695,18 +695,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.10" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f6b81fb3c84f5563d509c59b5a48d935f689e993afa90fe39047f05adef9142" +checksum = "35723e6a11662c2afb578bcf0b88bf6ea8e21282a953428f240574fcc3a2b5b3" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.10" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca6706fd5224857d9ac5eb9355f6683563cc0541c7cd9d014043b57cbec78ac" +checksum = "49eb96cbfa7cfa35017b7cd548c75b14c3118c98b423041d70562665e07fb0fa" dependencies = [ "anstyle", "clap_lex", @@ -715,9 +715,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "comfy-table" @@ -935,11 +935,12 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.5.3" +version = "6.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" dependencies = [ "cfg-if", + "crossbeam-utils", "hashbrown", "lock_api", "once_cell", @@ -949,8 +950,7 @@ dependencies = [ [[package]] name = "datafusion" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab9d55a9cd2634818953809f75ebe5248b00dd43c3227efb2a51a2d5feaad54e" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "ahash", "arrow", @@ -963,16 +963,18 @@ dependencies = [ "bzip2", "chrono", "dashmap", + "datafusion-catalog", "datafusion-common", "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", "datafusion-functions", "datafusion-functions-aggregate", - "datafusion-functions-array", + "datafusion-functions-nested", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-expr-common", + "datafusion-physical-optimizer", "datafusion-physical-plan", "datafusion-sql", "flate2", @@ -1000,11 +1002,23 @@ dependencies = [ "zstd", ] +[[package]] +name = "datafusion-catalog" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" +dependencies = [ + "arrow-schema", + "async-trait", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-plan", +] + [[package]] name = "datafusion-common" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "def66b642959e7f96f5d2da22e1f43d3bd35598f821e5ce351a0553e0f1b7367" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "ahash", "arrow", @@ -1025,8 +1039,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f104bb9cb44c06c9badf8a0d7e0855e5f7fa5e395b887d7f835e8a9457dc1352" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "tokio", ] @@ -1034,8 +1047,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ac0fd8b5d80bbca3fc3b6f40da4e9f6907354824ec3b18bbd83fee8cf5c3c3e" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "arrow", "chrono", @@ -1055,8 +1067,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2103d2cc16fb11ef1fa993a6cac57ed5cb028601db4b97566c90e5fa77aa1e68" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "ahash", "arrow", @@ -1074,10 +1085,10 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a369332afd0ef5bd565f6db2139fb9f1dfdd0afa75a7f70f000b74208d76994f" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "arrow", + "arrow-buffer", "base64", "blake2", "blake3", @@ -1100,8 +1111,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92718db1aff70c47e5abf9fc975768530097059e5db7c7b78cd64b5e9a11fc77" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "ahash", "arrow", @@ -1116,10 +1126,9 @@ dependencies = [ ] [[package]] -name = "datafusion-functions-array" +name = "datafusion-functions-nested" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bb80f46ff3dcf4bb4510209c2ba9b8ce1b716ac8b7bf70c6bf7dca6260c831" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "arrow", "arrow-array", @@ -1134,13 +1143,13 @@ dependencies = [ "itertools 0.12.1", "log", "paste", + "rand", ] [[package]] name = "datafusion-optimizer" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82f34692011bec4fdd6fc18c264bf8037b8625d801e6dd8f5111af15cb6d71d3" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "arrow", "async-trait", @@ -1159,8 +1168,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45538630defedb553771434a437f7ca8f04b9b3e834344aafacecb27dc65d5e5" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "ahash", "arrow", @@ -1189,8 +1197,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d8a72b0ca908e074aaeca52c14ddf5c28d22361e9cb6bc79bb733cd6661b536" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "ahash", "arrow", @@ -1200,11 +1207,21 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-physical-optimizer" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" +dependencies = [ + "datafusion-common", + "datafusion-execution", + "datafusion-physical-expr", + "datafusion-physical-plan", +] + [[package]] name = "datafusion-physical-plan" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b504eae6107a342775e22e323e9103f7f42db593ec6103b28605b7b7b1405c4a" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "ahash", "arrow", @@ -1237,8 +1254,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "40.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5db33f323f41b95ae201318ba654a9bf11113e58a51a1dff977b1a836d3d889" +source = "git+https://github.com/apache/datafusion.git?rev=2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d#2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" dependencies = [ "arrow", "arrow-array", @@ -2389,7 +2405,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ - "proc-macro-crate 1.3.1", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.72", @@ -2728,9 +2744,13 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "2288c0e17cc8d342c712bb43a257a80ebffce59cdb33d5000d8348f3ec02528b" +dependencies = [ + "zerocopy", + "zerocopy-derive", +] [[package]] name = "prettyplease" @@ -2802,7 +2822,7 @@ checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -2822,7 +2842,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.72", @@ -3539,9 +3559,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "sqlparser" -version = "0.47.0" +version = "0.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "295e9930cd7a97e58ca2a070541a3ca502b17f5d1fa7157376d0fabd85324f25" +checksum = "a4a404d0e14905361b918cb8afdb73605e25c1d5029312bd9785142dcb3aa49e" dependencies = [ "log", "sqlparser_derive", @@ -3852,9 +3872,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.6" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" [[package]] name = "toml_edit" @@ -4031,9 +4051,9 @@ checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] name = "version_check" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "vortex-alp" @@ -4079,6 +4099,7 @@ dependencies = [ "rand", "rstest", "serde", + "static_assertions", "tokio", "vortex-buffer", "vortex-dtype", @@ -4787,6 +4808,7 @@ version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ + "byteorder", "zerocopy-derive", ] @@ -4836,9 +4858,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "2.0.11+zstd.1.5.6" +version = "2.0.12+zstd.1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75652c55c0b6f3e6f12eb786fe1bc960396bf05a1eb3bf1f3691c3610ac2e6d4" +checksum = "0a4e40c320c3cb459d9a9ff6de98cff88f4751ee9275d140e2be94a2b74e4c13" dependencies = [ "cc", "pkg-config", diff --git a/Cargo.toml b/Cargo.toml index d31ba9f1e2..7551a384a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,17 +34,17 @@ ahash = "0.8.11" allocator-api2 = "0.2.16" anyhow = "1.0" arrayref = "0.3.7" -arrow = { version = "52.0.0", features = ["pyarrow"] } -arrow-arith = "52.0.0" -arrow-array = "52.0.0" -arrow-buffer = "52.0.0" -arrow-cast = "52.0.0" -arrow-csv = "52.0.0" -arrow-data = "52.0.0" -arrow-ipc = "52.0.0" -arrow-ord = "52.0.0" -arrow-schema = "52.0.0" -arrow-select = "52.0.0" +arrow = { version = "52.1.0", features = ["pyarrow"] } +arrow-arith = "52.2.0" +arrow-array = "52.2.0" +arrow-buffer = "52.2.0" +arrow-cast = "52.2.0" +arrow-csv = "52.2.0" +arrow-data = "52.2.0" +arrow-ipc = "52.2.0" +arrow-ord = "52.2.0" +arrow-schema = "52.2.0" +arrow-select = "52.2.0" async-trait = "0.1" bindgen = "0.69.4" bytes = "1.6.0" @@ -104,6 +104,7 @@ serde = "1.0.197" serde_json = "1.0.116" serde_test = "1.0.176" simplelog = { version = "0.12.2", features = ["paris"] } +static_assertions = "1.1.0" thiserror = "1.0.58" tokio = "1.37.0" uninit = "0.6.2" @@ -119,3 +120,11 @@ warnings = "deny" [workspace.lints.clippy] all = { level = "deny", priority = -1 } or_fun_call = "deny" + +[patch.crates-io] +datafusion = { git = "https://github.com/apache/datafusion.git", rev = "2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" } +datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" } +datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" } +datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" } +datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "2f5e73c6aa82a3c45ff348ce0d1ea4eec4fc2a0d" } diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs index 90f392caad..13cc5475f2 100644 --- a/bench-vortex/src/bin/tpch_benchmark.rs +++ b/bench-vortex/src/bin/tpch_benchmark.rs @@ -18,10 +18,13 @@ async fn main() { // The formats to run against (vs the baseline) let formats = [ Format::Arrow, - Format::Parquet, Format::Vortex { disable_pushdown: false, }, + Format::Vortex { + disable_pushdown: true, + }, + // Format::Parquet, ]; // Load datasets diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index a734692a9a..fe2b91142e 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use std::env::temp_dir; use std::fs::{create_dir_all, File}; +use std::future::Future; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -95,6 +96,23 @@ pub fn idempotent( Ok(data_path) } +pub async fn idempotent_async( + path: &P, + f: impl FnOnce(PathBuf) -> F, +) -> Result +where + F: Future>, + P: IdempotentPath + ?Sized, +{ + let data_path = path.to_data_path(); + if !data_path.exists() { + let temp_location = path.to_temp_path(); + f(temp_location.clone()).await?; + std::fs::rename(temp_location.as_path(), &data_path).unwrap(); + } + Ok(data_path) +} + pub trait IdempotentPath { fn to_data_path(&self) -> PathBuf; fn to_temp_path(&self) -> PathBuf; diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index 59c8914794..03ba5cb96b 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -7,13 +7,12 @@ use arrow_schema::Schema; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::MemTable; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext}; -use futures::executor::block_on; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowArray; use vortex::{Array, ArrayDType, ArrayData, IntoArray}; use vortex_datafusion::{SessionContextExt, VortexMemTableOptions}; -use crate::idempotent; +use crate::idempotent_async; pub mod dbgen; pub mod schema; @@ -132,30 +131,32 @@ async fn register_parquet( file: &Path, schema: &Schema, ) -> anyhow::Result<()> { - // Idempotent conversion from TPCH CSV to Parquet. - let pq_file = idempotent( + let csv_file = file.to_str().unwrap(); + let pq_file = idempotent_async( &file.with_extension("").with_extension("parquet"), - |pq_file| { - let df = block_on( - session.read_csv( - file.to_str().unwrap(), + |pq_file| async move { + let df = session + .read_csv( + csv_file, CsvReadOptions::default() .delimiter(b'|') .has_header(false) .file_extension("tbl") .schema(schema), - ), - ) - .unwrap(); + ) + .await?; - block_on(df.write_parquet( - pq_file.as_os_str().to_str().unwrap(), + df.write_parquet( + pq_file.as_path().as_os_str().to_str().unwrap(), DataFrameWriteOptions::default(), None, - )) + ) + .await?; + + Ok::<(), anyhow::Error>(()) }, ) - .unwrap(); + .await?; Ok(session .register_parquet( diff --git a/bench-vortex/src/tpch/schema.rs b/bench-vortex/src/tpch/schema.rs index eb4971b66d..6d15ec37f8 100644 --- a/bench-vortex/src/tpch/schema.rs +++ b/bench-vortex/src/tpch/schema.rs @@ -7,62 +7,62 @@ use lazy_static::lazy_static; lazy_static! { pub static ref NATION: Schema = Schema::new(vec![ Field::new("n_nationkey", DataType::Int64, false), - Field::new("n_name", DataType::Utf8, false), + Field::new("n_name", DataType::Utf8View, false), Field::new("n_regionkey", DataType::Int64, false), - Field::new("n_comment", DataType::Utf8, true), + Field::new("n_comment", DataType::Utf8View, true), ]); pub static ref REGION: Schema = Schema::new(vec![ Field::new("r_regionkey", DataType::Int64, false), - Field::new("r_name", DataType::Utf8, false), - Field::new("r_comment", DataType::Utf8, true), + Field::new("r_name", DataType::Utf8View, false), + Field::new("r_comment", DataType::Utf8View, true), ]); pub static ref PART: Schema = Schema::new(vec![ Field::new("p_partkey", DataType::Int64, false), - Field::new("p_name", DataType::Utf8, false), - Field::new("p_mfgr", DataType::Utf8, false), - Field::new("p_brand", DataType::Utf8, false), - Field::new("p_type", DataType::Utf8, false), + Field::new("p_name", DataType::Utf8View, false), + Field::new("p_mfgr", DataType::Utf8View, false), + Field::new("p_brand", DataType::Utf8View, false), + Field::new("p_type", DataType::Utf8View, false), Field::new("p_size", DataType::Int32, false), - Field::new("p_container", DataType::Utf8, false), + Field::new("p_container", DataType::Utf8View, false), Field::new("p_retailprice", DataType::Float64, false), - Field::new("p_comment", DataType::Utf8, false), + Field::new("p_comment", DataType::Utf8View, false), ]); pub static ref SUPPLIER: Schema = Schema::new(vec![ Field::new("s_suppkey", DataType::Int64, false), - Field::new("s_name", DataType::Utf8, false), - Field::new("s_address", DataType::Utf8, false), + Field::new("s_name", DataType::Utf8View, false), + Field::new("s_address", DataType::Utf8View, false), Field::new("s_nationkey", DataType::Int32, false), - Field::new("s_phone", DataType::Utf8, false), + Field::new("s_phone", DataType::Utf8View, false), Field::new("s_acctbal", DataType::Float64, false), - Field::new("s_comment", DataType::Utf8, false), + Field::new("s_comment", DataType::Utf8View, false), ]); pub static ref PARTSUPP: Schema = Schema::new(vec![ Field::new("ps_partkey", DataType::Int64, false), Field::new("ps_suppkey", DataType::Int64, false), Field::new("ps_availqty", DataType::Int64, false), Field::new("ps_supplycost", DataType::Float64, false), - Field::new("ps_comment", DataType::Utf8, false), + Field::new("ps_comment", DataType::Utf8View, false), ]); pub static ref CUSTOMER: Schema = Schema::new(vec![ Field::new("c_custkey", DataType::Int64, false), - Field::new("c_name", DataType::Utf8, false), - Field::new("c_address", DataType::Utf8, false), + Field::new("c_name", DataType::Utf8View, false), + Field::new("c_address", DataType::Utf8View, false), Field::new("c_nationkey", DataType::Int64, false), - Field::new("c_phone", DataType::Utf8, false), + Field::new("c_phone", DataType::Utf8View, false), Field::new("c_acctbal", DataType::Float64, false), - Field::new("c_mktsegment", DataType::Utf8, false), - Field::new("c_comment", DataType::Utf8, false), + Field::new("c_mktsegment", DataType::Utf8View, false), + Field::new("c_comment", DataType::Utf8View, false), ]); pub static ref ORDERS: Schema = Schema::new(vec![ Field::new("o_orderkey", DataType::Int64, false), Field::new("o_custkey", DataType::Int64, false), - Field::new("o_orderstatus", DataType::Utf8, false), + Field::new("o_orderstatus", DataType::Utf8View, false), Field::new("o_totalprice", DataType::Float64, false), Field::new("o_orderdate", DataType::Date32, false), - Field::new("o_orderpriority", DataType::Utf8, false), - Field::new("o_clerk", DataType::Utf8, false), + Field::new("o_orderpriority", DataType::Utf8View, false), + Field::new("o_clerk", DataType::Utf8View, false), Field::new("o_shippriority", DataType::Int32, false), - Field::new("o_comment", DataType::Utf8, false), + Field::new("o_comment", DataType::Utf8View, false), ]); pub static ref LINEITEM: Schema = Schema::new(vec![ Field::new("l_orderkey", DataType::Int64, false), @@ -73,13 +73,13 @@ lazy_static! { Field::new("l_extendedprice", DataType::Float64, false), Field::new("l_discount", DataType::Float64, false), Field::new("l_tax", DataType::Float64, false), - Field::new("l_returnflag", DataType::Utf8, false), - Field::new("l_linestatus", DataType::Utf8, false), + Field::new("l_returnflag", DataType::Utf8View, false), + Field::new("l_linestatus", DataType::Utf8View, false), Field::new("l_shipdate", DataType::Date32, false), Field::new("l_commitdate", DataType::Date32, false), Field::new("l_receiptdate", DataType::Date32, false), - Field::new("l_shipinstruct", DataType::Utf8, false), - Field::new("l_shipmode", DataType::Utf8, false), - Field::new("l_comment", DataType::Utf8, false), + Field::new("l_shipinstruct", DataType::Utf8View, false), + Field::new("l_shipmode", DataType::Utf8View, false), + Field::new("l_comment", DataType::Utf8View, false), ]); } diff --git a/encodings/dict/benches/dict_compress.rs b/encodings/dict/benches/dict_compress.rs index 3b82307c19..dc93711c4b 100644 --- a/encodings/dict/benches/dict_compress.rs +++ b/encodings/dict/benches/dict_compress.rs @@ -3,7 +3,7 @@ use rand::distributions::{Alphanumeric, Uniform}; use rand::prelude::SliceRandom; use rand::{thread_rng, Rng}; use vortex::array::primitive::PrimitiveArray; -use vortex::array::varbin::VarBinArray; +use vortex::array::varbinview::VarBinViewArray; use vortex::ArrayTrait; use vortex_dict::dict_encode_typed_primitive; use vortex_dtype::match_each_native_ptype; @@ -17,7 +17,7 @@ fn gen_primitive_dict(len: usize, uniqueness: f64) -> PrimitiveArray { PrimitiveArray::from(data) } -fn gen_varbin_dict(len: usize, uniqueness: f64) -> VarBinArray { +fn gen_varbinview_dict(len: usize, uniqueness: f64) -> VarBinViewArray { let mut rng = thread_rng(); let uniq_cnt = (len as f64 * uniqueness) as usize; let dict: Vec = (0..uniq_cnt) @@ -32,7 +32,7 @@ fn gen_varbin_dict(len: usize, uniqueness: f64) -> VarBinArray { let words: Vec<&str> = (0..len) .map(|_| dict.choose(&mut rng).unwrap().as_str()) .collect(); - VarBinArray::from(words) + VarBinViewArray::from_iter_str(words) } fn dict_encode_primitive(arr: &PrimitiveArray) -> usize { @@ -42,20 +42,20 @@ fn dict_encode_primitive(arr: &PrimitiveArray) -> usize { (codes.nbytes() + values.nbytes()) / arr.nbytes() } -fn dict_encode_varbin(arr: &VarBinArray) -> usize { - let (codes, values) = vortex_dict::dict_encode_varbin(arr); +fn dict_encode_varbinview(arr: &VarBinViewArray) -> usize { + let (codes, values) = vortex_dict::dict_encode(arr); (codes.nbytes() + values.nbytes()) / arr.nbytes() } fn dict_encode(c: &mut Criterion) { let primitive_arr = gen_primitive_dict(1_000_000, 0.00005); - let varbin_arr = gen_varbin_dict(1_000_000, 0.00005); + let varbinview_arr = gen_varbinview_dict(1_000_000, 0.00005); c.bench_function("dict_encode_primitives", |b| { b.iter(|| black_box(dict_encode_primitive(&primitive_arr))); }); c.bench_function("dict_encode_varbin", |b| { - b.iter(|| black_box(dict_encode_varbin(&varbin_arr))); + b.iter(|| black_box(dict_encode_varbinview(&varbinview_arr))); }); } diff --git a/encodings/dict/src/compress.rs b/encodings/dict/src/compress.rs index 7314f4c7d9..3774e98d50 100644 --- a/encodings/dict/src/compress.rs +++ b/encodings/dict/src/compress.rs @@ -84,9 +84,11 @@ pub fn dict_encode_typed_primitive( } /// Dictionary encode varbin array. Specializes for primitive byte arrays to avoid double copying -pub fn dict_encode_varbin(array: &VarBinArray) -> (PrimitiveArray, VarBinArray) { +pub fn dict_encode + ArrayDType>( + array: &U, +) -> (PrimitiveArray, VarBinArray) { array - .with_iterator(|iter| dict_encode_typed_varbin(array.dtype().clone(), iter)) + .with_iterator(|iter| dict_encode_typed(array.dtype().clone(), iter)) .unwrap() } @@ -100,7 +102,7 @@ fn lookup_bytes<'a, T: NativePType + AsPrimitive>( &bytes[begin..end] } -fn dict_encode_typed_varbin(dtype: DType, values: I) -> (PrimitiveArray, VarBinArray) +fn dict_encode_typed(dtype: DType, values: I) -> (PrimitiveArray, VarBinArray) where I: Iterator>, U: AsRef<[u8]>, @@ -183,7 +185,7 @@ mod test { use vortex_dtype::{DType, PType}; use vortex_scalar::Scalar; - use crate::compress::{dict_encode_typed_primitive, dict_encode_varbin}; + use crate::compress::{dict_encode, dict_encode_typed_primitive}; #[test] fn encode_primitive() { @@ -224,7 +226,7 @@ mod test { #[test] fn encode_varbin() { let arr = VarBinArray::from(vec!["hello", "world", "hello", "again", "world"]); - let (codes, values) = dict_encode_varbin(&arr); + let (codes, values) = dict_encode(&arr); assert_eq!(codes.maybe_null_slice::(), &[0, 1, 0, 2, 1]); values .with_iterator(|iter| { @@ -252,7 +254,7 @@ mod test { ] .into_iter() .collect(); - let (codes, values) = dict_encode_varbin(&arr); + let (codes, values) = dict_encode(&arr); assert_eq!(codes.maybe_null_slice::(), &[1, 0, 2, 1, 0, 3, 2, 0]); assert_eq!(str::from_utf8(&values.bytes_at(0).unwrap()).unwrap(), ""); values @@ -269,7 +271,7 @@ mod test { #[test] fn repeated_values() { let arr = VarBinArray::from(vec!["a", "a", "b", "b", "a", "b", "a", "b"]); - let (codes, values) = dict_encode_varbin(&arr); + let (codes, values) = dict_encode(&arr); values .with_iterator(|iter| { assert_eq!( diff --git a/encodings/dict/src/compute.rs b/encodings/dict/src/compute.rs index d1a503ae9f..a759894fc3 100644 --- a/encodings/dict/src/compute.rs +++ b/encodings/dict/src/compute.rs @@ -46,12 +46,12 @@ impl SliceFn for DictArray { #[cfg(test)] mod test { + use vortex::accessor::ArrayAccessor; use vortex::array::primitive::PrimitiveArray; - use vortex::array::varbin::VarBinArray; + use vortex::array::varbinview::VarBinViewArray; use vortex::{IntoArray, IntoArrayVariant, ToArray}; - use vortex_dtype::{DType, Nullability}; - use crate::{dict_encode_typed_primitive, dict_encode_varbin, DictArray}; + use crate::{dict_encode, dict_encode_typed_primitive, DictArray}; #[test] fn flatten_nullable_primitive() { @@ -71,20 +71,40 @@ mod test { #[test] fn flatten_nullable_varbin() { - let reference = VarBinArray::from_iter( - vec![Some("a"), Some("b"), None, Some("a"), None, Some("b")], - DType::Utf8(Nullability::Nullable), - ); - let (codes, values) = dict_encode_varbin(&reference); + let reference = VarBinViewArray::from_iter_nullable_str(vec![ + Some("a"), + Some("b"), + None, + Some("c"), + None, + Some("d"), + ]); + let (codes, values) = dict_encode(&reference); let dict = DictArray::try_new(codes.into_array(), values.into_array()).unwrap(); - let flattened_dict = dict.to_array().into_varbin().unwrap(); + let flattened_dict = dict.to_array().into_varbinview().unwrap(); + assert_eq!( - flattened_dict.offsets().into_primitive().unwrap().buffer(), - reference.offsets().into_primitive().unwrap().buffer() + flattened_dict.views().into_primitive().unwrap().buffer(), + reference.views().into_primitive().unwrap().buffer(), ); + + // All values should be preserved here as well. + let data: Vec> = flattened_dict + .with_iterator(|iter| { + iter.map(|s| s.map(|i| String::from_utf8_lossy(i).to_string())) + .collect() + }) + .unwrap(); assert_eq!( - flattened_dict.bytes().into_primitive().unwrap().buffer(), - reference.bytes().into_primitive().unwrap().buffer() + data, + vec![ + Some("a".to_string()), + Some("b".to_string()), + None, + Some("c".to_string()), + None, + Some("d".to_string()), + ] ); } } diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 76a51bd5be..7bbdd8e06f 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -50,6 +50,7 @@ vortex-scalar = { path = "../vortex-scalar", features = [ "serde", ] } serde = { workspace = true, features = ["derive"] } +static_assertions = { workspace = true } [target.'cfg(target_arch = "wasm32")'.dependencies] # Enable the JS feature of getrandom (via rand) to supprt wasm32 target diff --git a/vortex-array/src/array/chunked/canonical.rs b/vortex-array/src/array/chunked/canonical.rs index 2532077e7d..364561de8d 100644 --- a/vortex-array/src/array/chunked/canonical.rs +++ b/vortex-array/src/array/chunked/canonical.rs @@ -3,19 +3,18 @@ use itertools::Itertools; use vortex_dtype::{DType, Nullability, PType, StructDType}; use vortex_error::{vortex_bail, ErrString, VortexResult}; -use crate::accessor::ArrayAccessor; use crate::array::bool::BoolArray; use crate::array::chunked::ChunkedArray; use crate::array::extension::ExtensionArray; use crate::array::null::NullArray; use crate::array::primitive::PrimitiveArray; use crate::array::struct_::StructArray; -use crate::array::varbin::builder::VarBinBuilder; -use crate::array::varbin::VarBinArray; +use crate::array::varbinview::{BinaryView, VarBinViewArray}; use crate::validity::Validity; use crate::variants::StructArrayTrait; use crate::{ - Array, ArrayDType, ArrayValidity, Canonical, IntoArray, IntoArrayVariant, IntoCanonical, + Array, ArrayDType, ArrayValidity, Canonical, IntoArray, IntoArrayData, IntoArrayVariant, + IntoCanonical, }; impl IntoCanonical for ChunkedArray { @@ -104,13 +103,9 @@ pub(crate) fn try_canonicalize_chunks( let prim_array = pack_primitives(chunks.as_slice(), *ptype, *nullability)?; Ok(Canonical::Primitive(prim_array)) } - DType::Utf8(nullability) => { - let varbin_array = pack_varbin(chunks.as_slice(), dtype, *nullability)?; - Ok(Canonical::VarBin(varbin_array)) - } - DType::Binary(nullability) => { - let varbin_array = pack_varbin(chunks.as_slice(), dtype, *nullability)?; - Ok(Canonical::VarBin(varbin_array)) + DType::Utf8(nullability) | DType::Binary(nullability) => { + let varbinview_array = pack_views(chunks.as_slice(), dtype, *nullability)?; + Ok(Canonical::VarBinView(varbinview_array)) } DType::Null => { let len = chunks.iter().map(|chunk| chunk.len()).sum(); @@ -196,29 +191,65 @@ fn pack_primitives( )) } -/// Builds a new [VarBinArray] by repacking the values from the chunks into a single +/// Builds a new [VarBinViewArray] by repacking the values from the chunks into a single /// contiguous array. /// /// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have /// been checked to have the same DType already. -fn pack_varbin( +fn pack_views( chunks: &[Array], dtype: &DType, - _nullability: Nullability, -) -> VortexResult { - let len = chunks.iter().map(|chunk| chunk.len()).sum(); - let mut builder = VarBinBuilder::::with_capacity(len); - + nullability: Nullability, +) -> VortexResult { + let validity = validity_from_chunks(chunks, nullability); + let mut views = Vec::new(); + let mut buffers = Vec::new(); for chunk in chunks { - let chunk = chunk.clone().into_varbin()?; - chunk.with_iterator(|iter| { - for datum in iter { - builder.push(datum); + // Each chunk's views have buffer IDs that are zero-referenced. + // As part of the packing operation, we need to rewrite them to be referenced to the global + // merged buffers list. + let buffers_offset = buffers.len(); + let canonical_chunk = chunk.clone().into_varbinview().unwrap(); + + for buffer in canonical_chunk.buffers() { + let canonical_buffer = buffer.into_canonical().unwrap().into_array(); + buffers.push(canonical_buffer); + } + + for view in canonical_chunk.view_slice() { + if view.is_inlined() { + // Inlined views can be copied directly into the output + views.push(*view); + } else { + // Referencing views must have their buffer_index adjusted with new offsets + let view_ref = view.as_view(); + views.push(BinaryView::new_view( + view.len(), + *view_ref.prefix(), + (buffers_offset as u32) + view_ref.buffer_index(), + view_ref.offset(), + )); } - })?; + } } - Ok(builder.finish(dtype.clone())) + let (view_ptr, view_len, view_cap) = views.into_raw_parts(); + // Transmute the pointer to target type u128, which is of identical size and is + // an Arrow native type. + let views_u128 = unsafe { + Vec::from_raw_parts( + std::mem::transmute::<*mut BinaryView, *mut u128>(view_ptr), + view_len, + view_cap, + ) + }; + + VarBinViewArray::try_new( + Buffer::from_vec(views_u128).into_array_data().into_array(), + buffers, + dtype.clone(), + validity, + ) } fn validity_from_chunks(chunks: &[Array], nullability: Nullability) -> Validity { diff --git a/vortex-array/src/array/constant/canonical.rs b/vortex-array/src/array/constant/canonical.rs index 0e55173adc..67b2ca76d7 100644 --- a/vortex-array/src/array/constant/canonical.rs +++ b/vortex-array/src/array/constant/canonical.rs @@ -1,5 +1,5 @@ -use std::iter; - +use arrow_array::builder::PrimitiveBuilder; +use arrow_array::types::UInt8Type; use vortex_dtype::{match_each_native_ptype, DType, Nullability, PType}; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::{BoolScalar, Utf8Scalar}; @@ -7,9 +7,10 @@ use vortex_scalar::{BoolScalar, Utf8Scalar}; use crate::array::bool::BoolArray; use crate::array::constant::ConstantArray; use crate::array::primitive::PrimitiveArray; -use crate::array::varbin::VarBinArray; +use crate::array::varbinview::{BinaryView, VarBinViewArray}; +use crate::arrow::FromArrowArray; use crate::validity::Validity; -use crate::{ArrayDType, Canonical, IntoCanonical}; +use crate::{ArrayDType, ArrayData, Canonical, IntoArray, IntoCanonical}; impl IntoCanonical for ConstantArray { fn into_canonical(self) -> VortexResult { @@ -32,10 +33,36 @@ impl IntoCanonical for ConstantArray { let const_value = s.value().unwrap(); let bytes = const_value.as_bytes(); - return Ok(Canonical::VarBin(VarBinArray::from_iter( - iter::repeat(Some(bytes)).take(self.len()), - DType::Utf8(validity.nullability()), - ))); + let buffers = if bytes.len() <= BinaryView::MAX_INLINED_SIZE { + Vec::new() + } else { + vec![PrimitiveArray::from_vec(bytes.to_vec(), validity.clone()).into_array()] + }; + + // Repeat the same view over and over again. + let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE { + BinaryView::new_inlined(bytes) + } else { + // Create a new view using the provided byte buffer + BinaryView::new_view(bytes.len() as u32, bytes[0..4].try_into().unwrap(), 0, 0) + }; + + // Construct the Views array to be a repeating byte string of 16 bytes per entry. + let mut views = PrimitiveBuilder::::new(); + (0..self.len()) + .for_each(|_| views.append_slice(view.as_u128().to_le_bytes().as_slice())); + let views_array = + ArrayData::from_arrow(&views.finish(), self.dtype().is_nullable()).into_array(); + + return Ok(Canonical::VarBinView( + VarBinViewArray::try_new( + views_array, + buffers, + DType::Utf8(validity.nullability()), + validity, + ) + .unwrap(), + )); } if let Ok(ptype) = PType::try_from(self.scalar().dtype()) { diff --git a/vortex-array/src/array/varbin/flatten.rs b/vortex-array/src/array/varbin/flatten.rs index d00d032906..499977bd6a 100644 --- a/vortex-array/src/array/varbin/flatten.rs +++ b/vortex-array/src/array/varbin/flatten.rs @@ -1,10 +1,94 @@ +use std::sync::Arc; + +use arrow_array::builder::GenericByteViewBuilder; +use arrow_array::types::BinaryViewType; +use arrow_array::{Array, ArrayRef}; use vortex_error::VortexResult; use crate::array::varbin::VarBinArray; -use crate::{Canonical, IntoCanonical}; +use crate::array::varbinview::{BinaryView, VarBinViewArray}; +use crate::arrow::FromArrowArray; +use crate::validity::ArrayValidity; +use crate::{ArrayData, Canonical, IntoArray, IntoArrayData, IntoCanonical}; impl IntoCanonical for VarBinArray { fn into_canonical(self) -> VortexResult { - Ok(Canonical::VarBin(self)) + fn into_byteview(array: &VarBinArray) -> ArrayRef { + let mut builder = GenericByteViewBuilder::::with_capacity(array.len()); + builder.append_block( + array + .bytes() + .into_array_data() + .into_buffer() + .expect("VarBinArray::bytes array must have buffer") + .into_arrow(), + ); + + for idx in 0..array.len() { + if !array.is_valid(idx) { + builder.append_null(); + continue; + } + let start = u32::try_from(array.offset_at(idx)).unwrap(); + let end = u32::try_from(array.offset_at(idx + 1)).unwrap(); + let len = end - start; + if (len as usize) <= BinaryView::MAX_INLINED_SIZE { + // Get access to the value using the internal T type here. + let bytes = array.bytes_at(idx).unwrap(); + builder.append_value(bytes.as_slice()); + } else { + unsafe { builder.append_view_unchecked(0, start, end - start) }; + } + } + + Arc::new(builder.finish()) + } + + let arrow_array = into_byteview(&self); + let array_data = ArrayData::from_arrow(arrow_array.clone(), arrow_array.is_nullable()); + let varbinview = VarBinViewArray::try_from(array_data.into_array()) + .expect("roundtrip through Arrow must return VarBinViewArray"); + + Ok(Canonical::VarBinView(varbinview)) + } +} + +#[cfg(test)] +mod test { + use vortex_dtype::{DType, Nullability}; + + use crate::array::varbin::builder::VarBinBuilder; + use crate::validity::ArrayValidity; + use crate::IntoCanonical; + + #[test] + fn test_canonical_varbin() { + let mut varbin = VarBinBuilder::::with_capacity(10); + varbin.push_null(); + varbin.push_null(); + // inlined value + varbin.push_value("123456789012".as_bytes()); + // non-inlinable value + varbin.push_value("1234567890123".as_bytes()); + let varbin = varbin.finish(DType::Utf8(Nullability::Nullable)); + + let canonical = varbin.into_canonical().unwrap().into_varbinview().unwrap(); + + assert!(!canonical.is_valid(0)); + assert!(!canonical.is_valid(1)); + + // First value is inlined (12 bytes) + assert!(canonical.view_at(2).is_inlined()); + assert_eq!( + canonical.bytes_at(2).unwrap().as_slice(), + "123456789012".as_bytes() + ); + + // Second value is not inlined (13 bytes) + assert!(!canonical.view_at(3).is_inlined()); + assert_eq!( + canonical.bytes_at(3).unwrap().as_slice(), + "1234567890123".as_bytes() + ); } } diff --git a/vortex-array/src/array/varbinview/accessor.rs b/vortex-array/src/array/varbinview/accessor.rs index 9b1477f001..791e9e64be 100644 --- a/vortex-array/src/array/varbinview/accessor.rs +++ b/vortex-array/src/array/varbinview/accessor.rs @@ -12,8 +12,8 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { f: F, ) -> VortexResult { let views = self.view_slice(); - let bytes: Vec = (0..self.metadata().data_lens.len()) - .map(|i| self.bytes(i).into_primitive()) + let bytes: Vec = (0..self.metadata().buffer_lens.len()) + .map(|i| self.buffer(i).into_primitive()) .collect::>>()?; let validity = self.logical_validity().to_null_buffer()?; @@ -21,13 +21,14 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { None => { let mut iter = views.iter().map(|view| { if view.is_inlined() { - Some(unsafe { &view.inlined.data as &[u8] }) + Some(view.as_inlined().value()) } else { - let offset = unsafe { view._ref.offset as usize }; - let buffer_idx = unsafe { view._ref.buffer_index as usize }; + let view_ref = view.as_view(); + let buffer_idx = view_ref.buffer_index(); + let offset = view_ref.offset() as usize; Some( - &bytes[buffer_idx].maybe_null_slice::() - [offset..offset + view.size()], + &bytes[buffer_idx as usize].maybe_null_slice::() + [offset..offset + (view.len() as usize)], ) } }); @@ -37,13 +38,14 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { let mut iter = views.iter().zip(validity.iter()).map(|(view, valid)| { if valid { if view.is_inlined() { - Some(unsafe { &view.inlined.data as &[u8] }) + Some(view.as_inlined().value()) } else { - let offset = unsafe { view._ref.offset as usize }; - let buffer_idx = unsafe { view._ref.buffer_index as usize }; + let view_ref = view.as_view(); + let buffer_idx = view_ref.buffer_index(); + let offset = view_ref.offset() as usize; Some( - &bytes[buffer_idx].maybe_null_slice::() - [offset..offset + view.size()], + &bytes[buffer_idx as usize].maybe_null_slice::() + [offset..offset + (view.len() as usize)], ) } } else { diff --git a/vortex-array/src/array/varbinview/compute.rs b/vortex-array/src/array/varbinview/compute.rs index 9da43bec89..360bff7e74 100644 --- a/vortex-array/src/array/varbinview/compute.rs +++ b/vortex-array/src/array/varbinview/compute.rs @@ -3,11 +3,12 @@ use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::varbin::varbin_scalar; -use crate::array::varbinview::{VarBinViewArray, VIEW_SIZE}; +use crate::array::varbinview::{VarBinViewArray, VIEW_SIZE_BYTES}; +use crate::arrow::FromArrowArray; use crate::compute::unary::ScalarAtFn; -use crate::compute::{slice, ArrayCompute, SliceFn}; +use crate::compute::{slice, ArrayCompute, SliceFn, TakeFn}; use crate::validity::ArrayValidity; -use crate::{Array, ArrayDType, IntoArray}; +use crate::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical}; impl ArrayCompute for VarBinViewArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { @@ -17,6 +18,10 @@ impl ArrayCompute for VarBinViewArray { fn slice(&self) -> Option<&dyn SliceFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl ScalarAtFn for VarBinViewArray { @@ -33,9 +38,13 @@ impl ScalarAtFn for VarBinViewArray { impl SliceFn for VarBinViewArray { fn slice(&self, start: usize, stop: usize) -> VortexResult { Ok(Self::try_new( - slice(&self.views(), start * VIEW_SIZE, stop * VIEW_SIZE)?, - (0..self.metadata().data_lens.len()) - .map(|i| self.bytes(i)) + slice( + &self.views(), + start * VIEW_SIZE_BYTES, + stop * VIEW_SIZE_BYTES, + )?, + (0..self.metadata().buffer_lens.len()) + .map(|i| self.buffer(i)) .collect::>(), self.dtype().clone(), self.validity().slice(start, stop)?, @@ -43,3 +52,15 @@ impl SliceFn for VarBinViewArray { .into_array()) } } + +impl TakeFn for VarBinViewArray { + fn take(&self, indices: &Array) -> VortexResult { + let array_arrow = self.clone().into_canonical()?.into_arrow(); + let indices_arrow = indices.clone().into_canonical()?.into_arrow(); + + let take_arrow = arrow_select::take::take(&array_arrow, &indices_arrow, None)?; + let nullable = take_arrow.is_nullable(); + + Ok(ArrayData::from_arrow(take_arrow, nullable).into_array()) + } +} diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 12addc709e..3726bffe54 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -1,5 +1,4 @@ use std::fmt::{Debug, Formatter}; -use std::ops::Deref; use std::sync::Arc; use std::{mem, slice}; @@ -7,13 +6,12 @@ use ::serde::{Deserialize, Serialize}; use arrow_array::builder::{BinaryViewBuilder, StringViewBuilder}; use arrow_array::{ArrayRef, BinaryViewArray, StringViewArray}; use arrow_buffer::ScalarBuffer; -use arrow_schema::DataType; use itertools::Itertools; +use static_assertions::{assert_eq_align, assert_eq_size}; use vortex_dtype::{DType, PType}; use vortex_error::{vortex_bail, VortexResult}; use crate::array::primitive::PrimitiveArray; -use crate::array::varbin::VarBinArray; use crate::arrow::FromArrowArray; use crate::compute::slice; use crate::stats::StatsSet; @@ -50,6 +48,11 @@ impl Inlined { inlined.data[..value.len()].copy_from_slice(value); inlined } + + #[inline] + pub fn value(&self) -> &[u8] { + &self.data[0..(self.size as usize)] + } } #[derive(Clone, Copy, Debug)] @@ -70,6 +73,21 @@ impl Ref { offset, } } + + #[inline] + pub fn buffer_index(&self) -> u32 { + self.buffer_index + } + + #[inline] + pub fn offset(&self) -> u32 { + self.offset + } + + #[inline] + pub fn prefix(&self) -> &[u8; 4] { + &self.prefix + } } #[derive(Clone, Copy)] @@ -79,16 +97,59 @@ pub union BinaryView { _ref: Ref, } +/// BinaryView must be 16 bytes and have 8 byte alignment +assert_eq_size!(BinaryView, [u8; 16]); +assert_eq_size!(Inlined, [u8; 16]); +assert_eq_size!(Ref, [u8; 16]); +assert_eq_align!(BinaryView, u64); + impl BinaryView { pub const MAX_INLINED_SIZE: usize = 12; + pub fn new_inlined(value: &[u8]) -> Self { + assert!( + value.len() <= Self::MAX_INLINED_SIZE, + "expected inlined value to be <= 12 bytes, was {}", + value.len() + ); + + Self { + inlined: Inlined::new(value), + } + } + + /// Create a new view over bytes stored in a block. + pub fn new_view(len: u32, prefix: [u8; 4], block: u32, offset: u32) -> Self { + Self { + _ref: Ref::new(len, prefix, block, offset), + } + } + + #[inline] + pub fn len(&self) -> u32 { + unsafe { self.inlined.size } + } + #[inline] - pub fn size(&self) -> usize { - unsafe { self.inlined.size as usize } + pub fn is_empty(&self) -> bool { + self.len() > 0 } + #[inline] pub fn is_inlined(&self) -> bool { - unsafe { self.inlined.size <= Self::MAX_INLINED_SIZE as u32 } + self.len() <= (Self::MAX_INLINED_SIZE as u32) + } + + pub fn as_inlined(&self) -> &Inlined { + unsafe { &self.inlined } + } + + pub fn as_view(&self) -> &Ref { + unsafe { &self._ref } + } + + pub fn as_u128(&self) -> u128 { + unsafe { mem::transmute::(*self) } } } @@ -96,29 +157,53 @@ impl Debug for BinaryView { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut s = f.debug_struct("BinaryView"); if self.is_inlined() { - s.field("inline", unsafe { &self.inlined }); + s.field("inline", &"i".to_string()); } else { - s.field("ref", unsafe { &self._ref }); + s.field("ref", &"r".to_string()); } s.finish() } } // reminder: views are 16 bytes with 8-byte alignment -pub(crate) const VIEW_SIZE: usize = mem::size_of::(); +pub(crate) const VIEW_SIZE_BYTES: usize = size_of::(); impl_encoding!("vortex.varbinview", 5u16, VarBinView); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VarBinViewMetadata { + // Validity metadata validity: ValidityMetadata, - data_lens: Vec, + + // Length of each buffer. The buffers are primitive byte arrays containing the raw string/binary + // data referenced by views. + buffer_lens: Vec, +} + +pub struct Buffers<'a> { + index: u32, + n_buffers: u32, + array: &'a VarBinViewArray, +} + +impl<'a> Iterator for Buffers<'a> { + type Item = Array; + + fn next(&mut self) -> Option { + if self.index >= self.n_buffers { + return None; + } + + let bytes = self.array.buffer(self.index as usize); + self.index += 1; + Some(bytes) + } } impl VarBinViewArray { pub fn try_new( views: Array, - data: Vec, + buffers: Vec, dtype: DType, validity: Validity, ) -> VortexResult { @@ -126,7 +211,7 @@ impl VarBinViewArray { vortex_bail!(MismatchedTypes: "u8", views.dtype()); } - for d in data.iter() { + for d in buffers.iter() { if !matches!(d.dtype(), &DType::BYTES) { vortex_bail!(MismatchedTypes: "u8", d.dtype()); } @@ -140,15 +225,15 @@ impl VarBinViewArray { vortex_bail!("incorrect validity {:?}", validity); } - let num_views = views.len() / VIEW_SIZE; + let num_views = views.len() / VIEW_SIZE_BYTES; let metadata = VarBinViewMetadata { validity: validity.to_metadata(num_views)?, - data_lens: data.iter().map(|a| a.len()).collect_vec(), + buffer_lens: buffers.iter().map(|a| a.len()).collect_vec(), }; - let mut children = Vec::with_capacity(data.len() + 2); + let mut children = Vec::with_capacity(buffers.len() + 2); children.push(views); - children.extend(data); + children.extend(buffers); if let Some(a) = validity.into_array() { children.push(a) } @@ -156,39 +241,79 @@ impl VarBinViewArray { Self::try_from_parts(dtype, num_views, metadata, children.into(), StatsSet::new()) } - fn view_slice(&self) -> &[BinaryView] { + /// Number of raw string data buffers held by this array. + pub fn buffer_count(&self) -> usize { + self.metadata().buffer_lens.len() + } + + /// Access to the underlying `views` child array as a slice of [BinaryView] structures. + /// + /// This is useful for iteration over the values, as well as for applying filters that may + /// only require hitting the prefixes or inline strings. + pub fn view_slice(&self) -> &[BinaryView] { unsafe { slice::from_raw_parts( PrimitiveArray::try_from(self.views()) .expect("Views must be a primitive array") .maybe_null_slice::() .as_ptr() as _, - self.views().len() / VIEW_SIZE, + self.views().len() / VIEW_SIZE_BYTES, ) } } - fn view_at(&self, index: usize) -> BinaryView { + pub fn view_at(&self, index: usize) -> BinaryView { self.view_slice()[index] } + /// Access to the primitive views array. + /// + /// Variable-sized binary view arrays contain a "view" child array, with 16-byte entries that + /// contain either a pointer into one of the array's owned `buffer`s OR an inlined copy of + /// the string (if the string has 12 bytes or fewer). #[inline] pub fn views(&self) -> Array { self.array() - .child(0, &DType::BYTES, self.len() * VIEW_SIZE) - .expect("missing views") + .child(0, &DType::BYTES, self.len() * VIEW_SIZE_BYTES) + .unwrap() } + /// Access one of the backing data buffers. + /// + /// # Panics + /// + /// This method panics if the provided index is out of bounds for the set of buffers provided + /// at construction time. #[inline] - pub fn bytes(&self, idx: usize) -> Array { + pub fn buffer(&self, idx: usize) -> Array { self.array() - .child(idx + 1, &DType::BYTES, self.metadata().data_lens[idx]) + .child(idx + 1, &DType::BYTES, self.metadata().buffer_lens[idx]) .expect("Missing data buffer") } + /// Retrieve an iterator over the raw data buffers. + /// These are the BYTE buffers that make up the array's contents. + /// + /// Example + /// + /// ``` + /// use vortex::array::varbinview::VarBinViewArray; + /// let array = VarBinViewArray::from_iter_str(["a", "b", "c"]); + /// array.buffers().for_each(|block| { + /// // Do something with the `block` + /// }); + /// ``` + pub fn buffers(&self) -> Buffers { + Buffers { + index: 0, + n_buffers: self.buffer_count().try_into().unwrap(), + array: self, + } + } + pub fn validity(&self) -> Validity { self.metadata().validity.to_validity(self.array().child( - self.metadata().data_lens.len() + 1, + self.metadata().buffer_lens.len() + 1, &Validity::DTYPE, self.len(), )) @@ -235,20 +360,21 @@ impl VarBinViewArray { VarBinViewArray::try_from(array_data.into_array()).expect("should be var bin view array") } + // TODO(aduffy): do we really need to do this with copying? pub fn bytes_at(&self, index: usize) -> VortexResult> { let view = self.view_at(index); - unsafe { - if view.inlined.size > 12 { - let data_buf = slice( - &self.bytes(view._ref.buffer_index as usize), - view._ref.offset as usize, - (view._ref.size + view._ref.offset) as usize, - )? - .into_primitive()?; - Ok(data_buf.maybe_null_slice::().to_vec()) - } else { - Ok(view.inlined.data[..view.inlined.size as usize].to_vec()) - } + // Expect this to be the common case: strings > 12 bytes. + if !view.is_inlined() { + let view_ref = view.as_view(); + let data_buf = slice( + &self.buffer(view_ref.buffer_index() as usize), + view_ref.offset() as usize, + (view.len() + view_ref.offset()) as usize, + )? + .into_primitive()?; + Ok(data_buf.maybe_null_slice::().to_vec()) + } else { + Ok(view.as_inlined().value().to_vec()) } } } @@ -258,29 +384,30 @@ impl ArrayTrait for VarBinViewArray {} impl IntoCanonical for VarBinViewArray { fn into_canonical(self) -> VortexResult { let nullable = self.dtype().is_nullable(); - let arrow_self = as_arrow(self); - let arrow_varbin = arrow_cast::cast(arrow_self.deref(), &DataType::Utf8) - .expect("Utf8View must cast to Ut8f"); - let vortex_array = ArrayData::from_arrow(arrow_varbin, nullable).into_array(); + let arrow_self = varbinview_as_arrow(self); + let vortex_array = ArrayData::from_arrow(arrow_self, nullable).into_array(); - Ok(Canonical::VarBin(VarBinArray::try_from(&vortex_array)?)) + Ok(Canonical::VarBinView(VarBinViewArray::try_from( + &vortex_array, + )?)) } } -fn as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef { +pub(crate) fn varbinview_as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef { // Views should be buffer of u8 let views = var_bin_view .views() .into_primitive() .expect("views must be primitive"); assert_eq!(views.ptype(), PType::U8); + let nulls = var_bin_view .logical_validity() .to_null_buffer() .expect("null buffer"); - let data = (0..var_bin_view.metadata().data_lens.len()) - .map(|i| var_bin_view.bytes(i).into_primitive()) + let data = (0..var_bin_view.buffer_count()) + .map(|i| var_bin_view.buffer(i).into_primitive()) .collect::>>() .expect("bytes arrays must be primitive"); if !data.is_empty() { @@ -295,16 +422,20 @@ fn as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef { // Switch on Arrow DType. match var_bin_view.dtype() { - DType::Binary(_) => Arc::new(BinaryViewArray::new( - ScalarBuffer::::from(views.buffer().clone().into_arrow()), - data, - nulls, - )), - DType::Utf8(_) => Arc::new(StringViewArray::new( - ScalarBuffer::::from(views.buffer().clone().into_arrow()), - data, - nulls, - )), + DType::Binary(_) => Arc::new(unsafe { + BinaryViewArray::new_unchecked( + ScalarBuffer::::from(views.buffer().clone().into_arrow()), + data, + nulls, + ) + }), + DType::Utf8(_) => Arc::new(unsafe { + StringViewArray::new_unchecked( + ScalarBuffer::::from(views.buffer().clone().into_arrow()), + data, + nulls, + ) + }), _ => panic!("expected utf8 or binary, got {}", var_bin_view.dtype()), } } @@ -322,8 +453,8 @@ impl ArrayValidity for VarBinViewArray { impl AcceptArrayVisitor for VarBinViewArray { fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { visitor.visit_child("views", &self.views())?; - for i in 0..self.metadata().data_lens.len() { - visitor.visit_child(format!("bytes_{i}").as_str(), &self.bytes(i))?; + for i in 0..self.metadata().buffer_lens.len() { + visitor.visit_child(format!("bytes_{i}").as_str(), &self.buffer(i))?; } visitor.visit_validity(&self.validity()) } @@ -357,7 +488,7 @@ impl<'a> FromIterator> for VarBinViewArray { mod test { use vortex_scalar::Scalar; - use crate::array::varbinview::{BinaryView, Inlined, Ref, VarBinViewArray, VIEW_SIZE}; + use crate::array::varbinview::{BinaryView, VarBinViewArray, VIEW_SIZE_BYTES}; use crate::compute::slice; use crate::compute::unary::scalar_at; use crate::{Canonical, IntoArray, IntoCanonical}; @@ -397,7 +528,7 @@ mod test { let binary_arr = VarBinViewArray::from_iter_str(["string1", "string2"]); let flattened = binary_arr.into_canonical().unwrap(); - assert!(matches!(flattened, Canonical::VarBin(_))); + assert!(matches!(flattened, Canonical::VarBinView(_))); let var_bin = flattened.into_array(); assert_eq!(scalar_at(&var_bin, 0).unwrap(), Scalar::from("string1")); @@ -406,10 +537,8 @@ mod test { #[test] pub fn binary_view_size_and_alignment() { - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::size_of::(), VIEW_SIZE); - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::align_of::(), 8); + assert_eq!(size_of::(), VIEW_SIZE_BYTES); + assert_eq!(size_of::(), 16); + assert_eq!(align_of::(), 8); } } diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 1b122e6465..d6510c5d38 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -5,16 +5,15 @@ use arrow_array::types::{ UInt32Type, UInt64Type, UInt8Type, }; use arrow_array::{ - ArrayRef, ArrowPrimitiveType, BinaryArray, BooleanArray as ArrowBoolArray, Date32Array, - Date64Array, LargeBinaryArray, LargeStringArray, NullArray as ArrowNullArray, - PrimitiveArray as ArrowPrimitiveArray, StringArray, StructArray as ArrowStructArray, - Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, + ArrayRef, ArrowPrimitiveType, BooleanArray as ArrowBoolArray, Date32Array, Date64Array, + NullArray as ArrowNullArray, PrimitiveArray as ArrowPrimitiveArray, + StructArray as ArrowStructArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }; use arrow_buffer::ScalarBuffer; use arrow_schema::{Field, Fields}; -use vortex_dtype::{DType, NativePType, PType}; +use vortex_dtype::{NativePType, PType}; use vortex_error::{vortex_bail, VortexResult}; use crate::array::bool::BoolArray; @@ -24,13 +23,12 @@ use crate::array::extension::ExtensionArray; use crate::array::null::NullArray; use crate::array::primitive::PrimitiveArray; use crate::array::struct_::StructArray; -use crate::array::varbin::VarBinArray; -use crate::arrow::wrappers::as_offset_buffer; +use crate::array::varbinview::{varbinview_as_arrow, VarBinViewArray}; use crate::compute::unary::try_cast; use crate::encoding::ArrayEncoding; use crate::validity::ArrayValidity; use crate::variants::StructArrayTrait; -use crate::{Array, ArrayDType, IntoArray, ToArray}; +use crate::{Array, IntoArray}; /// The set of canonical array encodings, also the set of encodings that can be transferred to /// Arrow with zero-copy. @@ -45,26 +43,20 @@ use crate::{Array, ArrayDType, IntoArray, ToArray}; /// decompress it later to pass to a compute kernel, there are multiple suitable Arrow array /// variants to hold the data. /// -/// To disambiguate, we choose a canonical physical encoding for every Vortex [`DType`], which -/// will correspond to an arrow-rs [`arrow_schema::DataType`]. +/// To disambiguate, we choose a canonical physical encoding for every Vortex +/// [`vortex_dtype::DType`], which will correspond to an arrow-rs [`arrow_schema::DataType`]. /// /// # Views support /// -/// Binary and String views are a new, better encoding format for nearly all use-cases. For now, -/// because DataFusion does not include pervasive support for compute over StringView, we opt to use -/// the [`VarBinArray`] as the canonical encoding (which corresponds to the Arrow `BinaryViewArray`). -/// -/// We expect to change this soon once DataFusion is able to finish up some initial support, which -/// is tracked in . +/// Binary and String views are a new, better encoding format for nearly all use-cases, which is why +/// we opt to use it as the canonical encoding for Utf8 and Binary data. #[derive(Debug, Clone)] pub enum Canonical { Null(NullArray), Bool(BoolArray), Primitive(PrimitiveArray), Struct(StructArray), - VarBin(VarBinArray), - // TODO(aduffy): switch to useing VarBinView instead of VarBin - // VarBinView(VarBinViewArray), + VarBinView(VarBinViewArray), Extension(ExtensionArray), } @@ -80,7 +72,7 @@ impl Canonical { Canonical::Bool(a) => bool_to_arrow(a), Canonical::Primitive(a) => primitive_to_arrow(a), Canonical::Struct(a) => struct_to_arrow(a), - Canonical::VarBin(a) => varbin_to_arrow(a), + Canonical::VarBinView(a) => varbinview_as_arrow(a), Canonical::Extension(a) => { if !is_temporal_ext_type(a.id()) { panic!("unsupported extension dtype with ID {}", a.id().as_ref()) @@ -125,9 +117,9 @@ impl Canonical { } } - pub fn into_varbin(self) -> VortexResult { + pub fn into_varbinview(self) -> VortexResult { match self { - Canonical::VarBin(a) => Ok(a), + Canonical::VarBinView(a) => Ok(a), _ => vortex_bail!(InvalidArgument: "cannot unwrap VarBinArray from {:?}", &self), } } @@ -213,77 +205,6 @@ fn struct_to_arrow(struct_array: StructArray) -> ArrayRef { Arc::new(ArrowStructArray::new(arrow_fields, field_arrays, None)) } -fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef { - let offsets = varbin_array - .offsets() - .into_primitive() - .expect("flatten_primitive"); - let offsets = match offsets.ptype() { - PType::I32 | PType::I64 => offsets, - // Unless it's u64, everything else can be converted into an i32. - // FIXME(ngates): do not copy offsets again - PType::U64 => offsets.reinterpret_cast(PType::I64), - PType::U32 => offsets.reinterpret_cast(PType::I32), - _ => try_cast(&offsets.to_array(), PType::I32.into()) - .expect("cast to i32") - .into_primitive() - .expect("flatten_primitive"), - }; - let nulls = varbin_array - .logical_validity() - .to_null_buffer() - .expect("null buffer"); - - let data = varbin_array - .bytes() - .into_primitive() - .expect("flatten_primitive"); - assert_eq!(data.ptype(), PType::U8); - let data = data.buffer(); - - // Switch on Arrow DType. - match varbin_array.dtype() { - DType::Binary(_) => match offsets.ptype() { - PType::I32 => Arc::new(unsafe { - BinaryArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - PType::I64 => Arc::new(unsafe { - LargeBinaryArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - _ => panic!("Invalid offsets type"), - }, - DType::Utf8(_) => match offsets.ptype() { - PType::I32 => Arc::new(unsafe { - StringArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - PType::I64 => Arc::new(unsafe { - LargeStringArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - _ => panic!("Invalid offsets type"), - }, - _ => panic!( - "expected utf8 or binary instead of {}", - varbin_array.dtype() - ), - } -} - fn temporal_to_arrow(temporal_array: TemporalArray) -> ArrayRef { macro_rules! extract_temporal_values { ($values:expr, $prim:ty) => {{ @@ -379,7 +300,7 @@ pub trait IntoArrayVariant { fn into_struct(self) -> VortexResult; - fn into_varbin(self) -> VortexResult; + fn into_varbinview(self) -> VortexResult; fn into_extension(self) -> VortexResult; } @@ -404,8 +325,8 @@ where self.into_canonical()?.into_struct() } - fn into_varbin(self) -> VortexResult { - self.into_canonical()?.into_varbin() + fn into_varbinview(self) -> VortexResult { + self.into_canonical()?.into_varbinview() } fn into_extension(self) -> VortexResult { @@ -437,7 +358,7 @@ impl IntoArray for Canonical { Self::Bool(a) => a.into_array(), Self::Primitive(a) => a.into_array(), Self::Struct(a) => a.into_array(), - Self::VarBin(a) => a.into_array(), + Self::VarBinView(a) => a.into_array(), Self::Extension(a) => a.into_array(), } } diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 5a6adb6195..5817bdf8dc 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(vec_into_raw_parts)] + //! Vortex crate containing core logic for encoding and memory representation of [arrays](Array). //! //! At the heart of Vortex are [arrays](Array) and [encodings](crate::encoding::ArrayEncoding). diff --git a/vortex-datafusion/src/datatype.rs b/vortex-datafusion/src/datatype.rs index 0a04e4e875..5876a3a5f5 100644 --- a/vortex-datafusion/src/datatype.rs +++ b/vortex-datafusion/src/datatype.rs @@ -67,8 +67,8 @@ pub(crate) fn infer_data_type(dtype: &DType) -> DataType { PType::F32 => DataType::Float32, PType::F64 => DataType::Float64, }, - DType::Utf8(_) => DataType::Utf8, - DType::Binary(_) => DataType::Binary, + DType::Utf8(_) => DataType::Utf8View, + DType::Binary(_) => DataType::BinaryView, DType::Struct(struct_dtype, _) => { let mut fields = Vec::with_capacity(struct_dtype.names().len()); for (field_name, field_dt) in struct_dtype @@ -154,12 +154,12 @@ mod test { assert_eq!( infer_data_type(&DType::Utf8(Nullability::NonNullable)), - DataType::Utf8 + DataType::Utf8View ); assert_eq!( infer_data_type(&DType::Binary(Nullability::NonNullable)), - DataType::Binary + DataType::BinaryView ); assert_eq!( @@ -184,7 +184,7 @@ mod test { )), DataType::Struct(Fields::from(vec![ FieldRef::from(Field::new("field_a", DataType::Boolean, false)), - FieldRef::from(Field::new("field_b", DataType::Utf8, true)), + FieldRef::from(Field::new("field_b", DataType::Utf8View, true)), ])) ); } @@ -207,7 +207,7 @@ mod test { infer_schema(&schema_nonnull), Schema::new(Fields::from(vec![ Field::new("field_a", DataType::Boolean, false), - Field::new("field_b", DataType::Utf8, false), + Field::new("field_b", DataType::Utf8View, false), Field::new("field_c", DataType::Int32, true), ])) ); diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index af4cd1be7f..dbbf3be9d3 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -12,9 +12,9 @@ use std::task::{Context, Poll}; use arrow_array::{RecordBatch, StructArray as ArrowStructArray}; use arrow_schema::{DataType, SchemaRef}; use async_trait::async_trait; +use datafusion::catalog::Session; use datafusion::dataframe::DataFrame; use datafusion::datasource::TableProvider; -use datafusion::execution::context::SessionState; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::prelude::SessionContext; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; @@ -170,7 +170,7 @@ impl TableProvider for VortexMemTable { /// The array is flattened directly into the nearest Arrow-compatible encoding. async fn scan( &self, - state: &SessionState, + state: &dyn Session, projection: Option<&Vec>, filters: &[Expr], _limit: Option, @@ -281,7 +281,7 @@ fn make_filter_then_take_plan( filter_projection: Vec, chunked_array: ChunkedArray, output_projection: Vec, - _session_state: &SessionState, + _session_state: &dyn Session, ) -> Arc { let row_selector_op = Arc::new(RowSelectorExec::new( filter_exprs, @@ -478,25 +478,21 @@ mod test { use datafusion_expr::{and, col, lit, BinaryExpr, Expr, Operator}; use vortex::array::primitive::PrimitiveArray; use vortex::array::struct_::StructArray; - use vortex::array::varbin::VarBinArray; + use vortex::array::varbinview::VarBinViewArray; use vortex::validity::Validity; use vortex::{Array, IntoArray}; - use vortex_dtype::{DType, Nullability}; use crate::{can_be_pushed_down, SessionContextExt, VortexMemTableOptions}; fn presidents_array() -> Array { - let names = VarBinArray::from_vec( - vec![ - "Washington", - "Adams", - "Jefferson", - "Madison", - "Monroe", - "Adams", - ], - DType::Utf8(Nullability::NonNullable), - ); + let names = VarBinViewArray::from_iter_str(vec![ + "Washington", + "Adams", + "Jefferson", + "Madison", + "Monroe", + "Adams", + ]); let term_start = PrimitiveArray::from_vec( vec![1789u16, 1797, 1801, 1809, 1817, 1825], Validity::NonNullable, diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs index 01d29a88fd..b2fa0b5361 100644 --- a/vortex-sampling-compressor/src/compressors/dict.rs +++ b/vortex-sampling-compressor/src/compressors/dict.rs @@ -2,7 +2,7 @@ use vortex::array::primitive::{Primitive, PrimitiveArray}; use vortex::array::varbin::{VarBin, VarBinArray}; use vortex::stats::ArrayStatistics; use vortex::{Array, ArrayDef, IntoArray}; -use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray}; +use vortex_dict::{dict_encode, dict_encode_primitive, Dict, DictArray}; use vortex_error::VortexResult; use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; @@ -49,7 +49,7 @@ impl EncodingCompressor for DictCompressor { } VarBin::ID => { let vb = VarBinArray::try_from(array)?; - let (codes, values) = dict_encode_varbin(&vb); + let (codes, values) = dict_encode(&vb); (codes.into_array(), values.into_array()) }