Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
hook-worker: deny traffic to internal IPs
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed May 3, 2024
1 parent e2ce466 commit 250082c
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 2 deletions.
3 changes: 3 additions & 0 deletions hook-worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub struct Config {

#[envconfig(default = "1")]
pub dequeue_batch_size: u32,

#[envconfig(default = "false")]
pub allow_internal_ips: bool,
}

impl Config {
Expand Down
68 changes: 68 additions & 0 deletions hook-worker/src/dns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::error::Error as StdError;
use std::io;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};

use futures::FutureExt;
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
use tokio::task::spawn_blocking;

/// Internal reqwest type, copied here as part of Resolving
pub(crate) type BoxError = Box<dyn StdError + Send + Sync>;

/// Returns [`true`] if the address appears to be a globally reachable IPv4.
///
/// Trimmed down version of the unstable IpAddr::is_global, move to it when it's stable.
fn is_global_ipv4(addr: &SocketAddr) -> bool {
match addr.ip() {
IpAddr::V4(ip) => {
!(ip.octets()[0] == 0 // "This network"
|| ip.is_private()
|| ip.is_loopback()
|| ip.is_link_local()
|| ip.is_broadcast())
}
IpAddr::V6(_) => false, // Our network does not currently support ipv6, let's ignore for now
}
}

/// DNS resolver using the stdlib resolver, but filtering results to only pass public IPv4 results.
///
/// Private and broadcast addresses are filtered out, so are IPv6 results for now (as our infra
/// does not currently support IPv6 routing anyway).
/// This is adapted from the GaiResolver in hyper and reqwest.
pub struct PublicIPv4Resolver {}

impl Resolve for PublicIPv4Resolver {
fn resolve(&self, name: Name) -> Resolving {
// Closure to call the system's resolver (blocking call) through the ToSocketAddrs trait.
let resolve_host = move || (name.as_str(), 0).to_socket_addrs();

// Execute the blocking call in a separate worker thread then process its result asynchronously.
// spawn_blocking returns a JoinHandle that implements Future<Result<(closure result), JoinError>>.
let future_result = spawn_blocking(resolve_host).map(|result| match result {
Ok(Ok(addr)) => {
// Resolution succeeded, pass the IPs in a Box after filtering
let addrs: Addrs = Box::new(addr.filter(is_global_ipv4));
Ok(addrs)
}
Ok(Err(err)) => {
// Resolution failed, pass error through in a Box
let err: BoxError = Box::new(err);
Err(err)
}
Err(join_err) => {
// The tokio task failed, error handled copied from hyper's GaiResolver
if join_err.is_cancelled() {
let err: BoxError =
Box::new(io::Error::new(io::ErrorKind::Interrupted, join_err));
Err(err)
} else {
panic!("background task failed: {:?}", join_err)
}
}
});

// Box the Future to satisfy the Resolving interface.
Box::pin(future_result)
}
}
1 change: 1 addition & 0 deletions hook-worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod config;
pub mod dns;
pub mod error;
pub mod util;
pub mod worker;
1 change: 1 addition & 0 deletions hook-worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async fn main() -> Result<(), WorkerError> {
config.request_timeout.0,
config.max_concurrent_jobs,
retry_policy_builder.provide(),
config.allow_internal_ips,
worker_liveness,
);

Expand Down
11 changes: 9 additions & 2 deletions hook-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use reqwest::header;
use tokio::sync;
use tracing::error;

use crate::dns::PublicIPv4Resolver;
use crate::error::{WebhookError, WebhookParseError, WebhookRequestError, WorkerError};
use crate::util::first_n_bytes_of_response;

Expand Down Expand Up @@ -84,6 +85,7 @@ impl<'p> WebhookWorker<'p> {
request_timeout: time::Duration,
max_concurrent_jobs: usize,
retry_policy: RetryPolicy,
allow_internal_ips: bool,
liveness: HealthHandle,
) -> Self {
let mut headers = header::HeaderMap::new();
Expand All @@ -92,10 +94,14 @@ impl<'p> WebhookWorker<'p> {
header::HeaderValue::from_static("application/json"),
);

let client = reqwest::Client::builder()
let mut client_builder = reqwest::Client::builder()
.default_headers(headers)
.user_agent("PostHog Webhook Worker")
.timeout(request_timeout)
.timeout(request_timeout);
if !allow_internal_ips {
client_builder = client_builder.dns_resolver(Arc::new(PublicIPv4Resolver {}))
}
let client = client_builder
.build()
.expect("failed to construct reqwest client for webhook worker");

Expand Down Expand Up @@ -569,6 +575,7 @@ mod tests {
time::Duration::from_millis(5000),
10,
RetryPolicy::default(),
false,
liveness,
);

Expand Down

0 comments on commit 250082c

Please sign in to comment.