From 7f5024fc238fa18805aaab422e0adba3518f36c7 Mon Sep 17 00:00:00 2001 From: dd di cesare Date: Mon, 7 Oct 2024 16:13:56 +0200 Subject: [PATCH 1/2] [refactor] Not returning when calling `handle_error_on_grpc_response` Signed-off-by: dd di cesare --- src/service.rs | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/src/service.rs b/src/service.rs index c16281e7..9fee5656 100644 --- a/src/service.rs +++ b/src/service.rs @@ -56,28 +56,25 @@ impl GrpcService { pub fn process_grpc_response(operation: Rc, resp_size: usize) { let failure_mode = operation.get_failure_mode(); - let res_body_bytes = - match hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, 0, resp_size).unwrap() { - Some(bytes) => bytes, - None => { - warn!("grpc response body is empty!"); + if let Some(res_body_bytes) = hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, 0, resp_size).unwrap() { + match GrpcMessageResponse::new(operation.get_extension_type(), &res_body_bytes) { + Ok(res) => { + match operation.get_extension_type() { + ExtensionType::Auth => AuthService::process_auth_grpc_response(res, failure_mode), + ExtensionType::RateLimit => { + RateLimitService::process_ratelimit_grpc_response(res, failure_mode) + } + } + } + Err(e) => { + warn!("failed to parse grpc response body into GrpcMessageResponse message: {e}"); GrpcService::handle_error_on_grpc_response(failure_mode); - return; } }; - let res = match GrpcMessageResponse::new(operation.get_extension_type(), &res_body_bytes) { - Ok(res) => res, - Err(e) => { - warn!("failed to parse grpc response body into GrpcMessageResponse message: {e}"); + } + else { + warn!("grpc response body is empty!"); GrpcService::handle_error_on_grpc_response(failure_mode); - return; - } - }; - match operation.get_extension_type() { - ExtensionType::Auth => AuthService::process_auth_grpc_response(res, failure_mode), - ExtensionType::RateLimit => { - RateLimitService::process_ratelimit_grpc_response(res, failure_mode) - } } } From 7f22481dea3e087ce5e20ac440c605af864c65c7 Mon Sep 17 00:00:00 2001 From: dd di cesare Date: Mon, 7 Oct 2024 16:58:06 +0200 Subject: [PATCH 2/2] [refactor] Returning StatusCode result to decide on flow * We can later check on the actual status to make decisions Signed-off-by: dd di cesare --- src/envoy/mod.rs | 1 + src/filter/http_context.rs | 12 ++++++------ src/service.rs | 39 +++++++++++++++++++++++--------------- src/service/auth.rs | 20 ++++++++++++++----- src/service/rate_limit.rs | 14 +++++++++++--- 5 files changed, 57 insertions(+), 29 deletions(-) diff --git a/src/envoy/mod.rs b/src/envoy/mod.rs index 60810273..ddbcf0e7 100644 --- a/src/envoy/mod.rs +++ b/src/envoy/mod.rs @@ -38,6 +38,7 @@ pub use { }, base::Metadata, external_auth::{CheckRequest, CheckResponse, CheckResponse_oneof_http_response}, + http_status::StatusCode, ratelimit::{RateLimitDescriptor, RateLimitDescriptor_Entry}, rls::{RateLimitRequest, RateLimitResponse, RateLimitResponse_Code}, }; diff --git a/src/filter/http_context.rs b/src/filter/http_context.rs index d890c4f0..be35779e 100644 --- a/src/filter/http_context.rs +++ b/src/filter/http_context.rs @@ -105,12 +105,12 @@ impl Context for Filter { let some_op = self.operation_dispatcher.borrow().get_operation(token_id); if let Some(operation) = some_op { - GrpcService::process_grpc_response(operation, resp_size); - self.operation_dispatcher.borrow_mut().next(); - - if let Some(_op) = self.operation_dispatcher.borrow_mut().next() { - } else { - self.resume_http_request() + if GrpcService::process_grpc_response(operation, resp_size).is_ok() { + self.operation_dispatcher.borrow_mut().next(); + if let Some(_op) = self.operation_dispatcher.borrow_mut().next() { + } else { + self.resume_http_request() + } } } else { warn!("No Operation found with token_id: {token_id}"); diff --git a/src/service.rs b/src/service.rs index 9fee5656..70eded9f 100644 --- a/src/service.rs +++ b/src/service.rs @@ -3,6 +3,7 @@ pub(crate) mod grpc_message; pub(crate) mod rate_limit; use crate::configuration::{Action, Extension, ExtensionType, FailureMode}; +use crate::envoy::StatusCode; use crate::operation_dispatcher::Operation; use crate::service::auth::{AuthService, AUTH_METHOD_NAME, AUTH_SERVICE_NAME}; use crate::service::grpc_message::{GrpcMessageRequest, GrpcMessageResponse}; @@ -54,27 +55,35 @@ impl GrpcService { &self.extension.failure_mode } - pub fn process_grpc_response(operation: Rc, resp_size: usize) { + pub fn process_grpc_response( + operation: Rc, + resp_size: usize, + ) -> Result<(), StatusCode> { let failure_mode = operation.get_failure_mode(); - if let Some(res_body_bytes) = hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, 0, resp_size).unwrap() { + if let Some(res_body_bytes) = + hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, 0, resp_size).unwrap() + { match GrpcMessageResponse::new(operation.get_extension_type(), &res_body_bytes) { - Ok(res) => { - match operation.get_extension_type() { - ExtensionType::Auth => AuthService::process_auth_grpc_response(res, failure_mode), - ExtensionType::RateLimit => { - RateLimitService::process_ratelimit_grpc_response(res, failure_mode) - } + Ok(res) => match operation.get_extension_type() { + ExtensionType::Auth => { + AuthService::process_auth_grpc_response(res, failure_mode) } - } + ExtensionType::RateLimit => { + RateLimitService::process_ratelimit_grpc_response(res, failure_mode) + } + }, Err(e) => { - warn!("failed to parse grpc response body into GrpcMessageResponse message: {e}"); + warn!( + "failed to parse grpc response body into GrpcMessageResponse message: {e}" + ); GrpcService::handle_error_on_grpc_response(failure_mode); + Err(StatusCode::InternalServerError) } - }; - } - else { - warn!("grpc response body is empty!"); - GrpcService::handle_error_on_grpc_response(failure_mode); + } + } else { + warn!("grpc response body is empty!"); + GrpcService::handle_error_on_grpc_response(failure_mode); + Err(StatusCode::InternalServerError) } } diff --git a/src/service/auth.rs b/src/service/auth.rs index 7b2812de..83e09ce1 100644 --- a/src/service/auth.rs +++ b/src/service/auth.rs @@ -3,12 +3,12 @@ use crate::configuration::FailureMode; use crate::envoy::{ Address, AttributeContext, AttributeContext_HttpRequest, AttributeContext_Peer, AttributeContext_Request, CheckRequest, CheckResponse_oneof_http_response, Metadata, - SocketAddress, + SocketAddress, StatusCode, }; use crate::service::grpc_message::{GrpcMessageResponse, GrpcMessageResult}; use crate::service::GrpcService; use chrono::{DateTime, FixedOffset, Timelike}; -use log::debug; +use log::{debug, warn}; use protobuf::well_known_types::Timestamp; use protobuf::Message; use proxy_wasm::hostcalls; @@ -92,7 +92,10 @@ impl AuthService { peer } - pub fn process_auth_grpc_response(auth_resp: GrpcMessageResponse, failure_mode: &FailureMode) { + pub fn process_auth_grpc_response( + auth_resp: GrpcMessageResponse, + failure_mode: &FailureMode, + ) -> Result<(), StatusCode> { if let GrpcMessageResponse::Auth(check_response) = auth_resp { // store dynamic metadata in filter state store_metadata(check_response.get_dynamic_metadata()); @@ -100,7 +103,6 @@ impl AuthService { match check_response.http_response { Some(CheckResponse_oneof_http_response::ok_response(ok_response)) => { debug!("process_auth_grpc_response: received OkHttpResponse"); - ok_response .get_response_headers_to_add() .iter() @@ -112,11 +114,13 @@ impl AuthService { ) .unwrap() }); + Ok(()) } Some(CheckResponse_oneof_http_response::denied_response(denied_response)) => { debug!("process_auth_grpc_response: received DeniedHttpResponse",); let mut response_headers = vec![]; + let status_code = denied_response.get_status().code; denied_response.get_headers().iter().for_each(|header| { response_headers.push(( header.get_header().get_key(), @@ -124,16 +128,22 @@ impl AuthService { )) }); hostcalls::send_http_response( - denied_response.get_status().code as u32, + status_code as u32, response_headers, Some(denied_response.get_body().as_ref()), ) .unwrap(); + Err(status_code) } None => { GrpcService::handle_error_on_grpc_response(failure_mode); + Err(StatusCode::InternalServerError) } } + } else { + warn!("not a GrpcMessageResponse::Auth(CheckResponse)!"); + GrpcService::handle_error_on_grpc_response(failure_mode); + Err(StatusCode::InternalServerError) } } } diff --git a/src/service/rate_limit.rs b/src/service/rate_limit.rs index 106538f9..c44cd88b 100644 --- a/src/service/rate_limit.rs +++ b/src/service/rate_limit.rs @@ -1,9 +1,10 @@ use crate::configuration::FailureMode; use crate::envoy::{ - RateLimitDescriptor, RateLimitRequest, RateLimitResponse, RateLimitResponse_Code, + RateLimitDescriptor, RateLimitRequest, RateLimitResponse, RateLimitResponse_Code, StatusCode, }; use crate::service::grpc_message::{GrpcMessageResponse, GrpcMessageResult}; use crate::service::GrpcService; +use log::warn; use protobuf::{Message, RepeatedField}; use proxy_wasm::hostcalls; use proxy_wasm::types::{Bytes, MapType}; @@ -37,13 +38,14 @@ impl RateLimitService { pub fn process_ratelimit_grpc_response( rl_resp: GrpcMessageResponse, failure_mode: &FailureMode, - ) { + ) -> Result<(), StatusCode> { match rl_resp { GrpcMessageResponse::RateLimit(RateLimitResponse { overall_code: RateLimitResponse_Code::UNKNOWN, .. }) => { GrpcService::handle_error_on_grpc_response(failure_mode); + Err(StatusCode::InternalServerError) } GrpcMessageResponse::RateLimit(RateLimitResponse { overall_code: RateLimitResponse_Code::OVER_LIMIT, @@ -56,6 +58,7 @@ impl RateLimitService { } hostcalls::send_http_response(429, response_headers, Some(b"Too Many Requests\n")) .unwrap(); + Err(StatusCode::TooManyRequests) } GrpcMessageResponse::RateLimit(RateLimitResponse { overall_code: RateLimitResponse_Code::OK, @@ -70,8 +73,13 @@ impl RateLimitService { ) .unwrap() }); + Ok(()) + } + _ => { + warn!("not a valid GrpcMessageResponse::RateLimit(RateLimitResponse)!"); + GrpcService::handle_error_on_grpc_response(failure_mode); + Err(StatusCode::InternalServerError) } - _ => {} } } }