Skip to content

Commit

Permalink
feat: ttl=0/instant/forever/humantime&ttl refactor (#5089)
Browse files Browse the repository at this point in the history
* feat: ttl zero filter

* refactor: use TimeToLive enum

* fix: unit test

* tests: sqlness

* refactor: Option<TTL> None means UNSET

* tests: sqlness

* fix: 10000 years --> forever

* chore: minor refactor from reviews

* chore: rename back TimeToLive

* refactor: split imme request from normal requests

* fix: use correct lifetime

* refactor: rename immediate to instant

* tests: flow sink table default ttl

* refactor: per review

* tests: sqlness

* fix: ttl alter to instant

* tests: sqlness

* refactor: per review

* chore: per review

* feat: add db ttl type&forbid instant for db

* tests: more unit test
  • Loading branch information
discord9 authored and MichaelScofield committed Dec 27, 2024
1 parent d40cadd commit 0df75ac
Show file tree
Hide file tree
Showing 39 changed files with 1,729 additions and 169 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions src/common/meta/src/ddl/alter_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ fn build_new_schema_value(
for option in options.0.iter() {
match option {
SetDatabaseOption::Ttl(ttl) => {
if ttl.is_zero() {
value.ttl = None;
} else {
value.ttl = Some(*ttl);
}
value.ttl = Some(*ttl);
}
}
}
Expand Down Expand Up @@ -230,12 +226,12 @@ mod tests {
#[test]
fn test_build_new_schema_value() {
let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
SetDatabaseOption::Ttl(Duration::from_secs(10)),
SetDatabaseOption::Ttl(Duration::from_secs(10).into()),
]));
let current_schema_value = SchemaNameValue::default();
let new_schema_value =
build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10).into()));

let unset_ttl_alter_kind =
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
Expand Down
64 changes: 47 additions & 17 deletions src/common/meta/src/key/schema_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::DatabaseTimeToLive;
use futures::stream::BoxStream;
use humantime_serde::re::humantime;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -57,15 +57,13 @@ impl Default for SchemaNameKey<'_> {
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaNameValue {
#[serde(default)]
#[serde(with = "humantime_serde")]
pub ttl: Option<Duration>,
pub ttl: Option<DatabaseTimeToLive>,
}

impl Display for SchemaNameValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(ttl) = self.ttl {
let ttl = humantime::format_duration(ttl);
write!(f, "ttl='{ttl}'")?;
if let Some(ttl) = self.ttl.map(|i| i.to_string()) {
write!(f, "ttl='{}'", ttl)?;
}

Ok(())
Expand Down Expand Up @@ -96,11 +94,8 @@ impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
impl From<SchemaNameValue> for HashMap<String, String> {
fn from(value: SchemaNameValue) -> Self {
let mut opts = HashMap::new();
if let Some(ttl) = value.ttl {
opts.insert(
OPT_KEY_TTL.to_string(),
format!("{}", humantime::format_duration(ttl)),
);
if let Some(ttl) = value.ttl.map(|ttl| ttl.to_string()) {
opts.insert(OPT_KEY_TTL.to_string(), ttl);
}
opts
}
Expand Down Expand Up @@ -313,6 +308,7 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
Expand All @@ -323,9 +319,14 @@ mod tests {
assert_eq!("", schema_value.to_string());

let schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(9)),
ttl: Some(Duration::from_secs(9).into()),
};
assert_eq!("ttl='9s'", schema_value.to_string());

let schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(0).into()),
};
assert_eq!("ttl='forever'", schema_value.to_string());
}

#[test]
Expand All @@ -338,17 +339,36 @@ mod tests {
assert_eq!(key, parsed);

let value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
ttl: Some(Duration::from_secs(10).into()),
};
let mut opts: HashMap<String, String> = HashMap::new();
opts.insert("ttl".to_string(), "10s".to_string());
let from_value = SchemaNameValue::try_from(&opts).unwrap();
assert_eq!(value, from_value);

let parsed = SchemaNameValue::try_from_raw_value("{\"ttl\":\"10s\"}".as_bytes()).unwrap();
let parsed = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "10s"}).to_string().as_bytes(),
)
.unwrap();
assert_eq!(Some(value), parsed);

let forever = SchemaNameValue {
ttl: Some(Default::default()),
};
let parsed = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "forever"}).to_string().as_bytes(),
)
.unwrap();
assert_eq!(Some(forever), parsed);

let instant_err = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "instant"}).to_string().as_bytes(),
);
assert!(instant_err.is_err());

let none = SchemaNameValue::try_from_raw_value("null".as_bytes()).unwrap();
assert!(none.is_none());

let err_empty = SchemaNameValue::try_from_raw_value("".as_bytes());
assert!(err_empty.is_err());
}
Expand All @@ -374,7 +394,7 @@ mod tests {

let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
ttl: Some(Duration::from_secs(10).into()),
};
manager
.update(schema_key, &current_schema_value, &new_schema_value)
Expand All @@ -388,10 +408,10 @@ mod tests {
.unwrap();

let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(40)),
ttl: Some(Duration::from_secs(40).into()),
};
let incorrect_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(20)),
ttl: Some(Duration::from_secs(20).into()),
}
.try_as_raw_value()
.unwrap();
Expand All @@ -402,5 +422,15 @@ mod tests {
.update(schema_key, &incorrect_schema_value, &new_schema_value)
.await
.unwrap_err();

let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue { ttl: None };
manager
.update(schema_key, &current_schema_value, &new_schema_value)
.await
.unwrap();

let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
assert_eq!(new_schema_value, *current_schema_value);
}
}
13 changes: 4 additions & 9 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::{HashMap, HashSet};
use std::result;
use std::time::Duration;

use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
use api::v1::meta::ddl_task_request::Task;
Expand All @@ -36,7 +35,7 @@ use api::v1::{
};
use base64::engine::general_purpose;
use base64::Engine as _;
use humantime_serde::re::humantime;
use common_time::DatabaseTimeToLive;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnNull};
Expand Down Expand Up @@ -1009,12 +1008,8 @@ impl TryFrom<PbOption> for SetDatabaseOption {
fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
match key.to_ascii_lowercase().as_str() {
TTL_KEY => {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(&value)
.map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?
};
let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
.map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;

Ok(SetDatabaseOption::Ttl(ttl))
}
Expand All @@ -1025,7 +1020,7 @@ impl TryFrom<PbOption> for SetDatabaseOption {

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
Ttl(Duration),
Ttl(DatabaseTimeToLive),
}

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
Expand Down
2 changes: 2 additions & 0 deletions src/common/time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ chrono.workspace = true
chrono-tz = "0.8"
common-error.workspace = true
common-macro.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
once_cell.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
Expand Down
16 changes: 16 additions & 0 deletions src/common/time/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,28 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to parse duration"))]
ParseDuration {
#[snafu(source)]
error: humantime::DurationError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Database's TTL can't be `instant`"))]
InvalidDatabaseTtl {
#[snafu(implicit)]
location: Location,
},
}

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ParseDateStr { .. }
| Error::ParseDuration { .. }
| Error::InvalidDatabaseTtl { .. }
| Error::ParseTimestamp { .. }
| Error::InvalidTimezoneOffset { .. }
| Error::Format { .. }
Expand Down
2 changes: 2 additions & 0 deletions src/common/time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod time;
pub mod timestamp;
pub mod timestamp_millis;
pub mod timezone;
pub mod ttl;
pub mod util;

pub use date::Date;
Expand All @@ -32,3 +33,4 @@ pub use range::RangeMillis;
pub use timestamp::Timestamp;
pub use timestamp_millis::TimestampMillis;
pub use timezone::Timezone;
pub use ttl::{DatabaseTimeToLive, TimeToLive, FOREVER, INSTANT};
Loading

0 comments on commit 0df75ac

Please sign in to comment.