Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into flag-boilerplate
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored May 3, 2024
2 parents 877e4df + e2ce466 commit b5c4ebf
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 67 deletions.
13 changes: 4 additions & 9 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ jobs:
- uses: actions/checkout@v3

- name: Install rust
uses: dtolnay/rust-toolchain@master
with:
toolchain: stable
uses: dtolnay/[email protected]

- uses: actions/cache@v3
with:
Expand Down Expand Up @@ -52,9 +50,7 @@ jobs:
echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts
- name: Install rust
uses: dtolnay/rust-toolchain@master
with:
toolchain: stable
uses: dtolnay/[email protected]

- uses: actions/cache@v3
with:
Expand All @@ -73,10 +69,9 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Install latest rust
uses: dtolnay/rust-toolchain@master
- name: Install rust
uses: dtolnay/rust-toolchain@1.77
with:
toolchain: stable
components: clippy,rustfmt

- uses: actions/cache@v3
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ metrics = "0.22.0"
metrics-exporter-prometheus = "0.14.0"
rand = "0.8.5"
rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] }
reqwest = { version = "0.12.3", features = ["json"] }
reqwest = { version = "0.12.3", features = ["json", "stream"] }
serde = { version = "1.0", features = ["derive"] }
serde_derive = { version = "1.0" }
serde_json = { version = "1.0" }
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.io/lukemathwalker/cargo-chef:latest-rust-1.74.0-buster AS chef
FROM docker.io/lukemathwalker/cargo-chef:latest-rust-1.77-bookworm AS chef
ARG BIN
WORKDIR /app

Expand All @@ -20,7 +20,7 @@ RUN cargo chef cook --release --recipe-path recipe.json
COPY . .
RUN cargo build --release --bin $BIN

FROM debian:bullseye-20230320-slim AS runtime
FROM debian:bookworm-slim AS runtime

RUN apt-get update && \
apt-get install -y --no-install-recommends \
Expand Down
Empty file added hook-worker/src/dns.rs
Empty file.
111 changes: 106 additions & 5 deletions hook-worker/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,125 @@
use std::fmt;
use std::time;

use hook_common::pgqueue;
use hook_common::{pgqueue, webhook::WebhookJobError};
use thiserror::Error;

/// Enumeration of errors related to webhook job processing in the WebhookWorker.
/// Enumeration of error classes handled by `WebhookWorker`.
#[derive(Error, Debug)]
pub enum WebhookError {
#[error(transparent)]
Parse(#[from] WebhookParseError),
#[error(transparent)]
Request(#[from] WebhookRequestError),
}

/// Enumeration of parsing errors that can occur as `WebhookWorker` sets up a webhook.
#[derive(Error, Debug)]
pub enum WebhookParseError {
#[error("{0} is not a valid HttpMethod")]
ParseHttpMethodError(String),
#[error("error parsing webhook headers")]
ParseHeadersError(http::Error),
#[error("error parsing webhook url")]
ParseUrlError(url::ParseError),
#[error("a webhook could not be delivered but it could be retried later: {error}")]
}

/// Enumeration of request errors that can occur as `WebhookWorker` sends a request.
#[derive(Error, Debug)]
pub enum WebhookRequestError {
RetryableRequestError {
error: reqwest::Error,
response: Option<String>,
retry_after: Option<time::Duration>,
},
#[error("a webhook could not be delivered and it cannot be retried further: {0}")]
NonRetryableRetryableRequestError(reqwest::Error),
NonRetryableRetryableRequestError {
error: reqwest::Error,
response: Option<String>,
},
}

/// Enumeration of errors that can occur while handling a `reqwest::Response`.
/// Currently, not consumed anywhere. Grouped here to support a common error type for
/// `utils::first_n_bytes_of_response`.
#[derive(Error, Debug)]
pub enum WebhookResponseError {
#[error("failed to parse a response as UTF8")]
ParseUTF8StringError(#[from] std::str::Utf8Error),
#[error("error while iterating over response body chunks")]
StreamIterationError(#[from] reqwest::Error),
#[error("attempted to slice a chunk of length {0} with an out of bounds index of {1}")]
ChunkOutOfBoundsError(usize, usize),
}

/// Implement display of `WebhookRequestError` by appending to the underlying `reqwest::Error`
/// any response message if available.
impl fmt::Display for WebhookRequestError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WebhookRequestError::RetryableRequestError {
error, response, ..
}
| WebhookRequestError::NonRetryableRetryableRequestError { error, response } => {
let response_message = match response {
Some(m) => m.to_string(),
None => "No response from the server".to_string(),
};
writeln!(f, "{}", error)?;
write!(f, "{}", response_message)?;

Ok(())
}
}
}
}

/// Implementation of `WebhookRequestError` designed to further describe the error.
/// In particular, we pass some calls to underyling `reqwest::Error` to provide more details.
impl WebhookRequestError {
pub fn is_timeout(&self) -> bool {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.is_timeout()
}
}
}

pub fn is_status(&self) -> bool {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.is_status()
}
}
}

pub fn status(&self) -> Option<http::StatusCode> {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.status()
}
}
}
}

impl From<&WebhookRequestError> for WebhookJobError {
fn from(error: &WebhookRequestError) -> Self {
if error.is_timeout() {
WebhookJobError::new_timeout(&error.to_string())
} else if error.is_status() {
WebhookJobError::new_http_status(
error.status().expect("status code is defined").into(),
&error.to_string(),
)
} else {
// Catch all other errors as `app_metrics::ErrorType::Connection` errors.
// Not all of `reqwest::Error` may strictly be connection errors, so our supported error types may need an extension
// depending on how strict error reporting has to be.
WebhookJobError::new_connection(&error.to_string())
}
}
}

/// Enumeration of errors related to initialization and consumption of webhook jobs.
Expand Down
1 change: 1 addition & 0 deletions hook-worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod config;
pub mod error;
pub mod util;
pub mod worker;
35 changes: 35 additions & 0 deletions hook-worker/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::error::WebhookResponseError;
use futures::StreamExt;
use reqwest::Response;

pub async fn first_n_bytes_of_response(
response: Response,
n: usize,
) -> Result<String, WebhookResponseError> {
let mut body = response.bytes_stream();
let mut buffer = String::with_capacity(n);

while let Some(chunk) = body.next().await {
if buffer.len() >= n {
break;
}

let chunk = chunk?;
let chunk_str = std::str::from_utf8(&chunk)?;
let upper_bound = std::cmp::min(n - buffer.len(), chunk_str.len());

if let Some(partial_chunk_str) = chunk_str.get(0..upper_bound) {
buffer.push_str(partial_chunk_str);
} else {
// For whatever reason we are out of bounds. We should never land here
// given the `std::cmp::min` usage, but I am being extra careful by not
// using a slice index that would panic instead.
return Err(WebhookResponseError::ChunkOutOfBoundsError(
chunk_str.len(),
upper_bound,
));
}
}

Ok(buffer)
}
Loading

0 comments on commit b5c4ebf

Please sign in to comment.