Skip to content

Commit

Permalink
[refactor] Indexing waiting operations by token_id
Browse files Browse the repository at this point in the history
Signed-off-by: dd di cesare <[email protected]>
  • Loading branch information
didierofrivia committed Sep 12, 2024
1 parent c9e8f51 commit 43dd184
Showing 1 changed file with 71 additions and 26 deletions.
97 changes: 71 additions & 26 deletions src/operation_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,30 @@ impl Operation {
pub fn new(extension: Rc<Extension>, procedure: Procedure) -> Self {
Self {
state: State::Pending,
result: Err(Status::Empty),
result: Ok(0), // Heuristics: zero represents that it's not been triggered, following `hostcalls` example
extension,
procedure,
grpc_call_fn,
get_map_values_bytes_fn,
}
}

fn trigger(&mut self) {
if let State::Done = self.state {
} else {
self.result = self.procedure.0.send(
self.get_map_values_bytes_fn,
self.grpc_call_fn,
self.procedure.1.clone(),
);
self.state.next();
fn trigger(&mut self) -> Result<u32, Status> {
match self.state {
State::Pending => {
self.result = self.procedure.0.send(
self.get_map_values_bytes_fn,
self.grpc_call_fn,
self.procedure.1.clone(),
);
self.state.next();
self.result
}
State::Waiting => {
self.state.next();
self.result
}
State::Done => self.result,
}
}

Expand All @@ -87,22 +94,25 @@ impl Operation {

#[allow(dead_code)]
pub struct OperationDispatcher {
operations: RefCell<Vec<Operation>>,
pending_operations: RefCell<Vec<Operation>>,
waiting_operations: RefCell<HashMap<u32, Operation>>,
service_handlers: HashMap<String, Rc<GrpcServiceHandler>>,
}

#[allow(dead_code)]
impl OperationDispatcher {
pub fn default() -> Self {
OperationDispatcher {
operations: RefCell::new(vec![]),
pending_operations: RefCell::new(vec![]),
waiting_operations: RefCell::new(HashMap::default()),
service_handlers: HashMap::default(),
}
}
pub fn new(service_handlers: HashMap<String, Rc<GrpcServiceHandler>>) -> Self {
Self {
service_handlers,
operations: RefCell::new(vec![]),
pending_operations: RefCell::new(vec![]),
waiting_operations: RefCell::new(HashMap::new()),
}
}

Expand Down Expand Up @@ -130,28 +140,40 @@ impl OperationDispatcher {
}

pub fn push_operations(&self, operations: Vec<Operation>) {
self.operations.borrow_mut().extend(operations);
self.pending_operations.borrow_mut().extend(operations);
}

pub fn get_current_operation_state(&self) -> Option<State> {
self.operations
self.pending_operations
.borrow()
.first()
.map(|operation| operation.get_state().clone())
}

pub fn get_current_operation_result(&self) -> Result<u32, Status> {
self.operations.borrow().first().unwrap().get_result()
self.pending_operations
.borrow()
.first()
.unwrap()
.get_result()
}

pub fn next(&self) -> Option<Operation> {
let mut operations = self.operations.borrow_mut();
let mut operations = self.pending_operations.borrow_mut();
if let Some((i, operation)) = operations.iter_mut().enumerate().next() {
if let State::Done = operation.get_state() {
if let Ok(token_id) = operation.result {
self.waiting_operations.borrow_mut().remove(&token_id);
} // If result was Err, means the operation wasn't indexed

operations.remove(i);
operations.get(i).cloned() // The next op is now at `i`
} else {
operation.trigger();
if let Ok(token_id) = operation.trigger() {
self.waiting_operations
.borrow_mut()
.insert(token_id, operation.clone());
} // TODO(didierofrivia): Decide on indexing the failed operations.
Some(operation.clone())
}
} else {
Expand Down Expand Up @@ -223,7 +245,7 @@ mod tests {
fn build_operation() -> Operation {
Operation {
state: State::Pending,
result: Ok(1),
result: Ok(0),
extension: Rc::new(Extension::default()),
procedure: (
Rc::new(build_grpc_service_handler()),
Expand All @@ -241,16 +263,19 @@ mod tests {
assert_eq!(operation.get_state(), State::Pending);
assert_eq!(operation.get_extension_type(), ExtensionType::RateLimit);
assert_eq!(operation.get_failure_mode(), FailureMode::Deny);
assert_eq!(operation.get_result(), Ok(1));
assert_eq!(operation.get_result(), Ok(0));
}

#[test]
fn operation_transition() {
let mut operation = build_operation();
assert_eq!(operation.result, Ok(0));
assert_eq!(operation.get_state(), State::Pending);
operation.trigger();
let mut res = operation.trigger();
assert_eq!(res, Ok(200));
assert_eq!(operation.get_state(), State::Waiting);
operation.trigger();
res = operation.trigger();
assert_eq!(res, Ok(200));
assert_eq!(operation.result, Ok(200));
assert_eq!(operation.get_state(), State::Done);
}
Expand All @@ -259,10 +284,10 @@ mod tests {
fn operation_dispatcher_push_actions() {
let operation_dispatcher = OperationDispatcher::default();

assert_eq!(operation_dispatcher.operations.borrow().len(), 0);
assert_eq!(operation_dispatcher.pending_operations.borrow().len(), 0);
operation_dispatcher.push_operations(vec![build_operation()]);

assert_eq!(operation_dispatcher.operations.borrow().len(), 1);
assert_eq!(operation_dispatcher.pending_operations.borrow().len(), 1);
}

#[test]
Expand All @@ -280,27 +305,43 @@ mod tests {
let operation_dispatcher = OperationDispatcher::default();
operation_dispatcher.push_operations(vec![build_operation(), build_operation()]);

assert_eq!(operation_dispatcher.get_current_operation_result(), Ok(1));
assert_eq!(operation_dispatcher.get_current_operation_result(), Ok(0));
assert_eq!(
operation_dispatcher.get_current_operation_state(),
Some(State::Pending)
);
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
0
);

let mut op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
assert_eq!(op.unwrap().get_state(), State::Waiting);
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
1
);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
assert_eq!(op.unwrap().get_state(), State::Done);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(1));
assert_eq!(op.clone().unwrap().get_result(), Ok(0));
assert_eq!(op.unwrap().get_state(), State::Pending);
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
0
);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
assert_eq!(op.unwrap().get_state(), State::Waiting);
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
1
);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(200));
Expand All @@ -309,5 +350,9 @@ mod tests {
op = operation_dispatcher.next();
assert!(op.is_none());
assert!(operation_dispatcher.get_current_operation_state().is_none());
assert_eq!(
operation_dispatcher.waiting_operations.borrow_mut().len(),
0
);
}
}

0 comments on commit 43dd184

Please sign in to comment.