Skip to content

Commit

Permalink
[refactor] OperationDispatcher triggering procedures
Browse files Browse the repository at this point in the history
Signed-off-by: dd di cesare <[email protected]>
  • Loading branch information
didierofrivia committed Sep 4, 2024
1 parent 2d1c7ba commit ef20110
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod operation_dispatcher;
mod attribute;
mod configuration;
mod envoy;
mod filter;
mod glob;
mod operation_dispatcher;
mod policy;
mod policy_index;
mod service;
Expand Down
126 changes: 96 additions & 30 deletions src/operation_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use crate::envoy::RateLimitDescriptor;
use crate::policy::Policy;
use crate::service::{GrpcMessage, GrpcServiceHandler};
use protobuf::RepeatedField;
use proxy_wasm::types::Status;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

#[allow(dead_code)]
#[derive(PartialEq, Debug, Clone)]
Expand All @@ -20,32 +26,33 @@ impl State {
}
}

type Procedure = (Rc<GrpcServiceHandler>, GrpcMessage);

#[allow(dead_code)]
#[derive(Clone)]
pub(crate) struct Operation {
state: State,
result: Result<u32, Status>,
action: Option<fn() -> Result<u32, Status>>,
procedure: Procedure,
}

#[allow(dead_code)]
impl Operation {
pub fn default() -> Self {
pub fn new(procedure: Procedure) -> Self {
Self {
state: State::Pending,
result: Err(Status::Empty),
action: None,
procedure,
}
}

pub fn set_action(&mut self, action: fn() -> Result<u32, Status>) {
self.action = Some(action);
pub fn set_action(&mut self, procedure: Procedure) {
self.procedure = procedure;
}

pub fn trigger(&mut self) {
if let State::Done = self.state {
} else if let Some(action) = self.action {
self.result = action();
} else {
self.result = self.procedure.0.send(self.procedure.1.clone());
self.state.next();
}
}
Expand All @@ -62,15 +69,39 @@ impl Operation {
#[allow(dead_code)]
pub struct OperationDispatcher {
operations: RefCell<Vec<Operation>>,
service_handlers: HashMap<String, Rc<GrpcServiceHandler>>,
}

#[allow(dead_code)]
impl OperationDispatcher {
pub fn default() -> OperationDispatcher {
pub fn default() -> Self {
OperationDispatcher {
operations: RefCell::new(vec![]),
service_handlers: HashMap::default(),
}
}
pub fn new(service_handlers: HashMap<String, Rc<GrpcServiceHandler>>) -> Self {
Self {
service_handlers,
operations: RefCell::new(vec![]),
}
}

pub fn build_operations(
&self,
policy: &Policy,
descriptors: RepeatedField<RateLimitDescriptor>,
) {
let mut operations: Vec<Operation> = vec![];
policy.actions.iter().for_each(|action| {
// TODO(didierofrivia): Error handling
if let Some(service) = self.service_handlers.get(&action.extension) {
let message = service.build_message(policy.domain.clone(), descriptors.clone());
operations.push(Operation::new((service.clone(), message)))
}
});
self.push_operations(operations);
}

pub fn push_operations(&self, operations: Vec<Operation>) {
self.operations.borrow_mut().extend(operations);
Expand All @@ -87,30 +118,64 @@ impl OperationDispatcher {
self.operations.borrow().first().unwrap().get_result()
}

pub fn next(&self) -> bool {
pub fn next(&self) -> Option<(State, Result<u32, Status>)> {
let mut operations = self.operations.borrow_mut();
if let Some((i, operation)) = operations.iter_mut().enumerate().next() {
if let State::Done = operation.get_state() {
let res = operation.get_result();
operations.remove(i);
operations.len() > 0
Some((State::Done, res))
} else {
operation.trigger();
true
Some((operation.state.clone(), operation.result))
}
} else {
false
None
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::envoy::RateLimitRequest;
use std::time::Duration;

fn grpc_call(
_upstream_name: &str,
_service_name: &str,
_method_name: &str,
_initial_metadata: Vec<(&str, &[u8])>,
_message: Option<&[u8]>,
_timeout: Duration,
) -> Result<u32, Status> {
Ok(1)
}

fn build_grpc_service_handler() -> GrpcServiceHandler {
GrpcServiceHandler::new(
Rc::new(Default::default()),
Rc::new(Default::default()),
Some(grpc_call),
)
}

fn build_message() -> RateLimitRequest {
RateLimitRequest {
domain: "example.org".to_string(),
descriptors: RepeatedField::new(),
hits_addend: 1,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}

#[test]
fn operation_transition() {
let mut operation = Operation::default();
operation.set_action(|| -> Result<u32, Status> { Ok(200) });
let mut operation = Operation::new((
Rc::new(build_grpc_service_handler()),
GrpcMessage::RateLimit(build_message()),
));
assert_eq!(operation.get_state(), State::Pending);
operation.trigger();
assert_eq!(operation.get_state(), State::Waiting);
Expand All @@ -121,22 +186,21 @@ mod tests {

#[test]
fn operation_dispatcher_push_actions() {
let operation_dispatcher = OperationDispatcher {
operations: RefCell::new(vec![Operation::default()]),
};
let operation_dispatcher = OperationDispatcher::default();

assert_eq!(operation_dispatcher.operations.borrow().len(), 1);

operation_dispatcher.push_operations(vec![Operation::default()]);
operation_dispatcher.push_operations(vec![Operation::new((
Rc::new(build_grpc_service_handler()),
GrpcMessage::RateLimit(build_message()),
))]);

assert_eq!(operation_dispatcher.operations.borrow().len(), 2);
}

#[test]
fn operation_dispatcher_get_current_action_state() {
let operation_dispatcher = OperationDispatcher {
operations: RefCell::new(vec![Operation::default()]),
};
let operation_dispatcher = OperationDispatcher::default();

assert_eq!(
operation_dispatcher.get_current_operation_state(),
Expand All @@ -146,28 +210,30 @@ mod tests {

#[test]
fn operation_dispatcher_next() {
let mut operation = Operation::default();
operation.set_action(|| -> Result<u32, Status> { Ok(200) });
let operation_dispatcher = OperationDispatcher {
operations: RefCell::new(vec![operation]),
};
let operation = Operation::new((
Rc::new(build_grpc_service_handler()),
GrpcMessage::RateLimit(build_message()),
));
let operation_dispatcher = OperationDispatcher::default();
operation_dispatcher.push_operations(vec![operation]);

let mut res = operation_dispatcher.next();
assert!(res);
assert_eq!(res, Some((State::Waiting, Ok(200))));
assert_eq!(
operation_dispatcher.get_current_operation_state(),
Some(State::Waiting)
);

res = operation_dispatcher.next();
assert!(res);
assert_eq!(res, Some((State::Done, Ok(200))));
assert_eq!(
operation_dispatcher.get_current_operation_state(),
Some(State::Done)
);
assert_eq!(operation_dispatcher.get_current_operation_result(), Ok(200));

res = operation_dispatcher.next();
assert!(!res);
assert_eq!(res, None);
assert_eq!(operation_dispatcher.get_current_operation_state(), None);
}
}

0 comments on commit ef20110

Please sign in to comment.