From 159d247a55716e4897bb1b70a05f07ea3f2c6ee5 Mon Sep 17 00:00:00 2001 From: dd di cesare Date: Wed, 4 Sep 2024 17:47:38 +0200 Subject: [PATCH] [refactor] grpc_call function delegated to the caller Signed-off-by: dd di cesare --- src/filter/root_context.rs | 1 - src/operation_dispatcher.rs | 28 ++++++++++++++++++++++------ src/service.rs | 13 +++---------- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/filter/root_context.rs b/src/filter/root_context.rs index b4fafb77..6dcd4bdc 100644 --- a/src/filter/root_context.rs +++ b/src/filter/root_context.rs @@ -49,7 +49,6 @@ impl RootContext for FilterRoot { .or_insert(Rc::from(GrpcServiceHandler::new( Rc::clone(service), Rc::new(HeaderResolver::new()), - None, ))); }); Some(Box::new(Filter { diff --git a/src/operation_dispatcher.rs b/src/operation_dispatcher.rs index 41480f91..61e12b8a 100644 --- a/src/operation_dispatcher.rs +++ b/src/operation_dispatcher.rs @@ -3,10 +3,12 @@ use crate::envoy::RateLimitDescriptor; use crate::policy::Policy; use crate::service::{GrpcMessage, GrpcServiceHandler}; use protobuf::RepeatedField; +use proxy_wasm::hostcalls; use proxy_wasm::types::Status; use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; +use std::time::Duration; #[allow(dead_code)] #[derive(PartialEq, Debug, Clone)] @@ -27,6 +29,24 @@ impl State { } } +fn grpc_call( + upstream_name: &str, + service_name: &str, + method_name: &str, + initial_metadata: Vec<(&str, &[u8])>, + message: Option<&[u8]>, + timeout: Duration, +) -> Result { + hostcalls::dispatch_grpc_call( + upstream_name, + service_name, + method_name, + initial_metadata, + message, + timeout, + ) +} + type Procedure = (Rc, GrpcMessage); #[allow(dead_code)] @@ -56,7 +76,7 @@ impl Operation { pub fn trigger(&mut self) { if let State::Done = self.state { } else { - self.result = self.procedure.0.send(self.procedure.1.clone()); + self.result = self.procedure.0.send(grpc_call, self.procedure.1.clone()); self.state.next(); } } @@ -170,11 +190,7 @@ mod tests { } fn build_grpc_service_handler() -> GrpcServiceHandler { - GrpcServiceHandler::new( - Rc::new(Default::default()), - Rc::new(Default::default()), - Some(grpc_call), - ) + GrpcServiceHandler::new(Rc::new(Default::default()), Rc::new(Default::default())) } fn build_message() -> RateLimitRequest { diff --git a/src/service.rs b/src/service.rs index 4bd77c08..38fe2fce 100644 --- a/src/service.rs +++ b/src/service.rs @@ -11,7 +11,6 @@ use protobuf::{ Clear, CodedInputStream, CodedOutputStream, Message, ProtobufResult, UnknownFields, }; use proxy_wasm::hostcalls; -use proxy_wasm::hostcalls::dispatch_grpc_call; use proxy_wasm::types::{Bytes, MapType, Status}; use std::any::Any; use std::cell::OnceCell; @@ -195,23 +194,17 @@ type GrpcCall = fn( pub struct GrpcServiceHandler { service: Rc, header_resolver: Rc, - grpc_call: GrpcCall, } impl GrpcServiceHandler { - pub fn new( - service: Rc, - header_resolver: Rc, - grpc_call: Option, - ) -> Self { + pub fn new(service: Rc, header_resolver: Rc) -> Self { Self { service, header_resolver, - grpc_call: grpc_call.unwrap_or(dispatch_grpc_call), } } - pub fn send(&self, message: GrpcMessage) -> Result { + pub fn send(&self, grpc_call: GrpcCall, message: GrpcMessage) -> Result { let msg = Message::write_to_bytes(&message).unwrap(); let metadata = self .header_resolver @@ -220,7 +213,7 @@ impl GrpcServiceHandler { .map(|(header, value)| (*header, value.as_slice())) .collect(); - (self.grpc_call)( + grpc_call( self.service.endpoint(), self.service.name(), self.service.method(),