From 419cd58909700a0b20c576b81f9288269bc45343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Fri, 26 Apr 2024 14:53:46 +0200 Subject: [PATCH] feat: Track response in error details --- hook-worker/src/error.rs | 98 ++++++++++++++++++++++++-- hook-worker/src/worker.rs | 144 +++++++++++++++++++++++++------------- 2 files changed, 187 insertions(+), 55 deletions(-) diff --git a/hook-worker/src/error.rs b/hook-worker/src/error.rs index 914ffb1..b65f0e0 100644 --- a/hook-worker/src/error.rs +++ b/hook-worker/src/error.rs @@ -1,24 +1,112 @@ +use std::fmt; use std::time; -use hook_common::pgqueue; +use hook_common::{pgqueue, webhook::WebhookJobError}; use thiserror::Error; -/// Enumeration of errors related to webhook job processing in the WebhookWorker. +/// Enumeration of error classes handled by `WebhookWorker`. #[derive(Error, Debug)] pub enum WebhookError { + #[error(transparent)] + Parse(#[from] WebhookParseError), + #[error(transparent)] + Request(#[from] WebhookRequestError), +} + +/// Enumeration of parsing errors that can occur as `WebhookWorker` sets up a webhook. +#[derive(Error, Debug)] +pub enum WebhookParseError { #[error("{0} is not a valid HttpMethod")] ParseHttpMethodError(String), #[error("error parsing webhook headers")] ParseHeadersError(http::Error), #[error("error parsing webhook url")] ParseUrlError(url::ParseError), - #[error("a webhook could not be delivered but it could be retried later: {error}")] +} + +/// Enumeration of request errors that can occur as `WebhookWorker` sends a request. +#[derive(Error, Debug)] +pub enum WebhookRequestError { RetryableRequestError { error: reqwest::Error, + response: Option, retry_after: Option, }, - #[error("a webhook could not be delivered and it cannot be retried further: {0}")] - NonRetryableRetryableRequestError(reqwest::Error), + NonRetryableRetryableRequestError { + error: reqwest::Error, + response: Option, + }, +} + +/// Implement display of `WebhookRequestError` by appending to the underlying `reqwest::Error` +/// any response message if available. +impl fmt::Display for WebhookRequestError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WebhookRequestError::RetryableRequestError { + error, response, .. + } + | WebhookRequestError::NonRetryableRetryableRequestError { error, response } => { + let response_message = match response { + Some(m) => format!("{}", m), + None => "No response from the server".to_string(), + }; + writeln!(f, "{}", error)?; + write!(f, "{}", response_message)?; + + Ok(()) + } + } + } +} + +/// Implementation of `WebhookRequestError` designed to further describe the error. +/// In particular, we pass some calls to underyling `reqwest::Error` to provide more details. +impl WebhookRequestError { + pub fn is_timeout(&self) -> bool { + match self { + WebhookRequestError::RetryableRequestError { error, .. } + | WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => { + error.is_timeout() + } + } + } + + pub fn is_status(&self) -> bool { + match self { + WebhookRequestError::RetryableRequestError { error, .. } + | WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => { + error.is_status() + } + } + } + + pub fn status(&self) -> Option { + match self { + WebhookRequestError::RetryableRequestError { error, .. } + | WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => { + error.status() + } + } + } +} + +impl From<&WebhookRequestError> for WebhookJobError { + fn from(error: &WebhookRequestError) -> Self { + if error.is_timeout() { + WebhookJobError::new_timeout(&error.to_string()) + } else if error.is_status() { + WebhookJobError::new_http_status( + error.status().expect("status code is defined").into(), + &error.to_string(), + ) + } else { + // Catch all other errors as `app_metrics::ErrorType::Connection` errors. + // Not all of `reqwest::Error` may strictly be connection errors, so our supported error types may need an extension + // depending on how strict error reporting has to be. + WebhookJobError::new_connection(&error.to_string()) + } + } } /// Enumeration of errors related to initialization and consumption of webhook jobs. diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index 5965d26..6a21057 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -18,7 +18,7 @@ use reqwest::header; use tokio::sync; use tracing::error; -use crate::error::{WebhookError, WorkerError}; +use crate::error::{WebhookError, WebhookParseError, WebhookRequestError, WorkerError}; /// A WebhookJob is any `PgQueueJob` with `WebhookJobParameters` and `WebhookJobMetadata`. trait WebhookJob: PgQueueJob + std::marker::Send { @@ -259,7 +259,7 @@ async fn process_webhook_job( Ok(()) } - Err(WebhookError::ParseHeadersError(e)) => { + Err(WebhookError::Parse(WebhookParseError::ParseHeadersError(e))) => { webhook_job .fail(WebhookJobError::new_parse(&e.to_string())) .await @@ -272,7 +272,7 @@ async fn process_webhook_job( Ok(()) } - Err(WebhookError::ParseHttpMethodError(e)) => { + Err(WebhookError::Parse(WebhookParseError::ParseHttpMethodError(e))) => { webhook_job .fail(WebhookJobError::new_parse(&e)) .await @@ -285,7 +285,7 @@ async fn process_webhook_job( Ok(()) } - Err(WebhookError::ParseUrlError(e)) => { + Err(WebhookError::Parse(WebhookParseError::ParseUrlError(e))) => { webhook_job .fail(WebhookJobError::new_parse(&e.to_string())) .await @@ -298,26 +298,53 @@ async fn process_webhook_job( Ok(()) } - Err(WebhookError::RetryableRequestError { error, retry_after }) => { - let retry_interval = - retry_policy.retry_interval(webhook_job.attempt() as u32, retry_after); - let current_queue = webhook_job.queue(); - let retry_queue = retry_policy.retry_queue(¤t_queue); - - match webhook_job - .retry(WebhookJobError::from(&error), retry_interval, retry_queue) - .await - { - Ok(_) => { - metrics::counter!("webhook_jobs_retried", &labels).increment(1); - - Ok(()) + Err(WebhookError::Request(request_error)) => { + let webhook_job_error = WebhookJobError::from(&request_error); + + match request_error { + WebhookRequestError::RetryableRequestError { + error, retry_after, .. + } => { + let retry_interval = + retry_policy.retry_interval(webhook_job.attempt() as u32, retry_after); + let current_queue = webhook_job.queue(); + let retry_queue = retry_policy.retry_queue(¤t_queue); + + match webhook_job + .retry(webhook_job_error, retry_interval, retry_queue) + .await + { + Ok(_) => { + metrics::counter!("webhook_jobs_retried", &labels).increment(1); + + Ok(()) + } + Err(RetryError::RetryInvalidError(RetryInvalidError { + job: webhook_job, + .. + })) => { + webhook_job + .fail(WebhookJobError::from(&error)) + .await + .map_err(|job_error| { + metrics::counter!("webhook_jobs_database_error", &labels) + .increment(1); + job_error + })?; + + metrics::counter!("webhook_jobs_failed", &labels).increment(1); + + Ok(()) + } + Err(RetryError::DatabaseError(job_error)) => { + metrics::counter!("webhook_jobs_database_error", &labels).increment(1); + Err(WorkerError::from(job_error)) + } + } } - Err(RetryError::RetryInvalidError(RetryInvalidError { - job: webhook_job, .. - })) => { + WebhookRequestError::NonRetryableRetryableRequestError { .. } => { webhook_job - .fail(WebhookJobError::from(&error)) + .fail(webhook_job_error) .await .map_err(|job_error| { metrics::counter!("webhook_jobs_database_error", &labels).increment(1); @@ -328,25 +355,8 @@ async fn process_webhook_job( Ok(()) } - Err(RetryError::DatabaseError(job_error)) => { - metrics::counter!("webhook_jobs_database_error", &labels).increment(1); - Err(WorkerError::from(job_error)) - } } } - Err(WebhookError::NonRetryableRetryableRequestError(error)) => { - webhook_job - .fail(WebhookJobError::from(&error)) - .await - .map_err(|job_error| { - metrics::counter!("webhook_jobs_database_error", &labels).increment(1); - job_error - })?; - - metrics::counter!("webhook_jobs_failed", &labels).increment(1); - - Ok(()) - } } } @@ -367,10 +377,10 @@ async fn send_webhook( body: String, ) -> Result { let method: http::Method = method.into(); - let url: reqwest::Url = (url).parse().map_err(WebhookError::ParseUrlError)?; + let url: reqwest::Url = (url).parse().map_err(WebhookParseError::ParseUrlError)?; let headers: reqwest::header::HeaderMap = (headers) .try_into() - .map_err(WebhookError::ParseHeadersError)?; + .map_err(WebhookParseError::ParseHeadersError)?; let body = reqwest::Body::from(body); let response = client @@ -379,26 +389,35 @@ async fn send_webhook( .body(body) .send() .await - .map_err(|e| WebhookError::RetryableRequestError { + .map_err(|e| WebhookRequestError::RetryableRequestError { error: e, + response: None, retry_after: None, })?; let retry_after = parse_retry_after_header(response.headers()); - match response.error_for_status() { - Ok(response) => Ok(response), + match response.error_for_status_ref() { + Ok(_) => Ok(response), Err(err) => { if is_retryable_status( err.status() .expect("status code is set as error is generated from a response"), ) { - Err(WebhookError::RetryableRequestError { - error: err, - retry_after, - }) + Err(WebhookError::Request( + WebhookRequestError::RetryableRequestError { + error: err, + response: response.text().await.ok(), + retry_after, + }, + )) } else { - Err(WebhookError::NonRetryableRetryableRequestError(err)) + Err(WebhookError::Request( + WebhookRequestError::NonRetryableRetryableRequestError { + error: err, + response: response.text().await.ok(), + }, + )) } } } @@ -574,7 +593,7 @@ mod tests { } #[sqlx::test(migrations = "../migrations")] - async fn test_send_webhook(_: PgPool) { + async fn test_send_webhook(_pg: PgPool) { let method = HttpMethod::POST; let url = "http://localhost:18081/echo"; let headers = collections::HashMap::new(); @@ -591,4 +610,29 @@ mod tests { body.to_owned(), ); } + + #[sqlx::test(migrations = "../migrations")] + async fn test_error_message_contains_response_body(_pg: PgPool) { + let method = HttpMethod::POST; + let url = "http://localhost:18081/fail"; + let headers = collections::HashMap::new(); + let body = "this is an error message"; + let client = reqwest::Client::new(); + + let err = send_webhook(client, &method, url, &headers, body.to_owned()) + .await + .err() + .expect("request didn't fail when it should have failed"); + + assert!(matches!(err, WebhookError::Request(..))); + if let WebhookError::Request(request_error) = err { + assert_eq!(request_error.status(), Some(StatusCode::BAD_REQUEST)); + assert!(request_error.to_string().contains(body)); + // This is the display implementation of reqwest. Just checking it is still there. + // See: https://github.com/seanmonstar/reqwest/blob/master/src/error.rs + assert!(request_error.to_string().contains( + "HTTP status client error (400 Bad Request) for url (http://localhost:18081/fail)" + )); + } + } }