Skip to content

Commit

Permalink
feat(spec): Impl the ManifestFile read functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Sep 6, 2024
1 parent f727960 commit 50ea824
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 183 deletions.
1 change: 1 addition & 0 deletions crates/paimon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11.15"
serde_json = "1.0.120"
serde_with = "3.9.0"
serde_repr = "0.1"
snafu = "0.8.3"
typed-builder = "^0.19"
opendal = { version = "0.49", features = ["services-fs"] }
Expand Down
74 changes: 63 additions & 11 deletions crates/paimon/src/spec/data_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use crate::spec::stats::BinaryTableStats;
use crate::spec::RowType;
use chrono::serde::ts_milliseconds::deserialize as from_millis;
use chrono::serde::ts_milliseconds::serialize as to_millis;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
Expand Down Expand Up @@ -48,12 +51,6 @@ impl BinaryRow {
}
}

/// TODO: implement me.
/// The statistics for columns, supports the following stats.
///
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStats.java>
type SimpleStats = ();

/// The Source of a file.
/// TODO: move me to the manifest module.
///
Expand All @@ -72,25 +69,43 @@ pub enum FileSource {
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DataFileMeta {
#[serde(rename = "_FILE_NAME")]
pub file_name: String,
#[serde(rename = "_FILE_SIZE")]
pub file_size: i64,
// row_count tells the total number of rows (including add & delete) in this file.
#[serde(rename = "_ROW_COUNT")]
pub row_count: i64,
pub min_key: BinaryRow,
pub max_key: BinaryRow,
pub key_stats: SimpleStats,
pub value_stats: SimpleStats,
#[serde(rename = "_MIN_KEY", with = "serde_bytes")]
pub min_key: Vec<u8>,
#[serde(rename = "_MAX_KEY", with = "serde_bytes")]
pub max_key: Vec<u8>,
#[serde(rename = "_KEY_STATS")]
pub key_stats: BinaryTableStats,
#[serde(rename = "_VALUE_STATS")]
pub value_stats: BinaryTableStats,
#[serde(rename = "_MIN_SEQUENCE_NUMBER")]
pub min_sequence_number: i64,
#[serde(rename = "_MAX_SEQUENCE_NUMBER")]
pub max_sequence_number: i64,
#[serde(rename = "_SCHEMA_ID")]
pub schema_id: i64,
#[serde(rename = "_LEVEL")]
pub level: i32,
#[serde(rename = "_EXTRA_FILES")]
pub extra_files: Vec<String>,
#[serde(
rename = "_CREATION_TIME",
serialize_with = "to_millis",
deserialize_with = "from_millis"
)]
pub creation_time: DateTime<Utc>,
#[serde(rename = "_DELETE_ROW_COUNT")]
// rowCount = add_row_count + delete_row_count.
pub delete_row_count: Option<i64>,
// file index filter bytes, if it is small, store in data file meta
#[serde(rename = "_EMBEDDED_FILE_INDEX", with = "serde_bytes")]
pub embedded_index: Option<Vec<u8>>,
pub file_source: Option<FileSource>,
}

impl Display for DataFileMeta {
Expand All @@ -99,7 +114,44 @@ impl Display for DataFileMeta {
}
}

#[allow(clippy::too_many_arguments)]
impl DataFileMeta {
// TODO: implement me
pub const SCHEMA: RowType = RowType::new(vec![]);

pub fn new(
file_name: String,
file_size: i64,
row_count: i64,
min_key: Vec<u8>,
max_key: Vec<u8>,
key_stats: BinaryTableStats,
value_stats: BinaryTableStats,
min_sequence_number: i64,
max_sequence_number: i64,
schema_id: i64,
level: i32,
extra_files: Vec<String>,
creation_time: DateTime<Utc>,
delete_row_count: Option<i64>,
embedded_index: Option<Vec<u8>>,
) -> Self {
DataFileMeta {
file_name,
file_size,
row_count,
min_key,
max_key,
key_stats,
value_stats,
min_sequence_number,
max_sequence_number,
schema_id,
level,
extra_files,
creation_time,
delete_row_count,
embedded_index,
}
}
}
154 changes: 154 additions & 0 deletions crates/paimon/src/spec/manifest_entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::spec::DataFileMeta;
use serde::Deserialize;
use serde_repr::{Deserialize_repr, Serialize_repr};
use serde_with::serde_derive::Serialize;

/// Entry representing a file.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java>
#[allow(dead_code)]
pub trait FileEntry {
fn kind(&self) -> &FileKind;

fn partition(&self) -> &Vec<u8>;

fn bucket(&self) -> i32;

fn level(&self) -> i32;

fn file_name(&self) -> &str;

fn min_key(&self) -> &Vec<u8>;

fn max_key(&self) -> &Vec<u8>;

fn identifier(&self) -> Identifier;
}

/// The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data file.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java#L58>
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Identifier {
pub partition: Vec<u8>,
pub bucket: i32,
pub level: i32,
pub file_name: String,
}
/// Kind of a file.
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileKind.java>
#[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
pub enum FileKind {
Add = 0,
Delete = 1,
}

/// Entry of a manifest file, representing an addition / deletion of a data file.
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java>
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestEntry {
#[serde(rename = "_KIND")]
kind: FileKind,

#[serde(rename = "_PARTITION", with = "serde_bytes")]
partition: Vec<u8>,

#[serde(rename = "_BUCKET")]
bucket: i32,

#[serde(rename = "_TOTAL_BUCKETS")]
total_buckets: i32,

#[serde(rename = "_FILE")]
file: DataFileMeta,

#[serde(rename = "_VERSION")]
version: i32,
}

#[allow(dead_code)]
impl FileEntry for ManifestEntry {
fn kind(&self) -> &FileKind {
&self.kind
}

fn partition(&self) -> &Vec<u8> {
&self.partition
}

fn bucket(&self) -> i32 {
self.bucket
}

fn level(&self) -> i32 {
self.file.level
}

fn file_name(&self) -> &str {
&self.file.file_name
}

fn min_key(&self) -> &Vec<u8> {
&self.file.min_key
}

fn max_key(&self) -> &Vec<u8> {
&self.file.max_key
}

fn identifier(&self) -> Identifier {
Identifier {
partition: self.partition.clone(),
bucket: self.bucket,
level: self.file.level,
file_name: self.file.file_name.clone(),
}
}
}

#[allow(dead_code)]
impl ManifestEntry {
pub fn total_buckets(&self) -> i32 {
self.total_buckets
}

pub fn file(&self) -> &DataFileMeta {
&self.file
}

pub fn new(
kind: FileKind,
partition: Vec<u8>,
bucket: i32,
total_buckets: i32,
file: DataFileMeta,
version: i32,
) -> Self {
ManifestEntry {
kind,
partition,
bucket,
total_buckets,
file,
version,
}
}
}
59 changes: 1 addition & 58 deletions crates/paimon/src/spec/manifest_file_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::spec::stats::BinaryTableStats;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};

Expand Down Expand Up @@ -128,61 +129,3 @@ impl Display for ManifestFileMeta {
)
}
}

/// The statistics for columns, supports the following stats.
///
/// All statistics are stored in the form of a Binary, which can significantly reduce its memory consumption, but the cost is that the column type needs to be known when getting.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsArraySerializer.java#L111>
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
pub struct BinaryTableStats {
/// the minimum values of the columns
#[serde(rename = "_MIN_VALUES", with = "serde_bytes")]
min_values: Vec<u8>,

/// the maximum values of the columns
#[serde(rename = "_MAX_VALUES", with = "serde_bytes")]
max_values: Vec<u8>,

/// the number of nulls of the columns
#[serde(rename = "_NULL_COUNTS")]
null_counts: Vec<i64>,
}

impl BinaryTableStats {
/// Get the minimum values of the columns
#[inline]
pub fn min_values(&self) -> &[u8] {
&self.min_values
}

/// Get the maximum values of the columns
#[inline]
pub fn max_values(&self) -> &[u8] {
&self.max_values
}

/// Get the number of nulls of the columns
#[inline]
pub fn null_counts(&self) -> &Vec<i64> {
&self.null_counts
}

pub fn new(
min_values: Vec<u8>,
max_values: Vec<u8>,
null_counts: Vec<i64>,
) -> BinaryTableStats {
Self {
min_values,
max_values,
null_counts,
}
}
}

impl Display for BinaryTableStats {
fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
Loading

0 comments on commit 50ea824

Please sign in to comment.