Skip to content

Commit

Permalink
Merge pull request #77 from Kuadrant/action_dispatcher
Browse files Browse the repository at this point in the history
Operation dispatcher FSM
  • Loading branch information
didierofrivia authored Sep 10, 2024
2 parents 6c9343d + c221036 commit 17cc2f0
Show file tree
Hide file tree
Showing 10 changed files with 685 additions and 96 deletions.
117 changes: 97 additions & 20 deletions src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,16 +486,7 @@ impl TryFrom<PluginConfiguration> for FilterConfig {
let services = config
.extensions
.into_iter()
.map(|(name, ext)| {
(
name,
Rc::new(GrpcService::new(
ext.extension_type,
ext.endpoint,
ext.failure_mode,
)),
)
})
.map(|(name, ext)| (name, Rc::new(GrpcService::new(Rc::new(ext)))))
.collect();

Ok(Self {
Expand All @@ -505,18 +496,19 @@ impl TryFrom<PluginConfiguration> for FilterConfig {
}
}

#[derive(Deserialize, Debug, Clone, Default)]
#[derive(Deserialize, Debug, Clone, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FailureMode {
#[default]
Deny,
Allow,
}

#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Debug, Clone, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ExtensionType {
Auth,
#[default]
RateLimit,
}

Expand All @@ -527,7 +519,7 @@ pub struct PluginConfiguration {
pub policies: Vec<Policy>,
}

#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub struct Extension {
#[serde(rename = "type")]
Expand All @@ -537,6 +529,14 @@ pub struct Extension {
pub failure_mode: FailureMode,
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Action {
pub extension: String,
#[allow(dead_code)]
pub data: DataType,
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -587,7 +587,18 @@ mod test {
"selector": "auth.metadata.username"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;

Expand Down Expand Up @@ -682,7 +693,18 @@ mod test {
"default": "my_selector_default_value"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(config);
Expand Down Expand Up @@ -759,7 +781,18 @@ mod test {
}]
}],
"data": [ { "selector": { "selector": "my.selector.path" } }]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(config);
Expand Down Expand Up @@ -825,7 +858,18 @@ mod test {
"selector": "auth.metadata.username"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(config);
Expand Down Expand Up @@ -872,7 +916,18 @@ mod test {
"selector": "auth.metadata.username"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(bad_config);
Expand Down Expand Up @@ -902,7 +957,18 @@ mod test {
"value": "1"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(bad_config);
Expand Down Expand Up @@ -934,7 +1000,18 @@ mod test {
}]
}],
"data": [ { "selector": { "selector": "my.selector.path" } }]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(bad_config);
Expand Down
47 changes: 20 additions & 27 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::configuration::{FailureMode, FilterConfig};
use crate::envoy::{RateLimitResponse, RateLimitResponse_Code};
use crate::operation_dispatcher::OperationDispatcher;
use crate::policy::Policy;
use crate::service::rate_limit::RateLimitService;
use crate::service::{GrpcServiceHandler, HeaderResolver};
use log::{debug, warn};
use protobuf::Message;
use proxy_wasm::traits::{Context, HttpContext};
Expand All @@ -13,7 +12,7 @@ pub struct Filter {
pub context_id: u32,
pub config: Rc<FilterConfig>,
pub response_headers_to_add: Vec<(String, String)>,
pub header_resolver: Rc<HeaderResolver>,
pub operation_dispatcher: OperationDispatcher,
}

impl Filter {
Expand All @@ -40,33 +39,27 @@ impl Filter {
return Action::Continue;
}

// todo(adam-cattermole): For now we just get the first GrpcService but we expect to have
// an action which links to the service that should be used
let rls = self
.config
.services
.values()
.next()
.expect("expect a value");

let handler = GrpcServiceHandler::new(Rc::clone(rls), Rc::clone(&self.header_resolver));
let message = RateLimitService::message(rlp.domain.clone(), descriptors);
self.operation_dispatcher.build_operations(rlp, descriptors);

match handler.send(message) {
Ok(call_id) => {
debug!(
"#{} initiated gRPC call (id# {}) to Limitador",
self.context_id, call_id
);
Action::Pause
}
Err(e) => {
warn!("gRPC call to Limitador failed! {e:?}");
if let FailureMode::Deny = rls.failure_mode() {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
if let Some(operation) = self.operation_dispatcher.next() {
match operation.get_result() {
Ok(call_id) => {
debug!(
"#{} initiated gRPC call (id# {}) to Limitador",
self.context_id, call_id
);
Action::Pause
}
Err(e) => {
warn!("gRPC call to Limitador failed! {e:?}");
if let FailureMode::Deny = operation.get_failure_mode() {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
Action::Continue
}
Action::Continue
}
} else {
Action::Continue
}
}

Expand Down
18 changes: 16 additions & 2 deletions src/filter/root_context.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::configuration::{FilterConfig, PluginConfiguration};
use crate::filter::http_context::Filter;
use crate::service::HeaderResolver;
use crate::operation_dispatcher::OperationDispatcher;
use crate::service::{GrpcServiceHandler, HeaderResolver};
use const_format::formatcp;
use log::{debug, error, info};
use proxy_wasm::traits::{Context, HttpContext, RootContext};
use proxy_wasm::types::ContextType;
use std::collections::HashMap;
use std::rc::Rc;

const WASM_SHIM_VERSION: &str = env!("CARGO_PKG_VERSION");
Expand Down Expand Up @@ -37,11 +39,23 @@ impl RootContext for FilterRoot {

fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
debug!("#{} create_http_context", context_id);
let mut service_handlers: HashMap<String, Rc<GrpcServiceHandler>> = HashMap::new();
self.config
.services
.iter()
.for_each(|(extension, service)| {
service_handlers
.entry(extension.clone())
.or_insert(Rc::from(GrpcServiceHandler::new(
Rc::clone(service),
Rc::new(HeaderResolver::new()),
)));
});
Some(Box::new(Filter {
context_id,
config: Rc::clone(&self.config),
response_headers_to_add: Vec::default(),
header_resolver: Rc::new(HeaderResolver::new()),
operation_dispatcher: OperationDispatcher::new(service_handlers),
}))
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod configuration;
mod envoy;
mod filter;
mod glob;
mod operation_dispatcher;
mod policy;
mod policy_index;
mod service;
Expand Down
Loading

0 comments on commit 17cc2f0

Please sign in to comment.