Skip to content

Commit

Permalink
feat(spec): Add CommitKind in snapshot (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Sep 1, 2024
1 parent b91a3b0 commit 5a5220d
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 198 deletions.
293 changes: 95 additions & 198 deletions crates/paimon/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use typed_builder::TypedBuilder;

/// Type of changes in this snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub enum CommitKind {
/// Changes flushed from the mem table.
APPEND,

/// Changes by compacting existing data files.
COMPACT,

/// Changes that clear up the whole partition and then add new records.
OVERWRITE,

/// Collect statistics.
ANALYZE,
}

/// Snapshot for paimon.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/Snapshot.java#L68>.
Expand All @@ -35,9 +51,11 @@ pub struct Snapshot {
delta_manifest_list: String,
/// a manifest list recording all changelog produced in this snapshot
#[builder(default = None)]
#[serde(skip_serializing_if = "Option::is_none")]
changelog_manifest_list: Option<String>,
/// a manifest recording all index files of this table
#[builder(default = None)]
#[serde(skip_serializing_if = "Option::is_none")]
index_manifest: Option<String>,
/// user who committed this snapshot
commit_user: String,
Expand All @@ -49,10 +67,12 @@ pub struct Snapshot {
/// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
/// committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
commit_identifier: i64,
commit_kind: CommitKind,
/// timestamp of this snapshot
time_millis: u64,
/// log offsets of all changes occurred in this snapshot
#[builder(default = None)]
#[serde(skip_serializing_if = "Option::is_none")]
log_offsets: Option<HashMap<i32, i64>>,
/// record count of all changes occurred in this snapshot
#[builder(default = None)]
Expand All @@ -62,12 +82,15 @@ pub struct Snapshot {
delta_record_count: Option<i64>,
/// record count of all changelog produced in this snapshot
#[builder(default = None)]
#[serde(skip_serializing_if = "Option::is_none")]
changelog_record_count: Option<i64>,
/// watermark for input records
#[builder(default = None)]
#[serde(skip_serializing_if = "Option::is_none")]
watermark: Option<i64>,
/// stats file name for statistics of this table
#[builder(default = None)]
#[serde(skip_serializing_if = "Option::is_none")]
statistics: Option<String>,
}

Expand Down Expand Up @@ -172,213 +195,87 @@ impl Snapshot {
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use serde_json;
use std::env::current_dir;

#[test]
fn test_snapshot_creation() {
let snapshot = Snapshot::builder()
.version(3)
.id(1)
.schema_id(0)
.base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
.delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
.commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
.commit_identifier(9223372036854775807)
.time_millis(1721287833568)
.build();

assert_eq!(snapshot.version(), 3);
assert_eq!(snapshot.id(), 1);
assert_eq!(snapshot.schema_id(), 0);
assert_eq!(
snapshot.base_manifest_list(),
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
);
assert_eq!(
snapshot.delta_manifest_list(),
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
);
assert_eq!(
snapshot.commit_user(),
"cf568e07-05ad-4943-b4bd-37461bc58729"
);
assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
assert_eq!(snapshot.time_millis(), 1721287833568);
assert!(snapshot.changelog_manifest_list().is_none());
assert!(snapshot.index_manifest().is_none());
assert!(snapshot.log_offsets().is_none());
assert!(snapshot.total_record_count().is_none());
assert!(snapshot.delta_record_count().is_none());
assert!(snapshot.changelog_record_count().is_none());
assert!(snapshot.watermark().is_none());
assert!(snapshot.statistics().is_none());
fn load_fixture(name: &str) -> String {
let path = current_dir()
.unwrap_or_else(|err| panic!("current_dir must exist: {err}"))
.join(format!("tests/fixtures/snapshot/{name}.json"));
let bytes = std::fs::read(&path)
.unwrap_or_else(|err| panic!("fixtures {path:?} load failed: {err}"));
String::from_utf8(bytes).expect("fixtures content must be valid utf8")
}

#[test]
fn test_snapshot_with_optional_fields() {
let snapshot = Snapshot::builder()
.version(3)
.id(1)
.schema_id(0)
.base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
.delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
.changelog_manifest_list(Some(
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2".to_string(),
))
.index_manifest(Some(
"index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0".to_string(),
))
.commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
.commit_identifier(9223372036854775807)
.time_millis(1721287833568)
.total_record_count(Some(1))
.delta_record_count(Some(1))
.changelog_record_count(Some(0))
.watermark(Some(-9223372036854775808))
.statistics(Some("statistics_v2".to_string()))
.build();

assert_eq!(snapshot.version(), 3);
assert_eq!(snapshot.id(), 1);
assert_eq!(snapshot.schema_id(), 0);
assert_eq!(
snapshot.base_manifest_list(),
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
);
assert_eq!(
snapshot.delta_manifest_list(),
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
);
assert_eq!(
snapshot.changelog_manifest_list(),
Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2")
);
assert_eq!(
snapshot.index_manifest(),
Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0")
);
assert_eq!(
snapshot.commit_user(),
"cf568e07-05ad-4943-b4bd-37461bc58729"
);
assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
assert_eq!(snapshot.time_millis(), 1721287833568);
assert_eq!(
snapshot.changelog_manifest_list(),
Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2")
);
assert_eq!(snapshot.total_record_count(), Some(1));
assert_eq!(snapshot.delta_record_count(), Some(1));
assert_eq!(snapshot.changelog_record_count(), Some(0));
assert_eq!(snapshot.watermark(), Some(-9223372036854775808));
assert_eq!(snapshot.statistics(), Some("statistics_v2"));
}

#[test]
fn test_snapshot_with_none_fields() {
let snapshot = Snapshot::builder()
.version(3)
.id(1)
.schema_id(0)
.base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
.delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
.changelog_manifest_list(None)
.index_manifest(None)
.commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
.commit_identifier(9223372036854775807)
.time_millis(1721287833568)
.total_record_count(None)
.delta_record_count(None)
.changelog_record_count(None)
.watermark(None)
.statistics(None)
.build();

assert_eq!(snapshot.version(), 3);
assert_eq!(snapshot.id(), 1);
assert_eq!(snapshot.schema_id(), 0);
assert_eq!(
snapshot.base_manifest_list(),
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
);
assert_eq!(
snapshot.delta_manifest_list(),
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
);
assert_eq!(
snapshot.commit_user(),
"cf568e07-05ad-4943-b4bd-37461bc58729"
);
assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
assert_eq!(snapshot.time_millis(), 1721287833568);
assert!(snapshot.changelog_manifest_list().is_none());
assert!(snapshot.index_manifest().is_none());
assert!(snapshot.log_offsets().is_none());
assert!(snapshot.total_record_count().is_none());
assert!(snapshot.delta_record_count().is_none());
assert!(snapshot.changelog_record_count().is_none());
assert!(snapshot.watermark().is_none());
assert!(snapshot.statistics().is_none());
fn test_cases() -> Vec<(&'static str, Snapshot)> {
vec![
(
"snapshot-v3",
Snapshot::builder()
.version(3)
.id(2)
.schema_id(0)
.base_manifest_list(
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(),
)
.delta_manifest_list(
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(),
)
.commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string())
.commit_identifier(9223372036854775807)
.changelog_manifest_list(Some(
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2".to_string(),
))
.commit_kind(CommitKind::APPEND)
.time_millis(1724509030368)
.log_offsets(Some(HashMap::default()))
.total_record_count(Some(4))
.delta_record_count(Some(2))
.changelog_record_count(Some(2))
.statistics(Some("statistics_string".to_string()))
.build(),
),
(
"snapshot-v3-none-field",
Snapshot::builder()
.version(3)
.id(2)
.schema_id(0)
.base_manifest_list(
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(),
)
.delta_manifest_list(
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(),
)
.commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string())
.commit_identifier(9223372036854775807)
.changelog_manifest_list(None)
.commit_kind(CommitKind::APPEND)
.time_millis(1724509030368)
.log_offsets(Some(HashMap::default()))
.total_record_count(Some(4))
.delta_record_count(Some(2))
.changelog_record_count(Some(2))
.build(),
),
]
}

#[test]
fn test_snapshot_serialization_deserialization() {
let data = r#"
{
"version" : 3,
"id" : 1,
"schemaId" : 0,
"baseManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0",
"deltaManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1",
"changelogManifestList" : null,
"indexManifest" : "index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0",
"commitUser" : "cf568e07-05ad-4943-b4bd-37461bc58729",
"commitIdentifier" : 9223372036854775807,
"timeMillis" : 1721287833568,
"logOffsets" : { },
"totalRecordCount" : 1,
"deltaRecordCount" : 1,
"changelogRecordCount" : 0,
"watermark" : -9223372036854775808
}
"#;

let snapshot: Snapshot =
serde_json::from_str(data).expect("Failed to deserialize Snapshot");

assert_eq!(snapshot.version(), 3);
assert_eq!(snapshot.id(), 1);
assert_eq!(snapshot.schema_id(), 0);
assert_eq!(
snapshot.base_manifest_list(),
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
);
assert_eq!(
snapshot.delta_manifest_list(),
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
);
assert_eq!(snapshot.changelog_manifest_list(), None);
assert_eq!(
snapshot.index_manifest(),
Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0")
);
assert_eq!(
snapshot.commit_user(),
"cf568e07-05ad-4943-b4bd-37461bc58729"
);
assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
assert_eq!(snapshot.time_millis(), 1721287833568);
assert!(snapshot.changelog_manifest_list().is_none());
assert_eq!(snapshot.total_record_count(), Some(1));
assert_eq!(snapshot.delta_record_count(), Some(1));
assert_eq!(snapshot.changelog_record_count(), Some(0));
assert_eq!(snapshot.watermark(), Some(-9223372036854775808));

let serialized = serde_json::to_string(&snapshot).expect("Failed to serialize Snapshot");
for (name, expect) in test_cases() {
let content = load_fixture(name);
let snapshot: Snapshot =
serde_json::from_str(content.as_str()).expect("Failed to deserialize Snapshot");
assert_eq!(snapshot, expect);
let serialized =
serde_json::to_string(&snapshot).expect("Failed to serialize Snapshot");

let deserialized: Snapshot =
serde_json::from_str(&serialized).expect("Failed to deserialize serialized Snapshot");
let deserialized: Snapshot = serde_json::from_str(&serialized)
.expect("Failed to deserialize serialized Snapshot");

assert_eq!(snapshot, deserialized);
assert_eq!(snapshot, deserialized);
}
}
}
16 changes: 16 additions & 0 deletions crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"version": 3,
"id": 2,
"schemaId": 0,
"baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0",
"deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1",
"changelogManifestList": null,
"commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a",
"commitIdentifier": 9223372036854775807,
"commitKind": "APPEND",
"timeMillis": 1724509030368,
"logOffsets": {},
"totalRecordCount": 4,
"deltaRecordCount": 2,
"changelogRecordCount": 2
}
17 changes: 17 additions & 0 deletions crates/paimon/tests/fixtures/snapshot/snapshot-v3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"version": 3,
"id": 2,
"schemaId": 0,
"baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0",
"deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1",
"changelogManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2",
"commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a",
"commitIdentifier": 9223372036854775807,
"commitKind": "APPEND",
"timeMillis": 1724509030368,
"logOffsets": {},
"totalRecordCount": 4,
"deltaRecordCount": 2,
"changelogRecordCount": 2,
"statistics": "statistics_string"
}

0 comments on commit 5a5220d

Please sign in to comment.