diff --git a/Makefile b/Makefile index 5362a265..e7944622 100644 --- a/Makefile +++ b/Makefile @@ -79,3 +79,21 @@ chmod a+x $(1)/bin/protoc ;\ rm -rf $$TMP_DIR ;\ } endef + +##@ Util + +# go-install-tool will 'go install' any package $2 and install it to $1. +PROJECT_DIR := $(shell dirname $(abspath $(lastword $(MAKEFILE_LIST)))) +define go-install-tool +@[ -f $(1) ] || { \ +set -e ;\ +TMP_DIR=$$(mktemp -d) ;\ +cd $$TMP_DIR ;\ +go mod init tmp ;\ +echo "Downloading $(2)" ;\ +GOBIN=$(PROJECT_DIR)/bin go install $(2) ;\ +rm -rf $$TMP_DIR ;\ +} +endef + +include ./make/*.mk diff --git a/README.md b/README.md index 46587c3a..9ab09747 100644 --- a/README.md +++ b/README.md @@ -6,15 +6,23 @@ A Proxy-Wasm module written in Rust, acting as a shim between Envoy and Limitador. ## Sample configuration + Following is a sample configuration used by the shim. ```yaml -failureMode: deny -rateLimitPolicies: +extensions: + auth-ext: + type: auth + endpoint: auth-cluster + failureMode: deny + ratelimit-ext: + type: ratelimit + endpoint: ratelimit-cluster + failureMode: deny +policies: - name: rlp-ns-A/rlp-name-A domain: rlp-ns-A/rlp-name-A - service: rate-limit-cluster - hostnames: ["*.toystore.com"] + hostnames: [ "*.toystore.com" ] rules: - conditions: - allOf: @@ -27,12 +35,15 @@ rateLimitPolicies: - selector: request.method operator: eq value: GET - data: - - selector: - selector: request.headers.My-Custom-Header - - static: - key: admin - value: "1" + actions: + - extension: ratelimit-ext + scope: rlp-ns-A/rlp-name-A + data: + - selector: + selector: request.headers.My-Custom-Header + - static: + key: admin + value: "1" ``` ## Features @@ -57,6 +68,7 @@ pub enum WhenConditionOperator { The `matches` operator is a a simple globbing pattern implementation based on regular expressions. The only characters taken into account are: + * `?`: 0 or 1 characters * `*`: 0 or more characters * `+`: 1 or more characters @@ -95,10 +107,10 @@ Example: Input: this.is.a.exam\.ple -> Retuns: ["this", "is", "a", "exam.ple"]. ``` -Some path segments include dot `.` char in them. For instance envoy filter names: `envoy.filters.http.header_to_metadata`. +Some path segments include dot `.` char in them. For instance envoy filter +names: `envoy.filters.http.header_to_metadata`. In that particular cases, the dot chat (separator), needs to be escaped. - ## Building Prerequisites: @@ -127,7 +139,108 @@ make build BUILD=release cargo test ``` -## Running local development environment +## Running local development environment (kind) + +`docker` is required. + +Run local development environment + +```sh +make local-setup +``` + +This deploys a local kubernetes cluster using kind, with the local build of wasm-shim mapped to the envoy container. An +echo API as well as limitador, authorino, and some test policies are configured. + +To expose the envoy endpoint run the following: + +```sh +kubectl port-forward --namespace default deployment/envoy 8000:8000 +``` + +There is then a single auth policy defined for e2e testing: + +* `auth-a` which defines auth is required for requests to `/get` for the `AuthConfig` with `effective-route-1` + +```sh +curl -H "Host: test.a.auth.com" http://127.0.0.1:8000/get -i +# HTTP/1.1 401 Unauthorized +``` + +```sh +curl -H "Host: test.a.auth.com" -H "Authorization: APIKEY IAMALICE" http://127.0.0.1:8000/get -i +# HTTP/1.1 200 OK +``` + +And three rate limit policies defined for e2e testing: + +* `rlp-a`: Only one data item. Data selector should not generate return any value. 
Thus, descriptor should be empty and + rate limiting service should **not** be called. + +```sh +curl -H "Host: test.a.rlp.com" http://127.0.0.1:8000/get -i +``` + +* `rlp-b`: Conditions do not match. Hence, rate limiting service should **not** be called. + +```sh +curl -H "Host: test.b.rlp.com" http://127.0.0.1:8000/get -i +``` + +* `rlp-c`: Four descriptors from multiple rules should be generated. Hence, rate limiting service should be called. + +```sh +curl -H "Host: test.c.rlp.com" -H "x-forwarded-for: 127.0.0.1" -H "My-Custom-Header-01: my-custom-header-value-01" -H "x-dyn-user-id: bob" http://127.0.0.1:8000/get -i +``` + +* `multi-a` which defines two actions for authenticated ratelimiting. + +```sh +curl -H "Host: test.a.multi.com" http://127.0.0.1:8000/get -i +# HTTP/1.1 401 Unauthorized +``` + +Alice has 5 requests per 10 seconds: +```sh +while :; do curl --write-out '%{http_code}\n' --silent --output /dev/null -H "Authorization: APIKEY IAMALICE" -H "Host: test.a.multi.com" http://127.0.0.1:8000/get | grep -E --color "\b(429)\b|$"; sleep 1; done +``` + +Bob has 2 requests per 10 seconds: +```sh +while :; do curl --write-out '%{http_code}\n' --silent --output /dev/null -H "Authorization: APIKEY IAMBOB" -H "Host: test.a.multi.com" http://127.0.0.1:8000/get | grep -E --color "\b(429)\b|$"; sleep 1; done +``` + +The expected descriptors: + +``` +RateLimitDescriptor { entries: [Entry { key: "limit_to_be_activated", value: "1" }], limit: None } +``` + +``` +RateLimitDescriptor { entries: [Entry { key: "source.address", value: "127.0.0.1:0" }], limit: None } +``` + +``` +RateLimitDescriptor { entries: [Entry { key: "request.headers.My-Custom-Header-01", value: "my-custom-header-value-01" }], limit: None } +``` + +``` +RateLimitDescriptor { entries: [Entry { key: "user_id", value: "bob" }], limit: None } +``` + +To rebuild and deploy to the cluster: + +```sh +make build local-rollout +``` + +Stop and clean up resources: + +```sh +make local-cleanup +``` + +## Running local development environment (docker-compose legacy) `docker` and `docker-compose` required. @@ -139,7 +252,8 @@ make development Three rate limit policies defined for e2e testing: -* `rlp-a`: Only one data item. Data selector should not generate return any value. Thus, descriptor should be empty and rate limiting service should **not** be called. +* `rlp-a`: Only one data item. Data selector should not generate return any value. Thus, descriptor should be empty and + rate limiting service should **not** be called. ``` curl -H "Host: test.a.com" http://127.0.0.1:18000/get @@ -175,7 +289,9 @@ RateLimitDescriptor { entries: [Entry { key: "request.headers.My-Custom-Header-0 RateLimitDescriptor { entries: [Entry { key: "user_id", value: "bob" }], limit: None } ``` -**Note:** Using [Header-To-Metadata filter](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/header_to_metadata_filter#config-http-filters-header-to-metadata), `x-dyn-user-id` header value is available in the metadata struct with the `user-id` key. +**Note:** +Using [Header-To-Metadata filter](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/header_to_metadata_filter#config-http-filters-header-to-metadata), `x-dyn-user-id` +header value is available in the metadata struct with the `user-id` key. 
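+
+For reference, a minimal Header-To-Metadata rule that would surface that header this way could look like the sketch below. This is illustrative only: the filter name and field names are standard Envoy v3 configuration, but the actual rule used by this development environment may differ.
+
+```yaml
+# Sketch: copy the x-dyn-user-id request header into dynamic metadata
+# under the key `user-id`, so it can then be read by selectors.
+- name: envoy.filters.http.header_to_metadata
+  typed_config:
+    "@type": type.googleapis.com/envoy.extensions.filters.http.header_to_metadata.v3.Config
+    request_rules:
+      - header: x-dyn-user-id
+        on_header_present:
+          key: user-id
+          type: STRING
+        remove: false
+```
+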
According to the defined limits: @@ -187,7 +303,7 @@ According to the defined limits: conditions: - "limit_to_be_activated == '1'" - "user_id == 'bob'" - variables: [] + variables: [ ] ``` The third request in less than 10 seconds should return `429 Too Many Requests`. diff --git a/make/deploy.mk b/make/deploy.mk new file mode 100644 index 00000000..64ba2ed6 --- /dev/null +++ b/make/deploy.mk @@ -0,0 +1,126 @@ +##@ Kind + +.PHONY: kind kind-create-cluster kind-delete-cluster + +KIND = $(PROJECT_PATH)/bin/kind +KIND_VERSION = v0.23.0 +$(KIND): + $(call go-install-tool,$(KIND),sigs.k8s.io/kind@$(KIND_VERSION)) + +kind: $(KIND) ## Download kind locally if necessary. + +KIND_CLUSTER_NAME ?= wasm-auth-local + +kind-create-cluster: BUILD?=debug +kind-create-cluster: WASM_PATH=$(subst /,\/,$(PROJECT_PATH)/target/wasm32-unknown-unknown/$(BUILD)) +kind-create-cluster: kind ## Create the "wasm-auth-local" kind cluster. + @{ \ + TEMP_FILE=/tmp/kind-cluster-$$(openssl rand -hex 4).yaml ;\ + cp $(PROJECT_PATH)/utils/kind/cluster.yaml $$TEMP_FILE ;\ + $(SED) -i "s/\$$(WASM_PATH)/$(WASM_PATH)/g" $$TEMP_FILE ;\ + KIND_EXPERIMENTAL_PROVIDER=$(CONTAINER_ENGINE) $(KIND) create cluster --name $(KIND_CLUSTER_NAME) --config $$TEMP_FILE ;\ + rm -rf $$TEMP_FILE ;\ + } + +kind-delete-cluster: ## Delete the "wasm-auth-local" kind cluster. + - KIND_EXPERIMENTAL_PROVIDER=$(CONTAINER_ENGINE) $(KIND) delete cluster --name $(KIND_CLUSTER_NAME) + + +##@ Authorino + +.PHONY: install-authorino-operator certs deploy-authorino + +AUTHORINO_IMAGE ?= quay.io/kuadrant/authorino:latest +AUTHORINO_OPERATOR_NAMESPACE ?= authorino-operator +install-authorino-operator: ## Installs Authorino Operator and dependencies into the Kubernetes cluster configured in ~/.kube/config + curl -sL https://raw.githubusercontent.com/Kuadrant/authorino-operator/main/utils/install.sh | bash -s -- --git-ref main + kubectl patch deployment/authorino-webhooks -n $(AUTHORINO_OPERATOR_NAMESPACE) -p '{"spec":{"template":{"spec":{"containers":[{"name":"webhooks","image":"$(AUTHORINO_IMAGE)","imagePullPolicy":"IfNotPresent"}]}}}}' + kubectl -n $(AUTHORINO_OPERATOR_NAMESPACE) wait --timeout=300s --for=condition=Available deployments --all + +TLS_ENABLED ?= true +AUTHORINO_INSTANCE ?= authorino +NAMESPACE ?= default +certs: sed ## Requests TLS certificates for the Authorino instance if TLS is enabled, cert-manager.io is installed, and the secret is not already present +ifeq (true,$(TLS_ENABLED)) +ifeq (,$(shell kubectl -n $(NAMESPACE) get secret/authorino-oidc-server-cert 2>/dev/null)) + curl -sl https://raw.githubusercontent.com/kuadrant/authorino/main/deploy/certs.yaml | $(SED) "s/\$$(AUTHORINO_INSTANCE)/$(AUTHORINO_INSTANCE)/g;s/\$$(NAMESPACE)/$(NAMESPACE)/g" | kubectl -n $(NAMESPACE) apply -f - +else + echo "tls cert secret found." +endif +else + echo "tls disabled." 
+endif + +deploy-authorino: certs sed ## Deploys an instance of Authorino into the Kubernetes cluster configured in ~/.kube/config + @{ \ + set -e ;\ + TEMP_FILE=/tmp/authorino-deploy-$$(openssl rand -hex 4).yaml ;\ + curl -sl https://raw.githubusercontent.com/kuadrant/authorino/main/deploy/authorino.yaml > $$TEMP_FILE ;\ + $(SED) -i "s/\$$(AUTHORINO_INSTANCE)/$(AUTHORINO_INSTANCE)/g;s/\$$(TLS_ENABLED)/$(TLS_ENABLED)/g" $$TEMP_FILE ;\ + kubectl -n $(NAMESPACE) apply -f $$TEMP_FILE ;\ + kubectl patch -n $(NAMESPACE) authorino/$(AUTHORINO_INSTANCE) --type='merge' -p '{"spec":{"image": "$(AUTHORINO_IMAGE)"}}' ;\ + rm -rf $$TEMP_FILE ;\ + } + + +##@ Limitador + +deploy-limitador: + kubectl create configmap limits --from-file=$(PROJECT_PATH)/utils/docker-compose/limits.yaml + kubectl -n $(NAMESPACE) apply -f $(PROJECT_PATH)/utils/deploy/limitador.yaml + + +##@ User Apps + +.PHONY: user-apps + + +ifeq (true,$(TLS_ENABLED)) +ENVOY_OVERLAY = tls +else +ENVOY_OVERLAY = notls +endif +user-apps: ## Deploys talker API and envoy + kubectl -n $(NAMESPACE) apply -f https://raw.githubusercontent.com/kuadrant/authorino-examples/main/talker-api/talker-api-deploy.yaml + kubectl -n $(NAMESPACE) apply -f $(PROJECT_PATH)/utils/deploy/envoy-$(ENVOY_OVERLAY).yaml + kubectl -n $(NAMESPACE) apply -f $(PROJECT_PATH)/utils/deploy/authconfig.yaml + + +##@ Util + +.PHONY: local-setup local-env-setup local-cleanup local-rollout sed + +local-setup: local-env-setup + kubectl -n $(NAMESPACE) wait --timeout=300s --for=condition=Available deployments --all + @{ \ + echo "Now you can export the envoy service by doing:"; \ + echo "kubectl port-forward --namespace $(NAMESPACE) deployment/envoy 8000:8000"; \ + echo "After that, you can curl -H \"Host: myhost.com\" localhost:8000"; \ + } + +local-env-setup: + $(MAKE) kind-delete-cluster + $(MAKE) kind-create-cluster + $(MAKE) install-authorino-operator + $(MAKE) deploy-authorino + $(MAKE) deploy-limitador + $(MAKE) user-apps + +local-cleanup: kind ## Delete the "wasm-auth-local" kind cluster. + $(MAKE) kind-delete-cluster + +local-rollout: + $(MAKE) user-apps + kubectl rollout restart -n $(NAMESPACE) deployment/envoy + kubectl -n $(NAMESPACE) wait --timeout=300s --for=condition=Available deployments --all + +ifeq ($(shell uname),Darwin) +SED=$(shell which gsed) +else +SED=$(shell which sed) +endif +sed: ## Checks if GNU sed is installed +ifeq ($(SED),) + @echo "Cannot find GNU sed installed." 
+ exit 1 +endif diff --git a/src/attribute.rs b/src/attribute.rs index aada6176..cd88625c 100644 --- a/src/attribute.rs +++ b/src/attribute.rs @@ -1,7 +1,10 @@ use crate::configuration::Path; -use crate::filter::http_context::Filter; use chrono::{DateTime, FixedOffset}; -use proxy_wasm::traits::Context; +use log::{debug, error}; +use protobuf::well_known_types::Struct; +use proxy_wasm::hostcalls; + +pub const KUADRANT_NAMESPACE: &str = "kuadrant"; pub trait Attribute { fn parse(raw_attribute: Vec) -> Result @@ -104,16 +107,135 @@ impl Attribute for DateTime { } } -#[allow(dead_code)] -pub fn get_attribute(f: &Filter, attr: &str) -> Result +pub fn get_attribute(attr: &str) -> Result where T: Attribute, { - match f.get_property(Path::from(attr).tokens()) { - None => Err(format!( - "#{} get_attribute: not found: {}", - f.context_id, attr - )), - Some(attribute_bytes) => T::parse(attribute_bytes), + match hostcalls::get_property(Path::from(attr).tokens()) { + Ok(Some(attribute_bytes)) => T::parse(attribute_bytes), + Ok(None) => Err(format!("get_attribute: not found or null: {attr}")), + Err(e) => Err(format!("get_attribute: error: {e:?}")), + } +} + +pub fn set_attribute(attr: &str, value: &[u8]) { + match hostcalls::set_property(Path::from(attr).tokens(), Some(value)) { + Ok(_) => (), + Err(_) => error!("set_attribute: failed to set property {attr}"), + }; +} + +pub fn store_metadata(metastruct: &Struct) { + let metadata = process_metadata(metastruct, String::new()); + for (key, value) in metadata { + let attr = format!("{KUADRANT_NAMESPACE}\\.{key}"); + debug!("set_attribute: {attr} = {value}"); + set_attribute(attr.as_str(), value.into_bytes().as_slice()); + } +} + +fn process_metadata(s: &Struct, prefix: String) -> Vec<(String, String)> { + let mut result = Vec::new(); + for (key, value) in s.get_fields() { + let current_prefix = if prefix.is_empty() { + key.clone() + } else { + format!("{prefix}\\.{key}") + }; + + if value.has_string_value() { + result.push((current_prefix, value.get_string_value().to_string())); + } else if value.has_struct_value() { + let nested_struct = value.get_struct_value(); + result.extend(process_metadata(nested_struct, current_prefix)); + } + } + result +} + +#[cfg(test)] +mod tests { + use crate::attribute::process_metadata; + use protobuf::well_known_types::{Struct, Value, Value_oneof_kind}; + use std::collections::HashMap; + + pub fn struct_from(values: Vec<(String, Value)>) -> Struct { + let mut hm = HashMap::new(); + for (key, value) in values { + hm.insert(key, value); + } + Struct { + fields: hm, + unknown_fields: Default::default(), + cached_size: Default::default(), + } + } + + pub fn string_value_from(value: String) -> Value { + Value { + kind: Some(Value_oneof_kind::string_value(value)), + unknown_fields: Default::default(), + cached_size: Default::default(), + } + } + + pub fn struct_value_from(value: Struct) -> Value { + Value { + kind: Some(Value_oneof_kind::struct_value(value)), + unknown_fields: Default::default(), + cached_size: Default::default(), + } + } + #[test] + fn get_metadata_one() { + let metadata = struct_from(vec![( + "identity".to_string(), + struct_value_from(struct_from(vec![( + "userid".to_string(), + string_value_from("bob".to_string()), + )])), + )]); + let output = process_metadata(&metadata, String::new()); + assert_eq!(output.len(), 1); + assert_eq!( + output, + vec![("identity\\.userid".to_string(), "bob".to_string())] + ); + } + + #[test] + fn get_metadata_two() { + let metadata = struct_from(vec![( + 
"identity".to_string(), + struct_value_from(struct_from(vec![ + ("userid".to_string(), string_value_from("bob".to_string())), + ("type".to_string(), string_value_from("test".to_string())), + ])), + )]); + let output = process_metadata(&metadata, String::new()); + assert_eq!(output.len(), 2); + assert!(output.contains(&("identity\\.userid".to_string(), "bob".to_string()))); + assert!(output.contains(&("identity\\.type".to_string(), "test".to_string()))); + } + + #[test] + fn get_metadata_three() { + let metadata = struct_from(vec![ + ( + "identity".to_string(), + struct_value_from(struct_from(vec![( + "userid".to_string(), + string_value_from("bob".to_string()), + )])), + ), + ( + "other_data".to_string(), + string_value_from("other_value".to_string()), + ), + ]); + let output = process_metadata(&metadata, String::new()); + assert_eq!(output.len(), 2); + assert!(output.contains(&("identity\\.userid".to_string(), "bob".to_string()))); + assert!(output.contains(&("other_data".to_string(), "other_value".to_string()))); } } diff --git a/src/configuration.rs b/src/configuration.rs index d7495ef0..c7fa9b8f 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,15 +1,22 @@ use std::cell::OnceCell; +use std::collections::HashMap; use std::fmt::{Debug, Display, Formatter}; +use std::rc::Rc; use std::sync::Arc; use cel_interpreter::objects::ValueType; use cel_interpreter::{Context, Expression, Value}; use cel_parser::{Atom, RelationOp}; +use log::debug; +use protobuf::RepeatedField; +use proxy_wasm::hostcalls; use serde::Deserialize; use crate::attribute::Attribute; +use crate::envoy::{RateLimitDescriptor, RateLimitDescriptor_Entry}; use crate::policy::Policy; use crate::policy_index::PolicyIndex; +use crate::service::GrpcService; #[derive(Deserialize, Debug, Clone)] pub struct SelectorItem { @@ -439,15 +446,14 @@ pub fn type_of(path: &str) -> Option { pub struct FilterConfig { pub index: PolicyIndex, - // Deny/Allow request when faced with an irrecoverable failure. 
- pub failure_mode: FailureMode, + pub services: Rc>>, } impl Default for FilterConfig { fn default() -> Self { Self { index: PolicyIndex::new(), - failure_mode: FailureMode::Deny, + services: Rc::new(HashMap::new()), } } } @@ -460,12 +466,6 @@ impl TryFrom for FilterConfig { for rlp in config.policies.iter() { for rule in &rlp.rules { - for datum in &rule.data { - let result = datum.item.compile(); - if result.is_err() { - return Err(result.err().unwrap()); - } - } for condition in &rule.conditions { for pe in &condition.all_of { let result = pe.compile(); @@ -474,52 +474,171 @@ impl TryFrom for FilterConfig { } } } + for action in &rule.actions { + for datum in &action.data { + let result = datum.item.compile(); + if result.is_err() { + return Err(result.err().unwrap()); + } + } + } } + for hostname in rlp.hostnames.iter() { index.insert(hostname, rlp.clone()); } } + // configure grpc services from the extensions in config + let services = config + .extensions + .into_iter() + .map(|(name, ext)| (name, Rc::new(GrpcService::new(Rc::new(ext))))) + .collect(); + Ok(Self { index, - failure_mode: config.failure_mode, + services: Rc::new(services), }) } } -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone, Default, PartialEq)] #[serde(rename_all = "lowercase")] pub enum FailureMode { + #[default] Deny, Allow, } +#[derive(Deserialize, Debug, Clone, Default, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum ExtensionType { + Auth, + #[default] + RateLimit, +} + #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct PluginConfiguration { - #[serde(rename = "rateLimitPolicies")] + pub extensions: HashMap, pub policies: Vec, +} + +#[derive(Deserialize, Debug, Clone, Default)] +#[serde(rename_all = "camelCase")] +pub struct Extension { + #[serde(rename = "type")] + pub extension_type: ExtensionType, + pub endpoint: String, // Deny/Allow request when faced with an irrecoverable failure. 
pub failure_mode: FailureMode, } +#[derive(Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Action { + pub extension: String, + pub scope: String, + #[allow(dead_code)] + pub data: Vec, +} + +impl Action { + pub fn build_descriptors(&self) -> RepeatedField { + let mut entries = RepeatedField::new(); + if let Some(desc) = self.build_single_descriptor() { + entries.push(desc); + } + entries + } + + fn build_single_descriptor(&self) -> Option { + let mut entries = RepeatedField::default(); + + // iterate over data items to allow any data item to skip the entire descriptor + for data in self.data.iter() { + match &data.item { + DataType::Static(static_item) => { + let mut descriptor_entry = RateLimitDescriptor_Entry::new(); + descriptor_entry.set_key(static_item.key.to_owned()); + descriptor_entry.set_value(static_item.value.to_owned()); + entries.push(descriptor_entry); + } + DataType::Selector(selector_item) => { + let descriptor_key = match &selector_item.key { + None => selector_item.path().to_string(), + Some(key) => key.to_owned(), + }; + + let attribute_path = selector_item.path(); + debug!( + "get_property: selector: {} path: {:?}", + selector_item.selector, attribute_path + ); + let value = match hostcalls::get_property(attribute_path.tokens()).unwrap() { + //TODO(didierofrivia): Replace hostcalls by DI + None => { + debug!( + "build_single_descriptor: selector not found: {}", + attribute_path + ); + match &selector_item.default { + None => return None, // skipping the entire descriptor + Some(default_value) => default_value.clone(), + } + } + // TODO(eastizle): not all fields are strings + // https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/advanced/attributes + Some(attribute_bytes) => match Attribute::parse(attribute_bytes) { + Ok(attr_str) => attr_str, + Err(e) => { + debug!("build_single_descriptor: failed to parse selector value: {}, error: {}", + attribute_path, e); + return None; + } + }, + // Alternative implementation (for rust >= 1.76) + // Attribute::parse(attribute_bytes) + // .inspect_err(|e| debug!("#{} build_single_descriptor: failed to parse selector value: {}, error: {}", + // filter.context_id, attribute_path, e)) + // .ok()?, + }; + let mut descriptor_entry = RateLimitDescriptor_Entry::new(); + descriptor_entry.set_key(descriptor_key); + descriptor_entry.set_value(value); + entries.push(descriptor_entry); + } + } + } + let mut res = RateLimitDescriptor::new(); + res.set_entries(entries); + Some(res) + } +} + #[cfg(test)] mod test { use super::*; const CONFIG: &str = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "rlp-ns-A/rlp-name-A", - "domain": "rlp-ns-A/rlp-name-A", - "service": "limitador-cluster", "hostnames": ["*.toystore.com", "example.com"], "rules": [ { "conditions": [ { - "allOf": [ + "allOf": [ { "selector": "request.path", "operator": "eq", @@ -536,17 +655,22 @@ mod test { "value": "cars.toystore.com" }] }], - "data": [ - { - "static": { - "key": "rlp-ns-A/rlp-name-A", - "value": "1" - } - }, + "actions": [ { - "selector": { - "selector": "auth.metadata.username" - } + "extension": "limitador", + "scope": "rlp-ns-A/rlp-name-A", + "data": [ + { + "static": { + "key": "rlp-ns-A/rlp-name-A", + "value": "1" + } + }, + { + "selector": { + "selector": "auth.metadata.username" + } + }] }] }] }] @@ -572,7 +696,10 @@ mod test { let all_of_conditions = 
&conditions[0].all_of; assert_eq!(all_of_conditions.len(), 3); - let data_items = &rules[0].data; + let actions = &rules[0].actions; + assert_eq!(actions.len(), 1); + + let data_items = &actions[0].data; assert_eq!(data_items.len(), 2); // TODO(eastizle): DataItem does not implement PartialEq, add it only for testing? @@ -605,8 +732,8 @@ mod test { #[test] fn parse_config_min() { let config = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [] + "extensions": {}, + "policies": [] }"#; let res = serde_json::from_str::(config); if let Err(ref e) = res { @@ -621,22 +748,31 @@ mod test { #[test] fn parse_config_data_selector() { let config = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "rlp-ns-A/rlp-name-A", - "domain": "rlp-ns-A/rlp-name-A", - "service": "limitador-cluster", "hostnames": ["*.toystore.com", "example.com"], "rules": [ { - "data": [ + "actions": [ { - "selector": { - "selector": "my.selector.path", - "key": "mykey", - "default": "my_selector_default_value" - } + "extension": "limitador", + "scope": "rlp-ns-A/rlp-name-A", + "data": [ + { + "selector": { + "selector": "my.selector.path", + "key": "mykey", + "default": "my_selector_default_value" + } + }] }] }] }] @@ -653,7 +789,10 @@ mod test { let rules = &filter_config.policies[0].rules; assert_eq!(rules.len(), 1); - let data_items = &rules[0].data; + let actions = &rules[0].actions; + assert_eq!(actions.len(), 1); + + let data_items = &actions[0].data; assert_eq!(data_items.len(), 1); if let DataType::Selector(selector_item) = &data_items[0].item { @@ -671,12 +810,16 @@ mod test { #[test] fn parse_config_condition_selector_operators() { let config = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "rlp-ns-A/rlp-name-A", - "domain": "rlp-ns-A/rlp-name-A", - "service": "limitador-cluster", "hostnames": ["*.toystore.com", "example.com"], "rules": [ { @@ -709,7 +852,12 @@ mod test { "value": "*.com" }] }], - "data": [ { "selector": { "selector": "my.selector.path" } }] + "actions": [ + { + "extension": "limitador", + "scope": "rlp-ns-A/rlp-name-A", + "data": [ { "selector": { "selector": "my.selector.path" } }] + }] }] }] }"#; @@ -750,26 +898,35 @@ mod test { #[test] fn parse_config_conditions_optional() { let config = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "rlp-ns-A/rlp-name-A", - "domain": "rlp-ns-A/rlp-name-A", - "service": "limitador-cluster", "hostnames": ["*.toystore.com", "example.com"], "rules": [ { - "data": [ + "actions": [ { - "static": { - "key": "rlp-ns-A/rlp-name-A", - "value": "1" - } - }, - { - "selector": { - "selector": "auth.metadata.username" - } + "extension": "limitador", + "scope": "rlp-ns-A/rlp-name-A", + "data": [ + { + "static": { + "key": "rlp-ns-A/rlp-name-A", + "value": "1" + } + }, + { + "selector": { + "selector": "auth.metadata.username" + } + }] }] }] }] @@ -794,24 +951,33 @@ mod test { fn parse_config_invalid_data() { // data item fields are mutually exclusive let bad_config = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": 
"limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "rlp-ns-A/rlp-name-A", - "domain": "rlp-ns-A/rlp-name-A", - "service": "limitador-cluster", "hostnames": ["*.toystore.com", "example.com"], "rules": [ { - "data": [ + "actions": [ { - "static": { - "key": "rlp-ns-A/rlp-name-A", - "value": "1" - }, - "selector": { - "selector": "auth.metadata.username" - } + "extension": "limitador", + "scope": "rlp-ns-A/rlp-name-A", + "data": [ + { + "static": { + "key": "rlp-ns-A/rlp-name-A", + "value": "1" + }, + "selector": { + "selector": "auth.metadata.username" + } + }] }] }] }] @@ -821,21 +987,31 @@ mod test { // data item unknown fields are forbidden let bad_config = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "rlp-ns-A/rlp-name-A", - "domain": "rlp-ns-A/rlp-name-A", "service": "limitador-cluster", "hostnames": ["*.toystore.com", "example.com"], "rules": [ { - "data": [ + "actions": [ { - "unknown": { - "key": "rlp-ns-A/rlp-name-A", - "value": "1" - } + "extension": "limitador", + "scope": "rlp-ns-A/rlp-name-A", + "data": [ + { + "unknown": { + "key": "rlp-ns-A/rlp-name-A", + "value": "1" + } + }] }] }] }] @@ -845,25 +1021,34 @@ mod test { // condition selector operator unknown let bad_config = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "rlp-ns-A/rlp-name-A", - "domain": "rlp-ns-A/rlp-name-A", - "service": "limitador-cluster", "hostnames": ["*.toystore.com", "example.com"], "rules": [ { "conditions": [ { - "allOf": [ + "allOf": [ { "selector": "request.path", "operator": "unknown", "value": "/admin/toy" }] }], - "data": [ { "selector": { "selector": "my.selector.path" } }] + "actions": [ + { + "extension": "limitador", + "scope": "rlp-ns-A/rlp-name-A", + "data": [ { "selector": { "selector": "my.selector.path" } }] + }] }] }] }"#; diff --git a/src/envoy/mod.rs b/src/envoy/mod.rs index 68e18d60..60810273 100644 --- a/src/envoy/mod.rs +++ b/src/envoy/mod.rs @@ -31,6 +31,13 @@ mod token_bucket; mod value; pub use { + address::{Address, SocketAddress}, + attribute_context::{ + AttributeContext, AttributeContext_HttpRequest, AttributeContext_Peer, + AttributeContext_Request, + }, + base::Metadata, + external_auth::{CheckRequest, CheckResponse, CheckResponse_oneof_http_response}, ratelimit::{RateLimitDescriptor, RateLimitDescriptor_Entry}, rls::{RateLimitRequest, RateLimitResponse, RateLimitResponse_Code}, }; diff --git a/src/filter/http_context.rs b/src/filter/http_context.rs index 281eab93..cbc7eea9 100644 --- a/src/filter/http_context.rs +++ b/src/filter/http_context.rs @@ -1,42 +1,20 @@ -use crate::configuration::{FailureMode, FilterConfig}; -use crate::envoy::{RateLimitResponse, RateLimitResponse_Code}; -use crate::filter::http_context::TracingHeader::{Baggage, Traceparent, Tracestate}; +use crate::attribute::store_metadata; +use crate::configuration::{ExtensionType, FailureMode, FilterConfig}; +use crate::envoy::{CheckResponse_oneof_http_response, RateLimitResponse, RateLimitResponse_Code}; +use crate::operation_dispatcher::OperationDispatcher; use crate::policy::Policy; -use crate::service::rate_limit::RateLimitService; -use crate::service::Service; +use crate::service::grpc_message::GrpcMessageResponse; use log::{debug, warn}; -use 
protobuf::Message; use proxy_wasm::traits::{Context, HttpContext}; -use proxy_wasm::types::{Action, Bytes}; +use proxy_wasm::types::Action; +use std::cell::RefCell; use std::rc::Rc; -// tracing headers -#[derive(Clone)] -pub enum TracingHeader { - Traceparent, - Tracestate, - Baggage, -} - -impl TracingHeader { - fn all() -> [Self; 3] { - [Traceparent, Tracestate, Baggage] - } - - pub fn as_str(&self) -> &'static str { - match self { - Traceparent => "traceparent", - Tracestate => "tracestate", - Baggage => "baggage", - } - } -} - pub struct Filter { pub context_id: u32, pub config: Rc, pub response_headers_to_add: Vec<(String, String)>, - pub tracing_headers: Vec<(TracingHeader, Bytes)>, + pub operation_dispatcher: RefCell, } impl Filter { @@ -53,57 +31,142 @@ impl Filter { } } - fn process_rate_limit_policy(&self, rlp: &Policy) -> Action { - let descriptors = rlp.build_descriptors(self); - if descriptors.is_empty() { - debug!( - "#{} process_rate_limit_policy: empty descriptors", - self.context_id - ); + fn process_policy(&self, policy: &Policy) -> Action { + if let Some(rule) = policy.find_rule_that_applies() { + self.operation_dispatcher + .borrow_mut() + .build_operations(rule); + } else { + debug!("#{} process_policy: no rule applied", self.context_id); return Action::Continue; } - let rls = RateLimitService::new(rlp.service.as_str(), self.tracing_headers.clone()); - let message = RateLimitService::message(rlp.domain.clone(), descriptors); - - match rls.send(message) { - Ok(call_id) => { - debug!( - "#{} initiated gRPC call (id# {}) to Limitador", - self.context_id, call_id - ); - Action::Pause - } - Err(e) => { - warn!("gRPC call to Limitador failed! {e:?}"); - if let FailureMode::Deny = self.config.failure_mode { - self.send_http_response(500, vec![], Some(b"Internal Server Error.\n")) + if let Some(operation) = self.operation_dispatcher.borrow_mut().next() { + match operation.get_result() { + Ok(call_id) => { + debug!("#{} initiated gRPC call (id# {})", self.context_id, call_id); + Action::Pause + } + Err(e) => { + warn!("gRPC call failed! {e:?}"); + if let FailureMode::Deny = operation.get_failure_mode() { + self.send_http_response(500, vec![], Some(b"Internal Server Error.\n")) + } + Action::Continue } - Action::Continue } + } else { + Action::Continue } } - fn handle_error_on_grpc_response(&self) { - match &self.config.failure_mode { + fn handle_error_on_grpc_response(&self, failure_mode: &FailureMode) { + match failure_mode { FailureMode::Deny => { self.send_http_response(500, vec![], Some(b"Internal Server Error.\n")) } FailureMode::Allow => self.resume_http_request(), } } + + fn process_ratelimit_grpc_response( + &mut self, + rl_resp: GrpcMessageResponse, + failure_mode: &FailureMode, + ) { + match rl_resp { + GrpcMessageResponse::RateLimit(RateLimitResponse { + overall_code: RateLimitResponse_Code::UNKNOWN, + .. + }) => { + self.handle_error_on_grpc_response(failure_mode); + } + GrpcMessageResponse::RateLimit(RateLimitResponse { + overall_code: RateLimitResponse_Code::OVER_LIMIT, + response_headers_to_add: rl_headers, + .. + }) => { + let mut response_headers = vec![]; + for header in &rl_headers { + response_headers.push((header.get_key(), header.get_value())); + } + self.send_http_response(429, response_headers, Some(b"Too Many Requests\n")); + } + GrpcMessageResponse::RateLimit(RateLimitResponse { + overall_code: RateLimitResponse_Code::OK, + response_headers_to_add: additional_headers, + .. 
+ }) => { + for header in additional_headers { + self.response_headers_to_add + .push((header.key, header.value)); + } + } + _ => {} + } + self.operation_dispatcher.borrow_mut().next(); + } + + fn process_auth_grpc_response( + &self, + auth_resp: GrpcMessageResponse, + failure_mode: &FailureMode, + ) { + if let GrpcMessageResponse::Auth(check_response) = auth_resp { + // store dynamic metadata in filter state + store_metadata(check_response.get_dynamic_metadata()); + + match check_response.http_response { + Some(CheckResponse_oneof_http_response::ok_response(ok_response)) => { + debug!( + "#{} process_auth_grpc_response: received OkHttpResponse", + self.context_id + ); + + ok_response + .get_response_headers_to_add() + .iter() + .for_each(|header| { + self.add_http_response_header( + header.get_header().get_key(), + header.get_header().get_value(), + ) + }); + } + Some(CheckResponse_oneof_http_response::denied_response(denied_response)) => { + debug!( + "#{} process_auth_grpc_response: received DeniedHttpResponse", + self.context_id + ); + + let mut response_headers = vec![]; + denied_response.get_headers().iter().for_each(|header| { + response_headers.push(( + header.get_header().get_key(), + header.get_header().get_value(), + )) + }); + self.send_http_response( + denied_response.get_status().code as u32, + response_headers, + Some(denied_response.get_body().as_ref()), + ); + return; + } + None => { + self.handle_error_on_grpc_response(failure_mode); + return; + } + } + } + self.operation_dispatcher.borrow_mut().next(); + } } impl HttpContext for Filter { fn on_http_request_headers(&mut self, _: usize, _: bool) -> Action { debug!("#{} on_http_request_headers", self.context_id); - for header in TracingHeader::all() { - if let Some(value) = self.get_http_request_header_bytes(header.as_str()) { - self.tracing_headers.push((header, value)) - } - } - match self .config .index @@ -116,9 +179,9 @@ impl HttpContext for Filter { ); Action::Continue } - Some(rlp) => { - debug!("#{} ratelimitpolicy selected {}", self.context_id, rlp.name); - self.process_rate_limit_policy(rlp) + Some(policy) => { + debug!("#{} policy selected {}", self.context_id, policy.name); + self.process_policy(policy) } } } @@ -143,55 +206,41 @@ impl Context for Filter { self.context_id ); - let res_body_bytes = match self.get_grpc_call_response_body(0, resp_size) { - Some(bytes) => bytes, - None => { - warn!("grpc response body is empty!"); - self.handle_error_on_grpc_response(); - return; - } - }; - - let rl_resp: RateLimitResponse = match Message::parse_from_bytes(&res_body_bytes) { - Ok(res) => res, - Err(e) => { - warn!("failed to parse grpc response body into RateLimitResponse message: {e}"); - self.handle_error_on_grpc_response(); - return; - } - }; + let some_op = self.operation_dispatcher.borrow().get_operation(token_id); - match rl_resp { - RateLimitResponse { - overall_code: RateLimitResponse_Code::UNKNOWN, - .. - } => { - self.handle_error_on_grpc_response(); - return; - } - RateLimitResponse { - overall_code: RateLimitResponse_Code::OVER_LIMIT, - response_headers_to_add: rl_headers, - .. 
- } => { - let mut response_headers = vec![]; - for header in &rl_headers { - response_headers.push((header.get_key(), header.get_value())); + if let Some(operation) = some_op { + let failure_mode = &operation.get_failure_mode(); + let res_body_bytes = match self.get_grpc_call_response_body(0, resp_size) { + Some(bytes) => bytes, + None => { + warn!("grpc response body is empty!"); + self.handle_error_on_grpc_response(failure_mode); + return; } - self.send_http_response(429, response_headers, Some(b"Too Many Requests\n")); - return; + }; + let res = + match GrpcMessageResponse::new(operation.get_extension_type(), &res_body_bytes) { + Ok(res) => res, + Err(e) => { + warn!( + "failed to parse grpc response body into GrpcMessageResponse message: {e}" + ); + self.handle_error_on_grpc_response(failure_mode); + return; + } + }; + match operation.get_extension_type() { + ExtensionType::Auth => self.process_auth_grpc_response(res, failure_mode), + ExtensionType::RateLimit => self.process_ratelimit_grpc_response(res, failure_mode), } - RateLimitResponse { - overall_code: RateLimitResponse_Code::OK, - response_headers_to_add: additional_headers, - .. - } => { - for header in additional_headers { - self.response_headers_to_add - .push((header.key, header.value)); - } + + if let Some(_op) = self.operation_dispatcher.borrow_mut().next() { + } else { + self.resume_http_request() } + } else { + warn!("No Operation found with token_id: {token_id}"); + self.handle_error_on_grpc_response(&FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode } - self.resume_http_request(); } } diff --git a/src/filter/root_context.rs b/src/filter/root_context.rs index ab28c72c..5e5a8aa6 100644 --- a/src/filter/root_context.rs +++ b/src/filter/root_context.rs @@ -1,9 +1,12 @@ use crate::configuration::{FilterConfig, PluginConfiguration}; use crate::filter::http_context::Filter; +use crate::operation_dispatcher::OperationDispatcher; +use crate::service::{GrpcServiceHandler, HeaderResolver}; use const_format::formatcp; use log::{debug, error, info}; use proxy_wasm::traits::{Context, HttpContext, RootContext}; use proxy_wasm::types::ContextType; +use std::collections::HashMap; use std::rc::Rc; const WASM_SHIM_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -36,11 +39,23 @@ impl RootContext for FilterRoot { fn create_http_context(&self, context_id: u32) -> Option> { debug!("#{} create_http_context", context_id); + let mut service_handlers: HashMap> = HashMap::new(); + self.config + .services + .iter() + .for_each(|(extension, service)| { + service_handlers + .entry(extension.clone()) + .or_insert(Rc::from(GrpcServiceHandler::new( + Rc::clone(service), + Rc::new(HeaderResolver::new()), + ))); + }); Some(Box::new(Filter { context_id, config: Rc::clone(&self.config), response_headers_to_add: Vec::default(), - tracing_headers: Vec::default(), + operation_dispatcher: OperationDispatcher::new(service_handlers).into(), })) } diff --git a/src/lib.rs b/src/lib.rs index 179174b3..279ed839 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ mod configuration; mod envoy; mod filter; mod glob; +mod operation_dispatcher; mod policy; mod policy_index; mod service; diff --git a/src/operation_dispatcher.rs b/src/operation_dispatcher.rs new file mode 100644 index 00000000..c9c6b0ce --- /dev/null +++ b/src/operation_dispatcher.rs @@ -0,0 +1,422 @@ +use crate::configuration::{Action, Extension, ExtensionType, FailureMode}; +use crate::policy::Rule; +use crate::service::grpc_message::GrpcMessageRequest; +use 
crate::service::{GetMapValuesBytesFn, GrpcCallFn, GrpcMessageBuildFn, GrpcServiceHandler}; +use log::error; +use proxy_wasm::hostcalls; +use proxy_wasm::types::{Bytes, MapType, Status}; +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; +use std::time::Duration; + +#[allow(dead_code)] +#[derive(PartialEq, Debug, Clone, Copy)] +pub(crate) enum State { + Pending, + Waiting, + Done, +} + +#[allow(dead_code)] +impl State { + fn next(&mut self) { + match self { + State::Pending => *self = State::Waiting, + State::Waiting => *self = State::Done, + _ => {} + } + } + + fn done(&mut self) { + *self = State::Done + } +} + +#[allow(dead_code)] +#[derive(Clone)] +pub(crate) struct Operation { + state: RefCell, + result: RefCell>, + extension: Rc, + action: Action, + service: Rc, + grpc_call_fn: GrpcCallFn, + get_map_values_bytes_fn: GetMapValuesBytesFn, + grpc_message_build_fn: GrpcMessageBuildFn, +} + +#[allow(dead_code)] +impl Operation { + pub fn new(extension: Rc, action: Action, service: Rc) -> Self { + Self { + state: RefCell::new(State::Pending), + result: RefCell::new(Ok(0)), // Heuristics: zero represents that it's not been triggered, following `hostcalls` example + extension, + action, + service, + grpc_call_fn, + get_map_values_bytes_fn, + grpc_message_build_fn, + } + } + + fn trigger(&self) -> Result { + if let Some(message) = (self.grpc_message_build_fn)(self.get_extension_type(), &self.action) + { + let res = self + .service + .send(self.get_map_values_bytes_fn, self.grpc_call_fn, message); + self.set_result(res); + self.next_state(); + res + } else { + self.done(); + self.get_result() + } + } + + fn next_state(&self) { + self.state.borrow_mut().next() + } + + fn done(&self) { + self.state.borrow_mut().done() + } + + pub fn get_state(&self) -> State { + *self.state.borrow() + } + + pub fn get_result(&self) -> Result { + *self.result.borrow() + } + + fn set_result(&self, result: Result) { + *self.result.borrow_mut() = result; + } + + pub fn get_extension_type(&self) -> &ExtensionType { + &self.extension.extension_type + } + + pub fn get_failure_mode(&self) -> &FailureMode { + &self.extension.failure_mode + } +} + +#[allow(dead_code)] +pub struct OperationDispatcher { + operations: Vec>, + waiting_operations: HashMap>, + service_handlers: HashMap>, +} + +#[allow(dead_code)] +impl OperationDispatcher { + pub fn default() -> Self { + OperationDispatcher { + operations: vec![], + waiting_operations: HashMap::default(), + service_handlers: HashMap::default(), + } + } + pub fn new(service_handlers: HashMap>) -> Self { + Self { + service_handlers, + operations: vec![], + waiting_operations: HashMap::new(), + } + } + + pub fn get_operation(&self, token_id: u32) -> Option> { + self.waiting_operations.get(&token_id).cloned() + } + + pub fn build_operations(&mut self, rule: &Rule) { + let mut operations: Vec> = vec![]; + for action in rule.actions.iter() { + // TODO(didierofrivia): Error handling + if let Some(service) = self.service_handlers.get(&action.extension) { + operations.push(Rc::new(Operation::new( + service.get_extension(), + action.clone(), + Rc::clone(service), + ))) + } + } + self.push_operations(operations); + } + + pub fn push_operations(&mut self, operations: Vec>) { + self.operations.extend(operations); + } + + pub fn get_current_operation_state(&self) -> Option { + self.operations + .first() + .map(|operation| operation.get_state()) + } + + pub fn next(&mut self) -> Option> { + if let Some((i, operation)) = self.operations.iter_mut().enumerate().next() { + 
match operation.get_state() { + State::Pending => { + match operation.trigger() { + Ok(token_id) => { + match operation.get_state() { + State::Pending => { + panic!("Operation dispatcher reached an undefined state"); + } + State::Waiting => { + // We index only if it was just transitioned to Waiting after triggering + self.waiting_operations.insert(token_id, operation.clone()); + // TODO(didierofrivia): Decide on indexing the failed operations. + Some(operation.clone()) + } + State::Done => self.next(), + } + } + Err(status) => { + error!("{status:?}"); + None + } + } + } + State::Waiting => { + operation.next_state(); + Some(operation.clone()) + } + State::Done => { + if let Ok(token_id) = operation.get_result() { + self.waiting_operations.remove(&token_id); + } // If result was Err, means the operation wasn't indexed + self.operations.remove(i); + self.next() + } + } + } else { + None + } + } +} + +fn grpc_call_fn( + upstream_name: &str, + service_name: &str, + method_name: &str, + initial_metadata: Vec<(&str, &[u8])>, + message: Option<&[u8]>, + timeout: Duration, +) -> Result { + hostcalls::dispatch_grpc_call( + upstream_name, + service_name, + method_name, + initial_metadata, + message, + timeout, + ) +} + +fn get_map_values_bytes_fn(map_type: MapType, key: &str) -> Result, Status> { + hostcalls::get_map_value_bytes(map_type, key) +} + +fn grpc_message_build_fn( + extension_type: &ExtensionType, + action: &Action, +) -> Option { + GrpcMessageRequest::new(extension_type, action) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::envoy::RateLimitRequest; + use protobuf::RepeatedField; + use std::time::Duration; + + fn default_grpc_call_fn_stub( + _upstream_name: &str, + _service_name: &str, + _method_name: &str, + _initial_metadata: Vec<(&str, &[u8])>, + _message: Option<&[u8]>, + _timeout: Duration, + ) -> Result { + Ok(200) + } + + fn get_map_values_bytes_fn_stub( + _map_type: MapType, + _key: &str, + ) -> Result, Status> { + Ok(Some(Vec::new())) + } + + fn grpc_message_build_fn_stub( + _extension_type: &ExtensionType, + _action: &Action, + ) -> Option { + Some(GrpcMessageRequest::RateLimit(build_message())) + } + + fn build_grpc_service_handler() -> GrpcServiceHandler { + GrpcServiceHandler::new(Rc::new(Default::default()), Rc::new(Default::default())) + } + + fn build_message() -> RateLimitRequest { + RateLimitRequest { + domain: "example.org".to_string(), + descriptors: RepeatedField::new(), + hits_addend: 1, + unknown_fields: Default::default(), + cached_size: Default::default(), + } + } + + fn build_operation( + grpc_call_fn_stub: GrpcCallFn, + extension_type: ExtensionType, + ) -> Rc { + Rc::new(Operation { + state: RefCell::from(State::Pending), + result: RefCell::new(Ok(0)), + extension: Rc::new(Extension { + extension_type, + endpoint: "local".to_string(), + failure_mode: FailureMode::Deny, + }), + action: Action { + extension: "local".to_string(), + scope: "".to_string(), + data: vec![], + }, + service: Rc::new(build_grpc_service_handler()), + grpc_call_fn: grpc_call_fn_stub, + get_map_values_bytes_fn: get_map_values_bytes_fn_stub, + grpc_message_build_fn: grpc_message_build_fn_stub, + }) + } + + #[test] + fn operation_getters() { + let operation = build_operation(default_grpc_call_fn_stub, ExtensionType::RateLimit); + + assert_eq!(operation.get_state(), State::Pending); + assert_eq!(*operation.get_extension_type(), ExtensionType::RateLimit); + assert_eq!(*operation.get_failure_mode(), FailureMode::Deny); + assert_eq!(operation.get_result(), Ok(0)); + } + + #[test] 
+ fn operation_transition() { + let operation = build_operation(default_grpc_call_fn_stub, ExtensionType::RateLimit); + assert_eq!(operation.get_result(), Ok(0)); + assert_eq!(operation.get_state(), State::Pending); + let mut res = operation.trigger(); + assert_eq!(res, Ok(200)); + assert_eq!(operation.get_state(), State::Waiting); + res = operation.trigger(); + assert_eq!(res, Ok(200)); + assert_eq!(operation.get_result(), Ok(200)); + assert_eq!(operation.get_state(), State::Done); + } + + #[test] + fn operation_dispatcher_push_actions() { + let mut operation_dispatcher = OperationDispatcher::default(); + + assert_eq!(operation_dispatcher.operations.len(), 0); + operation_dispatcher.push_operations(vec![build_operation( + default_grpc_call_fn_stub, + ExtensionType::RateLimit, + )]); + + assert_eq!(operation_dispatcher.operations.len(), 1); + } + + #[test] + fn operation_dispatcher_get_current_action_state() { + let mut operation_dispatcher = OperationDispatcher::default(); + operation_dispatcher.push_operations(vec![build_operation( + default_grpc_call_fn_stub, + ExtensionType::RateLimit, + )]); + assert_eq!( + operation_dispatcher.get_current_operation_state(), + Some(State::Pending) + ); + } + + #[test] + fn operation_dispatcher_next() { + let mut operation_dispatcher = OperationDispatcher::default(); + + fn grpc_call_fn_stub_66( + _upstream_name: &str, + _service_name: &str, + _method_name: &str, + _initial_metadata: Vec<(&str, &[u8])>, + _message: Option<&[u8]>, + _timeout: Duration, + ) -> Result { + Ok(66) + } + + fn grpc_call_fn_stub_77( + _upstream_name: &str, + _service_name: &str, + _method_name: &str, + _initial_metadata: Vec<(&str, &[u8])>, + _message: Option<&[u8]>, + _timeout: Duration, + ) -> Result { + Ok(77) + } + + operation_dispatcher.push_operations(vec![ + build_operation(grpc_call_fn_stub_66, ExtensionType::RateLimit), + build_operation(grpc_call_fn_stub_77, ExtensionType::Auth), + ]); + + assert_eq!( + operation_dispatcher.get_current_operation_state(), + Some(State::Pending) + ); + assert_eq!(operation_dispatcher.waiting_operations.len(), 0); + + let mut op = operation_dispatcher.next(); + assert_eq!(op.clone().unwrap().get_result(), Ok(66)); + assert_eq!( + *op.clone().unwrap().get_extension_type(), + ExtensionType::RateLimit + ); + assert_eq!(op.unwrap().get_state(), State::Waiting); + assert_eq!(operation_dispatcher.waiting_operations.len(), 1); + + op = operation_dispatcher.next(); + assert_eq!(op.clone().unwrap().get_result(), Ok(66)); + assert_eq!(op.unwrap().get_state(), State::Done); + + op = operation_dispatcher.next(); + assert_eq!(op.clone().unwrap().get_result(), Ok(77)); + assert_eq!( + *op.clone().unwrap().get_extension_type(), + ExtensionType::Auth + ); + assert_eq!(op.unwrap().get_state(), State::Waiting); + assert_eq!(operation_dispatcher.waiting_operations.len(), 1); + + op = operation_dispatcher.next(); + assert_eq!(op.clone().unwrap().get_result(), Ok(77)); + assert_eq!(op.unwrap().get_state(), State::Done); + assert_eq!(operation_dispatcher.waiting_operations.len(), 1); + + op = operation_dispatcher.next(); + assert!(op.is_none()); + assert!(operation_dispatcher.get_current_operation_state().is_none()); + assert_eq!(operation_dispatcher.waiting_operations.len(), 0); + } +} diff --git a/src/policy.rs b/src/policy.rs index 6788b74c..db9ed9fe 100644 --- a/src/policy.rs +++ b/src/policy.rs @@ -1,9 +1,6 @@ -use crate::attribute::Attribute; -use crate::configuration::{DataItem, DataType, PatternExpression}; -use crate::envoy::{RateLimitDescriptor, 
RateLimitDescriptor_Entry}; -use crate::filter::http_context::Filter; +use crate::configuration::{Action, PatternExpression}; use log::debug; -use proxy_wasm::traits::Context; +use proxy_wasm::hostcalls; use serde::Deserialize; #[derive(Deserialize, Debug, Clone)] @@ -14,55 +11,36 @@ pub struct Condition { #[derive(Deserialize, Debug, Clone)] pub struct Rule { - // #[serde(default)] pub conditions: Vec, - // - pub data: Vec, + pub actions: Vec, } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Policy { pub name: String, - pub domain: String, - pub service: String, pub hostnames: Vec, pub rules: Vec, } impl Policy { #[cfg(test)] - pub fn new( - name: String, - domain: String, - service: String, - hostnames: Vec, - rules: Vec, - ) -> Self { + pub fn new(name: String, hostnames: Vec, rules: Vec) -> Self { Policy { name, - domain, - service, hostnames, rules, } } - pub fn build_descriptors( - &self, - filter: &Filter, - ) -> protobuf::RepeatedField { + pub fn find_rule_that_applies(&self) -> Option<&Rule> { self.rules .iter() - .filter(|rule: &&Rule| self.filter_rule_by_conditions(filter, &rule.conditions)) - // Mapping 1 Rule -> 1 Descriptor - // Filter out empty descriptors - .filter_map(|rule| self.build_single_descriptor(filter, &rule.data)) - .collect() + .find(|rule: &&Rule| self.filter_rule_by_conditions(&rule.conditions)) } - fn filter_rule_by_conditions(&self, filter: &Filter, conditions: &[Condition]) -> bool { + fn filter_rule_by_conditions(&self, conditions: &[Condition]) -> bool { if conditions.is_empty() { // no conditions is equivalent to matching all the requests. return true; @@ -70,108 +48,36 @@ impl Policy { conditions .iter() - .any(|condition| self.condition_applies(filter, condition)) + .any(|condition| self.condition_applies(condition)) } - fn condition_applies(&self, filter: &Filter, condition: &Condition) -> bool { + fn condition_applies(&self, condition: &Condition) -> bool { condition .all_of .iter() - .all(|pattern_expression| self.pattern_expression_applies(filter, pattern_expression)) + .all(|pattern_expression| self.pattern_expression_applies(pattern_expression)) } - fn pattern_expression_applies(&self, filter: &Filter, p_e: &PatternExpression) -> bool { + fn pattern_expression_applies(&self, p_e: &PatternExpression) -> bool { let attribute_path = p_e.path(); debug!( - "#{} get_property: selector: {} path: {:?}", - filter.context_id, p_e.selector, attribute_path + "get_property: selector: {} path: {:?}", + p_e.selector, attribute_path ); - let attribute_value = match filter.get_property(attribute_path) { + let attribute_value = match hostcalls::get_property(attribute_path).unwrap() { + //TODO(didierofrivia): Replace hostcalls by DI None => { debug!( - "#{} pattern_expression_applies: selector not found: {}, defaulting to ``", - filter.context_id, p_e.selector + "pattern_expression_applies: selector not found: {}, defaulting to ``", + p_e.selector ); b"".to_vec() } Some(attribute_bytes) => attribute_bytes, }; - match p_e.eval(attribute_value) { - Err(e) => { - debug!( - "#{} pattern_expression_applies failed: {}", - filter.context_id, e - ); - false - } - Ok(result) => result, - } - } - - fn build_single_descriptor( - &self, - filter: &Filter, - data_list: &[DataItem], - ) -> Option { - let mut entries = ::protobuf::RepeatedField::default(); - - // iterate over data items to allow any data item to skip the entire descriptor - for data in data_list.iter() { - match &data.item { - DataType::Static(static_item) => { - let mut 
descriptor_entry = RateLimitDescriptor_Entry::new(); - descriptor_entry.set_key(static_item.key.to_owned()); - descriptor_entry.set_value(static_item.value.to_owned()); - entries.push(descriptor_entry); - } - DataType::Selector(selector_item) => { - let descriptor_key = match &selector_item.key { - None => selector_item.path().to_string(), - Some(key) => key.to_owned(), - }; - - let attribute_path = selector_item.path(); - debug!( - "#{} get_property: selector: {} path: {:?}", - filter.context_id, selector_item.selector, attribute_path - ); - let value = match filter.get_property(attribute_path.tokens()) { - None => { - debug!( - "#{} build_single_descriptor: selector not found: {}", - filter.context_id, attribute_path - ); - match &selector_item.default { - None => return None, // skipping the entire descriptor - Some(default_value) => default_value.clone(), - } - } - // TODO(eastizle): not all fields are strings - // https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/advanced/attributes - Some(attribute_bytes) => match Attribute::parse(attribute_bytes) { - Ok(attr_str) => attr_str, - Err(e) => { - debug!("#{} build_single_descriptor: failed to parse selector value: {}, error: {}", - filter.context_id, attribute_path, e); - return None; - } - }, - // Alternative implementation (for rust >= 1.76) - // Attribute::parse(attribute_bytes) - // .inspect_err(|e| debug!("#{} build_single_descriptor: failed to parse selector value: {}, error: {}", - // filter.context_id, attribute_path, e)) - // .ok()?, - }; - let mut descriptor_entry = RateLimitDescriptor_Entry::new(); - descriptor_entry.set_key(descriptor_key); - descriptor_entry.set_value(value); - entries.push(descriptor_entry); - } - } - } - - let mut res = RateLimitDescriptor::new(); - res.set_entries(entries); - Some(res) + p_e.eval(attribute_value).unwrap_or_else(|e| { + debug!("pattern_expression_applies failed: {}", e); + false + }) } } diff --git a/src/policy_index.rs b/src/policy_index.rs index 4a1bf607..0c0759a9 100644 --- a/src/policy_index.rs +++ b/src/policy_index.rs @@ -41,13 +41,7 @@ mod tests { use crate::policy_index::PolicyIndex; fn build_ratelimit_policy(name: &str) -> Policy { - Policy::new( - name.to_owned(), - "".to_owned(), - "".to_owned(), - Vec::new(), - Vec::new(), - ) + Policy::new(name.to_owned(), Vec::new(), Vec::new()) } #[test] diff --git a/src/service.rs b/src/service.rs index b63bb827..9e988a1a 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,8 +1,162 @@ +pub(crate) mod auth; +pub(crate) mod grpc_message; pub(crate) mod rate_limit; +use crate::configuration::{Action, Extension, ExtensionType, FailureMode}; +use crate::service::auth::{AUTH_METHOD_NAME, AUTH_SERVICE_NAME}; +use crate::service::grpc_message::GrpcMessageRequest; +use crate::service::rate_limit::{RATELIMIT_METHOD_NAME, RATELIMIT_SERVICE_NAME}; +use crate::service::TracingHeader::{Baggage, Traceparent, Tracestate}; use protobuf::Message; -use proxy_wasm::types::Status; +use proxy_wasm::types::{Bytes, MapType, Status}; +use std::cell::OnceCell; +use std::rc::Rc; +use std::time::Duration; -pub trait Service { - fn send(&self, message: M) -> Result; +#[derive(Default)] +pub struct GrpcService { + #[allow(dead_code)] + extension: Rc, + name: &'static str, + method: &'static str, +} + +impl GrpcService { + pub fn new(extension: Rc) -> Self { + match extension.extension_type { + ExtensionType::Auth => Self { + extension, + name: AUTH_SERVICE_NAME, + method: AUTH_METHOD_NAME, + }, + ExtensionType::RateLimit => Self { + extension, + name: 
RATELIMIT_SERVICE_NAME, + method: RATELIMIT_METHOD_NAME, + }, + } + } + + fn endpoint(&self) -> &str { + &self.extension.endpoint + } + fn name(&self) -> &str { + self.name + } + fn method(&self) -> &str { + self.method + } + #[allow(dead_code)] + pub fn failure_mode(&self) -> &FailureMode { + &self.extension.failure_mode + } +} + +pub type GrpcCallFn = fn( + upstream_name: &str, + service_name: &str, + method_name: &str, + initial_metadata: Vec<(&str, &[u8])>, + message: Option<&[u8]>, + timeout: Duration, +) -> Result<u32, Status>; + +pub type GetMapValuesBytesFn = fn(map_type: MapType, key: &str) -> Result<Option<Bytes>, Status>; + +pub type GrpcMessageBuildFn = + fn(extension_type: &ExtensionType, action: &Action) -> Option<GrpcMessageRequest>; + +pub struct GrpcServiceHandler { + service: Rc<GrpcService>, + header_resolver: Rc<HeaderResolver>, +} + +impl GrpcServiceHandler { + pub fn new(service: Rc<GrpcService>, header_resolver: Rc<HeaderResolver>) -> Self { + Self { + service, + header_resolver, + } + } + + pub fn send( + &self, + get_map_values_bytes_fn: GetMapValuesBytesFn, + grpc_call_fn: GrpcCallFn, + message: GrpcMessageRequest, + ) -> Result<u32, Status> { + let msg = Message::write_to_bytes(&message).unwrap(); + let metadata = self + .header_resolver + .get(get_map_values_bytes_fn) + .iter() + .map(|(header, value)| (*header, value.as_slice())) + .collect(); + + grpc_call_fn( + self.service.endpoint(), + self.service.name(), + self.service.method(), + metadata, + Some(&msg), + Duration::from_secs(5), + ) + } + + pub fn get_extension(&self) -> Rc<Extension> { + Rc::clone(&self.service.extension) + } +} + +pub struct HeaderResolver { + headers: OnceCell<Vec<(&'static str, Bytes)>>, +} + +impl Default for HeaderResolver { + fn default() -> Self { + Self::new() + } +} + +impl HeaderResolver { + pub fn new() -> Self { + Self { + headers: OnceCell::new(), + } + } + + pub fn get(&self, get_map_values_bytes_fn: GetMapValuesBytesFn) -> &Vec<(&'static str, Bytes)> { + self.headers.get_or_init(|| { + let mut headers = Vec::new(); + for header in TracingHeader::all() { + if let Ok(Some(value)) = + get_map_values_bytes_fn(MapType::HttpRequestHeaders, (*header).as_str()) + { + headers.push(((*header).as_str(), value)); + } + } + headers + }) + } +} + +// tracing headers +pub enum TracingHeader { + Traceparent, + Tracestate, + Baggage, +} + +impl TracingHeader { + fn all() -> &'static [Self; 3] { + &[Traceparent, Tracestate, Baggage] + } + + pub fn as_str(&self) -> &'static str { + match self { + Traceparent => "traceparent", + Tracestate => "tracestate", + Baggage => "baggage", + } + } } diff --git a/src/service/auth.rs b/src/service/auth.rs new file mode 100644 index 00000000..b2261ac1 --- /dev/null +++ b/src/service/auth.rs @@ -0,0 +1,90 @@ +use crate::attribute::get_attribute; +use crate::envoy::{ + Address, AttributeContext, AttributeContext_HttpRequest, AttributeContext_Peer, + AttributeContext_Request, CheckRequest, Metadata, SocketAddress, +}; +use crate::service::grpc_message::{GrpcMessageResponse, GrpcMessageResult}; +use chrono::{DateTime, FixedOffset, Timelike}; +use protobuf::well_known_types::Timestamp; +use protobuf::Message; +use proxy_wasm::hostcalls; +use proxy_wasm::types::{Bytes, MapType}; +use std::collections::HashMap; + +pub const AUTH_SERVICE_NAME: &str = "envoy.service.auth.v3.Authorization"; +pub const AUTH_METHOD_NAME: &str = "Check"; + +pub struct AuthService; + +#[allow(dead_code)] +impl AuthService { + pub fn request_message(ce_host: String) -> CheckRequest { + AuthService::build_check_req(ce_host) + } + + pub fn response_message(res_body_bytes: &Bytes) -> GrpcMessageResult<GrpcMessageResponse> { + match
Message::parse_from_bytes(res_body_bytes) { + Ok(res) => Ok(GrpcMessageResponse::Auth(res)), + Err(e) => Err(e), + } + } + + fn build_check_req(ce_host: String) -> CheckRequest { + let mut auth_req = CheckRequest::default(); + let mut attr = AttributeContext::default(); + attr.set_request(AuthService::build_request()); + attr.set_destination(AuthService::build_peer( + get_attribute::("destination.address").unwrap_or_default(), + get_attribute::("destination.port").unwrap_or_default() as u32, + )); + attr.set_source(AuthService::build_peer( + get_attribute::("source.address").unwrap_or_default(), + get_attribute::("source.port").unwrap_or_default() as u32, + )); + // the ce_host is the identifier for authorino to determine which authconfig to use + let context_extensions = HashMap::from([("host".to_string(), ce_host)]); + attr.set_context_extensions(context_extensions); + attr.set_metadata_context(Metadata::default()); + auth_req.set_attributes(attr); + auth_req + } + + fn build_request() -> AttributeContext_Request { + let mut request = AttributeContext_Request::default(); + let mut http = AttributeContext_HttpRequest::default(); + let headers: HashMap = hostcalls::get_map(MapType::HttpRequestHeaders) + .unwrap() + .into_iter() + .collect(); + + http.set_host(get_attribute::("request.host").unwrap_or_default()); + http.set_method(get_attribute::("request.method").unwrap_or_default()); + http.set_scheme(get_attribute::("request.scheme").unwrap_or_default()); + http.set_path(get_attribute::("request.path").unwrap_or_default()); + http.set_protocol(get_attribute::("request.protocol").unwrap_or_default()); + + http.set_headers(headers); + request.set_time(get_attribute("request.time").map_or( + Timestamp::new(), + |date_time: DateTime| Timestamp { + nanos: date_time.nanosecond() as i32, + seconds: date_time.second() as i64, + unknown_fields: Default::default(), + cached_size: Default::default(), + }, + )); + request.set_http(http); + request + } + + fn build_peer(host: String, port: u32) -> AttributeContext_Peer { + let mut peer = AttributeContext_Peer::default(); + let mut address = Address::default(); + let mut socket_address = SocketAddress::default(); + socket_address.set_address(host); + socket_address.set_port_value(port); + address.set_socket_address(socket_address); + peer.set_address(address); + peer + } +} diff --git a/src/service/grpc_message.rs b/src/service/grpc_message.rs new file mode 100644 index 00000000..6654c84a --- /dev/null +++ b/src/service/grpc_message.rs @@ -0,0 +1,265 @@ +use crate::configuration::{Action, ExtensionType}; +use crate::envoy::{CheckRequest, CheckResponse, RateLimitRequest, RateLimitResponse}; +use crate::service::auth::AuthService; +use crate::service::rate_limit::RateLimitService; +use log::debug; +use protobuf::reflect::MessageDescriptor; +use protobuf::{ + Clear, CodedInputStream, CodedOutputStream, Message, ProtobufError, ProtobufResult, + UnknownFields, +}; +use proxy_wasm::types::Bytes; +use std::any::Any; + +#[derive(Clone, Debug)] +pub enum GrpcMessageRequest { + Auth(CheckRequest), + RateLimit(RateLimitRequest), +} + +impl Default for GrpcMessageRequest { + fn default() -> Self { + GrpcMessageRequest::RateLimit(RateLimitRequest::new()) + } +} + +impl Clear for GrpcMessageRequest { + fn clear(&mut self) { + match self { + GrpcMessageRequest::Auth(msg) => msg.clear(), + GrpcMessageRequest::RateLimit(msg) => msg.clear(), + } + } +} + +impl Message for GrpcMessageRequest { + fn descriptor(&self) -> &'static MessageDescriptor { + match self { + 
GrpcMessageRequest::Auth(msg) => msg.descriptor(), + GrpcMessageRequest::RateLimit(msg) => msg.descriptor(), + } + } + + fn is_initialized(&self) -> bool { + match self { + GrpcMessageRequest::Auth(msg) => msg.is_initialized(), + GrpcMessageRequest::RateLimit(msg) => msg.is_initialized(), + } + } + + fn merge_from(&mut self, is: &mut CodedInputStream) -> ProtobufResult<()> { + match self { + GrpcMessageRequest::Auth(msg) => msg.merge_from(is), + GrpcMessageRequest::RateLimit(msg) => msg.merge_from(is), + } + } + + fn write_to_with_cached_sizes(&self, os: &mut CodedOutputStream) -> ProtobufResult<()> { + match self { + GrpcMessageRequest::Auth(msg) => msg.write_to_with_cached_sizes(os), + GrpcMessageRequest::RateLimit(msg) => msg.write_to_with_cached_sizes(os), + } + } + + fn write_to_bytes(&self) -> ProtobufResult> { + match self { + GrpcMessageRequest::Auth(msg) => msg.write_to_bytes(), + GrpcMessageRequest::RateLimit(msg) => msg.write_to_bytes(), + } + } + + fn compute_size(&self) -> u32 { + match self { + GrpcMessageRequest::Auth(msg) => msg.compute_size(), + GrpcMessageRequest::RateLimit(msg) => msg.compute_size(), + } + } + + fn get_cached_size(&self) -> u32 { + match self { + GrpcMessageRequest::Auth(msg) => msg.get_cached_size(), + GrpcMessageRequest::RateLimit(msg) => msg.get_cached_size(), + } + } + + fn get_unknown_fields(&self) -> &UnknownFields { + match self { + GrpcMessageRequest::Auth(msg) => msg.get_unknown_fields(), + GrpcMessageRequest::RateLimit(msg) => msg.get_unknown_fields(), + } + } + + fn mut_unknown_fields(&mut self) -> &mut UnknownFields { + match self { + GrpcMessageRequest::Auth(msg) => msg.mut_unknown_fields(), + GrpcMessageRequest::RateLimit(msg) => msg.mut_unknown_fields(), + } + } + + fn as_any(&self) -> &dyn Any { + match self { + GrpcMessageRequest::Auth(msg) => msg.as_any(), + GrpcMessageRequest::RateLimit(msg) => msg.as_any(), + } + } + + fn new() -> Self + where + Self: Sized, + { + // Returning default value + GrpcMessageRequest::default() + } + + fn default_instance() -> &'static Self + where + Self: Sized, + { + #[allow(non_upper_case_globals)] + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(|| GrpcMessageRequest::RateLimit(RateLimitRequest::new())) + } +} + +impl GrpcMessageRequest { + // Using domain as ce_host for the time being, we might pass a DataType in the future. 
+ pub fn new(extension_type: &ExtensionType, action: &Action) -> Option { + match extension_type { + ExtensionType::RateLimit => { + let descriptors = action.build_descriptors(); + if descriptors.is_empty() { + debug!("grpc_message_request: empty descriptors"); + None + } else { + Some(GrpcMessageRequest::RateLimit( + RateLimitService::request_message(action.scope.clone(), descriptors), + )) + } + } + ExtensionType::Auth => Some(GrpcMessageRequest::Auth(AuthService::request_message( + action.scope.clone(), + ))), + } + } +} + +#[derive(Clone, Debug)] +pub enum GrpcMessageResponse { + Auth(CheckResponse), + RateLimit(RateLimitResponse), +} + +impl Default for GrpcMessageResponse { + fn default() -> Self { + GrpcMessageResponse::RateLimit(RateLimitResponse::new()) + } +} + +impl Clear for GrpcMessageResponse { + fn clear(&mut self) { + todo!() + } +} + +impl Message for GrpcMessageResponse { + fn descriptor(&self) -> &'static MessageDescriptor { + match self { + GrpcMessageResponse::Auth(res) => res.descriptor(), + GrpcMessageResponse::RateLimit(res) => res.descriptor(), + } + } + + fn is_initialized(&self) -> bool { + match self { + GrpcMessageResponse::Auth(res) => res.is_initialized(), + GrpcMessageResponse::RateLimit(res) => res.is_initialized(), + } + } + + fn merge_from(&mut self, is: &mut CodedInputStream) -> ProtobufResult<()> { + match self { + GrpcMessageResponse::Auth(res) => res.merge_from(is), + GrpcMessageResponse::RateLimit(res) => res.merge_from(is), + } + } + + fn write_to_with_cached_sizes(&self, os: &mut CodedOutputStream) -> ProtobufResult<()> { + match self { + GrpcMessageResponse::Auth(res) => res.write_to_with_cached_sizes(os), + GrpcMessageResponse::RateLimit(res) => res.write_to_with_cached_sizes(os), + } + } + + fn write_to_bytes(&self) -> ProtobufResult> { + match self { + GrpcMessageResponse::Auth(res) => res.write_to_bytes(), + GrpcMessageResponse::RateLimit(res) => res.write_to_bytes(), + } + } + + fn compute_size(&self) -> u32 { + match self { + GrpcMessageResponse::Auth(res) => res.compute_size(), + GrpcMessageResponse::RateLimit(res) => res.compute_size(), + } + } + + fn get_cached_size(&self) -> u32 { + match self { + GrpcMessageResponse::Auth(res) => res.get_cached_size(), + GrpcMessageResponse::RateLimit(res) => res.get_cached_size(), + } + } + + fn get_unknown_fields(&self) -> &UnknownFields { + match self { + GrpcMessageResponse::Auth(res) => res.get_unknown_fields(), + GrpcMessageResponse::RateLimit(res) => res.get_unknown_fields(), + } + } + + fn mut_unknown_fields(&mut self) -> &mut UnknownFields { + match self { + GrpcMessageResponse::Auth(res) => res.mut_unknown_fields(), + GrpcMessageResponse::RateLimit(res) => res.mut_unknown_fields(), + } + } + + fn as_any(&self) -> &dyn Any { + match self { + GrpcMessageResponse::Auth(res) => res.as_any(), + GrpcMessageResponse::RateLimit(res) => res.as_any(), + } + } + + fn new() -> Self + where + Self: Sized, + { + // Returning default value + GrpcMessageResponse::default() + } + + fn default_instance() -> &'static Self + where + Self: Sized, + { + #[allow(non_upper_case_globals)] + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(|| GrpcMessageResponse::RateLimit(RateLimitResponse::new())) + } +} + +impl GrpcMessageResponse { + pub fn new( + extension_type: &ExtensionType, + res_body_bytes: &Bytes, + ) -> GrpcMessageResult { + match extension_type { + ExtensionType::RateLimit => RateLimitService::response_message(res_body_bytes), + ExtensionType::Auth => 
AuthService::response_message(res_body_bytes), + } + } +} + +pub type GrpcMessageResult<Res> = Result<Res, ProtobufError>; diff --git a/src/service/rate_limit.rs b/src/service/rate_limit.rs index 6c4726c5..4a81884a 100644 --- a/src/service/rate_limit.rs +++ b/src/service/rate_limit.rs @@ -1,26 +1,15 @@ use crate::envoy::{RateLimitDescriptor, RateLimitRequest}; -use crate::filter::http_context::TracingHeader; -use crate::service::Service; +use crate::service::grpc_message::{GrpcMessageResponse, GrpcMessageResult}; use protobuf::{Message, RepeatedField}; -use proxy_wasm::hostcalls::dispatch_grpc_call; -use proxy_wasm::types::{Bytes, Status}; -use std::time::Duration; +use proxy_wasm::types::Bytes; -const RATELIMIT_SERVICE_NAME: &str = "envoy.service.ratelimit.v3.RateLimitService"; -const RATELIMIT_METHOD_NAME: &str = "ShouldRateLimit"; -pub struct RateLimitService { - endpoint: String, - tracing_headers: Vec<(TracingHeader, Bytes)>, -} +pub const RATELIMIT_SERVICE_NAME: &str = "envoy.service.ratelimit.v3.RateLimitService"; +pub const RATELIMIT_METHOD_NAME: &str = "ShouldRateLimit"; + +pub struct RateLimitService; impl RateLimitService { - pub fn new(endpoint: &str, metadata: Vec<(TracingHeader, Bytes)>) -> RateLimitService { - Self { - endpoint: String::from(endpoint), - tracing_headers: metadata, - } - } - pub fn message( + pub fn request_message( domain: String, descriptors: RepeatedField<RateLimitDescriptor>, ) -> RateLimitRequest { @@ -32,34 +21,12 @@ impl RateLimitService { cached_size: Default::default(), } } -} - -fn grpc_call( - upstream_name: &str, - initial_metadata: Vec<(&str, &[u8])>, - message: RateLimitRequest, -) -> Result<u32, Status> { - let msg = Message::write_to_bytes(&message).unwrap(); // TODO(didierofrivia): Error Handling - dispatch_grpc_call( - upstream_name, - RATELIMIT_SERVICE_NAME, - RATELIMIT_METHOD_NAME, - initial_metadata, - Some(&msg), - Duration::from_secs(5), - ) -} -impl Service<RateLimitRequest> for RateLimitService { - fn send(&self, message: RateLimitRequest) -> Result<u32, Status> { - grpc_call( - self.endpoint.as_str(), - self.tracing_headers - .iter() - .map(|(header, value)| (header.as_str(), value.as_slice())) - .collect(), - message, - ) + pub fn response_message(res_body_bytes: &Bytes) -> GrpcMessageResult<GrpcMessageResponse> { + match Message::parse_from_bytes(res_body_bytes) { + Ok(res) => Ok(GrpcMessageResponse::RateLimit(res)), + Err(e) => Err(e), + } } } @@ -69,8 +36,6 @@ mod tests { use crate::service::rate_limit::RateLimitService; //use crate::service::Service; use protobuf::{CachedSize, RepeatedField, UnknownFields}; - //use proxy_wasm::types::Status; - //use crate::filter::http_context::{Filter}; fn build_message() -> RateLimitRequest { let domain = "rlp1"; @@ -81,7 +46,7 @@ mod tests { field.set_entries(RepeatedField::from_vec(vec![entry])); let descriptors = RepeatedField::from_vec(vec![field]); - RateLimitService::message(domain.to_string(), descriptors.clone()) + RateLimitService::request_message(domain.to_string(), descriptors.clone()) } #[test] fn builds_correct_message() { @@ -94,20 +59,4 @@ mod tests { assert_eq!(msg.unknown_fields, UnknownFields::default()); assert_eq!(msg.cached_size, CachedSize::default()); } - /*#[test] - fn sends_message() { - let msg = build_message(); - let metadata = vec![("header-1", "value-1".as_bytes())]; - let rls = RateLimitService::new("limitador-cluster", metadata); - - // TODO(didierofrivia): When we have a grpc response type, assert the async response - } - - fn grpc_call( - _upstream_name: &str, - _initial_metadata: Vec<(&str, &[u8])>, - _message: RateLimitRequest, - ) -> Result<u32, Status> { - Ok(1) - } */ } diff
--git a/tests/rate_limited.rs b/tests/rate_limited.rs index 44bfec3c..21ba4988 100644 --- a/tests/rate_limited.rs +++ b/tests/rate_limited.rs @@ -29,8 +29,8 @@ fn it_loads() { let root_context = 1; let cfg = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [] + "extensions": {}, + "policies": [] }"#; module @@ -57,12 +57,6 @@ fn it_loads() { module .call_proxy_on_request_headers(http_context, 0, false) .expect_log(Some(LogLevel::Debug), Some("#2 on_http_request_headers")) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("traceparent")) - .returning(None) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("tracestate")) - .returning(None) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("baggage")) - .returning(None) .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":authority")) .returning(Some("cars.toystore.com")) .expect_log( @@ -96,43 +90,51 @@ fn it_limits() { let root_context = 1; let cfg = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "some-name", - "domain": "RLS-domain", - "service": "limitador-cluster", "hostnames": ["*.toystore.com", "example.com"], "rules": [ { "conditions": [ + { + "allOf": [ { - "allOf": [ - { - "selector": "request.url_path", - "operator": "startswith", - "value": "/admin/toy" - }, - { - "selector": "request.host", - "operator": "eq", - "value": "cars.toystore.com" - }, + "selector": "request.url_path", + "operator": "startswith", + "value": "/admin/toy" + }, + { + "selector": "request.host", + "operator": "eq", + "value": "cars.toystore.com" + }, + { + "selector": "request.method", + "operator": "eq", + "value": "POST" + }] + }], + "actions": [ + { + "extension": "limitador", + "scope": "RLS-domain", + "data": [ { - "selector": "request.method", - "operator": "eq", - "value": "POST" - }] - } - ], - "data": [ - { - "static": { - "key": "admin", - "value": "1" - } - } - ] + "static": { + "key": "admin", + "value": "1" + } + } + ] + }] }] }] }"#; @@ -161,36 +163,33 @@ fn it_limits() { module .call_proxy_on_request_headers(http_context, 0, false) .expect_log(Some(LogLevel::Debug), Some("#2 on_http_request_headers")) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("traceparent")) - .returning(None) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("tracestate")) - .returning(None) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("baggage")) - .returning(None) .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":authority")) .returning(Some("cars.toystore.com")) - .expect_get_property(Some(vec!["request", "url_path"])) - .returning(Some("/admin/toy".as_bytes())) - .expect_get_property(Some(vec!["request", "host"])) - .returning(Some("cars.toystore.com".as_bytes())) - .expect_get_property(Some(vec!["request", "method"])) - .returning(Some("POST".as_bytes())) - .expect_log( - Some(LogLevel::Debug), - Some("#2 ratelimitpolicy selected some-name"), - ) + .expect_log(Some(LogLevel::Debug), Some("#2 policy selected some-name")) .expect_log( Some(LogLevel::Debug), - Some("#2 get_property: selector: request.url_path path: [\"request\", \"url_path\"]"), + Some("get_property: selector: request.url_path path: [\"request\", \"url_path\"]"), ) + .expect_get_property(Some(vec!["request", "url_path"])) + .returning(Some("/admin/toy".as_bytes())) 
.expect_log( Some(LogLevel::Debug), - Some("#2 get_property: selector: request.host path: [\"request\", \"host\"]"), + Some("get_property: selector: request.host path: [\"request\", \"host\"]"), ) + .expect_get_property(Some(vec!["request", "host"])) + .returning(Some("cars.toystore.com".as_bytes())) .expect_log( Some(LogLevel::Debug), - Some("#2 get_property: selector: request.method path: [\"request\", \"method\"]"), + Some("get_property: selector: request.method path: [\"request\", \"method\"]"), ) + .expect_get_property(Some(vec!["request", "method"])) + .returning(Some("POST".as_bytes())) + .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("traceparent")) + .returning(None) + .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("tracestate")) + .returning(None) + .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("baggage")) + .returning(None) .expect_grpc_call( Some("limitador-cluster"), Some("envoy.service.ratelimit.v3.RateLimitService"), @@ -205,7 +204,7 @@ fn it_limits() { .returning(Some(42)) .expect_log( Some(LogLevel::Debug), - Some("#2 initiated gRPC call (id# 42) to Limitador"), + Some("#2 initiated gRPC call (id# 42)"), ) .execute_and_expect(ReturnType::Action(Action::Pause)) .unwrap(); @@ -246,43 +245,51 @@ fn it_passes_additional_headers() { let root_context = 1; let cfg = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "some-name", - "domain": "RLS-domain", - "service": "limitador-cluster", "hostnames": ["*.toystore.com", "example.com"], "rules": [ { "conditions": [ + { + "allOf": [ { - "allOf": [ - { - "selector": "request.url_path", - "operator": "startswith", - "value": "/admin/toy" - }, - { - "selector": "request.host", - "operator": "eq", - "value": "cars.toystore.com" - }, + "selector": "request.url_path", + "operator": "startswith", + "value": "/admin/toy" + }, + { + "selector": "request.host", + "operator": "eq", + "value": "cars.toystore.com" + }, + { + "selector": "request.method", + "operator": "eq", + "value": "POST" + }] + }], + "actions": [ + { + "extension": "limitador", + "scope": "RLS-domain", + "data": [ { - "selector": "request.method", - "operator": "eq", - "value": "POST" - }] - } - ], - "data": [ - { - "static": { - "key": "admin", - "value": "1" - } - } - ] + "static": { + "key": "admin", + "value": "1" + } + } + ] + }] }] }] }"#; @@ -311,36 +318,33 @@ fn it_passes_additional_headers() { module .call_proxy_on_request_headers(http_context, 0, false) .expect_log(Some(LogLevel::Debug), Some("#2 on_http_request_headers")) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("traceparent")) - .returning(None) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("tracestate")) - .returning(None) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("baggage")) - .returning(None) .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":authority")) .returning(Some("cars.toystore.com")) - .expect_get_property(Some(vec!["request", "url_path"])) - .returning(Some("/admin/toy".as_bytes())) - .expect_get_property(Some(vec!["request", "host"])) - .returning(Some("cars.toystore.com".as_bytes())) - .expect_get_property(Some(vec!["request", "method"])) - .returning(Some("POST".as_bytes())) + .expect_log(Some(LogLevel::Debug), Some("#2 policy selected some-name")) .expect_log( 
Some(LogLevel::Debug), - Some("#2 ratelimitpolicy selected some-name"), - ) - .expect_log( - Some(LogLevel::Debug), - Some("#2 get_property: selector: request.url_path path: [\"request\", \"url_path\"]"), + Some("get_property: selector: request.url_path path: [\"request\", \"url_path\"]"), ) + .expect_get_property(Some(vec!["request", "url_path"])) + .returning(Some("/admin/toy".as_bytes())) .expect_log( Some(LogLevel::Debug), - Some("#2 get_property: selector: request.host path: [\"request\", \"host\"]"), + Some("get_property: selector: request.host path: [\"request\", \"host\"]"), ) + .expect_get_property(Some(vec!["request", "host"])) + .returning(Some("cars.toystore.com".as_bytes())) .expect_log( Some(LogLevel::Debug), - Some("#2 get_property: selector: request.method path: [\"request\", \"method\"]"), + Some("get_property: selector: request.method path: [\"request\", \"method\"]"), ) + .expect_get_property(Some(vec!["request", "method"])) + .returning(Some("POST".as_bytes())) + .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("traceparent")) + .returning(None) + .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("tracestate")) + .returning(None) + .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("baggage")) + .returning(None) .expect_grpc_call( Some("limitador-cluster"), Some("envoy.service.ratelimit.v3.RateLimitService"), @@ -355,7 +359,7 @@ fn it_passes_additional_headers() { .returning(Some(42)) .expect_log( Some(LogLevel::Debug), - Some("#2 initiated gRPC call (id# 42) to Limitador"), + Some("#2 initiated gRPC call (id# 42)"), ) .execute_and_expect(ReturnType::Action(Action::Pause)) .unwrap(); @@ -410,23 +414,32 @@ fn it_rate_limits_with_empty_conditions() { let root_context = 1; let cfg = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "some-name", - "domain": "RLS-domain", - "service": "limitador-cluster", "hostnames": ["*.com"], "rules": [ { - "data": [ - { - "static": { - "key": "admin", - "value": "1" - } - } - ] + "actions": [ + { + "extension": "limitador", + "scope": "RLS-domain", + "data": [ + { + "static": { + "key": "admin", + "value": "1" + } + } + ] + }] }] }] }"#; @@ -455,18 +468,15 @@ fn it_rate_limits_with_empty_conditions() { module .call_proxy_on_request_headers(http_context, 0, false) .expect_log(Some(LogLevel::Debug), Some("#2 on_http_request_headers")) + .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":authority")) + .returning(Some("a.com")) + .expect_log(Some(LogLevel::Debug), Some("#2 policy selected some-name")) .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("traceparent")) .returning(None) .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("tracestate")) .returning(None) .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("baggage")) .returning(None) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":authority")) - .returning(Some("a.com")) - .expect_log( - Some(LogLevel::Debug), - Some("#2 ratelimitpolicy selected some-name"), - ) .expect_grpc_call( Some("limitador-cluster"), Some("envoy.service.ratelimit.v3.RateLimitService"), @@ -481,7 +491,7 @@ fn it_rate_limits_with_empty_conditions() { .returning(Some(42)) .expect_log( Some(LogLevel::Debug), - Some("#2 initiated gRPC call (id# 42) to Limitador"), + Some("#2 initiated gRPC call (id# 
42)"), ) .execute_and_expect(ReturnType::Action(Action::Pause)) .unwrap(); @@ -522,22 +532,31 @@ fn it_does_not_rate_limits_when_selector_does_not_exist_and_misses_default_value let root_context = 1; let cfg = r#"{ - "failureMode": "deny", - "rateLimitPolicies": [ + "extensions": { + "limitador": { + "type": "ratelimit", + "endpoint": "limitador-cluster", + "failureMode": "deny" + } + }, + "policies": [ { "name": "some-name", - "domain": "RLS-domain", - "service": "limitador-cluster", "hostnames": ["*.com"], "rules": [ { - "data": [ + "actions": [ { - "selector": { - "selector": "unknown.path" - } - } - ] + "extension": "limitador", + "scope": "RLS-domain", + "data": [ + { + "selector": { + "selector": "unknown.path" + } + } + ] + }] }] }] }"#; @@ -566,31 +585,25 @@ fn it_does_not_rate_limits_when_selector_does_not_exist_and_misses_default_value module .call_proxy_on_request_headers(http_context, 0, false) .expect_log(Some(LogLevel::Debug), Some("#2 on_http_request_headers")) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("traceparent")) - .returning(None) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("tracestate")) - .returning(None) - .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("baggage")) - .returning(None) .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":authority")) .returning(Some("a.com")) - .expect_get_property(Some(vec!["unknown", "path"])) - .returning(None) .expect_log( Some(LogLevel::Debug), - Some("#2 ratelimitpolicy selected some-name"), + Some("#2 policy selected some-name"), ) .expect_log( Some(LogLevel::Debug), - Some("#2 get_property: selector: unknown.path path: Path { tokens: [\"unknown\", \"path\"] }"), + Some("get_property: selector: unknown.path path: Path { tokens: [\"unknown\", \"path\"] }"), ) + .expect_get_property(Some(vec!["unknown", "path"])) + .returning(None) .expect_log( Some(LogLevel::Debug), - Some("#2 build_single_descriptor: selector not found: unknown.path"), + Some("build_single_descriptor: selector not found: unknown.path"), ) .expect_log( Some(LogLevel::Debug), - Some("#2 process_rate_limit_policy: empty descriptors"), + Some("grpc_message_request: empty descriptors"), ) .execute_and_expect(ReturnType::Action(Action::Continue)) .unwrap(); diff --git a/utils/deploy/authconfig.yaml b/utils/deploy/authconfig.yaml new file mode 100644 index 00000000..74efaef6 --- /dev/null +++ b/utils/deploy/authconfig.yaml @@ -0,0 +1,51 @@ +--- +apiVersion: authorino.kuadrant.io/v1beta2 +kind: AuthConfig +metadata: + name: talker-api-protection +spec: + hosts: + - effective-route-1 + authentication: + "api-key-users": + apiKey: + selector: + matchLabels: + app: toystore + credentials: + authorizationHeader: + prefix: APIKEY + response: + success: + dynamicMetadata: + "identity": + json: + properties: + "userid": + selector: auth.identity.metadata.annotations.secret\.kuadrant\.io/user-id +--- +apiVersion: v1 +kind: Secret +metadata: + name: bob-key + labels: + authorino.kuadrant.io/managed-by: authorino + app: toystore + annotations: + secret.kuadrant.io/user-id: bob +stringData: + api_key: "IAMBOB" +type: Opaque +--- +apiVersion: v1 +kind: Secret +metadata: + name: alice-key + labels: + authorino.kuadrant.io/managed-by: authorino + app: toystore + annotations: + secret.kuadrant.io/user-id: alice +stringData: + api_key: "IAMALICE" +type: Opaque diff --git a/utils/deploy/envoy-notls.yaml b/utils/deploy/envoy-notls.yaml new file mode 100644 index 00000000..d116429b --- 
/dev/null +++ b/utils/deploy/envoy-notls.yaml @@ -0,0 +1,438 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app: envoy + name: envoy +data: + envoy.yaml: | + static_resources: + clusters: + - name: authorino_wasm + connect_timeout: 1s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: { } + load_assignment: + cluster_name: authorino_wasm + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: authorino-authorino-authorization + port_value: 50051 + - name: limitador + connect_timeout: 1s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: { } + load_assignment: + cluster_name: limitador + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: limitador + port_value: 8081 + - name: talker-api + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: talker-api + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: talker-api + port_value: 3000 + - name: talker-web + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: talker-web + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: talker-web + port_value: 8888 + - name: opentelemetry + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: { } + load_assignment: + cluster_name: opentelemetry + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: otel-collector + port_value: 4317 + listeners: + - address: + socket_address: + address: 0.0.0.0 + port_value: 8000 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: local + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: [ '*' ] + routes: + - match: { prefix: / } + route: + cluster: talker-api + http_filters: + - name: envoy.filters.http.header_to_metadata + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.header_to_metadata.v3.Config + request_rules: + - header: x-dyn-user-id + on_header_present: + key: user_id + type: STRING + remove: false + - name: envoy.filters.http.wasm + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm + config: + name: kuadrant_wasm + root_id: kuadrant_wasm + vm_config: + vm_id: vm.sentinel.kuadrant_wasm + runtime: envoy.wasm.runtime.v8 + code: + local: + filename: /opt/kuadrant/wasm/wasm_shim.wasm + allow_precompiled: true + configuration: + "@type": "type.googleapis.com/google.protobuf.StringValue" + value: > + { + "extensions": { + "authorino": { + "type": "auth", + "endpoint": "authorino_wasm", + "failureMode": "deny" + }, + "limitador": { + "type": "ratelimit", + 
"endpoint": "limitador", + "failureMode": "deny" + } + }, + "policies": [ + { + "name": "auth-ns-A/auth-name-A", + "hostnames": [ + "*.a.auth.com" + ], + "rules": [ + { + "conditions": [ + { + "allOf": [ + { + "selector": "request.path", + "operator": "eq", + "value": "/get" + } + ] + } + ], + "actions": [ + { + "extension": "authorino", + "scope": "effective-route-1", + "data": [] + } + ] + } + ] + }, + { + "name": "rlp-ns-A/rlp-name-A", + "hostnames": [ + "*.a.rlp.com" + ], + "rules": [ + { + "actions": [ + { + "extension": "limitador", + "scope": "rlp-ns-A/rlp-name-A", + "data": [ + { + "selector": { + "selector": "unknown.path" + } + } + ] + } + ] + } + ] + }, + { + "name": "rlp-ns-B/rlp-name-B", + "hostnames": [ + "*.b.rlp.com" + ], + "rules": [ + { + "conditions": [ + { + "allOf": [ + { + "selector": "request.url_path", + "operator": "startswith", + "value": "/unknown-path" + } + ] + } + ], + "actions": [ + { + "extension": "limitador", + "scope": "rlp-ns-B/rlp-name-B", + "data": [ + { + "static": { + "key": "rlp-ns-B/rlp-name-B/limit-not-to-be-activated", + "value": "1" + } + } + ] + } + ] + } + ] + }, + { + "name": "rlp-ns-C/rlp-name-C", + "hostnames": [ + "*.c.rlp.com" + ], + "rules": [ + { + "conditions": [ + { + "allOf": [ + { + "selector": "request.url_path", + "operator": "startswith", + "value": "/get" + }, + { + "selector": "request.host", + "operator": "eq", + "value": "test.c.rlp.com" + }, + { + "selector": "request.method", + "operator": "eq", + "value": "GET" + } + ] + } + ], + "actions": [ + { + "extension": "limitador", + "scope": "rlp-ns-C/rlp-name-C", + "data": [ + { + "static": { + "key": "limit_to_be_activated", + "value": "1" + } + }, + { + "selector": { + "selector": "source.address" + } + }, + { + "selector": { + "selector": "request.headers.My-Custom-Header-01" + } + }, + { + "selector": { + "selector": "metadata.filter_metadata.envoy\\.filters\\.http\\.header_to_metadata.user_id", + "key": "user_id" + } + } + ] + } + ] + } + ] + }, + { + "name": "multi-ns-A/multi-name-A", + "hostnames": [ + "*.a.multi.com" + ], + "rules": [ + { + "conditions": [ + { + "allOf": [ + { + "selector": "request.path", + "operator": "eq", + "value": "/get" + } + ] + } + ], + "actions": [ + { + "extension": "authorino", + "scope": "effective-route-1", + "data": [] + }, + { + "extension": "limitador", + "scope": "multi-ns-A/multi-name-A", + "data": [ + { + "selector": { + "selector": "filter_state.wasm\\.kuadrant\\.identity\\.userid", + "key": "user_id" + } + } + ] + } + ] + } + ] + } + ] + } + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + + # # Uncomment to enable tracing + # tracing: + # provider: + # name: envoy.tracers.opentelemetry + # typed_config: + # "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig + # grpc_service: + # envoy_grpc: + # cluster_name: opentelemetry + # timeout: 1s + # service_name: envoy + admin: + address: + socket_address: + address: 0.0.0.0 + port_value: 8001 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: envoy + name: envoy +spec: + replicas: 1 + selector: + matchLabels: + app: envoy + template: + metadata: + labels: + app: envoy + spec: + containers: + - args: + - --config-path /usr/local/etc/envoy/envoy.yaml + - --service-cluster front-proxy + - --log-level info + - --component-log-level wasm:debug,filter:trace,http:debug,router:debug + command: + - /usr/local/bin/envoy + image: envoyproxy/envoy:v1.31-latest + name: envoy + 
ports: + - containerPort: 8000 + name: web + - containerPort: 8001 + name: admin + volumeMounts: + - mountPath: /usr/local/etc/envoy + name: config + readOnly: true + - mountPath: /opt/kuadrant/wasm + name: wasm + volumes: + - configMap: + items: + - key: envoy.yaml + path: envoy.yaml + name: envoy + name: config + - name: wasm + hostPath: + path: /opt/kuadrant/wasm +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: envoy + name: envoy +spec: + ports: + - name: web + port: 8000 + protocol: TCP + selector: + app: envoy +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: ingress-wildcard-host +spec: + rules: + - host: talker-api.127.0.0.1.nip.io + http: + paths: + - backend: + service: + name: envoy + port: + number: 8000 + path: / + pathType: Prefix diff --git a/utils/deploy/envoy-tls.yaml b/utils/deploy/envoy-tls.yaml new file mode 100644 index 00000000..2e46c9b5 --- /dev/null +++ b/utils/deploy/envoy-tls.yaml @@ -0,0 +1,455 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app: envoy + name: envoy +data: + envoy.yaml: | + static_resources: + clusters: + - name: authorino_wasm + connect_timeout: 1s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: { } + load_assignment: + cluster_name: authorino_wasm + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: authorino-authorino-authorization + port_value: 50051 + transport_socket: + name: envoy.transport_sockets.tls + typed_config: + "@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + common_tls_context: + validation_context: + trusted_ca: + filename: /etc/ssl/certs/authorino-ca-cert.crt + - name: limitador + connect_timeout: 1s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: { } + load_assignment: + cluster_name: limitador + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: limitador + port_value: 8081 + - name: talker-api + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: talker-api + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: talker-api + port_value: 3000 + - name: talker-web + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: talker-web + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: talker-web + port_value: 8888 + - name: opentelemetry + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: { } + load_assignment: + cluster_name: opentelemetry + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: otel-collector + port_value: 4317 + listeners: + - address: + socket_address: + address: 0.0.0.0 + port_value: 8000 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + 
typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: local + use_remote_address: true + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: [ '*' ] + routes: + - match: { prefix: / } + route: + cluster: talker-api + http_filters: + - name: envoy.filters.http.header_to_metadata + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.header_to_metadata.v3.Config + request_rules: + - header: x-dyn-user-id + on_header_present: + key: user_id + type: STRING + remove: false + - name: envoy.filters.http.wasm + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm + config: + name: kuadrant_wasm + root_id: kuadrant_wasm + vm_config: + vm_id: vm.sentinel.kuadrant_wasm + runtime: envoy.wasm.runtime.v8 + code: + local: + filename: /opt/kuadrant/wasm/wasm_shim.wasm + allow_precompiled: true + configuration: + "@type": "type.googleapis.com/google.protobuf.StringValue" + value: > + { + "extensions": { + "authorino": { + "type": "auth", + "endpoint": "authorino_wasm", + "failureMode": "deny" + }, + "limitador": { + "type": "ratelimit", + "endpoint": "limitador", + "failureMode": "deny" + } + }, + "policies": [ + { + "name": "auth-ns-A/auth-name-A", + "hostnames": [ + "*.a.auth.com" + ], + "rules": [ + { + "conditions": [ + { + "allOf": [ + { + "selector": "request.path", + "operator": "eq", + "value": "/get" + } + ] + } + ], + "actions": [ + { + "extension": "authorino", + "scope": "effective-route-1", + "data": [] + } + ] + } + ] + }, + { + "name": "rlp-ns-A/rlp-name-A", + "hostnames": [ + "*.a.rlp.com" + ], + "rules": [ + { + "actions": [ + { + "extension": "limitador", + "scope": "rlp-ns-A/rlp-name-A", + "data": [ + { + "selector": { + "selector": "unknown.path" + } + } + ] + } + ] + } + ] + }, + { + "name": "rlp-ns-B/rlp-name-B", + "hostnames": [ + "*.b.rlp.com" + ], + "rules": [ + { + "conditions": [ + { + "allOf": [ + { + "selector": "request.url_path", + "operator": "startswith", + "value": "/unknown-path" + } + ] + } + ], + "actions": [ + { + "extension": "limitador", + "scope": "rlp-ns-B/rlp-name-B", + "data": [ + { + "static": { + "key": "rlp-ns-B/rlp-name-B/limit-not-to-be-activated", + "value": "1" + } + } + ] + } + ] + } + ] + }, + { + "name": "rlp-ns-C/rlp-name-C", + "hostnames": [ + "*.c.rlp.com" + ], + "rules": [ + { + "conditions": [ + { + "allOf": [ + { + "selector": "request.url_path", + "operator": "startswith", + "value": "/get" + }, + { + "selector": "request.host", + "operator": "eq", + "value": "test.c.rlp.com" + }, + { + "selector": "request.method", + "operator": "eq", + "value": "GET" + } + ] + } + ], + "actions": [ + { + "extension": "limitador", + "scope": "rlp-ns-C/rlp-name-C", + "data": [ + { + "static": { + "key": "limit_to_be_activated", + "value": "1" + } + }, + { + "selector": { + "selector": "source.address" + } + }, + { + "selector": { + "selector": "request.headers.My-Custom-Header-01" + } + }, + { + "selector": { + "selector": "metadata.filter_metadata.envoy\\.filters\\.http\\.header_to_metadata.user_id", + "key": "user_id" + } + } + ] + } + ] + } + ] + }, + { + "name": "multi-ns-A/multi-name-A", + "hostnames": [ + "*.a.multi.com" + ], + "rules": [ + { + "conditions": [ + { + "allOf": [ + { + "selector": "request.path", + "operator": "eq", + "value": "/get" + } + ] + } + ], + "actions": [ + { + "extension": "authorino", + "scope": "effective-route-1", + "data": [] + }, + { + "extension": 
"limitador", + "scope": "multi-ns-A/multi-name-A", + "data": [ + { + "selector": { + "selector": "filter_state.wasm\\.kuadrant\\.identity\\.userid", + "key": "user_id" + } + } + ] + } + ] + } + ] + } + ] + } + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + + # # Uncomment to enable tracing + # tracing: + # provider: + # name: envoy.tracers.opentelemetry + # typed_config: + # "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig + # grpc_service: + # envoy_grpc: + # cluster_name: opentelemetry + # timeout: 1s + # service_name: envoy + admin: + address: + socket_address: + address: 0.0.0.0 + port_value: 8001 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: envoy + name: envoy +spec: + replicas: 1 + selector: + matchLabels: + app: envoy + template: + metadata: + labels: + app: envoy + spec: + containers: + - args: + - --config-path /usr/local/etc/envoy/envoy.yaml + - --service-cluster front-proxy + - --log-level info + - --component-log-level wasm:debug,filter:trace,http:debug,router:debug + command: + - /usr/local/bin/envoy + image: envoyproxy/envoy:v1.31-latest + name: envoy + ports: + - containerPort: 8000 + name: web + - containerPort: 8001 + name: admin + volumeMounts: + - mountPath: /usr/local/etc/envoy + name: config + readOnly: true + - mountPath: /etc/ssl/certs/authorino-ca-cert.crt + name: authorino-ca-cert + readOnly: true + subPath: ca.crt + - mountPath: /opt/kuadrant/wasm + name: wasm + volumes: + - configMap: + items: + - key: envoy.yaml + path: envoy.yaml + name: envoy + name: config + - name: authorino-ca-cert + secret: + defaultMode: 420 + secretName: authorino-ca-cert + - name: wasm + hostPath: + path: /opt/kuadrant/wasm +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: envoy + name: envoy +spec: + ports: + - name: web + port: 8000 + protocol: TCP + selector: + app: envoy +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: ingress-wildcard-host +spec: + rules: + - host: talker-api.127.0.0.1.nip.io + http: + paths: + - backend: + service: + name: envoy + port: + number: 8000 + path: / + pathType: Prefix diff --git a/utils/deploy/limitador.yaml b/utils/deploy/limitador.yaml new file mode 100644 index 00000000..51f231e9 --- /dev/null +++ b/utils/deploy/limitador.yaml @@ -0,0 +1,57 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: limitador + name: limitador +spec: + replicas: 1 + selector: + matchLabels: + app: limitador + template: + metadata: + labels: + app: limitador + spec: + containers: + - args: + - -vvv + - /opt/kuadrant/limits/limits.yaml + command: + - limitador-server + image: quay.io/kuadrant/limitador:latest + name: limitador + ports: + - containerPort: 8080 + name: http + - containerPort: 8081 + name: grpc + volumeMounts: + - mountPath: /opt/kuadrant/limits + name: limits + volumes: + - configMap: + items: + - key: limits.yaml + path: limits.yaml + name: limits + name: limits +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: limitador + name: limitador +spec: + ports: + - name: http + port: 8080 + protocol: TCP + - name: grpc + port: 8081 + protocol: TCP + selector: + app: limitador diff --git a/utils/docker-compose/limits.yaml b/utils/docker-compose/limits.yaml index 31a68f61..f2f447bb 100644 --- a/utils/docker-compose/limits.yaml +++ b/utils/docker-compose/limits.yaml @@ -6,3 +6,15 @@ - "limit_to_be_activated == '1'" - "user_id == 'bob'" variables: [] +- namespace: 
multi-ns-A/multi-name-A + max_value: 5 + seconds: 10 + conditions: + - "user_id == 'alice'" + variables: [] +- namespace: multi-ns-A/multi-name-A + max_value: 2 + seconds: 10 + conditions: + - "user_id == 'bob'" + variables: [] diff --git a/utils/kind/cluster.yaml b/utils/kind/cluster.yaml new file mode 100644 index 00000000..9ec18f83 --- /dev/null +++ b/utils/kind/cluster.yaml @@ -0,0 +1,9 @@ +--- +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +nodes: + - role: control-plane + image: kindest/node:v1.30.0 + extraMounts: + - hostPath: $(WASM_PATH) + containerPath: /opt/kuadrant/wasm
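
A note on the `src/service.rs` change above: `GrpcServiceHandler::send` now receives the host interactions as plain function pointers (`GetMapValuesBytesFn`, `GrpcCallFn`) instead of calling `proxy_wasm::hostcalls` directly, which keeps the handler testable. The following is a minimal, illustrative sketch of how a caller might satisfy those function-pointer types; it assumes the proxy-wasm SDK's `hostcalls::get_map_value_bytes` and `hostcalls::dispatch_grpc_call`, and the actual wiring in the filter code is not shown in this diff.

```rust
// Illustrative only: wrappers a caller might pass into GrpcServiceHandler::send.
use std::time::Duration;

use proxy_wasm::hostcalls;
use proxy_wasm::types::{Bytes, MapType, Status};

// Matches GetMapValuesBytesFn: read a request header as raw bytes.
fn get_map_values_bytes(map_type: MapType, key: &str) -> Result<Option<Bytes>, Status> {
    hostcalls::get_map_value_bytes(map_type, key)
}

// Matches GrpcCallFn: dispatch the gRPC call and return the call token.
fn grpc_call(
    upstream_name: &str,
    service_name: &str,
    method_name: &str,
    initial_metadata: Vec<(&str, &[u8])>,
    message: Option<&[u8]>,
    timeout: Duration,
) -> Result<u32, Status> {
    hostcalls::dispatch_grpc_call(
        upstream_name,
        service_name,
        method_name,
        initial_metadata,
        message,
        timeout,
    )
}

// In the HTTP context, with `handler: &GrpcServiceHandler` and a
// `message: GrpcMessageRequest` built from the matched action:
// let call_id = handler.send(get_map_values_bytes, grpc_call, message)?;
```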