Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize service errors in dedicated gRPC header instead message field #4678

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.ask_for_res(SpawnPipeline {
index_id: args.index_id.clone(),
source_config,
pipeline_uid: PipelineUid::from_u128(0u128),
pipeline_uid: PipelineUid::new(),
})
.await?;
let merge_pipeline_handle = indexing_server_mailbox
Expand Down Expand Up @@ -615,7 +615,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
transform_config: None,
input_format: SourceInputFormat::Json,
},
pipeline_uid: PipelineUid::from_u128(0u128),
pipeline_uid: PipelineUid::new(),
})
.await?;
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
Expand Down
22 changes: 11 additions & 11 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,13 +929,13 @@ mod tests {
.unwrap();
let index_uid: IndexUid = IndexUid::for_test("index-1", 1);
let indexing_task1 = IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
};
let indexing_task2 = IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
Expand Down Expand Up @@ -1013,7 +1013,7 @@ mod tests {
let index_id = random_generator.gen_range(0..=10_000);
let source_id = random_generator.gen_range(0..=100);
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(pipeline_id as u128)),
pipeline_uid: Some(PipelineUid::for_test(pipeline_id as u128)),
index_uid: Some(
format!("index-{index_id}:11111111111111111111111111")
.parse()
Expand Down Expand Up @@ -1246,7 +1246,7 @@ mod tests {
test_serialize_indexing_tasks_aux(&[], &mut node_state);
test_serialize_indexing_tasks_aux(
&[IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
Expand All @@ -1256,7 +1256,7 @@ mod tests {
// change in the set of shards
test_serialize_indexing_tasks_aux(
&[IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2), ShardId::from(3)],
Expand All @@ -1266,13 +1266,13 @@ mod tests {
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
},
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
Expand All @@ -1284,13 +1284,13 @@ mod tests {
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
},
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(IndexUid::for_test("test-index2", 0)),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
Expand All @@ -1302,13 +1302,13 @@ mod tests {
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
},
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source2".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-codegen/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

1. Describe your service in a proto file.

2. Define an error and a result type for your service. The error type must implement `From<tonic::Status>` and `Into<tonic::Status>`.
2. Define an error and a result type for your service. The error type must implement `quickwit_proto::error::GrpcServiceError` and have at least the three following variants: `Internal`, `Timeout`, and `Unavailable`.

3. Add the following dependencies to your project:

Expand All @@ -25,6 +25,7 @@ tower = { workspace = true }
utoipa = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-proto = { workspace = true }

[dev-dependencies]
mockall = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-codegen/example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ utoipa = { workspace = true }
quickwit-actors = { workspace = true }
quickwit-common = { workspace = true }
quickwit-macros = { workspace = true }
quickwit-proto ={ workspace = true }

[dev-dependencies]
mockall = { workspace = true }
Expand Down
18 changes: 10 additions & 8 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 34 additions & 11 deletions quickwit/quickwit-codegen/example/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,53 @@
use std::fmt;

use quickwit_actors::AskError;
use quickwit_proto::error::GrpcServiceError;
pub use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error};
use quickwit_proto::{ServiceError, ServiceErrorCode};
use serde::{Deserialize, Serialize};

// Service errors have to be handwritten before codegen.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Serialize, Deserialize)]
pub enum HelloError {
#[error("internal error: {0}")]
InternalError(String),
#[error("transport error: {0}")]
TransportError(#[from] tonic::Status),
Internal(String),
#[error("invalid argument: {0}")]
InvalidArgument(String),
#[error("request timed out: {0}")]
Timeout(String),
#[error("service unavailable: {0}")]
Unavailable(String),
}

// Service errors must implement `From<tonic::Status>` and `Into<tonic::Status>`.
impl From<HelloError> for tonic::Status {
fn from(error: HelloError) -> Self {
match error {
HelloError::InternalError(message) => tonic::Status::internal(message),
HelloError::TransportError(status) => status,
impl ServiceError for HelloError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::InvalidArgument(_) => ServiceErrorCode::BadRequest,
Self::Timeout(_) => ServiceErrorCode::Timeout,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
}
}
}

impl GrpcServiceError for HelloError {
fn new_internal(message: String) -> Self {
Self::Internal(message)
}

fn new_timeout(message: String) -> Self {
Self::Timeout(message)
}

fn new_unavailable(message: String) -> Self {
Self::Unavailable(message)
}
}

impl<E> From<AskError<E>> for HelloError
where E: fmt::Debug
{
fn from(error: AskError<E>) -> Self {
HelloError::InternalError(format!("{error:?}"))
HelloError::Internal(format!("{error:?}"))
}
}
74 changes: 68 additions & 6 deletions quickwit/quickwit-codegen/example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,27 @@ fn spawn_ping_response_stream(
service_stream
}

#[derive(Debug, Clone)]
struct HelloImpl;
#[derive(Debug, Clone, Default)]
struct HelloImpl {
delay: Duration,
}

#[async_trait]
impl Hello for HelloImpl {
async fn hello(&mut self, request: HelloRequest) -> HelloResult<HelloResponse> {
tokio::time::sleep(self.delay).await;

if request.name.is_empty() {
return Err(HelloError::InvalidArgument("name is empty".to_string()));
}
Ok(HelloResponse {
message: format!("Hello, {}!", request.name),
})
}

async fn goodbye(&mut self, request: GoodbyeRequest) -> HelloResult<GoodbyeResponse> {
tokio::time::sleep(self.delay).await;

Ok(GoodbyeResponse {
message: format!("Goodbye, {}!", request.name),
})
Expand Down Expand Up @@ -169,7 +178,7 @@ mod tests {

#[tokio::test]
async fn test_hello_codegen() {
let mut hello = HelloImpl;
let mut hello = HelloImpl::default();

assert_eq!(
hello
Expand Down Expand Up @@ -255,7 +264,7 @@ mod tests {

#[tokio::test]
async fn test_hello_codegen_grpc() {
let grpc_server_adapter = HelloGrpcServerAdapter::new(HelloImpl);
let grpc_server_adapter = HelloGrpcServerAdapter::new(HelloImpl::default());
let grpc_server: HelloGrpcServer<HelloGrpcServerAdapter> =
HelloGrpcServer::new(grpc_server_adapter);
let addr: SocketAddr = "127.0.0.1:6666".parse().unwrap();
Expand Down Expand Up @@ -287,6 +296,16 @@ mod tests {
}
);

assert!(matches!(
grpc_client
.hello(HelloRequest {
name: "".to_string()
})
.await
.unwrap_err(),
HelloError::InvalidArgument(_)
));

let (ping_stream_tx, ping_stream) = ServiceStream::new_bounded(1);
let mut pong_stream = grpc_client.ping(ping_stream).await.unwrap();

Expand Down Expand Up @@ -484,7 +503,7 @@ mod tests {
.stack_hello_layer(hello_layer.clone())
.stack_goodbye_layer(goodbye_layer.clone())
.stack_ping_layer(ping_layer.clone())
.build(HelloImpl);
.build(HelloImpl::default());

hello_tower
.hello(HelloRequest {
Expand Down Expand Up @@ -530,7 +549,7 @@ mod tests {

#[tokio::test]
async fn test_balance_channel() {
let hello = HelloImpl;
let hello = HelloImpl::default();
let grpc_server_adapter = HelloGrpcServerAdapter::new(hello);
let grpc_server = HelloGrpcServer::new(grpc_server_adapter);
let addr: SocketAddr = "127.0.0.1:8888".parse().unwrap();
Expand Down Expand Up @@ -602,4 +621,47 @@ mod tests {
);
hello.check_connectivity().await.unwrap();
}

#[tokio::test]
async fn test_transport_errors_handling() {
let addr: SocketAddr = "127.0.0.1:9999".parse().unwrap();
let channel = Endpoint::from_static("http://127.0.0.1:9999")
.timeout(Duration::from_millis(100))
.connect_lazy();
let max_message_size = ByteSize::mib(1);
let mut grpc_client = HelloClient::from_channel(addr, channel, max_message_size);

let error = grpc_client
.hello(HelloRequest {
name: "Client".to_string(),
})
.await
.unwrap_err();
assert!(matches!(error, HelloError::Unavailable(_)));

let hello = HelloImpl {
delay: Duration::from_secs(1),
};
let grpc_server_adapter = HelloGrpcServerAdapter::new(hello);
let grpc_server: HelloGrpcServer<HelloGrpcServerAdapter> =
HelloGrpcServer::new(grpc_server_adapter);
let addr: SocketAddr = "127.0.0.1:9999".parse().unwrap();

tokio::spawn({
async move {
Server::builder()
.add_service(grpc_server)
.serve(addr)
.await
.unwrap();
}
});
let error = grpc_client
.hello(HelloRequest {
name: "Client".to_string(),
})
.await
.unwrap_err();
assert!(matches!(error, HelloError::Timeout(_)));
}
}
Loading
Loading