diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 89951a39b..99ef71672 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -7,15 +7,17 @@ use std::sync::{Arc, LazyLock}; use crate::actions::schemas::GetStructField; use crate::actions::visitors::{visit_deletion_vector_at, ProtocolVisitor}; use crate::actions::{ - get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, METADATA_NAME, - PROTOCOL_NAME, REMOVE_NAME, + get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, COMMIT_INFO_NAME, + METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, }; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::{column_name, ColumnName}; use crate::path::ParsedLogPath; use crate::scan::data_skipping::DataSkippingFilter; use crate::scan::state::DvInfo; -use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType}; +use crate::schema::{ + ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType, +}; use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema}; use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported}; use crate::table_properties::TableProperties; @@ -78,6 +80,12 @@ pub(crate) fn table_changes_action_iter( /// Deletion vector resolution affects whether a remove action is selected in the second /// phase, so we must perform it ahead of time in phase 1. /// - Ensure that reading is supported on any protocol updates. +/// - Extract the in-commit timestamps from [`CommitInfo`] actions if they are present. These are +/// generated when in-commit timestamps (ICT) table feature is enabled. This must be done in the +/// first phase because the second phase lazily transforms engine data with an extra timestamp +/// column, so the timestamp must be known ahead of time. 
Note that when ICT is enabled, CommitInfo +/// should be the first action in every commit. +/// See: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps /// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`] /// - Ensure that any schema update is compatible with the provided `schema`. Currently, schema /// compatibility is checked through schema equality. This will be expanded in the future to @@ -93,12 +101,6 @@ pub(crate) fn table_changes_action_iter( /// /// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors /// -/// TODO: When the kernel supports in-commit timestamps, we will also have to inspect CommitInfo -/// actions to find the timestamp. These are generated when incommit timestamps is enabled. -/// This must be done in the first phase because the second phase lazily transforms engine data with -/// an extra timestamp column. Thus, the timestamp must be known ahead of time. -/// See https://github.com/delta-io/delta-kernel-rs/issues/559 -/// /// 2. Scan file generation phase [`LogReplayScanner::into_scan_batches`]: This iterates over every /// action in the commit, and generates [`TableChangesScanData`]. It does so by transforming the /// actions using [`add_transform_expr`], and generating selection vectors with the following rules: @@ -118,14 +120,8 @@ struct LogReplayScanner { // The commit file that this replay scanner will operate on. commit_file: ParsedLogPath, // The timestamp associated with this commit. This is the file modification time - // from the commit's [`FileMeta`]. - // - // - // TODO when incommit timestamps are supported: If there is a [`CommitInfo`] with a timestamp - // generated by in-commit timestamps, that timestamp will be used instead. - // - // Note: This will be used once an expression is introduced to transform the engine data in - // [`TableChangesScanData`] + // from the commit's [`FileMeta`]. 
If the in-commit timestamps feature is enabled, this will be the + // in-commit timestamp from the [`CommitInfo`] action. timestamp: i64, } @@ -136,6 +132,7 @@ impl LogReplayScanner { /// 2. Construct a map from path to deletion vector of remove actions that share the same path /// as an add action. /// 3. Perform validation on each protocol and metadata action in the commit. + /// 4. Extract the in-commit timestamp from [`CommitInfo`] if it is present. /// /// For more details, see the documentation for [`LogReplayScanner`]. fn try_new( @@ -143,8 +140,6 @@ impl LogReplayScanner { commit_file: ParsedLogPath, table_schema: &SchemaRef, ) -> DeltaResult<Self> { - let visitor_schema = PreparePhaseVisitor::schema(); - // Note: We do not perform data skipping yet because we need to visit all add and // remove actions for deletion vector resolution to be correct. // @@ -156,22 +151,25 @@ impl LogReplayScanner { // vectors are resolved so that we can skip both actions in the pair. let action_iter = engine.get_json_handler().read_json_files( &[commit_file.location.clone()], - visitor_schema, + PreparePhaseVisitor::schema(), None, // not safe to apply data skipping yet )?; let mut remove_dvs = HashMap::default(); let mut add_paths = HashSet::default(); let mut has_cdc_action = false; - for actions in action_iter { + let mut timestamp = commit_file.location.last_modified; + for (i, actions) in action_iter.enumerate() { let actions = actions?; let mut visitor = PreparePhaseVisitor { add_paths: &mut add_paths, remove_dvs: &mut remove_dvs, has_cdc_action: &mut has_cdc_action, + commit_timestamp: &mut timestamp, protocol: None, metadata_info: None, + is_first_batch: i == 0, }; visitor.visit_rows_of(actions.as_ref())?; @@ -202,7 +200,7 @@ impl LogReplayScanner { remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path)); } Ok(LogReplayScanner { - timestamp: commit_file.location.last_modified, + timestamp, commit_file, has_cdc_action, remove_dvs, @@ -220,7 +218,6 @@ impl LogReplayScanner { 
has_cdc_action, remove_dvs, commit_file, - // TODO: Add the timestamp as a column with an expression timestamp, } = self; let remove_dvs = Arc::new(remove_dvs); @@ -274,15 +271,19 @@ struct PreparePhaseVisitor<'a> { has_cdc_action: &'a mut bool, add_paths: &'a mut HashSet<String>, remove_dvs: &'a mut HashMap<String, DvInfo>, + commit_timestamp: &'a mut i64, + is_first_batch: bool, } impl PreparePhaseVisitor<'_> { fn schema() -> Arc<StructType> { + let ict_type = StructField::new("inCommitTimestamp", DataType::LONG, true); Arc::new(StructType::new(vec![ Option::<Add>::get_struct_field(ADD_NAME), Option::<Remove>::get_struct_field(REMOVE_NAME), Option::<Cdc>::get_struct_field(CDC_NAME), Option::<Metadata>::get_struct_field(METADATA_NAME), Option::<Protocol>::get_struct_field(PROTOCOL_NAME), + StructField::new(COMMIT_INFO_NAME, StructType::new([ict_type]), true), ])) } } @@ -314,6 +315,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> { (INTEGER, column_name!("protocol.minWriterVersion")), (string_list.clone(), column_name!("protocol.readerFeatures")), (string_list, column_name!("protocol.writerFeatures")), + (LONG, column_name!("commitInfo.inCommitTimestamp")), ]; let (types, names) = types_and_names.into_iter().unzip(); (names, types).into() @@ -323,7 +325,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> { fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> { require!( - getters.len() == 16, + getters.len() == 17, Error::InternalError(format!( "Wrong number of PreparePhaseVisitor getters: {}", getters.len() @@ -354,6 +356,12 @@ impl RowVisitor for PreparePhaseVisitor<'_> { let protocol = ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?; self.protocol = Some(protocol); + } else if let Some(in_commit_timestamp) = + getters[16].get_long(i, "commitInfo.inCommitTimestamp")? 
+ { + if self.is_first_batch && i == 0 { + *self.commit_timestamp = in_commit_timestamp; + } } } Ok(()) diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index 35c4a99f8..e0147a156 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -1,6 +1,7 @@ use super::table_changes_action_iter; use super::TableChangesScanData; use crate::actions::deletion_vector::DeletionVectorDescriptor; +use crate::actions::CommitInfo; use crate::actions::{Add, Cdc, Metadata, Protocol, Remove}; use crate::engine::sync::SyncEngine; use crate::expressions::Scalar; @@ -609,3 +610,37 @@ async fn file_meta_timestamp() { let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap(); assert_eq!(scanner.timestamp, file_meta_ts); } + +#[tokio::test] +async fn table_changes_in_commit_timestamp() { + let engine = Arc::new(SyncEngine::new()); + let mut mock_table = LocalMockTable::new(); + + let timestamp = 12345678; + + mock_table + .commit([ + Action::CommitInfo(CommitInfo { + in_commit_timestamp: Some(timestamp), + ..Default::default() + }), + Action::Add(Add { + path: "fake_path_1".into(), + data_change: true, + ..Default::default() + }), + ]) + .await; + + let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + .unwrap() + .into_iter(); + + let commit = commits.next().unwrap(); + let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap(); + assert_eq!(scanner.timestamp, timestamp); + + let iter = scanner.into_scan_batches(engine, None).unwrap(); + let sv = result_to_sv(iter); + assert_eq!(sv, vec![false, true]); +} diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index f428e09df..fd207daed 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -244,7 +244,7 @@ mod tests { use 
super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType}; use crate::actions::deletion_vector::DeletionVectorDescriptor; - use crate::actions::{Add, Cdc, Remove}; + use crate::actions::{Add, Cdc, CommitInfo, Remove}; use crate::engine::sync::SyncEngine; use crate::log_segment::LogSegment; use crate::scan::state::DvInfo; @@ -312,6 +312,12 @@ mod tests { ..Default::default() }; + let cdc_timestamp = 12345678; + let commit_info = CommitInfo { + in_commit_timestamp: Some(cdc_timestamp), + ..Default::default() + }; + mock_table .commit([ Action::Remove(remove_paired.clone()), @@ -319,7 +325,12 @@ mod tests { Action::Remove(remove.clone()), ]) .await; - mock_table.commit([Action::Cdc(cdc.clone())]).await; + mock_table + .commit([ + Action::CommitInfo(commit_info.clone()), + Action::Cdc(cdc.clone()), + ]) + .await; mock_table .commit([Action::Remove(remove_no_partition.clone())]) .await; @@ -386,7 +397,7 @@ mod tests { }, partition_values: cdc.partition_values, commit_version: 1, - commit_timestamp: timestamps[1], + commit_timestamp: cdc_timestamp, remove_dv: None, }, CdfScanFile {