diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs index 948ff43c2905..2480c74fc112 100644 --- a/src/common/grpc/src/lib.rs +++ b/src/common/grpc/src/lib.rs @@ -15,7 +15,7 @@ pub mod channel_manager; pub mod error; pub mod flight; +pub mod precision; pub mod select; -pub mod writer; pub use error::Error; diff --git a/src/common/grpc/src/precision.rs b/src/common/grpc/src/precision.rs new file mode 100644 index 000000000000..dd3f65c9783c --- /dev/null +++ b/src/common/grpc/src/precision.rs @@ -0,0 +1,141 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; + +use common_time::timestamp::TimeUnit; + +use crate::Error; + +/// Precision represents the precision of a timestamp. +/// It is used to convert timestamps between different precisions. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Precision { + Nanosecond, + Microsecond, + Millisecond, + Second, + Minute, + Hour, +} + +impl Precision { + pub fn to_nanos(&self, amount: i64) -> Option { + match self { + Precision::Nanosecond => Some(amount), + Precision::Microsecond => amount.checked_mul(1_000), + Precision::Millisecond => amount.checked_mul(1_000_000), + Precision::Second => amount.checked_mul(1_000_000_000), + Precision::Minute => amount + .checked_mul(60) + .and_then(|a| a.checked_mul(1_000_000_000)), + Precision::Hour => amount + .checked_mul(3600) + .and_then(|a| a.checked_mul(1_000_000_000)), + } + } + + pub fn to_millis(&self, amount: i64) -> Option { + match self { + Precision::Nanosecond => amount.checked_div(1_000_000), + Precision::Microsecond => amount.checked_div(1_000), + Precision::Millisecond => Some(amount), + Precision::Second => amount.checked_mul(1_000), + Precision::Minute => amount.checked_mul(60_000), + Precision::Hour => amount.checked_mul(3_600_000), + } + } +} + +impl Display for Precision { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Precision::Nanosecond => write!(f, "Precision::Nanosecond"), + Precision::Microsecond => write!(f, "Precision::Microsecond"), + Precision::Millisecond => write!(f, "Precision::Millisecond"), + Precision::Second => write!(f, "Precision::Second"), + Precision::Minute => write!(f, "Precision::Minute"), + Precision::Hour => write!(f, "Precision::Hour"), + } + } +} + +impl TryFrom for TimeUnit { + type Error = Error; + + fn try_from(precision: Precision) -> Result { + Ok(match precision { + Precision::Second => TimeUnit::Second, + Precision::Millisecond => TimeUnit::Millisecond, + Precision::Microsecond => TimeUnit::Microsecond, + Precision::Nanosecond => TimeUnit::Nanosecond, + _ => { + return Err(Error::NotSupported { + feat: format!("convert {precision} into TimeUnit"), + }) + } + }) + } +} + +#[cfg(test)] +mod tests { + use crate::precision::Precision; + + #[test] + fn test_to_nanos() { + assert_eq!(Precision::Nanosecond.to_nanos(1).unwrap(), 1); + assert_eq!(Precision::Microsecond.to_nanos(1).unwrap(), 1_000); + assert_eq!(Precision::Millisecond.to_nanos(1).unwrap(), 1_000_000); + assert_eq!(Precision::Second.to_nanos(1).unwrap(), 1_000_000_000); + assert_eq!(Precision::Minute.to_nanos(1).unwrap(), 60 * 1_000_000_000); + assert_eq!( + Precision::Hour.to_nanos(1).unwrap(), + 60 * 60 * 1_000_000_000 + ); + } + + #[test] + fn test_to_millis() { + assert_eq!(Precision::Nanosecond.to_millis(1_000_000).unwrap(), 1); + assert_eq!(Precision::Microsecond.to_millis(1_000).unwrap(), 1); + assert_eq!(Precision::Millisecond.to_millis(1).unwrap(), 1); + assert_eq!(Precision::Second.to_millis(1).unwrap(), 1_000); + assert_eq!(Precision::Minute.to_millis(1).unwrap(), 60 * 1_000); + assert_eq!(Precision::Hour.to_millis(1).unwrap(), 60 * 60 * 1_000); + } + + #[test] + fn test_to_nanos_basic() { + assert_eq!(Precision::Second.to_nanos(1), Some(1_000_000_000)); + assert_eq!(Precision::Minute.to_nanos(1), Some(60 * 1_000_000_000)); + } + + #[test] + fn test_to_millis_basic() { + assert_eq!(Precision::Second.to_millis(1), Some(1_000)); + assert_eq!(Precision::Minute.to_millis(1), Some(60_000)); + } + + #[test] + fn test_to_nanos_overflow() { + assert_eq!(Precision::Hour.to_nanos(i64::MAX / 100), None); + } + + #[test] + fn test_zero_input() { + assert_eq!(Precision::Second.to_nanos(0), Some(0)); + assert_eq!(Precision::Minute.to_millis(0), Some(0)); + } +} diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs deleted file mode 100644 index 3a8e9238287e..000000000000 --- a/src/common/grpc/src/writer.rs +++ /dev/null @@ -1,441 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::fmt::Display; - -use api::helper::values_with_capacity; -use api::v1::{Column, ColumnDataType, ColumnDataTypeExtension, SemanticType}; -use common_base::BitVec; -use common_time::timestamp::TimeUnit; -use snafu::ensure; - -use crate::error::{Result, TypeMismatchSnafu}; -use crate::Error; - -type ColumnName = String; - -type RowCount = u32; - -// TODO(fys): will remove in the future. -#[derive(Default)] -pub struct LinesWriter { - column_name_index: HashMap, - null_masks: Vec, - batch: (Vec, RowCount), - lines: usize, -} - -impl LinesWriter { - pub fn with_lines(lines: usize) -> Self { - Self { - lines, - ..Default::default() - } - } - - pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) -> Result<()> { - let (idx, column) = self.mut_column( - column_name, - ColumnDataType::TimestampMillisecond, - SemanticType::Timestamp, - None, - ); - ensure!( - column.datatype == ColumnDataType::TimestampMillisecond as i32, - TypeMismatchSnafu { - column_name, - expected: "timestamp", - actual: format!("{:?}", column.datatype) - } - ); - // It is safe to use unwrap here, because values has been initialized in mut_column() - let values = column.values.as_mut().unwrap(); - values - .timestamp_millisecond_values - .push(to_ms_ts(value.1, value.0)); - self.null_masks[idx].push(false); - Ok(()) - } - - pub fn write_tag(&mut self, column_name: &str, value: &str) -> Result<()> { - let (idx, column) = - self.mut_column(column_name, ColumnDataType::String, SemanticType::Tag, None); - ensure!( - column.datatype == ColumnDataType::String as i32, - TypeMismatchSnafu { - column_name, - expected: "string", - actual: format!("{:?}", column.datatype) - } - ); - // It is safe to use unwrap here, because values has been initialized in mut_column() - let values = column.values.as_mut().unwrap(); - values.string_values.push(value.to_string()); - self.null_masks[idx].push(false); - Ok(()) - } - - pub fn write_u64(&mut self, column_name: &str, value: u64) -> Result<()> { - let (idx, column) = self.mut_column( - column_name, - ColumnDataType::Uint64, - SemanticType::Field, - None, - ); - ensure!( - column.datatype == ColumnDataType::Uint64 as i32, - TypeMismatchSnafu { - column_name, - expected: "u64", - actual: format!("{:?}", column.datatype) - } - ); - // It is safe to use unwrap here, because values has been initialized in mut_column() - let values = column.values.as_mut().unwrap(); - values.u64_values.push(value); - self.null_masks[idx].push(false); - Ok(()) - } - - pub fn write_i64(&mut self, column_name: &str, value: i64) -> Result<()> { - let (idx, column) = self.mut_column( - column_name, - ColumnDataType::Int64, - SemanticType::Field, - None, - ); - ensure!( - column.datatype == ColumnDataType::Int64 as i32, - TypeMismatchSnafu { - column_name, - expected: "i64", - actual: format!("{:?}", column.datatype) - } - ); - // It is safe to use unwrap here, because values has been initialized in mut_column() - let values = column.values.as_mut().unwrap(); - values.i64_values.push(value); - self.null_masks[idx].push(false); - Ok(()) - } - - pub fn write_f64(&mut self, column_name: &str, value: f64) -> Result<()> { - let (idx, column) = self.mut_column( - column_name, - ColumnDataType::Float64, - SemanticType::Field, - None, - ); - ensure!( - column.datatype == ColumnDataType::Float64 as i32, - TypeMismatchSnafu { - column_name, - expected: "f64", - actual: format!("{:?}", column.datatype) - } - ); - // It is safe to use unwrap here, because values has been initialized in mut_column() - let values = column.values.as_mut().unwrap(); - values.f64_values.push(value); - self.null_masks[idx].push(false); - Ok(()) - } - - pub fn write_string(&mut self, column_name: &str, value: &str) -> Result<()> { - let (idx, column) = self.mut_column( - column_name, - ColumnDataType::String, - SemanticType::Field, - None, - ); - ensure!( - column.datatype == ColumnDataType::String as i32, - TypeMismatchSnafu { - column_name, - expected: "string", - actual: format!("{:?}", column.datatype) - } - ); - // It is safe to use unwrap here, because values has been initialized in mut_column() - let values = column.values.as_mut().unwrap(); - values.string_values.push(value.to_string()); - self.null_masks[idx].push(false); - Ok(()) - } - - pub fn write_bool(&mut self, column_name: &str, value: bool) -> Result<()> { - let (idx, column) = self.mut_column( - column_name, - ColumnDataType::Boolean, - SemanticType::Field, - None, - ); - ensure!( - column.datatype == ColumnDataType::Boolean as i32, - TypeMismatchSnafu { - column_name, - expected: "boolean", - actual: format!("{:?}", column.datatype) - } - ); - // It is safe to use unwrap here, because values has been initialized in mut_column() - let values = column.values.as_mut().unwrap(); - values.bool_values.push(value); - self.null_masks[idx].push(false); - Ok(()) - } - - pub fn commit(&mut self) { - let batch = &mut self.batch; - batch.1 += 1; - - for i in 0..batch.0.len() { - let null_mask = &mut self.null_masks[i]; - if batch.1 as usize > null_mask.len() { - null_mask.push(true); - } - } - } - - pub fn finish(mut self) -> (Vec, RowCount) { - let null_masks = self.null_masks; - for (i, null_mask) in null_masks.into_iter().enumerate() { - let columns = &mut self.batch.0; - columns[i].null_mask = null_mask.into_vec(); - } - self.batch - } - - fn mut_column( - &mut self, - column_name: &str, - datatype: ColumnDataType, - semantic_type: SemanticType, - datatype_extension: Option, - ) -> (usize, &mut Column) { - let column_names = &mut self.column_name_index; - let column_idx = match column_names.get(column_name) { - Some(i) => *i, - None => { - let new_idx = column_names.len(); - let batch = &mut self.batch; - let to_insert = self.lines; - let mut null_mask = BitVec::with_capacity(to_insert); - null_mask.extend(BitVec::repeat(true, batch.1 as usize)); - self.null_masks.push(null_mask); - batch.0.push(Column { - column_name: column_name.to_string(), - semantic_type: semantic_type.into(), - values: Some(values_with_capacity(datatype, to_insert)), - datatype: datatype as i32, - null_mask: Vec::default(), - datatype_extension, - }); - let _ = column_names.insert(column_name.to_string(), new_idx); - new_idx - } - }; - (column_idx, &mut self.batch.0[column_idx]) - } -} - -pub fn to_ms_ts(p: Precision, ts: i64) -> i64 { - match p { - Precision::Nanosecond => ts / 1_000_000, - Precision::Microsecond => ts / 1000, - Precision::Millisecond => ts, - Precision::Second => ts * 1000, - Precision::Minute => ts * 1000 * 60, - Precision::Hour => ts * 1000 * 60 * 60, - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Precision { - Nanosecond, - Microsecond, - Millisecond, - Second, - Minute, - Hour, -} - -impl Display for Precision { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Precision::Nanosecond => write!(f, "Precision::Nanosecond"), - Precision::Microsecond => write!(f, "Precision::Microsecond"), - Precision::Millisecond => write!(f, "Precision::Millisecond"), - Precision::Second => write!(f, "Precision::Second"), - Precision::Minute => write!(f, "Precision::Minute"), - Precision::Hour => write!(f, "Precision::Hour"), - } - } -} - -impl TryFrom for TimeUnit { - type Error = Error; - - fn try_from(precision: Precision) -> std::result::Result { - Ok(match precision { - Precision::Second => TimeUnit::Second, - Precision::Millisecond => TimeUnit::Millisecond, - Precision::Microsecond => TimeUnit::Microsecond, - Precision::Nanosecond => TimeUnit::Nanosecond, - _ => { - return Err(Error::NotSupported { - feat: format!("convert {precision} into TimeUnit"), - }) - } - }) - } -} - -#[cfg(test)] -mod tests { - use api::v1::{ColumnDataType, SemanticType}; - use common_base::BitVec; - - use super::LinesWriter; - use crate::writer::{to_ms_ts, Precision}; - - #[test] - fn test_lines_writer() { - let mut writer = LinesWriter::with_lines(3); - - writer.write_tag("host", "host1").unwrap(); - writer.write_f64("cpu", 0.5).unwrap(); - writer.write_f64("memory", 0.4).unwrap(); - writer.write_string("name", "name1").unwrap(); - writer - .write_ts("ts", (101011000, Precision::Millisecond)) - .unwrap(); - writer.commit(); - - writer.write_tag("host", "host2").unwrap(); - writer - .write_ts("ts", (102011001, Precision::Millisecond)) - .unwrap(); - writer.write_bool("enable_reboot", true).unwrap(); - writer.write_u64("year_of_service", 2).unwrap(); - writer.write_i64("temperature", 4).unwrap(); - writer.commit(); - - writer.write_tag("host", "host3").unwrap(); - writer.write_f64("cpu", 0.4).unwrap(); - writer.write_u64("cpu_core_num", 16).unwrap(); - writer - .write_ts("ts", (103011002, Precision::Millisecond)) - .unwrap(); - writer.commit(); - - let insert_batch = writer.finish(); - assert_eq!(3, insert_batch.1); - - let columns = insert_batch.0; - assert_eq!(9, columns.len()); - - let column = &columns[0]; - assert_eq!("host", columns[0].column_name); - assert_eq!(ColumnDataType::String as i32, column.datatype); - assert_eq!(SemanticType::Tag as i32, column.semantic_type); - assert_eq!( - vec!["host1", "host2", "host3"], - column.values.as_ref().unwrap().string_values - ); - verify_null_mask(&column.null_mask, vec![false, false, false]); - - let column = &columns[1]; - assert_eq!("cpu", column.column_name); - assert_eq!(ColumnDataType::Float64 as i32, column.datatype); - assert_eq!(SemanticType::Field as i32, column.semantic_type); - assert_eq!(vec![0.5, 0.4], column.values.as_ref().unwrap().f64_values); - verify_null_mask(&column.null_mask, vec![false, true, false]); - - let column = &columns[2]; - assert_eq!("memory", column.column_name); - assert_eq!(ColumnDataType::Float64 as i32, column.datatype); - assert_eq!(SemanticType::Field as i32, column.semantic_type); - assert_eq!(vec![0.4], column.values.as_ref().unwrap().f64_values); - verify_null_mask(&column.null_mask, vec![false, true, true]); - - let column = &columns[3]; - assert_eq!("name", column.column_name); - assert_eq!(ColumnDataType::String as i32, column.datatype); - assert_eq!(SemanticType::Field as i32, column.semantic_type); - assert_eq!(vec!["name1"], column.values.as_ref().unwrap().string_values); - verify_null_mask(&column.null_mask, vec![false, true, true]); - - let column = &columns[4]; - assert_eq!("ts", column.column_name); - assert_eq!(ColumnDataType::TimestampMillisecond as i32, column.datatype); - assert_eq!(SemanticType::Timestamp as i32, column.semantic_type); - assert_eq!( - vec![101011000, 102011001, 103011002], - column.values.as_ref().unwrap().timestamp_millisecond_values - ); - verify_null_mask(&column.null_mask, vec![false, false, false]); - - let column = &columns[5]; - assert_eq!("enable_reboot", column.column_name); - assert_eq!(ColumnDataType::Boolean as i32, column.datatype); - assert_eq!(SemanticType::Field as i32, column.semantic_type); - assert_eq!(vec![true], column.values.as_ref().unwrap().bool_values); - verify_null_mask(&column.null_mask, vec![true, false, true]); - - let column = &columns[6]; - assert_eq!("year_of_service", column.column_name); - assert_eq!(ColumnDataType::Uint64 as i32, column.datatype); - assert_eq!(SemanticType::Field as i32, column.semantic_type); - assert_eq!(vec![2], column.values.as_ref().unwrap().u64_values); - verify_null_mask(&column.null_mask, vec![true, false, true]); - - let column = &columns[7]; - assert_eq!("temperature", column.column_name); - assert_eq!(ColumnDataType::Int64 as i32, column.datatype); - assert_eq!(SemanticType::Field as i32, column.semantic_type); - assert_eq!(vec![4], column.values.as_ref().unwrap().i64_values); - verify_null_mask(&column.null_mask, vec![true, false, true]); - - let column = &columns[8]; - assert_eq!("cpu_core_num", column.column_name); - assert_eq!(ColumnDataType::Uint64 as i32, column.datatype); - assert_eq!(SemanticType::Field as i32, column.semantic_type); - assert_eq!(vec![16], column.values.as_ref().unwrap().u64_values); - verify_null_mask(&column.null_mask, vec![true, true, false]); - } - - fn verify_null_mask(data: &[u8], expected: Vec) { - let bitvec = BitVec::from_slice(data); - for (idx, b) in expected.iter().enumerate() { - assert_eq!(b, bitvec.get(idx).unwrap()) - } - } - - #[test] - fn test_to_ms() { - assert_eq!(100, to_ms_ts(Precision::Nanosecond, 100110000)); - assert_eq!(100110, to_ms_ts(Precision::Microsecond, 100110000)); - assert_eq!(100110000, to_ms_ts(Precision::Millisecond, 100110000)); - assert_eq!( - 100110000 * 1000 * 60, - to_ms_ts(Precision::Minute, 100110000) - ); - assert_eq!( - 100110000 * 1000 * 60 * 60, - to_ms_ts(Precision::Hour, 100110000) - ); - } -} diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index ba8144a7c512..e3172ae8f89c 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -161,8 +161,8 @@ pub enum Error { error: influxdb_line_protocol::Error, }, - #[snafu(display("Failed to write InfluxDB line protocol"))] - InfluxdbLinesWrite { + #[snafu(display("Failed to write row"))] + RowWriter { location: Location, source: common_grpc::error::Error, }, @@ -462,6 +462,9 @@ pub enum Error { #[snafu(source)] error: notify::Error, }, + + #[snafu(display("Timestamp overflow: {}", error))] + TimestampOverflow { error: String, location: Location }, } pub type Result = std::result::Result; @@ -524,9 +527,10 @@ impl ErrorExt for Error { | IncompatibleSchema { .. } | MissingQueryContext { .. } | MysqlValueConversion { .. } - | UnexpectedPhysicalTable { .. } => StatusCode::InvalidArguments, + | UnexpectedPhysicalTable { .. } + | TimestampOverflow { .. } => StatusCode::InvalidArguments, - InfluxdbLinesWrite { source, .. } + RowWriter { source, .. } | PromSeriesWrite { source, .. } | OtlpMetricsWrite { source, .. } => source.status_code(), @@ -659,7 +663,7 @@ impl IntoResponse for Error { let error_msg = self.output_msg(); let status = match self { Error::InfluxdbLineProtocol { .. } - | Error::InfluxdbLinesWrite { .. } + | Error::RowWriter { .. } | Error::PromSeriesWrite { .. } | Error::InvalidOpentsdbLine { .. } | Error::InvalidOpentsdbJsonRequest { .. } diff --git a/src/servers/src/http/influxdb.rs b/src/servers/src/http/influxdb.rs index 4ebce72fb8c0..a9f806b3f2df 100644 --- a/src/servers/src/http/influxdb.rs +++ b/src/servers/src/http/influxdb.rs @@ -19,7 +19,7 @@ use axum::http::StatusCode; use axum::response::IntoResponse; use axum::Extension; use common_catalog::consts::DEFAULT_SCHEMA_NAME; -use common_grpc::writer::Precision; +use common_grpc::precision::Precision; use common_telemetry::tracing; use session::context::QueryContextRef; @@ -123,7 +123,7 @@ fn parse_time_precision(value: &str) -> Result { #[cfg(test)] mod tests { - use common_grpc::writer::Precision; + use common_grpc::precision::Precision; use crate::http::influxdb::parse_time_precision; diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 36a3fce1faa1..76b332bf382a 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -14,7 +14,7 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; -use common_grpc::writer::Precision; +use common_grpc::precision::Precision; use hyper::Request; use influxdb_line_protocol::{parse_lines, FieldValue}; use snafu::ResultExt; @@ -52,7 +52,7 @@ impl TryFrom for RowInsertRequests { .context(InfluxdbLineProtocolSnafu)?; let mut multi_table_data = MultiTableData::new(); - + let precision = unwrap_or_default_precision(value.precision); for line in &lines { let table_name = line.series.measurement.as_str(); let tags = &line.series.tag_set; @@ -87,8 +87,7 @@ impl TryFrom for RowInsertRequests { row_writer::write_fields(table_data, fields, &mut one_row)?; // timestamp - let precision = unwrap_or_default_precision(value.precision); - row_writer::write_ts_precision( + row_writer::write_ts_to_nanos( table_data, INFLUXDB_TIMESTAMP_COLUMN_NAME, ts, @@ -195,7 +194,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; } "ts" => { assert_eq!( - ColumnDataType::TimestampMillisecond as i32, + ColumnDataType::TimestampNanosecond as i32, column_schema.datatype ); assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type); @@ -204,12 +203,12 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; let v = row.values[i].value_data.as_ref(); match j { 0 => assert_eq!( - 1663840496100023100 / 1_000_000, - extract_ts_millis_value(v.as_ref().unwrap()) + 1663840496100023100, + extract_ts_nanos_value(v.as_ref().unwrap()) ), 1 => assert_eq!( - 1663840496400340001 / 1_000_000, - extract_ts_millis_value(v.as_ref().unwrap()) + 1663840496400340001, + extract_ts_nanos_value(v.as_ref().unwrap()) ), _ => panic!(), } @@ -270,7 +269,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; } "ts" => { assert_eq!( - ColumnDataType::TimestampMillisecond as i32, + ColumnDataType::TimestampNanosecond as i32, column_schema.datatype ); assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type); @@ -279,12 +278,12 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; let v = row.values[i].value_data.as_ref(); match j { 0 => assert_eq!( - 1663840496100023102 / 1_000_000, - extract_ts_millis_value(v.as_ref().unwrap()) + 1663840496100023102, + extract_ts_nanos_value(v.as_ref().unwrap()) ), 1 => assert_eq!( - 1663840496400340003 / 1_000_000, - extract_ts_millis_value(v.as_ref().unwrap()) + 1663840496400340003, + extract_ts_nanos_value(v.as_ref().unwrap()) ), _ => panic!(), } @@ -298,21 +297,21 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; fn extract_string_value(value: &ValueData) -> &str { match value { ValueData::StringValue(v) => v, - _ => panic!(), + _ => unreachable!(), } } fn extract_f64_value(value: &ValueData) -> f64 { match value { ValueData::F64Value(v) => *v, - _ => panic!(), + _ => unreachable!(), } } - fn extract_ts_millis_value(value: &ValueData) -> i64 { + fn extract_ts_nanos_value(value: &ValueData) -> i64 { match value { - ValueData::TimestampMillisecondValue(v) => *v, - _ => panic!(), + ValueData::TimestampNanosecondValue(v) => *v, + _ => unreachable!(), } } } diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index efa6084eae85..0459ca2a3f6c 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -29,7 +29,6 @@ pub mod heartbeat_options; pub mod http; pub mod influxdb; pub mod interceptor; -pub mod line_writer; mod metrics; pub mod metrics_handler; pub mod mysql; diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs deleted file mode 100644 index 38ebe218c7a9..000000000000 --- a/src/servers/src/line_writer.rs +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use common_catalog::consts::DEFAULT_CATALOG_NAME; -use common_grpc::writer::{to_ms_ts, Precision}; -use common_time::timestamp::TimeUnit::Millisecond; -use common_time::Timestamp; -use datatypes::data_type::DataType; -use datatypes::prelude::ConcreteDataType; -use datatypes::types::{TimestampMillisecondType, TimestampType}; -use datatypes::value::Value; -use datatypes::vectors::{MutableVector, VectorRef}; -use table::requests::InsertRequest; - -type ColumnLen = usize; -type ColumnName = String; - -pub struct LineWriter { - db: String, - table_name: String, - expected_rows: usize, - current_rows: usize, - columns_builders: HashMap, ColumnLen)>, -} - -impl LineWriter { - pub fn with_lines(db: impl Into, table_name: impl Into, lines: usize) -> Self { - Self { - db: db.into(), - table_name: table_name.into(), - expected_rows: lines, - current_rows: 0, - columns_builders: Default::default(), - } - } - - pub fn write_ts(&mut self, column_name: &str, value: (i64, Precision)) { - let (val, precision) = value; - let datatype = - ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)); - let ts_val = Value::Timestamp(Timestamp::new(to_ms_ts(precision, val), Millisecond)); - self.write(column_name, datatype, ts_val); - } - - pub fn write_tag(&mut self, column_name: &str, value: &str) { - self.write( - column_name, - ConcreteDataType::string_datatype(), - Value::String(value.into()), - ); - } - - pub fn write_u64(&mut self, column_name: &str, value: u64) { - self.write( - column_name, - ConcreteDataType::uint64_datatype(), - Value::UInt64(value), - ); - } - - pub fn write_i64(&mut self, column_name: &str, value: i64) { - self.write( - column_name, - ConcreteDataType::int64_datatype(), - Value::Int64(value), - ); - } - - pub fn write_f64(&mut self, column_name: &str, value: f64) { - self.write( - column_name, - ConcreteDataType::float64_datatype(), - Value::Float64(value.into()), - ); - } - - pub fn write_string(&mut self, column_name: &str, value: &str) { - self.write( - column_name, - ConcreteDataType::string_datatype(), - Value::String(value.into()), - ); - } - - pub fn write_bool(&mut self, column_name: &str, value: bool) { - self.write( - column_name, - ConcreteDataType::boolean_datatype(), - Value::Boolean(value), - ); - } - - fn write(&mut self, column_name: &str, datatype: ConcreteDataType, value: Value) { - let or_insert = || { - let rows = self.current_rows; - let mut builder = datatype.create_mutable_vector(self.expected_rows); - (0..rows).for_each(|_| builder.push_null()); - (builder, rows) - }; - let (builder, column_len) = self - .columns_builders - .entry(column_name.to_string()) - .or_insert_with(or_insert); - - builder.push_value_ref(value.as_value_ref()); - *column_len += 1; - } - - pub fn commit(&mut self) { - self.current_rows += 1; - self.columns_builders - .values_mut() - .for_each(|(builder, len)| { - if self.current_rows > *len { - builder.push_null() - } - }); - } - - pub fn finish(self) -> InsertRequest { - let columns_values: HashMap = self - .columns_builders - .into_iter() - .map(|(column_name, (mut builder, _))| (column_name, builder.to_vector())) - .collect(); - InsertRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: self.db, - table_name: self.table_name, - columns_values, - } - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use common_catalog::consts::DEFAULT_SCHEMA_NAME; - use common_time::Timestamp; - use datatypes::value::Value; - use datatypes::vectors::Vector; - - use crate::line_writer::{LineWriter, Precision}; - - #[test] - fn test_writer() { - let mut writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, "demo".to_string(), 4); - writer.write_ts("ts", (1665893727685, Precision::Millisecond)); - writer.write_tag("host", "host-1"); - writer.write_i64("memory", 10_i64); - writer.commit(); - - writer.write_ts("ts", (1665893727686, Precision::Millisecond)); - writer.write_tag("host", "host-2"); - writer.write_tag("region", "region-2"); - writer.write_i64("memory", 9_i64); - writer.commit(); - - writer.write_ts("ts", (1665893727689, Precision::Millisecond)); - writer.write_tag("host", "host-3"); - writer.write_tag("region", "region-3"); - writer.write_i64("cpu", 19_i64); - writer.commit(); - - let insert_request = writer.finish(); - - assert_eq!("demo", insert_request.table_name); - assert_eq!(DEFAULT_SCHEMA_NAME, insert_request.schema_name); - - let columns = insert_request.columns_values; - assert_eq!(5, columns.len()); - assert!(columns.contains_key("ts")); - assert!(columns.contains_key("host")); - assert!(columns.contains_key("memory")); - assert!(columns.contains_key("region")); - assert!(columns.contains_key("cpu")); - - let ts = columns.get("ts").unwrap(); - let host = columns.get("host").unwrap(); - let memory = columns.get("memory").unwrap(); - let region = columns.get("region").unwrap(); - let cpu = columns.get("cpu").unwrap(); - - let expected: Vec = vec![ - Value::Timestamp(Timestamp::new_millisecond(1665893727685_i64)), - Value::Timestamp(Timestamp::new_millisecond(1665893727686_i64)), - Value::Timestamp(Timestamp::new_millisecond(1665893727689_i64)), - ]; - assert_vector(&expected, ts); - - let expected: Vec = vec!["host-1".into(), "host-2".into(), "host-3".into()]; - assert_vector(&expected, host); - - let expected: Vec = vec![10_i64.into(), 9_i64.into(), Value::Null]; - assert_vector(&expected, memory); - - let expected: Vec = vec![Value::Null, "region-2".into(), "region-3".into()]; - assert_vector(&expected, region); - - let expected: Vec = vec![Value::Null, Value::Null, 19_i64.into()]; - assert_vector(&expected, cpu); - } - - fn assert_vector(expected: &[Value], vector: &Arc) { - for (idx, expected) in expected.iter().enumerate() { - let val = vector.get(idx); - assert_eq!(*expected, val); - } - } -} diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index 16081e39beaf..37fc1264046e 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use api::v1::RowInsertRequests; use async_trait::async_trait; use common_error::ext::ErrorExt; +use common_grpc::precision::Precision; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_runtime::Runtime; use common_telemetry::logging::{debug, error, warn}; @@ -156,10 +157,11 @@ pub fn data_point_to_grpc_row_insert_requests( // value row_writer::write_f64(table_data, GREPTIME_VALUE, value, &mut one_row)?; // timestamp - row_writer::write_ts_millis( + row_writer::write_ts_to_millis( table_data, GREPTIME_TIMESTAMP, Some(timestamp), + Precision::Millisecond, &mut one_row, )?; diff --git a/src/servers/src/otlp/metrics.rs b/src/servers/src/otlp/metrics.rs index fd8dcfc3292c..742161c0f4f0 100644 --- a/src/servers/src/otlp/metrics.rs +++ b/src/servers/src/otlp/metrics.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::{RowInsertRequests, Value}; -use common_grpc::writer::Precision; +use common_grpc::precision::Precision; use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue}; @@ -117,7 +117,7 @@ fn write_attributes( } fn write_timestamp(table: &mut TableData, row: &mut Vec, time_nano: i64) -> Result<()> { - row_writer::write_ts_precision( + row_writer::write_ts_to_nanos( table, GREPTIME_TIMESTAMP, Some(time_nano), diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index 88a5d9b35074..edcdb8fc0b0c 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -14,7 +14,7 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; -use common_grpc::writer::Precision; +use common_grpc::precision::Precision; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; @@ -132,7 +132,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> (span.end_in_nanosecond - span.start_in_nanosecond) as f64 / 1_000_000.0, // duration in millisecond &mut row, )?; - row_writer::write_ts_precision( + row_writer::write_ts_to_nanos( writer, GREPTIME_TIMESTAMP, Some(span.start_in_nanosecond as i64), diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 515a28b44bbb..85db9178c956 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -21,6 +21,7 @@ use std::hash::{Hash, Hasher}; use api::prom_store::remote::label_matcher::Type as MatcherType; use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; use api::v1::RowInsertRequests; +use common_grpc::precision::Precision; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_recordbatch::{RecordBatch, RecordBatches}; use common_telemetry::tracing; @@ -351,10 +352,11 @@ pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertR &mut one_row, )?; // timestamp - row_writer::write_ts_millis( + row_writer::write_ts_to_millis( table_data, GREPTIME_TIMESTAMP, Some(series.samples[0].timestamp), + Precision::Millisecond, &mut one_row, )?; @@ -369,10 +371,11 @@ pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertR // value row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?; // timestamp - row_writer::write_ts_millis( + row_writer::write_ts_to_millis( table_data, GREPTIME_TIMESTAMP, Some(*timestamp), + Precision::Millisecond, &mut one_row, )?; diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index 4b8956015ef1..7feb89458a0c 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -19,14 +19,19 @@ use api::v1::{ ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value, }; -use common_grpc::writer; -use common_grpc::writer::Precision; +use common_grpc::precision::Precision; use common_time::timestamp::TimeUnit; +use common_time::timestamp::TimeUnit::Nanosecond; use common_time::Timestamp; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{IncompatibleSchemaSnafu, InfluxdbLinesWriteSnafu, Result, TimePrecisionSnafu}; +use crate::error::{ + IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu, +}; +/// The intermediate data structure for building the write request. +/// It constructs the `schema` and `rows` as all input data row +/// parsing is completed. pub struct TableData { schema: Vec, rows: Vec, @@ -138,15 +143,17 @@ impl MultiTableData { } } +/// Write data as tags into the table data. pub fn write_tags( table_data: &mut TableData, - kvs: impl Iterator, + tags: impl Iterator, one_row: &mut Vec, ) -> Result<()> { - let ktv_iter = kvs.map(|(k, v)| (k, ColumnDataType::String, ValueData::StringValue(v))); + let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, ValueData::StringValue(v))); write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row) } +/// Write data as fields into the table data. pub fn write_fields( table_data: &mut TableData, fields: impl Iterator, @@ -155,6 +162,7 @@ pub fn write_fields( write_by_semantic_type(table_data, SemanticType::Field, fields, one_row) } +/// Write data as a tag into the table data. pub fn write_tag( table_data: &mut TableData, name: impl ToString, @@ -173,6 +181,7 @@ pub fn write_tag( ) } +/// Write float64 data as a field into the table data. pub fn write_f64( table_data: &mut TableData, name: impl ToString, @@ -225,20 +234,53 @@ fn write_by_semantic_type( Ok(()) } -pub fn write_ts_millis( +/// Write timestamp data as milliseconds into the table data. +pub fn write_ts_to_millis( + table_data: &mut TableData, + name: impl ToString, + ts: Option, + precision: Precision, + one_row: &mut Vec, +) -> Result<()> { + write_ts_to( + table_data, + name, + ts, + precision, + TimestampType::Millis, + one_row, + ) +} + +/// Write timestamp data as nanoseconds into the table data. +pub fn write_ts_to_nanos( table_data: &mut TableData, name: impl ToString, ts: Option, + precision: Precision, one_row: &mut Vec, ) -> Result<()> { - write_ts_precision(table_data, name, ts, Precision::Millisecond, one_row) + write_ts_to( + table_data, + name, + ts, + precision, + TimestampType::Nanos, + one_row, + ) } -pub fn write_ts_precision( +enum TimestampType { + Millis, + Nanos, +} + +fn write_ts_to( table_data: &mut TableData, name: impl ToString, ts: Option, precision: Precision, + ts_type: TimestampType, one_row: &mut Vec, ) -> Result<()> { let TableData { @@ -249,43 +291,67 @@ pub fn write_ts_precision( let name = name.to_string(); let ts = match ts { - Some(timestamp) => writer::to_ms_ts(precision, timestamp), + Some(timestamp) => match ts_type { + TimestampType::Millis => precision.to_millis(timestamp), + TimestampType::Nanos => precision.to_nanos(timestamp), + } + .with_context(|| TimestampOverflowSnafu { + error: format!( + "timestamp {} overflow with precision {}", + timestamp, precision + ), + })?, None => { - let timestamp = Timestamp::current_millis(); - let unit: TimeUnit = precision.try_into().context(InfluxdbLinesWriteSnafu)?; + let timestamp = Timestamp::current_time(Nanosecond); + let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?; let timestamp = timestamp .convert_to(unit) .with_context(|| TimePrecisionSnafu { name: precision.to_string(), - })?; - writer::to_ms_ts(precision, timestamp.into()) + })? + .into(); + match ts_type { + TimestampType::Millis => precision.to_millis(timestamp), + TimestampType::Nanos => precision.to_nanos(timestamp), + } + .with_context(|| TimestampOverflowSnafu { + error: format!( + "timestamp {} overflow with precision {}", + timestamp, precision + ), + })? } }; + let (datatype, ts) = match ts_type { + TimestampType::Millis => ( + ColumnDataType::TimestampMillisecond, + ValueData::TimestampMillisecondValue(ts), + ), + TimestampType::Nanos => ( + ColumnDataType::TimestampNanosecond, + ValueData::TimestampNanosecondValue(ts), + ), + }; let index = column_indexes.get(&name); if let Some(index) = index { - check_schema( - ColumnDataType::TimestampMillisecond, - SemanticType::Timestamp, - &schema[*index], - )?; - one_row[*index].value_data = Some(ValueData::TimestampMillisecondValue(ts)); + check_schema(datatype, SemanticType::Timestamp, &schema[*index])?; + one_row[*index].value_data = Some(ts); } else { let index = schema.len(); schema.push(ColumnSchema { column_name: name.clone(), - datatype: ColumnDataType::TimestampMillisecond as i32, + datatype: datatype as i32, semantic_type: SemanticType::Timestamp as i32, ..Default::default() }); column_indexes.insert(name, index); - one_row.push(ValueData::TimestampMillisecondValue(ts).into()) + one_row.push(ts.into()) } Ok(()) } -#[inline] fn check_schema( datatype: ColumnDataType, semantic_type: SemanticType, diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs index be73e415acbb..09bdf5328a6b 100644 --- a/tests-integration/src/influxdb.rs +++ b/tests-integration/src/influxdb.rs @@ -116,12 +116,12 @@ monitor1,host=host2 memory=1027 1663840496400340001"; assert_eq!( recordbatches.pretty_print().unwrap(), "\ -+-------------------------+-------+------+--------+ -| ts | host | cpu | memory | -+-------------------------+-------+------+--------+ -| 2022-09-22T09:54:56.100 | host1 | 66.6 | 1024.0 | -| 2022-09-22T09:54:56.400 | host2 | | 1027.0 | -+-------------------------+-------+------+--------+" ++-------------------------------+-------+------+--------+ +| ts | host | cpu | memory | ++-------------------------------+-------+------+--------+ +| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 | +| 2022-09-22T09:54:56.400340001 | host2 | | 1027.0 | ++-------------------------------+-------+------+--------+" ); } } diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs index f72a7bae005f..637213239335 100644 --- a/tests-integration/src/otlp.rs +++ b/tests-integration/src/otlp.rs @@ -81,12 +81,12 @@ mod test { assert_eq!( recordbatches.pretty_print().unwrap(), "\ -+------------+-------+--------------------+------------+---------------------+----------------+ -| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | -+------------+-------+--------------------+------------+---------------------+----------------+ -| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 105.0 | -| greptimedb | otel | java | testsevrer | 1970-01-01T00:00:00 | 100.0 | -+------------+-------+--------------------+------------+---------------------+----------------+", ++------------+-------+--------------------+------------+-------------------------------+----------------+ +| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | ++------------+-------+--------------------+------------+-------------------------------+----------------+ +| greptimedb | otel | java | testsevrer | 1970-01-01T00:00:00.000000100 | 100.0 | +| greptimedb | otel | java | testserver | 1970-01-01T00:00:00.000000105 | 105.0 | ++------------+-------+--------------------+------------+-------------------------------+----------------+", ); let mut output = instance @@ -123,11 +123,11 @@ mod test { assert_eq!( recordbatches.pretty_print().unwrap(), "\ -+------------+-------+--------------------+------------+---------------------+----------------+ -| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | -+------------+-------+--------------------+------------+---------------------+----------------+ -| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 51.0 | -+------------+-------+--------------------+------------+---------------------+----------------+", ++------------+-------+--------------------+------------+-------------------------------+----------------+ +| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | ++------------+-------+--------------------+------------+-------------------------------+----------------+ +| greptimedb | otel | java | testserver | 1970-01-01T00:00:00.000000100 | 51.0 | ++------------+-------+--------------------+------------+-------------------------------+----------------+", ); let mut output = instance @@ -141,11 +141,11 @@ mod test { assert_eq!( recordbatches.pretty_print().unwrap(), "\ -+------------+-------+--------------------+------------+---------------------+----------------+ -| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | -+------------+-------+--------------------+------------+---------------------+----------------+ -| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 4.0 | -+------------+-------+--------------------+------------+---------------------+----------------+", ++------------+-------+--------------------+------------+-------------------------------+----------------+ +| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | ++------------+-------+--------------------+------------+-------------------------------+----------------+ +| greptimedb | otel | java | testserver | 1970-01-01T00:00:00.000000100 | 4.0 | ++------------+-------+--------------------+------------+-------------------------------+----------------+" ); }