From f7e02cc92c68de49227e6b5217a1d120a70dfc5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Fri, 3 May 2024 10:34:51 +0200 Subject: [PATCH] feat: Track response in error details (#29) Co-authored-by: Brett Hoerner --- Cargo.lock | 2 + Cargo.toml | 2 +- hook-worker/src/error.rs | 111 ++++++++++++++++++++++-- hook-worker/src/lib.rs | 1 + hook-worker/src/util.rs | 35 ++++++++ hook-worker/src/worker.rs | 175 +++++++++++++++++++++++++++----------- 6 files changed, 270 insertions(+), 56 deletions(-) create mode 100644 hook-worker/src/util.rs diff --git a/Cargo.lock b/Cargo.lock index 9d7fc97..7cc582e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2343,10 +2343,12 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "winreg 0.52.0", ] diff --git a/Cargo.toml b/Cargo.toml index d34cd0a..d1f7527 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ metrics = "0.22.0" metrics-exporter-prometheus = "0.14.0" rand = "0.8.5" rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] } -reqwest = { version = "0.12.3" } +reqwest = { version = "0.12.3", features = ["stream"] } serde = { version = "1.0", features = ["derive"] } serde_derive = { version = "1.0" } serde_json = { version = "1.0" } diff --git a/hook-worker/src/error.rs b/hook-worker/src/error.rs index 914ffb1..48468bc 100644 --- a/hook-worker/src/error.rs +++ b/hook-worker/src/error.rs @@ -1,24 +1,125 @@ +use std::fmt; use std::time; -use hook_common::pgqueue; +use hook_common::{pgqueue, webhook::WebhookJobError}; use thiserror::Error; -/// Enumeration of errors related to webhook job processing in the WebhookWorker. +/// Enumeration of error classes handled by `WebhookWorker`. #[derive(Error, Debug)] pub enum WebhookError { + #[error(transparent)] + Parse(#[from] WebhookParseError), + #[error(transparent)] + Request(#[from] WebhookRequestError), +} + +/// Enumeration of parsing errors that can occur as `WebhookWorker` sets up a webhook. +#[derive(Error, Debug)] +pub enum WebhookParseError { #[error("{0} is not a valid HttpMethod")] ParseHttpMethodError(String), #[error("error parsing webhook headers")] ParseHeadersError(http::Error), #[error("error parsing webhook url")] ParseUrlError(url::ParseError), - #[error("a webhook could not be delivered but it could be retried later: {error}")] +} + +/// Enumeration of request errors that can occur as `WebhookWorker` sends a request. +#[derive(Error, Debug)] +pub enum WebhookRequestError { RetryableRequestError { error: reqwest::Error, + response: Option, retry_after: Option, }, - #[error("a webhook could not be delivered and it cannot be retried further: {0}")] - NonRetryableRetryableRequestError(reqwest::Error), + NonRetryableRetryableRequestError { + error: reqwest::Error, + response: Option, + }, +} + +/// Enumeration of errors that can occur while handling a `reqwest::Response`. +/// Currently, not consumed anywhere. Grouped here to support a common error type for +/// `utils::first_n_bytes_of_response`. +#[derive(Error, Debug)] +pub enum WebhookResponseError { + #[error("failed to parse a response as UTF8")] + ParseUTF8StringError(#[from] std::str::Utf8Error), + #[error("error while iterating over response body chunks")] + StreamIterationError(#[from] reqwest::Error), + #[error("attempted to slice a chunk of length {0} with an out of bounds index of {1}")] + ChunkOutOfBoundsError(usize, usize), +} + +/// Implement display of `WebhookRequestError` by appending to the underlying `reqwest::Error` +/// any response message if available. +impl fmt::Display for WebhookRequestError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WebhookRequestError::RetryableRequestError { + error, response, .. + } + | WebhookRequestError::NonRetryableRetryableRequestError { error, response } => { + let response_message = match response { + Some(m) => m.to_string(), + None => "No response from the server".to_string(), + }; + writeln!(f, "{}", error)?; + write!(f, "{}", response_message)?; + + Ok(()) + } + } + } +} + +/// Implementation of `WebhookRequestError` designed to further describe the error. +/// In particular, we pass some calls to underyling `reqwest::Error` to provide more details. +impl WebhookRequestError { + pub fn is_timeout(&self) -> bool { + match self { + WebhookRequestError::RetryableRequestError { error, .. } + | WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => { + error.is_timeout() + } + } + } + + pub fn is_status(&self) -> bool { + match self { + WebhookRequestError::RetryableRequestError { error, .. } + | WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => { + error.is_status() + } + } + } + + pub fn status(&self) -> Option { + match self { + WebhookRequestError::RetryableRequestError { error, .. } + | WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => { + error.status() + } + } + } +} + +impl From<&WebhookRequestError> for WebhookJobError { + fn from(error: &WebhookRequestError) -> Self { + if error.is_timeout() { + WebhookJobError::new_timeout(&error.to_string()) + } else if error.is_status() { + WebhookJobError::new_http_status( + error.status().expect("status code is defined").into(), + &error.to_string(), + ) + } else { + // Catch all other errors as `app_metrics::ErrorType::Connection` errors. + // Not all of `reqwest::Error` may strictly be connection errors, so our supported error types may need an extension + // depending on how strict error reporting has to be. + WebhookJobError::new_connection(&error.to_string()) + } + } } /// Enumeration of errors related to initialization and consumption of webhook jobs. diff --git a/hook-worker/src/lib.rs b/hook-worker/src/lib.rs index 22823c9..8488d15 100644 --- a/hook-worker/src/lib.rs +++ b/hook-worker/src/lib.rs @@ -1,3 +1,4 @@ pub mod config; pub mod error; +pub mod util; pub mod worker; diff --git a/hook-worker/src/util.rs b/hook-worker/src/util.rs new file mode 100644 index 0000000..00c5432 --- /dev/null +++ b/hook-worker/src/util.rs @@ -0,0 +1,35 @@ +use crate::error::WebhookResponseError; +use futures::StreamExt; +use reqwest::Response; + +pub async fn first_n_bytes_of_response( + response: Response, + n: usize, +) -> Result { + let mut body = response.bytes_stream(); + let mut buffer = String::with_capacity(n); + + while let Some(chunk) = body.next().await { + if buffer.len() >= n { + break; + } + + let chunk = chunk?; + let chunk_str = std::str::from_utf8(&chunk)?; + let upper_bound = std::cmp::min(n - buffer.len(), chunk_str.len()); + + if let Some(partial_chunk_str) = chunk_str.get(0..upper_bound) { + buffer.push_str(partial_chunk_str); + } else { + // For whatever reason we are out of bounds. We should never land here + // given the `std::cmp::min` usage, but I am being extra careful by not + // using a slice index that would panic instead. + return Err(WebhookResponseError::ChunkOutOfBoundsError( + chunk_str.len(), + upper_bound, + )); + } + } + + Ok(buffer) +} diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index 5965d26..824f1e2 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -18,7 +18,8 @@ use reqwest::header; use tokio::sync; use tracing::error; -use crate::error::{WebhookError, WorkerError}; +use crate::error::{WebhookError, WebhookParseError, WebhookRequestError, WorkerError}; +use crate::util::first_n_bytes_of_response; /// A WebhookJob is any `PgQueueJob` with `WebhookJobParameters` and `WebhookJobMetadata`. trait WebhookJob: PgQueueJob + std::marker::Send { @@ -259,7 +260,7 @@ async fn process_webhook_job( Ok(()) } - Err(WebhookError::ParseHeadersError(e)) => { + Err(WebhookError::Parse(WebhookParseError::ParseHeadersError(e))) => { webhook_job .fail(WebhookJobError::new_parse(&e.to_string())) .await @@ -272,7 +273,7 @@ async fn process_webhook_job( Ok(()) } - Err(WebhookError::ParseHttpMethodError(e)) => { + Err(WebhookError::Parse(WebhookParseError::ParseHttpMethodError(e))) => { webhook_job .fail(WebhookJobError::new_parse(&e)) .await @@ -285,7 +286,7 @@ async fn process_webhook_job( Ok(()) } - Err(WebhookError::ParseUrlError(e)) => { + Err(WebhookError::Parse(WebhookParseError::ParseUrlError(e))) => { webhook_job .fail(WebhookJobError::new_parse(&e.to_string())) .await @@ -298,26 +299,53 @@ async fn process_webhook_job( Ok(()) } - Err(WebhookError::RetryableRequestError { error, retry_after }) => { - let retry_interval = - retry_policy.retry_interval(webhook_job.attempt() as u32, retry_after); - let current_queue = webhook_job.queue(); - let retry_queue = retry_policy.retry_queue(¤t_queue); - - match webhook_job - .retry(WebhookJobError::from(&error), retry_interval, retry_queue) - .await - { - Ok(_) => { - metrics::counter!("webhook_jobs_retried", &labels).increment(1); - - Ok(()) + Err(WebhookError::Request(request_error)) => { + let webhook_job_error = WebhookJobError::from(&request_error); + + match request_error { + WebhookRequestError::RetryableRequestError { + error, retry_after, .. + } => { + let retry_interval = + retry_policy.retry_interval(webhook_job.attempt() as u32, retry_after); + let current_queue = webhook_job.queue(); + let retry_queue = retry_policy.retry_queue(¤t_queue); + + match webhook_job + .retry(webhook_job_error, retry_interval, retry_queue) + .await + { + Ok(_) => { + metrics::counter!("webhook_jobs_retried", &labels).increment(1); + + Ok(()) + } + Err(RetryError::RetryInvalidError(RetryInvalidError { + job: webhook_job, + .. + })) => { + webhook_job + .fail(WebhookJobError::from(&error)) + .await + .map_err(|job_error| { + metrics::counter!("webhook_jobs_database_error", &labels) + .increment(1); + job_error + })?; + + metrics::counter!("webhook_jobs_failed", &labels).increment(1); + + Ok(()) + } + Err(RetryError::DatabaseError(job_error)) => { + metrics::counter!("webhook_jobs_database_error", &labels).increment(1); + Err(WorkerError::from(job_error)) + } + } } - Err(RetryError::RetryInvalidError(RetryInvalidError { - job: webhook_job, .. - })) => { + WebhookRequestError::NonRetryableRetryableRequestError { .. } => { webhook_job - .fail(WebhookJobError::from(&error)) + .fail(webhook_job_error) .await .map_err(|job_error| { metrics::counter!("webhook_jobs_database_error", &labels).increment(1); @@ -328,25 +356,8 @@ async fn process_webhook_job( Ok(()) } - Err(RetryError::DatabaseError(job_error)) => { - metrics::counter!("webhook_jobs_database_error", &labels).increment(1); - Err(WorkerError::from(job_error)) - } } } - Err(WebhookError::NonRetryableRetryableRequestError(error)) => { - webhook_job - .fail(WebhookJobError::from(&error)) - .await - .map_err(|job_error| { - metrics::counter!("webhook_jobs_database_error", &labels).increment(1); - job_error - })?; - - metrics::counter!("webhook_jobs_failed", &labels).increment(1); - - Ok(()) - } } } @@ -367,10 +378,10 @@ async fn send_webhook( body: String, ) -> Result { let method: http::Method = method.into(); - let url: reqwest::Url = (url).parse().map_err(WebhookError::ParseUrlError)?; + let url: reqwest::Url = (url).parse().map_err(WebhookParseError::ParseUrlError)?; let headers: reqwest::header::HeaderMap = (headers) .try_into() - .map_err(WebhookError::ParseHeadersError)?; + .map_err(WebhookParseError::ParseHeadersError)?; let body = reqwest::Body::from(body); let response = client @@ -379,26 +390,36 @@ async fn send_webhook( .body(body) .send() .await - .map_err(|e| WebhookError::RetryableRequestError { + .map_err(|e| WebhookRequestError::RetryableRequestError { error: e, + response: None, retry_after: None, })?; let retry_after = parse_retry_after_header(response.headers()); - match response.error_for_status() { - Ok(response) => Ok(response), + match response.error_for_status_ref() { + Ok(_) => Ok(response), Err(err) => { if is_retryable_status( err.status() .expect("status code is set as error is generated from a response"), ) { - Err(WebhookError::RetryableRequestError { - error: err, - retry_after, - }) + Err(WebhookError::Request( + WebhookRequestError::RetryableRequestError { + error: err, + // TODO: Make amount of bytes configurable. + response: first_n_bytes_of_response(response, 10 * 1024).await.ok(), + retry_after, + }, + )) } else { - Err(WebhookError::NonRetryableRetryableRequestError(err)) + Err(WebhookError::Request( + WebhookRequestError::NonRetryableRetryableRequestError { + error: err, + response: first_n_bytes_of_response(response, 10 * 1024).await.ok(), + }, + )) } } } @@ -574,7 +595,7 @@ mod tests { } #[sqlx::test(migrations = "../migrations")] - async fn test_send_webhook(_: PgPool) { + async fn test_send_webhook(_pg: PgPool) { let method = HttpMethod::POST; let url = "http://localhost:18081/echo"; let headers = collections::HashMap::new(); @@ -591,4 +612,58 @@ mod tests { body.to_owned(), ); } + + #[sqlx::test(migrations = "../migrations")] + async fn test_error_message_contains_response_body(_pg: PgPool) { + let method = HttpMethod::POST; + let url = "http://localhost:18081/fail"; + let headers = collections::HashMap::new(); + let body = "this is an error message"; + let client = reqwest::Client::new(); + + let err = send_webhook(client, &method, url, &headers, body.to_owned()) + .await + .err() + .expect("request didn't fail when it should have failed"); + + assert!(matches!(err, WebhookError::Request(..))); + if let WebhookError::Request(request_error) = err { + assert_eq!(request_error.status(), Some(StatusCode::BAD_REQUEST)); + assert!(request_error.to_string().contains(body)); + // This is the display implementation of reqwest. Just checking it is still there. + // See: https://github.com/seanmonstar/reqwest/blob/master/src/error.rs + assert!(request_error.to_string().contains( + "HTTP status client error (400 Bad Request) for url (http://localhost:18081/fail)" + )); + } + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_error_message_contains_up_to_n_bytes_of_response_body(_pg: PgPool) { + let method = HttpMethod::POST; + let url = "http://localhost:18081/fail"; + let headers = collections::HashMap::new(); + // This is double the current hardcoded amount of bytes. + // TODO: Make this configurable and change it here too. + let body = (0..20 * 1024).map(|_| "a").collect::>().concat(); + let client = reqwest::Client::new(); + + let err = send_webhook(client, &method, url, &headers, body.to_owned()) + .await + .err() + .expect("request didn't fail when it should have failed"); + + assert!(matches!(err, WebhookError::Request(..))); + if let WebhookError::Request(request_error) = err { + assert_eq!(request_error.status(), Some(StatusCode::BAD_REQUEST)); + assert!(request_error.to_string().contains(&body[0..10 * 1024])); + // The 81 bytes account for the reqwest erorr message as described below. + assert_eq!(request_error.to_string().len(), 10 * 1024 + 81); + // This is the display implementation of reqwest. Just checking it is still there. + // See: https://github.com/seanmonstar/reqwest/blob/master/src/error.rs + assert!(request_error.to_string().contains( + "HTTP status client error (400 Bad Request) for url (http://localhost:18081/fail)" + )); + } + } }