Skip to content

Commit

Permalink
basic sandwich (#789)
Browse files Browse the repository at this point in the history
* basic sandwich

* unit test inbound

* validate from_waypoint with IPs
  • Loading branch information
stevenctl authored Mar 14, 2024
1 parent 8a0046f commit 9b7bf7a
Show file tree
Hide file tree
Showing 6 changed files with 605 additions and 269 deletions.
254 changes: 252 additions & 2 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ use tracing::{error, trace, warn, Instrument};
use inbound::Inbound;
pub use metrics::*;

use crate::identity::SecretManager;
use crate::identity::{Identity, SecretManager};
use crate::metrics::Recorder;
use crate::proxy::connection_manager::{ConnectionManager, PolicyWatcher};
use crate::proxy::inbound_passthrough::InboundPassthrough;
use crate::proxy::outbound::Outbound;
use crate::proxy::socks5::Socks5;
use crate::rbac::Connection;
use crate::state::service::{endpoint_uid, Service, ServiceDescription};
use crate::state::workload::{network_addr, Workload};
use crate::state::workload::address::Address;
use crate::state::workload::{network_addr, GatewayAddress, Workload};
use crate::state::{DemandProxyState, WorkloadInfo};
use crate::{config, identity, socket, tls};

Expand Down Expand Up @@ -262,6 +263,9 @@ pub enum Error {

#[error("unsupported feature: {0}")]
UnsupportedFeature(String),

#[error("ip mismatch: {0} != {1}")]
IPMismatch(IpAddr, IpAddr),
}

// TLS record size max is 16k. But we also have a H2 frame header, so leave a bit of room for that.
Expand Down Expand Up @@ -526,6 +530,61 @@ pub fn guess_inbound_service(
.map(ServiceDescription::from)
}

// Checks that the source identiy and address match the upstream's waypoint
async fn check_from_waypoint(
state: DemandProxyState,
upstream: &Workload,
src_identity: Option<&Identity>,
src_ip: &IpAddr,
) -> bool {
let is_waypoint = |wl: &Workload| {
Some(wl.identity()).as_ref() == src_identity && wl.workload_ips.contains(src_ip)
};
check_gateway_address(state, upstream.waypoint.as_ref(), is_waypoint).await
}

// Checks if the connection's source identity is the identity for the upstream's network
// gateway
async fn check_from_network_gateway(
state: DemandProxyState,
upstream: &Workload,
src_identity: Option<&Identity>,
) -> bool {
let is_gateway = |wl: &Workload| Some(wl.identity()).as_ref() == src_identity;
check_gateway_address(state, upstream.network_gateway.as_ref(), is_gateway).await
}

// Check if the source's identity matches any workloads that make up the given gateway
// TODO: This can be made more accurate by also checking addresses.
async fn check_gateway_address<F>(
state: DemandProxyState,
gateway_address: Option<&GatewayAddress>,
predicate: F,
) -> bool
where
F: Fn(&Workload) -> bool,
{
let Some(gateway_address) = gateway_address else {
return false;
};

match state.fetch_destination(&gateway_address.destination).await {
Some(Address::Workload(wl)) => return predicate(&wl),
Some(Address::Service(svc)) => {
for (_ep_uid, ep) in svc.endpoints.iter() {
// fetch workloads by workload UID since we may not have an IP for an endpoint (e.g., endpoint is just a hostname)
let wl = state.fetch_workload_by_uid(&ep.workload_uid).await;
if wl.as_ref().is_some_and(&predicate) {
return true;
}
}
}
None => {}
};

false
}

#[cfg(test)]
mod tests {
use std::assert_eq;
Expand Down Expand Up @@ -558,4 +617,195 @@ mod tests {
let expect = expect.map(|i| i.parse::<IpAddr>().unwrap());
assert_eq!(get_original_src_from_fwded(&headers), expect)
}

use hickory_resolver::config::{ResolverConfig, ResolverOpts};

use crate::state::service::endpoint_uid;
use crate::state::workload::{NamespacedHostname, NetworkAddress};
use crate::{
identity::Identity,
state::{
self,
service::{Endpoint, Service},
workload::gatewayaddress::Destination,
},
};
use std::{collections::HashMap, net::Ipv4Addr, sync::RwLock};

#[tokio::test]
async fn check_gateway() {
let w = mock_default_gateway_workload();
let s = mock_default_gateway_service();
let mut state = state::ProxyState::default();
if let Err(err) = state.workloads.insert(w) {
panic!("received error inserting workload: {}", err);
}
state.services.insert(s);
let state = state::DemandProxyState::new(
Arc::new(RwLock::new(state)),
None,
ResolverConfig::default(),
ResolverOpts::default(),
);

let gateawy_id = Identity::Spiffe {
trust_domain: "cluster.local".to_string(),
namespace: "gatewayns".to_string(),
service_account: "default".to_string(),
};
let from_gw_conn = Some(gateawy_id);
let not_from_gw_conn = Some(Identity::default());

let upstream_with_address = mock_wokload_with_gateway(Some(mock_default_gateway_address()));
assert!(
check_from_network_gateway(
state.clone(),
&upstream_with_address,
from_gw_conn.as_ref(),
)
.await
);
assert!(
!check_from_network_gateway(
state.clone(),
&upstream_with_address,
not_from_gw_conn.as_ref(),
)
.await
);

// using hostname (will check the service variant of address::Address)
let upstream_with_hostname =
mock_wokload_with_gateway(Some(mock_default_gateway_hostname()));
assert!(
check_from_network_gateway(
state.clone(),
&upstream_with_hostname,
from_gw_conn.as_ref(),
)
.await
);
assert!(
!check_from_network_gateway(state, &upstream_with_hostname, not_from_gw_conn.as_ref())
.await
);
}

// private helpers
fn mock_wokload_with_gateway(gw: Option<GatewayAddress>) -> Workload {
Workload {
workload_ips: vec![IpAddr::V4(Ipv4Addr::LOCALHOST)],
waypoint: None,
network_gateway: gw,
gateway_address: None,
protocol: Default::default(),
uid: "".to_string(),
name: "app".to_string(),
namespace: "appns".to_string(),
trust_domain: "cluster.local".to_string(),
service_account: "default".to_string(),
network: "".to_string(),
workload_name: "app".to_string(),
workload_type: "deployment".to_string(),
canonical_name: "app".to_string(),
canonical_revision: "".to_string(),
hostname: "".to_string(),
node: "".to_string(),
status: Default::default(),
cluster_id: "Kubernetes".to_string(),

authorization_policies: Vec::new(),
native_tunnel: false,
}
}

fn mock_default_gateway_workload() -> Workload {
Workload {
workload_ips: vec![IpAddr::V4(mock_default_gateway_ipaddr())],
waypoint: None,
network_gateway: None,
gateway_address: None,
protocol: Default::default(),
uid: "".to_string(),
name: "gateway".to_string(),
namespace: "gatewayns".to_string(),
trust_domain: "cluster.local".to_string(),
service_account: "default".to_string(),
network: "".to_string(),
workload_name: "gateway".to_string(),
workload_type: "deployment".to_string(),
canonical_name: "".to_string(),
canonical_revision: "".to_string(),
hostname: "".to_string(),
node: "".to_string(),
status: Default::default(),
cluster_id: "Kubernetes".to_string(),

authorization_policies: Vec::new(),
native_tunnel: false,
}
}

fn mock_default_gateway_service() -> Service {
let vip1 = NetworkAddress {
address: IpAddr::V4(Ipv4Addr::new(127, 0, 10, 1)),
network: "".to_string(),
};
let vips = vec![vip1];
let mut ports = HashMap::new();
ports.insert(8080, 80);
let mut endpoints = HashMap::new();
let addr = Some(NetworkAddress {
network: "".to_string(),
address: IpAddr::V4(mock_default_gateway_ipaddr()),
});
endpoints.insert(
endpoint_uid(&mock_default_gateway_workload().uid, addr.as_ref()),
Endpoint {
workload_uid: mock_default_gateway_workload().uid,
service: NamespacedHostname {
namespace: "gatewayns".to_string(),
hostname: "gateway".to_string(),
},
address: addr,
port: ports.clone(),
},
);
Service {
name: "gateway".to_string(),
namespace: "gatewayns".to_string(),
hostname: "gateway".to_string(),
vips,
ports,
endpoints,
subject_alt_names: vec![],
waypoint: None,
}
}

fn mock_default_gateway_address() -> GatewayAddress {
GatewayAddress {
destination: Destination::Address(NetworkAddress {
network: "".to_string(),
address: IpAddr::V4(mock_default_gateway_ipaddr()),
}),
hbone_mtls_port: 15008,
hbone_single_tls_port: Some(15003),
}
}

fn mock_default_gateway_hostname() -> GatewayAddress {
GatewayAddress {
destination: Destination::Hostname(state::workload::NamespacedHostname {
namespace: "gatewayns".to_string(),
hostname: "gateway".to_string(),
}),
hbone_mtls_port: 15008,
hbone_single_tls_port: Some(15003),
}
}

fn mock_default_gateway_ipaddr() -> Ipv4Addr {
Ipv4Addr::new(127, 0, 0, 100)
}
}
Loading

0 comments on commit 9b7bf7a

Please sign in to comment.