feat: introduce feature flags to select major arrow versions (#654)
This change introduces `arrow_53` and `arrow_54` feature flags on kernel;
selecting one is _required_ when using default-engine or sync-engine.
Fundamentally we must push users of the crate to select their arrow
major version through feature flags, since Cargo _will_ otherwise include
multiple major versions in the dependency tree, which can cause ABI
breakage when passing around symbols such as `RecordBatch`.

See #640
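
Concretely, a downstream crate now opts in to exactly one arrow major
version when enabling an engine. A minimal sketch of such a Cargo.toml
(the kernel version number here is illustrative):

```toml
[dependencies]
# Pick exactly one arrow_NN flag; the plain "arrow" feature currently
# aliases arrow_53.
delta_kernel = { version = "0.6", features = ["default-engine", "arrow_53"] }
```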

---------

Signed-off-by: R. Tyler Croy <[email protected]>
rtyler authored Feb 20, 2025
1 parent 72b585d commit 484eb33
Showing 46 changed files with 249 additions and 296 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/build.yml
@@ -37,7 +37,7 @@ jobs:
cargo install cargo-msrv --locked
- name: verify-msrv
run: |
cargo msrv --path kernel/ verify --all-features
cargo msrv --path kernel/ verify --features $(cat .github/workflows/default-kernel-features)
cargo msrv --path derive-macros/ verify --all-features
cargo msrv --path ffi/ verify --all-features
cargo msrv --path ffi-proc-macros/ verify --all-features
@@ -104,7 +104,7 @@ jobs:
- name: check kernel builds with no-default-features
run: cargo build -p delta_kernel --no-default-features
- name: build and lint with clippy
run: cargo clippy --benches --tests --all-features -- -D warnings
run: cargo clippy --benches --tests --features $(cat .github/workflows/default-kernel-features) -- -D warnings
- name: lint without default features
run: cargo clippy --no-default-features -- -D warnings
- name: check kernel builds with default-engine
@@ -129,7 +129,7 @@ jobs:
override: true
- uses: Swatinem/rust-cache@v2
- name: test
run: cargo test --workspace --verbose --all-features -- --skip read_table_version_hdfs
run: cargo test --workspace --verbose --features $(cat .github/workflows/default-kernel-features) -- --skip read_table_version_hdfs

ffi_test:
runs-on: ${{ matrix.os }}
@@ -229,7 +229,7 @@ jobs:
uses: taiki-e/install-action@cargo-llvm-cov
- uses: Swatinem/rust-cache@v2
- name: Generate code coverage
run: cargo llvm-cov --all-features --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs
run: cargo llvm-cov --features $(cat .github/workflows/default-kernel-features) --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
1 change: 1 addition & 0 deletions .github/workflows/default-kernel-features
@@ -0,0 +1 @@
integration-test,default-engine,default-engine-rustls,cloud,arrow,sync-engine
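
This file is the single source of truth for the feature set CI exercises; the
workflow steps above splice it in via command substitution. To reproduce a CI
invocation locally (assuming a POSIX shell at the repository root):

```sh
cargo clippy --benches --tests \
  --features "$(cat .github/workflows/default-kernel-features)" -- -D warnings
```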
15 changes: 0 additions & 15 deletions Cargo.toml
@@ -23,21 +23,6 @@ rust-version = "1.80"
version = "0.6.1"

[workspace.dependencies]
# When changing the arrow version range, also modify ffi/Cargo.toml which has
# its own arrow version ranges witeh modified features. Failure to do so will
# result in compilation errors as two different sets of arrow dependencies may
# be sourced
arrow = { version = ">=53, <55" }
arrow-arith = { version = ">=53, <55" }
arrow-array = { version = ">=53, <55" }
arrow-buffer = { version = ">=53, <55" }
arrow-cast = { version = ">=53, <55" }
arrow-data = { version = ">=53, <55" }
arrow-ord = { version = ">=53, <55" }
arrow-json = { version = ">=53, <55" }
arrow-select = { version = ">=53, <55" }
arrow-schema = { version = ">=53, <55" }
parquet = { version = ">=53, <55", features = ["object_store"] }
object_store = { version = ">=0.11, <0.12" }
hdfs-native-object-store = "0.12.0"
hdfs-native = "0.10.0"
7 changes: 1 addition & 6 deletions acceptance/Cargo.toml
@@ -14,19 +14,14 @@ rust-version.workspace = true
release = false

[dependencies]
arrow-array = { workspace = true }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-select = { workspace = true }
arrow-schema = { workspace = true }
delta_kernel = { path = "../kernel", features = [
"default-engine",
"arrow_53",
"developer-visibility",
] }
futures = "0.3"
itertools = "0.13"
object_store = { workspace = true }
parquet = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
17 changes: 10 additions & 7 deletions acceptance/src/data.rs
@@ -1,15 +1,18 @@
use std::{path::Path, sync::Arc};

use arrow_array::{Array, RecordBatch};
use arrow_ord::sort::{lexsort_to_indices, SortColumn};
use arrow_schema::{DataType, Schema};
use arrow_select::{concat::concat_batches, filter::filter_record_batch, take::take};
use delta_kernel::arrow::array::{Array, RecordBatch};
use delta_kernel::arrow::compute::{
concat_batches, filter_record_batch, lexsort_to_indices, take, SortColumn,
};
use delta_kernel::arrow::datatypes::{DataType, Schema};

use delta_kernel::parquet::arrow::async_reader::{
ParquetObjectReader, ParquetRecordBatchStreamBuilder,
};
use delta_kernel::{engine::arrow_data::ArrowEngineData, DeltaResult, Engine, Error, Table};
use futures::{stream::TryStreamExt, StreamExt};
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

use crate::{TestCaseInfo, TestResult};

@@ -83,8 +86,8 @@ fn assert_schema_fields_match(schema: &Schema, golden: &Schema) {
fn normalize_col(col: Arc<dyn Array>) -> Arc<dyn Array> {
if let DataType::Timestamp(unit, Some(zone)) = col.data_type() {
if **zone == *"+00:00" {
arrow_cast::cast::cast(&col, &DataType::Timestamp(*unit, Some("UTC".into())))
.expect("Could not cast to UTC")
let data_type = DataType::Timestamp(*unit, Some("UTC".into()));
delta_kernel::arrow::compute::cast(&col, &data_type).expect("Could not cast to UTC")
} else {
col
}
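The pattern above repeats across the tree: instead of importing `arrow_array`,
`arrow_cast`, and friends directly, downstream code now reaches arrow through
the kernel's re-export, so it always compiles against the same arrow major
version the kernel selected. A small illustrative sketch (not from this diff):

```rust
use std::sync::Arc;

use delta_kernel::arrow::array::{ArrayRef, Int64Array, RecordBatch};
use delta_kernel::arrow::datatypes::{DataType, Field, Schema};

// Build a RecordBatch using only the kernel's re-exported arrow types, so the
// batch is ABI-compatible with batches produced by the kernel itself.
fn example_batch() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let col: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    RecordBatch::try_new(schema, vec![col]).expect("schema and column match")
}
```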
2 changes: 1 addition & 1 deletion feature-tests/Cargo.toml
Expand Up @@ -12,7 +12,7 @@ version.workspace = true
release = false

[dependencies]
delta_kernel = { path = "../kernel" }
delta_kernel = { path = "../kernel", features = ["arrow_53"] }

[features]
default-engine = [ "delta_kernel/default-engine" ]
15 changes: 2 additions & 13 deletions ffi/Cargo.toml
@@ -22,21 +22,13 @@ tracing-core = { version = "0.1", optional = true }
tracing-subscriber = { version = "0.3", optional = true, features = [ "json" ] }
url = "2"
delta_kernel = { path = "../kernel", default-features = false, features = [
"arrow",
"developer-visibility",
] }
delta_kernel_ffi_macros = { path = "../ffi-proc-macros", version = "0.6.1" }

# used if we use the default engine to be able to move arrow data into the c-ffi format
arrow-schema = { version = ">=53, <55", default-features = false, features = [
"ffi",
], optional = true }
arrow-data = { version = ">=53, <55", default-features = false, features = [
"ffi",
], optional = true }
arrow-array = { version = ">=53, <55", default-features = false, optional = true }

[build-dependencies]
cbindgen = "0.27.0"
cbindgen = "0.28"
libc = "0.2.158"

[dev-dependencies]
@@ -52,9 +44,6 @@ default = ["default-engine"]
cloud = ["delta_kernel/cloud"]
default-engine = [
"delta_kernel/default-engine",
"arrow-array",
"arrow-data",
"arrow-schema",
]
tracing = [ "tracing-core", "tracing-subscriber" ]
sync-engine = ["delta_kernel/sync-engine"]
2 changes: 1 addition & 1 deletion ffi/cbindgen.toml
@@ -25,4 +25,4 @@ parse_deps = true
# only crates found in this list will ever be parsed.
#
# default: there is no allow-list (NOTE: this is the opposite of [])
include = ["delta_kernel", "arrow-data", "arrow-schema"]
include = ["arrow", "arrow-data", "arrow-schema", "delta_kernel"]
18 changes: 11 additions & 7 deletions ffi/src/engine_data.rs
@@ -1,5 +1,9 @@
//! EngineData related ffi code
use delta_kernel::arrow::array::{
ffi::{FFI_ArrowArray, FFI_ArrowSchema},
ArrayData, StructArray,
};
use delta_kernel::{DeltaResult, EngineData};
use std::ffi::c_void;

@@ -45,8 +49,8 @@ unsafe fn get_raw_engine_data_impl(data: &mut Handle<ExclusiveEngineData>) -> &m
#[cfg(feature = "default-engine")]
#[repr(C)]
pub struct ArrowFFIData {
pub array: arrow_data::ffi::FFI_ArrowArray,
pub schema: arrow_schema::ffi::FFI_ArrowSchema,
pub array: FFI_ArrowArray,
pub schema: FFI_ArrowSchema,
}

// TODO: This should use a callback to avoid having to have the engine free the struct
@@ -71,16 +75,16 @@ pub unsafe extern "C" fn get_raw_arrow_data(
// TODO: This method leaks the returned pointer memory. How will the engine free it?
#[cfg(feature = "default-engine")]
fn get_raw_arrow_data_impl(data: Box<dyn EngineData>) -> DeltaResult<*mut ArrowFFIData> {
let record_batch: arrow_array::RecordBatch = data
let record_batch: delta_kernel::arrow::array::RecordBatch = data
.into_any()
.downcast::<delta_kernel::engine::arrow_data::ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
let sa: arrow_array::StructArray = record_batch.into();
let array_data: arrow_data::ArrayData = sa.into();
let sa: StructArray = record_batch.into();
let array_data: ArrayData = sa.into();
// these call `clone`. is there a way to not copy anything and what exactly are they cloning?
let array = arrow_data::ffi::FFI_ArrowArray::new(&array_data);
let schema = arrow_schema::ffi::FFI_ArrowSchema::try_from(array_data.data_type())?;
let array = FFI_ArrowArray::new(&array_data);
let schema = FFI_ArrowSchema::try_from(array_data.data_type())?;
let ret_data = Box::new(ArrowFFIData { array, schema });
Ok(Box::leak(ret_data))
}
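
For context, the consuming side of `ArrowFFIData` is typically a C/C++ engine,
but the round trip can be sketched in Rust using arrow's C Data Interface
helpers. A hedged sketch (assuming arrow's `from_ffi`; this is not part of the
diff):

```rust
use delta_kernel::arrow::array::{ffi::from_ffi, RecordBatch, StructArray};

// Rebuild a RecordBatch from the leaked ArrowFFIData. Unsafe because the
// caller must guarantee the FFI structs are valid and not yet consumed.
unsafe fn rebuild_batch(data: ArrowFFIData) -> RecordBatch {
    let array_data = from_ffi(data.array, &data.schema).expect("valid FFI data");
    RecordBatch::from(StructArray::from(array_data))
}
```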
17 changes: 1 addition & 16 deletions integration-tests/Cargo.toml
@@ -6,19 +6,4 @@ edition = "2021"
[workspace]

[dependencies]
arrow = "=53.0.0"
delta_kernel = { path = "../kernel", features = ["arrow-conversion", "arrow-expression", "default-engine", "sync-engine"] }

[patch.'file:///../kernel']
arrow = "=53.0.0"
arrow-arith = "=53.0.0"
arrow-array = "=53.0.0"
arrow-buffer = "=53.0.0"
arrow-cast = "=53.0.0"
arrow-data = "=53.0.0"
arrow-ord = "=53.0.0"
arrow-json = "=53.0.0"
arrow-select = "=53.0.0"
arrow-schema = "=53.0.0"
parquet = "=53.0.0"
object_store = "=0.11.1"
delta_kernel = { path = "../kernel", features = ["default-engine", "sync-engine"] }
9 changes: 5 additions & 4 deletions integration-tests/src/main.rs
@@ -1,15 +1,16 @@
fn create_arrow_schema() -> arrow::datatypes::Schema {
use arrow::datatypes::{DataType, Field, Schema};
use delta_kernel::arrow::datatypes::{DataType, Field, Schema};

fn create_arrow_schema() -> Schema {
let field_a = Field::new("a", DataType::Int64, false);
let field_b = Field::new("b", DataType::Boolean, false);
Schema::new(vec![field_a, field_b])
}

fn create_kernel_schema() -> delta_kernel::schema::Schema {
use delta_kernel::schema::{DataType, Schema, StructField};
use delta_kernel::schema::{DataType, StructField};
let field_a = StructField::not_null("a", DataType::LONG);
let field_b = StructField::not_null("b", DataType::BOOLEAN);
Schema::new(vec![field_a, field_b])
delta_kernel::schema::Schema::new(vec![field_a, field_b])
}

fn main() {
33 changes: 10 additions & 23 deletions integration-tests/test-all-arrow-versions.sh
@@ -2,38 +2,25 @@

set -eu -o pipefail

is_version_le() {
[ "$1" = "$(echo -e "$1\n$2" | sort -V | head -n1)" ]
}

is_version_lt() {
if [ "$1" = "$2" ]
then
return 1
else
is_version_le "$1" "$2"
fi
}

test_arrow_version() {
ARROW_VERSION="$1"
echo "== Testing version $ARROW_VERSION =="
sed -i'' -e "s/\(arrow[^\"]*=[^\"]*\).*/\1\"=$ARROW_VERSION\"/" Cargo.toml
sed -i'' -e "s/\(parquet[^\"]*\).*/\1\"=$ARROW_VERSION\"/" Cargo.toml
cargo clean
rm -f Cargo.lock
cargo update
cat Cargo.toml
cargo run
cargo run --features ${ARROW_VERSION}
}

MIN_ARROW_VER="53.0.0"
MAX_ARROW_VER="54.0.0"
FEATURES=$(cat ../kernel/Cargo.toml | grep -e ^arrow_ | awk '{ print $1 }' | sort -u)

for ARROW_VERSION in $(curl -s https://crates.io/api/v1/crates/arrow | jq -r '.versions[].num' | tr -d '\r')

echo "[features]" >> Cargo.toml

for ARROW_VERSION in ${FEATURES}
do
if ! is_version_lt "$ARROW_VERSION" "$MIN_ARROW_VER" && is_version_lt "$ARROW_VERSION" "$MAX_ARROW_VER"
then
test_arrow_version "$ARROW_VERSION"
fi
echo "${ARROW_VERSION} = [\"delta_kernel/${ARROW_VERSION}\"]" >> Cargo.toml
test_arrow_version $ARROW_VERSION
done

git checkout Cargo.toml
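
The rewritten script scrapes the `arrow_NN` feature names out of
kernel/Cargo.toml, appends a forwarding feature for each to the
integration-test crate, and runs the crate once per version. With the two
features in this diff, the generated section looks like:

```toml
[features]
arrow_53 = ["delta_kernel/arrow_53"]
arrow_54 = ["delta_kernel/arrow_54"]
```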
58 changes: 23 additions & 35 deletions kernel/Cargo.toml
@@ -58,20 +58,22 @@ visibility = "0.1.1"

# Used in the sync engine
tempfile = { version = "3", optional = true }

# Arrow supported versions
## 53
# Used in default engine
arrow-buffer = { workspace = true, optional = true }
arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] }
arrow-select = { workspace = true, optional = true }
arrow-arith = { workspace = true, optional = true }
arrow-cast = { workspace = true, optional = true }
arrow-json = { workspace = true, optional = true }
arrow-ord = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
arrow_53 = { package = "arrow", version = "53", features = ["chrono-tz", "ffi", "json", "prettyprint"], optional = true }
# Used in default and sync engine
parquet_53 = { package = "parquet", version = "53", features = ["async", "object_store"] , optional = true }
######
## 54
arrow_54 = { package = "arrow", version = "54", features = ["chrono-tz", "ffi", "json", "prettyprint"], optional = true }
parquet_54 = { package = "parquet", version = "54", features = ["async", "object_store"] , optional = true }
######

futures = { version = "0.3", optional = true }
object_store = { workspace = true, optional = true }
hdfs-native-object-store = { workspace = true, optional = true }
# Used in default and sync engine
parquet = { workspace = true, optional = true }
# Used for fetching direct urls (like pre-signed urls)
reqwest = { version = "0.12.8", default-features = false, optional = true }
strum = { version = "0.26", features = ["derive"] }
@@ -85,14 +87,16 @@ hdfs-native = { workspace = true, optional = true }
walkdir = { workspace = true, optional = true }

[features]
arrow-conversion = ["arrow-schema"]
arrow-expression = [
"arrow-arith",
"arrow-array",
"arrow-buffer",
"arrow-ord",
"arrow-schema",
]
# The default version to be expected
arrow = ["arrow_53"]

arrow_53 = ["dep:arrow_53", "dep:parquet_53"]

arrow_54 = ["dep:arrow_54", "dep:parquet_54"]

arrow-conversion = []
arrow-expression = []

cloud = [
"object_store/aws",
"object_store/azure",
@@ -107,16 +111,8 @@ default = []
default-engine-base = [
"arrow-conversion",
"arrow-expression",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-json",
"arrow-schema",
"arrow-select",
"futures",
"object_store",
"parquet/async",
"parquet/object_store",
"tokio",
"uuid/v4",
"uuid/fast-rng",
@@ -134,13 +130,6 @@ default-engine-rustls = [

developer-visibility = []
sync-engine = [
"arrow-cast",
"arrow-conversion",
"arrow-expression",
"arrow-array",
"arrow-json",
"arrow-select",
"parquet",
"tempfile",
]
integration-test = [
@@ -156,8 +145,7 @@ version = "=0.5.9"
rustc_version = "0.4.1"

[dev-dependencies]
arrow = { workspace = true, features = ["json", "prettyprint"] }
delta_kernel = { path = ".", features = ["default-engine", "sync-engine"] }
delta_kernel = { path = ".", features = ["arrow", "default-engine", "sync-engine"] }
test_utils = { path = "../test-utils" }
paste = "1.0"
test-log = { version = "0.2", default-features = false, features = ["trace"] }
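
The renamed dependencies above (`arrow_53`, `parquet_53`, ...) reach consumers
through a single versioned re-export module inside the kernel (and likewise for
`delta_kernel::parquet`). That module is not part of this diff; a plausible
sketch of the mechanism, where the precedence between flags is an assumption:

```rust
// kernel/src/arrow.rs — illustrative sketch only, not the literal file.
// Exactly one arrow_NN feature should be enabled; when both are, this
// sketch arbitrarily prefers the newer major version.
#[cfg(feature = "arrow_54")]
pub use arrow_54::*;
#[cfg(all(feature = "arrow_53", not(feature = "arrow_54")))]
pub use arrow_53::*;
```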