-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
On grpc response flow #81
Changes from all commits
625b205
360c967
1fc0e1a
190b597
dbba0f5
d70d038
93c980d
98d3754
49e4fec
b0329d4
c262342
4272ff7
7125754
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
use crate::configuration::{FailureMode, FilterConfig}; | ||
use crate::configuration::{ExtensionType, FailureMode, FilterConfig}; | ||
use crate::envoy::{RateLimitResponse, RateLimitResponse_Code}; | ||
use crate::operation_dispatcher::OperationDispatcher; | ||
use crate::policy::Policy; | ||
use crate::service::grpc_message::GrpcMessageResponse; | ||
use log::{debug, warn}; | ||
use protobuf::Message; | ||
use proxy_wasm::traits::{Context, HttpContext}; | ||
use proxy_wasm::types::Action; | ||
use std::rc::Rc; | ||
|
@@ -29,8 +29,8 @@ impl Filter { | |
} | ||
} | ||
|
||
fn process_rate_limit_policy(&self, rlp: &Policy) -> Action { | ||
let descriptors = rlp.build_descriptors(self); | ||
fn process_policy(&self, policy: &Policy) -> Action { | ||
let descriptors = policy.build_descriptors(self); | ||
if descriptors.is_empty() { | ||
debug!( | ||
"#{} process_rate_limit_policy: empty descriptors", | ||
|
@@ -39,7 +39,8 @@ impl Filter { | |
return Action::Continue; | ||
} | ||
|
||
self.operation_dispatcher.build_operations(rlp, descriptors); | ||
self.operation_dispatcher | ||
.build_operations(policy, descriptors); | ||
|
||
if let Some(operation) = self.operation_dispatcher.next() { | ||
match operation.get_result() { | ||
|
@@ -63,22 +64,52 @@ impl Filter { | |
} | ||
} | ||
|
||
fn handle_error_on_grpc_response(&self) { | ||
// todo(adam-cattermole): We need a method of knowing which service is the one currently | ||
// being used (the current action) so that we can get the failure mode | ||
let rls = self | ||
.config | ||
.services | ||
.values() | ||
.next() | ||
.expect("expect a value"); | ||
match rls.failure_mode() { | ||
fn handle_error_on_grpc_response(&self, failure_mode: &FailureMode) { | ||
match failure_mode { | ||
FailureMode::Deny => { | ||
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n")) | ||
} | ||
FailureMode::Allow => self.resume_http_request(), | ||
} | ||
} | ||
|
||
fn process_ratelimit_grpc_response( | ||
&mut self, | ||
rl_resp: GrpcMessageResponse, | ||
failure_mode: &FailureMode, | ||
) { | ||
match rl_resp { | ||
GrpcMessageResponse::RateLimit(RateLimitResponse { | ||
overall_code: RateLimitResponse_Code::UNKNOWN, | ||
.. | ||
}) => { | ||
self.handle_error_on_grpc_response(failure_mode); | ||
} | ||
GrpcMessageResponse::RateLimit(RateLimitResponse { | ||
overall_code: RateLimitResponse_Code::OVER_LIMIT, | ||
response_headers_to_add: rl_headers, | ||
.. | ||
}) => { | ||
let mut response_headers = vec![]; | ||
for header in &rl_headers { | ||
response_headers.push((header.get_key(), header.get_value())); | ||
} | ||
self.send_http_response(429, response_headers, Some(b"Too Many Requests\n")); | ||
} | ||
GrpcMessageResponse::RateLimit(RateLimitResponse { | ||
overall_code: RateLimitResponse_Code::OK, | ||
response_headers_to_add: additional_headers, | ||
.. | ||
}) => { | ||
for header in additional_headers { | ||
self.response_headers_to_add | ||
.push((header.key, header.value)); | ||
} | ||
} | ||
_ => {} | ||
} | ||
self.operation_dispatcher.next(); | ||
} | ||
} | ||
|
||
impl HttpContext for Filter { | ||
|
@@ -97,9 +128,12 @@ impl HttpContext for Filter { | |
); | ||
Action::Continue | ||
} | ||
Some(rlp) => { | ||
debug!("#{} ratelimitpolicy selected {}", self.context_id, rlp.name); | ||
self.process_rate_limit_policy(rlp) | ||
Some(policy) => { | ||
debug!( | ||
"#{} ratelimitpolicy selected {}", | ||
self.context_id, policy.name | ||
); | ||
self.process_policy(policy) | ||
} | ||
} | ||
} | ||
|
@@ -124,55 +158,42 @@ impl Context for Filter { | |
self.context_id | ||
); | ||
|
||
let res_body_bytes = match self.get_grpc_call_response_body(0, resp_size) { | ||
Some(bytes) => bytes, | ||
None => { | ||
warn!("grpc response body is empty!"); | ||
self.handle_error_on_grpc_response(); | ||
return; | ||
} | ||
}; | ||
|
||
let rl_resp: RateLimitResponse = match Message::parse_from_bytes(&res_body_bytes) { | ||
Ok(res) => res, | ||
Err(e) => { | ||
warn!("failed to parse grpc response body into RateLimitResponse message: {e}"); | ||
self.handle_error_on_grpc_response(); | ||
return; | ||
} | ||
}; | ||
|
||
match rl_resp { | ||
RateLimitResponse { | ||
overall_code: RateLimitResponse_Code::UNKNOWN, | ||
.. | ||
} => { | ||
self.handle_error_on_grpc_response(); | ||
return; | ||
} | ||
RateLimitResponse { | ||
overall_code: RateLimitResponse_Code::OVER_LIMIT, | ||
response_headers_to_add: rl_headers, | ||
.. | ||
} => { | ||
let mut response_headers = vec![]; | ||
for header in &rl_headers { | ||
response_headers.push((header.get_key(), header.get_value())); | ||
if let Some(operation) = self.operation_dispatcher.get_operation(token_id) { | ||
let failure_mode = &operation.get_failure_mode(); | ||
let res_body_bytes = match self.get_grpc_call_response_body(0, resp_size) { | ||
Some(bytes) => bytes, | ||
None => { | ||
warn!("grpc response body is empty!"); | ||
self.handle_error_on_grpc_response(failure_mode); | ||
return; | ||
} | ||
self.send_http_response(429, response_headers, Some(b"Too Many Requests\n")); | ||
return; | ||
} | ||
RateLimitResponse { | ||
overall_code: RateLimitResponse_Code::OK, | ||
response_headers_to_add: additional_headers, | ||
.. | ||
} => { | ||
for header in additional_headers { | ||
self.response_headers_to_add | ||
.push((header.key, header.value)); | ||
}; | ||
let res = match GrpcMessageResponse::new( | ||
operation.get_extension_type(), | ||
&res_body_bytes, | ||
status_code, | ||
) { | ||
Ok(res) => res, | ||
Err(e) => { | ||
warn!( | ||
"failed to parse grpc response body into GrpcMessageResponse message: {e}" | ||
); | ||
self.handle_error_on_grpc_response(failure_mode); | ||
return; | ||
} | ||
}; | ||
match operation.get_extension_type() { | ||
ExtensionType::Auth => {} // TODO(didierofrivia): Process auth grpc response. | ||
ExtensionType::RateLimit => self.process_ratelimit_grpc_response(res, failure_mode), | ||
} | ||
|
||
Comment on lines
+171
to
188
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think much of that logic could be encapsulated in the |
||
if let Some(_op) = self.operation_dispatcher.next() { | ||
} else { | ||
self.resume_http_request() | ||
} | ||
} else { | ||
warn!("No Operation found with token_id: {token_id}"); | ||
self.handle_error_on_grpc_response(&FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, interesting... I think this could (not asking to change the current behavior really) be a |
||
self.resume_http_request(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see this is more than RL then... which is fine ;)