Skip to content

Commit

Permalink
Merge pull request #102 from Kuadrant/fix-calling-next
Browse files Browse the repository at this point in the history
Fix calling next operation when error
  • Loading branch information
didierofrivia authored Oct 8, 2024
2 parents 9f4b62d + 7f22481 commit 8b27b6f
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 34 deletions.
1 change: 1 addition & 0 deletions src/envoy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub use {
},
base::Metadata,
external_auth::{CheckRequest, CheckResponse, CheckResponse_oneof_http_response},
http_status::StatusCode,
ratelimit::{RateLimitDescriptor, RateLimitDescriptor_Entry},
rls::{RateLimitRequest, RateLimitResponse, RateLimitResponse_Code},
};
Expand Down
12 changes: 6 additions & 6 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ impl Context for Filter {
let some_op = self.operation_dispatcher.borrow().get_operation(token_id);

if let Some(operation) = some_op {
GrpcService::process_grpc_response(operation, resp_size);
self.operation_dispatcher.borrow_mut().next();

if let Some(_op) = self.operation_dispatcher.borrow_mut().next() {
} else {
self.resume_http_request()
if GrpcService::process_grpc_response(operation, resp_size).is_ok() {
self.operation_dispatcher.borrow_mut().next();
if let Some(_op) = self.operation_dispatcher.borrow_mut().next() {
} else {
self.resume_http_request()
}
}
} else {
warn!("No Operation found with token_id: {token_id}");
Expand Down
46 changes: 26 additions & 20 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub(crate) mod grpc_message;
pub(crate) mod rate_limit;

use crate::configuration::{Action, Extension, ExtensionType, FailureMode};
use crate::envoy::StatusCode;
use crate::operation_dispatcher::Operation;
use crate::service::auth::{AuthService, AUTH_METHOD_NAME, AUTH_SERVICE_NAME};
use crate::service::grpc_message::{GrpcMessageRequest, GrpcMessageResponse};
Expand Down Expand Up @@ -54,30 +55,35 @@ impl GrpcService {
&self.extension.failure_mode
}

pub fn process_grpc_response(operation: Rc<Operation>, resp_size: usize) {
pub fn process_grpc_response(
operation: Rc<Operation>,
resp_size: usize,
) -> Result<(), StatusCode> {
let failure_mode = operation.get_failure_mode();
let res_body_bytes =
match hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, 0, resp_size).unwrap() {
Some(bytes) => bytes,
None => {
warn!("grpc response body is empty!");
if let Some(res_body_bytes) =
hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, 0, resp_size).unwrap()
{
match GrpcMessageResponse::new(operation.get_extension_type(), &res_body_bytes) {
Ok(res) => match operation.get_extension_type() {
ExtensionType::Auth => {
AuthService::process_auth_grpc_response(res, failure_mode)
}
ExtensionType::RateLimit => {
RateLimitService::process_ratelimit_grpc_response(res, failure_mode)
}
},
Err(e) => {
warn!(
"failed to parse grpc response body into GrpcMessageResponse message: {e}"
);
GrpcService::handle_error_on_grpc_response(failure_mode);
return;
Err(StatusCode::InternalServerError)
}
};
let res = match GrpcMessageResponse::new(operation.get_extension_type(), &res_body_bytes) {
Ok(res) => res,
Err(e) => {
warn!("failed to parse grpc response body into GrpcMessageResponse message: {e}");
GrpcService::handle_error_on_grpc_response(failure_mode);
return;
}
};
match operation.get_extension_type() {
ExtensionType::Auth => AuthService::process_auth_grpc_response(res, failure_mode),
ExtensionType::RateLimit => {
RateLimitService::process_ratelimit_grpc_response(res, failure_mode)
}
} else {
warn!("grpc response body is empty!");
GrpcService::handle_error_on_grpc_response(failure_mode);
Err(StatusCode::InternalServerError)
}
}

Expand Down
20 changes: 15 additions & 5 deletions src/service/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use crate::configuration::FailureMode;
use crate::envoy::{
Address, AttributeContext, AttributeContext_HttpRequest, AttributeContext_Peer,
AttributeContext_Request, CheckRequest, CheckResponse_oneof_http_response, Metadata,
SocketAddress,
SocketAddress, StatusCode,
};
use crate::service::grpc_message::{GrpcMessageResponse, GrpcMessageResult};
use crate::service::GrpcService;
use chrono::{DateTime, FixedOffset, Timelike};
use log::debug;
use log::{debug, warn};
use protobuf::well_known_types::Timestamp;
use protobuf::Message;
use proxy_wasm::hostcalls;
Expand Down Expand Up @@ -92,15 +92,17 @@ impl AuthService {
peer
}

pub fn process_auth_grpc_response(auth_resp: GrpcMessageResponse, failure_mode: &FailureMode) {
pub fn process_auth_grpc_response(
auth_resp: GrpcMessageResponse,
failure_mode: &FailureMode,
) -> Result<(), StatusCode> {
if let GrpcMessageResponse::Auth(check_response) = auth_resp {
// store dynamic metadata in filter state
store_metadata(check_response.get_dynamic_metadata());

match check_response.http_response {
Some(CheckResponse_oneof_http_response::ok_response(ok_response)) => {
debug!("process_auth_grpc_response: received OkHttpResponse");

ok_response
.get_response_headers_to_add()
.iter()
Expand All @@ -112,28 +114,36 @@ impl AuthService {
)
.unwrap()
});
Ok(())
}
Some(CheckResponse_oneof_http_response::denied_response(denied_response)) => {
debug!("process_auth_grpc_response: received DeniedHttpResponse",);

let mut response_headers = vec![];
let status_code = denied_response.get_status().code;
denied_response.get_headers().iter().for_each(|header| {
response_headers.push((
header.get_header().get_key(),
header.get_header().get_value(),
))
});
hostcalls::send_http_response(
denied_response.get_status().code as u32,
status_code as u32,
response_headers,
Some(denied_response.get_body().as_ref()),
)
.unwrap();
Err(status_code)
}
None => {
GrpcService::handle_error_on_grpc_response(failure_mode);
Err(StatusCode::InternalServerError)
}
}
} else {
warn!("not a GrpcMessageResponse::Auth(CheckResponse)!");
GrpcService::handle_error_on_grpc_response(failure_mode);
Err(StatusCode::InternalServerError)
}
}
}
14 changes: 11 additions & 3 deletions src/service/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::configuration::FailureMode;
use crate::envoy::{
RateLimitDescriptor, RateLimitRequest, RateLimitResponse, RateLimitResponse_Code,
RateLimitDescriptor, RateLimitRequest, RateLimitResponse, RateLimitResponse_Code, StatusCode,
};
use crate::service::grpc_message::{GrpcMessageResponse, GrpcMessageResult};
use crate::service::GrpcService;
use log::warn;
use protobuf::{Message, RepeatedField};
use proxy_wasm::hostcalls;
use proxy_wasm::types::{Bytes, MapType};
Expand Down Expand Up @@ -37,13 +38,14 @@ impl RateLimitService {
pub fn process_ratelimit_grpc_response(
rl_resp: GrpcMessageResponse,
failure_mode: &FailureMode,
) {
) -> Result<(), StatusCode> {
match rl_resp {
GrpcMessageResponse::RateLimit(RateLimitResponse {
overall_code: RateLimitResponse_Code::UNKNOWN,
..
}) => {
GrpcService::handle_error_on_grpc_response(failure_mode);
Err(StatusCode::InternalServerError)
}
GrpcMessageResponse::RateLimit(RateLimitResponse {
overall_code: RateLimitResponse_Code::OVER_LIMIT,
Expand All @@ -56,6 +58,7 @@ impl RateLimitService {
}
hostcalls::send_http_response(429, response_headers, Some(b"Too Many Requests\n"))
.unwrap();
Err(StatusCode::TooManyRequests)
}
GrpcMessageResponse::RateLimit(RateLimitResponse {
overall_code: RateLimitResponse_Code::OK,
Expand All @@ -70,8 +73,13 @@ impl RateLimitService {
)
.unwrap()
});
Ok(())
}
_ => {
warn!("not a valid GrpcMessageResponse::RateLimit(RateLimitResponse)!");
GrpcService::handle_error_on_grpc_response(failure_mode);
Err(StatusCode::InternalServerError)
}
_ => {}
}
}
}
Expand Down

0 comments on commit 8b27b6f

Please sign in to comment.