Skip to content

Commit

Permalink
Merge pull request #144 from Kuadrant/fix-rl-response-headers
Browse files Browse the repository at this point in the history
Add RL response headers in correct phase
  • Loading branch information
adam-cattermole authored Nov 13, 2024
2 parents 5e73c10 + 9f280cc commit 3c8fc45
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 24 deletions.
4 changes: 3 additions & 1 deletion src/filter/http_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ impl Context for Filter {

match op_res {
Ok(operation) => {
if GrpcService::process_grpc_response(operation, resp_size).is_ok() {
if let Ok(result) = GrpcService::process_grpc_response(operation, resp_size) {
// add the response headers
self.response_headers_to_add.extend(result.response_headers);
// call the next op
match self.operation_dispatcher.borrow_mut().next() {
Ok(some_op) => {
Expand Down
16 changes: 15 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl GrpcService {
pub fn process_grpc_response(
operation: Rc<Operation>,
resp_size: usize,
) -> Result<(), StatusCode> {
) -> Result<GrpcResult, StatusCode> {
let failure_mode = operation.get_failure_mode();
if let Some(res_body_bytes) =
hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, 0, resp_size).unwrap()
Expand Down Expand Up @@ -92,6 +92,20 @@ impl GrpcService {
}
}

pub struct GrpcResult {
pub response_headers: Vec<(String, String)>,
}
impl GrpcResult {
pub fn default() -> Self {
Self {
response_headers: Vec::new(),
}
}
pub fn new(response_headers: Vec<(String, String)>) -> Self {
Self { response_headers }
}
}

pub type GrpcCallFn = fn(
upstream_name: &str,
service_name: &str,
Expand Down
6 changes: 3 additions & 3 deletions src/service/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::envoy::{
SocketAddress, StatusCode,
};
use crate::service::grpc_message::{GrpcMessageResponse, GrpcMessageResult};
use crate::service::GrpcService;
use crate::service::{GrpcResult, GrpcService};
use chrono::{DateTime, FixedOffset};
use log::{debug, warn};
use protobuf::well_known_types::Timestamp;
Expand Down Expand Up @@ -125,7 +125,7 @@ impl AuthService {
pub fn process_auth_grpc_response(
auth_resp: GrpcMessageResponse,
failure_mode: FailureMode,
) -> Result<(), StatusCode> {
) -> Result<GrpcResult, StatusCode> {
if let GrpcMessageResponse::Auth(check_response) = auth_resp {
// store dynamic metadata in filter state
store_metadata(check_response.get_dynamic_metadata());
Expand Down Expand Up @@ -153,7 +153,7 @@ impl AuthService {
)
.unwrap()
});
Ok(())
Ok(GrpcResult::default())
}
Some(CheckResponse_oneof_http_response::denied_response(denied_response)) => {
debug!("process_auth_grpc_response: received DeniedHttpResponse");
Expand Down
23 changes: 10 additions & 13 deletions src/service/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use crate::envoy::{
RateLimitDescriptor, RateLimitRequest, RateLimitResponse, RateLimitResponse_Code, StatusCode,
};
use crate::service::grpc_message::{GrpcMessageResponse, GrpcMessageResult};
use crate::service::GrpcService;
use crate::service::{GrpcResult, GrpcService};
use log::warn;
use protobuf::{Message, RepeatedField};
use proxy_wasm::hostcalls;
use proxy_wasm::types::{Bytes, MapType};
use proxy_wasm::types::Bytes;

pub const RATELIMIT_SERVICE_NAME: &str = "envoy.service.ratelimit.v3.RateLimitService";
pub const RATELIMIT_METHOD_NAME: &str = "ShouldRateLimit";
Expand Down Expand Up @@ -38,7 +38,7 @@ impl RateLimitService {
pub fn process_ratelimit_grpc_response(
rl_resp: GrpcMessageResponse,
failure_mode: FailureMode,
) -> Result<(), StatusCode> {
) -> Result<GrpcResult, StatusCode> {
match rl_resp {
GrpcMessageResponse::RateLimit(RateLimitResponse {
overall_code: RateLimitResponse_Code::UNKNOWN,
Expand All @@ -65,16 +65,13 @@ impl RateLimitService {
response_headers_to_add: additional_headers,
..
}) => {
// TODO: This should not be sent to the upstream!
additional_headers.iter().for_each(|header| {
hostcalls::add_map_value(
MapType::HttpResponseHeaders,
header.get_key(),
header.get_value(),
)
.unwrap()
});
Ok(())
let result = GrpcResult::new(
additional_headers
.iter()
.map(|header| (header.get_key().to_owned(), header.get_value().to_owned()))
.collect(),
);
Ok(result)
}
_ => {
warn!("not a valid GrpcMessageResponse::RateLimit(RateLimitResponse)!");
Expand Down
12 changes: 6 additions & 6 deletions tests/rate_limited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,12 @@ fn it_passes_additional_headers() {
)
.expect_get_buffer_bytes(Some(BufferType::GrpcReceiveBuffer))
.returning(Some(&grpc_response))
.execute_and_expect(ReturnType::None)
.unwrap();

module
.call_proxy_on_response_headers(http_context, 0, false)
.expect_log(Some(LogLevel::Debug), Some("#2 on_http_response_headers"))
.expect_add_header_map_value(
Some(MapType::HttpResponseHeaders),
Some("test"),
Expand All @@ -363,12 +369,6 @@ fn it_passes_additional_headers() {
Some("other"),
Some("header value"),
)
.execute_and_expect(ReturnType::None)
.unwrap();

module
.call_proxy_on_response_headers(http_context, 0, false)
.expect_log(Some(LogLevel::Debug), Some("#2 on_http_response_headers"))
.execute_and_expect(ReturnType::Action(Action::Continue))
.unwrap();
}
Expand Down

0 comments on commit 3c8fc45

Please sign in to comment.