Skip to content

Commit

Permalink
Merge branch 'main' into feat/col-in-arr
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Feb 4, 2025
2 parents edeacfc + 6a82a57 commit fcdf565
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 21 deletions.
2 changes: 1 addition & 1 deletion ffi/examples/read-table/arrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static GArrowRecordBatch* add_partition_columns(
char* col = partition_cols->cols[i];
guint pos = cols + i;
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
print_diag(
" Adding partition column '%s' with value '%s' at column %u\n",
col,
Expand Down
7 changes: 4 additions & 3 deletions ffi/examples/read-table/read_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void print_partition_info(struct EngineContext* context, const CStringMap* parti
for (uintptr_t i = 0; i < context->partition_cols->len; i++) {
char* col = context->partition_cols->cols[i];
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
if (partition_val) {
print_diag(" partition '%s' here: %s\n", col, partition_val);
free(partition_val);
Expand Down Expand Up @@ -87,14 +87,15 @@ void scan_row_callback(
void do_visit_scan_data(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec)
KernelBoolSlice selection_vec,
const CTransforms* transforms)
{
print_diag("\nScan iterator found some data to read\n Of this data, here is "
"a selection vector\n");
print_selection_vector(" ", &selection_vec);
// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, engine_context, scan_row_callback);
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
}
Expand Down
26 changes: 21 additions & 5 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
use delta_kernel::scan::{Scan, ScanData};
use delta_kernel::schema::Schema;
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Error};
use delta_kernel::{DeltaResult, Error, ExpressionRef};
use delta_kernel_ffi_macros::handle_descriptor;
use tracing::debug;
use url::Url;
Expand Down Expand Up @@ -211,6 +211,7 @@ pub unsafe extern "C" fn kernel_scan_data_next(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
) -> ExternResult<bool> {
let data = unsafe { data.as_ref() };
Expand All @@ -224,15 +225,17 @@ fn kernel_scan_data_next_impl(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
) -> DeltaResult<bool> {
let mut data = data
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
if let Some((data, sel_vec, _transforms)) = data.next().transpose()? {
if let Some((data, sel_vec, transforms)) = data.next().transpose()? {
let bool_slice = KernelBoolSlice::from(sel_vec);
(engine_visitor)(engine_context, data.into(), bool_slice);
let transform_map = CTransforms { transforms };
(engine_visitor)(engine_context, data.into(), bool_slice, &transform_map);
Ok(true)
} else {
Ok(false)
Expand Down Expand Up @@ -288,7 +291,7 @@ impl From<HashMap<String, String>> for CStringMap {
/// # Safety
///
/// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`]
pub unsafe extern "C" fn get_from_map(
pub unsafe extern "C" fn get_from_string_map(
map: &CStringMap,
key: KernelStringSlice,
allocate_fn: AllocateStringFn,
Expand All @@ -300,6 +303,10 @@ pub unsafe extern "C" fn get_from_map(
.and_then(|v| allocate_fn(kernel_string_slice!(v)))
}

pub struct CTransforms {
transforms: Vec<Option<ExpressionRef>>,
}

/// Get a selection vector out of a [`DvInfo`] struct
///
/// # Safety
Expand Down Expand Up @@ -362,6 +369,7 @@ fn rust_callback(
size: i64,
kernel_stats: Option<delta_kernel::scan::state::Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let partition_map = CStringMap {
Expand Down Expand Up @@ -395,6 +403,7 @@ struct ContextWrapper {
pub unsafe extern "C" fn visit_scan_data(
data: Handle<ExclusiveEngineData>,
selection_vec: KernelBoolSlice,
transforms: &CTransforms,
engine_context: NullableCvoid,
callback: CScanCallback,
) {
Expand All @@ -405,5 +414,12 @@ pub unsafe extern "C" fn visit_scan_data(
callback,
};
// TODO: return ExternResult to caller instead of panicking?
visit_scan_files(data, selection_vec, context_wrapper, rust_callback).unwrap();
visit_scan_files(
data,
selection_vec,
&transforms.transforms,
context_wrapper,
rust_callback,
)
.unwrap();
}
7 changes: 5 additions & 2 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use delta_kernel::expressions::ColumnName;
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::schema::{ColumnNamesAndTypes, DataType};
use delta_kernel::{DeltaResult, Error, Table};
use delta_kernel::{DeltaResult, Error, ExpressionRef, Table};

use std::collections::HashMap;
use std::process::ExitCode;
Expand Down Expand Up @@ -163,6 +163,7 @@ fn print_scan_file(
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let num_record_str = if let Some(s) = stats {
Expand All @@ -176,6 +177,7 @@ fn print_scan_file(
Size (bytes):\t{size}\n \
Num Records:\t{num_record_str}\n \
Has DV?:\t{}\n \
Transform:\t{transform:?}\n \
Part Vals:\t{partition_values:?}",
dv_info.has_vector()
);
Expand Down Expand Up @@ -209,10 +211,11 @@ fn try_main() -> DeltaResult<()> {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let (data, vector, _transforms) = res?;
let (data, vector, transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
(),
print_scan_file,
)?;
Expand Down
6 changes: 4 additions & 2 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::transform_to_logical;
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table};
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table};

use clap::{Parser, ValueEnum};
use url::Url;
Expand Down Expand Up @@ -111,6 +111,7 @@ fn send_scan_file(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let scan_file = ScanFile {
Expand Down Expand Up @@ -210,10 +211,11 @@ fn try_main() -> DeltaResult<()> {
drop(record_batch_tx);

for res in scan_data {
let (data, vector, _transforms) = res?;
let (data, vector, transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
scan_file_tx,
send_scan_file,
)?;
Expand Down
1 change: 1 addition & 0 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ mod tests {
size: i64,
stats: Option<Stats>,
_: DvInfo,
_: Option<ExpressionRef>,
part_vals: HashMap<String, String>,
) {
assert_eq!(
Expand Down
35 changes: 30 additions & 5 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,16 @@ pub enum ColumnType {
/// A transform is ultimately a `Struct` expr. This holds the set of expressions that make that struct expr up
type Transform = Vec<TransformExpr>;

/// utility method making it easy to get a transform for a particular row. If the requested row is
/// outside the range of the passed slice returns `None`, otherwise returns the element at the index
/// of the specified row
pub fn get_transform_for_row(
row: usize,
transforms: &[Option<ExpressionRef>],
) -> Option<ExpressionRef> {
transforms.get(row).cloned().flatten()
}

/// Transforms aren't computed all at once. So static ones can just go straight to `Expression`, but
/// things like partition columns need to filled in. This enum holds an expression that's part of a
/// `Transform`.
Expand Down Expand Up @@ -463,6 +473,7 @@ impl Scan {
size: i64,
_: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
batches.push(ScanFile {
Expand All @@ -487,9 +498,15 @@ impl Scan {
let scan_data = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data
.map(|res| {
let (data, vec, _transforms) = res?;
let (data, vec, transforms) = res?;
let scan_files = vec![];
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)
state::visit_scan_files(
data.as_ref(),
&vec,
&transforms,
scan_files,
scan_data_callback,
)
})
// Iterator<DeltaResult<Vec<ScanFile>>> to Iterator<DeltaResult<ScanFile>>
.flatten_ok();
Expand Down Expand Up @@ -816,11 +833,12 @@ pub(crate) mod test_utils {
);
let mut batch_count = 0;
for res in iter {
let (batch, sel, _transforms) = res.unwrap();
let (batch, sel, transforms) = res.unwrap();
assert_eq!(sel, expected_sel_vec);
crate::scan::state::visit_scan_files(
batch.as_ref(),
&sel,
&transforms,
context.clone(),
validate_callback,
)
Expand Down Expand Up @@ -1020,15 +1038,22 @@ mod tests {
_size: i64,
_: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
_partition_values: HashMap<String, String>,
) {
paths.push(path.to_string());
assert!(dv_info.deletion_vector.is_none());
}
let mut files = vec![];
for data in scan_data {
let (data, vec, _transforms) = data?;
files = state::visit_scan_files(data.as_ref(), &vec, files, scan_data_callback)?;
let (data, vec, transforms) = data?;
files = state::visit_scan_files(
data.as_ref(),
&vec,
&transforms,
files,
scan_data_callback,
)?;
}
Ok(files)
}
Expand Down
10 changes: 10 additions & 0 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::collections::HashMap;
use std::sync::LazyLock;

use crate::actions::deletion_vector::deletion_treemap_to_bools;
use crate::scan::get_transform_for_row;
use crate::utils::require;
use crate::ExpressionRef;
use crate::{
actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at},
engine_data::{GetData, RowVisitor, TypedGetData as _},
Expand Down Expand Up @@ -104,6 +106,7 @@ pub type ScanCallback<T> = fn(
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
);

Expand Down Expand Up @@ -138,12 +141,14 @@ pub type ScanCallback<T> = fn(
pub fn visit_scan_files<T>(
data: &dyn EngineData,
selection_vector: &[bool],
transforms: &[Option<ExpressionRef>],
context: T,
callback: ScanCallback<T>,
) -> DeltaResult<T> {
let mut visitor = ScanFileVisitor {
callback,
selection_vector,
transforms,
context,
};
visitor.visit_rows_of(data)?;
Expand All @@ -154,6 +159,7 @@ pub fn visit_scan_files<T>(
struct ScanFileVisitor<'a, T> {
callback: ScanCallback<T>,
selection_vector: &'a [bool],
transforms: &'a [Option<ExpressionRef>],
context: T,
}
impl<T> RowVisitor for ScanFileVisitor<'_, T> {
Expand Down Expand Up @@ -201,6 +207,7 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
size,
stats,
dv_info,
get_transform_for_row(row_index, self.transforms),
partition_values,
)
}
Expand All @@ -214,6 +221,7 @@ mod tests {
use std::collections::HashMap;

use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback};
use crate::ExpressionRef;

use super::{DvInfo, Stats};

Expand All @@ -228,6 +236,7 @@ mod tests {
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
transform: Option<ExpressionRef>,
part_vals: HashMap<String, String>,
) {
assert_eq!(
Expand All @@ -242,6 +251,7 @@ mod tests {
assert!(dv_info.deletion_vector.is_some());
let dv = dv_info.deletion_vector.unwrap();
assert_eq!(dv.unique_id(), "uvBn[lx{q8@P<9BNH/isA@1");
assert!(transform.is_none());
assert_eq!(context.id, 2);
}

Expand Down
13 changes: 10 additions & 3 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use delta_kernel::actions::deletion_vector::split_vector;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{column_expr, BinaryOperator, Expression};
use delta_kernel::expressions::{column_expr, BinaryOperator, Expression, ExpressionRef};
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::{transform_to_logical, Scan};
use delta_kernel::schema::{DataType, Schema};
Expand Down Expand Up @@ -348,6 +348,7 @@ fn scan_data_callback(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
_transforms: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
batches.push(ScanFile {
Expand All @@ -369,8 +370,14 @@ fn read_with_scan_data(
let scan_data = scan.scan_data(engine)?;
let mut scan_files = vec![];
for data in scan_data {
let (data, vec, _transforms) = data?;
scan_files = visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?;
let (data, vec, transforms) = data?;
scan_files = visit_scan_files(
data.as_ref(),
&vec,
&transforms,
scan_files,
scan_data_callback,
)?;
}

let mut batches = vec![];
Expand Down

0 comments on commit fcdf565

Please sign in to comment.