Skip to content

Commit

Permalink
sql
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Jan 2, 2024
1 parent b6e9915 commit 85cc4b6
Showing 1 changed file with 26 additions and 85 deletions.
111 changes: 26 additions & 85 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,21 @@ use std::time::Instant;

use aide::transform::TransformOperation;
use axum::extract::{Json, Query, State};
use axum::http::{header, HeaderValue};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Form};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use itertools::Itertools;
use mime_guess::mime;
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use session::context::QueryContextRef;

use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::{GreptimedbV1Response, GREPTIME_V1_TYPE};
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::{
ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, QueryResponse, ResponseFormat,
};
use crate::http::influxdb_result_v1::{InfluxdbV1Response, INFLUXDB_V1_TYPE};
use crate::http::{ApiState, Epoch, GreptimeOptionsConfigState, QueryResponse, ResponseFormat};
use crate::metrics_handler::MetricsHandler;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;

Expand Down Expand Up @@ -64,7 +61,7 @@ pub async fn sql(
Query(query_params): Query<SqlQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<SqlQuery>,
) -> Response {
) -> QueryResponse {
let sql_handler = &state.sql_handler;

let start = Instant::now();
Expand All @@ -86,7 +83,7 @@ pub async fn sql(
.with_label_values(&[db.as_str()])
.start_timer();

let resp = if let Some(sql) = &sql {
let result = if let Some(sql) = &sql {
if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
Err((status, msg))
} else {
Expand All @@ -99,83 +96,27 @@ pub async fn sql(
))
};

match resp {
Ok(outputs) => match format {
ResponseFormat::Csv => {
let mut resp = {
let resp = GreptimedbV1Response::from_output(outputs).await;
if resp.output.len() > 1 {
GreptimedbV1Response::with_error_message(
"Multi-statements are not allowed".to_string(),
StatusCode::InvalidArguments,
)
} else {
resp
}
};
let execution_time = start.elapsed().as_millis() as u64;
resp.with_execution_time(execution_time);
if resp.error.is_some() {
Json(QueryResponse::GreptimedbV1(resp)).into_response()
} else {
let tuple = match resp.output.pop() {
None => (
[(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
)],
"".to_string(),
),
Some(GreptimeQueryOutput::AffectedRows(n)) => (
[(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
)],
format!("{n}"),
),
Some(GreptimeQueryOutput::Records(records)) => {
let mut result = records
.rows
.iter()
.map(|row| row.iter().map(|v| v.to_string()).join(","))
.join("\n");
result.push_str("\n");
(
[(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
)],
result,
)
}
};
tuple.into_response()
}
}
ResponseFormat::GreptimedbV1 => {
let mut resp = GreptimedbV1Response::from_output(outputs).await;
resp.with_execution_time(start.elapsed().as_millis() as u64);
Json(QueryResponse::GreptimedbV1(resp)).into_response()
}
ResponseFormat::InfluxdbV1 => {
let mut resp = InfluxdbV1Response::from_output(outputs, epoch).await;
resp.with_execution_time(start.elapsed().as_millis() as u64);
Json(QueryResponse::InfluxdbV1(resp)).into_response()
}
},
Err((status, msg)) => match format {
ResponseFormat::Csv | ResponseFormat::GreptimedbV1 => {
let mut resp = GreptimedbV1Response::with_error_message(msg, status);
resp.with_execution_time(start.elapsed().as_millis() as u64);
Json(QueryResponse::GreptimedbV1(resp)).into_response()
}
ResponseFormat::InfluxdbV1 => {
let mut resp = InfluxdbV1Response::with_error_message(msg);
resp.with_execution_time(start.elapsed().as_millis() as u64);
Json(QueryResponse::InfluxdbV1(resp)).into_response()
}
},
}
let outputs = match result {
Err((status, msg)) => {
let ty = match format {
ResponseFormat::InfluxdbV1 => INFLUXDB_V1_TYPE,
_ => GREPTIME_V1_TYPE,
};
return QueryResponse::Error(
ErrorResponse::from_error_message(ty, msg, status)
.with_execution_time(start.elapsed().as_millis() as u64),
);
}
Ok(outputs) => outputs,
};

let resp = match format {
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
};

resp.with_execution_time(start.elapsed().as_millis() as u64)
}

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
Expand Down

0 comments on commit 85cc4b6

Please sign in to comment.