diff --git a/Cargo.toml b/Cargo.toml
index 0c161f1b..403b4f5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,7 +47,9 @@ serde_path_to_error = "0.1.4"
 serde_urlencoded = "0.7.1"
 snafu = { version = "0.7", features = ["backtraces"] }
 tokio = { version = "1.17.0", default-features = false, optional = true }
-tower = { version = "0.4.13", default-features = false, features = ["util", "buffer"] }
+# This has on-master-but-unreleased improvements to the retry code, merged shortly after 0.4.13.
+# Can be changed to a version again once https://github.com/tower-rs/tower/pull/681 lands.
+tower = { git = "https://github.com/tower-rs/tower", rev = "aec7b8f417b101d57f85c7ede05275dd61a48597", default-features = false, features = ["util", "buffer"] }
 tower-http = { version = "0.4.0", features = ["map-response-body", "trace"] }
 tracing = { version = "0.1.37", features = ["log"], optional = true }
 url = { version = "2.2.2", features = ["serde"] }
diff --git a/src/lib.rs b/src/lib.rs
index 80b3590c..1d2afc86 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -230,6 +230,7 @@ pub use self::{
 pub type Result<T, E = error::Error> = std::result::Result<T, E>;
 
 const GITHUB_BASE_URI: &str = "https://api.github.com";
+const MAX_RETRIES: usize = 3;
 
 static STATIC_INSTANCE: Lazy<arc_swap::ArcSwap<Octocrab>> =
     Lazy::new(|| arc_swap::ArcSwap::from_pointee(Octocrab::default()));
@@ -695,7 +696,7 @@ impl Default for DefaultOctocrabBuilderConfig {
             write_timeout: None,
             base_uri: None,
             #[cfg(feature = "retry")]
-            retry_config: RetryConfig::Simple(3),
+            retry_config: RetryConfig::Simple(MAX_RETRIES),
         }
     }
 }
@@ -1463,4 +1464,43 @@ mod tests {
             .await
             .unwrap();
     }
+
+    #[tokio::test]
+    async fn default_retries_on_unauth_only() {
+        use http::StatusCode;
+        use wiremock::{matchers, Mock, MockServer, ResponseTemplate};
+        let mock_server = MockServer::start().await;
+        Mock::given(matchers::method("GET"))
+            .and(matchers::path_regex(".*"))
+            .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR))
+            .expect(1)
+            .mount(&mock_server)
+            .await;
+        let result = crate::OctocrabBuilder::default()
+            .base_uri(mock_server.uri())
+            .unwrap()
+            .build()
+            .unwrap()
+            .orgs("hello")
+            .get()
+            .await;
+        assert!(result.is_err());
+
+        let mock_server = MockServer::start().await;
+        Mock::given(matchers::method("GET"))
+            .and(matchers::path_regex(".*"))
+            .respond_with(ResponseTemplate::new(StatusCode::UNAUTHORIZED))
+            .expect(3)
+            .mount(&mock_server)
+            .await;
+        let result = crate::OctocrabBuilder::default()
+            .base_uri(mock_server.uri())
+            .unwrap()
+            .build()
+            .unwrap()
+            .orgs("hello")
+            .get()
+            .await;
+        assert!(result.is_err());
+    }
 }
diff --git a/src/service/middleware/retry.rs b/src/service/middleware/retry.rs
index 77113fc4..6c7c2525 100644
--- a/src/service/middleware/retry.rs
+++ b/src/service/middleware/retry.rs
@@ -1,48 +1,126 @@
-use futures_util::future;
-use http::{Request, Response};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use http::{HeaderMap, Request, Response, StatusCode};
 use hyper::{Body, Error};
 use tower::retry::Policy;
 
+/// The factor by which to increase the fallback retry delay. This is only
+/// used when the fallback delay is needed (i.e. when a delay couldn't be found
+/// in the response).
+const RETRY_SCALE_FACTOR: f64 = 2.0;
+
+/// Retry policy applied to requests sent through the `tower` retry layer.
 #[derive(Clone)]
 pub enum RetryConfig {
+    /// Never retries.
     None,
+    /// Retries up to the given number of times, without waiting between attempts.
     Simple(usize),
+    /// Retries a request factoring in delays specified in response headers.
+    /// If none can be found, falls back to exponential backoff.
+    /// <https://docs.github.com/en/rest/overview/resources-in-the-rest-api?apiVersion=2022-11-28#rate-limiting>
+    ResponseOrExponentialBackoff {
+        /// The delay to use if no delay can be determined from the response.
+        fallback_delay: Duration,
+        /// The max delay to allow in any situation. GitHub resets rate limits
+        /// hourly, so that's effectively the worst case delay. Instead of
+        /// waiting that long, opt to fail faster by capping delays to this
+        /// value.
+        max_delay: Duration,
+        /// Maximum attempts to make for a request.
+        count: usize,
+    },
 }
 
 impl Policy<Request<String>, Response<hyper::Body>, hyper::Error> for RetryConfig {
-    type Future = futures_util::future::Ready<Self>;
+    type Future = tokio::time::Sleep;
 
     fn retry(
-        &self,
-        _req: &Request<String>,
-        result: Result<&Response<hyper::Body>, &Error>,
+        &mut self,
+        _req: &mut Request<String>,
+        result: &mut Result<Response<hyper::Body>, Error>,
     ) -> Option<Self::Future> {
+        let mut delay = Duration::ZERO;
         match self {
-            RetryConfig::None => None,
-            RetryConfig::Simple(count) => match result {
-                Ok(response) => {
-                    if response.status().is_server_error() || response.status() == 429 {
-                        if *count > 0 {
-                            Some(future::ready(RetryConfig::Simple(count - 1)))
-                        } else {
-                            None
+            RetryConfig::None => {
+                return None;
+            }
+            RetryConfig::Simple(count) => {
+                if *count == 0 {
+                    return None;
+                }
+                if let Ok(response) = result {
+                    let status = response.status();
+                    // Oddly, GitHub sends back 403 status codes for rate limit
+                    // errors. The presence of rate limit headers will help us
+                    // distinguish between rate limit errors vs. other forbidden
+                    // requests.
+                    if !(status.is_server_error()
+                        || status == StatusCode::TOO_MANY_REQUESTS
+                        || status == StatusCode::FORBIDDEN)
+                    {
+                        return None;
+                    }
+                    if status == StatusCode::FORBIDDEN {
+                        if determine_next_delay(response.headers()).is_none() {
+                            return None;
                         }
-                    } else {
-                        None
                     }
                 }
-                Err(_) => {
-                    if *count > 0 {
-                        Some(future::ready(RetryConfig::Simple(count - 1)))
-                    } else {
-                        None
+                *count -= 1;
+            }
+            RetryConfig::ResponseOrExponentialBackoff {
+                fallback_delay,
+                max_delay,
+                count,
+            } => {
+                if *count == 0 {
+                    return None;
+                }
+                let mut next_fallback_delay = *fallback_delay;
+                match result {
+                    Ok(response) => {
+                        let status = response.status();
+                        // Oddly, GitHub sends back 403 status codes for rate limit
+                        // errors. The presence of rate limit headers will help us
+                        // distinguish between rate limit errors vs. other forbidden
+                        // requests.
+                        if !(status.is_server_error()
+                            || status == StatusCode::TOO_MANY_REQUESTS
+                            || status == StatusCode::FORBIDDEN)
+                        {
+                            return None;
+                        }
+                        let delay_response = determine_next_delay(response.headers());
+                        if status == StatusCode::FORBIDDEN && delay_response.is_none() {
+                            return None;
+                        }
+                        delay = match delay_response {
+                            // Guarded so `from_secs_f64` cannot overflow and panic on an
+                            // absurdly large server-supplied value.
+                            Some(d) if d < max_delay.as_secs_f64() => Duration::from_secs_f64(d),
+                            Some(_) => *max_delay,
+                            None => {
+                                next_fallback_delay =
+                                    next_fallback_delay.mul_f64(RETRY_SCALE_FACTOR);
+                                *fallback_delay
+                            }
+                        };
+                    }
+                    Err(_) => {
+                        next_fallback_delay = next_fallback_delay.mul_f64(RETRY_SCALE_FACTOR);
+                        delay = *fallback_delay;
                     }
                 }
-            },
+                delay = delay.min(*max_delay);
+                *fallback_delay = next_fallback_delay;
+                *count -= 1;
+            }
         }
+        Some(tokio::time::sleep(delay))
     }
 
-    fn clone_request(&self, req: &Request<String>) -> Option<Request<String>> {
+    fn clone_request(&mut self, req: &Request<String>) -> Option<Request<String>> {
         match self {
             RetryConfig::None => None,
             _ => {
@@ -65,3 +143,185 @@ impl Policy<Request<String>, Response<hyper::Body>, hyper::Error> for RetryConfi
         }
     }
 }
+
+/// Reads the retry delay, in seconds, requested by the response `headers`.
+///
+/// `retry-after` is preferred and interpreted as a number of seconds. If it is
+/// absent, `x-ratelimit-reset` is interpreted as a Unix timestamp and converted
+/// into the time remaining until then. Returns `None` when neither header is
+/// present, when the value is not a finite, non-negative number, or when the
+/// reset time has already passed.
+fn determine_next_delay(headers: &HeaderMap) -> Option<f64> {
+    // Header values are server-controlled: rejecting NaN, infinite and negative
+    // numbers here keeps callers from panicking in `Duration::from_secs_f64`.
+    let parse_secs = |value: &http::HeaderValue| {
+        value
+            .to_str()
+            .ok()
+            .and_then(|s| s.parse::<f64>().ok())
+            .filter(|secs| secs.is_finite() && *secs >= 0.0)
+    };
+
+    if let Some(retry_after) = headers.get("retry-after") {
+        // retry-after is a duration
+        parse_secs(retry_after)
+    } else if let Some(reset) = headers.get("x-ratelimit-reset") {
+        // x-ratelimit-reset is the Unix timestamp when the rate limit will reset.
+        let reset_at = parse_secs(reset)?;
+        let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs_f64();
+        // A reset time in the past means there is nothing left to wait for.
+        Some(reset_at - now).filter(|remaining| *remaining >= 0.0)
+    } else {
+        None
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+    use http::{HeaderMap, Request, Response, StatusCode};
+    use hyper::Body;
+    use tower::retry::Policy;
+
+    use super::{determine_next_delay, RetryConfig, RETRY_SCALE_FACTOR};
+
+    /// Feeds `retry` a response with the given status and optional
+    /// `retry-after` header and reports whether the policy decided to retry.
+    fn retries_on(retry: &mut RetryConfig, status: StatusCode, retry_after: Option<&str>) -> bool {
+        let mut builder = Response::builder().status(status);
+        if let Some(value) = retry_after {
+            builder = builder.header("retry-after", value);
+        }
+        let mut result: Result<_, hyper::Error> = Ok(builder.body(Body::empty()).unwrap());
+        retry
+            .retry(&mut Request::new(String::new()), &mut result)
+            .is_some()
+    }
+
+    /// Builds a backoff policy whose delays are given in microseconds.
+    fn backoff(fallback_micros: u64, max_micros: u64, count: usize) -> RetryConfig {
+        RetryConfig::ResponseOrExponentialBackoff {
+            fallback_delay: Duration::from_micros(fallback_micros),
+            max_delay: Duration::from_micros(max_micros),
+            count,
+        }
+    }
+
+    /// Returns the `(fallback_delay, count)` state of a backoff policy.
+    fn backoff_state(retry: &RetryConfig) -> (Duration, usize) {
+        match retry {
+            RetryConfig::ResponseOrExponentialBackoff {
+                fallback_delay,
+                count,
+                ..
+            } => (*fallback_delay, *count),
+            _ => panic!("expected RetryConfig::ResponseOrExponentialBackoff"),
+        }
+    }
+
+    #[test]
+    fn simple_retry_when_no_attempts_left() {
+        let mut retry = RetryConfig::Simple(0);
+        assert!(!retries_on(&mut retry, StatusCode::INTERNAL_SERVER_ERROR, None));
+        assert!(matches!(retry, RetryConfig::Simple(0)));
+    }
+
+    // Deciding to retry creates a `tokio::time::Sleep`, which needs a runtime.
+    #[tokio::test]
+    async fn simple_retry_when_attempts_left() {
+        let mut retry = RetryConfig::Simple(3);
+        assert!(retries_on(&mut retry, StatusCode::INTERNAL_SERVER_ERROR, None));
+        assert!(matches!(retry, RetryConfig::Simple(2)));
+
+        let mut retry = RetryConfig::Simple(3);
+        assert!(retries_on(&mut retry, StatusCode::TOO_MANY_REQUESTS, None));
+        assert!(matches!(retry, RetryConfig::Simple(2)));
+
+        let mut retry = RetryConfig::Simple(3);
+        assert!(retries_on(&mut retry, StatusCode::FORBIDDEN, Some("1")));
+        assert!(matches!(retry, RetryConfig::Simple(2)));
+    }
+
+    #[test]
+    fn simple_retry_skips_forbidden_without_rate_limit_headers() {
+        let mut retry = RetryConfig::Simple(3);
+        assert!(!retries_on(&mut retry, StatusCode::FORBIDDEN, None));
+        assert!(matches!(retry, RetryConfig::Simple(3)));
+    }
+
+    #[test]
+    fn response_or_backoff_when_no_attempts_left() {
+        let mut retry = backoff(0, 0, 0);
+        assert!(!retries_on(&mut retry, StatusCode::INTERNAL_SERVER_ERROR, None));
+        assert_eq!(backoff_state(&retry), (Duration::ZERO, 0));
+    }
+
+    #[tokio::test]
+    async fn response_or_backoff_when_attempts_left_using_fallbacks() {
+        let mut retry = backoff(1, 10, 3);
+        assert!(retries_on(&mut retry, StatusCode::INTERNAL_SERVER_ERROR, None));
+        let doubled = Duration::from_micros(1).mul_f64(RETRY_SCALE_FACTOR);
+        assert_eq!(backoff_state(&retry), (doubled, 2));
+    }
+
+    #[tokio::test]
+    async fn response_or_backoff_when_attempts_left_using_fallbacks_capped_delay() {
+        let mut retry = backoff(20, 10, 3);
+        assert!(retries_on(&mut retry, StatusCode::TOO_MANY_REQUESTS, None));
+        // Only the sleep is capped at `max_delay`; the stored fallback keeps growing.
+        let doubled = Duration::from_micros(20).mul_f64(RETRY_SCALE_FACTOR);
+        assert_eq!(backoff_state(&retry), (doubled, 2));
+    }
+
+    #[tokio::test]
+    async fn response_or_backoff_when_attempts_left_using_response_headers() {
+        let mut retry = backoff(1, 10, 3);
+        assert!(retries_on(&mut retry, StatusCode::TOO_MANY_REQUESTS, Some("1")));
+        // A delay taken from the response leaves the fallback untouched.
+        assert_eq!(backoff_state(&retry), (Duration::from_micros(1), 2));
+    }
+
+    #[test]
+    fn determine_rate_limit_when_no_delay_is_given() {
+        assert_eq!(determine_next_delay(&HeaderMap::new()), None);
+    }
+
+    #[test]
+    fn determine_rate_limit_when_retry_after_is_given() {
+        let mut headers = HeaderMap::new();
+        headers.insert("retry-after", "1".parse().unwrap());
+        assert_eq!(determine_next_delay(&headers), Some(1.0));
+    }
+
+    #[test]
+    fn determine_rate_limit_when_retry_after_is_invalid() {
+        // Negative and non-finite values parse as `f64` but are not usable delays.
+        for value in ["invalid", "-1", "NaN", "inf"] {
+            let mut headers = HeaderMap::new();
+            headers.insert("retry-after", value.parse().unwrap());
+            assert_eq!(determine_next_delay(&headers), None, "retry-after: {}", value);
+        }
+    }
+
+    #[test]
+    fn determine_rate_limit_when_rate_limit_reset_is_given() {
+        let reset_at = (SystemTime::now() + Duration::from_secs(60))
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_secs();
+        let mut headers = HeaderMap::new();
+        headers.insert("x-ratelimit-reset", reset_at.to_string().parse().unwrap());
+        // The header has whole-second precision and the clock keeps moving, so
+        // the remaining time is only known approximately.
+        let delay = determine_next_delay(&headers).unwrap();
+        assert!(delay > 55.0 && delay <= 60.0, "unexpected delay: {}", delay);
+    }
+
+    #[test]
+    fn determine_rate_limit_when_rate_limit_reset_is_invalid() {
+        let mut headers = HeaderMap::new();
+        headers.insert("x-ratelimit-reset", "invalid".parse().unwrap());
+        assert_eq!(determine_next_delay(&headers), None);
+    }
+}