From 5a5220d2737630f1f43a99a23f26ff97119b2894 Mon Sep 17 00:00:00 2001 From: WenjunMin Date: Sun, 1 Sep 2024 17:02:09 +0800 Subject: [PATCH] feat(spec): Add CommitKind in snapshot (#64) --- crates/paimon/src/spec/snapshot.rs | 293 ++++++------------ .../snapshot/snapshot-v3-none-field.json | 16 + .../tests/fixtures/snapshot/snapshot-v3.json | 17 + 3 files changed, 128 insertions(+), 198 deletions(-) create mode 100644 crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json create mode 100644 crates/paimon/tests/fixtures/snapshot/snapshot-v3.json diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs index 44ffed2..28dc92d 100644 --- a/crates/paimon/src/spec/snapshot.rs +++ b/crates/paimon/src/spec/snapshot.rs @@ -19,6 +19,22 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use typed_builder::TypedBuilder; +/// Type of changes in this snapshot. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub enum CommitKind { + /// Changes flushed from the mem table. + APPEND, + + /// Changes by compacting existing data files. + COMPACT, + + /// Changes that clear up the whole partition and then add new records. + OVERWRITE, + + /// Collect statistics. + ANALYZE, +} + /// Snapshot for paimon. /// /// Impl Reference: . @@ -35,9 +51,11 @@ pub struct Snapshot { delta_manifest_list: String, /// a manifest list recording all changelog produced in this snapshot #[builder(default = None)] + #[serde(skip_serializing_if = "Option::is_none")] changelog_manifest_list: Option, /// a manifest recording all index files of this table #[builder(default = None)] + #[serde(skip_serializing_if = "Option::is_none")] index_manifest: Option, /// user who committed this snapshot commit_user: String, @@ -49,10 +67,12 @@ pub struct Snapshot { /// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be /// committed before snapshot B, and thus snapshot A must contain older records than snapshot B. commit_identifier: i64, + commit_kind: CommitKind, /// timestamp of this snapshot time_millis: u64, /// log offsets of all changes occurred in this snapshot #[builder(default = None)] + #[serde(skip_serializing_if = "Option::is_none")] log_offsets: Option>, /// record count of all changes occurred in this snapshot #[builder(default = None)] @@ -62,12 +82,15 @@ pub struct Snapshot { delta_record_count: Option, /// record count of all changelog produced in this snapshot #[builder(default = None)] + #[serde(skip_serializing_if = "Option::is_none")] changelog_record_count: Option, /// watermark for input records #[builder(default = None)] + #[serde(skip_serializing_if = "Option::is_none")] watermark: Option, /// stats file name for statistics of this table #[builder(default = None)] + #[serde(skip_serializing_if = "Option::is_none")] statistics: Option, } @@ -172,213 +195,87 @@ impl Snapshot { #[cfg(test)] mod tests { use super::*; + use pretty_assertions::assert_eq; use serde_json; + use std::env::current_dir; - #[test] - fn test_snapshot_creation() { - let snapshot = Snapshot::builder() - .version(3) - .id(1) - .schema_id(0) - .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string()) - .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string()) - .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string()) - .commit_identifier(9223372036854775807) - .time_millis(1721287833568) - .build(); - - assert_eq!(snapshot.version(), 3); - assert_eq!(snapshot.id(), 1); - assert_eq!(snapshot.schema_id(), 0); - assert_eq!( - snapshot.base_manifest_list(), - "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0" - ); - assert_eq!( - snapshot.delta_manifest_list(), - "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1" - ); - assert_eq!( - snapshot.commit_user(), - "cf568e07-05ad-4943-b4bd-37461bc58729" - ); - assert_eq!(snapshot.commit_identifier(), 9223372036854775807); - assert_eq!(snapshot.time_millis(), 1721287833568); - assert!(snapshot.changelog_manifest_list().is_none()); - assert!(snapshot.index_manifest().is_none()); - assert!(snapshot.log_offsets().is_none()); - assert!(snapshot.total_record_count().is_none()); - assert!(snapshot.delta_record_count().is_none()); - assert!(snapshot.changelog_record_count().is_none()); - assert!(snapshot.watermark().is_none()); - assert!(snapshot.statistics().is_none()); + fn load_fixture(name: &str) -> String { + let path = current_dir() + .unwrap_or_else(|err| panic!("current_dir must exist: {err}")) + .join(format!("tests/fixtures/snapshot/{name}.json")); + let bytes = std::fs::read(&path) + .unwrap_or_else(|err| panic!("fixtures {path:?} load failed: {err}")); + String::from_utf8(bytes).expect("fixtures content must be valid utf8") } - #[test] - fn test_snapshot_with_optional_fields() { - let snapshot = Snapshot::builder() - .version(3) - .id(1) - .schema_id(0) - .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string()) - .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string()) - .changelog_manifest_list(Some( - "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2".to_string(), - )) - .index_manifest(Some( - "index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0".to_string(), - )) - .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string()) - .commit_identifier(9223372036854775807) - .time_millis(1721287833568) - .total_record_count(Some(1)) - .delta_record_count(Some(1)) - .changelog_record_count(Some(0)) - .watermark(Some(-9223372036854775808)) - .statistics(Some("statistics_v2".to_string())) - .build(); - - assert_eq!(snapshot.version(), 3); - assert_eq!(snapshot.id(), 1); - assert_eq!(snapshot.schema_id(), 0); - assert_eq!( - snapshot.base_manifest_list(), - "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0" - ); - assert_eq!( - snapshot.delta_manifest_list(), - "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1" - ); - assert_eq!( - snapshot.changelog_manifest_list(), - Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2") - ); - assert_eq!( - snapshot.index_manifest(), - Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0") - ); - assert_eq!( - snapshot.commit_user(), - "cf568e07-05ad-4943-b4bd-37461bc58729" - ); - assert_eq!(snapshot.commit_identifier(), 9223372036854775807); - assert_eq!(snapshot.time_millis(), 1721287833568); - assert_eq!( - snapshot.changelog_manifest_list(), - Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2") - ); - assert_eq!(snapshot.total_record_count(), Some(1)); - assert_eq!(snapshot.delta_record_count(), Some(1)); - assert_eq!(snapshot.changelog_record_count(), Some(0)); - assert_eq!(snapshot.watermark(), Some(-9223372036854775808)); - assert_eq!(snapshot.statistics(), Some("statistics_v2")); - } - - #[test] - fn test_snapshot_with_none_fields() { - let snapshot = Snapshot::builder() - .version(3) - .id(1) - .schema_id(0) - .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string()) - .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string()) - .changelog_manifest_list(None) - .index_manifest(None) - .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string()) - .commit_identifier(9223372036854775807) - .time_millis(1721287833568) - .total_record_count(None) - .delta_record_count(None) - .changelog_record_count(None) - .watermark(None) - .statistics(None) - .build(); - - assert_eq!(snapshot.version(), 3); - assert_eq!(snapshot.id(), 1); - assert_eq!(snapshot.schema_id(), 0); - assert_eq!( - snapshot.base_manifest_list(), - "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0" - ); - assert_eq!( - snapshot.delta_manifest_list(), - "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1" - ); - assert_eq!( - snapshot.commit_user(), - "cf568e07-05ad-4943-b4bd-37461bc58729" - ); - assert_eq!(snapshot.commit_identifier(), 9223372036854775807); - assert_eq!(snapshot.time_millis(), 1721287833568); - assert!(snapshot.changelog_manifest_list().is_none()); - assert!(snapshot.index_manifest().is_none()); - assert!(snapshot.log_offsets().is_none()); - assert!(snapshot.total_record_count().is_none()); - assert!(snapshot.delta_record_count().is_none()); - assert!(snapshot.changelog_record_count().is_none()); - assert!(snapshot.watermark().is_none()); - assert!(snapshot.statistics().is_none()); + fn test_cases() -> Vec<(&'static str, Snapshot)> { + vec![ + ( + "snapshot-v3", + Snapshot::builder() + .version(3) + .id(2) + .schema_id(0) + .base_manifest_list( + "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(), + ) + .delta_manifest_list( + "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(), + ) + .commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string()) + .commit_identifier(9223372036854775807) + .changelog_manifest_list(Some( + "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2".to_string(), + )) + .commit_kind(CommitKind::APPEND) + .time_millis(1724509030368) + .log_offsets(Some(HashMap::default())) + .total_record_count(Some(4)) + .delta_record_count(Some(2)) + .changelog_record_count(Some(2)) + .statistics(Some("statistics_string".to_string())) + .build(), + ), + ( + "snapshot-v3-none-field", + Snapshot::builder() + .version(3) + .id(2) + .schema_id(0) + .base_manifest_list( + "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(), + ) + .delta_manifest_list( + "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(), + ) + .commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string()) + .commit_identifier(9223372036854775807) + .changelog_manifest_list(None) + .commit_kind(CommitKind::APPEND) + .time_millis(1724509030368) + .log_offsets(Some(HashMap::default())) + .total_record_count(Some(4)) + .delta_record_count(Some(2)) + .changelog_record_count(Some(2)) + .build(), + ), + ] } #[test] fn test_snapshot_serialization_deserialization() { - let data = r#" - { - "version" : 3, - "id" : 1, - "schemaId" : 0, - "baseManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0", - "deltaManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1", - "changelogManifestList" : null, - "indexManifest" : "index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0", - "commitUser" : "cf568e07-05ad-4943-b4bd-37461bc58729", - "commitIdentifier" : 9223372036854775807, - "timeMillis" : 1721287833568, - "logOffsets" : { }, - "totalRecordCount" : 1, - "deltaRecordCount" : 1, - "changelogRecordCount" : 0, - "watermark" : -9223372036854775808 - } - "#; - - let snapshot: Snapshot = - serde_json::from_str(data).expect("Failed to deserialize Snapshot"); - - assert_eq!(snapshot.version(), 3); - assert_eq!(snapshot.id(), 1); - assert_eq!(snapshot.schema_id(), 0); - assert_eq!( - snapshot.base_manifest_list(), - "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0" - ); - assert_eq!( - snapshot.delta_manifest_list(), - "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1" - ); - assert_eq!(snapshot.changelog_manifest_list(), None); - assert_eq!( - snapshot.index_manifest(), - Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0") - ); - assert_eq!( - snapshot.commit_user(), - "cf568e07-05ad-4943-b4bd-37461bc58729" - ); - assert_eq!(snapshot.commit_identifier(), 9223372036854775807); - assert_eq!(snapshot.time_millis(), 1721287833568); - assert!(snapshot.changelog_manifest_list().is_none()); - assert_eq!(snapshot.total_record_count(), Some(1)); - assert_eq!(snapshot.delta_record_count(), Some(1)); - assert_eq!(snapshot.changelog_record_count(), Some(0)); - assert_eq!(snapshot.watermark(), Some(-9223372036854775808)); - - let serialized = serde_json::to_string(&snapshot).expect("Failed to serialize Snapshot"); + for (name, expect) in test_cases() { + let content = load_fixture(name); + let snapshot: Snapshot = + serde_json::from_str(content.as_str()).expect("Failed to deserialize Snapshot"); + assert_eq!(snapshot, expect); + let serialized = + serde_json::to_string(&snapshot).expect("Failed to serialize Snapshot"); - let deserialized: Snapshot = - serde_json::from_str(&serialized).expect("Failed to deserialize serialized Snapshot"); + let deserialized: Snapshot = serde_json::from_str(&serialized) + .expect("Failed to deserialize serialized Snapshot"); - assert_eq!(snapshot, deserialized); + assert_eq!(snapshot, deserialized); + } } } diff --git a/crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json b/crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json new file mode 100644 index 0000000..c853167 --- /dev/null +++ b/crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json @@ -0,0 +1,16 @@ +{ + "version": 3, + "id": 2, + "schemaId": 0, + "baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0", + "deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1", + "changelogManifestList": null, + "commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a", + "commitIdentifier": 9223372036854775807, + "commitKind": "APPEND", + "timeMillis": 1724509030368, + "logOffsets": {}, + "totalRecordCount": 4, + "deltaRecordCount": 2, + "changelogRecordCount": 2 +} \ No newline at end of file diff --git a/crates/paimon/tests/fixtures/snapshot/snapshot-v3.json b/crates/paimon/tests/fixtures/snapshot/snapshot-v3.json new file mode 100644 index 0000000..894c984 --- /dev/null +++ b/crates/paimon/tests/fixtures/snapshot/snapshot-v3.json @@ -0,0 +1,17 @@ +{ + "version": 3, + "id": 2, + "schemaId": 0, + "baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0", + "deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1", + "changelogManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2", + "commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a", + "commitIdentifier": 9223372036854775807, + "commitKind": "APPEND", + "timeMillis": 1724509030368, + "logOffsets": {}, + "totalRecordCount": 4, + "deltaRecordCount": 2, + "changelogRecordCount": 2, + "statistics": "statistics_string" +} \ No newline at end of file