Skip to content

Commit

Permalink
Add a request pipeline and configuration for retries (#469)
Browse files Browse the repository at this point in the history
* Add max retries to Exponential Backoff retry policy (#421)

* Let user configure the http pipeline (#421)

* Refactored retries to use tower services

* Add example and a bit of documentation for new client configurations (#421)

---------

Co-authored-by: Valentin Mariette <[email protected]>
Co-authored-by: Sean Reeise <[email protected]>
  • Loading branch information
3 people authored Aug 14, 2024
1 parent 17050b9 commit 804467c
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ rustls-tls = ["reqwest/rustls-tls", "graph-http/rustls-tls", "graph-oauth/rustls
brotli = ["reqwest/brotli", "graph-http/brotli", "graph-oauth/brotli", "graph-core/brotli"]
deflate = ["reqwest/deflate", "graph-http/deflate", "graph-oauth/deflate", "graph-core/deflate"]
trust-dns = ["reqwest/trust-dns", "graph-http/trust-dns", "graph-oauth/trust-dns", "graph-core/trust-dns"]
socks = ["graph-http/socks"]
socks = ["reqwest/socks", "graph-http/socks", "graph-oauth/socks", "graph-core/socks"]
openssl = ["graph-oauth/openssl"]
interactive-auth = ["graph-oauth/interactive-auth"]
test-util = ["graph-http/test-util"]
Expand Down
24 changes: 23 additions & 1 deletion examples/client_configuration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(dead_code, unused, unused_imports, clippy::module_inception)]
use graph_oauth::ConfidentialClientApplication;
use graph_rs_sdk::{header::HeaderMap, header::HeaderValue, GraphClient, GraphClientConfiguration};
use http::header::ACCEPT;
use http::HeaderName;
Expand All @@ -10,7 +11,28 @@ fn main() {
let client_config = GraphClientConfiguration::new()
.access_token(ACCESS_TOKEN)
.timeout(Duration::from_secs(30))
.default_headers(HeaderMap::default());
.default_headers(HeaderMap::default())
.retry(Some(10)) // retry 10 times if the request is not successful
.concurrency_limit(Some(10)) // limit the number of concurrent requests on this client to 10
.wait_for_retry_after_headers(true); // wait the amount of seconds specified by the Retry-After header of the response when we reach the throttling limits (429 Too Many Requests)

let _ = GraphClient::from(client_config);
}

// Using Identity Platform Clients
fn configure_graph_client(client_id: &str, client_secret: &str, tenant: &str) {
let mut confidential_client_application = ConfidentialClientApplication::builder(client_id)
.with_client_secret(client_secret)
.with_tenant(tenant)
.build();

let client_config = GraphClientConfiguration::new()
.client_application(confidential_client_application)
.timeout(Duration::from_secs(30))
.default_headers(HeaderMap::default())
.retry(Some(10)) // retry 10 times if the request is not successful
.concurrency_limit(Some(10)) // limit the number of concurrent requests on this client to 10
.wait_for_retry_after_headers(true); // wait the amount of seconds specified by the Retry-After header of the response when we reach the throttling limits (429 Too Many Requests)

let _ = GraphClient::from(client_config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use graph_rs_sdk::{identity::ConfidentialClientApplication, GraphClient};

pub async fn build_client(client_id: &str, client_secret: &str, tenant: &str) -> GraphClient {
pub fn build_client(client_id: &str, client_secret: &str, tenant: &str) -> GraphClient {
let mut confidential_client_application = ConfidentialClientApplication::builder(client_id)
.with_client_secret(client_secret)
.with_tenant(tenant)
Expand Down
1 change: 1 addition & 0 deletions graph-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ rustls-tls = ["reqwest/rustls-tls"]
brotli = ["reqwest/brotli"]
deflate = ["reqwest/deflate"]
trust-dns = ["reqwest/trust-dns"]
socks = ["reqwest/socks"]
14 changes: 14 additions & 0 deletions graph-error/src/graph_failure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use crate::internal::GraphRsError;
use crate::{AuthExecutionError, AuthorizationFailure, ErrorMessage};
use reqwest::header::HeaderMap;
use std::cell::BorrowMutError;
use std::error::Error;
use std::io;
use std::io::ErrorKind;
use std::num::ParseIntError;
use std::str::Utf8Error;
use std::sync::mpsc;

Expand Down Expand Up @@ -74,6 +76,12 @@ pub enum GraphFailure {
#[error("{0:#?}")]
ErrorMessage(#[from] ErrorMessage),

#[error("Temporary Graph API Error")]
TemporaryError,

#[error("Parse Int error:\n{0:#?}")]
ParseIntError(#[from] ParseIntError),

#[error("message: {0:#?}, response: {1:#?}", message, response)]
SilentTokenAuth {
message: String,
Expand Down Expand Up @@ -160,3 +168,9 @@ impl From<AuthExecutionError> for GraphFailure {
}
}
}

impl From<Box<dyn Error + Send + Sync>> for GraphFailure {
fn from(value: Box<dyn Error + Send + Sync>) -> Self {
value.into()
}
}
4 changes: 3 additions & 1 deletion graph-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ serde_urlencoded = "0.7.1"
thiserror = "1"
tokio = { version = "1.27.0", features = ["full", "tracing"] }
url = { version = "2", features = ["serde"] }
tower = { version = "0.4.13", features = ["limit", "retry", "timeout", "util"] }
futures-util = "0.3.30"

graph-error = { path = "../graph-error" }
graph-core = { path = "../graph-core", default-features = false }
Expand All @@ -34,5 +36,5 @@ rustls-tls = ["reqwest/rustls-tls", "graph-core/rustls-tls"]
brotli = ["reqwest/brotli", "graph-core/brotli"]
deflate = ["reqwest/deflate", "graph-core/deflate"]
trust-dns = ["reqwest/trust-dns", "graph-core/trust-dns"]
socks = ["reqwest/socks"]
socks = ["reqwest/socks", "graph-core/socks"]
test-util = []
101 changes: 97 additions & 4 deletions graph-http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,28 @@ use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, USER_AGENT};
use reqwest::redirect::Policy;
use reqwest::tls::Version;
use reqwest::Proxy;
use reqwest::{Request, Response};
use std::env::VarError;
use std::ffi::OsStr;
use std::fmt::{Debug, Formatter};
use std::time::Duration;
use tower::limit::ConcurrencyLimitLayer;
use tower::retry::RetryLayer;
use tower::util::BoxCloneService;
use tower::ServiceExt;

fn user_agent_header_from_env() -> Option<HeaderValue> {
let header = std::option_env!("GRAPH_CLIENT_USER_AGENT")?;
HeaderValue::from_str(header).ok()
}

#[derive(Default, Clone)]
struct ServiceLayersConfiguration {
concurrency_limit: Option<usize>,
retry: Option<usize>,
wait_for_retry_after_headers: Option<()>,
}

#[derive(Clone)]
struct ClientConfiguration {
client_application: Option<Box<dyn ClientApplication>>,
Expand All @@ -26,6 +38,7 @@ struct ClientConfiguration {
/// TLS 1.2 required to support all features in Microsoft Graph
/// See [Reliability and Support](https://learn.microsoft.com/en-us/graph/best-practices-concept#reliability-and-support)
min_tls_version: Version,
service_layers_configuration: ServiceLayersConfiguration,
proxy: Option<Proxy>,
}

Expand All @@ -47,6 +60,7 @@ impl ClientConfiguration {
connection_verbose: false,
https_only: true,
min_tls_version: Version::TLS_1_2,
service_layers_configuration: ServiceLayersConfiguration::default(),
proxy: None,
}
}
Expand Down Expand Up @@ -164,6 +178,55 @@ impl GraphClientConfiguration {
self
}

/// Enable a request retry for a failed request. The retry parameter can be used to
/// change how many times the request should be retried.
///
/// Some requests may fail on GraphAPI side and should be retried.
/// Only server errors (HTTP code between 500 and 599) will be retried.
///
/// Default is no retry.
pub fn retry(mut self, retry: Option<usize>) -> GraphClientConfiguration {
self.config.service_layers_configuration.retry = retry;
self
}

/// Enable a request retry if we reach the throttling limits and GraphAPI returns a
/// 429 Too Many Requests with a Retry-After header
///
/// Retry attempts are executed when the response has a status code of 429, 500, 503, 504
/// and the response has a Retry-After header. The Retry-After header provides a back-off
/// time to wait for before retrying the request again.
///
/// Be careful with this parameter as some API endpoints have quite
/// low limits (reports for example) and the request may hang for hundreds of seconds.
/// For maximum throughput you may want to not respect the Retry-After header as hitting
/// another server thanks to load-balancing may lead to a successful response.
///
/// Default is no retry.
pub fn wait_for_retry_after_headers(mut self, retry: bool) -> GraphClientConfiguration {
self.config
.service_layers_configuration
.wait_for_retry_after_headers = match retry {
true => Some(()),
false => None,
};
self
}

/// Enable a concurrency limit on the client.
///
/// Every request through this client will be subject to a concurrency limit.
/// Can be useful to stay under the API limits set by GraphAPI.
///
/// Default is no concurrency limit.
pub fn concurrency_limit(
mut self,
concurrency_limit: Option<usize>,
) -> GraphClientConfiguration {
self.config.service_layers_configuration.concurrency_limit = concurrency_limit;
self
}

pub fn build(self) -> Client {
let config = self.clone();
let headers = self.config.headers.clone();
Expand All @@ -187,19 +250,45 @@ impl GraphClientConfiguration {
builder = builder.proxy(proxy);
}

let client = builder.build().unwrap();

let service = tower::ServiceBuilder::new()
.option_layer(
self.config
.service_layers_configuration
.retry
.map(|num| RetryLayer::new(crate::tower_services::Attempts(num))),
)
.option_layer(
self.config
.service_layers_configuration
.wait_for_retry_after_headers
.map(|_| RetryLayer::new(crate::tower_services::WaitFor())),
)
.option_layer(
self.config
.service_layers_configuration
.concurrency_limit
.map(ConcurrencyLimitLayer::new),
)
.service(client.clone())
.boxed_clone();

if let Some(client_application) = self.config.client_application {
Client {
client_application,
inner: builder.build().unwrap(),
inner: client,
headers,
builder: config,
service,
}
} else {
Client {
client_application: Box::<String>::default(),
inner: builder.build().unwrap(),
inner: client,
headers,
builder: config,
service,
}
}
}
Expand All @@ -226,16 +315,18 @@ impl GraphClientConfiguration {
builder = builder.proxy(proxy);
}

let client = builder.build().unwrap();

if let Some(client_application) = self.config.client_application {
BlockingClient {
client_application,
inner: builder.build().unwrap(),
inner: client,
headers,
}
} else {
BlockingClient {
client_application: Box::<String>::default(),
inner: builder.build().unwrap(),
inner: client,
headers,
}
}
Expand All @@ -254,6 +345,8 @@ pub struct Client {
pub(crate) inner: reqwest::Client,
pub(crate) headers: HeaderMap,
pub(crate) builder: GraphClientConfiguration,
pub(crate) service:
BoxCloneService<Request, Response, Box<dyn std::error::Error + Send + Sync>>,
}

impl Client {
Expand Down
2 changes: 2 additions & 0 deletions graph-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod core;
mod request_components;
mod request_handler;
mod resource_identifier;
mod tower_services;
mod upload_session;

pub mod url;
Expand All @@ -27,6 +28,7 @@ pub(crate) mod internal {
pub use crate::request_handler::*;
#[allow(unused_imports)]
pub use crate::resource_identifier::*;
pub use crate::tower_services::*;
pub use crate::traits::*;
pub use crate::upload_session::*;
pub use graph_core::http::*;
Expand Down
18 changes: 16 additions & 2 deletions graph-http/src/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@ use async_stream::try_stream;
use futures::Stream;
use graph_error::{AuthExecutionResult, ErrorMessage, GraphFailure, GraphResult};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue, CONTENT_TYPE};
use reqwest::{Request, Response};
use serde::de::DeserializeOwned;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::time::Duration;
use tower::util::BoxCloneService;
use tower::{Service, ServiceExt};
use url::Url;

#[derive(Default)]
pub struct RequestHandler {
pub(crate) inner: Client,
pub(crate) request_components: RequestComponents,
pub(crate) error: Option<GraphFailure>,
pub(crate) body: Option<BodyRead>,
pub(crate) client_builder: GraphClientConfiguration,
pub(crate) service:
BoxCloneService<Request, Response, Box<dyn std::error::Error + Send + Sync>>,
}

impl RequestHandler {
Expand All @@ -29,6 +33,7 @@ impl RequestHandler {
err: Option<GraphFailure>,
body: Option<BodyRead>,
) -> RequestHandler {
let service = inner.service.clone();
let client_builder = inner.builder.clone();
let mut original_headers = inner.headers.clone();
original_headers.extend(request_components.headers.clone());
Expand All @@ -51,6 +56,7 @@ impl RequestHandler {
error,
body,
client_builder,
service,
}
}

Expand Down Expand Up @@ -242,8 +248,16 @@ impl RequestHandler {

#[inline]
pub async fn send(self) -> GraphResult<reqwest::Response> {
let mut service = self.service.clone();
let request_builder = self.build().await?;
request_builder.send().await.map_err(GraphFailure::from)
let request = request_builder.build()?;
service
.ready()
.await
.map_err(GraphFailure::from)?
.call(request)
.await
.map_err(GraphFailure::from)
}
}

Expand Down
Loading

0 comments on commit 804467c

Please sign in to comment.