Skip to content

Commit

Permalink
compile
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Jan 2, 2024
1 parent 85cc4b6 commit f3f8fd1
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 163 deletions.
116 changes: 74 additions & 42 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,24 @@ use aide::openapi::{Info, OpenApi, Server as OpenAPIServer};
use async_trait::async_trait;
use auth::UserProviderRef;
use axum::error_handling::HandleErrorLayer;
use axum::extract::{DefaultBodyLimit, MatchedPath};
use axum::extract::{DefaultBodyLimit, MatchedPath, Query, State};
use axum::http::Request;
use axum::middleware::{self, Next};
use axum::response::{Html, IntoResponse, Json, Response};
use axum::{routing, BoxError, Extension, Router};
use axum::{routing, BoxError, Extension, Form, Router};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use common_telemetry::logging::{debug, error, info};
use common_recordbatch::RecordBatch;
use common_telemetry::logging::{error, info};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use futures::FutureExt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex;
Expand All @@ -71,7 +70,8 @@ use crate::configurator::ConfiguratorRef;
use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu};
use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::greptime_result_v1::{GreptimedbV1Response, GREPTIME_V1_TYPE};
use crate::http::handler::SqlQuery;
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::prometheus::{
Expand Down Expand Up @@ -315,43 +315,60 @@ impl Display for Epoch {
}

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub enum QueryResponse {
#[serde(bound(deserialize = "'de: 'static"))]
pub enum HttpResponse {
Csv(CsvResponse),
Error(ErrorResponse),
GreptimedbV1(GreptimedbV1Response),
InfluxdbV1(InfluxdbV1Response),
Error(ErrorResponse),
}

impl QueryResponse {
pub fn with_execution_time(mut self, execution_time: u64) -> Self {
impl HttpResponse {
pub fn with_execution_time(self, execution_time: u64) -> Self {
match self {
QueryResponse::Csv(resp) => {
QueryResponse::Csv(resp.with_execution_time(execution_time))
}
QueryResponse::GreptimedbV1(resp) => {
QueryResponse::GreptimedbV1(resp.with_execution_time(execution_time))
}
QueryResponse::InfluxdbV1(resp) => {
QueryResponse::InfluxdbV1(resp.with_execution_time(execution_time))
}
QueryResponse::Error(resp) => {
QueryResponse::Error(resp.with_execution_time(execution_time))
}
HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
}
}
}

impl IntoResponse for QueryResponse {
impl IntoResponse for HttpResponse {
fn into_response(self) -> Response {
match self {
QueryResponse::Csv(resp) => resp.into_response(),
QueryResponse::GreptimedbV1(resp) => resp.into_response(),
QueryResponse::InfluxdbV1(resp) => resp.into_response(),
QueryResponse::Error(resp) => resp.into_response(),
HttpResponse::Csv(resp) => resp.into_response(),
HttpResponse::GreptimedbV1(resp) => resp.into_response(),
HttpResponse::InfluxdbV1(resp) => resp.into_response(),
HttpResponse::Error(resp) => resp.into_response(),
}
}
}

impl From<CsvResponse> for HttpResponse {
fn from(value: CsvResponse) -> Self {
HttpResponse::Csv(value)
}
}

impl From<ErrorResponse> for HttpResponse {
fn from(value: ErrorResponse) -> Self {
HttpResponse::Error(value)
}
}

impl From<GreptimedbV1Response> for HttpResponse {
fn from(value: GreptimedbV1Response) -> Self {
HttpResponse::GreptimedbV1(value)
}
}

impl From<InfluxdbV1Response> for HttpResponse {
fn from(value: InfluxdbV1Response) -> Self {
HttpResponse::InfluxdbV1(value)
}
}

async fn serve_api(Extension(api): Extension<OpenApi>) -> impl IntoApiResponse {
Json(api)
}
Expand Down Expand Up @@ -614,11 +631,28 @@ impl HttpServer {
}

fn route_sql<S>(&self, api_state: ApiState) -> ApiRouter<S> {
// TODO(@tisonkun): aide should support ret `impl IntoResponse`.
#[axum_macros::debug_handler]
pub async fn sql(
State(state): State<ApiState>,
Query(query_params): Query<SqlQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<SqlQuery>,
) -> Response {
let response = handler::sql(
State(state),
Query(query_params),
Extension(query_ctx),
Form(form_params),
)
.await;
response.into_response()
}

ApiRouter::new()
.api_route(
"/sql",
apirouting::get_with(handler::sql, handler::sql_docs)
.post_with(handler::sql, handler::sql_docs),
apirouting::get_with(sql, handler::sql_docs).post_with(sql, handler::sql_docs),
)
.api_route(
"/promql",
Expand Down Expand Up @@ -767,15 +801,13 @@ impl Server for HttpServer {
}

/// handle error middleware
async fn handle_error(err: BoxError) -> Json<QueryResponse> {
async fn handle_error(err: BoxError) -> Json<HttpResponse> {
error!(err; "Unhandled internal error");

Json(QueryResponse::GreptimedbV1(
GreptimedbV1Response::with_error_message(
format!("Unhandled internal error: {err}"),
StatusCode::Unexpected,
),
))
Json(HttpResponse::Error(ErrorResponse::from_error_message(
GREPTIME_V1_TYPE,
StatusCode::Unexpected,
format!("Unhandled internal error: {err}"),
)))
}

#[cfg(test)]
Expand Down Expand Up @@ -926,15 +958,15 @@ mod test {
let json_resp = match format {
ResponseFormat::Csv => unreachable!(),
ResponseFormat::GreptimedbV1 => {
QueryResponse::GreptimedbV1(GreptimedbV1Response::from_output(outputs).await)
HttpResponse::GreptimedbV1(GreptimedbV1Response::from_output(outputs).await)
}
ResponseFormat::InfluxdbV1 => {
QueryResponse::InfluxdbV1(InfluxdbV1Response::from_output(outputs, None).await)
HttpResponse::InfluxdbV1(InfluxdbV1Response::from_output(outputs, None).await)
}
};

match json_resp {
QueryResponse::GreptimedbV1(json_resp) => {
HttpResponse::GreptimedbV1(json_resp) => {
let json_output = &json_resp.output[0];
if let GreptimeQueryOutput::Records(r) = json_output {
assert_eq!(r.num_rows(), 4);
Expand All @@ -948,7 +980,7 @@ mod test {
panic!("invalid output type");
}
}
QueryResponse::InfluxdbV1(json_resp) => {
HttpResponse::InfluxdbV1(json_resp) => {
let json_output = &json_resp.results()[0];
assert_eq!(json_output.num_rows(), 4);
assert_eq!(json_output.num_cols(), 2);
Expand Down
17 changes: 11 additions & 6 deletions src/servers/src/http/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};

use super::header::GreptimeDbName;
use super::{GreptimedbV1Response, QueryResponse, PUBLIC_APIS};
use super::PUBLIC_APIS;
use crate::error::{
self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu,
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
};
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GREPTIME_V1_TYPE;
use crate::http::influxdb_result_v1::INFLUXDB_V1_TYPE;
use crate::http::HTTP_API_PREFIX;

/// AuthState is a holder state for [`UserProviderRef`]
Expand Down Expand Up @@ -119,12 +121,15 @@ pub async fn check_http_auth<B>(
}

fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse {
let body = if is_influxdb {
QueryResponse::InfluxdbV1(InfluxdbV1Response::with_error(err))
let ty = if is_influxdb {
INFLUXDB_V1_TYPE
} else {
QueryResponse::GreptimedbV1(GreptimedbV1Response::with_error(err))
GREPTIME_V1_TYPE
};
(StatusCode::UNAUTHORIZED, Json(body))
(
StatusCode::UNAUTHORIZED,
Json(ErrorResponse::from_error(ty, err)),
)
}

fn extract_catalog_and_schema<B>(request: &Request<B>) -> (&str, &str) {
Expand Down
41 changes: 28 additions & 13 deletions src/servers/src/http/csv_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Write;

use axum::http::{header, HeaderValue};
use axum::response::{IntoResponse, Response};
use common_error::status_code::StatusCode;
Expand All @@ -23,7 +25,7 @@ use serde::{Deserialize, Serialize};

use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::{GreptimedbV1Response, GREPTIME_V1_TYPE};
use crate::http::{GreptimeQueryOutput, QueryResponse};
use crate::http::{GreptimeQueryOutput, HttpResponse};

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct CsvResponse {
Expand All @@ -32,23 +34,23 @@ pub struct CsvResponse {
}

impl CsvResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> QueryResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
let response = match GreptimedbV1Response::from_output(outputs).await {
QueryResponse::GreptimedbV1(resp) => resp,
QueryResponse::Error(resp) => {
return QueryResponse::Error(resp);
HttpResponse::GreptimedbV1(resp) => resp,
HttpResponse::Error(resp) => {
return HttpResponse::Error(resp);
}
resp => unreachable!("neither greptime_v1 nor error: {:?}", resp),
};

if response.output.len() > 1 {
QueryResponse::Error(ErrorResponse::from_error_message(
HttpResponse::Error(ErrorResponse::from_error_message(
GREPTIME_V1_TYPE,
"Multi-statements are not allowed".to_string(),
StatusCode::InvalidArguments,
"Multi-statements are not allowed".to_string(),
))
} else {
QueryResponse::Csv(CsvResponse {
HttpResponse::Csv(CsvResponse {
output: response.output,
execution_time_ms: response.execution_time_ms,
})
Expand All @@ -59,13 +61,23 @@ impl CsvResponse {
&self.output
}

pub fn with_execution_time(mut self, execution_time: u64) -> Self {
self.execution_time_ms = execution_time;
self
}

pub fn execution_time_ms(&self) -> u64 {
self.execution_time_ms
}
}

impl IntoResponse for CsvResponse {
fn into_response(mut self) -> Response {
debug_assert!(
self.output.len() <= 1,
"self.output has extra elements: {}",
self.output.len()
);
let payload = match self.output.pop() {
None => "".to_string(),
Some(GreptimeQueryOutput::AffectedRows(n)) => {
Expand All @@ -81,11 +93,14 @@ impl IntoResponse for CsvResponse {
}
};

let mut resp = ([], payload).into_response();
resp.headers_mut().insert(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
);
let mut resp = (
[(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::TEXT_CSV_UTF_8.as_ref()),
)],
payload,
)
.into_response();
resp.headers_mut()
.insert("X-GreptimeDB-Format", HeaderValue::from_static("CSV"));
resp
Expand Down
34 changes: 9 additions & 25 deletions src/servers/src/http/error_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use axum::http::{header, HeaderValue};
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use bytes::{BufMut, BytesMut};
use axum::Json;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::logging::{debug, error};
use mime_guess::mime;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

Expand All @@ -42,10 +41,10 @@ impl ErrorResponse {
debug!("Failed to handle HTTP request, err: {:?}", error);
}

Self::from_error_message(ty, error.output_msg(), code)
Self::from_error_message(ty, code, error.output_msg())
}

pub fn from_error_message(ty: &'static str, msg: String, code: StatusCode) -> Self {
pub fn from_error_message(ty: &'static str, code: StatusCode, msg: String) -> Self {
ErrorResponse {
r#type: ty,
code: code as u32,
Expand All @@ -70,25 +69,10 @@ impl ErrorResponse {

impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let mut buf = BytesMut::with_capacity(128).writer();
if let Err(err) = serde_json::to_writer(&mut buf, &self) {
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
[(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::TEXT_PLAIN_UTF_8.as_ref()),
)],
err.to_string(),
)
.into_response();
}

let mut headers = headers::HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()),
);
headers.insert("X-GreptimeDB-Error-Code", HeaderValue::from(self.code));
(headers, buf.into_inner().freeze()).into_response()
let code = self.code;
let mut resp = Json(self).into_response();
resp.headers_mut()
.insert("X-GreptimeDB-Error-Code", HeaderValue::from(code));
resp
}
}
Loading

0 comments on commit f3f8fd1

Please sign in to comment.