Skip to content

Commit

Permalink
todo: add grpc proto stacks
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Dec 17, 2024
1 parent e18b5f6 commit 5491f1d
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 52 deletions.
15 changes: 2 additions & 13 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use api::v1::{
};
use arrow_flight::Ticket;
use async_stream::stream;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::BoxedError;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
Expand All @@ -38,7 +38,6 @@ use tonic::transport::Channel;

use crate::error::{
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu, InvalidAsciiSnafu,
ServerSnafu,
};
use crate::{from_grpc_response, Client, Result};

Expand Down Expand Up @@ -229,17 +228,7 @@ impl Database {
let response = client.mut_inner().do_get(request).await.or_else(|e| {
let tonic_code = e.code();
let e: Error = e.into();
let code = e.status_code();
let msg = e.to_string();
let error = Err(BoxedError::new(
ServerSnafu {
code,
msg,
stack_errors: Vec::new(),
}
.build(),
))
.with_context(|_| FlightGetSnafu {
let error = Err(BoxedError::new(e)).with_context(|_| FlightGetSnafu {
addr: client.addr().to_string(),
tonic_code,
});
Expand Down
9 changes: 5 additions & 4 deletions src/client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::any::Any;

use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_error::ErrorInfoHeader;
use common_error::{ErrorInfoHeader, RemoteStackError};
use common_macro::stack_trace_debug;
use snafu::{location, Location, Snafu};
use tonic::{Code, Status};
Expand Down Expand Up @@ -98,7 +98,8 @@ pub enum Error {
Server {
code: StatusCode,
msg: String,
stack_errors: Vec<String>,
#[snafu(source)]
stack_errors: RemoteStackError,
#[snafu(implicit)]
location: Location,
},
Expand Down Expand Up @@ -159,7 +160,7 @@ impl From<Status> for Error {
Self::Server {
code,
msg,
stack_errors: info.stack_errors,
stack_errors: info.stack_errors.into(),
location: location!(),
}
}
Expand All @@ -173,7 +174,7 @@ impl From<Status> for Error {
Self::Server {
code,
msg,
stack_errors: Vec::new(),
stack_errors: Vec::new().into(),
location: location!(),
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::status_code::StatusCode;
pub use common_query::{Output, OutputData, OutputMeta};
pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;
use snafu::{IntoError, OptionExt};

pub use self::client::Client;
#[cfg(feature = "testing")]
Expand Down Expand Up @@ -59,11 +59,11 @@ pub fn from_grpc_response(response: GreptimeResponse) -> Result<u32> {
StatusCode::from_u32(status.status_code).context(IllegalDatabaseResponseSnafu {
err_msg: format!("invalid status: {:?}", status),
})?;
ServerSnafu {
Err(ServerSnafu {
code: status_code,
msg: status.err_msg,
stack_errors: Vec::new(),
}
.fail()
// TODO(discord9): pass stack errors in grpc proto
.into_error(vec![].into()))
}
}
30 changes: 11 additions & 19 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
use async_stream::stream;
use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::BoxedError;
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::error::{self as meta_error, Result as MetaResult};
Expand All @@ -33,7 +33,7 @@ use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use query::query_engine::DefaultSerializer;
use snafu::{location, OptionExt, ResultExt};
use snafu::{location, IntoError, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio_stream::StreamExt;

Expand Down Expand Up @@ -101,25 +101,17 @@ impl RegionRequester {
.map_err(|e| {
let tonic_code = e.code();
let e: error::Error = e.into();
let code = e.status_code();
let msg = e.to_string();
let error = ServerSnafu {
code,
msg,
stack_errors: Vec::new(),
}
.fail::<()>()
.map_err(BoxedError::new)
.with_context(|_| FlightGetSnafu {
tonic_code,
addr: flight_client.addr().to_string(),
})
.unwrap_err();
error!(
e; "Failed to do Flight get, addr: {}, code: {}",
flight_client.addr(),
tonic_code
);
let error = Err::<(), _>(BoxedError::new(e))
.with_context(|_| FlightGetSnafu {
tonic_code,
addr: flight_client.addr().to_string(),
})
.unwrap_err();
error
})?;

Expand Down Expand Up @@ -241,12 +233,12 @@ pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
StatusCode::from_u32(status.status_code).context(IllegalDatabaseResponseSnafu {
err_msg: format!("unknown server status: {:?}", status),
})?;
ServerSnafu {
Err(ServerSnafu {
code,
msg: status.err_msg.clone(),
stack_errors: Vec::new(),
}
.fail()
// TODO(discord9): pass stack errors in grpc proto
.into_error(vec![].into()))
}
}

Expand Down
43 changes: 43 additions & 0 deletions src/common/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,49 @@ pub const ERROR_INFO_HEADER_NAME: &str = "x-greptime-err-info";
pub static GREPTIME_DB_HEADER_ERROR_INFO: HeaderName =
HeaderName::from_static(ERROR_INFO_HEADER_NAME);

#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RemoteStackError {
pub stack_error: Vec<String>,
}

impl RemoteStackError {
pub fn from_stack_error(err: &impl StackError) -> Self {
let mut buf = Vec::new();
err.debug_fmt(0, &mut buf);
let mut cur: &dyn StackError = err;
while let Some(nxt) = cur.next() {
cur.debug_fmt(0, &mut buf);
cur = nxt;
}
RemoteStackError { stack_error: buf }
}
}

impl std::fmt::Display for RemoteStackError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.stack_error.join("\n"))
}
}

impl std::error::Error for RemoteStackError {}

impl From<Vec<String>> for RemoteStackError {
fn from(value: Vec<String>) -> Self {
RemoteStackError { stack_error: value }
}
}

impl StackError for RemoteStackError {
fn debug_fmt(&self, _layer: usize, buf: &mut Vec<String>) {
buf.extend(self.stack_error.clone());
}

/// Remote stack error has no next as it's the "leaf error"
fn next(&self) -> Option<&dyn StackError> {
None
}
}

#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ErrorInfoHeader {
pub code: u32,
Expand Down
6 changes: 1 addition & 5 deletions src/common/error/src/status_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,7 @@ macro_rules! define_into_tonic_status {
// (which is a very rare case), just ignore. Client will use Tonic status code and message.
let status_code = err.status_code();
let root_error = err.output_msg();
let error_info = ErrorInfoHeader {
code: status_code as u32,
msg: root_error.to_string(),
stack_errors: from_stacked_errors_to_list(&err),
};
let error_info = ErrorInfoHeader::from_error(&err);
headers.typed_insert(error_info);

let metadata = MetadataMap::from_headers(headers);
Expand Down
9 changes: 5 additions & 4 deletions src/meta-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_error::ErrorInfoHeader;
use common_error::{ErrorInfoHeader, RemoteStackError};
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use tonic::Status;
Expand All @@ -34,7 +34,8 @@ pub enum Error {
MetaServer {
code: StatusCode,
msg: String,
stack_errors: Vec<String>,
#[snafu(source)]
stack_errors: RemoteStackError,
tonic_code: tonic::Code,
},

Expand Down Expand Up @@ -156,7 +157,7 @@ impl From<Status> for Error {
Self::MetaServer {
code,
msg,
stack_errors: info.stack_errors,
stack_errors: info.stack_errors.into(),
tonic_code: e.code(),
}
}
Expand All @@ -170,7 +171,7 @@ impl From<Status> for Error {
Self::MetaServer {
code,
msg,
stack_errors: Vec::new(),
stack_errors: Vec::new().into(),
tonic_code: e.code(),
}
}
Expand Down
7 changes: 4 additions & 3 deletions tests/runner/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use client::{
Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use common_error::ext::ErrorExt;
use common_error::snafu::IntoError;
use common_error::RemoteStackError;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use datatypes::data_type::ConcreteDataType;
Expand Down Expand Up @@ -676,12 +678,11 @@ impl GreptimeDB {
Err(e) => {
let status_code = e.status_code();
let msg = e.output_msg();
result = ServerSnafu {
result = Err(ServerSnafu {
code: status_code,
msg,
stack_errors: Vec::new(),
}
.fail();
.into_error(RemoteStackError::from_stack_error(&e)));
}
}
}
Expand Down

0 comments on commit 5491f1d

Please sign in to comment.