diff --git a/ffi/examples/read-table/arrow.c b/ffi/examples/read-table/arrow.c index 7eb32b7c3..c6214df6b 100644 --- a/ffi/examples/read-table/arrow.c +++ b/ffi/examples/read-table/arrow.c @@ -66,7 +66,7 @@ static GArrowRecordBatch* add_partition_columns( char* col = partition_cols->cols[i]; guint pos = cols + i; KernelStringSlice key = { col, strlen(col) }; - char* partition_val = get_from_map(partition_values, key, allocate_string); + char* partition_val = get_from_string_map(partition_values, key, allocate_string); print_diag( " Adding partition column '%s' with value '%s' at column %u\n", col, diff --git a/ffi/examples/read-table/read_table.c b/ffi/examples/read-table/read_table.c index 7b1a7f2c7..704559a59 100644 --- a/ffi/examples/read-table/read_table.c +++ b/ffi/examples/read-table/read_table.c @@ -28,7 +28,7 @@ void print_partition_info(struct EngineContext* context, const CStringMap* parti for (uintptr_t i = 0; i < context->partition_cols->len; i++) { char* col = context->partition_cols->cols[i]; KernelStringSlice key = { col, strlen(col) }; - char* partition_val = get_from_map(partition_values, key, allocate_string); + char* partition_val = get_from_string_map(partition_values, key, allocate_string); if (partition_val) { print_diag(" partition '%s' here: %s\n", col, partition_val); free(partition_val); @@ -87,14 +87,15 @@ void scan_row_callback( void do_visit_scan_data( void* engine_context, ExclusiveEngineData* engine_data, - KernelBoolSlice selection_vec) + KernelBoolSlice selection_vec, + const CTransforms* transforms) { print_diag("\nScan iterator found some data to read\n Of this data, here is " "a selection vector\n"); print_selection_vector(" ", &selection_vec); // Ask kernel to iterate each individual file and call us back with extracted metadata print_diag("Asking kernel to call us back for each scan row (file to read)\n"); - visit_scan_data(engine_data, selection_vec, engine_context, scan_row_callback); + visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback); free_bool_slice(selection_vec); free_engine_data(engine_data); } diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index f2fee7643..73f691010 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -7,7 +7,7 @@ use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState}; use delta_kernel::scan::{Scan, ScanData}; use delta_kernel::schema::Schema; use delta_kernel::snapshot::Snapshot; -use delta_kernel::{DeltaResult, Error}; +use delta_kernel::{DeltaResult, Error, ExpressionRef}; use delta_kernel_ffi_macros::handle_descriptor; use tracing::debug; use url::Url; @@ -211,6 +211,7 @@ pub unsafe extern "C" fn kernel_scan_data_next( engine_context: NullableCvoid, engine_data: Handle, selection_vector: KernelBoolSlice, + transforms: &CTransforms, ), ) -> ExternResult { let data = unsafe { data.as_ref() }; @@ -224,15 +225,17 @@ fn kernel_scan_data_next_impl( engine_context: NullableCvoid, engine_data: Handle, selection_vector: KernelBoolSlice, + transforms: &CTransforms, ), ) -> DeltaResult { let mut data = data .data .lock() .map_err(|_| Error::generic("poisoned mutex"))?; - if let Some((data, sel_vec, _transforms)) = data.next().transpose()? { + if let Some((data, sel_vec, transforms)) = data.next().transpose()? { let bool_slice = KernelBoolSlice::from(sel_vec); - (engine_visitor)(engine_context, data.into(), bool_slice); + let transform_map = CTransforms { transforms }; + (engine_visitor)(engine_context, data.into(), bool_slice, &transform_map); Ok(true) } else { Ok(false) @@ -288,7 +291,7 @@ impl From> for CStringMap { /// # Safety /// /// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`] -pub unsafe extern "C" fn get_from_map( +pub unsafe extern "C" fn get_from_string_map( map: &CStringMap, key: KernelStringSlice, allocate_fn: AllocateStringFn, @@ -300,6 +303,10 @@ pub unsafe extern "C" fn get_from_map( .and_then(|v| allocate_fn(kernel_string_slice!(v))) } +pub struct CTransforms { + transforms: Vec>, +} + /// Get a selection vector out of a [`DvInfo`] struct /// /// # Safety @@ -362,6 +369,7 @@ fn rust_callback( size: i64, kernel_stats: Option, dv_info: DvInfo, + _transform: Option, partition_values: HashMap, ) { let partition_map = CStringMap { @@ -395,6 +403,7 @@ struct ContextWrapper { pub unsafe extern "C" fn visit_scan_data( data: Handle, selection_vec: KernelBoolSlice, + transforms: &CTransforms, engine_context: NullableCvoid, callback: CScanCallback, ) { @@ -405,5 +414,12 @@ pub unsafe extern "C" fn visit_scan_data( callback, }; // TODO: return ExternResult to caller instead of panicking? - visit_scan_files(data, selection_vec, context_wrapper, rust_callback).unwrap(); + visit_scan_files( + data, + selection_vec, + &transforms.transforms, + context_wrapper, + rust_callback, + ) + .unwrap(); } diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 243e56a5a..f5145905e 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -12,7 +12,7 @@ use delta_kernel::expressions::ColumnName; use delta_kernel::scan::state::{DvInfo, Stats}; use delta_kernel::scan::ScanBuilder; use delta_kernel::schema::{ColumnNamesAndTypes, DataType}; -use delta_kernel::{DeltaResult, Error, Table}; +use delta_kernel::{DeltaResult, Error, ExpressionRef, Table}; use std::collections::HashMap; use std::process::ExitCode; @@ -163,6 +163,7 @@ fn print_scan_file( size: i64, stats: Option, dv_info: DvInfo, + transform: Option, partition_values: HashMap, ) { let num_record_str = if let Some(s) = stats { @@ -176,6 +177,7 @@ fn print_scan_file( Size (bytes):\t{size}\n \ Num Records:\t{num_record_str}\n \ Has DV?:\t{}\n \ + Transform:\t{transform:?}\n \ Part Vals:\t{partition_values:?}", dv_info.has_vector() ); @@ -209,10 +211,11 @@ fn try_main() -> DeltaResult<()> { let scan = ScanBuilder::new(snapshot).build()?; let scan_data = scan.scan_data(&engine)?; for res in scan_data { - let (data, vector, _transforms) = res?; + let (data, vector, transforms) = res?; delta_kernel::scan::state::visit_scan_files( data.as_ref(), &vector, + &transforms, (), print_scan_file, )?; diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index 57011dcc9..c4e97328f 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -15,7 +15,7 @@ use delta_kernel::engine::sync::SyncEngine; use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats}; use delta_kernel::scan::transform_to_logical; use delta_kernel::schema::Schema; -use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table}; +use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table}; use clap::{Parser, ValueEnum}; use url::Url; @@ -111,6 +111,7 @@ fn send_scan_file( size: i64, _stats: Option, dv_info: DvInfo, + _transform: Option, partition_values: HashMap, ) { let scan_file = ScanFile { @@ -210,10 +211,11 @@ fn try_main() -> DeltaResult<()> { drop(record_batch_tx); for res in scan_data { - let (data, vector, _transforms) = res?; + let (data, vector, transforms) = res?; scan_file_tx = delta_kernel::scan::state::visit_scan_files( data.as_ref(), &vector, + &transforms, scan_file_tx, send_scan_file, )?; diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 33bc87075..177996a80 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -347,6 +347,7 @@ mod tests { size: i64, stats: Option, _: DvInfo, + _: Option, part_vals: HashMap, ) { assert_eq!( diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 4e98eea7f..7abebd0c6 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -304,6 +304,16 @@ pub enum ColumnType { /// A transform is ultimately a `Struct` expr. This holds the set of expressions that make that struct expr up type Transform = Vec; +/// utility method making it easy to get a transform for a particular row. If the requested row is +/// outside the range of the passed slice returns `None`, otherwise returns the element at the index +/// of the specified row +pub fn get_transform_for_row( + row: usize, + transforms: &[Option], +) -> Option { + transforms.get(row).cloned().flatten() +} + /// Transforms aren't computed all at once. So static ones can just go straight to `Expression`, but /// things like partition columns need to filled in. This enum holds an expression that's part of a /// `Transform`. @@ -463,6 +473,7 @@ impl Scan { size: i64, _: Option, dv_info: DvInfo, + _transform: Option, partition_values: HashMap, ) { batches.push(ScanFile { @@ -487,9 +498,15 @@ impl Scan { let scan_data = self.scan_data(engine.as_ref())?; let scan_files_iter = scan_data .map(|res| { - let (data, vec, _transforms) = res?; + let (data, vec, transforms) = res?; let scan_files = vec![]; - state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback) + state::visit_scan_files( + data.as_ref(), + &vec, + &transforms, + scan_files, + scan_data_callback, + ) }) // Iterator>> to Iterator> .flatten_ok(); @@ -816,11 +833,12 @@ pub(crate) mod test_utils { ); let mut batch_count = 0; for res in iter { - let (batch, sel, _transforms) = res.unwrap(); + let (batch, sel, transforms) = res.unwrap(); assert_eq!(sel, expected_sel_vec); crate::scan::state::visit_scan_files( batch.as_ref(), &sel, + &transforms, context.clone(), validate_callback, ) @@ -1020,6 +1038,7 @@ mod tests { _size: i64, _: Option, dv_info: DvInfo, + _transform: Option, _partition_values: HashMap, ) { paths.push(path.to_string()); @@ -1027,8 +1046,14 @@ mod tests { } let mut files = vec![]; for data in scan_data { - let (data, vec, _transforms) = data?; - files = state::visit_scan_files(data.as_ref(), &vec, files, scan_data_callback)?; + let (data, vec, transforms) = data?; + files = state::visit_scan_files( + data.as_ref(), + &vec, + &transforms, + files, + scan_data_callback, + )?; } Ok(files) } diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index 085af15ec..c2496b682 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -4,7 +4,9 @@ use std::collections::HashMap; use std::sync::LazyLock; use crate::actions::deletion_vector::deletion_treemap_to_bools; +use crate::scan::get_transform_for_row; use crate::utils::require; +use crate::ExpressionRef; use crate::{ actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at}, engine_data::{GetData, RowVisitor, TypedGetData as _}, @@ -104,6 +106,7 @@ pub type ScanCallback = fn( size: i64, stats: Option, dv_info: DvInfo, + transform: Option, partition_values: HashMap, ); @@ -138,12 +141,14 @@ pub type ScanCallback = fn( pub fn visit_scan_files( data: &dyn EngineData, selection_vector: &[bool], + transforms: &[Option], context: T, callback: ScanCallback, ) -> DeltaResult { let mut visitor = ScanFileVisitor { callback, selection_vector, + transforms, context, }; visitor.visit_rows_of(data)?; @@ -154,6 +159,7 @@ pub fn visit_scan_files( struct ScanFileVisitor<'a, T> { callback: ScanCallback, selection_vector: &'a [bool], + transforms: &'a [Option], context: T, } impl RowVisitor for ScanFileVisitor<'_, T> { @@ -201,6 +207,7 @@ impl RowVisitor for ScanFileVisitor<'_, T> { size, stats, dv_info, + get_transform_for_row(row_index, self.transforms), partition_values, ) } @@ -214,6 +221,7 @@ mod tests { use std::collections::HashMap; use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback}; + use crate::ExpressionRef; use super::{DvInfo, Stats}; @@ -228,6 +236,7 @@ mod tests { size: i64, stats: Option, dv_info: DvInfo, + transform: Option, part_vals: HashMap, ) { assert_eq!( @@ -242,6 +251,7 @@ mod tests { assert!(dv_info.deletion_vector.is_some()); let dv = dv_info.deletion_vector.unwrap(); assert_eq!(dv.unique_id(), "uvBn[lx{q8@P<9BNH/isA@1"); + assert!(transform.is_none()); assert_eq!(context.id, 2); } diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 0ef3c6f1b..c5e8a4b5b 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -10,7 +10,7 @@ use delta_kernel::actions::deletion_vector::split_vector; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; -use delta_kernel::expressions::{column_expr, BinaryOperator, Expression}; +use delta_kernel::expressions::{column_expr, BinaryOperator, Expression, ExpressionRef}; use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats}; use delta_kernel::scan::{transform_to_logical, Scan}; use delta_kernel::schema::{DataType, Schema}; @@ -348,6 +348,7 @@ fn scan_data_callback( size: i64, _stats: Option, dv_info: DvInfo, + _transforms: Option, partition_values: HashMap, ) { batches.push(ScanFile { @@ -369,8 +370,14 @@ fn read_with_scan_data( let scan_data = scan.scan_data(engine)?; let mut scan_files = vec![]; for data in scan_data { - let (data, vec, _transforms) = data?; - scan_files = visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?; + let (data, vec, transforms) = data?; + scan_files = visit_scan_files( + data.as_ref(), + &vec, + &transforms, + scan_files, + scan_data_callback, + )?; } let mut batches = vec![];