From 99dda93f0ea18ebed7dcf26112d094fcb1c222ea Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Thu, 14 Dec 2023 00:15:37 +0800 Subject: [PATCH] feat: sql with influxdb v1 result format (#2917) * feat: sql with influxdb v1 result format * chore: add unit tests * feat: minor refactor * chore: by comment * chore; u128 to u64 since serde can't deser u128 in enum * chore: by comment * chore: apply suggestion * chore: revert suggestion * chore: try again --------- Co-authored-by: dennis zhuang --- src/servers/src/http.rs | 225 +++++++++++++++--- src/servers/src/http/handler.rs | 50 +++- src/servers/src/http/influxdb_result_v1.rs | 243 ++++++++++++++++++++ src/servers/src/http/script.rs | 15 +- src/servers/tests/http/http_handler_test.rs | 242 +++++++++++++------ tests-integration/tests/http.rs | 87 ++++++- 6 files changed, 740 insertions(+), 122 deletions(-) create mode 100644 src/servers/src/http/influxdb_result_v1.rs diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 864ed652ca00..0bd6f90319f0 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -26,7 +26,9 @@ pub mod script; #[cfg(feature = "dashboard")] mod dashboard; +pub mod influxdb_result_v1; +use std::fmt::Display; use std::net::SocketAddr; use std::time::{Duration, Instant}; @@ -47,8 +49,9 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; use common_recordbatch::{util, RecordBatch}; -use common_telemetry::logging::{self, info}; -use common_telemetry::{debug, error}; +use common_telemetry::logging::{debug, error, info}; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; use datatypes::data_type::DataType; use futures::FutureExt; use schemars::JsonSchema; @@ -66,6 +69,7 @@ use crate::configurator::ConfiguratorRef; use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu}; use crate::http::authorize::HttpAuth; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; +use crate::http::influxdb_result_v1::InfluxdbV1Response; use crate::http::prometheus::{ format_query, instant_query, label_values_query, labels_query, range_query, series_query, }; @@ -243,17 +247,17 @@ pub enum JsonOutput { } #[derive(Serialize, Deserialize, Debug, JsonSchema)] -pub struct JsonResponse { +pub struct GreptimedbV1Response { code: u32, #[serde(skip_serializing_if = "Option::is_none")] error: Option, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + output: Vec, #[serde(skip_serializing_if = "Option::is_none")] - output: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - execution_time_ms: Option, + execution_time_ms: Option, } -impl JsonResponse { +impl GreptimedbV1Response { pub fn with_error(error: impl ErrorExt) -> Self { let code = error.status_code(); if code.should_log_error() { @@ -262,25 +266,25 @@ impl JsonResponse { debug!("Failed to handle HTTP request, err: {:?}", error); } - JsonResponse { + GreptimedbV1Response { error: Some(error.output_msg()), code: code as u32, - output: None, + output: vec![], execution_time_ms: None, } } fn with_error_message(err_msg: String, error_code: StatusCode) -> Self { - JsonResponse { + GreptimedbV1Response { error: Some(err_msg), code: error_code as u32, - output: None, + output: vec![], execution_time_ms: None, } } - fn with_output(output: Option>) -> Self { - JsonResponse { + fn with_output(output: Vec) -> Self { + GreptimedbV1Response { error: None, code: StatusCode::Success as u32, output, @@ -288,9 +292,8 @@ impl JsonResponse { } } - fn with_execution_time(mut self, execution_time: u128) -> Self { + fn with_execution_time(&mut self, execution_time: u64) { self.execution_time_ms = Some(execution_time); - self } /// Create a json response from query result @@ -333,7 +336,7 @@ impl JsonResponse { } } } - Self::with_output(Some(results)) + Self::with_output(results) } pub fn code(&self) -> u32 { @@ -348,15 +351,142 @@ impl JsonResponse { self.error.as_ref() } - pub fn output(&self) -> Option<&[JsonOutput]> { - self.output.as_deref() + pub fn output(&self) -> &[JsonOutput] { + &self.output } - pub fn execution_time_ms(&self) -> Option { + pub fn execution_time_ms(&self) -> Option { self.execution_time_ms } } +/// It allows the results of SQL queries to be presented in different formats. +/// Currently, `greptimedb_v1` and `influxdb_v1` are supported. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ResponseFormat { + GreptimedbV1, + InfluxdbV1, +} + +impl ResponseFormat { + pub fn parse(s: &str) -> Option { + match s { + "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1), + "influxdb_v1" => Some(ResponseFormat::InfluxdbV1), + _ => None, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Epoch { + Nanosecond, + Microsecond, + Millisecond, + Second, +} + +impl Epoch { + pub fn parse(s: &str) -> Option { + // Both u and µ indicate microseconds. + // epoch = [ns,u,µ,ms,s], + // For details, see the Influxdb documents. + // https://docs.influxdata.com/influxdb/v1/tools/api/#query-string-parameters-1 + match s { + "ns" => Some(Epoch::Nanosecond), + "u" | "µ" => Some(Epoch::Microsecond), + "ms" => Some(Epoch::Millisecond), + "s" => Some(Epoch::Second), + _ => None, // just returns None for other cases + } + } + + pub fn convert_timestamp(&self, ts: Timestamp) -> Option { + match self { + Epoch::Nanosecond => ts.convert_to(TimeUnit::Nanosecond), + Epoch::Microsecond => ts.convert_to(TimeUnit::Microsecond), + Epoch::Millisecond => ts.convert_to(TimeUnit::Millisecond), + Epoch::Second => ts.convert_to(TimeUnit::Second), + } + } +} + +impl Display for Epoch { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Epoch::Nanosecond => write!(f, "Epoch::Nanosecond"), + Epoch::Microsecond => write!(f, "Epoch::Microsecond"), + Epoch::Millisecond => write!(f, "Epoch::Millisecond"), + Epoch::Second => write!(f, "Epoch::Second"), + } + } +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[serde(tag = "type")] +pub enum JsonResponse { + GreptimedbV1(GreptimedbV1Response), + InfluxdbV1(InfluxdbV1Response), +} + +impl From for JsonResponse { + fn from(value: GreptimedbV1Response) -> Self { + JsonResponse::GreptimedbV1(value) + } +} + +impl From for JsonResponse { + fn from(value: InfluxdbV1Response) -> Self { + JsonResponse::InfluxdbV1(value) + } +} + +impl JsonResponse { + pub fn with_error(error: impl ErrorExt, response_format: ResponseFormat) -> Self { + match response_format { + ResponseFormat::GreptimedbV1 => GreptimedbV1Response::with_error(error).into(), + ResponseFormat::InfluxdbV1 => InfluxdbV1Response::with_error(error).into(), + } + } + + pub fn with_error_message( + err_msg: String, + error_code: StatusCode, + response_format: ResponseFormat, + ) -> Self { + match response_format { + ResponseFormat::GreptimedbV1 => { + GreptimedbV1Response::with_error_message(err_msg, error_code).into() + } + ResponseFormat::InfluxdbV1 => InfluxdbV1Response::with_error_message(err_msg).into(), + } + } + pub async fn from_output( + outputs: Vec>, + response_format: ResponseFormat, + epoch: Option, + ) -> Self { + match response_format { + ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await.into(), + ResponseFormat::InfluxdbV1 => { + InfluxdbV1Response::from_output(outputs, epoch).await.into() + } + } + } + + fn with_execution_time(mut self, execution_time: u128) -> Self { + match &mut self { + JsonResponse::GreptimedbV1(resp) => { + resp.with_execution_time(execution_time as u64); + } + JsonResponse::InfluxdbV1(resp) => { + resp.with_execution_time(execution_time as u64); + } + } + self + } +} + async fn serve_api(Extension(api): Extension) -> impl IntoApiResponse { Json(api) } @@ -772,11 +902,12 @@ impl Server for HttpServer { /// handle error middleware async fn handle_error(err: BoxError) -> Json { - logging::error!("Unhandled internal error: {}", err); + error!(err; "Unhandled internal error"); Json(JsonResponse::with_error_message( format!("Unhandled internal error: {err}"), StatusCode::Unexpected, + ResponseFormat::GreptimedbV1, )) } @@ -920,22 +1051,44 @@ mod test { ])), ]; let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap(); - let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch]).unwrap(); - - let json_resp = - JsonResponse::from_output(vec![Ok(Output::RecordBatches(recordbatches))]).await; - - let json_output = &json_resp.output.unwrap()[0]; - if let JsonOutput::Records(r) = json_output { - assert_eq!(r.num_rows(), 4); - assert_eq!(r.num_cols(), 2); - let schema = r.schema.as_ref().unwrap(); - assert_eq!(schema.column_schemas[0].name, "numbers"); - assert_eq!(schema.column_schemas[0].data_type, "UInt32"); - assert_eq!(r.rows[0][0], serde_json::Value::from(1)); - assert_eq!(r.rows[0][1], serde_json::Value::Null); - } else { - panic!("invalid output type"); + + for format in [ResponseFormat::GreptimedbV1, ResponseFormat::InfluxdbV1] { + let recordbatches = + RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap(); + let json_resp = JsonResponse::from_output( + vec![Ok(Output::RecordBatches(recordbatches))], + format, + None, + ) + .await; + + match json_resp { + JsonResponse::GreptimedbV1(json_resp) => { + let json_output = &json_resp.output[0]; + if let JsonOutput::Records(r) = json_output { + assert_eq!(r.num_rows(), 4); + assert_eq!(r.num_cols(), 2); + let schema = r.schema.as_ref().unwrap(); + assert_eq!(schema.column_schemas[0].name, "numbers"); + assert_eq!(schema.column_schemas[0].data_type, "UInt32"); + assert_eq!(r.rows[0][0], serde_json::Value::from(1)); + assert_eq!(r.rows[0][1], serde_json::Value::Null); + } else { + panic!("invalid output type"); + } + } + JsonResponse::InfluxdbV1(json_resp) => { + let json_output = &json_resp.results()[0]; + assert_eq!(json_output.num_rows(), 4); + assert_eq!(json_output.num_cols(), 2); + assert_eq!(json_output.series[0].columns.clone()[0], "numbers"); + assert_eq!( + json_output.series[0].values[0][0], + serde_json::Value::from(1) + ); + assert_eq!(json_output.series[0].values[0][1], serde_json::Value::Null); + } + } } } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 3a117d17b3a2..f615e520cb98 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -27,7 +27,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use session::context::QueryContextRef; -use crate::http::{ApiState, GreptimeOptionsConfigState, JsonResponse}; +use crate::http::{ApiState, Epoch, GreptimeOptionsConfigState, JsonResponse, ResponseFormat}; use crate::metrics_handler::MetricsHandler; use crate::query_handler::sql::ServerSqlQueryHandlerRef; @@ -35,6 +35,18 @@ use crate::query_handler::sql::ServerSqlQueryHandlerRef; pub struct SqlQuery { pub db: Option, pub sql: Option, + // (Optional) result format: [`gerptimedb_v1`, `influxdb_v1`], + // the default value is `greptimedb_v1` + pub format: Option, + // Returns epoch timestamps with the specified precision. + // Both u and µ indicate microseconds. + // epoch = [ns,u,µ,ms,s], + // + // TODO(jeremy): currently, only InfluxDB result format is supported, + // and all columns of the `Timestamp` type will be converted to their + // specified time precision. Maybe greptimedb format can support this + // param too. + pub epoch: Option, } /// Handler to execute sql @@ -50,20 +62,33 @@ pub async fn sql( let start = Instant::now(); let sql = query_params.sql.or(form_params.sql); let db = query_ctx.get_db_string(); + let format = query_params + .format + .or(form_params.format) + .map(|s| s.to_lowercase()) + .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1)) + .unwrap_or(ResponseFormat::GreptimedbV1); + let epoch = query_params + .epoch + .or(form_params.epoch) + .map(|s| s.to_lowercase()) + .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond)); + let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); let resp = if let Some(sql) = &sql { - if let Some(resp) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { + if let Some(resp) = validate_schema(sql_handler.clone(), query_ctx.clone(), format).await { return Json(resp); } - JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await).await + JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await, format, epoch).await } else { JsonResponse::with_error_message( "sql parameter is required.".to_string(), StatusCode::InvalidArguments, + format, ) }; @@ -104,13 +129,23 @@ pub async fn promql( .with_label_values(&[db.as_str()]) .start_timer(); - if let Some(resp) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { + if let Some(resp) = validate_schema( + sql_handler.clone(), + query_ctx.clone(), + ResponseFormat::GreptimedbV1, + ) + .await + { return Json(resp); } let prom_query = params.into(); - let resp = - JsonResponse::from_output(sql_handler.do_promql_query(&prom_query, query_ctx).await).await; + let resp = JsonResponse::from_output( + sql_handler.do_promql_query(&prom_query, query_ctx).await, + ResponseFormat::GreptimedbV1, + None, + ) + .await; Json(resp.with_execution_time(exec_start.elapsed().as_millis())) } @@ -187,6 +222,7 @@ pub async fn config(State(state): State) -> Response async fn validate_schema( sql_handler: ServerSqlQueryHandlerRef, query_ctx: QueryContextRef, + format: ResponseFormat, ) -> Option { match sql_handler .is_valid_schema(query_ctx.current_catalog(), query_ctx.current_schema()) @@ -195,6 +231,7 @@ async fn validate_schema( Ok(false) => Some(JsonResponse::with_error_message( format!("Database not found: {}", query_ctx.get_db_string()), StatusCode::DatabaseNotFound, + format, )), Err(e) => Some(JsonResponse::with_error_message( format!( @@ -203,6 +240,7 @@ async fn validate_schema( e.output_msg(), ), StatusCode::Internal, + format, )), _ => None, } diff --git a/src/servers/src/http/influxdb_result_v1.rs b/src/servers/src/http/influxdb_result_v1.rs new file mode 100644 index 000000000000..0b8c184ce738 --- /dev/null +++ b/src/servers/src/http/influxdb_result_v1.rs @@ -0,0 +1,243 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::ErrorExt; +use common_query::Output; +use common_recordbatch::{util, RecordBatch}; +use common_telemetry::{debug, error}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use snafu::ResultExt; + +use crate::error::{Error, ToJsonSnafu}; +use crate::http::Epoch; + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct SqlQuery { + pub db: Option, + // Returns epoch timestamps with the specified precision. + // Both u and µ indicate microseconds. + // epoch = [ns,u,µ,ms,s], + pub epoch: Option, + pub sql: Option, +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Eq, PartialEq)] +pub struct InfluxdbRecordsOutput { + // The SQL query does not return the table name, but in InfluxDB, + // we require the table name, so we set it to an empty string “”. + name: String, + pub(crate) columns: Vec, + pub(crate) values: Vec>, +} + +impl InfluxdbRecordsOutput { + pub fn new(columns: Vec, values: Vec>) -> Self { + Self { + name: "".to_string(), + columns, + values, + } + } +} + +impl TryFrom<(Option, Vec)> for InfluxdbRecordsOutput { + type Error = Error; + + fn try_from( + (epoch, recordbatches): (Option, Vec), + ) -> Result { + if recordbatches.is_empty() { + Ok(InfluxdbRecordsOutput::new(vec![], vec![])) + } else { + // Safety: ensured by previous empty check + let first = &recordbatches[0]; + let columns = first + .schema + .column_schemas() + .iter() + .map(|cs| cs.name.clone()) + .collect::>(); + + let mut rows = + Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::()); + + for recordbatch in recordbatches { + for row in recordbatch.rows() { + let value_row = row + .into_iter() + .map(|value| { + let value = match (epoch, &value) { + (Some(epoch), datatypes::value::Value::Timestamp(ts)) => { + if let Some(timestamp) = epoch.convert_timestamp(*ts) { + datatypes::value::Value::Timestamp(timestamp) + } else { + value + } + } + _ => value, + }; + Value::try_from(value) + }) + .collect::, _>>() + .context(ToJsonSnafu)?; + + rows.push(value_row); + } + } + + Ok(InfluxdbRecordsOutput::new(columns, rows)) + } + } +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Eq, PartialEq)] +pub struct InfluxdbOutput { + pub statement_id: u32, + pub series: Vec, +} + +impl InfluxdbOutput { + pub fn num_rows(&self) -> usize { + self.series.iter().map(|r| r.values.len()).sum() + } + + pub fn num_cols(&self) -> usize { + self.series + .first() + .map(|r| r.columns.len()) + .unwrap_or(0usize) + } +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct InfluxdbV1Response { + results: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + execution_time_ms: Option, +} + +impl InfluxdbV1Response { + pub fn with_error(error: impl ErrorExt) -> Self { + let code = error.status_code(); + if code.should_log_error() { + error!(error; "Failed to handle HTTP request"); + } else { + debug!("Failed to handle HTTP request, err: {:?}", error); + } + + InfluxdbV1Response { + results: vec![], + error: Some(error.output_msg()), + execution_time_ms: None, + } + } + + pub fn with_error_message(err_msg: String) -> Self { + InfluxdbV1Response { + results: vec![], + error: Some(err_msg), + execution_time_ms: None, + } + } + + fn with_output(results: Vec) -> Self { + InfluxdbV1Response { + results, + error: None, + execution_time_ms: None, + } + } + + pub fn with_execution_time(&mut self, execution_time: u64) { + self.execution_time_ms = Some(execution_time); + } + + /// Create a influxdb v1 response from query result + pub async fn from_output( + outputs: Vec>, + epoch: Option, + ) -> Self { + // TODO(sunng87): this api response structure cannot represent error + // well. It hides successful execution results from error response + let mut results = Vec::with_capacity(outputs.len()); + for (statement_id, out) in outputs.into_iter().enumerate() { + let statement_id = statement_id as u32; + match out { + Ok(Output::AffectedRows(_)) => { + results.push(InfluxdbOutput { + statement_id, + series: vec![], + }); + } + Ok(Output::Stream(stream)) => { + // TODO(sunng87): streaming response + match util::collect(stream).await { + Ok(rows) => match InfluxdbRecordsOutput::try_from((epoch, rows)) { + Ok(rows) => { + results.push(InfluxdbOutput { + statement_id, + series: vec![rows], + }); + } + Err(err) => { + return Self::with_error(err); + } + }, + + Err(e) => { + return Self::with_error(e); + } + } + } + Ok(Output::RecordBatches(rbs)) => { + match InfluxdbRecordsOutput::try_from((epoch, rbs.take())) { + Ok(rows) => { + results.push(InfluxdbOutput { + statement_id, + series: vec![rows], + }); + } + Err(err) => { + return Self::with_error(err); + } + } + } + Err(e) => { + return Self::with_error(e); + } + } + } + Self::with_output(results) + } + + pub fn success(&self) -> bool { + self.error.is_none() + } + + pub fn error(&self) -> Option<&String> { + self.error.as_ref() + } + + pub fn results(&self) -> &[InfluxdbOutput] { + &self.results + } + + pub fn execution_time_ms(&self) -> Option { + self.execution_time_ms + } +} diff --git a/src/servers/src/http/script.rs b/src/servers/src/http/script.rs index ce7cbb2faeac..741cc13171fc 100644 --- a/src/servers/src/http/script.rs +++ b/src/servers/src/http/script.rs @@ -25,15 +25,19 @@ use session::context::QueryContext; use snafu::ResultExt; use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu}; -use crate::http::{ApiState, JsonResponse}; +use crate::http::{ApiState, GreptimedbV1Response, JsonResponse, ResponseFormat}; macro_rules! json_err { ($e: expr) => {{ - return Json(JsonResponse::with_error($e)); + return Json(JsonResponse::with_error($e, ResponseFormat::GreptimedbV1)); }}; ($msg: expr, $code: expr) => {{ - return Json(JsonResponse::with_error_message($msg.to_string(), $code)); + return Json(JsonResponse::with_error_message( + $msg.to_string(), + $code, + ResponseFormat::GreptimedbV1, + )); }}; } @@ -80,7 +84,7 @@ pub async fn scripts( .insert_script(query_ctx, name.unwrap(), &script) .await { - Ok(()) => JsonResponse::with_output(None), + Ok(()) => GreptimedbV1Response::with_output(vec![]).into(), Err(e) => json_err!( format!("Insert script error: {}", e.output_msg()), e.status_code() @@ -133,7 +137,8 @@ pub async fn run_script( let output = script_handler .execute_script(query_ctx, name.unwrap(), params.params) .await; - let resp = JsonResponse::from_output(vec![output]).await; + let resp = + JsonResponse::from_output(vec![output], ResponseFormat::GreptimedbV1, None).await; Json(resp.with_execution_time(start.elapsed().as_millis())) } else { diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 012a997b46ea..e29953ed7553 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -21,7 +21,7 @@ use http_body::combinators::UnsyncBoxBody; use hyper::Response; use servers::http::{ handler as http_handler, script as script_handler, ApiState, GreptimeOptionsConfigState, - JsonOutput, + JsonOutput, JsonResponse, }; use servers::metrics_handler::MetricsHandler; use session::context::QueryContext; @@ -37,52 +37,81 @@ async fn test_sql_not_provided() { let sql_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); let ctx = QueryContext::arc(); ctx.set_current_user(Some(auth::userinfo_by_name(None))); - let Json(json) = http_handler::sql( - State(ApiState { - sql_handler, - script_handler: None, - }), - Query(http_handler::SqlQuery::default()), - axum::Extension(ctx), - Form(http_handler::SqlQuery::default()), - ) - .await; - assert!(!json.success()); - assert_eq!( - Some(&"sql parameter is required.".to_string()), - json.error() - ); - assert!(json.output().is_none()); + let api_state = ApiState { + sql_handler, + script_handler: None, + }; + + for format in ["greptimedb_v1", "influxdb_v1"] { + let query = http_handler::SqlQuery { + db: None, + sql: None, + format: Some(format.to_string()), + epoch: None, + }; + let Json(json) = http_handler::sql( + State(api_state.clone()), + Query(query), + axum::Extension(ctx.clone()), + Form(http_handler::SqlQuery::default()), + ) + .await; + + match json { + JsonResponse::GreptimedbV1(resp) => { + assert!(!resp.success()); + assert_eq!( + Some(&"sql parameter is required.".to_string()), + resp.error() + ); + assert!(resp.output().is_empty()); + } + JsonResponse::InfluxdbV1(resp) => { + assert!(!resp.success()); + assert_eq!( + Some(&"sql parameter is required.".to_string()), + resp.error() + ); + assert!(resp.results().is_empty()); + } + } + } } #[tokio::test] async fn test_sql_output_rows() { common_telemetry::init_default_ut_logging(); - let query = create_query(); let sql_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); let ctx = QueryContext::arc(); ctx.set_current_user(Some(auth::userinfo_by_name(None))); - let Json(json) = http_handler::sql( - State(ApiState { - sql_handler, - script_handler: None, - }), - query, - axum::Extension(ctx), - Form(http_handler::SqlQuery::default()), - ) - .await; - assert!(json.success(), "{json:?}"); - assert!(json.error().is_none()); - match &json.output().expect("assertion failed")[0] { - JsonOutput::Records(records) => { - assert_eq!(1, records.num_rows()); - let json = serde_json::to_string_pretty(&records).unwrap(); - assert_eq!( - json, - r#"{ + let api_state = ApiState { + sql_handler, + script_handler: None, + }; + + for format in ["greptimedb_v1", "influxdb_v1"] { + let query = create_query(format); + let Json(json) = http_handler::sql( + State(api_state.clone()), + query, + axum::Extension(ctx.clone()), + Form(http_handler::SqlQuery::default()), + ) + .await; + + match json { + JsonResponse::GreptimedbV1(resp) => { + assert!(resp.success(), "{resp:?}"); + assert!(resp.error().is_none()); + match &resp.output()[0] { + JsonOutput::Records(records) => { + assert_eq!(1, records.num_rows()); + let json = serde_json::to_string_pretty(&records).unwrap(); + assert_eq!( + json, + r#"{ "schema": { "column_schemas": [ { @@ -97,9 +126,39 @@ async fn test_sql_output_rows() { ] ] }"# - ); + ); + } + _ => unreachable!(), + } + } + JsonResponse::InfluxdbV1(resp) => { + assert!(resp.success(), "{resp:?}"); + assert!(resp.error().is_none()); + + let json = serde_json::to_string_pretty(&resp.results()).unwrap(); + assert_eq!( + json, + r#"[ + { + "statement_id": 0, + "series": [ + { + "name": "", + "columns": [ + "SUM(numbers.uint32s)" + ], + "values": [ + [ + 4950 + ] + ] + } + ] + } +]"# + ); + } } - _ => unreachable!(), } } @@ -107,31 +166,36 @@ async fn test_sql_output_rows() { async fn test_sql_form() { common_telemetry::init_default_ut_logging(); - let form = create_form(); let sql_handler = create_testing_sql_query_handler(MemTable::default_numbers_table()); let ctx = QueryContext::arc(); ctx.set_current_user(Some(auth::userinfo_by_name(None))); + let api_state = ApiState { + sql_handler, + script_handler: None, + }; - let Json(json) = http_handler::sql( - State(ApiState { - sql_handler, - script_handler: None, - }), - Query(http_handler::SqlQuery::default()), - axum::Extension(ctx), - form, - ) - .await; - assert!(json.success(), "{json:?}"); - assert!(json.error().is_none()); - match &json.output().expect("assertion failed")[0] { - JsonOutput::Records(records) => { - assert_eq!(1, records.num_rows()); - let json = serde_json::to_string_pretty(&records).unwrap(); - assert_eq!( - json, - r#"{ + for format in ["greptimedb_v1", "influxdb_v1"] { + let form = create_form(format); + let Json(json) = http_handler::sql( + State(api_state.clone()), + Query(http_handler::SqlQuery::default()), + axum::Extension(ctx.clone()), + form, + ) + .await; + + match json { + JsonResponse::GreptimedbV1(resp) => { + assert!(resp.success(), "{resp:?}"); + assert!(resp.error().is_none()); + match &resp.output()[0] { + JsonOutput::Records(records) => { + assert_eq!(1, records.num_rows()); + let json = serde_json::to_string_pretty(&records).unwrap(); + assert_eq!( + json, + r#"{ "schema": { "column_schemas": [ { @@ -146,9 +210,39 @@ async fn test_sql_form() { ] ] }"# - ); + ); + } + _ => unreachable!(), + } + } + JsonResponse::InfluxdbV1(resp) => { + assert!(resp.success(), "{resp:?}"); + assert!(resp.error().is_none()); + + let json = serde_json::to_string_pretty(&resp.results()).unwrap(); + assert_eq!( + json, + r#"[ + { + "statement_id": 0, + "series": [ + { + "name": "", + "columns": [ + "SUM(numbers.uint32s)" + ], + "values": [ + [ + 4950 + ] + ] + } + ] + } +]"# + ); + } } - _ => unreachable!(), } } @@ -181,6 +275,9 @@ async fn insert_script( body, ) .await; + let JsonResponse::GreptimedbV1(json) = json else { + unreachable!() + }; assert!(!json.success(), "{json:?}"); assert_eq!(json.error().unwrap(), "invalid schema"); @@ -196,9 +293,12 @@ async fn insert_script( body, ) .await; + let JsonResponse::GreptimedbV1(json) = json else { + unreachable!() + }; assert!(json.success(), "{json:?}"); assert!(json.error().is_none()); - assert!(json.output().is_none()); + assert!(json.output().is_empty()); } #[tokio::test] @@ -225,10 +325,13 @@ def test(n) -> vector[i64]: exec, ) .await; + let JsonResponse::GreptimedbV1(json) = json else { + unreachable!() + }; assert!(json.success(), "{json:?}"); assert!(json.error().is_none()); - match &json.output().unwrap()[0] { + match &json.output()[0] { JsonOutput::Records(records) => { let json = serde_json::to_string_pretty(&records).unwrap(); assert_eq!(5, records.num_rows()); @@ -292,10 +395,13 @@ def test(n, **params) -> vector[i64]: exec, ) .await; + let JsonResponse::GreptimedbV1(json) = json else { + unreachable!() + }; assert!(json.success(), "{json:?}"); assert!(json.error().is_none()); - match &json.output().unwrap()[0] { + match &json.output()[0] { JsonOutput::Records(records) => { let json = serde_json::to_string_pretty(&records).unwrap(); assert_eq!(5, records.num_rows()); @@ -350,17 +456,21 @@ fn create_invalid_script_query() -> Query { }) } -fn create_query() -> Query { +fn create_query(format: &str) -> Query { Query(http_handler::SqlQuery { sql: Some("select sum(uint32s) from numbers limit 20".to_string()), db: None, + format: Some(format.to_string()), + epoch: None, }) } -fn create_form() -> Form { +fn create_form(format: &str) -> Form { Form(http_handler::SqlQuery { sql: Some("select sum(uint32s) from numbers limit 20".to_string()), db: None, + format: Some(format.to_string()), + epoch: None, }) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c687dd36d07a..c26e840f20c0 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -20,6 +20,7 @@ use axum_test_helper::TestClient; use common_error::status_code::StatusCode as ErrorCode; use serde_json::json; use servers::http::handler::HealthResponse; +use servers::http::influxdb_result_v1::InfluxdbOutput; use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse}; use servers::http::{JsonOutput, JsonResponse}; use tests_integration::test_util::{ @@ -123,6 +124,9 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert_eq!(body.code(), 1004); assert_eq!(body.error().unwrap(), "sql parameter is required."); let _ = body.execution_time_ms().unwrap(); @@ -134,10 +138,13 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert!(body.success()); let _ = body.execution_time_ms().unwrap(); - let output = body.output().unwrap(); + let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( output[0], @@ -146,6 +153,29 @@ pub async fn test_sql_api(store_type: StorageType) { })).unwrap() ); + // test influxdb_v1 result format + let res = client + .get("/v1/sql?format=influxdb_v1&sql=select * from numbers limit 10") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::InfluxdbV1(body) = body else { + unreachable!() + }; + assert!(body.success()); + let _ = body.execution_time_ms().unwrap(); + + let output = body.results(); + assert_eq!(output.len(), 1); + assert_eq!( + output[0], + serde_json::from_value::(json!({ + "statement_id":0,"series":[{"name":"","columns":["number"],"values":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}] + })).unwrap() + ); + // test insert and select let res = client .get("/v1/sql?sql=insert into demo values('host', 66.6, 1024, 0)") @@ -161,9 +191,12 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert!(body.success()); let _ = body.execution_time_ms().unwrap(); - let output = body.output().unwrap(); + let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( @@ -181,9 +214,12 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert!(body.success()); let _ = body.execution_time_ms().unwrap(); - let output = body.output().unwrap(); + let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( @@ -201,9 +237,12 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert!(body.success()); let _ = body.execution_time_ms().unwrap(); - let output = body.output().unwrap(); + let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( output[0], @@ -220,9 +259,12 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert!(body.success()); let _ = body.execution_time_ms().unwrap(); - let outputs = body.output().unwrap(); + let outputs = body.output(); assert_eq!(outputs.len(), 2); assert_eq!( outputs[0], @@ -246,6 +288,9 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert!(!body.success()); let _ = body.execution_time_ms().unwrap(); // TODO(shuiyisong): fix this when return source err msg to client side @@ -259,9 +304,12 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert!(body.success()); let _ = body.execution_time_ms().unwrap(); - let outputs = body.output().unwrap(); + let outputs = body.output(); assert_eq!(outputs.len(), 1); assert_eq!( outputs[0], @@ -277,6 +325,9 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32); // test catalog-schema given @@ -287,9 +338,12 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert!(body.success()); let _ = body.execution_time_ms().unwrap(); - let outputs = body.output().unwrap(); + let outputs = body.output(); assert_eq!(outputs.len(), 1); assert_eq!( outputs[0], @@ -305,6 +359,9 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32); // test invalid schema @@ -314,6 +371,9 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32); guard.remove_all().await; @@ -330,6 +390,9 @@ pub async fn test_prometheus_promql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert!(body.success()); let _ = body.execution_time_ms().unwrap(); @@ -543,8 +606,11 @@ def test(n) -> vector[f64]: assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert_eq!(body.code(), 0); - assert!(body.output().is_none()); + assert!(body.output().is_empty()); // call script let res = client @@ -553,10 +619,13 @@ def test(n) -> vector[f64]: .await; assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); + let JsonResponse::GreptimedbV1(body) = body else { + unreachable!() + }; assert_eq!(body.code(), 0); let _ = body.execution_time_ms().unwrap(); - let output = body.output().unwrap(); + let output = body.output(); assert_eq!(output.len(), 1); assert_eq!( output[0],