From b76575c9a5085c4908b3d60b85f579b4225204bc Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Thu, 12 Oct 2023 13:05:09 +0200 Subject: [PATCH] Feature-flagged support for in kubeconfig Signed-off-by: Razz4780 --- kube-client/Cargo.toml | 4 +- kube-client/src/client/builder.rs | 244 +++++++++++++++------------ kube-client/src/client/config_ext.rs | 37 ++-- kube/Cargo.toml | 3 +- 4 files changed, 165 insertions(+), 123 deletions(-) diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index 236e70519..af42d0508 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -27,12 +27,13 @@ client = ["config", "__non_core", "hyper", "http-body", "tower", "tower-http", " jsonpatch = ["kube-core/jsonpatch"] admission = ["kube-core/admission"] config = ["__non_core", "pem", "home"] +socks5 = ["hyper-socks2"] # private feature sets; do not use __non_core = ["tracing", "serde_yaml", "base64"] [package.metadata.docs.rs] -features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest"] +features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest", "socks5"] # Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature. rustdoc-args = ["--cfg", "docsrs"] @@ -59,6 +60,7 @@ jsonpath_lib = { version = "0.3.0", optional = true } tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] } hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] } hyper-rustls = { version = "0.24.0", optional = true } +hyper-socks2 = { version = "0.8.0", optional = true, default-features = false } tokio-tungstenite = { version = "0.20.0", optional = true } tower = { version = "0.4.13", optional = true, features = ["buffer", "filter", "util"] } tower-http = { version = "0.4.0", optional = true, features = ["auth", "map-response-body", "trace"] } diff --git a/kube-client/src/client/builder.rs b/kube-client/src/client/builder.rs index bfaa945c5..6c0b4cd31 100644 --- a/kube-client/src/client/builder.rs +++ b/kube-client/src/client/builder.rs @@ -1,12 +1,18 @@ use bytes::Bytes; -use http::{Request, Response}; -use hyper::{self, client::HttpConnector}; +use http::{header::HeaderMap, Request, Response}; +use hyper::{ + self, + client::{connect::Connection, HttpConnector}, +}; use hyper_timeout::TimeoutConnector; pub use kube_core::response::Status; +use std::time::Duration; +use tokio::io::{AsyncRead, AsyncWrite}; use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder}; use tower_http::{ classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer, }; +use tracing::Span; use crate::{client::ConfigExt, Client, Config, Error, Result}; @@ -61,117 +67,137 @@ impl ClientBuilder { } } -impl TryFrom for ClientBuilder, Response>, BoxError>> { +pub type GenericService = BoxService, Response>, BoxError>; + +impl TryFrom for ClientBuilder { type Error = Error; /// Builds a default [`ClientBuilder`] stack from a given configuration fn try_from(config: Config) -> Result { - use std::time::Duration; - - use http::header::HeaderMap; - use tracing::Span; - - let default_ns = config.default_namespace.clone(); - let auth_layer = config.auth_layer()?; - - let client: hyper::Client<_, hyper::Body> = { - let mut connector = HttpConnector::new(); - connector.enforce_http(false); - - // Current TLS feature precedence when more than one are set: - // 1. rustls-tls - // 2. openssl-tls - // Create a custom client to use something else. - // If TLS features are not enabled, http connector will be used. - #[cfg(feature = "rustls-tls")] - let connector = config.rustls_https_connector_with_connector(connector)?; - #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))] - let connector = config.openssl_https_connector_with_connector(connector)?; - #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))] - if auth_layer.is_none() || config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) { - // no tls stack situation only works on anonymous auth with http scheme - return Err(Error::TlsRequired); - } - - let mut connector = TimeoutConnector::new(connector); - - // Set the timeouts for the client - connector.set_connect_timeout(config.connect_timeout); - connector.set_read_timeout(config.read_timeout); - connector.set_write_timeout(config.write_timeout); - - hyper::Client::builder().build(connector) - }; - - let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner(); - #[cfg(feature = "gzip")] - let stack = ServiceBuilder::new() - .layer(stack) - .layer(tower_http::decompression::DecompressionLayer::new()) - .into_inner(); - - let service = ServiceBuilder::new() - .layer(stack) - .option_layer(auth_layer) - .layer(config.extra_headers_layer()?) - .layer( - // Attribute names follow [Semantic Conventions]. - // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md - TraceLayer::new_for_http() - .make_span_with(|req: &Request| { - tracing::debug_span!( - "HTTP", - http.method = %req.method(), - http.url = %req.uri(), - http.status_code = tracing::field::Empty, - otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"), - otel.kind = "client", - otel.status_code = tracing::field::Empty, - ) - }) - .on_request(|_req: &Request, _span: &Span| { - tracing::debug!("requesting"); - }) - .on_response(|res: &Response, _latency: Duration, span: &Span| { - let status = res.status(); - span.record("http.status_code", status.as_u16()); - if status.is_client_error() || status.is_server_error() { - span.record("otel.status_code", "ERROR"); - } - }) - // Explicitly disable `on_body_chunk`. The default does nothing. - .on_body_chunk(()) - .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| { - tracing::debug!("stream closed"); - }) - .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| { - // Called when - // - Calling the inner service errored - // - Polling `Body` errored - // - the response was classified as failure (5xx) - // - End of stream was classified as failure + let mut connector = HttpConnector::new(); + connector.enforce_http(false); + + #[cfg(feature = "socks5")] + if let Some(proxy_addr) = config.proxy_url.clone() { + let connector = hyper_socks2::SocksConnector { + proxy_addr, + auth: None, + connector, + }; + + return make_generic_builder(connector, config); + } + + make_generic_builder(connector, config) + } +} + +/// Helper function for implementation of [`TryFrom`] for [`ClientBuilder`]. +/// Ignores [`Config::proxy_url`], which at this point is already handled. +fn make_generic_builder(base_connector: H, config: Config) -> Result, Error> +where + H: 'static + Clone + Send + Sync + Service, + H::Response: 'static + Connection + AsyncRead + AsyncWrite + Send + Unpin, + H::Future: 'static + Send, + H::Error: 'static + Send + Sync + std::error::Error, +{ + let default_ns = config.default_namespace.clone(); + let auth_layer = config.auth_layer()?; + + let client: hyper::Client<_, hyper::Body> = { + // Current TLS feature precedence when more than one are set: + // 1. rustls-tls + // 2. openssl-tls + // Create a custom client to use something else. + // If TLS features are not enabled, http connector will be used. + #[cfg(feature = "rustls-tls")] + let connector = config.rustls_https_connector_with_connector(base_connector)?; + #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))] + let connector = config.openssl_https_connector_with_connector(base_connector)?; + #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))] + if auth_layer.is_none() || config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) { + // no tls stack situation only works on anonymous auth with http scheme + return Err(Error::TlsRequired); + } + + let mut connector = TimeoutConnector::new(connector); + + // Set the timeouts for the client + connector.set_connect_timeout(config.connect_timeout); + connector.set_read_timeout(config.read_timeout); + connector.set_write_timeout(config.write_timeout); + + hyper::Client::builder().build(connector) + }; + + let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner(); + #[cfg(feature = "gzip")] + let stack = ServiceBuilder::new() + .layer(stack) + .layer(tower_http::decompression::DecompressionLayer::new()) + .into_inner(); + + let service = ServiceBuilder::new() + .layer(stack) + .option_layer(auth_layer) + .layer(config.extra_headers_layer()?) + .layer( + // Attribute names follow [Semantic Conventions]. + // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md + TraceLayer::new_for_http() + .make_span_with(|req: &Request| { + tracing::debug_span!( + "HTTP", + http.method = %req.method(), + http.url = %req.uri(), + http.status_code = tracing::field::Empty, + otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"), + otel.kind = "client", + otel.status_code = tracing::field::Empty, + ) + }) + .on_request(|_req: &Request, _span: &Span| { + tracing::debug!("requesting"); + }) + .on_response(|res: &Response, _latency: Duration, span: &Span| { + let status = res.status(); + span.record("http.status_code", status.as_u16()); + if status.is_client_error() || status.is_server_error() { span.record("otel.status_code", "ERROR"); - match ec { - ServerErrorsFailureClass::StatusCode(status) => { - span.record("http.status_code", status.as_u16()); - tracing::error!("failed with status {}", status) - } - ServerErrorsFailureClass::Error(err) => { - tracing::error!("failed with error {}", err) - } - } - }), - ) - .service(client); - - Ok(Self::new( - BoxService::new( - MapResponseBodyLayer::new(|body| { - Box::new(http_body::Body::map_err(body, BoxError::from)) as Box + } }) - .layer(service), - ), - default_ns, - )) - } + // Explicitly disable `on_body_chunk`. The default does nothing. + .on_body_chunk(()) + .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| { + tracing::debug!("stream closed"); + }) + .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| { + // Called when + // - Calling the inner service errored + // - Polling `Body` errored + // - the response was classified as failure (5xx) + // - End of stream was classified as failure + span.record("otel.status_code", "ERROR"); + match ec { + ServerErrorsFailureClass::StatusCode(status) => { + span.record("http.status_code", status.as_u16()); + tracing::error!("failed with status {}", status) + } + ServerErrorsFailureClass::Error(err) => { + tracing::error!("failed with error {}", err) + } + } + }), + ) + .service(client); + + Ok(ClientBuilder::new( + BoxService::new( + MapResponseBodyLayer::new(|body| { + Box::new(http_body::Body::map_err(body, BoxError::from)) as Box + }) + .layer(service), + ), + default_ns, + )) } diff --git a/kube-client/src/client/config_ext.rs b/kube-client/src/client/config_ext.rs index b0ad0ce5e..6e7239a1d 100644 --- a/kube-client/src/client/config_ext.rs +++ b/kube-client/src/client/config_ext.rs @@ -61,10 +61,10 @@ pub trait ConfigExt: private::Sealed { /// ``` #[cfg_attr(docsrs, doc(cfg(feature = "rustls-tls")))] #[cfg(feature = "rustls-tls")] - fn rustls_https_connector_with_connector( + fn rustls_https_connector_with_connector( &self, - connector: hyper::client::HttpConnector, - ) -> Result>; + connector: H, + ) -> Result>; /// Create [`rustls::ClientConfig`] based on config. /// # Example @@ -118,10 +118,16 @@ pub trait ConfigExt: private::Sealed { /// ``` #[cfg_attr(docsrs, doc(cfg(feature = "openssl-tls")))] #[cfg(feature = "openssl-tls")] - fn openssl_https_connector_with_connector( + fn openssl_https_connector_with_connector( &self, - connector: hyper::client::HttpConnector, - ) -> Result>; + connector: H, + ) -> Result> + where + H: tower::Service + Send, + H::Error: Into>, + H::Future: Send + 'static, + H::Response: + tokio::io::AsyncRead + tokio::io::AsyncWrite + hyper::client::connect::Connection + Unpin; /// Create [`openssl::ssl::SslConnectorBuilder`] based on config. /// # Example @@ -215,10 +221,10 @@ impl ConfigExt for Config { } #[cfg(feature = "rustls-tls")] - fn rustls_https_connector_with_connector( + fn rustls_https_connector_with_connector( &self, - connector: hyper::client::HttpConnector, - ) -> Result> { + connector: H, + ) -> Result> { let rustls_config = self.rustls_client_config()?; let mut builder = hyper_rustls::HttpsConnectorBuilder::new() .with_tls_config(rustls_config) @@ -245,10 +251,17 @@ impl ConfigExt for Config { } #[cfg(feature = "openssl-tls")] - fn openssl_https_connector_with_connector( + fn openssl_https_connector_with_connector( &self, - connector: hyper::client::HttpConnector, - ) -> Result> { + connector: H, + ) -> Result> + where + H: tower::Service + Send, + H::Error: Into>, + H::Future: Send + 'static, + H::Response: + tokio::io::AsyncRead + tokio::io::AsyncWrite + hyper::client::connect::Connection + Unpin, + { let mut https = hyper_openssl::HttpsConnector::with_connector(connector, self.openssl_ssl_connector_builder()?) .map_err(|e| Error::OpensslTls(tls::openssl_tls::Error::CreateHttpsConnector(e)))?; diff --git a/kube/Cargo.toml b/kube/Cargo.toml index 92af9cb6e..c12d61697 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -36,9 +36,10 @@ admission = ["kube-core/admission"] derive = ["kube-derive", "kube-core/schema"] runtime = ["kube-runtime"] unstable-runtime = ["kube-runtime/unstable-runtime"] +socks5 = ["kube-client/socks5"] [package.metadata.docs.rs] -features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/latest", "unstable-runtime"] +features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/latest", "unstable-runtime", "socks5"] # Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature. rustdoc-args = ["--cfg", "docsrs"]