Skip to content

Commit

Permalink
refactor: per review
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Dec 6, 2024
1 parent c5186dd commit 5399c32
Show file tree
Hide file tree
Showing 16 changed files with 123 additions and 75 deletions.
4 changes: 0 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions src/common/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ bytes.workspace = true
common-error.workspace = true
common-macro.workspace = true
futures.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
paste = "1.0"
pin-project.workspace = true
serde = { version = "1.0", features = ["derive"] }
Expand All @@ -26,5 +24,4 @@ zeroize = { version = "1.6", default-features = false, features = ["alloc"] }

[dev-dependencies]
common-test-util.workspace = true
serde_json.workspace = true
toml.workspace = true
4 changes: 2 additions & 2 deletions src/common/meta/src/key/schema_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct SchemaNameValue {

impl Display for SchemaNameValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(ttl) = self.ttl.and_then(|i| i.to_string_opt()) {
if let Some(ttl) = self.ttl.map(|i| i.to_string()) {
write!(f, "ttl='{}'", ttl)?;
}

Expand Down Expand Up @@ -94,7 +94,7 @@ impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
impl From<SchemaNameValue> for HashMap<String, String> {
fn from(value: SchemaNameValue) -> Self {
let mut opts = HashMap::new();
if let Some(ttl) = value.ttl.and_then(|ttl| ttl.to_string_opt()) {
if let Some(ttl) = value.ttl.map(|ttl| ttl.to_string()) {
opts.insert(OPT_KEY_TTL.to_string(), ttl);
}
opts
Expand Down
8 changes: 8 additions & 0 deletions src/common/time/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to parse duration: {raw:?}"))]
ParseDuration {
raw: humantime::DurationError,
#[snafu(implicit)]
location: Location,
},
}

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ParseDateStr { .. }
| Error::ParseDuration { .. }
| Error::ParseTimestamp { .. }
| Error::InvalidTimezoneOffset { .. }
| Error::Format { .. }
Expand Down
23 changes: 4 additions & 19 deletions src/common/time/src/ttl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Duration;

use serde::{Deserialize, Serialize};

use crate::error::{Error, ParseDurationSnafu};
use crate::Timestamp;

pub const INSTANT: &str = "instant";
Expand Down Expand Up @@ -50,12 +51,13 @@ impl TimeToLive {
/// Parse a string that is either `instant`, `forever`, or a duration to `TimeToLive`
///
/// note that an empty string or a zero duration(a duration that spans no time) is treat as `forever` too
pub fn from_humantime_or_str(s: &str) -> Result<Self, String> {
pub fn from_humantime_or_str(s: &str) -> Result<Self, Error> {
match s.to_lowercase().as_ref() {
INSTANT => Ok(TimeToLive::Instant),
FOREVER | "" => Ok(TimeToLive::Forever),
_ => {
let d = humantime::parse_duration(s).map_err(|e| e.to_string())?;
let d = humantime::parse_duration(s)
.map_err(|e| ParseDurationSnafu { raw: e }.build())?;
Ok(TimeToLive::from(d))
}
}
Expand All @@ -75,15 +77,6 @@ impl TimeToLive {
})
}

/// Print TimeToLive as string
pub fn to_string_opt(&self) -> Option<String> {
match self {
TimeToLive::Instant => Some(INSTANT.to_string()),
TimeToLive::Duration(d) => Some(humantime::format_duration(*d).to_string()),
TimeToLive::Forever => Some(FOREVER.to_string()),
}
}

/// is instant variant
pub fn is_instant(&self) -> bool {
matches!(self, TimeToLive::Instant)
Expand All @@ -93,14 +86,6 @@ impl TimeToLive {
pub fn is_forever(&self) -> bool {
matches!(self, TimeToLive::Forever)
}

/// Get duration if it is a duration variant, otherwise return None
pub fn get_duration(&self) -> Option<Duration> {
match self {
TimeToLive::Duration(d) => Some(*d),
_ => None,
}
}
}

impl From<Duration> for TimeToLive {
Expand Down
8 changes: 5 additions & 3 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl AutoCreateTableType {
///
/// This is used to split requests for different processing.
#[derive(Clone)]
pub struct InstantInsertRequests {
pub struct InstantOrNormalInsertRequests {
/// Requests with normal ttl.
pub normal_requests: RegionInsertRequests,
/// Requests with ttl=instant.
Expand Down Expand Up @@ -293,7 +293,7 @@ impl Inserter {
impl Inserter {
async fn do_request(
&self,
requests: InstantInsertRequests,
requests: InstantOrNormalInsertRequests,
ctx: &QueryContextRef,
) -> Result<Output> {
let write_cost = write_meter!(
Expand All @@ -308,7 +308,7 @@ impl Inserter {
..Default::default()
});

let InstantInsertRequests {
let InstantOrNormalInsertRequests {
normal_requests,
instant_requests,
} = requests;
Expand Down Expand Up @@ -589,6 +589,8 @@ impl Inserter {
AutoCreateTableType::Physical
| AutoCreateTableType::Log
| AutoCreateTableType::LastNonNull => {
// note that auto create table shouldn't be ttl instant table
// for it's a very unexpected behavior and should be set by user explicitly
for create_table in create_tables {
let table = self
.create_physical_table(create_table, ctx, statement_executor)
Expand Down
9 changes: 6 additions & 3 deletions src/operator/src/req_convert/insert/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use snafu::OptionExt;
use table::metadata::TableId;

use crate::error::{Result, TableNotFoundSnafu};
use crate::insert::InstantInsertRequests;
use crate::insert::InstantOrNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;

pub struct RowToRegion<'a> {
Expand All @@ -42,7 +42,10 @@ impl<'a> RowToRegion<'a> {
}
}

pub async fn convert(&self, requests: RowInsertRequests) -> Result<InstantInsertRequests> {
pub async fn convert(
&self,
requests: RowInsertRequests,
) -> Result<InstantOrNormalInsertRequests> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
let mut instant_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
Expand All @@ -57,7 +60,7 @@ impl<'a> RowToRegion<'a> {
}
}

Ok(InstantInsertRequests {
Ok(InstantOrNormalInsertRequests {
normal_requests: RegionInsertRequests {
requests: region_request,
},
Expand Down
16 changes: 8 additions & 8 deletions src/operator/src/req_convert/insert/stmt_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::error::{
ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
SchemaReadOnlySnafu, TableNotFoundSnafu,
};
use crate::insert::InstantInsertRequests;
use crate::insert::InstantOrNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::insert::semantic_type;

Expand Down Expand Up @@ -61,7 +61,7 @@ impl<'a> StatementToRegion<'a> {
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
) -> Result<InstantInsertRequests> {
) -> Result<InstantOrNormalInsertRequests> {
let (catalog, schema, table_name) = self.get_full_name(stmt.table_name())?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
Expand Down Expand Up @@ -136,17 +136,17 @@ impl<'a> StatementToRegion<'a> {
.partition_insert_requests(table_info.table_id(), Rows { schema, rows })
.await?;
let requests = RegionInsertRequests { requests };
Ok(if table_info.is_ttl_instant_table() {
InstantInsertRequests {
if table_info.is_ttl_instant_table() {
Ok(InstantOrNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
}
})
} else {
InstantInsertRequests {
Ok(InstantOrNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
}
})
})
}
}

async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
Expand Down
11 changes: 7 additions & 4 deletions src/operator/src/req_convert/insert/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use table::metadata::TableInfo;
use table::requests::InsertRequest as TableInsertRequest;

use crate::error::Result;
use crate::insert::InstantInsertRequests;
use crate::insert::InstantOrNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::common::{column_schema, row_count};

Expand All @@ -36,7 +36,10 @@ impl<'a> TableToRegion<'a> {
}
}

pub async fn convert(&self, request: TableInsertRequest) -> Result<InstantInsertRequests> {
pub async fn convert(
&self,
request: TableInsertRequest,
) -> Result<InstantOrNormalInsertRequests> {
let row_count = row_count(&request.columns_values)?;
let schema = column_schema(self.table_info, &request.columns_values)?;
let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count);
Expand All @@ -48,12 +51,12 @@ impl<'a> TableToRegion<'a> {

let requests = RegionInsertRequests { requests };
if self.table_info.is_ttl_instant_table() {
Ok(InstantInsertRequests {
Ok(InstantOrNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
})
} else {
Ok(InstantInsertRequests {
Ok(InstantOrNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
})
Expand Down
4 changes: 2 additions & 2 deletions src/query/src/sql/show_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptio
write_buffer_size.to_string(),
);
}
if let Some(ttl) = table_opts.ttl.and_then(|t| t.to_string_opt()) {
if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
options.insert(TTL_KEY.to_string(), ttl);
} else if let Some(database_ttl) = schema_options
.and_then(|o| o.ttl)
.and_then(|ttl| ttl.to_string_opt())
.map(|ttl| ttl.to_string())
{
options.insert(TTL_KEY.to_string(), database_ttl);
};
Expand Down
1 change: 0 additions & 1 deletion src/session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ testing = []
workspace = true

[dependencies]
ahash.workspace = true
api.workspace = true
arc-swap = "1.5"
auth.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions src/table/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl fmt::Display for TableOptions {
key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size));
}

if let Some(ttl) = self.ttl.and_then(|ttl| ttl.to_string_opt()) {
if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) {
key_vals.push(format!("{}={}", TTL_KEY, ttl));
}

Expand All @@ -155,7 +155,7 @@ impl From<&TableOptions> for HashMap<String, String> {
write_buffer_size.to_string(),
);
}
if let Some(ttl_str) = opts.ttl.and_then(|ttl| ttl.to_string_opt()) {
if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) {
let _ = res.insert(TTL_KEY.to_string(), ttl_str);
}
res.extend(
Expand Down
10 changes: 10 additions & 0 deletions tests/cases/standalone/common/flow/flow_basic.result
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,16 @@ where

Affected Rows: 0

SHOW CREATE FLOW filter_numbers_basic;

+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| filter_numbers_basic | CREATE FLOW IF NOT EXISTS filter_numbers_basic |
| | SINK TO out_num_cnt_basic |
| | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+

drop flow filter_numbers_basic;

Affected Rows: 0
Expand Down
2 changes: 2 additions & 0 deletions tests/cases/standalone/common/flow/flow_basic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ FROM
where
number > 10;

SHOW CREATE FLOW filter_numbers_basic;

drop flow filter_numbers_basic;

drop table out_num_cnt_basic;
Expand Down
Loading

0 comments on commit 5399c32

Please sign in to comment.