Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a request pipeline and configuration for retries #469

Merged
merged 16 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ rustls-tls = ["reqwest/rustls-tls", "graph-http/rustls-tls", "graph-oauth/rustls
brotli = ["reqwest/brotli", "graph-http/brotli", "graph-oauth/brotli", "graph-core/brotli"]
deflate = ["reqwest/deflate", "graph-http/deflate", "graph-oauth/deflate", "graph-core/deflate"]
trust-dns = ["reqwest/trust-dns", "graph-http/trust-dns", "graph-oauth/trust-dns", "graph-core/trust-dns"]
socks = ["graph-http/socks"]
socks = ["reqwest/socks", "graph-http/socks", "graph-oauth/socks", "graph-core/socks"]
openssl = ["graph-oauth/openssl"]
interactive-auth = ["graph-oauth/interactive-auth"]
test-util = ["graph-http/test-util"]
Expand Down
24 changes: 23 additions & 1 deletion examples/client_configuration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(dead_code, unused, unused_imports, clippy::module_inception)]
use graph_oauth::ConfidentialClientApplication;
use graph_rs_sdk::{header::HeaderMap, header::HeaderValue, GraphClient, GraphClientConfiguration};
use http::header::ACCEPT;
use http::HeaderName;
Expand All @@ -10,7 +11,28 @@ fn main() {
let client_config = GraphClientConfiguration::new()
.access_token(ACCESS_TOKEN)
.timeout(Duration::from_secs(30))
.default_headers(HeaderMap::default());
.default_headers(HeaderMap::default())
.retry(Some(10)) // retry 10 times if the request is not successful
.concurrency_limit(Some(10)) // limit the number of concurrent requests on this client to 10
.wait_for_retry_after_headers(true); // wait the amount of seconds specified by the Retry-After header of the response when we reach the throttling limits (429 Too Many Requests)

let _ = GraphClient::from(client_config);
}

// Using Identity Platform Clients
fn configure_graph_client(client_id: &str, client_secret: &str, tenant: &str) {
let mut confidential_client_application = ConfidentialClientApplication::builder(client_id)
.with_client_secret(client_secret)
.with_tenant(tenant)
.build();

let client_config = GraphClientConfiguration::new()
.client_application(confidential_client_application)
.timeout(Duration::from_secs(30))
.default_headers(HeaderMap::default())
.retry(Some(10)) // retry 10 times if the request is not successful
.concurrency_limit(Some(10)) // limit the number of concurrent requests on this client to 10
.wait_for_retry_after_headers(true); // wait the amount of seconds specified by the Retry-After header of the response when we reach the throttling limits (429 Too Many Requests)

let _ = GraphClient::from(client_config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use graph_rs_sdk::{identity::ConfidentialClientApplication, GraphClient};

pub async fn build_client(client_id: &str, client_secret: &str, tenant: &str) -> GraphClient {
pub fn build_client(client_id: &str, client_secret: &str, tenant: &str) -> GraphClient {
let mut confidential_client_application = ConfidentialClientApplication::builder(client_id)
.with_client_secret(client_secret)
.with_tenant(tenant)
Expand Down
1 change: 1 addition & 0 deletions graph-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ rustls-tls = ["reqwest/rustls-tls"]
brotli = ["reqwest/brotli"]
deflate = ["reqwest/deflate"]
trust-dns = ["reqwest/trust-dns"]
socks = ["reqwest/socks"]
14 changes: 14 additions & 0 deletions graph-error/src/graph_failure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use crate::internal::GraphRsError;
use crate::{AuthExecutionError, AuthorizationFailure, ErrorMessage};
use reqwest::header::HeaderMap;
use std::cell::BorrowMutError;
use std::error::Error;
use std::io;
use std::io::ErrorKind;
use std::num::ParseIntError;
use std::str::Utf8Error;
use std::sync::mpsc;

Expand Down Expand Up @@ -74,6 +76,12 @@ pub enum GraphFailure {
#[error("{0:#?}")]
ErrorMessage(#[from] ErrorMessage),

#[error("Temporary Graph API Error")]
TemporaryError,

#[error("Parse Int error:\n{0:#?}")]
ParseIntError(#[from] ParseIntError),

#[error("message: {0:#?}, response: {1:#?}", message, response)]
SilentTokenAuth {
message: String,
Expand Down Expand Up @@ -160,3 +168,9 @@ impl From<AuthExecutionError> for GraphFailure {
}
}
}

impl From<Box<dyn Error + Send + Sync>> for GraphFailure {
fn from(value: Box<dyn Error + Send + Sync>) -> Self {
value.into()
}
}
4 changes: 3 additions & 1 deletion graph-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ serde_urlencoded = "0.7.1"
thiserror = "1"
tokio = { version = "1.27.0", features = ["full", "tracing"] }
url = { version = "2", features = ["serde"] }
tower = { version = "0.4.13", features = ["limit", "retry", "timeout", "util"] }
futures-util = "0.3.30"

graph-error = { path = "../graph-error" }
graph-core = { path = "../graph-core", default-features = false }
Expand All @@ -34,5 +36,5 @@ rustls-tls = ["reqwest/rustls-tls", "graph-core/rustls-tls"]
brotli = ["reqwest/brotli", "graph-core/brotli"]
deflate = ["reqwest/deflate", "graph-core/deflate"]
trust-dns = ["reqwest/trust-dns", "graph-core/trust-dns"]
socks = ["reqwest/socks"]
socks = ["reqwest/socks", "graph-core/socks"]
test-util = []
101 changes: 97 additions & 4 deletions graph-http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,28 @@ use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, USER_AGENT};
use reqwest::redirect::Policy;
use reqwest::tls::Version;
use reqwest::Proxy;
use reqwest::{Request, Response};
use std::env::VarError;
use std::ffi::OsStr;
use std::fmt::{Debug, Formatter};
use std::time::Duration;
use tower::limit::ConcurrencyLimitLayer;
use tower::retry::RetryLayer;
use tower::util::BoxCloneService;
use tower::ServiceExt;

fn user_agent_header_from_env() -> Option<HeaderValue> {
let header = std::option_env!("GRAPH_CLIENT_USER_AGENT")?;
HeaderValue::from_str(header).ok()
}

#[derive(Default, Clone)]
struct ServiceLayersConfiguration {
concurrency_limit: Option<usize>,
retry: Option<usize>,
wait_for_retry_after_headers: Option<()>,
}

#[derive(Clone)]
struct ClientConfiguration {
client_application: Option<Box<dyn ClientApplication>>,
Expand All @@ -26,6 +38,7 @@ struct ClientConfiguration {
/// TLS 1.2 required to support all features in Microsoft Graph
/// See [Reliability and Support](https://learn.microsoft.com/en-us/graph/best-practices-concept#reliability-and-support)
min_tls_version: Version,
service_layers_configuration: ServiceLayersConfiguration,
proxy: Option<Proxy>,
}

Expand All @@ -47,6 +60,7 @@ impl ClientConfiguration {
connection_verbose: false,
https_only: true,
min_tls_version: Version::TLS_1_2,
service_layers_configuration: ServiceLayersConfiguration::default(),
proxy: None,
}
}
Expand Down Expand Up @@ -164,6 +178,55 @@ impl GraphClientConfiguration {
self
}

/// Enable a request retry for a failed request. The retry parameter can be used to
/// change how many times the request should be retried.
///
/// Some requests may fail on GraphAPI side and should be retried.
/// Only server errors (HTTP code between 500 and 599) will be retried.
///
/// Default is no retry.
pub fn retry(mut self, retry: Option<usize>) -> GraphClientConfiguration {
self.config.service_layers_configuration.retry = retry;
self
}

/// Enable a request retry if we reach the throttling limits and GraphAPI returns a
/// 429 Too Many Requests with a Retry-After header
///
/// Retry attempts are executed when the response has a status code of 429, 500, 503, 504
/// and the response has a Retry-After header. The Retry-After header provides a back-off
/// time to wait for before retrying the request again.
///
/// Be careful with this parameter as some API endpoints have quite
/// low limits (reports for example) and the request may hang for hundreds of seconds.
/// For maximum throughput you may want to not respect the Retry-After header as hitting
/// another server thanks to load-balancing may lead to a successful response.
///
/// Default is no retry.
pub fn wait_for_retry_after_headers(mut self, retry: bool) -> GraphClientConfiguration {
self.config
.service_layers_configuration
.wait_for_retry_after_headers = match retry {
true => Some(()),
false => None,
};
self
}

/// Enable a concurrency limit on the client.
///
/// Every request through this client will be subject to a concurrency limit.
/// Can be useful to stay under the API limits set by GraphAPI.
///
/// Default is no concurrency limit.
pub fn concurrency_limit(
mut self,
concurrency_limit: Option<usize>,
) -> GraphClientConfiguration {
self.config.service_layers_configuration.concurrency_limit = concurrency_limit;
self
}

pub fn build(self) -> Client {
let config = self.clone();
let headers = self.config.headers.clone();
Expand All @@ -187,19 +250,45 @@ impl GraphClientConfiguration {
builder = builder.proxy(proxy);
}

let client = builder.build().unwrap();

let service = tower::ServiceBuilder::new()
.option_layer(
self.config
.service_layers_configuration
.retry
.map(|num| RetryLayer::new(crate::tower_services::Attempts(num))),
)
.option_layer(
self.config
.service_layers_configuration
.wait_for_retry_after_headers
.map(|_| RetryLayer::new(crate::tower_services::WaitFor())),
)
.option_layer(
self.config
.service_layers_configuration
.concurrency_limit
.map(ConcurrencyLimitLayer::new),
)
.service(client.clone())
.boxed_clone();

if let Some(client_application) = self.config.client_application {
Client {
client_application,
inner: builder.build().unwrap(),
inner: client,
headers,
builder: config,
service,
}
} else {
Client {
client_application: Box::<String>::default(),
inner: builder.build().unwrap(),
inner: client,
headers,
builder: config,
service,
}
}
}
Expand All @@ -226,16 +315,18 @@ impl GraphClientConfiguration {
builder = builder.proxy(proxy);
}

let client = builder.build().unwrap();

if let Some(client_application) = self.config.client_application {
BlockingClient {
client_application,
inner: builder.build().unwrap(),
inner: client,
headers,
}
} else {
BlockingClient {
client_application: Box::<String>::default(),
inner: builder.build().unwrap(),
inner: client,
headers,
}
}
Expand All @@ -254,6 +345,8 @@ pub struct Client {
pub(crate) inner: reqwest::Client,
pub(crate) headers: HeaderMap,
pub(crate) builder: GraphClientConfiguration,
pub(crate) service:
BoxCloneService<Request, Response, Box<dyn std::error::Error + Send + Sync>>,
}

impl Client {
Expand Down
2 changes: 2 additions & 0 deletions graph-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod core;
mod request_components;
mod request_handler;
mod resource_identifier;
mod tower_services;
mod upload_session;

pub mod url;
Expand All @@ -27,6 +28,7 @@ pub(crate) mod internal {
pub use crate::request_handler::*;
#[allow(unused_imports)]
pub use crate::resource_identifier::*;
pub use crate::tower_services::*;
pub use crate::traits::*;
pub use crate::upload_session::*;
pub use graph_core::http::*;
Expand Down
18 changes: 16 additions & 2 deletions graph-http/src/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@ use async_stream::try_stream;
use futures::Stream;
use graph_error::{AuthExecutionResult, ErrorMessage, GraphFailure, GraphResult};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue, CONTENT_TYPE};
use reqwest::{Request, Response};
use serde::de::DeserializeOwned;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::time::Duration;
use tower::util::BoxCloneService;
use tower::{Service, ServiceExt};
use url::Url;

#[derive(Default)]
pub struct RequestHandler {
pub(crate) inner: Client,
pub(crate) request_components: RequestComponents,
pub(crate) error: Option<GraphFailure>,
pub(crate) body: Option<BodyRead>,
pub(crate) client_builder: GraphClientConfiguration,
pub(crate) service:
BoxCloneService<Request, Response, Box<dyn std::error::Error + Send + Sync>>,
}

impl RequestHandler {
Expand All @@ -29,6 +33,7 @@ impl RequestHandler {
err: Option<GraphFailure>,
body: Option<BodyRead>,
) -> RequestHandler {
let service = inner.service.clone();
let client_builder = inner.builder.clone();
let mut original_headers = inner.headers.clone();
original_headers.extend(request_components.headers.clone());
Expand All @@ -51,6 +56,7 @@ impl RequestHandler {
error,
body,
client_builder,
service,
}
}

Expand Down Expand Up @@ -242,8 +248,16 @@ impl RequestHandler {

#[inline]
pub async fn send(self) -> GraphResult<reqwest::Response> {
let mut service = self.service.clone();
let request_builder = self.build().await?;
request_builder.send().await.map_err(GraphFailure::from)
let request = request_builder.build()?;
service
.ready()
.await
.map_err(GraphFailure::from)?
.call(request)
.await
.map_err(GraphFailure::from)
}
}

Expand Down
Loading
Loading