Skip to content

Commit

Permalink
[refactor] Wiring filter with dispatcher
Browse files Browse the repository at this point in the history
Signed-off-by: dd di cesare <[email protected]>
  • Loading branch information
didierofrivia committed Sep 16, 2024
1 parent 4b792a4 commit 2ca0010
Showing 1 changed file with 57 additions and 40 deletions.
97 changes: 57 additions & 40 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::configuration::{FailureMode, FilterConfig};
use crate::configuration::{ExtensionType, FailureMode, FilterConfig};
use crate::envoy::{RateLimitResponse, RateLimitResponse_Code};
use crate::operation_dispatcher::OperationDispatcher;
use crate::policy::Policy;
use crate::service::grpc_message::GrpcMessageResponse;
use log::{debug, warn};
use protobuf::Message;
use proxy_wasm::traits::{Context, HttpContext};
use proxy_wasm::types::Action;
use std::rc::Rc;
Expand Down Expand Up @@ -79,6 +79,39 @@ impl Filter {
FailureMode::Allow => self.resume_http_request(),
}
}

fn process_ratelimit_grpc_response(&mut self, rl_resp: GrpcMessageResponse) {
match rl_resp {
GrpcMessageResponse::RateLimit(RateLimitResponse {
overall_code: RateLimitResponse_Code::UNKNOWN,
..
}) => {
self.handle_error_on_grpc_response();
}
GrpcMessageResponse::RateLimit(RateLimitResponse {
overall_code: RateLimitResponse_Code::OVER_LIMIT,
response_headers_to_add: rl_headers,
..
}) => {
let mut response_headers = vec![];
for header in &rl_headers {
response_headers.push((header.get_key(), header.get_value()));
}
self.send_http_response(429, response_headers, Some(b"Too Many Requests\n"));
}
GrpcMessageResponse::RateLimit(RateLimitResponse {
overall_code: RateLimitResponse_Code::OK,
response_headers_to_add: additional_headers,
..
}) => {
for header in additional_headers {
self.response_headers_to_add
.push((header.key, header.value));
}
}
_ => {}
}
}
}

impl HttpContext for Filter {
Expand Down Expand Up @@ -133,46 +166,30 @@ impl Context for Filter {
}
};

let rl_resp: RateLimitResponse = match Message::parse_from_bytes(&res_body_bytes) {
Ok(res) => res,
Err(e) => {
warn!("failed to parse grpc response body into RateLimitResponse message: {e}");
self.handle_error_on_grpc_response();
return;
}
};

match rl_resp {
RateLimitResponse {
overall_code: RateLimitResponse_Code::UNKNOWN,
..
} => {
self.handle_error_on_grpc_response();
return;
}
RateLimitResponse {
overall_code: RateLimitResponse_Code::OVER_LIMIT,
response_headers_to_add: rl_headers,
..
} => {
let mut response_headers = vec![];
for header in &rl_headers {
response_headers.push((header.get_key(), header.get_value()));
}
self.send_http_response(429, response_headers, Some(b"Too Many Requests\n"));
return;
}
RateLimitResponse {
overall_code: RateLimitResponse_Code::OK,
response_headers_to_add: additional_headers,
..
} => {
for header in additional_headers {
self.response_headers_to_add
.push((header.key, header.value));
if let Some(operation) = self.operation_dispatcher.get_operation(token_id) {
let res = match GrpcMessageResponse::new(
operation.get_extension_type(),
&res_body_bytes,
status_code,
) {
Ok(res) => res,
Err(e) => {
warn!(
"failed to parse grpc response body into GrpcMessageResponse message: {e}"
);
self.handle_error_on_grpc_response();
return;
}
};
match operation.get_extension_type() {
ExtensionType::Auth => {}
ExtensionType::RateLimit => self.process_ratelimit_grpc_response(res),
}

self.resume_http_request();
} else {
warn!("No Operation found with token_id: {token_id}");
self.handle_error_on_grpc_response();
}
self.resume_http_request();
}
}

0 comments on commit 2ca0010

Please sign in to comment.