Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic sandwich #789

Merged
merged 3 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 252 additions & 2 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ use tracing::{error, trace, warn, Instrument};
use inbound::Inbound;
pub use metrics::*;

use crate::identity::SecretManager;
use crate::identity::{Identity, SecretManager};
use crate::metrics::Recorder;
use crate::proxy::connection_manager::{ConnectionManager, PolicyWatcher};
use crate::proxy::inbound_passthrough::InboundPassthrough;
use crate::proxy::outbound::Outbound;
use crate::proxy::socks5::Socks5;
use crate::rbac::Connection;
use crate::state::service::{endpoint_uid, Service, ServiceDescription};
use crate::state::workload::{network_addr, Workload};
use crate::state::workload::address::Address;
use crate::state::workload::{network_addr, GatewayAddress, Workload};
use crate::state::{DemandProxyState, WorkloadInfo};
use crate::{config, identity, socket, tls};

Expand Down Expand Up @@ -262,6 +263,9 @@ pub enum Error {

#[error("unsupported feature: {0}")]
UnsupportedFeature(String),

#[error("ip mismatch: {0} != {1}")]
IPMismatch(IpAddr, IpAddr),
}

// TLS record size max is 16k. But we also have a H2 frame header, so leave a bit of room for that.
Expand Down Expand Up @@ -526,6 +530,61 @@ pub fn guess_inbound_service(
.map(ServiceDescription::from)
}

// Checks that the source identiy and address match the upstream's waypoint
async fn check_from_waypoint(
state: DemandProxyState,
upstream: &Workload,
src_identity: Option<&Identity>,
src_ip: &IpAddr,
) -> bool {
let is_waypoint = |wl: &Workload| {
Some(wl.identity()).as_ref() == src_identity && wl.workload_ips.contains(src_ip)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long term it would be nice to have a method more reliable than source IP. That being said, since its node-local it would be quite strange for this check to not work somehow.

};
check_gateway_address(state, upstream.waypoint.as_ref(), is_waypoint).await
}

// Checks if the connection's source identity is the identity for the upstream's network
// gateway
async fn check_from_network_gateway(
state: DemandProxyState,
upstream: &Workload,
src_identity: Option<&Identity>,
) -> bool {
let is_gateway = |wl: &Workload| Some(wl.identity()).as_ref() == src_identity;
check_gateway_address(state, upstream.network_gateway.as_ref(), is_gateway).await
}

// Check if the source's identity matches any workloads that make up the given gateway
// TODO: This can be made more accurate by also checking addresses.
async fn check_gateway_address<F>(
state: DemandProxyState,
gateway_address: Option<&GatewayAddress>,
predicate: F,
) -> bool
where
F: Fn(&Workload) -> bool,
{
let Some(gateway_address) = gateway_address else {
return false;
};

match state.fetch_destination(&gateway_address.destination).await {
Some(Address::Workload(wl)) => return predicate(&wl),
Some(Address::Service(svc)) => {
for (_ep_uid, ep) in svc.endpoints.iter() {
// fetch workloads by workload UID since we may not have an IP for an endpoint (e.g., endpoint is just a hostname)
let wl = state.fetch_workload_by_uid(&ep.workload_uid).await;
if wl.as_ref().is_some_and(&predicate) {
return true;
}
}
}
None => {}
};

false
}

#[cfg(test)]
mod tests {
use std::assert_eq;
Expand Down Expand Up @@ -558,4 +617,195 @@ mod tests {
let expect = expect.map(|i| i.parse::<IpAddr>().unwrap());
assert_eq!(get_original_src_from_fwded(&headers), expect)
}

use hickory_resolver::config::{ResolverConfig, ResolverOpts};

use crate::state::service::endpoint_uid;
use crate::state::workload::{NamespacedHostname, NetworkAddress};
use crate::{
identity::Identity,
state::{
self,
service::{Endpoint, Service},
workload::gatewayaddress::Destination,
},
};
use std::{collections::HashMap, net::Ipv4Addr, sync::RwLock};

#[tokio::test]
async fn check_gateway() {
let w = mock_default_gateway_workload();
let s = mock_default_gateway_service();
let mut state = state::ProxyState::default();
if let Err(err) = state.workloads.insert(w) {
panic!("received error inserting workload: {}", err);
}
state.services.insert(s);
let state = state::DemandProxyState::new(
Arc::new(RwLock::new(state)),
None,
ResolverConfig::default(),
ResolverOpts::default(),
);

let gateawy_id = Identity::Spiffe {
trust_domain: "cluster.local".to_string(),
namespace: "gatewayns".to_string(),
service_account: "default".to_string(),
};
let from_gw_conn = Some(gateawy_id);
let not_from_gw_conn = Some(Identity::default());

let upstream_with_address = mock_wokload_with_gateway(Some(mock_default_gateway_address()));
assert!(
check_from_network_gateway(
state.clone(),
&upstream_with_address,
from_gw_conn.as_ref(),
)
.await
);
assert!(
!check_from_network_gateway(
state.clone(),
&upstream_with_address,
not_from_gw_conn.as_ref(),
)
.await
);

// using hostname (will check the service variant of address::Address)
let upstream_with_hostname =
mock_wokload_with_gateway(Some(mock_default_gateway_hostname()));
assert!(
check_from_network_gateway(
state.clone(),
&upstream_with_hostname,
from_gw_conn.as_ref(),
)
.await
);
assert!(
!check_from_network_gateway(state, &upstream_with_hostname, not_from_gw_conn.as_ref())
.await
);
}

// private helpers
fn mock_wokload_with_gateway(gw: Option<GatewayAddress>) -> Workload {
Workload {
workload_ips: vec![IpAddr::V4(Ipv4Addr::LOCALHOST)],
waypoint: None,
network_gateway: gw,
gateway_address: None,
protocol: Default::default(),
uid: "".to_string(),
name: "app".to_string(),
namespace: "appns".to_string(),
trust_domain: "cluster.local".to_string(),
service_account: "default".to_string(),
network: "".to_string(),
workload_name: "app".to_string(),
workload_type: "deployment".to_string(),
canonical_name: "app".to_string(),
canonical_revision: "".to_string(),
hostname: "".to_string(),
node: "".to_string(),
status: Default::default(),
cluster_id: "Kubernetes".to_string(),

authorization_policies: Vec::new(),
native_tunnel: false,
}
}

fn mock_default_gateway_workload() -> Workload {
Workload {
workload_ips: vec![IpAddr::V4(mock_default_gateway_ipaddr())],
waypoint: None,
network_gateway: None,
gateway_address: None,
protocol: Default::default(),
uid: "".to_string(),
name: "gateway".to_string(),
namespace: "gatewayns".to_string(),
trust_domain: "cluster.local".to_string(),
service_account: "default".to_string(),
network: "".to_string(),
workload_name: "gateway".to_string(),
workload_type: "deployment".to_string(),
canonical_name: "".to_string(),
canonical_revision: "".to_string(),
hostname: "".to_string(),
node: "".to_string(),
status: Default::default(),
cluster_id: "Kubernetes".to_string(),

authorization_policies: Vec::new(),
native_tunnel: false,
}
}

fn mock_default_gateway_service() -> Service {
let vip1 = NetworkAddress {
address: IpAddr::V4(Ipv4Addr::new(127, 0, 10, 1)),
network: "".to_string(),
};
let vips = vec![vip1];
let mut ports = HashMap::new();
ports.insert(8080, 80);
let mut endpoints = HashMap::new();
let addr = Some(NetworkAddress {
network: "".to_string(),
address: IpAddr::V4(mock_default_gateway_ipaddr()),
});
endpoints.insert(
endpoint_uid(&mock_default_gateway_workload().uid, addr.as_ref()),
Endpoint {
workload_uid: mock_default_gateway_workload().uid,
service: NamespacedHostname {
namespace: "gatewayns".to_string(),
hostname: "gateway".to_string(),
},
address: addr,
port: ports.clone(),
},
);
Service {
name: "gateway".to_string(),
namespace: "gatewayns".to_string(),
hostname: "gateway".to_string(),
vips,
ports,
endpoints,
subject_alt_names: vec![],
waypoint: None,
}
}

fn mock_default_gateway_address() -> GatewayAddress {
GatewayAddress {
destination: Destination::Address(NetworkAddress {
network: "".to_string(),
address: IpAddr::V4(mock_default_gateway_ipaddr()),
}),
hbone_mtls_port: 15008,
hbone_single_tls_port: Some(15003),
}
}

fn mock_default_gateway_hostname() -> GatewayAddress {
GatewayAddress {
destination: Destination::Hostname(state::workload::NamespacedHostname {
namespace: "gatewayns".to_string(),
hostname: "gateway".to_string(),
}),
hbone_mtls_port: 15008,
hbone_single_tls_port: Some(15003),
}
}

fn mock_default_gateway_ipaddr() -> Ipv4Addr {
Ipv4Addr::new(127, 0, 0, 100)
}
}
Loading