diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index f45006121b0f..a00d1744d253 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -18,24 +18,21 @@ use std::time::Instant; use aide::transform::TransformOperation; use axum::extract::{Json, Query, State}; -use axum::http::{header, HeaderValue}; use axum::response::{IntoResponse, Response}; use axum::{Extension, Form}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use itertools::Itertools; -use mime_guess::mime; use query::parser::PromQuery; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use session::context::QueryContextRef; +use crate::http::csv_result::CsvResponse; use crate::http::error_result::ErrorResponse; use crate::http::greptime_result_v1::{GreptimedbV1Response, GREPTIME_V1_TYPE}; -use crate::http::influxdb_result_v1::InfluxdbV1Response; -use crate::http::{ - ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, QueryResponse, ResponseFormat, -}; +use crate::http::influxdb_result_v1::{InfluxdbV1Response, INFLUXDB_V1_TYPE}; +use crate::http::{ApiState, Epoch, GreptimeOptionsConfigState, QueryResponse, ResponseFormat}; use crate::metrics_handler::MetricsHandler; use crate::query_handler::sql::ServerSqlQueryHandlerRef; @@ -64,7 +61,7 @@ pub async fn sql( Query(query_params): Query, Extension(query_ctx): Extension, Form(form_params): Form, -) -> Response { +) -> QueryResponse { let sql_handler = &state.sql_handler; let start = Instant::now(); @@ -86,7 +83,7 @@ pub async fn sql( .with_label_values(&[db.as_str()]) .start_timer(); - let resp = if let Some(sql) = &sql { + let result = if let Some(sql) = &sql { if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { Err((status, msg)) } else { @@ -99,83 +96,27 @@ pub async fn sql( )) }; - match resp { - Ok(outputs) => match format { - ResponseFormat::Csv => { - let mut resp = { - let resp = GreptimedbV1Response::from_output(outputs).await; - if resp.output.len() > 1 { - GreptimedbV1Response::with_error_message( - "Multi-statements are not allowed".to_string(), - StatusCode::InvalidArguments, - ) - } else { - resp - } - }; - let execution_time = start.elapsed().as_millis() as u64; - resp.with_execution_time(execution_time); - if resp.error.is_some() { - Json(QueryResponse::GreptimedbV1(resp)).into_response() - } else { - let tuple = match resp.output.pop() { - None => ( - [( - header::CONTENT_TYPE, - HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()), - )], - "".to_string(), - ), - Some(GreptimeQueryOutput::AffectedRows(n)) => ( - [( - header::CONTENT_TYPE, - HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()), - )], - format!("{n}"), - ), - Some(GreptimeQueryOutput::Records(records)) => { - let mut result = records - .rows - .iter() - .map(|row| row.iter().map(|v| v.to_string()).join(",")) - .join("\n"); - result.push_str("\n"); - ( - [( - header::CONTENT_TYPE, - HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()), - )], - result, - ) - } - }; - tuple.into_response() - } - } - ResponseFormat::GreptimedbV1 => { - let mut resp = GreptimedbV1Response::from_output(outputs).await; - resp.with_execution_time(start.elapsed().as_millis() as u64); - Json(QueryResponse::GreptimedbV1(resp)).into_response() - } - ResponseFormat::InfluxdbV1 => { - let mut resp = InfluxdbV1Response::from_output(outputs, epoch).await; - resp.with_execution_time(start.elapsed().as_millis() as u64); - Json(QueryResponse::InfluxdbV1(resp)).into_response() - } - }, - Err((status, msg)) => match format { - ResponseFormat::Csv | ResponseFormat::GreptimedbV1 => { - let mut resp = GreptimedbV1Response::with_error_message(msg, status); - resp.with_execution_time(start.elapsed().as_millis() as u64); - Json(QueryResponse::GreptimedbV1(resp)).into_response() - } - ResponseFormat::InfluxdbV1 => { - let mut resp = InfluxdbV1Response::with_error_message(msg); - resp.with_execution_time(start.elapsed().as_millis() as u64); - Json(QueryResponse::InfluxdbV1(resp)).into_response() - } - }, - } + let outputs = match result { + Err((status, msg)) => { + let ty = match format { + ResponseFormat::InfluxdbV1 => INFLUXDB_V1_TYPE, + _ => GREPTIME_V1_TYPE, + }; + return QueryResponse::Error( + ErrorResponse::from_error_message(ty, msg, status) + .with_execution_time(start.elapsed().as_millis() as u64), + ); + } + Ok(outputs) => outputs, + }; + + let resp = match format { + ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, + ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, + }; + + resp.with_execution_time(start.elapsed().as_millis() as u64) } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]