Skip to content

Commit

Permalink
spec: Implement ManifestList read functionality (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Sep 1, 2024
1 parent 8a2ea68 commit b91a3b0
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 38 deletions.
1 change: 1 addition & 0 deletions crates/paimon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ snafu = "0.8.3"
typed-builder = "^0.19"
opendal = { version = "0.48",features = ["services-fs"] }
pretty_assertions = "1"
apache-avro = { version = "0.17", features = ["snappy"] }
17 changes: 17 additions & 0 deletions crates/paimon/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ pub enum Error {
display("Paimon hitting invalid config: {}", message)
)]
ConfigInvalid { message: String },
#[snafu(
visibility(pub(crate)),
display("Paimon hitting unexpected avro error {}: {:?}", message, source)
)]
DataUnexpected {
message: String,
source: apache_avro::Error,
},
}

impl From<opendal::Error> for Error {
Expand All @@ -63,3 +71,12 @@ impl From<opendal::Error> for Error {
}
}
}

impl From<apache_avro::Error> for Error {
fn from(source: apache_avro::Error) -> Self {
Error::DataUnexpected {
message: "".to_string(),
source,
}
}
}
80 changes: 43 additions & 37 deletions crates/paimon/src/spec/manifest_file_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};

/// Metadata of a manifest file.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java>
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
pub struct ManifestFileMeta {
#[serde(rename = "_VERSION")]
version: i32,

/// manifest file name
#[serde(rename = "_FILE_NAME")]
file_name: String,
Expand Down Expand Up @@ -84,6 +86,32 @@ impl ManifestFileMeta {
pub fn schema_id(&self) -> i64 {
self.schema_id
}

/// Get the version of this manifest file.
///
/// The value is (de)serialized under the field name `_VERSION`; metas built
/// with [`ManifestFileMeta::new`] always carry version `2`.
#[inline]
pub fn version(&self) -> i32 {
self.version
}

#[inline]
pub fn new(
file_name: String,
file_size: i64,
num_added_files: i64,
num_deleted_files: i64,
partition_stats: BinaryTableStats,
schema_id: i64,
) -> ManifestFileMeta {
Self {
version: 2,
file_name,
file_size,
num_added_files,
num_deleted_files,
partition_stats,
schema_id,
}
}
}

impl Display for ManifestFileMeta {
Expand Down Expand Up @@ -117,11 +145,7 @@ pub struct BinaryTableStats {
max_values: Vec<u8>,

/// the number of nulls of the columns
#[serde(
rename = "_NULL_COUNTS",
serialize_with = "serialize_null_counts",
deserialize_with = "deserialize_null_counts"
)]
#[serde(rename = "_NULL_COUNTS")]
null_counts: Vec<i64>,
}

Expand All @@ -143,40 +167,22 @@ impl BinaryTableStats {
pub fn null_counts(&self) -> &Vec<i64> {
&self.null_counts
}

pub fn new(
min_values: Vec<u8>,
max_values: Vec<u8>,
null_counts: Vec<i64>,
) -> BinaryTableStats {
Self {
min_values,
max_values,
null_counts,
}
}
}

impl Display for BinaryTableStats {
    /// Render the statistics as
    /// `{min_values: [..], max_values: [..], null_counts: [..]}`.
    ///
    /// The byte buffers and the null counts are printed with their `Debug`
    /// representation. This replaces the previous `todo!()` body, which made
    /// every `{}` formatting or `to_string()` call on the type panic.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{{min_values: {:?}, max_values: {:?}, null_counts: {:?}}}",
            self.min_values, self.max_values, self.null_counts
        )
    }
}

fn serialize_null_counts<S>(value: &Vec<i64>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut bytes = Vec::new();
for &num in value {
bytes.extend_from_slice(&num.to_le_bytes());
}

let bytes = Bytes::new(bytes.as_slice());
serializer.serialize_bytes(bytes)
}

fn deserialize_null_counts<'de, D>(deserializer: D) -> Result<Vec<i64>, D::Error>
where
D: Deserializer<'de>,
{
let bytes = Deserialize::deserialize(deserializer).map(Bytes::new)?;

let size_of_i64 = std::mem::size_of::<i64>();
let i64_count = bytes.len() / size_of_i64;
let mut i64s = Vec::with_capacity(i64_count);
for chunk in bytes.chunks_exact(size_of_i64) {
i64s.push(i64::from_le_bytes(
chunk.try_into().expect("Chunk must be 8 bytes long"),
));
}
Ok(i64s)
}
83 changes: 82 additions & 1 deletion crates/paimon/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,96 @@
// under the License.

use super::manifest_file_meta::ManifestFileMeta;
use crate::io::FileIO;
use crate::{Error, Result};
use apache_avro::types::Value;
use apache_avro::{from_value, Reader};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
/// This file includes several [`ManifestFileMeta`], representing all data of the whole table at the corresponding snapshot.
pub struct ManifestList {}
pub struct ManifestList {
/// The [`ManifestFileMeta`] entries contained in this manifest list.
entries: Vec<ManifestFileMeta>,
}

impl ManifestList {
    /// Borrow the [`ManifestFileMeta`] entries held by this list.
    pub fn entries(&self) -> &Vec<ManifestFileMeta> {
        &self.entries
    }

    /// Decode a [`ManifestList`] from the raw bytes of an Avro file.
    ///
    /// Every record yielded by the Avro reader is gathered into a single
    /// [`Value::Array`], which is then deserialized into the list.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] converted from the underlying Avro error if the
    /// reader cannot be created, a record fails to decode, or the collected
    /// records cannot be deserialized.
    pub fn from_avro_bytes(bytes: &[u8]) -> Result<ManifestList> {
        let mut records: Vec<Value> = Vec::new();
        // Stop at the first record that fails to decode.
        for record in Reader::new(bytes)? {
            records.push(record?);
        }
        let manifest_list = from_value::<Self>(&Value::Array(records))?;
        Ok(manifest_list)
    }
}

/// Reads and writes [`ManifestList`]s through a [`FileIO`].
pub struct ManifestListFactory {
/// File IO handle used to open manifest list files.
file_io: FileIO,
}

/// The factory to read and write [`ManifestList`]
impl ManifestListFactory {
pub fn new(file_io: FileIO) -> ManifestListFactory {
Self { file_io }
}

/// Write several [`ManifestFileMeta`]s into a manifest list.
///
/// NOTE: This method is atomic.
pub fn write(&mut self, _metas: Vec<ManifestFileMeta>) -> &str {
todo!()
}

/// Read [`ManifestList`] from the manifest file.
pub async fn read(&self, path: &str) -> Result<ManifestList> {
let bs = self.file_io.new_input(path)?.read().await?;
// todo support other formats
ManifestList::from_avro_bytes(bs.as_ref())
}
}

#[cfg(test)]
mod tests {
    use crate::spec::{BinaryTableStats, ManifestFileMeta, ManifestList};

    /// Decodes the checked-in Avro manifest-list fixture and checks that it
    /// yields the two expected [`ManifestFileMeta`] entries.
    #[tokio::test]
    async fn test_read_manifest_list() {
        let workdir =
            std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}"));
        let path = workdir
            .join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0");
        // `std::fs::read` takes any `AsRef<Path>`, so the path is passed as-is
        // instead of going through `to_str().unwrap()`, which would panic on a
        // non-UTF-8 working directory.
        let v = std::fs::read(&path)
            .unwrap_or_else(|err| panic!("failed to read fixture {}: {err}", path.display()));
        let res = ManifestList::from_avro_bytes(&v)
            .unwrap_or_else(|err| panic!("failed to decode fixture {}: {err}", path.display()));
        // Serialized min/max partition values shared by both expected entries.
        let value_bytes = vec![
            0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129,
        ];
        assert_eq!(
            res,
            ManifestList {
                entries: vec![
                    ManifestFileMeta::new(
                        "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(),
                        10,
                        10,
                        10,
                        BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]),
                        1
                    ),
                    ManifestFileMeta::new(
                        "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(),
                        11,
                        0,
                        10,
                        BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]),
                        2
                    )
                ],
            }
        );
    }
}
Binary file not shown.

0 comments on commit b91a3b0

Please sign in to comment.