diff --git a/quickwit/quickwit-codegen/example/src/codegen/hello.rs b/quickwit/quickwit-codegen/example/src/codegen/hello.rs index 8aa69f74e1f..6a6b4fc5d03 100644 --- a/quickwit/quickwit-codegen/example/src/codegen/hello.rs +++ b/quickwit/quickwit-codegen/example/src/codegen/hello.rs @@ -744,7 +744,10 @@ where .hello(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + HelloRequest::rpc_name(), + )) } async fn goodbye( &mut self, @@ -754,7 +757,10 @@ where .goodbye(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + GoodbyeRequest::rpc_name(), + )) } async fn ping( &mut self, @@ -766,9 +772,16 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(crate::error::grpc_status_to_service_error) + stream + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + PingRequest::rpc_name(), + )) }) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + PingRequest::rpc_name(), + )) } async fn check_connectivity(&mut self) -> anyhow::Result<()> { if self.connection_addrs_rx.borrow().len() == 0 { diff --git a/quickwit/quickwit-codegen/example/src/lib.rs b/quickwit/quickwit-codegen/example/src/lib.rs index de69c577b18..f86a682c04c 100644 --- a/quickwit/quickwit-codegen/example/src/lib.rs +++ b/quickwit/quickwit-codegen/example/src/lib.rs @@ -624,6 +624,8 @@ mod tests { #[tokio::test] async fn test_transport_errors_handling() { + quickwit_common::setup_logging_for_tests(); + let addr: SocketAddr = "127.0.0.1:9999".parse().unwrap(); let channel = Endpoint::from_static("http://127.0.0.1:9999") .timeout(Duration::from_millis(100)) diff --git a/quickwit/quickwit-codegen/src/codegen.rs b/quickwit/quickwit-codegen/src/codegen.rs index abbc9b655dc..1ffab8a192a 100644 --- a/quickwit/quickwit-codegen/src/codegen.rs +++ b/quickwit/quickwit-codegen/src/codegen.rs @@ -353,6 +353,16 @@ impl SynMethod { } } + fn rpc_name(&self, mock: bool) -> TokenStream { + let request_type = &self.request_type; + + if mock { + quote! { super::#request_type::rpc_name() } + } else { + quote! { #request_type::rpc_name() } + } + } + fn response_type(&self, context: &CodegenContext, mock: bool) -> TokenStream { let response_type = if mock { let response_type = &self.response_type; @@ -1158,6 +1168,7 @@ fn generate_grpc_client_adapter_methods(context: &CodegenContext) -> TokenStream for syn_method in &context.methods { let method_name = syn_method.name.to_token_stream(); let request_type = syn_method.request_type(false); + let rpc_name = syn_method.rpc_name(false); let response_type = syn_method.response_type(context, false); let into_response_type = if syn_method.server_streaming { @@ -1165,7 +1176,7 @@ fn generate_grpc_client_adapter_methods(context: &CodegenContext) -> TokenStream { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(crate::error::grpc_status_to_service_error) + stream.map_err(|status| crate::error::grpc_status_to_service_error(status, #rpc_name)) } } } else { @@ -1177,7 +1188,7 @@ fn generate_grpc_client_adapter_methods(context: &CodegenContext) -> TokenStream .#method_name(request) .await .map(#into_response_type) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error(status, #rpc_name)) } }; stream.extend(method); diff --git a/quickwit/quickwit-ingest/build.rs b/quickwit/quickwit-ingest/build.rs index b9b150f94a5..4d07df25213 100644 --- a/quickwit/quickwit-ingest/build.rs +++ b/quickwit/quickwit-ingest/build.rs @@ -30,6 +30,7 @@ fn main() { .with_result_type_path("crate::Result") .with_error_type_path("crate::IngestServiceError") .with_prost_config(prost_config) + .generate_rpc_name_impls() .run() .unwrap(); } diff --git a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs index 48449e20f74..cf242bbf98c 100644 --- a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs +++ b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs @@ -155,6 +155,22 @@ impl CommitType { #[allow(unused_imports)] use std::str::FromStr; use tower::{Layer, Service, ServiceExt}; +use quickwit_common::tower::RpcName; +impl RpcName for IngestRequest { + fn rpc_name() -> &'static str { + "ingest" + } +} +impl RpcName for FetchRequest { + fn rpc_name() -> &'static str { + "fetch" + } +} +impl RpcName for TailRequest { + fn rpc_name() -> &'static str { + "tail" + } +} #[cfg_attr(any(test, feature = "testsuite"), mockall::automock)] #[async_trait::async_trait] pub trait IngestService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static { @@ -803,21 +819,30 @@ where .ingest(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + IngestRequest::rpc_name(), + )) } async fn fetch(&mut self, request: FetchRequest) -> crate::Result { self.inner .fetch(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + FetchRequest::rpc_name(), + )) } async fn tail(&mut self, request: TailRequest) -> crate::Result { self.inner .tail(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + TailRequest::rpc_name(), + )) } } #[derive(Debug)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs index ace252fda3a..cda28f6ed95 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs @@ -517,7 +517,10 @@ where .fetch_cluster_state(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + FetchClusterStateRequest::rpc_name(), + )) } } #[derive(Debug)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 238470c1a9e..03f66352340 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -1526,7 +1526,10 @@ where .create_index(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + super::metastore::CreateIndexRequest::rpc_name(), + )) } async fn delete_index( &mut self, @@ -1536,7 +1539,10 @@ where .delete_index(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + super::metastore::DeleteIndexRequest::rpc_name(), + )) } async fn add_source( &mut self, @@ -1546,7 +1552,10 @@ where .add_source(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + super::metastore::AddSourceRequest::rpc_name(), + )) } async fn toggle_source( &mut self, @@ -1556,7 +1565,10 @@ where .toggle_source(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + super::metastore::ToggleSourceRequest::rpc_name(), + )) } async fn delete_source( &mut self, @@ -1566,7 +1578,10 @@ where .delete_source(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + super::metastore::DeleteSourceRequest::rpc_name(), + )) } async fn get_or_create_open_shards( &mut self, @@ -1576,7 +1591,10 @@ where .get_or_create_open_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + GetOrCreateOpenShardsRequest::rpc_name(), + )) } async fn advise_reset_shards( &mut self, @@ -1586,7 +1604,10 @@ where .advise_reset_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + AdviseResetShardsRequest::rpc_name(), + )) } async fn get_debug_state( &mut self, @@ -1596,7 +1617,10 @@ where .get_debug_state(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + GetDebugStateRequest::rpc_name(), + )) } } #[derive(Debug)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs index eeb22a6d866..e8cd808ff57 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs @@ -460,7 +460,10 @@ where .apply_indexing_plan(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ApplyIndexingPlanRequest::rpc_name(), + )) } } #[derive(Debug)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 61ed763dd0b..84f31e56367 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -2007,7 +2007,10 @@ where .persist(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + PersistRequest::rpc_name(), + )) } async fn open_replication_stream( &mut self, @@ -2019,9 +2022,16 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(crate::error::grpc_status_to_service_error) + stream + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + SynReplicationMessage::rpc_name(), + )) }) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + SynReplicationMessage::rpc_name(), + )) } async fn open_fetch_stream( &mut self, @@ -2033,9 +2043,16 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(crate::error::grpc_status_to_service_error) + stream + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + OpenFetchStreamRequest::rpc_name(), + )) }) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + OpenFetchStreamRequest::rpc_name(), + )) } async fn open_observation_stream( &mut self, @@ -2047,9 +2064,16 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(crate::error::grpc_status_to_service_error) + stream + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + OpenObservationStreamRequest::rpc_name(), + )) }) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + OpenObservationStreamRequest::rpc_name(), + )) } async fn init_shards( &mut self, @@ -2059,7 +2083,10 @@ where .init_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + InitShardsRequest::rpc_name(), + )) } async fn retain_shards( &mut self, @@ -2069,7 +2096,10 @@ where .retain_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + RetainShardsRequest::rpc_name(), + )) } async fn truncate_shards( &mut self, @@ -2079,7 +2109,10 @@ where .truncate_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + TruncateShardsRequest::rpc_name(), + )) } async fn close_shards( &mut self, @@ -2089,7 +2122,10 @@ where .close_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + CloseShardsRequest::rpc_name(), + )) } async fn decommission( &mut self, @@ -2099,7 +2135,10 @@ where .decommission(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + DecommissionRequest::rpc_name(), + )) } } #[derive(Debug)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs index 9161c8531d8..6e9da46aa54 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs @@ -549,7 +549,10 @@ where .ingest(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + IngestRequestV2::rpc_name(), + )) } } #[derive(Debug)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 95a154dc3c7..1990e3b32b3 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -4380,7 +4380,10 @@ where .create_index(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + CreateIndexRequest::rpc_name(), + )) } async fn index_metadata( &mut self, @@ -4390,7 +4393,10 @@ where .index_metadata(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + IndexMetadataRequest::rpc_name(), + )) } async fn list_indexes_metadata( &mut self, @@ -4400,7 +4406,10 @@ where .list_indexes_metadata(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ListIndexesMetadataRequest::rpc_name(), + )) } async fn delete_index( &mut self, @@ -4410,7 +4419,10 @@ where .delete_index(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + DeleteIndexRequest::rpc_name(), + )) } async fn list_splits( &mut self, @@ -4422,9 +4434,16 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(crate::error::grpc_status_to_service_error) + stream + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ListSplitsRequest::rpc_name(), + )) }) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ListSplitsRequest::rpc_name(), + )) } async fn stage_splits( &mut self, @@ -4434,7 +4453,10 @@ where .stage_splits(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + StageSplitsRequest::rpc_name(), + )) } async fn publish_splits( &mut self, @@ -4444,7 +4466,10 @@ where .publish_splits(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + PublishSplitsRequest::rpc_name(), + )) } async fn mark_splits_for_deletion( &mut self, @@ -4454,7 +4479,10 @@ where .mark_splits_for_deletion(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + MarkSplitsForDeletionRequest::rpc_name(), + )) } async fn delete_splits( &mut self, @@ -4464,7 +4492,10 @@ where .delete_splits(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + DeleteSplitsRequest::rpc_name(), + )) } async fn add_source( &mut self, @@ -4474,7 +4505,10 @@ where .add_source(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + AddSourceRequest::rpc_name(), + )) } async fn toggle_source( &mut self, @@ -4484,7 +4518,10 @@ where .toggle_source(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ToggleSourceRequest::rpc_name(), + )) } async fn delete_source( &mut self, @@ -4494,7 +4531,10 @@ where .delete_source(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + DeleteSourceRequest::rpc_name(), + )) } async fn reset_source_checkpoint( &mut self, @@ -4504,7 +4544,10 @@ where .reset_source_checkpoint(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ResetSourceCheckpointRequest::rpc_name(), + )) } async fn last_delete_opstamp( &mut self, @@ -4514,7 +4557,10 @@ where .last_delete_opstamp(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + LastDeleteOpstampRequest::rpc_name(), + )) } async fn create_delete_task( &mut self, @@ -4524,7 +4570,10 @@ where .create_delete_task(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + DeleteQuery::rpc_name(), + )) } async fn update_splits_delete_opstamp( &mut self, @@ -4534,7 +4583,10 @@ where .update_splits_delete_opstamp(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + UpdateSplitsDeleteOpstampRequest::rpc_name(), + )) } async fn list_delete_tasks( &mut self, @@ -4544,7 +4596,10 @@ where .list_delete_tasks(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ListDeleteTasksRequest::rpc_name(), + )) } async fn list_stale_splits( &mut self, @@ -4554,7 +4609,10 @@ where .list_stale_splits(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ListStaleSplitsRequest::rpc_name(), + )) } async fn open_shards( &mut self, @@ -4564,7 +4622,10 @@ where .open_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + OpenShardsRequest::rpc_name(), + )) } async fn acquire_shards( &mut self, @@ -4574,7 +4635,10 @@ where .acquire_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + AcquireShardsRequest::rpc_name(), + )) } async fn delete_shards( &mut self, @@ -4584,7 +4648,10 @@ where .delete_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + DeleteShardsRequest::rpc_name(), + )) } async fn list_shards( &mut self, @@ -4594,7 +4661,10 @@ where .list_shards(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ListShardsRequest::rpc_name(), + )) } async fn create_index_template( &mut self, @@ -4604,7 +4674,10 @@ where .create_index_template(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + CreateIndexTemplateRequest::rpc_name(), + )) } async fn get_index_template( &mut self, @@ -4614,7 +4687,10 @@ where .get_index_template(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + GetIndexTemplateRequest::rpc_name(), + )) } async fn find_index_template_matches( &mut self, @@ -4624,7 +4700,10 @@ where .find_index_template_matches(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + FindIndexTemplateMatchesRequest::rpc_name(), + )) } async fn list_index_templates( &mut self, @@ -4634,7 +4713,10 @@ where .list_index_templates(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ListIndexTemplatesRequest::rpc_name(), + )) } async fn delete_index_templates( &mut self, @@ -4644,7 +4726,10 @@ where .delete_index_templates(request) .await .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + DeleteIndexTemplatesRequest::rpc_name(), + )) } async fn check_connectivity(&mut self) -> anyhow::Result<()> { if self.connection_addrs_rx.borrow().len() == 0 { diff --git a/quickwit/quickwit-proto/src/error.rs b/quickwit/quickwit-proto/src/error.rs index 4eff3ca1573..2fe99b322ed 100644 --- a/quickwit/quickwit-proto/src/error.rs +++ b/quickwit/quickwit-proto/src/error.rs @@ -139,7 +139,7 @@ where E: GrpcServiceError { } /// Converts a gRPC status into a service error. -pub fn grpc_status_to_service_error(status: tonic::Status) -> E +pub fn grpc_status_to_service_error(status: tonic::Status, rpc_name: &'static str) -> E where E: GrpcServiceError { if let Some(header_value) = status.metadata().get_bin(QW_ERROR_HEADER_NAME) { let service_error = match decode_error(header_value) { @@ -154,10 +154,8 @@ where E: GrpcServiceError { }; return service_error; } - if let Some(source) = status.source() { - error!(error = %source, "transport error"); - } let message = status.message().to_string(); + error!(code = ?status.code(), rpc = rpc_name, "gRPC transport error: {message}"); match status.code() { // `Cancelled` is a client timeout whereas `DeadlineExceeded` is a server timeout. At this @@ -241,7 +239,7 @@ mod tests { let service_error = MyError::new_internal("test".to_string()); let status = grpc_error_to_grpc_status(service_error.clone()); - let expected_error: MyError = grpc_status_to_service_error(status); + let expected_error: MyError = grpc_status_to_service_error(status, "rpc_name"); assert_eq!(service_error, expected_error); } }