Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
feat: Track response in error details
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Apr 26, 2024
1 parent 26a67e9 commit 419cd58
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 55 deletions.
98 changes: 93 additions & 5 deletions hook-worker/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,112 @@
use std::fmt;
use std::time;

use hook_common::pgqueue;
use hook_common::{pgqueue, webhook::WebhookJobError};
use thiserror::Error;

/// Enumeration of errors related to webhook job processing in the WebhookWorker.
/// Enumeration of error classes handled by `WebhookWorker`.
#[derive(Error, Debug)]
pub enum WebhookError {
#[error(transparent)]
Parse(#[from] WebhookParseError),
#[error(transparent)]
Request(#[from] WebhookRequestError),
}

/// Enumeration of parsing errors that can occur as `WebhookWorker` sets up a webhook.
#[derive(Error, Debug)]
pub enum WebhookParseError {
#[error("{0} is not a valid HttpMethod")]
ParseHttpMethodError(String),
#[error("error parsing webhook headers")]
ParseHeadersError(http::Error),
#[error("error parsing webhook url")]
ParseUrlError(url::ParseError),
#[error("a webhook could not be delivered but it could be retried later: {error}")]
}

/// Enumeration of request errors that can occur as `WebhookWorker` sends a request.
#[derive(Error, Debug)]
pub enum WebhookRequestError {
RetryableRequestError {
error: reqwest::Error,
response: Option<String>,
retry_after: Option<time::Duration>,
},
#[error("a webhook could not be delivered and it cannot be retried further: {0}")]
NonRetryableRetryableRequestError(reqwest::Error),
NonRetryableRetryableRequestError {
error: reqwest::Error,
response: Option<String>,
},
}

/// Implement display of `WebhookRequestError` by appending to the underlying `reqwest::Error`
/// any response message if available.
impl fmt::Display for WebhookRequestError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WebhookRequestError::RetryableRequestError {
error, response, ..
}
| WebhookRequestError::NonRetryableRetryableRequestError { error, response } => {
let response_message = match response {
Some(m) => format!("{}", m),
None => "No response from the server".to_string(),
};
writeln!(f, "{}", error)?;
write!(f, "{}", response_message)?;

Ok(())
}
}
}
}

/// Implementation of `WebhookRequestError` designed to further describe the error.
/// In particular, we pass some calls to underyling `reqwest::Error` to provide more details.
impl WebhookRequestError {
pub fn is_timeout(&self) -> bool {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.is_timeout()
}
}
}

pub fn is_status(&self) -> bool {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.is_status()
}
}
}

pub fn status(&self) -> Option<http::StatusCode> {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.status()
}
}
}
}

impl From<&WebhookRequestError> for WebhookJobError {
fn from(error: &WebhookRequestError) -> Self {
if error.is_timeout() {
WebhookJobError::new_timeout(&error.to_string())
} else if error.is_status() {
WebhookJobError::new_http_status(
error.status().expect("status code is defined").into(),
&error.to_string(),
)
} else {
// Catch all other errors as `app_metrics::ErrorType::Connection` errors.
// Not all of `reqwest::Error` may strictly be connection errors, so our supported error types may need an extension
// depending on how strict error reporting has to be.
WebhookJobError::new_connection(&error.to_string())
}
}
}

/// Enumeration of errors related to initialization and consumption of webhook jobs.
Expand Down
144 changes: 94 additions & 50 deletions hook-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use reqwest::header;
use tokio::sync;
use tracing::error;

use crate::error::{WebhookError, WorkerError};
use crate::error::{WebhookError, WebhookParseError, WebhookRequestError, WorkerError};

/// A WebhookJob is any `PgQueueJob` with `WebhookJobParameters` and `WebhookJobMetadata`.
trait WebhookJob: PgQueueJob + std::marker::Send {
Expand Down Expand Up @@ -259,7 +259,7 @@ async fn process_webhook_job<W: WebhookJob>(

Ok(())
}
Err(WebhookError::ParseHeadersError(e)) => {
Err(WebhookError::Parse(WebhookParseError::ParseHeadersError(e))) => {
webhook_job
.fail(WebhookJobError::new_parse(&e.to_string()))
.await
Expand All @@ -272,7 +272,7 @@ async fn process_webhook_job<W: WebhookJob>(

Ok(())
}
Err(WebhookError::ParseHttpMethodError(e)) => {
Err(WebhookError::Parse(WebhookParseError::ParseHttpMethodError(e))) => {
webhook_job
.fail(WebhookJobError::new_parse(&e))
.await
Expand All @@ -285,7 +285,7 @@ async fn process_webhook_job<W: WebhookJob>(

Ok(())
}
Err(WebhookError::ParseUrlError(e)) => {
Err(WebhookError::Parse(WebhookParseError::ParseUrlError(e))) => {
webhook_job
.fail(WebhookJobError::new_parse(&e.to_string()))
.await
Expand All @@ -298,26 +298,53 @@ async fn process_webhook_job<W: WebhookJob>(

Ok(())
}
Err(WebhookError::RetryableRequestError { error, retry_after }) => {
let retry_interval =
retry_policy.retry_interval(webhook_job.attempt() as u32, retry_after);
let current_queue = webhook_job.queue();
let retry_queue = retry_policy.retry_queue(&current_queue);

match webhook_job
.retry(WebhookJobError::from(&error), retry_interval, retry_queue)
.await
{
Ok(_) => {
metrics::counter!("webhook_jobs_retried", &labels).increment(1);

Ok(())
Err(WebhookError::Request(request_error)) => {
let webhook_job_error = WebhookJobError::from(&request_error);

match request_error {
WebhookRequestError::RetryableRequestError {
error, retry_after, ..
} => {
let retry_interval =
retry_policy.retry_interval(webhook_job.attempt() as u32, retry_after);
let current_queue = webhook_job.queue();
let retry_queue = retry_policy.retry_queue(&current_queue);

match webhook_job
.retry(webhook_job_error, retry_interval, retry_queue)
.await
{
Ok(_) => {
metrics::counter!("webhook_jobs_retried", &labels).increment(1);

Ok(())
}
Err(RetryError::RetryInvalidError(RetryInvalidError {
job: webhook_job,
..
})) => {
webhook_job
.fail(WebhookJobError::from(&error))
.await
.map_err(|job_error| {
metrics::counter!("webhook_jobs_database_error", &labels)
.increment(1);
job_error
})?;

metrics::counter!("webhook_jobs_failed", &labels).increment(1);

Ok(())
}
Err(RetryError::DatabaseError(job_error)) => {
metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
Err(WorkerError::from(job_error))
}
}
}
Err(RetryError::RetryInvalidError(RetryInvalidError {
job: webhook_job, ..
})) => {
WebhookRequestError::NonRetryableRetryableRequestError { .. } => {
webhook_job
.fail(WebhookJobError::from(&error))
.fail(webhook_job_error)
.await
.map_err(|job_error| {
metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
Expand All @@ -328,25 +355,8 @@ async fn process_webhook_job<W: WebhookJob>(

Ok(())
}
Err(RetryError::DatabaseError(job_error)) => {
metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
Err(WorkerError::from(job_error))
}
}
}
Err(WebhookError::NonRetryableRetryableRequestError(error)) => {
webhook_job
.fail(WebhookJobError::from(&error))
.await
.map_err(|job_error| {
metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
job_error
})?;

metrics::counter!("webhook_jobs_failed", &labels).increment(1);

Ok(())
}
}
}

Expand All @@ -367,10 +377,10 @@ async fn send_webhook(
body: String,
) -> Result<reqwest::Response, WebhookError> {
let method: http::Method = method.into();
let url: reqwest::Url = (url).parse().map_err(WebhookError::ParseUrlError)?;
let url: reqwest::Url = (url).parse().map_err(WebhookParseError::ParseUrlError)?;
let headers: reqwest::header::HeaderMap = (headers)
.try_into()
.map_err(WebhookError::ParseHeadersError)?;
.map_err(WebhookParseError::ParseHeadersError)?;
let body = reqwest::Body::from(body);

let response = client
Expand All @@ -379,26 +389,35 @@ async fn send_webhook(
.body(body)
.send()
.await
.map_err(|e| WebhookError::RetryableRequestError {
.map_err(|e| WebhookRequestError::RetryableRequestError {
error: e,
response: None,
retry_after: None,
})?;

let retry_after = parse_retry_after_header(response.headers());

match response.error_for_status() {
Ok(response) => Ok(response),
match response.error_for_status_ref() {
Ok(_) => Ok(response),
Err(err) => {
if is_retryable_status(
err.status()
.expect("status code is set as error is generated from a response"),
) {
Err(WebhookError::RetryableRequestError {
error: err,
retry_after,
})
Err(WebhookError::Request(
WebhookRequestError::RetryableRequestError {
error: err,
response: response.text().await.ok(),
retry_after,
},
))
} else {
Err(WebhookError::NonRetryableRetryableRequestError(err))
Err(WebhookError::Request(
WebhookRequestError::NonRetryableRetryableRequestError {
error: err,
response: response.text().await.ok(),
},
))
}
}
}
Expand Down Expand Up @@ -574,7 +593,7 @@ mod tests {
}

#[sqlx::test(migrations = "../migrations")]
async fn test_send_webhook(_: PgPool) {
async fn test_send_webhook(_pg: PgPool) {
let method = HttpMethod::POST;
let url = "http://localhost:18081/echo";
let headers = collections::HashMap::new();
Expand All @@ -591,4 +610,29 @@ mod tests {
body.to_owned(),
);
}

#[sqlx::test(migrations = "../migrations")]
async fn test_error_message_contains_response_body(_pg: PgPool) {
let method = HttpMethod::POST;
let url = "http://localhost:18081/fail";
let headers = collections::HashMap::new();
let body = "this is an error message";
let client = reqwest::Client::new();

let err = send_webhook(client, &method, url, &headers, body.to_owned())
.await
.err()
.expect("request didn't fail when it should have failed");

assert!(matches!(err, WebhookError::Request(..)));
if let WebhookError::Request(request_error) = err {
assert_eq!(request_error.status(), Some(StatusCode::BAD_REQUEST));
assert!(request_error.to_string().contains(body));
// This is the display implementation of reqwest. Just checking it is still there.
// See: https://github.com/seanmonstar/reqwest/blob/master/src/error.rs
assert!(request_error.to_string().contains(
"HTTP status client error (400 Bad Request) for url (http://localhost:18081/fail)"
));
}
}
}

0 comments on commit 419cd58

Please sign in to comment.