Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(catalog): add catalog API #62

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/paimon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]

[dependencies]
async-trait = "0.1"
url = "2.5.2"
async-trait = "0.1.81"
bytes = "1.7.1"
bitflags = "2.6.0"
tokio = { version = "1.39.2", features = ["macros"] }
Expand Down
254 changes: 254 additions & 0 deletions crates/paimon/src/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;

use async_trait::async_trait;
use chrono::Duration;

use crate::error::Result;
use crate::io::FileIO;
use crate::spec::{RowType, SchemaChange, TableSchema};

/// This interface is responsible for reading and writing metadata such as database/table from a paimon catalog.
///
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java#L42>
#[async_trait]
pub trait Catalog: Send + Sync {
const DEFAULT_DATABASE: &'static str = "default";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using const will make this trait non-object safe, preventing users from invoking Catalog through Arc<dyn Catalog>. How about introducing a new API, such as info, for this purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will do it later.

const SYSTEM_TABLE_SPLITTER: &'static str = "$";
const SYSTEM_DATABASE_NAME: &'static str = "sys";

/// Returns the warehouse root path containing all database directories in this catalog.
fn warehouse(&self) -> &str;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do APIs like lockFactory need to be added?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do APIs like lockFactory need to be added?

I think this PR will add the struct and definition first, maybe we can add some APIs like lockFactory in the further PRs.

/// Returns the catalog options.
fn options(&self) -> &HashMap<String, String>;

/// Returns the FileIO instance.
fn file_io(&self) -> &FileIO;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catalog is integrated with file I/O, which is somewhat surprising to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catalog is integrated with file I/O, which is somewhat surprising to me.

The FileIO fileIO() function can be found at here. Do you have some good suggestions to deal this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need some input from @JingsongLi, @SteNicholas, and @Aitozi: Should a catalog be coupled with a specific file I/O?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, the catalog is bind with a warehouse path which determines the fileIO. So these two things are coupled in current shape, and the table's schema are also retrieved from the filesystem. Also like to see some inputs from @JingsongLi @SteNicholas

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@QuakeWang, @Xuanwo, @Aitozi, the warehoused path is required to determine which FileIO to use. This interface makes sense to me.


/// Lists all databases in this catalog.
async fn list_databases(&self) -> Result<Vec<String>>;

/// Checks if a database exists in this catalog.
async fn database_exists(&self, database_name: &str) -> Result<bool>;

/// Creates a new database.
async fn create_database(
&self,
name: &str,
ignore_if_exists: bool,
properties: Option<HashMap<String, String>>,
) -> Result<()>;

/// Loads database properties.
async fn load_database_properties(&self, name: &str) -> Result<HashMap<String, String>>;

/// Drops a database.
async fn drop_database(
&self,
name: &str,
ignore_if_not_exists: bool,
cascade: bool,
) -> Result<()>;

/// Returns a Table instance for the specified identifier.
async fn get_table(&self, identifier: &Identifier) -> Result<impl Table>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementing Table makes this trait non-object safe. Perhaps we could return a Table struct instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementing Table makes this trait non-object safe. Perhaps we could return a Table struct instead.

Ok, I will create a Table struct instead of trait.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the java version,Table interface represents various specialized table types. Is it possible to define a simple Table struct in rust ?
What about returning Box<dyn Table> instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the java version,Table interface represents various specialized table types. Is it possible to define a simple Table struct in rust ? What about returning Box<dyn Table> instead?

Maybe we will not use dyn, when returning Box<dyn Table> has some error like error[E0404]: expected trait, found struct Table. In Rust, dyn is used for dynamic dispatch with trait objects.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the java version,Table interface represents various specialized table types. Is it possible to define a simple Table struct in rust ? What about returning Box<dyn Table> instead?

Maybe we will not use dyn, when returning Box<dyn Table> has some error like error[E0404]: expected trait, found struct Table. In Rust, dyn is used for dynamic dispatch with trait objects.

Why Box<dyn Table> would give error[E0404] ? It should not when using properly.
I think in many cases, the return value of get_table needs to be downcast to a concrete type, so it makes sense to use dyn Table.
Alternatively, maybe using enum Table is feasible


/// Lists all tables in the specified database.
async fn list_tables(&self, database_name: &str) -> Result<Vec<String>>;

/// Checks if a table exists.
async fn table_exists(&self, identifier: &Identifier) -> Result<bool> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit surprising that we use Identifier in some places but not in others. Shouldn't we be consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit surprising that we use Identifier in some places but not in others. Shouldn't we be consistent?

In the Java version, Identifier is usually used for table-related methods.

match self.get_table(identifier).await {
Ok(_) => Ok(true),
Err(e) => match e {
crate::error::Error::TableNotExist { .. } => Ok(false),
_ => Err(e),
},
}
}

/// Drops a table.
async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()>;

/// Creates a new table.
async fn create_table(
&self,
identifier: &Identifier,
schema: TableSchema,
ignore_if_exists: bool,
) -> Result<()>;

/// Renames a table.
async fn rename_table(
&self,
from_table: &Identifier,
to_table: &Identifier,
ignore_if_not_exists: bool,
) -> Result<()>;

/// Alters an existing table.
async fn alter_table(
&self,
identifier: &Identifier,
changes: Vec<SchemaChange>,
ignore_if_not_exists: bool,
) -> Result<()>;

/// Drops a partition from the specified table.
async fn drop_partition(
&self,
identifier: &Identifier,
partitions: &HashMap<String, String>,
) -> Result<()>;

/// Returns whether this catalog is case-sensitive.
fn case_sensitive(&self) -> bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we can consolidate those metadata-related items into a single API.

true
}
}

/// Identifies an object in a catalog.
///
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java#L35>
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Identifier {
database: String,
table: String,
}

impl Identifier {
pub const UNKNOWN_DATABASE: &'static str = "unknown";

/// Create a new identifier.
pub fn new(database: String, table: String) -> Self {
Self { database, table }
}

/// Get the table name.
pub fn database_name(&self) -> &str {
&self.database
}

/// Get the table name.
pub fn object_name(&self) -> &str {
&self.table
}

/// Get the full name of the identifier.
pub fn full_name(&self) -> String {
if self.database == Self::UNKNOWN_DATABASE {
self.table.clone()
} else {
format!("{}.{}", self.database, self.table)
}
}

/// Get the full name of the identifier with a specified character.
pub fn escaped_full_name(&self) -> String {
self.escaped_full_name_with_char('`')
}

/// Get the full name of the identifier with a specified character.
pub fn escaped_full_name_with_char(&self, escape_char: char) -> String {
format!(
"{0}{1}{0}.{0}{2}{0}",
escape_char, self.database, self.table
)
}

/// Create a new identifier.
pub fn create(db: &str, table: &str) -> Self {
Self::new(db.to_string(), table.to_string())
}
}

impl fmt::Display for Identifier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.full_name())
}
}

/// A table provides basic abstraction for a table type and table scan, and table read.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/table/Table.java#L41>
pub trait Table {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like Table should be a struct instead of a trait.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will create a simple Table struct as the return value of the get_table method. With more design details about Table we can create a new issue for discussion. What do you think of it?

// ================== Table Metadata =====================

/// A name to identify this table.
fn name(&self) -> &str;

/// Returns the row type of this table.
fn row_type(&self) -> &RowType;

/// Partition keys of this table.
fn partition_keys(&self) -> Vec<String>;

/// Primary keys of this table.
fn primary_keys(&self) -> Vec<String>;

/// Options of this table.
fn options(&self) -> HashMap<String, String>;

/// Optional comment of this table.
fn comment(&self) -> Option<&String>;

// ================= Table Operations ====================

/// Copy this table with adding dynamic options.
fn copy(&self, dynamic_options: HashMap<String, String>) -> Box<dyn Table>;

/// Rollback table's state to a specific snapshot.
fn rollback_to(&mut self, snapshot_id: u64);

/// Create a tag from given snapshot.
fn create_tag(&mut self, tag_name: &str, from_snapshot_id: u64);

fn create_tag_with_retention(
&mut self,
tag_name: &str,
from_snapshot_id: u64,
time_retained: Duration,
);

/// Create a tag from the latest snapshot.
fn create_tag_from_latest(&mut self, tag_name: &str);

fn create_tag_from_latest_with_retention(&mut self, tag_name: &str, time_retained: Duration);

/// Delete a tag by name.
fn delete_tag(&mut self, tag_name: &str);

/// Rollback table's state to a specific tag.
fn rollback_to_tag(&mut self, tag_name: &str);

/// Create an empty branch.
fn create_branch(&mut self, branch_name: &str);

/// Create a branch from given snapshot.
fn create_branch_from_snapshot(&mut self, branch_name: &str, snapshot_id: u64);

/// Create a branch from given tag.
fn create_branch_from_tag(&mut self, branch_name: &str, tag_name: &str);

/// Delete a branch by branchName.
fn delete_branch(&mut self, branch_name: &str);
}
32 changes: 32 additions & 0 deletions crates/paimon/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

use snafu::prelude::*;

use crate::catalog::Identifier;

/// Result type used in paimon.
pub type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -52,6 +54,36 @@ pub enum Error {
display("Paimon hitting invalid config: {}", message)
)]
ConfigInvalid { message: String },

#[snafu(display("Database {} is not empty.", database))]
DatabaseNotEmpty { database: String },

#[snafu(display("Database {} already exists.", database))]
DatabaseAlreadyExist { database: String },

#[snafu(display("Database {} does not exist.", database))]
DatabaseNotExist { database: String },

#[snafu(display("Can't do operation on system database."))]
ProcessSystemDatabase,

#[snafu(display("Table {} already exists.", identifier.full_name()))]
TableAlreadyExist { identifier: Identifier },

#[snafu(display("Table {} does not exist.", identifier.full_name()))]
TableNotExist { identifier: Identifier },

#[snafu(display("Partition {} do not exist in the table {}.", identifier.full_name(), partitions))]
PartitionNotExist {
identifier: Identifier,
partitions: String,
},

#[snafu(display("Column {} already exists.", column_name))]
ColumnAlreadyExist { column_name: String },

#[snafu(display("Column {} does not exist.", column_name))]
ColumnNotExist { column_name: String },
}

impl From<opendal::Error> for Error {
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ mod error;
pub use error::Error;
pub use error::Result;

pub mod catalog;
pub mod io;
pub mod spec;
Loading