Skip to content

Commit

Permalink
service addressed waypoint (#832)
Browse files Browse the repository at this point in the history
* start impl for to svc waypoints

Signed-off-by: ilrudie <[email protected]>

* update to select a waypoint IP instead of using the svc VIP

Signed-off-by: ilrudie <[email protected]>

* adding svc waypoint tests

Signed-off-by: ilrudie <[email protected]>

* fixing up based on comments before looking into locking

Signed-off-by: ilrudie <[email protected]>

* handling for Hostname waypoint dest error

Signed-off-by: ilrudie <[email protected]>

* fix direction to be Outbound

Signed-off-by: Ian Rudie <[email protected]>

---------

Signed-off-by: ilrudie <[email protected]>
Signed-off-by: Ian Rudie <[email protected]>
  • Loading branch information
ilrudie authored Mar 12, 2024
1 parent b93db8d commit 8a0046f
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ mod test {
ports,
endpoints,
subject_alt_names: vec![],
waypoint: None,
}
}

Expand Down
140 changes: 121 additions & 19 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use crate::proxy::{util, Error, ProxyInputs, TraceParent, BAGGAGE_HEADER, TRACEP

use crate::state::service::ServiceDescription;
use crate::state::set_gateway_address;
use crate::state::workload::{NetworkAddress, Protocol, Workload};
use crate::state::workload::gatewayaddress::Destination;
use crate::state::workload::{address::Address, NetworkAddress, Protocol, Workload};
use crate::{hyper_util, proxy, rbac, socket};

pub struct Outbound {
Expand Down Expand Up @@ -378,6 +379,64 @@ impl OutboundConnection {
None => return Err(Error::UnknownSource(downstream)),
};

// If this is to-service traffic check for a service waypoint
if let Some(Address::Service(s)) = self
.pi
.state
.fetch_destination(&Destination::Address(NetworkAddress {
network: self.pi.cfg.network.clone(),
address: target.ip(),
}))
.await
{
// if we have a waypoint for this svc, use it; otherwise route traffic normally
if let Some(wp) = s.waypoint.clone() {
let waypoint_vip = match wp.destination {
Destination::Address(a) => a.address,
Destination::Hostname(_) => {
return Err(proxy::Error::UnknownWaypoint(
"hostname lookup not supported yet".to_string(),
));
}
};
let waypoint_vip = SocketAddr::new(waypoint_vip, wp.hbone_mtls_port);
let waypoint_us = self
.pi
.state
.fetch_upstream(&self.pi.cfg.network, waypoint_vip)
.await
.ok_or(proxy::Error::UnknownWaypoint(
"unable to determine waypoint upstream".to_string(),
))?;

let waypoint_workload = waypoint_us.workload;
let waypoint_ip = self
.pi
.state
.load_balance(
&waypoint_workload,
&source_workload,
self.pi.metrics.clone(),
)
.await?; // if we can't load balance just return the error

let waypoint_socket_address = SocketAddr::new(waypoint_ip, waypoint_us.port);

return Ok(Request {
protocol: Protocol::HBONE,
direction: Direction::Outbound,
source: source_workload,
destination: target,
destination_workload: None, // this is to Service traffic with a wp... gateway will handle workload selection
destination_service: Some(ServiceDescription::from(&*s)),
expected_identity: Some(waypoint_workload.identity()),
gateway: waypoint_socket_address,
request_type: RequestType::ToServerWaypoint,
upstream_sans: waypoint_us.sans,
});
}
}

// TODO: we want a single lock for source and upstream probably...?
let us = self
.pi
Expand Down Expand Up @@ -575,15 +634,18 @@ mod tests {
use crate::proxy::connection_manager::ConnectionManager;
use crate::test_helpers::helpers::test_proxy_metrics;
use crate::test_helpers::new_proxy_state;
use crate::xds::istio::workload::address::Type as XdsAddressType;
use crate::xds::istio::workload::NetworkAddress as XdsNetworkAddress;
use crate::xds::istio::workload::Port;
use crate::xds::istio::workload::Service as XdsService;
use crate::xds::istio::workload::TunnelProtocol as XdsProtocol;
use crate::xds::istio::workload::Workload as XdsWorkload;
use crate::{identity, xds};

async fn run_build_request(
from: &str,
to: &str,
xds: XdsWorkload,
xds: XdsAddressType,
expect: Option<ExpectedRequest<'_>>,
) {
let cfg = Config {
Expand All @@ -606,7 +668,10 @@ mod tests {
node: "local-node".to_string(),
..Default::default()
};
let state = new_proxy_state(&[source, waypoint, xds], &[], &[]);
let state = match xds {
XdsAddressType::Workload(wl) => new_proxy_state(&[source, waypoint, wl], &[], &[]),
XdsAddressType::Service(svc) => new_proxy_state(&[source, waypoint], &[svc], &[]),
};
let outbound = OutboundConnection {
pi: ProxyInputs {
cert_manager: identity::mock::new_secret_manager(Duration::from_secs(10)),
Expand Down Expand Up @@ -646,11 +711,11 @@ mod tests {
run_build_request(
"127.0.0.1",
"1.2.3.4:80",
XdsWorkload {
XdsAddressType::Workload(XdsWorkload {
uid: "cluster1//v1/Pod/default/my-pod".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
..Default::default()
},
}),
Some(ExpectedRequest {
protocol: Protocol::TCP,
destination: "1.2.3.4:80",
Expand All @@ -666,15 +731,15 @@ mod tests {
run_build_request(
"127.0.0.1",
"127.0.0.2:80",
XdsWorkload {
XdsAddressType::Workload(XdsWorkload {
uid: "cluster1//v1/Pod/ns/test-tcp".to_string(),
name: "test-tcp".to_string(),
namespace: "ns".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
tunnel_protocol: XdsProtocol::None as i32,
node: "remote-node".to_string(),
..Default::default()
},
}),
Some(ExpectedRequest {
protocol: Protocol::TCP,
destination: "127.0.0.2:80",
Expand All @@ -690,15 +755,15 @@ mod tests {
run_build_request(
"127.0.0.1",
"127.0.0.2:80",
XdsWorkload {
XdsAddressType::Workload(XdsWorkload {
uid: "cluster1//v1/Pod/ns/test-tcp".to_string(),
name: "test-tcp".to_string(),
namespace: "ns".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
tunnel_protocol: XdsProtocol::Hbone as i32,
node: "remote-node".to_string(),
..Default::default()
},
}),
Some(ExpectedRequest {
protocol: Protocol::HBONE,
destination: "127.0.0.2:80",
Expand All @@ -714,15 +779,15 @@ mod tests {
run_build_request(
"127.0.0.1",
"127.0.0.2:80",
XdsWorkload {
XdsAddressType::Workload(XdsWorkload {
uid: "cluster1//v1/Pod/ns/test-tcp".to_string(),
name: "test-tcp".to_string(),
namespace: "ns".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
tunnel_protocol: XdsProtocol::None as i32,
node: "local-node".to_string(),
..Default::default()
},
}),
Some(ExpectedRequest {
protocol: Protocol::TCP,
destination: "127.0.0.2:80",
Expand All @@ -738,15 +803,15 @@ mod tests {
run_build_request(
"127.0.0.1",
"127.0.0.2:80",
XdsWorkload {
XdsAddressType::Workload(XdsWorkload {
uid: "cluster1//v1/Pod/ns/test-tcp".to_string(),
name: "test-tcp".to_string(),
namespace: "ns".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
tunnel_protocol: XdsProtocol::Hbone as i32,
node: "local-node".to_string(),
..Default::default()
},
}),
Some(ExpectedRequest {
protocol: Protocol::HBONE,
destination: "127.0.0.2:80",
Expand All @@ -762,11 +827,11 @@ mod tests {
run_build_request(
"1.2.3.4",
"127.0.0.2:80",
XdsWorkload {
XdsAddressType::Workload(XdsWorkload {
uid: "cluster1//v1/Pod/default/my-pod".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
..Default::default()
},
}),
None,
)
.await;
Expand All @@ -777,7 +842,7 @@ mod tests {
run_build_request(
"127.0.0.2",
"127.0.0.1:80",
XdsWorkload {
XdsAddressType::Workload(XdsWorkload {
uid: "cluster1//v1/Pod/default/my-pod".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
waypoint: Some(xds::istio::workload::GatewayAddress {
Expand All @@ -791,7 +856,7 @@ mod tests {
hbone_single_tls_port: 15003,
}),
..Default::default()
},
}),
// Even though source has a waypoint, we don't use it
Some(ExpectedRequest {
protocol: Protocol::TCP,
Expand All @@ -807,7 +872,7 @@ mod tests {
run_build_request(
"127.0.0.1",
"127.0.0.2:80",
XdsWorkload {
XdsAddressType::Workload(XdsWorkload {
uid: "cluster1//v1/Pod/default/my-pod".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
waypoint: Some(xds::istio::workload::GatewayAddress {
Expand All @@ -821,7 +886,7 @@ mod tests {
hbone_single_tls_port: 15003,
}),
..Default::default()
},
}),
// Should use the waypoint
Some(ExpectedRequest {
protocol: Protocol::HBONE,
Expand All @@ -833,6 +898,43 @@ mod tests {
.await;
}

#[tokio::test]
async fn build_request_destination_svc_waypoint() {
run_build_request(
"127.0.0.1",
"127.0.0.3:80",
XdsAddressType::Service(XdsService {
addresses: vec![XdsNetworkAddress {
network: "".to_string(),
address: vec![127, 0, 0, 3],
}],
ports: vec![Port {
service_port: 80,
target_port: 8080,
}],
waypoint: Some(xds::istio::workload::GatewayAddress {
destination: Some(xds::istio::workload::gateway_address::Destination::Address(
XdsNetworkAddress {
network: "".to_string(),
address: [127, 0, 0, 10].to_vec(),
},
)),
hbone_mtls_port: 15008,
hbone_single_tls_port: 15003,
}),
..Default::default()
}),
// Should use the waypoint
Some(ExpectedRequest {
protocol: Protocol::HBONE,
destination: "127.0.0.3:80",
gateway: "127.0.0.10:15008",
request_type: RequestType::ToServerWaypoint,
}),
)
.await;
}

#[derive(PartialEq, Debug)]
struct ExpectedRequest<'a> {
protocol: Protocol,
Expand Down
9 changes: 8 additions & 1 deletion src/state/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use tracing::trace;
use xds::istio::workload::Service as XdsService;

use crate::state::workload::{
byte_to_ip, network_addr, NamespacedHostname, NetworkAddress, Workload, WorkloadError,
byte_to_ip, network_addr, GatewayAddress, NamespacedHostname, NetworkAddress, Workload,
WorkloadError,
};
use crate::xds;
use crate::xds::istio::workload::PortList;
Expand All @@ -41,6 +42,7 @@ pub struct Service {
pub endpoints: HashMap<String, Endpoint>,
#[serde(default)]
pub subject_alt_names: Vec<String>,
pub waypoint: Option<GatewayAddress>,
}

impl Service {
Expand Down Expand Up @@ -106,6 +108,10 @@ impl TryFrom<&XdsService> for Service {
);
nw_addrs.push(network_address);
}
let waypoint = match &s.waypoint {
Some(w) => Some(GatewayAddress::try_from(w)?),
None => None,
};
let svc = Service {
name: s.name.to_string(),
namespace: s.namespace.to_string(),
Expand All @@ -117,6 +123,7 @@ impl TryFrom<&XdsService> for Service {
.into(),
endpoints: Default::default(), // Will be populated once inserted into the store.
subject_alt_names: s.subject_alt_names.clone(),
waypoint,
};
Ok(svc)
}
Expand Down
2 changes: 2 additions & 0 deletions src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ pub fn mock_default_service() -> Service {
ports,
endpoints,
subject_alt_names: vec![],
waypoint: None,
}
}

Expand Down Expand Up @@ -288,6 +289,7 @@ fn test_custom_svc(
},
)]),
subject_alt_names: vec!["spiffe://cluster.local/ns/default/sa/default".to_string()],
waypoint: None,
})
}

Expand Down
14 changes: 14 additions & 0 deletions src/test_helpers/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ impl<'a> TestServiceBuilder<'a> {
ports: Default::default(),
endpoints: Default::default(), // populated later when workloads are added
subject_alt_names: vec![],
waypoint: None,
},
manager,
}
Expand All @@ -228,6 +229,19 @@ impl<'a> TestServiceBuilder<'a> {
self
}

/// Set the service waypoint
pub fn waypoint(mut self, waypoint: IpAddr) -> Self {
self.s.waypoint = Some(GatewayAddress {
destination: gatewayaddress::Destination::Address(NetworkAddress {
network: "".to_string(),
address: waypoint,
}),
hbone_mtls_port: 15008,
hbone_single_tls_port: Some(15003),
});
self
}

/// Finish building the service.
pub fn register(self) -> anyhow::Result<()> {
self.manager
Expand Down
Loading

0 comments on commit 8a0046f

Please sign in to comment.