diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 959475f..ceb6100 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -44,6 +44,7 @@ serde = { version = "1", features = ["derive"] } serde_bytes = "0.11.15" serde_json = "1.0.120" serde_with = "3.9.0" +serde_repr = "0.1" snafu = "0.8.3" typed-builder = "^0.19" opendal = { version = "0.49", features = ["services-fs"] } diff --git a/crates/paimon/src/spec/data_file.rs b/crates/paimon/src/spec/data_file.rs index 37165e6..124b38c 100644 --- a/crates/paimon/src/spec/data_file.rs +++ b/crates/paimon/src/spec/data_file.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use crate::spec::stats::BinaryTableStats; use crate::spec::RowType; +use chrono::serde::ts_milliseconds::deserialize as from_millis; +use chrono::serde::ts_milliseconds::serialize as to_millis; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; @@ -48,12 +51,6 @@ impl BinaryRow { } } -/// TODO: implement me. -/// The statistics for columns, supports the following stats. -/// -/// Impl References: -type SimpleStats = (); - /// The Source of a file. /// TODO: move me to the manifest module. /// @@ -72,25 +69,43 @@ pub enum FileSource { #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DataFileMeta { + #[serde(rename = "_FILE_NAME")] pub file_name: String, + #[serde(rename = "_FILE_SIZE")] pub file_size: i64, // row_count tells the total number of rows (including add & delete) in this file. + #[serde(rename = "_ROW_COUNT")] pub row_count: i64, - pub min_key: BinaryRow, - pub max_key: BinaryRow, - pub key_stats: SimpleStats, - pub value_stats: SimpleStats, + #[serde(rename = "_MIN_KEY", with = "serde_bytes")] + pub min_key: Vec, + #[serde(rename = "_MAX_KEY", with = "serde_bytes")] + pub max_key: Vec, + #[serde(rename = "_KEY_STATS")] + pub key_stats: BinaryTableStats, + #[serde(rename = "_VALUE_STATS")] + pub value_stats: BinaryTableStats, + #[serde(rename = "_MIN_SEQUENCE_NUMBER")] pub min_sequence_number: i64, + #[serde(rename = "_MAX_SEQUENCE_NUMBER")] pub max_sequence_number: i64, + #[serde(rename = "_SCHEMA_ID")] pub schema_id: i64, + #[serde(rename = "_LEVEL")] pub level: i32, + #[serde(rename = "_EXTRA_FILES")] pub extra_files: Vec, + #[serde( + rename = "_CREATION_TIME", + serialize_with = "to_millis", + deserialize_with = "from_millis" + )] pub creation_time: DateTime, + #[serde(rename = "_DELETE_ROW_COUNT")] // rowCount = add_row_count + delete_row_count. pub delete_row_count: Option, // file index filter bytes, if it is small, store in data file meta + #[serde(rename = "_EMBEDDED_FILE_INDEX", with = "serde_bytes")] pub embedded_index: Option>, - pub file_source: Option, } impl Display for DataFileMeta { @@ -99,7 +114,44 @@ impl Display for DataFileMeta { } } +#[allow(clippy::too_many_arguments)] impl DataFileMeta { // TODO: implement me pub const SCHEMA: RowType = RowType::new(vec![]); + + pub fn new( + file_name: String, + file_size: i64, + row_count: i64, + min_key: Vec, + max_key: Vec, + key_stats: BinaryTableStats, + value_stats: BinaryTableStats, + min_sequence_number: i64, + max_sequence_number: i64, + schema_id: i64, + level: i32, + extra_files: Vec, + creation_time: DateTime, + delete_row_count: Option, + embedded_index: Option>, + ) -> Self { + DataFileMeta { + file_name, + file_size, + row_count, + min_key, + max_key, + key_stats, + value_stats, + min_sequence_number, + max_sequence_number, + schema_id, + level, + extra_files, + creation_time, + delete_row_count, + embedded_index, + } + } } diff --git a/crates/paimon/src/spec/manifest_entry.rs b/crates/paimon/src/spec/manifest_entry.rs new file mode 100644 index 0000000..be2989a --- /dev/null +++ b/crates/paimon/src/spec/manifest_entry.rs @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::spec::DataFileMeta; +use serde::Deserialize; +use serde_repr::{Deserialize_repr, Serialize_repr}; +use serde_with::serde_derive::Serialize; + +/// Entry representing a file. +/// +/// Impl Reference: +#[allow(dead_code)] +pub trait FileEntry { + fn kind(&self) -> &FileKind; + + fn partition(&self) -> &Vec; + + fn bucket(&self) -> i32; + + fn level(&self) -> i32; + + fn file_name(&self) -> &str; + + fn min_key(&self) -> &Vec; + + fn max_key(&self) -> &Vec; + + fn identifier(&self) -> Identifier; +} + +/// The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data file. +/// +/// Impl Reference: +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Identifier { + pub partition: Vec, + pub bucket: i32, + pub level: i32, + pub file_name: String, +} +/// Kind of a file. +/// Impl Reference: +#[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)] +#[repr(u8)] +pub enum FileKind { + Add = 0, + Delete = 1, +} + +/// Entry of a manifest file, representing an addition / deletion of a data file. +/// Impl Reference: +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ManifestEntry { + #[serde(rename = "_KIND")] + kind: FileKind, + + #[serde(rename = "_PARTITION", with = "serde_bytes")] + partition: Vec, + + #[serde(rename = "_BUCKET")] + bucket: i32, + + #[serde(rename = "_TOTAL_BUCKETS")] + total_buckets: i32, + + #[serde(rename = "_FILE")] + file: DataFileMeta, + + #[serde(rename = "_VERSION")] + version: i32, +} + +#[allow(dead_code)] +impl FileEntry for ManifestEntry { + fn kind(&self) -> &FileKind { + &self.kind + } + + fn partition(&self) -> &Vec { + &self.partition + } + + fn bucket(&self) -> i32 { + self.bucket + } + + fn level(&self) -> i32 { + self.file.level + } + + fn file_name(&self) -> &str { + &self.file.file_name + } + + fn min_key(&self) -> &Vec { + &self.file.min_key + } + + fn max_key(&self) -> &Vec { + &self.file.max_key + } + + fn identifier(&self) -> Identifier { + Identifier { + partition: self.partition.clone(), + bucket: self.bucket, + level: self.file.level, + file_name: self.file.file_name.clone(), + } + } +} + +#[allow(dead_code)] +impl ManifestEntry { + pub fn total_buckets(&self) -> i32 { + self.total_buckets + } + + pub fn file(&self) -> &DataFileMeta { + &self.file + } + + pub fn new( + kind: FileKind, + partition: Vec, + bucket: i32, + total_buckets: i32, + file: DataFileMeta, + version: i32, + ) -> Self { + ManifestEntry { + kind, + partition, + bucket, + total_buckets, + file, + version, + } + } +} diff --git a/crates/paimon/src/spec/manifest_file_meta.rs b/crates/paimon/src/spec/manifest_file_meta.rs index 382d579..36f92b9 100644 --- a/crates/paimon/src/spec/manifest_file_meta.rs +++ b/crates/paimon/src/spec/manifest_file_meta.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::spec::stats::BinaryTableStats; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; @@ -128,61 +129,3 @@ impl Display for ManifestFileMeta { ) } } - -/// The statistics for columns, supports the following stats. -/// -/// All statistics are stored in the form of a Binary, which can significantly reduce its memory consumption, but the cost is that the column type needs to be known when getting. -/// -/// Impl Reference: -#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] -pub struct BinaryTableStats { - /// the minimum values of the columns - #[serde(rename = "_MIN_VALUES", with = "serde_bytes")] - min_values: Vec, - - /// the maximum values of the columns - #[serde(rename = "_MAX_VALUES", with = "serde_bytes")] - max_values: Vec, - - /// the number of nulls of the columns - #[serde(rename = "_NULL_COUNTS")] - null_counts: Vec, -} - -impl BinaryTableStats { - /// Get the minimum values of the columns - #[inline] - pub fn min_values(&self) -> &[u8] { - &self.min_values - } - - /// Get the maximum values of the columns - #[inline] - pub fn max_values(&self) -> &[u8] { - &self.max_values - } - - /// Get the number of nulls of the columns - #[inline] - pub fn null_counts(&self) -> &Vec { - &self.null_counts - } - - pub fn new( - min_values: Vec, - max_values: Vec, - null_counts: Vec, - ) -> BinaryTableStats { - Self { - min_values, - max_values, - null_counts, - } - } -} - -impl Display for BinaryTableStats { - fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - todo!() - } -} diff --git a/crates/paimon/src/spec/manifest_list.rs b/crates/paimon/src/spec/manifest_list.rs deleted file mode 100644 index 2cffd5c..0000000 --- a/crates/paimon/src/spec/manifest_list.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use super::manifest_file_meta::ManifestFileMeta; -use crate::io::FileIO; -use crate::{Error, Result}; -use apache_avro::types::Value; -use apache_avro::{from_value, Reader}; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -/// This file includes several [`ManifestFileMeta`], representing all data of the whole table at the corresponding snapshot. -pub struct ManifestList { - entries: Vec, -} - -impl ManifestList { - pub fn entries(&self) -> &Vec { - &self.entries - } - - pub fn from_avro_bytes(bytes: &[u8]) -> Result { - let reader = Reader::new(bytes).map_err(Error::from)?; - let records = reader - .collect::, _>>() - .map_err(Error::from)?; - let values = Value::Array(records); - from_value::(&values).map_err(Error::from) - } -} - -pub struct ManifestListFactory { - file_io: FileIO, -} - -/// The factory to read and write [`ManifestList`] -impl ManifestListFactory { - pub fn new(file_io: FileIO) -> ManifestListFactory { - Self { file_io } - } - - /// Write several [`ManifestFileMeta`]s into a manifest list. - /// - /// NOTE: This method is atomic. - pub fn write(&mut self, _metas: Vec) -> &str { - todo!() - } - - /// Read [`ManifestList`] from the manifest file. - pub async fn read(&self, path: &str) -> Result { - let bs = self.file_io.new_input(path)?.read().await?; - // todo support other formats - ManifestList::from_avro_bytes(bs.as_ref()) - } -} - -#[cfg(test)] -mod tests { - use crate::spec::{BinaryTableStats, ManifestFileMeta, ManifestList}; - - #[tokio::test] - async fn test_read_manifest_list() { - let workdir = - std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}")); - let path = workdir - .join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0"); - let v = std::fs::read(path.to_str().unwrap()).unwrap(); - let res = ManifestList::from_avro_bytes(&v).unwrap(); - let value_bytes = vec![ - 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, - ]; - assert_eq!( - res, - ManifestList { - entries: vec![ - ManifestFileMeta::new( - "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(), - 10, - 10, - 10, - BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), - 1 - ), - ManifestFileMeta::new( - "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(), - 11, - 0, - 10, - BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), - 2 - ) - ], - } - ); - } -} diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index 80fb47d..59c7d00 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -34,8 +34,9 @@ pub use snapshot::*; mod manifest_file_meta; pub use manifest_file_meta::*; -mod manifest_list; -pub use manifest_list::*; - +mod manifest_entry; +mod objects_file; +mod stats; mod types; + pub use types::*; diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs new file mode 100644 index 0000000..dabec24 --- /dev/null +++ b/crates/paimon/src/spec/objects_file.rs @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::io::FileIO; +use crate::spec::manifest_entry::ManifestEntry; +use crate::spec::ManifestFileMeta; +use crate::Error; +use apache_avro::types::Value; +use apache_avro::{from_value, Reader}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +/// This file includes several [T] +pub struct ObjectsFile { + entries: Vec, +} + +#[allow(dead_code)] +impl ObjectsFile +where + T: DeserializeOwned, +{ + pub fn entries(&self) -> &Vec { + &self.entries + } + + pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result> { + let reader = Reader::new(bytes).map_err(Error::from)?; + let records = reader + .collect::, _>>() + .map_err(Error::from)?; + let values = Value::Array(records); + from_value::>(&values).map_err(Error::from) + } +} + +pub struct ObjectsFileFactory { + file_io: FileIO, +} + +/// The factory to read and write [`ObjectsFile`] +#[allow(dead_code)] +impl ObjectsFileFactory { + pub fn new(file_io: FileIO) -> ObjectsFileFactory { + Self { file_io } + } + + /// Write several [`ManifestFileMeta`]s into a manifest list. + /// + /// NOTE: This method is atomic. + pub fn write(&mut self, _metas: Vec) -> &str { + todo!() + } + + /// Read [`ManifestList`] from the manifest file. + pub async fn read_manifest_file_meta( + &self, + path: &str, + ) -> crate::Result> { + let bs = self.file_io.new_input(path)?.read().await?; + // todo support other formats + ObjectsFile::::from_avro_bytes(bs.as_ref()) + } + + /// Read [`ManifestEntry`] from the manifest file. + pub async fn read_manifest_entry( + &self, + path: &str, + ) -> crate::Result> { + let bs = self.file_io.new_input(path)?.read().await?; + // todo support other formats + ObjectsFile::::from_avro_bytes(bs.as_ref()) + } +} + +#[cfg(test)] +mod tests { + use crate::spec::manifest_entry::{FileKind, ManifestEntry}; + use crate::spec::objects_file::ObjectsFile; + use crate::spec::stats::BinaryTableStats; + use crate::spec::{DataFileMeta, ManifestFileMeta}; + use chrono::{DateTime, Utc}; + + #[tokio::test] + async fn test_read_manifest_list() { + let workdir = + std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}")); + let path = workdir + .join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0"); + let v = std::fs::read(path.to_str().unwrap()).unwrap(); + let res = ObjectsFile::::from_avro_bytes(&v).unwrap(); + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, + ]; + assert_eq!( + res, + ObjectsFile { + entries: vec![ + ManifestFileMeta::new( + "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(), + 10, + 10, + 10, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), + 1 + ), + ManifestFileMeta::new( + "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(), + 11, + 0, + 10, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), + 2 + ) + ], + } + ); + } + + #[tokio::test] + async fn test_read_manifest_entry() { + let workdir = + std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}")); + let path = + workdir.join("tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0"); + let v = std::fs::read(path.to_str().unwrap()).unwrap(); + let res = ObjectsFile::::from_avro_bytes(&v).unwrap(); + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, 1, 0, 0, 0, 0, 0, 0, 0, + ]; + let single_value = vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]; + assert_eq!( + res, + ObjectsFile { + entries: vec![ + ManifestEntry::new( + FileKind::Delete, + single_value.clone(), + 1, + 10, + DataFileMeta::new( + "f1.parquet".to_string(), + 10, + 100, + single_value.clone(), + single_value.clone(), + BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + 1, + 100, + 0, + 1, + vec![], + "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + Some(0), + None, + ), + 2 + ), + ManifestEntry::new( + FileKind::Add, + single_value.clone(), + 2, + 10, + DataFileMeta::new( + "f2.parquet".to_string(), + 10, + 100, + single_value.clone(), + single_value.clone(), + BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + 1, + 100, + 0, + 1, + vec![], + "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + Some(1), + None, + ), + 2 + ), + ] + } + ) + } +} diff --git a/crates/paimon/src/spec/stats.rs b/crates/paimon/src/spec/stats.rs new file mode 100644 index 0000000..98923ce --- /dev/null +++ b/crates/paimon/src/spec/stats.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Deserialize, Serialize}; +use std::fmt::{Display, Formatter}; + +/// The statistics for columns, supports the following stats. +/// +/// All statistics are stored in the form of a Binary, which can significantly reduce its memory consumption, but the cost is that the column type needs to be known when getting. +/// +/// Impl Reference: +#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] +pub struct BinaryTableStats { + /// the minimum values of the columns + #[serde(rename = "_MIN_VALUES", with = "serde_bytes")] + min_values: Vec, + + /// the maximum values of the columns + #[serde(rename = "_MAX_VALUES", with = "serde_bytes")] + max_values: Vec, + + /// the number of nulls of the columns + #[serde(rename = "_NULL_COUNTS")] + null_counts: Vec, +} + +impl BinaryTableStats { + /// Get the minimum values of the columns + #[inline] + pub fn min_values(&self) -> &[u8] { + &self.min_values + } + + /// Get the maximum values of the columns + #[inline] + pub fn max_values(&self) -> &[u8] { + &self.max_values + } + + /// Get the number of nulls of the columns + #[inline] + pub fn null_counts(&self) -> &Vec { + &self.null_counts + } + + pub fn new( + min_values: Vec, + max_values: Vec, + null_counts: Vec, + ) -> BinaryTableStats { + Self { + min_values, + max_values, + null_counts, + } + } +} + +impl Display for BinaryTableStats { + fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} diff --git a/crates/paimon/tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0 b/crates/paimon/tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0 new file mode 100644 index 0000000..57a7ef4 Binary files /dev/null and b/crates/paimon/tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0 differ