From 2ca0010f423786d65f1350a9aaca529624541282 Mon Sep 17 00:00:00 2001 From: dd di cesare Date: Fri, 13 Sep 2024 12:58:06 +0200 Subject: [PATCH] [refactor] Wiring filter with dispatcher Signed-off-by: dd di cesare --- src/filter/http_context.rs | 97 ++++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 40 deletions(-) diff --git a/src/filter/http_context.rs b/src/filter/http_context.rs index 2290266d..c132aafe 100644 --- a/src/filter/http_context.rs +++ b/src/filter/http_context.rs @@ -1,9 +1,9 @@ -use crate::configuration::{FailureMode, FilterConfig}; +use crate::configuration::{ExtensionType, FailureMode, FilterConfig}; use crate::envoy::{RateLimitResponse, RateLimitResponse_Code}; use crate::operation_dispatcher::OperationDispatcher; use crate::policy::Policy; +use crate::service::grpc_message::GrpcMessageResponse; use log::{debug, warn}; -use protobuf::Message; use proxy_wasm::traits::{Context, HttpContext}; use proxy_wasm::types::Action; use std::rc::Rc; @@ -79,6 +79,39 @@ impl Filter { FailureMode::Allow => self.resume_http_request(), } } + + fn process_ratelimit_grpc_response(&mut self, rl_resp: GrpcMessageResponse) { + match rl_resp { + GrpcMessageResponse::RateLimit(RateLimitResponse { + overall_code: RateLimitResponse_Code::UNKNOWN, + .. + }) => { + self.handle_error_on_grpc_response(); + } + GrpcMessageResponse::RateLimit(RateLimitResponse { + overall_code: RateLimitResponse_Code::OVER_LIMIT, + response_headers_to_add: rl_headers, + .. + }) => { + let mut response_headers = vec![]; + for header in &rl_headers { + response_headers.push((header.get_key(), header.get_value())); + } + self.send_http_response(429, response_headers, Some(b"Too Many Requests\n")); + } + GrpcMessageResponse::RateLimit(RateLimitResponse { + overall_code: RateLimitResponse_Code::OK, + response_headers_to_add: additional_headers, + .. + }) => { + for header in additional_headers { + self.response_headers_to_add + .push((header.key, header.value)); + } + } + _ => {} + } + } } impl HttpContext for Filter { @@ -133,46 +166,30 @@ impl Context for Filter { } }; - let rl_resp: RateLimitResponse = match Message::parse_from_bytes(&res_body_bytes) { - Ok(res) => res, - Err(e) => { - warn!("failed to parse grpc response body into RateLimitResponse message: {e}"); - self.handle_error_on_grpc_response(); - return; - } - }; - - match rl_resp { - RateLimitResponse { - overall_code: RateLimitResponse_Code::UNKNOWN, - .. - } => { - self.handle_error_on_grpc_response(); - return; - } - RateLimitResponse { - overall_code: RateLimitResponse_Code::OVER_LIMIT, - response_headers_to_add: rl_headers, - .. - } => { - let mut response_headers = vec![]; - for header in &rl_headers { - response_headers.push((header.get_key(), header.get_value())); - } - self.send_http_response(429, response_headers, Some(b"Too Many Requests\n")); - return; - } - RateLimitResponse { - overall_code: RateLimitResponse_Code::OK, - response_headers_to_add: additional_headers, - .. - } => { - for header in additional_headers { - self.response_headers_to_add - .push((header.key, header.value)); + if let Some(operation) = self.operation_dispatcher.get_operation(token_id) { + let res = match GrpcMessageResponse::new( + operation.get_extension_type(), + &res_body_bytes, + status_code, + ) { + Ok(res) => res, + Err(e) => { + warn!( + "failed to parse grpc response body into GrpcMessageResponse message: {e}" + ); + self.handle_error_on_grpc_response(); + return; } + }; + match operation.get_extension_type() { + ExtensionType::Auth => {} + ExtensionType::RateLimit => self.process_ratelimit_grpc_response(res), } + + self.resume_http_request(); + } else { + warn!("No Operation found with token_id: {token_id}"); + self.handle_error_on_grpc_response(); } - self.resume_http_request(); } }