Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ttl=0/instant/forever/humantime&ttl refactor #5089

Merged
merged 21 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/common/base/Cargo.toml
discord9 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ bytes.workspace = true
common-error.workspace = true
common-macro.workspace = true
futures.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
paste = "1.0"
pin-project.workspace = true
serde = { version = "1.0", features = ["derive"] }
Expand All @@ -24,4 +26,5 @@ zeroize = { version = "1.6", default-features = false, features = ["alloc"] }

[dev-dependencies]
common-test-util.workspace = true
serde_json.workspace = true
toml.workspace = true
2 changes: 2 additions & 0 deletions src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ pub mod range_read;
#[allow(clippy::all)]
pub mod readable_size;
pub mod secrets;
pub mod ttl;

pub type AffectedRows = usize;

pub use bit_vec::BitVec;
pub use plugins::Plugins;
pub use ttl::TimeToLive;
129 changes: 129 additions & 0 deletions src/common/base/src/ttl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;
use std::time::Duration;

use serde::{Deserialize, Serialize};

/// Time To Live
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TimeToLive {
/// Immediately throw away on insert
discord9 marked this conversation as resolved.
Show resolved Hide resolved
Instant,
/// Keep the data forever
#[default]
Forever,
/// Duration to keep the data, this duration should be non-zero
#[serde(untagged, with = "humantime_serde")]
Duration(Duration),
}

impl Display for TimeToLive {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TimeToLive::Instant => write!(f, "instant"),
TimeToLive::Duration(d) => write!(f, "{}", humantime::Duration::from(*d)),
TimeToLive::Forever => write!(f, "forever"),
}
}
}

impl TimeToLive {
/// Parse a string that is either `immediate`, `forever`, or a duration to `TimeToLive`
discord9 marked this conversation as resolved.
Show resolved Hide resolved
///
/// note that a empty string is treat as `forever` too
discord9 marked this conversation as resolved.
Show resolved Hide resolved
pub fn from_humantime_or_str(s: &str) -> Result<Self, String> {
match s {
discord9 marked this conversation as resolved.
Show resolved Hide resolved
"instant" => Ok(TimeToLive::Instant),
"forever" | "" => Ok(TimeToLive::Forever),
_ => {
let d = humantime::parse_duration(s).map_err(|e| e.to_string())?;
Ok(TimeToLive::from(d))
}
}
}

/// Print TimeToLive as string
pub fn as_repr_opt(&self) -> Option<String> {
discord9 marked this conversation as resolved.
Show resolved Hide resolved
match self {
TimeToLive::Instant => Some("instant".to_string()),
TimeToLive::Duration(d) => Some(humantime::format_duration(*d).to_string()),
TimeToLive::Forever => Some("forever".to_string()),
}
}

pub fn is_immediate(&self) -> bool {
discord9 marked this conversation as resolved.
Show resolved Hide resolved
matches!(self, TimeToLive::Instant)
}

/// Is the default value, which is `Forever`
pub fn is_forever(&self) -> bool {
matches!(self, TimeToLive::Forever)
}

pub fn get_duration(&self) -> Option<Duration> {
discord9 marked this conversation as resolved.
Show resolved Hide resolved
match self {
TimeToLive::Duration(d) => Some(*d),
_ => None,
}
}
}

impl From<Duration> for TimeToLive {
fn from(duration: Duration) -> Self {
if duration.is_zero() {
// compatibility with old code, and inline with cassandra's behavior when ttl set to 0
TimeToLive::Forever
} else {
TimeToLive::Duration(duration)
}
}
}

impl From<humantime::Duration> for TimeToLive {
fn from(duration: humantime::Duration) -> Self {
Self::from(*duration)
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_serde() {
let cases = vec![
("\"instant\"", TimeToLive::Instant),
("\"forever\"", TimeToLive::Forever),
("\"10d\"", Duration::from_secs(86400 * 10).into()),
(
"\"10000 years\"",
humantime::parse_duration("10000 years").unwrap().into(),
),
];

for (s, expected) in cases {
let serialized = serde_json::to_string(&expected).unwrap();
let deserialized: TimeToLive = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized, expected);

let deserialized: TimeToLive = serde_json::from_str(s).unwrap_or_else(|err| {
panic!("Actual serialized: {}, s=`{s}`, err: {:?}", serialized, err)
});
assert_eq!(deserialized, expected);
}
}
}
10 changes: 3 additions & 7 deletions src/common/meta/src/ddl/alter_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ fn build_new_schema_value(
for option in options.0.iter() {
match option {
SetDatabaseOption::Ttl(ttl) => {
if ttl.is_zero() {
value.ttl = None;
} else {
value.ttl = Some(*ttl);
}
value.ttl = Some(*ttl);
}
}
}
Expand Down Expand Up @@ -230,12 +226,12 @@ mod tests {
#[test]
fn test_build_new_schema_value() {
let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
SetDatabaseOption::Ttl(Duration::from_secs(10)),
SetDatabaseOption::Ttl(Duration::from_secs(10).into()),
]));
let current_schema_value = SchemaNameValue::default();
let new_schema_value =
build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10).into()));

let unset_ttl_alter_kind =
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
Expand Down
73 changes: 56 additions & 17 deletions src/common/meta/src/key/schema_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;

use common_base::TimeToLive;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use futures::stream::BoxStream;
use humantime_serde::re::humantime;
Expand Down Expand Up @@ -57,15 +57,13 @@ impl Default for SchemaNameKey<'_> {
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaNameValue {
#[serde(default)]
#[serde(with = "humantime_serde")]
pub ttl: Option<Duration>,
pub ttl: Option<TimeToLive>,
}

impl Display for SchemaNameValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(ttl) = self.ttl {
let ttl = humantime::format_duration(ttl);
write!(f, "ttl='{ttl}'")?;
if let Some(ttl) = self.ttl.and_then(|i| i.as_repr_opt()) {
write!(f, "ttl='{}'", ttl)?;
}

Ok(())
Expand Down Expand Up @@ -96,11 +94,8 @@ impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
impl From<SchemaNameValue> for HashMap<String, String> {
fn from(value: SchemaNameValue) -> Self {
let mut opts = HashMap::new();
if let Some(ttl) = value.ttl {
opts.insert(
OPT_KEY_TTL.to_string(),
format!("{}", humantime::format_duration(ttl)),
);
if let Some(ttl) = value.ttl.and_then(|ttl| ttl.as_repr_opt()) {
opts.insert(OPT_KEY_TTL.to_string(), ttl);
}
opts
}
Expand Down Expand Up @@ -313,6 +308,7 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
Expand All @@ -323,9 +319,19 @@ mod tests {
assert_eq!("", schema_value.to_string());

let schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(9)),
ttl: Some(Duration::from_secs(9).into()),
};
assert_eq!("ttl='9s'", schema_value.to_string());

let schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(0).into()),
};
assert_eq!("ttl='forever'", schema_value.to_string());

let schema_value = SchemaNameValue {
ttl: Some(TimeToLive::Instant),
};
assert_eq!("ttl='instant'", schema_value.to_string());
}

#[test]
Expand All @@ -338,17 +344,40 @@ mod tests {
assert_eq!(key, parsed);

let value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
ttl: Some(Duration::from_secs(10).into()),
discord9 marked this conversation as resolved.
Show resolved Hide resolved
};
let mut opts: HashMap<String, String> = HashMap::new();
opts.insert("ttl".to_string(), "10s".to_string());
let from_value = SchemaNameValue::try_from(&opts).unwrap();
assert_eq!(value, from_value);

let parsed = SchemaNameValue::try_from_raw_value("{\"ttl\":\"10s\"}".as_bytes()).unwrap();
let parsed = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "10s"}).to_string().as_bytes(),
)
.unwrap();
assert_eq!(Some(value), parsed);

let imme = SchemaNameValue {
ttl: Some(TimeToLive::Instant),
};
let parsed = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "instant"}).to_string().as_bytes(),
)
.unwrap();
assert_eq!(Some(imme), parsed);

let forever = SchemaNameValue {
ttl: Some(TimeToLive::default()),
};
let parsed = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "forever"}).to_string().as_bytes(),
)
.unwrap();
assert_eq!(Some(forever), parsed);

let none = SchemaNameValue::try_from_raw_value("null".as_bytes()).unwrap();
assert!(none.is_none());

let err_empty = SchemaNameValue::try_from_raw_value("".as_bytes());
assert!(err_empty.is_err());
}
Expand All @@ -374,7 +403,7 @@ mod tests {

let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
ttl: Some(Duration::from_secs(10).into()),
};
manager
.update(schema_key, &current_schema_value, &new_schema_value)
Expand All @@ -388,10 +417,10 @@ mod tests {
.unwrap();

let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(40)),
ttl: Some(Duration::from_secs(40).into()),
};
let incorrect_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(20)),
ttl: Some(Duration::from_secs(20).into()),
}
.try_as_raw_value()
.unwrap();
Expand All @@ -402,5 +431,15 @@ mod tests {
.update(schema_key, &incorrect_schema_value, &new_schema_value)
.await
.unwrap_err();

let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue { ttl: None };
manager
.update(schema_key, &current_schema_value, &new_schema_value)
.await
.unwrap();

let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
assert_eq!(new_schema_value, *current_schema_value);
}
}
Loading
Loading